Categorygithub.com/cyhalothrin/go-rabbit
modulepackage
1.0.2
Repository: https://github.com/cyhalothrin/go-rabbit.git
Documentation: pkg.go.dev

# README

go-rabbit

Golang AMQP client with reconnection logic. Uses rabbitmq/amqp091-go under the hood.

Installation

go get github.com/cyhalothrin/go-rabbit@latest

Quick Start

Consumer

First, we need to declare queue and binding, define consumers and publishers, then call session.Start() to connect to broker and watch session lifecycle. Channels will be recreated on reconnection with defined consumers and publishers.

opt := rabbit.ConnectionOptions{
    IPs:         []string{"localhost"},
    Port:        5672,
    VirtualHost: "default",
    User:        "guest",
    Password:    "guest",
}

// connection is not established at this moment
session, err := rabbit.NewSession(opt)
if err != nil {
    panic(err)
}

ex := Exchange{ Name: "amq.topic" }
queue := &Queue{ Name: "amqp-lib-test-queue" }
bind := Binding{
    Queue:    queue,
    Exchange: exc,
    Key:      "hello",
}

sess.Declare(rabbit.DeclareExchange(ex), rabbit.DeclareQueue(queue), rabbit.DeclareBinding(bind))

sess.AddConsumer(
    NewConsumer(
        HandlerFunc(func(d Delivery) *Envelop {
            fmt.Println("Received:", string(d.Body))
			
            return nil
    }),
    queue,
    ConsumerAutoAck(),
))

// connect to the server and watch connection status
err := sess.Start()
if err != nil {
    panic(err)
}

session.Start() will try to connect to the broker until all attempts fail. In second case, returns error. You need to handle this error, but it usually means that the application can no longer work with the broker. The number of attempts can be defined by SessionMaxAttempts option, default 100.

Publisher

Publisher shares the same connection as consumer, but will use a different channel. See RabbitMQ documentation for more details.

// create session as above, you can reuse the same session
pub := session.Publisher()

err := pub.Publish(ctx, rabbit.Envelop{
        Payload: rabbit.Publishing{
        Body: []byte("hello"),
    },
    Exchange: exc.Name,
    Key:      "hello",
})
if err != nil {
	panic(err)
}

Call session.Start() after all consumers and publishers are declared, but only call it once.

RPC

See the examples directory for more details.

Tests

Run rabbit:

make rabbit

and tests:

make test

# Packages

No description provided by the author

# Functions

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
ConsumerRPCResponsePublishTimeout set time for waiting response publish.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
NewRPCClient you can use clientName for identifying consumer, it will be added to consumer tag.
No description provided by the author
nolint:gocritic // I don't see reason why config should be passed by pointer.
PublisherTimeout sets the timeout for publishing messages.
RPCClientCallTimeout see RPCClient.callTimeout.
RPCClientLog sets own logger for client.
RPCClientReplyQueueName see RPCClient.replyQueueName.
RPCClientReplyQueueNamePrefix see RPCClient.replyQueuePrefix.
RPCClientRequestX sets exchange for calls.
RPCClientResponseX sets exchange for responses.
RPCServerRequestX sets exchange for calls.
RPCServerResponseX sets exchange for responses.
SessionConsumerConfig consumer config.
SessionLog sets logger, it will be used by all consumers, rpc clients if not set for them.
SessionMaxAttempts max number of connection attempts, default 100.
nolint:gocritic // amqp091 declares it by value.

# Variables

No description provided by the author

# Structs

No description provided by the author
No description provided by the author
No description provided by the author
ConsumerConfig see channel.Qos().
No description provided by the author
ErrCritical is returned when client is unable to continue working.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
RPCClient with RabbitMQ you can create RPC with async call handling.
No description provided by the author
Session wraps amqp091.Connection and watches for connection state changes and reconnects if necessary with declared exchanges, queues, bindings and consumers.

# Interfaces

Declarer implemented by *amqp091.Channel.
Enveloper common interface for events or RPC call, it allows you to create your own message type with custom logic for handling some fields of amqp091.Publishing.
No description provided by the author

# Type aliases

No description provided by the author
No description provided by the author
These aliases are added for convenience of not importing a rabbitmq/amqp091-go package.
No description provided by the author
No description provided by the author
These aliases are added for convenience of not importing a rabbitmq/amqp091-go package.
No description provided by the author
No description provided by the author
No description provided by the author