Package: github.com/Trendyol/go-dcp-elasticsearch
Version: 1.2.0-rc.2
Repository: https://github.com/trendyol/go-dcp-elasticsearch.git
Documentation: pkg.go.dev

# README

Go Dcp Elasticsearch

Go implementation of the Elasticsearch Connect Couchbase.

Go Dcp Elasticsearch streams documents from Couchbase over the Database Change Protocol (DCP) and writes them to an Elasticsearch index in near real time.

Features

  • Lower resource usage and higher throughput (see Benchmarks).
  • Custom routing support (see Example).
  • Updating multiple documents for a single DCP event (see Example).
  • Handling of different DCP events such as expiration, deletion and mutation (see Example).
  • Support for compressing Elasticsearch request bodies.
  • Batch configuration options such as maximum batch size, batch bytes and batch ticker duration.
  • Scaling up and down with custom membership algorithms (Couchbase, KubernetesHa, Kubernetes StatefulSet or Static, see examples).
  • Easily manageable configuration.

Benchmarks

The benchmark was run with 1,001,006 Couchbase documents, a volume at which the difference in batch structure between the two packages can be observed clearly. Both connectors used the default configuration of Java Elasticsearch Connect Couchbase.

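| Package | Time to Process Events | Elasticsearch Indexing Rate (/s) | Average CPU Usage (Core) | Average Memory Usage |
|---|---|---|---|---|
| Go Dcp Elasticsearch (Go 1.20) | 50s | (graph, image not included) | 0.486 | 408MB |
| Java Elasticsearch Connect Couchbase (JDK15) | 80s | (graph, image not included) | 0.31 | 1091MB |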

Example

Struct Config

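package main

// Import paths are assumed from the repository layout; adjust them if your version differs.
import (
  dcpelasticsearch "github.com/Trendyol/go-dcp-elasticsearch"
  "github.com/Trendyol/go-dcp-elasticsearch/config"
  "github.com/Trendyol/go-dcp-elasticsearch/couchbase"
  "github.com/Trendyol/go-dcp-elasticsearch/elasticsearch/document"
  dcpConfig "github.com/Trendyol/go-dcp/config"
)

// mapper converts a DCP event into Elasticsearch actions:
// a mutation indexes the document, any other event deletes it.
func mapper(event couchbase.Event) []document.ESActionDocument {
  if event.IsMutated {
    e := document.NewIndexAction(event.Key, event.Value, nil)
    return []document.ESActionDocument{e}
  }
  e := document.NewDeleteAction(event.Key, nil)
  return []document.ESActionDocument{e}
}

func main() {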
  connector, err := dcpelasticsearch.NewConnectorBuilder(config.Config{
    Elasticsearch: config.Elasticsearch{
      CollectionIndexMapping: map[string]string{
        "_default": "indexname",
      },
      Urls: []string{"http://localhost:9200"},
    },
    Dcp: dcpConfig.Dcp{
      Username:   "user",
      Password:   "password",
      BucketName: "dcp-test",
      Hosts:      []string{"localhost:8091"},
      Dcp: dcpConfig.ExternalDcp{
        Group: dcpConfig.DCPGroup{
          Name: "groupName",
          Membership: dcpConfig.DCPGroupMembership{
            Type: "static",
          },
        },
      },
      Metadata: dcpConfig.Metadata{
        Config: map[string]string{
          "bucket":     "checkpoint-bucket-name",
          "scope":      "_default",
          "collection": "_default",
        },
        Type: "couchbase",
      },
    },
  }).
    SetMapper(mapper).
    Build()
  if err != nil {
    panic(err)
  }

  defer connector.Close()
  connector.Start()
}
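
The mapper in the example treats every non-mutation event as a delete and emits one action per event. As a rough sketch of how a mapper could distinguish event types, set a routing value, and update several documents for one event, the following uses local stand-in types so that it runs without the library. `Event` and `Action` are hypothetical stand-ins for `couchbase.Event` and `document.ESActionDocument`; the `IsDeleted` and `IsExpired` fields, the `Routing` field, and the `:audit` ID suffix are assumptions made for illustration only.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for the connector's couchbase.Event.
// Only IsMutated appears in the README example; the other flags are assumed.
type Event struct {
	Key       []byte
	Value     []byte
	IsMutated bool
	IsDeleted bool
	IsExpired bool
}

// Action is a hypothetical stand-in for document.ESActionDocument.
type Action struct {
	Type    string // "index" or "delete"
	ID      string
	Source  []byte
	Routing string
}

// mapEvent turns one DCP event into zero or more actions.
func mapEvent(e Event) []Action {
	id := string(e.Key)
	routing := id // assumption: route by document key
	switch {
	case e.IsMutated:
		// One event updates two documents: the main one and an audit copy.
		return []Action{
			{Type: "index", ID: id, Source: e.Value, Routing: routing},
			{Type: "index", ID: id + ":audit", Source: e.Value, Routing: routing},
		}
	case e.IsDeleted, e.IsExpired:
		// Deletion and expiration both remove the two documents.
		return []Action{
			{Type: "delete", ID: id, Routing: routing},
			{Type: "delete", ID: id + ":audit", Routing: routing},
		}
	default:
		// Unknown event types produce no actions.
		return nil
	}
}

func main() {
	actions := mapEvent(Event{
		Key:       []byte("user-1"),
		Value:     []byte(`{"name":"a"}`),
		IsMutated: true,
	})
	for _, a := range actions {
		fmt.Printf("%s %s routing=%s\n", a.Type, a.ID, a.Routing)
	}
}
```

Returning a slice rather than a single action is what allows one event to fan out to several documents, or to none when the event should be ignored.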

File Config

Default Mapper

Configuration

Dcp Configuration

See the go-dcp documentation for the DCP configuration options.

Elasticsearch Specific Configuration

| Variable | Type | Required | Default | Description |
|---|---|---|---|---|
| elasticsearch.collectionIndexMapping | map[string]string | yes | | Defines which Couchbase collection events are written to which index. |
| elasticsearch.urls | []string | yes | | Elasticsearch connection URLs. |
| elasticsearch.username | string | no | | The Elasticsearch username. |
| elasticsearch.password | string | no | | The Elasticsearch password. |
| elasticsearch.typeName | string | no | | Defines the Elasticsearch index type name. |
| elasticsearch.batchSizeLimit | int | no | 1000 | Maximum message count per batch; a flush is triggered when it is exceeded. |
| elasticsearch.batchTickerDuration | time.Duration | no | 10s | Interval at which the batch is flushed automatically, so that messages do not wait in the batch for long. |
| elasticsearch.batchByteSizeLimit | int, string | no | 10mb | Maximum batch size in bytes; a flush is triggered when it is exceeded. |
| elasticsearch.maxConnsPerHost | int | no | 512 | Maximum number of connections that may be established per host. |
| elasticsearch.maxIdleConnDuration | time.Duration | no | 10s | Idle keep-alive connections are closed after this duration. |
| elasticsearch.compressionEnabled | boolean | no | false | Enables compression, which can help when messages are large; CPU usage may be affected. |
| elasticsearch.concurrentRequest | int | no | 1 | Number of concurrent bulk requests. |
| elasticsearch.disableDiscoverNodesOnStart | boolean | no | false | Disables node discovery when the client is initialized. |
| elasticsearch.discoverNodesInterval | time.Duration | no | 5m | Interval at which nodes are discovered periodically. |
| elasticsearch.rejectionLog.index | string | no | cbes-rejects | Rejection log index name. |
| elasticsearch.rejectionLog.includeSource | boolean | no | false | Includes source information in the rejection log. |
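
The three batch settings act as alternative flush triggers: a count limit, a byte limit, and a timer. The sketch below shows one way these could interact. `Batcher` and all of its fields are hypothetical and do not reflect the connector's actual implementation; flushing as soon as a limit is reached (rather than strictly exceeded) and reading `10mb` as 10 * 1024 * 1024 bytes are both assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// Batcher is a hypothetical illustration of the three flush triggers.
type Batcher struct {
	SizeLimit      int           // plays the role of elasticsearch.batchSizeLimit
	ByteSizeLimit  int           // plays the role of elasticsearch.batchByteSizeLimit, in bytes
	TickerDuration time.Duration // plays the role of elasticsearch.batchTickerDuration
	OnFlush        func(count, size int)

	items [][]byte
	size  int
}

// Add appends an item and flushes once either limit is reached.
func (b *Batcher) Add(item []byte) {
	b.items = append(b.items, item)
	b.size += len(item)
	if len(b.items) >= b.SizeLimit || b.size >= b.ByteSizeLimit {
		b.Flush()
	}
}

// Flush hands the pending batch to OnFlush and resets the counters.
// Flushing an empty batch is a no-op.
func (b *Batcher) Flush() {
	if len(b.items) == 0 {
		return
	}
	b.OnFlush(len(b.items), b.size)
	b.items = nil
	b.size = 0
}

// Run consumes items until the channel closes, flushing on every tick
// so that a partially filled batch does not wait indefinitely.
func (b *Batcher) Run(in <-chan []byte) {
	ticker := time.NewTicker(b.TickerDuration)
	defer ticker.Stop()
	for {
		select {
		case item, ok := <-in:
			if !ok {
				b.Flush() // drain whatever is left on shutdown
				return
			}
			b.Add(item)
		case <-ticker.C:
			b.Flush()
		}
	}
}

func main() {
	b := &Batcher{
		SizeLimit:      2,
		ByteSizeLimit:  10 << 20, // assumed meaning of "10mb"
		TickerDuration: 10 * time.Second,
		OnFlush: func(count, size int) {
			fmt.Printf("flushed %d items (%d bytes)\n", count, size)
		},
	}
	in := make(chan []byte, 5)
	for i := 0; i < 5; i++ {
		in <- []byte(`{"n":1}`)
	}
	close(in)
	b.Run(in)
}
```

Under this model, smaller limits and shorter ticker durations reduce indexing latency at the cost of more, smaller bulk requests.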

Exposed metrics

| Metric Name | Description | Labels | Value Type |
|---|---|---|---|
| elasticsearch_connector_latency_ms | Time until an event is added to the batch. | N/A | Gauge |
| elasticsearch_connector_bulk_request_process_latency_ms | Time to process a bulk request. | N/A | Gauge |

You can also use all DCP-related metrics described in the go-dcp documentation. They are injected automatically, so no additional setup is needed.

Grafana Metric Dashboard

Grafana & Prometheus Example

Contributing

Go Dcp Elasticsearch is always open to direct contributions. For more information, please see our Contribution Guideline document.

License

Released under the MIT License.