# README
Usage
package main
import (
easyKafka "github.com/diazoxide/easy-kafka"
"log"
"os"
)
type MyMessageType struct {
From string `json:"from"`
Content string `json:"content"`
}
func main() {
logger := log.New(os.Stdout, "test", log.Ltime)
k := easyKafka.New[MyMessageType](
[]string{"kafka:9092"}, // KafkaUris,
"my-prefix", // Topics Prefix
"my-group", // Consumer Group
12, // Partitions
logger, // Logger
)
k.Produce( "my-topic", &MyMessageType{"test","test"} )
k.Consume([]string{"my-topic"}, func(message MyMessageType) error{
// Your code here
return nil
}, true)
// ...
}
Available methods
Consume
Produce
# Functions
BaseProducerInitialPartitionsCount sets initial partitions count.
BaseProducerWithErrorLogger sets error logger.
BaseProducerWithLogger sets logger.
BaseProducerWithWriterConfig sets writer config.
ConsumerConcurrency sets parallel tasks count.
ConsumerDynamicTopicsDiscovery enable dynamic topics discovery.
ConsumerDynamicTopicsDiscoveryInterval sets dynamic topics discovery interval.
ConsumerInitialPartitionsCount sets initial partitions count.
ConsumerTopicNamesExactMatch enable exact match for topic names.
ConsumerTopicNamesRegexMatch enable regex match for topic names.
ConsumerWithErrorLogger sets error logger.
ConsumerWithLogger sets logger.
ConsumerWithMaxBlockingTasks sets max blocking tasks.
ConsumerWithOnFailCommitHandler sets handler for commit error.
ConsumerWithReaderConfig sets reader config.
ConsumerWithReadMessageHandler sets handler for read message.
ConsumerWithTopicsListUpdatedHandler sets handler for topics list update.
ConsumerWithWrongMessageHandler sets handler for wrong message.
InitBaseProducer creates a new producer instance.
InitConsumer creates a new consumer instance.
InitProducer initializes a new Producer instance.
InitStream initializes a new Stream instance.
StreamWithConsumerOptions sets consumer options.
StreamWithErrorLogger sets error logger.
StreamWithLogger sets logger.
StreamWithParallelJobs sets parallel jobs count for stream consumer.
StreamWithProducerOptions sets producer options.
# Structs
BaseProducer is a wrapper around kafka.Writer.
Consumer is a wrapper around kafka.Reader.
No description provided by the author
Producer is a wrapper around BaseProducer.
Stream is a wrapper around Consumer and Producer.
# Type aliases
BaseProducerOption is a function that sets some option on the producer.
No description provided by the author
ConsumerHandler is a function that handles messages.
ConsumerOption is a function that sets some option.
No description provided by the author
ErrorHandler is a function that handles errors.
StreamOption is a function that modifies a Stream instance.
StreamRoutingRule is a function that modifies a message.