github.com/Clarilab/clarimq/v2
Version: 2.1.0
Repository: https://github.com/clarilab/clarimq.git
Documentation: pkg.go.dev

# README

ClariMQ

This library is a wrapper around the Go AMQP Client Library.

This library includes support for:

  • structured logging to multiple writers
  • automatic recovery
  • retry functionality
  • publishing cache

Supported Go Versions

This library supports the most recent Go release, currently 1.22.4.

INSTALL

go get github.com/Clarilab/clarimq/v2

USAGE

The Connection

First, a connection instance needs to be initialized. The connection can be configured by passing the connection options that are needed. It is also possible to fully customize the configuration by passing a ConnectionOptions struct via the WithCustomConnectionOptions option. To ensure correct escaping of the URI, the SettingsToURI function can be used to convert a ConnectionSettings struct to a valid URI.

Note

Although it is possible to publish and consume with one connection, it is best practice to use two separate connections for publisher and consumer activities.

Example Connection with some options

conn, err := clarimq.NewConnection("amqp://user:password@localhost:5672/", 
	clarimq.WithConnectionOptionConnectionName("app-name-connection"),
	// more options can be passed
)
if err != nil {
	// handle error
}

Example Connection with custom options

connectionSettings := &clarimq.ConnectionSettings{
	UserName: "username",
	Password: "password",
	Host:     "host",
	Port:     5672,
}

connectionOptions := &clarimq.ConnectionOptions{
	Config: &clarimq.Config{
		ChannelMax:      0,
		FrameSize:       0,
		Heartbeat:       0,
		TLSClientConfig: &tls.Config{},
		Properties:      map[string]interface{}{},
		Locale:          "",
	},
	PrefetchCount:    1,
	RecoveryInterval: 1,
}

conn, err := clarimq.NewConnection(clarimq.SettingsToURI(connectionSettings), 
	clarimq.WithCustomConnectionOptions(connectionOptions),
)
if err != nil {
	// handle error
}

When the connection is no longer needed, it should be closed to conserve resources.

Example

if err := conn.Close(); err != nil {
	// handle error
}

Errors

The "NotifyErrors()" method provides a channel that delivers any errors that occur concurrently in the background. Most of them are custom errors of type clarimq.AMQPError or clarimq.RecoveryFailedError.

Example

handleErrors := func(errChan <-chan error) {
	for err := range errChan {
		if err == nil {
			return
		}

		var amqpErr *clarimq.AMQPError
		var recoveryFailed *clarimq.RecoveryFailedError

		switch {
		case errors.As(err, &amqpErr):
			fmt.Println(amqpErr) // handle amqp error
 
		case errors.As(err, &recoveryFailed):
			fmt.Println(recoveryFailed) // handle recoveryFailed error

		default:
			panic(err) // handle all other errors
		}
	}
}

conn, err := clarimq.NewConnection(clarimq.SettingsToURI(settings))
if err != nil {
	// handle error
}

go handleErrors(conn.NotifyErrors())

Publish messages

To publish messages a publisher instance needs to be created. A previously created connection must be handed over to the publisher.

The publisher can be configured by passing the publisher options that are needed. It is also possible to fully customize the configuration by passing a PublishOptions struct via the WithCustomPublishOptions option.

Example

publisher, err := clarimq.NewPublisher(conn,
	clarimq.WithPublishOptionAppID("my-application"),
	clarimq.WithPublishOptionExchange("my-exchange"),
	// more options can be passed
)
if err != nil {
	// handle error
}

The publisher can then be used to publish messages. The target can be a queue name, or a topic if the publisher is configured to publish messages to an exchange.

Example Simple publish

if err := publisher.Publish(context.Background(), "my-target", "my-message"); err != nil {
	// handle error
}

Optionally, the PublishWithOptions method can be used to configure the publish options for one specific publish only. The method also makes it possible to publish to multiple targets at once.

Example Publish with options

if err := publisher.PublishWithOptions(context.Background(), []string{"my-target-1","my-target-2"}, "my-message",
	clarimq.WithPublishOptionMessageID("99819a3a-388f-4199-b7e6-cc580d85a2e5"),
	clarimq.WithPublishOptionTracing("7634e958-1509-479e-9246-5b80ad8fc64c"),
); err != nil {
	// handle error
}

Consume Messages

To consume messages a consumer instance needs to be created. A previously created connection must be handed over to the consumer.

The consumer can be configured by passing the consume options that are needed. It is also possible to fully customize the configuration by passing a ConsumeOptions struct via the WithCustomConsumeOptions option.

Example

consumer, err := clarimq.NewConsumer(conn, "my-queue", handler(),
	clarimq.WithConsumerOptionConsumerName("my-consumer"),
	// more options can be passed
)
if err != nil {
	// handle error
}

// err is already declared above, so it is scoped to the if statement here
if err := consumer.Start(); err != nil {
	// handle error
}

The consumer can be set up to immediately start consuming messages from the broker by using the WithConsumerOptionConsumeAfterCreation option. The consumer then does not need to be started with the Start method. An error will be returned when trying to start an already started/running consumer.

Example

consumer, err := clarimq.NewConsumer(conn, "my-queue", handler(),
	clarimq.WithConsumerOptionConsumeAfterCreation(true),
	// more options can be passed
)
if err != nil {
	// handle error
}

The consumer can be used to declare exchanges, queues and queue-bindings:

Example

consumer, err := clarimq.NewConsumer(conn, "my-queue", handler(),
	clarimq.WithConsumerOptionConsumerName("my-consumer"),
	clarimq.WithExchangeOptionDeclare(true),
	clarimq.WithExchangeOptionKind(clarimq.ExchangeTopic),
	clarimq.WithExchangeOptionName("my-exchange"),
	clarimq.WithQueueOptionDeclare(false), // is enabled by default, can be used to disable the default behavior
	clarimq.WithConsumerOptionBinding(
		clarimq.Binding{
			RoutingKey: "my-routing-key",
		},
	),
	// more options can be passed
)
if err != nil {
	// handle error
}

The consumer can be closed to stop consuming if needed. The consumer does not need to be explicitly closed for a graceful shutdown if its connection is closed afterwards. However, when using the retry functionality without providing a connection, the consumer must be closed so that the internally created retry connection is shut down gracefully and its resources are released.

Example

if err := consumer.Close(); err != nil {
	// handle error
}

Logging

Structured logging is supported either with the Go "log/slog" package or by passing a custom logger that implements the clarimq.Logger interface.

Note: Multiple loggers can be specified!

Example

conn, err := clarimq.NewConnection(connectionSettings, 
	clarimq.WithConnectionOptionLoggers(
		myCustomLogger, 
		clarimq.NewSlogLogger(mySlogLogger),
	),
)
if err != nil {
	// handle error
}

Return Handler

When publishing mandatory messages, they will be returned if it is not possible to route the message to the given destination. A return handler can be specified to handle the return. The return contains the original message together with some information such as an error code and an error code description.

If no return handler is specified a log will be written to the logger at warn level.

Example

returnHandler := func(r clarimq.Return) {
	// handle the return
}

conn, err := clarimq.NewConnection(connectionSettings, 
	clarimq.WithConnectionOptionReturnHandler(
		clarimq.ReturnHandler(returnHandler),
	),
)
if err != nil {
	// handle error
}

Recovery

This library provides automatic recovery with built-in exponential back-off functionality. When the connection to the broker is lost, the recovery will automatically try to reconnect. You can adjust the parameters of the back-off algorithm:

Example

conn, err := clarimq.NewConnection(settings,
	clarimq.WithConnectionOptionRecoveryInterval(2),    // default is 1 second
	clarimq.WithConnectionOptionBackOffFactor(3),        // default is 2
	clarimq.WithConnectionOptionMaxRecoveryRetries(16), // default is 10
)
if err != nil {
	// handle error
}

If the maximum number of retries is reached, a custom error of type RecoveryFailedError will be sent to the error channel.

Publishing Cache

To prevent losing messages that are published while the broker is down or the client is recovering, the Publishing Cache can be used to cache the messages and publish them as soon as the client is fully recovered. The cache itself is an interface that can be implemented according to the user's needs. For example, it could be implemented to use a Redis store or any other storage of choice.

Note: This feature will only work effectively if durable queues/exchanges are used!

When the Publishing Cache is set and a publish fails because the channel is closed, the "Publish" and "PublishWithOptions" methods will return a clarimq.ErrPublishFailedChannelClosedCached error, which can be checked and handled according to the user's needs.

When the Publishing Cache is not set, the "Publish" and "PublishWithOptions" methods will return a clarimq.ErrPublishFailedChannelClosed error in that situation, which can be checked and handled according to the user's needs.

To ensure a clean cache (for example when using an external cache such as Redis), the publisher should be closed when exiting. This will call the "Flush()" method of the Publishing Cache implementation. This step is optional and it is up to the user to decide.

When implementing the publishing cache, it must be properly protected from concurrent access by multiple publisher instances to avoid race conditions.

Hint: The "cache" sub-package provides a simple "in-memory-cache" implementation, that can be used for testing, but could also be used in production.

Example

publisher, err := clarimq.NewPublisher(publishConn,
	clarimq.WithPublisherOptionPublishingCache(cache.NewBasicMemoryCache()),
)
if err != nil {
	// handle error
}

defer func() {
	if err := publisher.Close(); err != nil {
		// handle error
	}
}()

if err = publisher.PublishWithOptions(context.Background(), []string{"my-target"}, "my-message"); err != nil {
	switch {
	case errors.Is(err, clarimq.ErrPublishFailedChannelClosedCached):
		return nil // message has been cached
	case errors.Is(err, clarimq.ErrPublishFailedChannelClosed):
		return err
	default:
		panic(err)
	}
}
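
The requirement that a cache implementation must be safe for concurrent use can be illustrated with a mutex-guarded in-memory store. This is only a sketch: the memoryCache type and its Put, PopAll and Len methods are hypothetical and do not reflect the actual Publishing Cache interface, which is defined by the library.

```go
package main

import (
	"fmt"
	"sync"
)

// memoryCache is a hypothetical store that keeps cached messages per
// publisher name. All access goes through the mutex.
type memoryCache struct {
	mu    sync.Mutex
	items map[string][]string
}

func newMemoryCache() *memoryCache {
	return &memoryCache{items: make(map[string][]string)}
}

// Put appends a message to the list of the given publisher.
func (c *memoryCache) Put(publisher, msg string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.items[publisher] = append(c.items[publisher], msg)
}

// PopAll returns and removes all messages of the given publisher.
func (c *memoryCache) PopAll(publisher string) []string {
	c.mu.Lock()
	defer c.mu.Unlock()

	msgs := c.items[publisher]
	delete(c.items, publisher)

	return msgs
}

// Len reports how many messages are cached for the given publisher.
func (c *memoryCache) Len(publisher string) int {
	c.mu.Lock()
	defer c.mu.Unlock()

	return len(c.items[publisher])
}

func main() {
	cache := newMemoryCache()

	// Simulate many goroutines caching messages at the same time.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			cache.Put("publisher-a", fmt.Sprintf("message-%d", n))
		}(i)
	}
	wg.Wait()

	fmt.Println(cache.Len("publisher-a")) // prints 100
}
```

Returning and deleting the messages in a single locked step, as PopAll does, avoids a window in which two publishers could both read the same cached messages and publish them twice.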

Retry

This library includes a retry functionality with a dead letter exchange and dead letter queues. To use the retry, some parameters have to be set:

Example

consumeConn, err := clarimq.NewConnection(clarimq.SettingsToURI(settings))
if err != nil {
	// handle error
}

retryOptions := &clarimq.RetryOptions{
	RetryConn: publishConn,
	Delays: []time.Duration{
		time.Second,
		time.Second * 2,
		time.Second * 3,
		time.Second * 4,
		time.Second * 5,
	},
	MaxRetries: 5,
	Cleanup:    true, // only set this to true if you want to remove all retry related queues and exchanges when closing the consumer
}

consumer, err := clarimq.NewConsumer(consumeConn, queueName, handler,
	clarimq.WithConsumerOptionDeadLetterRetry(retryOptions),
)
if err != nil {
	// handle error
}

It is recommended to provide a separate publish connection for the retry functionality. If no connection is specified, a separate connection is established internally.

For each given delay a separate dead letter queue is declared. When a delivery is nacked by the consumer, it is republished via the delay queues one after another until it is acknowledged or the specified maximum number of retry attempts is reached.

External packages

Go AMQP Client Library

# Functions

NewConnection creates a new connection.
NewConsumer creates a new Consumer instance.
NewPublisher creates a new Publisher instance.
NewSlogLogger creates a new instance of SlogLogger.
SettingsToURI can be used to convert a ConnectionSettings struct to a valid AMQP URI to ensure correct escaping.
WithBindingOptionCustomBinding adds a new binding to the queue which allows you to set the binding options on a per-binding basis.
WithConnectionOptionAMQPConfig sets the amqp.Config that will be used to create the connection.
WithConnectionOptionBackOffFactor sets the exponential back-off factor.
WithConnectionOptionConnectionName sets the name of the connection.
WithConnectionOptionDecoder sets the decoder that will be used to decode messages.
WithConnectionOptionEncoder sets the encoder that will be used to encode messages.
WithConnectionOptionLoggers adds multiple loggers.
WithConnectionOptionMaxRecoveryRetries sets the limit for maximum retries.
WithConnectionOptionPrefetchCount sets the number of messages that will be prefetched.
WithConnectionOptionRecoveryInterval sets the initial recovery interval.
WithConnectionOptionReturnHandler sets a handler that can be used to handle undeliverable publishes.
WithConsumerOptionConsumeAfterCreation sets the consume after creation property of the consumer.
WithConsumerOptionConsumerAutoAck sets the auto acknowledge property of the consumer.
WithConsumerOptionConsumerExclusive sets the exclusive property of this consumer, which means the broker will ensure that this is the only consumer from this queue.
WithConsumerOptionConsumerName sets the name of the consumer.
WithConsumerOptionDeadLetterRetry enables the dead letter retry.
WithConsumerOptionHandlerQuantity sets the number of message handlers, that will run concurrently.
WithConsumerOptionNoWait sets the exclusive no-wait property of this consumer, which means it does not wait for the broker to confirm the request and immediately begin deliveries.
WithConsumerOptionRoutingKey binds the queue to a routing key with the default binding options.
WithCustomConnectionOptions sets the connection options.
WithCustomConsumeOptions sets the consumer options.
WithCustomPublishOptions sets the publish options.
WithExchangeOptionArgs adds optional args to the exchange.
WithExchangeOptionAutoDelete sets whether the exchange is an auto-delete exchange.
WithExchangeOptionDeclare sets whether the exchange should be declared on startup if it doesn't already exist.
WithExchangeOptionDurable sets whether the exchange is a durable exchange.
WithExchangeOptionInternal sets whether the exchange is an internal exchange.
WithExchangeOptionKind sets the kind (type) of the exchange, for example clarimq.ExchangeTopic.
WithExchangeOptionName sets the exchange name.
WithExchangeOptionNoWait sets whether the exchange is a no-wait exchange.
WithExchangeOptionPassive sets whether the exchange is a passive exchange.
WithPublisherOptionPublisherName sets the name of the publisher.
WithPublisherOptionPublishingCache enables the publishing cache.
WithPublishOptionAppID sets the application id.
WithPublishOptionContentEncoding sets the content encoding, i.e.
WithPublishOptionContentType sets the content type, i.e.
WithPublishOptionDeliveryMode sets the message delivery mode.
WithPublishOptionExchange sets the exchange to publish to.
WithPublishOptionExpiration sets the expiry/TTL of a message.
WithPublishOptionHeaders sets message header values, i.e.
WithPublishOptionMandatory sets whether the publishing is mandatory, which means when a queue is not bound to the routing key a message will be sent back on the returns channel for you to handle.
WithPublishOptionMessageID sets the message identifier.
WithPublishOptionPriority sets the content priority from 0 to 9.
WithPublishOptionReplyTo sets the reply to field.
WithPublishOptionTimestamp sets the timestamp for the message.
WithPublishOptionTracing sets the content correlation identifier.
WithPublishOptionType sets the message type name.
WithPublishOptionUserID sets the user id e.g.
WithQueueOptionArgs adds optional args to the queue.
WithQueueOptionAutoDelete sets whether the queue is an auto-delete queue.
WithQueueOptionDeclare sets whether the queue should be declared upon startup if it doesn't already exist.
WithQueueOptionDurable sets whether the queue is a durable queue.
WithQueueOptionExclusive sets whether the queue is an exclusive queue.
WithQueueOptionNoWait sets whether the queue is a no-wait queue.
WithQueueOptionPassive sets whether the queue is a passive queue.
WithQueueOptionPriority if set a priority queue will be declared with the given maximum priority.
WithQueueOptionQuorum sets the queue quorum type, which means multiple nodes in the cluster will have the messages distributed amongst them for higher reliability.

# Constants

Ack default ack this msg after you have successfully processed this delivery.
Constant for RabbitMQ's default exchange (direct exchange).
Constant for standard AMQP 0-9-1 direct exchange type.
Constant for standard AMQP 0-9-1 fanout exchange type.
Constant for standard AMQP 0-9-1 headers exchange type.
Constant for standard AMQP 0-9-1 topic exchange type.
HighestPriority indicates that the message should be published with highest priority.
HighPriority indicates that the message should be published with high priority.
LowestPriority indicates that the message should be published with lowest priority.
LowPriority indicates that the message should be published with low priority.
Message acknowledgement is left to the user using the msg.Ack() method.
NormalPriority indicates that the message should be published with normal priority.
NackDiscard means the message will be dropped or delivered to a broker-configured dead-letter queue.
NackRequeue means the message will be requeued and delivered to a different consumer.
NoPriority indicates that the message should be published with no priority.
PersistentDelivery indicates that the message should be published as persistent message.
TransientDelivery indicates that the message should be published as transient message.

# Variables

ErrCacheNotSet occurs when the publishing cache is not set.
ErrConsumerAlreadyRunning occurs when the consumer is attempted to be started but already running.
ErrHealthyConnection occurs when a manual recovery is triggered but the connection persists.
ErrInvalidConnection occurs when an invalid connection is passed to a publisher or a consumer.
ErrMaxRetriesExceeded occurs when the maximum number of retries is exceeded.
ErrNoActiveConnection occurs when there is no active connection while trying to get the failed recovery notification channel.
ErrPublishFailedChannelClosed occurs when the channel is accessed while being closed.
ErrPublishFailedChannelClosedCached occurs when the channel is accessed while being closed but publishing was cached.

# Structs

ExchangeOptions are used to configure an exchange.
Publisher is a publisher for AMQP messages.
QueueOptions are used to configure a queue.
ErrRecoveryFailed occurs when the recovery failed after a connection loss.
SlogLogger is a clarimq.Logger implementation that uses slog.Logger.

# Interfaces

Logger is an interface that is used for log messages.
Publishing is an interface for messages that are published to a broker.

# Type aliases

AMQPError is a custom error type that wraps amqp errors.