Categorygithub.com/eininst/rs
modulepackage
0.0.0-20230523053941-c75f18851701
Repository: https://github.com/eininst/rs.git
Documentation: pkg.go.dev

# README

RS

RS is a message queue built on Redis Streams.

⚙ Installation

go get -u github.com/eininst/rs

⚡ Quickstart

// rcli is an existing *redis.Client.
cli := rs.New(rcli)

You can customize it all you want:

cli := rs.New(examples.GetRedis(), rs.Config{
    Sender: rs.SenderConfig{
    	//Evicts entries as long as the stream's length exceeds the specified threshold
        MaxLen: rs.Int64(100),
    },
})

Send a message

cli.Send("simple", rs.H{
    "title": "this a simple message",
})

cli.Send("test", rs.H{
    "something": "hello word",
})

cli.Send("order_status_change", rs.H{
    "order_id": 100,
})

Send a delay message

cli.SendWithDelay("simple", rs.H{
    "title": "this a delay message",
}, time.Second*10)

cli.SendWithTime("simple", rs.H{
    "title": "this a timing message",
}, time.Now().Add(time.Minute * 5))

Receive message

package main

import (
	"encoding/json"
	"github.com/eininst/flog"
	"github.com/eininst/rs"
	examples "github.com/eininst/rs/examples/redis"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// main builds an rs client with default receive settings, registers a
// consumer for each stream/group, installs a signal handler that shuts the
// client down on Ctrl-C or SIGTERM, and then calls Listen.
func main() {
    cli := rs.New(examples.GetRedis(), rs.Config{
        //default configuration for receiving messages
        Receive: rs.ReceiveConfig{
            Work:       rs.Int(10),       //Per stream goroutine number,
            Timeout:    time.Second * 20, //Retry after timeout
            MaxRetries: rs.Int64(3),      //Max retries
            ReadCount:  rs.Int64(50),     //XReadGroup Count
            BlockTime:  time.Second * 20, //XReadGroup Block Time
        },
    })

    cli.Handler("simple", func(ctx *rs.Context) {
        defer ctx.Ack()
        flog.Info(ctx.JSON.Raw)
    })

    cli.Receive(rs.Rctx{
        Stream: "simple",
        Handler: func(ctx *rs.Context) {
            defer ctx.Ack()
            // Marshal error ignored for brevity in this example.
            jstr, _ := json.Marshal(ctx.Msg.Values)
            flog.Info("received simple msg:", string(jstr))
        },
    })

    // Two groups on the same stream: the sample output shows the "test"
    // message being logged once per group.
    cli.Receive(rs.Rctx{
        Stream: "test",
        Group:  "group1",
        // nil leaves MaxRetries unset; the sample output reports
        // MaxRetries=3 for this group, i.e. the Receive default applies.
        MaxRetries: nil,
        Handler: func(ctx *rs.Context) {
            defer ctx.Ack()
            // Marshal error ignored for brevity in this example.
            jstr, _ := json.Marshal(ctx.Msg.Values)
            flog.Info("received test msg:", string(jstr))
        },
    })

    cli.Receive(rs.Rctx{
        Stream: "test",
        Group:  "group2",
        // nil leaves MaxRetries unset; the sample output reports
        // MaxRetries=3 for this group, i.e. the Receive default applies.
        MaxRetries: nil,
        Handler: func(ctx *rs.Context) {
            defer ctx.Ack()
            // Marshal error ignored for brevity in this example.
            jstr, _ := json.Marshal(ctx.Msg.Values)
            flog.Info("received test msg:", string(jstr))
        },
    })

    // Per-stream overrides of the Receive defaults configured above.
    cli.Receive(rs.Rctx{
        Stream:     "order_status_change",
        Work:       rs.Int(20),
        Timeout:    time.Second * 120,
        MaxRetries: rs.Int64(6),
        Handler: func(ctx *rs.Context) {
            defer ctx.Ack()
            orderId := ctx.Msg.Values["order_id"]
            flog.Info("received order_status_change msg:", orderId)
        },
    })

    // Graceful shutdown on Ctrl-C / SIGTERM. The channel is buffered because
    // signal.Notify never blocks when delivering: with an unbuffered channel
    // a signal that arrives before the receive below is ready would be lost.
    go func() {
        quit := make(chan os.Signal, 1)
        signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
        <-quit

        cli.Shutdown()
        flog.Info("Graceful Shutdown")
    }()

    cli.Listen()
}
2022/08/29 21:10:04 [RS] Stream "simple" working... # BlockTime=20s MaxRetries=3 ReadCount=50 Timeout=20s Work=10
2022/08/29 21:10:04 [RS] Stream "test:group1" working... # BlockTime=20s MaxRetries=3 ReadCount=50 Timeout=20s Work=10
2022/08/29 21:10:04 [RS] Stream "test:group2" working... # BlockTime=20s MaxRetries=3 ReadCount=50 Timeout=20s Work=10
2022/08/29 21:10:04 [RS] Stream "order_status_change" working... # BlockTime=20s MaxRetries=6 ReadCount=50 Timeout=2m0s Work=20
2022/08/29 21:10:20 [INFO] receive.go:31 received simple msg: {"title":"this a simple message"}
2022/08/29 21:10:20 [INFO] receive.go:53 received test msg: {"something":"hello word"}
2022/08/29 21:10:20 [INFO] receive.go:42 received test msg: {"something":"hello word"}
2022/08/29 21:10:20 [INFO] receive.go:65 received order_status_change msg: 100

Graceful Shutdown

go func() {
    // Buffered with capacity 1: signal.Notify does not block when sending,
    // so a signal delivered while nothing is receiving on an unbuffered
    // channel would be dropped.
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
    <-quit

    cli.Shutdown()
    flog.Info("Graceful Shutdown")
}()

cli.Listen()

See examples

License

MIT

# Packages

No description provided by the author

# Functions

Bool 复制 bool 对象,并返回复制体的指针.
No description provided by the author
Float32 复制 float32 对象,并返回复制体的指针.
Float64 复制 float64 对象,并返回复制体的指针.
No description provided by the author
No description provided by the author
Int32 复制 int32 对象,并返回复制体的指针.
Int64 复制 int64 对象,并返回复制体的指针.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
String 复制 string 对象,并返回复制体的指针.
Time 复制 time.Time 对象,并返回复制体的指针.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Variables

No description provided by the author
No description provided by the author
No description provided by the author

# Structs

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Interfaces

No description provided by the author

# Type aliases

No description provided by the author
No description provided by the author