package
4.1.0
Repository: https://github.com/onsdigital/dp-kafka.git
Documentation: pkg.go.dev

# README

KafkaTest

This package contains mocks intended to be used by users of this library for testing.

Empty mocks

If you need to implement your own mock functionality, you can use the empty mocks, which are generated with moq and implement the interfaces kafka.IConsumerGroup, kafka.IProducer and kafka.Message.

This kind of mock is recommended for unit tests, where you may only need to check that a particular function has been called with the expected parameters.

These interfaces expose the same methods as the real Producer and ConsumerGroup structs. You can instantiate the mocks like so:

consumer := kafkatest.IConsumerGroupMock{...}
producer := kafkatest.IProducerMock{...}
message := kafkatest.MessageMock{...}
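
As a rough illustration of how a moq-style empty mock is typically used, the sketch below hand-writes a tiny mock with one function field per interface method and a record of calls. The `Closer` interface, `CloserMock` type and `shutdown` function are hypothetical stand-ins and are not part of kafkatest; only the Go standard library is assumed.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Closer is a hypothetical one-method interface standing in for a larger
// interface such as a producer or a consumer group.
type Closer interface {
	Close(ctx context.Context) error
}

// CloserMock mimics the shape of a moq-generated mock: one function field per
// interface method, plus a record of every call made to it.
type CloserMock struct {
	CloseFunc func(ctx context.Context) error

	mu    sync.Mutex
	calls []context.Context
}

// Close records the call and delegates to the function supplied by the test.
func (m *CloserMock) Close(ctx context.Context) error {
	m.mu.Lock()
	m.calls = append(m.calls, ctx)
	m.mu.Unlock()
	return m.CloseFunc(ctx)
}

// CloseCalls returns how many times Close was called. This sketch only keeps
// a count; generated mocks usually expose the recorded arguments too.
func (m *CloserMock) CloseCalls() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.calls)
}

// shutdown is the hypothetical code under test.
func shutdown(ctx context.Context, c Closer) error {
	return c.Close(ctx)
}

// closeCallsAfterShutdown runs shutdown against a mock and reports the number
// of recorded Close calls, or -1 if shutdown failed.
func closeCallsAfterShutdown() int {
	mock := &CloserMock{CloseFunc: func(ctx context.Context) error { return nil }}
	if err := shutdown(context.Background(), mock); err != nil {
		return -1
	}
	return mock.CloseCalls()
}

func main() {
	fmt.Println("Close calls:", closeCallsAfterShutdown())
}
```

The test decides what each method does by setting the function field, and then asserts on the recorded calls rather than on any real Kafka behaviour.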

Functional mocks

The previous mocks have been extended by implementing functionality that emulates a real Producer, Consumer and message, but without communicating with any real Kafka broker.

This kind of mock is recommended for component tests, where you may want a fully functional mock that behaves like the real library, but without the overhead of deploying a full kafka cluster.

If you need a functional mock to test how you interact with kafka, you can use these mocks (kafkatest.Consumer, kafkatest.Producer and kafkatest.Message) like so:

Consumer

1- Create consumer mock

kafkaConsumer, err := kafkatest.NewConsumer(
    ctx,
    &kafka.ConsumerGroupConfig{
        BrokerAddrs:       Addr,
        Topic:             ConsumedTopic,
        GroupName:         GroupName,
        MinBrokersHealthy: &ConsumerMinBrokersHealthy,
        KafkaVersion:      &Version,
    },
    &kafkatest.ConsumerConfig{
        NumPartitions:     10,
        ChannelBufferSize: 10,
        InitAtCreation:    false,
    },
)

Provide the kafka ConsumerGroupConfig as you would for a real kafka consumer, and a kafkatest.ConsumerConfig that suits your needs for the mock.

This will create a new kafkatest consumer with NumPartitions (e.g. 10) goroutines running the sarama handler, each one emulating a kafka partition.
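
To make the idea of one goroutine per emulated partition concrete, here is a minimal sketch in which a fixed number of workers consume from a shared buffered channel. It is not the kafkatest implementation; `runPartitions` and its parameters are hypothetical names, and only the standard library is assumed.

```go
package main

import (
	"fmt"
	"sync"
)

// runPartitions starts numPartitions goroutines that consume from a shared
// buffered channel, and returns how many messages were handled in total.
func runPartitions(numPartitions, bufferSize int, messages []string) int {
	ch := make(chan string, bufferSize)
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		handled int
	)

	for p := 0; p < numPartitions; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each goroutine plays the role of one partition's handler loop.
			for range ch {
				mu.Lock()
				handled++
				mu.Unlock()
			}
		}()
	}

	for _, m := range messages {
		ch <- m
	}
	close(ch) // no more messages: every worker leaves its loop
	wg.Wait()
	return handled
}

func main() {
	fmt.Println("handled:", runPartitions(10, 10, []string{"a", "b", "c"}))
}
```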

The sarama message and error channels will have a buffer size of ChannelBufferSize (e.g. 10).

The consumer will initialise successfully at creation time if InitAtCreation is true. Otherwise, it will fail to initialise at creation time, but it will succeed shortly afterwards, when Initialise() is called.

If no kafkatest.ConsumerConfig is provided, the default values will be used.
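
The interplay between InitAtCreation, a later Initialise() call and the fallback to defaults can be sketched as follows. Everything here is hypothetical: `mockConfig`, `newMock` and the default values are invented for illustration and do not reflect the real kafkatest defaults, which this README does not state.

```go
package main

import "fmt"

// mockConfig is a hypothetical stand-in for a mock configuration struct.
type mockConfig struct {
	ChannelBufferSize int
	InitAtCreation    bool
}

// defaultConfig holds made-up defaults, used only when no config is given.
var defaultConfig = mockConfig{ChannelBufferSize: 1, InitAtCreation: true}

// mock tracks whether it has been initialised.
type mock struct {
	cfg         mockConfig
	initialised bool
}

// newMock falls back to defaultConfig when cfg is nil, and only starts in the
// initialised state if InitAtCreation is true.
func newMock(cfg *mockConfig) *mock {
	if cfg == nil {
		cfg = &defaultConfig
	}
	return &mock{cfg: *cfg, initialised: cfg.InitAtCreation}
}

// Initialise emulates a later, successful initialisation.
func (m *mock) Initialise() { m.initialised = true }

// IsInitialised reports the current state.
func (m *mock) IsInitialised() bool { return m.initialised }

func main() {
	m := newMock(&mockConfig{ChannelBufferSize: 10, InitAtCreation: false})
	fmt.Println("initialised at creation:", m.IsInitialised())
	m.Initialise()
	fmt.Println("initialised after Initialise():", m.IsInitialised())
	fmt.Println("defaults used when config is nil:", newMock(nil).cfg == defaultConfig)
}
```

Setting InitAtCreation to false is useful when the service under test is expected to cope with a consumer that is not ready at start-up.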

2- Use the mock:

You can provide the Mock inside the kafkatest.Consumer to your service under test. For example, you may override a service kafka getter function like so:

service.GetKafkaConsumer = func(ctx context.Context, cfg *config.Kafka) (kafka.IConsumerGroup, error) {
    kafkaConsumer, err := kafkatest.NewConsumer(...)
    if err != nil {
        return nil, err
    }
    return kafkaConsumer.Mock, nil
}
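
The getter override relies on the service exposing its constructor as a replaceable function variable. A minimal, self-contained sketch of that pattern is shown below; `Consumer`, `GetConsumer`, `run` and the two implementations are hypothetical names chosen for the example, not part of dp-kafka.

```go
package main

import "fmt"

// Consumer is a hypothetical interface that the service depends on.
type Consumer interface {
	Name() string
}

type realConsumer struct{}

func (realConsumer) Name() string { return "real" }

type fakeConsumer struct{}

func (fakeConsumer) Name() string { return "fake" }

// GetConsumer is a package-level function variable, so tests can replace it.
var GetConsumer = func() (Consumer, error) {
	return realConsumer{}, nil
}

// run is the hypothetical service entry point. It obtains its consumer only
// through GetConsumer, which is what makes the override effective.
func run() (string, error) {
	c, err := GetConsumer()
	if err != nil {
		return "", err
	}
	return c.Name(), nil
}

// withFakeConsumer swaps the getter for the duration of one call to run and
// restores the original afterwards.
func withFakeConsumer() (string, error) {
	original := GetConsumer
	defer func() { GetConsumer = original }()

	GetConsumer = func() (Consumer, error) {
		return fakeConsumer{}, nil
	}
	return run()
}

func main() {
	name, err := withFakeConsumer()
	fmt.Println(name, err)
}
```

Restoring the original getter in a deferred function keeps one test's override from leaking into the next.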

3- Queue messages to the mock

Usually, when you use this consumer for testing, you want to queue kafka events, so that they are consumed by the service under test that is using the kafka consumer.

To queue a new message to be consumed by the mock, you can call QueueMessage with the schema and event that you want to be queued for consumption, like so:

// create event that will be queued (matches schema.MySchema)
event := &models.MyEvent{
    Field1: "value one",
    FieldN: "value N",
}

// queue the event with the corresponding schema
if err := kafkaConsumer.QueueMessage(schema.MySchema, event); err != nil {
	return fmt.Errorf("failed to queue event: %w", err)
}
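
Conceptually, queueing a message means serialising the event and placing it on the channel that the service under test reads from. The sketch below shows that round trip under simplifying assumptions: JSON stands in for the real schema-based serialisation, and `fakeConsumer`, `consumeOne` and `roundTrip` are hypothetical names that are not part of kafkatest.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MyEvent is a hypothetical event type.
type MyEvent struct {
	Field1 string `json:"field1"`
	FieldN string `json:"fieldN"`
}

// fakeConsumer holds a buffered channel of serialised messages.
type fakeConsumer struct {
	upstream chan []byte
}

func newFakeConsumer(bufferSize int) *fakeConsumer {
	return &fakeConsumer{upstream: make(chan []byte, bufferSize)}
}

// QueueMessage serialises the event (JSON here, as an assumption) and places
// it on the channel for later consumption.
func (c *fakeConsumer) QueueMessage(event interface{}) error {
	b, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}
	c.upstream <- b
	return nil
}

// consumeOne plays the role of the service under test: it reads one message
// and decodes it.
func consumeOne(c *fakeConsumer) (MyEvent, error) {
	var e MyEvent
	err := json.Unmarshal(<-c.upstream, &e)
	return e, err
}

// roundTrip queues one event and consumes it again.
func roundTrip() (MyEvent, error) {
	c := newFakeConsumer(1)
	event := &MyEvent{Field1: "value one", FieldN: "value N"}
	if err := c.QueueMessage(event); err != nil {
		return MyEvent{}, err
	}
	return consumeOne(c)
}

func main() {
	e, err := roundTrip()
	fmt.Println(e.Field1, e.FieldN, err)
}
```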

Producer

1- Create producer mock

kafkaProducer, err := kafkatest.NewProducer(
    ctx,
    &kafka.ProducerConfig{
        BrokerAddrs:       Addr,
        Topic:             ProducerTopic,
        MinBrokersHealthy: &ProducerMinBrokersHealthy,
        KafkaVersion:      &Version,
    },
    &kafkatest.ProducerConfig{
        ChannelBufferSize: 10,
        InitAtCreation:    false,
    },
)

Provide the kafka ProducerConfig as you would for a real kafka producer, and a kafkatest.ProducerConfig that suits your needs for the mock.

The sarama message and error channels will have a buffer size of ChannelBufferSize (e.g. 10).

The producer will initialise successfully at creation time if InitAtCreation is true. Otherwise, it will fail to initialise at creation time, but it will succeed shortly afterwards, when Initialise() is called.

If no kafkatest.ProducerConfig is provided, the default values will be used.

2- Use the mock:

You can provide the Mock inside the kafkatest.Producer to your service under test. For example, you may override a service kafka getter function like so:

service.GetKafkaProducer = func(ctx context.Context, cfg *config.Kafka) (kafka.IProducer, error) {
    kafkaProducer, err := kafkatest.NewProducer(...)
    if err != nil {
        return nil, err
    }
    return kafkaProducer.Mock, nil
}

3- Wait for message to be sent

Usually, when you use this producer for testing, you want to check that a message is sent, so that it can be validated.

To expect a message to be sent through the mock, you can call WaitForMessageSent with the expected schema and an event pointer. The function will block until a message is sent or the provided timeout expires. If the event is sent, it will be unmarshaled to the provided pointer.

// create empty event pointer of the type you expect (matches schema.MySchema)
var e = &models.MyEvent{}

// wait for an event to be sent, with the corresponding schema
if err := kafkaProducer.WaitForMessageSent(schema.MySchema, e, timeout); err != nil {
	return fmt.Errorf("failed to expect sent message: %w", err)
}
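
The blocking behaviour described above (return when a message arrives, or fail once the timeout expires) is commonly built on a `select` over a channel and a timer. The following is a minimal sketch of that idea, not the kafkatest implementation: `waitForMessage`, `waitDemo` and `errTimeout` are hypothetical names, and JSON stands in for the schema-based unmarshalling.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// MyEvent is a hypothetical event type.
type MyEvent struct {
	Field1 string
}

var errTimeout = errors.New("timeout waiting for message")

// waitForMessage blocks until data arrives on sent or the timeout elapses.
// On success the payload is unmarshalled into the provided pointer.
func waitForMessage(sent <-chan []byte, event interface{}, timeout time.Duration) error {
	select {
	case data := <-sent:
		return json.Unmarshal(data, event)
	case <-time.After(timeout):
		return errTimeout
	}
}

// waitDemo optionally sends one message before waiting, and returns the
// decoded field together with any error.
func waitDemo(send bool) (string, error) {
	sent := make(chan []byte, 1)
	if send {
		sent <- []byte(`{"Field1":"value one"}`)
	}
	var e MyEvent
	err := waitForMessage(sent, &e, 50*time.Millisecond)
	return e.Field1, err
}

func main() {
	fmt.Println(waitDemo(true))
	fmt.Println(waitDemo(false))
}
```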

Alternatively, your test might want to check that no message is sent (for example, if you are testing a case where a dependency returns an error).

You can check that no message is sent within a time window by calling WaitNoMessageSent with the duration of the window. Note that this function blocks until the timeWindow duration has elapsed, until a message is sent through the mock, or until the producer is closed (in the latter two cases an error is returned).

timeWindow := 5 * time.Second

// Wait up to 'timeWindow', expecting that no message is sent
if err := kafkaProducer.WaitNoMessageSent(timeWindow); err != nil {
    // handle error (message sent or producer closed)
}
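
The three ways in which the wait can end (window elapsed, message sent, producer closed) map naturally onto a three-way `select`. The sketch below illustrates this under stated assumptions: `waitNoMessage`, `noMessageDemo` and both error values are hypothetical and are not the errors returned by kafkatest.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errMessageSent = errors.New("unexpected message sent")
	errClosed      = errors.New("producer closed")
)

// waitNoMessage returns nil only if the whole window elapses without a
// message being sent and without the closed channel being closed.
func waitNoMessage(sent <-chan []byte, closed <-chan struct{}, window time.Duration) error {
	select {
	case <-time.After(window):
		return nil
	case <-sent:
		return errMessageSent
	case <-closed:
		return errClosed
	}
}

// noMessageDemo sets up one of three scenarios ("quiet", "sent" or "closed")
// and reports the result of waiting.
func noMessageDemo(scenario string) error {
	sent := make(chan []byte, 1)
	closed := make(chan struct{})
	switch scenario {
	case "sent":
		sent <- []byte("payload")
	case "closed":
		close(closed)
	}
	return waitNoMessage(sent, closed, 20*time.Millisecond)
}

func main() {
	for _, s := range []string{"quiet", "sent", "closed"} {
		fmt.Println(s, noMessageDemo(s))
	}
}
```

Because the success path always waits for the full window, short windows keep component tests fast.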

Message

message := kafkatest.NewMessage(data, offset)
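
A message mock is mainly useful for checking that a handler read the payload and committed the message. The sketch below shows the idea with a hand-written stand-in; `fakeMessage`, its methods and `handle` are hypothetical and do not claim to match the kafkatest.Message API.

```go
package main

import (
	"fmt"
	"sync"
)

// message is the hypothetical subset of behaviour the handler needs.
type message interface {
	GetData() []byte
	Commit()
}

// fakeMessage returns the configured data and records whether Commit was
// called.
type fakeMessage struct {
	data   []byte
	offset int64

	mu        sync.Mutex
	committed bool
}

func newFakeMessage(data []byte, offset int64) *fakeMessage {
	return &fakeMessage{data: data, offset: offset}
}

// GetData returns the payload given at construction time.
func (m *fakeMessage) GetData() []byte { return m.data }

// Commit marks the message as committed.
func (m *fakeMessage) Commit() {
	m.mu.Lock()
	m.committed = true
	m.mu.Unlock()
}

// IsCommitted reports whether Commit has been called.
func (m *fakeMessage) IsCommitted() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.committed
}

// handle is the hypothetical code under test: it returns the payload length
// and commits the message.
func handle(m message) int {
	n := len(m.GetData())
	m.Commit()
	return n
}

func main() {
	msg := newFakeMessage([]byte("payload"), 1)
	fmt.Println("committed before:", msg.IsCommitted())
	handle(msg)
	fmt.Println("committed after:", msg.IsCommitted())
}
```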

# Functions

NewConsumer creates a consumer for testing.
NewMessage returns a new mock message containing the given data.
NewProducer creates a producer for testing.
NewSaramaConsumerGroupSessionMock returns a new sarama consumer group session mock with the provided number of partitions. It also returns a func to cancel the context, to be used when a session is ending.

# Structs

Consumer is an extension of the moq ConsumerGroup with implementation of required functions and Sarama mocks to emulate a fully functional Kafka ConsumerGroup.
IConsumerGroupMock is a mock implementation of kafka.IConsumerGroup.
IProducerMock is a mock implementation of kafka.IProducer.
Message allows a mock message to return the configured data, and capture whether commit has been called.
Producer is an extension of the moq Producer with implementation of required functions and Sarama mocks to emulate a fully functional kafka Producer.