Module: github.com/Trendyol/go-pq-cdc-elasticsearch
Version: 0.0.11
Repository: https://github.com/trendyol/go-pq-cdc-elasticsearch.git
Documentation: pkg.go.dev

# README

## go-pq-cdc-elasticsearch

Elasticsearch connector for go-pq-cdc.

go-pq-cdc-elasticsearch streams changes from PostgreSQL and writes them to Elasticsearch indices in near real time.


### Usage

⚠️ For production usage, check the production tutorial doc.

⚠️ For other usage examples, check the Dockerfile and code in the examples directory.

```sh
go get github.com/Trendyol/go-pq-cdc-elasticsearch
```

```go
package main

import (
	"context"
	"encoding/json"
	"log/slog"
	"os"
	"strconv"
	"time"

	cdc "github.com/Trendyol/go-pq-cdc-elasticsearch"
	"github.com/Trendyol/go-pq-cdc-elasticsearch/config"
	"github.com/Trendyol/go-pq-cdc-elasticsearch/elasticsearch"
	cdcconfig "github.com/Trendyol/go-pq-cdc/config"
	"github.com/Trendyol/go-pq-cdc/pq/publication"
	"github.com/Trendyol/go-pq-cdc/pq/slot"
)

func main() {
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))
	ctx := context.TODO()
	cfg := config.Config{
		CDC: cdcconfig.Config{
			Host:      "127.0.0.1",
			Username:  "es_cdc_user",
			Password:  "es_cdc_pass",
			Database:  "es_cdc_db",
			DebugMode: false,
			Publication: publication.Config{
				Name:              "es_cdc_publication",
				CreateIfNotExists: true,
				Operations: publication.Operations{
					publication.OperationInsert,
					publication.OperationDelete,
					publication.OperationTruncate,
					publication.OperationUpdate,
				},
				Tables: publication.Tables{publication.Table{
					Name: "users",
					// FULL makes OldData contain every column of the old row on update/delete.
					ReplicaIdentity: publication.ReplicaIdentityFull,
				}},
			},
			Slot: slot.Config{
				Name:                        "es_cdc_slot",
				CreateIfNotExists:           true,
				SlotActivityCheckerInterval: 3000, // milliseconds
			},
			Metric: cdcconfig.MetricConfig{
				Port: 8081, // serves /status, /metrics and (in debug mode) /debug/pprof
			},
		},
		Elasticsearch: config.Elasticsearch{
			BatchSizeLimit:      10000,
			BatchTickerDuration: time.Millisecond * 100,
			// Changes from the public.users table go to the "users" index.
			TableIndexMapping: map[string]string{
				"public.users": "users",
			},
			TypeName: "_doc",
			URLs:     []string{"http://127.0.0.1:9200"},
		},
	}

	connector, err := cdc.NewConnector(ctx, cfg, Handler)
	if err != nil {
		slog.Error("new connector", "error", err)
		os.Exit(1)
	}

	defer connector.Close()
	connector.Start(ctx)
}

// Handler converts each CDC message into Elasticsearch bulk actions.
// The type assertions assume users.id is decoded as int32 (an int4 column);
// any other type makes them panic.
func Handler(msg cdc.Message) []elasticsearch.Action {
	switch msg.Type {
	case cdc.InsertMessage:
		b, err := json.Marshal(msg.NewData)
		if err != nil {
			slog.Error("marshal insert", "error", err)
			return nil
		}
		return []elasticsearch.Action{
			elasticsearch.NewIndexAction([]byte(strconv.Itoa(int(msg.NewData["id"].(int32)))), b, nil),
		}
	case cdc.DeleteMessage:
		return []elasticsearch.Action{
			elasticsearch.NewDeleteAction([]byte(strconv.Itoa(int(msg.OldData["id"].(int32)))), nil),
		}
	case cdc.UpdateMessage:
		// OldData["name"] is only populated because the table uses FULL replica identity.
		msg.NewData["old_name"] = msg.OldData["name"]
		b, err := json.Marshal(msg.NewData)
		if err != nil {
			slog.Error("marshal update", "error", err)
			return nil
		}
		return []elasticsearch.Action{
			elasticsearch.NewIndexAction([]byte(strconv.Itoa(int(msg.NewData["id"].(int32)))), b, nil),
		}
	default:
		return nil
	}
}
```
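The handler above relies on unchecked type assertions such as `msg.NewData["id"].(int32)`, which panic if the key is missing or the column is decoded as a different type. A more tolerant variant could look like the sketch below. `documentID` is a hypothetical helper, not part of go-pq-cdc-elasticsearch, and the set of accepted types (Go integer types and non-empty strings) is an assumption about how key columns are decoded.

```go
package main

import "strconv"

// documentID is a hypothetical helper (not a go-pq-cdc-elasticsearch API).
// It converts a decoded primary-key value into the []byte document ID passed to
// elasticsearch.NewIndexAction / NewDeleteAction, and reports false for missing
// or unsupported values instead of panicking.
func documentID(v any) ([]byte, bool) {
	switch id := v.(type) {
	case int16:
		return []byte(strconv.FormatInt(int64(id), 10)), true
	case int32:
		return []byte(strconv.FormatInt(int64(id), 10)), true
	case int64:
		return []byte(strconv.FormatInt(id, 10)), true
	case int:
		return []byte(strconv.Itoa(id)), true
	case string:
		if id == "" {
			return nil, false
		}
		return []byte(id), true
	default:
		// nil (missing key), floats, and other types are rejected.
		return nil, false
	}
}
```

Inside `Handler`, `id, ok := documentID(msg.NewData["id"])` could replace the `strconv.Itoa(...)` expression, returning `nil` (no actions) when `ok` is false.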



### Availability

go-pq-cdc runs in active/passive mode for PostgreSQL change data capture (CDC). It ensures availability as follows:

  • Active Mode: While a deployment holds the PostgreSQL replication slot (`cdc.slot.name`), go-pq-cdc continuously consumes changes and streams them to the configured downstream systems.
  • Passive Mode: Every deployment checks the slot's activity at the `cdc.slot.slotActivityCheckerInterval` interval. When the slot is detected as inactive, go-pq-cdc automatically captures it and resumes data capture, so any deployment that detects the inactive slot can take over.

This setup ensures continuous data synchronization and minimal downtime in capturing database changes.

### Configuration

| Variable | Type | Required | Default | Description | Options |
|---|---|---|---|---|---|
| `cdc.host` | string | yes | - | PostgreSQL host | Should be a valid hostname or IP address, e.g. `localhost`. |
| `cdc.username` | string | yes | - | PostgreSQL username | Should have sufficient privileges to perform the required database operations. |
| `cdc.password` | string | yes | - | PostgreSQL password | Keep it secure and avoid hardcoding it in source code. |
| `cdc.database` | string | yes | - | PostgreSQL database | The database must exist and be accessible by the specified user. |
| `cdc.debugMode` | bool | no | false | Enables debugging features | Enables pprof for tracing. |
| `cdc.metric.port` | int | no | 8080 | API port | Choose a port that is not used by other applications. |
| `cdc.logger.logLevel` | string | no | info | Logging level | `DEBUG`, `WARN`, `INFO`, `ERROR` |
| `cdc.logger.logger` | Logger | no | slog | Logger implementation | Can be replaced with another logging framework if slog is not used. |
| `cdc.publication.createIfNotExists` | bool | no | - | Create the publication if it does not exist; otherwise, return a "publication does not exist" error. | |
| `cdc.publication.name` | string | yes | - | PostgreSQL publication name | Should be unique within the database. |
| `cdc.publication.operations` | []string | yes | - | Operations tracked by the publication; all or a subset can be specified. | `INSERT`: track insert operations.<br>`UPDATE`: track update operations.<br>`DELETE`: track delete operations.<br>`TRUNCATE`: track truncate operations. |
| `cdc.publication.tables` | []Table | yes | - | Tables tracked by change data capture | Define as many tables as needed. |
| `cdc.publication.tables[i].name` | string | yes | - | Name of a captured table | Must be a valid table name in the specified database. |
| `cdc.publication.tables[i].replicaIdentity` | string | yes | - | Replica identity of a captured table | `FULL`: captures all columns of the old row when a row is updated or deleted.<br>`DEFAULT`: captures only the primary key of the old row when a row is updated or deleted. |
| `cdc.slot.createIfNotExists` | bool | no | - | Create the replication slot if it does not exist; otherwise, return a "replication slot does not exist" error. | |
| `cdc.slot.name` | string | yes | - | Logical replication slot name | Should be unique and descriptive. |
| `cdc.slot.slotActivityCheckerInterval` | int | yes | 1000 | Slot activity check interval, in milliseconds | Integer number of milliseconds, e.g. `1000` for 1 second. |
| `elasticsearch.tableIndexMapping` | map[string]string | yes | - | Maps PostgreSQL tables to Elasticsearch indices | e.g. `"public.users": "users"` |
| `elasticsearch.urls` | []string | yes | - | Elasticsearch connection URLs | One or more Elasticsearch instance URLs. |
| `elasticsearch.batchSizeLimit` | int | no | 1000 | Maximum message count per batch | A flush is triggered when this limit is exceeded. |
| `elasticsearch.batchTickerDuration` | time.Duration | no | 10s | Automatic batch flush interval | Human-readable duration, e.g. `10s` for 10 seconds. |
| `elasticsearch.batchByteSizeLimit` | string | no | 10mb | Maximum batch size in bytes | A flush is triggered when this size is exceeded. |
| `elasticsearch.maxConnsPerHost` | int | no | 512 | Maximum connections per host | Limits connections to each Elasticsearch host. |
| `elasticsearch.maxIdleConnDuration` | time.Duration | no | 10s | How long idle connections are kept alive | Human-readable duration, e.g. `10s` for 10 seconds. |
| `elasticsearch.typeName` | string | no | - | Elasticsearch document type name | Typically `_doc`. |
| `elasticsearch.concurrentRequest` | int | no | 1 | Number of concurrent bulk requests | How many bulk requests may run at the same time. |
| `elasticsearch.compressionEnabled` | bool | no | false | Enables request compression | Useful for large messages, but may increase CPU usage. |
| `elasticsearch.disableDiscoverNodesOnStart` | bool | no | false | Disables node discovery on client initialization | Skips node discovery when the client starts. |
| `elasticsearch.discoverNodesInterval` | time.Duration | no | 5m | Periodic node discovery interval | Human-readable duration, e.g. `5m` for 5 minutes. |
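The three batching options work together: a batch is flushed when it reaches `batchSizeLimit` messages or `batchByteSizeLimit` bytes, and `batchTickerDuration` flushes whatever is left on a timer. The following is a simplified, hypothetical model of that policy (the `batcher` type is illustrative, not the connector's bulk implementation). It flushes as soon as a limit is reached; the library's exact boundary (reached versus exceeded) may differ.

```go
package main

import "sync"

// batcher is a hypothetical model of size-, byte- and time-based flushing.
type batcher struct {
	mu        sync.Mutex
	sizeLimit int                  // analogous to elasticsearch.batchSizeLimit
	byteLimit int                  // analogous to elasticsearch.batchByteSizeLimit, in bytes
	flush     func(batch [][]byte) // e.g. sends one Elasticsearch bulk request
	items     [][]byte
	bytes     int
}

// Add buffers one document and flushes when either limit is reached.
func (b *batcher) Add(doc []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.items = append(b.items, doc)
	b.bytes += len(doc)
	if len(b.items) >= b.sizeLimit || b.bytes >= b.byteLimit {
		b.flushLocked()
	}
}

// Flush drains a partial batch; intended to be called on every ticker tick.
func (b *batcher) Flush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.flushLocked()
}

// flushLocked hands the buffered batch to flush and resets the counters.
// The caller must hold b.mu.
func (b *batcher) flushLocked() {
	if len(b.items) == 0 {
		return
	}
	b.flush(b.items)
	b.items = nil
	b.bytes = 0
}
```

A goroutine running `for range time.NewTicker(batchTickerDuration).C { b.Flush() }` would supply the timer-based flush, so low-traffic tables still reach Elasticsearch without waiting for a full batch.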

### API

| Endpoint | Description |
|---|---|
| `GET /status` | Returns `200 OK` if the client can ping the PostgreSQL server successfully. |
| `GET /metrics` | Prometheus metrics endpoint. |
| `GET /debug/pprof/*` | pprof endpoints (only when `debugMode` is `true`). |

### Exposed Metrics

The client collects PostgreSQL change data capture (CDC) metrics and exposes them at the `/metrics` endpoint.

| Metric Name | Description | Labels | Value Type |
|---|---|---|---|
| `go_pq_cdc_elasticsearch_process_latency_current` | The latest Elasticsearch connector process latency, in nanoseconds. | `slot_name`, `host` | Gauge |
| `go_pq_cdc_elasticsearch_bulk_request_process_latency_current` | The latest Elasticsearch connector bulk request process latency, in nanoseconds. | `slot_name`, `host` | Gauge |
| `go_pq_cdc_elasticsearch_index_total` | Total number of index operations. | `slot_name`, `host`, `index_name` | Counter |
| `go_pq_cdc_elasticsearch_delete_total` | Total number of delete operations. | `slot_name`, `host`, `index_name` | Counter |

All CDC metrics documented by go-pq-cdc are also available. They are injected automatically, so no extra setup is required.

### Compatibility

| go-pq-cdc Version | Minimum PostgreSQL Server Version |
|---|---|
| 0.0.2 or higher | 14 |

### Breaking Changes

| Date taking effect | Version | Change | How to check |
|---|---|---|---|