Module: github.com/databendcloud/bend-ingest-kafka (module, package)
Version: 0.3.3
Repository: https://github.com/databendcloud/bend-ingest-kafka.git
Documentation: pkg.go.dev

# README

bend-ingest-kafka

Ingest Kafka data into Databend.

Installation

go install github.com/databendcloud/bend-ingest-kafka@latest

Or download the binary from the release page.

Usage

JSON transform mode

JSON transform mode is the default. It writes each field of a Kafka JSON message into the matching column of the target Databend table. It is controlled by --is-json-transform (isJsonTransform in the config file), which defaults to true.

Create a table that matches the structure of your Kafka data

For example, if the Kafka data looks like this:

{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}

create the table with:

CREATE TABLE test_ingest (
			i64 Int64,
			u64 UInt64,
			f64 Float64,
			s   String,
			s2  String,
			a16 Array(Int16),
			a8  Array(UInt8),
			d   Date,
			t   DateTime);
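To make the mapping concrete, the Go sketch below decodes the sample message into a struct whose fields line up one-to-one with the test_ingest columns. IngestRow and decodeMessage are hypothetical names, not part of bend-ingest-kafka. Rejecting unknown keys with DisallowUnknownFields is a choice made for this sketch, not documented behavior of the tool.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"time"
)

// IngestRow is a hypothetical Go mirror of the test_ingest table: each json
// tag is a key of the Kafka message and each field matches a column type.
type IngestRow struct {
	I64 int64   `json:"i64"` // Int64
	U64 uint64  `json:"u64"` // UInt64
	F64 float64 `json:"f64"` // Float64
	S   string  `json:"s"`   // String
	S2  string  `json:"s2"`  // String
	A16 []int16 `json:"a16"` // Array(Int16)
	A8  []uint8 `json:"a8"`  // Array(UInt8); a JSON array decodes element by element
	D   string  `json:"d"`   // Date, validated below
	T   string  `json:"t"`   // DateTime, validated below
}

// decodeMessage maps one Kafka JSON message onto IngestRow. Unknown keys are
// rejected (a choice made for this sketch), and the Date and DateTime values
// are checked against the layouts used in the sample message.
func decodeMessage(msg []byte) (IngestRow, error) {
	var row IngestRow
	dec := json.NewDecoder(bytes.NewReader(msg))
	dec.DisallowUnknownFields()
	if err := dec.Decode(&row); err != nil {
		return IngestRow{}, err
	}
	if _, err := time.Parse("2006-01-02", row.D); err != nil {
		return IngestRow{}, fmt.Errorf("column d: %w", err)
	}
	if _, err := time.Parse("2006-01-02 15:04:05", row.T); err != nil {
		return IngestRow{}, fmt.Errorf("column t: %w", err)
	}
	return row, nil
}

func main() {
	msg := []byte(`{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}`)
	row, err := decodeMessage(msg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", row)
}
```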

Then run bend-ingest-kafka in either command line mode or config file mode.

Command line mode

bend-ingest-kafka \
  --kafka-bootstrap-servers="127.0.0.1:9092,127.0.0.2:9092" \
  --kafka-topic="Your Topic" \
  --kafka-consumer-group="Consumer Group" \
  --databend-dsn="http://user:password@localhost:8000" \
  --databend-table="db1.tbl" \
  --data-format="json" \
  --batch-size=100000 \
  --batch-max-interval=300

Config file mode

Edit the config file config/conf.json:

{
  "kafkaBootstrapServers": "localhost:9092",
  "kafkaTopic": "ingest_test",
  "KafkaConsumerGroup": "test",
  "mockData": "",
  "isJsonTransform": true,
  "databendDSN": "https://user:password@host:port",
  "databendTable": "default.kfk_test",
  "batchSize": 1,
  "batchMaxInterval": 5,
  "dataFormat": "json",
  "workers": 1,
  "copyPurge": false,
  "copyForce": false
}

and run the command:

./bend-ingest-kafka 
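A minimal sketch of reading config/conf.json in Go follows. The Config struct, its field names, and the Brokers helper are assumptions for illustration, not the tool's own types. It also shows why the capitalized KafkaConsumerGroup key still works with a lowerCamelCase tag: encoding/json prefers an exact key match but also accepts a case-insensitive one.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Config is a hypothetical struct covering the keys of config/conf.json.
type Config struct {
	KafkaBootstrapServers string `json:"kafkaBootstrapServers"`
	KafkaTopic            string `json:"kafkaTopic"`
	KafkaConsumerGroup    string `json:"kafkaConsumerGroup"` // also matches "KafkaConsumerGroup"
	MockData              string `json:"mockData"`
	IsJsonTransform       bool   `json:"isJsonTransform"`
	DatabendDSN           string `json:"databendDSN"`
	DatabendTable         string `json:"databendTable"`
	BatchSize             int    `json:"batchSize"`
	BatchMaxInterval      int    `json:"batchMaxInterval"`
	DataFormat            string `json:"dataFormat"`
	Workers               int    `json:"workers"`
	CopyPurge             bool   `json:"copyPurge"`
	CopyForce             bool   `json:"copyForce"`
}

// Brokers splits the comma-separated kafkaBootstrapServers value.
func (c Config) Brokers() []string {
	var out []string
	for _, s := range strings.Split(c.KafkaBootstrapServers, ",") {
		if s = strings.TrimSpace(s); s != "" {
			out = append(out, s)
		}
	}
	return out
}

func parseConfig(data []byte) (Config, error) {
	var c Config
	err := json.Unmarshal(data, &c)
	return c, err
}

const sample = `{
  "kafkaBootstrapServers": "localhost:9092",
  "kafkaTopic": "ingest_test",
  "KafkaConsumerGroup": "test",
  "mockData": "",
  "isJsonTransform": true,
  "databendDSN": "https://user:password@host:port",
  "databendTable": "default.kfk_test",
  "batchSize": 1,
  "batchMaxInterval": 5,
  "dataFormat": "json",
  "workers": 1,
  "copyPurge": false,
  "copyForce": false
}`

func main() {
	c, err := parseConfig([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Printf("topic=%s group=%s brokers=%v table=%s\n",
		c.KafkaTopic, c.KafkaConsumerGroup, c.Brokers(), c.DatabendTable)
}
```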

Raw mode

Raw mode ingests each Kafka message unchanged into a Databend table. Enable it by setting isJsonTransform to false. In this mode, the tool creates a table named after databendTable with the columns (uuid, koffset, kpartition, raw_data, record_metadata, add_time) and writes the raw data into it. record_metadata holds the Kafka record's metadata (topic, partition, offset, create_time, and key), and add_time is the time the record was added to Databend.

Example

If the Kafka JSON data is:

{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}

run the command

./bend-ingest-kafka 

with config/conf.json, setting isJsonTransform to false. The table default.kfk_test will be created and the data will be ingested into it.

Parameter Reference

| Parameter | Description | Default | Example |
|---|---|---|---|
| kafkaBootstrapServers | Kafka bootstrap servers | "127.0.0.1:64103" | "127.0.0.1:9092,127.0.0.2:9092" |
| kafkaTopic | Kafka topic | "test" | "test" |
| KafkaConsumerGroup | Kafka consumer group | "kafka-bend-ingest" | "test" |
| isSASL | Enable SASL authentication | false | true |
| saslUser | SASL user | "" | "user" |
| saslPassword | SASL password | "" | "password" |
| mockData | Mock data | "" | "" |
| isJsonTransform | Use JSON transform mode (false selects raw mode) | true | true |
| databendDSN | Databend DSN | (none) | "http://localhost:8000" |
| databendTable | Databend table | (none) | "db1.tbl" |
| batchSize | Batch size | 1000 | 1000 |
| batchMaxInterval | Maximum batch interval | 30 | 30 |
| dataFormat | Data format | json | "json" |
| workers | Number of worker threads | 1 | 1 |
| copyPurge | Copy purge | false | false |
| copyForce | Copy force | false | false |
| DisableVariantCheck | Disable variant check | false | false |
| MinBytes | Min bytes | 1024 | 1024 |
| MaxBytes | Max bytes | 1048576 | 1048576 |
| MaxWait | Max wait | 10 | 10 |
| useReplaceMode | Use replace mode | false | false |
| userStage | User or external stage name | ~ | ~ |
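One way to apply the Default column is to decode the config file over a struct that already holds the defaults, since json.Unmarshal leaves fields whose keys are absent unchanged. This is a sketch: Settings, defaults, and load are hypothetical names, only a subset of parameters is covered, the lowerCamelCase JSON keys for MinBytes, MaxBytes, and MaxWait are an assumption (encoding/json matches keys case-insensitively anyway), and treating databendDSN and databendTable as required follows only from their having no default.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Settings is a hypothetical subset of the parameters in the table.
type Settings struct {
	KafkaBootstrapServers string `json:"kafkaBootstrapServers"`
	KafkaTopic            string `json:"kafkaTopic"`
	KafkaConsumerGroup    string `json:"kafkaConsumerGroup"`
	IsJsonTransform       bool   `json:"isJsonTransform"`
	DatabendDSN           string `json:"databendDSN"`
	DatabendTable         string `json:"databendTable"`
	BatchSize             int    `json:"batchSize"`
	BatchMaxInterval      int    `json:"batchMaxInterval"`
	DataFormat            string `json:"dataFormat"`
	Workers               int    `json:"workers"`
	MinBytes              int    `json:"minBytes"`
	MaxBytes              int    `json:"maxBytes"`
	MaxWait               int    `json:"maxWait"`
	UserStage             string `json:"userStage"`
}

// defaults returns the Default column of the parameter table.
func defaults() Settings {
	return Settings{
		KafkaBootstrapServers: "127.0.0.1:64103",
		KafkaTopic:            "test",
		KafkaConsumerGroup:    "kafka-bend-ingest",
		IsJsonTransform:       true,
		BatchSize:             1000,
		BatchMaxInterval:      30,
		DataFormat:            "json",
		Workers:               1,
		MinBytes:              1024,
		MaxBytes:              1048576,
		MaxWait:               10,
		UserStage:             "~",
	}
}

// load decodes data over the defaults; keys missing from data keep their
// default values, and the two parameters without a default must be set.
func load(data []byte) (Settings, error) {
	s := defaults()
	if err := json.Unmarshal(data, &s); err != nil {
		return Settings{}, err
	}
	if s.DatabendDSN == "" || s.DatabendTable == "" {
		return Settings{}, errors.New("databendDSN and databendTable have no default and must be set")
	}
	return s, nil
}

func main() {
	s, err := load([]byte(`{"databendDSN": "http://localhost:8000", "databendTable": "db1.tbl"}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", s)
}
```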

NOTE:

  • useReplaceMode replaces existing data: if a record already exists in the table, the new data overwrites the old data. It is only supported when isJsonTransform is false, because it relies on the koffset and kpartition fields that raw mode adds to the target table.
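The replace behavior can be pictured as an upsert keyed on (kpartition, koffset): a record delivered a second time, for example after a consumer restart, overwrites its earlier row instead of adding a duplicate. The in-memory Go sketch below illustrates only that semantics; Table, Row, rowKey, and Replace are hypothetical, and the README does not say how the tool issues the replace against Databend.

```go
package main

import "fmt"

// rowKey identifies a Kafka record by its kpartition and koffset columns.
type rowKey struct {
	Partition int
	Offset    int64
}

// Row is a hypothetical, trimmed-down raw-mode row.
type Row struct {
	KPartition int
	KOffset    int64
	RawData    string
}

// Table is a hypothetical in-memory stand-in for the target table.
type Table map[rowKey]Row

// Replace upserts rows: a row whose (kpartition, koffset) is already present
// overwrites the stored one instead of being appended as a duplicate.
func (t Table) Replace(rows []Row) {
	for _, r := range rows {
		t[rowKey{r.KPartition, r.KOffset}] = r
	}
}

func main() {
	tbl := Table{}
	tbl.Replace([]Row{{0, 10, `{"s": "hao"}`}, {0, 11, `{"s": "hello"}`}})
	// Offset 11 is delivered again, e.g. after a consumer restart.
	tbl.Replace([]Row{{0, 11, `{"s": "hello"}`}, {0, 12, `{"s": "new"}`}})
	fmt.Println("rows:", len(tbl)) // prints "rows: 3"
}
```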
