Module: github.com/Trendyol/go-dcp
Version: 1.2.0-rc.2
Repository: https://github.com/trendyol/go-dcp.git
Documentation: pkg.go.dev

# README

Go Dcp

This repository contains a Go implementation of a Couchbase Database Change Protocol (DCP) client.

Why?

Example

package main

import (
  "github.com/Trendyol/go-dcp"
  "github.com/Trendyol/go-dcp/logger"
  "github.com/Trendyol/go-dcp/models"
)

// listener is the callback invoked for each DCP event.
func listener(ctx *models.ListenerContext) {
  // Dispatch on the concrete event type: mutation, deletion or expiration.
  switch event := ctx.Event.(type) {
  case models.DcpMutation:
    logger.Log.Info(
      "mutated(vb=%v,eventTime=%v) | id: %v, value: %v | isCreated: %v",
      event.VbID, event.EventTime, string(event.Key), string(event.Value), event.IsCreated(),
    )
  case models.DcpDeletion:
    logger.Log.Info(
      "deleted(vb=%v,eventTime=%v) | id: %v",
      event.VbID, event.EventTime, string(event.Key),
    )
  case models.DcpExpiration:
    logger.Log.Info(
      "expired(vb=%v,eventTime=%v) | id: %v",
      event.VbID, event.EventTime, string(event.Key),
    )
  }

  // Acknowledge the event after handling it.
  ctx.Ack()
}

func main() {
  // Create the client from a configuration file and the listener callback.
  connector, err := dcp.NewDcp("config.yml", listener)
  if err != nil {
    panic(err)
  }

  // Close the client when main returns.
  defer connector.Close()

  connector.Start()
}

Usage

$ go get github.com/Trendyol/go-dcp

Configuration

| Variable | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| hosts | []string | yes | - | Couchbase host, e.g. localhost:8091. |
| username | string | yes | - | Couchbase username. |
| password | string | yes | - | Couchbase password. |
| bucketName | string | yes | - | Couchbase DCP bucket. |
| dcp.group.name | string | yes | | DCP group name for vBuckets. |
| scopeName | string | no | _default | Couchbase scope name. |
| collectionNames | []string | no | _default | Couchbase collection names. |
| connectionBufferSize | uint, string | no | 20mb | gocbcore library buffer size. Check this if you get OOM Killed. |
| maxQueueSize | int | no | 2048 | The maximum number of requests that can be queued waiting to be sent to a node. Check this if you get "queue overflowed" or "queue full". |
| connectionTimeout | time.Duration | no | 5s | Couchbase connection timeout. |
| secureConnection | bool | no | false | Enable TLS for the Couchbase connection. |
| rootCAPath | string | no | not set | Required if secureConnection is true. |
| debug | bool | no | false | For debugging purposes. |
| dcp.bufferSize | int | no | 16mb | Go DCP listener pre-allocated buffer size. Check this if you get OOM Killed. |
| dcp.connectionBufferSize | uint, string | no | 20mb | gocbcore library buffer size. Check this if you get OOM Killed. |
| dcp.connectionTimeout | time.Duration | no | 5s | DCP connection timeout. |
| dcp.maxQueueSize | int | no | 2048 | The maximum number of requests that can be queued waiting to be sent to a node. Check this if you get "queue overflowed" or "queue full". |
| dcp.listener.skipUntil | time.Time | no | | Set this to skip events until a certain time. |
| dcp.group.membership.type | string | no | | DCP membership type: couchbase, kubernetesHa, kubernetesStatefulSet, static or dynamic. Check examples for details. |
| dcp.group.membership.memberNumber | int | no | 1 | Set this if membership is static. Other membership types ignore this field. |
| dcp.group.membership.totalMembers | int | no | 1 | Set this if membership is static or kubernetesStatefulSet. Other membership types ignore this field. |
| dcp.group.membership.rebalanceDelay | time.Duration | no | 20s | Works for autonomous mode. If membership is dynamic, it is ignored and set to 0s. |
| dcp.group.membership.config | map[string]string | no | not set | Key-value config. For the couchbase type: expirySeconds, heartbeatInterval, heartbeatToleranceDuration, monitorInterval, timeout. |
| dcp.config.disableChangeStreams | bool | no | false | Set this to true if you do not want to receive older versions of changes on Couchbase Server 7.2.0+ with Magma storage buckets. |
| leaderElection.enabled | bool | no | false | Set this to true for kubernetesHa membership. |
| leaderElection.type | string | no | kubernetes | Leader election type: kubernetes. |
| leaderElection.config | map[string]string | no | not set | Key-value config. For the kubernetes type: leaseLockName, leaseLockNamespace, leaseDuration, renewDeadline, retryPeriod. |
| leaderElection.rpc.port | int | no | 8081 | This field is usable for kubernetesStatefulSet membership. |
| checkpoint.type | string | no | auto | Checkpoint type: auto or manual. |
| checkpoint.autoReset | string | no | earliest | Checkpoint start point: earliest or latest. |
| checkpoint.interval | time.Duration | no | 20s | Checkpoint checking interval. |
| checkpoint.timeout | time.Duration | no | 60s | Checkpoint checking timeout. |
| healthCheck.disabled | bool | no | false | Disable the Couchbase connection health check. |
| healthCheck.interval | time.Duration | no | 20s | Couchbase connection health check interval. |
| healthCheck.timeout | time.Duration | no | 5s | Couchbase connection health check timeout. |
| rollbackMitigation.disabled | bool | no | false | Disable reprocessing for rolled-back vBucket offsets. |
| rollbackMitigation.interval | time.Duration | no | 500ms | Persisted sequence numbers polling interval. |
| rollbackMitigation.configWatchInterval | time.Duration | no | 2s | Cluster config changes listener interval. |
| metadata.type | string | no | couchbase | Metadata storage type: file or couchbase. |
| metadata.readOnly | bool | no | false | Set this for debugging state purposes. |
| metadata.config | map[string]string | no | not set | Key-value config. For the couchbase type: bucket, scope, collection, maxQueueSize, connectionBufferSize, connectionTimeout. |
| api.disabled | bool | no | false | Disable metric endpoints. |
| api.port | int | no | 8080 | Set the API port. |
| metric.path | string | no | /metrics | Set the metric endpoint path. |
| logging.level | string | no | info | Set the logging level. |
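
To make the dcp.group.membership.memberNumber and dcp.group.membership.totalMembers settings more concrete, here is a minimal sketch of how a fixed set of vBuckets could be divided among members under static membership. It illustrates the idea only: the helper vbucketRange, the 1-based member numbering, the contiguous-range strategy and the sample count of 1024 vBuckets are all assumptions, not go-dcp's actual assignment logic.

```go
package main

import "fmt"

// vbucketRange returns the half-open range [start, end) of vBucket IDs that a
// member would own if totalVbuckets were split into contiguous, near-equal
// chunks. memberNumber is assumed to be 1-based. Hypothetical helper, not part
// of go-dcp.
func vbucketRange(memberNumber, totalMembers, totalVbuckets int) (start, end int, err error) {
	if totalMembers < 1 || memberNumber < 1 || memberNumber > totalMembers {
		return 0, 0, fmt.Errorf("invalid membership: member %d of %d", memberNumber, totalMembers)
	}
	base := totalVbuckets / totalMembers
	extra := totalVbuckets % totalMembers
	idx := memberNumber - 1

	// The first `extra` members each take one additional vBucket, so every
	// earlier member that got an extra one shifts this member's start by one.
	shift := idx
	if shift > extra {
		shift = extra
	}
	start = idx*base + shift
	end = start + base
	if idx < extra {
		end++
	}
	return start, end, nil
}

func main() {
	const totalVbuckets = 1024 // sample input only
	for member := 1; member <= 3; member++ {
		start, end, err := vbucketRange(member, 3, totalVbuckets)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("member %d of 3 owns vBuckets [%d, %d)\n", member, start, end)
	}
}
```

The point of the sketch is that each member can derive its share from just two numbers, which is why static membership needs no coordination between instances.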

Environment Variables

These environment variables override the corresponding configs.

| Variable | Type | Corresponding Config | Description |
|----------|------|----------------------|-------------|
| GO_DCP__DCP_GROUP_MEMBERSHIP_MEMBERNUMBER | int | dcp.group.membership.memberNumber | Avoids needing a new deployment to scale up or down. |
| GO_DCP__DCP_GROUP_MEMBERSHIP_TOTALMEMBERS | int | dcp.group.membership.totalMembers | Avoids needing a new deployment to scale up or down. |

Monitoring

The client offers an API with several endpoints and exposes several metrics.

API

| Endpoint | Description | Debug Mode | Body |
|----------|-------------|------------|------|
| GET /status | Returns a 200 OK status if the client is able to ping the Couchbase server successfully. | | |
| GET /rebalance | Triggers a rebalance operation for the vBuckets. | | |
| GET /states/offset | Returns the current offsets for each vBucket. | x | |
| GET /states/followers | Returns the list of follower clients if service discovery is enabled. | x | |
| GET /debug/pprof/* | Fiber Pprof | x | |
| PUT /membership/info | Updates membership info and applies rebalance. | | {"memberNumber": 1, "totalMembers": 3} |

The client collects relevant metrics and exposes them at the path set by metric.path. If you have not configured metric.path, the metrics are exposed at /metrics.

Exposed metrics

| Metric Name | Description | Labels | Value Type |
|-------------|-------------|--------|------------|
| cbgo_mutation_total | The total number of mutations on a specific vBucket | vbId: ID of the vBucket | Counter |
| cbgo_deletion_total | The total number of deletions on a specific vBucket | vbId: ID of the vBucket | Counter |
| cbgo_expiration_total | The total number of expirations on a specific vBucket | vbId: ID of the vBucket | Counter |
| cbgo_agent_queue_current | The current number of agent queue | address: Couchbase, is dcp: Is Dcp Agent | Gauge |
| cbgo_agent_queue_max | The max number of agent queue | address: Couchbase, is dcp: Is Dcp Agent | Gauge |
| cbgo_seq_no_current | The current sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_start_seq_no_current | The starting sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_end_seq_no_current | The ending sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_persist_seq_no_current | The persist sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_lag_current | The current lag on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_total_lag_current | The current total lag | N/A | Gauge |
| cbgo_process_latency_ms_current | The latest process latency in milliseconds | N/A | Gauge |
| cbgo_dcp_latency_ms_current | The latest consumed dcp message latency in milliseconds | N/A | Counter |
| cbgo_rebalance_current | The number of total rebalance | N/A | Counter |
| cbgo_active_stream_current | The number of total active stream | N/A | Gauge |
| cbgo_total_members_current | The total number of members in the cluster | N/A | Gauge |
| cbgo_member_number_current | The number of the current member | N/A | Gauge |
| cbgo_membership_type_current | The type of membership of the current member | Membership type | Gauge |
| cbgo_offset_write_current | The latest number of the offset write | N/A | Gauge |
| cbgo_offset_write_latency_ms_current | The latest offset write latency in milliseconds | N/A | Gauge |

Compatibility

| Go DCP Version | Minimum Couchbase Server Version |
|----------------|----------------------------------|
| x < 1.1.16 | 6.5.x |
| x >= 1.1.16 | 5.x.x |

Breaking Changes

| Date taking effect | Version | Change | How to check |
|--------------------|---------|--------|--------------|
| December 14, 2023 | v1.1.19 | dcp.config.[DisableExpiryOpcode, DisableStreamEndByClient, EnableChangeStreams] removed | Review your configs |

Examples

Grafana Metric Dashboard

Grafana & Prometheus Example

# Functions

NewDcp creates a new Dcp client. config: a path to a configuration file, or a configuration struct. listener: a callback function that is called when a mutation, deletion or expiration event occurs.