package
1.12.6
Repository: https://github.com/go-dev-frame/sponge.git
Documentation: pkg.go.dev

# README

rabbitmq

A rabbitmq library that wraps github.com/rabbitmq/amqp091-go. It supports automatic reconnection and customizable parameters, covers six message types (direct, topic, fanout, headers, delayed message, and publisher/subscriber), and supports dead letters.

Example of use

Code Example

The following code example covers all six message types: direct, topic, fanout, headers, delayed message, and publisher/subscriber.

Tip: the wrapped Consume function uses manual acknowledgement mode by default, and your handler does not need to call the ack function itself.

package main

import (
	"context"
	"fmt"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/go-dev-frame/sponge/pkg/logger"
	"github.com/go-dev-frame/sponge/pkg/rabbitmq"
)

// Counters shared by all examples in this program. They are updated through
// sync/atomic and reset by printStat after each example.
var (
	// producerCount is the number of messages published by the current example.
	producerCount int32
	// consumerCount is the number of messages the current example's consumer reported.
	consumerCount int32
)

// main runs one example at a time against a local RabbitMQ broker.
// Uncomment the example you want to try.
func main() {
	// AMQP URI in the form amqp://user:password@host:port/vhost. The literal
	// in the published README was mangled by e-mail obfuscation; this restores
	// the default guest account on the loopback address.
	url := "amqp://guest:guest@127.0.0.1:5672/"

	directExample(url)

	//topicExample(url)

	//fanoutExample(url)

	//headersExample(url)

	//delayedMessageExample(url)

	//publisherSubscriberExample(url)
}

// directExample publishes 100 messages through a direct exchange, then starts
// a consumer on the same queue, waits five seconds and prints the counters.
func directExample(url string) {
	const (
		exchangeName = "direct-exchange-demo"
		queueName    = "direct-queue-1"
		routeKey     = "direct-key-1"
	)
	exchange := rabbitmq.NewDirectExchange(exchangeName, routeKey)
	var queueArgs map[string]interface{}
	fmt.Print("\n\n-------------------- direct --------------------\n")

	// Producer side. The deferred Close calls run when directExample returns,
	// not at the end of this brace block.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()
		// The consumer below declares the queue with the same arguments.
		queueArgs = producer.QueueArgs()

		for n := 1; n <= 100; n++ {
			body := []byte("[direct] message " + strconv.Itoa(n))
			checkErr(producer.PublishDirect(context.Background(), body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side: give the consumer five seconds, then record its count.
	{
		consumer := runConsume(url, exchange, queueName, queueArgs)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(consumer.Count()))
	}

	printStat()
}

// topicExample publishes 100 messages through a topic exchange bound with the
// pattern "key1.key2.*", each under its own routing key, then consumes them
// for five seconds and prints the counters.
func topicExample(url string) {
	const (
		exchangeName = "topic-exchange-demo"
		queueName    = "topic-queue-1"
		routingKey   = "key1.key2.*"
	)
	exchange := rabbitmq.NewTopicExchange(exchangeName, routingKey)
	var queueArgs map[string]interface{}
	fmt.Print("\n\n-------------------- topic --------------------\n")

	// Producer side. The deferred Close calls run when topicExample returns.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		for n := 1; n <= 100; n++ {
			seq := strconv.Itoa(n)
			key := "key1.key2.key" + seq
			body := []byte("[topic] " + key + " message " + seq)
			checkErr(producer.PublishTopic(context.Background(), key, body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side: give the consumer five seconds, then record its count.
	{
		consumer := runConsume(url, exchange, queueName, queueArgs)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(consumer.Count()))
	}

	printStat()
}

// fanoutExample publishes 100 messages through a fanout exchange and starts
// two consumers on two different queues. Only the first consumer's count is
// added to consumerCount; the second one is printed separately.
func fanoutExample(url string) {
	const (
		exchangeName = "fanout-exchange-demo"
		firstQueue   = "fanout-queue-1"
		secondQueue  = "fanout-queue-2"
	)
	exchange := rabbitmq.NewFanoutExchange(exchangeName)
	var queueArgs map[string]interface{}
	fmt.Print("\n\n-------------------- fanout --------------------\n")

	// Producer side. The deferred Close calls run when fanoutExample returns.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, firstQueue, conn)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		for n := 1; n <= 100; n++ {
			body := []byte("[fanout] message " + strconv.Itoa(n))
			checkErr(producer.PublishFanout(context.Background(), body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side: one consumer per queue, both on the same exchange.
	{
		first := runConsume(url, exchange, firstQueue, queueArgs)
		second := runConsume(url, exchange, secondQueue, queueArgs)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(first.Count()))
		fmt.Println("\n\nconsumer 2 count:", second.Count())
	}

	printStat()
}

// headersExample publishes through a headers exchange declared with
// HeadersTypeAll. Each iteration sends one message with the full header set
// (counted) and one with a partial header set (not counted), then a consumer
// runs for five seconds and the counters are printed.
func headersExample(url string) {
	const (
		exchangeName = "headers-exchange-demo"
		queueName    = "headers-queue-1"
	)
	headersKeys := map[string]interface{}{"hello": "world", "foo": "bar"}
	// HeadersTypeAll is used here; HeadersTypeAny is the alternative.
	exchange := rabbitmq.NewHeadersExchange(exchangeName, rabbitmq.HeadersTypeAll, headersKeys)
	var queueArgs map[string]interface{}
	fmt.Print("\n\n-------------------- headers --------------------\n")

	// Producer side. The deferred Close calls run when headersExample returns.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		ctx := context.Background()
		for n := 1; n <= 100; n++ {
			seq := strconv.Itoa(n)

			// Full header set: the same map the exchange was created with.
			checkErr(producer.PublishHeaders(ctx, headersKeys, []byte("[headers] key1 message "+seq)))
			atomic.AddInt32(&producerCount, 1)

			// Partial header set: with x-match "all" this will not match the
			// queue, so the message is dropped and deliberately not counted.
			partial := map[string]interface{}{"foo": "bar"}
			checkErr(producer.PublishHeaders(ctx, partial, []byte("[headers] key2 message "+seq)))
		}
	}

	// Consumer side: give the consumer five seconds, then record its count.
	{
		consumer := runConsume(url, exchange, queueName, queueArgs)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(consumer.Count()))
	}

	printStat()
}

// delayedMessageExample publishes 100 messages with a three-second delay
// through a delayed-message exchange wrapping a direct exchange, then runs a
// consumer for five seconds and prints the counters.
func delayedMessageExample(url string) {
	const (
		exchangeName   = "delayed-message-exchange-demo"
		queueName      = "delayed-message-queue"
		routingKey     = "delayed-key"
		datetimeLayout = "2006-01-02 15:04:05.000"
	)
	exchange := rabbitmq.NewDelayedMessageExchange(exchangeName, rabbitmq.NewDirectExchange("", routingKey))
	var queueArgs map[string]interface{}
	fmt.Print("\n\n-------------------- delayed message --------------------\n")

	// Producer side. The deferred Close calls run when this function returns.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		ctx := context.Background()
		for n := 1; n <= 100; n++ {
			// The send time is embedded in the payload so the delay is visible
			// in the consumer's log.
			body := []byte("[delayed] message " + strconv.Itoa(n) + " at " + time.Now().Format(datetimeLayout))
			checkErr(producer.PublishDelayedMessage(ctx, 3*time.Second, body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side: give the consumer five seconds, then record its count.
	{
		consumer := runConsume(url, exchange, queueName, queueArgs)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(consumer.Count()))
	}

	printStat()
}

// publisherSubscriberExample publishes 100 messages on the "pub-sub" channel
// and starts two subscribers with different identifiers. Only the first
// subscriber's count is added to consumerCount; the second is printed.
func publisherSubscriberExample(url string) {
	const channelName = "pub-sub"
	fmt.Print("\n\n-------------------- publisher subscriber --------------------\n")

	// Publisher side. The deferred Close calls run when this function returns.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		publisher, err := rabbitmq.NewPublisher(channelName, conn)
		checkErr(err)
		defer publisher.Close()

		for n := 1; n <= 100; n++ {
			body := []byte("[pub-sub] message " + strconv.Itoa(n))
			checkErr(publisher.Publish(context.Background(), body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Subscriber side: two subscribers on the same channel.
	{
		first := runSubscriber(url, channelName, "pub-sub-queue-1")
		second := runSubscriber(url, channelName, "pub-sub-queue-2")

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(first.Count()))
		fmt.Println("\n\nsubscriber 2 count:", second.Count())
	}

	printStat()
}

// runConsume opens its own connection, creates a consumer for queueName on
// exchange (auto-ack disabled, queue declared with queueArgs), starts
// consuming with handler and returns the consumer. It panics via checkErr if
// the connection or the consumer cannot be created. The connection is not
// closed here.
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string, queueArgs map[string]interface{}) *rabbitmq.Consumer {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	queueDeclare := rabbitmq.WithConsumerQueueDeclareOptions(rabbitmq.WithQueueDeclareArgs(queueArgs))

	consumer, err := rabbitmq.NewConsumer(exchange, queueName, conn, manualAck, queueDeclare)
	checkErr(err)

	consumer.Consume(context.Background(), handler)
	return consumer
}

// runSubscriber opens its own connection, creates a subscriber on channelName
// with the given identifier (auto-ack disabled), subscribes with handler and
// returns the subscriber. It panics via checkErr on any setup error. The
// connection is not closed here.
func runSubscriber(url string, channelName string, identifier string) *rabbitmq.Subscriber {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	subscriber, err := rabbitmq.NewSubscriber(channelName, identifier, conn, manualAck)
	checkErr(err)

	subscriber.Subscribe(context.Background(), handler)
	return subscriber
}

// handler is the message callback shared by every consumer and subscriber in
// this program: it logs the delivery tag and payload and always reports
// success by returning nil.
var handler = func(ctx context.Context, data []byte, tagID string) error {
	tagField := logger.String("tagID", tagID)
	dataField := logger.String("data", string(data))
	logger.Info("received message", tagField, dataField)
	return nil
}

// checkErr aborts the example by panicking with err when err is non-nil;
// a nil err is a no-op.
func checkErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}

// printStat prints the producer and consumer counters and then resets both
// to zero so the next example starts from a clean slate.
func printStat() {
	fmt.Println("\n\n-------------------- stat --------------------")
	fmt.Println("producer count:", atomic.LoadInt32(&producerCount))
	fmt.Println("consumer count:", atomic.LoadInt32(&consumerCount))
	// Println already appends a newline. Emitting the trailing blank line with
	// a separate call keeps the output identical while avoiding go vet's
	// "Println arg list ends with redundant newline" diagnostic.
	fmt.Println("----------------------------------------------")
	fmt.Println()
	atomic.StoreInt32(&producerCount, 0)
	atomic.StoreInt32(&consumerCount, 0)
}

Example of Dead Letter

The following example code adds a dead letter queue to each of five message types: direct, topic, fanout, headers, and delayed message. The dead letter queue is always of the direct type.

Tip: the wrapped Consume function uses manual acknowledgement mode by default.

package main

import (
	"context"
	"fmt"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/go-dev-frame/sponge/pkg/logger"
	"github.com/go-dev-frame/sponge/pkg/rabbitmq"
)

// Counters shared by all examples in this program. They are updated through
// sync/atomic and reset by printStat after each example.
var (
	// producerCount is the number of messages published by the current example.
	producerCount int32
	// consumerCount is the number of messages reported by the regular consumers.
	consumerCount int32

	// deadLetterConsumerCount is the number of messages reported by the
	// dead-letter consumer.
	deadLetterConsumerCount int32
)

// main runs one dead-letter example at a time against a local RabbitMQ
// broker. Uncomment the example you want to try.
func main() {
	// AMQP URI in the form amqp://user:password@host:port/vhost. The literal
	// in the published README was mangled by e-mail obfuscation; this restores
	// the default guest account on the loopback address.
	url := "amqp://guest:guest@127.0.0.1:5672/"

	directExample(url)

	//topicExample(url)

	//fanoutExample(url)

	//headersExample(url)

	//delayedMessageExample(url)
}

// directExample publishes 100 messages through a direct exchange whose queue
// is declared with x-max-length 60 and x-message-ttl 3000 ms and has a dead
// letter exchange/queue attached. It then runs a regular consumer and a
// dead-letter consumer for five seconds and prints the counters.
func directExample(url string) {
	const (
		exchangeName = "direct-exchange-demo-2"
		queueName    = "direct-queue-2"
		routingKey   = "direct-key-2"
	)
	exchange := rabbitmq.NewDirectExchange(exchangeName, routingKey)
	queueArgs := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}

	deadLetterQueueName := "dl-" + queueName
	deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-"+routingKey)

	fmt.Print("\n\n-------------------- direct --------------------\n")

	// Producer side. The deferred Close calls run when directExample returns.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		// Queue arguments for the main queue.
		queueOpt := rabbitmq.WithProducerQueueDeclareOptions(
			rabbitmq.WithQueueDeclareArgs(queueArgs),
		)
		// Dead letter exchange, queue and routing key.
		deadLetterOpt := rabbitmq.WithDeadLetterOptions(
			rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
		)

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn, queueOpt, deadLetterOpt)
		checkErr(err)
		defer producer.Close()
		// Re-read the arguments from the producer so the consumer declares the
		// queue with exactly what the producer used.
		queueArgs = producer.QueueArgs()

		for n := 1; n <= 100; n++ {
			body := []byte("[direct] say hello" + strconv.Itoa(n))
			checkErr(producer.PublishDirect(context.Background(), body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side: one regular consumer and one on the dead-letter queue.
	{
		consumer := runConsume(url, exchange, queueName, queueArgs)
		deadLetterConsumer := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(consumer.Count()))
		atomic.AddInt32(&deadLetterConsumerCount, int32(deadLetterConsumer.Count()))
	}

	printStat()
}

// topicExample publishes 100 messages through a topic exchange bound with
// "dl-key1.key2.*". The queue is declared with x-max-length 60 and
// x-message-ttl 3000 ms and has a dead letter exchange/queue attached. A
// regular and a dead-letter consumer then run for five seconds.
func topicExample(url string) {
	const (
		exchangeName = "topic-exchange-demo-2"
		queueName    = "topic-queue-2"
		routingKey   = "dl-key1.key2.*"
	)
	exchange := rabbitmq.NewTopicExchange(exchangeName, routingKey)
	queueArgs := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}

	deadLetterQueueName := "dl-" + queueName
	deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-"+routingKey)

	fmt.Print("\n\n-------------------- topic --------------------\n")

	// Producer side. The deferred Close calls run when topicExample returns.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		// Queue arguments for the main queue.
		queueOpt := rabbitmq.WithProducerQueueDeclareOptions(
			rabbitmq.WithQueueDeclareArgs(queueArgs),
		)
		// Dead letter exchange, queue and routing key.
		deadLetterOpt := rabbitmq.WithDeadLetterOptions(
			rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
		)

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn, queueOpt, deadLetterOpt)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		for n := 1; n <= 100; n++ {
			seq := strconv.Itoa(n)
			key := "dl-key1.key2.key" + seq
			body := []byte("[topic] " + key + " message " + seq)
			checkErr(producer.PublishTopic(context.Background(), key, body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side: one regular consumer and one on the dead-letter queue.
	{
		consumer := runConsume(url, exchange, queueName, queueArgs)
		deadLetterConsumer := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(consumer.Count()))
		atomic.AddInt32(&deadLetterConsumerCount, int32(deadLetterConsumer.Count()))
	}

	printStat()
}

// fanoutExample publishes 100 messages through a fanout exchange. The
// producer's queue is declared with x-max-length 60 and x-message-ttl 3000 ms
// and has a dead letter exchange/queue attached. Two regular consumers (on
// two queues) and one dead-letter consumer then run for five seconds; both
// regular counts are added to consumerCount.
func fanoutExample(url string) {
	const (
		exchangeName = "fanout-exchange-demo-2"
		firstQueue   = "fanout-queue-3"
		secondQueue  = "fanout-queue-4"
	)
	exchange := rabbitmq.NewFanoutExchange(exchangeName)
	queueArgs := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}

	// The dead-letter queue is named after the first (producer-declared) queue.
	deadLetterQueueName := "dl-" + firstQueue
	deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-direct-key")
	fmt.Print("\n\n-------------------- fanout --------------------\n")

	// Producer side. The deferred Close calls run when fanoutExample returns.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		// Queue arguments for the main queue.
		queueOpt := rabbitmq.WithProducerQueueDeclareOptions(
			rabbitmq.WithQueueDeclareArgs(queueArgs),
		)
		// Dead letter exchange, queue and routing key.
		deadLetterOpt := rabbitmq.WithDeadLetterOptions(
			rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
		)

		producer, err := rabbitmq.NewProducer(exchange, firstQueue, conn, queueOpt, deadLetterOpt)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		for n := 1; n <= 100; n++ {
			body := []byte("[fanout] message " + strconv.Itoa(n))
			checkErr(producer.PublishFanout(context.Background(), body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side: two regular consumers and one on the dead-letter queue.
	{
		first := runConsume(url, exchange, firstQueue, queueArgs)
		second := runConsume(url, exchange, secondQueue, queueArgs)
		deadLetterConsumer := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(first.Count()))
		atomic.AddInt32(&consumerCount, int32(second.Count()))
		atomic.AddInt32(&deadLetterConsumerCount, int32(deadLetterConsumer.Count()))
	}

	printStat()
}

// headersExample publishes through a headers exchange declared with
// HeadersTypeAll. The queue is declared with x-max-length 60 and
// x-message-ttl 3000 ms and has a dead letter exchange/queue attached. Each
// iteration sends one message with the full header set (counted) and one with
// a partial header set (not counted). A regular and a dead-letter consumer
// then run for five seconds.
func headersExample(url string) {
	const (
		exchangeName = "headers-exchange-demo-2"
		queueName    = "headers-queue-2"
	)
	headersKeys := map[string]interface{}{"hello": "world", "foo": "bar"}
	// HeadersTypeAll is used here; HeadersTypeAny is the alternative.
	exchange := rabbitmq.NewHeadersExchange(exchangeName, rabbitmq.HeadersTypeAll, headersKeys)
	queueArgs := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}

	deadLetterQueueName := "dl-" + queueName
	deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-headers-key")
	fmt.Print("\n\n-------------------- headers --------------------\n")

	// Producer side. The deferred Close calls run when headersExample returns.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		// Queue arguments for the main queue.
		queueOpt := rabbitmq.WithProducerQueueDeclareOptions(
			rabbitmq.WithQueueDeclareArgs(queueArgs),
		)
		// Dead letter exchange, queue and routing key.
		deadLetterOpt := rabbitmq.WithDeadLetterOptions(
			rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
		)

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn, queueOpt, deadLetterOpt)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		ctx := context.Background()
		for n := 1; n <= 100; n++ {
			// Full header set: the same map the exchange was created with.
			checkErr(producer.PublishHeaders(ctx, headersKeys, []byte("[headers] message "+strconv.Itoa(n))))
			atomic.AddInt32(&producerCount, 1)

			// Partial header set: with x-match "all" this will not match the
			// queue, so the message is dropped and deliberately not counted.
			partial := map[string]interface{}{"foo": "bar"}
			checkErr(producer.PublishHeaders(ctx, partial, []byte("[headers] key2 message")))
		}
	}

	// Consumer side: one regular consumer and one on the dead-letter queue.
	{
		consumer := runConsume(url, exchange, queueName, queueArgs)
		deadLetterConsumer := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(consumer.Count()))
		atomic.AddInt32(&deadLetterConsumerCount, int32(deadLetterConsumer.Count()))
	}

	printStat()
}

// delayedMessageExample publishes 200 delayed messages (the first 100 with a
// one-second delay, the rest with two seconds) to a queue declared with
// x-max-length 60 and x-message-ttl 3000 ms and a dead letter exchange/queue
// attached. After a three-second pause it runs a regular and a dead-letter
// consumer for ten seconds and prints the counters.
func delayedMessageExample(url string) {
	const (
		exchangeName   = "delayed-message-exchange-demo-2"
		queueName      = "delayed-message-queue-2"
		routingKey     = "delayed-key-2"
		datetimeLayout = "2006-01-02 15:04:05.000"
	)
	exchange := rabbitmq.NewDelayedMessageExchange(exchangeName, rabbitmq.NewDirectExchange("", routingKey))
	queueArgs := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}

	deadLetterQueueName := "dl-" + queueName
	deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-"+routingKey)

	fmt.Print("\n\n-------------------- delayed message --------------------\n")

	// Producer side. The deferred Close calls run when this function returns.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		// Queue arguments for the main queue.
		queueOpt := rabbitmq.WithProducerQueueDeclareOptions(
			rabbitmq.WithQueueDeclareArgs(queueArgs),
		)
		// Dead letter exchange, queue and routing key.
		deadLetterOpt := rabbitmq.WithDeadLetterOptions(
			rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
		)

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn, queueOpt, deadLetterOpt)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		ctx := context.Background()
		for n := 1; n <= 200; n++ {
			// Messages 1-100 are delayed by one second, 101-200 by two.
			delay := time.Second
			if n > 100 {
				delay = 2 * time.Second
			}

			body := []byte("[delayed] message " + strconv.Itoa(n) + " at " + time.Now().Format(datetimeLayout))
			checkErr(producer.PublishDelayedMessage(ctx, delay, body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side: one regular consumer and one on the dead-letter queue.
	{
		time.Sleep(3 * time.Second) // wait for all messages to be sent
		consumer := runConsume(url, exchange, queueName, queueArgs)
		deadLetterConsumer := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)

		time.Sleep(10 * time.Second)
		atomic.AddInt32(&consumerCount, int32(consumer.Count()))
		atomic.AddInt32(&deadLetterConsumerCount, int32(deadLetterConsumer.Count()))
	}

	printStat()
}

// runConsume opens its own connection, creates a consumer for queueName on
// exchange (auto-ack disabled, queue declared with queueArgs), starts
// consuming with handler and returns the consumer. It panics via checkErr if
// the connection or the consumer cannot be created. The connection is not
// closed here.
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string, queueArgs map[string]interface{}) *rabbitmq.Consumer {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	queueDeclare := rabbitmq.WithConsumerQueueDeclareOptions(rabbitmq.WithQueueDeclareArgs(queueArgs))

	consumer, err := rabbitmq.NewConsumer(exchange, queueName, conn, manualAck, queueDeclare)
	checkErr(err)

	consumer.Consume(context.Background(), handler)
	return consumer
}

// runConsumeForDeadLetter opens its own connection and starts a consumer for
// the dead-letter queue queueName on exchange. Unlike runConsume it passes no
// queue declare arguments. It panics via checkErr on any setup error and does
// not close the connection.
func runConsumeForDeadLetter(url string, exchange *rabbitmq.Exchange, queueName string) *rabbitmq.Consumer {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	consumer, err := rabbitmq.NewConsumer(exchange, queueName, conn, manualAck)
	checkErr(err)

	consumer.Consume(context.Background(), handler)
	return consumer
}

// handler is the message callback shared by the regular and dead-letter
// consumers: it logs the delivery tag and payload and always reports success
// by returning nil.
var handler = func(ctx context.Context, data []byte, tagID string) error {
	tagField := logger.String("tagID", tagID)
	dataField := logger.String("data", string(data))
	logger.Info("received message", tagField, dataField)
	return nil
}

// checkErr aborts the example by panicking with err when err is non-nil;
// a nil err is a no-op.
func checkErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}

// printStat prints the producer, consumer and dead-letter consumer counters
// and then resets all three to zero so the next example starts clean.
func printStat() {
	fmt.Println("\n\n-------------------- stat --------------------")
	// The counters are written with sync/atomic everywhere else, so they are
	// read atomically here as well instead of being accessed directly.
	fmt.Println("producer count:", atomic.LoadInt32(&producerCount))
	fmt.Println("consumer count:", atomic.LoadInt32(&consumerCount))
	fmt.Println("dead letter consumer count:", atomic.LoadInt32(&deadLetterConsumerCount))
	// Println already appends a newline. Emitting the trailing blank line with
	// a separate call keeps the output identical while avoiding go vet's
	// "Println arg list ends with redundant newline" diagnostic.
	fmt.Println("----------------------------------------------")
	fmt.Println()
	atomic.StoreInt32(&producerCount, 0)
	atomic.StoreInt32(&consumerCount, 0)
	atomic.StoreInt32(&deadLetterConsumerCount, 0)
}

Example of Automatic Resumption of Publish

If a publish fails because of a network error, you can wait until the connection has been re-established and then publish the message again.

package main

import (
	"context"
	"errors"
	"strconv"
	"time"

	"github.com/go-dev-frame/sponge/pkg/logger"
	"github.com/go-dev-frame/sponge/pkg/rabbitmq"
)

// url is the AMQP URI (amqp://user:password@host:port/vhost) used by both the
// producer and the consumer. The literal in the published README was mangled
// by e-mail obfuscation; this restores the default guest account on the
// loopback address.
var url = "amqp://guest:guest@127.0.0.1:5672/"

// main starts a consumer and then runs the producer loop, both bounded by a
// one-hour context. Setup or producer errors are logged and end the program.
func main() {
	// Keep the cancel func and call it on exit: discarding it keeps the
	// context's resources alive until the timeout fires (go vet: lostcancel).
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	exchangeName := "direct-exchange-demo"
	queueName := "direct-queue"
	routeKey := "info"
	exchange := rabbitmq.NewDirectExchange(exchangeName, routeKey)

	err := runConsume(ctx, exchange, queueName)
	if err != nil {
		logger.Error("runConsume failed", logger.Err(err))
		return
	}

	// runProduce blocks until ctx is done or setup fails.
	err = runProduce(ctx, exchange, queueName)
	if err != nil {
		logger.Error("runProduce failed", logger.Err(err))
		return
	}
}

// runProduce publishes one direct message every five seconds until ctx is
// done, and then returns ctx.Err(). It returns early with the setup error if
// the connection or the initial producer cannot be created.
//
// When a publish fails with rabbitmq.ErrClosed, it polls every two seconds
// until the connection reports connected again, rebuilds the producer and
// publishes the SAME message again, so a message is not lost across a
// reconnect. Any other publish error is logged and the message is skipped.
func runProduce(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
	const (
		publishInterval = 5 * time.Second
		reconnectWait   = 2 * time.Second
	)

	connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	if err != nil {
		return err
	}
	defer connection.Close()

	p, err := rabbitmq.NewProducer(exchange, queueName, connection)
	if err != nil {
		return err
	}
	// Close whichever producer is current at return time: p is replaced after
	// a reconnect, and a plain `defer p.Close()` would stay bound to the first.
	defer func() { p.Close() }()

	count := 0
	for {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		count++
		data := []byte("direct say hello" + strconv.Itoa(count))

		err = p.PublishDirect(ctx, data)
		for errors.Is(err, rabbitmq.ErrClosed) {
			// Back off before every attempt so a broker that keeps closing the
			// channel cannot turn this into a busy loop.
			if waitErr := sleepContext(ctx, reconnectWait); waitErr != nil {
				return waitErr
			}
			if !connection.CheckConnected() { // still reconnecting
				continue
			}
			// Use a temporary so p never holds the result of a failed call.
			rebuilt, newErr := rabbitmq.NewProducer(exchange, queueName, connection)
			if newErr != nil {
				logger.Warn("reconnect failed", logger.Err(newErr))
				continue
			}
			p = rebuilt
			err = p.PublishDirect(ctx, data) // publish the failed message again
		}

		if err != nil {
			logger.Warn("publish failed", logger.Err(err))
		} else {
			logger.Info("publish message", logger.String("data", string(data)))
		}

		if waitErr := sleepContext(ctx, publishInterval); waitErr != nil {
			return waitErr
		}
	}
}

// sleepContext waits for d or until ctx is done, whichever comes first, and
// returns ctx.Err() in the latter case.
func sleepContext(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}

// runConsume opens its own connection, creates a consumer for queueName on
// exchange with auto-ack disabled and starts consuming with handler under
// ctx. It returns the first setup error, or nil once consuming has been
// started. The connection is not closed here.
func runConsume(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	if err != nil {
		return err
	}

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	consumer, err := rabbitmq.NewConsumer(exchange, queueName, conn, manualAck)
	if err != nil {
		return err
	}

	consumer.Consume(ctx, handler)
	return nil
}

// handler is the consumer's message callback: it logs the delivery tag and
// payload and always reports success by returning nil.
var handler = func(ctx context.Context, data []byte, tagID string) error {
	tagField := logger.String("tagID", tagID)
	dataField := logger.String("data", string(data))
	logger.Info("received message", tagField, dataField)
	return nil
}

# Functions

NewConnection rabbitmq connection.
NewConsumer create a consumer.
NewDelayedMessageExchange create a delayed message exchange.
NewDirectExchange create a direct exchange.
NewFanoutExchange create a fanout exchange.
NewHeadersExchange create a headers exchange, the headerType supports "all" and "any".
NewProducer create a producer.
NewPublisher create a publisher, channelName is exchange name.
NewSubscriber create a subscriber, channelName is exchange name, identifier is queue name.
NewTopicExchange create a topic exchange.
WithConsumeArgs set consume args option.
WithConsumeConsumer set consume consumer option.
WithConsumeExclusive set consume exclusive option.
WithConsumeNoLocal set consume noLocal option.
WithConsumeNoWait set consume no wait option.
WithConsumerAutoAck set consumer auto ack option, if false, manual ACK required.
WithConsumerConsumeOptions set consumer consume option.
WithConsumerExchangeDeclareOptions set exchange declare option.
WithConsumerPersistent set consumer persistent option.
WithConsumerQosOptions set consume qos option.
WithConsumerQueueBindOptions set queue bind option.
WithConsumerQueueDeclareOptions set queue declare option.
WithDeadLetter set dead letter exchange, queue, routing key.
WithDeadLetterExchangeDeclareOptions set dead letter exchange declare option.
WithDeadLetterOptions set dead letter options.
WithDeadLetterQueueBindOptions set dead letter queue bind option.
WithDeadLetterQueueDeclareOptions set dead letter queue declare option.
WithDelayedMessagePublishHeadersKeys set delayed message publish headersKeys option.
WithDelayedMessagePublishTopicKey set delayed message publish topicKey option.
WithExchangeDeclareArgs set exchange declare args option.
WithExchangeDeclareAutoDelete set exchange declare auto delete option.
WithExchangeDeclareInternal set exchange declare internal option.
WithExchangeDeclareNoWait set exchange declare no wait option.
WithLogger set logger option.
WithProducerExchangeDeclareOptions set exchange declare option.
WithProducerMandatory set producer mandatory option.
WithProducerPersistent set producer persistent option.
WithProducerQueueBindOptions set queue bind option.
WithProducerQueueDeclareOptions set queue declare option.
WithPublisherConfirm enables publisher confirm.
WithQosEnable set qos enable option.
WithQosPrefetchCount set qos prefetch count option.
WithQosPrefetchGlobal set qos global option.
WithQosPrefetchSize set qos prefetch size option.
WithQueueBindArgs set queue bind args option.
WithQueueBindNoWait set queue bind no wait option.
WithQueueDeclareArgs set queue declare args option.
WithQueueDeclareAutoDelete set queue declare auto delete option.
WithQueueDeclareExclusive set queue declare exclusive option.
WithQueueDeclareNoWait set queue declare no wait option.
WithReconnectTime set reconnect time interval option.
WithTLSConfig set tls config option.

# Constants

DefaultURL default rabbitmq url.
HeadersTypeAll all.
HeadersTypeAny any.

# Variables

ErrClosed closed.

# Structs

Connection rabbitmq connection.
Consumer session.
Exchange rabbitmq minimum management unit.
Producer session.
Publisher session.
Subscriber session.

# Type aliases

ConnectionOption connection option.
ConsumeOption consume option.
ConsumerOption consumer option.
DeadLetterOption declare dead letter option.
DelayedMessagePublishOption delayed message publish option.
ExchangeDeclareOption declare exchange option.
Handler message.
HeadersType headers type.
ProducerOption producer option.
QosOption qos option.
QueueBindOption declare queue bind option.
QueueDeclareOption declare queue option.