github.com/ricardo-ch/go-kafka/v2
module · version 2.2.0
Repository: https://github.com/ricardo-ch/go-kafka.git
Documentation: pkg.go.dev

# README

GO-KAFKA


Go-kafka provides an easy way to use kafka listeners and producers with only a few lines of code. The listener is able to consume from multiple topics, and will execute a separate handler for each topic.

Quick start

Simple consumer

// topic-specific handlers
var handler1 kafka.Handler
var handler2 kafka.Handler

// map your topics to their handlers
handlers := map[string]kafka.Handler{
    "topic-1": handler1,
    "topic-2": handler2,
}

// define your listener
kafka.Brokers = []string{"localhost:9092"}
listener, _ := kafka.NewListener("my-consumer-group", handlers)
defer listener.Close()

// listen and enjoy
errc <- listener.Listen(ctx)
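
A handler is the function invoked for every message consumed from its topic; returning a non-nil error triggers the retry policy described below. A minimal sketch, assuming the v2 Handler signature receives a context and a *sarama.ConsumerMessage:

// example handler that just logs the message (signature assumed from the v2 API)
handler1 = func(ctx context.Context, msg *sarama.ConsumerMessage) error {
	log.Printf("message on %s/%d: %s", msg.Topic, msg.Partition, msg.Value)
	return nil // a non-nil error would trigger the retry policy described below
}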

Simple producer

// define your producer
kafka.Brokers = []string{"localhost:9092"}
producer, _ := kafka.NewProducer()

// send your message
message := &sarama.ProducerMessage{
	Topic: "my-topic",
	Value: sarama.StringEncoder("my-message"),
}
_ = producer.Produce(message)
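
Because the default partitioner is murmur2-based (see Default configuration below), keyed messages land on the same partitions as they would for JVM Kafka clients. A small sketch, with an illustrative key:

// keyed message: the murmur2 default partitioner keeps the
// key-to-partition mapping compatible with JVM Kafka clients
keyed := &sarama.ProducerMessage{
	Topic: "my-topic",
	Key:   sarama.StringEncoder("user-42"),
	Value: sarama.StringEncoder("my-message"),
}
_ = producer.Produce(keyed)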

Features

  • Create a listener on multiple topics
  • Retry policy on message handling
  • Create a producer
  • Prometheus instrumenting

Consumer error handling

You can customize the consumer's error handling. If an error persists after all retries (3 by default), it is logged and the faulty event can be pushed to a deadletter topic.

Deadletter

By default, events that have exceeded the maximum number of retries will be pushed to a dead letter topic. This behaviour can be disabled through the PushConsumerErrorsToTopic property.

kafka.PushConsumerErrorsToTopic = false

The name of the deadletter topic is dynamically generated based on the original topic name and the consumer group. For example, if the original topic is my-topic and the consumer group is my-consumer-group, the deadletter topic will be my-consumer-group-my-topic-error. This pattern can be overridden through the ErrorTopicPattern property.

kafka.ErrorTopicPattern = "custom-deadletter-topic"

Retries

By default, failed event consumption is retried 3 times (with 2 seconds between attempts). This can be configured through the following properties:

  • ConsumerMaxRetries
  • DurationBeforeRetry

If you want to achieve a blocking retry pattern (i.e. continuously retrying until the event is successfully consumed), you can set ConsumerMaxRetries to InfiniteRetries (-1).
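
For example, a minimal sketch of tuning these package-level settings (the values are illustrative):

// retry up to 5 times, waiting 5 seconds between attempts
kafka.ConsumerMaxRetries = 5
kafka.DurationBeforeRetry = 5 * time.Second

// or retry forever until the event is successfully consumed
kafka.ConsumerMaxRetries = kafka.InfiniteRetries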

If you do not want specific errors to be retried, wrap them in a kafka.ErrNonRetriable error before returning them, or return kafka.ErrNonRetriable directly.

// This error will not be retried
err := errors.New("my error")
return errors.Wrap(kafka.ErrNonRetriable, err.Error())

// This error will also not be retried
return kafka.ErrNonRetriable

Omitting specific errors

In certain scenarios, you might want to omit some errors. For example, you might want to discard outdated events that are no longer relevant. Such events increase a separate, dedicated metric instead of the error one, and are not retried. To do so, wrap the errors that should lead to omitted events in a kafka.ErrEventOmitted, or return kafka.ErrEventOmitted directly.

// This error will be omitted
err := errors.New("my error")
return errors.Wrap(kafka.ErrEventOmitted, err.Error())

// This error will also be omitted
return kafka.ErrEventOmitted

Instrumenting

Metrics for the listener and the producer can be exported to Prometheus. The following metrics are available:

| Metric name | Labels | Description |
| --- | --- | --- |
| kafka_consumer_record_consumed_total | kafka_topic, consumer_group | Number of messages consumed |
| kafka_consumer_record_latency_seconds | kafka_topic, consumer_group | Latency of consuming a message |
| kafka_consumer_record_omitted_total | kafka_topic, consumer_group | Number of messages omitted |
| kafka_consumer_record_error_total | kafka_topic, consumer_group | Number of errors when consuming a message |
| kafka_consumergroup_current_message_timestamp | kafka_topic, consumer_group, partition, type | Timestamp of the current message being processed. Type can be either LogAppendTime or CreateTime. |
| kafka_producer_record_send_total | kafka_topic | Number of messages sent |
| kafka_producer_dead_letter_created_total | kafka_topic | Number of messages sent to a dead letter topic |
| kafka_producer_record_error_total | kafka_topic | Number of errors when sending a message |

To activate the metrics instrumenting on go-kafka:

// define your listener
listener, _ := kafka.NewListener("my-consumer-group", handlers, kafka.WithInstrumenting())
defer listener.Close()

// instantiate a new HTTP server to expose the metrics to prometheus
go func() {
	httpAddr := ":8080"
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())
	errc <- http.ListenAndServe(httpAddr, mux)
}()
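
The producer can be instrumented in the same way; a hedged sketch, assuming kafka.WithProducerInstrumenting is passed to the producer constructor as a ProducerOption:

// instrumented producer (option usage assumed from the API docs below)
producer, _ := kafka.NewProducer(kafka.WithProducerInstrumenting())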

Default configuration

Configuration of the consumer/producer is opinionated. It aims to simply solve problems that have taken us by surprise in the past. For this reason:

  • the default partitioner is based on murmur2 instead of the one sarama uses by default (see the sketch after this list)
  • offset retention is set to 30 days
  • initial offset is oldest
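
If you build your own sarama configuration but still want JVM-compatible partitioning, the provided partitioner can be plugged in directly. A minimal sketch, assuming NewJVMCompatiblePartitioner matches sarama's PartitionerConstructor signature:

// use the murmur2-based partitioner so keys map to the same
// partitions as they would with JVM Kafka clients
cfg := sarama.NewConfig()
cfg.Producer.Partitioner = kafka.NewJVMCompatiblePartitioner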

License

go-kafka is licensed under the MIT license. (http://opensource.org/licenses/MIT)

Contributing

Pull requests are the way to help us here. We will be really grateful.


# Functions

DefaultTracing implements TracingFunc. It fetches opentracing headers from the kafka message headers, then creates a span using opentracing.GlobalTracer(). Usage: `listener, err = kafka.NewListener(brokers, appName, handlers, kafka.WithTracing(kafka.DefaultTracing))`.
DeserializeContextFromKafkaHeaders fetches tracing headers from a JSON-encoded carrier and returns the context.
GetContextFromKafkaMessage fetches tracing headers from the kafka message.
GetKafkaHeadersFromContext fetches tracing metadata from the context and returns it as a []RecordHeader.
MurmurHasher creates murmur2 hasher implementing hash.Hash32 interface.
NewConsumerMetricsService creates a layer of service that add metrics capability.
NewJVMCompatiblePartitioner creates a Sarama partitioner that uses the same hashing algorithm as JVM Kafka clients.
NewListener creates a new instance of Listener.
NewProducer creates a new producer that uses the default sarama client.
SerializeKafkaHeadersFromContext fetches tracing metadata from context and serialize it into a json map[string]string.
WithDeadletterProducerInstrumenting adds the instrumenting layer on a deadletter producer.
WithInstrumenting adds the instrumenting layer on a listener.
WithProducerInstrumenting adds the instrumenting layer on a producer.
WithTracing accepts a TracingFunc to execute before each message.

# Constants

InfiniteRetries is a constant to define infinite retries.

# Variables

Brokers is the list of Kafka brokers to connect to.
Config is the sarama (cluster) config used for the consumer and producer.
ConsumerMaxRetries is the maximum number of times we want to retry to process an event before throwing the error.
DurationBeforeRetry is the duration we wait between process retries.
ErrorLogger is the instance of a StdLogger interface.
ErrorTopicPattern is the error topic name pattern.
Logger is the instance of a StdLogger interface.
PushConsumerErrorsToTopic is a boolean that defines whether messages in error should be pushed to an error topic.

# Structs

ConsumerMetricsService object represents consumer metrics.
ProducerMetricsService is a service that provides metrics for the producer.

# Interfaces

Listener is able to listen to multiple topics, with one handler per topic.
StdLogger is used to log messages.

# Type aliases

Handler is a function that handles received kafka messages.
Handlers maps a topic to its handler.
ListenerOption adds an option to the listener.
ProducerHandler is a function that handles the production of a message.
ProducerOption is a function that is passed to the producer constructor to configure it.
TracingFunc is used to create tracing and/or propagate the tracing context from each message to the go context.