# README
kafka
import "github.com/andy2046/kafka-rest-go/kafka"
Overview
Package kafka provides a thin wrapper around the REST API, providing a more convenient interface for accessing cluster metadata and producing and consuming data.
Index
- Constants
- Variables
- func AvroFormat(k *Kafka) error
- func BinaryFormat(k *Kafka) error
- func EarliestOffset(k *Kafka) error
- func JSONFormat(k *Kafka) error
- func LargestOffset(k *Kafka) error
- func LatestOffset(k *Kafka) error
- func SetAccept(accept string) func(*Kafka) error
- func SetContentType(contentType string) func(*Kafka) error
- func SetTimeout(timeout time.Duration) func(*Kafka) error
- func SetURL(url string) func(*Kafka) error
- func SmallestOffset(k *Kafka) error
- func Stringer(v interface{}) (string, error)
- func URLJoin(urlstr string, pathstrs ...string) (string, error)
- func V1Version(k *Kafka) error
- func V2Version(k *Kafka) error
- type Argument
- type Broker
- type ConsumerInstance
- type ConsumerOffset
- type ConsumerOffsets
- type ConsumerOffsetsPartitions
- type ConsumerPartitions
- type ConsumerRequest
- type Consumers
- func (cs *Consumers) Assign(consumerOffsetsPartitions *ConsumerOffsetsPartitions, consumerName string, consumerGroup ...string) error
- func (cs *Consumers) Assignments(consumerName string, consumerGroup ...string) (*ConsumerOffsetsPartitions, error)
- func (cs *Consumers) CommitOffsets(consumerOffsets *ConsumerOffsets, consumerName string, consumerGroup ...string) error
- func (cs *Consumers) DeleteConsumer(consumerName string, consumerGroup ...string) error
- func (cs Consumers) Messages(messagesArg Argument) ([]Message, error)
- func (cs *Consumers) NewConsumer(consumerRequest *ConsumerRequest, consumerGroup ...string) (*ConsumerInstance, error)
- func (cs *Consumers) Offsets(consumerOffsetsPartitions *ConsumerOffsetsPartitions, consumerName string, consumerGroup ...string) (*ConsumerOffsets, error)
- func (cs *Consumers) Poll(interval time.Duration, messagesArg Argument, onMessage func(error, *[]Message)) func()
- func (cs Consumers) Records(recordsArg Argument) ([]Message, error)
- func (cs *Consumers) Seek(consumerOffsets *ConsumerOffsets, consumerName string, consumerGroup ...string) error
- func (cs *Consumers) SeekToBeginning(consumerOffsetsPartitions *ConsumerOffsetsPartitions, consumerName string, consumerGroup ...string) error
- func (cs *Consumers) SeekToEnd(consumerOffsetsPartitions *ConsumerOffsetsPartitions, consumerName string, consumerGroup ...string) error
- func (cs *Consumers) Subscribe(topicSubscription *TopicSubscription, useTopicPattern bool, consumerName string, consumerGroup ...string) error
- func (cs *Consumers) Subscriptions(consumerName string, consumerGroup ...string) (*TopicsSubscription, error)
- func (cs *Consumers) Unsubscribe(consumerName string, consumerGroup ...string) error
- type ErrorMessage
- type Format
- type Kafka
- func New(options ...func(*Kafka) error) (*Kafka, error)
- func (k *Kafka) Broker() (*Broker, error)
- func (k *Kafka) HTTPClient() *http.Client
- func (k *Kafka) NewConsumers(consumerGroup ...string) *Consumers
- func (k *Kafka) NewTopics() *Topics
- func (k *Kafka) SetOption(options ...func(*Kafka) error) error
- type Message
- type Offset
- type Partition
- type Partitions
- type ProducerMessage
- type ProducerOffsets
- type ProducerRecord
- type ProducerResponse
- type Replica
- type Topic
- type TopicNames
- type TopicPatternSubscription
- type TopicSubscription
- type Topics
- func (ts *Topics) Names() (*TopicNames, error)
- func (ts *Topics) NewPartitions(t ...*Topic) *Partitions
- func (ts *Topics) Produce(topicName string, message *ProducerMessage) (*ProducerResponse, error)
- func (ts *Topics) Topic(topicName string) (*Topic, error)
- func (ts *Topics) Topics() ([]*Topic, error)
- type TopicsSubscription
- type Version
Package files
consumer.go kafka.go partition.go topic.go
Constants
const (
// JSON formated consumer
JSON = Format("json")
// Binary formated consumer
Binary = Format("binary")
// Avro formated consumer
Avro = Format("avro")
// Earliest is the oldest offset for API v2
Earliest = Offset("earliest")
// Latest is the newest offset for API v2
Latest = Offset("latest")
// Smallest is the oldest offset for API v1
Smallest = Offset("smallest")
// Largest is the newest offset for API v1
Largest = Offset("largest")
// V2 is API v2
V2 = Version("v2")
// V1 is API v1
V1 = Version("v1")
)
Variables
var Defaults = Kafka{
URL: "http://localhost:8082",
Timeout: 60 * time.Second,
Accept: "application/vnd.kafka+json, application/json",
ContentType: "application/vnd.kafka+json",
Format: Binary,
Offset: Largest,
Version: V1,
}
Defaults for Kafka
func AvroFormat
func AvroFormat(k *Kafka) error
AvroFormat set Format to Avro
func BinaryFormat
func BinaryFormat(k *Kafka) error
BinaryFormat set Format to Binary
func EarliestOffset
func EarliestOffset(k *Kafka) error
EarliestOffset set Offset to Earliest
func JSONFormat
func JSONFormat(k *Kafka) error
JSONFormat set Format to JSON
func LargestOffset
func LargestOffset(k *Kafka) error
LargestOffset set Offset to Latest
func LatestOffset
func LatestOffset(k *Kafka) error
LatestOffset set Offset to Latest
func SetAccept
func SetAccept(accept string) func(*Kafka) error
SetAccept applies Accept to Kafka.
func SetContentType
func SetContentType(contentType string) func(*Kafka) error
SetContentType applies ContentType to Kafka.
func SetTimeout
func SetTimeout(timeout time.Duration) func(*Kafka) error
SetTimeout applies Timeout to Kafka.
func SetURL
func SetURL(url string) func(*Kafka) error
SetURL applies URL to Kafka.
func SmallestOffset
func SmallestOffset(k *Kafka) error
SmallestOffset set Offset to Earliest
func Stringer
func Stringer(v interface{}) (string, error)
Stringer returns formated string.
func URLJoin
func URLJoin(urlstr string, pathstrs ...string) (string, error)
URLJoin joins url with path and return the whole url string.
func V1Version
func V1Version(k *Kafka) error
V1Version set Version to V1
func V2Version
func V2Version(k *Kafka) error
V2Version set Version to V2
type Argument
type Argument struct {
Timeout int
TopicName string
MaxBytes int
ConsumerName string
ConsumerGroup string
}
Argument is the argument for both method Records and Messages
type Broker
type Broker struct {
Brokers []int `json:"brokers"`
}
Broker data
type ConsumerInstance
type ConsumerInstance struct {
ConsumerName string `json:"instance_id"`
BaseURI string `json:"base_uri"`
}
ConsumerInstance data
type ConsumerOffset
type ConsumerOffset struct {
Partition int `json:"partition"`
Offset int64 `json:"offset"`
Topic string `json:"topic"`
Metadata string `json:"metadata,omitempty"`
}
ConsumerOffset are the offsets to commit
type ConsumerOffsets
type ConsumerOffsets struct {
Offsets []*ConsumerOffset `json:"offsets"`
}
ConsumerOffsets data
type ConsumerOffsetsPartitions
type ConsumerOffsetsPartitions struct {
Partitions []*ConsumerPartitions `json:"partitions"`
}
ConsumerOffsetsPartitions are the partitions for consumer committed offsets
type ConsumerPartitions
type ConsumerPartitions struct {
Partition int `json:"partition"`
Topic string `json:"topic"`
}
ConsumerPartitions are the partitions for consumer
type ConsumerRequest
type ConsumerRequest struct {
Format Format `json:"format"`
Offset Offset `json:"auto.offset.reset"`
AutoCommit string `json:"auto.commit.enable"` // true or false
Name string `json:"name,omitempty"`
}
ConsumerRequest is the metadata needed to create a consumer instance
type Consumers
type Consumers struct {
Kafka *Kafka
ConsumerGroup string
List []*ConsumerInstance
}
Consumers data
func (*Consumers) Assign
func (cs *Consumers) Assign(consumerOffsetsPartitions *ConsumerOffsetsPartitions, consumerName string, consumerGroup ...string) error
Assign manually assign a list of partitions to this consumer.
func (*Consumers) Assignments
func (cs *Consumers) Assignments(consumerName string, consumerGroup ...string) (*ConsumerOffsetsPartitions, error)
Assignments get the list of partitions currently manually assigned to this consumer.
func (*Consumers) CommitOffsets
func (cs *Consumers) CommitOffsets(consumerOffsets *ConsumerOffsets, consumerName string, consumerGroup ...string) error
CommitOffsets commits a list of offsets for the consumer.
func (*Consumers) DeleteConsumer
func (cs *Consumers) DeleteConsumer(consumerName string, consumerGroup ...string) error
DeleteConsumer destroy the consumer instance.
func (*Consumers) Messages
func (cs *Consumers) Messages(messagesArg Argument) (*[]Message, error)
Messages consume messages from a topic via API v1. Messages arguments include MaxBytes (optional) TopicName ConsumerName ConsumerGroup. MaxBytes is the maximum number of bytes of unencoded keys and values that should be included in the response. Default is unlimited.
func (*Consumers) NewConsumer
func (cs *Consumers) NewConsumer(consumerRequest *ConsumerRequest, consumerGroup ...string) (*ConsumerInstance, error)
NewConsumer creates a new consumer instance in the consumer group.
func (*Consumers) Offsets
func (cs *Consumers) Offsets(consumerOffsetsPartitions *ConsumerOffsetsPartitions, consumerName string, consumerGroup ...string) (*ConsumerOffsets, error)
Offsets get the last committed offsets for the given partitions.
func (*Consumers) Poll
func (cs *Consumers) Poll(interval time.Duration, messagesArg Argument, onMessage func(error, *[]Message)) func()
Poll keep polling messages from a topic. the interval to poll messages is every interval ms, onMessage to handle polled messages. returned func is for cancellation.
func (*Consumers) Records
func (cs *Consumers) Records(recordsArg Argument) (*[]Message, error)
Records fetch message for the topics or partitions specified via API v2. Records arguments include Timeout (optional) MaxBytes (optional) ConsumerName ConsumerGroup. Timeout is the number of milliseconds for the underlying request to fetch the records. Default to 5000ms. MaxBytes is the maximum number of bytes of unencoded keys and values that should be included in the response. Default is unlimited.
func (*Consumers) Seek
func (cs *Consumers) Seek(consumerOffsets *ConsumerOffsets, consumerName string, consumerGroup ...string) error
Seek overrides the fetch offsets that the consumer will use for the next set of records to fetch.
func (*Consumers) SeekToBeginning
func (cs *Consumers) SeekToBeginning(consumerOffsetsPartitions *ConsumerOffsetsPartitions, consumerName string, consumerGroup ...string) error
SeekToBeginning seek to the first offset for each of the given partitions.
func (*Consumers) SeekToEnd
func (cs *Consumers) SeekToEnd(consumerOffsetsPartitions *ConsumerOffsetsPartitions, consumerName string, consumerGroup ...string) error
SeekToEnd seek to the last offset for each of the given partitions.
func (*Consumers) Subscribe
func (cs *Consumers) Subscribe(topicSubscription *TopicSubscription, useTopicPattern bool, consumerName string, consumerGroup ...string) error
Subscribe to the given list of topics or a topic pattern.
func (*Consumers) Subscriptions
func (cs *Consumers) Subscriptions(consumerName string, consumerGroup ...string) (*TopicsSubscription, error)
Subscriptions get the current subscribed list of topics.
func (*Consumers) Unsubscribe
func (cs *Consumers) Unsubscribe(consumerName string, consumerGroup ...string) error
Unsubscribe from topics currently subscribed.
type ErrorMessage
type ErrorMessage struct {
ErrorCode int `json:"error_code,omitempty"`
Message string `json:"message,omitempty"`
}
ErrorMessage for API response
type Format
type Format string
Format is one of json, binary or avro
type Kafka
type Kafka struct {
URL string
Timeout time.Duration
Accept string
ContentType string
Format Format
Offset Offset
Version Version
}
Kafka represents a Kafka REST API.
func New
func New(options ...func(*Kafka) error) (*Kafka, error)
New returns a Kafka instance with default setting.
func (*Kafka) Broker
func (k *Kafka) Broker() (*Broker, error)
Broker returns the brokers.
func (*Kafka) HTTPClient
func (k *Kafka) HTTPClient() *http.Client
HTTPClient creates a new http.Client with timeout.
func (*Kafka) NewConsumers
func (k *Kafka) NewConsumers(consumerGroup ...string) *Consumers
NewConsumers returns a Consumers instance.
func (*Kafka) NewTopics
func (k *Kafka) NewTopics() *Topics
NewTopics returns a Topics instance.
func (*Kafka) SetOption
func (k *Kafka) SetOption(options ...func(*Kafka) error) error
SetOption takes one or more option function and applies them in order to Kafka.
type Message
type Message struct {
Topic string `json:"topic"`
Key json.RawMessage `json:"key"`
Value json.RawMessage `json:"value"`
Partition int `json:"partition"`
Offset int64 `json:"offset"`
}
Message is a single Kafka message
type Offset
type Offset string
Offset is either earliest or latest
type Partition
type Partition struct {
Partition int `json:"partition"`
Leader int `json:"leader"`
Replicas []Replica `json:"replicas"`
}
Partition data
type Partitions
type Partitions struct {
Kafka *Kafka
Topic *Topic
List *[]Partition
}
Partitions data
func (*Partitions) Partition
func (ps *Partitions) Partition(partitionID int, topicName ...string) (*Partition, error)
Partition returns the Partition with provided partitionID.
func (*Partitions) Partitions
func (ps *Partitions) Partitions(topicName ...string) (*[]Partition, error)
Partitions lists partitions for the topic.
func (*Partitions) Produce
func (ps *Partitions) Produce(id int, message *ProducerMessage, topicName ...string) (*ProducerResponse, error)
Produce post message to the Partition with provided id.
type ProducerMessage
type ProducerMessage struct {
KeySchema string `json:"key_schema,omitempty"`
KeySchemaID int `json:"key_schema_id,omitempty"`
//either value schema or value schema id must be provided for avro messages
ValueSchema string `json:"value_schema,omitempty"`
ValueSchemaID int `json:"value_schema_id,omitempty"`
Records []*ProducerRecord `json:"records"`
}
ProducerMessage is the wrapper for the Topic / Partition data
type ProducerOffsets
type ProducerOffsets struct {
Partition int `json:"partition"`
Offset int64 `json:"offset"`
ErrorCode int64 `json:"error_code"`
Error string `json:"error"`
}
ProducerOffsets are the resulting offsets for Topic / Partition
type ProducerRecord
type ProducerRecord struct {
Key json.RawMessage `json:"key,omitempty"`
Value json.RawMessage `json:"value"`
Partition int `json:"partition,omitempty"`
}
ProducerRecord is an individual message for Topic / Partition
type ProducerResponse
type ProducerResponse struct {
KeySchemaID int `json:"key_schema_id"`
ValueSchemaID int `json:"value_schema_id"`
Offsets []*ProducerOffsets `json:"offsets"`
}
ProducerResponse is the Topic / Partition response
type Replica
type Replica struct {
Broker int `json:"broker"`
Leader bool `json:"leader"`
InSync bool `json:"in_sync"`
}
Replica data
type Topic
type Topic struct {
Name string `json:"name"`
Configs json.RawMessage `json:"configs"`
Partitions []*Partition `json:"partitions"`
}
Topic data
type TopicNames
type TopicNames []string
TopicNames data
type TopicPatternSubscription
type TopicPatternSubscription struct {
TopicPattern string `json:"topic_pattern"`
}
TopicPatternSubscription with topic pattern
type TopicSubscription
type TopicSubscription struct {
Topics *TopicsSubscription
TopicPattern *TopicPatternSubscription
}
TopicSubscription data, topic_pattern and topics fields are mutually exclusive
type Topics
type Topics struct {
Kafka *Kafka
List []*Topic
}
Topics data
func (*Topics) Names
func (ts *Topics) Names() (*TopicNames, error)
Names lists all topic names.
func (*Topics) NewPartitions
func (ts *Topics) NewPartitions(t ...*Topic) *Partitions
NewPartitions returns a Partitions instance.
func (*Topics) Produce
func (ts *Topics) Produce(topicName string, message *ProducerMessage) (*ProducerResponse, error)
Produce post message to the Topic with provided topicName.
func (*Topics) Topic
func (ts *Topics) Topic(topicName string) (*Topic, error)
Topic returns the Topic with provided topicName.
func (*Topics) Topics
func (ts *Topics) Topics() ([]*Topic, error)
Topics lists all topics.
type TopicsSubscription
type TopicsSubscription struct {
Topics []string `json:"topics"`
}
TopicsSubscription with topics
type Version
type Version string
Version is the API version