# README
go-jobq
A Job Queue implementation in golang
This module provides a simple implementation of a job queue. It is suitable for situations where an application just needs a basic queuing mechanism to handle workloads that can easily be defined using a golang interface{}. It provides an in memory queue and a redis backed queue.
The redis-backed queue actually uses an in memory queue to service an incoming redis stream. the api is slightly different but still pretty simple.
Define a worker to handle the job, pass a factory method for the worker to the q, then push an interface{} representing the job data to the q. When a worker thread becomes available, the interface containing the job data will be passed to your worker.
Usage
go get github.com/johnscode/go-jobq.git
To create an in memory job queue first create the queue, then start it:
q := NewQueue(workers int, capacity int, workerFactory)
q.StartWorkers(ctx)
Terminate the queue using the Stop method:
q.Stop()
Define the worker and worker factory
A worker needs to implement the IQueueWorker interface:
type IQueueWorker interface {
ProcessJob(ctx context.Context, job interface{})
QuitChan() chan bool
}
ProcessJob is the worker function that takes the 'job' data as an interface and performs your operation.
QuitChan() provides a chan bool that the queue uses to stop the workers.
The queue will use a factory that you provide to create the workers. The worker factory implements the QueueWorkerFactory interface:
type QueueWorkerFactory interface {
CreateWorker(ctx context.Context, quit chan bool) (IQueueWorker, error)
}
Add a job to the q
q.EnqueueJob(jobdata)
For a Redis-backed queue, you need to provide a redis client and a string representing the name of the redis stream that is the backing store for the queue. Note: you don't need to create the redis stream, redis will do that for us.
q, _ := CreateRedisStreamJobQueue(redisClient, numWorkers, capacity, workerFactory)
Example of an in memory queue
Create a job q to operate on a series of strings:
a. Define the worker
type struct Worker {
quitChan chan bool
// whatever else your worker needs
}
func (w Worker) ProcessJob(ctx context.Context, job interface{}) {
if str, ok := job.(string); ok {
// do stuff with the string
}
}
func (w Worker) QuitChan() chan bool {
return w.quitChan
}
b. Define the factory
type WorkerFactory struct {
// whatever else your factory needs
}
func (f WorkerFactory) CreateWorker(ctx context.Context, quit chan bool) (IQueueWorker, error) {
return Worker {
quitChan: quit
}
}
c. Create and start the queue with 5 workers and a capacity of 10
q:=NewQueue(5, 10, WorkerFactory{})
// StartWorkers will block until the queue quits (or fails), so use a go thread
go q.StartWorkers(ctx)
d. Push a job t the q
q.EnqueueJob("string to process")
Example of a Redis backed q
Create a redis-backed job q to operate on a series of strings:
a. Define the worker and factory (slightly different from above)
type RedisWorker struct {
quitChan chan bool
}
func (w RedisWorker) ProcessJob(ctx context.Context, job interface{}) {
if xmsg, ok := job.(redis.XMessage); ok {
fmt.Printf("process string: %+v\n", xmsg.Values)
}
}
func (w RedisWorker) QuitChan() chan bool {
return w.quitChan
}
type RedisWorkerFactory struct {
// whatever else your factory needs
}
func (f RedisWorkerFactory) CreateWorker(ctx context.Context, quit chan bool) (jobq.IQueueWorker, error) {
return RedisWorker{
quitChan: quit,
}, nil
}
b. Create and start the queue:
rdb, _:= redis.NewClient(...)
quitChan := make(chan bool)
q, _ := CreateRedisStreamJobQueue(rdb, "our-string-queue", 2, 3, WorkerFactory{})
// Work will block until the queue quits (or fails), so use a go thread
// write to quitChan to tell the queue to stop
go q.Work(ctx, quitChan)
c. Push data to the redis queue. Note: this could me in a different process or instance
q := QProducer{Redis: rdb, "our-string-queue"}
_ = q.SendToQ(ctx, "string to process")
Note on Encoding Payload
For a redis backed queue, the job data needs to be easily marshalled to a string. Redis will fail to marshall a data type that needs a binary encoding. Any primitive, struct of primitives, or map[string]interface{} of primitives will easily marshal.
To send a struct that has data that requires binary encoding, try marshaling the struct to JSON and using the json string as the job data. Similarly, for a byte array, try encoding to hex then sending the hex string as the job. Your ProcessJob method can easily unmarshal/decode on the other end of the queue.
TODO
- Replace the factory pattern, seems clunky