Package: github.com/miku/parallel
Version: 0.1.3
Repository: https://github.com/miku/parallel.git
Documentation: pkg.go.dev

# README

parallel

Process lines in parallel.

This package helps increase the performance of command line applications that transform data and read it in a mostly line-oriented fashion.

Note: The order of the input lines is not preserved in the output.

The main type is a parallel.Processor, which reads from an io.Reader, applies a function to each input line (separated by a newline by default) and writes the result to an io.Writer.

The transformation function takes a byte slice and therefore does not assume any specific format, so the input may be plain lines, CSV, newline delimited JSON or similar line oriented formats. The output is just bytes and can again assume any format.

An example of a simple transformation that does nothing:

func Noop(b []byte) ([]byte, error) {
	return b, nil
}

We can connect this function to IO and let it run:

p := parallel.NewProcessor(os.Stdin, os.Stdout, Noop)
if err := p.Run(); err != nil {
	log.Fatal(err)
}

That's all the setup needed. For details and self-contained programs, see the examples.
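Because the transformation only sees bytes, a line of newline delimited JSON can be handled with a function of the same shape as Noop. The following is a minimal sketch using only the standard library. The record type, its field names and the ExtractTitle function are hypothetical and exist only for illustration. It also assumes the line may arrive with its trailing newline, so it trims the input and appends a newline to the output.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// record is a hypothetical input shape, used only for this illustration.
type record struct {
	ID    string `json:"id"`
	Title string `json:"title"`
}

// ExtractTitle has the same signature as Noop: bytes in, bytes out, plus an
// error. It parses one JSON document and emits a tab-separated line.
func ExtractTitle(b []byte) ([]byte, error) {
	line := bytes.TrimSpace(b)
	if len(line) == 0 {
		// Blank line: produce no output and no error.
		return nil, nil
	}
	var r record
	if err := json.Unmarshal(line, &r); err != nil {
		return nil, fmt.Errorf("invalid JSON line: %w", err)
	}
	// The input was trimmed, so terminate the output line explicitly.
	return []byte(r.ID + "\t" + r.Title + "\n"), nil
}

func main() {
	out, err := ExtractTitle([]byte(`{"id": "1", "title": "Hello"}` + "\n"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(string(out))
}
```

A function like this could be passed as the third argument to parallel.NewProcessor in place of Noop.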

The processor expects a parallel.TransformerFunc. Some functions take a byte slice and return a byte slice, but do not return an error (bytes.ToUpper is one example). These functions can be turned into a TransformerFunc with a simple helper:

p := parallel.NewProcessor(os.Stdin, os.Stdout, parallel.ToTransformerFunc(bytes.ToUpper))
if err := p.Run(); err != nil {
	log.Fatal(err)
}

Full Example

// Uppercases each line. Order of lines is not preserved.
//
//     $ printf "hello\nhi\n" | go run examples/uppercase.go
//     HELLO
//     HI

package main

import (
	"bytes"
	"log"
	"os"

	"github.com/miku/parallel"
)

func main() {
	// Setup input, output and business logic.
	p := parallel.NewProcessor(os.Stdin, os.Stdout, func(b []byte) ([]byte, error) {
		return bytes.ToUpper(b), nil
	})

	// Start processing with parallel workers.
	if err := p.Run(); err != nil {
		log.Fatal(err)
	}
}

Adjusting the processor

The processor has a few attributes that can be adjusted prior to running:

p := parallel.NewProcessor(os.Stdin, os.Stdout, parallel.ToTransformerFunc(bytes.ToUpper))

// Adjust processor options.
p.NumWorkers = 4          // number of workers (defaults to runtime.NumCPU())
p.BatchSize = 10000       // how many records to batch, before sending to a worker
p.RecordSeparator = '\n'  // record separator (must be a byte at the moment)

if err := p.Run(); err != nil {
	log.Fatal(err)
}

The defaults should be fine for many use cases. Batches are kept in memory, so larger batch sizes need more memory but reduce the coordination overhead. Sometimes a batch size of one is useful too.

Experimental arbitrary record support

scan_test.go contains an example of how to use a bufio.SplitFunc to separate records. The original parallel implementation only looked at lines (akin to bufio.ScanLines), but there are other potential use cases, such as parallel parsing of XML.

Random performance data point

Combined with a fast JSON library such as jsoniter, parallel can process up to 100000 JSON documents (of about 1K in size) per second.

# Packages

Package scan accepts a bufio.SplitFunc and generalizes batches to non-line oriented input, e.g.

# Functions

NewBytesBatch creates a new BytesBatch with a given capacity.
NewBytesBatchCapacity creates a new BytesBatch with a given capacity.
NewProcessor creates a new line processor.
ToTransformerFunc takes a simple transformer and wraps it so it can be used in places where a TransformerFunc is expected.

# Constants

Version of library.

# Variables

New is a preferred way to create a new parallel processor.

# Structs

BytesBatch is a slice of byte slices.
Processor can process lines in parallel.

# Type aliases

SimpleTransformerFunc converts bytes to bytes.
TransformerFunc takes a slice of bytes and returns a slice of bytes and an error.