package
0.0.0-20210318024954-d9e4b8ca2e42
Repository: https://github.com/pangpanglabs/goutils.git
Documentation: pkg.go.dev

# README

goutils/kafka

Wrapper of sarama

Getting Started

Producer

producer, err := kafka.NewProducer(brokers, topic, func(c *sarama.Config) {
        c.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
        c.Producer.Compression = sarama.CompressionGZIP     // Compress messages
        c.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms

})
if err != nil {
        return err
}

msg := map[string]interface{}{
        "time": time.Now(),
        "idx":  i,
        "msg":  rand.Int(),
}
if err := producer.Send(msg); err != nil {
        return err
}

Consumer

consumer, err := kafka.NewConsumer(brokers, topic, kafka.AllPartitions, sarama.OffsetNewest)
if err != nil {
        return err
}

messages, err := consumer.Messages()
if err != nil {
        return err
}

for m := range messages {
        var v interface{}
        d := json.NewDecoder(bytes.NewReader(m.Value))
        d.UseNumber()

        if err := d.Decode(&v); err != nil{
                return err
        }

        fmt.Printf("[Receive] Offset:%d\tPartition:%d\tValue:%v\n", m.Offset, m.Partition, v)
}

Consumer Group

consumer, err := kafka.NewConsumerGroup(groupId, brokers, topic)
if err != nil {
        return err
}

messages, err := consumer.Messages()
if err != nil {
        return err
}

for m := range messages {
        var v interface{}
        d := json.NewDecoder(bytes.NewReader(m.Value))
        d.UseNumber()

        if err := d.Decode(&v); err != nil{
                return err
        }

        fmt.Printf("[Receive] Offset:%d\tPartition:%d\tValue:%v\n", m.Offset, m.Partition, v)
}

# Functions

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Structs

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author