# README
rocketmq
- 在入口,比如 main.go 里面隐式导入kafka包路径
import _ "github.com/fankane/go-utils/plugin/queue/rocketmq"
- 在运行文件根目录下的 system_plugin.yaml 文件(没有则新建一个)里面添加如下内容
plugins:
queue: # 插件类型
rocketmq:
producers:
default: # producer 名
name_server:
- 192.168.99.45:9876
name_space: "hufan_ns"
group_name: "test_group"
retries: 3
send_msg_timeout_ms: 10000 # 发送超时,单位:ms
p2:
name_server:
- 192.168.99.45:9876
name_space: "hufan_ns2"
group_name: "test_group2"
retries: 3
send_msg_timeout_ms: 10000 # 发送超时,单位:ms
consumers:
c1: # consumer 名称
topics:
- test_topic_hf3
async_consume: false #异步消费
name_server:
- 192.168.99.45:9876
name_space: "hufan_ns"
group_name: "tt"
consume_from: 2
consume_timestamp: "20240117144500"
filter_history_for_init: true
- 使用方式
- 3.1 生产者使用
// 默认生产者发送消息
DefaultProducer.SendSync(context.Background(), "topicName", []byte("hello"))
// 获取配置文件里面 p2 对应的生产者发送消息
GetProducer("p2").SendSync(context.Background(), "topicName", []byte("hello"))
- 3.2 消费者使用
// 给配置文件里面 c1 对应的消费者注册handler方法
// 当有消息过来时,会调用注册的 function并执行
RegisterHandler("c1", func(ctx context.Context, value []byte) error {
fmt.Println("value:", string(value))
return nil
})
- ConsumeFrom说明
- ConsumeFromLastOffset, ConsumeFromFirstOffset, ConsumeFromTimestamp
只在 ConsumeGroup 第一次启动时有效, 后续再启动接着上次消费的进度开始消费
- 如果想第一次启动 consume Group 的时候,也能过滤历史信息,可以配置
filter_history_for_init: true
此配置,根据borntimestamp, 来过滤,
1. 如果是 ConsumeFromLastOffset,则过滤掉创建消费者之前的信息
2. 如果是 ConsumeFromTimestamp,则过滤掉consume_timestamp指定时间之前的信息,如果没有配置consume_timestamp,默认consume_timestamp为创建消费者前30分钟
# Functions
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
*
delay level definition:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/.
No description provided by the author
No description provided by the author
# Constants
No description provided by the author
No description provided by the author
No description provided by the author
# Variables
No description provided by the author
No description provided by the author
No description provided by the author
# Structs
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Type aliases
No description provided by the author
No description provided by the author
No description provided by the author