Categorygithub.com/vgarvardt/gue/v3
modulepackage
3.3.0
Repository: https://github.com/vgarvardt/gue.git
Documentation: pkg.go.dev

# README

gue

GoDev Coverage Status ReportCard License

Gue is a Golang queue on top of PostgreSQL that uses transaction-level locks.

Originally this project was a fork of bgentry/que-go, but because of some backward-compatibility-breaking changes and the original library author not being very responsive to PRs, I turned the fork into a standalone project. Version 2 breaks internal backward compatibility with the original project - the DB table and all the internal logic (queries, algorithms) are completely rewritten.

The name Gue is yet another silly word transformation: Queue -> Que, Go + Que -> Gue.

Install

go get -u github.com/vgarvardt/gue/v3

Additionally, you need to apply DB migration.

Usage Example

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/jackc/pgx/v4/pgxpool"
	"golang.org/x/sync/errgroup"

	"github.com/vgarvardt/gue/v3"
	"github.com/vgarvardt/gue/v3/adapter/pgxv4"
)

const (
	// printerQueue is the queue name used both for the enqueued jobs and for
	// the worker pool in this example.
	printerQueue   = "name_printer"
	// jobTypePrinter is the job type under which the printName handler is
	// registered in the WorkMap.
	jobTypePrinter = "PrintName"
)

// printNameArgs is the payload of a PrintName job. It is marshalled to JSON
// when the job is enqueued and unmarshalled from Job.Args by the handler.
type printNameArgs struct {
	// Name is the name printed in the greeting.
	Name string
}

// main wires a gue client and a two-worker pool on top of a pgx/v4 connection
// pool, enqueues two PrintName jobs (one immediate, one scheduled 30 seconds
// later), waits for a while and then shuts the workers down.
func main() {
	// printName is the handler for jobTypePrinter jobs: it decodes the JSON
	// arguments of the job and prints a greeting.
	printName := func(ctx context.Context, j *gue.Job) error {
		var args printNameArgs
		if err := json.Unmarshal(j.Args, &args); err != nil {
			return err
		}
		fmt.Printf("Hello %s!\n", args.Name)
		return nil
	}

	// The connection string is taken from the DATABASE_URL environment variable.
	pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}

	pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg)
	if err != nil {
		log.Fatal(err)
	}
	defer pgxPool.Close()

	poolAdapter := pgxv4.NewConnPool(pgxPool)

	gc := gue.NewClient(poolAdapter)
	wm := gue.WorkMap{
		jobTypePrinter: printName,
	}

	// finishedJobsLog is a job-done hook: for jobs that finished without an
	// error it records the queue and type using the job's own transaction.
	finishedJobsLog := func(ctx context.Context, j *gue.Job, err error) {
		if err != nil {
			return
		}

		// Do not drop the insert error silently; a hook has no return value,
		// so the best we can do here is to log it.
		if _, execErr := j.Tx().Exec(
			ctx,
			"INSERT INTO finished_jobs_log (queue, type, run_at) VALUES ($1, $2, now())",
			j.Queue,
			j.Type,
		); execErr != nil {
			log.Printf("could not log finished job: %v", execErr)
		}
	}

	// create a pool w/ 2 workers
	workers := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolQueue(printerQueue), gue.WithPoolHooksJobDone(finishedJobsLog))

	ctx, shutdown := context.WithCancel(context.Background())

	// work jobs in goroutine
	g, gctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		err := workers.Run(gctx)
		if err != nil {
			// In a real-world application, use a better way to shut down
			// the application on unrecoverable error. E.g. fx.Shutdowner from
			// go.uber.org/fx module.
			log.Fatal(err)
		}
		return err
	})

	args, err := json.Marshal(printNameArgs{Name: "vgarvardt"})
	if err != nil {
		log.Fatal(err)
	}

	// First job: eligible for execution immediately.
	j := &gue.Job{
		Type:  jobTypePrinter,
		Queue: printerQueue,
		Args:  args,
	}
	if err := gc.Enqueue(context.Background(), j); err != nil {
		log.Fatal(err)
	}

	// Second job: same payload, scheduled via RunAt.
	j = &gue.Job{
		Type:  jobTypePrinter,
		Queue: printerQueue,
		RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds
		Args:  args,
	}
	if err := gc.Enqueue(context.Background(), j); err != nil {
		log.Fatal(err)
	}

	time.Sleep(30 * time.Second) // wait for a while

	// send shutdown signal to worker
	shutdown()
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}
}

PostgreSQL drivers

Package supports several PostgreSQL drivers using adapter interface internally. Currently, adapters for the following drivers have been implemented:

pgx/v4

package main

import (
	"context"
	"log"
	"os"

	"github.com/jackc/pgx/v4/pgxpool"

	"github.com/vgarvardt/gue/v3"
	"github.com/vgarvardt/gue/v3/adapter/pgxv4"
)

// main shows how to create a gue client backed by a pgx/v4 connection pool.
func main() {
	// The connection string is taken from the DATABASE_URL environment variable.
	pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}

	pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg)
	if err != nil {
		log.Fatal(err)
	}
	defer pgxPool.Close()

	// Wrap the driver-specific pool into the adapter that gue works with.
	poolAdapter := pgxv4.NewConnPool(pgxPool)

	gc := gue.NewClient(poolAdapter)
	...
}

pgx/v3

package main

import (
	"log"
	"os"

	"github.com/jackc/pgx"

	"github.com/vgarvardt/gue/v3"
	"github.com/vgarvardt/gue/v3/adapter/pgxv3"
)

// main shows how to create a gue client backed by a pgx (v3) connection pool.
func main() {
	// The connection string is taken from the DATABASE_URL environment variable.
	pgxCfg, err := pgx.ParseURI(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}
	pgxPool, err := pgx.NewConnPool(pgx.ConnPoolConfig{
		ConnConfig: pgxCfg,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer pgxPool.Close()

	// Wrap the driver-specific pool into the adapter that gue works with.
	poolAdapter := pgxv3.NewConnPool(pgxPool)

	gc := gue.NewClient(poolAdapter)
	...
}

lib/pq

package main

import (
	"database/sql"
	"log"
	"os"

	_ "github.com/lib/pq" // register postgres driver

	"github.com/vgarvardt/gue/v3"
	"github.com/vgarvardt/gue/v3/adapter/libpq"
)

// main shows how to create a gue client backed by database/sql with the
// lib/pq driver registered under the "postgres" name.
func main() {
	// The connection string is taken from the DATABASE_URL environment variable.
	db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Wrap the *sql.DB into the adapter that gue works with.
	poolAdapter := libpq.NewConnPool(db)

	gc := gue.NewClient(poolAdapter)
	...
}

pg/v10

package main

import (
	"log"
	"os"

	"github.com/go-pg/pg/v10"

	"github.com/vgarvardt/gue/v3"
	"github.com/vgarvardt/gue/v3/adapter/gopgv10"
)

// main shows how to create a gue client backed by a go-pg/v10 connection.
func main() {
	// The connection string is taken from the DATABASE_URL environment variable.
	opts, err := pg.ParseURL(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}

	db := pg.Connect(opts)
	defer db.Close()

	// Wrap the go-pg DB into the adapter that gue works with.
	poolAdapter := gopgv10.NewConnPool(db)

	gc := gue.NewClient(poolAdapter)
	...
}

Logging

Package supports several logging libraries using adapter interface internally. Currently, adapters for the following loggers have been implemented:

  • NoOp (adapter.NoOpLogger) - default adapter that does nothing, so it is basically /dev/null logger
  • Stdlib log - adapter that uses log logger for logs output. Instantiate it with adapter.NewStdLogger(...).
  • Uber zap - adapter that uses go.uber.org/zap logger for logs output. Instantiate it with adapter.zap.New(...).

Testing

Linter and tests are running for every Pull Request, but it is possible to run linter and tests locally using docker and make.

Run linter: make lint. This command runs the linter in a docker container with the project source code mounted.

Run tests: make test. This command runs project dependencies in docker containers if they are not started yet and runs go tests with coverage.

# Packages

No description provided by the author

# Functions

NewClient creates a new Client that uses the pgx pool.
NewWorker returns a Worker that fetches Jobs from the Client and executes them using WorkMap.
NewWorkerPool creates a new WorkerPool with count workers using the Client c.
WithClientBackoff sets backoff implementation that will be applied to errored jobs within current client session.
WithClientID sets client ID for easier identification in logs.
WithClientLogger sets Logger implementation to client.
WithPoolHooksJobDone calls WithWorkerHooksJobDone for every worker in the pool.
WithPoolHooksJobLocked calls WithWorkerHooksJobLocked for every worker in the pool.
WithPoolHooksUnknownJobType calls WithWorkerHooksUnknownJobType for every worker in the pool.
WithPoolID sets worker pool ID for easier identification in logs.
WithPoolLogger sets Logger implementation to worker pool.
WithPoolPollInterval overrides default poll interval with the given value.
WithPoolPollStrategy overrides default poll strategy with given value.
WithPoolQueue overrides default worker queue name with the given value.
WithWorkerHooksJobDone sets hooks that are called when worker finished working the job.
WithWorkerHooksJobLocked sets hooks that are called right after the job was polled from the DB.
WithWorkerHooksUnknownJobType sets hooks that are called when worker finds a job with unknown type.
WithWorkerID sets worker ID for easier identification in logs.
WithWorkerLogger sets Logger implementation to worker.
WithWorkerPollInterval overrides default poll interval with the given value.
WithWorkerPollStrategy overrides default poll strategy with given value.
WithWorkerQueue overrides default worker queue name with the given value.

# Constants

PriorityPollStrategy cares about the priority first to lock top priority jobs first even if there are available ones that should be executed earlier but with lower priority.
RunAtPollStrategy cares about the scheduled time first to lock earliest to execute jobs first even if there are ones with a higher priority scheduled to a later time but already eligible for execution.

# Variables

ErrMissingType is returned when you attempt to enqueue a job with no Type specified.

# Structs

Client is a Gue client that can add jobs to the queue and remove jobs from the queue.
Job is a single unit of work for Gue to perform.
Worker is a single worker that pulls jobs off the specified queue.
WorkerPool is a pool of Workers, each working jobs from the queue at the specified interval using the WorkMap.

# Type aliases

Backoff is the interface for backoff implementation that will be used to reschedule errored jobs.
ClientOption defines a type that allows to set client properties during the build-time.
HookFunc is a function that may react to a Job lifecycle events.
PollStrategy determines how the DB is queried for the next job to work on.
WorkerOption defines a type that allows to set worker properties during the build-time.
WorkerPoolOption defines a type that allows to set worker pool properties during the build-time.
WorkFunc is a function that performs a Job.
WorkMap is a map of Job names to WorkFuncs that are used to perform Jobs of a given type.