
# README

RabbitMQ Stream GO Client



Experimental client for RabbitMQ Stream Queues


Overview

Experimental client for RabbitMQ Stream Queues

Installing

go get -u github.com/rabbitmq/rabbitmq-stream-go-client

imports:

"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly

Run server with Docker


You may need a server to test locally. Let's start the broker:

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672\
    -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost -rabbit loopback_users "none"' \
    rabbitmq:3.9-management

The broker should start in a few seconds. When it’s ready, enable the stream plugin and stream_management:

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management

Management UI: http://localhost:15672/
Stream uri: rabbitmq-stream://guest:guest@localhost:5552

Getting started for the impatient

See getting started example.
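
If you want a single, self-contained starting point, here is a minimal sketch that connects, declares a stream, publishes one message and cleans up. It only uses the APIs shown in the rest of this README; the stream name is just an example and error handling is reduced to panics for brevity.

package main

import (
	"fmt"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func main() {
	// connect to a local broker started as shown in "Run server with Docker"
	env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5552).
			SetUser("guest").
			SetPassword("guest"))
	if err != nil {
		panic(err)
	}

	// declare a stream with a retention policy (example name)
	// note: per the Streams section, re-running this may return stream.StreamAlreadyExists
	streamName := "my-first-stream"
	err = env.DeclareStream(streamName,
		stream.NewStreamOptions().SetMaxLengthBytes(stream.ByteCapacity{}.GB(2)))
	if err != nil {
		panic(err)
	}

	// publish a single message
	producer, err := env.NewProducer(streamName, nil)
	if err != nil {
		panic(err)
	}
	if err := producer.Send(amqp.NewMessage([]byte("hello stream"))); err != nil {
		panic(err)
	}
	fmt.Println("message sent")

	// clean up
	producer.Close()
	env.Close()
}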

Examples

See examples directory for more use cases.

Usage

Connect

Standard way to connect to a single node:

env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5552).
			SetUser("guest").
			SetPassword("guest"))
	CheckErr(err)

You can define the number of producers per connection; the default value is 1:

stream.NewEnvironmentOptions().
	SetMaxProducersPerClient(2)

You can define the number of consumers per connection; the default value is 1:

stream.NewEnvironmentOptions().
	SetMaxConsumersPerClient(2)

To have the best performance you should use the default values. Note about multiple consumers per connection: the IO threads are shared across the consumers, so if one consumer is slow it could impact the performance of the other consumers.
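
For illustration, here is a sketch that sets both limits on the same environment; the values are arbitrary and, as noted above, the defaults are usually the right choice:

env, err := stream.NewEnvironment(
	stream.NewEnvironmentOptions().
		SetHost("localhost").
		SetPort(5552).
		SetUser("guest").
		SetPassword("guest").
		SetMaxProducersPerClient(2). // up to 2 producers share one connection
		SetMaxConsumersPerClient(2)) // up to 2 consumers share one connection
CheckErr(err)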

Multi hosts

It is possible to define multiple hosts; if one fails to connect, the client tries another one at random.

addresses := []string{
		"rabbitmq-stream://guest:guest@host1:5552/%2f",
		"rabbitmq-stream://guest:guest@host2:5552/%2f",
		"rabbitmq-stream://guest:guest@host3:5552/%2f"}

env, err := stream.NewEnvironment(
			stream.NewEnvironmentOptions().SetUris(addresses))

Load Balancer

The stream client is supposed to reach all the hostnames. In case of a load balancer you can use the stream.AddressResolver parameter in this way:

addressResolver := stream.AddressResolver{
		Host: "load-balancer-ip",
		Port: 5552,
	}
env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("host").
			SetPort(5552).
			SetAddressResolver(addressResolver))

In this configuration the client retries the connection until it reaches the right node.

This rabbitmq blog post explains the details.

See also "Using a load balancer" example in the examples directory

TLS

To configure TLS you need to set the IsTLS parameter:

env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5551). // standard TLS port
			SetUser("guest").
			SetPassword("guest").
			IsTLS(true).
			SetTLSConfig(&tls.Config{}),
	)

The tls.Config is the standard Go crypto/tls configuration: https://pkg.go.dev/crypto/tls
See also the "Getting started TLS" example in the examples directory.

Streams

To define streams you need to use the environment interfaces DeclareStream and DeleteStream.

It is highly recommended to define stream retention policies during the stream creation, like MaxLengthBytes or MaxAge:

err = env.DeclareStream(streamName,
		stream.NewStreamOptions().
		SetMaxLengthBytes(stream.ByteCapacity{}.GB(2)))

Note: The function DeclareStream returns stream.StreamAlreadyExists if a stream is already defined.
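
For example, here is a sketch that combines size- and age-based retention and treats the "already exists" case as non-fatal. It assumes the stream.StreamAlreadyExists sentinel mentioned above and a SetMaxAge option, and it needs the standard errors and time packages:

err = env.DeclareStream(streamName,
	stream.NewStreamOptions().
		SetMaxLengthBytes(stream.ByteCapacity{}.GB(2)).
		SetMaxAge(48*time.Hour)) // retention by age, assuming SetMaxAge is available
if err != nil && !errors.Is(err, stream.StreamAlreadyExists) {
	panic(err)
}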

Publish messages

To publish a message you need a *stream.Producer instance:

producer, err := env.NewProducer("my-stream", nil)

With ProducerOptions it is possible to customize the Producer behaviour:

type ProducerOptions struct {
	Name                 string // Producer name; useful to handle message deduplication
	QueueSize            int    // Internal queue to handle back-pressure; a low value reduces the back-pressure on the server
	BatchSize            int    // Batch-size aggregation; a low value reduces latency, a high value increases throughput
	BatchPublishingDelay int    // Period to send a batch of messages
}
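
A sketch of a producer created with custom options; it assumes the builder-style setters (SetProducerName, SetQueueSize, SetBatchSize, SetBatchPublishingDelay) that mirror the fields above, and the values are only illustrative:

producer, err := env.NewProducer("my-stream",
	stream.NewProducerOptions().
		SetProducerName("my-producer").  // enables message deduplication
		SetQueueSize(10000).             // internal back-pressure queue
		SetBatchSize(200).               // batch-size aggregation
		SetBatchPublishingDelay(100))    // period to send a batch of messages
CheckErr(err)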

The client provides two interfaces to send messages. Send:

var message message.StreamMessage
message = amqp.NewMessage([]byte("hello"))
err = producer.Send(message)

and BatchSend:

var messages []message.StreamMessage
for z := 0; z < 10; z++ {
  messages = append(messages, amqp.NewMessage([]byte("hello")))
}
err = producer.BatchSend(messages)

producer.Send:

  • accepts one message as parameter
  • automatically aggregates the messages
  • automatically splits the messages in case the size is bigger than requestedMaxFrameSize
  • automatically splits the messages based on batch-size
  • sends the messages if nothing happens within producer-send-timeout
  • is asynchronous

producer.BatchSend:

  • accepts an array of messages as a parameter
  • is synchronous

Send vs BatchSend

BatchSend is the primitive to send the messages; Send introduces a smart layer to publish messages and internally uses BatchSend.

The Send interface works in most of the cases. In some conditions it is about 15/20 slower than BatchSend. See also this thread.

Publish Confirmation

For each publish the server sends back a confirmation to the client. The client provides an interface to receive the confirmation:

//optional publish confirmation channel
chPublishConfirm := producer.NotifyPublishConfirmation()
handlePublishConfirm(chPublishConfirm)
	
func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
	go func() {
		for confirmed := range confirms {
			for _, msg := range confirmed {
				if msg.Confirmed {
					fmt.Printf("message %s stored \n  ", msg.Message.GetData())
				} else {
					fmt.Printf("message %s failed \n  ", msg.Message.GetData())
				}
			}
		}
	}()
}

It is up to the user to decide what to do with confirmed and unconfirmed messages.
See also "Getting started" example in the examples directory

Publish Errors

In some cases the server can send an error back to the client, for example when the producer-id does not exist or there are permission problems. The client provides an interface to receive the errors:

chPublishError := producer.NotifyPublishError()
handlePublishError(chPublishError)

It is up to the user to decide what to do with error messages.
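
A possible shape for handlePublishError, assuming the channel type stream.ChannelPublishError mirrors the publish-confirmation channel above; the exact fields of the error event may vary between versions, so it is printed generically here:

func handlePublishError(publishErrors stream.ChannelPublishError) {
	go func() {
		for publishError := range publishErrors {
			// the event carries the reason why the server rejected the publish
			fmt.Printf("publish error: %v \n", publishError)
		}
	}()
}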

Deduplication

The stream plugin can handle deduplication data; see this blog post for more details: https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
You can find a "Deduplication" example in the examples directory.
Run it more than once: the message count will always be 10.
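
A minimal sketch of the idea: deduplication needs a named producer plus a publishing id on each message, so that the server can discard ids it has already seen. It assumes a SetPublishingId setter on the AMQP message; check the "Deduplication" example for the exact API.

producer, err := env.NewProducer("my-stream",
	stream.NewProducerOptions().SetProducerName("dedup-producer")) // the name is mandatory for deduplication
CheckErr(err)

for i := 0; i < 10; i++ {
	msg := amqp.NewMessage([]byte(fmt.Sprintf("message-%d", i)))
	msg.SetPublishingId(int64(i)) // assumed setter: ids already seen by the server are discarded
	err = producer.Send(msg)
	CheckErr(err)
}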

Consume messages

In order to consume messages from a stream you need to use the NewConsumer interface, for example:

handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
	fmt.Printf("consumer name: %s, text: %s \n ", consumerContext.Consumer.GetName(), message.Data)
}

consumer, err := env.NewConsumer(
		"my-stream",
		handleMessages,
		....

With ConsumerOptions it is possible to customize the consumer behaviour.

  stream.NewConsumerOptions().
  SetConsumerName("my_consumer").                  // set a consumer name
  SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning

See also "Offset Start" example in the examples directory

Track Offset

The server can store the offset for a given consumer, in this way:

handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
		if atomic.AddInt32(&count, 1)%1000 == 0 {
			err := consumerContext.Consumer.StoreOffset()
			....

consumer, err := env.NewConsumer(
..
stream.NewConsumerOptions().
			SetConsumerName("my_consumer"). // <------ the consumer name is mandatory to store the offset

Note: AVOID storing the offset for each single message; it will reduce the performance.

See also "Offset Tracking" example in the examples directory

Handle Close

The client provides an interface to handle the producer/consumer close.

channelClose := consumer.NotifyClose()
defer consumerClose(channelClose)
func consumerClose(channelClose stream.ChannelClose) {
	event := <-channelClose
	fmt.Printf("Consumer: %s closed on the stream: %s, reason: %s \n", event.Name, event.StreamName, event.Reason)
}

In this way it is possible to handle fail-over.
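
The same pattern can be applied to a producer, assuming producer.NotifyClose() mirrors the consumer API shown above:

chProducerClose := producer.NotifyClose()
go func() {
	event := <-chProducerClose
	fmt.Printf("Producer closed on the stream: %s, reason: %s \n", event.StreamName, event.Reason)
	// here you could re-create the producer to implement fail-over
}()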

Performance test tool

The performance test tool is useful to execute tests. See also the Java performance tool.

To install it you can download the version from GitHub:

Mac:

https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz

Linux:

https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz

Windows

https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip

Execute stream-perf-test --help to see the parameters. By default it executes a test with one producer and one consumer.

Here is an example:

stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/  --fixed-body 400 --time 10

Performance test tool Docker

A docker image is available: pivotalrabbitmq/go-stream-perf-test. To test it:

Run the server in host mode:

 docker run -it --rm --name rabbitmq --network host \
    rabbitmq:3.9-management

Enable the plugin:

 docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

Then run the docker image:

docker run -it --network host  pivotalrabbitmq/go-stream-perf-test

To see all the parameters:

docker run -it --network host  pivotalrabbitmq/go-stream-perf-test --help

Build from source

make build

To execute the tests you need a docker image; you can use:

make rabbitmq-server

to run a ready rabbitmq-server with stream enabled for tests.

Then run:

make test

Project status


The client is a work in progress; the API(s) could change.
