modulepackage
1.0.0-beta.1
Repository: https://github.com/bxcodec/goqueue.git
Documentation: pkg.go.dev
# README
goqueue
GoQueue - One library to rule them all. A Golang wrapper that handles all the complexity of various Queue platforms. Extensible and easy to learn.
Index
Support
You can file an Issue. See documentation in Go.Dev
Getting Started
Install
go get -u github.com/bxcodec/goqueue
Example
package main
import (
"context"
"encoding/json"
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/bxcodec/goqueue"
"github.com/bxcodec/goqueue/consumer"
"github.com/bxcodec/goqueue/interfaces"
"github.com/bxcodec/goqueue/middleware"
"github.com/bxcodec/goqueue/options"
consumerOpts "github.com/bxcodec/goqueue/options/consumer"
publisherOpts "github.com/bxcodec/goqueue/options/publisher"
"github.com/bxcodec/goqueue/publisher"
)
func initExchange(ch *amqp.Channel, exchangeName string) error {
return ch.ExchangeDeclare(
exchangeName,
"topic",
true,
false,
false,
false,
nil,
)
}
func main() {
// Initialize the RMQ connection
rmqDSN := "amqp://rabbitmq:rabbitmq@localhost:5672/"
rmqConn, err := amqp.Dial(rmqDSN)
if err != nil {
panic(err)
}
// Initialize the Publisher
rmqPub := publisher.NewPublisher(
publisherOpts.PublisherPlatformRabbitMQ,
publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
Conn: rmqConn,
PublisherChannelPoolSize: 5,
}),
publisherOpts.WithPublisherID("publisher_id"),
publisherOpts.WithMiddlewares(
middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
middleware.HelloWorldMiddlewareExecuteAfterPublisher(),
),
)
publisherChannel, err := rmqConn.Channel()
if err != nil {
panic(err)
}
defer publisherChannel.Close()
initExchange(publisherChannel, "goqueue")
consumerChannel, err := rmqConn.Channel()
if err != nil {
panic(err)
}
defer consumerChannel.Close()
rmqConsumer := consumer.NewConsumer(
consumerOpts.ConsumerPlatformRabbitMQ,
consumerOpts.WithRabbitMQConsumerConfig(consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern(
consumerChannel,
publisherChannel,
"goqueue", // exchange name
[]string{"goqueue.payments.#"}, // routing keys pattern
)),
consumerOpts.WithConsumerID("consumer_id"),
consumerOpts.WithMiddlewares(
middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(),
middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(),
),
consumerOpts.WithMaxRetryFailedMessage(3),
consumerOpts.WithBatchMessageSize(1),
consumerOpts.WithQueueName("consumer_queue"),
)
queueSvc := goqueue.NewQueueService(
options.WithConsumer(rmqConsumer),
options.WithPublisher(rmqPub),
options.WithMessageHandler(handler()),
)
go func() {
for i := 0; i < 10; i++ {
data := map[string]interface{}{
"message": fmt.Sprintf("Hello World %d", i),
}
jbyt, _ := json.Marshal(data)
err := queueSvc.Publish(context.Background(), interfaces.Message{
Data: data,
Action: "goqueue.payments.create",
Topic: "goqueue",
})
if err != nil {
panic(err)
}
fmt.Println("Message Sent: ", string(jbyt))
}
}()
// change to context.Background() if you want to run it forever
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = queueSvc.Start(ctx)
if err != nil {
panic(err)
}
}
func handler() interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
data := m.Data
jbyt, _ := json.Marshal(data)
fmt.Println("Message Received: ", string(jbyt))
return m.Ack(ctx)
}
}
Advance Setups
RabbitMQ -- Retry Concept
Src: Excalidraw Link
Contribution
To contrib to this project, you can open a PR or an issue.
# Packages
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Functions
AddGoQueueEncoding stores the given encoding for the specified content type in the goQueueEncodingMap.
GetGoQueueEncoding returns the encoding associated with the given content type.
NewQueueService creates a new instance of QueueService with the provided options.
# Variables
No description provided by the author
No description provided by the author
No description provided by the author
JSONDecoder is a DecoderFn implementation that decodes JSON data into a Message.
JSONEncoder is an implementation of the EncoderFn interface that encodes a Message into JSON format.
JSONEncoding represents the encoding configuration for JSON.
# Structs
Encoding represents an encoding configuration for a specific content type.
QueueService represents a service that handles message queuing operations.