package
0.15.1
Repository: https://github.com/cisco-open/go-lanai.git
Documentation: pkg.go.dev

# README

Kafka

The Kafka module provides an abstraction for interfacing with Kafaka so that application code can focus on writing message processing code.

Binder

The Binder is the main interface for working with Kafka. The Kafka module provides a Kafka.Binder interface which your application can inject. Once your application have a reference to the Binder interface, you can create message producer from the Binder, or add your message consumer/subscriber to the binder.

Example

  1. To activate the Kafka module
Kafka.Use()
  1. Add Kafka properties to application.yml
# Following configuration serve as an example
# values specified in `kafka.bindings.default.*` are same as hardcoded defaults
#
# To overwrite defaults, add section with prefix `kafka.bindings.<your binding name>`,
# and specify the binding name when using Binder with `BindingName(...)` option
kafka:
  bindings:
    default:
      producer:
        log-level: "debug"
        ack-mode: "local" # all, local or none
        ack-timeout: 10s
        max-retry: 3
        backoff-interval: 100ms
        provisioning:
          auto-create-topic: true
          auto-add-partitions: true
          allow-lower-partitions: true
          partition-count: 1
          replication-factor: 1
      consumer:
        log-level: "debug"
        join-timeout: 60s
        max-retry: 4
        backoff-interval: 2s
    binding-name:
      producer:
        ...
      consumer:
        ...
  1. Inject the Kafka.Binder into your application
fx.Provide(NewComponent)

To create a producer from a Binder.

func NewComponent(b kafka.Binder) (*MyComponent, error) {
	p, err := b.Produce("MY_TOPIC", kafka.BindingName("my-binding-name"))
	if err != nil {
		return nil, err
	}
	return &MyComponent{Producer: p}, nil
}

Here you will have a component that have a reference to a message producer. The BindingName option allows binding specific configuration to be applied to your producer. See the documentation on BindingName for more details.

To add a consumer to the Binder, use fx.Invoke to registers the functions so that it's executed eagerly on application start. See fx documentation for the difference between fx.Invoke and fx.Provide.

fx.Invoke(AddConsumer)
func AddConsumer(Binder kafka.Binder) error {
	mc := &MyConsumer{
	}
	consumer, e := di.Binder.Consume("MY_TOPIC", kafkaGroup, kafka.BindingName("my-binding-name"))
	if e != nil {
		return e
	}
	if e := consumer.AddHandler(mc.MyMessageHandler); e != nil {
		return e
	}
	return nil
}

*MyConsumer has a method that implements Kafka.MessageHandlerFunc

See Kafka.MessageHandlerFunc for details on what methods are acceptable as message handler functions you can use in the consumer.AddHandler call.

See Kafka.Binder for details on additional details with regard to creating Producer, Consumer and Subscriber.

# Functions

No description provided by the author
AddInterceptors returns a DispatchOptions that add ConsumerHandlerInterceptor to a MessageHandlerFunc.
BindingName is a ProducerOptions or ConsumerOptions that specify the name of the binding.
No description provided by the author
FilterOnHeader returns a DispatchOptions specifying that the handler should be invoked when certain message header exists and matches the provided matcher.
KeyEncoder configures Producer with given encoder for serializing message key.
LogLevel is a ProducerOptions or ConsumerOptions that specify log level of Producer, Subscriber or Consumer.
No description provided by the author
No description provided by the author
No description provided by the author
Partitions configure Producer's topic provisioning, by specifying min partition required and their replica number (min.insync.replicas) in case topics are auto-created.
No description provided by the author
RequireAllAck waits for all in-sync replicas to commit before responding.
RequireLocalAck waits for only the local commit to succeed before responding.
RequireNoAck doesn't send any response, the TCP ACK is all you get.
Use Allow service to include this module in main().
WithConsumerProperties apply options configured via ConsumerProperties.
WithEncoder specify how message payload is encoded.
WithKey specify key used for the message.
WithProducerProperties apply options configured via ProducerProperties.

# Constants

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
ErrorSubTypeCodeProvisioning.
ErrorSubTypeCodeProvisioning.
ErrorSubTypeCodeBindingInternal.
ErrorSubTypeCodeConnectivity.
ErrorSubTypeCodeProvisioning.
ErrorSubTypeCodeProvisioning.
ErrorSubTypeCodeProvisioning.
ErrorSubTypeCodeProvisioning.
All "SubType" values are used as mask sub-types of ErrorTypeCodeBinding.
All "SubType" values are used as mask sub-types of ErrorTypeCodeBinding.
All "SubType" values are used as mask sub-types of ErrorTypeConsumer.
All "SubType" values are used as mask sub-types of ErrorTypeConsumer.
All "SubType" values are used as mask sub-types of ErrorTypeProducer.
All "SubType" values are used as mask sub-types of ErrorTypeConsumer.
All "SubType" values are used as mask sub-types of ErrorTypeProducer.
All "SubType" values are used as mask sub-types of ErrorTypeProducer.
All "SubType" values are used as mask sub-types of ErrorTypeCodeBinding.
All "Type" values are used as mask.
All "Type" values are used as mask.
All "Type" values are used as mask.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
Reserved kafka reserved error range.

# Variables

ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
ErrorTypes, can be used in errors.Is goland:noinspection GoUnusedGlobalVariable.
No description provided by the author

# Structs

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
Dispatcher process MessageContext and dispatch it to registered MessageHandlerFunc.
No description provided by the author
goland:noinspection GoNameStartsWithPackageName.
No description provided by the author
MessageContext internal use only, used by Interceptors and processors.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Interfaces

No description provided by the author
No description provided by the author
BindingLifecycle is the interface that controlling lifecycles of any Producer, Subscriber and GroupConsumer.
ConsumerDispatchFinalizer is the interface for other package to finalize message processing.
No description provided by the author
No description provided by the author
No description provided by the author
GroupConsumer provides consumer group workflow.
MessageHandlerFunc is message handling function that conform with following signature: func (ctx context.Context, [OPTIONAL_INPUT_PARAMS...]) error Where OPTIONAL_INPUT_PARAMS could contain following components (of which order is not important): - PAYLOAD_PARAM < AnyPayloadType >: message payload, where PayloadType could be any type other than interface, function or chan.
No description provided by the author
No description provided by the author
ProducerMessageFinalizer is the interface for other package to finalize message sending process.
No description provided by the author
No description provided by the author
Subscriber provides Pub-Sub workflow.

# Type aliases

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author