github.com/bigzhuk/kafka-adapter
module, package
Version: 1.8.8
Repository: https://github.com/bigzhuk/kafka-adapter.git
Documentation: pkg.go.dev

# README

Kafka Adapter

usage: import "github.com/iddqdeika/kafka-adapter"

The adapter makes it possible to work with Kafka topics: writing messages and reading them with Ack/Nack functionality.

The two main interfaces are Queue (an instance of the adapter) and Message.

Queue.PutWithCtx(ctx context.Context, topic string, data []byte) error - puts the given data into the topic; returns an error if ctx is canceled first.

Queue.Put(topic string, data []byte) error - the same method, but with a context.Background() ctx.

Queue.GetWithCtx(ctx context.Context, topic string) (Message, error) - gets a single message from the topic; returns an error if ctx is canceled first.

Queue.Get(topic string) (Message, error) - the same method, but with a context.Background() ctx.
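
For example, a minimal, hedged sketch of the context-aware variants; the helper name readWithDeadline, the payload and the 5-second timeout are illustrative, not part of the package:

import (
	"context"
	"log"
	"time"

	queue "github.com/iddqdeika/kafka-adapter"
)

// readWithDeadline is an illustrative helper: the *WithCtx calls return an
// error once ctx is canceled instead of blocking forever.
func readWithDeadline(q queue.Queue, topic string) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := q.PutWithCtx(ctx, topic, []byte("payload")); err != nil {
		log.Fatalf("cant put message (ctx may be canceled): %v", err)
	}

	msg, err := q.GetWithCtx(ctx, topic)
	if err != nil {
		log.Fatalf("cant get message before the deadline: %v", err)
	}
	log.Printf("got: %s", msg.Data())

	// acknowledge the message (see Message.Ack below)
	if err := msg.Ack(); err != nil {
		log.Fatalf("cant ack message: %v", err)
	}
}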

Message.Data() []byte - returns the Kafka message body.

Message.Ack() error - acknowledges the message, incrementing the consumer's offset for the Kafka partition.

Message.Nack() error - negatively acknowledges the message and re-establishes the reader so that other consumers within the consumer group can read it. NOTE: for this to work, the sum of the Concurrency param across all consumers with the same ConsumerGroupID must be higher than the topic's partition count.
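
A typical pattern is to Ack only after a message has been processed successfully and to Nack it otherwise, so that another consumer in the group can retry it. A sketch under the same assumptions and import alias as above (consumeLoop and process are illustrative names):

// consumeLoop acks a message only after it was processed successfully,
// otherwise it nacks the message so another consumer in the group can
// pick it up again.
func consumeLoop(q queue.Queue, topic string, process func([]byte) error) error {
	for {
		msg, err := q.Get(topic)
		if err != nil {
			return err
		}
		if procErr := process(msg.Data()); procErr != nil {
			if err := msg.Nack(); err != nil {
				return err
			}
			continue
		}
		if err := msg.Ack(); err != nil {
			return err
		}
	}
}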

Important:

If ConsumerGroupID is not set in the config (or is an empty string), every message is auto-acked, and acknowledging (msg.Ack()/msg.Nack()) returns an "unavailable when GroupID is not set" error.
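
In practice that means the following (an illustrative snippet assuming a Queue q whose config has ConsumerGroupID left empty; the error text is the one quoted above):

// With an empty ConsumerGroupID the message returned by Get is already
// auto-acked, and Ack/Nack only report that acknowledgement is unavailable.
msg, err := q.Get("my_topic")
if err != nil {
	log.Fatalf("cant get message: %v", err)
}
if err := msg.Ack(); err != nil {
	log.Printf("ack not available: %v", err) // "unavailable when GroupID is not set"
}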

Example:

package main

import (
	queue "github.com/iddqdeika/kafka-adapter"
	"log"
)

func example() {
	topic := "my_topic"
	consumerGroup := "my_group"
	broker := "my-kafka-host.lan:9092"
	messageToSend := []byte("some message")

	cfg := queue.KafkaCfg{
		Concurrency:       100,
		QueueToReadNames:  []string{topic},
		QueueToWriteNames: []string{topic},
		Brokers:           []string{broker},
		ConsumerGroupID:   consumerGroup,
		DefaultTopicConfig: struct {
			NumPartitions     int
			ReplicationFactor int
		}{NumPartitions: 1, ReplicationFactor: 1},
	}

	log.Printf("starting adapter")
	q, err := queue.FromStruct(cfg, queue.DefaultLogger)
	if err != nil {
		log.Fatalf("cant init kafka adapter: %v", err)
	}
	log.Printf("adapter started")

	log.Printf("putting message")
	err = q.Put(topic, messageToSend)
	if err != nil {
		log.Fatalf("cant put message in topic: %v", err)
	}
	log.Printf("message put")

	log.Printf("getting message")
	msg, err := q.Get(topic)
	if err != nil {
		log.Fatalf("cant get message from topic: %v", err)
	}
	message := string(msg.Data())
	log.Printf("message got: %v", message)

	log.Printf("acking message")
	err = msg.Ack()
	if err != nil {
		log.Fatalf("cant ack message: %v", err)
	}
	log.Printf("message acked")

	q.Close()
	log.Printf("adapter closed")
}

Additional:

FromConfig(cfg Config, logger Logger) - a constructor that takes the Config interface.

LoadJsonConfig(filename string) - a Config implementation that loads its data from a JSON file.

DefaultLogger - the default Logger implementation; it simply forwards all errors to fmt.Printf.
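
A hedged sketch of wiring these together; the file name kafka.json is illustrative, and the return values of LoadJsonConfig and FromConfig are assumptions, so check pkg.go.dev for the exact signatures:

// Assumption: LoadJsonConfig returns the Config plus an error, and FromConfig
// returns the Queue plus an error, mirroring FromStruct in the example above.
cfg, err := queue.LoadJsonConfig("kafka.json")
if err != nil {
	log.Fatalf("cant load config: %v", err)
}
q, err := queue.FromConfig(cfg, queue.DefaultLogger)
if err != nil {
	log.Fatalf("cant init kafka adapter: %v", err)
}
defer q.Close()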

# Structs

KV - a key-value pair that can be used as the data of a Kafka message. The key may be empty, but keep this in mind for topics that are compacted by key rather than by date.