Module: github.com/lovoo/goka
Version: 1.1.13
Repository: https://github.com/lovoo/goka.git
Documentation: pkg.go.dev

# README

Goka


Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go. Goka aims to reduce the complexity of building highly scalable and highly available microservices.

Goka extends the concept of Kafka consumer groups by binding a state table to them and persisting it in Kafka. Goka provides sane defaults and a pluggable architecture.

Features

  • Message Input and Output

    Goka handles all the message input and output for you. You only have to provide one or more callback functions that handle messages from any of the Kafka topics you are interested in. You only ever have to deal with deserialized messages.

  • Scaling

    Goka automatically distributes the processing and state across multiple instances of a service. This enables effortless scaling when the load increases.

  • Fault Tolerance

    In case of a failure, Goka will redistribute the failed instance's workload and state across the remaining healthy instances. All state is safely stored in Kafka and messages delivered with at-least-once semantics.

  • Built-in Monitoring and Introspection

    Goka provides a web interface for monitoring performance and querying values in the state.

  • Modularity

    Goka fosters a pluggable architecture which enables you to replace, for example, the storage layer or the Kafka communication layer.

Documentation

This README provides a brief, high level overview of the ideas behind Goka.

Package API documentation is available at GoDoc and the Wiki provides several tips for configuring, extending, and deploying Goka applications.

Installation

You can install Goka by running the following command:

$ go get -u github.com/lovoo/goka

Configuration

Goka relies on Sarama to perform the actual communication with Kafka, which offers many configuration settings. The config is documented in the Sarama package documentation.

In most cases, you need to modify the config, e.g. to set the Kafka Version.

cfg := goka.DefaultConfig()
cfg.Version = sarama.V2_4_0_0
goka.ReplaceGlobalConfig(cfg)

This makes all goka components use the updated config.

If you need component-specific configuration, pass customized builders to the component's constructor, e.g.

cfg := goka.DefaultConfig()
// modify the config with component-specific settings


// use the config by creating a builder which overrides the global config
goka.NewProcessor( // brokers, graph, ...
	goka.WithConsumerGroupBuilder(
		goka.ConsumerGroupBuilderWithConfig(cfg),
	),
	// ...
)

Concepts

Goka relies on Kafka for message passing, fault-tolerant state storage and workload partitioning.

  • Emitters deliver key-value messages into Kafka. As an example, an emitter could be a database handler emitting the state changes into Kafka for other interested applications to consume.

  • Processor is a set of callback functions that consume and perform state transformations upon delivery of these emitted messages. Processor groups are formed of one or more instances of a processor. Goka distributes the partitions of the input topics across all processor instances in a processor group. This enables effortless scaling and fault-tolerance. If a processor instance fails, its partitions and state are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka.

  • Group table is the state of a processor group. It is a partitioned key-value table stored in Kafka that belongs to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance recovering them from Kafka.

  • Views are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services, for example through a gRPC interface (see the sketch after this list).

  • Local storage keeps a local copy of the group table partitions to speed up recovery and reduce memory utilization. By default, the local storage uses LevelDB, but in-memory map and Redis-based storage are also available.
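As a hedged illustration of the View concept, the following sketch (reusing the brokers and group variables and the codec.Int64 codec from the Get Started example below) shows how a view could serve read-only access to the group table, e.g. from an HTTP or gRPC handler:

view, err := goka.NewView(brokers,
	goka.GroupTable(group), // "example-group-table"
	new(codec.Int64),
)
if err != nil {
	log.Fatalf("error creating view: %v", err)
}

// Run keeps the local cache up to date; it blocks until the context is cancelled.
go func() {
	if err := view.Run(context.Background()); err != nil {
		log.Fatalf("error running view: %v", err)
	}
}()

// Later, e.g. inside a request handler, read the current counter for a key.
val, err := view.Get("some-key")
if err != nil {
	log.Printf("error getting value: %v", err)
} else if val != nil {
	log.Printf("current counter: %d", val.(int64))
}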

Get Started

An example Goka application could look like the following. An emitter emits a single message with key "some-key" and value "some-value" into the "example-stream" topic. A processor processes the "example-stream" topic, counting the number of messages delivered for "some-key". The counter is persisted in the "example-group-table" topic. To locally start dockerized Zookeeper and Kafka instances, execute make start with the Makefile in the examples folder.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

var (
	brokers             = []string{"localhost:9092"}
	topic   goka.Stream = "example-stream"
	group   goka.Group  = "example-group"
)

// Emit messages forever every second
func runEmitter() {
	emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
	if err != nil {
		log.Fatalf("error creating emitter: %v", err)
	}
	defer emitter.Finish()
	for {
		time.Sleep(1 * time.Second)
		err = emitter.EmitSync("some-key", "some-value")
		if err != nil {
			log.Fatalf("error emitting message: %v", err)
		}
	}
}

// process messages until ctrl-c is pressed
func runProcessor() {
	// The process callback is invoked for each message delivered from the
	// "example-stream" topic.
	cb := func(ctx goka.Context, msg interface{}) {
		var counter int64
		// ctx.Value() gets the value stored in the group table for the
		// message's key.
		if val := ctx.Value(); val != nil {
			counter = val.(int64)
		}
		counter++
		// SetValue stores the incremented counter in the group table for
		// the message's key.
		ctx.SetValue(counter)
		log.Printf("key = %s, counter = %v, msg = %v", ctx.Key(), counter, msg)
	}

	// Define a new processor group. The group defines all inputs, outputs, and
	// serialization formats. The group-table topic is "example-group-table".
	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), cb),
		goka.Persist(new(codec.Int64)),
	)

	p, err := goka.NewProcessor(brokers, g)
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan bool)
	go func() {
		defer close(done)
		if err = p.Run(ctx); err != nil {
			log.Fatalf("error running processor: %v", err)
		} else {
			log.Printf("Processor shutdown cleanly")
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
	<-wait   // wait for SIGINT/SIGTERM
	cancel() // gracefully stop processor
	<-done
}

func main() {
	go runEmitter() // emits one message every second forever
	runProcessor()  // press ctrl-c to stop
}

A very similar example is also in 1-simplest. Just run go run examples/1-simplest/main.go.

Note that tables have to be configured in Kafka with log compaction. For details check the Wiki.
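If you let Goka auto-create its topics, the topic manager controls how table and stream topics are created. Below is a minimal, hedged sketch of tuning those defaults for a local single-broker setup; the Table.Replication, Stream.Replication and Stream.Retention field names are assumptions inferred from NewTopicManagerConfig's documented defaults (replication factor 2, retention 1 hour):

// Assumed field names, based on NewTopicManagerConfig's documented defaults.
tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1 // single local broker
tmc.Stream.Replication = 1
tmc.Stream.Retention = 24 * time.Hour

// Pass the config to a processor (or view) via a topic-manager builder, e.g.
// goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(tmc)).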

How to contribute

Contributions are always welcome. Please fork the repo, create a pull request against master, and be sure tests pass. See the GitHub Flow for details.

# Packages

Package codec provides a set of codecs to encode and decode various data types to and from byte slices ([]byte).
This package provides a Kafka mock that allows integration testing of Goka processors.

# Functions

ConsumerGroupBuilderWithConfig creates a sarama consumergroup using passed config.
Debug enables or disables debug logging using the global logger.
DefaultBackoffBuilder returns a simpleBackoff with a 10-second step increase and a 2-minute maximum wait.
DefaultConfig creates a new config used by Goka by default. Modify it and pass it to `goka.ReplaceGlobalConfig(...)` to change Goka's global config.
DefaultConsumerGroupBuilder creates a Kafka consumer using the Sarama library.
DefaultHasher returns an FNV hasher builder to assign keys to partitions.
Default returns the standard library logger.
DefaultProcessorStoragePath is the default path where processor state will be stored.
DefaultProducerBuilder creates a Kafka producer using the Sarama library.
DefaultRebalance is the default callback when a new partition assignment is received.
DefaultSaramaConsumerBuilder creates a Kafka consumer using the Sarama library.
DefaultTopicManagerBuilder creates TopicManager using the Sarama library.
DefaultUpdate is the default callback used to update the local storage with data from the table topic in Kafka.
DefaultViewStoragePath returns the default path where view state will be stored.
DefineGroup creates a group graph with a given group name and a list of edges.
GroupTable returns the name of the group table of group.
HeadersFromSarama converts sarama headers to goka's type.
Input represents an edge of an input stream topic.
Inputs creates edges of multiple input streams sharing the same codec and callback.
Join represents an edge of a copartitioned, log-compacted table topic.
Lookup represents an edge of a non-copartitioned, log-compacted table topic.
Loop represents the edge of the loopback topic of the group.
NewEmitter creates a new emitter using passed brokers, topic, codec and possibly options.
NewMockAutoConsumer returns a new mock Consumer instance.
NewMockBroker creates a new mock instance.
NewMockClient creates a new mock instance.
NewMockClusterAdmin creates a new mock instance.
NewMockConsumerGroup creates a new consumer group.
NewMockConsumerGroupClaim creates a new mock consumer group claim.
NewMockController returns a *gomock.Controller using a wrapped testing.T (or whatever) which panics on a Fatalf.
NewMockProducer creates a new mock instance.
NewMockStorage creates a new mock instance.
NewMockTopicManager creates a new mock instance.
NewProcessor creates a processor instance in a group given the address of Kafka brokers, the consumer group name, a list of subscriptions (topics, codecs, and callbacks), and a series of options.
NewProducer creates a new Kafka producer for the passed brokers.
NewPromise creates a new Promise.
NewPromiseWithFinisher creates a new Promise and a separate finish method.
NewSignal creates a new Signal based on the states.
NewSimpleBackoff returns a simple backoff waiting the specified duration longer each iteration until reset.
NewTopicManager creates a new topic manager using the sarama library.
NewTopicManagerConfig provides a default configuration for auto-creation with a replication factor of 2 and a retention time of 1 hour.
NewView creates a new View object from a group.
Output represents an edge of an output stream topic.
Persist represents the edge of the group table, which is log-compacted and copartitioned with the input streams.
ProducerBuilderWithConfig creates a Kafka producer using the Sarama library.
ReplaceGlobalConfig registers a standard config used during building if no other config is specified.
ResetSuffixes resets both `loopSuffix` and `tableSuffix` to their default values.
SaramaConsumerBuilderWithConfig creates a sarama consumer using passed config.
SetLoopSuffix changes `loopSuffix`, which is the suffix for the loop topic of a group.
SetTableSuffix changes `tableSuffix`, which is the suffix for the table topic.
StringsToStreams is a simple cast/conversion function that allows passing a slice of strings as a slice of Stream (Streams), avoiding the boilerplate loop over the string slice that would otherwise be necessary.
TopicManagerBuilderWithConfig creates TopicManager using the Sarama library.
TopicManagerBuilderWithTopicManagerConfig creates TopicManager using the Sarama library.
Visitor adds a visitor edge to the processor.
WithBackoffBuilder replaces the default backoff.
WithBackoffResetTimeout defines the timeout when the backoff will be reset.
WithClientID defines the client ID used to identify with Kafka.
WithConsumerGroupBuilder replaces the default consumer group builder.
WithConsumerSaramaBuilder replaces the default Sarama consumer builder.
WithContextWrapper allows intercepting the context passed to each callback invocation.
WithCtxEmitHeaders sets Kafka headers to use when emitting to Kafka.
WithEmitterClientID defines the client ID used to identify with Kafka.
WithEmitterDefaultHeaders configures the emitter with default headers which are included with every emit.
WithEmitterHasher sets the hash function that assigns keys to partitions.
WithEmitterLogger sets the logger the emitter should use.
WithEmitterProducerBuilder replaces the default producer builder.
WithEmitterTester configures the emitter to use passed tester.
WithEmitterTopicManagerBuilder replaces the default topic manager builder.
WithGroupGraphHook allows a function to obtain the group graph when a processor is started.
WithHasher sets the hash function that assigns keys to partitions.
WithHotStandby configures the processor to keep partitions up to date which are not part of the current generation's assignment.
WithLogger sets the logger the processor should use.
WithNilHandling configures how the processor should handle messages with nil value.
WithPartitionChannelSize replaces the default partition channel size.
WithProducerBuilder replaces the default producer builder.
WithProducerDefaultHeaders configures the producer with default headers which are included with every emit.
WithRebalanceCallback sets the callback for when a new partition assignment is received.
WithRecoverAhead configures the processor to recover joins and the processor table ahead of joining the group.
WithStorageBuilder defines a builder for the storage of each partition.
WithTester configures all external connections of a processor, i.e., storage, consumer and producer.
WithTopicManagerBuilder replaces the default topic manager builder.
WithUpdateCallback defines the callback called upon recovering a message from the log.
WithViewAutoReconnect configures the view to reconnect internally, so Run() does not return in case of connection errors.
WithViewBackoffBuilder replaces the default backoff.
WithViewBackoffResetTimeout defines the timeout when the backoff will be reset.
WithViewCallback defines the callback called upon recovering a message from the log.
WithViewClientID defines the client ID used to identify with Kafka.
WithViewConsumerSaramaBuilder replaces the default sarama consumer builder.
WithViewHasher sets the hash function that assigns keys to partitions.
WithViewLogger sets the logger the view should use.
WithViewRestartable is kept only for backwards compatibility.
WithViewStorageBuilder defines a builder for the storage of each partition.
WithViewTester configures all external connections of a view, i.e., storage, consumer and producer.
WithViewTopicManagerBuilder replaces the default topic manager.

# Constants

NilDecode passes the nil value to decoder before calling ProcessCallback.
NilIgnore drops any message with nil value.
NilProcess passes the nil value to ProcessCallback.
PartitionConnecting indicates the partition is trying to (re-)connect to Kafka.
PartitionInitializing indicates that the underlying storage is initializing.
PartitionPreparing indicates the end of the bulk-mode.
PartitionRecovering indicates the partition is recovering and the storage is writing updates in bulk-mode (if the storage implementation supports it).
PartitionRunning indicates the partition is recovered and processing updates in normal operation.
PartitionStopped indicates the partition stopped and should not be used anymore.
PPStateIdle marks the partition processor as idling (not started yet).
PPStateRecovering indicates a recovering partition processor.
PPStateRunning indicates a running partition processor.
PPStateStopped indicates a stopped partition processor.
PPStateStopping indicates a stopping partition processor.
ProcStateIdle indicates an idling partition processor (not started yet).
ProcStateRunning indicates a running partition processor.
ProcStateSetup indicates a partition processor during setup of a rebalance round.
ProcStateStarting indicates a starting partition processor.
ProcStateStopped indicates a stopped partition processor.
ProcStateStopping indicates a stopping partition processor.
TMConfigMismatchBehaviorFail makes checking the topic fail if the configuration differs from the requested one.
TMConfigMismatchBehaviorIgnore ignores mismatching config values.
TMConfigMismatchBehaviorWarn warns if the topic is configured differently than requested.
ViewStateCatchUp - the view is still catching up.
ViewStateConnecting - the view is (re-)connecting.
ViewStateIdle - the view is not started yet.
ViewStateInitializing - the view is initializing.
ViewStateRunning - the view is running.

# Variables

CopartitioningStrategy is the rebalance strategy necessary to guarantee the copartitioning when consuming multiple input topics with multiple processor instances.
ErrEmitterAlreadyClosed is returned when Emit is called after the emitter has been finished.
ErrVisitAborted indicates a call to VisitAll could not finish due to rebalance or processor shutdown.
StrictCopartitioningStrategy behaves like the copartitioning strategy but it will fail if two members of a consumer group request a different set of topics, which might indicate a bug or a reused consumer group name.

# Structs

DefaultUpdateContext implements the UpdateContext interface.
Emitter emits messages into a specific Kafka topic, first encoding the message with the given codec.
GroupGraph is the specification of a processor group.
InputStats represents the number of messages and the number of bytes consumed from a stream or table topic since the process started.
MockAutoConsumer implements sarama's Consumer interface for testing purposes.
MockAutoPartitionConsumer implements sarama's PartitionConsumer interface for testing purposes.
MockBroker is a mock of Broker interface.
MockBrokerMockRecorder is the mock recorder for MockBroker.
MockClient is a mock of Client interface.
MockClientMockRecorder is the mock recorder for MockClient.
MockClusterAdmin is a mock of ClusterAdmin interface.
MockClusterAdminMockRecorder is the mock recorder for MockClusterAdmin.
MockConsumerGroup mocks the consumergroup.
MockConsumerGroupClaim mocks the consumergroupclaim.
MockConsumerGroupSession mocks the consumer group session used for testing.
MockProducer is a mock of Producer interface.
MockProducerMockRecorder is the mock recorder for MockProducer.
MockStorage is a mock of Storage interface.
MockStorageMockRecorder is the mock recorder for MockStorage.
MockTopicManager is a mock of TopicManager interface.
MockTopicManagerMockRecorder is the mock recorder for MockTopicManager.
OutputStats represents the number of messages and the number of bytes emitted into a stream or table since the process started.
PartitionProcessor handles message processing of one partition by serializing messages from different input topics.
PartitionProcStats represents metrics and measurements of a partition processor.
PartitionTable manages the usage of a table for one partition.
Processor is a set of stateful callback functions that, on the arrival of messages, modify the content of a table (the group table) and emit messages into other topics.
ProcessorStats represents the metrics of all partitions of the processor, including its group, joined tables and lookup tables.
Promise as in https://en.wikipedia.org/wiki/Futures_and_promises.
RecoveryStats groups statistics during recovery.
Signal allows synchronization on a state, waiting for that state and checking the current state.
StateChangeObserver wraps a channel that triggers when the signal's state changes.
TableStats represents stats for a table partition.
TopicManagerConfig contains options for creating table and stream topics.
View is a materialized (i.e. persistent) cache of a group table.
ViewStats represents the metrics of all partitions of a view.

# Interfaces

Backoff is used for adding backoff capabilities to the restarting of failing partition tables.
Broker is an interface for the sarama broker.
Codec decodes and encodes from and to []byte.
Context provides access to the processor's table and emit capabilities to arbitrary topics in kafka.
Edge represents a topic in Kafka and the corresponding codec to encode and decode the messages of that topic.
Iterator allows one to iterate over the keys of a view.
Logger is the interface Goka and its subpackages use for logging.
Producer abstracts the kafka producer.
StateReader is a read only abstraction of a Signal to expose the current state.
Tester interface to avoid import cycles when a processor needs to register to the tester.
TopicManager provides an interface to create/check topics and their partitions.
UpdateContext defines the interface for UpdateCallback arguments.

# Type aliases

Assignment represents a partition:offset assignment for the current connection.
BackoffBuilder creates a backoff.
ConsumerGroupBuilder creates a `sarama.ConsumerGroup`.
ContextOption defines a configuration option to be used when performing operations on a context.
Edges is a slice of edge objects.
EmitterOption defines a configuration option to be used when creating an emitter.
Getter functions return a value for a key or an error.
Group is the name of a consumer group in Kafka and represents a processor group in Goka.
Headers represents custom message headers with a convenient interface.
NilHandling defines how nil messages should be handled by the processor.
PartitionStatus is the status of the partition of a table (group table or joined table).
PPRunMode configures how the partition processor participates as part of the processor.
ProcessCallback function is called for every message received by the processor.
ProcessorOption defines a configuration option to be used when creating a processor.
ProducerBuilder creates a Kafka producer.
PromiseFinisher finishes a promise.
RebalanceCallback is invoked when the processor receives a new partition assignment.
SaramaConsumerBuilder creates a `sarama.Consumer`.
State types a state of the Signal.
Stream is the name of an event stream topic in Kafka, i.e., a topic with cleanup.policy=delete.
Streams is a slice of Stream names.
Table is the name of a table topic in Kafka, i.e., a topic with cleanup.policy=compact.
TMConfigMismatchBehavior configures how configuration mismatches of a topic (replication, num partitions, compaction) should be treated.
TopicManagerBuilder creates a TopicManager to check partition counts and create tables.
UpdateCallback is invoked upon arrival of a message for a table partition.
ViewOption defines a configuration option to be used when creating a view.
ViewState represents the state of the view.