Categorygithub.com/eininst/rs
repositorypackage
0.0.0-20230523053941-c75f18851701
Repository: https://github.com/eininst/rs.git
Documentation: pkg.go.dev

# Packages

No description provided by the author

# README

RS

RS Is a message queue for redis streams

⚙ Installation

go get -u github.com/eininst/rs

⚡ Quickstart

cli := rs.New(rcli *redis.Client)

You can customize it all you want:

cli := rs.New(examples.GetRedis(), rs.Config{
    Sender: rs.SenderConfig{
    	//Evicts entries as long as the stream's length exceeds the specified threshold
        MaxLen: rs.Int64(100),
    },
})

Send a message

cli.Send("simple", rs.H{
    "title": "this a simple message",
})

cli.Send("test", rs.H{
    "something": "hello word",
})

cli.Send("order_status_change", rs.H{
    "order_id": 100,
})

Send a delay message

cli.SendWithDelay("simple", rs.H{
    "title": "this a delay message",
}, time.Second*10)

cli.SendWithTime("simple", rs.H{
    "title": "this a timing message",
}, time.Now().Add(time.Minute * 5))

Receive message

package main

import (
	"encoding/json"
	"github.com/eininst/flog"
	"github.com/eininst/rs"
	examples "github.com/eininst/rs/examples/redis"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
    cli := rs.New(examples.GetRedis(), rs.Config{
        //default configuration for receiving messages
        Receive: rs.ReceiveConfig{
            Work:       rs.Int(10),       //Per stream goroutine number,
            Timeout:    time.Second * 20, //Retry after timeout
            MaxRetries: rs.Int64(3),      //Max retries
            ReadCount:  rs.Int64(50),     //XReadGroup Count
            BlockTime:  time.Second * 20, //XReadGroup Block Time
        },
    })

    cli.Handler("simple", func(ctx *rs.Context) {
        defer ctx.Ack()
        flog.Info(ctx.JSON.Raw)
    })

    cli.Receive(rs.Rctx{
        Stream: "simple",
        Handler: func(ctx *rs.Context) {
            defer ctx.Ack()
            jstr, _ := json.Marshal(ctx.Msg.Values)
            flog.Info("received simple msg:", string(jstr))
        },
    })

    cli.Receive(rs.Rctx{
        Stream:     "test",
        Group:      "group1",
        MaxRetries: nil, //no retries
        Handler: func(ctx *rs.Context) {
            defer ctx.Ack()
            jstr, _ := json.Marshal(ctx.Msg.Values)
            flog.Info("received test msg:", string(jstr))
        },
    })

    cli.Receive(rs.Rctx{
        Stream:     "test",
        Group:      "group2",
        MaxRetries: nil, //no retries
        Handler: func(ctx *rs.Context) {
            defer ctx.Ack()
            jstr, _ := json.Marshal(ctx.Msg.Values)
            flog.Info("received test msg:", string(jstr))
        },
    })

    cli.Receive(rs.Rctx{
        Stream:     "order_status_change",
        Work:       rs.Int(20),
        Timeout:    time.Second * 120,
        MaxRetries: rs.Int64(6),
        Handler: func(ctx *rs.Context) {
	    defer ctx.Ack()
            orderId := ctx.Msg.Values["order_id"]
            flog.Info("received order_status_change msg:", orderId)
        },
    })

    cli.Listen()
}
2022/08/29 21:10:04 [RS] Stream "simple" working... # BlockTime=20s MaxRetries=3 ReadCount=50 Timeout=20s Work=10
2022/08/29 21:10:04 [RS] Stream "test:group1" working... # BlockTime=20s MaxRetries=3 ReadCount=50 Timeout=20s Work=10
2022/08/29 21:10:04 [RS] Stream "test:group2" working... # BlockTime=20s MaxRetries=3 ReadCount=50 Timeout=20s Work=10
2022/08/29 21:10:04 [RS] Stream "order_status_change" working... # BlockTime=20s MaxRetries=6 ReadCount=50 Timeout=2m0s Work=20
2022/08/29 21:10:20 [INFO] receive.go:31 received simple msg: {"title":"this a simple message"}
2022/08/29 21:10:20 [INFO] receive.go:53 received test msg: {"something":"hello word"}
2022/08/29 21:10:20 [INFO] receive.go:42 received test msg: {"something":"hello word"}
2022/08/29 21:10:20 [INFO] receive.go:65 received order_status_change msg: 100

Graceful Shutdown

go func () {
    quit := make(chan os.Signal)
    signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
    <-quit
    
    cli.Shutdown()
    flog.Info("Graceful Shutdown")
}()

cli.Listen()

See examples

License

MIT