Module: github.com/vgarvardt/gue/v4
Version: 4.1.0
Repository: https://github.com/vgarvardt/gue.git
Documentation: pkg.go.dev

# README

gue

Gue is a Golang queue on top of PostgreSQL that uses transaction-level locks.

This project started as a fork of bgentry/que-go, but because of some backward-compatibility-breaking changes, and because the original library author was not very responsive to PRs, I turned the fork into a standalone project. Version 2 breaks internal backward compatibility with the original project: the DB table and all the internal logic (queries, algorithms) were completely rewritten.

The name Gue is yet another silly word transformation: Queue -> Que, Go + Que -> Gue.

Install

go get -u github.com/vgarvardt/gue/v4

Additionally, you need to apply the DB migration.

Usage Example

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/jackc/pgx/v4/pgxpool"
	"golang.org/x/sync/errgroup"

	"github.com/vgarvardt/gue/v4"
	"github.com/vgarvardt/gue/v4/adapter/pgxv4"
)

const (
	printerQueue   = "name_printer"
	jobTypePrinter = "PrintName"
)

type printNameArgs struct {
	Name string
}

func main() {
	printName := func(ctx context.Context, j *gue.Job) error {
		var args printNameArgs
		if err := json.Unmarshal(j.Args, &args); err != nil {
			return err
		}
		fmt.Printf("Hello %s!\n", args.Name)
		return nil
	}

	pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}

	pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg)
	if err != nil {
		log.Fatal(err)
	}
	defer pgxPool.Close()

	poolAdapter := pgxv4.NewConnPool(pgxPool)

	gc, err := gue.NewClient(poolAdapter)
	if err != nil {
		log.Fatal(err)
	}
	wm := gue.WorkMap{
		jobTypePrinter: printName,
	}

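	// finishedJobsLog records successfully finished jobs using the job's
	// own transaction.
	finishedJobsLog := func(ctx context.Context, j *gue.Job, err error) {
		if err != nil {
			return
		}

		if _, err := j.Tx().Exec(
			ctx,
			"INSERT INTO finished_jobs_log (queue, type, run_at) VALUES ($1, $2, now())",
			j.Queue,
			j.Type,
		); err != nil {
			log.Printf("could not log finished job: %v", err)
		}
	}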

	// create a pool w/ 2 workers
	workers, err := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolQueue(printerQueue), gue.WithPoolHooksJobDone(finishedJobsLog))
	if err != nil {
		log.Fatal(err)
	}

	ctx, shutdown := context.WithCancel(context.Background())

	// work jobs in goroutine
	g, gctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		err := workers.Run(gctx)
		if err != nil {
			// In real-world applications, use a better way to shut down
			// the application on an unrecoverable error, e.g. fx.Shutdowner
			// from the go.uber.org/fx module.
			log.Fatal(err)
		}
		return err
	})

	args, err := json.Marshal(printNameArgs{Name: "vgarvardt"})
	if err != nil {
		log.Fatal(err)
	}

	j := &gue.Job{
		Type:  jobTypePrinter,
		Queue: printerQueue,
		Args:  args,
	}
	if err := gc.Enqueue(context.Background(), j); err != nil {
		log.Fatal(err)
	}

	j = &gue.Job{
		Type:  jobTypePrinter,
		Queue: printerQueue,
		RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds
		Args:  args,
	}
	if err := gc.Enqueue(context.Background(), j); err != nil {
		log.Fatal(err)
	}

	time.Sleep(30 * time.Second) // wait for a while

	// send shutdown signal to worker
	shutdown()
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}
}

PostgreSQL drivers

The package supports several PostgreSQL drivers through an internal adapter interface. Currently, adapters for the following drivers have been implemented:

pgx/v4

package main

import (
	"context"
	"log"
	"os"

	"github.com/jackc/pgx/v4/pgxpool"

	"github.com/vgarvardt/gue/v4"
	"github.com/vgarvardt/gue/v4/adapter/pgxv4"
)

func main() {
	pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}

	pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg)
	if err != nil {
		log.Fatal(err)
	}
	defer pgxPool.Close()

	poolAdapter := pgxv4.NewConnPool(pgxPool)

	gc, err := gue.NewClient(poolAdapter)
	...
}

pgx/v5

package main

import (
	"context"
	"log"
	"os"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/vgarvardt/gue/v4"
	"github.com/vgarvardt/gue/v4/adapter/pgxv5"
)

func main() {
	pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}

	pgxPool, err := pgxpool.NewWithConfig(context.Background(), pgxCfg)
	if err != nil {
		log.Fatal(err)
	}
	defer pgxPool.Close()

	poolAdapter := pgxv5.NewConnPool(pgxPool)

	gc, err := gue.NewClient(poolAdapter)
	...
}

lib/pq

package main

import (
	"database/sql"
	"log"
	"os"

	_ "github.com/lib/pq" // register postgres driver

	"github.com/vgarvardt/gue/v4"
	"github.com/vgarvardt/gue/v4/adapter/libpq"
)

func main() {
	db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	poolAdapter := libpq.NewConnPool(db)

	gc, err := gue.NewClient(poolAdapter)
	...
}

Logging

The package supports several logging libraries through an internal adapter interface. Currently, adapters for the following logging libraries have been implemented:

  • NoOp (adapter.NoOpLogger) - default adapter that does nothing, so it is basically /dev/null logger
  • Stdlib log - adapter that uses log logger for logs output. Instantiate it with adapter.NewStdLogger(...).
  • Uber zap - adapter that uses go.uber.org/zap logger for logs output. Instantiate it with zap.New(...) from the adapter/zap package.

# Packages

No description provided by the author

# Functions

GetWorkerIdx gets the index of the worker in the pool from the worker context.
NewClient creates a new Client that uses the pgx pool.
NewExponentialBackoff instantiates new exponential backoff implementation with config.
NewWorker returns a Worker that fetches Jobs from the Client and executes them using WorkMap.
NewWorkerPool creates a new WorkerPool with count workers using the Client c.
RandomStringID returns random alphanumeric string that can be used as ID.
RunLock ensures that there is only one instance of the running callback function "f" (worker).
WithClientBackoff sets backoff implementation that will be applied to errored jobs within current client session.
WithClientID sets client ID for easier identification in logs.
WithClientLogger sets Logger implementation to client.
WithClientMeter sets metric.Meter instance to the client.
WithPoolGracefulShutdown enables graceful shutdown mode for all workers in the pool.
WithPoolHooksJobDone calls WithWorkerHooksJobDone for every worker in the pool.
WithPoolHooksJobLocked calls WithWorkerHooksJobLocked for every worker in the pool.
WithPoolHooksUnknownJobType calls WithWorkerHooksUnknownJobType for every worker in the pool.
WithPoolID sets worker pool ID for easier identification in logs.
WithPoolLogger sets Logger implementation to worker pool.
WithPoolMeter sets metric.Meter instance to every worker in the pool.
WithPoolPollInterval overrides default poll interval with the given value.
WithPoolPollStrategy overrides default poll strategy with given value.
WithPoolQueue overrides default worker queue name with the given value.
WithPoolTracer sets trace.Tracer instance to every worker in the pool.
WithWorkerGracefulShutdown enables graceful shutdown mode in the worker.
WithWorkerHooksJobDone sets hooks that are called when worker finished working the job, right before the successfully executed job will be removed or errored job handler will be called to decide if the Job will be re-queued or discarded.
WithWorkerHooksJobLocked sets hooks that are called right after the job was polled from the DB.
WithWorkerHooksUnknownJobType sets hooks that are called when worker finds a job with unknown type.
WithWorkerID sets worker ID for easier identification in logs.
WithWorkerLogger sets Logger implementation to worker.
WithWorkerMeter sets metric.Meter instance to the worker.
WithWorkerPollInterval overrides default poll interval with the given value.
WithWorkerPollStrategy overrides default poll strategy with given value.
WithWorkerQueue overrides default worker queue name with the given value.
WithWorkerTracer sets trace.Tracer instance to the worker.

# Constants

Shortcut values for JobPriority. Any value is allowed, but chances are high that one of these will be the most used.
PriorityPollStrategy cares about the priority first to lock top priority jobs first even if there are available ones that should be executed earlier but with lower priority.
RunAtPollStrategy cares about the scheduled time first to lock earliest to execute jobs first even if there are ones with a higher priority scheduled to a later time but already eligible for execution.
WorkerIdxUnknown is returned when the worker index in the pool is not set for some reason.
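
The difference between the two poll strategies can be illustrated with a small in-memory sketch. It is not how the library queries the database: the job struct, sampleJobs, byPriority and byRunAt are hypothetical names introduced here, and the sketch assumes that a lower numeric priority value means a higher priority.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// job is a hypothetical stand-in for a queued job; it is not gue.Job.
type job struct {
	Name     string
	Priority int       // assumption: a lower value means a higher priority
	RunAt    time.Time // earliest time the job may be executed
}

// sampleJobs returns two jobs that are both already eligible for execution.
func sampleJobs() []job {
	base := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	return []job{
		{Name: "routine-earlier", Priority: 0, RunAt: base},
		{Name: "urgent-later", Priority: -10, RunAt: base.Add(time.Minute)},
	}
}

// byPriority orders jobs priority-first, breaking ties by scheduled time.
func byPriority(jobs []job) []job {
	out := append([]job(nil), jobs...)
	sort.SliceStable(out, func(a, b int) bool {
		if out[a].Priority != out[b].Priority {
			return out[a].Priority < out[b].Priority
		}
		return out[a].RunAt.Before(out[b].RunAt)
	})
	return out
}

// byRunAt orders jobs by scheduled time first, breaking ties by priority.
func byRunAt(jobs []job) []job {
	out := append([]job(nil), jobs...)
	sort.SliceStable(out, func(a, b int) bool {
		if !out[a].RunAt.Equal(out[b].RunAt) {
			return out[a].RunAt.Before(out[b].RunAt)
		}
		return out[a].Priority < out[b].Priority
	})
	return out
}

func main() {
	jobs := sampleJobs()
	fmt.Println("priority-first picks:", byPriority(jobs)[0].Name)
	fmt.Println("run-at-first picks:", byRunAt(jobs)[0].Name)
}
```

With these two sample jobs, the priority-first ordering picks the urgent job even though it was scheduled later, while the run-at-first ordering picks the job scheduled earlier.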

# Variables

BackoffNever is the Backoff implementation that never returns errored job to the queue for retry, but discards it in case of the error.
DefaultExponentialBackoff is the exponential Backoff implementation with default config applied.
ErrMissingType is returned when you attempt to enqueue a job with no Type specified.
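
As a rough illustration of the two backoff behaviours described above, the sketch below models a backoff as a function from the retry count to a delay. The backoff type, neverRetry and exponential are hypothetical names, not the library's API, and the convention that a negative delay means "discard the job" is an assumption made for this example. The sketch is deterministic and has no jitter, which a production implementation may well add.

```go
package main

import (
	"fmt"
	"time"
)

// backoff is a hypothetical local type: given the number of retries so far,
// it returns the delay before the next attempt. In this sketch a negative
// delay means the job should be discarded instead of retried.
type backoff func(retries int) time.Duration

// neverRetry always asks for the job to be discarded.
func neverRetry(int) time.Duration { return -1 }

// exponential doubles the delay on every retry, starting at base and
// never exceeding max.
func exponential(base, max time.Duration) backoff {
	return func(retries int) time.Duration {
		d := base
		if d > max {
			return max
		}
		for i := 0; i < retries; i++ {
			d *= 2
			if d >= max {
				return max
			}
		}
		return d
	}
}

func main() {
	b := exponential(time.Second, 30*time.Second)
	for retries := 0; retries <= 6; retries++ {
		fmt.Printf("retry %d: wait %s\n", retries, b(retries))
	}
	fmt.Println("never retry:", neverRetry(1) < 0)
}
```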

# Structs

Client is a Gue client that can add jobs to the queue and remove jobs from the queue.
Job is a single unit of work for Gue to perform.
Worker is a single worker that pulls jobs off the specified queue.
WorkerPool is a pool of Workers, each working jobs from the queue at the specified interval using the WorkMap.

# Type aliases

Backoff is the interface for backoff implementation that will be used to reschedule errored jobs to a later time.
ClientOption defines a type that allows to set client properties during the build-time.
HookFunc is a function that may react to a Job lifecycle events.
JobPriority is the wrapper type for Job.Priority.
PollStrategy determines how the DB is queried for the next job to work on.
WorkerOption defines a type that allows to set worker properties during the build-time.
WorkerPoolOption defines a type that allows to set worker pool properties during the build-time.
WorkFunc is a function that performs a Job.
WorkMap is a map of Job names to WorkFuncs that are used to perform Jobs of a given type.
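
The relationship between WorkFunc and WorkMap can be sketched with local stand-in types. Everything below (job, workFunc, workMap, dispatch, runDemo, errUnknownType) is hypothetical and only mirrors the shape used in the usage example; how the real worker reacts to an unknown job type may differ from returning a plain error.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the library types, for illustration only.
type job struct {
	Type string
	Args []byte
}

type workFunc func(ctx context.Context, j *job) error

type workMap map[string]workFunc

var errUnknownType = errors.New("unknown job type")

// dispatch looks up the handler registered for the job type and runs it.
func dispatch(ctx context.Context, wm workMap, j *job) error {
	work, ok := wm[j.Type]
	if !ok {
		return fmt.Errorf("%w: %q", errUnknownType, j.Type)
	}
	return work(ctx, j)
}

// runDemo works one known job and one job with an unregistered type. It
// returns the greeting built by the handler and the unknown-type error.
func runDemo() (greeting string, unknownErr error) {
	wm := workMap{
		"PrintName": func(ctx context.Context, j *job) error {
			var args struct{ Name string }
			if err := json.Unmarshal(j.Args, &args); err != nil {
				return err
			}
			greeting = "Hello " + args.Name + "!"
			return nil
		},
	}

	ctx := context.Background()
	known := &job{Type: "PrintName", Args: []byte(`{"Name":"gue"}`)}
	if err := dispatch(ctx, wm, known); err != nil {
		return "", err
	}

	unknownErr = dispatch(ctx, wm, &job{Type: "Missing"})
	return greeting, unknownErr
}

func main() {
	greeting, err := runDemo()
	fmt.Println(greeting)
	fmt.Println("unknown type error:", err)
}
```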