repositorypackage
1.0.4
Repository: https://github.com/chsendev/go-rabbitmq.git
Documentation: pkg.go.dev
# README
go-rabbitmq
基于amqp的封装,除了一些基础功能(声明交换机、队列、发送消息、消费消息),还封装了一些高级功能:
- 生产者确认
- 消费者确认
- 消息多拦截器
- 延迟消息
- 统一消息处理
- 消费者断开重连
- 内置连接池
- ...
安装
go get github.com/chsendev/go-rabbitmq
基础功能
初始化
rmq.Init("amqp://test:[email protected]:5672//test")
声明资源
方式一(常用)
// 声明名称为go-demo类型为Direct的交换机
// 与demo.queue1队列进行绑定,binding key为q1
// 与demo.queue2队列进行绑定,binding key为q2、q3
engine := rmq.New().
Binding("go-demo", mq.Direct, "demo.queue1", "q1").
Binding("go-demo", mq.Direct, "demo.queue2", "q2", "q3")
方式二
engine := rmq.New().
Exchange("test.direct", mq.Direct). // 声明交换机
Queue("test.queue1"). // 声明队列
BindingKey("k1", "k2") // 声明test.direct和test.queue1的binging key
- 可只声明某个交换机或者某个队列
- 声明BindingKey之前必须声明交换机和声明队列
监听消息
engine.Listen("test.queue1", func (ctx *rmq.Context) error {
var u User
if err := ctx.ShouldBind(&u); err != nil {
return err
}
/// ...
})
获取Error
if engine.Error() != nil {
panic(engine.Error())
}
发送消息
m := make(map[string]any)
m["name"] = "jack"
m["age"] = "10"
if err := rmq.Publish(context.Background(), "go-demo", "q1", m).Error(); err != nil {
fmt.Println(err)
}
高级功能
消费者拉取消息限制
有时候需要限制每个消费者一次性拉取消息的条数,如果设置得当的话,可以起到能者多劳的效果(性能较好的消费者能够消费更多的消息)
rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithPrefetch(1))
设置消费者Ack策略
框架默认提供了三种策略:
- none:自动进行Ack,无论消息是否消费成功
- auto(默认):如果Handlers处理成功,则进行Ack,否则Nack
- manual:完全由开发者手动调用ctx.Ack进行Ack
rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithAckMode(config.AcknowledgeModeAuto))
设置框架日志
框架使用zap作为日志的管理,默认的级别为Info,若需要调整可使用WithLogLevel进行设置
rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithLogLevel("debug"))
连接重试策略
当与rabbitmq通行的channel断开链接时,框架内部会进行重试。
- initialInterval(默认1s):失败后的初始等待时间
- multiplier(默认2):失败后下次的等待时长倍数,下次等待时长 = initialInterval * multiplier
- maxAttempts(默认3):最大重试次数
rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithRetry(time.Second, 2, 3))
生产者确认
- Publish Confirm
若rabbitmq未在3秒内返回ack消息,则返回errorif err := rmq.Publish(context.Background(), "go-demo", "q3", m, publish.WithConfirm(time.Second*3)); err != nil { fmt.Println(err) }
- Publish Return
// 监听Publish Return的通道 rmq.ListenNotifyReturn(func(msg *amqp.Return) { fmt.Println(msg) }) if err := rmq.Publish(context.Background(), "go-demo123", "q1", m, publish.WithNotifyReturn()); err != nil { panic(err) }
消费者确认
框架默认提供了三种策略:none、auto、manual,如果设置manual,需要开发者手动调用ctx.Ack进行Ack
rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithAckMode(config.AcknowledgeModeManual))
engine := rmq.New().Binding("go-demo", mq.Direct, "demo.queue1", "q1")
engine.Listen("demo.queue1", func (ctx *rmq.Context) error {
// ...
err := ctx.Ack() // 或ctx.Nack()
// ...
})
if engine.Error() != nil {
panic(engine.Error())
}
消息拦截器
有时候消息可能需要经过多个处理器(handler)进行处理,例如:自定义消息的重试次数、消息消费统一错误处理等 自定义消息的重试次数:
type User struct {
Name string
}
var messageDb = make(map[string]int)
// 前置检查
func preCheck(ctx *rmq.Context) error {
num := messageDb[ctx.GetMessageId()]
// 限制重试三次
if num >= 3 {
fmt.Println("达到重试次数")
ctx.Abort()
return ctx.Ack()
}
messageDb[ctx.GetMessageId()] = num + 1
return nil
}
// 业务逻辑
func logic(ctx *rmq.Context) error {
var u User
if err := ctx.ShouldBind(&u); err != nil {
return err
}
fmt.Println("获取到的消息:", u)
// 手动触发异常
return errors.New("手动触发的错误")
}
func TestRetry(t *testing.T) {
rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithAckMode(config.AcknowledgeModeAuto))
engine := rmq.New().Binding("go-demo", mq.Direct, "demo.queue1", "q1")
g1 := engine.Group(preCheck)
{
g1.Listen("demo.queue1", logic)
}
if engine.Error() != nil {
panic(engine.Error())
}
time.Sleep(time.Hour)
}
func TestRetryPublish(t *testing.T) {
u := &User{Name: "jack"}
rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithAckMode(config.AcknowledgeModeNone))
err := rmq.Publish(context.Background(), "go-demo", "q1", u)
fmt.Println(err)
}
延迟消息
延时消息:可以允许消息在指定时间后再发送给消费者,此功能需要额外的插件支持: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
func TestListenDelay(t *testing.T) {
rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithLogLevel("debug"))
engine := rmq.New().BindingWithDelay("delay-demo", mq.Direct, "demo.queue1", "q1")
engine.Listen("demo.queue1", func(ctx *rmq.Context) error {
var s string
if err := ctx.ShouldBind(&s); err != nil {
return err
}
log.Println("receive message: ", s)
return nil
})
if engine.Error() != nil {
panic(engine.Error())
}
time.Sleep(time.Hour)
}
func TestPublishDelay(t *testing.T) {
rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithLogLevel("debug"))
msg := "test delay message"
err := rmq.Publish(context.Background(), "delay-demo", "q1", msg, publish.WithDelay(time.Second*10))
if err != nil {
panic(err)
}
log.Println("publish message: ", msg)
}
自定义消息头
消息头如同HTTP请求头,生产者在发送消息时附加,消费者可从中获取额外信息,增强消息传递的灵活性和数据描述的丰富性。
func TestHeaderReceive(t *testing.T) {
rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithAckMode(config.AcknowledgeModeAuto), rmq.WithLogLevel("debug"))
engine := rmq.New().Binding("go-demo", mq.Direct, "demo.queue1", "q1")
engine.Listen("demo.queue1", func(ctx *rmq.Context) error {
var u string
if err := ctx.ShouldBind(&u); err != nil {
return err
}
fmt.Println(u)
fmt.Println(ctx.GetHeader("my-header"))
return nil
})
if engine.Error() != nil {
panic(engine.Error())
}
time.Sleep(time.Hour)
}
func TestHeaderPublish(t *testing.T) {
u := &User{Name: "jack"}
rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithAckMode(config.AcknowledgeModeNone))
err := rmq.Publish(context.Background(), "go-demo", "q1", u, publish.WithHeaders(map[string]any{"my-header": "hello"}))
fmt.Println(err)
}