Module: github.com/dumbmachine/nq
Version: 0.3.0
Repository: https://github.com/dumbmachine/nq.git
Documentation: pkg.go.dev

# README

Reliable, Efficient and Cancellable Distributed Task Queue in Go

License: MIT

NQ (NATS Queue) is a Go package for queuing and processing jobs in the background with workers. It is based on NATS, with a focus on the cancellability of enqueued jobs.

NQ requires a nats-server version that supports both JetStream and the key-value store.

How does it work?

Task Queue Figure
This package was designed so that a task is always cancellable by the client. Workers can be configured to cancel and quit instantly upon a network partition (e.g. a disconnect from nats-server).

Features

Task Options Walkthrough

Watch for updates

(Introduced in v0.3)

Listen for updates to a task's metadata:

package main

import (
	"encoding/json"
	"log"
	"sync"

	"github.com/dumbmachine/nq"
)

// QueueDev is the queue name seen in the output below.
const QueueDev = "scrap-url-dev"

// UrlPayload is the JSON payload sent with the task.
type UrlPayload struct {
	Url string `json:"url"`
}

func main() {
	client := nq.NewPublishClient(nq.NatsClientOpt{
		Addr: "nats://127.0.0.1:4222",
	}, nq.NoAuthentcation(),
	)
	defer client.Close()

	bytesPayload1, err := json.Marshal(UrlPayload{Url: "https://httpstat.us/200?sleep=10000"})
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	task1 := nq.NewTask(QueueDev, bytesPayload1)
	if ack, err := client.Enqueue(task1); err == nil {
		log.Printf("Watching updates queue=%s taskID=%s payload=%s", ack.Queue, ack.ID, ack.Payload)
		updates, err := client.GetUpdates(ack.ID)
		if err != nil {
			panic(err)
		}
		wg.Add(1)
		// listen for updates until the channel is closed
		go func() {
			defer wg.Done()
			for msg := range updates {
				log.Printf("Change detected, status=%s", msg.GetStatus())
			}
		}()
	} else {
		log.Printf("err=%s", err)
	}
	wg.Wait()
}

2022/08/29 22:17:15 Watching updates queue=scrap-url-dev taskID=yzaKwBIcbGEt8sMGgMJcZ0 payload={"url":"https://httpstat.us/200?sleep=10000"}
2022/08/29 22:17:15 Change detected, status=pending
2022/08/29 22:17:16 Change detected, status=processing
2022/08/29 22:17:28 Change detected, status=completed

Retrying

By default, a task is submitted for retry if it returns a non-nil error.

// a task that will be retried 2 times before being marked as `failed`
taskWithRetry := nq.NewTask("my-queue", bytesPayload, nq.Retry(2))

A custom error-filtering function can be supplied so that a task is marked as failed only for specific errors. Here, if a task fails with ErrFailedDueToInvalidApiKeys, it is considered a failure and will be retried.

var ErrFailedDueToInvalidApiKeys = errors.New("failed to perform task, invalid api keys")

srv := nq.NewServer(nq.NatsClientOpt{Addr: nats.DefaultURL}, nq.Config{
	IsFailureFn: func(err error) bool {
		return errors.Is(err, ErrFailedDueToInvalidApiKeys)
	},
	ServerName:  nq.GenerateServerName(),
})

Deadline / Timeout for tasks

// a task that must finish by time.Now() + 1 hour
taskWithDeadline := nq.NewTask("my-queue", bytesPayload, nq.Deadline(time.Now().Add(time.Hour)), nq.TaskID("deadlineTaskID"))

// a task that may run for at most 10 minutes
taskWithTimeout := nq.NewTask("my-queue", bytesPayload, nq.Timeout(time.Minute*10), nq.TaskID("timeoutTaskID"))

Task cancellations

Tasks that are waiting for execution or being executed on any worker can be cancelled. Cancelling a task requires its task ID.

// Cancel a task by ID
taskSignature := nq.NewTask("my-queue", []byte{})
ack, err := client.Enqueue(taskSignature)
if err != nil {
	log.Fatal(err)
}
client.Cancel(ack.ID)

A task function can handle cancellation like so:

func longRunningOperation(ctx context.Context, task *nq.TaskPayload) error {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	for i := 0; i < 1000; i++ {
		timeout := time.Millisecond * 20
		log.Printf("sleeping for: %s", timeout)
		// wait for the timeout, but return as soon as the task is cancelled
		select {
		case <-time.After(timeout):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

NOTE: Successful cancellation depends on the task function respecting context.Done().

Automatic Failover

The ShutdownOnNatsDisconnect option shuts down the workers and the server if the connection to nats-server is broken. This is useful when tasks must be cancellable at all times.

Note: When a disconnect is observed, workers stop processing new messages. The workers are cancelled after the shutdownTimeout duration. Any tasks not completed by then are cancelled, but remain available in the task queue for future or other workers to process.

Auto-shutdown of the worker server whenever it becomes incapable of respecting a cancel request, e.g. after losing its connection to nats-server:

srv := nq.NewServer(nq.NatsClientOpt{
	Addr: "nats://127.0.0.1:4222",
}, nq.Config{
	ServerName:  nq.GenerateServerName(),
	Concurrency: 2,
	LogLevel:    nq.InfoLevel,
}, nq.ShutdownOnNatsDisconnect(),
)

$ go run examples/simple.go sub
nq: pid=24914 2022/08/21 15:43:45.650999 INFO: Registered queue=scrap-url-dev
nq: pid=24914 2022/08/21 15:43:45.652720 INFO: Started Server@DumbmachinePro-local/24914
nq: pid=24914 2022/08/21 15:43:45.652739 INFO: [*] Listening for messages
nq: pid=24914 2022/08/21 15:43:45.652742 INFO: cmd/ctrl + c to terminate the process
nq: pid=24914 2022/08/21 15:43:45.652744 INFO: cmd/ctrl + z to stop processing new tasks
nq: pid=24914 2022/08/21 15:43:48.363110 ERROR: Disconnected from nats
nq: pid=24914 2022/08/21 15:43:48.363173 INFO: Starting graceful shutdown
nq: pid=24914 2022/08/21 15:43:53.363535 INFO: Waiting for all workers to finish...
nq: pid=24914 2022/08/21 15:43:53.363550 INFO: All workers have finished
nq: pid=24914 2022/08/21 15:43:53.363570 INFO: Exiting

Reconnection

The server can be configured not to shut down, and instead to try to reconnect to NATS when disconnected.

srv := nq.NewServer(nq.NatsClientOpt{
	Addr:          "nats://127.0.0.1:4222",
	ReconnectWait: time.Second * 5, // wait between reconnect attempts
	MaxReconnects: 100,             // total number of reconnect attempts before giving up
}, nq.Config{ServerName: "local-serv-1"})

If nats-server is up again:

  1. With previous state (i.e. with the expected queue data):

    nq: pid=7988 2022/08/22 17:24:44.349815 INFO: Registered queue=scrap-url-dev
    nq: pid=7988 2022/08/22 17:24:44.356378 INFO: Registered queue=another-one
    nq: pid=7988 2022/08/22 17:24:44.356393 INFO: Started Server@DumbmachinePro-local/7988
    nq: pid=7988 2022/08/22 17:24:44.356444 INFO: [*] Listening for messages
    nq: pid=7988 2022/08/22 17:24:44.356455 INFO: cmd/ctrl + c to terminate the process
    nq: pid=7988 2022/08/22 17:24:44.356459 INFO: cmd/ctrl + z to stop processing new tasks
    disconnected from nats
    2022/08/22 22:55:02 reconnection found nats://127.0.0.1:4222
    nq: pid=7988 2022/08/22 17:25:02.860051 INFO: Re-registering subscriptions to nats-server
    nq: pid=7988 2022/08/22 17:25:02.864988 INFO: Registration successful[nats://127.0.0.1:4222]
    disconnected from nats
    
  2. Without previous state: if the registered queues are not found in nats-server, they will be created.

    nq: pid=7998 2022/08/22 17:26:44.349815 INFO: Registered queue=scrap-url-dev
    nq: pid=7998 2022/08/22 17:26:44.356378 INFO: Registered queue=another-one
    nq: pid=7998 2022/08/22 17:26:44.356393 INFO: Started Server@DumbmachinePro-local/7998
    nq: pid=7998 2022/08/22 17:26:44.356444 INFO: [*] Listening for messages
    nq: pid=7998 2022/08/22 17:26:44.356455 INFO: cmd/ctrl + c to terminate the process
    nq: pid=7998 2022/08/22 17:26:44.356459 INFO: cmd/ctrl + z to stop processing new tasks
    disconnected from nats
    2022/08/22 22:57:25 reconnection found nats://127.0.0.1:4222
    nq: pid=7998 2022/08/22 17:27:25.518079 INFO: Re-registering subscriptions to nats-server
    nq: pid=7998 2022/08/22 17:27:25.524895 WARN: stream=scrap-url-dev re-registering
    nq: pid=7998 2022/08/22 17:27:25.542725 INFO: Registered queue=scrap-url-dev
    nq: pid=7998 2022/08/22 17:27:25.543668 WARN: stream=another-one re-registering
    nq: pid=7998 2022/08/22 17:27:25.554961 INFO: Registered queue=another-one
    nq: pid=7998 2022/08/22 17:27:25.555002 INFO: Registration successful[nats://127.0.0.1:4222]
    

Monitoring and Alerting

Refer to the NATS monitoring section and the monitoring tool by nats-io.

CLI Usage

Install CLI

go install github.com/dumbmachine/nq/tools/nq@latest
  • Cancel task
$ nq -u nats://127.0.0.1:4222 task cancel --id customID
Cancel message sent task=customID
  • Status of task
$ nq -u nats://127.0.0.1:4222 task status --id customID
taskID=customID status=Cancelled
  • Queue stats
$ nq -u nats://127.0.0.1:4222 queue stats --name scrap-url-dev
queue: scrap-url-dev | MessagesPending: 11 | Size: 3025 Bytes

Quickstart

Install NQ library

go get -u github.com/dumbmachine/nq

Make sure you have nats-server running locally or in a container. Example:

docker run --rm -p 4222:4222 --name nats-server -ti nats:latest -js

Now create a client to publish jobs.

// Creating publish client
package main

import (
	"encoding/json"
	"log"

	"github.com/dumbmachine/nq"
)

type Payload struct {
	Url string `json:"url"`
}

func main() {
	client := nq.NewPublishClient(nq.NatsClientOpt{
		Addr: "nats://127.0.0.1:4222",
	}, nq.NoAuthentcation(),
	// see godoc for more options
	)
	defer client.Close()

	bPayload, err := json.Marshal(Payload{Url: "https://httpstat.us/200?sleep=10000"})
	if err != nil {
		log.Fatal(err)
	}

	taskSig := nq.NewTask("scrap-url-dev", bPayload)
	if ack, err := client.Enqueue(taskSig); err == nil {
		log.Printf("Submitted queue=%s taskID=%s payload=%s", ack.Queue, ack.ID, ack.Payload)
	} else {
		log.Printf("err=%s", err)
	}
}


// creating worker server
package main

import (
	"context"
	"encoding/json"
	"errors"
	"net/http"
	"time"

	"github.com/dumbmachine/nq"
)

type Payload struct {
	Url string `json:"url"`
}

// Processing function
func fetchHTML(ctx context.Context, task *nq.TaskPayload) error {
	var payload Payload
	if err := json.Unmarshal(task.Payload, &payload); err != nil {
		return errors.New("invalid payload")
	}
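	client := &http.Client{}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, payload.Url, nil)
	if err != nil {
		return err
	}
	// the request is aborted if ctx is cancelled (e.g. the task is cancelled)
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	return nil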
}

func main() {

	srv := nq.NewServer(nq.NatsClientOpt{
		Addr:          "nats://127.0.0.1:4222",
		ReconnectWait: time.Second * 2,
		MaxReconnects: 100,
	}, nq.Config{
		ServerName:  nq.GenerateServerName(),
		Concurrency: 1,
		LogLevel:    nq.InfoLevel,
	},
	)

	srv.Register("scrap-url-dev", fetchHTML)

	if err := srv.Run(); err != nil {
		panic(err)
	}
}

Note: New messages are fetched from queues in the sequential order of their registration. NQ does not yet implement any custom priority order for registered queues.

To learn more about the nq APIs, see the godoc.

Acknowledgements

Async: Many of the design ideas are taken from async.

# Functions

Deadline returns an option to specify the deadline for the given task.
Generates a server name, a combination of the hostname and process ID.
NewCancelations returns a Cancelations instance.
NewNatsBroker returns a new instance of NatsBroker. TODO: Allow users to specify `forceReRegister` as a boolean.
NewPublishClient returns a new Client instance, given NATS connection options, to interact with nq tasks.
NewTask returns a new Task given a queue and a byte payload. TaskOption can be used to configure task processing.
Connect to nats-server without any authentication (default).
Returns an option to specify the maximum number of times a task will be retried before being marked as failed.
Shut down the server and workers if the connection with nats-server is broken.
streamNameToCancelStreamName returns the name of the stream responsible for cancellation of tasks in the given stream.
Returns a durable name for a stream. Helps re-establish the connection to nats-server while maintaining sequence state.
TaskID returns an option to specify the task ID.
Timeout returns an option to specify how long a task can run before being cancelled.
Connect to nats-server using token authentication. Read more: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/tokens
Connect to nats-server using a username:password pair. Read more: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/username_password

# Constants

Cancelled by user.
Successfully processed.
DebugLevel is the lowest level of logging.
Deleted before being run.
ErrorLevel is used for undesired and unexpected events that the program can recover from.
taskFN returns an error.
FatalLevel is used for undesired and unexpected events that the program cannot recover from.
InfoLevel is used for general informational log messages.
Waiting for the task to be received by a worker.
Task is being processed by a worker.
General options.
QueueOpt.
Authentication types. TODO: error on using multiple of the below.
WarnLevel is used for undesired but relatively expected events, which may indicate a problem.

# Variables

Trying to cancel an already cancelled task.
Happens when malformed data is sent to task-stream.

# Structs

Internal representation of options for nats-server connection.
Server config.
NatsClientOpt represents NATS connection configuration options.
Client is responsible for interaction with nq tasks. Client is used to enqueue or cancel tasks, or fetch metadata for tasks.
Internal `Queue`s represent an abstraction over a nats stream -> subject.
Responsible for task lifecycle management and processing.
Task is a representation of work to be performed by a worker.

# Type aliases

LogLevel represents a log level.
Signature for the function executed by a worker.