modulepackage
3.0.1+incompatible
Repository: https://github.com/shift/rabbus.git
Documentation: pkg.go.dev
# README
Rabbus 🚌 ✨
- A tiny wrapper over amqp exchanges and queues.
- In memory retries with exponential backoff for sending messages.
- Protect producer calls with circuit breaker.
- Automatic reconnect to RabbitMQ broker when connection is lost.
- Go channel API.
Installation
go get -u github.com/rafaeljesus/rabbus
Usage
The rabbus package exposes an interface for emitting and listening RabbitMQ messages.
Emit
import (
"context"
"time"
"github.com/rafaeljesus/rabbus"
)
func main() {
timeout := time.After(time.Second * 3)
cbStateChangeFunc := func(name, from, to string) {
// do something when state is changed
}
r, err := rabbus.New(
rabbusDsn,
rabbus.Durable(true),
rabbus.Attempts(5),
rabbus.Sleep(time.Second*2),
rabbus.Threshold(3),
rabbus.OnStateChange(cbStateChangeFunc),
)
if err != nil {
// handle error
}
defer func(r Rabbus) {
if err := r.Close(); err != nil {
// handle error
}
}(r)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go r.Run(ctx)
msg := rabbus.Message{
Exchange: "test_ex",
Kind: "topic",
Key: "test_key",
Payload: []byte(`foo`),
}
r.EmitAsync() <- msg
for {
select {
case <-r.EmitOk():
// message was sent
case <-r.EmitErr():
// failed to send message
case <-timeout:
// handle timeout error
}
}
}
Listen
import (
"context"
"encoding/json"
"time"
"github.com/rafaeljesus/rabbus"
)
func main() {
timeout := time.After(time.Second * 3)
cbStateChangeFunc := func(name, from, to string) {
// do something when state is changed
}
r, err := rabbus.New(
rabbusDsn,
rabbus.Durable(true),
rabbus.Attempts(5),
rabbus.Sleep(time.Second*2),
rabbus.Threshold(3),
rabbus.OnStateChange(cbStateChangeFunc),
)
if err != nil {
// handle error
}
defer func(r Rabbus) {
if err := r.Close(); err != nil {
// handle error
}
}(r)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go r.Run(ctx)
messages, err := r.Listen(rabbus.ListenConfig{
Exchange: "events_ex",
Kind: "topic",
Key: "events_key",
Queue: "events_q",
DeclareArgs: rabbus.NewDeclareArgs().WithMessageTTL(15 * time.Minute).With("foo", "bar"),
BindArgs: rabbus.NewBindArgs().With("baz", "qux"),
})
if err != nil {
// handle errors during adding listener
}
defer close(messages)
go func(messages chan ConsumerMessage) {
for m := range messages {
m.Ack(false)
}
}(messages)
}
Contributing
- Fork it
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Add some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create new Pull Request
Badges
GitHub @rafaeljesus  · Medium @_jesus_rafael  · Twitter @_jesus_rafael
# Packages
No description provided by the author
# Functions
AMQPProvider expose a interface for interacting with amqp broker.
Attempts is the max number of retries on broker outages.
BreakerInterval is the cyclic period of the closed state for CircuitBreaker to clear the internal counts, If Interval is 0, CircuitBreaker doesn't clear the internal counts during the closed state.
BreakerTimeout is the period of the open state, after which the state of CircuitBreaker becomes half-open.
Durable indicates of the queue will survive broker restarts.
New returns a new Rabbus configured with the variables from the config parameter, or returning an non-nil err if an error occurred while creating connection and channel.
NewBindArgs creates new queue bind values builder.
NewDeclareArgs creates new queue declaration values builder.
OnStateChange is called whenever the state of CircuitBreaker changes.
PassiveExchange forces passive connection with all exchanges using amqp's ExchangeDeclarePassive instead the default ExchangeDeclare.
PrefetchCount limit the number of unacknowledged messages.
PrefetchSize when greater than zero, the server will try to keep at least that many bytes of deliveries flushed to the network before receiving acknowledgments from the consumers.
QosGlobal when global is true, these Qos settings apply to all existing and future consumers on all channels on the same connection.
Sleep is the sleep time of the retry mechanism.
Threshold when a threshold of failures has been reached, future calls to the broker will not run.
# Constants
ContentTypeJSON define json content type.
ContentTypePlain define plain text content type.
ExchangeDirect indicates the exchange is of direct type.
ExchangeFanout indicates the exchange is of fanout type.
ExchangeTopic indicates the exchange is of topic type.
Persistent messages will be restored to durable queues and lost on non-durable queues during server restart.
Transient means higher throughput but messages will not be restored on broker restart.
# Variables
ErrMissingExchange is returned when exchange name is not passed as parameter.
ErrMissingHandler is returned when function handler is not passed as parameter.
ErrMissingKind is returned when exchange type is not passed as parameter.
ErrMissingQueue is returned when queue name is not passed as parameter.
ErrUnsupportedArguments is returned when more than the permitted arguments is passed to a function.
# Structs
BindArgs is the wrapper for AMQP Table class to set common queue bind values.
No description provided by the author
DeclareArgs is the queue declaration values builder.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Type aliases
No description provided by the author
Option represents an option you can pass to New.