# README
kafkactl - Kafka Management Tool - wiki
kafkactl - Package and CLI tool (written in Go) for managing Apache Kafka and related components.
The kafkactl tool currently features the following:
- Search and Manage Groups and Topics
- Show Topic Partition Details - Offsets, Replicas, Leaders, etc.
- Show Group Details - Lag, Offsets, Members, etc.
- Display / Modify Topic Configs
- Create / Delete Topics
- Delete Consumer Groups
- Reset Partition Offsets for a Group
- Tail a Topic in realtime
- Launch a Consumer Group for a Topic
- Produce to a Specific Topic Partition or all Partitions at once
- Perform a Preferred Replica Election for a target topic or for all topics
- Increase / Decrease Replicas
- Perform Topic Partition Migrations between Brokers
- Query burrow details
- Monitor burrow lag via Terminal Graph
- Explore Zookeeper Paths
- Create and Delete Zookeeper Values
- Pass stdin to create Kafka messages or Zookeeper Values
ToDo:
- Add Security Features
- Add Metrics Testing
kafkactl is under active development, with new features regularly added and tested. Expect ongoing optimization and refactoring, and keep track of the latest releases.
Package
The package is mostly a wrapper around the excellent sarama library and is used by the kafkactl tool itself. The kafkactl tool uses a context-style config, similar to the Kubernetes tool kubectl, that groups each Kafka cluster with its corresponding Zookeeper and Burrow instances.
package example usage:
Get package:
go get -u github.com/jbvmio/kafkactl
List Topics:
	package main

	import (
		"fmt"
		"log"

		"github.com/jbvmio/kafkactl"
	)

	func main() {
		client, err := kafkactl.NewClient("localhost:9092")
		if err != nil {
			log.Fatalf("Error: %v\n", err)
		}
		topics, err := client.GetTopicMeta()
		if err != nil {
			log.Fatalf("Error: %v\n", err)
		}
		for _, t := range topics {
			fmt.Printf("TOPIC> %v PARTITION> %v LEADER> %v\n", t.Topic, t.Partition, t.Leader)
		}
	}
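The loop above prints one line per topic partition. As a rough sketch of how such records might be summarized, the snippet below groups them by topic and counts partitions. The `partitionMeta` type and `partitionsPerTopic` function are hypothetical stand-ins that carry only the three fields used above (`Topic`, `Partition`, `Leader`). They are not part of the package, and the sample data is made up.

```go
package main

import (
	"fmt"
	"sort"
)

// partitionMeta is a hypothetical stand-in for one topic-partition record.
type partitionMeta struct {
	Topic     string
	Partition int32
	Leader    int32
}

// partitionsPerTopic counts how many partition records each topic has.
func partitionsPerTopic(metas []partitionMeta) map[string]int {
	counts := make(map[string]int)
	for _, m := range metas {
		counts[m.Topic]++
	}
	return counts
}

func main() {
	// Made-up sample records.
	metas := []partitionMeta{
		{Topic: "mytopic05", Partition: 0, Leader: 1},
		{Topic: "mytopic05", Partition: 1, Leader: 2},
		{Topic: "mytopic07", Partition: 0, Leader: 3},
	}
	counts := partitionsPerTopic(metas)

	// Sort topic names so the output order is stable.
	names := make([]string, 0, len(counts))
	for name := range counts {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Printf("TOPIC> %v PARTITIONS> %v\n", name, counts[name])
	}
}
```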
kafkactl tool - Get Started
Manual Download
- Download the latest kafkactl tool and extract to a $PATH directory.
- Run "kafkactl config --sample" to generate a sample config at $HOME/.kafkactl.yaml
- Edit the config file and save.
- Alternatively, pass all arguments to the command when running.
# kafkactl --broker brokerhost01:9092 admin create --topic newtopic01 --partitions 15 --rfactor 3
Using Docker
Run with Docker using cmdline args:
docker run --rm jbvmio/kafkactl -b brokerAddress:9092
docker run --rm jbvmio/kafkactl -b brokerAddress:9092 topics
docker run --rm jbvmio/kafkactl -b brokerAddress:9092 topics mytopic -x -m
docker run --rm jbvmio/kafkactl -b brokerAddress:9092 groups
docker run --rm jbvmio/kafkactl -b brokerAddress:9092 groups mygroup --lag
docker run --rm jbvmio/kafkactl -b brokerAddress:9092 admin create --topic mytopic2 --partitions 5 --rfactor 3
Using a config file with Docker:
- Generate a sample config:
docker run --rm -v ${PWD}:/home/kafkactl jbvmio/kafkactl config --sample
- This generates a sample config named .kafkactl.yaml; edit this file with your values.
- At minimum, specify one entry containing a Kafka broker.
# Minimum Config:
current: Cluster1
entries:
- name: Cluster1
  kafka:
  - broker01:9092
- Now mount your .kafkactl.yaml file whenever running the Docker image:
docker run --rm -v ${PWD}/.kafkactl.yaml:/home/kafkactl/.kafkactl.yaml jbvmio/kafkactl
docker run --rm -v ${PWD}/.kafkactl.yaml:/home/kafkactl/.kafkactl.yaml jbvmio/kafkactl topics
docker run --rm -v ${PWD}/.kafkactl.yaml:/home/kafkactl/.kafkactl.yaml jbvmio/kafkactl groups
kafkactl tool - Example Commands
example config commands
kafkactl config --sample
kafkactl config --show
kafkactl config --use testcluster1
example config file (~/.kafkactl.yaml)
current: testCluster1
entries:
- name: testCluster1
  kafka:
  - brokerHost1:9092
  - brokerHost2:9092
  burrow:
  - http://burrow1:3000
  - http://burrow2:3000
  zookeeper:
  - http://zk1:2181
  - http://zk2:2181
- name: testCluster2
  kafka:
  - brokerHost1:9092
  - brokerHost2:9092
  burrow:
  - http://burrow1:3000
  - http://burrow2:3000
  zookeeper:
  - http://zk1:2181
  - http://zk2:2181
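To illustrate the context-style layout of the config file, here is a minimal sketch of how a `current` name could be resolved to its entry. The `config` and `entry` types and the `currentEntry` function are hypothetical and only mirror the keys shown in the sample file. The exact, case-sensitive name match is an assumption and may not reflect how kafkactl itself resolves contexts.

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a hypothetical in-memory form of one item under "entries".
type entry struct {
	Name      string
	Kafka     []string
	Burrow    []string
	Zookeeper []string
}

// config mirrors the top-level keys of the sample file: current and entries.
type config struct {
	Current string
	Entries []entry
}

// currentEntry returns the entry whose name equals cfg.Current.
// Matching is exact and case-sensitive (an assumption of this sketch).
func currentEntry(cfg config) (entry, error) {
	for _, e := range cfg.Entries {
		if e.Name == cfg.Current {
			return e, nil
		}
	}
	return entry{}, errors.New("no entry named " + cfg.Current)
}

func main() {
	cfg := config{
		Current: "testCluster1",
		Entries: []entry{
			{Name: "testCluster1", Kafka: []string{"brokerHost1:9092", "brokerHost2:9092"}},
			{Name: "testCluster2", Kafka: []string{"brokerHost1:9092", "brokerHost2:9092"}},
		},
	}
	e, err := currentEntry(cfg)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println("Using brokers:", e.Kafka)
}
```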
Usage
kafkactl -h
Usage of kafkactl:
Usage:
kafkactl [flags]
kafkactl [command]
Available Commands:
admin Perform Various Kafka Administration Tasks
burrow Query Burrow
config Manage kafkactl Configuration
consume Consume from Topics using Consumer Groups.
describe Return Topic or Group details
group Search and Retrieve Group Info
help Help about any command
message Retreive Targeted Messages from a Kafka Topic
meta Return Metadata
produce Produce messages to a Kafka topic
tail Tail a Topic
topic Search and Retrieve Available Topics
version Print kafkactl version and exit
zk Perform Various Zookeeper Administration Tasks
Flags:
-b, --broker string Bootstrap Kafka Broker
-g, --group string Specify a Target Group
-h, --help help for kafkactl
--port string Port used for Bootstrap Kafka Broker (default "9092")
-t, --topic string Specify a Target Topic
-v, --verbose Display any additional info and error output.
kafkactl admin -h
conf Get and Set Available Kafka Related Configs (Topics Only for Now)
create Create Topics
delete Delete Topics or Groups
offset Manage Offsets
pre Perform Preferred Replica Election Tasks
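For context on the `pre` subcommand: in Kafka, the preferred replica of a partition is the first broker in its replica list, and a preferred replica election moves leadership back to that broker. The sketch below only finds partitions whose leader is not the preferred replica. The `partitionState` type and `needsElection` function are hypothetical and the sample data is made up; this does not show how kafkactl performs the election.

```go
package main

import "fmt"

// partitionState is a hypothetical record of one partition's leader and replicas.
type partitionState struct {
	Topic     string
	Partition int32
	Leader    int32
	Replicas  []int32
}

// needsElection returns the partitions whose current leader is not the
// preferred replica, i.e. the first broker ID in the replica list.
func needsElection(parts []partitionState) []partitionState {
	var out []partitionState
	for _, p := range parts {
		if len(p.Replicas) > 0 && p.Leader != p.Replicas[0] {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	// Made-up sample state: partition 1 is led by broker 3 instead of broker 2.
	parts := []partitionState{
		{Topic: "mytopic05", Partition: 0, Leader: 1, Replicas: []int32{1, 2, 3}},
		{Topic: "mytopic05", Partition: 1, Leader: 3, Replicas: []int32{2, 3, 1}},
	}
	for _, p := range needsElection(parts) {
		fmt.Printf("TOPIC> %v PARTITION> %v LEADER> %v PREFERRED> %v\n",
			p.Topic, p.Partition, p.Leader, p.Replicas[0])
	}
}
```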
kafkactl zk -h
Available Commands:
ls List or Get Values of a Given Path in Zookeeper
Flags:
-c, --create string Create a Zookeeper Path (Use with --value for setting a value)
-d, --delete string Delete a Zookeeper Path/Value
-f, --force Force Operation
-h, --help help for zk
-s, --server string Specify a targeted Zookeeper Server and Port (eg. localhost:2181
--value string Create a Zookeeper Value (Use with --create to specify the path for the value) Wins over StdIn
Example output:
kafka
# kafkactl
Brokers: 5
Topics: 208
Groups: 126
Cluster: (Kafka: v1.1)
brokerhost01:9092/1
* brokerhost02:9092/2
brokerhost03:9092/3
brokerhost04:9092/4
brokerhost05:9092/5
(*)Controller
# kafkactl group mygroup05
GROUPTYPE GROUP COORDINATOR
consumer mygroup05 broker03:9092/3
# kafkactl group mygroup05 --lag
GROUP TOPIC PART MEMBER OFFSET LAG
mygroup05 mytopic05 14 mygroup05-77885078-77abc 97 0
mygroup05 mytopic05 1 mygroup05-77885078-77abc 76 0
mygroup05 mytopic05 2 mygroup05-77885078-77abc 84 0
mygroup05 mytopic05 9 mygroup05-77885078-77abc 84 0
mygroup05 mytopic05 12 mygroup05-77885078-77abc 84 0
mygroup05 mytopic05 3 mygroup05-77885078-77abc 135 0
mygroup05 mytopic05 8 mygroup05-77885078-77abc 90 0
mygroup05 mytopic05 7 mygroup05-77885078-77abc 83 0
mygroup05 mytopic05 4 mygroup05-77885078-77abc 111 0
mygroup05 mytopic05 6 mygroup05-77885078-77abc 114 0
mygroup05 mytopic05 0 mygroup05-77885078-77abc 2329 0
mygroup05 mytopic05 5 mygroup05-77885078-77abc 85 0
mygroup05 mytopic05 11 mygroup05-77885078-77abc 139 0
mygroup05 mytopic05 10 mygroup05-77885078-77abc 125 0
mygroup05 mytopic05 13 mygroup05-77885078-77abc 97 0
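Consumer lag is commonly defined as the partition's newest (log end) offset minus the group's committed offset. The sketch below applies that definition. The function names are hypothetical, the numbers are made up, and this is not necessarily how kafkactl derives the LAG column.

```go
package main

import "fmt"

// partitionLag returns how far a committed offset trails the partition's
// newest (log end) offset. Negative differences are clamped to zero.
func partitionLag(newest, committed int64) int64 {
	if lag := newest - committed; lag > 0 {
		return lag
	}
	return 0
}

// totalLag sums the lag over all partitions present in newest.
// A partition missing from committed is treated as offset 0, which is a
// simplification made for this sketch.
func totalLag(newest, committed map[int32]int64) int64 {
	var total int64
	for part, end := range newest {
		total += partitionLag(end, committed[part])
	}
	return total
}

func main() {
	// Made-up offsets keyed by partition number.
	newest := map[int32]int64{0: 2329, 1: 80}
	committed := map[int32]int64{0: 2329, 1: 76}
	fmt.Println("total lag:", totalLag(newest, committed))
}
```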
zookeeper
# kafkactl zk ls /cluster/id
VALUE: /cluster/id
{"version":"1","id":"wwI7vYwOShKnEqVzDhhUYqi"}
# kafkactl zk ls /
PATH: /
cluster
controller
brokers
zookeeper
admin
isr_change_notification
log_dir_event_notification
controller_epoch
consumers
burrow
latest_producer_id_block
config
# kafkactl zk --create /testpath
Successfully Created: /testpath
# echo 'my testvalue from stdin' | kafkactl zk --create /testpath/testvalue
Successfully Created: /testpath/testvalue
# kafkactl zk ls /testpath/testvalue
VALUE: /testpath/testvalue
my testvalue from stdin
burrow
# kafkactl burrow mygroup07 -x
BURROW GROUP TOPIC PARTITION LAG TOPICLAG STATUS
dc01-kafka mygroup07 mytopic07 0 0 0 OK
dc01-kafka mygroup07 mytopic07 1 0 0 OK
dc01-kafka mygroup07 mytopic07 2 0 0 OK
dc01-kafka mygroup07 mytopic07 3 0 0 OK
dc01-kafka mygroup07 mytopic07 4 0 0 OK
dc01-kafka mygroup07 mytopic07 5 0 0 OK
dc01-kafka mygroup07 mytopic07 6 0 0 OK
dc01-kafka mygroup07 mytopic07 7 0 0 OK
dc01-kafka mygroup07 mytopic07 8 0 0 OK
dc01-kafka mygroup07 mytopic07 9 0 0 OK
dc01-kafka mygroup07 mytopic07 10 0 0 OK
dc01-kafka mygroup07 mytopic07 11 0 0 OK
dc01-kafka mygroup07 mytopic07 12 0 0 OK
dc01-kafka mygroup07 mytopic07 13 0 0 OK
dc01-kafka mygroup07 mytopic07 14 0 0 OK
# Functions
BrokerAPIVersions returns the available API Versions for the given broker.
DefaultMessageFunc is default ProcessMessageFunc that can be used with a TopicHandlerMap.
GetConf returns a *sarama.Config and optionally sets the ClientID if specified.
GetTopicSummaries creates Topic Summaries for the given TopicMeta.
Log allows use of the same logger without having to initialize a client.
Logf allows use of the same logger without having to initialize a client.
Logger Enables Verbose Logging in the logFormat given.
MatchKafkaVersion parses the given version and returns the corresponding KafkaVersion from sarama.
NewClient returns a new KClient.
NewConsumerGroup returns a new ConsumerGroup.
NewCustomClient returns a new KClient using a sarama Config.
NewProducer returns a new Producer with Successes and Error channels that must be read from.
ReturnFirstValid returns the first available, connectable broker provided from a broker list.
Warnf allows use of the same logger without having to initialize a client.
# Constants
APIKey Codes.
Global Measurements.
Metric Measurement Names.
Histogram Measurements.
Meter Measurements.
OffsetNewest stands for the log head offset, i.e. the offset that will be assigned to the next message that will be produced to the partition.
OffsetOldest stands for the oldest offset available on the broker for a partition.
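As an illustration of how such sentinel offsets are typically used, the sketch below resolves a requested offset against a partition's known range. sarama defines OffsetNewest as -1 and OffsetOldest as -2; this sketch assumes the constants here carry the same values. The `resolveOffset` function and the lowercase constant names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel values as defined by sarama (assumed to match this package).
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// resolveOffset turns a requested offset, which may be a sentinel, into a
// concrete offset within [oldest, newest].
func resolveOffset(requested, oldest, newest int64) (int64, error) {
	switch {
	case requested == offsetNewest:
		return newest, nil
	case requested == offsetOldest:
		return oldest, nil
	case requested < oldest || requested > newest:
		return 0, errors.New("offset out of range")
	default:
		return requested, nil
	}
}

func main() {
	// Made-up range: oldest available offset 10, log head offset 50.
	for _, req := range []int64{offsetOldest, offsetNewest, 25, 99} {
		off, err := resolveOffset(req, 10, 50)
		if err != nil {
			fmt.Printf("requested %d: %v\n", req, err)
			continue
		}
		fmt.Printf("requested %d: resolved to %d\n", req, off)
	}
}
```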
# Variables
APIDescriptions for APIs.
Kafka API Versions:.
# Structs
CGHandler implements sarama ConsumerGroupHandlers.
CGMeta contains Metadata related to a ConsumerGroup.
ClusterMeta contains metadata for a Kafka Cluster.
ConsumerGroup implements a sarama ConsumerGroup.
GroupLag details Group Lag.
GroupListMeta contains basic info for a Group.
GroupMeta contains detailed info for a Group.
GroupOffsetMap details offsets for each partition in a map.
HistoMetric contains Histogram values.
KClient is a Kafka Client ...
MemberMeta details a Member of a Group.
Message represents a Kafka message sent/received to/from a topic partition.
MeterMetric contains Meter values.
MetricCollection contains a collection of Metrics.
Producer is the implementation of an AsyncProducer.
ProducerError is the type of error generated when the producer fails to deliver a message.
QuickGroupMeta is GroupListMeta without the Coordinator for the Group.
RawMetric contains the raw metric measurements and values.
TopicMeta contains detailed information for a Topic.
TopicOffsetMap returns offset details for a Topic.
TopicSummary contains a summary for a Topic.
# Interfaces
Metric represents a metric measurement from Kafka.
OffsetAdmin is used for managing offsets.
# Type aliases
ProcessMessageFunc is used to iterate over Messages.
TopicHandlerMap maps topics to ProcessMessageFuncs.
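The pairing of a per-message function with a topic-keyed map can be sketched as follows. The `message`, `handlerFunc`, and `handlerMap` types and the `dispatch` function are hypothetical and use simplified signatures; the package's actual ProcessMessageFunc and TopicHandlerMap definitions may differ.

```go
package main

import "fmt"

// message is a hypothetical, trimmed-down message record.
type message struct {
	Topic string
	Value []byte
}

// handlerFunc processes one message (hypothetical signature).
type handlerFunc func(msg message)

// handlerMap maps topic names to the handler for that topic.
type handlerMap map[string]handlerFunc

// dispatch routes msg to the handler registered for its topic and falls back
// to def when none is registered. It reports whether a topic-specific handler
// was used.
func dispatch(handlers handlerMap, def handlerFunc, msg message) bool {
	if h, ok := handlers[msg.Topic]; ok {
		h(msg)
		return true
	}
	if def != nil {
		def(msg)
	}
	return false
}

func main() {
	handlers := handlerMap{
		"mytopic05": func(msg message) { fmt.Printf("mytopic05: %s\n", msg.Value) },
	}
	def := func(msg message) { fmt.Printf("default (%s): %s\n", msg.Topic, msg.Value) }

	dispatch(handlers, def, message{Topic: "mytopic05", Value: []byte("hello")})
	dispatch(handlers, def, message{Topic: "mytopic07", Value: []byte("world")})
}
```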