github.com/caffix/pipeline
Module, package
Version: 0.2.3
Repository: https://github.com/caffix/pipeline.git
Documentation: pkg.go.dev

# README

Data Pipeline

Simple asynchronous data pipeline written in Go with support for concurrent tasks at each stage.

Installation

go get -v -u github.com/caffix/pipeline

Usage

The pipeline moves data provided by the input source through multiple stages until it is finally consumed by the output sink. All stages of the pipeline can execute concurrently to maximize throughput. The pipeline can also be executed with buffering between stages to minimize the impact of one stage taking longer than the others. Any error returned from a task terminates the pipeline. If a task returns nil data, the data is marked as processed and does not continue to the following stage.
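
The effect of buffering between concurrent stages can be illustrated with plain Go channels. The following is a minimal sketch, not the package's implementation: it uses only the standard library, and the helper names `runStage` and `process` are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// runStage is a hypothetical helper. It starts one goroutine that applies fn
// to every value read from in and sends the result on a channel with the
// given buffer size. The output channel is closed once in is drained.
func runStage(in <-chan string, buffer int, fn func(string) string) <-chan string {
	out := make(chan string, buffer)
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

// process pushes the inputs through two concurrently running stages and
// collects the results. Each stage is a single goroutine, so order is kept.
func process(inputs []string, buffer int) []string {
	src := make(chan string, buffer)
	go func() {
		defer close(src)
		for _, v := range inputs {
			src <- v
		}
	}()

	upper := runStage(src, buffer, strings.ToUpper)
	tagged := runStage(upper, buffer, func(s string) string { return s + "!" })

	var results []string
	for v := range tagged {
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(process([]string{"one", "two", "three"}, 2))
}
```

With a buffer of 0 each stage must wait for the next one to receive before it can continue; a larger buffer lets a fast stage run ahead of a slow one.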

The Pipeline Data

The pipeline Data interface requires the Clone and MarkAsProcessed methods, which perform a deep copy and mark the data to prevent further movement down the pipeline, respectively. Below is a simple pipeline Data implementation:

type stringData struct {
	processed bool
	val       string
}

// Clone implements the pipeline Data interface.
func (s *stringData) Clone() pipeline.Data { return &stringData{val: s.val} }

// MarkAsProcessed implements the pipeline Data interface.
func (s *stringData) MarkAsProcessed() { s.processed = true }

// String implements the Stringer interface.
func (s *stringData) String() string { return s.val }
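
A string field is copied by value, so the example above needs no extra work to clone. For types that hold slices, maps, or pointers, Clone has to copy the referenced memory as well. The sketch below shows this under stated assumptions: the `Data` interface is a local stand-in that mirrors the two methods described above (it is not imported from the package), and `tagData` is a hypothetical type.

```go
package main

import "fmt"

// Data is a local stand-in for the pipeline Data interface described above.
type Data interface {
	Clone() Data
	MarkAsProcessed()
}

// tagData is a hypothetical payload that holds a slice.
type tagData struct {
	processed bool
	tags      []string
}

// Clone copies the slice contents so the clone shares no memory with the
// original. The processed flag is left unset on the copy.
func (t *tagData) Clone() Data {
	return &tagData{tags: append([]string(nil), t.tags...)}
}

// MarkAsProcessed flags the data so it moves no further.
func (t *tagData) MarkAsProcessed() { t.processed = true }

func main() {
	orig := &tagData{tags: []string{"a", "b"}}

	c := orig.Clone().(*tagData)
	c.tags[0] = "changed"
	c.MarkAsProcessed()

	// Changes to the clone are not visible through the original.
	fmt.Println(orig.tags, orig.processed)
	fmt.Println(c.tags, c.processed)
}
```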

The Input Source

The InputSource is an iterator that feeds the pipeline with data. Once the Next method returns false, the pipeline stops sending data to the following stage, which begins an avalanche effect that stops each stage in turn and eventually terminates the pipeline. Below is a simple input source:

type stringSource []pipeline.Data

// A pointer is used so that Data can advance the underlying slice.
var source = &stringSource{
    &stringData{val: "one"},
    &stringData{val: "two"},
    &stringData{val: "three"},
}

// Next implements the pipeline InputSource interface.
func (s *stringSource) Next(context.Context) bool { return len(*s) > 0 }

// Data implements the pipeline InputSource interface.
// It returns the first element and removes it from the source.
func (s *stringSource) Data() pipeline.Data {
    d := (*s)[0]
    *s = (*s)[1:]
    return d
}

// Error implements the pipeline InputSource interface.
func (s *stringSource) Error() error { return nil }

The Output Sink

The OutputSink serves as a final landing spot for the data after successfully traversing the entire pipeline. All data reaching the output sink is automatically marked as processed. Below is a simple output sink:

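type stringSink []string

// Consume implements the pipeline OutputSink interface.
// The pointer receiver lets append update the caller's slice.
func (s *stringSink) Consume(ctx context.Context, data pipeline.Data) error {
    sd, ok := data.(*stringData)
    if !ok {
        return fmt.Errorf("unexpected data type %T", data)
    }

    *s = append(*s, sd.String())
    return nil
}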

The Stages

The pipeline steps are executed in sequential order by instances of Stage. The execution strategies implemented are FIFO, FixedPool, DynamicPool, Broadcast, and Parallel:

  • FIFO - Executes the single Task
  • FixedPool - Executes a fixed number of instances of the one specified Task
  • DynamicPool - Executes a dynamic number of instances of the one specified Task
  • Broadcast - Executes several unique Task instances concurrently, moving Data to the next stage as soon as possible
  • Parallel - Executes several unique Task instances concurrently, passing the original Data through only once all the tasks complete successfully

The stage execution strategies can be combined to form desired pipelines. A Stage requires at least one Task to be executed at the step it represents in the pipeline. Each Task returns Data and an error. If the data returned is nil, it will not be sent to the following Stage. If the error is non-nil, the entire pipeline will be terminated. This allows users of the pipeline to have complete control over how failures impact the overall pipeline execution. A Task implements the Process method.

// TaskFunc is a function type whose Process method calls the function itself.
task := pipeline.TaskFunc(func(ctx context.Context, data pipeline.Data, tp pipeline.TaskParams) (pipeline.Data, error) {
    var val int
    s := data.(*stringData)

    switch s.String() {
    case "one":
        val = 1
    case "two":
        val = 2
    case "three":
        val = 3
    }

    s.val = fmt.Sprintf("%s - %d", s.String(), val)
    return data, nil
})

stage := pipeline.FIFO("", task)

Executing the Pipeline

The Pipeline continues executing until all the Data from the input source is processed, an error takes place, or the provided Context expires. At a minimum, the pipeline requires an input source, a pass-through stage, and an output sink.

sink := new(stringSink)
p := pipeline.NewPipeline(stage)

if err := p.Execute(context.TODO(), source, sink); err != nil {
    fmt.Printf("Error executing the pipeline: %v\n", err)
}

Future Features

Some additional features would bring value to this data pipeline implementation.

Logging

No logging is built into this pipeline implementation, and it could be quite useful to have.

Metrics and Monitoring

It would be helpful to have the ability to monitor stage and task performance, such as how long each is taking to execute, the number of Data instances processed, the number of successes and failures, etc.

Task Implementations for Common Use Cases

This pipeline implementation is very abstract, which allows users to perform nearly any set of steps. Currently, users must implement their own tasks. Some tasks are very common and the project could build support for such activities. For example, executing a script pulled from a Git repo.

Support for Configuration Files

As the implementation becomes more complex, it could be helpful to support the use of configuration files and reduce the level of effort necessary to build a pipeline. For example, the configuration file could specify when tasks should be output to alternative stages.

Develop Additional Stage Execution Strategies

While the current execution strategies work for many use cases, there could be opportunities to develop additional stage types that ease pipeline development.

Licensing

This program is free software: you can redistribute it and/or modify it under the terms of the Apache license.

# Functions

Broadcast returns a Stage that passes a copy of each incoming data to all specified tasks and emits their outputs to the next stage.
DynamicPool returns a Stage that maintains a dynamic pool that can scale up to max parallel tasks for processing incoming inputs in parallel and emitting their outputs to the next stage.
FIFO returns a Stage that processes incoming data in a first-in first-out fashion.
FixedPool returns a Stage that spins up a pool containing numWorkers to process incoming data in parallel and emit their outputs to the next stage.
NewPipeline returns a new data pipeline instance where input traverses each of the provided Stage instances.
Parallel returns a Stage that passes a copy of each incoming Data to all specified tasks, waits for all the tasks to finish before sending data to the next stage, and only passes the original Data through to the following stage.
SendData marks the provided data as new to the pipeline and sends it to the provided named stage.

# Structs

Pipeline is an abstract and extendable asynchronous data pipeline with concurrent tasks at each stage.

# Interfaces

Data is implemented by values that can be sent through a pipeline.
InputSource is implemented by types that generate Data instances which can be used as inputs to a Pipeline instance.
OutputSink is implemented by types that can operate as the tail of a pipeline.
Stage is designed to be executed in sequential order to form a multi-stage data pipeline.
StageParams provides the information needed for executing a pipeline Stage.
Task is implemented by types that can process Data as part of a pipeline stage.
TaskParams provides access to pipeline mechanisms needed by a Task.

# Type aliases

SinkFunc is an adapter to allow the use of plain functions as OutputSink instances.
StageRegistry is a map of stage identifiers to input channels.
TaskFunc is an adapter to allow the use of plain functions as Task instances.