Package: github.com/luno/workflow (module package)
Version: 0.2.1
Repository: https://github.com/luno/workflow.git
Documentation: pkg.go.dev

# README


Workflow

Workflow is a distributed, event-driven workflow framework that runs robust, durable, and scalable sequential business logic on your services.

Workflow uses a RoleScheduler to distribute work across your instances through a role assignment process (similar to leader election, but with many roles instead of a single leader role).

Workflow is designed to run on multiple instances but also works on a single instance. Using the RoleScheduler mentioned above, Workflow ensures that each process runs only once at any given time, regardless of whether you are running 40 instances of your service or 1.


Features

  • Tech stack agnostic: Use Kafka, Cassandra, Redis, MongoDB, PostgreSQL, MySQL, RabbitMQ, or Reflex; the choice is yours!
  • Graph based (Directed Acyclic Graph - DAG): Design the workflow by defining small units of work called "Steps".
  • TDD: Workflow was built using TDD and remains well supported through a suite of tools.
  • Callbacks: Allow manual callbacks from webhooks or manual triggers from consoles to progress the workflow, such as approval buttons or third-party webhooks.
  • Event fusion: Add event connectors to your workflow to consume external event streams (even if they come from a different event streaming platform).
  • Hooks: Write hooks that execute on core changes in a workflow Run.
  • Schedule: Schedule workflows using the standard cron spec.
  • Timeouts: Set either a dynamic or a static duration for a workflow to wait. Once the timeout expires, the workflow continues as before.
  • Parallel consumers: Specify how many step consumers should run, or set a default for all consumers.
  • Consumer management: Consumer management and graceful shutdown of all processes, ensuring there are no goroutine leaks!

Installation

To start using Workflow, add the workflow module to your project by running:

go get github.com/luno/workflow

Adapters

Adapters enable Workflow to be tech stack agnostic by placing an interface / protocol between Workflow and the tech stack. Workflow uses adapters to understand how to use that specific tech stack.

For example, the Kafka adapter enables Workflow to produce messages to a topic and consume them from a topic using a set of predefined methods that wrap the Kafka client. Reflex is an event streaming framework that works very differently from Kafka, and the adapter pattern keeps those differences contained and localised in the adapter instead of letting them spill into the main implementation.

Event Streamer

The EventStreamer adapter interface defines what is needed to be satisfied in order for an event streaming platform or framework to be used by Workflow.

All implementations of the EventStreamer interface should be tested using adaptertest.TestEventStreamer
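To make the EventStreamer contract more concrete, here is a minimal in-memory sketch of the behaviour an event streaming platform supplies: producers append events to a topic, and each named consumer reads from its own cursor, which only advances when the event is acknowledged. The `memTopic` type and its `Send`/`Recv` methods are hypothetical and do not reproduce Workflow's actual EventStreamer method set; a real implementation should still be checked with adaptertest.

```go
package main

// memTopic is a hypothetical in-memory topic illustrating the
// produce / consume / ack cycle. It is not safe for concurrent use.
type memTopic struct {
	events  []string
	cursors map[string]int // consumer name -> index of its next unacknowledged event
}

func newMemTopic() *memTopic {
	return &memTopic{cursors: make(map[string]int)}
}

// Send appends an event to the end of the topic.
func (t *memTopic) Send(event string) {
	t.events = append(t.events, event)
}

// Recv returns the next unacknowledged event for consumer together with an
// ack function. Until ack is called, Recv keeps returning the same event,
// which is what gives at-least-once delivery.
func (t *memTopic) Recv(consumer string) (event string, ack func(), ok bool) {
	i := t.cursors[consumer]
	if i >= len(t.events) {
		return "", nil, false
	}
	ack = func() {
		// Only move forward, so a late ack never rewinds the cursor.
		if t.cursors[consumer] <= i {
			t.cursors[consumer] = i + 1
		}
	}
	return t.events[i], ack, true
}
```

Each consumer keeps its own cursor, so adding a new consumer never affects where existing consumers are in the topic.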

Record Store

The RecordStore adapter interface defines what must be satisfied for a storage solution to be used by Workflow.

All implementations of the RecordStore interface should be tested using adaptertest.RunRecordStoreTest

Role Scheduler

The RoleScheduler adapter interface defines what must be satisfied for a role scheduling solution to be used by Workflow.

All implementations of the RoleScheduler interface should be tested using adaptertest.RunRoleSchedulerTest
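Role assignment can be pictured with a single-process sketch in which each role name acts as a lock and only the holder of a role runs the matching process. The `roleScheduler` type, its `TryAcquire` method, and the role names in the usage are hypothetical and are not Workflow's RoleScheduler interface; a distributed implementation such as rinkrolescheduler has to provide the same guarantee across instances.

```go
package main

import "sync"

// roleScheduler is a hypothetical, single-process illustration of role
// assignment: at most one caller may hold a given role at a time.
type roleScheduler struct {
	mu    sync.Mutex
	roles map[string]bool // role name -> currently held
}

func newRoleScheduler() *roleScheduler {
	return &roleScheduler{roles: make(map[string]bool)}
}

// TryAcquire claims role if it is free and returns a release function.
// ok is false when another caller already holds the role.
func (s *roleScheduler) TryAcquire(role string) (release func(), ok bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.roles[role] {
		return nil, false
	}
	s.roles[role] = true
	var once sync.Once
	return func() {
		// Releasing twice is harmless.
		once.Do(func() {
			s.mu.Lock()
			delete(s.roles, role)
			s.mu.Unlock()
		})
	}, true
}
```

Because every process is tied to a distinct role name, running one or forty instances makes no difference: only the instance that holds a role runs that process.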

There are more adapters available, but only the three above are core requirements for using Workflow. To start, use the in-memory implementations, as they are the simplest way to experiment and get used to Workflow. When testing other adapter types, look at adaptertest, a set of tests written for adapters to ensure that they meet the specification.

Apart from the in-memory implementations, adapters such as kafkastreamer, reflexstreamer, sqlstore, sqltimeout, rinkrolescheduler, webui, and many more are not included in the core Workflow module. If you wish to use them, add them individually based on your needs, or build your own adapter.

Kafka

go get github.com/luno/workflow/adapters/kafkastreamer

Reflex

go get github.com/luno/workflow/adapters/reflexstreamer

SQL Store

go get github.com/luno/workflow/adapters/sqlstore

SQL Timeout

go get github.com/luno/workflow/adapters/sqltimeout

Rink Role Scheduler

go get github.com/luno/workflow/adapters/rinkrolescheduler

WebUI

go get github.com/luno/workflow/adapters/webui

Connectors

Connectors allow Workflow to consume events from an event streaming platform or framework and either trigger a workflow Run or provide a callback to an existing Run. This means that Connectors can act as the link between Workflow and the rest of your system.

Connectors are implemented as adapters because they share much of their code with EventStreamer implementations, and they can be seen as a subsection of adapters.

An example can be found here.
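As a rough sketch of what a connector does, the function below reads events from a hypothetical external stream, skips the ones the workflow does not care about, and triggers a Run for each relevant event's foreign ID. `ExternalEvent`, `TriggerFunc`, and `consumeConnector` are illustrative names and are not part of Workflow's connector API.

```go
package main

// ExternalEvent is a hypothetical event from another streaming platform.
type ExternalEvent struct {
	ID        string
	Type      string
	ForeignID string
}

// TriggerFunc stands in for starting a workflow Run for a foreign ID.
type TriggerFunc func(foreignID string) error

// consumeConnector forwards events of wantType to trigger and returns how many
// Runs were triggered. It stops at the first error so the caller can retry
// from the failing event instead of losing it.
func consumeConnector(events []ExternalEvent, wantType string, trigger TriggerFunc) (int, error) {
	n := 0
	for _, e := range events {
		if e.Type != wantType {
			continue // filter early: not relevant to this workflow
		}
		if err := trigger(e.ForeignID); err != nil {
			return n, err
		}
		n++
	}
	return n, nil
}
```

Stopping at the first failed trigger, rather than skipping it, keeps delivery at-least-once.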


Basic Usage

Step 1: Define the workflow

```go
package usage

import (
	"context"

	"github.com/luno/workflow"
)

// Step is the status type of the workflow; each value is a node in the graph.
type Step int

func (s Step) String() string {
	switch s {
	case StepOne:
		return "One"
	case StepTwo:
		return "Two"
	case StepThree:
		return "Three"
	default:
		return "Unknown"
	}
}

const (
	StepUnknown Step = 0 // zero value, never used as a real step
	StepOne     Step = 1
	StepTwo     Step = 2
	StepThree   Step = 3
)

// MyType is the Object carried by each Run.
type MyType struct {
	Field string
}

func Workflow() *workflow.Workflow[MyType, Step] {
	b := workflow.NewBuilder[MyType, Step]("my workflow name")

	// StepOne sets the greeting and may only move to StepTwo.
	b.AddStep(StepOne, func(ctx context.Context, r *workflow.Run[MyType, Step]) (Step, error) {
		r.Object.Field = "Hello,"
		return StepTwo, nil
	}, StepTwo)

	// StepTwo appends to it and may only move to StepThree, the final step.
	b.AddStep(StepTwo, func(ctx context.Context, r *workflow.Run[MyType, Step]) (Step, error) {
		r.Object.Field += " world!"
		return StepThree, nil
	}, StepThree)

	// Pass the core adapters (EventStreamer, RecordStore, RoleScheduler) here.
	return b.Build(...)
}
```

```mermaid
---
title: The workflow defined above creates the Directed Acyclic Graph below
---
stateDiagram-v2
	direction LR
	[*]-->One
	One-->Two
	Two-->Three
	Three-->[*]
```
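The graph above can be walked with a small, synchronous sketch: look up the function registered for the current step, run it, check that the returned step is one of the allowed destinations passed to AddStep, and stop at a step with no function registered. This only illustrates the transition rules; `stepDef` and `runToEnd` are hypothetical, and Workflow itself processes each step asynchronously in its own event consumer.

```go
package main

import "fmt"

type Step int

const (
	StepUnknown Step = iota
	StepOne
	StepTwo
	StepThree
)

type MyType struct{ Field string }

// stepDef pairs a step function with the destinations it may return.
type stepDef struct {
	fn      func(*MyType) (Step, error)
	allowed []Step
}

// runToEnd keeps calling the function registered for the current step until
// it reaches a step with no function (a terminal step), rejecting any
// transition that is not in the step's allowed destinations.
func runToEnd(steps map[Step]stepDef, start Step, obj *MyType) (Step, error) {
	cur := start
	for {
		def, ok := steps[cur]
		if !ok {
			return cur, nil // terminal step reached
		}
		next, err := def.fn(obj)
		if err != nil {
			return cur, err
		}
		allowed := false
		for _, a := range def.allowed {
			if a == next {
				allowed = true
				break
			}
		}
		if !allowed {
			return cur, fmt.Errorf("step %d may not move to %d", cur, next)
		}
		cur = next
	}
}
```

Registering StepOne and StepTwo with the functions from Step 1 and starting at StepOne ends at StepThree with `Field` set to "Hello, world!".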

Step 2: Run the workflow

```go
wf := usage.Workflow()

ctx := context.Background()
wf.Run(ctx)
```

Stop: To stop all processes and wait for them to shut down cleanly, call:

```go
wf.Stop()
```
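The Run/Stop pair follows a common Go shutdown pattern, sketched below under the assumption that consumers are plain goroutines: Run starts them under a cancellable context, and Stop cancels that context and waits on a sync.WaitGroup until every goroutine has returned. `consumerGroup` is hypothetical, and for brevity its Run derives its context from context.Background() instead of taking one as wf.Run does.

```go
package main

import (
	"context"
	"sync"
	"sync/atomic"
	"time"
)

// consumerGroup is a hypothetical stand-in for the background consumers a
// workflow launches.
type consumerGroup struct {
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	running atomic.Int32 // goroutines that have not yet returned
}

// Run starts n consumer goroutines that wake up every poll interval until
// their shared context is cancelled.
func (g *consumerGroup) Run(n int, poll time.Duration) {
	ctx, cancel := context.WithCancel(context.Background())
	g.cancel = cancel
	for i := 0; i < n; i++ {
		g.wg.Add(1)
		g.running.Add(1)
		go func() {
			defer g.wg.Done()       // deferred calls run LIFO: this runs last
			defer g.running.Add(-1) // and this runs first
			ticker := time.NewTicker(poll)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					// A real consumer would read and process events here.
				}
			}
		}()
	}
}

// Stop cancels every consumer and blocks until all of them have returned,
// so no goroutines are leaked.
func (g *consumerGroup) Stop() {
	g.cancel()
	g.wg.Wait()
}
```

Calling Stop before Run would panic on the nil cancel function; a production version would guard against that.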

Step 3: Trigger the workflow

```go
foreignID := "82347982374982374"
runID, err := wf.Trigger(ctx, foreignID, StepOne)
if err != nil {
	// handle err
}
```

Awaiting results: If appropriate, you can wait for the Run to reach a given step (here the final step, StepThree). Using a context with a timeout (cancellation) is advised.

```go
foreignID := "82347982374982374"
runID, err := wf.Trigger(ctx, foreignID, StepOne)
if err != nil {
	// handle err
}

// Bound how long to wait for the Run to reach StepThree.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

record, err := wf.Await(ctx, foreignID, runID, StepThree)
if err != nil {
	// handle err
}
```
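Conceptually, awaiting a Run means checking its current step until it reaches the one you asked for, or until the context expires. The sketch below does this by polling a lookup function; `awaitStatus` and `awaitWithin` are hypothetical helpers that use plain int statuses, and Workflow's Await may be implemented differently.

```go
package main

import (
	"context"
	"time"
)

// awaitStatus polls lookup every poll interval until it reports want, the
// lookup fails, or ctx is done.
func awaitStatus(ctx context.Context, lookup func() (int, error), want int, poll time.Duration) (int, error) {
	ticker := time.NewTicker(poll)
	defer ticker.Stop()
	for {
		status, err := lookup()
		if err != nil {
			return 0, err
		}
		if status == want {
			return status, nil
		}
		select {
		case <-ctx.Done():
			return status, ctx.Err() // e.g. context.DeadlineExceeded
		case <-ticker.C:
		}
	}
}

// awaitWithin bounds the wait with a timeout, as advised above.
func awaitWithin(timeout time.Duration, lookup func() (int, error), want int, poll time.Duration) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return awaitStatus(ctx, lookup, want, poll)
}
```

Without the timeout, a Run that never reaches the requested step (for example because it was paused) would block the caller indefinitely.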

Detailed examples

Head over to ./_examples to get familiar with callbacks, timeouts, testing, connectors, and more of the syntax in depth 😊


What is a workflow Run

When a Workflow is triggered, it creates an individual workflow instance called a Run. This is represented as workflow.Run in Workflow. Each Run has a lifecycle made up of a finite set of states, commonly referred to as a finite state machine. Each workflow Run can be in one of the following states (called RunState in Workflow):

| Run State | Value (int) | Description |
|---|---|---|
| Unknown | 0 | Has no meaning. Protects against the default zero value. |
| Initiated | 1 | State assigned when the Run is created; it is yet to be processed. |
| Running | 2 | Has begun to be processed and is still being processed by a step in the workflow. |
| Paused | 3 | Temporary stoppage that can be resumed or cancelled. Prevents any new triggers for the same Foreign ID. |
| Completed | 4 | Finished all the steps configured at the time of execution. |
| Cancelled | 5 | Did not complete all the steps and was terminated before completion. |
| Data Deleted | 6 | The Run Object has been modified to remove data or has been removed entirely, most likely for PII scrubbing. |
| Requested Data Deleted | 7 | Request for the workflow to apply the default or a custom delete operation to the Run Object. |

A Run can only exist in one state at any given time and the RunState allows for control over the Run.

```mermaid
---
title: Diagram the run states of a workflow
---
stateDiagram-v2
	direction LR

	Initiated-->Running

	Running-->Completed
	Running-->Paused

	Paused-->Running

	Running --> Cancelled
	Paused --> Cancelled

	state Finished {
		Completed --> RequestedDataDeleted
		Cancelled --> RequestedDataDeleted

		DataDeleted-->RequestedDataDeleted
		RequestedDataDeleted-->DataDeleted
	}
```
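The table and diagram can be encoded as a small Go sketch: the constants reuse the values from the table, and the transition map is read directly off the arrows in the diagram. These are standalone declarations for illustration, not imports of Workflow's own RunState type, and Workflow's enforcement of transitions may differ.

```go
package main

// RunState values as listed in the table (standalone sketch).
type RunState int

const (
	RunStateUnknown              RunState = 0
	RunStateInitiated            RunState = 1
	RunStateRunning              RunState = 2
	RunStatePaused               RunState = 3
	RunStateCompleted            RunState = 4
	RunStateCancelled            RunState = 5
	RunStateDataDeleted          RunState = 6
	RunStateRequestedDataDeleted RunState = 7
)

// transitions encodes each arrow in the diagram: from -> allowed targets.
var transitions = map[RunState][]RunState{
	RunStateInitiated:            {RunStateRunning},
	RunStateRunning:              {RunStateCompleted, RunStatePaused, RunStateCancelled},
	RunStatePaused:               {RunStateRunning, RunStateCancelled},
	RunStateCompleted:            {RunStateRequestedDataDeleted},
	RunStateCancelled:            {RunStateRequestedDataDeleted},
	RunStateRequestedDataDeleted: {RunStateDataDeleted},
	RunStateDataDeleted:          {RunStateRequestedDataDeleted},
}

// canTransition reports whether the diagram has an arrow from one state to another.
func canTransition(from, to RunState) bool {
	for _, s := range transitions[from] {
		if s == to {
			return true
		}
	}
	return false
}

// isFinished reports whether a state sits inside the diagram's Finished group.
func isFinished(s RunState) bool {
	switch s {
	case RunStateCompleted, RunStateCancelled, RunStateDataDeleted, RunStateRequestedDataDeleted:
		return true
	}
	return false
}
```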

Hooks

Hooks allow you to write functionality for Runs that enter a specific RunState. For example, when using PauseAfterErrCount, the OnPause hook can send a notification to a team that a specific Run has reached the error threshold, has been paused, and should be investigated. Another example is the OnCancelled hook: whether a Run is cancelled by handling a known sentinel error and calling r.Cancel(ctx) (where r is *Run), or manually from a UI, a notification can be sent to the team for visibility.

Hooks run in an event consumer. This means a hook is retried until it returns a nil error, and it is durable across deploys and interruptions. Delivery is at-least-once, so use the RunID as an idempotency key to make the operation idempotent.
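Because a hook may be delivered more than once, its side effect should be keyed on the RunID. The sketch below records handled RunIDs and skips repeats, while a failed side effect records nothing so that the redelivery tries again. `pauseNotifier`, `handlePaused`, and `errTransient` are hypothetical names, and `handlePaused` is not Workflow's RunStateChangeHookFunc signature. The in-memory set is also an assumption made for brevity: a real hook would need durable storage (for example a table keyed by RunID) to survive restarts.

```go
package main

import (
	"errors"
	"sync"
)

// errTransient is a hypothetical error a notification service might return.
var errTransient = errors.New("transient failure")

// pauseNotifier sends at most one notification per paused Run, even though
// the hook consuming pause events may see the same event more than once.
type pauseNotifier struct {
	mu   sync.Mutex
	seen map[string]bool // RunIDs already notified: the idempotency keys
}

// handlePaused processes one delivery of a "Run paused" event. A non-nil
// error tells the consumer to retry later; a RunID that was already handled
// is acknowledged without sending again.
func (n *pauseNotifier) handlePaused(runID string, send func() error) error {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.seen == nil {
		n.seen = make(map[string]bool)
	}
	if n.seen[runID] {
		return nil // duplicate delivery: safe to acknowledge
	}
	if err := send(); err != nil {
		return err // nothing recorded, so the retry will send again
	}
	n.seen[runID] = true
	return nil
}
```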

Available Hooks:

| Hook | Parameter(s) | Return(s) | Description | Is Event Driven? |
|---|---|---|---|---|
| OnPause | workflow.RunStateChangeHookFunc | error | Fired when a Run enters RunStatePaused | Yes |
| OnCancelled | workflow.RunStateChangeHookFunc | error | Fired when a Run enters RunStateCancelled | Yes |
| OnCompleted | workflow.RunStateChangeHookFunc | error | Fired when a Run enters RunStateCompleted | Yes |

Configuration Options

This package provides several options to configure the behavior of the workflow process. You can use these options to customize the instance count, polling frequency, error handling, lag settings, and more. Each option is defined as a function that takes a pointer to an options struct and modifies it accordingly. Below is a description of each available option:
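The options described here use Go's functional options idiom. The sketch below shows the mechanics with a hypothetical `options` struct and placeholder defaults that are assumptions, not Workflow's real defaults: each Option is a closure that mutates the struct, and options are applied in order, so later ones override earlier ones.

```go
package main

import "time"

// options is a hypothetical configuration struct; Workflow's real one differs.
type options struct {
	parallelCount      int
	pollingFrequency   time.Duration
	errBackOff         time.Duration
	pauseAfterErrCount int
}

// Option mutates options; each constructor below returns one.
type Option func(*options)

func ParallelCount(instances int) Option {
	return func(o *options) { o.parallelCount = instances }
}

func PollingFrequency(d time.Duration) Option {
	return func(o *options) { o.pollingFrequency = d }
}

func PauseAfterErrCount(count int) Option {
	return func(o *options) { o.pauseAfterErrCount = count }
}

// buildOptions starts from placeholder defaults and applies opts in order,
// so a later option overrides an earlier one.
func buildOptions(opts ...Option) options {
	o := options{
		parallelCount:    1,
		pollingFrequency: 500 * time.Millisecond,
		errBackOff:       time.Second,
	}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}
```

The idiom keeps constructors stable: adding a new option means adding a new function, not changing every call site.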

ParallelCount

func ParallelCount(instances int) Option
  • Description: Defines the number of instances of the workflow process. These instances are distributed consistently, each named to reflect its position (e.g., "consumer-1-of-5"). This helps in managing parallelism in workflow execution.
  • Parameters:
    • instances: The total number of parallel instances to create.
  • Usage Example:
```go
b.AddStep(
    StepOne,
    ...,
    StepTwo,
).WithOptions(
    workflow.ParallelCount(5),
)
```
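One way to picture parallel consumers is to give each instance a name following the "consumer-1-of-5" convention and route every key consistently to one instance. The naming helper follows the example above; the FNV-based `shardFor` is an assumption for illustration and may not match how Workflow actually splits events between instances.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// consumerNames builds one name per parallel instance using the
// "consumer-<n>-of-<total>" convention.
func consumerNames(total int) []string {
	names := make([]string, 0, total)
	for i := 1; i <= total; i++ {
		names = append(names, fmt.Sprintf("consumer-%d-of-%d", i, total))
	}
	return names
}

// shardFor is a hypothetical consistent assignment: the same key always maps
// to the same 1-based instance number for a given total.
func shardFor(key string, total int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash writes never return an error
	return int(h.Sum32()%uint32(total)) + 1
}
```

A stable key-to-instance mapping means all events for one Run are handled by the same instance, so they are not processed concurrently by different instances.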

PollingFrequency

func PollingFrequency(d time.Duration) Option
  • Description: Sets the duration at which the workflow process polls for changes. Adjust this to control how frequently the process checks for new events or updates.
  • Parameters:
    • d: The polling frequency as a time.Duration.
  • Usage Example:
```go
b.AddStep(
    StepOne,
    ...,
    StepTwo,
).WithOptions(
    workflow.PollingFrequency(10 * time.Second),
)
```

ErrBackOff

func ErrBackOff(d time.Duration) Option
  • Description: Defines the duration for which the workflow process will back off after encountering an error. This is useful for managing retries and avoiding rapid repeated failures.
  • Parameters:
    • d: The backoff duration as a time.Duration.
  • Usage Example:
```go
b.AddStep(
    StepOne,
    ...,
    StepTwo,
).WithOptions(
    workflow.ErrBackOff(5 * time.Minute),
)
```
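The sketch below shows the idea behind ErrBackOff: after a failed attempt, wait for the back-off duration before retrying the same event, instead of retrying in a tight loop. `processWithBackOff`, the `stop` channel, and `ErrTemporary` are hypothetical; in a real workflow, PauseAfterErrCount would additionally cap how many times a Run is retried.

```go
package main

import (
	"errors"
	"time"
)

// ErrTemporary is a hypothetical retryable error used to illustrate back-off.
var ErrTemporary = errors.New("temporary failure")

// processWithBackOff calls handle until it succeeds, sleeping backOff between
// failed attempts. Closing stop makes it give up after the current failure;
// a nil stop channel retries indefinitely. It returns the number of attempts.
func processWithBackOff(stop <-chan struct{}, backOff time.Duration, handle func() error) (int, error) {
	attempts := 0
	for {
		attempts++
		err := handle()
		if err == nil {
			return attempts, nil
		}
		select {
		case <-stop:
			return attempts, err // give up and surface the last error
		case <-time.After(backOff):
			// Back-off elapsed; retry the same event.
		}
	}
}
```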

LagAlert

func LagAlert(d time.Duration) Option
  • Description: Specifies the time threshold before a Prometheus metric switches to true, indicating that the workflow consumer is struggling to keep up. This can signal the need to convert to a parallel consumer.
  • Parameters:
    • d: The duration of the lag alert as a time.Duration.
  • Usage Example:
```go
b.AddStep(
    StepOne,
    ...,
    StepTwo,
).WithOptions(
    workflow.LagAlert(15 * time.Minute),
)
```

ConsumeLag

func ConsumeLag(d time.Duration) Option
  • Description: Defines how old an event must be before the consumer processes it. Events younger than the specified duration are held until they are older than the lag period.
  • Parameters:
    • d: The lag duration as a time.Duration.
  • Usage Example:
```go
b.AddStep(
    StepOne,
    ...,
    StepTwo,
).WithOptions(
    workflow.ConsumeLag(10 * time.Minute),
)
```
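ConsumeLag amounts to a simple calculation: an event younger than the lag has to wait for the remainder before it is processed. The helper below is a hypothetical sketch of that arithmetic and takes the event's age directly (for example `now.Sub(createdAt)`).

```go
package main

import "time"

// consumeDelay returns how much longer an event of the given age must be held
// before it may be consumed under the given lag. Zero means consume now.
func consumeDelay(age, lag time.Duration) time.Duration {
	if age >= lag {
		return 0 // old enough: consume immediately
	}
	return lag - age // hold the event for the remaining time
}
```

With a 10-minute lag, an event that is 2 minutes old waits another 8 minutes, while an event that is 15 minutes old is processed immediately.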

PauseAfterErrCount

func PauseAfterErrCount(count int) Option
  • Description: Sets the number of errors allowed before a record is updated to RunStatePaused. This mechanism acts similarly to a Dead Letter Queue, preventing further processing of problematic records and allowing for investigation and retry.
  • Parameters:
    • count: The maximum number of errors before pausing.
  • Usage Example:
```go
b.AddStep(
    StepOne,
    ...,
    StepTwo,
).WithOptions(
    workflow.PauseAfterErrCount(3),
)
```
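PauseAfterErrCount can be sketched as a per-Run error counter: each failed attempt increments the Run's count, and once the count reaches the limit the Run is marked paused instead of being retried. `errCounter` and `record` are hypothetical. Resetting the count after a successful attempt is an assumption of this sketch, not documented Workflow behaviour.

```go
package main

// errCounter tracks consecutive errors per Run and pauses a Run once it
// reaches limit, much like moving a message to a dead letter queue.
type errCounter struct {
	limit  int
	counts map[string]int  // RunID -> consecutive errors
	paused map[string]bool // RunIDs that have been paused
}

func newErrCounter(limit int) *errCounter {
	return &errCounter{limit: limit, counts: map[string]int{}, paused: map[string]bool{}}
}

// record notes the outcome of one attempt for runID and reports whether the
// Run is now paused.
func (c *errCounter) record(runID string, failed bool) bool {
	if !failed {
		delete(c.counts, runID) // assumption: success resets the count
		return c.paused[runID]
	}
	c.counts[runID]++
	if c.counts[runID] >= c.limit {
		c.paused[runID] = true
	}
	return c.paused[runID]
}
```

Pausing a problematic Run keeps it from blocking the step's consumer, while the rest of the Runs continue to be processed.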

Glossary

| Term | Description |
|---|---|
| Builder | A struct type that facilitates the construction of workflows. It provides methods for adding steps, callbacks, timeouts, and connecting workflows. |
| Callback | A method in the workflow API that can be used to trigger a callback function for a specified status. It passes data from a reader to the specified callback function. |
| Consumer | A component that consumes events from an event stream. In this context, it refers to the background consumer goroutines launched by the workflow. |
| EventStreamer | An interface representing a stream for workflow events. It includes methods for producing and consuming events. |
| Graph | A representation of the workflow's structure, showing the relationships between different statuses and transitions. |
| Hooks | Event-driven processes that run when a Workflow Run enters one of the finite set of states called RunState. |
| Producer | A component that produces events to an event stream. It is responsible for sending events to the stream. |
| Record | The "wire format" and representation of a Run that can be stored and retrieved. The RecordStore is used for storing and retrieving records. |
| RecordStore | An interface representing a store for Records. It defines the methods needed for storing and retrieving records. The RecordStore's underlying technology must support transactions in order to prevent dual writes. |
| RoleScheduler | An interface representing a scheduler for roles in the workflow. It is responsible for coordinating the execution of different roles. |
| Run | The instance that is created and processed by the Workflow. Each time Trigger is called, a new Run is created. |
| RunState | The finite set of states that a Run can be in. It is used to control and monitor the lifecycle of Runs. |
| Topic | A method that generates a topic for producing events in the event streamer based on the workflow name and status. |
| Trigger | A method in the workflow API that initiates a workflow for a specified foreignID and starting status. It returns a Run ID and allows for additional configuration options. |

Best practices

  1. Break up complex business logic into small steps.
  2. Workflow can be used to produce new, meaningful data, not just to execute logic. If it is used this way, consider implementing a CQRS pattern where the workflow acts as the "Command" and the data is persisted in a more queryable form.
  3. Changes to workflows must be backwards compatible. If you need to introduce a non-backwards-compatible change, add the new workflow alongside the existing one and route all incoming triggers to the new workflow. Give the old workflow time to finish processing the Runs it started; once it has no unfinished Runs left, it can be safely removed. Alternatively, versioning can be added internally to the Object type you provide, but this results in changes to the workflow's Directed Acyclic Graph (the map of connected steps).
  4. Workflow is not intended for low-latency use. Asynchronous event-driven systems are not meant to be low-latency; they prioritise decoupling, durability, distribution of workload, and the breakdown of complex logic (to name a few).
  5. Ensure that the Prometheus metrics that come with Workflow are being used for monitoring and alerting.

# Functions

ConsumeLag defines the age of the event that the consumer will consume.
CreateDiagram creates a diagram in a md file for communicating a workflow's set of steps in an easy-to-understand manner.
ErrBackOff defines the time duration of the backoff of the workflow process when an error is encountered.
LagAlert defines the time duration / threshold before the prometheus metric defined in /internal/metrics/metrics.go switches to true which means that the workflow consumer is struggling to consume events fast enough and might need to be converted to a parallel consumer.
MakeOutboxEventData creates an OutboxEventData that houses all the information that must be stored and be retrievable from the outbox.
Marshal creates a single point of change if the encoding changes.
NewTestingRun should be used when testing logic that defines a workflow.Run as a parameter.
ParallelCount defines the number of instances of the workflow process.
PauseAfterErrCount defines the number of times an error can occur until the record is updated to RunStatePaused which is similar to a Dead Letter Queue in the sense that the record will no longer be processed and won't block the workflow's consumers and can be investigated and retried later on.
PollingFrequency defines the time duration of which the workflow process will poll for changes.
Unmarshal creates a single point of change if the decoding changes.
WithClock allows the configuring of workflow's use and access of time.
WithCustomDelete allows for specifying a custom deleter function for scrubbing PII data when a workflow Run enters RunStateRequestedDataDeleted and is the function that once executed successfully allows for the RunState to move to RunStateDataDeleted.
WithDebugMode enables debug mode for a workflow, which results in increased logging, such as when processes are launched or shut down, or when events are skipped.
WithDefaultOptions applies the provided options to the entire workflow and not just to an individual process.
WithLogger allows for specifying a custom logger.
WithTimeoutStore allows the configuration of a TimeoutStore which is required when using timeouts in a workflow.

# Structs

ConnectorEvent defines a schema that is inline with how workflow uses an event notification pattern.
Record is the cornerstone of Workflow.
Run is a representation of a workflow run.
TypedRecord differs from Record in that it contains a Typed Object and Typed Status.

# Interfaces

EventStreamer implementations should all be tested with adaptertest.TestEventStreamer.
Logger interface allows the user of Workflow to provide a custom logger and not use the default which is provided in internal/logger.
RecordStore implementations should all be tested with adaptertest.TestRecordStore.
RoleScheduler implementations should all be tested with adaptertest.TestRoleScheduler.
RunStateController allows the interaction with a specific workflow record.
TimeoutStore implementations should all be tested with adaptertest.TestTimeoutStore.

# Type aliases

Ack is used by the event streamer to update its cursor of which messages have been consumed.
ConnectorEventFilter can be passed to the event streaming implementation to allow specific consumers to filter events earlier.
ConsumerFunc provides a record that is expected to be modified if the data needs to change.
EventFilter can be passed to the event streaming implementation to allow specific consumers to filter events earlier.
RunStateChangeHookFunc defines the function signature for all hooks associated to the run.
TimeoutFunc runs once the timeout has expired which is set by TimerFunc.
TimerFunc exists to allow the specification of when the timeout should expire dynamically.