Categorygithub.com/sagilio/asynq
modulepackage
0.23.14
Repository: https://github.com/sagilio/asynq.git
Documentation: pkg.go.dev

# README

Asynq logo

Simple, reliable & efficient distributed task queue in Go

GoDoc Go Report Card Build Status License: MIT Gitter chat

Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started.

Highlevel overview of how Asynq works:

  • Client puts tasks on a queue
  • Server pulls tasks off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers

Task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.

Example use case

Task Queue Diagram

Features

Stability and Compatibility

Status: The library is currently undergoing heavy development with frequent, breaking API changes.

☝️ Important Note: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users (feedback on APIs are appreciated!). The public API could change without a major version update before v1.0.0 release.

Quickstart

Make sure you have Go installed (download). Version 1.14 or higher is required.

Initialize your project by creating a folder and then running go mod init github.com/your/repo (learn more) inside the folder. Then install Asynq library with the go get command:

go get -u github.com/sagilio/asynq

Make sure you're running a Redis server locally or from a Docker container. Version 4.0 or higher is required.

Next, write a package that encapsulates task creation and task handling.

package tasks

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"
    "github.com/sagilio/asynq"
)

// A list of task types.
const (
    TypeEmailDelivery   = "email:deliver"
    TypeImageResize     = "image:resize"
)

type EmailDeliveryPayload struct {
    UserID     int
    TemplateID string
}

type ImageResizePayload struct {
    SourceURL string
}

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeEmailDelivery, payload), nil
}

func NewImageResizeTask(src string) (*asynq.Task, error) {
    payload, err := json.Marshal(ImageResizePayload{SourceURL: src})
    if err != nil {
        return nil, err
    }
    // task options can be passed to NewTask, which can be overridden at enqueue time.
    return asynq.NewTask(TypeImageResize, payload, asynq.MaxRetry(5), asynq.Timeout(20 * time.Minute)), nil
}

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    var p EmailDeliveryPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
    }
    log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
    // Email delivery code ...
    return nil
}

// ImageProcessor implements asynq.Handler interface.
type ImageProcessor struct {
    // ... fields for struct
}

func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    var p ImageResizePayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
    }
    log.Printf("Resizing image: src=%s", p.SourceURL)
    // Image resizing code ...
    return nil
}

func NewImageProcessor() *ImageProcessor {
	return &ImageProcessor{}
}

In your application code, import the above package and use Client to put tasks on queues.

package main

import (
    "log"
    "time"

    "github.com/sagilio/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
    defer client.Close()

    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------

    task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err := client.Enqueue(task)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)


    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------

    info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatalf("could not schedule task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)


    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
    // ----------------------------------------------------------------------------

    task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3 * time.Minute))
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

Next, start a worker server to process these tasks in the background. To start the background workers, use Server and provide your Handler to process the tasks.

You can optionally use ServeMux to create a handler, just as you would with net/http Handler.

package main

import (
    "log"

    "github.com/sagilio/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: redisAddr},
        asynq.Config{
            // Specify how many concurrent workers to use
            Concurrency: 10,
            // Optionally specify multiple queues with different priority.
            Queues: map[string]int{
                "critical": 6,
                "default":  3,
                "low":      1,
            },
            // See the godoc for other configuration options
        },
    )

    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...register other handlers...

    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}

For a more detailed walk-through of the library, see our Getting Started guide.

To learn more about asynq features and APIs, see the package godoc.

Web UI

Asynqmon is a web based tool for monitoring and administrating Asynq queues and tasks.

Here's a few screenshots of the Web UI:

Queues view

Web UI Queues View

Tasks view

Web UI TasksView

Metrics view Screen Shot 2021-12-19 at 4 37 19 PM

Settings and adaptive dark mode

Web UI Settings and adaptive dark mode

For details on how to use the tool, refer to the tool's README.

Command Line Tool

Asynq ships with a command line tool to inspect the state of queues and tasks.

To install the CLI tool, run the following command:

go install github.com/sagilio/asynq/tools/asynq

Here's an example of running the asynq dash command:

Gif

For details on how to use the tool, refer to the tool's README.

Contributing

We are open to, and grateful for, any contributions (GitHub issues/PRs, feedback on Gitter channel, etc) made by the community.

Please see the Contribution Guide before contributing.

License

Copyright (c) 2019-present Ken Hibino and Contributors. Asynq is free and open-source software licensed under the MIT License. Official logo was created by Vic Shóstak and distributed under Creative Commons license (CC0 1.0 Universal).

# Functions

Deadline returns an option to specify the deadline for the given task.
DefaultRetryDelayFunc is the default RetryDelayFunc used if one is not specified in Config.
GetMaxRetry extracts maximum retry from a context, if any.
GetQueueName extracts queue name from a context, if any.
GetRetryCount extracts retry count from a context, if any.
GetTaskID extracts a task ID from a context, if any.
Group returns an option to specify the group used for the task.
MaxRetry returns an option to specify the max number of times the task will be retried.
NewClient returns a new Client instance given a redis connection option.
New returns a new instance of Inspector.
NewPeriodicTaskManager returns a new PeriodicTaskManager instance.
NewScheduler returns a new Scheduler instance given the redis connection option.
NewServeMux allocates and returns a new ServeMux.
NewServer returns a new Server given a redis connection option and server configuration.
NewTask returns a new Task given a type name and payload data.
NotFound returns an error indicating that the handler was not found for the given task.
NotFoundHandler returns a simple task handler that returns a ``not found`` error.
Page returns an option to specify the page number for list operation.
PageSize returns an option to specify the page size for list operation.
ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid.
ProcessAt returns an option to specify when to process the given task.
ProcessIn returns an option to specify when to process the given task relative to the current time.
Queue returns an option to specify the queue to enqueue the task into.
Retention returns an option to specify the duration of retention period for the task.
TaskID returns an option to specify the task ID.
Timeout returns an option to specify how long a task may run.
Unique returns an option to enqueue a task only if the given task is unique.

# Constants

No description provided by the author
DebugLevel is the lowest level of logging.
ErrorLevel is used for undesired and unexpected events that the program can recover from.
FatalLevel is used for undesired and unexpected events that the program cannot recover from.
No description provided by the author
InfoLevel is used for general informational log messages.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
Indicates that the task is currently being processed by Handler.
Indicates that the task is waiting in a group to be aggregated into one task.
Indicates that the task is archived and stored for inspection purposes.
Indicates that the task is processed successfully and retained until the retention TTL expires.
Indicates that the task is ready to be processed by Handler.
Indicates that the task has previously failed and scheduled to be processed some time in the future.
Indicates that the task is scheduled to be processed some time in the future.
No description provided by the author
No description provided by the author
WarnLevel is used for undesired but relatively expected events, which may indicate a problem.

# Variables

ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
ErrLeaseExpired error indicates that the task failed because the worker working on the task could not extend its lease due to missing heartbeats.
ErrQueueNotEmpty indicates that the specified queue is not empty.
ErrQueueNotFound indicates that the specified queue does not exist.
ErrServerClosed indicates that the operation is now illegal because of the server has been shutdown.
ErrTaskIDConflict indicates that the given task could not be enqueued since its task ID already exists.
ErrTaskNotFound indicates that the specified task cannot be found in the queue.
SkipRetry is used as a return value from Handler.ProcessTask to indicate that the task should not be retried and should be archived instead.

# Structs

A Client is responsible for scheduling tasks.
ClusterNode describes a node in redis cluster.
Config specifies the server's background-task processing behavior.
DailyStats holds aggregate data for a given day for a given queue.
GroupInfo represents a state of a group at a certain time.
Inspector is a client interface to inspect and mutate the state of queues and tasks.
PeriodicTaskConfig specifies the details of a periodic task.
PeriodicTaskManager manages scheduling of periodic tasks.
No description provided by the author
QueueInfo represents a state of a queue at a certain time.
RedisClientOpt is used to create a redis client that connects to a redis server directly.
RedisClusterClientOpt is used to creates a redis client that connects to redis cluster.
RedisFailoverClientOpt is used to creates a redis client that talks to redis sentinels for service discovery and has an automatic failover capability.
ResultWriter is a client interface to write result data for a task.
A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
SchedulerEntry holds information about a periodic task registered with a scheduler.
SchedulerOpts specifies scheduler options.
ServeMux is a multiplexer for asynchronous tasks.
Server is responsible for task processing and task lifecycle management.
ServerInfo describes a running Server instance.
Task represents a unit of work to be performed.
A TaskInfo describes a task and its metadata.
WorkerInfo describes a running worker processing a task.

# Interfaces

An ErrorHandler handles an error occurred during task processing.
GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
A Handler processes tasks.
ListOption specifies behavior of list operation.
Logger supports logging at various log levels.
Option specifies the task processing behavior.
PeriodicTaskConfigProvider provides configs for periodic tasks.
RedisConnOpt is a discriminated union of types that represent Redis connection configuration option.

# Type aliases

The ErrorHandlerFunc type is an adapter to allow the use of ordinary functions as a ErrorHandler.
The GroupAggregatorFunc type is an adapter to allow the use of ordinary functions as a GroupAggregator.
The HandlerFunc type is an adapter to allow the use of ordinary functions as a Handler.
LogLevel represents logging level.
MiddlewareFunc is a function which receives an asynq.Handler and returns another asynq.Handler.
No description provided by the author
RetryDelayFunc calculates the retry delay duration for a failed task given the retry count, error, and the task.
TaskState denotes the state of a task.