github.com/blugnu/kafka
v0.2.1
Repository: https://github.com/blugnu/kafka.git
Documentation: pkg.go.dev

# README

blugnu/kafka

making Kafka less Kafka-esque

Features

  • Discoverable Configuration: Provides option functions for configuring Kafka clients, with separate general, consumer-specific and producer-specific configuration

  • Reduced Boilerplate: Provides a complete implementation of a Kafka consumer with a single function call, handling all of the boilerplate for you, including offset commits, signal handling and graceful shutdown

  • Producer Retries: Provides a producer implementation that will retry sending messages to Kafka in the event of a timeout (both the timeout and MaxRetries are configurable)

  • Mock Producer: Provides a mock producer implementation that can be used in tests to verify that an application produces the expected messages

Installation

go get github.com/blugnu/kafka

Usage

  • Establish a base configuration (e.g. bootstrap servers)
  • Configure a consumer and/or producer
  • Start the consumer

Example

package main

import (
  "context"
  "fmt"
  "log"
  "os"
  "os/signal"
  "syscall"

  "github.com/blugnu/kafka"
)

func HandleEvent(ctx context.Context, msg *kafka.Message) error {
  fmt.Printf("received message: %s\n", string(msg.Value))
  return nil
}

func main() {
  // create a context that is cancelled on SIGINT/SIGTERM
  ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
  defer stop()

  // initialise a base configuration
  cfg := kafka.NewConfig(
    kafka.BootstrapServers("localhost:9092"),
  )

  // configure a consumer
  consumer, err := kafka.NewConsumer(cfg,
    kafka.ConsumerGroupID("my-group"),
    kafka.TopicHandler("event", kafka.HandlerFunc(HandleEvent)),
  )
  if err != nil {
    log.Fatal("error creating consumer:", err)
  }

  // start the consumer
  if err := consumer.Start(ctx); err != nil {
    log.Fatal(err)
  }

  if err := consumer.Wait(); err != nil {
    log.Fatal(err)
  }
}

Logging

To avoid importing a specific logging library or imposing a log format on applications, logs are written using internal log hooks. These are set to no-op by default.

To enable logging you must call the kafka.EnableLogs function, providing functions to log entries at different levels as required.

For example, the following might be used to initialise a blugnu/ulog context logger and enable logging of INFO level Kafka logs to that logger; logs at all other levels are left as no-ops:

func logger(ctx context.Context) (context.Context, ulog.Logger, func()) {
    log, cfn, err := ulog.NewLogger(
        ulog.WithLevel(ulog.DebugLevel),
    )
    if err != nil {
        log.Fatal(fmt.Errorf("error initialising logger: %v", err))
    }

    kafka.EnableLogs(&kafka.Loggers{
        Info: func(ctx context.Context, msg string, fields map[string]interface{}) {
            log := ulog.FromContext(ctx)
            log.Info(msg, ulog.Fields(fields))
        },
    })

    return ulog.ContextWithLogger(ctx, log), log, cfn
}
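
The helper might then be used at application startup; the following is a minimal sketch assuming the logger function above and the consumer created in the earlier example:

func main() {
    // attach the ulog logger to the context and register the kafka INFO
    // hook; cfn closes/flushes the logger on exit
    ctx, _, cfn := logger(context.Background())
    defer cfn()

    // start the consumer with the logger-carrying context so that the
    // Info hook can retrieve the logger via ulog.FromContext(ctx)
    if err := consumer.Start(ctx); err != nil {
        log.Fatal(err)
    }
}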

Logging functions are provided for each of the following levels:

  • Debug
  • Info
  • Error
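
For reference, the following sketch enables all three levels and forwards them to the standard library log package; it assumes that the Debug and Error hooks share the same signature as the Info hook shown above:

package main

import (
    "context"
    "log"

    "github.com/blugnu/kafka"
)

func main() {
    // forward kafka logs at all levels to the standard library logger;
    // Debug and Error are assumed to mirror the Info hook signature
    kafka.EnableLogs(&kafka.Loggers{
        Debug: func(ctx context.Context, msg string, fields map[string]interface{}) {
            log.Printf("DEBUG %s %v", msg, fields)
        },
        Info: func(ctx context.Context, msg string, fields map[string]interface{}) {
            log.Printf("INFO  %s %v", msg, fields)
        },
        Error: func(ctx context.Context, msg string, fields map[string]interface{}) {
            log.Printf("ERROR %s %v", msg, fields)
        },
    })
}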

Default Logging

A nil argument may be passed to EnableLogs to enable default logging, which will write logs using the standard log package.

Default logging is also used if a zero-value &Loggers{} is passed to EnableLogs, i.e. one with all logging functions left nil.
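
For example, either of the following enables default logging:

// enable default logging via the standard log package
kafka.EnableLogs(nil)

// equivalent: a zero-value Loggers also results in default logging
kafka.EnableLogs(&kafka.Loggers{})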

Note: Default logging is not recommended for production use.