Categorygithub.com/vgarvardt/gue/v2
modulepackage
2.2.1
Repository: https://github.com/vgarvardt/gue.git
Documentation: pkg.go.dev

# README

gue

GoDev Coverage Status ReportCard License

Gue is Golang queue on top of PostgreSQL that uses transaction-level locks.

Originally this project used to be a fork of bgentry/que-go but because of some backward-compatibility breaking changes and original library author not being very responsive for PRs I turned fork into standalone project. Version 2 breaks internal backward-compatibility with the original project - DB table and all the internal logic (queries, algorithms) is completely rewritten.

The name Gue is yet another silly word transformation: Queue -> Que, Go + Que -> Gue.

Install

go get -u github.com/vgarvardt/gue/v2

Additionally, you need to apply DB migration.

Usage Example

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/jackc/pgx/v4/pgxpool"
    "golang.org/x/sync/errgroup"

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/pgxv4"
)

type printNameArgs struct {
    Name string
}

func main() {
    printName := func(j *gue.Job) error {
        var args printNameArgs
        if err := json.Unmarshal(j.Args, &args); err != nil {
            return err
        }
        fmt.Printf("Hello %s!\n", args.Name)
        return nil
    }

    pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }

    pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg)
    if err != nil {
        log.Fatal(err)
    }
    defer pgxPool.Close()

    poolAdapter := pgxv4.NewConnPool(pgxPool)

    gc := gue.NewClient(poolAdapter)
    wm := gue.WorkMap{
        "PrintName": printName,
    }
    // create a pool w/ 2 workers
    workers := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolQueue("name_printer"))

    ctx, shutdown := context.WithCancel(context.Background())

    // work jobs in goroutine
    g, gctx := errgroup.WithContext(ctx)
    g.Go(func() error {
        err := workers.Run(gctx)
        if err != nil {
            // In a real-world applications, use a better way to shut down
            // application on unrecoverable error. E.g. fx.Shutdowner from
            // go.uber.org/fx module.
            log.Fatal(err)
        }
        return err
    })

    args, err := json.Marshal(printNameArgs{Name: "vgarvardt"})
    if err != nil {
        log.Fatal(err)
    }

    j := &gue.Job{
        Type:  "PrintName",
        Args:  args,
    }
    if err := gc.Enqueue(context.Background(), j); err != nil {
        log.Fatal(err)
    }

    j := &gue.Job{
        Type:  "PrintName",
        RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds
        Args:  args,
    }
    if err := gc.Enqueue(context.Background(), j); err != nil {
        log.Fatal(err)
    }

    time.Sleep(30 * time.Second) // wait for while

    // send shutdown signal to worker
    shutdown()
    if err := g.Wait(); err != nil {
        log.Fatal(err)
    }
}

PostgreSQL drivers

Package supports several PostgreSQL drivers using adapter interface internally. Currently, adapters for the following drivers have been implemented:

pgx/v4

package main

import (
    "context"
    "log"
    "os"

    "github.com/jackc/pgx/v4/pgxpool"

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/pgxv4"
)

func main() {
    pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }

    pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg)
    if err != nil {
        log.Fatal(err)
    }
    defer pgxPool.Close()

    poolAdapter := pgxv4.NewConnPool(pgxPool)

    gc := gue.NewClient(poolAdapter)
    ...
}

pgx/v3

package main

import (
    "log"
    "os"

    "github.com/jackc/pgx"

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/pgxv3"
)

func main() {
    pgxCfg, err := pgx.ParseURI(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    pgxPool, err := pgx.NewConnPool(pgx.ConnPoolConfig{
        ConnConfig:   pgxCfg,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer pgxPool.Close()

    poolAdapter := pgxv3.NewConnPool(pgxPool)

    gc := gue.NewClient(poolAdapter)
    ...
}

lib/pq

package main

import (
    "database/sql"
    "log"
    "os"

    _ "github.com/lib/pq" // register postgres driver

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/libpq"
)

func main() {
    db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    poolAdapter := libpq.NewConnPool(db)

    gc := gue.NewClient(poolAdapter)
    ...
}

pg/v10

package main

import (
    "log"
    "os"

    "github.com/go-pg/pg/v10"

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/gopgv10"
)

func main() {
    opts, err := pg.ParseURL(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }

    db := pg.Connect(opts)
    defer db.Close()

    poolAdapter := gopgv10.NewConnPool(db)

    gc := gue.NewClient(poolAdapter)
    ...
}

Logging

Package supports several logging libraries using adapter interface internally. Currently, adapters for the following drivers have been implemented:

  • NoOp (adapter.NoOpLogger) - default adapter that does nothing, so it is basically /dev/null logger
  • Stdlib log - adapter that uses log logger for logs output. Instantiate it with adapter.NewStdLogger(...).
  • Uber zap - adapter that uses go.uber.org/zap logger for logs output. Instantiate it with adapter.zap.New(...).

Testing

Linter and tests are running for every Pull Request, but it is possible to run linter and tests locally using docker and make.

Run linter: make link. This command runs liner in docker container with the project source code mounted.

Run tests: make test. This command runs project dependencies in docker containers if they are not started yet and runs go tests with coverage.

# Packages

No description provided by the author

# Functions

NewClient creates a new Client that uses the pgx pool.
NewWorker returns a Worker that fetches Jobs from the Client and executes them using WorkMap.
NewWorkerPool creates a new WorkerPool with count workers using the Client c.
WithClientBackoff sets backoff implementation that will be applied to errored jobs within current client session.
WithClientID sets client ID for easier identification in logs.
WithClientLogger sets Logger implementation to client.
WithPoolID sets worker pool ID for easier identification in logs.
WithPoolLogger sets Logger implementation to worker pool.
WithPoolPollInterval overrides default poll interval with the given value.
WithPoolQueue overrides default worker queue name with the given value.
WithWorkerID sets worker ID for easier identification in logs.
WithWorkerLogger sets Logger implementation to worker.
WithWorkerPollInterval overrides default poll interval with the given value.
WithWorkerQueue overrides default worker queue name with the given value.

# Variables

ErrMissingType is returned when you attempt to enqueue a job with no Type specified.

# Structs

Client is a Gue client that can add jobs to the queue and remove jobs from the queue.
Job is a single unit of work for Gue to perform.
Worker is a single worker that pulls jobs off the specified queue.
WorkerPool is a pool of Workers, each working jobs from the queue queue at the specified interval using the WorkMap.

# Type aliases

Backoff is the interface for backoff implementation that will be used to reschedule errored jobs.
ClientOption defines a type that allows to set client properties during the build-time.
WorkerOption defines a type that allows to set worker properties during the build-time.
WorkerPoolOption defines a type that allows to set worker pool properties during the build-time.
WorkFunc is a function that performs a Job.
WorkMap is a map of Job names to WorkFuncs that are used to perform Jobs of a given type.