github.com/sysulq/rsmq-go
Module and package
Version: 0.2.0
Repository: https://github.com/sysulq/rsmq-go.git
Documentation: pkg.go.dev

# README

rsmq-go


A Go implementation of a message queue based on Redis Streams.


Why

  • High performance and low latency
  • Easy to use and maintain with Redis

Features

  • Add messages to the queue
  • Consume messages from the queue
  • Automatic acknowledgment of messages
  • Delayed message delivery at a specific timestamp
  • Message retries
  • Dead letter queue once the retry limit is reached
  • Automatic cleanup of idle consumers
  • Processing of pending messages
  • Distributed rate limiting
  • Tag filtering for messages
  • OpenTelemetry instrumentation

Installation

go get github.com/sysulq/rsmq-go

Example

package rsmq_test

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/sysulq/rsmq-go"
)

func Example_produceAndConsume() {
	cc := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})

	queue := rsmq.New(rsmq.Options{
		Client: cc,
		Topic:  "example",
		ConsumeOpts: rsmq.ConsumeOpts{
			ConsumerGroup:   "task_group",
			AutoCreateGroup: true,
			MaxConcurrency:  1,
		},
	})
	defer queue.Close()

	// Produce tasks
	for i := 0; i < 10; i++ {
		task := &rsmq.Message{
			Payload: json.RawMessage(fmt.Sprintf(`{"message": "Hello %d"}`, i)),
		}

		err := queue.Add(context.Background(), task)
		if err != nil {
			log.Printf("Failed to enqueue task: %v", err)
		}
	}

	// Consume tasks
	go func() {
		err := queue.Consume(
			context.Background(),
			func(ctx context.Context, task *rsmq.Message) error {
				var payload map[string]interface{}
				// Surface malformed payloads instead of silently ignoring them.
				if err := json.Unmarshal(task.Payload, &payload); err != nil {
					return err
				}
				fmt.Printf("Processing task, payload: %v\n", payload)

				return nil
			},
		)
		if err != nil {
			log.Fatalf("Error consuming tasks: %v", err)
		}
	}()

	// Give the consumer goroutine time to process the queued tasks.
	time.Sleep(time.Second)
	// Output:
	// Processing task, payload: map[message:Hello 0]
	// Processing task, payload: map[message:Hello 1]
	// Processing task, payload: map[message:Hello 2]
	// Processing task, payload: map[message:Hello 3]
	// Processing task, payload: map[message:Hello 4]
	// Processing task, payload: map[message:Hello 5]
	// Processing task, payload: map[message:Hello 6]
	// Processing task, payload: map[message:Hello 7]
	// Processing task, payload: map[message:Hello 8]
	// Processing task, payload: map[message:Hello 9]
}

# Packages

No description provided by the author
No description provided by the author

# Functions

Map2 returns an iterator over f applied to seq.
New creates a new MessageQueue instance.

# Variables

MessagingRsmqMessageDeliveryTimestamp is the messaging delivery timestamp for rsmq.
MessagingRsmqMessageGroup is the messaging group for rsmq.
MessagingRsmqMessageID is the messaging ID for rsmq.
MessagingRsmqMessageTopic is the messaging topic for rsmq.
MessagingRsmqSystem is the messaging system for rsmq.

# Structs

ConsumeOpts represents options for consuming messages.
MessageQueue manages message production and consumption.
No description provided by the author
RetentionOpts represents options for retention policy.

# Type aliases

BatchMessageHandler is a function that processes a batch of messages and returns a list of errors.
Message represents a message in the queue.
MessageHandler is a function that processes a message and returns a result.