# README
rsmq-go
Go implementation of the Message Queue based on Redis Streams.
Why
- High performance and low latency
- Easy to use and maintain with Redis
Features
- Add message to the queue
- Consume message from the queue
- Auto-acknowledgment of message
- Message delivery delay with specific timestamp
- Message retry ability
- Dead letter queue after retry limit
- Auto clean idle consumer
- Pending message processing
- Distributed rate limiting
- Tag filter for message
- OpenTelemetry instrumentation
Installation
go get github.com/sysulq/rsmq-go
Example
package rsmq_test
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/redis/go-redis/v9"
"github.com/sysulq/rsmq-go"
)
func Example_produceAndConsume() {
cc := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
queue := rsmq.New(rsmq.Options{
Client: cc,
Topic: "example",
ConsumeOpts: rsmq.ConsumeOpts{
ConsumerGroup: "task_group",
AutoCreateGroup: true,
MaxConcurrency: 1,
},
})
defer queue.Close()
// Produce tasks
for i := 0; i < 10; i++ {
task := &rsmq.Message{
Payload: json.RawMessage(fmt.Sprintf(`{"message": "Hello %d"}`, i)),
}
err := queue.Add(context.Background(), task)
if err != nil {
log.Printf("Failed to enqueue task: %v", err)
}
}
// Consume tasks
go func() {
err := queue.Consume(
context.Background(),
func(ctx context.Context, task *rsmq.Message) error {
var payload map[string]interface{}
_ = json.Unmarshal(task.Payload, &payload)
fmt.Printf("Processing task, payload: %v\n", payload)
return nil
},
)
if err != nil {
log.Fatalf("Error consuming tasks: %v", err)
}
}()
time.Sleep(time.Second)
// Output:
// Processing task, payload: map[message:Hello 0]
// Processing task, payload: map[message:Hello 1]
// Processing task, payload: map[message:Hello 2]
// Processing task, payload: map[message:Hello 3]
// Processing task, payload: map[message:Hello 4]
// Processing task, payload: map[message:Hello 5]
// Processing task, payload: map[message:Hello 6]
// Processing task, payload: map[message:Hello 7]
// Processing task, payload: map[message:Hello 8]
// Processing task, payload: map[message:Hello 9]
}
# Variables
MessagingRsmqMessageDeliveryTimestamp is the messaging delivery timestamp for rsmq.
MessagingRsmqMessageGroup is the messaging group for rsmq.
MessagingRsmqMessageID is the messaging ID for rsmq.
MessagingRsmqMessageTopic is the messaging topic for rsmq.
MessagingRsmqSystem is the messaging system for rsmq.
# Structs
ConsumeOpts represents options for consuming messages.
MessageQueue manages message production and consumption.
No description provided by the author
RetentionOpts represents options for retention policy.
# Type aliases
BatchMessageHandler is a function that processes a batch of messages and returns a list of errors.
Message represents a message in the queue.
MessageHandler is a function that processes a message and returns a result.