github.com/connectfit-team/rabbitmq
Version: 0.6.0
Repository: https://github.com/connectfit-team/rabbitmq.git
Documentation: pkg.go.dev

# README

RabbitMQ Client

RabbitMQ Client provides a simple yet robust abstraction around the most widely used Go AMQP 0.9.1 client. This package is designed to ease interactions with the RabbitMQ server and let the developer focus on what really matters.

⚙️ Installation

go get github.com/connectfit-team/rabbitmq

⚡️ Quickstart

📖 Publisher

package main

import (
	"context"
	"log"
	"os"

	"github.com/connectfit-team/rabbitmq"
	"github.com/rabbitmq/amqp091-go"
)

func main() {
	ctx := context.Background()

	logger := log.New(os.Stdout, "RabbitMQ client: ", log.LstdFlags)

	c := rabbitmq.NewClient(
		logger,
	)
	err := c.Connect(ctx)
	if err != nil {
		panic(err)
	}
	// Release the connection when main returns
	defer c.Close()

	msg := amqp091.Publishing{
		Body: []byte("Created user foo"),
	}
	err = c.Publish(ctx, msg, "user.created")
	if err != nil {
		panic(err)
	}
}

📖 Consumer

package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"github.com/connectfit-team/rabbitmq"
)

func main() {
	ctx := context.Background()

	logger := log.New(os.Stdout, "RabbitMQ client: ", 0)

	c := rabbitmq.NewClient(logger)
	err := c.Connect(ctx)
	if err != nil {
		panic(err)
	}
	defer c.Close()

	queue, err := c.QueueDeclare("user.created")
	if err != nil {
		panic(err)
	}

	msgs, err := c.Consume(ctx, "user-event-consumer", queue.Name)
	if err != nil {
		panic(err)
	}
	for msg := range msgs {
		// Handle the messages
		fmt.Printf("Event: %s\n", string(msg.Body))

		// Acknowledge the message to the server
		if err := msg.Ack(false); err != nil {
			logger.Printf("Failed to acknowledge message: %v", err)
		}
	}
}

🪄 Features

  • Automatic connection recovery (including channel and consumer recovery)
  • Context handling (graceful shutdown on context cancellation)

📚 Documentation

For further information, you can generate the project's documentation with the godoc command:

godoc -http=:[port]

And then browse the documentation at http://localhost:[port]/pkg/github.com/connectfit-team/rabbitmq/

👀 Examples

📖 Publish a delayed message (using the RabbitMQ delayed message exchange plugin)

package main

import (
	"context"
	"log"
	"os"
	"time"

	"github.com/connectfit-team/rabbitmq"
	"github.com/rabbitmq/amqp091-go"
)

func main() {
	ctx := context.Background()

	logger := log.New(os.Stdout, "RabbitMQ client: ", log.LstdFlags)

	c := rabbitmq.NewClient(
		logger,
	)
	err := c.Connect(ctx)
	if err != nil {
		panic(err)
	}

	err = c.ExchangeDeclare(
		"user",
		rabbitmq.WithDelayedMessageExchangeType(rabbitmq.DirectExchangeType),
	)
	if err != nil {
		panic(err)
	}

	msg := amqp091.Publishing{
		ContentType: "text/plain",
		Body:        []byte("Created user foo"),
	}
	err = c.Publish(
		ctx,
		msg,
		"user.created",
		rabbitmq.WithPublishExchangeName("user"),
		rabbitmq.WithMessageDelay(time.Second*5),
	)
	if err != nil {
		panic(err)
	}
}

📝 To Do List

  • Channel pooling
  • Add more methods from the protocol

# Functions

NewClient creates a new client instance.
WithChannelInitializationRetryDelay configures the delay used by the client between each channel initialization attempt (channel declaration).
WithConnectionRetryDelay configures the delay used by the client between each connection retry.
WithConsumeArgument sets an optional argument for the consume call.
WithConsumeArguments sets the optional arguments for the consume call.
WithConsumerAutoAck determines whether the server expects acknowledgments for messages from the consumer or not.
WithConsumerExclusive determines whether only this consumer can access the queue or not.
WithConsumerInitializationRetryDelay sets the delay the client will wait between each consumer initialization attempt.
WithConsumerNoLocal determines whether the server will send messages to the connection that published them or not.
WithConsumerNoWait determines whether the server will respond to the consume method or not.
WithDelayMessageExchangeType configures the exchange as a delayed message exchange and sets its exchange type.
WithDeliveryLimit sets the limit on the redelivery count. A message can be consumed one more time than the limit, because the first delivery is not counted as a redelivery.
WithExchangeArgument sets an optional argument of the exchange declaration.
WithExchangeArguments sets the optional arguments for the exchange declaration.
WithExchangeAutoDelete determines whether the exchange is deleted when all queues have finished using it or not.
WithExchangeDurable determines whether the exchange is durable or not.
WithExchangeInternal determines whether the exchange is internal, that is, whether it cannot be used directly by publishers and only receives messages when bound to other exchanges.
WithExchangeName specifies the name of the exchange.
WithExchangeNoWait determines whether the server will respond to the declare exchange method or not.
WithExchangeType specifies the type of the exchange.
WithHost configures the host the client will use to connect to the server.
No description provided by the author
WithMessageDelay specifies how long the message should be delayed through the delayed exchange.
WithMessageHeader sets a key/value entry in the message headers table.
WithMessageHeaders overrides the message headers with the given ones.
WithMessageTTL overrides the message expiration property with the given TTL.
WithNotifyPublishCapacity configures the buffer size of the publish confirmation channel.
WithPassword configures the password the client will use to connect to the server.
WithPort configures the port the client will use to connect to the server.
WithPrefetchCount configures the number of messages the channel will keep for a consumer before receiving acknowledgment.
WithPublishExchangeName specifies the name of the exchange to publish to.
WithPublishRetryDelay specifies the delay between each publishing attempt.
WithPublishRoutingKey specifies the routing key for the message.
WithPublishTimeout specifies the timeout after which the client will stop attempting to publish the message.
WithQueueArgument sets an optional argument of the queue declaration.
WithQueueArguments configures the optional arguments used by plugins and broker-specific features such as message TTL, queue length limit, etc.
WithQueueAutoDelete determines whether the queue is deleted when all consumers have finished using it.
No description provided by the author
WithQueueDeadLetterExchange specifies the name of the dead letter exchange used by the queue.
WithQueueDeadLetterRoutingKey specifies the routing key used to publish the message to the dead letter exchange.
WithQueueDurable determines whether the queue is durable or not.
WithQueueExclusive determines whether the queue is exclusive or not.
WithQueueName specifies the name of the queue.
WithQueueQuorum sets the queue type as "Quorum queue".
WithURL configures the URL the client will use to dial the server.
WithUsername configures the username the client will use to connect to the server.
WithVirtualHost configures the virtual host the client will use to connect to the server.
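The With... functions listed above follow Go's functional options pattern. The sketch below shows the general shape of that pattern with the standard library only. The `config` and `option` types, the lowercase helpers, and the default values are hypothetical stand-ins, not the package's real definitions.

```go
package main

import "fmt"

// config is a hypothetical stand-in for a connection configuration.
type config struct {
	host string
	port string
}

// option mutates a config. Each With... style function returns one.
type option func(*config)

// withHost overrides the host in the configuration.
func withHost(host string) option {
	return func(c *config) { c.host = host }
}

// withPort overrides the port in the configuration.
func withPort(port string) option {
	return func(c *config) { c.port = port }
}

// newConfig starts from illustrative defaults (assumed here, not taken
// from the package) and applies the options in order.
func newConfig(opts ...option) config {
	c := config{host: "localhost", port: "5672"}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig(withHost("rabbit.internal")))
}
```

Options left out keep their defaults, and later options override earlier ones, which is why calls such as NewClient(logger) in the Quickstart can omit every option.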

# Constants

DeadLetterExchangeNameArgument is the key used in the queue optional arguments to specify the name of the dead letter exchange of a queue.
DeadLetterRoutingKeyArgument is the key used in the queue optional arguments to specify the routing key of the message sent to the dead letter exchange.
DefaultChannelInitializationRetryDelay is the default delay between each channel initialization attempt.
DefaultConnectionRetryDelay is the default delay between each connection attempt.
DefaultConsumerInitializationRetryDelay is the default delay between each consumer initialization attempt.
DefaultNotifyPublishCapacity is the default buffer size of the publish confirmation channel.
DefaultPrefetchCount is the default number of messages the server will be able to send to your consumer before receiving acknowledgment.
DefaultPublishRetryDelay is the default delay between each attempt to publish a RabbitMQ message.
DefaultPublishTimeout is the default duration the client will wait for the server to confirm that the message has been successfully published.
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange.
DelayedTypeArgument is the key used to specify the exchange type of a delayed message exchange.
DelayMessageHeader is the key used to specify the delay before publishing the message to the queue.
https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-direct.
https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-fanout.
https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-headers.
MessageDeliveryCount is the message header key that holds how many times the message was redelivered.
QueueDeliveryLimit is the queue optional argument that limits the redelivery count ('x-delivery-count') of unacknowledged messages. See https://www.rabbitmq.com/quorum-queues.html#configuration.
QueueTypeArgument is the key used in the queue optional arguments to specify the queue type.
QuorumQueueType is the value used with the key "x-queue-type" in the optional arguments when declaring a queue to declare a Quorum queue.
https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-topic.
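Several of the constants above name keys of the optional arguments table passed when declaring a queue. As a rough illustration, the sketch below assembles such a table for a quorum queue using the argument names documented by RabbitMQ ("x-queue-type", "x-dead-letter-exchange", "x-dead-letter-routing-key", "x-delivery-limit"). The helper functions are hypothetical, and the assumption that the package's constants hold exactly these strings is not confirmed by this page.

```go
package main

import "fmt"

// quorumQueueArgs builds the optional arguments for a quorum queue with
// a dead letter exchange and a delivery limit. Hypothetical helper.
func quorumQueueArgs(deadLetterExchange, deadLetterRoutingKey string, deliveryLimit int) map[string]interface{} {
	return map[string]interface{}{
		"x-queue-type":              "quorum",
		"x-dead-letter-exchange":    deadLetterExchange,
		"x-dead-letter-routing-key": deadLetterRoutingKey,
		"x-delivery-limit":          deliveryLimit,
	}
}

// maxDeliveries returns how many times a message can be consumed under
// a given delivery limit, following the WithDeliveryLimit description:
// the first delivery is not counted as a redelivery.
func maxDeliveries(deliveryLimit int) int {
	return deliveryLimit + 1
}

func main() {
	args := quorumQueueArgs("user.dlx", "user.created.dead", 3)
	fmt.Println(args)
	fmt.Println(maxDeliveries(3))
}
```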

# Variables

DefaultChannelConfig is the default configuration used by a client to initialize a RabbitMQ channel.
DefaultConnectionConfig is the default configuration used by the client to connect to the server.
DefaultConsumerConfig is the default configuration used by a client instance to consume messages.
DefaultExchangeConfig is the default configuration used by a client instance to declare exchanges.
DefaultPublishConfig is the default configuration used by a client instance to publish messages.
DefaultQueueConfig is the default configuration used by a client instance to declare queues.
ErrAlreadyConnected is returned when trying to connect while the client is already connected to the server.
ErrEmptyConsumerName is returned when the given consumer name is empty.
ErrEmptyQueueName is returned when the given queue name is empty.
ErrNotConnect is returned when an operation which requires the client to be connected to the server is invoked but the client still isn't connected.
ErrPublishTimeout is returned when the client fails to publish a message within the configured publish timeout duration.

# Structs

ChannelConfig contains all the configurable parameters used by a client instance to initialize a RabbitMQ channel.
Client is a reliable wrapper around an AMQP connection which automatically recovers from connection errors.
ClientConfig represents the configuration of a client instance.
ConnectionConfig contains all the configurable parameters used by a client instance to connect to a RabbitMQ server.
ConsumerConfig contains all the configurable parameters used by a client instance to consume messages.
ExchangeConfig contains all the configurable parameters used by a client instance to declare an exchange.
PublishConfig contains all the configurable parameters used by a client instance to publish messages.
QueueConfig contains all the configurable parameters used by a client instance to declare a queue.

# Type aliases

ClientOption is an option used to configure the client.
ConsumerOption configures a Consume call.
No description provided by the author
ExchangeType defines the functionality of the exchange i.e.
PublishOption configures a Publish call.
No description provided by the author