# README

## go-confluent-kafka

A library that provides a consumer and a producer for working with Kafka, Avro, and the Confluent Schema Registry.
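With Go modules, the library can be fetched with `go get gitlab.com/ihsanul14/go-confluent-kafka` (assuming the module is published at the import path used in the examples below).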
## Producer

```go
package main

import (
	"flag"
	"fmt"
	"time"

	kafka "gitlab.com/ihsanul14/go-confluent-kafka"
)

var kafkaServers = []string{"localhost:9092"}
var schemaRegistryServers = []string{"http://localhost:8081"}
var topic = "test"

func main() {
	var n int
	schema := `{
		"type": "record",
		"name": "Example",
		"fields": [
			{"name": "Id", "type": "string"},
			{"name": "Type", "type": "string"},
			{"name": "Data", "type": "string"}
		]
	}`
	producer, err := kafka.NewAvroProducer(kafkaServers, schemaRegistryServers)
	if err != nil {
		fmt.Printf("Could not create avro producer: %s", err)
		return
	}
	flag.IntVar(&n, "n", 1, "number of messages to produce")
	flag.Parse()
	for i := 0; i < n; i++ {
		fmt.Println(i)
		addMsg(producer, schema)
	}
}

func addMsg(producer *kafka.AvroProducer, schema string) {
	value := `{
		"Id": "1",
		"Type": "example_type",
		"Data": "example_data"
	}`
	key := time.Now().String()
	fmt.Println(key)
	err := producer.Add(topic, schema, []byte(value))
	if err != nil {
		fmt.Printf("Could not add a msg: %s", err)
	}
}
```
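The `-n` flag defined above sets how many messages are produced, so the example can be run with something like `go run main.go -n 10` (assuming the snippet is saved as `main.go`).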
## Producer with SASL/PLAIN over SSL

```go
package main

import (
	"crypto/tls"
	"flag"
	"fmt"
	"time"

	kafka "gitlab.com/ihsanul14/go-confluent-kafka"
)

var kafkaServers = []string{"localhost:9092"}
var schemaRegistryServers = []string{"https://localhost:8081"}
var topic = "test"

func main() {
	var n int
	schema := `{
		"type": "record",
		"name": "Example",
		"fields": [
			{"name": "Id", "type": "string"},
			{"name": "Type", "type": "string"},
			{"name": "Data", "type": "string"}
		]
	}`
	config := &kafka.AvroProducerConfig{
		KafkaServers:          kafkaServers,
		SchemaRegistryServers: schemaRegistryServers,
		SASL: &kafka.SASLConfig{
			Username:  "username",
			Password:  "password",
			TLSConfig: &tls.Config{},
		},
	}
	producer, err := kafka.NewAvroProducer(config)
	if err != nil {
		fmt.Printf("Could not create avro producer: %s", err)
		return
	}
	flag.IntVar(&n, "n", 1, "number of messages to produce")
	flag.Parse()
	for i := 0; i < n; i++ {
		fmt.Println(i)
		addMsg(producer, schema)
	}
}

func addMsg(producer *kafka.AvroProducer, schema string) {
	value := `{
		"Id": "1",
		"Type": "example_type",
		"Data": "example_data"
	}`
	key := time.Now().String()
	fmt.Println(key)
	err := producer.Add(topic, schema, []byte(value))
	if err != nil {
		fmt.Printf("Could not add a msg: %s", err)
	}
}
```
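The empty `tls.Config{}` above relies on the system trust store. If the brokers sit behind a private CA, a minimal sketch for building the value passed to `TLSConfig` could look like the following; the `newTLSConfig` helper and the `ca.pem` path are illustrative assumptions, not part of this library:

```go
package example

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

// newTLSConfig is a hypothetical helper: it reads a PEM-encoded CA
// certificate from caFile and returns a *tls.Config that trusts it.
// The result can be assigned to the TLSConfig field in the SASL example above.
func newTLSConfig(caFile string) (*tls.Config, error) {
	caCert, err := os.ReadFile(caFile)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caCert) {
		return nil, fmt.Errorf("no certificates found in %s", caFile)
	}
	return &tls.Config{RootCAs: pool}, nil
}
```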
## Consumer

```go
package main

import (
	"fmt"

	cluster "github.com/bsm/sarama-cluster"
	kafka "gitlab.com/ihsanul14/go-confluent-kafka"
)

var kafkaServers = []string{"localhost:9092"}
var schemaRegistryServers = []string{"http://localhost:8081"}
var topic = "test"

func main() {
	consumerCallbacks := kafka.ConsumerCallbacks{
		OnDataReceived: func(msg kafka.Message) {
			fmt.Println(msg)
		},
		OnError: func(err error) {
			fmt.Println("Consumer error", err)
		},
		OnNotification: func(notification *cluster.Notification) {
			fmt.Println(notification)
		},
	}
	consumer, err := kafka.NewAvroConsumer(kafkaServers, schemaRegistryServers, topic, "consumer-group", consumerCallbacks)
	if err != nil {
		fmt.Println(err)
		return
	}
	consumer.Consume()
}
```
## References

- Kafka: dangkaka
- Kafka: sarama
- Avro encoding and decoding: goavro
- Consumer groups: sarama-cluster
- schema-registry
- gitlab.com/mfahry/go-confluent-kafka
# Functions
- avroConsumer is a basic consumer to interact with the schema registry, Avro, and Kafka.
- NewAvroProducer is a basic producer to interact with the schema registry, Avro, and Kafka.
- NewSchemaRegistryClient creates a client to talk to the schema registry at the given connect strings. By default it retries failed requests (5XX responses and HTTP errors) len(connect) times.
- NewSchemaRegistryClientWithRetries creates an HTTP client with a configurable number of retries on 5XX responses (see the sketch below).
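A minimal sketch of constructing the registry clients named above. It assumes the constructors take the list of registry URLs and, for the retries variant, an integer retry count, and that they return the client directly rather than a (client, error) pair; check the package documentation to confirm:

```go
package main

import (
	"fmt"

	kafka "gitlab.com/ihsanul14/go-confluent-kafka"
)

func main() {
	servers := []string{"http://localhost:8081"}

	// Default client: retries failed requests len(servers) times (assumed signature).
	client := kafka.NewSchemaRegistryClient(servers)

	// Client with an explicit retry count (assumed signature).
	clientWithRetries := kafka.NewSchemaRegistryClientWithRetries(servers, 3)

	fmt.Println(client, clientWithRetries)
}
```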
# Structs
- CachedSchemaRegistryClient is a schema registry client that caches some data to improve performance.
- Error holds more detailed information about errors coming back from the schema registry.
- SchemaRegistryClient is a basic HTTP client to interact with the schema registry.
# Interfaces
- SchemaRegistryClientInterface defines the API for all clients interfacing with the schema registry (see the sketch below).
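As a sketch of why the interface is useful: code can accept a `SchemaRegistryClientInterface` and be handed either the plain or the cached client, or a test fake. This assumes both concrete clients satisfy the interface, which the listing implies but does not state, and it reuses the assumed constructor signature from the sketch above:

```go
package main

import (
	"fmt"

	kafka "gitlab.com/ihsanul14/go-confluent-kafka"
)

// describe depends only on the interface, so any registry client
// implementation can be passed in (assumption: SchemaRegistryClient
// satisfies SchemaRegistryClientInterface).
func describe(client kafka.SchemaRegistryClientInterface) {
	fmt.Printf("schema registry client in use: %T\n", client)
}

func main() {
	describe(kafka.NewSchemaRegistryClient([]string{"http://localhost:8081"}))
}
```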