# README
## AMQP
Golang AMQP wrapper is a library that wraps amqp.
Check out the docs.
## Features
- Auto-reconnect to the broker and automatically redeclare exchanges and queues.
- Control the channel lifecycle: open new channels under high load and close unused ones.
- Declarative style.

      client, err := amqp.NewClient(
          conn.DefaultConnector("amqp://localhost:5672",
              conn.WithLogger(lg), // We want to know connection status and errors.
          ),
          amqp.TemporaryExchange("example-exchange"), // Declare exchanges and queues.
          amqp.PersistentExchanges(
              "exchange-one",
              "exchange-two",
              "exchange-three",
          ),
          amqp.PersistentQueues(
              "queue for one",
              "queue for two",
              "second queue for two",
          ),
          amqp.Exchange{
              Name: "declare directly",
          },
          amqp.Queue{
              Name: "", // left empty, broker generates name for you.
          },
          amqp.Binding{ // do not forget to bind queue to exchange.
              Exchange: "exchange-one",
              Queue:    "queue for one",
          },
          amqp.WithLogger{Logger: lg}, // We want to know AMQP protocol errors.
      )

- Encoding and decoding are hidden inside.
  - Use the Codec interface for your own format.
  - XML, JSON and Protocol Buffers (protobuf) are already registered.
- Tons of options.
  - Min and max open channels per publisher/subscriber.
  - Limit the number of received messages.
  - Any number of data formats.
  - Fill in any message field as you wish.
  - And more.
- Everything from AMQP may be used directly.
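
To make the codec idea concrete, here is a minimal sketch that uses only the standard library. The `Codec` interface, the `registry` map and the `lookup` helper are assumptions made for illustration; the library's real interface and registration mechanism may look different. The sketch shows how a content type can select an encoder, with a default applied when the content type is empty.

```go
package main

import (
	"encoding/json"
	"encoding/xml"
	"fmt"
)

// Codec is a hypothetical interface for this sketch; the library's
// actual Codec interface may differ.
type Codec interface {
	Encode(v interface{}) ([]byte, error)
	Decode(data []byte, v interface{}) error
}

type jsonCodec struct{}

func (jsonCodec) Encode(v interface{}) ([]byte, error)    { return json.Marshal(v) }
func (jsonCodec) Decode(data []byte, v interface{}) error { return json.Unmarshal(data, v) }

type xmlCodec struct{}

func (xmlCodec) Encode(v interface{}) ([]byte, error)    { return xml.Marshal(v) }
func (xmlCodec) Decode(data []byte, v interface{}) error { return xml.Unmarshal(data, v) }

// registry maps a content type to its codec (hypothetical).
var registry = map[string]Codec{
	"application/json": jsonCodec{},
	"application/xml":  xmlCodec{},
}

// lookup returns the codec for contentType and falls back to def
// when contentType is empty.
func lookup(contentType, def string) (Codec, error) {
	if contentType == "" {
		contentType = def
	}
	c, ok := registry[contentType]
	if !ok {
		return nil, fmt.Errorf("no codec registered for %q", contentType)
	}
	return c, nil
}

// Comment mirrors the data type used in the example section.
type Comment struct {
	Id      string
	Message string
}

func main() {
	c, err := lookup("", "application/json")
	if err != nil {
		panic(err)
	}
	body, err := c.Encode(Comment{Id: "1", Message: "hello"})
	if err != nil {
		panic(err)
	}
	var out Comment
	if err := c.Decode(body, &out); err != nil {
		panic(err)
	}
	fmt.Println(string(body), out.Message)
}
```

Keeping codecs behind one small interface means a new format only needs an `Encode`/`Decode` pair and a registry entry.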
## Contributing
Issues and pull requests are welcome.
## Example

    package main

    import (
        "context"
        "fmt"
        "strconv"
        "time"

        "github.com/devimteam/amqp"
        "github.com/devimteam/amqp/conn"
        "github.com/devimteam/amqp/logger"
    )

    // Comment is the data that we want to deal with.
    type Comment struct {
        Id      string
        Message string
    }

    func main() {
        ch := make(chan []interface{})
        // Listen for log records and write them to stdout.
        go func() {
            for l := range ch {
                fmt.Println(l...)
            }
        }()
        lg := logger.NewChanLogger(ch) // Logger interface identical to go-kit Logger.
        client, err := amqp.NewClient(
            conn.DefaultConnector("amqp://localhost:5672",
                conn.WithLogger(lg), // We want to know connection status and errors.
            ),
            amqp.TemporaryExchange("example-exchange"), // Declare exchanges and queues.
            amqp.WithLogger{Logger: lg},                // We want to know AMQP protocol errors.
        )
        if err != nil {
            panic(err)
        }
        subscr := client.Subscriber()
        // The context is used here as the closing mechanism.
        eventChan := subscr.SubscribeToExchange(context.Background(), "example-exchange", Comment{}, amqp.Consumer{})
        go func() {
            for event := range eventChan {
                fmt.Println(event.Data) // Do something with events.
            }
        }()
        pubsr := client.Publisher()
        for i := 0; i < 10; i++ {
            // Prepare your data before publishing.
            comment := Comment{
                Id:      strconv.Itoa(i),
                Message: "message " + strconv.Itoa(i),
            }
            // The context is used here for passing data to `before` functions.
            err := pubsr.Publish(context.Background(), "example-exchange", comment, amqp.Publish{})
            if err != nil {
                panic(err)
            }
            time.Sleep(time.Millisecond * 500)
        }
        time.Sleep(time.Second * 5) // Wait for all messages to be delivered.
    }

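
The example passes a context to the subscription as its closing mechanism. The following standard-library sketch illustrates that pattern in isolation. The `subscribe` function is a hypothetical stand-in, not the library's subscriber: it emits integers until the context is cancelled and then closes the channel, so a `range` loop over the events ends on its own.

```go
package main

import (
	"context"
	"fmt"
)

// subscribe is a hypothetical stand-in for a subscription call: it emits
// increasing integers until ctx is cancelled, then closes the channel.
func subscribe(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case out <- i:
			}
		}
	}()
	return out
}

// consumeUntil cancels the subscription once it has seen the value stopAt
// and keeps reading until the channel is closed. It returns the number of
// events received; a few events may still arrive after the cancel.
func consumeUntil(stopAt int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	n := 0
	for v := range subscribe(ctx) {
		n++
		if v == stopAt {
			cancel() // Ask the producer to stop; the channel closes shortly after.
		}
	}
	return n
}

func main() {
	fmt.Println("events received before close:", consumeUntil(2))
}
```

In a real program the context would typically be cancelled on shutdown, which lets consumer goroutines finish their `range` loops cleanly.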
# Functions
CommonMessageIdBuilder builds a new UUID to use as the message ID.
CommonTyper prints the Go-style type of a value.
Lifetime sets the interval between the observer's checks for idle channels.
LimitCount limits the number of messages per channel by calling Qos after each reconnection.
LimitSize limits the size of messages in bytes per channel by calling Qos after each reconnection.
LongExchange is a common way to declare an exchange with the given name.
LongQueue is a common way to declare a queue with the given name.
Max sets the maximum number of channels that can be open at the same time.
Min sets the minimum number of channels that should be open at the same time.
PersistentExchanges lets you declare several exchanges with the given names.
PersistentQueues lets you declare several queues with the given names.
PublishBefore adds functions that are called before a message is published to the broker.
HandlersAmount sets the number of handler processes that receive deliveries from one channel.
WarnLogger option sets the logger that logs warning messages.
WaitConnection tells the client to wait for a connection before executing Subscription or Pub.
HandlersAmount sets the number of handler processes that receive deliveries from one channel.
Add this option with a true value to handle all deliveries from the current channel, even after Done was sent.
AllowedPriority rejects messages whose priority is not in the allowed range.
EventChanBuffer sets the buffer size of the event channel for the Subscription method.
DeliverBefore adds functions that are called before an Event is sent to the channel.
SubscriberLogger option sets the logger that logs error messages.
WaitConnection tells the client to wait for a connection before executing Subscription or Pub.
SetDefaultContentType sets the content type whose codec is used if the ContentType field of a message is empty.
TemporaryExchange is a common way to create a temporary exchange with the given name.
# Variables
DeliveryChannelWasClosedError is an informational error that is logged to the info logger when the delivery channel is closed.
This error occurs when a message was delivered but its priority is too low or too high.
Durable or non-auto-delete queues with empty names will survive after all consumers have finished using them, but no one can reconnect to them.
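
The priority error described above comes down to a range check on each delivery. A minimal sketch of such a check follows. The `priorityRange` type and the `errNotAllowedPriority` variable are hypothetical names chosen for this illustration, and the inclusive bounds are an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotAllowedPriority is a hypothetical error value for this sketch.
var errNotAllowedPriority = errors.New("priority is not in the allowed range")

// priorityRange is an inclusive range of accepted message priorities.
type priorityRange struct {
	from, to uint8
}

// check returns errNotAllowedPriority when p lies outside the range.
func (r priorityRange) check(p uint8) error {
	if p < r.from || p > r.to {
		return errNotAllowedPriority
	}
	return nil
}

func main() {
	allowed := priorityRange{from: 2, to: 5}
	for _, p := range []uint8{1, 3, 9} {
		fmt.Println(p, allowed.check(p))
	}
}
```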
# Structs
Binding is used to bind an exchange and a queue.
Channel is a wrapper around *amqp.Channel.
Event represents amqp.Delivery with attached context and data.
Publish holds the AMQP Publish parameters.
WithLogger sets the logger for the client, which reports declaration problems and similar errors.