Categorygithub.com/ankorstore/yokai/worker
modulepackage
1.2.0
Repository: https://github.com/ankorstore/yokai.git
Documentation: pkg.go.dev

# README

Worker Module

ci go report codecov Deps PkgGoDev

Worker module based on sync.

Installation

go get github.com/ankorstore/yokai/worker

Documentation

This module provides a WorkerPool, that:

The WorkerPool can be configured to:

  • defer all workers start with a threshold in seconds: 0 by default (start immediately)
  • attempt a maximum amount of runs in case of failures: 1 by default (no restarts)

The Worker executions:

  • have a unique identifier
  • have automatic panic recovery
  • are automatically logged
  • are automatically generating metrics

Workers

This module provides a Worker interface to implement to provide your own workers, for example:

package workers

import (
	"context"

	"github.com/ankorstore/yokai/worker"
)

// classic worker
type ClassicWorker struct{}

func NewClassicWorker() *ClassicWorker {
	return &ClassicWorker{}
}

func (w *ClassicWorker) Name() string {
	return "classic-worker"
}

func (w *ClassicWorker) Run(ctx context.Context) error {
	worker.CtxLogger(ctx).Info().Msg("run")

	return nil
}

// cancellable worker
type CancellableWorker struct{}

func NewCancellableWorker() *CancellableWorker {
	return &CancellableWorker{}
}

func (w *CancellableWorker) Name() string {
	return "cancellable-worker"
}

func (w *CancellableWorker) Run(ctx context.Context) error {
	logger := worker.CtxLogger(ctx)

	for {
		select {
		// when the WorkerPool stops, the ctx cancellation is forwarded to the workers
		case <-ctx.Done():
			logger.Info().Msg("cancel")

			return w.cancel()
		default:
			logger.Info().Msg("run")

			return w.run(ctx)
		}
	}
}

func (w *CancellableWorker) run(ctx context.Context) error {
	// your worker logic
}

func (w *CancellableWorker) cancel() error {
	// your worker cancel logic, for example graceful shutdown
}

Notes:

  • to perform more complex tasks, you can inject dependencies to your workers implementation (ex: database, cache, etc.)
  • it is recommended to design your workers with a single responsibility

WorkerPool

You can create a WorkerPool instance with the DefaultWorkerPoolFactory, register your Worker implementations, and start them:

package main

import (
	"context"

	"github.com/ankorstore/yokai/worker"
	"path/to/workers"
)

func main() {
	// create the pool
	pool, _ := worker.NewDefaultWorkerPoolFactory().Create(
		worker.WithGlobalDeferredStartThreshold(1),                                             // will defer all workers start of 1 second
		worker.WithGlobalMaxExecutionsAttempts(2),                                              // will run 2 times max failing workers
		worker.WithWorker(workers.NewClassicWorker(), worker.WithDeferredStartThreshold(3)),    // registers the ClassicWorker, with a deferred start of 3 second
		worker.WithWorker(workers.NewCancellableWorker(), worker.WithMaxExecutionsAttempts(4)), // registers the CancellableWorker, with 4 runs max
	)

	// start the pool
	pool.Start(context.Background())

	// get all workers execution reports, in real time
	executions := pool.Executions()

	// stop the pool (will forward context cancellation to each worker)
	pool.Stop()

	// get a specific worker execution report, after pool stop
	execution, _ := pool.Execution("cancellable-worker")
}

Logging

You can use the CtxLogger() function to retrieve the contextual log.Logger from your workers, and emit correlated logs.

The workers executions are logged, with the following fields added automatically to each log records:

  • worker: worker name
  • workerExecutionID: worker execution id
package main

import (
	"context"

	"github.com/ankorstore/yokai/worker"
)

type LoggingWorker struct{}

func NewLoggingWorker() *LoggingWorker {
	return &LoggingWorker{}
}

func (w *LoggingWorker) Name() string {
	return "logging-worker"
}

func (w *LoggingWorker) Run(ctx context.Context) error {
	// log the current worker name and execution id
	worker.CtxLogger(ctx).Info().Msgf(
		"execution %s for worker %s",
		worker.CtxWorkerName(ctx),        // contextual worker name
		worker.CtxWorkerExecutionId(ctx), // contextual worker execution id
	)

	return nil
}

func main() {
	// create the pool
	pool, _ := worker.NewDefaultWorkerPoolFactory().Create(
		worker.WithWorker(NewLoggingWorker()), // registers the LoggingWorker
	)

	// start the pool
	pool.Start(context.Background())
}

Tracing

You can use the CtxTracer() function to retrieve the contextual tracer from your workers, and emit correlated spans: they will have the Worker and WorkerExecutionID attributes added with respectively the worker name and execution id.

This module provides the AnnotateTracerProvider function, to extend a TracerProvider to add automatically current worker information id to the spans emitted during a worker execution:

package main

import (
	"context"

	"github.com/ankorstore/yokai/worker"
	"go.opentelemetry.io/otel/trace"
)

// tracing worker
type TracingWorker struct{}

func NewTracingWorker() *TracingWorker {
	return &TracingWorker{}
}

func (w *TracingWorker) Name() string {
	return "tracing-worker"
}

func (w *TracingWorker) Run(ctx context.Context) error {
	// emit some trace span
	_, span := worker.CtxTracer(ctx).Start(ctx, "some span")
	defer span.End()

	return nil
}

func main() {
	// tracer provider
	tp := trace.GetTracerProvider()

	// annotate the tracer provider
	worker.AnnotateTracerProvider(tp)

	// create the pool
	pool, _ := worker.NewDefaultWorkerPoolFactory().Create(
		worker.WithWorker(NewTracingWorker()),
	)

	// start the pool
	pool.Start(context.Background())
}

Metrics

The WorkerPool automatically generate metrics about:

  • started workers
  • re started workers
  • workers stopped with success
  • workers stopped with error

To enable those metrics in a registry, simply call Register on the WorkerMetrics of the WorkerPool:

package main

import (
	"context"

	"github.com/ankorstore/yokai/worker"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// metrics registry
	registry := prometheus.NewRegistry()

	// create the pool
	pool, _ := worker.NewDefaultWorkerPoolFactory().Create()

	// register the pool metrics
	pool.Metrics().Register(registry)

	// start the pool
	pool.Start(context.Background())
}

Healthcheck

This module provides an WorkerProbe, compatible with the healthcheck module:

package main

import (
	"context"

	yokaihc "github.com/ankorstore/yokai/healthcheck"
	"github.com/ankorstore/yokai/worker"
	"github.com/ankorstore/yokai/worker/healthcheck"
)

func main() {
	// create the pool
	pool, _ := worker.NewDefaultWorkerPoolFactory().Create()

	// create the checker with the worker probe
	checker, _ := yokaihc.NewDefaultCheckerFactory().Create(
		yokaihc.WithProbe(healthcheck.NewWorkerProbe(pool)),
	)

	// start the pool
	pool.Start(context.Background())

	// run the checker
	res, _ := checker.Check(context.Background(), yokaihc.Readiness)
}

This probe is successful if all the executions statuses of the WorkerPool are healthy.

# Packages

No description provided by the author

# Functions

AnnotateTracerProvider extends a provided [oteltrace.TracerProvider] spans with worker execution attributes.
CtxLogger returns the contextual [log.Logger].
CtxTracer returns the contextual [oteltrace.Tracer].
CtxWorkerExecutionId returns the contextual [Worker] execution id.
CtxWorkerName returns the contextual [Worker] name.
DefaultWorkerExecutionOptions are the default options for the [Worker] executions.
DefaultWorkerPoolOptions are the default options used in the [DefaultWorkerPoolFactory].
NewDefaultWorkerPoolFactory returns a [DefaultWorkerPoolFactory], implementing [WorkerPoolFactory].
NewTracerProviderWorkerAnnotator returns a new [TracerProviderWorkerAnnotator].
NewWorkerExecution returns a new [WorkerExecution].
NewWorkerExecutionEvent returns a new [WorkerExecutionEvent].
NewWorkerMetrics returns a new [WorkerMetrics], and accepts metrics namespace and subsystem.
NewWorkerPool returns a new [WorkerPool], with optional [WorkerPoolOption].
NewWorkerRegistration returns a new [WorkerRegistration] for a given [Worker] and an optional list of [WorkerRegistration].
Sanitize transforms a given string to not contain spaces or dashes, and to be in lower case.
WithDeferredStartThreshold is used to specify the worker deferred start threshold, in seconds.
WithGenerator is used to specify the [uuid.UuidGenerator] to use by the [WorkerPool].
WithGlobalDeferredStartThreshold is used to specify the global workers deferred start threshold, in seconds.
WithGlobalMaxExecutionsAttempts is used to specify the global workers max execution attempts.
WithMaxExecutionsAttempts is used to specify the worker max execution attempts.
WithMetrics is used to specify the [WorkerMetrics] to use by the [WorkerPool].
WithWorker is used to register a [Worker] in the [WorkerPool], with an optional list of [WorkerPoolOption].

# Constants

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
TracerName is the workers tracer name.
No description provided by the author
No description provided by the author
No description provided by the author

# Structs

CtxWorkerExecutionIdKey is a contextual struct key for the current worker execution id.
CtxWorkerNameKey is a contextual struct key for the current worker name.
DefaultWorkerPoolFactory is the default [WorkerPoolFactory] implementation.
ExecutionOptions are options for the [Worker] executions.
PoolOptions are options for the [WorkerPoolFactory] implementations.
TracerProviderWorkerAnnotator is the [oteltrace.TracerProvider] workers annotator, implementing [otelsdktrace.SpanProcessor].
WorkerExecution represents a [Worker] execution within the [WorkerPool].
WorkerExecutionEvent is an event happening during a [Worker] execution.
WorkerMetrics allows the [WorkerPool] to send worker metrics to a [prometheus.Registry].
WorkerPool is the [Worker] pool.
WorkerRegistration is a [Worker] registration, with optional [WorkerExecutionOption].

# Interfaces

Worker is the interface to implement to provide workers.
WorkerPoolFactory is the interface for [WorkerPool] factories.

# Type aliases

WorkerExecutionOption are functional options for the [Worker] executions.
WorkerPoolOption are functional options for the [WorkerPoolFactory] implementations.
WorkerStatus is an enum for the possible statuses of a workers.