github.com/Trendyol/go-pq-cdc
Package version: 0.0.9
Repository: https://github.com/trendyol/go-pq-cdc.git
Documentation: pkg.go.dev


# README

go-pq-cdc

go-pq-cdc is designed to provide efficient, lightweight Change Data Capture (CDC) for PostgreSQL databases. It uses PostgreSQL's built-in logical replication to capture changes in the database and stream them to downstream systems such as Kafka and Elasticsearch. The entire system is written in Go, with a focus on low resource consumption and high performance.

Debezium vs go-pq-cdc benchmark


Why?

CDC systems are crucial for real-time data synchronization, analytics, and event-driven architectures. Our main goal is to build a base CDC library for fast, stateful systems.

Usage

```sh
go get github.com/Trendyol/go-pq-cdc
```

```go
package main

import (
	"context"
	"log/slog"
	"os"

	cdc "github.com/Trendyol/go-pq-cdc"
	"github.com/Trendyol/go-pq-cdc/config"
	"github.com/Trendyol/go-pq-cdc/pq/message/format"
	"github.com/Trendyol/go-pq-cdc/pq/publication"
	"github.com/Trendyol/go-pq-cdc/pq/replication"
	"github.com/Trendyol/go-pq-cdc/pq/slot"
)

func main() {
	ctx := context.Background()
	cfg := config.Config{
		Host:      "127.0.0.1",
		Username:  "cdc_user",
		Password:  "cdc_pass",
		Database:  "cdc_db",
		DebugMode: false,
		Publication: publication.Config{
			// Create the publication on startup if it does not exist yet.
			CreateIfNotExists: true,
			Name:              "cdc_publication",
			Operations: publication.Operations{
				publication.OperationInsert,
				publication.OperationDelete,
				publication.OperationTruncate,
				publication.OperationUpdate,
			},
			// FULL replica identity exposes all old column values on UPDATE/DELETE.
			Tables: publication.Tables{publication.Table{
				Name:            "users",
				ReplicaIdentity: publication.ReplicaIdentityFull,
			}},
		},
		Slot: slot.Config{
			Name:              "cdc_slot",
			CreateIfNotExists: true,
			// How often (in milliseconds) to check whether the slot is active.
			SlotActivityCheckerInterval: 3000,
		},
		Metric: config.MetricConfig{
			Port: 8081,
		},
		Logger: config.LoggerConfig{
			LogLevel: slog.LevelInfo,
		},
	}

	connector, err := cdc.NewConnector(ctx, cfg, Handler)
	if err != nil {
		slog.Error("new connector", "error", err)
		os.Exit(1)
	}

	defer connector.Close()
	// Start streaming changes to Handler.
	connector.Start(ctx)
}

// Handler is called for every message received from the replication stream.
func Handler(ctx *replication.ListenerContext) {
	switch msg := ctx.Message.(type) {
	case *format.Insert:
		slog.Info("insert message received", "new", msg.Decoded)
	case *format.Delete:
		slog.Info("delete message received", "old", msg.OldDecoded)
	case *format.Update:
		slog.Info("update message received", "new", msg.NewDecoded, "old", msg.OldDecoded)
	}

	// Acknowledge the message once it has been handled.
	if err := ctx.Ack(); err != nil {
		slog.Error("ack", "error", err)
	}
}
```


Availability

go-pq-cdc runs in active/passive mode for PostgreSQL change data capture (CDC). Here is how it ensures availability:

  • Active Mode: While an instance holds the PostgreSQL replication slot (slot.name), it continuously captures changes and streams them to downstream systems as configured.
  • Passive Mode: The other deployments check slot activity every slot.slotActivityCheckerInterval milliseconds. When the slot becomes inactive (for example, because the active instance stopped), go-pq-cdc automatically captures the slot and resumes data capture.

This setup ensures continuous data synchronization and minimal downtime in capturing database changes.

Configuration

| Variable | Type | Required | Default | Description | Options |
|---|---|---|---|---|---|
| `host` | string | yes | - | PostgreSQL host | Should be a valid hostname or IP address. Example: `localhost`. |
| `username` | string | yes | - | PostgreSQL username | Should have sufficient privileges to perform the required database operations. |
| `password` | string | yes | - | PostgreSQL password | Keep it secure and avoid hardcoding it in source code. |
| `database` | string | yes | - | PostgreSQL database | The database must exist and be accessible by the specified user. |
| `debugMode` | bool | no | false | For debugging purposes | Enables pprof for tracing. |
| `metric.port` | int | no | 8080 | API port | Choose a port that is not used by other applications. |
| `logger.logLevel` | string | no | info | Logging level | [DEBUG, WARN, INFO, ERROR] |
| `logger.logger` | Logger | no | slog | Logger | Can be replaced with another logging framework if slog is not used. |
| `publication.createIfNotExists` | bool | no | - | Create the publication if it does not exist; otherwise, return a "publication does not exist" error. | |
| `publication.name` | string | yes | - | PostgreSQL publication name | Should be unique within the database. |
| `publication.operations` | []string | yes | - | PostgreSQL publication operations to track; all or a subset can be specified. | INSERT: track insert operations. UPDATE: track update operations. DELETE: track delete operations. TRUNCATE: track truncate operations. |
| `publication.tables` | []Table | yes | - | Tables tracked by change data capture | Define multiple tables as needed. |
| `publication.tables[i].name` | string | yes | - | Name of the captured table | Must be a valid table name in the specified database. |
| `publication.tables[i].replicaIdentity` | string | yes | - | Replica identity of the captured table [FULL, DEFAULT] | FULL: captures all columns when a row is updated or deleted. DEFAULT: captures only the primary key when a row is updated or deleted. |
| `slot.createIfNotExists` | bool | no | - | Create the replication slot if it does not exist; otherwise, return a "replication slot does not exist" error. | |
| `slot.name` | string | yes | - | Logical replication slot name | Should be unique and descriptive. |
| `slot.slotActivityCheckerInterval` | int | no | 1000 | Slot activity check interval in milliseconds | Specify as an integer number of milliseconds (e.g., 1000 for 1 second). |

API

| Endpoint | Description |
|---|---|
| `GET /status` | Returns 200 OK if the client can ping the PostgreSQL server successfully. |
| `GET /metrics` | Prometheus metrics endpoint. |
| `GET /debug/pprof/*` | pprof endpoints (only when `debugMode=true`). |

Exposed Metrics

The client collects relevant metrics related to PostgreSQL change data capture (CDC) and makes them available at the /metrics endpoint.

| Metric Name | Description | Labels | Value Type |
|---|---|---|---|
| `go_pq_cdc_update_total` | The total number of UPDATE operations captured on specific tables. | slot_name, host | Counter |
| `go_pq_cdc_delete_total` | The total number of DELETE operations captured on specific tables. | slot_name, host | Counter |
| `go_pq_cdc_insert_total` | The total number of INSERT operations captured on specific tables. | slot_name, host | Counter |
| `go_pq_cdc_cdc_latency_current` | The current latency in capturing data changes from PostgreSQL. | slot_name, host | Gauge |
| `go_pq_cdc_process_latency_current` | The current latency in processing the captured data changes. | slot_name, host | Gauge |
| `go_pq_cdc_replication_slot_slot_confirmed_flush_lsn` | The last confirmed flush Log Sequence Number (LSN) in the PostgreSQL replication slot. | slot_name, host | Gauge |
| `go_pq_cdc_replication_slot_slot_current_lsn` | The current Log Sequence Number (LSN) being processed in the PostgreSQL replication slot. | slot_name, host | Gauge |
| `go_pq_cdc_replication_slot_slot_is_active` | Whether the PostgreSQL replication slot is currently active (1 for active, 0 for inactive). | slot_name, host | Gauge |
| `go_pq_cdc_replication_slot_slot_lag` | The replication lag, measured as the difference between the current LSN and the confirmed flush LSN. | slot_name, host | Gauge |
| `go_pq_cdc_replication_slot_slot_retained_wal_size` | The size, in bytes, of the Write-Ahead Log (WAL) files retained for the replication slot. | slot_name, host | Gauge |
| runtime metrics | Prometheus Collector | N/A | N/A |
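The slot lag metric is described as the difference between the current LSN and the confirmed flush LSN. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash (the high and low 32 bits of a 64-bit WAL position), so the difference can be computed in bytes as sketched below. `parseLSN` and `slotLag` are hypothetical helpers; whether go-pq-cdc derives the metric exactly this way is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLSN converts PostgreSQL's textual LSN ("XXX/YYY", both hex) into a
// 64-bit WAL position: high 32 bits before the slash, low 32 bits after it.
func parseLSN(s string) (uint64, error) {
	hi, lo, ok := strings.Cut(s, "/")
	if !ok {
		return 0, fmt.Errorf("invalid LSN %q", s)
	}
	h, err := strconv.ParseUint(hi, 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %w", s, err)
	}
	l, err := strconv.ParseUint(lo, 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %w", s, err)
	}
	return h<<32 | l, nil
}

// slotLag returns current - confirmed in bytes (0 if confirmed is ahead).
func slotLag(current, confirmed string) (uint64, error) {
	c, err := parseLSN(current)
	if err != nil {
		return 0, err
	}
	f, err := parseLSN(confirmed)
	if err != nil {
		return 0, err
	}
	if f > c {
		return 0, nil
	}
	return c - f, nil
}

func main() {
	lag, err := slotLag("0/3000148", "0/3000060")
	if err != nil {
		panic(err)
	}
	fmt.Println(lag) // 232
}
```

A steadily growing lag means the consumer is not acknowledging messages fast enough, which also makes PostgreSQL retain more WAL for the slot.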

Grafana Dashboard

Import the Grafana dashboard JSON file (Dashboard).

Compatibility

| go-pq-cdc Version | Minimum PostgreSQL Server Version |
|---|---|
| 0.0.2 or higher | 14 |
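An application can check the minimum server version at startup. From PostgreSQL 10 onward, `server_version_num` is reported as major × 10000 + minor (for example, 140005 for 14.5), so the major version is the integer quotient by 10000. `meetsMinimum` is a hypothetical helper; fetching the value (for example with `SHOW server_version_num`) is left to whichever database driver you use.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// minServerMajor is the minimum PostgreSQL major version from the table above.
const minServerMajor = 14

// meetsMinimum parses server_version_num (e.g. "140005" for 14.5) and reports
// whether the major version is at least minServerMajor.
func meetsMinimum(serverVersionNum string) (bool, error) {
	n, err := strconv.Atoi(strings.TrimSpace(serverVersionNum))
	if err != nil {
		return false, fmt.Errorf("invalid server_version_num %q: %w", serverVersionNum, err)
	}
	// For PostgreSQL 10+, the number is major*10000 + minor.
	return n/10000 >= minServerMajor, nil
}

func main() {
	for _, v := range []string{"130012", "140005", "160002"} {
		ok, err := meetsMinimum(v)
		fmt.Println(v, ok, err)
	}
}
```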

Breaking Changes

| Date taking effect | Version | Change | How to check |
|---|---|---|---|