Module: github.com/choria-io/asyncjobs
Version: 0.1.2
Repository: https://github.com/choria-io/asyncjobs.git
Documentation: pkg.go.dev

# README

Choria Asynchronous Jobs

Overview

This is an Asynchronous Job Queue system that relies on NATS JetStream for storage and general job life cycle management. It is compatible with any NATS JetStream based system, such as a privately hosted JetStream, Choria Streams or a commercial SaaS.

Each Task is stored in JetStream under a unique ID, and a Work Queue item referencing that Task is created. JetStream handles scheduling, retries, acknowledgements and more for the Work Queue item. The stored Task is updated throughout its lifecycle.

Multiple processes can process jobs concurrently, so job processing is both horizontally and vertically scalable. Job handlers are implemented in Go, with one process hosting one or many handlers. Other languages can implement Job Handlers using NATS Request-Reply services. Per-process concurrency and overall per-queue concurrency controls exist.

This package is heavily inspired by hibiken/asynq.

Status

This is a brand-new project, under heavy development. The core Task handling is in good shape and reasonably stable. Task Scheduler is still subject to some change.

Synopsis

Tasks are published to Work Queues:

// establish a connection to the EMAIL work queue using an existing NATS connection (nc)
client, _ := asyncjobs.NewClient(asyncjobs.NatsConn(nc), asyncjobs.BindWorkQueue("EMAIL"))

// create a task with the type 'email:new' and body from newEmail()
task, _ := asyncjobs.NewTask("email:new", newEmail())

// store it in the Work Queue
client.EnqueueTask(ctx, task)

Tasks are processed by horizontally and vertically scalable processes. Typically, a Handler handles one type of Task. The client below is configured with Prometheus integration, concurrency and retry backoff.

// establish a connection to the EMAIL work queue using a 
// NATS context, with concurrency, prometheus stats and backoff
client, _ := asyncjobs.NewClient(
	asyncjobs.NatsContext("EMAIL"), 
	asyncjobs.BindWorkQueue("EMAIL"),
	asyncjobs.ClientConcurrency(10),
	asyncjobs.PrometheusListenPort(8080),
	asyncjobs.RetryBackoffPolicy(asyncjobs.RetryLinearTenMinutes))

router := asyncjobs.NewTaskRouter()
router.Handler("email:new", func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (any, error) {
	log.Printf("Processing task %s", task.ID)

	// do work here using task.Payload

	return "sent", nil
})

client.Run(ctx, router)

See our documentation for a deep dive into the use cases, architecture, abilities and more.

Requirements

NATS 2.8.0 or newer with JetStream enabled.

Features

See the Feature List page for a full feature breakdown.

# Functions

BindWorkQueue binds the client to a work queue that should already exist.
ClientConcurrency sets the concurrency to use when executing tasks within this client for horizontal scaling.
CustomLogger sets a custom logger to use for all logging.
DiscardTaskStates configures the client to discard Tasks that reach a final state in the list of supplied TaskState.
DiscardTaskStatesByName configures the client to discard Tasks that reach a final state in the list of supplied TaskState names.
IsRetryPolicyKnown determines if the named policy exists.
IsValidName is a generic strict name validator for names people supply, such as task names, that are turned into subjects.
MemoryStorage enables storing tasks and work queue in memory in JetStream.
NatsConn sets an already connected NATS connection as the communications channel.
NatsContext attempts to connect to the NATS client context c.
NewClient creates a new client, one of NatsConn() or NatsContext() must be passed, other options are optional.
NewLeaderElectedEvent creates a new event notifying of a leader election win.
NewTask creates a new task of taskType that can later be used to route tasks to handlers.
NewTaskRouter creates a new Mux.
NewTaskScheduler creates a new Task Scheduler service.
NewTaskStateChangeEvent creates a new event notifying of a change in task state.
NoStorageInit skips setting up any queues or task stores when creating a client.
ParseEventJSON parses event bytes returning the parsed Event and its event type.
PrometheusListenPort enables prometheus listening on a specific port.
RequestReplySubjectForTaskType returns the subject a request-reply handler should listen on for a specified task type.
RetryBackoffPolicy uses p to schedule job retries, defaults to a linear curve backoff with jitter between 1 and 10 minutes.
RetryBackoffPolicyName uses the policy named to schedule job retries by using RetryPolicyLookup(name).
RetryPolicyLookup loads a policy by name.
RetryPolicyNames returns a list of pre-generated retry policies.
RetrySleep sleeps for the duration for try n or until interrupted by ctx.
StoreReplicas sets the replica level to keep for the tasks store and work queue. Used only when initially creating the underlying streams.
TaskDeadline sets an absolute time after which the task should not be handled.
TaskDependsOn are Tasks that this task is dependent on, can be called multiple times.
TaskDependsOnIDs are IDs that this task is dependent on, can be called multiple times.
TaskMaxTries sets the maximum number of processing attempts a task will have; the queue max tries will override this.
TaskRequiresDependencyResults indicates that if a task has any dependencies their results should be loaded before execution.
TaskRetention is the time tasks will be kept for in the task storage. Used only when initially creating the underlying streams.
WorkQueue configures the client to consume messages from a specific queue. When not set the "DEFAULT" queue will be used.

# Constants

ConfigBucketName is the KV bucket for configuration like scheduled tasks.
DefaultJobRunTime is the default run-time handlers get when none is configured for a queue.
DefaultMaxTries is the default number of tries a task gets when none is configured for the task.
DefaultQueueMaxConcurrent is the default concurrency setting when none is configured for a queue.
EventsSubjectWildcard is the NATS wildcard for receiving all events.
LeaderElectedEventSubjectPattern is the pattern for determining the event publish subject.
LeaderElectedEventSubjectWildcard is the NATS wildcard for receiving all LeaderElectedEvent messages.
LeaderElectedEventType is the event type for LeaderElectedEvent events.
LeaderElectionBucketName is the KV bucket that will manage leader elections.
RequestReplyContentTypeHeader is the header text sent to indicate the body encoding and type.
RequestReplyDeadlineHeader is the header indicating the deadline for processing the item.
RequestReplyError is the header indicating a generic failure in handling an item.
RequestReplyTaskHandlerPattern is the subject request reply task handlers should listen on by default.
RequestReplyTaskType is the content type indicating the payload is a Task in JSON format.
RequestReplyTerminateError is the header to send in a reply that the task should be terminated via ErrTerminateTask.
ShortedScheduledDeadline is the shortest deadline a scheduled task may have.
TasksStreamName is the name of the JetStream Stream storing tasks.
TasksStreamSubjectPattern is the printf pattern that can be used to find an individual task by its task ID.
TasksStreamSubjects is a NATS wildcard matching all tasks.
TaskStateActive is for tasks that are currently being handled.
TaskStateBlocked is for tasks that are waiting on dependencies.
TaskStateChangeEventSubjectPattern is a printf pattern for determining the event publish subject.
TaskStateChangeEventSubjectWildcard is a NATS wildcard for receiving all TaskStateChangeEvent messages.
TaskStateChangeEventType is the event type for TaskStateChangeEvent events.
TaskStateCompleted is for tasks that are completed.
TaskStateExpired is for tasks that reached their deadline or maximum tries.
TaskStateNew is for newly created tasks that have not been handled yet.
TaskStateQueueError is for tasks that could not have their associated Work Queue item created.
TaskStateRetry is for tasks that previously failed and are waiting for a retry.
TaskStateTerminated indicates that the task was terminated via the ErrTerminateTask error.
TaskStateUnknown is for tasks that do not have a state set.
TaskStateUnreachable is for tasks that could not be run due to dependency problems.
WorkStreamNamePattern is the printf pattern for determining JetStream Stream names per queue.
WorkStreamNamePrefix is the prefix that, when removed, reveals the queue name.
WorkStreamSubjectPattern is the printf pattern individual items are placed in, placeholders for JobID and JobType.
WorkStreamSubjectWildcard is a NATS filter matching all enqueued items for any task store.
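
Several of the constants above are printf patterns or prefixes. A minimal sketch of how such values are typically used follows. The three `example...` constants are placeholder values made up for this illustration and are not the package's real constant values, and the helper names are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// Placeholder values for illustration only; the package's real constants
// may differ.
const (
	exampleTaskSubjectPattern = "EXAMPLE.T.%s"
	exampleWorkStreamPattern  = "EXAMPLE_Q_%s"
	exampleWorkStreamPrefix   = "EXAMPLE_Q_"
)

// taskSubject builds the subject for one task from its ID.
func taskSubject(taskID string) string {
	return fmt.Sprintf(exampleTaskSubjectPattern, taskID)
}

// workStreamName builds the stream name for a queue.
func workStreamName(queue string) string {
	return fmt.Sprintf(exampleWorkStreamPattern, queue)
}

// queueNameFromStream removes the prefix to recover the queue name and
// reports false when the stream name does not carry the prefix.
func queueNameFromStream(stream string) (string, bool) {
	if !strings.HasPrefix(stream, exampleWorkStreamPrefix) {
		return "", false
	}
	return strings.TrimPrefix(stream, exampleWorkStreamPrefix), true
}

func main() {
	fmt.Println(taskSubject("abc123"))
	fmt.Println(workStreamName("EMAIL"))

	queue, ok := queueNameFromStream(workStreamName("EMAIL"))
	fmt.Println(queue, ok)
}
```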

# Variables

ErrContextWithoutDeadline indicates a context.Context was passed without deadline when it was expected.
ErrDuplicateHandlerForTaskType indicates a task handler for a specific type is already registered.
ErrDuplicateItem indicates that the Work Queue deduplication protection refused a message.
ErrExternalCommandFailed indicates a command for an ExternalProcess handler failed.
ErrExternalCommandNotFound indicates a command for an ExternalProcess handler was not found.
ErrInvalidHeaders indicates that message headers from JetStream were not valid.
ErrInvalidQueueState indicates a queue was attempted to be used but no internal state is known of that queue.
ErrInvalidStorageItem indicates a Work Queue item had no JetStream state associated with it.
ErrNoHandlerForTaskType indicates that a task could not be handled by any known handlers.
ErrNoMux indicates that a processor was started with no routing mux configured.
ErrNoNatsConn indicates that a nil connection was supplied.
ErrNoTasks indicates the task store is empty.
ErrQueueConsumerNotFound indicates that the Work Queue store has no consumers defined.
ErrQueueItemCorrupt indicates that an item received from the work queue was invalid - perhaps invalid JSON.
ErrQueueItemInvalid indicates that an item read from the queue had no data or obviously bad data.
ErrQueueNameRequired indicates a queue has no name.
ErrQueueNotFound is the error indicating a queue does not exist rather than a failure to load.
ErrRequestReplyFailed indicates a callout to a remote handler failed due to a timeout, lack of listeners or network error.
ErrRequestReplyNoDeadline indicates a request-reply handler was called without a deadline.
ErrRequestReplyShortDeadline indicates a deadline context has a too short timeout.
ErrScheduledTaskAlreadyExist indicates a scheduled task that was being created already existed.
ErrScheduledTaskInvalid indicates a loaded task was invalid.
ErrScheduledTaskNotFound indicates the requested task does not exist.
ErrScheduledTaskShortDeadline indicates the time allowed for task execution is too short.
ErrScheduleInvalid indicates an invalid cron schedule was supplied.
ErrScheduleIsRequired indicates a cron schedule must be supplied when creating new schedules.
ErrScheduleNameInvalid indicates the name given to a task is invalid.
ErrScheduleNameIsRequired indicates a schedule name is needed when creating new schedules.
ErrStorageNotReady indicates the underlying storage is not ready.
ErrTaskAlreadyActive indicates that a task is already in the active state.
ErrTaskAlreadyInState indicates an update failed because a task was already in the desired state.
ErrTaskDependenciesFailed indicates that the task cannot be run as its dependencies failed.
ErrTaskExceedsMaxTries indicates a task exceeded its maximum attempts.
ErrTaskLoadFailed indicates a task failed for an unknown reason.
ErrTaskNotFound is the error indicating a task does not exist rather than a failure to load.
ErrTaskPastDeadline indicates a task that was scheduled for handling is past its deadline.
ErrTaskTypeCannotEnqueue indicates that a task is in a state where it cannot be enqueued as new.
ErrTaskTypeInvalid indicates an invalid task type was given.
ErrTaskTypeRequired indicates an empty task type was given.
ErrTaskUpdateFailed indicates a task update failed.
ErrTerminateTask indicates that a task failed, and no further processing attempts should be made.
ErrUnknownDiscardPolicy indicates a discard policy could not be found matching a name.
ErrUnknownEventType indicates that while parsing an event an unknown type of event was encountered.
ErrUnknownRetryPolicy indicates the requested retry policy does not exist.
RetryDefault is the default retry policy.
RetryLinearOneHour is a 50-step policy between 10 minutes and 1 hour.
RetryLinearOneMinute is a 20-step policy between 1 second and 1 minute.
RetryLinearTenMinutes is a 50-step policy between 1 and 10 minutes.
TaskItem is a task as defined by Task.

# Structs

BaseEvent is present in all event types and can be used to detect the type.
Client connects Task producers and Task handlers to the backend.
ClientOpts configures the client.
LeaderElectedEvent notifies that a leader election was won.
Mux routes messages. Note: this will change to be nearer to a server mux and include support for middleware.
ProcessItem is an individual item stored in the work queue.
Queue represents a work queue.
QueueInfo holds information about a queue state.
RetryPolicy defines a period that failed jobs will be retried against.
ScheduledTask represents a cron-like schedule and the task properties used to create new tasks regularly on that schedule.
Task represents a job item that handlers will execute.
TaskResult is the result of task execution, this will only be set for successfully processed jobs.
TasksInfo is state about the tasks store.
TaskStateChangeEvent notifies that a significant change occurred in a Task.

# Interfaces

Logger is a pluggable logger interface.
RetryPolicyProvider is the interface that the RetryPolicy implements, use this to implement your own exponential backoff system or similar for task retries.
Storage implements the backend access.
StorageAdmin provides helpers mainly to support the CLI. It leaks a number of details about JetStream, which is acceptable as there is no real intent to change the storage or support more backends.

# Type aliases

ClientOpt configures the client.
HandlerFunc handles a single task, the response bytes will be stored in the original task.
ItemKind indicates the kind of job a work queue entry represents.
TaskOpt configures Tasks made using NewTask().
TaskState indicates the current state a task is in.