# README
Fx Worker Module
Installation
go get github.com/ankorstore/yokai/fxworker
Features
This module provides a worker pool for your Fx application, with:
- automatic panic recovery
- automatic logging
- automatic metrics
- optional deferred start of workers
- optional limit on the maximum execution attempts per worker
Documentation
Dependencies
This module is intended to be used alongside:
- the fxconfig module
- the fxlog module
- the fxtrace module
- the fxmetrics module
- the fxgenerate module
Loading
To load the module in your Fx application:
package main
import (
"context"
"github.com/ankorstore/yokai/fxconfig"
"github.com/ankorstore/yokai/fxgenerate"
"github.com/ankorstore/yokai/fxlog"
"github.com/ankorstore/yokai/fxmetrics"
"github.com/ankorstore/yokai/fxtrace"
"github.com/ankorstore/yokai/fxworker"
"github.com/ankorstore/yokai/worker"
"go.uber.org/fx"
)
func main() {
fx.New(
fxconfig.FxConfigModule, // load the module dependencies
fxlog.FxLogModule,
fxtrace.FxTraceModule,
fxmetrics.FxMetricsModule,
fxgenerate.FxGenerateModule,
fxworker.FxWorkerModule, // load the module
fx.Invoke(func(pool *worker.WorkerPool) {
pool.Start(context.Background()) // start the workers pool
}),
).Run()
}
Configuration
Configuration reference:
# ./configs/config.yaml
app:
name: app
env: dev
version: 0.1.0
debug: true
modules:
worker:
defer: 0.1 # threshold in seconds to wait before starting all workers, immediate start by default
attempts: 3 # max execution attempts in case of failures for all workers, no restart by default
metrics:
collect:
enabled: true # to collect metrics about workers executions
namespace: foo # workers metrics namespace (empty by default)
subsystem: bar # workers metrics subsystem (empty by default)
Notes:
- worker logging is based on the fxlog module configuration
- worker tracing is based on the fxtrace module configuration
Registration
This module lets you register several Worker implementations, each with optional WorkerExecutionOption values.
Fx collects them and passes them to the WorkerPool, which is made available in the Fx container.
Registration is done via the AsWorker() function:
package main
import (
"context"
"github.com/ankorstore/yokai/fxconfig"
"github.com/ankorstore/yokai/fxgenerate"
"github.com/ankorstore/yokai/fxlog"
"github.com/ankorstore/yokai/fxmetrics"
"github.com/ankorstore/yokai/fxtrace"
"github.com/ankorstore/yokai/fxworker"
"github.com/ankorstore/yokai/worker"
"go.uber.org/fx"
)
type ExampleWorker struct{}
func NewExampleWorker() *ExampleWorker {
return &ExampleWorker{}
}
func (w *ExampleWorker) Name() string {
return "example-worker"
}
func (w *ExampleWorker) Run(ctx context.Context) error {
worker.CtxLogger(ctx).Info().Msg("run")
return nil
}
func main() {
fx.New(
fxconfig.FxConfigModule, // load the module dependencies
fxlog.FxLogModule,
fxtrace.FxTraceModule,
fxmetrics.FxMetricsModule,
fxgenerate.FxGenerateModule,
fxworker.FxWorkerModule, // load the module
fx.Provide(
fxworker.AsWorker(
NewExampleWorker, // register the ExampleWorker
worker.WithDeferredStartThreshold(1), // with a deferred start threshold of 1 second
worker.WithMaxExecutionsAttempts(2), // and 2 max execution attempts
),
),
fx.Invoke(func(pool *worker.WorkerPool) {
pool.Start(context.Background()) // start the workers pool
}),
).Run()
}
For more details on the features available to your workers (contextual logging, tracing, etc.), see the worker module documentation.
Override
By default, the worker.WorkerPool is created by the DefaultWorkerPoolFactory.
If needed, you can provide your own factory and override the module:
package main
import (
"context"
"github.com/ankorstore/yokai/fxconfig"
"github.com/ankorstore/yokai/fxgenerate"
"github.com/ankorstore/yokai/fxlog"
"github.com/ankorstore/yokai/fxmetrics"
"github.com/ankorstore/yokai/fxtrace"
"github.com/ankorstore/yokai/fxworker"
"github.com/ankorstore/yokai/worker"
"go.uber.org/fx"
)
type CustomWorkerPoolFactory struct{}
func NewCustomWorkerPoolFactory() worker.WorkerPoolFactory {
return &CustomWorkerPoolFactory{}
}
func (f *CustomWorkerPoolFactory) Create(options ...worker.WorkerPoolOption) (*worker.WorkerPool, error) {
return &worker.WorkerPool{...}, nil
}
func main() {
fx.New(
fxconfig.FxConfigModule, // load the module dependencies
fxlog.FxLogModule,
fxtrace.FxTraceModule,
fxmetrics.FxMetricsModule,
fxgenerate.FxGenerateModule,
fxworker.FxWorkerModule, // load the module
fx.Decorate(NewCustomWorkerPoolFactory), // override the module with a custom factory
fx.Invoke(func(pool *worker.WorkerPool) {
pool.Start(context.Background()) // start the custom worker pool
}),
).Run()
}
# Functions
AsWorker registers a [worker.Worker] into Fx, with an optional list of [worker.WorkerExecutionOption].
GetReturnType returns the return type of a target.
GetType returns the type of a target.
NewFxWorkerModuleInfo returns a new [FxWorkerModuleInfo].
NewFxWorkerPool returns a new [worker.WorkerPool].
NewFxWorkerRegistry returns a new [WorkerRegistry].
NewWorkerDefinition returns a new [WorkerDefinition].
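As an illustration of what "the type of a target" and "the return type of a target" can mean for GetType and GetReturnType, here is a minimal reflection-based sketch. `typeName` and `returnTypeName` are hypothetical stand-ins, not the module's functions, and the nil and non-function guards are an assumption of this sketch rather than documented behaviour.

```go
package main

import (
	"fmt"
	"reflect"
)

type ExampleWorker struct{}

func NewExampleWorker() *ExampleWorker {
	return &ExampleWorker{}
}

// typeName returns the string form of the target's dynamic type,
// or "" for a nil target.
func typeName(target any) string {
	t := reflect.TypeOf(target)
	if t == nil {
		return ""
	}

	return t.String()
}

// returnTypeName returns the string form of the first return type of a
// function target, or "" if the target is not a function with results.
func returnTypeName(target any) string {
	t := reflect.TypeOf(target)
	if t == nil || t.Kind() != reflect.Func || t.NumOut() == 0 {
		return ""
	}

	return t.Out(0).String()
}

func main() {
	// A constructor's return type matches the type of the value it builds.
	fmt.Println(typeName(&ExampleWorker{}) == returnTypeName(NewExampleWorker)) // prints: true
}
```

Matching a constructor's return type against a value's type in this way is one plausible method of associating a registered constructor with the worker instance it produces.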
# Constants
ModuleName is the module name.
# Variables
FxWorkerModule is the [Fx] worker module.
# Structs
FxWorkerModuleInfo is a module info collector for fxcore.
FxWorkerPoolParam allows injection of the required dependencies in [NewFxWorkerPool].
FxWorkerRegistryParam allows injection of the required dependencies in [NewFxWorkerRegistry].
WorkerRegistry is the registry collecting workers and their definitions.
# Interfaces
WorkerDefinition is the interface for worker definitions.