Categorygithub.com/myntra/pipeline
modulepackage
0.0.0-20180618182531-2babf4864ce8
Repository: https://github.com/myntra/pipeline.git
Documentation: pkg.go.dev

# README

Pipeline

A package to build multi-staged concurrent workflows with a centralized logging output.


The package could be used to define and execute CI/CD tasks(either sequential or concurrent). A tool with similar goals would be Jenkins Pipeline. However, compared to Jenkins Pipeline, this package has fewer constructs since the logic is specified in code, as opposed to a Jenkinsfile.

It's tiny by design and is valuable when used as a glue rather than a container.

go get

$ go get gopkg.in/myntra/pipeline.v1

Concepts

The package has three building blocks to create workflows : Pipeline, Stage and Step . A pipeline is a collection of stages and a stage is a collection of steps. A stage can have either concurrent or sequential steps, while stages are always sequential.

Pipeline

The step block is where the actual work is done. Stage and pipeline act as flow governors.

The Step Interface

Step is the unit of work which can be concurrently or sequentially staged with other steps. To do that, we need to implement the Step interface.

type Step interface {
	Out
	Exec(*Request) *Result
	Cancel() error
}

To satisfy the interface we need to embed pipeline.StepContext and implement Exec(*Request)*Result, Cancel()error methods in the target type. For e.g:

type work struct {
	pipeline.StepContext
}

func (w work) Exec(request *pipeline.Request) *pipeline.Result {
	return &pipeline.Result{}
}

func (w work) Cancel() error {
	return nil
}

The pipeline.StepContext type provides a Status method which can be used to log to the out channel. The current step receives a Request value passed on by the previous step. Internally data(Request.Data and Request.KeyVal) is copied from the previous step's Result.

Usage

The api NewStage(name string, concurrent bool, disableStrictMode bool) is used to stage work either sequentially or concurrently. In terms of the pipeline package, a unit of work is an interface: Step.

The following example shows a sequential stage. For a more complex example, please see: examples/advanced.go

package main

import (
	"fmt"
	"time"

	"github.com/myntra/pipeline"
)

type work struct {
	pipeline.StepContext
	id int
}

func (w work) Exec(request *pipeline.Request) *pipeline.Result {
	w.Status(fmt.Sprintf("%+v", request))

	duration := time.Duration(1000 * w.id)
	time.Sleep(time.Millisecond * duration)
	msg := fmt.Sprintf("work %d", w.id)

	return &pipeline.Result{
		Error:  nil,
		Data:   struct{msg string}{msg:msg},
		KeyVal: map[string]interface{}{"msg": msg},
	}
}

func (w work) Cancel() error {
	w.Status("cancel step")
	return nil
}

func readPipeline(pipe *pipeline.Pipeline) {
	out, err := pipe.Out()
	if err != nil {
		return
	}

	progress, err := pipe.GetProgressPercent()
	if err != nil {
		return
	}

	for {
		select {
		case line := <-out:
			fmt.Println(line)
		case p := <-progress:
			fmt.Println("percent done: ", p)
		}
	}
}

func main() {
	// create a new pipeline
	workpipe := pipeline.NewProgress("myProgressworkpipe", 1000, time.Second*3)
	// func NewStage(name string, concurrent bool, disableStrictMode bool) *Stage
	// To execute steps concurrently, set concurrent=true.
	stage := pipeline.NewStage("mypworkstage", false, false)

	// a unit of work
	step1 := &work{id: 1}
	// another unit of work
	step2 := &work{id: 2}

	// add the steps to the stage. Since concurrent is set false above. The steps will be
	// executed one after the other.
	stage.AddStep(step1)
	stage.AddStep(step2)

	// add the stage to the pipe.
	workpipe.AddStage(stage)

	go readPipeline(workpipe)

	result := workpipe.Run()
	if result.Error != nil {
		fmt.Println(result.Error)
	}

	fmt.Println("timeTaken:", workpipe.GetDuration())
}

Check examples directory for more.

Logging and Progress

  • pipeline.Out() : Get all statuses/logs.
  • pipeline.Progress : Get progress in percentage.

Output of the above example:

Example Output

# Packages

No description provided by the author

# Functions

New returns a new pipeline name of the pipeline outBufferLen is the size of the output buffered channel.
NewProgress returns a new pipeline which returns progress updates name of the pipeline outBufferLen is the size of the output buffered channel expectedDurationInMs is the expected time for the job to finish in milliseconds If set, you can get the current time spent from GetDuration()int64 and listen on the channel returned by GetProgress() <-chan float64 to get current progress.
NewStage returns a new stage name of the stage concurrent flag sets whether the steps will be executed concurrently.

# Constants

DefaultBuffer channel buffer size of the output buffer.
DefaultDrainTimeout time to wait for all readers to finish consuming output.

# Structs

Pipeline is a sequence of stages.
Request is the result dispatched in a previous step.
Result is returned by a step to dispatch data to the next step or stage.
Stage is a collection of steps executed concurrently or sequentially concurrent: run the steps concurrently disableStrictMode: In strict mode if a single step fails, all the other concurrent steps are cancelled.
StepContext type is embedded in types which need to statisfy the Step interface.

# Interfaces

Step is the unit of work which can be concurrently or sequentially staged with other steps.