# README
rabbitmq
A wrapper around github.com/rabbitmq/amqp091-go that reconnects automatically when the connection is lost.
Features
- Handles reconnection automatically
- Retries sending messages when publishing fails
- Hides the usage details of github.com/rabbitmq/amqp091-go to reduce users' cognitive load
Usage
Publish
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/icpd/rabbitmq"
)

func main() {
	// 1. Initialize a rabbitmq object
	r := rabbitmq.NewRabbitmq(
		"amqp://admin:[email protected]:5672",
		nil,
	)

	// 2. Connect to the RabbitMQ server
	if err := r.Connect(); err != nil {
		log.Fatal(err)
	}

	// 3. Declare an exchange on the RabbitMQ server
	exchange := rabbitmq.ExchangeOptions{
		Name:    "example_exchange",          // Exchange name
		Type:    rabbitmq.ExchangeTypeFanout, // Exchange type
		Durable: true,                        // Whether the exchange is durable
	}
	if err := r.Exchange(exchange); err != nil {
		log.Fatal(err)
	}

	// 4. Publish messages to the exchange (non-blocking).
	// Note: make sure the exchange has been declared before publishing.
	for i := 0; i < 3; i++ {
		err := r.Publish(
			context.Background(),
			[]byte(fmt.Sprintf("hello %d", i)), // Message body to send
			rabbitmq.Exchange(exchange),        // Target exchange for the message
		)
		if err != nil {
			log.Fatal(err)
		}
	}

	// Block forever to keep the process alive.
	select {}
}
Subscribe
package main

import (
	"log"

	"github.com/icpd/rabbitmq"
)

func main() {
	// 1. Initialize a rabbitmq object
	r := rabbitmq.NewRabbitmq(
		"amqp://admin:[email protected]:5672",
		nil,
	)

	// 2. Connect to the RabbitMQ server
	if err := r.Connect(); err != nil {
		log.Fatal(err)
	}

	// 3. Exchange configuration
	exchange := rabbitmq.ExchangeOptions{
		Name:    "example_exchange",          // Exchange name
		Type:    rabbitmq.ExchangeTypeFanout, // Exchange type
		Durable: true,                        // Whether the exchange is durable
	}

	// 4. Create a subscription.
	// Subscribe starts a goroutine internally to consume messages, so it does not block the main goroutine.
	err := r.Subscribe(func(msg []byte) error {
		log.Println("receive:", string(msg))
		return nil
	},
		rabbitmq.Queue("example_queue"), // Name of the queue to consume from
		rabbitmq.Exchange(exchange),     // Exchange the queue is bound to
	)
	if err != nil {
		log.Fatal(err)
	}

	// Block forever so the consumer goroutine keeps running.
	select {}
}
For more usage examples, see the _example directory.
# Functions
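DisableAutoAck disables automatic message acknowledgement.
DurableQueue makes the queue durable.
Exchange sets the exchange configuration.
Key sets the routing key.
NewForeverBackoff creates a backoff that never stops retrying.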
No description provided by the author
Queue sets the queue name.
Requeue requeues the message when consumption fails.
WithoutExchange skips declaring an exchange.
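In the usage examples, helpers such as Queue and Exchange are passed as trailing arguments to Subscribe and Publish, which suggests the functional-options pattern. The sketch below shows that pattern in isolation. The struct, its fields, and the lowercase helper names are all hypothetical and are not taken from the package's source.

```go
package main

import "fmt"

// subscribeOptions is a hypothetical settings struct; the real package's
// internal fields may differ.
type subscribeOptions struct {
	queue   string
	key     string
	durable bool
	autoAck bool
}

// option mutates subscribeOptions; each helper below returns one.
type option func(*subscribeOptions)

// withQueue sets the queue name.
func withQueue(name string) option {
	return func(o *subscribeOptions) { o.queue = name }
}

// withKey sets the routing key.
func withKey(key string) option {
	return func(o *subscribeOptions) { o.key = key }
}

// withDurableQueue marks the queue as durable.
func withDurableQueue() option {
	return func(o *subscribeOptions) { o.durable = true }
}

// withoutAutoAck turns off automatic acknowledgement.
func withoutAutoAck() option {
	return func(o *subscribeOptions) { o.autoAck = false }
}

// buildOptions starts from defaults (auto-ack on is assumed here) and
// applies each option in order, so later options override earlier ones.
func buildOptions(opts ...option) subscribeOptions {
	o := subscribeOptions{autoAck: true}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := buildOptions(
		withQueue("example_queue"),
		withDurableQueue(),
		withoutAutoAck(),
	)
	fmt.Printf("%+v\n", o)
}
```

The pattern lets callers set only what they need while new settings can be added later without changing the function signature.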
# Constants
No description provided by the author
No description provided by the author
No description provided by the author
# Variables
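DefaultDeliveryCloseDelay is the default delay before the consumer is re-created after the delivery channel closes.
DefaultPublishMaxTries is the default maximum number of retries when publishing fails.
DefaultPublishRetryDelay is the default delay between retries when publishing fails.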
No description provided by the author
# Structs
ExchangeOptions describes a RabbitMQ exchange.
No description provided by the author
No description provided by the author
No description provided by the author
# Type aliases
ExchangeType is the exchange type.
No description provided by the author
No description provided by the author