Module: github.com/cilium/kafka
Version: v0.0.0-20180809090225-01ce283b732b
Repository: https://github.com/cilium/kafka.git
Documentation: pkg.go.dev
# README
Kafka
Kafka is a Go client library for the Apache Kafka server, released under the MIT license.
It provides a minimal abstraction over the wire protocol, support for transparent failover, and an easy-to-use blocking API.
See also:
- godoc-generated documentation
- code examples
Example
Write all messages from stdin to kafka and print all messages from a kafka topic to stdout.
```go
package main

import (
	"bufio"
	"log"
	"os"
	"strings"

	"github.com/optiopay/kafka"
	"github.com/optiopay/kafka/proto"
)

const (
	topic     = "my-messages"
	partition = 0
)

var kafkaAddrs = []string{"localhost:9092", "localhost:9093"}

// printConsumed reads messages from kafka and prints them out.
func printConsumed(broker kafka.Client) {
	conf := kafka.NewConsumerConf(topic, partition)
	conf.StartOffset = kafka.StartOffsetNewest
	consumer, err := broker.Consumer(conf)
	if err != nil {
		log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err)
	}

	for {
		msg, err := consumer.Consume()
		if err != nil {
			if err != kafka.ErrNoData {
				log.Printf("cannot consume %q topic message: %s", topic, err)
			}
			break
		}
		log.Printf("message %d: %s", msg.Offset, msg.Value)
	}
	log.Print("consumer quit")
}

// produceStdin reads stdin and sends every non-empty line as a message.
func produceStdin(broker kafka.Client) {
	producer := broker.Producer(kafka.NewProducerConf())
	input := bufio.NewReader(os.Stdin)
	for {
		line, err := input.ReadString('\n')
		if err != nil {
			log.Fatalf("input error: %s", err)
		}
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}

		msg := &proto.Message{Value: []byte(line)}
		if _, err := producer.Produce(topic, partition, msg); err != nil {
			log.Fatalf("cannot produce message to %s:%d: %s", topic, partition, err)
		}
	}
}

func main() {
	conf := kafka.NewBrokerConf("test-client")
	conf.AllowTopicCreation = true

	// connect to kafka cluster
	broker, err := kafka.Dial(kafkaAddrs, conf)
	if err != nil {
		log.Fatalf("cannot connect to kafka cluster: %s", err)
	}
	defer broker.Close()

	go printConsumed(broker)
	produceStdin(broker)
}
```
# Packages
Package kafkatest provides mock objects for the high-level kafka interface.
Package proto provides the kafka binary protocol implementation.
# Functions
Dial connects to any node from the given list of kafka addresses and, after a successful metadata fetch, returns a Broker.
Merge merges the consume results of any number of consumers into a single stream and exposes them through the returned multiplexer.
NewBrokerConf returns the default broker configuration.
NewConsumerConf returns the default consumer configuration.
NewHashProducer wraps the given producer and returns a DistributingProducer that publishes messages to kafka, computing the partition number in the range [0, numPartitions) from an FNV hash of the message key.
NewOffsetCoordinatorConf returns the default OffsetCoordinator configuration.
NewProducerConf returns the default producer configuration.
NewRandomProducer wraps the given producer and returns a DistributingProducer that publishes messages to kafka, picking the partition number at random from the range [0, numPartitions).
NewRoundRobinProducer wraps the given producer and returns a DistributingProducer that publishes messages to kafka, cycling through the partitions in the range [0, numPartitions); see the sketch after this list.
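As a minimal sketch of the DistributingProducer wrappers, assuming NewRoundRobinProducer takes the wrapped Producer plus a partition count and that DistributingProducer exposes a Distribute(topic, messages...) method (the topic name and partition count here are illustrative):

```go
package main

import (
	"log"

	"github.com/optiopay/kafka"
	"github.com/optiopay/kafka/proto"
)

func main() {
	broker, err := kafka.Dial([]string{"localhost:9092"}, kafka.NewBrokerConf("example-client"))
	if err != nil {
		log.Fatalf("cannot connect to kafka cluster: %s", err)
	}
	defer broker.Close()

	// Wrap a plain Producer so the destination partition is chosen for us,
	// cycling through partitions 0..3 in round-robin order (4 is illustrative).
	producer := kafka.NewRoundRobinProducer(broker.Producer(kafka.NewProducerConf()), 4)

	// Distribute needs only the topic; the wrapper picks the partition.
	msg := &proto.Message{Value: []byte("hello")}
	if _, err := producer.Distribute("my-messages", msg); err != nil {
		log.Fatalf("cannot produce message: %s", err)
	}
}
```

NewHashProducer and NewRandomProducer would be drop-in replacements here; the hash variant keeps messages with the same key on the same partition.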
# Constants
StartOffsetNewest configures the consumer to fetch messages produced after creating the consumer.
StartOffsetOldest configures the consumer to fetch starting from the oldest message available.
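The two start offsets cover the common cases of tailing a topic and replaying it. A minimal sketch, mirroring the ConsumerConf usage from the README example (topic name and partition are illustrative):

```go
package main

import "github.com/optiopay/kafka"

func consumerConfs() (tail, replay kafka.ConsumerConf) {
	// Tail the topic: only messages produced after this consumer is created.
	tail = kafka.NewConsumerConf("my-messages", 0)
	tail.StartOffset = kafka.StartOffsetNewest

	// Replay the topic: start from the oldest message the broker retains.
	replay = kafka.NewConsumerConf("my-messages", 0)
	replay.StartOffset = kafka.StartOffsetOldest
	return tail, replay
}
```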
# Variables
ErrClosed is returned as the result of any request made using a closed connection.
ErrMxClosed is returned when consuming from a closed multiplexer.
ErrNoData is returned by consumers on Fetch when the retry limit is set and exceeded.
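ErrNoData doubles as a clean end-of-stream signal when a retry limit is configured. A minimal sketch, assuming ConsumerConf has a RetryLimit field as in the optiopay/kafka API (the field name and values are assumptions):

```go
package main

import (
	"log"

	"github.com/optiopay/kafka"
)

// drainTopic consumes until the retry limit is exceeded, treating
// kafka.ErrNoData as "no more messages for now".
func drainTopic(broker kafka.Client) {
	conf := kafka.NewConsumerConf("my-messages", 0)
	conf.StartOffset = kafka.StartOffsetOldest
	conf.RetryLimit = 4 // assumed field: give up after 4 empty fetches

	consumer, err := broker.Consumer(conf)
	if err != nil {
		log.Fatalf("cannot create consumer: %s", err)
	}
	for {
		msg, err := consumer.Consume()
		if err == kafka.ErrNoData {
			log.Print("topic drained")
			return
		}
		if err != nil {
			log.Fatalf("consume error: %s", err)
		}
		log.Printf("offset %d: %s", msg.Offset, msg.Value)
	}
}
```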
# Structs
Broker is an abstract connection to a kafka cluster, managing connections to all kafka nodes.
BrokerConf represents the configuration of a broker.
ConsumerConf represents the configuration of a consumer.
Mx is a multiplexer combining a number of consumers into a single stream (see the sketch after this list).
OffsetCoordinatorConf represents the configuration of an offset coordinator.
ProducerConf represents the configuration of a producer.
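To consume a whole topic through one stream, per-partition consumers can be merged into an Mx. A minimal sketch, assuming Merge is variadic over Consumer and that Mx exposes Consume and Close methods (as suggested by the ErrMxClosed description above):

```go
package main

import (
	"log"

	"github.com/optiopay/kafka"
)

// consumeAllPartitions merges one consumer per partition into a single stream.
func consumeAllPartitions(broker kafka.Client, topic string, numPartitions int32) {
	consumers := make([]kafka.Consumer, 0, numPartitions)
	for p := int32(0); p < numPartitions; p++ {
		conf := kafka.NewConsumerConf(topic, p)
		conf.StartOffset = kafka.StartOffsetNewest
		c, err := broker.Consumer(conf)
		if err != nil {
			log.Fatalf("cannot create consumer for partition %d: %s", p, err)
		}
		consumers = append(consumers, c)
	}

	mx := kafka.Merge(consumers...)
	defer mx.Close()

	for {
		msg, err := mx.Consume()
		if err != nil {
			// kafka.ErrMxClosed is returned once the multiplexer is closed.
			log.Printf("multiplexer quit: %s", err)
			return
		}
		log.Printf("offset %d: %s", msg.Offset, msg.Value)
	}
}
```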
# Interfaces
BatchConsumer is the interface that wraps the ConsumeBatch method.
Client is the interface implemented by Broker.
Consumer is the interface that wraps the Consume method.
DistributingProducer is an interface similar to Producer, but it never requires the partition to be specified explicitly.
Logger is a general logging interface that can be provided by popular logging frameworks.
OffsetCoordinator is the interface that wraps the Commit and Offset methods (see the sketch after this list).
Producer is the interface that wraps the Produce method.
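To persist consumer progress between restarts, the offset coordinator commits and restores offsets for a consumer group. A minimal sketch, assuming Commit(topic, partition, offset) and Offset(topic, partition) signatures and that Client exposes an OffsetCoordinator method (the group name, topic, and offset values are illustrative):

```go
package main

import (
	"log"

	"github.com/optiopay/kafka"
)

// trackProgress stores a position for a consumer group and reads it back.
func trackProgress(broker kafka.Client) {
	coordinator, err := broker.OffsetCoordinator(kafka.NewOffsetCoordinatorConf("my-consumer-group"))
	if err != nil {
		log.Fatalf("cannot create offset coordinator: %s", err)
	}

	// Commit the last processed offset for topic "my-messages", partition 0.
	if err := coordinator.Commit("my-messages", 0, 42); err != nil {
		log.Fatalf("cannot commit offset: %s", err)
	}

	// Later (e.g. after a restart), read the stored offset back.
	offset, metadata, err := coordinator.Offset("my-messages", 0)
	if err != nil {
		log.Fatalf("cannot fetch committed offset: %s", err)
	}
	log.Printf("resume at offset %d (metadata %q)", offset, metadata)
}
```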