github.com/Azure/go-asyncjob
Category: module, package
Version: 0.10.0
Repository: https://github.com/azure/go-asyncjob.git
Documentation: pkg.go.dev

# README

AsyncJob

AsyncJob aims to help you organize code as a dependency graph (DAG) instead of a sequential chain.

Concepts

JobDefinition is a graph that describes code blocks and their connections.

  • you can use AddStep, StepAfter and StepAfterBoth to organize steps in a JobDefinition.
  • a JobDefinition can, and should, be built and sealed at package init time.
  • a JobDefinition has a generically typed input.
  • calling Start with the input instantiates a JobInstance, and its steps begin to execute.
  • a JobDefinition can be visualized using Graphviz, which makes it easier for humans to understand.

JobInstance is an instance of a JobDefinition, created by calling the JobDefinition's .Start() method.

  • all steps on the definition are copied to the JobInstance.
  • each step is executed once its preceding steps are done.
  • a JobInstance can be visualized as well; the instance visualization contains detailed info (startTime, duration) for each step.

StepDefinition is an individual code block that can be executed and has inputs and an output.

  • a StepDefinition describes its preceding steps.
  • a StepDefinition contains generic Params.
  • ideally, every step method should come from the JobInput (the generic type on JobDefinition) or be a static function, to avoid shared state between jobs.
  • the output of a step can be fed into the next step as input; the types are checked by Go generics.

StepInstance is an instance of a StepDefinition.

  • each step is wrapped in an AsyncTask.
  • a step is started once all its dependencies have finished.
  • execution policies can be applied: Retry, ContextEnrichment.

Usage

Build and run an asyncjob


// SqlSummaryAsyncJobDefinition is the job definition for the SqlSummaryJobLib.
// A JobDefinition fits naturally in an init() function: build and seal it once
// at package init time, then start as many job instances from it as needed.
var SqlSummaryAsyncJobDefinition *asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult]

// init builds the job definition with an empty retry-policy map, publishes it
// in SqlSummaryAsyncJobDefinition and seals it. A build error is fatal (panic).
func init() {
	def, err := BuildJobWithResult(map[string]asyncjob.RetryPolicy{})
	SqlSummaryAsyncJobDefinition = def
	if err != nil {
		panic(err)
	}

	SqlSummaryAsyncJobDefinition.Seal()
}

// BuildJob assembles the "sqlSummaryJob" definition. The resulting DAG is:
//
//   - GetConnection and CheckAuth have no preceding step.
//   - GetTableClient1 and GetTableClient2 each take GetConnection's output.
//   - QueryTable1 / QueryTable2 take their table client's output and also
//     execute after CheckAuth.
//   - Summarize takes both query results.
//   - EmailNotification executes after Summarize.
//
// retryPolicies is looked up by step name for GetConnection, QueryTable1,
// QueryTable2 and Summarize; an absent key passes the map's zero value to
// WithRetry. Every step is registered with WithContextEnrichment(EnrichContext).
// On failure it returns a nil definition and an error naming the step.
func BuildJob(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinition[SqlSummaryJobLib], error) {
	// wrapStepErr reports which step failed to register, keeping the cause unwrappable.
	wrapStepErr := func(step string, cause error) error {
		return fmt.Errorf("error adding step %s: %w", step, cause)
	}

	job := asyncjob.NewJobDefinition[SqlSummaryJobLib]("sqlSummaryJob")

	connStep, err := asyncjob.AddStep(job, "GetConnection", connectionStepFunc,
		asyncjob.WithRetry(retryPolicies["GetConnection"]),
		asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, wrapStepErr("GetConnection", err)
	}

	authStep, err := asyncjob.AddStep(job, "CheckAuth", checkAuthStepFunc,
		asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, wrapStepErr("CheckAuth", err)
	}

	client1Step, err := asyncjob.StepAfter(job, "GetTableClient1", connStep, tableClient1StepFunc,
		asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, wrapStepErr("GetTableClient1", err)
	}

	query1Step, err := asyncjob.StepAfter(job, "QueryTable1", client1Step, queryTable1StepFunc,
		asyncjob.WithRetry(retryPolicies["QueryTable1"]),
		asyncjob.ExecuteAfter(authStep),
		asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, wrapStepErr("QueryTable1", err)
	}

	client2Step, err := asyncjob.StepAfter(job, "GetTableClient2", connStep, tableClient2StepFunc,
		asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, wrapStepErr("GetTableClient2", err)
	}

	query2Step, err := asyncjob.StepAfter(job, "QueryTable2", client2Step, queryTable2StepFunc,
		asyncjob.WithRetry(retryPolicies["QueryTable2"]),
		asyncjob.ExecuteAfter(authStep),
		asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, wrapStepErr("QueryTable2", err)
	}

	summaryStep, err := asyncjob.StepAfterBoth(job, "Summarize", query1Step, query2Step, summarizeQueryResultStepFunc,
		asyncjob.WithRetry(retryPolicies["Summarize"]),
		asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, wrapStepErr("Summarize", err)
	}

	// EmailNotification has no data input; it only needs Summarize to finish first.
	if _, err = asyncjob.AddStep(job, "EmailNotification", emailNotificationStepFunc,
		asyncjob.ExecuteAfter(summaryStep),
		asyncjob.WithContextEnrichment(EnrichContext)); err != nil {
		return nil, wrapStepErr("EmailNotification", err)
	}

	return job, nil
}
	// execute job: each Start call creates an independent JobInstance from the
	// same sealed definition.
	jobInstance1 := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{...})
	jobInstance2 := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{...})

	// ...

	// context.WithTimeout returns (ctx, cancel): pass the ctx to Wait and call
	// cancel to release the timer. Each Wait gets its own 10s budget.
	// NOTE(review): if Wait returns an error, it should be checked here.
	waitCtx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel1()
	jobInstance1.Wait(waitCtx1)

	waitCtx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel2()
	jobInstance2.Wait(waitCtx2)

Visualize a job

	# visualize the job
	dotGraph := job.Visualize()
	fmt.Println(dotGraph)

Visualized job graph

// Sample visualization of a finished job instance. The job root ("sqlSummaryJob")
// is drawn as a triangle and each step as a hexagon. Node tooltips show the step's
// state, start time and duration; green means "State: completed". Each edge's
// tooltip time equals the StartAt of the step it points to.
digraph {
	newrank = "true"
		"QueryTable2" [label="QueryTable2" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254054-08:00\nDuration: 13.207µs" fillcolor=green] 
		"QueryTable1" [label="QueryTable1" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254098-08:00\nDuration: 11.394µs" fillcolor=green] 
		"EmailNotification" [label="EmailNotification" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254143-08:00\nDuration: 11.757µs" fillcolor=green] 
		"sqlSummaryJob" [label="sqlSummaryJob" shape=triangle style=filled tooltip="State: completed\nStartAt: 0001-01-01T00:00:00Z\nDuration: 0s" fillcolor=green] 
		"GetConnection" [label="GetConnection" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.253844-08:00\nDuration: 154.825µs" fillcolor=green] 
		"GetTableClient2" [label="GetTableClient2" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254017-08:00\nDuration: 25.793µs" fillcolor=green] 
		"GetTableClient1" [label="GetTableClient1" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254076-08:00\nDuration: 12.459µs" fillcolor=green] 
		"Summarize" [label="Summarize" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254121-08:00\nDuration: 7.88µs" fillcolor=green] 
		"CheckAuth" [label="CheckAuth" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.253818-08:00\nDuration: 18.52µs" fillcolor=green] 

		"CheckAuth" -> "QueryTable2" [style=bold tooltip="Time: 2022-12-12T12:00:32.254054-08:00" color=green] 
		"CheckAuth" -> "QueryTable1" [style=bold tooltip="Time: 2022-12-12T12:00:32.254098-08:00" color=green] 
		"GetTableClient2" -> "QueryTable2" [style=bold tooltip="Time: 2022-12-12T12:00:32.254054-08:00" color=green] 
		"GetTableClient1" -> "QueryTable1" [style=bold tooltip="Time: 2022-12-12T12:00:32.254098-08:00" color=green] 
		"QueryTable1" -> "Summarize" [style=bold tooltip="Time: 2022-12-12T12:00:32.254121-08:00" color=green] 
		"QueryTable2" -> "Summarize" [style=bold tooltip="Time: 2022-12-12T12:00:32.254121-08:00" color=green] 
		"Summarize" -> "EmailNotification" [style=bold tooltip="Time: 2022-12-12T12:00:32.254143-08:00" color=green] 
		"sqlSummaryJob" -> "CheckAuth" [style=bold tooltip="Time: 2022-12-12T12:00:32.253818-08:00" color=green] 
		"sqlSummaryJob" -> "GetConnection" [style=bold tooltip="Time: 2022-12-12T12:00:32.253844-08:00" color=green] 
		"GetConnection" -> "GetTableClient2" [style=bold tooltip="Time: 2022-12-12T12:00:32.254017-08:00" color=green] 
		"GetConnection" -> "GetTableClient1" [style=bold tooltip="Time: 2022-12-12T12:00:32.254076-08:00" color=green] 
}

Collect the result from a job

You can enrich a job so that it is aware of the result of a given step; then you can collect that step's result (strongly typed).

// Wrap the job together with its Summarize step (summaryTsk) so that instances
// can hand back that step's output as a strongly typed SummarizedResult.
var SqlSummaryAsyncJobDefinition *asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult]
SqlSummaryAsyncJobDefinition = asyncjob.JobWithResult(job /*from previous section*/, summaryTsk)

// Start an instance, then collect the result of the step passed to JobWithResult.
jobInstance1 := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{...})
result, err := jobInstance1.Result(ctx)

Overhead?

  • a goroutine is created for each step in your JobDefinition when you call .Start().
  • each step also holds a small amount of memory for state tracking.
  • userFunction is instrumented with state tracking and panic handling.

Here is a simple visualization of what it actually looks like:

gantt
    title       asyncjob.Start()
    dateFormat  HH:mm
    %% Illustrative timings. Each WaitPrecedingTasks bar lasts until the slowest
    %% preceding step shown has finished (e.g. QueryResultSummarize waits 71ms =
    %% QueryTable2's 41ms wait + 30ms run). QueryResultSummarize presumably
    %% stands for the Summarize step; CheckAuth and EmailNotification are omitted.

    section GetConnection
    WaitPrecedingTasks            :des11, 00:00,0ms
    userFunction                  :des12, after des11, 20ms

    section GetTableClient1
    WaitPrecedingTasks            :des21, 00:00,20ms
    userFunction                  :des22, after des21, 15ms

    section GetTableClient2
    WaitPrecedingTasks            :des31, 00:00,20ms
    userFunction                  :des32, after des31, 21ms

    section QueryTable1
    WaitPrecedingTasks            :des41, 00:00,35ms
    userFunction                  :des42, after des41, 24ms

    section QueryTable2
    WaitPrecedingTasks            :des51, 00:00,41ms
    userFunction                  :des52, after des51, 30ms

    section QueryResultSummarize
    WaitPrecedingTasks            :des61, 00:00, 71ms
    userFunction                  :des62, after des61, 10ms

# Functions

AddStep adds a step to the job definition.
AddStepWithStaticFunc is the same as AddStep, but the stepFunc passed in shouldn't have a receiver.
Adds precedence to a step.
No description provided by the author
Creates a new JobDefinition. It is suggested to build the JobDefinition statically on process start, and reuse it for each job instance.
StepAfter adds a step after a preceding step, and also takes input from that preceding step.
StepAfterBoth adds a step after both preceding steps, and also takes input from both preceding steps.
StepAfterBothWithStaticFunc is the same as StepAfterBoth, but the stepFunc passed in shouldn't have a receiver.
StepAfterWithStaticFunc is the same as StepAfter, but the stepFunc passed in shouldn't have a receiver.
No description provided by the author
No description provided by the author
Allows retrying a step on error.
No description provided by the author

# Constants

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Structs

JobDefinition defines a job with child steps; the steps are organized in a Directed Acyclic Graph (DAG).
No description provided by the author
No description provided by the author
No description provided by the author
JobInstance is the instance of a jobDefinition.
No description provided by the author
No description provided by the author
RetryReport would record the retry count (could extend to include each retry duration, ...).
StepDefinition defines a step and its dependencies in a job definition.
No description provided by the author
StepExecutionData measures the step execution time and records the retry report.
No description provided by the author
StepInstance is the instance of a step, within a job instance.

# Interfaces

Interface for a job definition.
No description provided by the author
No description provided by the author
StepDefinitionMeta is the interface for a step definition.
StepInstanceMeta is the interface for a step instance.

# Type aliases

No description provided by the author
No description provided by the author
No description provided by the author
StepContextPolicy allows context enrichment before passing to step.
No description provided by the author