github.com/Trendyol/go-pq-cdc-kafka
Version: 0.0.6
Repository: https://github.com/trendyol/go-pq-cdc-kafka.git
Documentation: https://pkg.go.dev/github.com/Trendyol/go-pq-cdc-kafka


# README

go-pq-cdc-kafka

Kafka connector for go-pq-cdc.

go-pq-cdc-kafka is a high-performance tool designed to replicate PostgreSQL data to Kafka using PostgreSQL's replication protocol. It provides an efficient alternative to Debezium, particularly in resource-constrained environments.

Key Features

  • Optimized for Speed and Efficiency: Minimal resource consumption and faster processing, designed to handle high-throughput data replication.
  • Real-Time Data Streaming: Streams data directly from PostgreSQL to Kafka, ensuring up-to-date synchronization across systems.
  • Automatic Failover: In the event of a failure, go-pq-cdc-kafka can quickly recover and resume data replication.
  • Concurrency: Built with Go's concurrency model (goroutines and channels), ensuring lightweight and highly performant parallel operations.

Why Choose go-pq-cdc-kafka Over Debezium?

Debezium vs go-pq-cdc-kafka benchmark

go-pq-cdc-kafka significantly outperforms Debezium in terms of speed and resource efficiency, particularly for PostgreSQL databases. Here’s a comparison:

Benchmark Results

| | go-pq-cdc-kafka | Debezium |
|---|---|---|
| Row Count | 10 million | 10 million |
| Elapsed Time | 2.5 minutes | 21 minutes |
| CPU Usage (Max) | 44% | 181% |
| Memory Usage (Max) | 130 MB | 1.07 GB |
| Received Traffic | 4.36 MiB/s | 7.97 MiB/s |
| Sent Traffic | 5.96 MiB/s | 6.27 MiB/s |

Availability and Failover

go-pq-cdc-kafka ensures high availability through active/passive modes for PostgreSQL Change Data Capture (CDC).

  • Active Mode: When the PostgreSQL replication slot is active, go-pq-cdc-kafka continuously monitors changes and streams them to Kafka.

  • Passive Mode: If the replication slot becomes inactive, a standby instance automatically captures the slot and resumes data streaming. Other deployments continuously monitor the slot’s status, providing redundancy and failover.

This architecture guarantees minimal downtime and continuous data synchronization, even in the event of failure. Additionally, Go’s faster cold starts provide quicker recovery times compared to Debezium, further minimizing potential downtime.
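
Because a standby can only capture the slot after the active instance releases it, shutting the connector down cleanly shortens failover time. Below is a minimal sketch (not from the library's docs) that wires OS signals into the example from the Usage section; it assumes `connector.Start` returns once its context is canceled, and `cfg` and `Handler` are the ones defined in the Usage example.

```go
// Sketch: cancel the context on SIGINT/SIGTERM so connector.Close() runs
// and the replication slot is released for a standby instance to capture.
// Requires "os/signal" and "syscall" in addition to the imports used below.
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

connector, err := cdc.NewConnector(ctx, cfg, Handler) // cfg, Handler: see the Usage example
if err != nil {
	slog.Error("new connector", "error", err)
	os.Exit(1)
}
defer connector.Close() // releases the slot promptly on shutdown

connector.Start(ctx)
```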

Usage

⚠️ For production usage, check the production tutorial doc.

⚠️ For other use cases, check the Dockerfile and code in the examples.

```sh
go get github.com/Trendyol/go-pq-cdc-kafka
```

```go
package main

import (
	"context"
	"encoding/json"
	"log/slog"
	"os"
	"strconv"
	"time"

	cdc "github.com/Trendyol/go-pq-cdc-kafka"
	"github.com/Trendyol/go-pq-cdc-kafka/config"
	cdcconfig "github.com/Trendyol/go-pq-cdc/config"
	"github.com/Trendyol/go-pq-cdc/pq/publication"
	"github.com/Trendyol/go-pq-cdc/pq/slot"
	gokafka "github.com/segmentio/kafka-go"
)

func main() {
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))
	ctx := context.TODO()
	cfg := config.Connector{
		CDC: cdcconfig.Config{
			Host:      "127.0.0.1",
			Username:  "cdc_user",
			Password:  "cdc_pass",
			Database:  "cdc_db",
			DebugMode: false,
			Publication: publication.Config{
				CreateIfNotExists: true,
				Name:              "cdc_publication",
				Operations: publication.Operations{
					publication.OperationInsert,
					publication.OperationDelete,
					publication.OperationTruncate,
					publication.OperationUpdate,
				},
				Tables: publication.Tables{publication.Table{
					Name:            "users",
					ReplicaIdentity: publication.ReplicaIdentityFull,
				}},
			},
			Slot: slot.Config{
				CreateIfNotExists:           true,
				Name:                        "cdc_slot",
				SlotActivityCheckerInterval: 3000,
			},
			Metric: cdcconfig.MetricConfig{
				Port: 8081,
			},
			Logger: cdcconfig.LoggerConfig{
				LogLevel: slog.LevelInfo,
			},
		},
		Kafka: config.Kafka{
			TableTopicMapping:           map[string]string{"public.users": "users.0"},
			Brokers:                     []string{"localhost:19092"},
			AllowAutoTopicCreation:      true,
			ProducerBatchTickerDuration: time.Millisecond * 200,
		},
	}

	connector, err := cdc.NewConnector(ctx, cfg, Handler)
	if err != nil {
		slog.Error("new connector", "error", err)
		os.Exit(1)
	}

	defer connector.Close()
	connector.Start(ctx)
}

// Handler maps each captured change event to zero or more Kafka messages.
// The destination topic is resolved via kafka.tableTopicMapping, so only the
// key and value are set here.
func Handler(msg *cdc.Message) []gokafka.Message {
	slog.Info("change captured", "message", msg)
	if msg.Type.IsUpdate() || msg.Type.IsInsert() {
		// Tag the payload with the operation type, then publish the new row.
		msg.NewData["operation"] = msg.Type
		newData, _ := json.Marshal(msg.NewData)

		return []gokafka.Message{
			{
				Headers: nil,
				Key:     []byte(strconv.Itoa(int(msg.NewData["id"].(int32)))),
				Value:   newData,
			},
		}
	}

	if msg.Type.IsDelete() {
		// For deletes, publish the old row (available because the table uses
		// replica identity FULL).
		msg.OldData["operation"] = msg.Type
		oldData, _ := json.Marshal(msg.OldData)

		return []gokafka.Message{
			{
				Headers: nil,
				Key:     []byte(strconv.Itoa(int(msg.OldData["id"].(int32)))),
				Value:   oldData,
			},
		}
	}

	return []gokafka.Message{}
}
```
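
The handler decides what is produced for each change, so variations are easy. For instance, a common Kafka CDC pattern is to emit a tombstone (same key, nil value) on deletes so that log-compacted topics eventually drop the row. A sketch of that variation, reusing the imports and `cdc.Message` fields from the example above:

```go
// Variation of the handler above: emit a tombstone (nil value) for deletes
// so log-compacted topics eventually remove the row.
func TombstoneHandler(msg *cdc.Message) []gokafka.Message {
	switch {
	case msg.Type.IsInsert(), msg.Type.IsUpdate():
		value, err := json.Marshal(msg.NewData)
		if err != nil {
			slog.Error("marshal new data", "error", err)
			return []gokafka.Message{}
		}
		return []gokafka.Message{{
			Key:   []byte(strconv.Itoa(int(msg.NewData["id"].(int32)))),
			Value: value,
		}}
	case msg.Type.IsDelete():
		return []gokafka.Message{{
			Key:   []byte(strconv.Itoa(int(msg.OldData["id"].(int32)))),
			Value: nil, // tombstone: key only, no payload
		}}
	default:
		return []gokafka.Message{}
	}
}
```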

Examples

Configuration

| Variable | Type | Required | Default | Description | Options |
|---|---|---|---|---|---|
| cdc.host | string | yes | - | PostgreSQL host | Should be a valid hostname or IP address. Example: localhost. |
| cdc.username | string | yes | - | PostgreSQL username | Should have sufficient privileges to perform the required database operations. |
| cdc.password | string | yes | - | PostgreSQL password | Keep secure and avoid hardcoding in the source code. |
| cdc.database | string | yes | - | PostgreSQL database | The database must exist and be accessible by the specified user. |
| cdc.debugMode | bool | no | false | For debugging purposes | Enables pprof for tracing. |
| cdc.metric.port | int | no | 8080 | Set API port | Choose a port that is not in use by other applications. |
| cdc.logger.logLevel | string | no | info | Set logging level | [DEBUG, WARN, INFO, ERROR] |
| cdc.logger.logger | Logger | no | slog | Set logger | Can be customized with other logging frameworks if slog is not used. |
| cdc.publication.createIfNotExists | bool | no | - | Create the publication if it does not exist; otherwise, return a "publication does not exist" error. | |
| cdc.publication.name | string | yes | - | Set PostgreSQL publication name | Should be unique within the database. |
| cdc.publication.operations | []string | yes | - | Set PostgreSQL publication operations. List of operations to track; all or a subset can be specified. | INSERT: Track insert operations.<br>UPDATE: Track update operations.<br>DELETE: Track delete operations. |
| cdc.publication.tables | []Table | yes | - | Set tables which are tracked by change data capture | Define multiple tables as needed. |
| cdc.publication.tables[i].name | string | yes | - | Set the change-data-captured table name | Must be a valid table name in the specified database. |
| cdc.publication.tables[i].replicaIdentity | string | yes | - | Set the change-data-captured table replica identity [FULL, DEFAULT] | FULL: Captures all columns of the old row when a row is updated or deleted.<br>DEFAULT: Captures only the primary key of the old row when a row is updated or deleted. |
| cdc.publication.tables[i].schema | string | no | public | Set the change-data-captured table schema name | Must be a valid schema name in the specified database. |
| cdc.slot.createIfNotExists | bool | no | - | Create the replication slot if it does not exist; otherwise, return a "replication slot does not exist" error. | |
| cdc.slot.name | string | yes | - | Set the logical replication slot name | Should be unique and descriptive. |
| cdc.slot.slotActivityCheckerInterval | int | yes | 1000 | Set the slot activity check interval, in milliseconds | Specify as an integer value in milliseconds (e.g., 1000 for 1 second). |
| kafka.tableTopicMapping | map[string]string | yes | - | Mapping of PostgreSQL table events to Kafka topics | Maps table names to Kafka topics. |
| kafka.brokers | []string | yes | - | Broker IP and port information | |
| kafka.producerBatchSize | integer | no | 2000 | Maximum message count per batch; if exceeded, a flush is triggered. | |
| kafka.producerBatchBytes | string | no | 10mb | Maximum batch size in bytes; if exceeded, a flush is triggered. | |
| kafka.producerMaxAttempts | int | no | maxInt | Limit on how many attempts will be made to deliver a message. | |
| kafka.producerBatchTickerDuration | time.Duration | no | 10s | The batch is flushed automatically at this interval, covering long-waiting messages in the batch. | |
| kafka.readTimeout | time.Duration | no | 30s | Timeout for read operations in segmentio/kafka-go. | |
| kafka.writeTimeout | time.Duration | no | 30s | Timeout for write operations in segmentio/kafka-go. | |
| kafka.compression | integer | no | 0 | Compression can be used if message size is large; CPU usage may be affected. | 0: None<br>1: Gzip<br>2: Snappy<br>3: Lz4<br>4: Zstd |
| kafka.balancer | string | no | Hash | Define balancer strategy. | [Hash, LeastBytes, RoundRobin, ReferenceHash, CRC32Balancer, Murmur2Balancer] |
| kafka.requiredAcks | integer | no | 1 | Number of acknowledgments from partition replicas required before receiving a response. | 0: fire-and-forget<br>1: wait for the leader to acknowledge writes<br>-1: wait for the full ISR |
| kafka.secureConnection | bool | no | false | Enable secure Kafka connection. | |
| kafka.rootCA | []byte | no | - | Define root CA certificate. | |
| kafka.interCA | []byte | no | - | Define intermediate CA certificate. | |
| kafka.scramUsername | string | no | - | Define SCRAM username. | |
| kafka.scramPassword | string | no | - | Define SCRAM password. | |
| kafka.metadataTTL | time.Duration | no | 60s | TTL for the metadata cached by segmentio/kafka-go. Increase it to reduce network requests. | For more detail, check the docs. |
| kafka.metadataTopics | []string | no | - | Topic names for the metadata cached by segmentio/kafka-go. Define topics here that the connector may produce. | For more detail, check the docs. |
| kafka.clientID | string | no | - | Unique identifier that the transport communicates to the brokers. | For more detail, check the docs. |
| kafka.allowAutoTopicCreation | bool | no | false | Create the topic if missing. | For more detail, check the docs. |
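
To illustrate the producer-related keys above, here is a hedged sketch of a tuned Kafka block. `TableTopicMapping`, `Brokers`, `AllowAutoTopicCreation`, and `ProducerBatchTickerDuration` appear in the Usage example; the remaining Go field names are assumptions inferred from the configuration keys in this table, so verify them against the config package.

```go
// Sketch only: field names other than those used in the Usage example are
// inferred from the keys above (kafka.producerBatchSize, kafka.compression,
// kafka.requiredAcks) and may differ from the actual config package.
kafkaCfg := config.Kafka{
	TableTopicMapping:           map[string]string{"public.users": "users.0"},
	Brokers:                     []string{"localhost:19092"},
	AllowAutoTopicCreation:      true,
	ProducerBatchTickerDuration: 200 * time.Millisecond,
	ProducerBatchSize:           5000, // flush after 5000 buffered messages (assumed field)
	Compression:                 2,    // 2 = Snappy, per the options above (assumed field)
	RequiredAcks:                1,    // wait for the leader to acknowledge writes (assumed field)
}
_ = kafkaCfg
```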

API

| Endpoint | Description |
|---|---|
| GET /status | Returns a 200 OK status if the client is able to ping the PostgreSQL server successfully. |
| GET /metrics | Prometheus metrics endpoint. |
| GET /debug/pprof/* | pprof endpoints (only when debugMode=true). |
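
For a quick liveness check, the status endpoint can be polled with any HTTP client. A minimal sketch in Go, assuming the metric port 8081 set in the Usage example (the default is 8080):

```go
// Minimal liveness check against the connector's status endpoint
// (port 8081 matches cdc.metric.port in the Usage example above).
package main

import (
	"fmt"
	"net/http"
)

func main() {
	resp, err := http.Get("http://localhost:8081/status")
	if err != nil {
		fmt.Println("status check failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.StatusCode) // expect 200 when PostgreSQL is reachable
}
```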

Exposed Metrics

The client collects relevant metrics related to PostgreSQL change data capture (CDC) and makes them available at the /metrics endpoint.

| Metric Name | Description | Labels | Value Type |
|---|---|---|---|
| go_pq_cdc_kafka_process_latency_current | The latest Kafka connector process latency, in nanoseconds. | slot_name, host | Gauge |
| go_pq_cdc_kafka_bulk_request_process_latency_current | The latest Kafka connector bulk request process latency, in nanoseconds. | slot_name, host | Gauge |
| go_pq_cdc_kafka_write_total | The total number of successful write operations to Kafka. | slot_name, host, topic_name | Counter |
| go_pq_cdc_kafka_err_total | The total number of failed write operations to Kafka. | slot_name, host, topic_name | Counter |

You can also use all cdc-related metrics explained here. They are injected automatically, so no extra setup is needed.

Compatibility

| go-pq-cdc-kafka Version | Minimum PostgreSQL Server Version |
|---|---|
| 0.0.0 or higher | 14 |

Breaking Changes

| Date taking effect | Version | Change | How to check |
|---|---|---|---|