Module: github.com/LucaWolf/grabbit
Version: 1.1.2
Repository: https://github.com/lucawolf/grabbit.git
Documentation: pkg.go.dev

# README

grabbit

Golang wrapper for RabbitMQ managed connections.

Version 1.0.0 and beyond 🚀.
Note: API breaking from the previous β release (see CallbackProcessMessages)

Rationale

This is an alternative library providing auto-reconnection support. It's been heavily inspired by other projects (listed in credits) and my previous experiments. The reason for a new project (instead of cloning/contributing to an existing one) is that internals may start to diverge too much from the original and risk non-adoption.

Usage

See the wiki page for detailed guidance on getting the most out of this library.

Goals

What I'd like this library to provide is:

  • make use of the latest amqp091-go library; this is the up-to-date version building on streadway's original work
  • be able to share a connection between multiple channels
  • have connection and channels auto-recover (on infrastructure failure) via managers
  • replace the internal logging with an alternative. Current thought is to have some buffered channel over which detailed events are submitted (non-blocking)
  • have the topology defined by consumers and publishers: once at creation, and again during channel recovery (ephemeral queues/exchanges only)
  • provide an optional callback to the caller space during recoveries. This supplements in a synchronous (blocking) mode the logging replacement mechanism.
  • awaiting confirmation of the published events to be handled within the library (perhaps allowing a user defined function if needed). Note: I may abandon this in favor of application space handling.
  • consumers to accept user defined handlers for processing the received messages
  • the consumer handlers to also allow batch processing (with support for partial fulfillment of QoS expectations based on a timeout)
  • Bonus: optionally provide the users with access to the low level amqp.Channel. Unsafe initially. Note: safety might come for free when using the slightly higher level grabbit.Channel wrappers.
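
The batch processing goal (a batch that is handed over either when full or when a timeout expires) can be illustrated with plain channels. This is only a minimal sketch of the idea using the standard library; collectBatch and its parameters are hypothetical names and say nothing about how grabbit implements it internally.

```go
package main

import (
	"fmt"
	"time"
)

// collectBatch is a hypothetical helper (not part of grabbit). It reads from
// in until it holds size items, the timeout elapses, or in is closed, and
// returns whatever was gathered so far.
func collectBatch(in <-chan string, size int, timeout time.Duration) []string {
	batch := make([]string, 0, size)
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for len(batch) < size {
		select {
		case msg, ok := <-in:
			if !ok {
				return batch // source closed: hand over the partial batch
			}
			batch = append(batch, msg)
		case <-timer.C:
			return batch // deadline reached: partial fulfillment
		}
	}
	return batch // batch is full
}

func main() {
	in := make(chan string, 8)
	in <- "a"
	in <- "b"
	in <- "c"

	full := collectBatch(in, 2, 50*time.Millisecond)
	fmt.Println(len(full)) // 2: the requested size was reached

	partial := collectBatch(in, 5, 50*time.Millisecond)
	fmt.Println(len(partial)) // 1: only "c" arrived before the timeout
}
```

A single timer covers the whole batch here, so a slow trickle of messages cannot postpone the hand-over indefinitely.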

Non goals

  • not interested in concurrency safety of the channels. Publishers and consumers are relatively cheap; use as many as needed instead of passing them across goroutines.

Credits

  • wagslane from whom I got heavily inspired to do the sane parameters, topology maintenance and consumer handlers. Please browse and star his repository.
  • Emir Ribic for his inspiring post that led me to think about adding a resilience layer for the RabbitMQ client. You may want to read the full post
  • gbeletti from whose project I might pinch a few ideas. Regardless of drawing inspiration or not, his version made for interesting reading.

# Functions

DefaultConsumerOptions creates some sane defaults for consuming messages.
DefaultPublisherOptions creates some sane defaults for publishing messages.
DeliveryDataFrom creates a DeliveryData object from an amqp.Delivery object.
DeliveryPropsFrom generates a DeliveriesProperties struct from an amqp.Delivery.
NewChannel creates a new managed Channel with the given Connection and optional ChannelOptions.
NewConnection creates a new managed Connection object with the given address, configuration, and option functions.
NewConsumer creates a consumer with the desired options and then starts consuming.
NewPublisher creates a publisher with the desired options.
RandConsumerName creates a random string for the consumers.
SomeErrFromError creates an OptionalError struct with the given error and isSet values.
SomeErrFromString creates an OptionalError from the specified text.
WithChannelOptionContext creates a function that sets the context of a ChannelOptions struct.
WithChannelOptionDelay returns a function that sets the "delayer" field of the ChannelOptions struct to the given DelayProvider.
WithChannelOptionDown returns a function that sets the callback function to be called when the channel is down.
WithChannelOptionName creates a function that sets the name field of the ChannelOptions struct.
WithChannelOptionNotification provides an application defined [Event] receiver to handle various alerts about the channel status.
WithChannelOptionNotifyPublish returns a function that sets the callback function for notifying the publish event in the ChannelOptions.
WithChannelOptionNotifyReturn generates a function that sets the returnNotifier callback for a ChannelOptions struct.
WithChannelOptionProcessor returns a function that sets the message processing callback of the ChannelOptions struct.
WithChannelOptionRecovering generates a function that sets the callback function to be called when recovering from an error in the ChannelOptions.
WithChannelOptionTopology returns a function that sets the topology options for a channel.
WithChannelOptionUp returns a function that sets the callback function to be executed when the channel is up.
WithChannelOptionUsageParams returns a function that sets the implementation parameters of the ChannelOptions struct.
WithConnectionOptionContext stores the application provided context.
WithConnectionOptionDelay provides an application space defined delay (between re-connection attempts) policy.
WithConnectionOptionDown stores the application space callback for connection down events.
WithConnectionOptionName assigns a tag to this connection.
WithConnectionOptionNotification provides an application defined [Event] receiver to handle various alerts about the connection status.
WithConnectionOptionPassword provides password refresh capabilities for dynamically protected services (future IAM).
WithConnectionOptionRecovering stores the application space callback for connection recovering events.
WithConnectionOptionUp stores the application space callback for connection established events.
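
Most of the With... functions above are described as returning a function that sets one field of an options struct, which is the functional options pattern. The following is a minimal, self-contained sketch of that pattern. The type and function names (channelOptions, withName, withDelay, newOptions) are hypothetical stand-ins and are not the library's own definitions.

```go
package main

import (
	"fmt"
	"time"
)

// channelOptions is a hypothetical stand-in for an options struct.
type channelOptions struct {
	name  string
	delay time.Duration
}

// option mutates the options struct; each with... helper returns one.
type option func(*channelOptions)

// withName returns an option that sets the name field.
func withName(name string) option {
	return func(o *channelOptions) { o.name = name }
}

// withDelay returns an option that sets the delay field.
func withDelay(d time.Duration) option {
	return func(o *channelOptions) { o.delay = d }
}

// newOptions starts from defaults and applies the given options in order.
func newOptions(opts ...option) channelOptions {
	o := channelOptions{name: "default", delay: time.Second}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := newOptions(withName("orders"))
	fmt.Println(o.name, o.delay) // orders 1s
}
```

Options that are not passed keep their defaults, which is why constructors built this way can grow new settings without breaking existing callers.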

# Constants

go:generate stringer -type=ClientType -trimprefix=Cli.
ACK (publish confirmed).
data confirmation channel is closed.
base channel has not been put into confirm mode.
NAK (publish negative acknowledgement).
lower sequence number than expected.
no timely response.
go:generate stringer -type=EventType -trimprefix=Event.
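
The go:generate directives above ask the stringer tool to produce a String method for the named type, with -trimprefix removing the common prefix from each constant name. As a rough illustration, a hand-written approximation of the generated method could look like the sketch below. The alertType type and its constants are hypothetical and only mirror the naming convention; they are not the library's constants.

```go
package main

import "fmt"

// alertType and its constants are hypothetical; they only mirror the naming
// convention implied by -type=EventType -trimprefix=Event.
type alertType int

const (
	alertUp alertType = iota
	alertDown
	alertClosed
)

// String is a hand-written approximation of what stringer would generate
// when run with -type=alertType -trimprefix=alert.
func (a alertType) String() string {
	names := [...]string{"Up", "Down", "Closed"}
	if a < 0 || int(a) >= len(names) {
		// Unknown values fall back to the type name plus the number.
		return fmt.Sprintf("alertType(%d)", int(a))
	}
	return names[a]
}

func main() {
	fmt.Println(alertDown)     // Down
	fmt.Println(alertType(42)) // alertType(42)
}
```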

# Structs

Channel wraps the base amqp channel by creating a managed channel.
ChannelOptions represents the options for configuring a channel.
ChanUsageParameters embeds [PublisherUsageOptions] and [ConsumerUsageOptions].
Connection wraps a [SafeBaseConn] with additional attributes (impl.
ConnectionOptions defines a collection of attributes used internally by the [Connection].
Consumer implements an object allowing calling applications to receive messages on already established connections.
No description provided by the author
ConsumerUsageOptions defines parameters for driving the consumers behavior and indicating to the supporting channel to start consuming.
DefaultDelayer allows defining a basic (constant) delay policy.
DeferredConfirmation wraps [amqp.DeferredConfirmation] with additional data.
DeliveriesProperties captures the common attributes of multiple commonly grouped (i.e.
DeliveryData isolates the data part of each specific delivered message.
Event defines a simple body structure for the alerts received via the notification channels passed in [WithChannelOptionNotification] and [WithConnectionOptionNotification].
No description provided by the author
PersistentNotifiers are channels that have the lifespan of the channel.
Publisher implements an object allowing calling applications to publish messages on already established connections.
PublisherOptions defines publisher specific parameters.
PublisherUsageOptions defines parameters for driving the publishers behavior and indicating to the supporting channel that publishing operations are enabled.
SafeBaseChan wraps in a concurrency safe way the low level amqp.Channel.
SafeBaseConn wraps in a concurrency safe way the low level amqp.Connection.
SafeBool wraps a boolean in a concurrency safe way so it can be set, reset and tested from different goroutines.
TopologyBind defines the possible binding relation between exchanges or queues and exchanges.
TopologyOptions defines the infrastructure topology, i.e.
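
Several of the structs above are described as concurrency safe wrappers. A minimal sketch of that idea for a boolean flag, guarding the value with a mutex, is shown here. The safeFlag type and its Set/IsSet methods are hypothetical names chosen for illustration and may not match the fields or methods of SafeBool.

```go
package main

import (
	"fmt"
	"sync"
)

// safeFlag is a hypothetical mutex-guarded boolean.
type safeFlag struct {
	mu    sync.RWMutex
	value bool
}

// Set stores v under the write lock.
func (f *safeFlag) Set(v bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.value = v
}

// IsSet reads the value under the read lock.
func (f *safeFlag) IsSet() bool {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.value
}

func main() {
	var paused safeFlag
	var wg sync.WaitGroup

	// Ten goroutines write the flag concurrently without a data race.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			paused.Set(true)
		}()
	}
	wg.Wait()

	fmt.Println(paused.IsSet()) // true
}
```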

# Interfaces

DelayProvider allows passing a bespoke method for providing the delay policy for waiting between reconnection attempts.
SecretProvider allows passing a bespoke method for providing the secret required when connecting to the Rabbit engine.
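
A bespoke delay policy might look like the sketch below, which contrasts a constant delay with a capped exponential backoff. The interface shape (a single Delay(retry int) method returning a time.Duration) is an assumption and is declared locally so the example compiles on its own; check the package documentation for the actual DelayProvider definition before adapting it.

```go
package main

import (
	"fmt"
	"time"
)

// delayProvider is an ASSUMED interface shape, declared locally for the sketch.
type delayProvider interface {
	Delay(retry int) time.Duration
}

// constantDelay waits the same amount before every attempt.
type constantDelay struct{ value time.Duration }

func (d constantDelay) Delay(int) time.Duration { return d.value }

// cappedBackoff doubles the delay on each retry, never exceeding max.
type cappedBackoff struct{ base, max time.Duration }

func (b cappedBackoff) Delay(retry int) time.Duration {
	d := b.base
	for i := 0; i < retry; i++ {
		d *= 2
		if d >= b.max {
			return b.max
		}
	}
	if d > b.max {
		return b.max
	}
	return d
}

func main() {
	policies := []delayProvider{
		constantDelay{value: time.Second},
		cappedBackoff{base: 500 * time.Millisecond, max: 4 * time.Second},
	}
	for _, p := range policies {
		for retry := 0; retry < 5; retry++ {
			fmt.Printf("%T retry=%d delay=%v\n", p, retry, p.Delay(retry))
		}
	}
}
```

The cap keeps a long outage from stretching the wait between attempts without bound.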

# Type aliases

CallbackNotifyPublish defines a function type for handling the publish notifications.
CallbackNotifyReturn defines a function type for handling the return notifications.
CallbackProcessMessages defines a user passed function for processing the received messages.
CallbackWhenDown defines a function type used when connection was lost.
CallbackWhenRecovering defines a function used prior to recovering a connection.
CallbackWhenUp defines a function type used after a successful connection or channel recovery.
ClientType defines the class of objects that interact with the amqp functionality.
No description provided by the author
DeliveryPayload subtypes the actual content of deliveries.
EventType defines the class of alerts sent to the application layer.
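
The down, recovering and up callbacks described above suggest a recovery loop in which the application is told about each phase and can veto further attempts. The sketch below shows one way such a loop could be driven. All signatures here (whenDown, whenRecovering, whenUp, recoverLoop) are hypothetical and simplified; the library's own callback types may take different parameters.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errDial is a placeholder failure used by the demo dialer.
var errDial = errors.New("dial failed")

// Hypothetical callback shapes for the three recovery phases.
type (
	whenDown       func(err error) bool   // return false to give up
	whenRecovering func(attempt int) bool // return false to stop retrying
	whenUp         func()
)

// recoverLoop reports the failure, then retries dial until it succeeds or a
// callback vetoes, sleeping for delay after each failed attempt.
func recoverLoop(cause error, dial func() error, down whenDown,
	recovering whenRecovering, up whenUp, delay time.Duration) bool {
	if !down(cause) {
		return false
	}
	for attempt := 1; ; attempt++ {
		if !recovering(attempt) {
			return false
		}
		if err := dial(); err == nil {
			up()
			return true
		}
		time.Sleep(delay)
	}
}

func main() {
	attempts := 0
	dial := func() error { // fails twice, then succeeds
		attempts++
		if attempts < 3 {
			return errDial
		}
		return nil
	}

	ok := recoverLoop(errDial, dial,
		func(err error) bool { fmt.Println("down:", err); return true },
		func(n int) bool { fmt.Println("recovering, attempt", n); return n <= 5 },
		func() { fmt.Println("up") },
		10*time.Millisecond)
	fmt.Println("recovered:", ok)
}
```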