Package github.com/bufbuild/prototransform
Version: 0.4.0
Repository: https://github.com/bufbuild/prototransform.git
Documentation: pkg.go.dev

# README


Prototransform


Convert protobuf message data to alternate formats

Use the prototransform library to simplify transforming and collecting your protobuf data. The package converts a message payload from one format to another, using the message's type schema as published on the Buf Schema Registry (BSR).

  • No need to bake in proto files
  • Supports Binary, JSON and Text formats
  • Extensible for other/custom formats

Getting started

prototransform is designed to be flexible enough to fit quickly into your development environment.

Here's an example of how you could use prototransform to transform messages received from a Pub/Sub topic.

Transform Messages from a Topic

While prototransform has various applications, converting messages pulled off a message queue is a primary use case. This can take many forms, so for simplicity we will look at it abstractly, in a pub/sub model where we want to:

  1. Open a subscription to a topic with the Pub/Sub service of your choice
  2. Start a SchemaWatcher to fetch a module from the Buf Schema Registry
  3. Receive, transform, and acknowledge messages from the topic

Opening a Subscription & Schema Watcher

import (
	"context"
	"fmt"
	"time"

	"github.com/bufbuild/prototransform"
	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/<driver>"
)
...
	subs, err := pubsub.OpenSubscription(ctx, "<driver-url>")
	if err != nil {
		return fmt.Errorf("could not open topic subscription: %w", err)
	}
	defer subs.Shutdown(ctx)
	// Supply auth credentials to the BSR
	client := prototransform.NewDefaultFileDescriptorSetServiceClient("<bsr-token>")
	// Configure the module for the schema watcher
	cfg := &prototransform.SchemaWatcherConfig{
		SchemaPoller: prototransform.NewSchemaPoller(
			client,
			"buf.build/someuser/somerepo", // BSR module
			"some-tag",                    // tag or draft name, or leave blank for "latest"
		),
	}
	watcher, err := prototransform.NewSchemaWatcher(ctx, cfg)
	if err != nil {
		return fmt.Errorf("failed to create schema watcher: %w", err)
	}
	defer watcher.Stop()
	// Before processing messages, make sure the schema has been downloaded.
	// Use a separate context so the 10-second timeout applies only to this
	// wait, not to the message loop that keeps using ctx afterwards.
	readyCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	if err := watcher.AwaitReady(readyCtx); err != nil {
		return fmt.Errorf("schema watcher never became ready: %w", err)
	}
...

A SchemaWatcher is the entry point of prototransform. Create it first so that your code can connect to the Buf Schema Registry and fetch the schema used to transform and/or filter payloads.

Prepare a converter

A Converter implements the functionality to convert payloads to different formats and optionally filter/mutate messages during this transformation. In the following example, we have initialized a *prototransform.Converter which expects a binary input and will return JSON.

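import (
	"github.com/bufbuild/prototransform"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
)
...
	converter := &prototransform.Converter{
		Resolver:     watcher, // the *prototransform.SchemaWatcher created above
		InputFormat:  prototransform.BinaryInputFormat(proto.UnmarshalOptions{}),
		OutputFormat: prototransform.JSONOutputFormat(protojson.MarshalOptions{}),
	}
...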

Out of the box, you can supply the binary (proto), JSON (protojson), and text (prototext) formats here, and you can also supply your own custom formats.

| Format | InputFormat | OutputFormat |
|--------|-------------|--------------|
| JSON | prototransform.JSONInputFormat | prototransform.JSONOutputFormat |
| Text | prototransform.TextInputFormat | prototransform.TextOutputFormat |
| Binary | prototransform.BinaryInputFormat | prototransform.BinaryOutputFormat |
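To illustrate why custom formats are easy to plug in, here is a stdlib-only sketch of the underlying pattern: the calling code depends on a small interface, and each format is a value that satisfies it. The `outputFormat` interface, `record` type, and `kvFormat` are hypothetical stand-ins for this sketch; prototransform's real InputFormat and OutputFormat interfaces work with protobuf messages and are listed in the package reference.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
	"strings"
)

// record is a hypothetical stand-in for a decoded message.
type record map[string]string

// outputFormat is a hypothetical interface for this sketch only.
type outputFormat interface {
	Marshal(r record) ([]byte, error)
}

// jsonFormat delegates to encoding/json (which sorts map keys).
type jsonFormat struct{}

func (jsonFormat) Marshal(r record) ([]byte, error) { return json.Marshal(r) }

// kvFormat is a custom format: one "key=value" line per field, keys sorted
// so output is deterministic.
type kvFormat struct{}

func (kvFormat) Marshal(r record) ([]byte, error) {
	keys := make([]string, 0, len(r))
	for k := range r {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&b, "%s=%s\n", k, r[k])
	}
	return []byte(b.String()), nil
}

func main() {
	r := record{"name": "eliza", "sentence": "hello"}
	// The caller picks a format; the code that uses it does not change.
	for _, f := range []outputFormat{jsonFormat{}, kvFormat{}} {
		out, err := f.Marshal(r)
		if err != nil {
			fmt.Println("marshal error:", err)
			continue
		}
		fmt.Println(string(out))
	}
}
```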

Receiving and Transforming Messages

Now that we have an active subscription, schema watcher, and converter, we can start processing messages. A simple subscriber that transforms received messages looks like this:

...
	// Loop on received messages.
	for {
		msg, err := subs.Receive(ctx)
		if err != nil {
			// Receive returns an error only for non-retryable failures,
			// or once ctx is done or the subscription is shut down.
			log.Printf("Receiving message: %v", err)
			break
		}
		// Convert the payload using the fully-qualified message name
		convertedMessage, err := converter.ConvertMessage("<message-name>", msg.Body)
		if err != nil {
			log.Printf("Converting message: %v", err)
			break
		}
		fmt.Printf("Converted message: %q\n", convertedMessage)

		msg.Ack()
	}
...

For illustrative purposes, let's assume that the topic we have subscribed to is buf.connect.demo.eliza.v1 and that the module defining its messages is stored on the BSR. We would then configure the message name as buf.connect.demo.eliza.v1.ConverseRequest.
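The message name passed to ConvertMessage is fully qualified: the proto package, a dot, then the message name. A hypothetical helper that builds and sanity-checks such names, plus an illustrative topic-to-message routing table (neither is part of prototransform), might look like this:

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// ident matches one protobuf identifier segment (letter or underscore first).
var ident = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// fullName joins a proto package and message name ("pkg" + "." + "Msg"),
// checking that every dot-separated segment is a valid identifier.
func fullName(pkg, msg string) (string, error) {
	name := msg
	if pkg != "" {
		name = pkg + "." + msg
	}
	for _, seg := range strings.Split(name, ".") {
		if !ident.MatchString(seg) {
			return "", fmt.Errorf("invalid segment %q in %q", seg, name)
		}
	}
	return name, nil
}

// route is a hypothetical mapping from a topic to the message type it carries.
type route struct{ Package, Message string }

var routes = map[string]route{
	"buf.connect.demo.eliza.v1": {Package: "buf.connect.demo.eliza.v1", Message: "ConverseRequest"},
}

func main() {
	r := routes["buf.connect.demo.eliza.v1"]
	name, err := fullName(r.Package, r.Message)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(name) // buf.connect.demo.eliza.v1.ConverseRequest
}
```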

Options

Cache

A SchemaWatcher can be configured with a user-supplied cache implementation that acts as a fallback when fetching schemas. The interface has this form:

type Cache interface {
    Load(ctx context.Context, key string) ([]byte, error)
    Save(ctx context.Context, key string, data []byte) error
}

This repo provides three implementations that you can use:

  1. filecache: Cache schemas in local files.
  2. rediscache: Cache schemas in a shared Redis server.
  3. memcache: Cache schemas in a shared memcached server.

Filters

Sometimes the values in the output message should differ from the input according to a set of defined rules. For example, you may want to remove Personally Identifiable Information (PII) from a message before it is piped into a sink. Filters support this use case.

Here's an example where we have defined a custom annotation to mark fields as sensitive:

syntax = "proto3";
package foo.v1;
import "google/protobuf/descriptor.proto";
// ...
extend google.protobuf.FieldOptions {
  bool sensitive = 30000;
}
// ...
message FooMessage {
  string name = 1 [(sensitive) = true];
}

We then use prototransform.Redact() to create a filter and supply it to our converter via its Filters field:

...
isSensitive := func(in protoreflect.FieldDescriptor) bool {
	// GetExtension returns the default (false) when the option is not set.
	return proto.GetExtension(in.Options(), foov1.E_Sensitive).(bool)
}
filter := prototransform.Redact(isSensitive)
converter.Filters = prototransform.Filters{filter}
...

Now, any field marked as sensitive will be omitted from the output produced by the converter.
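Conceptually, a Redact filter walks a message's fields and drops each one for which the predicate returns true. The sketch below models that with the standard library only: `field` and `message` are hypothetical stand-ins for protoreflect.FieldDescriptor and a decoded message, and `redact` is illustrative, not prototransform's implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// field is a hypothetical stand-in for protoreflect.FieldDescriptor.
type field struct {
	Name      string
	Sensitive bool // models the custom (sensitive) option
}

// message is a hypothetical stand-in for a decoded message.
type message map[field]string

// redact returns a filter that deletes every field matching pred.
// Deleting map entries while ranging over the map is safe in Go.
func redact(pred func(field) bool) func(message) {
	return func(m message) {
		for f := range m {
			if pred(f) {
				delete(m, f)
			}
		}
	}
}

func main() {
	name := field{Name: "name", Sensitive: true}
	id := field{Name: "id"}
	m := message{name: "Alice", id: "42"}

	isSensitive := func(f field) bool { return f.Sensitive }
	redact(isSensitive)(m)

	var kept []string
	for f, v := range m {
		kept = append(kept, f.Name+"="+v)
	}
	sort.Strings(kept)
	fmt.Println(kept) // [id=42]
}
```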

This package also includes HasDebugRedactOption, which can be used with Redact to omit fields that have the standard debug_redact option set (this option was introduced in protoc v22.0).
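If you want one filter to apply both rules, your own sensitive annotation and debug_redact, one option is to combine the predicates with a logical OR before passing the result to Redact. The sketch below is stdlib-only: `fieldInfo` is a hypothetical stand-in for protoreflect.FieldDescriptor, and `anyOf` is a hypothetical helper, not part of prototransform.

```go
package main

import "fmt"

// fieldInfo is a hypothetical stand-in for protoreflect.FieldDescriptor,
// carrying only the two options discussed above.
type fieldInfo struct {
	Name        string
	Sensitive   bool // custom (sensitive) extension
	DebugRedact bool // standard debug_redact option
}

type predicate func(fieldInfo) bool

// anyOf returns a predicate that matches when at least one of preds matches.
// With no predicates it matches nothing.
func anyOf(preds ...predicate) predicate {
	return func(f fieldInfo) bool {
		for _, p := range preds {
			if p(f) {
				return true
			}
		}
		return false
	}
}

func main() {
	isSensitive := func(f fieldInfo) bool { return f.Sensitive }
	hasDebugRedact := func(f fieldInfo) bool { return f.DebugRedact }
	shouldRedact := anyOf(isSensitive, hasDebugRedact)

	fields := []fieldInfo{
		{Name: "name", Sensitive: true},
		{Name: "token", DebugRedact: true},
		{Name: "id"},
	}
	for _, f := range fields {
		fmt.Println(f.Name, shouldRedact(f))
	}
}
```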

Community

For help and discussion around Protobuf, best practices, and more, join us on Slack.

Status

This project is currently in alpha. The API should be considered unstable and likely to change.

Legal

Offered under the Apache 2 license.

# Functions

BinaryInputFormat convenience method for binary input format.
BinaryOutputFormat convenience method for binary output format.
BufTokenFromEnvironment returns a token that can be used to download the given module from the BSR by inspecting the BUF_TOKEN environment variable.
HasDebugRedactOption returns a function that can be used as a predicate, with [Redact], to omit fields where the `debug_redact` field option is set to true.
InputFormatWithoutResolver convenience method for input format without resolver.
JSONInputFormat convenience method for JSON input format.
JSONOutputFormat convenience method for JSON output format.
NewAuthInterceptor accepts a token for a Buf Schema Registry (BSR) and returns an interceptor which can be used when creating a Connect client so that every RPC to the BSR is correctly authenticated.
NewDefaultFileDescriptorSetServiceClient will create an authenticated connection to the public Buf Schema Registry (BSR) at https://buf.build.
NewSchemaPoller returns a SchemaPoller that uses the given Buf Reflection API client to download descriptors for the given module.
NewSchemaWatcher creates a new [SchemaWatcher] for the given [SchemaWatcherConfig].
OutputFormatWithoutResolver convenience method for output format without resolver.
Redact returns a Filter that will remove information from a message.
TextInputFormat convenience method for text input format.
TextOutputFormat convenience method for text output format.

# Variables

ErrLeaseStateNotYetKnown is an error that may be returned by Lease.IsHeld to indicate that the leaser has not yet completed querying for the lease's initial state.
ErrSchemaNotModified is an error that may be returned by a SchemaPoller to indicate that the poller did not return any descriptors because the caller's cached version is still sufficiently fresh.
ErrSchemaWatcherNotReady is an error returned from the various Find* methods of SchemaWatcher when an initial schema has not yet been downloaded (or loaded from cache).
ErrSchemaWatcherStopped is an error returned from the AwaitReady method that indicates the schema watcher was stopped before it ever became ready.

# Structs

Converter allows callers to convert byte payloads from one format to another.
SchemaWatcher watches a schema in a remote registry by periodically polling.
SchemaWatcherConfig contains the configurable attributes of the [SchemaWatcher].

# Interfaces

Cache can be implemented and supplied to prototransform for added guarantees in environments where uptime is critical.
InputFormat provides the interface to supply the [Converter] with an input composition.
Lease represents a long-lived distributed lease.
Leaser provides access to long-lived distributed leases, for leader election or distributed locking.
Marshaler is a [Marshaler].
OutputFormat provides the interface to supply the [Converter] with an output composition.
Resolver is used to resolve symbol names and numbers into schema definitions.
SchemaPoller polls for descriptors from a remote source.
Unmarshaler is a [Unmarshaler].

# Type aliases

Filter provides a way for user-provided logic to alter the message being converted.
Filters is a slice of filters.