Categorygithub.com/mergestat/sqlq
modulepackage
0.0.0-20230519174807-3352087e8a70
Repository: https://github.com/mergestat/sqlq.git
Documentation: pkg.go.dev

# README

sqlq

:warning: still very experimental! proceed with care!

sqlq is a SQL-backed, persistent, job queuing solution.

Usage

To start using sqlq add it to your dependencies with:

go get -u github.com/mergestat/sqlq

Enqueing new jobs

Using the sqlq.Enqueue() function, push a job to a queue:

func NewSendMail(args ...string) *sqlq.JobDescription {
    var params = args // somehow serialize args ... using json.Encode() probably
    return sqlq.NewJobDesc("send-email", WithParameters(params));
}


var conn *sql.DB // handle to an open database connection

job, err := sqlq.Enqueue(conn, "default", NewSendMail(from, to, body))
if err != nil {
    // handle error here
}

De-queuing and processing jobs

To de-queue and process a job, you need to use one of the available runtimes. Currently, the only available runtime is the embed runtime available under github.com/mergestat/sqlq/runtime/embed package. The following example demonstrates how to implement a job handler for that runtime.

func SendMail() sqlq.Handlerfunc {
    return func(ctx context.Context, job *sqlq.Job) error { 
        // send mail's implementation
        return nil
    }
}

func main() {
    var upstream *sql.DB // handle to an open database connection
    var worker, _ = embed.NewWorker(upstream, embed.WorkerConfig{
		    Queues: []sqlq.Queue{"embed/default"}, // queues to watch
    })
    
    _ = worker.Register("send-mail", SendMail())
    
    // starting the worker will start the processing routine in background
    if err := worker.Start(); err != nil {
        // handle error
    }
    
    // wait here .. probably listening for os.Signal
    
    // make a clear / graceful exit
    if err := worker.Shutdown(5 * time.Second); err != nil {
        // graceful exit failed / didn't complete
        return nil
    }
}

These basic examples are just to give an idea. Refer to tests to see more varied use cases.

Developing

To start a postgres database in Docker and execute tests against it, run:

docker-compose up

and then:

go test ./... -postgres-url postgres://postgres:password@localhost:5432 -v

# Packages

No description provided by the author
Package schema provides sql schema migrations for sqlq.

# Functions

AttachLogger attaches a new Logger to the given job, that logs to the provided backend.
AttachPinger attaches a new Pinger to the given job.
AttachResultWriter attaches a result writer to the given job, using the provided Connection as the backend to write to.
Cancelled transitions the job to CANCELLED state and mark it as completed.
Dequeue dequeues a single job from one of the provided queues.
Enqueue enqueues a new job in the given queue based on provided description.
Error transitions the job to ERROR state and mark it as completed.
IsCancelled checks the current job state searching for a cancelling state, if so returns true.
NewJobDesc creates a new JobDescription for a job with given typename and with the provided opts.
NewLogger returns a new Logger for the given Job, which write logs to the provided backend.
Reap reaps any zombie process, processes where state is 'running' but the job hasn't pinged in a while, in the given queues.
Success transitions the job to SUCCESS state and mark it as completed.
WithKeepAlive sets the keepalive ping duration for the job.
WithMaxRetries sets the maximum retry limit for the job.
WithParameters is used to pass additional parameters / arguments to the job.
WithPriority sets the job's priority.
WithRetention sets the retention policy for a job.
No description provided by the author

# Constants

DebugLevel is used to log messages at debug level.
ErrorLevel is used to log messages at error level.
InfoLevel is used to log messages at info level.
cancelled.
cancelling.
errored.
invalid.
pending.
running.
success.
WarnLevel is used to log messages at warn level.

# Variables

ErrJobStateMismatch is returned if an optimistic locking failure occurs when transitioning a job between states.
ErrSkipRetry must be used by the job handler routine to signal that the job must not be retried (even when there are attempts left).

# Structs

DequeueFilters are a set of optional filters that can be used with Dequeue().
Job represents an instance of a task / job in a queue.
JobDescription describes a job to be enqueued.
Logger is used to log messages from a job's handler to a given backend.

# Interfaces

Connection is a utility abstraction over sql connection / pool / transaction objects.
Handler is the user-provided implementation of the job's business logic.
LogBackend service provides the backend / sink implementation where the log messages are displayed and / or persisted.
No description provided by the author

# Type aliases

No description provided by the author
JobState represents the status a job is in and serves as the basis of a job's state machine.
LogBackendAdapter is a utility type to use complying functions are log backends.
LogLevel defines the logging levels used when sending log messages.
Queue represents a named group / queue where jobs can be pushed / enqueued.