Categorygithub.com/connylabs/ingest
modulepackage
0.0.0-20231219154613-73bd65c814b0
Repository: https://github.com/connylabs/ingest.git
Documentation: pkg.go.dev

# README

ingest

Ingest is a pluggable tool that makes it easy to orchestrate the synchronization of objects from any API into an object storage system.

Concept

Ingest includes a main binary and some plugins for some common storage systems, like S3. Each plugin can implement a data source, a destination, or both. The S3 plugin implements both the source and destination interface, but some plugins may only be able to act as either of them. The plugins are loaded at runtime and enable users to implement their own custom plugins.

Workflows

A workflow specifies a data source and one or more destinations. When configured, objects from the source will be copied to all destinations.

Configuration

The exact configurations depends on the plugin. Typically, the S3 plugin requires an URL, an access key and a secret access key. This may differ from plugins that don't require authentication or a different kind of authentication.

The following example shows the configuration to copy objects between two instances of S3:

sources:
- name: foo_1
  type: s3
  endpoint: source.amazon.com
  bucket: source
  prefix: prefix/
  accessKeyID: key
  secretAccessKey: secret
destinations:
- name: bar_1
  type: s3
  endpoint: destination.amazon.com
  insecure: true
  bucket: destination
  prefix: prefix1/
  metafilesPrefix: meta/
  accessKeyID: key
  secretAccessKey: secret
workflows:
- name: foo_1-bar_1
  source: foo_1
  destinations:
  - bar_1
  batchSize: 1
  interval: 300s
  cleanUp: true
  webhook: http://localhost:8080

Deployment

The deployment of ingest contains of two parts.

One part is called the enqueuer. It is started with the flag --mode=enqueue. It will scan the configured sources and push a message for each item in the source to its NATS stream.

The other part is the dequeuer. It is started with the flag --mode=dequeue. It will pop messages from the NATS stream and copy each object identified by the NATS message to the configured destinations.

Usage as a Library

The library is split into two packages:

  1. enqueue: responsible for putting items into a queue; and
  2. dequeue: responsible for reading items from the queue, perform the download from the API, and the upload object into object storage.

Enqueuer

The Enqueuer fetches all elements from the external API and puts them into the NATS queue. It can put either just an ID or the entire contents of the object into the queue.

For a new service you must implement the following Nexter interface:

// Nexter is able to list the elements available in the external API and returns them one by one.
// A Nexter must be implemented for the specific service.
type Nexter interface {
	// Reset initializes or resets the state of the Nexter.
	// After Reset, calls of Next should retrieve all elements.
	Reset(context.Context) error
	// Next returns a *Codec that represents an element.
	// If all elements were returned by Next, io.EOF must be returned.
	Next(context.Context) (*Codec, error)
}

To use the Enqueuer create, a new one with enqueue.New. It implements the following interface:

// Enqueuer is able to enqueue elements into NATS.
type Enqueuer interface {
	// Enqueue adds all of the elements that the Nexter will produce into the queue.
	Enqueue(context.Context) error
}

Dequeuer

The Dequeuer reads from the queue and uploads the object into object storage. You need implement the Client interface.

// Object represents an object that can be uploaded into object storage.
type Object struct {
	// MimeType is the HTTP-style Content-Type of the object.
	MimeType string
	// Len is the length of the underlying buffer of the io.Reader.
	Len    int64
	Reader io.Reader
}
// Client is able to create an Object from a Codec.
// Client must be implemented by the caller.
type Client interface {
	// Download converts a Codec into an Object.
	// In most cases it will use the ID of the Codec
	// to download the object from an API,
	Download(context.Context, Codec) (*Object, error)
	// CleanUp is called after an object is uploaded
	// to object storage. In most cases, this will
	// serve the purpose of removing the object from
	// the source API so that it is no longer returned
	// by the Nexter and as such is not resynchronized.
	// This method must be idempotent and safe to call
	// multiple times.
	CleanUp(context.Context, Codec) error
}

To use the Dequeuer, create a new one with dequeuer.New. It implements the following interface:

// Dequeuer is able to dequeue elements from the queue and upload objects to object storage.
type Dequeuer interface {
	// Dequeue will read relevant messages on a Queue and upload the corresponding
	// Objects to object storage until the given context is cancelled.
	Dequeue(context.Context) error
}

# Packages

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Functions

NewCodec creates a Codec from an existing Identifiable.

# Constants

DefaultBatchSize default size of the batch of messages pulled from the queue.

# Structs

Codec implements the Identifiable and Codec interfaces and is used as the default marshaler/unmarshaler for Identifiables.
Object represents an object that can be uploaded into object storage.

# Interfaces

Client is able to create an Object from a Codec.
Dequeuer is able to dequeue elements from the queue and upload objects to object storage.
Enqueuer is able to enqueue elements into NATS.
Nexter is able to list the elements available in the external API and returns them one by one.
Queue is able to publish messages and subscribe to incoming messages.
Subscription is able to pull a batch of messages from a stream for a pull consumer.