Categorygithub.com/winlin/rabbitroutine
modulepackage
0.8.4
Repository: https://github.com/winlin/rabbitroutine.git
Documentation: pkg.go.dev

# README

PkgGoDev Build Status Go Report Card

Rabbitmq Failover Routine

Lightweight library that handles RabbitMQ auto-reconnect and publishing retry routine for you. The library is designed to save the developer from the headache when working with RabbitMQ.

rabbitroutine solves your RabbitMQ reconnection problems:

Stop to do wrappers, do features!

Install

go get github.com/furdarius/rabbitroutine

Adding as dependency by "go dep"

$ dep ensure -add github.com/furdarius/rabbitroutine

Usage

Consuming

You need to implement Consumer and register it with StartConsumer or with StartMultipleConsumers. When connection is established (at first time or after reconnect) Declare method is called. It can be used to declare required RabbitMQ entities (consumer example).

Usage example:


// Consumer declares your own RabbitMQ consumer implementing rabbitroutine.Consumer interface.
type Consumer struct {}
func (c *Consumer) Declare(ctx context.Context, ch *amqp.Channel) error {}
func (c *Consumer) Consume(ctx context.Context, ch *amqp.Channel) error {}

url := "amqp://guest:[email protected]:5672/"

conn := rabbitroutine.NewConnector(rabbitroutine.Config{
    // How long to wait between reconnect
    Wait: 2 * time.Second,
})

ctx := context.Background()

go func() {
    err := conn.Dial(ctx, url)
    if err != nil {
    	log.Println(err)
    }
}()

consumer := &Consumer{}
go func() {
    err := conn.StartConsumer(ctx, consumer)
    if err != nil {
        log.Println(err)
    }
}()

Full example demonstrates messages consuming

Publishing

For publishing FireForgetPublisher and EnsurePublisher implemented. Both of them can be wrapped with RetryPublisher to repeat publishing on errors and mitigate short-term network problems.

Usage example:

ctx := context.Background()

url := "amqp://guest:[email protected]:5672/"

conn := rabbitroutine.NewConnector(rabbitroutine.Config{
    // How long wait between reconnect
    Wait: 2 * time.Second,
})

pool := rabbitroutine.NewPool(conn)
ensurePub := rabbitroutine.NewEnsurePublisher(pool)
pub := rabbitroutine.NewRetryPublisher(
    ensurePub,
    rabbitroutine.PublishMaxAttemptsSetup(16),
    rabbitroutine.PublishDelaySetup(rabbitroutine.LinearDelay(10*time.Millisecond)),
)

go conn.Dial(ctx, url)

err := pub.Publish(ctx, "myexch", "myqueue", amqp.Publishing{Body: []byte("message")})
if err != nil {
    log.Println("publish error:", err)
}

Full example demonstrates messages publishing

Contributing

Pull requests are very much welcomed. Create your pull request, make sure a test or example is included that covers your change and your commits represent coherent changes that include a reason for the change.

To run the integration tests, make sure you have RabbitMQ running on any host (e.g with docker run --net=host -it --rm rabbitmq), then export the environment variable AMQP_URL=amqp://host/ and run go test -tags integration. As example:

AMQP_URL=amqp://guest:[email protected]:5672/ go test -v -race -cpu=1,2 -tags integration -timeout 5s

Use golangci-lint to check code with linters:

golangci-lint run ./...

TravisCI will also run the integration tests and golangci-lint.

# Packages

No description provided by the author

# Functions

ConstDelay returns constant delay value.
LinearDelay returns delay value increases linearly depending on the current attempt.
NewConnector return a new instance of Connector.
NewEnsurePublisher returns a new instance of EnsurePublisher.
NewFireForgetPublisher returns a new instance of FireForgetPublisher.
NewLightningPool return a new instance of LightningPool.
NewPool returns a new instance of Pool.
NewRetryPublisher returns a new instance of RetryPublisherOption.
PublishDelaySetup sets function for publish delay time.Duration receiving.
PublishMaxAttemptsSetup sets limit of publish attempts.

# Variables

ErrNoRoute indicates that queue is bound that matches the routing key.
ErrNotFound indicates that RabbitMQ entity doesn't exist.

# Structs

AMQPNotified is fired when AMQP error occurred.
ChannelKeeper stores AMQP Channel with Confirmation and Close chans.
Config stores reconnect options.
Connector implement RabbitMQ failover.
Dialed is fired when connection was successfully established.
EnsurePublisher implements Publisher interface and guarantees delivery of the message to the server.
FireForgetPublisher implements Publisher interface and used to publish messages to RabbitMQ exchange without delivery guarantees.
LightningPool stores AMQP Channels without confirm mode, so they will be used without delivery guarantees.
Pool is a set of AMQP Channels that may be individually saved and retrieved.
Retried is fired when connection retrying occurs.
RetryPublisher retries to publish message before context done.

# Interfaces

Consumer interface provides functionality of rabbit entity Declaring and queue consuming.
Publisher interface provides functionality of publishing to RabbitMQ.

# Type aliases

RetryDelayFunc returns how long to wait before retry.
RetryPublisherOption describes a functional option for configuring RetryPublisher.