Categorygithub.com/marcelmiguel/go-rabbitmq
modulepackage
0.9.6
Repository: https://github.com/marcelmiguel/go-rabbitmq.git
Documentation: pkg.go.dev

# README

go-rabbitmq

Wrapper of rabbitmq/amqp091-go that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐

Supported by Qvault

Deploy

Motivation

Streadway's AMQP library is currently the most robust and well-supported Go client I'm aware of. It's a fantastic option and I recommend starting there and seeing if it fulfills your needs. Their project has made an effort to stay within the scope of the AMQP protocol, as such, no reconnection logic and few ease-of-use abstractions are provided.

Goal

The goal with go-rabbitmq is to still provide most all of the nitty-gritty functionality of AMQP, but to make it easier to work with via a higher-level API. Particularly:

  • Automatic reconnection
  • Multithreaded consumers via a handler function
  • Reasonable defaults
  • Flow control handling

⚙️ Installation

Inside a Go module:

go get github.com/wagslane/go-rabbitmq

🚀 Quick Start Consumer

Default options

consumer, err := rabbitmq.NewConsumer(
    "amqp://guest:guest@localhost", rabbitmq.Config{},
    rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()
err = consumer.StartConsuming(
    func(d rabbitmq.Delivery) rabbitmq.Action {
        log.Printf("consumed: %v", string(d.Body))
        // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
        return rabbitmq.Ack
    },
    "my_queue",
    []string{"routing_key1", "routing_key2"}
)
if err != nil {
    log.Fatal(err)
}

With options

consumer, err := rabbitmq.NewConsumer(
    "amqp://user:pass@localhost",
    rabbitmq.Config{},
    rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()
err = consumer.StartConsuming(
		func(d rabbitmq.Delivery) rabbitmq.Action {
			log.Printf("consumed: %v", string(d.Body))
			// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
			return rabbitmq.Ack
		},
		"my_queue",
		[]string{"routing_key", "routing_key_2"},
		rabbitmq.WithConsumeOptionsConcurrency(10),
		rabbitmq.WithConsumeOptionsQueueDurable,
		rabbitmq.WithConsumeOptionsQuorum,
		rabbitmq.WithConsumeOptionsBindingExchangeName("events"),
		rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"),
		rabbitmq.WithConsumeOptionsBindingExchangeDurable,
		rabbitmq.WithConsumeOptionsConsumerName(consumerName),
	)
if err != nil {
    log.Fatal(err)
}

🚀 Quick Start Publisher

Default options

publisher, _, err := rabbitmq.NewPublisher("amqp://user:pass@localhost", rabbitmq.Config{})
if err != nil {
    log.Fatal(err)
}
defer publisher.Close()
err = publisher.Publish([]byte("hello, world"), []string{"routing_key"})
if err != nil {
    log.Fatal(err)
}

With options

publisher, err := rabbitmq.NewPublisher(
    "amqp://user:pass@localhost",
    rabbitmq.Config{},
    // can pass nothing for no logging
    rabbitmq.WithPublisherOptionsLogging,
)
defer publisher.Close()
if err != nil {
    log.Fatal(err)
}
err = publisher.Publish(
	[]byte("hello, world"),
	[]string{"routing_key"},
	rabbitmq.WithPublishOptionsContentType("application/json"),
	rabbitmq.WithPublishOptionsMandatory,
	rabbitmq.WithPublishOptionsPersistentDelivery,
	rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
    log.Fatal(err)
}

returns := publisher.NotifyReturn()
go func() {
    for r := range returns {
        log.Printf("message returned from server: %s", string(r.Body))
    }
}()

Other usage examples

See the examples directory for more ideas.

Stability

Note that the API is currently in v0. I don't plan on any huge changes, but there may be some small breaking changes before we hit v1.

💬 Contact

Twitter Follow

Submit an issue (above in the issues tab)

Transient Dependencies

My goal is to keep dependencies limited to 1, github.com/rabbitmq/amqp091-go.

👏 Contributing

I love help! Contribute by forking the repo and opening pull requests. Please ensure that your code passes the existing tests and linting, and write tests to test your changes if applicable.

All pull requests should be submitted to the main branch.

# Packages

No description provided by the author

# Functions

No description provided by the author
NewConsumer returns a new Consumer connected to the given rabbitmq server.
NewPublisher returns a new publisher with an open channel to the cluster.
WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange.
WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag.
WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag.
WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag.
WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type.
WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to.
WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag.
WithConsumeOptionsBindingExchangeSkipDeclare returns a function that skips the declaration of the binding exchange.
WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound the channel will not be closed with an error.
WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that many goroutines will be spawned to run the provided handler on messages.
WithConsumeOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer if unset the default will be used (false).
WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means the server will ensure that this is the sole consumer from this queue.
WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer if unset a random name will be given.
WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means it does not wait for the server to confirm the request and immediately begin deliveries.
WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means these QOS settings apply to ALL existing and future consumers on all channels on the same connection.
WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that many messages will be fetched from the server in advance to help with throughput.
WithConsumeOptionsQueueArgs returns a function that sets the queue arguments.
WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will be deleted when there are no more conusmers on it.
WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't be destroyed when the server restarts.
WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means it's are only accessible by the connection that declares it and will be deleted when the connection closes.
WithConsumeOptionsQueueNoDeclare sets the queue to no declare, which means the queue will be assumed to be declared on the server, and won't be declared at all.
WithConsumeOptionsQueueNoWait sets the queue to nowait, which means the queue will assume to be declared on the server.
WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes in the cluster will have the messages distributed amongst them for higher reliability.
WithConsumerOptionsLogger sets logging to a custom interface.
WithConsumerOptionsLogging sets a logger to log to stdout.
WithConsumerOptionsReconnectInterval sets the interval at which the consumer will attempt to reconnect to the rabbit server.
WithPublisherOptionsLogger sets logging to a custom interface.
WithPublisherOptionsLogging sets logging to true on the consumer options.
WithPublisherOptionsReconnectInterval sets the interval at which the publisher will attempt to reconnect to the rabbit server.
WithPublishOptionsAppID returns a function that sets the application id.
WithPublishOptionsContentEncoding returns a function that sets the content encoding, i.e.
WithPublishOptionsContentType returns a function that sets the content type, i.e.
WithPublishOptionsCorrelationID returns a function that sets the content correlation identifier.
WithPublishOptionsExchange returns a function that sets the exchange to publish to.
WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message.
WithPublishOptionsHeaders returns a function that sets message header values, i.e.
WithPublishOptionsImmediate makes the publishing immediate, which means when a consumer is not available to immediately handle the new message, a message will be sent back on the returns channel for you to handle.
WithPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not bound to the routing key a message will be sent back on the returns channel for you to handle.
WithPublishOptionsMessageID returns a function that sets the message identifier.
WithPublishOptionsPersistentDelivery sets the message to persist.
WithPublishOptionsPriority returns a function that sets the content priority from 0 to 9.
WithPublishOptionsReplyTo returns a function that sets the reply to field.
WithPublishOptionsTimestamp returns a function that sets the timestamp for the message.
WithPublishOptionsType returns a function that sets the message type name.
WithPublishOptionsUserID returns a function that sets the user id i.e.

# Constants

Ack default ack this msg after you have successfully processed this delivery.
NackDiscard the message will be dropped or delivered to a server configured dead-letter queue.
NackRequeue deliver this message to a different consumer.
DeliveryMode.
DeliveryMode.

# Structs

BindingExchangeOptions are used when binding to an exchange.
Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag.
ConsumeOptions are used to describe how a new consumer will be created.
Consumer allows you to create and connect to queues for data consumption.
ConsumerOptions are used to describe a consumer's configuration.
Delivery captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer from Channel.Consume or Channel.Get.
Publisher allows you to publish messages safely across an open connection.
PublisherOptions are used to describe a publisher's configuration.
PublishOptions are used to control how data is published.
Return captures a flattened struct of fields returned by the server when a Publishing is unable to be delivered either due to the `mandatory` flag set and no route found, or `immediate` flag set and no free consumer.

# Interfaces

Logger is the interface to send logs to.

# Type aliases

Action is an action that occurs after processed this delivery.
Config wraps amqp.Config Config is used in DialConfig and Open to specify the desired tuning parameters used during a connection open handshake.
Handler defines the handler of each Delivery and return Action.
Table stores user supplied fields of the following types: bool byte float32 float64 int int16 int32 int64 nil string time.Time amqp.Decimal amqp.Table []byte []interface{} - containing above types Functions taking a table will immediately fail when the table contains a value of an unsupported type.