github.com/nats-io/kinesis-bridge
Version: 0.1.0
Repository: https://github.com/nats-io/kinesis-bridge.git
Documentation: pkg.go.dev

# README

Kinesis-NATS Bridge

This is a standalone program that reads records from AWS Kinesis streams and publishes them to NATS streams.

Configuration

A configuration file declares the Kinesis streams and their mapping to NATS. Here is an example:

nats:
  # Optional NATS context that declares how to connect and
  # authenticate with NATS.
  context: "kinesis-bridge"

  # Name of the KV bucket used to store shard offsets per stream. This
  # can be shared across all streams.
  bucket: "kinesis-bridge"

kinesis:
  # Each key is a stream name.
  sensor-data:
    # Encoding of the record data. Must be set for the data's
    # properties to be accessible in the subject template.
    encoding: json

    # Start position for shards when initialized. "earliest" or "new" (default).
    start_position: new

    # Declares the corresponding NATS configuration.
    nats:
      # The subject to publish a message to. This can be a concrete subject
      # like "sensor-data", but template variables are also supported (see below).
      subject: "sensor-data.{{.Data.facility_code}}.{{.Data.pointid}}"

      # Subject to publish to if a record cannot be parsed and
      # published to the standard subject.
      dlq: "sensor-data.dlq"
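The constraints stated in the configuration comments can be checked before the bridge starts. Below is a minimal Go sketch, assuming a hypothetical StreamConfig type that mirrors one entry under kinesis, and assuming the subject uses Go text/template syntax (which the {{.Data...}} placeholders suggest). It applies the documented "new" default for start_position, rejects templates that do not parse, and uses a simple string heuristic to require encoding when the subject reads .Data. The bridge's own types and validation are not described in this README.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"text/template"
)

// StreamConfig mirrors one entry under the "kinesis" key of the example
// configuration. The type and field names are hypothetical.
type StreamConfig struct {
	Encoding      string // e.g. "json"; empty means opaque bytes
	StartPosition string // "earliest" or "new"; empty defaults to "new"
	Subject       string // subject template
	DLQ           string // dead-letter subject
}

// validate applies the documented start_position default and checks the
// constraints described in the configuration comments.
func validate(name string, c *StreamConfig) error {
	switch c.StartPosition {
	case "":
		c.StartPosition = "new"
	case "earliest", "new":
		// valid as given
	default:
		return fmt.Errorf("%s: invalid start_position %q", name, c.StartPosition)
	}
	if c.Subject == "" {
		return fmt.Errorf("%s: nats.subject is required", name)
	}
	// Reject templates that fail to parse, e.g. an unclosed action.
	if _, err := template.New(name).Parse(c.Subject); err != nil {
		return fmt.Errorf("%s: invalid subject template: %w", name, err)
	}
	// Heuristic: a template that reads .Data needs a decodable encoding.
	if strings.Contains(c.Subject, ".Data") && c.Encoding == "" {
		return errors.New(name + ": encoding must be set to use .Data in the subject")
	}
	return nil
}

func main() {
	cfg := StreamConfig{
		Encoding: "json",
		Subject:  "sensor-data.{{.Data.facility_code}}.{{.Data.pointid}}",
		DLQ:      "sensor-data.dlq",
	}
	if err := validate("sensor-data", &cfg); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("start_position:", cfg.StartPosition)
}
```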

Subject

The defined subject and DLQ subject must be bound to a stream.

The subject supports the following template variables:

  • Data - For the Data property to be accessible, the encoding type must be supported, e.g. json. With the default encoding, the data is treated as opaque bytes.
  • PartitionKey - The partition key set on the record, if any.
  • SequenceNumber - The sequence number of the record in the stream.
  • ShardID - The shard ID on the stream that the record was in.
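The placeholders look like Go text/template syntax. Assuming that, and assuming json encoding, the sketch below renders the example subject from a record. TemplateData and renderSubject are hypothetical names; the field names match the variables listed above. Enabling missingkey=error is a choice made in this sketch: without it, text/template renders an absent key as "<no value>", whereas an error lets the caller send the record to the DLQ subject instead.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"strings"
	"text/template"
)

// TemplateData holds the documented template variables. The struct name is
// hypothetical; the field names match the README.
type TemplateData struct {
	Data           map[string]any
	PartitionKey   string
	SequenceNumber string
	ShardID        string
}

// renderSubject decodes a JSON record and executes the subject template.
// missingkey=error turns an absent field into an error instead of "<no value>".
func renderSubject(tmpl string, payload []byte, td TemplateData) (string, error) {
	t, err := template.New("subject").Option("missingkey=error").Parse(tmpl)
	if err != nil {
		return "", err
	}
	dec := json.NewDecoder(bytes.NewReader(payload))
	dec.UseNumber() // keep numbers as written, e.g. 1234567 rather than 1.234567e+06
	var data map[string]any
	if err := dec.Decode(&data); err != nil {
		return "", err // not a JSON object: a DLQ candidate
	}
	td.Data = data
	var sb strings.Builder
	if err := t.Execute(&sb, td); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	subject, err := renderSubject(
		"sensor-data.{{.Data.facility_code}}.{{.Data.pointid}}",
		[]byte(`{"facility_code": "fac1", "pointid": 1234567}`),
		TemplateData{ShardID: "shardId-000000000000"},
	)
	if err != nil {
		fmt.Println("route to DLQ:", err)
		return
	}
	fmt.Println(subject)
}
```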

Headers

When a message is republished to NATS, the following headers are set:

  • Kinesis-Stream-Name - Name of the stream the message was from.
  • Kinesis-Shard-Id - ID of the shard the record was stored in.
  • Kinesis-Partition-Key - Partition key of the record.
  • Kinesis-Sequence-Number - Sequence number of the message within the shard.
  • Kinesis-Arrival-Timestamp - The arrival timestamp of the record within the stream.
  • Nats-Msg-Id - Hash of the stream name, shard, partition key, and sequence number.

Setup

Before running the bridge, create the streams and the KV bucket that match the configuration.

Create a stream

The subjects specified in the configuration must be bound to a NATS stream. For example, a stream named sensor-data could be created with the subject sensor-data.>, which matches both the successfully parsed messages and those routed to the DLQ subject.
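To see why a single sensor-data.> subject covers both cases, the simplified Go function below applies the NATS wildcard rules: tokens are separated by ".", "*" matches exactly one token, and a trailing ">" matches one or more remaining tokens. It is an illustration only, not the NATS server's matching code; subjectMatches is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject is matched by a NATS subject filter
// that may contain "*" (one token) and a trailing ">" (one or more tokens).
func subjectMatches(filter, subject string) bool {
	ft := strings.Split(filter, ".")
	st := strings.Split(subject, ".")
	for i, f := range ft {
		if f == ">" {
			// ">" is only valid as the last token and needs at least one token.
			return i == len(ft)-1 && len(st) > i
		}
		if i >= len(st) {
			return false // subject has fewer tokens than the filter
		}
		if f != "*" && f != st[i] {
			return false // literal token mismatch
		}
	}
	return len(ft) == len(st)
}

func main() {
	for _, s := range []string{
		"sensor-data.fac1.1234567", // a subject rendered from the template
		"sensor-data.dlq",          // the DLQ subject
		"sensor-data",              // no token after the prefix
	} {
		fmt.Println(s, subjectMatches("sensor-data.>", s))
	}
}
```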

To create the stream, use nats stream add. You will be prompted for each option that is not passed as a flag. Three options that are important to define are the subjects, the number of replicas, and limits such as max age.

$ nats stream add --subjects "sensor-data.>" --replicas 3 --max-age "24h" sensor-data

Create a KV bucket

$ nats kv add --replicas 3 kinesis-bridge
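The bucket holds the shard offsets per stream, presumably so the bridge can resume where it left off. The sketch below shows one plausible way this could work, with an in-memory map standing in for the KV bucket: if an offset is stored for a shard, reading resumes after that sequence number; otherwise start_position decides. The key layout, the function names, and the mapping of "earliest" and "new" to the Kinesis TRIM_HORIZON and LATEST shard iterator types are all assumptions, not taken from the bridge's source.

```go
package main

import "fmt"

// offsetKey builds a hypothetical KV key for one shard of one stream. It only
// needs to be unique per stream and shard so one bucket can be shared.
func offsetKey(stream, shardID string) string {
	return stream + "." + shardID
}

// startingIterator picks how to open a shard: resume after the stored
// sequence number if one exists, otherwise fall back to start_position.
// The earliest->TRIM_HORIZON and new->LATEST mapping is an assumption.
func startingIterator(offsets map[string]string, stream, shardID, startPosition string) (iterType, seq string) {
	if s, ok := offsets[offsetKey(stream, shardID)]; ok {
		return "AFTER_SEQUENCE_NUMBER", s
	}
	if startPosition == "earliest" {
		return "TRIM_HORIZON", ""
	}
	return "LATEST", "" // "new", the documented default
}

func main() {
	// In-memory stand-in for the KV bucket; "12345" is an example value.
	offsets := map[string]string{
		offsetKey("sensor-data", "shardId-000000000000"): "12345",
	}
	for _, shard := range []string{"shardId-000000000000", "shardId-000000000001"} {
		it, seq := startingIterator(offsets, "sensor-data", shard, "new")
		fmt.Println(shard, it, seq)
	}
}
```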

AWS credentials

The bridge uses the official AWS SDK for Go to access Kinesis. Its default configuration loader uses the standard AWS_* environment variables if they are defined; otherwise, it falls back to the default profile in ~/.aws.