github.com/egsam98/kafka-pipe
module / package, version 1.2.0
Repository: https://github.com/egsam98/kafka-pipe.git
Documentation: pkg.go.dev

# README

Overview

The project is highly inspired by Kafka Connect, but implemented in Go. It contains several connector classes designed for Change Data Capture (CDC) pipelines from a specific source to a sink. Some of them use internal storage based on the Badger embedded database to keep offsets, log positions and other important metadata, preventing data loss or duplication on connection issues. The internal storage is organized in a "data" folder in the workspace of the running app.

Keep in mind not to mix connectors in a common workspace unless they have unique names (the "name" parameter in the YAML configuration). It's also not recommended to recreate Kafka topics while reusing the same state of a connector's internal storage. If you have to, you might want to remove the "data" folder as well, but...

consider_your_position_carefully.gif

Installation

Docker image:

docker pull egsam98/kafka-pipe:{tag}

https://hub.docker.com/r/egsam98/kafka-pipe/tags

Go module:

go get github.com/egsam98/kafka-pipe

Supported connector classes:

| Class | Source | Sink |
| --- | --- | --- |
| pg.Source | PostgreSQL | Kafka |
| pg.Snapshot | PostgreSQL | Kafka |
| ch.Sink | Kafka | ClickHouse |
| s3.Sink | Kafka | S3 |
| s3.Backup | S3 | Kafka |

Get started

To start the application, pass a YAML configuration file as an argument:

./{kafka-pipe bin} config.yaml

Common config parameters:

name: "name of your connector"
class: "any class described in \"Supported classes\" section"
log:
  pretty: true
  level: "info"

Additional parameters are specific to each class.

pg.Source

Transmits events from PostgreSQL to Kafka topics via logical replication and the "pgoutput" plugin. Postgres' last read LSN is stored on disk in the internal storage. Config parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| pg:skip.delete | Bool | | Skip delete-events. Default is false |
| pg:url | String | + | Postgres connection URL |
| pg:publication | String | + | Publication name |
| pg:slot | String | + | Replication slot name |
| pg:tables | List(String) | + | Tables list to subscribe for replication |
| pg:health.table | String | | Table name. Default is public.pipe_health |
| kafka:brokers | List(String) | + | List of Kafka brokers. Format is {hostname}:{port} |
| kafka:topic:prefix | String | | Prefix for created topics. Format is {prefix}.{postgres table name} |
| kafka:topic:replication.factor | UInt | | Topic replication factor. Default is 1 |
| kafka:topic:partitions | UInt | | Number of partitions. Default is 1 |
| kafka:topic:compression.type | String | | Compression type. Default is producer |
| kafka:topic:cleanup.policy | String | | Cleanup policy. Default is delete |
| kafka:topic:retention | String | | Retention duration. Default is 168h (7 days) |
| kafka:topic:part_retention_size | String | | Partition retention size in bytes, kilobytes, megabytes or terabytes. Default: 10GB |
| kafka:topic:routes | Dict | | Mapping of a Postgres relation regular expression to a Kafka topic. Example for partitions: ^public.transaction(_\d+)?$: "public.transaction" |
| kafka:batch:size | UInt | | Producer's batch size. Default is 10000 |
| kafka:batch:timeout | String | | Producer's batch timeout. Default is 5s |
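For illustration only, a minimal pg.Source configuration might look like the sketch below. It assumes the colon-separated parameter names above map to nested YAML keys; all hosts, credentials and table names are placeholders.

name: "pg-source-example"
class: "pg.Source"
log:
  pretty: true
  level: "info"
pg:
  url: "postgres://user:password@localhost:5432/db"
  publication: "pipe_publication"
  slot: "pipe_slot"
  tables:
    - public.transaction
kafka:
  brokers:
    - localhost:9092
  topic:
    prefix: "cdc"
    replication.factor: 1
    partitions: 1
    retention: 168h
  batch:
    size: 10000
    timeout: 5s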

pg.Snapshot

Selects all/certain rows from tables and supplies them to Kafka topics. The connector stops when all rows are produced.

Config parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| pg:url | String | + | Postgres connection URL |
| pg:tables | List(String) | + | Tables list to execute snapshot |
| pg:condition | String | | SQL condition part to select rows from table. Example: "WHERE id > 1". Default is empty string |
| kafka:brokers | List(String) | + | List of Kafka brokers. Format is {hostname}:{port} |
| kafka:topic:prefix | String | | Prefix for created topics. Format is {prefix}.{postgres table name} |
| kafka:topic:replication.factor | UInt | | Topic replication factor. Default is 1 |
| kafka:topic:partitions | UInt | | Number of partitions. Default is 1 |
| kafka:topic:compression.type | String | | Compression type. Default is producer |
| kafka:topic:cleanup.policy | String | | Cleanup policy. Default is delete |
| kafka:topic:retention | String | | Retention duration. Default is 168h (7 days) |
| kafka:topic:part_retention_size | String | | Partition retention size in bytes, kilobytes, megabytes or terabytes. Default: 10GB |
| kafka:topic:routes | Dict | | Mapping of a Postgres relation regular expression to a Kafka topic. Example for partitions: ^public.transaction(_\d+)?$: "public.transaction" |
| kafka:batch:size | UInt | | Producer's batch size. Default is 10000 |
| kafka:batch:timeout | String | | Producer's batch timeout. Default is 5s |
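Again for illustration, a pg.Snapshot configuration could be sketched as follows (the nested-key layout is assumed from the parameter names; all values are placeholders):

name: "pg-snapshot-example"
class: "pg.Snapshot"
log:
  pretty: true
  level: "info"
pg:
  url: "postgres://user:password@localhost:5432/db"
  tables:
    - public.transaction
  condition: "WHERE id > 1"
kafka:
  brokers:
    - localhost:9092
  topic:
    prefix: "snapshot"
  batch:
    size: 10000
    timeout: 5s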

ch.Sink

Transmits events from Kafka topics to ClickHouse. The deduplication mechanism is designed similarly to https://github.com/ClickHouse/clickhouse-kafka-connect/blob/main/docs/DESIGN.md#state-machine. Kafka partition offsets are additionally stored on disk in the internal storage. Config parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| kafka:group | String | + | Group ID |
| kafka:brokers | List(String) | + | List of Kafka brokers. Format is {hostname}:{port} |
| kafka:topics | List(String) | + | Topics to read from |
| kafka:sasl:protocol | String | | SASL protocol; additional settings are described in the SASL section |
| kafka:rebalance_timeout | String | | How long all members are allowed to complete work and commit offsets, minus the time it took to detect the rebalance (from a heartbeat). Default is 1m |
| kafka:workers_per_topic | UInt | | Number of workers (consumers) per topic. Default is 1 |
| kafka:batch:size | UInt | | Consumer's batch size. Default is 10000 |
| kafka:batch:timeout | String | | Consumer's batch timeout. Default is 5s |
| kafka:fetch_max_bytes | UInt | | Maximum amount of bytes a broker will try to send during a fetch. Note that brokers may not obey this limit if they have records larger than this limit. Also note that this client sends fetches to each broker concurrently, meaning the client will buffer up to brokers * max bytes worth of memory. This corresponds to the Java fetch.max.bytes setting. Default: 52428800 (50MB) |
| kafka:fetch_max_partition_bytes | UInt | | Maximum amount of bytes that will be consumed for a single partition in a fetch request. Note that if a single batch is larger than this number, that batch will still be returned so the client can make progress. This corresponds to the Java max.partition.fetch.bytes setting. Default: 1048576 (1MB) |
| click_house:addrs | List(String) | + | ClickHouse addresses list to connect to |
| click_house:database | String | + | Database name |
| click_house:user | String | + | Username credential |
| click_house:password | String | | Password credential |
| serde:format | String | | Serde format for Kafka messages (key & value). Default is json. Additional settings for every Serde format are described in the Serde section |
| routes | Dict | | Mapping of a Kafka topic regular expression to a ClickHouse table. Example: public.transaction_\d+: transactions |
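A hedged ch.Sink sketch, assuming the colon-separated parameters nest as YAML maps; broker, topic and ClickHouse values are placeholders:

name: "ch-sink-example"
class: "ch.Sink"
kafka:
  group: "ch-sink"
  brokers:
    - localhost:9092
  topics:
    - public.transaction
  batch:
    size: 10000
    timeout: 5s
click_house:
  addrs:
    - localhost:9000
  database: "default"
  user: "default"
  password: "secret"
serde:
  format: json
routes:
  public.transaction: transactions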

Non-YAML configuration (available if you use the connector as a Go module):

func BeforeInsert(ctx context.Context, serde Serde, batch []*kgo.Record) ([]any, error)

A hook function that's called for every consumed batch before inserting it into ClickHouse. Due to specifics of the ClickHouse implementation (see here and here), the result must consist of struct pointers, i.e.

func BeforeInsert(ctx context.Context, serde Serde, batch []*kgo.Record) ([]any, error) {
	// Handle batch
	// ...
	// Data is a user-defined struct (hypothetical here) whose fields map to the target ClickHouse table's columns
	return []any{&Data{Column1: 1}, &Data{Column1: 2}}, nil
}

s3.Sink

Transmits events from Kafka topics to S3-compatible storage. This connector polls Kafka records according to the batch settings (see kafka:batch:*), groups them by truncated datetime + partition and then uploads the collected data chunks to S3. The storage keys are built using the following schema: {topic}/{truncated datetime}/{partition}/{min_offset in chunk}-{max_offset in chunk}.gz. If the previous data chunk doesn't exceed the 5MB limit, it's merged with the new chunk and then deleted. If the previous chunk's max offset >= the new chunk's max offset, the new chunk is ignored.

Config parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| kafka:group | String | + | Group ID |
| kafka:brokers | List(String) | + | List of Kafka brokers. Format is {hostname}:{port} |
| kafka:topics | List(String) | + | Topics to read from |
| kafka:sasl:protocol | String | | SASL protocol; additional settings are described in the SASL section |
| kafka:rebalance_timeout | String | | How long all members are allowed to complete work and commit offsets, minus the time it took to detect the rebalance (from a heartbeat). Default is 1m |
| kafka:workers_per_topic | UInt | | Number of workers (consumers) per topic. Default is 1 |
| kafka:batch:size | UInt | | Consumer's batch size. Default is 10000 |
| kafka:batch:timeout | String | | Consumer's batch timeout. Default is 5s |
| kafka:fetch_max_bytes | UInt | | Maximum amount of bytes a broker will try to send during a fetch. Note that brokers may not obey this limit if they have records larger than this limit. Also note that this client sends fetches to each broker concurrently, meaning the client will buffer up to brokers * max bytes worth of memory. This corresponds to the Java fetch.max.bytes setting. Default: 52428800 (50MB) |
| kafka:fetch_max_partition_bytes | UInt | | Maximum amount of bytes that will be consumed for a single partition in a fetch request. Note that if a single batch is larger than this number, that batch will still be returned so the client can make progress. This corresponds to the Java max.partition.fetch.bytes setting. Default: 1048576 (1MB) |
| s3:endpoint | String | + | Connection endpoint |
| s3:bucket | String | + | Bucket name |
| s3:id | String | + | Access key ID (V4) |
| s3:secret | String | + | Secret access key (V4) |
| s3:ssl | Bool | | Use SSL connection |
| group_time_interval | String | | Describes how records' datetime is rounded down and collected into groups in S3. Default: 1h |
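A sketch of an s3.Sink configuration under the same nesting assumption; endpoint, bucket and credentials are placeholders:

name: "s3-sink-example"
class: "s3.Sink"
kafka:
  group: "s3-sink"
  brokers:
    - localhost:9092
  topics:
    - public.transaction
  batch:
    size: 10000
    timeout: 5s
s3:
  endpoint: "localhost:9000"
  bucket: "kafka-archive"
  id: "access-key-id"
  secret: "secret-access-key"
  ssl: false
group_time_interval: 1h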

s3.Backup

Restores Kafka messages from S3 storage back to topics. The connector stops when all selected objects are handled. Read offsets are saved to the internal storage and are skipped on subsequent rescans.

Config parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| kafka:brokers | List(String) | + | List of Kafka brokers. Format is {hostname}:{port} |
| kafka:batch:size | UInt | | Consumer's batch size. Default is 10000 |
| kafka:batch:timeout | String | | Consumer's batch timeout. Default is 0. Equivalent to producer's linger |
| s3:endpoint | String | + | Connection endpoint |
| s3:bucket | String | + | Bucket name |
| s3:id | String | + | Access key ID (V4) |
| s3:secret | String | + | Secret access key (V4) |
| s3:ssl | Bool | | Use SSL connection |
| topics | List(String) | + | Topics to read from |
| date_since | String | + | UTC date from the beginning of which to select objects. Format: "2006-01-02 15:04:05" |
| date_to | String | + | UTC date by which to select objects. Format: "2006-01-02 15:04:05" |
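And a hedged s3.Backup sketch (same nesting assumption; hosts, credentials and dates are placeholders in the documented formats):

name: "s3-backup-example"
class: "s3.Backup"
kafka:
  brokers:
    - localhost:9092
s3:
  endpoint: "localhost:9000"
  bucket: "kafka-archive"
  id: "access-key-id"
  secret: "secret-access-key"
  ssl: false
topics:
  - public.transaction
date_since: "2024-01-01 00:00:00"
date_to: "2024-02-01 00:00:00"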

SASL

Settings specific to each SASL protocol. All params are optional. A configuration sketch is shown after the tables below.

plain

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| user | String | + | Username |
| pass | String | + | Password |
| zid | String | | Authorization ID to use in authenticating |

scram-256

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| user | String | + | Username |
| pass | String | + | Password |
| zid | String | | Authorization ID to use in authenticating |
| nonce | String | | If provided, the string to use for authentication. Otherwise, this package uses 20 bytes read with crypto/rand (Go) |
| is_token | Bool | | Suffixes the "tokenauth=true" extra attribute to the initial authentication message |

scram-512

See scram-256

oauth

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| token | String | + | Oauthbearer token to use for a single session's authentication |
| zid | String | | Authorization ID |

aws

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| access_key | String | + | AWS access key |
| secret_key | String | + | AWS secret key |
| session_token | String | | Read more: https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html |
| user_agent | String | | Override the default "franz-go/<runtime.Version()>/". Read more: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-useragent |
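Tying this back to the kafka:sasl:protocol parameter of the sink connectors, a SASL block might look like the sketch below. It assumes the protocol-specific parameters sit alongside protocol under kafka:sasl; the credentials are placeholders.

kafka:
  sasl:
    protocol: scram-256
    user: "kafka-user"
    pass: "kafka-password"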

Serde

i.e. serialization and deserialization. Supported formats: json, avro. Depending on the selected format, different settings are available:

json

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| time_format | String | | Datetime formatter, possible values: rfc3339, timestamp, timestamp-milli. Default is timestamp-milli |

avro

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| schemas | Dict | + | Mapping of Kafka topics to Avro schema source URLs. Supported URL schemes: file (local file), http(-s) (download via HTTP). Example URL: file://schema.avsc |

Examples of Serde configuration:

serde:
  format: json
  time_format: rfc3339

serde:
  format: avro
  schemas:
    topic_test: file://schema.avsc
