github.com/conduitio/conduit-processor-sdk
Category: module, package
Version: 0.2.0
Repository: https://github.com/conduitio/conduit-processor-sdk.git
Documentation: pkg.go.dev

# README

Conduit Processor SDK


This repository contains the Go software development kit for implementing a processor for Conduit.

Note: if you'd like to write processors in another language, feel free to open an issue requesting a processor SDK for that language.

Quick Start

Create a new folder and initialize a fresh Go module:

go mod init example.com/conduit-processor-example

Add the processor SDK dependency:

go get github.com/conduitio/conduit-processor-sdk

You can now create a new processor by implementing the Processor interface. For details, see the Conduit documentation on Building Standalone Processors.

If the processor is simple enough to be reduced to a single function (e.g. it needs no configuration), you can use sdk.NewProcessorFunc() instead:

//go:build wasm

package main

import (
    "context"

    "github.com/conduitio/conduit-commons/opencdc"
    sdk "github.com/conduitio/conduit-processor-sdk"
)

func main() {
    sdk.Run(sdk.NewProcessorFunc(
        sdk.Specification{Name: "example-processor"},
        func(ctx context.Context, rec opencdc.Record) (opencdc.Record, error) {
            // do something with the record
            return rec, nil
        },
    ))
}

With this, you are ready to build your processor. Because the processor is built as a WebAssembly module, you need to set GOARCH and GOOS:

GOARCH=wasm GOOS=wasip1 go build -o example-processor.wasm main.go

The produced example-processor.wasm file can be used as a processor in Conduit. Copy it to the processors directory of your Conduit instance and configure it in the processors section of the pipeline configuration file.

FAQ

Why do I need to specify GOARCH and GOOS?

Conduit runs standalone processors as WebAssembly modules, so your processor must be compiled to WebAssembly. Setting the environment variables GOARCH=wasm and GOOS=wasip1 (the WASI Preview 1 target, available since Go 1.21) when running go build produces a WebAssembly module that Conduit can load as a processor.
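If you drive builds from Go rather than a shell (for example from a small build program kept in your repository), the same cross-compilation can be set up with the standard `os/exec` package. A sketch; the helper name `wasmBuildCmd` is hypothetical:

```go
package main

import (
	"os"
	"os/exec"
)

// wasmBuildCmd prepares (but does not run) the equivalent of
//
//	GOARCH=wasm GOOS=wasip1 go build -o <output> <pkg>
//
// The current environment is kept and the two target variables are appended;
// if a key appears more than once in Env, os/exec uses the last value.
func wasmBuildCmd(output, pkg string) *exec.Cmd {
	cmd := exec.Command("go", "build", "-o", output, pkg)
	cmd.Env = append(os.Environ(), "GOARCH=wasm", "GOOS=wasip1")
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	return cmd
}
```

Running `wasmBuildCmd("example-processor.wasm", ".").Run()` from the module root should then behave like the go build command shown in the Quick Start.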

How do I use a processor?

To use a standalone WASM processor in Conduit, complete the following two steps:

  1. Copy the WebAssembly module to the processors directory of your Conduit instance. By default, that's a directory called processors located in the same directory as Conduit. The directory can be changed with the -processors.path flag.

    An example directory structure would be:

    .
    ├── conduit
    └── processors
        └── example-processor.wasm
    
  2. Reference the processor in the processors section of the pipeline configuration file, using the name the processor defines in its specification as the plugin value. For example:

    processors:
      - id: my-example-processor
        plugin: example-processor
        settings:
          field: 'foo'
          value: 'bar'
    

How do I log in a processor?

You can get a zerolog.Logger instance from the context using the sdk.Logger function. This logger is pre-configured to write logs in the format Conduit expects.

Keep in mind that logging in the hot path (i.e. in the Process method) can significantly affect your processor's performance. We therefore recommend using the Trace level for logs that are not essential to the processor's operation.

Example:

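func (p *ExampleProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
    logger := sdk.Logger(ctx)
    logger.Trace().Int("records", len(records)).Msg("processing records")
    out := make([]sdk.ProcessedRecord, 0, len(records))
    for _, rec := range records {
        // ... process the record
        out = append(out, sdk.SingleRecord(rec))
    }
    return out
}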

# Packages

Package pprocutils provides the functionality for Conduit to set up utilities for processors.
Package wasm provides the functionality for communicating with Conduit as a standalone plugin.

# Functions

DefaultProcessorMiddleware returns a slice of middleware that is added to all processors by default.
Logger returns the logger for the processor.
NewProcessorFunc creates a ProcessorFunc from a function and specifications.
NewReferenceResolver creates a new reference resolver from the input string.
ParseConfig sanitizes the configuration, applies defaults, validates it and copies the values into the target object.
ProcessorWithMiddleware wraps the processor into the supplied middleware.

# Variables

ErrUnimplemented is returned in functions of plugins that don't implement a certain method.

# Structs

ErrorRecord is a record that failed to be processed and will be nacked.
FilterRecord is a record that will be acked and filtered out of the pipeline.
ProcessorFunc is an adapter allowing use of a function as a Processor.
ProcessorWithSchemaDecode is a middleware that decodes the key and/or payload of a record using a schema before passing it to the processor.
ProcessorWithSchemaDecodeConfig is the configuration for the ProcessorWithSchemaDecode middleware.
ProcessorWithSchemaEncode is a middleware that encodes the record payload and/or key with a schema.
ProcessorWithSchemaEncodeConfig is the configuration for the ProcessorWithSchemaEncode middleware.
Specification is returned by a processor's Specification method.
UnimplementedProcessor should be embedded to have forward compatible implementations.

# Interfaces

ProcessedRecord is a record returned by the processor.
Processor receives records, manipulates them and returns back the processed records.
ProcessorMiddleware wraps a Processor and adds functionality to it.
ProcessorMiddlewareOption is a function that can be used to configure a ProcessorMiddleware.

# Type aliases

Reference is an interface that represents a reference to a field in a record.
ReferenceResolver is a type that knows how to resolve a reference to a field in a record.
SingleRecord is a single processed record that will continue down the pipeline.