Categorygithub.com/jxo-me/rabbitmq-go
modulepackage
1.0.15
Repository: https://github.com/jxo-me/rabbitmq-go.git
Documentation: pkg.go.dev

# README

rabbitmq-go

Wrapper of rabbitmq/amqp091-go that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐

Motivation

Streadway's AMQP library is currently the most robust and well-supported Go client I'm aware of. It's a fantastic option and I recommend starting there and seeing if it fulfills your needs. Their project has made an effort to stay within the scope of the AMQP protocol, as such, no reconnection logic and few ease-of-use abstractions are provided.

Goal

The goal with rabbitmq-go is to still provide most all of the nitty-gritty functionality of AMQP, but to make it easier to work with via a higher-level API. Particularly:

  • Automatic reconnection
  • Multithreaded consumers via a handler function
  • Reasonable defaults
  • Flow control handling
  • TCP block handling

⚙️ Installation

Inside a Go module:

go get github.com/jxo-me/rabbitmq-go

🚀 Quick Start Consumer

Default options


func main() {
    ctx := context.Background()
    conn, err := rabbitmq.NewConn(
        ctx,
        "amqp://guest:guest@localhost",
        rabbitmq.WithConnectionOptionsLogging,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close(ctx)
    
    consumer, err := rabbitmq.NewConsumer(
        ctx,
        conn,
        func(d rabbitmq.Delivery) rabbitmq.Action {
        log.Printf("consumed: %v", string(d.Body))
        // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
        return rabbitmq.Ack
        },
        "my_queue",
        rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"),
        rabbitmq.WithConsumerOptionsExchangeName("events"),
        rabbitmq.WithConsumerOptionsExchangeDeclare,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close(ctx)
    
    // block main thread - wait for shutdown signal
    sigs := make(chan os.Signal, 1)
    done := make(chan bool, 1)
    
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    
    go func() {
        sig := <-sigs
            fmt.Println()
            fmt.Println(sig)
        done <- true
    }()
    
    fmt.Println("awaiting signal")
    <-done
    fmt.Println("stopping consumer")
}

🚀 Quick Start Publisher

With options

func main() {
    ctx := context.Background()
        conn, err := rabbitmq.NewConn(
        ctx,
        "amqp://guest:guest@localhost",
        rabbitmq.WithConnectionOptionsLogging,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close(ctx)
    
    publisher, err := rabbitmq.NewPublisher(
        ctx,
        conn,
        rabbitmq.WithPublisherOptionsLogging,
        rabbitmq.WithPublisherOptionsExchangeName("events"),
        rabbitmq.WithPublisherOptionsExchangeDeclare,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer publisher.Close(ctx)
    
    publisher.NotifyReturn(func(r rabbitmq.Return) {
        log.Printf("message returned from server: %s", string(r.Body))
    })
    
    publisher.NotifyPublish(func(c rabbitmq.Confirmation) {
        log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack)
    })
    
    // block main thread - wait for shutdown signal
    sigs := make(chan os.Signal, 1)
    done := make(chan bool, 1)
    
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    
    go func() {
        sig := <-sigs
            fmt.Println()
            fmt.Println(sig)
        done <- true
    }()
    
    fmt.Println("awaiting signal")
    
    ticker := time.NewTicker(time.Second)
    for {
        select {
            case <-ticker.C:
                err = publisher.PublishWithContext(
                    context.Background(),
                    []byte("hello, world"),
                    []string{"my_routing_key"},
                    rabbitmq.WithPublishOptionsContentType("application/json"),
                    rabbitmq.WithPublishOptionsMandatory,
                    rabbitmq.WithPublishOptionsPersistentDelivery,
                    rabbitmq.WithPublishOptionsExchange("events"),
                )
                if err != nil {
                    log.Println(err)
                }
            case <-done:
            fmt.Println("stopping publisher")
            return
        }
    }
}

Other usage examples

See the examples directory for more ideas.

Stability

Note that the API is currently in v0. I don't plan on any huge changes, but there may be some small breaking changes before we hit v1.

Submit an issue (above in the issues tab)

Transient Dependencies

My goal is to keep dependencies limited to 1, github.com/rabbitmq/amqp091-go.

👏 Contributing

I love help! Contribute by forking the repo and opening pull requests. Please ensure that your code passes the existing tests and linting, and write tests to test your changes if applicable.

All pull requests should be submitted to the main branch.

# Packages

No description provided by the author

# Functions

ClientMiddlewareChain will attach all given middlewares to your SendFunc.
ConsumeMiddlewareChain will attach all given middlewares to your HandlerFunc.
ContextWithQueueName adds the given queueName to the provided context.
ContextWithShutdownChan adds a shutdown chan to the given context.
NewConn creates a new connection manager.
NewConsumer returns a new Consumer connected to the given rabbitmq server it also starts consuming on the given connection with automatic reconnection handling Do not reuse the returned consumer for anything other than to close it.
NewPublisher returns a new publisher with an open channel to the cluster.
NewRequest will generate a new request to be published.
NewResponseWriter will create a new response writer with given amqp.Publishing.
No description provided by the author
QueueNameFromContext returns the queue name for the current request.
ShutdownChanFromContext returns the shutdown chan.
WithClientOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer if unset, the default will be used (false).
WithClientOptionsConsumerExclusive sets the consumer to exclusive, which means the server will ensure that this is the sole consumer from this queue.
WithClientOptionsConsumerName returns a function that sets the name on the server of this consumer if unset a random name will be given.
WithClientOptionsLogger sets logging to a custom interface.
WithClientOptionsLogging sets logging to true on the client options and sets the.
WithClientOptionsQueueArgs adds optional args to the queue.
WithClientOptionsQueueAutoDelete ensures the queue is an auto-delete queue.
WithClientOptionsQueueDurable ensures the queue is a durable queue.
WithClientOptionsQueueExclusive ensures the queue is an exclusive queue.
WithClientPublishOptionsImmediate makes the publishing immediate, which means when a consumer is not available to immediately handle the new message, a message will be sent back on the return channel for you to handle.
WithClientPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not bound to the routing key, a message will be sent back on the return channel for you to handle.
WithConnectionOptionsConfig sets the Config used in the connection.
WithConnectionOptionsLogger sets logging to true on the consumer options and sets the.
WithConnectionOptionsLogging sets logging to true on the consumer options and sets the.
WithConnectionOptionsReconnectInterval sets the reconnection interval.
WithConsumerOptionsBinding adds a new binding to the queue which allows you to set the binding options on a per-binding basis.
WithConsumerOptionsConcurrency returns a function that sets the concurrency, which means that many goroutines will be spawned to run the provided handler on messages.
WithConsumerOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer if unset the default will be used (false).
WithConsumerOptionsConsumerExclusive sets the consumer to exclusive, which means the server will ensure that this is the sole consumer from this queue.
WithConsumerOptionsConsumerName returns a function that sets the name on the server of this consumer if unset a random name will be given.
WithConsumerOptionsConsumerNoWait sets the consumer to nowait, which means it does not wait for the server to confirm the request and immediately begin deliveries.
WithConsumerOptionsExchangeArgs adds optional args to the exchange.
WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange.
WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance.
WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange.
WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange.
WithConsumerOptionsExchangeKind ensures the queue is a durable queue.
WithConsumerOptionsExchangeName sets the exchange name.
WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange.
WithConsumerOptionsExchangeOptions adds a new exchange to the consumer, this should probably only be used if you want to to consume from multiple exchanges on the same consumer.
WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange.
WithConsumerOptionsLogger sets logging to a custom interface.
WithConsumerOptionsLogging uses a default logger that writes to std out.
WithConsumerOptionsQOSGlobal sets the qos on the channel to global, which means these QOS settings apply to ALL existing and future consumers on all channels on the same connection.
WithConsumerOptionsQOSPrefetch returns a function that sets the prefetch count, which means that many messages will be fetched from the server in advance to help with throughput.
WithConsumerOptionsQueueArgs adds optional args to the queue.
WithConsumerOptionsQueueAutoDelete ensures the queue is an auto-delete queue.
WithConsumerOptionsQueueDurable ensures the queue is a durable queue.
WithConsumerOptionsQueueExclusive ensures the queue is an exclusive queue.
WithConsumerOptionsQueueNoDeclare will turn off the declaration of the queue's existance upon startup.
WithConsumerOptionsQueueNoWait ensures the queue is a no-wait queue.
WithConsumerOptionsQueuePassive ensures the queue is a passive queue.
WithConsumerOptionsQueueQuorum sets the queue a quorum type, which means multiple nodes in the cluster will have the messages distributed amongst them for higher reliability.
WithConsumerOptionsRoutingKey binds the queue to a routing key with the default binding options.
WithConsumerOptionsRoutingKeys binds the queue to routingKeys with the default binding options.
WithPublisherOptionsConfirm enables confirm mode on the connection this is required if publisher confirmations should be used.
WithPublisherOptionsExchangeArgs adds optional args to the exchange.
WithPublisherOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange.
WithPublisherOptionsExchangeDeclare will create the exchange if it doesn't exist.
WithPublisherOptionsExchangeDurable ensures the exchange is a durable exchange.
WithPublisherOptionsExchangeInternal ensures the exchange is an internal exchange.
WithPublisherOptionsExchangeKind ensures the queue is a durable queue.
WithPublisherOptionsExchangeName sets the exchange name.
WithPublisherOptionsExchangeNoWait ensures the exchange is a no-wait exchange.
WithPublisherOptionsExchangePassive ensures the exchange is a passive exchange.
WithPublisherOptionsLogger sets logging to a custom interface.
WithPublisherOptionsLogging sets logging to true on the publisher options and sets the.
WithPublishOptionsAppID returns a function that sets the application id.
WithPublishOptionsContentEncoding returns a function that sets the content encoding, i.e.
WithPublishOptionsContentType returns a function that sets the content type, i.e.
WithPublishOptionsCorrelationID returns a function that sets the content correlation identifier.
WithPublishOptionsExchange returns a function that sets the exchange to publish to.
WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message.
WithPublishOptionsHeaders returns a function that sets message header values, i.e.
WithPublishOptionsImmediate makes the publishing immediate, which means when a consumer is not available to immediately handle the new message, a message will be sent back on the returns channel for you to handle.
WithPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not bound to the routing key a message will be sent back on the returns channel for you to handle.
WithPublishOptionsMessageID returns a function that sets the message identifier.
WithPublishOptionsPersistentDelivery sets the message to persist.
WithPublishOptionsPriority returns a function that sets the content priority from 0 to 9.
WithPublishOptionsReplyTo returns a function that sets the reply to field.
WithPublishOptionsTimestamp returns a function that sets the timestamp for the message.
WithPublishOptionsType returns a function that sets the message type name.
WithPublishOptionsUserID returns a function that sets the user id i.e.

# Constants

Ack default ack this msg after you have successfully processed this delivery.
Manual Message acknowledgement is left to the user using the msg.Ack() method.
NackDiscard the message will be dropped or delivered to a server configured dead-letter queue.
NackRequeue deliver this message to a different consumer.
DeliveryMode.
DeliveryMode.

# Variables

ErrRequestRejected can be returned by Client#Send() when the server Backs the message.
ErrRequestReturned can be returned by Client#Send() when the server returns the message.
ErrRequestTimeout is an error returned when a client request does not receive a response within the client timeout duration.
ErrUnexpectedConnClosed is returned by ListenAndServe() if the server shuts down without calling Stop() and if AMQP does not give an error when said shutdown happens.

# Structs

Binding describes the binding of a queue to a routing key on an exchange.
BindingOptions describes the options a binding can have.
No description provided by the author
Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag.
Conn manages the connection to a rabbit cluster it is intended to be shared across publishers and consumers.
ConnectionOptions are used to describe how a new consumer will be created.
ConsumeOptions are used to configure the consumer on the rabbit server.
Consumer allows you to create and connect to queues for data consumption.
ConsumerOptions are used to describe how a new consumer will be created.
Delivery captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer from Channel.Consume or Channel.Get.
ExchangeOptions are used to configure an exchange.
Publisher allows you to publish messages safely across an open connection.
PublisherOptions are used to describe a publisher's configuration.
PublishOptions are used to control how data is published.
QueueOptions are used to configure a queue.
Request is a requet to perform with the client.
RequestMap keeps track of requests based on their DeliveryTag and/or CorrelationID.
ResponseWriter is used by a handler to construct an RPC response.
Return captures a flattened struct of fields returned by the server when a Publishing is unable to be delivered either due to the `mandatory` flag set and no route found, or `immediate` flag set and no free consumer.
No description provided by the author

# Type aliases

Action is an action that occurs after processed this delivery.
ClientMiddlewareFunc represents a function that can be used as middleware.
Config wraps amqp.Config Config is used in DialConfig and Open to specify the desired tuning parameters used during a connection open handshake.
ConsumeMiddlewareFunc represent a function that can be used as middleware.
Handler defines the handler of each Delivery and return Action.
HandlerFunc is the function that handles all request based on the routing key.
Logger is describing a logging structure.
No description provided by the author
No description provided by the author
SendFunc represents the function that Send does.
Table stores user supplied fields of the following types: bool byte float32 float64 int int16 int32 int64 nil string time.Time amqp.Decimal amqp.Table []byte []interface{} - containing above types Functions taking a table will immediately fail when the table contains a value of an unsupported type.