Categorygithub.com/chsendev/go-rabbitmq
modulepackage
1.0.4
Repository: https://github.com/chsendev/go-rabbitmq.git
Documentation: pkg.go.dev

# README

go-rabbitmq

基于amqp的封装,除了一些基础功能(声明交换机、队列、发送消息、消费消息),还封装了一些高级功能:

  • 生产者确认
  • 消费者确认
  • 消息多拦截器
  • 延迟消息
  • 统一消息处理
  • 消费者断开重连
  • 内置连接池
  • ...

安装

go get github.com/chsendev/go-rabbitmq

基础功能

初始化

rmq.Init("amqp://test:[email protected]:5672//test")

声明资源

方式一(常用)

// 声明名称为go-demo类型为Direct的交换机
// 与demo.queue1队列进行绑定,binding key为q1
// 与demo.queue2队列进行绑定,binding key为q2、q3
engine := rmq.New().
   Binding("go-demo", mq.Direct, "demo.queue1", "q1").
   Binding("go-demo", mq.Direct, "demo.queue2", "q2", "q3")

方式二

engine := rmq.New().
   Exchange("test.direct", mq.Direct). // 声明交换机
   Queue("test.queue1").  // 声明队列
   BindingKey("k1", "k2") // 声明test.direct和test.queue1的binging key
  1. 可只声明某个交换机或者某个队列
  2. 声明BindingKey之前必须声明交换机和声明队列

监听消息

engine.Listen("test.queue1", func (ctx *rmq.Context) error {
   var u User
   if err := ctx.ShouldBind(&u); err != nil {
	   return err
   }
   /// ...
})

获取Error

if engine.Error() != nil {
    panic(engine.Error())
}

发送消息

m := make(map[string]any)
m["name"] = "jack"
m["age"] = "10"
if err := rmq.Publish(context.Background(), "go-demo", "q1", m).Error(); err != nil {
    fmt.Println(err)
}

高级功能

消费者拉取消息限制

有时候需要限制每个消费者一次性拉取消息的条数,如果设置得当的话,可以起到能者多劳的效果(性能较好的消费者能够消费更多的消息)

rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithPrefetch(1))

设置消费者Ack策略

框架默认提供了三种策略:

  • none:自动进行Ack,无论消息是否消费成功
  • auto(默认):如果Handlers处理成功,则进行Ack,否则Nack
  • manual:完全由开发者手动调用ctx.Ack进行Ack
rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithAckMode(config.AcknowledgeModeAuto))

设置框架日志

框架使用zap作为日志的管理,默认的级别为Info,若需要调整可使用WithLogLevel进行设置

rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithLogLevel("debug"))

连接重试策略

当与rabbitmq通行的channel断开链接时,框架内部会进行重试。

  • initialInterval(默认1s):失败后的初始等待时间
  • multiplier(默认2):失败后下次的等待时长倍数,下次等待时长 = initialInterval * multiplier
  • maxAttempts(默认3):最大重试次数
rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithRetry(time.Second, 2, 3))

生产者确认

  1. Publish Confirm
     if err := rmq.Publish(context.Background(), "go-demo", "q3", m, publish.WithConfirm(time.Second*3)); err != nil {
         fmt.Println(err)
     }
    
    若rabbitmq未在3秒内返回ack消息,则返回error
  2. Publish Return
     // 监听Publish Return的通道
     rmq.ListenNotifyReturn(func(msg *amqp.Return) {
     	fmt.Println(msg)
     })
    if err := rmq.Publish(context.Background(), "go-demo123", "q1", m, publish.WithNotifyReturn()); err != nil {
     	panic(err)
     }
    

消费者确认

框架默认提供了三种策略:none、auto、manual,如果设置manual,需要开发者手动调用ctx.Ack进行Ack

rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithAckMode(config.AcknowledgeModeManual))
engine := rmq.New().Binding("go-demo", mq.Direct, "demo.queue1", "q1")
engine.Listen("demo.queue1", func (ctx *rmq.Context) error {
   // ...
   err := ctx.Ack() // 或ctx.Nack()
   // ...
})

if engine.Error() != nil {
    panic(engine.Error())
}

消息拦截器

有时候消息可能需要经过多个处理器(handler)进行处理,例如:自定义消息的重试次数、消息消费统一错误处理等 自定义消息的重试次数:

type User struct {
	Name string
}

var messageDb = make(map[string]int)

// 前置检查
func preCheck(ctx *rmq.Context) error {
	num := messageDb[ctx.GetMessageId()]

	// 限制重试三次
	if num >= 3 {
		fmt.Println("达到重试次数")
		ctx.Abort()
		return ctx.Ack()
	}
	messageDb[ctx.GetMessageId()] = num + 1
	return nil
}

// 业务逻辑
func logic(ctx *rmq.Context) error {
	var u User
	if err := ctx.ShouldBind(&u); err != nil {
		return err
	}
	fmt.Println("获取到的消息:", u)
	// 手动触发异常
	return errors.New("手动触发的错误")
}

func TestRetry(t *testing.T) {
	rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithAckMode(config.AcknowledgeModeAuto))
	engine := rmq.New().Binding("go-demo", mq.Direct, "demo.queue1", "q1")

	g1 := engine.Group(preCheck)
	{
		g1.Listen("demo.queue1", logic)
	}

	if engine.Error() != nil {
		panic(engine.Error())
	}

	time.Sleep(time.Hour)
}

func TestRetryPublish(t *testing.T) {
	u := &User{Name: "jack"}
	rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithAckMode(config.AcknowledgeModeNone))
	err := rmq.Publish(context.Background(), "go-demo", "q1", u)
	fmt.Println(err)
}

延迟消息

延时消息:可以允许消息在指定时间后再发送给消费者,此功能需要额外的插件支持: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

func TestListenDelay(t *testing.T) {
	rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithLogLevel("debug"))
	engine := rmq.New().BindingWithDelay("delay-demo", mq.Direct, "demo.queue1", "q1")

	engine.Listen("demo.queue1", func(ctx *rmq.Context) error {
		var s string
		if err := ctx.ShouldBind(&s); err != nil {
			return err
		}
		log.Println("receive message: ", s)
		return nil
	})

	if engine.Error() != nil {
		panic(engine.Error())
	}

	time.Sleep(time.Hour)
}

func TestPublishDelay(t *testing.T) {
	rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithLogLevel("debug"))
	msg := "test delay message"
	err := rmq.Publish(context.Background(), "delay-demo", "q1", msg, publish.WithDelay(time.Second*10))
	if err != nil {
		panic(err)
	}
	log.Println("publish message: ", msg)
}

自定义消息头

消息头如同HTTP请求头,生产者在发送消息时附加,消费者可从中获取额外信息,增强消息传递的灵活性和数据描述的丰富性。

func TestHeaderReceive(t *testing.T) {
	rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithAckMode(config.AcknowledgeModeAuto), rmq.WithLogLevel("debug"))
	engine := rmq.New().Binding("go-demo", mq.Direct, "demo.queue1", "q1")
	engine.Listen("demo.queue1", func(ctx *rmq.Context) error {
		var u string
		if err := ctx.ShouldBind(&u); err != nil {
			return err
		}
		fmt.Println(u)
		fmt.Println(ctx.GetHeader("my-header"))
		return nil
	})
	if engine.Error() != nil {
		panic(engine.Error())
	}
	time.Sleep(time.Hour)
}

func TestHeaderPublish(t *testing.T) {
	u := &User{Name: "jack"}
	rmq.Init("amqp://test:[email protected]:5672//test", rmq.WithAckMode(config.AcknowledgeModeNone))
	err := rmq.Publish(context.Background(), "go-demo", "q1", u, publish.WithHeaders(map[string]any{"my-header": "hello"}))
	fmt.Println(err)
}

# Packages

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Functions

AutoAck auto ack.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Structs

No description provided by the author
No description provided by the author
No description provided by the author

# Interfaces

No description provided by the author

# Type aliases

No description provided by the author
No description provided by the author
No description provided by the author