package
2.9.9
Repository: https://github.com/abulo/ratel.git
Documentation: pkg.go.dev

# README

rabbitmq: An easy golang amqp client

Introduction

rabbitmq 扩展了 github.com/streadway/amqp 包,在 amqp 包的基础上实现了网络断线重连、消息发送失败重试的能力。

由于消息的发送和接收,依赖 Channel 的创建,Channel 的创建依赖与 Connection 的连接。如果断开网络连接,Connection 就会断开。 但是 Channel 并不知道是谁创建了自己。所以通常情况下,我们有两种断线重连的方案:

  • 一种是 Connection 断开后,Channel 自己重新获取 Connection。此种方式是最简单的实现方案。 但此法的问题是,如果 Channel 数量极其庞大, 每个 Channel 都会创建 Connection 并连接。当网络状况不大好的时候,可能会有数量极其庞大的 Channel 反复尝试重连, 导致服务器资源占用会暴增,甚至加剧网络的阻塞。但其实我们只需要一个 Connection 去判断网络是否能连接、已连接。
  • 一种是 Connection 断开后,Connection 自己重连。如果重连不成功,Channel 不做任何操作。如果重连成功,自动重跑注册的操作。 而如果使用断线重连发送消息,将在 Connection 连接成功后继续发送需要发送的消息。此种方式实现比较复杂, 但避免了服务器资源占用以及加剧网络阻塞的问题。

rabbitmq 采用的是后者。

Feature

目前实现了

  • 断线重连
  • 消息重发
  • 同步的消息发送确认

Usage

package main

import (
  "log"
  "time"
  "github.com/abulo/ratel/v2/client/rabbitmq"
  "github.com/streadway/amqp"
)

func main() {
  // -- 创建 Connection 并连接服务器 --
  conn, err := rabbitmq.Dial("amqp://guest:guest@localhost:5672/", rabbitmq.DefaultTimesRetry())
  if err != nil {
    log.Fatal(err)
  }
  defer conn.Close()

  // -- 发送消息 --
  go func() {
    producer := conn.Producer()
    err = producer.Send("amq.direct", "key.direct",
      []byte("producer.Send() | "+time.Now().Format("2006-01-02 15:04:05")),
      rabbitmq.DefaultSendOpts())
    if err != nil {
      log.Fatal(err)
    }
  }()

  // -- 接收消息 --
  go func() {
    consumer := conn.Consumer()
    consumer.Receive(
      "queue.direct",
      rabbitmq.NewReceiveOptsBuilder().SetAutoAck(false).Build(),
      &rabbitmq.AbsReceiveListener{
        ConsumerMethod: func(d *amqp.Delivery) (brk bool) {
          log.Println("queue.direct ", d.DeliveryTag, " ", string(d.Body))
          err := d.Ack(false)
          if err != nil {
            log.Println(err)
          }
          return
        },
        FinishMethod: func(err error) {
          if err != nil {
            // 处理错误
            log.Fatal(err)
          }
          // defer xxx.close() // 关闭资源操作等
        },
      })
  }()

  time.Sleep(time.Second * 10)
}

小建议:

如无特殊需求,我们在全局只需创建一个 Connection,所有消息的发送和接收都使用 Producer 和 Consumer 处理。

Examples

消费

生产

Unresolved

  • 消息异步发送与确认
  • 重复消费
  • 网络分区状态下,找不到队列时消息重发(ReturnListener)
  • 消费者 ack 重试

License

LGPL