Version: 0.0.0-20250214201214-ea410b4cb9c9
Repository: https://github.com/tetafro/kafka-dump.git
Documentation: pkg.go.dev
# README
Kafka Dump
Reads a Kafka topic starting from a given timestamp, filters the messages, and saves them to a text file or to MongoDB.
- Uses the kafka-go package.
- Only works with Kafka >= v0.10.0.
- Only works with JSON-encoded payloads.
- Startup may take a while when consuming from an exact timestamp, because the tool collects and resets offsets for all partitions of the topic.
Install
go get github.com/tetafro/kafka-dump
Download the sample config and set the values:
curl -o config.yaml https://raw.githubusercontent.com/tetafro/kafka-dump/master/config.example.yaml
Run
Run with no flags (all values come from the config):
kafka-dump
Output
INFO[15:45:39] Starting...
INFO[15:45:39] Saving messages to messages.txt
INFO[15:45:49] Read messages from 2020-12-30 14:22:00 to 2020-12-30 14:53:01 (total 140346, saved 1428)
INFO[15:45:59] Read messages from 2020-12-30 14:22:00 to 2020-12-30 14:54:00 (total 334520, saved 3425)
INFO[15:46:09] Read messages from 2020-12-30 14:22:01 to 2020-12-30 14:54:01 (total 525725, saved 5463)
Using MongoDB
Run MongoDB in Docker, publishing its port on localhost only:
docker run -d \
--publish 127.0.0.1:27017:27017 \
--name mongo \
mongo:4.2
Set up the MongoDB parameters in the config:
mongo:
  addr: mongodb://localhost:27017
  database: kafka
  collection: events
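The `mongo` section maps onto a small Go struct. In the sketch below the type name `mongoConf`, its field names and `yaml` tags are assumptions for illustration (the project has a `MongoConf` type, but its fields are not shown here), and the `Validate` method is hypothetical: it rejects an address without a `mongodb://` or `mongodb+srv://` scheme and an empty database or collection before any connection is attempted.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// mongoConf is a hypothetical mirror of the "mongo" section of config.yaml.
type mongoConf struct {
	Addr       string `yaml:"addr"`
	Database   string `yaml:"database"`
	Collection string `yaml:"collection"`
}

// Validate returns the first problem found, or nil if the section looks usable.
func (c mongoConf) Validate() error {
	if !strings.HasPrefix(c.Addr, "mongodb://") && !strings.HasPrefix(c.Addr, "mongodb+srv://") {
		return fmt.Errorf("mongo.addr %q: expected a mongodb:// or mongodb+srv:// URI", c.Addr)
	}
	if c.Database == "" {
		return errors.New("mongo.database is empty")
	}
	if c.Collection == "" {
		return errors.New("mongo.collection is empty")
	}
	return nil
}

func main() {
	conf := mongoConf{Addr: "mongodb://localhost:27017", Database: "kafka", Collection: "events"}
	fmt.Println(conf.Validate()) // <nil>

	conf.Addr = "localhost:27017" // missing scheme
	fmt.Println(conf.Validate())
}
```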
Run
$ kafka-dump
INFO[15:45:39] Starting...
INFO[15:45:39] Saving messages to mongodb://localhost:27017
Check
$ mongo mongodb://localhost:27017/kafka
> db.events.count()
166714
# Functions
NewDumper creates a new dumper.
NewFieldFilter creates a new field filter.
NewFileSystemStorage creates a new filesystem storage.
NewKafkaConsumer creates a new Kafka consumer.
NewMongoStorage creates a new MongoDB storage.
ReadConfig reads configuration from a file.
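Taken together, these constructors suggest a pipeline: a consumer reads messages, a filter decides which ones to keep, and a storage saves them, with the dumper driving the loop. The sketch below shows that wiring under stated assumptions: the interfaces (`messageSource`, `messageFilter`, `messageSink`), the `dumper` type and the in-memory implementations are hypothetical, given unexported names so they are not mistaken for the package's API. A real Kafka consumer would keep waiting for new messages instead of returning `io.EOF`.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"time"
)

// message is a hypothetical stand-in for Message: a payload plus its timestamp.
type message struct {
	Time time.Time
	Data []byte
}

// Hypothetical roles filled by NewKafkaConsumer, NewFieldFilter and
// NewFileSystemStorage / NewMongoStorage.
type messageSource interface {
	Next() (message, error) // io.EOF means there is nothing left to read
}

type messageFilter interface {
	Match(message) bool
}

type messageSink interface {
	Save(message) error
}

// dumper pulls messages from the source, keeps those the filter accepts
// and hands them to the sink.
type dumper struct {
	source messageSource
	filter messageFilter
	sink   messageSink
}

// run reads until the source is exhausted and reports how many messages
// were read in total and how many were saved.
func (d *dumper) run() (int, int, error) {
	total, saved := 0, 0
	for {
		msg, err := d.source.Next()
		if errors.Is(err, io.EOF) {
			return total, saved, nil
		}
		if err != nil {
			return total, saved, err
		}
		total++
		if !d.filter.Match(msg) {
			continue
		}
		if err := d.sink.Save(msg); err != nil {
			return total, saved, err
		}
		saved++
	}
}

// sliceSource replays a fixed list of messages.
type sliceSource struct{ msgs []message }

func (s *sliceSource) Next() (message, error) {
	if len(s.msgs) == 0 {
		return message{}, io.EOF
	}
	m := s.msgs[0]
	s.msgs = s.msgs[1:]
	return m, nil
}

// containsFilter keeps messages whose payload contains the given bytes.
type containsFilter []byte

func (f containsFilter) Match(m message) bool { return bytes.Contains(m.Data, f) }

// memorySink collects saved messages in a slice.
type memorySink struct{ saved []message }

func (s *memorySink) Save(m message) error {
	s.saved = append(s.saved, m)
	return nil
}

func main() {
	now := time.Now()
	d := &dumper{
		source: &sliceSource{msgs: []message{
			{Time: now, Data: []byte(`{"type":"click"}`)},
			{Time: now, Data: []byte(`{"type":"view"}`)},
		}},
		filter: containsFilter(`"click"`),
		sink:   &memorySink{},
	}
	total, saved, err := d.run()
	fmt.Println(total, saved, err) // 2 1 <nil>
}
```

Keeping the three roles behind interfaces is what lets the same loop write either to a file or to MongoDB.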
# Structs
Config is the main app configuration.
Dumper is the app's main entity.
FieldFilter is a filter that makes a decision based on the message's fields.
FileSystemStorage is a storage that saves messages to a file.
KafkaConf is a set of Kafka parameters.
KafkaConsumer is a consumer that reads messages from Kafka.
LogsConf is a logging configuration.
Message is a Kafka message with a timestamp.
MongoConf is a set of MongoDB parameters.
MongoStorage is a storage that saves messages to MongoDB.
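A filter that decides based on a JSON message's fields can be sketched with `encoding/json`: decode the payload into a map and compare selected top-level fields with expected values. The type `jsonFieldFilter`, its constructor, and the rule of comparing `fmt.Sprint` of each decoded value with a string are assumptions for illustration; the project's `FieldFilter` may match differently (for example, on nested fields).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonFieldFilter is a hypothetical filter: a message matches when every
// listed top-level field is present and equals the expected text.
type jsonFieldFilter struct {
	want map[string]string // field name -> expected value as text
}

func newJSONFieldFilter(want map[string]string) *jsonFieldFilter {
	return &jsonFieldFilter{want: want}
}

// Match decodes the payload and compares the selected fields.
func (f *jsonFieldFilter) Match(payload []byte) bool {
	var obj map[string]any
	if err := json.Unmarshal(payload, &obj); err != nil {
		return false // not a JSON object: never matches
	}
	for name, want := range f.want {
		got, ok := obj[name]
		// JSON numbers decode to float64; fmt.Sprint(200.0) gives "200".
		if !ok || fmt.Sprint(got) != want {
			return false
		}
	}
	return true
}

func main() {
	f := newJSONFieldFilter(map[string]string{"type": "click", "status": "200"})
	fmt.Println(f.Match([]byte(`{"type":"click","status":200,"user":"u1"}`))) // true
	fmt.Println(f.Match([]byte(`{"type":"view","status":200}`)))              // false
	fmt.Println(f.Match([]byte(`not json`)))                                   // false
}
```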