github.com/shipperizer/kilo-franz
Category: repository
Version: 1.2.2
Repository: https://github.com/shipperizer/kilo-franz.git
Documentation: pkg.go.dev

# README

Kilo Franz: opinionated Kafka consumer and producer code built on top of the segmentio/kafka-go library

Library used for dealing with Kafka consumers and producers

Uses segmentio/kafka-go v0.4.17

Release process

DOCS

For a closer look at the API reference, run godoc -http=:6060 and then open http://localhost:6060/pkg/github.com/shipperizer/kilo-franz/ in a browser.

HOWTO

  • to create a consumer:
// ChannelConsumer is an implementation of ConsumerInterface.
// It runs one goroutine that pulls messages and N workers that
// process them (N is set in the constructor).
// Example:

cfg := streamConfig.NewConfig(5*time.Minute, &tlsSetup, nil)
readerCfg := streamConfig.NewReaderConfig(
	cfg,
	strings.Split(viper.GetString("kafka.url"), ","),
	viper.GetString("kafka.consumer.topic"),
	"test-app.cgroup",
	5,
)
reader := core.NewReader(readerCfg)

consumer, err := subscriber.NewChannelConsumer(
	reader,
	dummy.NewService(
		store.NewStore(
			store.StoreTableConfig{
				Logs: fmt.Sprint(tablePrefix, viper.GetString("dynamodb.tables.audit.logs")),
			},
			dynamoClient,
		),
		monitor,
		readerCfg.GetGroupID(),
	),
	monitor,
)
if err != nil {
	panic(err)
}
consumer.Start()

c := make(chan os.Signal, 1)
// SIGKILL cannot be trapped, so only SIGINT and SIGTERM are registered.
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
// Block until we receive our signal.
<-c
consumer.Stop()
log.Info("Shutting down")
os.Exit(0)