Module: github.com/Financial-Times/kafka-client-go/v4
Version: 4.3.0-dev-schema-registry-poc
Repository: https://github.com/financial-times/kafka-client-go.git
Documentation: pkg.go.dev

# README

kafka-client-go


Description

Library for producing and consuming messages directly from Kafka.

The library does not use ZooKeeper under the hood; it connects to the Kafka brokers directly.

Usage

Importing:

    import "github.com/Financial-Times/kafka-client-go/v4"

Producer

Creating a producer:

    config := kafka.ProducerConfig{
        BrokersConnectionString: "", // Comma-separated list of Kafka brokers
        Topic:                   "", // Topic to publish to
    }

    producer, err := kafka.NewProducer(config)
    // Error handling

Failing to establish a connection to Kafka will result in an error.
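The BrokersConnectionString field expects a comma-separated list of brokers. The following is a minimal sketch of building that string from a slice of addresses. The helper name brokersConnectionString and the broker addresses are hypothetical and not part of the library.

```go
package main

import (
	"fmt"
	"strings"
)

// brokersConnectionString joins broker addresses into a comma-separated
// string. It is a hypothetical helper, not part of kafka-client-go.
// Surrounding whitespace is trimmed and empty entries are dropped.
func brokersConnectionString(brokers []string) string {
	cleaned := make([]string, 0, len(brokers))
	for _, b := range brokers {
		if b = strings.TrimSpace(b); b != "" {
			cleaned = append(cleaned, b)
		}
	}
	return strings.Join(cleaned, ",")
}

func main() {
	// Example addresses chosen for illustration only.
	brokers := []string{"localhost:29092", " localhost:29093 "}
	fmt.Println(brokersConnectionString(brokers))
}
```

The resulting string can then be assigned to BrokersConnectionString in either ProducerConfig or ConsumerConfig.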

Sending a message:

    headers := map[string]string{}
    body := ""
    message := kafka.NewFTMessage(headers, body)
    
    err := producer.SendMessage(message)
    // Error handling

The health of the Kafka cluster can be checked by attempting to establish a separate connection with the provided configuration:

    err := producer.ConnectivityCheck()
    // Error handling

Connections should be closed by the client:

    err := producer.Close()
    // Error handling

Consumer

Creating a consumer:

    config := kafka.ConsumerConfig{
        BrokersConnectionString: "", // Comma-separated list of Kafka brokers
        ConsumerGroup:           "", // Unique name of a consumer group
    }

    topics := []*kafka.Topic{
        kafka.NewTopic(""),                             // Topic to consume from
        kafka.NewTopic("", kafka.WithLagTolerance(50)), // Topic to consume from with custom lag tolerance used for monitoring
    }

    // Named log so the variable does not shadow the logger package
    log := logger.NewUPPLogger(...)

    consumer, err := kafka.NewConsumer(config, topics, log)
    // Error handling

Failing to establish a connection to Kafka will result in an error.

Consuming messages:

    handler := func(message kafka.FTMessage) {
        // Message handling
    }
    
    consumer.Start(handler)
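Because the handler is a plain function, it can be wrapped before it is passed to Start. The sketch below filters messages by a header value. It uses a local stand-in for kafka.FTMessage so that it compiles on its own; the Headers and Body field names are assumed from the arguments of kafka.NewFTMessage, and the filterByHeader helper and the header name are hypothetical.

```go
package main

import "fmt"

// FTMessage is a local stand-in for kafka.FTMessage. The field names are
// an assumption based on the arguments of kafka.NewFTMessage.
type FTMessage struct {
	Headers map[string]string
	Body    string
}

// filterByHeader returns a handler that calls next only for messages whose
// header key holds the wanted value. Hypothetical helper, not library API.
func filterByHeader(key, want string, next func(FTMessage)) func(FTMessage) {
	return func(m FTMessage) {
		if m.Headers[key] != want {
			return // Skip messages that do not match
		}
		next(m)
	}
}

func main() {
	handled := 0
	// "Example-Header" is an illustrative header name only.
	handler := filterByHeader("Example-Header", "wanted", func(m FTMessage) {
		handled++
		fmt.Println("handling:", m.Body)
	})

	handler(FTMessage{Headers: map[string]string{"Example-Header": "wanted"}, Body: "first"})
	handler(FTMessage{Headers: map[string]string{"Example-Header": "other"}, Body: "second"})
	fmt.Println("handled:", handled)
}
```

With the real library, the wrapped function would be passed to consumer.Start in place of the bare handler, provided its parameter type is kafka.FTMessage.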

The health of the Kafka cluster can be checked by attempting to establish a separate connection with the provided configuration:

    err := consumer.ConnectivityCheck()
    // Error handling

The health of the consumer process is also monitored, and its status can be accessed:

    err := consumer.MonitorCheck()
    // Error handling

Connections should be closed by the client:

    err := consumer.Close()
    // Error handling

Testing

    go test --race -v ./...

NB: Some tests in this project require a local Kafka instance on port 29092. Use the -short flag to skip them.

# Functions

DefaultConsumerOptions returns a new Sarama consumer configuration with predefined default settings.
DefaultProducerOptions creates a new Sarama producer configuration with default values.
WithLagTolerance sets custom lag tolerance threshold used for monitoring.

# Constants

LagTechnicalSummary is used as technical summary in consumer monitoring healthchecks.
