github.com/Kevinello/kafka-client
module, package
Version: 0.0.10
Repository: https://github.com/kevinello/kafka-client.git
Documentation: pkg.go.dev

# README

kafka-client

kafka-client is a high-level wrapper API for consuming from and producing to Kafka, based on kafka-go.

Features

Kafka consumer

  • Customized Message Handler
  • Multi-topic consumption (with a customizable get-topics function)
  • Concurrent worker pool for consuming messages (configurable pool size)
  • Consume error collection (the consumer cancels itself when too many errors occur)
  • Consumption frequency detection (reconnects to Kafka when the message frequency is too low)
  • Safe manual close
  • Asynchronous cancellations and timeouts using Go contexts
  • Custom logger and verbose option
  • SASL authentication
  • TLS authentication

Kafka producer

  • Asynchronous or synchronous message writes
  • Safe manual close
  • Asynchronous cancellations and timeouts using Go contexts
  • Custom logger and verbose option
  • SASL authentication
  • TLS authentication
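
As a rough illustration of the difference between synchronous and asynchronous writes, the sketch below uses only the standard library. The names writeFunc, writeSync, writeAsync, and errWrite are hypothetical and are not part of kafka-client; the real producer's API may look quite different.

```go
package main

import (
	"errors"
	"fmt"
)

// errWrite is a hypothetical error used to simulate a failed write.
var errWrite = errors.New("write failed")

// writeFunc is a hypothetical stand-in for whatever actually sends a message.
type writeFunc func(msg string) error

// writeSync blocks until the write finishes and returns its error directly.
func writeSync(w writeFunc, msg string) error {
	return w(msg)
}

// writeAsync starts the write in a goroutine and returns immediately.
// The result is delivered later on the returned channel.
func writeAsync(w writeFunc, msg string) <-chan error {
	done := make(chan error, 1) // buffered so the goroutine never blocks
	go func() { done <- w(msg) }()
	return done
}

func main() {
	failing := func(string) error { return errWrite }

	// Synchronous: the caller sees the error before moving on.
	fmt.Println("sync:", writeSync(failing, "hello"))

	// Asynchronous: the caller continues and collects the result later.
	pending := writeAsync(failing, "hello")
	fmt.Println("async call returned, waiting for the result")
	fmt.Println("async:", <-pending)
}
```

The trade-off is the usual one: synchronous writes give immediate error feedback at the cost of blocking the caller, while asynchronous writes keep the caller moving but require some channel or callback to surface failures.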

Usage

For basic usage examples, refer to the system test cases in the test directory.

To run the system tests, you need a Kafka broker (either set auto.create.topics.enable to true, or create the topics unit-test-topic-1 ... unit-test-topic-10 before starting the tests) and the following environment variables:

  • KAFKA_BOOTSTRAP: Kafka broker address
  • KAFKA_SASL_USERNAME (optional): Kafka SASL username
  • KAFKA_SASL_PASSWORD (optional): Kafka SASL password
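
A test helper might read these variables along the following lines. This is a minimal sketch, not code from the repository: the testEnv type and loadTestEnv function are hypothetical, and it assumes SASL should be enabled only when both the username and the password are set.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// testEnv is a hypothetical container for the settings the tests need.
type testEnv struct {
	Bootstrap string
	Username  string
	Password  string
	UseSASL   bool
}

// loadTestEnv reads the variables through getenv (normally os.Getenv).
// KAFKA_BOOTSTRAP is required; the SASL variables are optional.
func loadTestEnv(getenv func(string) string) (testEnv, error) {
	bootstrap := getenv("KAFKA_BOOTSTRAP")
	if bootstrap == "" {
		return testEnv{}, errors.New("KAFKA_BOOTSTRAP must be set")
	}
	env := testEnv{
		Bootstrap: bootstrap,
		Username:  getenv("KAFKA_SASL_USERNAME"),
		Password:  getenv("KAFKA_SASL_PASSWORD"),
	}
	// Assumption: SASL is used only when both credentials are present.
	env.UseSASL = env.Username != "" && env.Password != ""
	return env, nil
}

func main() {
	env, err := loadTestEnv(os.Getenv)
	if err != nil {
		fmt.Println("cannot run system tests:", err)
		return
	}
	fmt.Printf("broker=%s sasl=%v\n", env.Bootstrap, env.UseSASL)
}
```

Passing the lookup function as a parameter keeps the helper easy to exercise without touching the real process environment.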

SASL & TLS

To use SASL or TLS, set Mechanism or TLS in ConsumerConfig and ProducerConfig. Using ConsumerConfig as an example:

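package main

import (
	"context"
	"crypto/tls"
	"log"
	"os"

	kc "github.com/Kevinello/kafka-client"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/scram"
)

func main() {
	// Build a SCRAM-SHA-512 mechanism; replace the placeholder credentials.
	mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
	if err != nil {
		log.Fatalf("create SASL mechanism: %v", err)
	}
	config := kc.ConsumerConfig{
		GroupID: "test-group",
		// MessageHandler is called for each consumed message.
		MessageHandler: func(msg *kafka.Message, consumer *kc.Consumer) (err error) {
			log.Printf("message: %s\n", string(msg.Value))
			return
		},
		Mechanism: mechanism,
		TLS:       &tls.Config{}, // fill in your own TLS settings
	}
	consumer, err := kc.NewConsumer(context.Background(), config)
	if err != nil {
		log.Fatalf("create consumer: %v", err)
	}
	// Block until the consumer closes, then exit with a non-zero status.
	<-consumer.Closed()
	os.Exit(1)
}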

Replace the username and password with your own, and fill in the TLS field with your own tls.Config.
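
One common way to build such a tls.Config is to trust a specific CA certificate. The sketch below uses only the standard library; newTLSConfig is a hypothetical helper, and whether you need a custom CA, client certificates, or neither depends on how your broker is set up.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// newTLSConfig is a hypothetical helper that returns a tls.Config trusting
// only the CA certificates found in caPEM (PEM-encoded).
func newTLSConfig(caPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no valid CA certificate found in PEM data")
	}
	return &tls.Config{
		RootCAs:    pool,
		MinVersion: tls.VersionTLS12,
	}, nil
}

func main() {
	// Invalid input is rejected instead of silently producing an empty pool.
	_, err := newTLSConfig([]byte("not a certificate"))
	fmt.Println(err) // prints: no valid CA certificate found in PEM data
}
```

In practice the PEM bytes would typically come from a file read with os.ReadFile, and the resulting value would be assigned to the TLS field of the config.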

Project Structure

[Image: project structure diagram]

# Packages

No description provided by the author

# Functions

GetAllTopic is a function decorator for getting all topics; it returns a GetTopicsFunc. @param kafkaBootstrap string @return GetTopicsFunc @author kevineluo @update 2023-03-15 03:14:57.
GetTopicReMatch is a function decorator for getting topics by regex match; it returns a GetTopicsFunc that yields the matches found (resTopics) and an err if applicable.
NewConsumer creates a new Kafka consumer.
NewProducer creates a new Kafka producer.

# Constants

MechanismSASLPlain SASL PLAIN authentication @update 2024-01-03 06:54:55.
MechanismSASLSCRAMSHA256 SASL SCRAM-SHA-256 authentication @update 2024-01-03 06:55:11.
MechanismSASLSCRAMSHA512 SASL SCRAM-SHA-512 authentication @update 2024-01-03 06:55:17.

# Variables

ErrClosedConsumer error raised when trying to close an already-closed consumer @update 2023-03-15 02:03:09.
ErrClosedProducer error raised when trying to close an already-closed producer @update 2023-03-30 05:12:28.
ErrNoTopics error raised when GetTopicsFunc returns no topics @update 2023-03-14 01:18:01.
ErrTooManyConsumeError error raised when consumer.ErrCount reaches MaxConsumeErrorCount @update 2023-03-15 02:02:27.

# Structs

Consumer struct holds data related to the consumer @author kevineluo @update 2023-02-24 01:53:37.
No description provided by the author
Producer struct holds data related to the producer @author kevineluo @update 2023-03-15 10:30:44.
No description provided by the author

# Type aliases

GetTopicsFunc is the function used to get the needed topics (implemented and provided by the user) @return topics []string @return err error @author kevineluo @update 2023-03-28 07:16:54.
MessageHandler function which handles received messages from the Kafka broker.
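
To make the idea of a user-provided topic function with regex matching concrete, here is a standard-library sketch. It does not reproduce the library's GetTopicsFunc or GetTopicReMatch: the getTopicsFunc type, its parameterless signature, matchTopics, and errNoTopics are all hypothetical, and the topic list comes from a plain function instead of a broker.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// errNoTopics is a hypothetical sentinel for "nothing matched".
var errNoTopics = errors.New("no topics matched")

// getTopicsFunc is a hypothetical shape for a topic-providing function;
// the library's real GetTopicsFunc may take parameters.
type getTopicsFunc func() ([]string, error)

// matchTopics wraps list so that only topics matching pattern are returned.
func matchTopics(list getTopicsFunc, pattern string) getTopicsFunc {
	re, compileErr := regexp.Compile(pattern)
	return func() ([]string, error) {
		if compileErr != nil {
			return nil, compileErr
		}
		all, err := list()
		if err != nil {
			return nil, err
		}
		var matched []string
		for _, topic := range all {
			if re.MatchString(topic) {
				matched = append(matched, topic)
			}
		}
		if len(matched) == 0 {
			return nil, errNoTopics
		}
		return matched, nil
	}
}

func main() {
	list := func() ([]string, error) {
		return []string{"unit-test-topic-1", "unit-test-topic-2", "orders"}, nil
	}
	topics, err := matchTopics(list, `^unit-test-topic-\d+$`)()
	fmt.Println(topics, err) // prints: [unit-test-topic-1 unit-test-topic-2] <nil>
}
```

Wrapping one function in another this way keeps the source of the topic list (a broker query, a static list, a config file) separate from the filtering rule.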