Categorygithub.com/Yordroid/rabbitmq
modulepackage
1.0.5
Repository: https://github.com/yordroid/rabbitmq.git
Documentation: pkg.go.dev

# README

rabbitmq

Wrap github.com/rabbitmq/amqp091-go to enable automatic reconnection in case of disconnection.

Feature

  • Add automatic reconnection handling
  • Retry sending messages when failed
  • Shield the usage details of github.com/rabbitmq/amqp091-go and reduce users' cognitive load.

Usage

Publish

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/icpd/rabbitmq"
)

func main() {
	// 1. Initialize a rabbitmq object
	r := rabbitmq.NewRabbitmq(
		"amqp://admin:[email protected]:5672",
		nil,
	)

	// 2. Create a connection to the rabbitmq service
	if err := r.Connect(); err != nil {
		log.Fatal(err)
	}

	// 3. Declare an exchange with the rabbitmq service
	exchange := rabbitmq.ExchangeOptions{
		Name:    "example_exchange",          // Exchange name
		Type:    rabbitmq.ExchangeTypeFanout, // Exchange type
		Durable: true,                        // Whether it is durable
	}
	if err := r.Exchange(exchange); err != nil {
		log.Fatal(err)
	}

	// 4. Send a message to the exchange, not blocking
	// Notice: Please ensure that the exchange has been created before publishing a message
	err := r.Publish(
		context.Background(),
		[]byte(fmt.Sprintf("hello %d", i)), // Message body to be sent 
		rabbitmq.Exchange(exchange),        // Set the message exchange target
	)
	if err != nil {
		log.Fatal(err)
	}

	select {}
}

Subscribe

package main

import (
	"log"

	"github.com/icpd/rabbitmq"
)

func main() {
	// 1. Initialize a rabbitmq object
	r := rabbitmq.NewRabbitmq(
		"amqp://admin:[email protected]:5672",
		nil,
	)

	// 2. Create a connection to the rabbitmq service
	if err := r.Connect(); err != nil {
		log.Fatal(err)
	}

	// 3. Exchange configuration
	exchange := rabbitmq.ExchangeOptions{
		Name:    "example_exchange",          // Exchange name
		Type:    rabbitmq.ExchangeTypeFanout, // Exchange type
		Durable: true,                        // Whether it is durable
	}

	// 4. Create a subscription
	// Subscribing and consuming internally starts a goroutine that consumes data so it won't block the main goroutine.
	err := r.Subscribe(func(msg []byte) error {
		log.Println("receive:", string(msg))
		return nil
	},
		rabbitmq.Queue("example_queue"), // Set the name of the consumption queue
		rabbitmq.Exchange(exchange),     // Set the exchange that the queue needs to bind to
	)
	if err != nil {
		log.Fatal(err)
	}

	select {}
}

For more usage examples, please refer to the _example directory.

# Functions

DisableAutoAck 禁止自动确认消息.
DurableQueue 设置为持久化队列.
Exchange 设置交换机配置.
Key 设置路由key.
NewForeverBackoff 创建一个永不停止的 backoff.
No description provided by the author
Queue 设置队列名.
Requeue 设置消费失败时将消息重入队列.
WithoutExchange 不申明交换机.

# Constants

No description provided by the author
No description provided by the author
No description provided by the author

# Variables

DefaultDeliveryCloseDelay 默认 delivery 关闭后重建 consume 延迟时间.
DefaultPublishMaxTries 默认发送失败最大重试次数.
DefaultPublishRetryDelay 默认发送失败延迟时间.
No description provided by the author

# Structs

ExchangeOptions rabbitmq 交换机.
No description provided by the author
No description provided by the author
No description provided by the author

# Type aliases

ExchangeType 交换机类型.
No description provided by the author
No description provided by the author