Categorygithub.com/cloudbase-inc/go-pipeline
repositorypackage
1.2.2
Repository: https://github.com/cloudbase-inc/go-pipeline.git
Documentation: pkg.go.dev

# Packages

No description provided by the author

# README

go-pipeline package

モチベーション

データのパイプライン処理においては、考慮が必要な多くの非機能要件が存在します。

  • 並列制御(並列実行数、レートリミット)
  • エラー処理(一部でのエラー or 全体でのエラー)
  • メトリクス・ロギング

このパッケージは、単一プロセスで動くパイプライン処理を抽象化した軽量な Go ライブラリを提供することで、これらの非機能要件を実装しやすくし、開発者がコアロジックの実装に集中できるようにすることを目指すものです。

MapReduce

このパッケージで実装されているパイプラインモデルは MapReduce という歴史あるプログラミングモデルを強く意識した設計となっています。前提知識としてこちらを参照しておくと理解が深まりやすいのではないかと思います。

https://www.talend.com/jp/resources/what-is-mapreduce/

モデル

346062370-f419de33-faac-41fc-9d2b-281cf84ed19c

パイプライン (Pipeline)

データの流れをモデル化したもの。各処理を行う複数のステージと、それらの間を流れるデータであるレコードから構成されます。 最初のステージに渡されるレコードは処理の開始を表す特殊な値であり、最後のステージから出力されたレコードがパイプライン全体の出力となります。

レコード (Record)

パイプラインを流れるデータを抽象化したものです。レコードの区分けを表すグループ (Group) と、グループ内で一意の値を取る識別子 (Identifier) をヘッダーとして持ち、ペイロードとしてデータの実体を持ちます。

ステージ (Stage)

list(<group1, id1>) -> Stage() -> list(<group2, id2>)

実際の計算処理を実行する部分です。各ステージはレコードの集合を受け取り、何らかの処理を行なって加工したデータをレコードの集合として返す必要があります。 MapReduce を参考に、現在 Mapper と Reducer、および Streamer という 3 つの処理アルゴリズムが実装されています。

マッパー (Mapper)

<group1, id1> -> Mapper() -> list(<group2, id2>)

1 つのレコードに対して複数のレコードを返す関数として表現され、それらをレコード単位で並列に複数実行することで全体の結果を生成します。主に外部 API からのデータの取得処理やデータの変換処理に利用することができます。

リデューサー (Reducer)

<group1, list(id1)> -> Reduce() -> list(<group2, id2>)

前段のレコードをグループ化し、各グループのレコードの集合に対して複数のレコードを返す関数として表現され、それらをグループ単位で並列に複数実行することで全体の結果を返します。主に後段の処理で加工されたデータを集約し集計や保存処理を行うために利用することができます。

ストリーマー (Streamer)

channel(<group1, id1>) -> Streamer() -> channel(<group2, id2>)

レコードを逐次受け取り、逐次出力を返すようなストリーム処理を表現します。Mapper や Reducer での表現が難しい非同期処理を記述する場合に利用します。例えば、大量のデータを生成するような Mapper を実装する際、 代わりに Streamer で逐次結果を返すようにすることで、メモリ使用量を抑えられる他後続の処理を逐次開始することができパフォーマンスの向上も見込めます。

アウトプット (Output)

処理の単位ごとに出力されたレコードをまとめたものをレコードと区別してアウトプットと呼びます。アウトプットはレコードの他、処理の成功・失敗を表すステータス、エラーの場合にはエラーの情報も含みます。

ステータスがエラーになったアウトプットは以降のパイプラインからは除外され、成功したレコードのみで処理が進みます。発生したエラーは別途ステージの実行情報として集計されます。

使い方

examples/ にサンプルコードを配置しているので参考にしてください。サンプルコードでは、VM に対する脆弱性スキャンを題材にリージョンの取得、インスタンスのリストアップ、スキャン、脆弱性の集計をモデル化しています。

1. 各ステージで扱う Record を定義する

Record は以下のようなインターフェースとして定義されています。

type Record interface {
	Group() Group
	Identifier() string
}

type Group interface {
	String() string
}
  • レコードのグループを表す値を Record.Group() として返すように実装します。Group は String() を持つ独自型として定義するか、Group にデータを持たせる必要がない場合はGroupString(string) を利用することができます。
  • レコードの識別子を表す値を Record.Identifier() として返すように実装します。
  • グループや識別子に値を持たせる必要がない場合は、 GroupNAIdentifierNA を利用することができます。
サンプルコードの実装例
type Region struct {
	Name string
}
// Implements Record
func (r *Region) Group() pipeline.Group { return pipeline.GroupNA }
func (r *Region) Identifier() string    { return r.Name }


type Instance struct {
	Region string
	ID     string
}
// Implements Record
func (i *Instance) Group() pipeline.Group { return pipeline.GroupString(i.Region) }
func (i *Instance) Identifier() string    { return i.ID }


type Vulnerability struct {
	Instance *Instance
	ID       string
}
// Implements Record
func (v *Vulnerability) Group() pipeline.Group { return pipeline.GroupString(v.ID) }
func (v *Vulnerability) Identifier() string    { return v.Instance.ID }


type VulnerabilityCount struct {
	VulnerabilityID string
	Count           int
}
// Implements Record
func (v *VulnerabilityCount) Group() pipeline.Group { return pipeline.GroupString(v.VulnerabilityID) }
func (v *VulnerabilityCount) Identifier() string    { return pipeline.IdentifierNA }

2. Mapper / Reducer / Streamer を実装する

Mapper / Reducer / Streamer はそれぞれ次のようなインターフェースとして定義されています。これらを満たす型を実装します。

input には前段で出力されたレコードが入ります。型アサーションにより型を特定した上で必要な情報を参照します。

type Mapper[I Record, O Record] interface {
	Map(ctx context.Context, input I) ([]O, error)
}

type Reducer[I Record, O Record, G Group] interface {
	Reduce(ctx context.Context, group G, inputs []I) ([]O, error)
}

type Streamer[I Record, O Record] interface {
	Stream(ctx context.Context, inputs <-chan I) (<-chan O, <-chan error)
}
サンプルコードの実装例
type RegionLister struct{}

func (l *RegionLister) Stream(ctx context.Context, inputs <-chan pipeline.Origin) (<-chan *Region, <-chan error) {
	<-inputs

	outs := make(chan *Region)
	errs := make(chan error)

	go func() {
		defer close(outs)
		defer close(errs)

		outs <- &Region{Name: "ap-northeast-1"}
		outs <- &Region{Name: "us-west-1"}
	}()

	return outs, errs
}

type VMLister struct{}

func (l *VMLister) Map(ctx context.Context, region *Region) ([]*Instance, error) {
	if region.Name == "ap-northeast-1" {
		return []*Instance{
			{ID: "i-123"},
			{ID: "i-456"},
		}, nil
	}

	return nil, nil
}

type Scanner struct{}

func (s *Scanner) Map(ctx context.Context, instance *Instance) ([]*Vulnerability, error) {
	if instance.ID == "i-123" {
		return nil, fmt.Errorf("failed to scan instance %s", instance.ID)
	}
	if instance.ID == "i-456" {
		return []*Vulnerability{
			{ID: "CVE-2020-5678"},
			{ID: "CVE-2020-9012"},
		}, nil
	}

	return nil, nil
}

type Counter struct{}

func (c *Counter) Reduce(ctx context.Context, group pipeline.Group, vulnerabilities []*Vulnerability) ([]*VulnerabilityCount, error) {
	vulnerabilityID := group.String()

	return []*VulnerabilityCount{
		{VulnerabilityID: vulnerabilityID, Count: len(vulnerabilities)},
	}, nil
}

3. Pipeline を組み立てる

定義した Mapper や Reducer を使い、パイプラインを組み立てます。

pp := pipeline.New(
    pipeline.StreamStage("RegionLister", &RegionLister{}),
    pipeline.MapStage("VMLister", &VMLister{}, pipeline.StageTimeout(1*time.Second)),
    pipeline.MapStage("Scanner", &Scanner{}, pipeline.StageMaxParallel(3)),
    pipeline.ReduceStage("Counter", &Counter{}, pipeline.StageAbortIfAnyError(true)),
)

(Mapper|Reducer|Streamer)Stage() を使って、定義した処理をパイプラインに組み込むことができます。 またオプション引数で以下の値を設定できます。

  • StageTimeout(d time.Duration): ステージ単位のタイムアウト。タイムアウト前に正常に完了したレコードは後続のステージに渡されそのまま実行されていきます。
  • StageMaxParallel(n int): 並列実行数の上限を指定します。Mapper の場合はレコード、Reducer の場合はグループの数が最大の並列数になります。(StreamStage では利用不可)

4. Pipeline を実行する

Execute(ctx context.Context) で定義したパイプラインを実行します。

outputs, stages, err := pp.Execute(context.Background())

Execute() の返り値は以下のようになります。

  • outputs []Record: 最後のステージで処理が正常に完了したレコード
  • stages []StageExecution: 各ステージでの実行結果。定義したステージ順に値が入る
  • err error: StageAbortIfAnyError 設定時にエラーが発生した場合、全体のパイプラインを中止して該当エラーがここに入る
サンプルコードの実装例
fmt.Println("--- Outputs ---")
for _, o := range outputs {
    v := o.(*VulnerabilityCount)
    fmt.Printf("VulnerabilityID: %s, Count: %d\n", v.VulnerabilityID, v.Count)
}

fmt.Println("--- Executions ---")
for _, stage := range stages {
    successCount := 0
    errorCount := 0
    recordCount := 0
    for _, o := range stage.Outputs {
        if o.Status == pipeline.OutputStatusSuccess {
            successCount++
        }
        if o.Status == pipeline.OutputStatusError {
            errorCount++
        }
        recordCount += o.RecordCount
    }

    fmt.Printf("Stage %s: %d records generated, %d success, %d errors\n", stage.Name, recordCount, successCount, errorCount)
}
--- Outputs ---
VulnerabilityID: CVE-2020-5678, Count: 1
VulnerabilityID: CVE-2020-9012, Count: 1
--- Executions ---
Stage RegionLister: 2 records generated, 1 success, 0 errors
Stage VMLister: 2 records generated, 2 success, 0 errors
Stage Scanner: 2 records generated, 1 success, 1 errors
Stage Counter: 2 records generated, 2 success, 0 errors

その他

  • Reducer はデフォルトの挙動では全体のレコードを全て待ち受けた後にそれぞれのグループに分割して処理を行います。全体のデータ量が多い場合には、この挙動ではメモリ使用量が増大する恐れがあります。前段の処理においてグループごとに処理タイミングの偏りがある場合には、GroupCommit という特殊なレコードを用いてグループのレコードを打ち切ることができ、Reducer は GroupCommit を受け取った時点でそのグループの処理を開始します。GroupCommit が送られなかったグループは、前段の全てのレコードの送出が完了した時点でまとめて処理されます。このレコードは、実体のレコードが 0 件のグループを作成したい場合にも利用することができます。

  • 実行時に全ステージの channel を作成し、各ステージで完了した出力から後段に流していく実装となっているので、1 つのステージの実行が完了していない段階でも完了したレコードについて順次後段のステージの処理が実行されていきます。ただし、Reducer は全てのレコードの出力を待ち受けるため前段のステージ全体が完了してから実行されます。

参考実装

pipeline/examples

脆弱性スキャンを題材にしたシンプルなユースケースをサンプルとして実装しています。 本 README と合わせて参照してください。

https://github.com/cloudbase-inc/go-pipeline/tree/main/examples

ライセンス

このプロジェクトは MIT ライセンスの下でライセンスされています。詳細は LICENSE ファイルを参照してください。