1.5.4
Repository: https://github.com/trendyol/kafka-cronsumer.git
Documentation: pkg.go.dev
# README
Kafka C[r]onsumer


Description
Kafka Cronsumer is mainly used for retry/exception strategy management. It runs on a cron expression and consumes messages on schedule, with automatic pausing and configurable concurrency.
For details check our blog post
If you need a whole consumer lifecycle with exception management, check Kafka Konsumer
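To make the concurrency idea in the description concrete, here is a minimal worker-pool sketch using only the standard library. The function name `processConcurrently` and its parameters are hypothetical and are not part of the Kafka Cronsumer API; the library's internal implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processConcurrently fans messages out to `concurrency` worker goroutines
// and returns how many messages were handled without error.
// Hypothetical helper for illustration; not part of the library.
func processConcurrently(messages []string, concurrency int, handle func(string) error) int64 {
	if concurrency < 1 {
		concurrency = 1 // at least one worker, otherwise the send below would block forever
	}

	jobs := make(chan string)
	var succeeded int64
	var wg sync.WaitGroup

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range jobs {
				if err := handle(msg); err == nil {
					atomic.AddInt64(&succeeded, 1)
				}
			}
		}()
	}

	for _, msg := range messages {
		jobs <- msg
	}
	close(jobs) // lets the workers exit their range loops
	wg.Wait()
	return succeeded
}

func main() {
	msgs := []string{"a", "b", "c", "d"}
	n := processConcurrently(msgs, 2, func(string) error { return nil })
	fmt.Println("processed:", n)
}
```

Because several goroutines pull from the same channel, the order in which messages finish is not deterministic, which is one reason a concurrent retry consumer is a poor fit for strictly ordered processing.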
How Kafka Cronsumer Works
When to use it?
- Iteration-based back-off strategies are applicable
- Messages could be processed in an eventually consistent state
- Messages that exceed the max retry count can be ignored and sent to a dead letter topic
- To increase consumer resiliency
- To increase consumer performance with concurrency
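The iteration-based back-off mentioned in the list above can be sketched as follows. The strategy names (`fixed`, `linear`, `exponential`) come from the configuration table in this README; the formulas are common textbook definitions and an assumption on our part, not the library's actual implementation. `backoffDelay` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the wait time before the given retry attempt (1-based).
// Assumption: textbook formulas, not taken from the library's source.
func backoffDelay(strategy string, base time.Duration, attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	switch strategy {
	case "linear":
		// base, 2*base, 3*base, ...
		return base * time.Duration(attempt)
	case "exponential":
		// base, 2*base, 4*base, ... (shift capped to avoid overflow)
		shift := attempt - 1
		if shift > 30 {
			shift = 30
		}
		return base * time.Duration(1<<shift)
	default:
		// "fixed": the same delay for every attempt
		return base
	}
}

func main() {
	for _, strategy := range []string{"fixed", "linear", "exponential"} {
		for attempt := 1; attempt <= 3; attempt++ {
			fmt.Printf("%-11s attempt %d -> %v\n", strategy, attempt, backoffDelay(strategy, time.Second, attempt))
		}
	}
}
```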
When to avoid?
- Messages should be processed in order
- Messages should be certainly processed (we discard messages if max retry is exceeded)
- Messages should be committed (we use auto-commit interval for increasing performance)
- Messages with TTL (Time to Live)
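The "discard after max retry" behaviour described above can be illustrated with a small decision function. This is a sketch under stated assumptions: the type `retryMessage`, the function `route`, and the exact comparison against `maxRetry` are hypothetical and chosen for illustration; the library may count retries differently.

```go
package main

import "fmt"

// retryMessage is a hypothetical stand-in for a message that carries its retry count.
type retryMessage struct {
	Value      string
	RetryCount int
}

// route decides what happens to a message whose processing has just failed.
// Assumption: a message is retried while RetryCount < maxRetry.
func route(msg retryMessage, maxRetry int, deadLetterTopic string) string {
	if msg.RetryCount < maxRetry {
		return "retry"
	}
	if deadLetterTopic != "" {
		return "dead-letter:" + deadLetterTopic
	}
	return "discard"
}

func main() {
	fmt.Println(route(retryMessage{Value: "m1", RetryCount: 1}, 3, ""))           // retry
	fmt.Println(route(retryMessage{Value: "m2", RetryCount: 3}, 3, ""))           // discard
	fmt.Println(route(retryMessage{Value: "m3", RetryCount: 3}, 3, "my-dead-letter")) // dead-letter:my-dead-letter
}
```

If losing a message is not acceptable, configuring a dead letter topic (or avoiding this retry model altogether) is the safer choice.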
Guide
Installation
go get github.com/Trendyol/kafka-cronsumer@latest
Examples
You can find a number of ready-to-run examples in this directory.
After running the docker-compose up command, you can run any application you want.
Don't forget it's cron based :)
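Since the consumer only wakes up on its cron schedule, an example may appear idle until the next tick. The sketch below shows how the minute field of an expression such as `*/1 * * * *` can be evaluated. It is a deliberately simplified illustration that handles only `*`, `*/n`, and a single number in the minute field; `minuteFieldMatches` is a hypothetical helper, not the parser the library uses.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// minuteFieldMatches reports whether the minute field of a five-field cron
// expression matches the given minute (0-59).
// Simplified: supports "*", "*/n", and a single number only.
func minuteFieldMatches(expr string, minute int) bool {
	fields := strings.Fields(expr)
	if len(fields) != 5 {
		return false // not a five-field expression
	}
	field := fields[0]

	if field == "*" {
		return true
	}
	if strings.HasPrefix(field, "*/") {
		step, err := strconv.Atoi(strings.TrimPrefix(field, "*/"))
		return err == nil && step > 0 && minute%step == 0
	}
	n, err := strconv.Atoi(field)
	return err == nil && n == minute
}

func main() {
	fmt.Println(minuteFieldMatches("*/1 * * * *", 7))  // true
	fmt.Println(minuteFieldMatches("*/5 * * * *", 7))  // false
	fmt.Println(minuteFieldMatches("*/5 * * * *", 10)) // true
}
```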
Single Consumer
func main() {
	// ...
	var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
		fmt.Printf("consumer > Message received: %s\n", string(message.Value))
		return nil
	}
	c := cronsumer.New(kafkaConfig, consumeFn)
	c.Run()
}
Single Consumer With Dead Letter
func main() {
	// ...
	var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
		fmt.Printf("consumer > Message received: %s\n", string(message.Value))
		return errors.New("error occurred")
	}
	c := cronsumer.New(kafkaConfig, consumeFn)
	c.Run()
}
Multiple Consumers
func main() {
	// ...
	var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
		fmt.Printf("First consumer > Message received: %s\n", string(message.Value))
		return nil
	}
	first := cronsumer.New(firstCfg, firstConsumerFn)
	first.Start()
	var secondConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
		fmt.Printf("Second consumer > Message received: %s\n", string(message.Value))
		return nil
	}
	second := cronsumer.New(secondCfg, secondConsumerFn)
	second.Start()
	// ...
}
Single Consumer With Metric collector
func main() {
	// ...
	var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
		return errors.New("err occurred")
	}
	c := cronsumer.New(config, consumeFn)
	StartAPI(*config, c.GetMetricCollectors()...)
	c.Start()
	// ...
}
func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) {
	// ...
	f := fiber.New(
		fiber.Config{},
	)
	metricMiddleware, err := NewMetricMiddleware(cfg, f, metricCollectors...)
	if err != nil {
		panic(err) // the metric middleware could not be created
	}
	f.Use(metricMiddleware)
	// ...
}
Configurations
config | description | default | example |
---|---|---|---|
logLevel | Describes log level, valid options are debug , info , warn , and error | info | |
metricPrefix | Prefix for the Prometheus fully qualified metric names. If not provided, the default prefix is kafka_cronsumer. Two Prometheus metrics are currently exposed: retried_messages_total_current and discarded_messages_total_current. With the default prefix, the metric names are kafka_cronsumer_retried_messages_total_current and kafka_cronsumer_discarded_messages_total_current | kafka_cronsumer | |
consumer.clientId | see doc | | |
consumer.cron | Cron expression that determines when the exception consumer starts working | */1 * * * * | |
consumer.backOffStrategy | Defines the consumer backoff strategy for retry topics | fixed | exponential, linear |
consumer.duration | How long the exception consumer actively consumes messages on each run | NonStopWork (zero duration) | 20s, 15m, 1h, NonStopWork (zero duration) |
consumer.brokers | Broker address | | |
consumer.topic | Exception topic names | exception-topic | |
consumer.groupId | Exception consumer group id | exception-consumer-group | |
consumer.maxRetry | Maximum retry value for attempting to retry a message | 3 | |
consumer.concurrency | Number of goroutines used at listeners | 1 | |
consumer.verifyTopicOnStartup | Checks whether the given retry topic exists on the Kafka cluster at startup | false | |
consumer.minBytes | see doc | 1 | |
consumer.maxBytes | see doc | 1 MB | |
consumer.maxWait | see doc | 10s | |
consumer.commitInterval | see doc | 1s | |
consumer.heartbeatInterval | see doc | 3s | |
consumer.sessionTimeout | see doc | 30s | |
consumer.rebalanceTimeout | see doc | 30s | |
consumer.startOffset | see doc | earliest | |
consumer.retentionTime | see doc | 24h | |
consumer.skipMessageByHeaderFn | Function to filter messages based on headers; return true to skip the message | nil | |
producer.clientId | see doc | | |
producer.brokers | Broker address; if not given, the consumer.Brokers address is used | consumer.Brokers addr | |
producer.batchSize | see doc | 100 | |
producer.batchTimeout | see doc | 1s | |
producer.balancer | see doc | leastBytes | |
sasl.enabled | Enables the SASL authentication mechanism | false | |
sasl.authType | Currently we only support SCRAM | "" | |
sasl.username | SCRAM username | "" | |
sasl.password | SCRAM password | "" | |
sasl.rootCAPath | see doc | "" | |
sasl.intermediateCAPath | | "" | |
sasl.rack | see doc | "" | |
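As an illustration of the consumer.skipMessageByHeaderFn option described in the table, the sketch below shows a header-based predicate that returns true when a message should be skipped. The `header` type is a local stand-in, and the header key `x-skip` is a made-up example; both are assumptions, so check the library's documentation for the real function signature before wiring this into a config.

```go
package main

import "fmt"

// header is a local stand-in for a Kafka message header (hypothetical type).
type header struct {
	Key   string
	Value []byte
}

// skipByHeader returns true if the message should be skipped.
// The "x-skip" key is an invented example, not a convention of the library.
func skipByHeader(headers []header) bool {
	for _, h := range headers {
		if h.Key == "x-skip" && string(h.Value) == "true" {
			return true
		}
	}
	return false
}

func main() {
	skipped := []header{{Key: "x-skip", Value: []byte("true")}}
	kept := []header{{Key: "x-trace-id", Value: []byte("abc")}}

	fmt.Println(skipByHeader(skipped)) // true
	fmt.Println(skipByHeader(kept))    // false
}
```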
Exposed Metrics
Metric Name | Description | Value Type |
---|---|---|
kafka_cronsumer_retried_messages_total | Total number of retried messages. | Counter |
kafka_cronsumer_discarded_messages_total | Total number of discarded messages. | Counter |
Contribute
Use issues for everything
- For a small change, just send a PR.
- For bigger changes open an issue for discussion before sending a PR.
- PR should have:
  - Test case
  - Documentation
  - Example (If it makes sense)
- You can also contribute by:
  - Reporting issues
  - Suggesting new features or enhancements
  - Improving or fixing documentation
Please adhere to this project's code of conduct.
Maintainers
Code of Conduct
Contributor Code of Conduct. By participating in this project you agree to abide by its terms.
Libraries Used For This Project
Additional References
# Packages
No description provided by the author
# Functions
New returns the newly created kafka consumer instance.