# README
kafka
kafka
is a Kafka client library built on top of sarama. The producer supports synchronous and asynchronous message production, the consumer supports group and partition message consumption, and the API is fully compatible with sarama usage.
Example of use
Producer
Synchronous Produce
package main
import (
"fmt"
"github.com/IBM/sarama"
"github.com/go-dev-frame/sponge/pkg/kafka"
)
func main() {
testTopic := "my-topic"
addrs := []string{"localhost:9092"}
// default config are requiredAcks=WaitForAll, partitionerConstructor=NewHashPartitioner, returnSuccesses=true
p, err := kafka.InitSyncProducer(addrs, kafka.SyncProducerWithVersion(sarama.V3_6_0_0))
if err != nil {
fmt.Println(err)
return
}
defer p.Close()
// Case 1: send sarama.ProducerMessage type message
msg := testData[0].(*sarama.ProducerMessage) // testData is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/producer_test.go#L18
partition, offset, err := p.SendMessage(msg)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("partition:", partition, "offset:", offset)
// Case 2: send multiple types message
for _, data := range testData {
partition, offset, err := p.SendData(testTopic, data)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("partition:", partition, "offset:", offset)
}
}
Asynchronous Produce
package main
import (
"fmt"
"time"
"github.com/IBM/sarama"
"github.com/go-dev-frame/sponge/pkg/kafka"
)
func main() {
testTopic := "my-topic"
addrs := []string{"localhost:9092"}
p, err := kafka.InitAsyncProducer(addrs,
kafka.AsyncProducerWithVersion(sarama.V3_6_0_0),
kafka.AsyncProducerWithRequiredAcks(sarama.WaitForLocal),
kafka.AsyncProducerWithFlushMessages(50),
kafka.AsyncProducerWithFlushFrequency(500*time.Millisecond),
)
if err != nil {
fmt.Println(err)
return
}
defer p.Close()
// Case 1: send sarama.ProducerMessage type message, supports multiple messages
msg := testData[0].(*sarama.ProducerMessage) // testData is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/producer_test.go#L18
err = p.SendMessage(msg, msg)
if err != nil {
fmt.Println(err)
return
}
// Case 2: send multiple types message, supports multiple messages
err = p.SendData(testTopic, testData...)
if err != nil {
fmt.Println(err)
return
}
<-time.After(time.Second) // wait for all messages to be sent
}
Consumer
Consume Group
package main
import (
"context"
"fmt"
"time"
"github.com/IBM/sarama"
"github.com/go-dev-frame/sponge/pkg/kafka"
)
func main() {
testTopic := "my-topic"
groupID := "my-group"
addrs := []string{"localhost:9092"}
// default config are offsetsInitial=OffsetOldest, autoCommitEnable=true, autoCommitInterval=time.Second
cg, err := kafka.InitConsumerGroup(addrs, groupID, kafka.ConsumerWithVersion(sarama.V3_6_0_0))
if err != nil {
fmt.Println(err)
return
}
defer cg.Close()
// Case 1: consume default handle message
go cg.Consume(context.Background(), []string{testTopic}, handleMsgFn) // handleMsgFn is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/consumer_test.go#L19
// Case 2: consume custom handle message
go cg.ConsumeCustom(context.Background(), []string{testTopic}, &myConsumerGroupHandler{ // myConsumerGroupHandler is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/consumer_test.go#L26
autoCommitEnable: cg.autoCommitEnable,
})
<-time.After(time.Minute) // wait exit
}
Consume Partition
package main
import (
"context"
"fmt"
"time"
"github.com/IBM/sarama"
"github.com/go-dev-frame/sponge/pkg/kafka"
)
func main() {
testTopic := "my-topic"
addrs := []string{"localhost:9092"}
c, err := kafka.InitConsumer(addrs, kafka.ConsumerWithVersion(sarama.V3_6_0_0))
if err != nil {
fmt.Println(err)
return
}
defer c.Close()
// Case 1: consume one partition
go c.ConsumePartition(context.Background(), testTopic, 0, sarama.OffsetNewest, handleMsgFn) // handleMsgFn is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/consumer_test.go#L19
// Case 2: consume all partition
c.ConsumeAllPartition(context.Background(), testTopic, sarama.OffsetNewest, handleMsgFn)
<-time.After(time.Minute) // wait exit
}
Topic Backlog
Obtain the total backlog of the topic and the backlog of each partition.
package main
import (
"fmt"
"github.com/go-dev-frame/sponge/pkg/kafka"
)
func main() {
m, err := kafka.InitClientManager(brokerList, groupID)
if err != nil {
panic(err)
}
defer m.Close()
total, backlogs, err := m.GetBacklog(topic)
if err != nil {
panic(err)
}
fmt.Println("total backlog:", total)
for _, backlog := range backlogs {
fmt.Printf("partition=%d, backlog=%d, next_consume_offset=%d\n", backlog.Partition, backlog.Backlog, backlog.NextConsumeOffset)
}
}
# Functions
AsyncProducerWithClientID set clientID.
AsyncProducerWithConfig set custom config.
AsyncProducerWithFlushBytes set flushBytes.
AsyncProducerWithFlushFrequency set flushFrequency.
AsyncProducerWithFlushMessages set flushMessages.
AsyncProducerWithHandleFailed set handleFailedFn.
AsyncProducerWithPartitioner set partitioner.
AsyncProducerWithRequiredAcks set requiredAcks.
AsyncProducerWithReturnSuccesses set returnSuccesses.
AsyncProducerWithTLS set tlsConfig, if isSkipVerify is true, crypto/tls accepts any certificate presented by the server and any host name in that certificate.
AsyncProducerWithVersion set kafka version.
AsyncProducerWithZapLogger set zapLogger.
ConsumerWithClientID set clientID.
ConsumerWithConfig set custom config.
ConsumerWithGroupStrategies set groupStrategies.
ConsumerWithOffsetsAutoCommitEnable set offsetsAutoCommitEnable.
ConsumerWithOffsetsAutoCommitInterval set offsetsAutoCommitInterval.
ConsumerWithOffsetsInitial set offsetsInitial.
ConsumerWithTLS set tlsConfig, if isSkipVerify is true, crypto/tls accepts any certificate presented by the server and any host name in that certificate.
ConsumerWithVersion set kafka version.
ConsumerWithZapLogger set zapLogger.
InitAsyncProducer init async producer.
InitClientManager init client manager.
InitConsumer init consumer.
InitConsumerGroup init consumer group.
InitSyncProducer init sync producer.
SyncProducerWithClientID set clientID.
SyncProducerWithConfig set custom config.
SyncProducerWithPartitioner set partitioner.
SyncProducerWithRequiredAcks set requiredAcks.
SyncProducerWithReturnSuccesses set returnSuccesses.
SyncProducerWithTLS set tlsConfig, if isSkipVerify is true, crypto/tls accepts any certificate presented by the server and any host name in that certificate.
SyncProducerWithVersion set kafka version.
# Structs
AsyncProducer is async producer.
Backlog info.
ClientManager client manager.
Consumer consume partition.
ConsumerGroup consume group.
Message is a message to be sent to a topic.
SyncProducer is a sync producer.
# Type aliases
AsyncProducerOption set options.
AsyncSendFailedHandlerFn is a function that handles failed messages.
ConsumerOption set options.
HandleMessageFn is a function that handles a message from a partition consumer.
ProducerMessage is sarama ProducerMessage.
SyncProducerOption set options.