github.com/Trendyol/kafka-cronsumer

Module version: v1.5.4
Repository: https://github.com/trendyol/kafka-cronsumer.git
Documentation: https://pkg.go.dev/github.com/Trendyol/kafka-cronsumer

# README

Kafka C[r]onsumer

Description

Kafka Cronsumer is mainly used for retry/exception strategy management. It works on a cron expression, consuming messages in a timely manner with support for auto pause and concurrency.

For details, check our blog post.

If you need a whole consumer lifecycle with exception management, check Kafka Konsumer.

How Kafka Cronsumer Works

(Architecture diagram; see the image in the repository README.)

When to use it?

  • Iteration-based back-off strategies are applicable
  • Messages can be processed in an eventually consistent state
  • Messages that exceed the max retry count can be ignored or sent to a dead letter topic
  • To increase consumer resiliency
  • To increase consumer performance with concurrency

When to avoid?

  • Messages must be processed in order
  • Messages must never be dropped (messages are discarded once max retry is exceeded)
  • Messages must be committed explicitly (an auto-commit interval is used to increase performance)
  • Messages with TTL (Time to Live)

Guide

Installation

go get github.com/Trendyol/kafka-cronsumer@latest
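
The examples below import the root package (commonly aliased as cronsumer) together with the pkg/kafka package, which holds the Config, ConsumeFn, and Message types:

import (
  cronsumer "github.com/Trendyol/kafka-cronsumer"
  "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
)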

Examples

You can find a number of ready-to-run examples in the examples directory of the repository.

After running the docker-compose up command, you can run any example you want. Don't forget it's cron based :)

Single Consumer

func main() {
  // ... build *kafka.Config here (see the Configurations section below)
  var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
    fmt.Printf("consumer > Message received: %s\n", string(message.Value))
    return nil
  }

  c := cronsumer.New(kafkaConfig, consumeFn)
  c.Run() // Run blocks; use Start to run in the background instead
}
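
For reference, a self-contained version of this example might look like the sketch below. The broker address, topic, and group id are placeholder values, and the config fields mirror the options listed in the Configurations section later in this README.

package main

import (
  "fmt"
  "time"

  cronsumer "github.com/Trendyol/kafka-cronsumer"
  "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
)

func main() {
  config := &kafka.Config{
    Brokers: []string{"localhost:29092"}, // placeholder broker address
    Consumer: kafka.ConsumerConfig{
      GroupID:  "sample-consumer", // placeholder group id
      Topic:    "exception",       // retry/exception topic to consume from
      MaxRetry: 3,                 // give up on a message after 3 attempts
      Cron:     "*/1 * * * *",     // wake the consumer up every minute
      Duration: 20 * time.Second,  // stay active for 20s on each cron tick
    },
    LogLevel: "info",
  }

  var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
    fmt.Printf("consumer > Message received: %s\n", string(message.Value))
    return nil
  }

  c := cronsumer.New(config, consumeFn)
  c.Run()
}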

Single Consumer With Dead Letter

func main() {
  // ... build *kafka.Config here, with a dead letter topic configured
  var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
    fmt.Printf("consumer > Message received: %s\n", string(message.Value))
    return errors.New("error occurred") // returning an error schedules a retry
  }

  c := cronsumer.New(kafkaConfig, consumeFn)
  c.Run()
}
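
The dead letter destination itself is driven by the consumer configuration. A hedged snippet of the relevant part is shown below; the topic names are placeholders, and the DeadLetterTopic field name is an assumption based on the repository's dead-letter example, so verify it against the package documentation:

config := &kafka.Config{
  Brokers: []string{"localhost:29092"}, // placeholder
  Consumer: kafka.ConsumerConfig{
    GroupID:         "sample-consumer",
    Topic:           "exception",             // retry topic
    DeadLetterTopic: "exception-dead-letter", // receives messages once MaxRetry is exhausted
    MaxRetry:        3,
    Cron:            "*/1 * * * *",
    Duration:        20 * time.Second,
  },
  LogLevel: "info",
}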

Multiple Consumers

func main() {
  // ...
  var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
    fmt.Printf("First consumer > Message received: %s\n", string(message.Value))
    return nil
  }
  first := cronsumer.New(firstCfg, firstConsumerFn)
  first.Start() // Start is non-blocking, so both consumers run side by side

  var secondConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
    fmt.Printf("Second consumer > Message received: %s\n", string(message.Value))
    return nil
  }
  second := cronsumer.New(secondCfg, secondConsumerFn)
  second.Start()
  // ...
}
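
Because Start returns immediately, main has to be kept alive for the cron schedules to keep firing; one simple option for the elided tail of the example is a blocking statement:

select {} // block forever so both cron-based consumers keep running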

Single Consumer With Metric Collector

func main() {
  // ...
  var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
    return errors.New("err occurred")
  }

  c := cronsumer.New(config, consumeFn)
  StartAPI(*config, c.GetMetricCollectors()...)
  c.Start()
  // ...
}

func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) {
  // ...
  f := fiber.New(
    fiber.Config{},
  )

  // NewMetricMiddleware is defined alongside this example in the examples directory
  metricMiddleware, err := NewMetricMiddleware(cfg, f, metricCollectors...)
  if err != nil {
    panic(err) // fail fast if the middleware cannot be constructed
  }

  f.Use(metricMiddleware)
  // ...
}
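
GetMetricCollectors returns standard prometheus.Collector values, so you are not tied to fiber. Below is a minimal sketch that exposes the collectors via the stock promhttp handler; serveMetrics is a hypothetical helper, and the registry, path, and port are arbitrary choices:

import (
  "net/http"

  "github.com/prometheus/client_golang/prometheus"
  "github.com/prometheus/client_golang/prometheus/promhttp"
)

// serveMetrics registers the given collectors on a fresh registry and
// serves them on /metrics (hypothetical helper; the port is arbitrary).
func serveMetrics(collectors ...prometheus.Collector) error {
  registry := prometheus.NewRegistry()
  for _, c := range collectors {
    if err := registry.Register(c); err != nil {
      return err
    }
  }
  http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
  return http.ListenAndServe(":2112", nil)
}

It would be called as serveMetrics(c.GetMetricCollectors()...) before c.Start().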

Configurations

| config | description | default | example |
|--------|-------------|---------|---------|
| logLevel | Describes log level; valid options are debug, info, warn, and error | info | |
| metricPrefix | Used for the Prometheus fq name prefix. If not provided, the default prefix is kafka_cronsumer. Two Prometheus metrics are currently exposed: retried_messages_total_current and discarded_messages_total_current. With the default prefix, the metric names are kafka_cronsumer_retried_messages_total_current and kafka_cronsumer_discarded_messages_total_current | kafka_cronsumer | |
| consumer.clientId | see doc | | |
| consumer.cron | Cron expression defining when the exception consumer starts to work | | */1 * * * * |
| consumer.backOffStrategy | Consumer back-off strategy for retry topics | fixed | exponential, linear |
| consumer.duration | Work duration for which the exception consumer actively consumes messages | NonStopWork (zero duration) | 20s, 15m, 1h, NonStopWork (zero duration) |
| consumer.brokers | Broker addresses | | |
| consumer.topic | Exception topic name | | exception-topic |
| consumer.groupId | Exception consumer group id | | exception-consumer-group |
| consumer.maxRetry | Maximum number of attempts to retry a message | 3 | |
| consumer.concurrency | Number of goroutines used by listeners | 1 | |
| consumer.verifyTopicOnStartup | Checks the existence of the given retry topic on the Kafka cluster | false | |
| consumer.minBytes | see doc | 1 | |
| consumer.maxBytes | see doc | 1 MB | |
| consumer.maxWait | see doc | 10s | |
| consumer.commitInterval | see doc | 1s | |
| consumer.heartbeatInterval | see doc | 3s | |
| consumer.sessionTimeout | see doc | 30s | |
| consumer.rebalanceTimeout | see doc | 30s | |
| consumer.startOffset | see doc | earliest | |
| consumer.retentionTime | see doc | 24h | |
| consumer.skipMessageByHeaderFn | Function to filter messages based on headers; return true to skip the message (see the sketch after this table) | nil | |
| producer.clientId | see doc | | |
| producer.brokers | Broker addresses; if not given, the consumer.brokers addresses are used | consumer.brokers addr | |
| producer.batchSize | see doc | 100 | |
| producer.batchTimeout | see doc | 1s | |
| producer.balancer | see doc | leastBytes | |
| sasl.enabled | Enables the SASL authentication mechanism | false | |
| sasl.authType | Authentication type; currently only SCRAM is supported | "" | |
| sasl.username | SCRAM username | "" | |
| sasl.password | SCRAM password | "" | |
| sasl.rootCAPath | see doc | "" | |
| sasl.intermediateCAPath | | "" | |
| sasl.rack | see doc | "" | |
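
As an illustration of consumer.skipMessageByHeaderFn, the sketch below drops any message carrying a particular header. The header key is an arbitrary example, and the field name and func(headers []kafka.Header) bool signature are assumptions based on the consumer config, so check the package documentation before relying on them.

config.Consumer.SkipMessageByHeaderFn = func(headers []kafka.Header) bool {
  for _, header := range headers {
    if header.Key == "skip-retry" { // arbitrary example header key
      return true // true means the message is skipped, not processed
    }
  }
  return false
}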

Exposed Metrics

| Metric Name | Description | Value Type |
|-------------|-------------|------------|
| kafka_cronsumer_retried_messages_total | Total number of retried messages | Counter |
| kafka_cronsumer_discarded_messages_total | Total number of discarded messages | Counter |

Contribute

Use issues for everything.

  • For a small change, just send a PR.
  • For bigger changes, open an issue for discussion before sending a PR.
  • A PR should include:
    • Test cases
    • Documentation
    • An example (if it makes sense)
  • You can also contribute by:
    • Reporting issues
    • Suggesting new features or enhancements
    • Improving or fixing documentation

Please adhere to this project's code of conduct.

Maintainers

Code of Conduct

This project follows the Contributor Code of Conduct. By participating in this project you agree to abide by its terms.

Libraries Used For This Project

Additional References


# Functions

New returns the newly created kafka consumer instance.