modulepackage
0.0.0-20230523053941-c75f18851701
Repository: https://github.com/eininst/rs.git
Documentation: pkg.go.dev
# README
RS
RS
RS is a message queue built on Redis Streams.
⚙ Installation
go get -u github.com/eininst/rs
⚡ Quickstart
cli := rs.New(rcli *redis.Client)
You can customize it all you want:
cli := rs.New(examples.GetRedis(), rs.Config{
Sender: rs.SenderConfig{
//Evicts entries as long as the stream's length exceeds the specified threshold
MaxLen: rs.Int64(100),
},
})
Send a message
cli.Send("simple", rs.H{
"title": "this a simple message",
})
cli.Send("test", rs.H{
"something": "hello word",
})
cli.Send("order_status_change", rs.H{
"order_id": 100,
})
Send a delay message
cli.SendWithDelay("simple", rs.H{
"title": "this a delay message",
}, time.Second*10)
cli.SendWithTime("simple", rs.H{
"title": "this a timing message",
}, time.Now().Add(time.Minute * 5))
Receive message
package main
import (
"encoding/json"
"github.com/eininst/flog"
"github.com/eininst/rs"
examples "github.com/eininst/rs/examples/redis"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
cli := rs.New(examples.GetRedis(), rs.Config{
//default configuration for receiving messages
Receive: rs.ReceiveConfig{
Work: rs.Int(10), //Number of goroutines per stream
Timeout: time.Second * 20, //Retry after timeout
MaxRetries: rs.Int64(3), //Max retries
ReadCount: rs.Int64(50), //XReadGroup Count
BlockTime: time.Second * 20, //XReadGroup Block Time
},
})
cli.Handler("simple", func(ctx *rs.Context) {
defer ctx.Ack()
flog.Info(ctx.JSON.Raw)
})
cli.Receive(rs.Rctx{
Stream: "simple",
Handler: func(ctx *rs.Context) {
defer ctx.Ack()
jstr, _ := json.Marshal(ctx.Msg.Values)
flog.Info("received simple msg:", string(jstr))
},
})
cli.Receive(rs.Rctx{
Stream: "test",
Group: "group1",
MaxRetries: nil, //nil: falls back to the default Receive.MaxRetries (the sample log shows MaxRetries=3 for this group)
Handler: func(ctx *rs.Context) {
defer ctx.Ack()
jstr, _ := json.Marshal(ctx.Msg.Values)
flog.Info("received test msg:", string(jstr))
},
})
cli.Receive(rs.Rctx{
Stream: "test",
Group: "group2",
MaxRetries: nil, //nil: falls back to the default Receive.MaxRetries (the sample log shows MaxRetries=3 for this group)
Handler: func(ctx *rs.Context) {
defer ctx.Ack()
jstr, _ := json.Marshal(ctx.Msg.Values)
flog.Info("received test msg:", string(jstr))
},
})
cli.Receive(rs.Rctx{
Stream: "order_status_change",
Work: rs.Int(20),
Timeout: time.Second * 120,
MaxRetries: rs.Int64(6),
Handler: func(ctx *rs.Context) {
defer ctx.Ack()
orderId := ctx.Msg.Values["order_id"]
flog.Info("received order_status_change msg:", orderId)
},
})
cli.Listen()
}
2022/08/29 21:10:04 [RS] Stream "simple" working... # BlockTime=20s MaxRetries=3 ReadCount=50 Timeout=20s Work=10
2022/08/29 21:10:04 [RS] Stream "test:group1" working... # BlockTime=20s MaxRetries=3 ReadCount=50 Timeout=20s Work=10
2022/08/29 21:10:04 [RS] Stream "test:group2" working... # BlockTime=20s MaxRetries=3 ReadCount=50 Timeout=20s Work=10
2022/08/29 21:10:04 [RS] Stream "order_status_change" working... # BlockTime=20s MaxRetries=6 ReadCount=50 Timeout=2m0s Work=20
2022/08/29 21:10:20 [INFO] receive.go:31 received simple msg: {"title":"this a simple message"}
2022/08/29 21:10:20 [INFO] receive.go:53 received test msg: {"something":"hello word"}
2022/08/29 21:10:20 [INFO] receive.go:42 received test msg: {"something":"hello word"}
2022/08/29 21:10:20 [INFO] receive.go:65 received order_status_change msg: 100
Graceful Shutdown
go func () {
quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
<-quit
cli.Shutdown()
flog.Info("Graceful Shutdown")
}()
cli.Listen()
See examples
License
MIT
# Packages
No description provided by the author
# Functions
Bool 复制 bool 对象,并返回复制体的指针.
No description provided by the author
Float32 复制 float32 对象,并返回复制体的指针.
Float64 复制 float64 对象,并返回复制体的指针.
No description provided by the author
No description provided by the author
Int32 复制 int32 对象,并返回复制体的指针.
Int64 复制 int64 对象,并返回复制体的指针.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
String 复制 string 对象,并返回复制体的指针.
Time 复制 time.Time 对象,并返回复制体的指针.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Variables
No description provided by the author
No description provided by the author
No description provided by the author
# Structs
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Interfaces
No description provided by the author