# README
Sidekiq-compatible background workers in Go.
- reliable queueing for all queues using brpoplpush
- handles retries
- supports custom middleware
- customizable concurrency per queue
- responds to Unix signals to safely wait for jobs to finish before exiting
- provides stats on what jobs are currently running
- Redis Sentinel support
- well tested
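
The concurrency and signal-handling bullets above can be illustrated with a small standard-library sketch. It is not how go-workers2 is implemented: `runPool`, its parameters, and the integer job type are hypothetical names chosen for illustration. The sketch starts a fixed number of goroutines for one queue, stops picking up jobs once a stop channel is closed, and waits for in-flight jobs to finish before returning.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
)

// runPool is a hypothetical helper, not part of go-workers2. It processes
// jobs with `concurrency` goroutines until stop is closed or jobs is drained,
// waits for in-flight jobs, and returns the number of jobs processed.
func runPool(concurrency int, jobs <-chan int, stop <-chan struct{}, handle func(int)) int64 {
	var wg sync.WaitGroup
	var processed int64
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				// Check stop first so no new job is picked up once shutdown starts.
				select {
				case <-stop:
					return
				default:
				}
				select {
				case <-stop:
					return
				case job, ok := <-jobs:
					if !ok {
						return // queue drained
					}
					handle(job)
					atomic.AddInt64(&processed, 1)
				}
			}
		}()
	}
	wg.Wait() // in-flight jobs finish before we return
	return atomic.LoadInt64(&processed)
}

func main() {
	jobs := make(chan int, 100)
	for i := 0; i < 100; i++ {
		jobs <- i
	}
	close(jobs)

	// Close stop when the process receives SIGINT or SIGTERM.
	stop := make(chan struct{})
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		close(stop)
	}()

	n := runPool(10, jobs, stop, func(int) {})
	fmt.Println("processed", n)
}
```

Passing the stop channel in, rather than reading signals inside the pool, keeps the shutdown path easy to exercise in tests.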
Example usage:

    package main

    import (
        "fmt"
        "os"

        workers "github.com/digitalocean/go-workers2"
    )

    func myJob(message *workers.Msg) error {
        // do something with your message
        // message.Jid()
        // message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
        return nil
    }

    func myMiddleware(queue string, mgr *workers.Manager, next workers.JobFunc) workers.JobFunc {
        return func(message *workers.Msg) (err error) {
            // do something before each message is processed
            err = next(message)
            // do something after each message is processed
            return
        }
    }

    func main() {
        // Create a manager, which manages workers
        manager, err := workers.NewManager(workers.Options{
            // location of redis instance
            ServerAddr: "localhost:6379",
            // instance of the database
            Database: 0,
            // number of connections to keep open with redis
            PoolSize: 30,
            // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
            ProcessID: "1",
        })
        if err != nil {
            // without a manager nothing below can run, so exit
            fmt.Println(err)
            os.Exit(1)
        }

        // create a middleware chain with the default middlewares, and append myMiddleware
        mids := workers.DefaultMiddlewares().Append(myMiddleware)

        // pull messages from "myqueue" with concurrency of 10
        // this worker will not run myMiddleware, but will run the default middlewares
        manager.AddWorker("myqueue", 10, myJob)

        // pull messages from "myqueue2" with concurrency of 20
        // this worker will run the default middlewares and myMiddleware
        manager.AddWorker("myqueue2", 20, myJob, mids...)

        // pull messages from "myqueue3" with concurrency of 20
        // this worker will only run myMiddleware
        manager.AddWorker("myqueue3", 20, myJob, myMiddleware)

        // If you already have a manager and want to enqueue
        // to the same place:
        producer := manager.Producer()

        // Alternatively, if you want to create a producer to enqueue messages
        // producer, err := workers.NewProducer(workers.Options{
        //     // location of redis instance
        //     ServerAddr: "localhost:6379",
        //     // instance of the database
        //     Database: 0,
        //     // number of connections to keep open with redis
        //     PoolSize: 30,
        //     // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
        //     ProcessID: "1",
        // })

        // Add a job to a queue
        producer.Enqueue("myqueue3", "Add", []int{1, 2})

        // Add a job to a queue with retry
        producer.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})

        // stats will be available at http://localhost:8080/stats
        go workers.StartAPIServer(8080)

        // Blocks until process is told to exit via unix signal
        manager.Run()
    }

Running the example above serves output like the following at localhost:8080/stats:

    [
      {
        "manager_name": "",
        "processed": 5,
        "failed": 57,
        "jobs": {
          "myqueue": null,
          "myqueue2": null,
          "myqueue3": null
        },
        "enqueued": {
          "myqueue": 0,
          "myqueue2": 0,
          "myqueue3": 0
        },
        "retry_count": 4
      }
    ]

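
A client that polls the stats endpoint could decode this payload as sketched below. The `managerStats` type, the `parseStats` helper, and the Go field types are assumptions inferred from the sample output above, not types exported by the package. The shape of the per-queue `jobs` entries is not shown in the sample, so they are kept as raw JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sampleStats is the sample payload from this README.
const sampleStats = `[
  {
    "manager_name": "",
    "processed": 5,
    "failed": 57,
    "jobs": {"myqueue": null, "myqueue2": null, "myqueue3": null},
    "enqueued": {"myqueue": 0, "myqueue2": 0, "myqueue3": 0},
    "retry_count": 4
  }
]`

// managerStats mirrors the fields visible in the sample output.
// It is a hypothetical client-side type, not part of go-workers2.
type managerStats struct {
	ManagerName string                     `json:"manager_name"`
	Processed   int64                      `json:"processed"`
	Failed      int64                      `json:"failed"`
	Jobs        map[string]json.RawMessage `json:"jobs"` // shape unknown, kept raw
	Enqueued    map[string]int64           `json:"enqueued"`
	RetryCount  int64                      `json:"retry_count"`
}

// parseStats decodes the JSON array returned by the stats endpoint.
func parseStats(body []byte) ([]managerStats, error) {
	var stats []managerStats
	if err := json.Unmarshal(body, &stats); err != nil {
		return nil, err
	}
	return stats, nil
}

func main() {
	stats, err := parseStats([]byte(sampleStats))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	for _, s := range stats {
		// prints "processed=5 failed=57 retries=4 queues=3"
		fmt.Printf("processed=%d failed=%d retries=%d queues=%d\n",
			s.Processed, s.Failed, s.RetryCount, len(s.Enqueued))
	}
}
```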
Development sponsored by DigitalOcean. Code forked from github/jrallison/go-workers. Initial development sponsored by Customer.io.
# Functions
ConfigureAPIServer allows global API server configuration with the given options.
DefaultMiddlewares creates the default middleware pipeline.
LogMiddleware is the default logging middleware.
NewManager creates a new manager with the provided options.
NewManagerWithRedisClient creates a new manager with the provided options and a pre-configured Redis client.
NewMiddlewares creates the processing pipeline given the list of middleware funcs.
NewMsg returns a new message.
NewProducer creates a new producer with the given options.
NewProducerWithRedisClient creates a new producer with the given options and Redis client.
NopMiddleware does nothing.
RegisterAPIEndpoints sets up API server endpoints.
RetryMiddleware is middleware that allows retries for job failures.
StartAPIServer starts the API server.
StatsMiddleware is middleware that collects stats on processed messages.
StopAPIServer stops the API server.
# Constants
DefaultRetryMax is the default maximum number of retries for a job.
NanoSecondPrecision is a constant for the number of nanoseconds in a second.
RetryTimeFormat is the default format for retry times.
# Variables
Logger is the default go-workers2 logger, only used here in this file.
# Structs
APIOptions contains the set of configuration options for the global api.
Args is the set of parameters for a message.
EnqueueData stores data and configuration for new work.
EnqueueOptions stores configuration for new work.
JobStatus contains the status and data for active jobs of a manager.
Manager coordinates work, workers, and signaling needed for job processing.
Msg is the struct for job data (parameters and metadata).
Options contains the set of configuration options for a manager and/or producer.
Producer is used to enqueue new work.
Retries stores retry information.
Stats contains current stats for a manager.
# Interfaces
Fetcher is an interface for managing work messages.
# Type aliases
JobFunc is a message processor.
MiddlewareFunc is an extra function on the processing pipeline.
Middlewares contains the lists of all configured middleware functions.
RetriesExhaustedFunc gets executed when retry attempts have been exhausted.