Version: 0.0.10
Repository: https://github.com/kevinello/kafka-client.git
Documentation: pkg.go.dev
# README
kafka-client
kafka-client is a high-level wrapper around kafka-go for consuming from and producing to Kafka.
Features
Kafka consumer
- Customized Message Handler
- Multi-topic consumption (with a customizable get-topics function)
- Concurrent worker pool for consuming messages (configurable pool size)
- Consume error collection (the consumer cancels itself when there are too many errors)
- Consumption frequency detection (reconnects to Kafka when the message frequency is too low)
- Safe manual close
- Asynchronous cancellations and timeouts using Go contexts
- Custom logger and verbose option
- SASL authentication
- TLS authentication
Kafka producer
- Asynchronous or synchronous message writes
- Safe manual close
- Asynchronous cancellations and timeouts using Go contexts
- Custom logger and verbose option
- SASL authentication
- TLS authentication
Usage
For basic usage examples, refer to the system test cases in the test directory.
To run the system tests, you need a Kafka broker (either set auto.create.topics.enable to true, or create the topics unit-test-topic-1 ... unit-test-topic-10 before starting the tests) and the following environment variables:
- KAFKA_BOOTSTRAP: Kafka broker address
- KAFKA_SASL_USERNAME (optional): Kafka SASL username
- KAFKA_SASL_PASSWORD (optional): Kafka SASL password
SASL & TLS
To use SASL or TLS, set Mechanism or TLS in ConsumerConfig and ProducerConfig. The following example uses ConsumerConfig:
package main

import (
	"context"
	"crypto/tls"
	"log"
	"os"

	kc "github.com/Kevinello/kafka-client"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/scram"
)

func main() {
	// Build a SCRAM-SHA-512 SASL mechanism from the credentials.
	mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
	if err != nil {
		log.Fatalf("create SASL mechanism: %v", err)
	}

	config := kc.ConsumerConfig{
		GroupID: "test-group",
		// MessageHandler is called for each consumed message.
		MessageHandler: func(msg *kafka.Message, consumer *kc.Consumer) (err error) {
			log.Printf("message: %s\n", string(msg.Value))
			return
		},
		Mechanism: mechanism,
		TLS:       &tls.Config{},
	}

	consumer, err := kc.NewConsumer(context.Background(), config)
	if err != nil {
		log.Fatalf("create consumer: %v", err)
	}

	// Block until the consumer closes, then exit with a non-zero status.
	<-consumer.Closed()
	os.Exit(1)
}
Replace the username and password with your own, and fill in the TLS field with your own TLS config.
Project Structure
# Packages
No description provided by the author
# Functions
GetAllTopic returns a GetTopicsFunc that fetches all topics.
@param kafkaBootstrap string @return GetTopicsFunc @author kevineluo @update 2023-03-15 03:14:57.
GetTopicReMatch returns a GetTopicsFunc that fetches the topics matching a regular expression; the returned function yields the matches found (resTopics) and an err if applicable.
NewConsumer creates a new Kafka consumer.
NewProducer creates a new Kafka producer.
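The idea behind a regex-matching topic getter such as GetTopicReMatch can be sketched with the standard library alone. Everything here is a hypothetical stand-in: the local `getTopicsFunc` type mirrors only the documented return values (topics, err), and `filterTopics` is not the library's function, whose real parameters are not shown in this document.

```go
package main

import (
	"fmt"
	"regexp"
)

// getTopicsFunc is a local stand-in for the library's GetTopicsFunc.
type getTopicsFunc func() (topics []string, err error)

// filterTopics wraps a topic source and keeps only the names that
// match pattern. The pattern is compiled once, up front.
func filterTopics(source getTopicsFunc, pattern string) (getTopicsFunc, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, err
	}
	return func() ([]string, error) {
		all, err := source()
		if err != nil {
			return nil, err
		}
		var matched []string
		for _, t := range all {
			if re.MatchString(t) {
				matched = append(matched, t)
			}
		}
		return matched, nil
	}, nil
}

func main() {
	// A fixed list stands in for a real broker query.
	source := func() ([]string, error) {
		return []string{"unit-test-topic-1", "orders", "unit-test-topic-2"}, nil
	}
	get, err := filterTopics(source, `^unit-test-topic-\d+$`)
	if err != nil {
		fmt.Println("bad pattern:", err)
		return
	}
	topics, err := get()
	fmt.Println(topics, err) // prints: [unit-test-topic-1 unit-test-topic-2] <nil>
}
```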
# Constants
MechanismSASLPlain SASL PLAIN authentication @update 2024-01-03 06:54:55.
MechanismSASLSCRAMSHA256 SASL SCRAM-SHA-256 authentication @update 2024-01-03 06:55:11.
MechanismSASLSCRAMSHA512 SASL SCRAM-SHA-512 authentication @update 2024-01-03 06:55:17.
# Variables
ErrClosedConsumer indicates an attempt to close an already-closed consumer @update 2023-03-15 02:03:09.
ErrClosedProducer indicates an attempt to close an already-closed producer @update 2023-03-30 05:12:28.
ErrNoTopics indicates that GetTopicsFunc returned no topics @update 2023-03-14 01:18:01.
ErrTooManyConsumeError indicates that consumer.ErrCount reached MaxConsumeErrorCount @update 2023-03-15 02:02:27.
# Structs
Consumer struct holds data related to the consumer
@author kevineluo @update 2023-02-24 01:53:37.
No description provided by the author
Producer struct holds data related to the producer
@author kevineluo @update 2023-03-15 10:30:44.
No description provided by the author
# Type aliases
GetTopicsFunc is a user-implemented function that returns the topics to consume.
@return topics []string @return err error @author kevineluo @update 2023-03-28 07:16:54.
MessageHandler function which handles received messages from the Kafka broker.