package
1.0.4
Repository: https://github.com/obse4/gocommon.git
Documentation: pkg.go.dev

# README

goCommon kafka

使用

  • producer
import (
    "github.com/obse4/goCommon/kafka"
    "fmt"
)

func main() {
    var kafkaProducer = kafka.KafkaProducerConfig{
		Name:    "kafka_producer_test",
		Brokers: []string{"127.0.0.1:9092"},
		Topic:   "test",
	}

    kafka.NewKafkaProducer(&kafkaProducer)

    err := kafkaProducer.Producer.SendMessage(kafka.ProducerMessage{
		Topic: "test",
		Value: "obse4^-^",
	})

	if err != nil {
		fmt.Printf("new message err %s\n", err.Error())
	}
}
  • consumer
import (
    "github.com/obse4/goCommon/kafka"
    "fmt"
)

func main() {
    var kafkaConsumer = kafka.KafkaConsumerConfig{
		Name:            "kafka_consumer_test",
		Brokers:         []string{"127.0.0.1:9092"},
		Topics:          []string{"test"},
		GroupId:         "consumer_test",
		AutoOffsetReset: "earliest",
	}
	kafka.NewKafkaConsumer(&kafkaConsumer)

    // func (*kafka.Consumer).RegisterHandle(f kafka.ConsumeFunction, mark bool)
    // f func(msg *kafka.ConsumerMessage, sess kafka.ConsumerGroupSession, claim kafka.ConsumerGroupClaim) error
    // msg 消息指针,sess、claim不常用
    // mark 标记已消费消息
    go func() {
		kafkaConsumer.Consumer.RegisterHandle(func(msg *kafka.ConsumerMessage, sess kafka.ConsumerGroupSession, claim kafka.ConsumerGroupClaim) error {
			fmt.Printf("topic %s val %s, timestamp %s\n", msg.Topic, msg.Value, msg.Timestamp.Format("2006-01-02 15:04:05"))
			return nil
		}, true)

		kafkaConsumer.Consumer.Consume(context.TODO())
	}()

    //  测试Consumer.Close()
    // 在另一个goroutine中执行关闭操作
	go func() {
		time.AfterFunc(5*time.Second, func() {
			fmt.Println("close")
			kafkaConsumer.Consumer.Close()
		})
	}()

    // 10s后执行结束
    time.Sleep(time.Second * 10)
}

# Packages

No description provided by the author

# Functions

新建消费者.
No description provided by the author

# Structs

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Interfaces

No description provided by the author
No description provided by the author
No description provided by the author

# Type aliases

No description provided by the author