package
0.0.0-20210318024954-d9e4b8ca2e42
Repository: https://github.com/pangpanglabs/goutils.git
Documentation: pkg.go.dev
# README
goutils/kafka
Wrapper of sarama
Getting Started
Producer
producer, err := kafka.NewProducer(brokers, topic, func(c *sarama.Config) {
c.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
c.Producer.Compression = sarama.CompressionGZIP // Compress messages
c.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
})
if err != nil {
return err
}
msg := map[string]interface{}{
"time": time.Now(),
"idx": i,
"msg": rand.Int(),
}
if err := producer.Send(msg); err != nil {
return err
}
Consumer
consumer, err := kafka.NewConsumer(brokers, topic, kafka.AllPartitions, sarama.OffsetNewest)
if err != nil {
return err
}
messages, err := consumer.Messages()
if err != nil {
return err
}
for m := range messages {
var v interface{}
d := json.NewDecoder(bytes.NewReader(m.Value))
d.UseNumber()
if err := d.Decode(&v); err != nil{
return err
}
fmt.Printf("[Receive] Offset:%d\tPartition:%d\tValue:%v\n", m.Offset, m.Partition, v)
}
Consumer Group
consumer, err := kafka.NewConsumerGroup(groupId, brokers, topic)
if err != nil {
return err
}
messages, err := consumer.Messages()
if err != nil {
return err
}
for m := range messages {
var v interface{}
d := json.NewDecoder(bytes.NewReader(m.Value))
d.UseNumber()
if err := d.Decode(&v); err != nil{
return err
}
fmt.Printf("[Receive] Offset:%d\tPartition:%d\tValue:%v\n", m.Offset, m.Partition, v)
}
# Functions
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author