Categorygithub.com/mfateev/cadence-client
modulepackage
0.2.0-beta
Repository: https://github.com/mfateev/cadence-client.git
Documentation: pkg.go.dev

# README

Go framework for Cadence

Cadence is a distributed, scalable, durable, and highly available orchestration engine we developed at Uber Engineering to execute asynchronous long-running business logic in a scalable and resilient way.

cadence-client is the framework for authoring workflows and activites.

How to use

Make sure you clone this repo into the correct location.

git clone [email protected]:uber/cadence.git $GOPATH/src/go.uber.org/cadence

or

go get go.uber.org/cadence

Activity

Activity is the implementation of a particular task in the business logic.

Activities are implemented as functions. Data can be passed directly to an activity via function parameters. The parameters can be either basic types or structs, with the only requirement being that the parameters need to be serializable. Even though it is not required, we recommand that the first parameter of an activity function is of type context.Context, in order to allow the activity to interact with other framework methods. The function must return an error value, and can optionally return a result value. The result value can be either a basic type or a struct with the only requirement being that the it is serializable.

The values passed to activities through invocation parameters or returned throughthe result value is recorded in the execution history. The entire execution history is transfered from the Cadence service to workflow workers with every event that the workflow logic needs to process. A large execution history can thus adversily impact the performance of your workflow. Therefore be mindful of the amount of data you transfer via activity invocation parameters or return values. Other than that no additional limitations exist on activity implementations.

In order to make the activity visible to the worker process hosting it, the activity needs to be registered via a call to cadence.RegisterActivity.

package simple

import (
	"context"

	"go.uber.org/cadence"
	"go.uber.org/zap"
)

func init() {
	cadence.RegisterActivity(SimpleActivity)
}

// SimpleActivity is a sample Cadence activity function that takes one parameter and
// returns a string containing the parameter value.
func SimpleActivity(ctx context.Context, value string) (string, error) {
	cadence.GetActivityLogger(ctx).Info("SimpleActivity called.", zap.String("Value", value))
	return "Processed: " + value, nil
}

Workflow

Workflow is the implementation of coordination logic. Its sole purpose is to orchestrate activity executions.

Workflows are implemented as functions. Startup data can be passed to a workflow via function parameters. The parameters can be either basic types or structs, with the only requirement being that the parameters need to be serializable. The first parameter of a workflow function is of type cadence.Context. The function must return an error value, and can optional return a result value. The result value can be either a basic type or a struct with the only requirement being that the it is serializable.

Workflow functions need to execute deterministically. Therefore, here is a list of rules that workflow code should obey to be a good Cadence citizen:

  • Use cadence.Context everywhere.
  • Don’t use range over map.
  • Use cadence.SideEffect to call rand and similar nondeterministic functions like UUID generator.
  • Use cadence.Now to get current time. Use cadence.NewTimer or cadence.Sleep instead of standard Go functions.
  • Don’t use native channel and select. Use cadence.Channel and cadence.Selector.
  • Don’t use go func(...). Use cadence.Go(func(...)).
  • Don’t use non constant global variables as multiple instances of a workflow function can be executing in parallel.
  • Don’t use any blocking functions besides belonging to Channel, Selector or Future
  • Don’t use any synchronization primitives as they can cause blockage and there is no possibility of races when running under dispatcher.
  • Don’t change workflow code when there are open workflows using it. Cadence is going to provide versioning mechanism to deal with deploying code changes without breaking existing workflows.
  • Don’t perform any IO or service calls as they are not usually deterministic. Use activities for that.
  • Don’t access configuration APIs directly from workflow as change in configuration affects workflow execution path. Either return configuration from an activity or use cadence.SideEffect to load it.

In order to make the workflow visible to the worker process hosting it, the workflow needs to be registered via a call to cadence.RegisterWorkflow.

package simple

import (
	"time"

	"go.uber.org/cadence"
	"go.uber.org/zap"
)

func init() {
	cadence.RegisterWorkflow(SimpleWorkflow)
}

// SimpleWorkflow is a sample Cadence workflow that accepts one parameter and
// executes an activity to which it passes the aforementioned parameter.
func SimpleWorkflow(ctx cadence.Context, value string) error {
	options := cadence.ActivityOptions{
		ScheduleToStartTimeout: time.Second * 60,
		StartToCloseTimeout:    time.Second * 60,
	}
	ctx = cadence.WithActivityOptions(ctx, options)

	var result string
	err := cadence.ExecuteActivity(ctx, activity.SimpleActivity, value).Get(ctx, &result)
	if err != nil {
		return err
	}
	cadence.GetLogger(ctx).Info(
		"SimpleActivity returned successfully!", zap.String("Result", result))

	cadence.GetLogger(ctx).Info("SimpleWorkflow completed!")
	return nil
}

Worker

A worker or “worker service” is a services hosting the workflow and activity implementations. The worker polls the “Cadence service” for tasks, performs those tasks and communicates task execution results back to the “Cadence service”. Worker services are developed, deployed and operated by Cadence customers.

You can run a Cadence worker in a new or an exiting service. Use the framework APIs to start the Cadence worker and link in all activity and workflow implementations that you require this service to execute.

package main

import (
	"github.com/uber/tchannel-go"
	"github.com/uber/tchannel-go/thrift"

	"github.com/uber-go/tally"

	"go.uber.org/cadence"
	t "go.uber.org/cadence/.gen/go/cadence"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

var HostPort = "127.0.0.1:7933"
var Domain = "SimpleDomain"
var ClientName = "SimpleWorker"
var CadenceService = "CadenceServiceFrontend"

func main() {
	startWorker(
		buildLogger(),
		buildCadenceClient())
}

func buildLogger() *zap.Logger {
	config := zap.NewDevelopmentConfig()
	config.Level.SetLevel(zapcore.InfoLevel)

	var err error
	logger, err := config.Build()
	if err != nil {
		panic("Failed to setup logger")
	}

	return logger
}

func buildCadenceClient() t.TChanWorkflowService {
	tchan, err := tchannel.NewChannel(ClientName, nil)
	if err != nil {
		panic("Failed to setup channel")
	}

	opts := &thrift.ClientOptions{
		HostPort: HostPort,
	}
	return t.NewTChanWorkflowServiceClient(
		thrift.NewClient(tchan, CadenceService, opts))
}

func startWorker(logger *zap.Logger, client t.TChanWorkflowService) {
	workerOptions := cadence.WorkerOptions{
		Logger:       logger,
		MetricsScope: tally.NewTestScope(ClientName, map[string]string{}),
	}

	worker := cadence.NewWorker(
		client,
		Domain,
		ClientName,
		workerOptions)
	err := worker.Start()
	if err != nil {
		panic("Failed to start worker")
	}

	logger.Info("Started Worker.", zap.String("worker", ClientName))
}

Contributing

We'd love your help in making Cadence-client great. Please review our instructions.

License

MIT License, please see LICENSE for details.

# Packages

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Functions

AddActivityRegistrationInterceptor adds interceptor that is called for each RegisterActivity call.
AddWorkflowRegistrationInterceptor adds interceptor that is called for each RegisterWorkflow call.
DeserializeFnResults de-serializes a function results.
EnableVerboseLogging enable or disable verbose logging.
ExecuteActivity requests activity execution in the context of a workflow.
ExecuteChildWorkflow requests child workflow execution in the context of a workflow.
GetActivityInfo returns information about currently executing activity.
GetActivityLogger returns a logger that can be used in activity.
GetLogger returns a logger to be used in workflow's context.
GetSignalChannel returns channel corresponding to the signal name.
GetWorkflowInfo extracts info of a current workflow from a context.
Go creates a new coroutine.
GoNamed creates a new coroutine with a given human readable name.
NewActivityTaskHandler creates an instance of a WorkflowTaskHandler from a decision poll response using activity functions registered through RegisterActivity.
NewActivityTaskWorker returns instance of an activity task handler worker.
NewBufferedChannel create new buffered Channel instance.
NewCanceledError creates CanceledError instance.
NewChannel create new Channel instance.
NewClient creates an instance of a workflow client.
NewContinueAsNewError creates ContinueAsNewError instance If the workflow main function returns this error then the current execution is ended and the new execution with same workflow ID is started automatically with options provided to this function.
NewDomainClient creates an instance of a domain client, to manager lifecycle of domains.
NewErrorWithDetails creates ErrorWithDetails instance Create standard error through errors.New or fmt.Errorf() if no details are provided.
NewFuture creates a new future as well as associated Settable that is used to set its value.
NewHeartbeatTimeoutError creates TimeoutError instance WARNING: This function is public only to support unit testing of workflows.
NewNamedBufferedChannel create new BufferedChannel instance with a given human readable name.
NewNamedChannel create new Channel instance with a given human readable name.
NewNamedSelector creates a new Selector instance with a given human readable name.
NewSelector creates a new Selector instance.
NewTimeoutError creates TimeoutError instance.
NewTimer returns immediately and the future becomes ready after the specified timeout.
NewWorker creates an instance of worker for managing workflow and activity executions.
NewWorkflowTaskHandler creates an instance of a WorkflowTaskHandler from a decision poll response using workflow functions registered through RegisterWorkflow To be used to replay a workflow in a debugger.
NewWorkflowTaskWorker returns an instance of a workflow task handler worker.
Now returns the current time when the decision is started or replayed.
RecordActivityHeartbeat sends heartbeat for the currently executing activity If the activity is either cancelled (or) workflow/activity doesn't exist then we would cancel the context with error context.Canceled.
RegisterActivity - register a activity function with the framework.
RegisterWorkflow - registers a workflow function with the framework.
RequestCancelWorkflow can be used to request cancellation of an external workflow.
SerializeFnArgs serializes an activity function arguments.
SideEffect executes provided function once, records its result into the workflow history and doesn't reexecute it on replay returning recorded result instead.
Sleep pauses the current goroutine for at least the duration d.
WithActivityOptions adds all options to the context.
WithActivityTask adds activity specific information into context.
WithCancel returns a copy of parent with a new Done channel.
WithChildPolicy adds a ChildWorkflowPolicy to the context.
WithChildWorkflowOptions adds all workflow options to the context.
WithExecutionStartToCloseTimeout adds a workflow execution timeout to the context.
WithHeartbeatTimeout adds a timeout to the context.
WithScheduleToCloseTimeout adds a timeout to the context.
WithScheduleToStartTimeout adds a timeout to the context.
WithStartToCloseTimeout adds a timeout to the context.
WithTaskList adds a task list to the context.
WithTestTags - is used for internal cadence use to pass any test tags.
WithValue returns a copy of parent in which the value associated with key is val.
WithWaitForCancellation adds wait for the cacellation to the context.
WithWorkflowDomain adds a domain to the context.
WithWorkflowID adds a workflowID to the context.
WithWorkflowTaskList adds a task list to the context.
WithWorkflowTaskStartToCloseTimeout adds a decision timeout to the context.

# Constants

ChildWorkflowPolicyAbandon is policy that will have no impact to child workflow execution when parent workflow is terminated.
ChildWorkflowPolicyRequestCancel is policy that will send cancel request to all open child workflows when parent workflow is terminated.
ChildWorkflowPolicyTerminate is policy that will terminate all child workflows when parent workflow is terminated.
LibraryVersion is a semver string that represents the version of this cadence client library it will be embedded as a "version" header in every rpc call made by this client to cadence server.

# Variables

ErrActivityResultPending is returned from activity's Execute method to indicate the activity is not completed when Execute method returns.
ErrCanceled is the error returned by Context.Err when the context is canceled.
ErrDeadlineExceeded is the error returned by Context.Err when the context's deadline passes.

# Structs

No description provided by the author
ActivityOptions stores all activity-specific parameters that will be stored inside of a context.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
WorkflowInfo information about currently executing workflow.
No description provided by the author
No description provided by the author

# Interfaces

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
Context is a clone of context.Context with Done() returning Channel instead of native channel.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
ServiceInvoker abstracts calls to the Cadence service from an activity implementation.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Type aliases

A CancelFunc tells an operation to abandon its work.
No description provided by the author
No description provided by the author
No description provided by the author