# README
AsyncJob
AsyncJob aiming to help you organize code in dependencyGraph(DAG), instead of a sequential chain.
Concepts
JobDefinition is a graph describe code blocks and their connections.
- you can use AddStep, StepAfter, StepAfterBoth to organize steps in a JobDefinition.
- jobDefinition can be and should be build and seal in package init time.
- jobDefinition have a generic typed input
- calling Start with the input, will instantiate an jobInstance, and steps will began to execute.
- jobDefinition can be visualized using graphviz, easier for human to understand.
JobInstance is an instance of JobDefinition, after calling .Start() method from JobDefinition
- all Steps on the definition will be copied to JobInstance.
- each step will be executed once it's precedent step is done.
- jobInstance can be visualized as well, instance visualize contains detailed info(startTime, duration) on each step.
StepDefinition is a individual code block which can be executed and have inputs, output.
- StepDefinition describe it's preceding steps.
- StepDefinition contains generic Params
- ideally all stepMethod should come from JobInput (generic type on JobDefinition), or static method. To avoid shared state between jobs.
- output of a step can be feed into next step as input, type is checked by go generics.
StepInstance is instance of StepDefinition
- step is wrapped in AsyncTask
- a step would be started once all it's dependency is finished.
- executionPolicy can be applied {Retry, ContextEnrichment}
Usage
Build and run a asyncjob
// SqlSummaryAsyncJobDefinition is the job definition for the SqlSummaryJobLib
// JobDefinition fit perfectly in init() function
var SqlSummaryAsyncJobDefinition *asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult]
func init() {
var err error
SqlSummaryAsyncJobDefinition, err = BuildJobWithResult(map[string]asyncjob.RetryPolicy{})
if err != nil {
panic(err)
}
SqlSummaryAsyncJobDefinition.Seal()
}
func BuildJob(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinition[SqlSummaryJobLib], error) {
job := asyncjob.NewJobDefinition[SqlSummaryJobLib]("sqlSummaryJob")
connTsk, err := asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.WithRetry(retryPolicies["GetConnection"]), asyncjob.WithContextEnrichment(EnrichContext))
if err != nil {
return nil, fmt.Errorf("error adding step GetConnection: %w", err)
}
checkAuthTask, err := asyncjob.AddStep(job, "CheckAuth", checkAuthStepFunc, asyncjob.WithContextEnrichment(EnrichContext))
if err != nil {
return nil, fmt.Errorf("error adding step CheckAuth: %w", err)
}
table1ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient1", connTsk, tableClient1StepFunc, asyncjob.WithContextEnrichment(EnrichContext))
if err != nil {
return nil, fmt.Errorf("error adding step GetTableClient1: %w", err)
}
qery1ResultTsk, err := asyncjob.StepAfter(job, "QueryTable1", table1ClientTsk, queryTable1StepFunc, asyncjob.WithRetry(retryPolicies["QueryTable1"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext))
if err != nil {
return nil, fmt.Errorf("error adding step QueryTable1: %w", err)
}
table2ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient2", connTsk, tableClient2StepFunc, asyncjob.WithContextEnrichment(EnrichContext))
if err != nil {
return nil, fmt.Errorf("error adding step GetTableClient2: %w", err)
}
qery2ResultTsk, err := asyncjob.StepAfter(job, "QueryTable2", table2ClientTsk, queryTable2StepFunc, asyncjob.WithRetry(retryPolicies["QueryTable2"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext))
if err != nil {
return nil, fmt.Errorf("error adding step QueryTable2: %w", err)
}
summaryTsk, err := asyncjob.StepAfterBoth(job, "Summarize", qery1ResultTsk, qery2ResultTsk, summarizeQueryResultStepFunc, asyncjob.WithRetry(retryPolicies["Summarize"]), asyncjob.WithContextEnrichment(EnrichContext))
if err != nil {
return nil, fmt.Errorf("error adding step Summarize: %w", err)
}
_, err = asyncjob.AddStep(job, "EmailNotification", emailNotificationStepFunc, asyncjob.ExecuteAfter(summaryTsk), asyncjob.WithContextEnrichment(EnrichContext))
if err != nil {
return nil, fmt.Errorf("error adding step EmailNotification: %w", err)
}
return job, nil
}
// execute job
jobInstance1 := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{...})
jobInstance2 := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{...})
// ...
jobInstance1.Wait(context.WithTimeout(context.Background(), 10*time.Second))
jobInstance2.Wait(context.WithTimeout(context.Background(), 10*time.Second))
visualize of a job
# visualize the job
dotGraph := job.Visualize()
fmt.Println(dotGraph)
digraph {
newrank = "true"
"QueryTable2" [label="QueryTable2" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254054-08:00\nDuration: 13.207µs" fillcolor=green]
"QueryTable1" [label="QueryTable1" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254098-08:00\nDuration: 11.394µs" fillcolor=green]
"EmailNotification" [label="EmailNotification" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254143-08:00\nDuration: 11.757µs" fillcolor=green]
"sqlSummaryJob" [label="sqlSummaryJob" shape=triangle style=filled tooltip="State: completed\nStartAt: 0001-01-01T00:00:00Z\nDuration: 0s" fillcolor=green]
"GetConnection" [label="GetConnection" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.253844-08:00\nDuration: 154.825µs" fillcolor=green]
"GetTableClient2" [label="GetTableClient2" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254017-08:00\nDuration: 25.793µs" fillcolor=green]
"GetTableClient1" [label="GetTableClient1" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254076-08:00\nDuration: 12.459µs" fillcolor=green]
"Summarize" [label="Summarize" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254121-08:00\nDuration: 7.88µs" fillcolor=green]
"CheckAuth" [label="CheckAuth" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.253818-08:00\nDuration: 18.52µs" fillcolor=green]
"CheckAuth" -> "QueryTable2" [style=bold tooltip="Time: 2022-12-12T12:00:32.254054-08:00" color=green]
"CheckAuth" -> "QueryTable1" [style=bold tooltip="Time: 2022-12-12T12:00:32.254098-08:00" color=green]
"GetTableClient2" -> "QueryTable2" [style=bold tooltip="Time: 2022-12-12T12:00:32.254054-08:00" color=green]
"GetTableClient1" -> "QueryTable1" [style=bold tooltip="Time: 2022-12-12T12:00:32.254098-08:00" color=green]
"QueryTable1" -> "Summarize" [style=bold tooltip="Time: 2022-12-12T12:00:32.254121-08:00" color=green]
"QueryTable2" -> "Summarize" [style=bold tooltip="Time: 2022-12-12T12:00:32.254121-08:00" color=green]
"Summarize" -> "EmailNotification" [style=bold tooltip="Time: 2022-12-12T12:00:32.254143-08:00" color=green]
"sqlSummaryJob" -> "CheckAuth" [style=bold tooltip="Time: 2022-12-12T12:00:32.253818-08:00" color=green]
"sqlSummaryJob" -> "GetConnection" [style=bold tooltip="Time: 2022-12-12T12:00:32.253844-08:00" color=green]
"GetConnection" -> "GetTableClient2" [style=bold tooltip="Time: 2022-12-12T12:00:32.254017-08:00" color=green]
"GetConnection" -> "GetTableClient1" [style=bold tooltip="Time: 2022-12-12T12:00:32.254076-08:00" color=green]
}
collect result from job
you can enrich job to aware result from given step, then you can collect result (strongly typed) from that step
var SqlSummaryAsyncJobDefinition *asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult]
SqlSummaryAsyncJobDefinition = asyncjob.JobWithResult(job /*from previous section*/, summaryTsk)
jobInstance1 := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{...})
result, err := jobInstance1.Result(ctx)
Overhead?
- go routine will be created for each step in your jobDefinition, when you call .Start()
- each step also hold tiny memory as well for state tracking.
- userFunction is instrumented with state tracking, panic handling.
Here is some simple visualize on how it actual looks like:
gantt
title asyncjob.Start()
dateFormat HH:mm
section GetConnection
WaitPrecedingTasks :des11, 00:00,0ms
userFunction :des12, after des11, 20ms
section GetTableClient1
WaitPrecedingTasks :des21, 00:00,20ms
userFunction :des22, after des21, 15ms
section GetTableClient2
WaitPrecedingTasks :des31, 00:00,20ms
userFunction :des32, after des31, 21ms
section QueryTable1
WaitPrecedingTasks :des41, 00:00,35ms
userFunction :des42, after des41, 24ms
section QueryTable2
WaitPrecedingTasks :des51, 00:00,41ms
userFunction :des52, after des51, 30ms
section QueryResultSummarize
WaitPrecedingTasks :des61, 00:00, 71ms
userFunction :des62, after des61, 10ms
# Functions
AddStep adds a step to the job definition.
AddStepWithStaticFunc is same as AddStep, but the stepFunc passed in shouldn't have receiver.
Add precedence to a step.
No description provided by the author
Create new JobDefinition
it is suggest to build jobDefinition statically on process start, and reuse it for each job instance.
StepAfter add a step after a preceding step, also take input from that preceding step.
StepAfterBoth add a step after both preceding steps, also take input from both preceding steps.
StepAfterBothWithStaticFunc is same as StepAfterBoth, but the stepFunc passed in shouldn't have receiver.
StepAfterWithStaticFunc is same as StepAfter, but the stepFunc passed in shouldn't have receiver.
No description provided by the author
No description provided by the author
Allow retry of a step on error.
No description provided by the author
# Constants
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Structs
JobDefinition defines a job with child steps, and step is organized in a Directed Acyclic Graph (DAG).
No description provided by the author
No description provided by the author
No description provided by the author
JobInstance is the instance of a jobDefinition.
No description provided by the author
No description provided by the author
RetryReport would record the retry count (could extend to include each retry duration, ...).
StepDefinition defines a step and it's dependencies in a job definition.
No description provided by the author
StepExecutionData would measure the step execution time and retry report.
No description provided by the author
StepInstance is the instance of a step, within a job instance.
# Interfaces
Interface for a job definition.
No description provided by the author
No description provided by the author
StepDefinitionMeta is the interface for a step definition.
StepInstanceMeta is the interface for a step instance.
# Type aliases
No description provided by the author
No description provided by the author
No description provided by the author
StepContextPolicy allows context enrichment before passing to step.
No description provided by the author