Module: github.com/blokur/harego
Version: 0.4.2
Repository: https://github.com/blokur/harego.git
Documentation: pkg.go.dev

# README

Harego

High-level library on top of amqp (github.com/rabbitmq/amqp091-go).

  1. Description
  2. Usage
  3. Development

Description

A harego.Client is a concurrency-safe queue manager for RabbitMQ and a high-level implementation on top of the amqp library. The Client creates one or more workers for publishing and consuming messages. The default values make the Client work with a durable queue, the default exchange, and the topic exchange kind. The Client can be configured by passing the provided ConfigFunc functions to the NewClient() constructor.

The Consume() method calls the provided HandlerFunc with the next available message on the next available worker. The first return value of the HandlerFunc decides what happens to the message. The second return value is a duration: the Consume worker waits that long before acting on the acknowledgement.
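To illustrate how a handler's first return value could translate into broker actions, here is a minimal sketch that uses only the standard library. The broker interface, the settle function and the recorder type are hypothetical, and the mapping itself is an assumption inferred from the constant descriptions listed later in this document, not harego's actual implementation.

```go
package main

import "fmt"

// ackType is a local stand-in for the four acknowledgement kinds.
type ackType int

const (
	ackTypeAck ackType = iota
	ackTypeNack
	ackTypeReject
	ackTypeRequeue
)

// broker is a hypothetical, minimal view of what a worker can do with one message.
type broker interface {
	Ack() error
	Nack(requeue bool) error
	Reject(requeue bool) error
	Republish() error
}

// settle applies the handler's decision to the message (assumed mapping).
func settle(b broker, a ackType) error {
	switch a {
	case ackTypeAck:
		return b.Ack() // message is removed
	case ackTypeNack:
		return b.Nack(true) // broker requeues the message
	case ackTypeReject:
		return b.Reject(false) // message is dropped
	case ackTypeRequeue:
		// publish a copy to the tail of the queue, then remove the original
		if err := b.Republish(); err != nil {
			return err
		}
		return b.Ack()
	default:
		return fmt.Errorf("unknown ack type: %d", a)
	}
}

// recorder is a fake broker that records the calls made on it.
type recorder struct{ calls []string }

func (r *recorder) Ack() error       { r.calls = append(r.calls, "ack"); return nil }
func (r *recorder) Republish() error { r.calls = append(r.calls, "republish"); return nil }
func (r *recorder) Nack(requeue bool) error {
	r.calls = append(r.calls, fmt.Sprintf("nack requeue=%t", requeue))
	return nil
}
func (r *recorder) Reject(requeue bool) error {
	r.calls = append(r.calls, fmt.Sprintf("reject requeue=%t", requeue))
	return nil
}

func main() {
	for _, a := range []ackType{ackTypeAck, ackTypeNack, ackTypeReject, ackTypeRequeue} {
		r := &recorder{}
		if err := settle(r, a); err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(a, r.calls)
	}
}
```

Keeping the decision in a single switch means the handler only has to return a value and never touches the channel itself.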

You can increase the number of workers by passing Workers(n) to the NewClient constructor.
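The configuration style described above is commonly known as functional options. The following is a rough sketch of that pattern with the standard library only; config, option, newConfig and the with* helpers are hypothetical names, and the default of one worker is an assumption rather than a documented harego default.

```go
package main

import (
	"errors"
	"fmt"
)

// config is a hypothetical stand-in for the client's internal settings.
type config struct {
	exchange string
	queue    string
	workers  int
	durable  bool
}

// option mirrors the idea of a ConfigFunc: a function that changes settings.
type option func(*config)

func withExchange(name string) option { return func(c *config) { c.exchange = name } }
func withQueue(name string) option    { return func(c *config) { c.queue = name } }
func withWorkers(n int) option        { return func(c *config) { c.workers = n } }

var errInput = errors.New("invalid input")

// newConfig starts from defaults, applies the options in order, then validates.
func newConfig(opts ...option) (*config, error) {
	c := &config{workers: 1, durable: true} // assumed defaults
	for _, opt := range opts {
		opt(c)
	}
	if c.workers < 1 {
		return nil, fmt.Errorf("workers must be at least 1: %w", errInput)
	}
	return c, nil
}

func main() {
	c, err := newConfig(withExchange("myexchange"), withQueue("myqueue"), withWorkers(20))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", *c)
}
```

Validating after all options have run keeps each option trivial and reports invalid combinations in one place.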

When the Close() method is called, all connections are closed and the Client can no longer be used. Create a new Client if you need to do more work.

Note

This library is in beta phase and the API might change until we reach a stable release.

Usage

NewClient

The only requirement for the NewClient function is a Connector to connect to the broker when needed:

// to use an address:
harego.NewClient(harego.URLConnector(address))
// to use an amqp connection:
harego.NewClient(harego.AMQPConnector(conn))

The connector is used when the connection is lost, so the Client can initiate a new connection.
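The reconnect behaviour can be pictured with a small sketch in plain Go. The conn, connector and client types below are hypothetical stand-ins, not harego's types; the sketch only shows the idea of calling a stored connector whenever the current connection is no longer usable.

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a hypothetical stand-in for a broker connection.
type conn struct {
	id     int
	closed bool
}

// connector returns a live connection each time it is called.
type connector func() (*conn, error)

type client struct {
	connect connector
	current *conn
}

// connection returns the current connection, asking the connector for a new
// one when there is none yet or the previous one has been closed.
func (c *client) connection() (*conn, error) {
	if c.current != nil && !c.current.closed {
		return c.current, nil
	}
	if c.connect == nil {
		return nil, errors.New("no connector configured")
	}
	cn, err := c.connect()
	if err != nil {
		return nil, fmt.Errorf("connecting: %w", err)
	}
	c.current = cn
	return cn, nil
}

func main() {
	dials := 0
	c := &client{connect: func() (*conn, error) {
		dials++
		return &conn{id: dials}, nil
	}}
	first, _ := c.connection()
	first.closed = true // simulate a lost connection
	second, _ := c.connection()
	fmt.Println(first.id, second.id, dials)
}
```

Because the client holds a function rather than a single connection, it can recover from a dropped connection without the caller's involvement.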

Publish

In this setup the message is sent to the myexchange exchange:

client, err := harego.NewClient(harego.URLConnector(address),
    harego.ExchangeName("myexchange"),
)
// handle the error.
err = client.Publish(&amqp.Publishing{
    Body: []byte(msg),
})
// handle the error.

Consume

In this setup the myqueue queue is bound to the myexchange exchange, and the handler is called for each message that is read from this queue:

client, err := harego.NewClient(harego.URLConnector(address),
    harego.ExchangeName("myexchange"),
    harego.QueueName("myqueue"),
)
// handle the error.
err = client.Consume(ctx, func(msg *amqp.Delivery) (harego.AckType, time.Duration) {
    return harego.AckTypeAck, 0
})
// handle the error.

You can create multiple workers in the above example to handle multiple messages concurrently:

client, err := harego.NewClient(harego.URLConnector(address),
    harego.ExchangeName("myexchange"),
    harego.QueueName("myqueue"),
    harego.Workers(20),
)
// handle the error.
err = client.Consume(ctx, func(msg *amqp.Delivery) (harego.AckType, time.Duration) {
    return harego.AckTypeAck, 0
})
// handle the error.

The handler will receive up to 20 messages concurrently, and the Ack is sent for each message separately.
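As a rough picture of what a fixed number of workers means, the sketch below fans messages from one channel out to n goroutines. It uses only the standard library; the consume function and the string messages are illustrative assumptions and do not reflect how harego is implemented internally.

```go
package main

import (
	"fmt"
	"sync"
)

// consume starts n workers that call handler for every message and returns
// once msgs has been closed and all workers have finished.
func consume(n int, msgs <-chan string, handler func(string)) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range msgs {
				handler(m) // each message is handled on its own
			}
		}()
	}
	wg.Wait()
}

func main() {
	msgs := make(chan string)
	go func() {
		defer close(msgs)
		for i := 0; i < 100; i++ {
			msgs <- fmt.Sprintf("message-%d", i)
		}
	}()

	var mu sync.Mutex
	handled := 0
	consume(20, msgs, func(string) {
		mu.Lock() // the handler runs on many goroutines at once
		handled++
		mu.Unlock()
	})
	fmt.Println("handled:", handled)
}
```

Note that a handler shared by several workers must protect any state it mutates, as the mutex does here.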

Delays

If the returned duration is 0, the acknowledgement is sent to the broker immediately. Otherwise the Consume function sleeps for that duration before sending it. Please note that the delay causes the current worker to sleep for this duration, so you need enough workers to handle the next available messages in the meantime.
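The delay rule can be sketched in a few lines of standard-library Go. The settleAfter function and the fakeClock type are hypothetical; the sleep function is injected so that the behaviour can be observed without real waiting, whereas a real worker would pass time.Sleep.

```go
package main

import (
	"fmt"
	"time"
)

// fakeClock records the requested sleeps instead of waiting.
type fakeClock struct{ total time.Duration }

func (f *fakeClock) Sleep(d time.Duration) { f.total += d }

// settleAfter waits for delay when it is positive and then sends the
// acknowledgement. While it waits, the calling worker handles nothing else.
func settleAfter(delay time.Duration, sleep func(time.Duration), ack func() error) error {
	if delay > 0 {
		sleep(delay)
	}
	return ack()
}

func main() {
	clock := &fakeClock{}
	err := settleAfter(2*time.Second, clock.Sleep, func() error {
		fmt.Println("ack sent after waiting", clock.total)
		return nil
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}
```

Under this model a worker that always returns a delay d settles at most one message per d, so n workers are bounded at roughly n/d messages per unit of time. For example, 20 workers that each delay one second would top out near 20 messages per second.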

Requeueing

If you return harego.AckTypeRequeue from the handler, the message is sent back to the same queue. This means the message will be consumed again only after all the messages currently in the queue are consumed.
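The ordering effect of requeueing can be shown with an in-memory FIFO queue. This is a toy model in plain Go, with drain as a hypothetical helper; it says nothing about how the broker stores messages, only about the order in which a single consumer would see them.

```go
package main

import "fmt"

// drain processes a FIFO queue. When requeue returns true the message is
// appended to the tail of the queue instead of being removed for good.
// It returns the order in which messages were handed to the handler.
func drain(queue []string, requeue func(msg string, attempt int) bool) []string {
	var order []string
	attempts := map[string]int{}
	for len(queue) > 0 {
		msg := queue[0]
		queue = queue[1:]
		attempts[msg]++
		order = append(order, msg)
		if requeue(msg, attempts[msg]) {
			queue = append(queue, msg)
		}
	}
	return order
}

func main() {
	// "a" is requeued on its first attempt only.
	order := drain([]string{"a", "b", "c"}, func(msg string, attempt int) bool {
		return msg == "a" && attempt == 1
	})
	fmt.Println(order) // prints [a b c a]
}
```

A handler that requeues a message unconditionally would loop forever in this model, so it is usually worth bounding the number of attempts.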

Development

Prerequisite

This project supports Go >= 1.18. To run targets from the Makefile you need to install GNU make. You also need docker installed for integration tests.

In order to install dependencies:

make dependencies

This also installs reflex to help with the development process.

To run this application you need to provide the following settings as environment variables or application arguments:

RABBITMQ_PORT
RABBITMQ_ADDR
RABBITMQ_ADMIN_PORT
RABBITMQ_USER
RABBITMQ_PASSWORD
RABBITMQ_VH
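For illustration, the settings above could be combined into a broker URL as follows. This sketch uses only the standard library and rests on assumptions: that RABBITMQ_ADDR holds a host name, that RABBITMQ_VH holds a virtual host name without a leading slash, and that RABBITMQ_ADMIN_PORT is only needed for the management API and therefore plays no part in the URL. The brokerURL function is hypothetical.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"os"
)

// brokerURL assembles an AMQP URL from the environment settings.
// getenv is injected so the function can be exercised without changing the
// process environment; pass os.Getenv in real use.
func brokerURL(getenv func(string) string) (string, error) {
	required := []string{"RABBITMQ_ADDR", "RABBITMQ_PORT", "RABBITMQ_USER", "RABBITMQ_PASSWORD"}
	for _, key := range required {
		if getenv(key) == "" {
			return "", fmt.Errorf("%s is not set", key)
		}
	}
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(getenv("RABBITMQ_USER"), getenv("RABBITMQ_PASSWORD")),
		Host:   net.JoinHostPort(getenv("RABBITMQ_ADDR"), getenv("RABBITMQ_PORT")),
		Path:   "/" + getenv("RABBITMQ_VH"), // assumed: virtual host name without a leading slash
	}
	return u.String(), nil
}

func main() {
	address, err := brokerURL(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Println(address)
}
```

The resulting string is the kind of address that could be handed to harego.URLConnector.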

Running Tests

To watch for file changes and run the unit tests:

make unittest
# or to run them with the race flag:
make unittest_race

There is also an integration_test target for running integration tests.

Make Examples

make unittest
make unittest run=TestMyTest # runs a specific test with regexp
make unittest dir=./db/...   # runs tests in a package
make unittest dir=./db/... run=TestSomethingElse
make unittest flags="-race -count=2"

Please see the Makefile for more targets.

Mocks

To generate mocks run:

make mocks

RabbitMQ

For convenience you can trigger the integration_deps target to set up the required RabbitMQ instance:

make integration_deps


# Functions

AMQPConnector uses r every time the Client needs a new connection.
AutoDelete marks the exchange and queues with autoDelete property which causes the messages to be automatically removed from the queue when consumed.
ConsumerName sets the consumer name of the consuming queue.
ExchangeName sets the exchange name.
ExclusiveQueue marks the queue as exclusive.
Internal sets the exchange to be internal.
NewClient returns a Client capable of publishing and consuming messages.
NotDurable marks the exchange and the queue not to be durable.
NoWait marks the exchange as noWait.
PrefetchCount sets how many items should be prefetched for consumption.
PrefetchSize sets the prefetch size of the Qos.
QueueArgs sets the args passed to the QueueDeclare method.
QueueName sets the queue name.
RoutingKey sets the routing key of the queue.
URLConnector creates a new connection from url.
WithDeliveryMode sets the default delivery mode of messages.
WithExchangeType sets the exchange type.
Workers sets the worker count for consuming messages.

# Constants

AckTypeAck causes the message to be removed in broker.
AckTypeNack causes the message to be requeued in broker.
AckTypeReject causes the message to be dropped in broker.
AckTypeRequeue causes the message to be requeued back to the end of the queue.
DeliveryModePersistent messages will be restored to durable queues and lost on non-durable queues during server restart.
DeliveryModeTransient means higher throughput but messages will not be restored on broker restart.
ExchangeTypeDirect defines a direct exchange.
ExchangeTypeFanout defines a fanout exchange.
ExchangeTypeHeaders defines a headers exchange.
ExchangeTypeTopic defines a topic exchange.

# Variables

ErrAlreadyConfigured is returned when an already configured client is about to receive new configuration.
ErrClosed is returned when the Client is closed and is being reused.
ErrInput is returned when an input is invalid.
ErrNilHnadler is returned when the handler is nil.

# Structs

Client is a concurrent safe construct for publishing a message to exchanges, and consuming messages from queues.

# Interfaces

A Channel can operate queues.
RabbitMQ defines a rabbitmq exchange.

# Type aliases

AckType specifies how the message is acknowledged to RabbitMQ.
ConfigFunc is a function for setting up the Client.
A Connector should return a live connection.
DeliveryMode is the DeliveryMode of an amqp.Publishing message.
ExchangeType is the kind of exchange.
A HandlerFunc receives a message when it is available.