Module: github.com/dropbox/kafka
Version: 0.0.0-20180312212505-339d90433610
Repository: https://github.com/dropbox/kafka.git
Documentation: pkg.go.dev
# README
Kafka
Kafka is a Go client library for Apache Kafka, released under the MIT license. It was originally based on the great client at https://github.com/optiopay/kafka.
Kafka provides a minimal abstraction over the wire protocol, support for transparent failover, and an easy-to-use blocking API.
- godoc-generated documentation
- code examples
Example
Write all messages from stdin to Kafka and print all messages from a Kafka topic to stdout.
```go
package main

import (
	"bufio"
	"log"
	"os"
	"strings"

	"github.com/dropbox/kafka"
	"github.com/dropbox/kafka/proto"
)

const (
	topic     = "my-messages"
	partition = 0
)

var kafkaAddrs = []string{"localhost:9092", "localhost:9093"}

// printConsumed reads messages from kafka and prints them out.
func printConsumed(broker kafka.Client) {
	conf := kafka.NewConsumerConf(topic, partition)
	conf.StartOffset = kafka.StartOffsetNewest
	consumer, err := broker.Consumer(conf)
	if err != nil {
		log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err)
	}

	for {
		msg, err := consumer.Consume()
		if err != nil {
			if err != kafka.ErrNoData {
				log.Printf("cannot consume %q topic message: %s", topic, err)
			}
			break
		}
		log.Printf("message %d: %s", msg.Offset, msg.Value)
	}
	log.Print("consumer quit")
}

// produceStdin reads stdin and sends every non-empty line as a message.
func produceStdin(broker kafka.Client) {
	producer := broker.Producer(kafka.NewProducerConf())
	input := bufio.NewReader(os.Stdin)
	for {
		line, err := input.ReadString('\n')
		if err != nil {
			log.Fatalf("input error: %s", err)
		}
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}

		msg := &proto.Message{Value: []byte(line)}
		if _, err := producer.Produce(topic, partition, msg); err != nil {
			log.Fatalf("cannot produce message to %s:%d: %s", topic, partition, err)
		}
	}
}

func main() {
	// connect to the kafka cluster
	broker, err := kafka.Dial(kafkaAddrs, kafka.NewBrokerConf("test-client"))
	if err != nil {
		log.Fatalf("cannot connect to kafka cluster: %s", err)
	}
	defer broker.Close()

	go printConsumed(broker)
	produceStdin(broker)
}
```
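To try it, start a Kafka broker reachable at localhost:9092 (localhost:9093 is used as a second bootstrap address), run the program, and type lines on stdin: every non-empty line is produced to the my-messages topic and echoed back by the consumer goroutine.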
# Packages
No description provided by the author
Package kafkatest provides mock objects for high level kafka interface.
Package proto provides kafka binary protocol implementation.
# Functions
If initialized, Kafka connections will be cached globally.
NewBroker returns a broker to a given list of kafka addresses.
No description provided by the author
NewCluster connects to a cluster from a given list of kafka addresses and, after a successful metadata fetch, returns a Cluster.
No description provided by the author
newConnectionPool creates a connection pool and initializes it.
NewConsumerConf returns the default consumer configuration.
No description provided by the author
No description provided by the author
NewOffsetCoordinatorConf returns the default OffsetCoordinator configuration; see the sketch after this list.
NewProducerConf returns a default producer configuration.
No description provided by the author
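NewOffsetCoordinatorConf above pairs with the OffsetCoordinator interface listed below. A minimal sketch of committing and reading back a consumer-group offset; the trackOffsets helper is illustrative, and the Broker.OffsetCoordinator method and the Commit/Offset signatures are assumed to match the upstream optiopay client this package was forked from:

```go
package main

import (
	"log"

	"github.com/dropbox/kafka"
)

func trackOffsets(broker *kafka.Broker) {
	// Assumption: OffsetCoordinator, Commit, and Offset keep the
	// signatures of the upstream optiopay/kafka client.
	conf := kafka.NewOffsetCoordinatorConf("my-consumer-group")
	coordinator, err := broker.OffsetCoordinator(conf)
	if err != nil {
		log.Fatalf("cannot create offset coordinator: %s", err)
	}

	// Record that everything up to offset 42 in my-messages:0 was processed.
	if err := coordinator.Commit("my-messages", 0, 42); err != nil {
		log.Fatalf("cannot commit offset: %s", err)
	}

	// Read the committed offset back, e.g. to resume after a restart.
	offset, _, err := coordinator.Offset("my-messages", 0)
	if err != nil {
		log.Fatalf("cannot fetch committed offset: %s", err)
	}
	log.Printf("committed offset: %d", offset)
}

func main() {
	broker, err := kafka.Dial([]string{"localhost:9092"}, kafka.NewBrokerConf("offset-example"))
	if err != nil {
		log.Fatalf("cannot connect to kafka cluster: %s", err)
	}
	defer broker.Close()
	trackOffsets(broker)
}
```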
# Constants
StartOffsetNewest configures the consumer to fetch messages produced after creating the consumer.
StartOffsetOldest configures the consumer to fetch starting from the oldest message available.
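For example, switching the README example's consumer from the newest to the oldest offset replays the whole retained log. A minimal sketch; the replayAll helper is illustrative, and the int32 partition parameter follows the upstream optiopay signature:

```go
package example

import "github.com/dropbox/kafka"

// replayAll returns a consumer positioned at the oldest retained message
// instead of StartOffsetNewest, as used in the README example above.
func replayAll(broker kafka.Client, topic string, partition int32) (kafka.Consumer, error) {
	conf := kafka.NewConsumerConf(topic, partition)
	conf.StartOffset = kafka.StartOffsetOldest
	return broker.Consumer(conf)
}
```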
# Structs
Broker is an abstract connection to a kafka cluster for the given configuration, and can be used to create clients to the cluster.
No description provided by the author
Cluster maintains the metadata and connectionPools for a Kafka cluster.
No description provided by the author
connectionPool manages multiple connections to a Kafka broker, balancing throughput against the overall number of connections.
Caches connections to a single cluster by ClientID.
No description provided by the author
MetadataCache is a thread-safe cache of ClusterMetadata by clusterName.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Interfaces
BatchConsumer is the interface that wraps the ConsumeBatch method.
Client is the interface implemented by Broker; see the sketch after this list.
Consumer is the interface that wraps the Consume method.
DistributingProducer is an interface similar to Producer, but it never requires the partition to be specified explicitly.
OffsetCoordinator is the interface that wraps the Commit and Offset methods.
PartitionCountSource lets a DistributingProducer determine how many partitions exist for a particular topic.
Producer is the interface that wraps the Produce method.
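Because Broker implements Client, application code written against the Client interface can be exercised with the kafkatest mocks instead of a live cluster. A minimal sketch; the readLatest helper is illustrative, not part of the package:

```go
package example

import (
	"log"

	"github.com/dropbox/kafka"
)

// readLatest consumes up to n messages from topic/partition, starting at the
// newest offset. Because it depends only on kafka.Client, a kafkatest mock
// can stand in for a real Broker in unit tests.
func readLatest(client kafka.Client, topic string, partition int32, n int) error {
	conf := kafka.NewConsumerConf(topic, partition)
	conf.StartOffset = kafka.StartOffsetNewest
	consumer, err := client.Consumer(conf)
	if err != nil {
		return err
	}
	for i := 0; i < n; i++ {
		msg, err := consumer.Consume()
		if err != nil {
			if err == kafka.ErrNoData {
				return nil // nothing more to read right now
			}
			return err
		}
		log.Printf("offset %d: %s", msg.Offset, msg.Value)
	}
	return nil
}
```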