Categorygithub.com/ihsanul14/go-confluent-kafka
modulepackage
1.0.1
Repository: https://github.com/ihsanul14/go-confluent-kafka.git
Documentation: pkg.go.dev

# README

go-confluent-kafka

A library provides consumer/producer to work with kafka, avro and schema registry in confluent

Producer

package main

import (
	"flag"
	"fmt"
	"gitlab.com/ihsanul14/go-confluent-kafka"
	"time"
)

var kafkaServers = []string{"localhost:9092"}
var schemaRegistryServers = []string{"http://localhost:8081"}
var topic = "test"

func main() {
	var n int
	schema := `{
				"type": "record",
				"name": "Example",
				"fields": [
					{"name": "Id", "type": "string"},
					{"name": "Type", "type": "string"},
					{"name": "Data", "type": "string"}
				]
			}`
	producer, err := kafka.NewAvroProducer(kafkaServers, schemaRegistryServers)
	if err != nil {
		fmt.Printf("Could not create avro producer: %s", err)
	}
	flag.IntVar(&n, "n", 1, "number")
	flag.Parse()
	for i := 0; i < n; i++ {
		fmt.Println(i)
		addMsg(producer, schema)
	}
}

func addMsg(producer *kafka.AvroProducer, schema string) {
	value := `{
		"Id": "1",
		"Type": "example_type",
		"Data": "example_data"
	}`
	key := time.Now().String()
	err := producer.Add(topic, schema, []byte(value))
	fmt.Println(key)
	if err != nil {
		fmt.Printf("Could not add a msg: %s", err)
	}
}

Producer with Plain SASL - SSL

package main

import (
	"flag"
	"fmt"
	"gitlab.com/ihsanul14/go-confluent-kafka"
	"time"
)

var kafkaServers = []string{"localhost:9092"}
var schemaRegistryServers = []string{"https://localhost:8081"}
var topic = "test"

func main() {
	var n int
	schema := `{
				"type": "record",
				"name": "Example",
				"fields": [
					{"name": "Id", "type": "string"},
					{"name": "Type", "type": "string"},
					{"name": "Data", "type": "string"}
				]
			}`

	var config = &AvroProducerConfig{
		KafkaServers : kafkaServers,
		SchemaRegistryServers: schemaRegistryServers,
		SASL: &SASLConfig{
			Username: "username",
			Password: "password",
			TLSConfig: &tls.Config{}
		}
	}
	producer, err := kafka.NewAvroProducer(config)
	if err != nil {
		fmt.Printf("Could not create avro producer: %s", err)
	}
	flag.IntVar(&n, "n", 1, "number")
	flag.Parse()
	for i := 0; i < n; i++ {
		fmt.Println(i)
		addMsg(producer, schema)
	}
}

func addMsg(producer *kafka.AvroProducer, schema string) {
	value := `{
		"Id": "1",
		"Type": "example_type",
		"Data": "example_data"
	}`
	key := time.Now().String()
	err := producer.Add(topic, schema, []byte(value))
	fmt.Println(key)
	if err != nil {
		fmt.Printf("Could not add a msg: %s", err)
	}
}

Consumer

package main

import (
	"fmt"
	"gitlab.com/ihsanul14/go-confluent-kafka"
	"github.com/bsm/sarama-cluster"
)

var kafkaServers = []string{"localhost:9092"}
var schemaRegistryServers = []string{"http://localhost:8081"}
var topic = "test"

func main() {
	consumerCallbacks := kafka.ConsumerCallbacks{
		OnDataReceived: func(msg kafka.Message) {
			fmt.Println(msg)
		},
		OnError: func(err error) {
			fmt.Println("Consumer error", err)
		},
		OnNotification: func(notification *cluster.Notification) {
			fmt.Println(notification)
		},
	}

	consumer, err := kafka.NewAvroConsumer(kafkaServers, schemaRegistryServers, topic, "consumer-group", consumerCallbacks)
	if err != nil {
		fmt.Println(err)
	}
	consumer.Consume()
}

References

# Functions

avroConsumer is a basic consumer to interact with schema registry, avro and kafka.
NewAvroProducer is a basic producer to interact with schema registry, avro and kafka.
No description provided by the author
No description provided by the author
NewSchemaRegistryClient creates a client to talk with the schema registry at the connect string By default it will retry failed requests (5XX responses and http errors) len(connect) number of times.
NewSchemaRegistryClientWithRetries creates an http client with a configurable amount of retries on 5XX responses.

# Structs

No description provided by the author
No description provided by the author
No description provided by the author
CachedSchemaRegistryClient is a schema registry client that will cache some data to improve performance.
No description provided by the author
Error holds more detailed information about errors coming back from schema registry.
No description provided by the author
No description provided by the author
SchemaRegistryClient is a basic http client to interact with schema registry.

# Interfaces

SchemaRegistryClientInterface defines the api for all clients interfacing with schema registry.