github.com/conduitio/conduit-connector-kafka
Version: 0.9.0
Repository: https://github.com/conduitio/conduit-connector-kafka.git
Documentation: pkg.go.dev

# README

Conduit Connector Kafka

The Kafka connector is one of Conduit's built-in plugins. It provides both a source and a destination connector for Apache Kafka.

How to build?

Run make build to build the connector.

Testing

Run make test to run all the unit and integration tests. The tests require Docker to be installed and running; the command handles starting and stopping the Docker containers for you.

Tests will run twice, once against an Apache Kafka instance and a second time against a Redpanda instance.

Source

A Kafka source connector is represented by a single consumer in a Kafka consumer group. By virtue of that, a source's logical position is the respective consumer's offset in Kafka. Internally, though, we're not saving the offset as the position: instead, we save the consumer group ID, since that's all Kafka needs to find the offsets for our consumer.

A source is associated with a consumer group ID the first time its Read() method is called.
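To illustrate that idea, here is a minimal sketch; the Position type and helper below are simplified stand-ins, not the connector's actual API:

```go
package main

import "fmt"

// Position is a simplified stand-in for the SDK's position type.
type Position []byte

// positionForGroup shows the idea described above: the stored position
// is the consumer group ID itself, because Kafka already tracks the
// committed offsets for that group.
func positionForGroup(groupID string) Position {
	return Position(groupID)
}

func main() {
	// On restart, handing this position back to the source is enough to
	// resume: the consumer rejoins the same group and Kafka serves
	// records from the group's committed offsets.
	pos := positionForGroup("conduit-connector-kafka-group-1") // example ID
	fmt.Printf("position: %s\n", pos)
}
```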

Configuration

| name | description | required | default value |
| --- | --- | --- | --- |
| `servers` | Servers is a list of Kafka bootstrap servers, which will be used to discover all the servers in a cluster. | true | |
| `topics` | Topics is a comma-separated list of Kafka topics from which records will be read, ex: "topic1,topic2". | true | |
| `topic` | Topic is the Kafka topic to read from. Deprecated: use `topics` instead. | false | |
| `clientID` | A Kafka client ID. | false | `conduit-connector-kafka` |
| `readFromBeginning` | Determines from whence the consumer group should begin consuming when it finds a partition without a committed offset. If this option is set to true it will start with the first message in that partition. | false | `false` |
| `groupID` | Defines the consumer group ID. | false | |
| `tls.enabled` | Defines whether TLS is enabled. | false | `false` |
| `clientCert` | A certificate for the Kafka client, in PEM format. If provided, the private key needs to be provided too. | false | |
| `clientKey` | A private key for the Kafka client, in PEM format. If provided, the certificate needs to be provided too. | false | |
| `caCert` | The Kafka broker's certificate, in PEM format. | false | |
| `insecureSkipVerify` | Controls whether a client verifies the server's certificate chain and host name. If true, accepts any certificate presented by the server and any host name in that certificate. | false | `false` |
| `saslMechanism` | SASL mechanism to be used. Possible values: `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`. If empty, authentication won't be performed. | false | |
| `saslUsername` | SASL username. If provided, a password needs to be provided too. | false | |
| `saslPassword` | SASL password. If provided, a username needs to be provided too. | false | |
| `retryGroupJoinErrors` | Determines whether the connector will continually retry on group join errors. | false | `true` |
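As a hedged sketch, here is how a few of those options could look as the flat key-value configuration map Conduit passes to a connector; the server address, topics, and credentials are placeholders, not real values:

```go
package main

import "fmt"

func main() {
	// Illustrative source configuration; every value is a placeholder.
	cfg := map[string]string{
		"servers":           "localhost:9092",
		"topics":            "topic1,topic2",
		"readFromBeginning": "true",
		"groupID":           "my-consumer-group",
		"saslMechanism":     "SCRAM-SHA-256",
		"saslUsername":      "conduit",
		"saslPassword":      "secret", // never hard-code real credentials
	}
	for k, v := range cfg {
		fmt.Printf("%s=%s\n", k, v)
	}
}
```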

Destination

The destination connector sends records to Kafka.

Configuration

There's no global connector configuration. Each connector instance is configured separately.

| name | description | required | default value |
| --- | --- | --- | --- |
| `servers` | Servers is a list of Kafka bootstrap servers, which will be used to discover all the servers in a cluster. | true | |
| `topic` | Topic is the Kafka topic to write to. It can contain a Go template that will be executed for each record to determine the topic (see the sketch after this table). By default, the topic is the value of the `opencdc.collection` metadata field. | false | `{{ index .Metadata "opencdc.collection" }}` |
| `clientID` | A Kafka client ID. | false | `conduit-connector-kafka` |
| `acks` | Acks defines the number of acknowledgements from partition replicas required before receiving a response to a produce request. `none` = fire and forget, `one` = wait for the leader to acknowledge the writes, `all` = wait for the full ISR to acknowledge the writes. | false | `all` |
| `deliveryTimeout` | Message delivery timeout. | false | |
| `batchBytes` | Limits the maximum size of a request in bytes before being sent to a partition. This mirrors Kafka's `max.message.bytes`. | false | `1000012` |
| `compression` | Compression applied to messages. Possible values: `none`, `gzip`, `snappy`, `lz4`, `zstd`. | false | `snappy` |
| `clientCert` | A certificate for the Kafka client, in PEM format. If provided, the private key needs to be provided too. | false | |
| `clientKey` | A private key for the Kafka client, in PEM format. If provided, the certificate needs to be provided too. | false | |
| `caCert` | The Kafka broker's certificate, in PEM format. | false | |
| `insecureSkipVerify` | Controls whether a client verifies the server's certificate chain and host name. If true, accepts any certificate presented by the server and any host name in that certificate. | false | `false` |
| `saslMechanism` | SASL mechanism to be used. Possible values: `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`. If empty, authentication won't be performed. | false | |
| `saslUsername` | SASL username. If provided, a password needs to be provided too. | false | |
| `saslPassword` | SASL password. If provided, a username needs to be provided too. | false | |
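To make the default topic template concrete, here is a minimal, runnable sketch of how a Go template like the default above resolves the topic from a record's metadata; the Record type is a simplified stand-in for the record the connector actually evaluates the template against:

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// Record is a simplified stand-in for the connector's record type.
type Record struct {
	Metadata map[string]string
}

func main() {
	// The default topic template from the table above.
	tmpl := template.Must(template.New("topic").Parse(
		`{{ index .Metadata "opencdc.collection" }}`))

	rec := Record{Metadata: map[string]string{"opencdc.collection": "orders"}}

	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, rec); err != nil {
		panic(err)
	}
	fmt.Println("topic:", buf.String()) // prints "topic: orders"
}
```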

Output format

The output format can be adjusted using configuration options provided by the connector SDK:

  • sdk.record.format: used to choose the format
  • sdk.record.format.options: used to configure the specifics of the chosen format

See the Conduit documentation for more info on configuring the output format.
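As a hedged illustration, these SDK options sit in the same flat configuration map as the connector's own settings; the format name shown is an example, not an exhaustive list:

```go
package main

import "fmt"

func main() {
	// Illustrative map: the SDK options live alongside the connector's
	// own settings. "opencdc/json" is shown as an example format name;
	// consult the Conduit documentation for supported formats.
	cfg := map[string]string{
		"topic":             "my-topic",
		"sdk.record.format": "opencdc/json",
		// "sdk.record.format.options" would hold format-specific options.
	}
	fmt.Println(cfg)
}
```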

Batching

Batching can also be configured using options provided by the connector SDK (see the sketch after this list):

  • sdk.batch.size: maximum number of records in a batch before it gets written to the destination (defaults to 0, no batching)
  • sdk.batch.delay: maximum delay before an incomplete batch is written to the destination (defaults to 0, no limit)
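
A hedged sketch of how those options might appear in a destination configuration map; the values are arbitrary examples:

```go
package main

import "fmt"

func main() {
	// Example values only: flush a batch once it reaches 100 records,
	// or after 1 second, whichever comes first.
	cfg := map[string]string{
		"sdk.batch.size":  "100",
		"sdk.batch.delay": "1s",
	}
	fmt.Println(cfg)
}
```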