Categorygithub.com/rohilsurana/kafka-offset
modulepackage
0.1.1
Repository: https://github.com/rohilsurana/kafka-offset.git
Documentation: pkg.go.dev

# README

kafka-offset

This project aims to simplify accessing kafka offset details easily and moving offsets of a consumer to offsets near a specific timestamp.

How to use

kafka-offset tool can be used to manipulate kafka consumer group offsets

Usage:
  kafka-offset [command]

Available Commands:
  completion     Generate the autocompletion script for the specified shell
  copy-consumer  copy consumer from source kafka to target kafka
  help           Help about any command
  reset-consumer reset consumer to a given timestamp for provided topic pattern

Flags:
  -h, --help   help for kafka-offset

Use "kafka-offset [command] --help" for more information about a command.

Example -

# to reset consumer to 10 min before current time 
kafka-offset reset-consumer \
  --before 10m \
  --topic-pattern test-topic  \
  --source-consumer-group-id test-kafka-consumer-id  \
  --source-kafka-brokers localhost:9092 \
  --execute # if omitted will result in a dry-run
  
# to copy consumer to another kafka with a different name, and additional time buffer to account for delays due to mirroring etc
kafka-offset copy-consumer \
  --buffer 30s \
  --topic-pattern test-topic \
  --source-consumer-group-id p-godata-id-test-offset-copy-firehose-0001 \
  --source-kafka-brokers localhost:9092 \
  --target-consumer-group-id p-godata-id-test-offset-copy-firehose-0003 \
  --target-kafka-brokers localhost:9093 \
  --execute # if omitted will result in a dry-run

Understanding how it works

On high level we have the consumer group id, list of topics or pattern, and brokers. We call the brokers to get list of partitions for all the matching topics. For each topic-partition we query the offset available in TimeIndex just after given timestamp.

Kafka Indexes

There are 3 different types of indexes in kafka -

  • TimeIndex - index of record timestamps and it's offsets (sparse, each offset is not tracked)
  • OffsetIndex - index of record offset to segment file
  • TransactionIndex - index of metadata of aborted transactions

For scope of this project we will deal only with TimeIndex.

TimeIndex

TimeIndex uses the max record timestamp of a batch and it's offset. It does not contain entry for timestamps for all offsets.

The timestamp used depends on the record timestamps. Hence, it could be either CreateTime or LogAppendTime.

# Packages

No description provided by the author
No description provided by the author