Categorygithub.com/andy2046/kafka-rest-go
module
0.1.2
Repository: https://github.com/andy2046/kafka-rest-go.git
Documentation: pkg.go.dev

# README

kafka

import "github.com/andy2046/kafka-rest-go/kafka"

Overview

Package kafka provides a thin wrapper around the REST API, providing a more convenient interface for accessing cluster metadata and producing and consuming data.

Index

Package files

consumer.go kafka.go partition.go topic.go

Constants

const (
    // JSON formatted consumer
    JSON = Format("json")
    // Binary formatted consumer
    Binary = Format("binary")
    // Avro formatted consumer
    Avro = Format("avro")

    // Earliest is the oldest offset for API v2
    Earliest = Offset("earliest")
    // Latest is the newest offset for API v2
    Latest = Offset("latest")

    // Smallest is the oldest offset for API v1
    Smallest = Offset("smallest")
    // Largest is the newest offset for API v1
    Largest = Offset("largest")

    // V2 is API v2
    V2 = Version("v2")

    // V1 is API v1
    V1 = Version("v1")
)

Variables

var Defaults = Kafka{
    URL:         "http://localhost:8082",
    Timeout:     60 * time.Second,
    Accept:      "application/vnd.kafka+json, application/json",
    ContentType: "application/vnd.kafka+json",
    Format:      Binary,
    Offset:      Largest,
    Version:     V1,
}

Defaults for Kafka

func AvroFormat

func AvroFormat(k *Kafka) error

AvroFormat sets Format to Avro.

func BinaryFormat

func BinaryFormat(k *Kafka) error

BinaryFormat sets Format to Binary.

func EarliestOffset

func EarliestOffset(k *Kafka) error

EarliestOffset sets Offset to Earliest.

func JSONFormat

func JSONFormat(k *Kafka) error

JSONFormat sets Format to JSON.

func LargestOffset

func LargestOffset(k *Kafka) error

LargestOffset sets Offset to Largest.

func LatestOffset

func LatestOffset(k *Kafka) error

LatestOffset sets Offset to Latest.

func SetAccept

func SetAccept(accept string) func(*Kafka) error

SetAccept applies Accept to Kafka.

func SetContentType

func SetContentType(contentType string) func(*Kafka) error

SetContentType applies ContentType to Kafka.

func SetTimeout

func SetTimeout(timeout time.Duration) func(*Kafka) error

SetTimeout applies Timeout to Kafka.

func SetURL

func SetURL(url string) func(*Kafka) error

SetURL applies URL to Kafka.

func SmallestOffset

func SmallestOffset(k *Kafka) error

SmallestOffset sets Offset to Smallest.

func Stringer

func Stringer(v interface{}) (string, error)

Stringer returns formatted string.

func URLJoin

func URLJoin(urlstr string, pathstrs ...string) (string, error)

URLJoin joins url with path and returns the whole url string.

func V1Version

func V1Version(k *Kafka) error

V1Version sets Version to V1.

func V2Version

func V2Version(k *Kafka) error

V2Version sets Version to V2.

type Argument

type Argument struct {
    Timeout       int
    TopicName     string
    MaxBytes      int
    ConsumerName  string
    ConsumerGroup string
}

Argument is the argument for both the Records and Messages methods.

type Broker

type Broker struct {
    Brokers []int `json:"brokers"`
}

Broker data

type ConsumerInstance

type ConsumerInstance struct {
    ConsumerName string `json:"instance_id"`
    BaseURI      string `json:"base_uri"`
}

ConsumerInstance data

type ConsumerOffset

type ConsumerOffset struct {
    Partition int    `json:"partition"`
    Offset    int64  `json:"offset"`
    Topic     string `json:"topic"`
    Metadata  string `json:"metadata,omitempty"`
}

ConsumerOffset are the offsets to commit

type ConsumerOffsets

type ConsumerOffsets struct {
    Offsets []*ConsumerOffset `json:"offsets"`
}

ConsumerOffsets data

type ConsumerOffsetsPartitions

type ConsumerOffsetsPartitions struct {
    Partitions []*ConsumerPartitions `json:"partitions"`
}

ConsumerOffsetsPartitions are the partitions for consumer committed offsets

type ConsumerPartitions

type ConsumerPartitions struct {
    Partition int    `json:"partition"`
    Topic     string `json:"topic"`
}

ConsumerPartitions are the partitions for consumer

type ConsumerRequest

type ConsumerRequest struct {
    Format     Format `json:"format"`
    Offset     Offset `json:"auto.offset.reset"`
    AutoCommit string `json:"auto.commit.enable"` // true or false
    Name       string `json:"name,omitempty"`
}

ConsumerRequest is the metadata needed to create a consumer instance

type Consumers

type Consumers struct {
    Kafka         *Kafka
    ConsumerGroup string
    List          []*ConsumerInstance
}

Consumers data

func (*Consumers) Assign

func (cs *Consumers) Assign(consumerOffsetsPartitions *ConsumerOffsetsPartitions, consumerName string, consumerGroup ...string) error

Assign manually assigns a list of partitions to this consumer.

func (*Consumers) Assignments

func (cs *Consumers) Assignments(consumerName string, consumerGroup ...string) (*ConsumerOffsetsPartitions, error)

Assignments gets the list of partitions currently manually assigned to this consumer.

func (*Consumers) CommitOffsets

func (cs *Consumers) CommitOffsets(consumerOffsets *ConsumerOffsets, consumerName string, consumerGroup ...string) error

CommitOffsets commits a list of offsets for the consumer.

func (*Consumers) DeleteConsumer

func (cs *Consumers) DeleteConsumer(consumerName string, consumerGroup ...string) error

DeleteConsumer destroys the consumer instance.

func (*Consumers) Messages

func (cs *Consumers) Messages(messagesArg Argument) (*[]Message, error)

Messages consumes messages from a topic via API v1. Messages arguments include MaxBytes (optional), TopicName, ConsumerName and ConsumerGroup. MaxBytes is the maximum number of bytes of unencoded keys and values that should be included in the response. Default is unlimited.

func (*Consumers) NewConsumer

func (cs *Consumers) NewConsumer(consumerRequest *ConsumerRequest, consumerGroup ...string) (*ConsumerInstance, error)

NewConsumer creates a new consumer instance in the consumer group.

func (*Consumers) Offsets

func (cs *Consumers) Offsets(consumerOffsetsPartitions *ConsumerOffsetsPartitions, consumerName string, consumerGroup ...string) (*ConsumerOffsets, error)

Offsets gets the last committed offsets for the given partitions.

func (*Consumers) Poll

func (cs *Consumers) Poll(interval time.Duration, messagesArg Argument, onMessage func(error, *[]Message)) func()

Poll keeps polling messages from a topic. Messages are polled every interval ms, and onMessage is called to handle the polled messages. The returned func is for cancellation.

func (*Consumers) Records

func (cs *Consumers) Records(recordsArg Argument) (*[]Message, error)

Records fetches messages for the topics or partitions specified via API v2. Records arguments include Timeout (optional), MaxBytes (optional), ConsumerName and ConsumerGroup. Timeout is the number of milliseconds for the underlying request to fetch the records. Defaults to 5000ms. MaxBytes is the maximum number of bytes of unencoded keys and values that should be included in the response. Default is unlimited.

func (*Consumers) Seek

func (cs *Consumers) Seek(consumerOffsets *ConsumerOffsets, consumerName string, consumerGroup ...string) error

Seek overrides the fetch offsets that the consumer will use for the next set of records to fetch.

func (*Consumers) SeekToBeginning

func (cs *Consumers) SeekToBeginning(consumerOffsetsPartitions *ConsumerOffsetsPartitions, consumerName string, consumerGroup ...string) error

SeekToBeginning seeks to the first offset for each of the given partitions.

func (*Consumers) SeekToEnd

func (cs *Consumers) SeekToEnd(consumerOffsetsPartitions *ConsumerOffsetsPartitions, consumerName string, consumerGroup ...string) error

SeekToEnd seeks to the last offset for each of the given partitions.

func (*Consumers) Subscribe

func (cs *Consumers) Subscribe(topicSubscription *TopicSubscription, useTopicPattern bool, consumerName string, consumerGroup ...string) error

Subscribe to the given list of topics or a topic pattern.

func (*Consumers) Subscriptions

func (cs *Consumers) Subscriptions(consumerName string, consumerGroup ...string) (*TopicsSubscription, error)

Subscriptions gets the current subscribed list of topics.

func (*Consumers) Unsubscribe

func (cs *Consumers) Unsubscribe(consumerName string, consumerGroup ...string) error

Unsubscribe from topics currently subscribed.

type ErrorMessage

type ErrorMessage struct {
    ErrorCode int    `json:"error_code,omitempty"`
    Message   string `json:"message,omitempty"`
}

ErrorMessage for API response

type Format

type Format string

Format is one of json, binary or avro

type Kafka

type Kafka struct {
    URL         string
    Timeout     time.Duration
    Accept      string
    ContentType string
    Format      Format
    Offset      Offset
    Version     Version
}

Kafka represents a Kafka REST API.

func New

func New(options ...func(*Kafka) error) (*Kafka, error)

New returns a Kafka instance with default setting.

func (*Kafka) Broker

func (k *Kafka) Broker() (*Broker, error)

Broker returns the brokers.

func (*Kafka) HTTPClient

func (k *Kafka) HTTPClient() *http.Client

HTTPClient creates a new http.Client with timeout.

func (*Kafka) NewConsumers

func (k *Kafka) NewConsumers(consumerGroup ...string) *Consumers

NewConsumers returns a Consumers instance.

func (*Kafka) NewTopics

func (k *Kafka) NewTopics() *Topics

NewTopics returns a Topics instance.

func (*Kafka) SetOption

func (k *Kafka) SetOption(options ...func(*Kafka) error) error

SetOption takes one or more option functions and applies them in order to Kafka.

type Message

type Message struct {
    Topic     string          `json:"topic"`
    Key       json.RawMessage `json:"key"`
    Value     json.RawMessage `json:"value"`
    Partition int             `json:"partition"`
    Offset    int64           `json:"offset"`
}

Message is a single Kafka message

type Offset

type Offset string

Offset is either earliest or latest (API v2), or smallest or largest (API v1)

type Partition

type Partition struct {
    Partition int       `json:"partition"`
    Leader    int       `json:"leader"`
    Replicas  []Replica `json:"replicas"`
}

Partition data

type Partitions

type Partitions struct {
    Kafka *Kafka
    Topic *Topic
    List  *[]Partition
}

Partitions data

func (*Partitions) Partition

func (ps *Partitions) Partition(partitionID int, topicName ...string) (*Partition, error)

Partition returns the Partition with provided partitionID.

func (*Partitions) Partitions

func (ps *Partitions) Partitions(topicName ...string) (*[]Partition, error)

Partitions lists partitions for the topic.

func (*Partitions) Produce

func (ps *Partitions) Produce(id int, message *ProducerMessage, topicName ...string) (*ProducerResponse, error)

Produce posts a message to the Partition with provided id.

type ProducerMessage

type ProducerMessage struct {
    KeySchema   string `json:"key_schema,omitempty"`
    KeySchemaID int    `json:"key_schema_id,omitempty"`
    //either value schema or value schema id must be provided for avro messages
    ValueSchema   string            `json:"value_schema,omitempty"`
    ValueSchemaID int               `json:"value_schema_id,omitempty"`
    Records       []*ProducerRecord `json:"records"`
}

ProducerMessage is the wrapper for the Topic / Partition data

type ProducerOffsets

type ProducerOffsets struct {
    Partition int    `json:"partition"`
    Offset    int64  `json:"offset"`
    ErrorCode int64  `json:"error_code"`
    Error     string `json:"error"`
}

ProducerOffsets are the resulting offsets for Topic / Partition

type ProducerRecord

type ProducerRecord struct {
    Key       json.RawMessage `json:"key,omitempty"`
    Value     json.RawMessage `json:"value"`
    Partition int             `json:"partition,omitempty"`
}

ProducerRecord is an individual message for Topic / Partition

type ProducerResponse

type ProducerResponse struct {
    KeySchemaID   int                `json:"key_schema_id"`
    ValueSchemaID int                `json:"value_schema_id"`
    Offsets       []*ProducerOffsets `json:"offsets"`
}

ProducerResponse is the Topic / Partition response

type Replica

type Replica struct {
    Broker int  `json:"broker"`
    Leader bool `json:"leader"`
    InSync bool `json:"in_sync"`
}

Replica data

type Topic

type Topic struct {
    Name       string          `json:"name"`
    Configs    json.RawMessage `json:"configs"`
    Partitions []*Partition    `json:"partitions"`
}

Topic data

type TopicNames

type TopicNames []string

TopicNames data

type TopicPatternSubscription

type TopicPatternSubscription struct {
    TopicPattern string `json:"topic_pattern"`
}

TopicPatternSubscription with topic pattern

type TopicSubscription

type TopicSubscription struct {
    Topics       *TopicsSubscription
    TopicPattern *TopicPatternSubscription
}

TopicSubscription data, topic_pattern and topics fields are mutually exclusive

type Topics

type Topics struct {
    Kafka *Kafka
    List  []*Topic
}

Topics data

func (*Topics) Names

func (ts *Topics) Names() (*TopicNames, error)

Names lists all topic names.

func (*Topics) NewPartitions

func (ts *Topics) NewPartitions(t ...*Topic) *Partitions

NewPartitions returns a Partitions instance.

func (*Topics) Produce

func (ts *Topics) Produce(topicName string, message *ProducerMessage) (*ProducerResponse, error)

Produce posts a message to the Topic with provided topicName.

func (*Topics) Topic

func (ts *Topics) Topic(topicName string) (*Topic, error)

Topic returns the Topic with provided topicName.

func (*Topics) Topics

func (ts *Topics) Topics() ([]*Topic, error)

Topics lists all topics.

type TopicsSubscription

type TopicsSubscription struct {
    Topics []string `json:"topics"`
}

TopicsSubscription with topics

type Version

type Version string

Version is the API version

# Packages

No description provided by the author
Package kafka provides a thin wrapper around the REST API, providing a more convenient interface for accessing cluster metadata and producing and consuming data.