# README
kafka
- 在入口,比如 main.go 里面隐式导入kafka包路径
import _ "github.com/fankane/go-utils/plugin/queue/kafka"
- 在运行文件根目录下的 system_plugin.yaml 文件(没有则新建一个)里面添加如下内容
plugins:
queue: # 插件类型
kafka:
producers:
default: # producer 名
send_type: sync # producer 发生类型 [sync, async]
addrs:
- 192.168.0.93:9092
p2:
send_type: async
addrs:
- 192.168.0.93:9092
consumers:
c1: # consumer 名称
addrs:
- 192.168.0.93:9092
topics:
- test1
- test2
group_id: test_group # 不填会默认生成一个随机值
concurrency_consume: false # 并发消费
concurrency_max: 100 # 并发数,当concurrency_consume=true时有效,不填默认1000
offset_initial: -1 # [可选] offset 初始值,当 consumer 时生效
reset_offset_info: # [可选] 设置使用 consumerGroup 消费时的初始offset值
- topic: test1
offset: 0 # -1 newest,
set_for_all: true # 是否对所有 partition 生效
- topic: test2
offset: -1
set_for_all: false
partitions_setting: # 当 set_for_all = false 时生效
- partition: 0
offset: 2
- 使用方式
- 3.1 生产者使用
// 默认生产者发送消息
DefaultProducer.SendMessage("topic name", []byte("key")), []byte("value"))
// 获取配置文件里面 p2 对应的生产者发送消息
GetProducer("p2").SendMessage("topic name", []byte("key")), []byte("value"))
- 3.2 消费者使用
// 给配置文件里面 c1 对应的消费者注册handler方法
// 当有消息过来时,会调用注册的 function并执行
RegisterHandler("c1", func(ctx context.Context, key, value []byte) error {
fmt.Println(fmt.Sprintf("topic:%s, partition:%d, offset:%d, timestamp:%s", Topic(ctx), Partition(ctx), Offset(ctx), Timestamp(ctx)),
"business key:", string(key), "value:", string(value))
return nil
})
# Functions
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Variables
No description provided by the author
No description provided by the author
# Structs
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Interfaces
No description provided by the author
# Type aliases
No description provided by the author