# README
concurrency kafka consumer
Go versions
kafka-go requires Go version 1.15 or later.
Producer
p := kafka.NewProducer(kafka.ProducerConfig{
Version: "",
Brokers: []string{broker},
})
_, _, err := p.Publish(context.Background(), topic, "key", []byte(string("val")))
Consumer
// config
singleConf := kafka.ConsumerConfig{
Version: "",
Brokers: []string{"127.0.0.1:9092"},
Group: "test-group",
Topic: "test",
CacheCapacity: 100,
ConnectTimeout: time.Millisecond * time.Duration(5000),
}
batchConf := kafka.BatchConsumerConf{
CacheCapacity: 100,
Consumers: 4,
Processors: 4,
}
consumer := kafka.NewConsumer(singleConf, nil)
bc := kafka.NewBatchConsumer(batchConf, consumer, kafka.WithHandle(func(ctx context.Context, key string, data []byte) error {
log.Info("receive msg:", "value", data)
time.Sleep(time.Millisecond * 500)
return nil
}))
bc.Start()
trace interceptor
imports(
"github.com/oofpgDLD/kafka-go"
"github.com/oofpgDLD/kafka-go/trace"
)
// new consumer with trace interceptor
func newConsumer(singleConf kafka.ConsumerConfig, batchConf kafka.BatchConsumerConf) {
consumer := kafka.NewConsumer(singleConf, nil)
bc := kafka.NewBatchConsumer(batchConf, consumer, kafka.WithHandle(func(ctx context.Context, key string, data []byte) error {
log.Info("receive msg:", "value", data)
time.Sleep(time.Millisecond * 500)
return nil
}), xkafka.WithBatchConsumerInterceptors(trace.ConsumeInterceptor))
}
// new producer with trace interceptor
func newProducer() {
p := kafka.NewProducer(kafka.ProducerConfig{
Version: "",
Brokers: []string{broker},
}, xkafka.WithProducerInterceptors(trace.ProducerInterceptor))
}
Test
[kafka producer and consumer test | kafka 消费者和生产者测试]
License
kafka-go is under the MIT license. See the LICENSE file for details.
# Functions
No description provided by the author
No description provided by the author
No description provided by the author
WithBatchConsumerInterceptors returns a ServerOption that sets the Interceptor for the producer.
No description provided by the author
No description provided by the author
WithProducerInterceptors returns a ServerOption that sets the Interceptor for the producer.
# Structs
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Interfaces
No description provided by the author
No description provided by the author
A ProducerOption sets options such as interceptor etc.
# Type aliases
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author