# README
redisqueue
redisqueue
provides a producer and consumer of a queue that uses Redis
streams.
Features
- A
Producer
struct to make enqueuing messages easy. - A
Consumer
struct to make processing messages concurrenly. - Claiming and acknowledging messages if there's no error, so that if a consumer dies while processing, the message it was working on isn't lost. This guarantees at least once delivery.
- A "visibility timeout" so that if a message isn't processed in a designated time frame, it will be be processed by another consumer.
- A max length on the stream so that it doesn't store the messages indefinitely and run out of memory.
- Graceful handling of Unix signals (
SIGINT
andSIGTERM
) to let in-flight messages complete. - A channel that will surface any errors so you can handle them centrally.
- Graceful handling of panics to avoid crashing the whole process.
- A concurrency setting to control how many goroutines are spawned to process messages.
- A batch size setting to limit the total messages in flight.
- Support for multiple streams.
Installation
redisqueue
requires a Go version with Modules support and uses import
versioning. So please make sure to initialize a Go module before installing
redisqueue
:
go mod init github.com/my/repo
go get github.com/robinjoseph08/redisqueue/v2
Import:
import "github.com/robinjoseph08/redisqueue/v2"
Example
Here's an example of a producer that inserts 1000 messages into a queue:
package main
import (
"fmt"
"github.com/robinjoseph08/redisqueue/v2"
)
func main() {
p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{
StreamMaxLength: 10000,
ApproximateMaxLength: true,
})
if err != nil {
panic(err)
}
for i := 0; i < 1000; i++ {
err := p.Enqueue(&redisqueue.Message{
Stream: "redisqueue:test",
Values: map[string]interface{}{
"index": i,
},
})
if err != nil {
panic(err)
}
if i%100 == 0 {
fmt.Printf("enqueued %d\n", i)
}
}
}
And here's an example of a consumer that reads the messages off of that queue:
package main
import (
"fmt"
"time"
"github.com/robinjoseph08/redisqueue/v2"
)
func main() {
c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
VisibilityTimeout: 60 * time.Second,
BlockingTimeout: 5 * time.Second,
ReclaimInterval: 1 * time.Second,
BufferSize: 100,
Concurrency: 10,
})
if err != nil {
panic(err)
}
c.Register("redisqueue:test", process)
go func() {
for err := range c.Errors {
// handle errors accordingly
fmt.Printf("err: %+v\n", err)
}
}()
fmt.Println("starting")
c.Run()
fmt.Println("stopped")
}
func process(msg *redisqueue.Message) error {
fmt.Printf("processing message: %v\n", msg.Values["index"])
return nil
}
# Packages
No description provided by the author
# Functions
NewConsumer uses a default set of options to create a Consumer.
NewConsumerWithOptions creates a Consumer with custom ConsumerOptions.
NewProducer uses a default set of options to create a Producer.
NewProducerWithOptions creates a Producer using custom ProducerOptions.
# Structs
Consumer adds a convenient wrapper around dequeuing and managing concurrency.
ConsumerOptions provide options to configure the Consumer.
Message constitutes a message that will be enqueued and dequeued from Redis.
Producer adds a convenient wrapper around enqueuing messages that will be processed later by a Consumer.
ProducerOptions provide options to configure the Producer.
# Type aliases
ConsumerFunc is a type alias for the functions that will be used to handle and process Messages.
RedisOptions is an alias to redis.Options so that users can this instead of having to import go-redis directly.