github.com/discord/zorkian-kafka (module, package)
Version: 0.1.0
Repository: https://github.com/discord/zorkian-kafka.git
Documentation: https://pkg.go.dev/github.com/discord/zorkian-kafka

# README

Kafka

Kafka is a Go client library for the Apache Kafka server, released under the MIT license. It was originally based on the great client from https://github.com/optiopay/kafka.

Kafka provides a minimal abstraction over the wire protocol, support for transparent failover, and an easy-to-use blocking API.

Example

Write all messages from stdin to Kafka and print all messages from a Kafka topic to stdout.

```go
package main

import (
    "bufio"
    "log"
    "os"
    "strings"

    "github.com/discord/zorkian-kafka"
    "github.com/discord/zorkian-kafka/proto"
)

const (
    topic     = "my-messages"
    partition = 0
)

var kafkaAddrs = []string{"localhost:9092", "localhost:9093"}

// printConsumed reads messages from kafka and prints them out
func printConsumed(broker kafka.Client) {
    conf := kafka.NewConsumerConf(topic, partition)
    conf.StartOffset = kafka.StartOffsetNewest
    consumer, err := broker.Consumer(conf)
    if err != nil {
        log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err)
    }

    for {
        msg, err := consumer.Consume()
        if err != nil {
            if err != kafka.ErrNoData {
                log.Printf("cannot consume %q topic message: %s", topic, err)
            }
            break
        }
        log.Printf("message %d: %s", msg.Offset, msg.Value)
    }
    log.Print("consumer quit")
}

// produceStdin reads stdin and sends every non-empty line as a message
func produceStdin(broker kafka.Client) {
    producer := broker.Producer(kafka.NewProducerConf())
    input := bufio.NewReader(os.Stdin)
    for {
        line, err := input.ReadString('\n')
        if err != nil {
            log.Fatalf("input error: %s", err)
        }
        line = strings.TrimSpace(line)
        if line == "" {
            continue
        }

        msg := &proto.Message{Value: []byte(line)}
        if _, err := producer.Produce(topic, partition, msg); err != nil {
            log.Fatalf("cannot produce message to %s:%d: %s", topic, partition, err)
        }
    }
}

func main() {
    // connect to kafka cluster
    broker, err := kafka.Dial(kafkaAddrs, kafka.NewBrokerConf("test-client"))
    if err != nil {
        log.Fatalf("cannot connect to kafka cluster: %s", err)
    }
    defer broker.Close()

    go printConsumed(broker)
    produceStdin(broker)
}
```
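With a Kafka cluster listening on the kafkaAddrs above (localhost:9092 and localhost:9093), every non-empty line typed on stdin is produced to the my-messages topic and echoed back by the consumer goroutine.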

# Packages

Package kafkatest provides mock objects for high level kafka interface.
Package proto provides kafka binary protocol implementation.
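
Because Client is an interface (see Interfaces below), code written against it can be exercised with the kafkatest mocks instead of a live cluster. A minimal sketch; kafkatest.NewBroker is an assumption carried over from the upstream optiopay client and is not confirmed from this module's docs:

```go
package main

import (
	"github.com/discord/zorkian-kafka"
	"github.com/discord/zorkian-kafka/kafkatest"
)

// consumeOnce is written against the kafka.Client interface, so it accepts
// either a real Broker or a kafkatest mock.
func consumeOnce(c kafka.Client) error {
	consumer, err := c.Consumer(kafka.NewConsumerConf("my-messages", 0))
	if err != nil {
		return err
	}
	_, err = consumer.Consume()
	return err
}

func main() {
	// Assumption: kafkatest.NewBroker constructs a mock Client, mirroring
	// the upstream optiopay/kafka/kafkatest package this module derives from.
	_ = consumeOnce(kafkatest.NewBroker())
}
```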

# Functions

InitializeMetadataCache causes Kafka connections to be cached globally.
NewBroker returns a broker to a given list of kafka addresses.
NewBrokerConf constructs default configuration.
NewCluster connects to a cluster from a given list of kafka addresses and after successful metadata fetch, returns Cluster.
NewClusterConnectionConf constructs a default configuration.
NewConsumerConf returns the default consumer configuration.
NewOffsetCoordinatorConf returns default OffsetCoordinator configuration.
NewProducerConf returns a default producer configuration.
SetLogger allows overriding the logger being used by Kafka clients.
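
The constructors above share one pattern: each returns a default configuration struct whose fields can be adjusted before being handed to the broker, as the README example does with StartOffset. A minimal sketch, reusing only names that appear elsewhere in this document (the address and topic are placeholders):

```go
package main

import (
	"log"

	"github.com/discord/zorkian-kafka"
)

func main() {
	// Connect with the default broker configuration, as in the README example.
	broker, err := kafka.Dial([]string{"localhost:9092"}, kafka.NewBrokerConf("example-client"))
	if err != nil {
		log.Fatalf("cannot connect: %s", err)
	}
	defer broker.Close()

	// Override a default before use: replay from the oldest retained message
	// instead of the newest, using the constant listed below.
	conf := kafka.NewConsumerConf("my-messages", 0)
	conf.StartOffset = kafka.StartOffsetOldest
	consumer, err := broker.Consumer(conf)
	if err != nil {
		log.Fatalf("cannot create consumer: %s", err)
	}

	msg, err := consumer.Consume()
	if err != nil {
		log.Fatalf("cannot consume: %s", err)
	}
	log.Printf("first retained message at offset %d: %s", msg.Offset, msg.Value)
}
```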

# Constants

StartOffsetNewest configures the consumer to fetch messages produced after creating the consumer.
StartOffsetOldest configures the consumer to fetch starting from the oldest message available.

# Variables

ErrClosed is returned as result of any request made using closed connection.
ErrNoData is returned by consumers on Fetch when the retry limit is set and exceeded.

# Structs

Broker is an abstract connection to kafka cluster for the given configuration, and can be used to create clients to the cluster.
BrokerConf is the broker configuration container.
Cluster maintains the metadata and connectionPools for a Kafka cluster.
ClusterConnectionConf is configuration for the cluster connection pool.
ConsumerConf represents consumer configuration.
MetadataCache is a threadsafe cache of ClusterMetadata by clusterName.
NoConnectionsAvailable indicates that the connection pool is full.
OffsetCoordinatorConf is configuration for the offset coordinator.
ProducerConf is the configuration for a producer.

# Interfaces

BatchConsumer is the interface that wraps the ConsumeBatch method.
Client is the interface implemented by Broker.
Consumer is the interface that wraps the Consume method.
DistributingProducer is an interface similar to Producer, but it never requires the partition to be specified explicitly.
OffsetCoordinator is the interface that wraps the Commit and Offset methods; a sketch of its use follows this list.
PartitionCountSource lets a DistributingProducer determine how many partitions exist for a particular topic.
Producer is the interface that wraps the Produce method.
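
A minimal sketch of offset tracking through OffsetCoordinator. The constructor name comes from the Functions list above; the method signatures and the Broker.OffsetCoordinator accessor are assumptions based on the upstream optiopay client, not confirmed from this module:

```go
package main

import (
	"log"

	"github.com/discord/zorkian-kafka"
)

func main() {
	broker, err := kafka.Dial([]string{"localhost:9092"}, kafka.NewBrokerConf("offset-example"))
	if err != nil {
		log.Fatalf("cannot connect: %s", err)
	}
	defer broker.Close()

	// Assumption: NewOffsetCoordinatorConf takes the consumer group name and
	// the broker exposes an OffsetCoordinator method, as in the upstream client.
	coordinator, err := broker.OffsetCoordinator(kafka.NewOffsetCoordinatorConf("my-group"))
	if err != nil {
		log.Fatalf("cannot create offset coordinator: %s", err)
	}

	// Assumption: Commit stores an offset for a topic/partition and Offset
	// reads it back; the listing above names only these two methods.
	if err := coordinator.Commit("my-messages", 0, 42); err != nil {
		log.Fatalf("cannot commit offset: %s", err)
	}
	offset, _, err := coordinator.Offset("my-messages", 0)
	if err != nil {
		log.Fatalf("cannot fetch offset: %s", err)
	}
	log.Printf("committed offset: %d", offset)
}
```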

# Type aliases

NodeMap maps a broker node ID to a connection handle.