package
1.2.4
Repository: https://github.com/ccheers/xpkg.git
Documentation: pkg.go.dev

# README

pipeline

import "github.com/ccheers/xpkg/sync/pipeline"

Index

Variables

ErrFull channel full error

var ErrFull = errors.New("channel full")

type Aggregation

Aggregation pipeline struct

type Aggregation struct {
    Do    func(c context.Context, index int, values map[string][]interface{})
    Split func(key string) int
    // contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(config *Config) (res *Aggregation)

NewPipeline new pipline

func (*Aggregation) Add

func (p *Aggregation) Add(c context.Context, key string, value interface{}) (err error)

Add async add a value to channal, channel shard in split method

func (*Aggregation) Close

func (p *Aggregation) Close() (err error)

Close all goroutinue

func (*Aggregation) Start

func (p *Aggregation) Start()

Start start all mergeproc

func (*Aggregation) SyncAdd

func (p *Aggregation) SyncAdd(c context.Context, key string, value interface{}) (err error)

SyncAdd sync add a value to channal, channel shard in split method

type Config

Config Aggregation config

type Config struct {
    // MaxSize merge size
    MaxSize int
    // Interval merge interval
    Interval xtime.Duration
    // Buffer channel size
    Buffer int
    // Worker channel number
    Worker int
    // Name use for metrics
    Name string
}

Generated by gomarkdoc

# Packages

No description provided by the author

# Functions

NewPipeline new pipline.

# Variables

ErrFull channel full error.

# Structs

Aggregation pipeline struct.
Config Aggregation config.