github.com/episub/queue
Version: 1.4.1
Repository: https://github.com/episub/queue.git
Documentation: pkg.go.dev

# README

esync is a library to help keep some services synchronised with other services.

It should be safe for multiple processors to run simultaneously. For that to hold, a driver's pop() function must not return the same task again on the next call.

Usage

The following is a basic example of how to use this library.

package main

import (
	"log"
	"os"
	"time"

	"github.com/episub/queue"
)

// Use a scheduled action to run tasks at intervals
type myScheduledAction struct{}

func (m myScheduledAction) Do() error {
	log.Printf("myScheduledAction done")
	return nil
}

func (m myScheduledAction) Stream() string {
	return "my-scheduled-action"
}

// Use a queue action to handle tasks in a queue
type myQueueAction struct{}

func (m myQueueAction) Do(task queue.Task) (queue.TaskResult, queue.TaskMessage) {
	log.Printf("myQueueAction done.  Task key: %s", task.Key)

	return queue.TaskResultSuccess, queue.TaskMessage("No Problem")
}

func queueErrorManager(err error) {
	log.Printf("Error from sync manager: %s", err)
}

func main() {
	dbHost := os.Getenv("DB_HOST")
	dbUser := os.Getenv("DB_USER")
	dbPass := os.Getenv("DB_PASS")
	dbName := os.Getenv("DB_DB")
	dbTable := os.Getenv("DB_TABLE")

	postgresDriver, err := queue.NewPostgresDriver(dbUser, dbPass, dbHost, dbName, dbTable)

	if err != nil {
		log.Fatal(err)
	}

	sm := queue.NewSyncManager(postgresDriver)

	// Optionally set an error handler so that you can catch errors from the running loop and put them through your own logging solution
	sm.SetErrorHandler(queueErrorManager)

	// Schedule a regular action to perform at specified intervals
	sm.Schedule(myScheduledAction{}, time.Second*3)

	// Register our queue handler for tasks with name "exampleTask"
	sm.RegisterTaskHandler(myQueueAction{}, "exampleTask")
	data := map[string]interface{}{
		"testData": true,
	}

	tm := queue.NewTaskManager(postgresDriver)

	// Add one task to the queue
	err = tm.AddTask("exampleTask", "myKey", data)
	if err != nil {
		panic(err)
	}

	// Off we go
	sm.Run()
}

Future versions may run the same action from several goroutines at once, so be careful with methods that have pointer receivers, since they can end up sharing values across goroutines. Prefer value receivers where possible.
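To illustrate the warning above, here is a minimal sketch of the difference between value and pointer receivers. The counterAction type and its methods are hypothetical and are not part of this library; they only show which receiver kind shares state.

```go
package main

import "fmt"

// counterAction is a hypothetical action that keeps internal state.
type counterAction struct {
	runs int
}

// DoValue has a value receiver: it works on a copy, so the original
// struct is never modified and nothing is shared between callers.
func (c counterAction) DoValue() int {
	c.runs++
	return c.runs
}

// DoPointer has a pointer receiver: every caller mutates the same struct,
// which becomes a data race if several goroutines call it at once.
func (c *counterAction) DoPointer() int {
	c.runs++
	return c.runs
}

func main() {
	a := counterAction{}
	a.DoValue()
	a.DoValue()
	fmt.Println(a.runs) // 0: the value receiver left the original untouched

	a.DoPointer()
	a.DoPointer()
	fmt.Println(a.runs) // 2: the pointer receiver changed shared state
}
```

If an action really does need shared state, guarding it with a sync.Mutex is one option, but stateless value receivers are the simpler choice.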

Details

You need to create a SyncManager, passing it a database driver that satisfies the 'Driver' interface. Included drivers:

  • PostgreSQL (PostgresDriver)

You can use this library for either creating a service to run the synchronising actions, or for creating entries in a queue to be acted on by the synchronisation service. At the very least you need a SyncManager.

Concepts

  • Data: an action can store data in the queue
  • Task Key: this should uniquely identify a particular action. Think of it as the primary key, though it may not be the actual primary key, depending on driver implementation. If there is more than one READY entry for the same task key, only the most recent will be performed.
  • Task Name: this identifies the type of task. Action managers may handle particular task types. For example, you may have a task name such as "CUSTOMER_UPDATE", with multiple database entries of that sort. Try to keep one action per task name.
  • Stream: some tasks can be run simultaneously, while others may need to block. Put them in the same stream if they should block each other, and separate streams if safe to run concurrently.

Actions

Actions are descriptions of an act to perform. You provide them with a function that will run when the action is to be performed. You will then need to schedule the action to occur, either at specific intervals, or as the action to be performed by a particular queue item.

Design actions so that they are safe to use from multiple processes at once. In practice, this means avoiding pointers.

Actions should return gracefully if they take too long, because a slow action blocks the main loop.

Register action

Actions need to be registered for each task name. If there is no registered action for a task name, then the particular task is cancelled when its turn comes.
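The lookup rule above can be pictured as a map from task name to handler. This is a simplified sketch, not the library's implementation: the task, handler, and registry types and the state strings are all hypothetical.

```go
package main

import "fmt"

// task is a hypothetical queue entry.
type task struct {
	Name string
	Key  string
}

// handler is a hypothetical action: it receives a task and reports
// the state the task should end up in.
type handler func(task) string

// registry maps a task name to the single handler registered for it.
type registry map[string]handler

// dispatch runs the handler registered for the task's name. If no
// handler is registered, the task is cancelled rather than left waiting.
func (r registry) dispatch(t task) string {
	h, ok := r[t.Name]
	if !ok {
		return "CANCELLED"
	}
	return h(t)
}

func main() {
	r := registry{
		"CUSTOMER_UPDATE": func(t task) string { return "DONE" },
	}
	fmt.Println(r.dispatch(task{Name: "CUSTOMER_UPDATE", Key: "42"})) // DONE
	fmt.Println(r.dispatch(task{Name: "UNKNOWN_TASK", Key: "7"}))     // CANCELLED
}
```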

Queue

The queue holds actions that should be performed in due course. Not every action needs to go through the queue, but queuing is useful for work that can be done later. To use the queue, you need a driver that provides a connection to it. The driver must fulfil the 'Driver' interface.

SyncManager

Running

In some cases, another service may not handle multiple connections well -- for example, NetSuite. In these cases you should ensure that you are only running one instance of this service.

Driver

When designing a driver, take care not to implement a 'pop' that ignores newer tasks. Suppose a task to update a customer is added and picked up, and before its action finishes a second update task for the same customer is added. When the first action returns and is marked as finished, the second task must still be performed, so marking the first as finished must not override the newer task.
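The scenario above can be made concrete with a small in-memory model. This is a sketch under assumptions, not the PostgreSQL driver's logic: the memQueue type, its methods, and the state strings are hypothetical, and it is not safe for concurrent use. The key idea is that finishing a task targets one specific entry by ID, never every entry with the same task key.

```go
package main

import "fmt"

// entry is a hypothetical queue row.
type entry struct {
	id    int
	key   string
	state string // "READY", "IN_PROGRESS", "DONE" or "CANCELLED"
}

// memQueue is a hypothetical in-memory queue (not goroutine-safe).
type memQueue struct {
	entries []entry
	nextID  int
}

// Add appends a READY entry and returns its ID.
func (q *memQueue) Add(key string) int {
	q.nextID++
	q.entries = append(q.entries, entry{id: q.nextID, key: key, state: "READY"})
	return q.nextID
}

// Pop takes the key of the oldest READY entry, claims the most recent
// READY entry for that key, and cancels the older ones it supersedes.
// A claimed entry is IN_PROGRESS, so the next Pop cannot return it again.
func (q *memQueue) Pop() (entry, bool) {
	picked := -1
	for i := range q.entries {
		if q.entries[i].state != "READY" {
			continue
		}
		if picked == -1 {
			picked = i
			continue
		}
		if q.entries[i].key == q.entries[picked].key {
			q.entries[picked].state = "CANCELLED" // superseded by a newer entry
			picked = i
		}
	}
	if picked == -1 {
		return entry{}, false
	}
	q.entries[picked].state = "IN_PROGRESS"
	return q.entries[picked], true
}

// Finish marks exactly one entry as DONE, identified by ID, so a newer
// READY entry with the same key is left alone.
func (q *memQueue) Finish(id int) {
	for i := range q.entries {
		if q.entries[i].id == id {
			q.entries[i].state = "DONE"
			return
		}
	}
}

func main() {
	q := &memQueue{}
	q.Add("customer-42")
	first, _ := q.Pop()
	q.Add("customer-42") // arrives while the first is still in progress
	q.Finish(first.id)

	second, ok := q.Pop()
	fmt.Println(ok, second.id) // true 2: the newer task is still performed
}
```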

PostgreSQL

CREATE TABLE public.message_queue(
	message_queue_id uuid NOT NULL DEFAULT gen_random_uuid(),
	data jsonb NOT NULL DEFAULT '{}',
	task_key varchar(64) NOT NULL,
	task_name varchar(64) NOT NULL,
	created_at timestamptz DEFAULT Now(),
	created_by varchar(64) NOT NULL,
	last_attempted timestamptz NOT NULL DEFAULT Now(),
	state varchar(16) NOT NULL,
	last_attempt_message varchar NOT NULL,
	do_after timestamptz NOT NULL DEFAULT Now(),
	CONSTRAINT message_queue_id_pk PRIMARY KEY (message_queue_id)
);

CREATE TABLE public.cdc_hash(
	cdc_hash_id uuid NOT NULL DEFAULT gen_random_uuid(),
	cdc_controller_id uuid NOT NULL,
	object_id varchar NOT NULL,
	hash uuid,
	created_at timestamptz NOT NULL DEFAULT Now(),
	updated_at timestamptz NOT NULL DEFAULT Now(),
	CONSTRAINT cdc_hash_pk PRIMARY KEY (cdc_hash_id),
	CONSTRAINT cdc_hash_controller_object_uq UNIQUE (cdc_controller_id,object_id)
);

CREATE INDEX idx_cdc_hash_id ON public.cdc_hash (cdc_controller_id, object_id);
CREATE INDEX idx_cdc_hash_id_hash ON public.cdc_hash (cdc_controller_id, object_id, hash);


TODO:

  • Implement a timeout so that if some action blocks, then we can perform other actions
  • Per-task retry intervals so that some tasks can be tried every minute, while others wait every hour. Solution: separate queue runners per task type. They can then check for and pop tasks that are specific to their task name. When a task is registered, an interval is passed in the register call, and a new queue is spun up for that task.

# Functions

NewCDCRunnerAction Creates and initialises a new Execute SQL Processor.
NewEmailSendgridAction Returns an email action for esync.
NewPostgresDriver Returns a new postgres driver, initialised.
NewSyncClient Returns a properly configured sync client.
NewSyncManager Returns a new and ready sync manager.
NewTaskManager Returns a task manager.

# Constants

CDCActionCreate A new record should be created.
CDCActionDelete An existing record should be removed.
CDCActionUpdate An existing record should be updated.

# Variables

ErrNoTasks Returned when there are no tasks available in the queue.
TaskCancelled Task has been cancelled and will not be actioned.
TaskDone Task is completed/finished/done.
TaskFailed Task has failed and will not be actioned.
TaskInProgress Task is currently being processed.
TaskReady Task is ready to be actioned.
TaskResultPermanentFailure Task failed with an error and is not to be retried.
TaskResultRetryFailure Task resulted in an error, but can be retried later.
TaskResultSuccess Task was a success.
TaskRetry Task has failed and needs to be retried at a later time.
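
The descriptions above suggest a natural correspondence between the result an action returns and the state its task moves to next. The sketch below spells out that correspondence as an assumption; it uses local stand-in types with hypothetical string values rather than the package's own TaskResult and TaskState, and the library's actual transitions may differ.

```go
package main

import "fmt"

// Local stand-ins for the package's result and state values. The string
// values here are hypothetical; the real ones are defined by the library.
type result string
type state string

const (
	resultSuccess          result = "success"
	resultRetryFailure     result = "retry-failure"
	resultPermanentFailure result = "permanent-failure"

	stateDone   state = "done"
	stateRetry  state = "retry"
	stateFailed state = "failed"
)

// nextState is an assumed mapping from an action's result to the state
// the task moves to afterwards.
func nextState(r result) state {
	switch r {
	case resultSuccess:
		return stateDone
	case resultRetryFailure:
		return stateRetry
	default:
		// Permanent failures, and anything unrecognised, are not retried.
		return stateFailed
	}
}

func main() {
	fmt.Println(nextState(resultSuccess))          // done
	fmt.Println(nextState(resultRetryFailure))     // retry
	fmt.Println(nextState(resultPermanentFailure)) // failed
}
```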

# Structs

Attachment For email, data should be base64 encoded.
CDCObjectAction Object ID along with the action that should be taken.
CDCRunnerAction Runs at regular intervals.
EmailPayload Data for sending email stored in and pulled from queue.
EmailSendgridAction Handler for emails in message queue.
PostgresDriver PostgreSQL Driver.
SyncClient Client for interacting with the queue.
SyncManager is the central process for running actions.
Task A task to be performed.
TaskManager Used for clients that want to work with the queue.

# Interfaces

Driver Manages the connection to the background queue to keep track of tasks.
ScheduledAction A scheduled action to be run.
TaskAction An action to perform given some task in the task queue.

# Type aliases

CDCAction Action that should be taken based on row changes.
TaskMessage Arbitrary message associated with action.
TaskResult Result code for action.
TaskState Allowable states for a task.