Categorygithub.com/chaos-star/queue-mate
repositorypackage
1.0.9-beta
Repository: https://github.com/chaos-star/queue-mate.git
Documentation: pkg.go.dev

# README

QueueMate

基于Rabbit MQ的消息队列助手

1. 初始化

rabbitInst := mate.NewRabbit(
    "127.0.0.1", //IP
    5672,        //端口
    "username",  //用户名
    "password",  //密码
    "/vhost",    //vhost
    30,          //最大空闲连接数 小于等于0时默认10
    3,           //单连接资源有效时长,单位:小时 小于等于0时默认1
    10,          //获取连接超时时间,单位:秒 小于等于0时默认10
    nil,         //日志对象为nil时,则为控制台输出
)

注: 日志对象必须是实现了以下方法
Info(string)
Error(string)

2. 连接池

QueueMate 连接池以vhost为单位复用连接,以配置maxIdle为上限,如无空闲连接则等待,连接回收后 重新分配,以配置maxLifeTime为最大生命周期,如过期则释放tcp资源,重新生成资源放入连接池,循环往复。

3.生产者

client := rabbitInst..NewClient()
err := client.Publish(
	client.Fanout,  //消息模式 Fanout、Topic、Direct
	"x_test", //交换机名称
	"x_test_route", //路由,Fanout时此参数按空处理
	[]byte(`{"name":"jack"}`) //发布消息体
	10,  //延时时间 单位:秒 当需延时N秒后执行传此参数,立即执行可不传此参数,请正确使用此参数
	)


4.消费者

//实现下面两个方法
// 1. RunConsume()(error) 消费者启动方法
// 2. Process([]byte)(error) 消费数据逻辑方法

Example:
type ExampleConsume struct {
}


func (t *ExampleConsume) RunConsume() (err error) {
client := global.GVA_MQ.NewClient()
err = client.Use(t).Retry(3).Receive(client.Fanout, "ex_test_exchange", nil, "qx_test_queue")
return
}
//注:
//client.Use(t)//使用当前消息处理方法,即业务逻辑,不选则返回错误
//client.Retry(3)//设置重试次数,不设置则不重试
//client.Receive( //非延时消费方法 参数与延时方法参数相同
//client.DelayReceive(//延时消费方法
      
参数1: client.Fanout    //消息模式 Fanout、Topic、Direct
参数2: "ex_test_change" //交换机
参数3: nil,             //路由 Fanout 此参数无效
参数4: "qx_test_queue"  // 消费队列名
// )


func (t *ExampleConsume) Process(body []byte) (err error) {
//在此处处理业务逻辑,当err返回nil时Ack,反之记录错误日志,如设置重试次数,N次失败后Ack并记录每次错误日志
return
}
	

5.启动消费任务

baseInst := mate.MQBase{}
baseInst.Add(new(consume.MyBook)) //添加消费任务
baseInst.Blocking().Run() //以阻塞方式启动任务,如需在其他阻塞任务中启动时则无需开启阻塞模式

//注:
//	baseInst.Add([]...MQBaseConsume) //添加任务,支持批量添加
//	baseIsnt.Blocking() //阻塞模式
//	baseInst.Run() //启动任务