Categorygithub.com/jje42/flow
repositorypackage
0.0.0-20220704052841-91f4bec7525c
Repository: https://github.com/jje42/flow.git
Documentation: pkg.go.dev

# Packages

No description provided by the author

# README

flow is a workflow management system, designed primarily to run jobs on a High Performance Computer (HPC) running PBS Pro or Slurm. Jobs can optionally be run inside Singularity containers. This is a very specific set of conditions, and there are many other workflow management systems already available. flow is written in Go, and so are the workflows that it manages. It was originally started as an exercise in learning the Go language.

flow is heavily influenced by Queue, a workflow engine by the Broad Institute's GSA group as part of the GATK software.

You may want to look at:

which both have more features and support.

Example Workflow

package main

import (
	"fmt"
	"log"
	"strings"

	"github.com/jje42/flow"
)

// CreateInput describes a job that echoes a single word into a file.
type CreateInput struct {
	// Output is the path the command redirects its output to.
	Output string `type:"output"`
	// Word is the text passed to echo.
	Word string
}

// AnalysisName returns the fixed name of this job type.
func (ci CreateInput) AnalysisName() string {
	const name = "CreateInput"
	return name
}

// Command returns the shell command line that echoes Word and redirects it
// to Output.
func (ci CreateInput) Command() string {
	const format = "echo %s >%s"
	return fmt.Sprintf(format, ci.Word, ci.Output)
}

// Resources returns the resource request for this job: CPUs, Memory and Time
// are each set to 1 and the container is the debian:10 Docker image.
func (ci CreateInput) Resources() flow.Resources {
	res := flow.Resources{
		CPUs:      1,
		Memory:    1,
		Time:      1,
		Container: "docker://debian:10",
	}
	return res
}

// ToUpper describes a job that upper-cases the contents of one file into
// another.
type ToUpper struct {
	// Input is the path of the file that is read.
	Input string `type:"input"`
	// Output is the path the converted text is redirected to.
	Output string `type:"output"`
}

// AnalysisName returns the fixed name of this job type.
func (tu ToUpper) AnalysisName() string {
	const name = "ToUpper"
	return name
}

// Command returns the shell pipeline that pipes Input through tr, mapping
// lower-case characters to upper-case, and redirects the result to Output.
func (tu ToUpper) Command() string {
	const format = `cat %s | tr '[:lower:]' '[:upper:]' >%s`
	return fmt.Sprintf(format, tu.Input, tu.Output)
}

// Resources returns the resource request for this job: CPUs, Memory and Time
// are each set to 1 and the container is the debian:10 Docker image.
func (tu ToUpper) Resources() flow.Resources {
	res := flow.Resources{
		CPUs:      1,
		Memory:    1,
		Time:      1,
		Container: "docker://debian:10",
	}
	return res
}

// Merge describes a job that concatenates several files into one.
type Merge struct {
	// Inputs lists the paths of the files to concatenate, in order.
	Inputs []string `type:"input"`
	// Output is the path the concatenated text is redirected to.
	Output string `type:"output"`
}

// AnalysisName returns the fixed name of this job type.
func (m Merge) AnalysisName() string {
	const name = "merge"
	return name
}

// Command returns the shell command line that runs cat over the
// space-separated Inputs and redirects the result to Output.
func (m Merge) Command() string {
	sources := strings.Join(m.Inputs, " ")
	return fmt.Sprintf(`cat %s >%s`, sources, m.Output)
}

// Resources returns the resource request for this job: CPUs, Memory and Time
// are each set to 1 and the container is the debian:10 Docker image.
func (m Merge) Resources() flow.Resources {
	res := flow.Resources{
		CPUs:      1,
		Memory:    1,
		Time:      1,
		Container: "docker://debian:10",
	}
	return res
}

// main assembles the example workflow (two CreateInput jobs, two ToUpper
// jobs and a final Merge) on a flow.Queue and runs it, exiting via log.Fatal
// if Run reports an error.
func main() {
	queue := flow.Queue{}

	// Write one word to each of the two input files.
	queue.Add(&CreateInput{Output: "input1.txt", Word: "hello"})
	queue.Add(&CreateInput{Output: "input2.txt", Word: "world"})

	// Upper-case each input file into its own output file.
	queue.Add(&ToUpper{Output: "output1.txt", Input: "input1.txt"})
	queue.Add(&ToUpper{Output: "output2.txt", Input: "input2.txt"})

	// Concatenate both upper-cased files into the final result.
	queue.Add(&Merge{Output: "final.txt", Inputs: []string{"output1.txt", "output2.txt"}})

	err := queue.Run()
	if err != nil {
		log.Fatal(err)
	}
}