github.com/streamdal/rabbit (module, package)
Version: 0.1.25
Repository: https://github.com/streamdal/rabbit.git
Documentation: pkg.go.dev

# README

rabbit

A RabbitMQ wrapper lib around rabbitmq/amqp091-go (previously streadway/amqp) with some bells and whistles.

NOTE: streadway/amqp is no longer maintained. The RabbitMQ team has forked it as rabbitmq/amqp091-go, which is what this library uses.

  • Support for auto-reconnect
  • Support for context (i.e., cancel/timeout)
  • Support for using multiple binding keys
  • Support for Producer, Consumer, or both modes

Motivation

We (Streamdal, formerly Batch.sh) make heavy use of RabbitMQ: it is our primary method for facilitating inter-service communication. As a result, all of our services use RabbitMQ, and each one is both a publisher and a consumer.

We wrote this lib to ensure that all of our services make use of Rabbit in a consistent, predictable way AND are able to survive network blips.

NOTE: This library works only with non-default exchanges. If you need support for the default exchange, open a PR!

Usage

package main

import (
    "context"
    "fmt"
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
    "github.com/streamdal/rabbit"
)

func main() {
    r, err := rabbit.New(&rabbit.Options{
        URL:          "amqp://localhost",
        QueueName:    "my-queue",
        ExchangeName: "messages",
        BindingKeys:  []string{"messages"},
    })
    if err != nil {
        log.Fatalf("unable to instantiate rabbit: %s", err)
    }

    routingKey := "messages"
    data := []byte("pumpkins")

    // Publish something
    if err := r.Publish(context.Background(), routingKey, data); err != nil {
        log.Fatalf("unable to publish message: %s", err)
    }

    // Consume once
    if err := r.ConsumeOnce(context.Background(), func(msg amqp.Delivery) error {
        fmt.Printf("Received new message: %+v\n", msg)
        return nil
    }); err != nil {
        log.Fatalf("unable to consume once: %s", err)
    }

    var numReceived int

    // Consume forever (blocks until Stop() is called or ctx is cancelled)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    r.Consume(ctx, nil, func(msg amqp.Delivery) error {
        fmt.Printf("Received new message: %+v\n", msg)

        numReceived++

        // Stop consuming after the second message
        if numReceived > 1 {
            r.Stop()
        }

        return nil
    })

    // Or stop via ctx: call cancel() from another goroutine while
    // Consume() is blocking.
}

# Functions

New is used for instantiating the library.
ValidateOptions validates various combinations of options.
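
To make "validates various combinations of options" more concrete, the following sketch shows one way mode-dependent validation could look. It is an assumption-laden illustration, not the library's code: the `options` struct is a trimmed-down stand-in that reuses the field names from the Usage example, the `mode` values are assigned with `iota` for convenience, and the specific rules are made up for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// mode mirrors the idea of the library's Mode type.
// The numeric values here are assumptions.
type mode int

const (
	both mode = iota
	consumer
	producer
)

// options is a hypothetical, trimmed-down stand-in for rabbit.Options.
type options struct {
	URL          string
	Mode         mode
	QueueName    string
	ExchangeName string
	BindingKeys  []string
}

// validateOptions rejects combinations that cannot work for the chosen mode.
// The rules are illustrative only; they are not copied from the library.
func validateOptions(o *options) error {
	if o == nil {
		return errors.New("options cannot be nil")
	}
	if o.URL == "" {
		return errors.New("URL cannot be empty")
	}
	// The README states that only non-default exchanges are supported,
	// so this sketch requires an exchange name.
	if o.ExchangeName == "" {
		return errors.New("ExchangeName cannot be empty")
	}
	// Only clients that consume need a queue and binding keys.
	if o.Mode == both || o.Mode == consumer {
		if o.QueueName == "" {
			return errors.New("QueueName cannot be empty when consuming")
		}
		if len(o.BindingKeys) == 0 {
			return errors.New("at least one binding key is required when consuming")
		}
	}
	return nil
}

func main() {
	producerOnly := &options{URL: "amqp://localhost", Mode: producer, ExchangeName: "messages"}
	fmt.Println(validateOptions(producerOnly))

	consumerNoQueue := &options{URL: "amqp://localhost", Mode: consumer, ExchangeName: "messages"}
	fmt.Println(validateOptions(consumerNoQueue))
}
```

Validating up front like this lets a constructor fail fast with a descriptive error instead of surfacing a confusing failure later, when the first message is published or consumed.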

# Constants

Both means that the client is acting as both a consumer and a producer.
Consumer means that the client is acting as a consumer.
DefaultRetryReconnectSec determines how long to wait before attempting to reconnect to a rabbit server.
DefaultStopTimeout is the default amount of time Stop() will wait for consume function(s) to exit.
Producer means that the client is acting as a producer.

# Variables

DefaultAppID is used for identifying the producer.
DefaultConsumerTag is used for identifying the consumer.
ErrShutdown will be returned if the client is shut down via Stop() or Close().

# Structs

Binding represents the information needed to bind a queue to an Exchange.
ConsumeError will be passed down the error channel if/when `f()` func runs into an error during `Consume()`.
NoOpLogger is a do-nothing logger; it is used internally as the default Logger when none is provided in the Options.
Options determines how the `rabbit` library will behave and should be passed in to rabbit via `New()`.
Rabbit is the struct that is instantiated via `New()`.

# Interfaces

IRabbit is the interface that the `rabbit` library implements.
Logger is the common interface for user-provided loggers.
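
The pairing of a user-provided Logger with a do-nothing default can be illustrated as follows. This is a sketch under assumptions: the `logger` interface below has a reduced, hypothetical method set (the real Logger interface may expose different methods), and `client`, `newClient`, and `bufferLogger` are invented for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// logger is a hypothetical, reduced logging interface.
// The library's Logger may expose other methods.
type logger interface {
	Debug(args ...interface{})
	Error(args ...interface{})
}

// noOpLogger discards everything; it serves as the default
// when the caller does not supply a logger.
type noOpLogger struct{}

func (noOpLogger) Debug(args ...interface{}) {}
func (noOpLogger) Error(args ...interface{}) {}

// bufferLogger is an example user-provided logger that records lines in memory.
type bufferLogger struct {
	lines []string
}

func (b *bufferLogger) Debug(args ...interface{}) {
	b.lines = append(b.lines, "DEBUG "+fmt.Sprint(args...))
}

func (b *bufferLogger) Error(args ...interface{}) {
	b.lines = append(b.lines, "ERROR "+fmt.Sprint(args...))
}

// client falls back to noOpLogger so it never has to nil-check before logging.
type client struct {
	log logger
}

func newClient(l logger) *client {
	if l == nil {
		l = noOpLogger{}
	}
	return &client{log: l}
}

func (c *client) reconnect() {
	c.log.Debug("attempting to reconnect")
}

func main() {
	// No logger supplied: calls are silently discarded.
	newClient(nil).reconnect()

	// Custom logger supplied: calls are recorded.
	buf := &bufferLogger{}
	newClient(buf).reconnect()
	fmt.Println(strings.Join(buf.lines, "\n"))
}
```

Defaulting to a no-op implementation keeps the logging calls unconditional inside the client, which is the usual reason for shipping such a type alongside a logger interface.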

# Type aliases

Mode is the type used to represent whether the RabbitMQ client is acting as a consumer, a producer, or both.