github.com/Financial-Times/kafka-client-go/v3
Version: 3.2.0-dev-schema-registry
Repository: https://github.com/financial-times/kafka-client-go.git
Documentation: pkg.go.dev

# README

kafka-client-go


Description

Library for producing and consuming messages directly from Kafka.

Only persistent connections (for both producer and consumer) are provided, i.e. establishing a connection to Kafka is retried until successful.

Attempting to consume or produce messages, or to check connectivity, will fail if a connection has not yet been established.

The library does NOT use ZooKeeper to connect to Kafka under the hood.

Usage

Importing:

    import "github.com/Financial-Times/kafka-client-go/v3"

Producer

Creating a producer:

    config := kafka.ProducerConfig{
        BrokersConnectionString: "", // Comma-separated list of Kafka brokers
        Topic:                   "", // Topic to publish to 
        ConnectionRetryInterval: 0,  // Duration between each retry for establishing connection
    }

    logger := logger.NewUPPLogger(...)
    
    producer := kafka.NewProducer(config, logger)

The connection to Kafka is established in a separate goroutine when the producer is created.
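Putting the producer steps together, here is a fuller sketch with illustrative values. The broker addresses, topic, service name, and message payload are placeholders, and the logger import path and its `WithError` method are assumptions based on FT's go-logger library:

```go
package main

import (
	"time"

	logger "github.com/Financial-Times/go-logger/v2" // assumed import path for the UPP logger
	kafka "github.com/Financial-Times/kafka-client-go/v3"
)

func main() {
	config := kafka.ProducerConfig{
		BrokersConnectionString: "kafka-1:9092,kafka-2:9092", // placeholder brokers
		Topic:                   "PostPublicationEvents",     // placeholder topic
		ConnectionRetryInterval: 30 * time.Second,
	}

	log := logger.NewUPPLogger("example-service", "INFO") // placeholder service name and log level

	// The connection is established asynchronously; SendMessage fails
	// until it succeeds.
	producer := kafka.NewProducer(config, log)

	message := kafka.NewFTMessage(map[string]string{"X-Request-Id": "tid_example"}, `{"foo":"bar"}`)
	if err := producer.SendMessage(message); err != nil {
		log.WithError(err).Error("Failed to send message")
	}

	if err := producer.Close(); err != nil {
		log.WithError(err).Error("Failed to close producer")
	}
}
```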

Sending a message:

    headers := map[string]string{}
    body := ""
    message := kafka.NewFTMessage(headers, body)
    
    err := producer.SendMessage(message)
    // Error handling

The health of the Kafka cluster can be checked by attempting to establish a separate connection with the provided configuration:

    err := producer.ConnectivityCheck()
    // Error handling
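A common pattern is to expose ConnectivityCheck through an HTTP healthcheck endpoint. A minimal sketch, assuming `producer` is the value returned by kafka.NewProducer above (the `*kafka.Producer` type, the route, and the response body are assumptions for illustration):

```go
// healthHandler responds 200 when Kafka is reachable and 503 otherwise.
func healthHandler(producer *kafka.Producer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if err := producer.ConnectivityCheck(); err != nil {
			http.Error(w, "kafka unavailable: "+err.Error(), http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}

// Wiring it up (illustrative route):
// http.HandleFunc("/__health", healthHandler(producer))
```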

The connection should be closed by the client:

    err := producer.Close()
    // Error handling

Consumer

Creating a consumer:

    config := kafka.ConsumerConfig{
        BrokersConnectionString: "", // Comma-separated list of Kafka brokers
        ConsumerGroup:           "", // Unique name of a consumer group
        ConnectionRetryInterval: 0,  // Duration between each retry for establishing connection
    }

    topics := []*kafka.Topic{
        kafka.NewTopic(""),                             // Topic to consume from
        kafka.NewTopic("", kafka.WithLagTolerance(50)), // Topic to consume from with custom lag tolerance used for monitoring
    }
	
    logger := logger.NewUPPLogger(...)

    consumer := kafka.NewConsumer(config, topics, logger)

Consuming messages:

Consumer groups are lazily initialized, i.e. the connection to Kafka is established within Start().

    handler := func(message kafka.FTMessage) {
        // Message handling
    }
    
    go consumer.Start(handler) // Blocking until connection is established
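Since Start() blocks, it is typically run in a goroutine and paired with a clean shutdown. A sketch, assuming `consumer` is the value created above; the signal handling is illustrative, and the imports (`os`, `os/signal`, `syscall`, stdlib `log`) are omitted for brevity:

```go
handler := func(message kafka.FTMessage) {
	// Message handling
}

go consumer.Start(handler)

// Block until the process receives SIGINT or SIGTERM.
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
<-stop

if err := consumer.Close(); err != nil {
	log.Printf("failed to close consumer: %v", err) // stdlib log used for illustration
}
```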

The health of the Kafka cluster can be checked by attempting to establish a separate connection with the provided configuration:

    err := consumer.ConnectivityCheck()
    // Error handling

The health of the consumer process is also monitored, and its status can be accessed:

    err := consumer.MonitorCheck()
    // Error handling
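MonitorCheck can be polled in the background to surface consumer lag continuously. A sketch with an illustrative interval, assuming `consumer` from above (stdlib `time` and `log` imports omitted for brevity):

```go
ticker := time.NewTicker(time.Minute) // illustrative polling interval
defer ticker.Stop()

for range ticker.C {
	if err := consumer.MonitorCheck(); err != nil {
		// The consumer is lagging beyond the configured tolerance
		// or is otherwise unhealthy.
		log.Printf("consumer monitor check failed: %v", err)
	}
}
```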

Connections should be closed by the client:

    err := consumer.Close()
    // Error handling

Testing

    go test -race -v ./...

NB: Some tests in this project require a local Kafka instance (on port 29092). Use the -short flag to skip them.

# Functions

DefaultConsumerOptions returns a new Sarama consumer configuration with predefined default settings.
DefaultProducerOptions creates a new Sarama producer configuration with default values.
WithLagTolerance sets custom lag tolerance threshold used for monitoring.

# Constants

LagTechnicalSummary is used as the technical summary in consumer monitoring healthchecks.

# Structs

Consumer which will keep trying to reconnect to Kafka on a specified interval.
Producer which will keep trying to reconnect to Kafka on a specified interval.
