github.com/tetafro/kafka-dump
0.0.0-20250214201214-ea410b4cb9c9
Repository: https://github.com/tetafro/kafka-dump.git
Documentation: pkg.go.dev

# README

Kafka Dump

Reads a Kafka topic starting from a given timestamp, filters the messages, and saves them to a text file or to MongoDB.

  • Uses the kafka-go package.
  • Only works with Kafka >= v0.10.0.
  • Only works with JSON-encoded payloads.
  • May take some time to start when consuming from an exact timestamp, because it collects and resets offsets for all topic partitions.

Install

go install github.com/tetafro/kafka-dump@latest

Download the sample config and set the values

curl -o config.yaml https://raw.githubusercontent.com/tetafro/kafka-dump/master/config.example.yaml

Run

Run the tool (it takes no flags, only config values)

kafka-dump

Output

INFO[15:45:39] Starting...
INFO[15:45:39] Saving messages to messages.txt
INFO[15:45:49] Read messages from 2020-12-30 14:22:00 to 2020-12-30 14:53:01 (total 140346, saved 1428)
INFO[15:45:59] Read messages from 2020-12-30 14:22:00 to 2020-12-30 14:54:00 (total 334520, saved 3425)
INFO[15:46:09] Read messages from 2020-12-30 14:22:01 to 2020-12-30 14:54:01 (total 525725, saved 5463)

Using MongoDB

Run MongoDB in Docker and publish its port on localhost

docker run -d \
    --publish 127.0.0.1:27017:27017 \
    --name mongo \
    mongo:4.2

Set up the MongoDB parameters in the config

mongo:
  addr: mongodb://localhost:27017
  database: kafka
  collection: events
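
As a rough illustration of how these three keys might be checked before connecting, here is a minimal sketch using only the standard library. The `mongoConf` struct and its `validate` method are hypothetical: they mirror the YAML keys above, and the fields of the project's own MongoConf may differ. The YAML tags are shown for orientation only, since no YAML parser is used here.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// mongoConf is a hypothetical struct mirroring the YAML keys above;
// it is not the project's MongoConf type.
type mongoConf struct {
	Addr       string `yaml:"addr"`
	Database   string `yaml:"database"`
	Collection string `yaml:"collection"`
}

// validate reports the first problem found in the configuration.
func (c mongoConf) validate() error {
	u, err := url.Parse(c.Addr)
	if err != nil {
		return fmt.Errorf("parse addr: %w", err)
	}
	if u.Scheme != "mongodb" && u.Scheme != "mongodb+srv" {
		return fmt.Errorf("unexpected scheme %q in addr", u.Scheme)
	}
	if u.Host == "" {
		return errors.New("addr has no host")
	}
	if c.Database == "" || c.Collection == "" {
		return errors.New("database and collection must be set")
	}
	return nil
}

func main() {
	conf := mongoConf{
		Addr:       "mongodb://localhost:27017",
		Database:   "kafka",
		Collection: "events",
	}
	if err := conf.validate(); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("config looks valid")
}
```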

Run

$ kafka-dump
INFO[15:45:39] Starting...
INFO[15:45:39] Saving messages to mongodb://localhost:27017

Check

$ mongo mongodb://localhost:27017/kafka
> db.events.count()
166714

# Functions

NewDumper creates a new dumper.
NewFieldFilter creates a new field filter.
NewFileSystemStorage creates a new filesystem storage.
NewKafkaConsumer creates a new Kafka consumer.
NewMongoStorage creates a new MongoDB storage.
ReadConfig reads configuration from a file.

# Structs

Config is the main app configuration.
Dumper is the app's main entity.
FieldFilter is a filter that makes a decision based on a message's fields.
FileSystemStorage is a storage that saves messages to a file.
KafkaConf is a set of Kafka parameters.
KafkaConsumer is a consumer that reads messages from Kafka.
LogsConf is a logging configuration.
Message is a Kafka message with a timestamp.
MongoConf is a set of MongoDB parameters.
MongoStorage is a storage that saves messages to MongoDB.

# Interfaces

Consumer describes a source of messages.
Filter describes a way to decide whether a message should be saved or not.
Storage describes a storage for messages.