github.com/Bose/go-work
module / package
Version: v0.0.0-20200820223031-0a9c8610aa19
Repository: https://github.com/bose/go-work.git
Documentation: pkg.go.dev

# README

go-work

go-work is a framework that takes many of the familiar HTTP framework abstractions (requests, status, middleware, etc.) and applies them to generic workloads.

Why go-work?

It seems like there's always work to do in non-trivial systems. Sometimes that work can easily be wrapped into a gRPC or HTTP service, but there are many problems that just don't fit that pattern:

  • deferred background Jobs from a gRPC/HTTP request
  • adding/consuming Jobs from a queue (Redis, Pulsar, S3, etc)
  • adding/consuming Jobs from a database table (just another type of queue)
  • scheduling Jobs in the future

Whenever we tackle these work problems, we need to solve the same issues for every execution of a Job:

  • logging
  • metrics
  • tracing
  • concurrency

If we do have concurrent work, then we also need to manage things like:

  • on-ramping the load
  • max concurrency
  • cancelling Jobs

Using go-work

Jobs

Jobs define the work to be done behind a common interface. Think of them like an http.Request with an attached context (Ctx). Jobs have Args, a Context, and an optional timeout. A Job.Ctx carries key/values which can be used to pass data down the chain of adapters (aka middleware). A Job.Ctx.Status is set by a Handler to represent the state of the Job's execution (Success, Error, NoResponse, etc.). Jobs are passed to Handlers by a Worker for each request.

type Job struct {
    // Queue the job should be placed into
    Queue string

    // ctx related to the execution of a job - Perform(job) gets a new ctx every time
    Ctx *Context

    // Args that will be passed to the Handler as the 2nd parameter when run
    Args Args

    // Handler that will be run by the worker
    Handler string

    // Timeout for every execution of job
    Timeout time.Duration
}
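
A minimal sketch of building a Job by hand. The field names come from the struct above; the queue name, handler name, and Args keys are purely illustrative, and the Args literal assumes Args behaves like a string-keyed map (the handler example below indexes it with string keys). The worker supplies a fresh Ctx for every execution, so it is not set here.

job := work.Job{
	Queue:   "default",            // illustrative queue name
	Handler: "send-welcome-email", // must match the name the Handler is registered under
	Args: work.Args{ // assumption: Args behaves like map[string]interface{}
		"email": "user@example.com",
	},
	Timeout: 30 * time.Second, // optional per-execution timeout
}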

Handlers

Handlers define the func to be executed for a Job by a Worker. Handlers also represent an interface that can be chained together to create middleware (aka Adapters).

type Handler func(j *Job) error

Example that "handles" a publishing Job request. You can imagine how easy it would be to define a new handler that publishes the next message from a Redis source instead. Notice the handler sets the Job's status before returning.

func DefaultPublishNextMessageCRDB(j *work.Job) error {
	box := j.Args["box"].(outbox.EventOutbox)
	publisher := j.Args["publisher"].(pulsar.Producer)
	connFactory := j.Args["outboxConnectionFactory"].(OutboxConnectionFactory)

	l, ok := j.Ctx.Get(work.AggregateLogger)
	if !ok {
		err := fmt.Errorf("PublishNextMessage: no logger")
		logrus.Error(err)
		j.Ctx.SetStatus(work.StatusInternalError)
		return err
	}
	log := l.(*logrus.Logger)
	logger := log.WithFields(logrus.Fields{})

	db, err := connFactory(logger)
	if err != nil {
		j.Ctx.SetStatus(work.StatusInternalError)
		logger.Error(err)
		return err
	}
	logger.Infof("PublishNextMessage: checking new messages...")
	toPub, err := box.NextMessage(db, logger)
	if err != nil {
		logger.Infof("PublishNextMessage: no message: %s", err.Error())
		j.Ctx.SetStatus(work.StatusNoResponse)
		return nil
	}

	event := createEvent(toPub)
	if err := pubsub.PublishEvent(context.Background(), publisher, event); err != nil {
		j.Ctx.SetStatus(work.StatusInternalError)
		err := fmt.Errorf("PublishNextMessage: pubsub.PublishEvent error == %s", err.Error())
		logger.Error(err)
		return err
	}
	logger.Infof("PublishNextMessage: published to topic %s", publisher.Topic())

	if err := box.MarkAsPublished(db, toPub, logger, outbox.WithPublishedToTopics(publisher.Topic())); err != nil {
		j.Ctx.SetStatus(work.StatusInternalError)
		err := fmt.Errorf("PublishNextMessage: box.MarkAsPublished error == %s", err.Error())
		logger.Error(err)
		return err
	}
	j.Ctx.SetStatus(work.StatusSuccess)
	return nil
}

Concurrent Job

A ConcurrentJob defines a Job to be performed concurrently by a pool of workers.

type ConcurrentJob struct {

    // PerformWith determines whether the job will be performed with PerformWithEveryWithSync as a recurring job
    // or just once with PerformWithWithSync
    PerformWith PerformWith

    // PerformEvery defines the duration between executions of PerformWithEveryWithSync jobs
    PerformEvery time.Duration

    // MaxWorkers for the concurrent Job
    MaxWorkers int64

    // Job to run concurrently
    Job Job
    // contains filtered or unexported fields
}

ConcurrentJob API

func (j *ConcurrentJob) Start() error
func (j *ConcurrentJob) Stop()
func (j *ConcurrentJob) Register(name string, h Handler) error
func (j *ConcurrentJob) RunningWorkers() int64
func NewConcurrentJob(
    job Job,
    workerFactory WorkerFactory,
    performWith PerformWith,
    performEvery time.Duration,
    maxWorker int64,
    startInterval time.Duration,
    logger Logger,
) (ConcurrentJob, error)

Be sure to call ConcurrentJob.Stop() or your program will leak resources.

defer w.Stop()
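
A hedged sketch of wiring a ConcurrentJob together end to end. It assumes *logrus.Logger satisfies the Logger interface (the handler example above pulls a *logrus.Logger out of the Job context), uses the PerformWithEveryWithSync constant named in the struct comment above, and guesses at the meaning of startInterval; NewCommonWorkerFactory is documented in the next section.

logger := logrus.New() // assumption: *logrus.Logger satisfies work.Logger

cj, err := work.NewConcurrentJob(
	job,                           // the Job to run (see the Jobs section above)
	work.NewCommonWorkerFactory,   // WorkerFactory (see the next section)
	work.PerformWithEveryWithSync, // recurring executions; constant name taken from the struct comment above
	10*time.Second,                // performEvery: interval between executions
	4,                             // maxWorker: cap on concurrent workers
	500*time.Millisecond,          // startInterval (assumed: delay between starting workers, to on-ramp the load)
	logger,
)
if err != nil {
	logger.Fatal(err)
}
if err := cj.Register("send-welcome-email", sendWelcomeEmail); err != nil { // sendWelcomeEmail is a hypothetical Handler
	logger.Fatal(err)
}
if err := cj.Start(); err != nil {
	logger.Fatal(err)
}
defer cj.Stop()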

WorkerFactory API

type WorkerFactory func(context.Context, Logger) Worker
func NewCommonWorkerFactory(ctx context.Context, l Logger) Worker
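
Because WorkerFactory is just a func type, a custom factory can be a small closure. Here is a sketch that decorates the context before delegating to NewCommonWorkerFactory; the context key and value are purely illustrative.

type poolKey string

var factory work.WorkerFactory = func(ctx context.Context, l work.Logger) work.Worker {
	// attach whatever the workers need to the context; this key/value is illustrative only
	ctx = context.WithValue(ctx, poolKey("pool"), "publishers")
	return work.NewCommonWorkerFactory(ctx, l)
}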

Job.Ctx Status

Handlers set the status for every Job request (execution) using Job.Ctx.SetStatus(). This allows middleware to take action based on the Job's status for things like metrics and logging (see the sketch after the constants below).

const (
    // StatusUnknown was a job with an unknown status
    StatusUnknown Status = -1

    // StatusSuccess was a successful job
    StatusSuccess = 200

    // StatusBadRequest was a job with a bad request
    StatusBadRequest = 400

    // StatusForbidden was a forbidden job
    StatusForbidden = 403

    // StatusUnauthorized was an unauthorized job
    StatusUnauthorized = 401

    // StatusTimeout was a job that timed out
    StatusTimeout = 408

    // StatusNoResponse was a job that intentionally created no response (basically the conditions were met for a noop by the Job)
    StatusNoResponse = 444

    // StatusInternalError was a job with an internal error
    StatusInternalError = 500

    // StatusUnavailable was a job that was unavailable
    StatusUnavailable = 503
)
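
As a sketch of what "taking action based on status" can look like, here is a hypothetical logging adapter (the Adapter type is described below) that reports any outcome other than StatusSuccess or StatusNoResponse; it assumes a logrus-style logger.

// WithStatusLogging is a hypothetical adapter that logs non-success outcomes.
func WithStatusLogging(log *logrus.Logger) work.Adapter {
	return func(h work.Handler) work.Handler {
		return func(j *work.Job) error {
			err := h(j)
			switch j.Ctx.Status() {
			case work.StatusSuccess, work.StatusNoResponse:
				// stay quiet on success and intentional no-ops
			default:
				log.Warnf("job %s finished with status %d: %v", j.Handler, j.Ctx.Status(), err)
			}
			return err
		}
	}
}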

Worker

A Worker implements an interface that defines how a Job will be executed.

  • now (sync and async)
  • at a time in the future (only async)
  • after waiting a specific time (only async)
  • every occurrence of a specified time interval (sync and async)

Be sure to call Worker.Stop() or your program could leak resources.

defer w.Stop()

Official implementations:

  • CommonWorker. CommonWorker is backed by the standard lib and goroutines.

type Worker interface {
	// Start the worker
	Start(context.Context) error
	// Stop the worker
	Stop() error
	// PerformEvery performs a job on every interval (loop).
	// With the WithSync(true) Option, the operation blocks until it's done, which means only one instance can execute at a time.
	// The default is WithSync(false), so there's no blocking and you could get multiple instances running at a time if the latency is longer than the interval.
	PerformEvery(*Job, time.Duration, ...Option) error
	// Perform a job as soon as possible. With the WithSync(true) Option it's a blocking call; the default is false (async).
	Perform(*Job, ...Option) error
	// PerformAt performs a job at a particular time, always async
	PerformAt(*Job, time.Time) error
	// PerformIn performs a job after waiting for a specified amount of time, always async
	PerformIn(*Job, time.Duration) error
	// PerformReceive performs a job for every value received from a channel
	PerformReceive(*Job, interface{}, ...Option) error
	// Register a Handler
	Register(string, Handler) error
	// GetContext returns the worker context
	GetContext() context.Context
	// SetContext sets the worker context
	SetContext(context.Context)
}
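
A sketch of driving a Worker directly. NewCommonWorkerFactory is used to obtain one because its signature is documented above; whether Start must be called explicitly is an assumption here, and the handler name, job, and interval are illustrative. WithSync(true) makes the calls block per execution, as the interface comments describe.

logger := logrus.New() // assumption: *logrus.Logger satisfies work.Logger
ctx := context.Background()

w := work.NewCommonWorkerFactory(ctx, logger)
if err := w.Start(ctx); err != nil { // assumption: an explicit Start is needed before performing jobs
	logger.Fatal(err)
}
defer w.Stop()

if err := w.Register("send-welcome-email", sendWelcomeEmail); err != nil { // sendWelcomeEmail is a hypothetical Handler
	logger.Fatal(err)
}

job := &work.Job{Handler: "send-welcome-email", Args: work.Args{"email": "user@example.com"}}

// run once, blocking until the handler returns
if err := w.Perform(job, work.WithSync(true)); err != nil {
	logger.Fatal(err)
}

// or run it on an interval, one execution at a time
if err := w.PerformEvery(job, 15*time.Second, work.WithSync(true)); err != nil {
	logger.Fatal(err)
}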

Adapter (Middleware)

Adapter defines a common func interface so middleware can be chained together for a Job. Currently there is adapter middleware for things like metrics, tracing, logging, and healthchecks, and more adapters will be added over time.

type Adapter func(Handler) Handler

Example opentracing Adapter

// WithOpenTracing is an adapter middleware that adds opentracing
func WithOpenTracing(operationPrefix []byte) Adapter {
	if operationPrefix == nil {
		operationPrefix = []byte("api-request-")
	}
	return func(h Handler) Handler {
		return Handler(func(job *Job) error {
			// all before request is handled
			var span opentracing.Span
			if cspan := job.Ctx.Value("tracing-context"); cspan != nil {
				span = StartSpanWithParent(cspan.(opentracing.Span).Context(), string(operationPrefix)+job.Handler)
			} else {
				span = StartSpan(string(operationPrefix) + job.Handler)
			}
			defer span.Finish()                  // after all the other defers are completed.. finish the span
			job.Ctx.Set("tracing-context", span) // add the span to the context so it can be used for the duration of the request.
			defer func() { span.SetTag("status-code", job.Ctx.Status()) }() // wrapped in a closure so the status is read after the handler has run
			err := h(job)
			return err
		})
	}
}
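
Adapters are applied by wrapping a Handler before it is registered. Here is a sketch using the Adapt helper listed in the API below; its exact signature is assumed to be the conventional handler-plus-variadic-adapters shape, WithStatusLogging is the hypothetical adapter from the Status section above, and w and logger are reused from the Worker sketch.

wrapped := work.Adapt(
	DefaultPublishNextMessageCRDB,        // the Handler from the example above
	work.WithOpenTracing([]byte("job-")), // the adapter shown above
	WithStatusLogging(logrus.New()),      // hypothetical adapter from the Status section
)

if err := w.Register("publish-next-message", wrapped); err != nil {
	logger.Fatal(err)
}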

Acknowledgements

I need to acknowledge that many of the ideas implemented in this library are not my own. I've been inspired by, and have shamelessly borrowed from, the following individuals/projects:


Complete API reference: docs.md

# Functions

Adapt a handler with the provided middleware adapters.
GetOpts - iterate the inbound Options and return a struct.
InitTracing will init opentracing with options WithSampleProbability defaults: constant sampling.
NewCommonWorker creates a new CommonWorker.
NewCommonWorkerFactory is a simple adapter that allows the factory to conform to the WorkerFactory type.
NewCommonWorkerWithContext creates a new CommonWorker.
NewConcurrentJob makes a new job.
NewContext factory.
NewHealthCheck creates a new HealthCheck with the options provided.
NewCounter returns a counter metric that increments the value with each incoming number.
NewMetricStatusGauge is a factory for statusGauge Metrics.
New will initialize a new Prometheus instance with the given options.
NewSession factory.
Run the function safely knowing that if it panics, the panic will be caught and returned as an error.
StartSpan will start a new span with no parent span.
StartSpanWithParent will start a new span with a parent span.
WithAggregateLogger is a middleware adapter for aggregated logging (see go-gin-logrus).
WithBanner specifies the table name to use for an outbox.
WithChannel optional channel parameter.
WithEngine is an option allowing to set the gin engine when initializing with New.
WithErrorPercentage allows you to override the default of 1.0 (100%) with the % you want for error rate.
No description provided by the author
WithHealthCheck is an adapter middleware for healthcheck.
WithHealthHandler override the default health endpoint handler.
WithHealthPath override the default path for the health endpoint.
No description provided by the author
WithIgnore is used to disable instrumentation on some routes.
WithJob optional Job parameter.
WithLogLevel will set the logrus log level for the job handler.
WithMetricsPath is an option allowing to set the metrics path when initializing with New.
WithNamespace is an option allowing to set the namespace when initializing with New.
WithOpenTracing is an adapter middleware that adds opentracing.
Instrument is a gin middleware that can be used to generate metrics for a single handler.
WithReducedLoggingFunc specifies the function used to set custom logic around when to print logs.
WithSampleProbability - optional sample probability.
WithSilentNoResponse specifies that StatusNoResponse requests should be silent (no logging).
WithSilentSuccess specifies that StatusSuccess requests should be silent (no logging).
WithSubsystem is an option allowing to set the subsystem when initializing with New.
WithSync optional synchronous execution.
WrapChanel takes a concrete receiving chan in as an interface{}, and wraps it with an interface{} chan so you can treat all receiving channels the same way.

# Constants

AggregateLogger defines the const string for getting the logger from a Job context.
DefaultHealthTickerDuration is the time duration between the recalculation of the status returned by HealthCheck.GetStatus().
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
RequestSize is the index used by Job.Ctx.Set(string, interface{}) or Job.Ctx.Get(string) to communicate the approximate request size in bytes. WithPrometheus will optionally report this info when RequestSize is found in the Job.Ctx.
ResponseSize is the index used by Job.Ctx.Set(string, interface{}) or Job.Ctx.Get(string) to communicate the approximate response size in bytes. WithPrometheus will optionally report this info when ResponseSize is found in the Job.Ctx.
StatusBadRequest was a job with a bad request.
StatusForbidden was a forbidden job.
StatusInternalError was a job with an internal error.
StatusNoResponse was a job that intentionally created no response (basically the conditions were met for a noop by the Job).
StatusSuccess was a successful job.
StatusTimeout was a job that timed out.
StatusUnauthorized was an unauthorized job.
StatusUnavailable was a job that was unavailable.
StatusUnknown was a job with an unknown status.

# Variables

ContextTraceIDField - used to find the trace id in the context - optional.
No description provided by the author

# Structs

CommonWorker defines the typical common worker.
ConcurrentJob represents a job to be run concurrently.
Context for the Job; it is reset for every execution.
HealthCheck provides a healthcheck endpoint for the work.
Job to be processed by a Worker.
Prometheus contains the metrics gathered by the instance and its path.
Session is used to pass session info to a Job; this is a good spot to put things like *redis.Pool or *sqlx.DB for outbox connection pools.

# Interfaces

Logger is used by worker to write logs.
Metric is a single meter (a counter for now, but in the future: gauge or histogram, optionally - with history).
No description provided by the author

# Type aliases

Adapter defines the adaptor middleware type.
Args is how parameters are passed to jobs.
Handler is executed by a Worker for a given Job.
Option - how Options are passed as arguments.
Options - how options are represented.
No description provided by the author
ReducedLoggingFunc defines a function type used for custom logic on when to print logs.
Status for a job's execution (Perform).
No description provided by the author