package
2.9.9
Repository: https://github.com/abulo/ratel.git
Documentation: pkg.go.dev

# README

rabbitmq: An easy golang amqp client

Introduction

rabbitmq 扩展了 github.com/streadway/amqp 包,在 amqp 包的基础上实现了网络断线重连、消息发送失败重试的能力。

由于消息的发送和接收,依赖 Channel 的创建,Channel 的创建依赖与 Connection 的连接。如果断开网络连接,Connection 就会断开。 但是 Channel 并不知道是谁创建了自己。所以通常情况下,我们有两种断线重连的方案:

  • 一种是 Connection 断开后,Channel 自己重新获取 Connection。此种方式是最简单的实现方案。 但此法的问题是,如果 Channel 数量极其庞大, 每个 Channel 都会创建 Connection 并连接。当网络状况不大好的时候,可能会有数量极其庞大的 Channel 反复尝试重连, 导致服务器资源占用会暴增,甚至加剧网络的阻塞。但其实我们只需要一个 Connection 去判断网络是否能连接、已连接。
  • 一种是 Connection 断开后,Connection 自己重连。如果重连不成功,Channel 不做任何操作。如果重连成功,自动重跑注册的操作。 而如果使用断线重连发送消息,将在 Connection 连接成功后继续发送需要发送的消息。此种方式实现比较复杂, 但避免了服务器资源占用以及加剧网络阻塞的问题。

rabbitmq 采用的是后者。

Feature

目前实现了

  • 断线重连
  • 消息重发
  • 同步的消息发送确认

Usage

package main

import (
  "log"
  "time"
  "github.com/abulo/ratel/v2/client/rabbitmq"
  "github.com/streadway/amqp"
)

func main() {
  // -- 创建 Connection 并连接服务器 --
  conn, err := rabbitmq.Dial("amqp://guest:guest@localhost:5672/", rabbitmq.DefaultTimesRetry())
  if err != nil {
    log.Fatal(err)
  }
  defer conn.Close()

  // -- 发送消息 --
  go func() {
    producer := conn.Producer()
    err = producer.Send("amq.direct", "key.direct",
      []byte("producer.Send() | "+time.Now().Format("2006-01-02 15:04:05")),
      rabbitmq.DefaultSendOpts())
    if err != nil {
      log.Fatal(err)
    }
  }()

  // -- 接收消息 --
  go func() {
    consumer := conn.Consumer()
    consumer.Receive(
      "queue.direct",
      rabbitmq.NewReceiveOptsBuilder().SetAutoAck(false).Build(),
      &rabbitmq.AbsReceiveListener{
        ConsumerMethod: func(d *amqp.Delivery) (brk bool) {
          log.Println("queue.direct ", d.DeliveryTag, " ", string(d.Body))
          err := d.Ack(false)
          if err != nil {
            log.Println(err)
          }
          return
        },
        FinishMethod: func(err error) {
          if err != nil {
            // 处理错误
            log.Fatal(err)
          }
          // defer xxx.close() // 关闭资源操作等
        },
      })
  }()

  time.Sleep(time.Second * 10)
}

小建议:

如无特殊需求,我们在全局只需创建一个 Connection,所有消息的发送和接收都使用 Producer 和 Consumer 处理。

Examples

消费

生产

Unresolved

  • 消息异步发送与确认
  • 重复消费
  • 网络分区状态下,找不到队列时消息重发(ReturnListener)
  • 消费者 ack 重试

License

LGPL

# Functions

AdderGenerator 累加器生成器。生成的累加器从 0 开始累加,delta 表示需要累加的数字.
DefaultBindOpts returns the default queue binding options.
DefaultCtxRetry ...
DefaultQueueDeclareOpts holds the default queue declaration.
DefaultReceiveOpts 将 ReceiveOpts.autoAck 默认设置为 true.
DefaultSendOpts 默认消息发送选项:消息无格式,非持久化,启用默认重试配置(DefaultTimesRetry).
DefaultTimesRetry 创建一个默认的重试配置:总是重试,且间隔三秒.
Dial 如果 retryable 为 nil,则表示不启用断线重连.
NewConnection retryable 如果为 nil,则使用 emptyRetryable 替换。emptyRetryable 不会尝试重试操作。.
NewCtxRetry ...
NewDefaultSAdder 获取一个从 0 开始累加,每次加 1 的累加器。返回一个 uint64 的字符串.
NewQueueBindOptsBuilder returns a new QueueBindOptsBuilder.
NewQueueBuilder creates a new QueueBuilder instance.
NewQueueDeclareOptsBuilder creates a new QueueDeclareOptsBuilder.
No description provided by the author
NewSendOptsBuilder creates a new SendOptsBuilder.
NewTimesRetry 创建根据次数结束重试的配置.
NewTimesRetryBuilder ..
SAdderGenerator ..

# Constants

No description provided by the author
No description provided by the author

# Variables

MessageJSONPersistent JSON、持久化消息工厂方法.
MessageJSONTransient JSON、非持久化消息工厂方法.
MessagePlainPersistent 无格式、持久化消息工厂方法.
MessagePlainTransient 无格式、非持久化消息工厂方法.

# Structs

AbsReceiveListener 的抽象实现。 如果 ConsumerMethod 为 nil 或不赋值,将 panic; 如果 FinishMethod 为 nil 或不赋值,则默认不做任何操作。.
Channel represents a channel.
Connection amqp 连接。 Connection 创建后不会直接连接服务器,而是要调用 Dial 后才会执行连接服务器操作.
No description provided by the author
CtxRetry ...
No description provided by the author
Queue represents a queue of messages.
QueueBindOpts is the queue binding options.
QueueBindOptsBuilder is the builder for QueueBindOptsBuilder.
QueueBuilder ...
QueueDeclareOpts returns the list of queued operations.
QueueDeclareOptsBuilder is the builder for QueueDeclareOpts.
ReceiveOpts 消息接收选项。 如果 autoAck 设为 false,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去 消息。因此调用者应该主动调用 (*amqp.Delivery).Ack 确认消费,防止消息在内存(或者磁盘)中积累。 如果 autoAck 设为 true,消息会被服务器默认为已消费,可能会导致消费者无法处理数据时造成数据丢失。 如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经 断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。 consumerTag 用于唯一识别一个消费者,如果不填可自动生成。 其他参数如果没有特别需求,默认不填即可。.
No description provided by the author
SendOpts 消息发送选项。 mandatory 设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列, 那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当该选项设置为 false 时, 出现上述情形,则消息直接被丢弃。 immediate 设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者, 那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者。 RabbitMQ 3.0版本开始去掉了对 immediate 参数的支持,对此 RabbitMQ 官方解释是: immediate 参数会影响镜像队列的性能,增加了代码复杂性,建议采用 TTL 和 DLX 的方法替代。 messageFactory 如果未设置该选项,则默认使用 MessagePlainTransient 生产消息。 retryable 如果不设置该选项,表示不启用消息重发功能。.
SendOptsBuilder ...
TimesRetry ..
TimesRetryBuilder ..

# Interfaces

ReceiveListener ...
Retryable ..

# Type aliases

Adder 累加器。每次执行累加一定数额,返回一个 uint64。.
如果没有消息则该方法阻塞等待;否则本方法会被持续调用, 直到主动停止消费(即本方法返回 true)。 返回值 brk 表示是否 break,即在循环消费过程中是否需要终止消费。.
MessageFactory 消息工厂方法。默认提供了如: MessagePlainTransient, MessagePlainPersistent, MessageJSONPersistent 等 在内的工厂方法。 如果没有需要的工厂方法,则需要调用者自己提供对应的工厂方法。.
No description provided by the author
No description provided by the author
SAdder 返回一个 uint64 的字符串.
No description provided by the author