0.0.0-20230519174807-3352087e8a70
Repository: https://github.com/mergestat/sqlq.git
Documentation: pkg.go.dev
# README
sqlq
:warning: still very experimental! proceed with care!
sqlq is a SQL-backed, persistent job queuing solution.
Usage
To start using sqlq, add it to your dependencies with:
go get -u github.com/mergestat/sqlq
Enqueuing new jobs
Using the sqlq.Enqueue() function, push a job to a queue:
func NewSendMail(args ...string) *sqlq.JobDescription {
	// serialize args to JSON (assumes WithParameters accepts the encoded bytes);
	// marshaling a []string cannot fail, so the error is ignored
	params, _ := json.Marshal(args)
	// the type name must match the one registered with the worker
	return sqlq.NewJobDesc("send-mail", sqlq.WithParameters(params))
}

var conn *sql.DB // handle to an open database connection
job, err := sqlq.Enqueue(conn, "default", NewSendMail(from, to, body))
if err != nil {
	// handle error here
}
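The serialization step hinted at in the comment above could look roughly like the following. This is a minimal sketch using only the standard library: the mailParams type and the encodeMailParams / decodeMailParams helpers are hypothetical names introduced here for illustration and are not part of sqlq. It assumes JSON is an acceptable encoding for job parameters.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mailParams is a hypothetical payload type for a "send-mail" job.
// It is not defined by sqlq; it only illustrates one way to structure parameters.
type mailParams struct {
	From string `json:"from"`
	To   string `json:"to"`
	Body string `json:"body"`
}

// encodeMailParams serializes the job arguments to JSON
// so they can be stored alongside the job.
func encodeMailParams(from, to, body string) ([]byte, error) {
	return json.Marshal(mailParams{From: from, To: to, Body: body})
}

// decodeMailParams is the inverse operation, intended for use
// inside a job handler that receives the raw parameters.
func decodeMailParams(raw []byte) (mailParams, error) {
	var p mailParams
	err := json.Unmarshal(raw, &p)
	return p, err
}

func main() {
	raw, err := encodeMailParams("a@example.com", "b@example.com", "hello")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))

	p, err := decodeMailParams(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(p.To)
}
```

Using a named struct rather than a bare slice of strings keeps the payload self-describing, which tends to help when a handler is changed long after jobs were enqueued.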
De-queuing and processing jobs
To de-queue and process a job, you need to use one of the available runtimes. Currently, the only available runtime is the embed runtime, available under the github.com/mergestat/sqlq/runtime/embed package. The following example demonstrates how to implement a job handler for that runtime.
func SendMail() sqlq.HandlerFunc {
	return func(ctx context.Context, job *sqlq.Job) error {
		// send mail's implementation
		return nil
	}
}

func main() {
	var upstream *sql.DB // handle to an open database connection
	var worker, _ = embed.NewWorker(upstream, embed.WorkerConfig{
		Queues: []sqlq.Queue{"embed/default"}, // queues to watch
	})
	// register the handler under the job type name used when enqueuing
	_ = worker.Register("send-mail", SendMail())
	// starting the worker will start the processing routine in background
	if err := worker.Start(); err != nil {
		// handle error
	}
	// wait here .. probably listening for os.Signal
	// make a clean / graceful exit
	if err := worker.Shutdown(5 * time.Second); err != nil {
		// graceful exit failed / didn't complete
		return
	}
}
These basic examples only illustrate the general idea. Refer to the tests for more varied use cases.
Developing
To start a postgres database in Docker and execute tests against it, run:
docker-compose up
and then:
go test ./... -postgres-url postgres://postgres:password@localhost:5432 -v
# Functions
AttachLogger attaches a new Logger to the given job, that logs to the provided backend.
AttachPinger attaches a new Pinger to the given job.
AttachResultWriter attaches a result writer to the given job, using the provided Connection as the backend to write to.
Cancelled transitions the job to the CANCELLED state and marks it as completed.
Dequeue dequeues a single job from one of the provided queues.
Enqueue enqueues a new job in the given queue based on provided description.
Error transitions the job to the ERROR state and marks it as completed.
IsCancelled checks whether the job is currently in a cancelling state and returns true if so.
NewJobDesc creates a new JobDescription for a job with given typename and with the provided opts.
NewLogger returns a new Logger for the given Job, which writes logs to the provided backend.
Reap reaps any zombie jobs (jobs whose state is 'running' but which haven't pinged in a while) in the given queues.
Success transitions the job to the SUCCESS state and marks it as completed.
WithKeepAlive sets the keepalive ping duration for the job.
WithMaxRetries sets the maximum retry limit for the job.
WithParameters is used to pass additional parameters / arguments to the job.
WithPriority sets the job's priority.
WithRetention sets the retention policy for a job.
# Constants
DebugLevel is used to log messages at debug level.
ErrorLevel is used to log messages at error level.
InfoLevel is used to log messages at info level.
cancelled.
cancelling.
errored.
invalid.
pending.
running.
success.
WarnLevel is used to log messages at warn level.
# Variables
ErrJobStateMismatch is returned if an optimistic locking failure occurs when transitioning a job between states.
ErrSkipRetry must be used by the job handler routine to signal that the job must not be retried (even when there are attempts left).
# Structs
DequeueFilters are a set of optional filters that can be used with Dequeue().
Job represents an instance of a task / job in a queue.
JobDescription describes a job to be enqueued.
Logger is used to log messages from a job's handler to a given backend.
# Interfaces
Connection is a utility abstraction over sql connection / pool / transaction objects.
Handler is the user-provided implementation of the job's business logic.
LogBackend service provides the backend / sink implementation where the log messages are displayed and / or persisted.
# Type aliases
JobState represents the status a job is in and serves as the basis of a job's state machine.
LogBackendAdapter is a utility type that allows conforming functions to be used as log backends.
LogLevel defines the logging levels used when sending log messages.
Queue represents a named group / queue where jobs can be pushed / enqueued.