Categorygithub.com/Soreing/hopper
modulepackage
0.0.0-20240810143204-89ac53e9bf03
Repository: https://github.com/soreing/hopper.git
Documentation: pkg.go.dev

# README

Hopper

Build Coverage Go Report Card Go Reference

Hopper is a wrapper around the amqp091-go package, providing a simplified and quick way to create publishers and consumers on RabbitMQ.

Controllers

Connect to the RabbitMQ server by creating a Controller instance with a connection URL and configs.

url := "amqp://guest:guest@localhost:5672/"
ctrl, err := hopper.NewController(url, amqp091.Config{})
if err != nil {
    panic(err)
}

The Controller implements functions that call the internal amqp091.Channel for interacting with exchanges and queues. The below example sets up a simple test case, where a messages exchange routes inbound messages to the inbound queue.

ctrl.ExchangeDeclare("messages", "direct", true, false, false, false, nil)
ctrl.QueueDeclare("inbound", true, false, false, false, nil)
ctrl.QueueBind("inbound", "messages.inbound", "messages", false, nil)

Publishers

The Controller can create Publishers, which create their own amqp091.Channel and manage the publishing of messages. The publisher needs an Id to distinguish different publishers and a mode that sets the behavior of the publisher. ConfirmMode will make the publisher wait for confirmations from the server, and TransactionMode makes publishings atomic until they are either committed or rolled back.

pub, err := ctrl.NewPublisher("publisher-id", hopper.ConfirmMode)
if err != nil {
    panic(err)
}

Publishers support middleware functions that execute on every message that is sent with the publisher. The middleware function provides the original context of the called function, as well as a PublisherContext, which exposes the amqp091.Publishing object for modification, as well as the exchange and routing key.

Middleware functions execute in the order they were attached. To move to the next function in the publishing pipeline, call .Next() on the publishing context.

pub.Use(func(ctx context.Context, pctx *hopper.PublishingContext) {
    pubId := pctx.PublisherId()
    msgId := "3e9dc427-a233-4352-878b-808312f7ec48"
    pctx.Publishing.MessageId = msgId

    start := time.Now()
    pctx.Next()
    elapsed := time.Since(start)

    if err := pctx.GetError(); err == nil {
        fmt.Println("Message", msgId, "delivered in", elapsed, "by", pubId)
    }
})

Publishing sends a message body with a content type to some exchange with a routing key.

ctx := context.TODO()
message := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit.")
publishing := amqp091.Publishing{Body: message, ContentType: "plain/text"}

// Publishing synchronously (blocking)
err = pub.Publish(ctx, "messages", "messages.inbound", publishing)
if err != nil {
    panic(err)
}

// Publishing asynchronously (non-blocking)
chn := pub.PublishAsync(ctx, "messages", "messages.inbound", publishing)
err <- chn
if err != nil {
    panic(err)
}

Consumers

The Controller can create Consumers, which create their own amqp091.Channel and manage the consuming of messages. The consumer needs an Id to distinguish different consumers and a consumer channel to declare the queue with bindings.

queue := hopper.ConsumerQueue{
    Name:    "inbound",
    Durable: true,
    AutoAck: false,
    Bindings: map[string][]string{
        "messages": {"messages.inbound"},
    },
}

con, err := ctrl.NewConsumer("consumer-id", queue)
if err != nil {
    panic(err)
}

To consume messages, get the amqp091.Delivery channel from the consumer. You can acknowledge or reject messages directly on the amqp091.Delivery or on the consumer with the delivery tag.

msg := <-con.Deliveries()
fmt.Println(string(msg.Body))
msg.Ack(false)

Termination

Controllers, Publishers and Consumers include a Done() method, returning a channel that closes when the entities are closed. When encountering an error, the entities close automatically and the error can be fetched with the Error() method. To gracefully shut down the entities, call Shutdown().

go func(con *hopper.Consumer) {
    <-con.Done()
    if err := con.Error(); err != nil {
        fmt.Println("Consumer terminated with error:", err)
    }
}(con)

con.Shutdown(ctx)

# Functions

NewController creates a controller by connecting to the AMQP server with a url and creating a channel.

# Constants

ConfirmMode puts a publisher into confirm mode where each publishing receives a confirmation for reliable publishing.
TransactionMode puts the publusher into transaction mode where all publishings before commit or rollback will be atomic.

# Structs

Consumer manages consuming deliveries from the AMQP server.
ConsumerQueue configures a queue that a consumer will fetch messages from.
Controller has an AMQP connection and a channel.
Publisher manages publishing and confirmations to the AMQP server.
PublishingContext facilitates running middlewares on publishing, containing a modifiable publishing and a value storage.

# Type aliases

PublisherMiddleware describes a function that is called during the publish pipeline.
PublisherMode configures the publisher's behavior.