github.com/twitchscience/kinsumer (module, package)
Version: 0.0.0-20240315191529-9a48088063ec
Repository: https://github.com/twitchscience/kinsumer.git
Documentation: pkg.go.dev

# README

Kinsumer

Native Go consumer for AWS Kinesis streams.

Rationale

There are several very good ways to consume Kinesis streams, chiefly the Amazon Kinesis Client Library, and we recommend investigating it as an option.

Kinsumer is designed for a cluster of Go clients in which each client consumes from multiple shards. It aims to deliver every record at least once, with a strong effort to deliver it exactly once. By design, Kinsumer does not attempt to keep shards on a specific client and will shuffle them around as needed.

Behavior

Kinsumer targets one specific Kinesis use case: you need multiple clients that each handle multiple shards, and you do not care which client consumes which shard.

Kinsumer rebalances shards across clients whenever it detects that the list of shards or the list of clients has changed, and it does not attempt to keep shards on the same client.
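To make the rebalancing idea concrete, here is a minimal sketch of one deterministic scheme: every client sorts the shard list and the client list the same way, then takes every n-th shard according to its own position. This is an illustration under that assumption, not necessarily the algorithm Kinsumer uses; the function name assignShards and the shard and client names are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// assignShards returns the shards that the client named self would own if
// every client sorted both lists identically and took every n-th shard.
// Illustrative scheme only; not taken from the Kinsumer source.
func assignShards(shards, clients []string, self string) []string {
	// Copy before sorting so the caller's slices are left untouched.
	sortedShards := append([]string(nil), shards...)
	sortedClients := append([]string(nil), clients...)
	sort.Strings(sortedShards)
	sort.Strings(sortedClients)

	// Find this client's position in the sorted client list.
	idx := sort.SearchStrings(sortedClients, self)
	if idx == len(sortedClients) || sortedClients[idx] != self {
		return nil // this client is not in the client list
	}

	var mine []string
	for i, shard := range sortedShards {
		if i%len(sortedClients) == idx {
			mine = append(mine, shard)
		}
	}
	return mine
}

func main() {
	shards := []string{"shard-0", "shard-1", "shard-2", "shard-3", "shard-4"}

	// Two clients: "a" owns every second shard.
	fmt.Println(assignShards(shards, []string{"a", "b"}, "a"))

	// A third client joins: every client recomputes and shards move.
	fmt.Println(assignShards(shards, []string{"a", "b", "c"}, "a"))
}
```

In this sketch, client "a" owns shard-0, shard-2, and shard-4 with two clients, and only shard-0 and shard-3 once a third client joins, which shows why shards are not pinned to a client.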

If you are running multiple Kinsumer apps against a single stream, make sure to increase the throttleDelay to at least 50ms + (200ms * <the number of reader apps>). Note that Kinesis does not support more than two readers per writer on a fully utilized stream, so make sure you have enough stream capacity.

Example

See cmd/noopkinsumer for a fully working example of a kinsumer client.

Testing

Testing with local test servers

By default, the tests look for a DynamoDB server at localhost:4567 and a Kinesis server at localhost:4568.

For example, using kinesalite and dynalite:

kinesalite --port 4568 --createStreamMs 1 --deleteStreamMs 1 --updateStreamMs 1 --shardLimit 1000 &
dynalite --port 4567 --createTableMs 1 --deleteTableMs 1 --updateTableMs 1 &

Then run: go test ./...

Testing with real AWS resources

It is possible to run the tests against real AWS resources, but the tests create and destroy resources, which can be finicky and potentially expensive.

Make sure your credentials are set up in a way that aws-sdk-go accepts, or run the tests on an EC2 instance.

Then run: go test . -dynamo_endpoint= -kinesis_endpoint= -resource_change_timeout=30s

# Functions

New returns a Kinsumer instance with default Kinesis and DynamoDB clients; use it on EC2 instances to pick up default auth and config.
NewConfig returns a default Config struct.
NewWithInterfaces allows you to override the Kinesis and Dynamo instances for mocking or using a local set of servers.
NewWithSession should be used if you want to override the Kinesis and Dynamo instances with a non-default aws session.
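The general shape of a consume loop around such a consumer can be sketched as follows. So that it runs without the kinsumer module or AWS access, the sketch uses a hypothetical recordSource interface and an in-memory fakeSource; the method names Run, Next, and Stop are assumptions made for this sketch, so check the package documentation and cmd/noopkinsumer for the real usage.

```go
package main

import "fmt"

// recordSource is a hypothetical stand-in for the consumer returned by New.
// The method names are assumptions for this sketch; check the package docs.
type recordSource interface {
	Run() error
	Next() ([]byte, error)
	Stop()
}

// fakeSource serves records from memory so the loop can run anywhere.
type fakeSource struct {
	records [][]byte
}

func (f *fakeSource) Run() error { return nil }
func (f *fakeSource) Stop()      { f.records = nil }

func (f *fakeSource) Next() ([]byte, error) {
	if len(f.records) == 0 {
		return nil, nil // no more data: treated as "stopped" in this sketch
	}
	rec := f.records[0]
	f.records = f.records[1:]
	return rec, nil
}

// consume starts the source, passes every record to handle, and stops the
// source when it returns. It reports how many records were handled.
func consume(src recordSource, handle func([]byte)) (int, error) {
	if err := src.Run(); err != nil {
		return 0, err
	}
	defer src.Stop()

	n := 0
	for {
		rec, err := src.Next()
		if err != nil {
			return n, err
		}
		if rec == nil {
			return n, nil
		}
		handle(rec)
		n++
	}
}

func main() {
	src := &fakeSource{records: [][]byte{[]byte("a"), []byte("b")}}
	n, err := consume(src, func(rec []byte) { fmt.Printf("got %s\n", rec) })
	fmt.Println(n, err)
}
```

Because delivery is at-least-once, a real handler should tolerate seeing the same record more than once.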

# Variables

ErrConfigInvalidBufferSize - BufferSize config value is mandatory.
ErrConfigInvalidCommitFrequency - CommitFrequency config value is mandatory.
ErrConfigInvalidDynamoCapacity - Dynamo read/write capacity cannot be 0.
ErrConfigInvalidLeaderActionFrequency - LeaderActionFrequency config value is mandatory.
ErrConfigInvalidLogger - Logger cannot be nil.
ErrConfigInvalidShardCheckFrequency - ShardCheckFrequency config value is mandatory.
ErrConfigInvalidStats - Stats cannot be nil.
ErrConfigInvalidThrottleDelay - ThrottleDelay config value must be at least 200ms.
ErrNoApplicationName - Need an application name for the dynamo table names.
ErrNoDynamoInterface - Need a dynamodb instance.
ErrNoKinesisInterface - Need a kinesis instance.
ErrNoShardsAssigned - We found shards, but got assigned none.
ErrNoStreamName - Need a kinesis stream name.
ErrNoSuchStream - No such stream.
ErrRunTwice - Run() can only ever be run once.
ErrStreamBusy - Stream is busy.
ErrThisClientNotInDynamo - Unable to find this client in the client list.

# Structs

Config holds all configuration values for a single Kinsumer instance.
DefaultLogger is a logger that logs using the standard Go log package.
Kinsumer is a Kinesis Consumer that tries to reduce duplicate reads while allowing for multiple clients each processing multiple shards.
NoopStatReceiver is a StatReceiver that does nothing; use it if you do not want to collect stats, or as a base if you want to collect only a subset of stats.

# Interfaces

Logger is a minimal interface to allow custom loggers to be used.
A StatReceiver will have its methods called as operations happen inside a running kinsumer, and is useful for tracking the operation of the consumer.
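Using a no-op receiver "as a base" relies on Go struct embedding: embed the no-op type and override only the methods you care about. The sketch below shows the pattern with a simplified, hypothetical statReceiver interface and a local noopStats type; the real StatReceiver has its own method set, so treat the names and signatures here as assumptions.

```go
package main

import "fmt"

// statReceiver is a simplified, hypothetical stand-in for the package's
// StatReceiver interface; the real method set differs.
type statReceiver interface {
	Checkpoint()
	EventsFromKinesis(num int, shardID string)
}

// noopStats does nothing, playing the role of NoopStatReceiver.
type noopStats struct{}

func (noopStats) Checkpoint()                   {}
func (noopStats) EventsFromKinesis(int, string) {}

// checkpointCounter embeds noopStats and overrides only Checkpoint,
// so every other stat is still silently discarded.
type checkpointCounter struct {
	noopStats
	checkpoints int
}

func (c *checkpointCounter) Checkpoint() { c.checkpoints++ }

func main() {
	counter := &checkpointCounter{}
	var stats statReceiver = counter

	stats.EventsFromKinesis(10, "shard-0") // ignored by the embedded no-op
	stats.Checkpoint()
	stats.Checkpoint()

	fmt.Println("checkpoints:", counter.checkpoints)
}
```

This sketch is single-threaded; a receiver called from a running consumer may be invoked from several goroutines, in which case the counter would need a mutex or an atomic.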