github.com/withObsrvr/cdp-pipeline-workflow
Version: 0.0.0-20250212015740-54ee2d496a98
Repository: https://github.com/withobsrvr/cdp-pipeline-workflow.git
Documentation: pkg.go.dev

# README

CDP Pipeline Workflow

A data pipeline for processing Stellar blockchain data, with support for payment and account creation operations (work in progress). Many of the consumers and processors are experimental and may not behave as expected.

Features

  • Processes Stellar blockchain data from multiple sources:
    • Amazon S3
    • Google Cloud Storage (with OAuth or Service Account)
    • Local filesystem
  • Transforms operations into standardized formats
  • Supports both payment and create account operations (work in progress)
  • Outputs to multiple destinations (MongoDB, ZeroMQ)

Prerequisites

  • Go 1.22 or later
  • Access to one of:
    • Amazon S3
    • Google Cloud Storage
    • Local filesystem
  • MongoDB instance
  • ZeroMQ (optional)

Configuration

The pipeline is configured using YAML files. Example configurations:

S3 Source Configuration

pipeline:
  name: PaymentPipeline
  source:
    type: S3BufferedStorageSourceAdapter
    config:
      bucket_name: "your-bucket"
      region: "us-east-1"
      network: "testnet"
      buffer_size: 640
      num_workers: 10
      start_ledger: 2
      end_ledger: 1000  # optional
      ledgers_per_file: 64  # optional, default: 64
      files_per_partition: 10  # optional, default: 10
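The ledgers_per_file and files_per_partition settings describe how ledgers are grouped into files, and files into partitions, in the bucket. The Go sketch below applies the documented defaults (64 and 10) and computes which file and partition a ledger falls into. It assumes files are aligned on multiples of ledgers_per_file; that alignment, and the names SourceConfig, applyDefaults, fileRange, and partitionRange, are hypothetical and not this package's API.

```go
package main

import "fmt"

// SourceConfig mirrors a few fields of the S3 source YAML above.
// The struct and field names are hypothetical.
type SourceConfig struct {
	StartLedger       uint32
	EndLedger         uint32 // 0 means "not set" (end_ledger is optional)
	LedgersPerFile    uint32
	FilesPerPartition uint32
}

// applyDefaults fills in the documented defaults for the optional fields,
// which also guarantees the divisions below never divide by zero.
func applyDefaults(c SourceConfig) SourceConfig {
	if c.LedgersPerFile == 0 {
		c.LedgersPerFile = 64
	}
	if c.FilesPerPartition == 0 {
		c.FilesPerPartition = 10
	}
	return c
}

// fileRange returns the first and last ledger of the file holding seq,
// assuming files start at multiples of LedgersPerFile.
func fileRange(c SourceConfig, seq uint32) (uint32, uint32) {
	start := (seq / c.LedgersPerFile) * c.LedgersPerFile
	return start, start + c.LedgersPerFile - 1
}

// partitionRange does the same for a partition of FilesPerPartition files.
func partitionRange(c SourceConfig, seq uint32) (uint32, uint32) {
	size := c.LedgersPerFile * c.FilesPerPartition
	start := (seq / size) * size
	return start, start + size - 1
}

func main() {
	c := applyDefaults(SourceConfig{StartLedger: 2, EndLedger: 1000})
	fs, fe := fileRange(c, c.StartLedger)
	ps, pe := partitionRange(c, c.EndLedger)
	fmt.Printf("ledger %d is in file %d-%d\n", c.StartLedger, fs, fe)
	fmt.Printf("ledger %d is in partition %d-%d\n", c.EndLedger, ps, pe)
}
```

With the defaults, one partition spans 64 × 10 = 640 ledgers.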

GCS OAuth Configuration

pipeline:
  name: PaymentPipeline
  source:
    type: GCSBufferedStorageSourceAdapter
    config:
      bucket_name: "your-bucket"
      network: "testnet"
      buffer_size: 640
      num_workers: 10
      start_ledger: 2
      access_token: "your-oauth-token"
      ledgers_per_file: 64
      files_per_partition: 10
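Both source examples select a network by name (network: "testnet"), while the processor example below takes the full network_passphrase. A minimal sketch of mapping one to the other follows. The two passphrases are Stellar's published values; which network names this pipeline actually accepts (here "testnet", "pubnet", and "mainnet") is an assumption, and passphraseFor is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// Published Stellar network passphrases.
const (
	TestnetPassphrase = "Test SDF Network ; September 2015"
	PubnetPassphrase  = "Public Global Stellar Network ; September 2015"
)

// passphraseFor maps a source's network value to its passphrase.
// The accepted names are an assumption for illustration.
func passphraseFor(network string) (string, error) {
	switch strings.ToLower(network) {
	case "testnet":
		return TestnetPassphrase, nil
	case "pubnet", "mainnet":
		return PubnetPassphrase, nil
	default:
		return "", fmt.Errorf("unknown network %q", network)
	}
}

func main() {
	p, err := passphraseFor("testnet")
	if err != nil {
		panic(err)
	}
	fmt.Println(p)
}
```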

MongoDB Consumer Configuration

pipelines:
  PaymentPipeline:
    source:
      # ... source configuration as above ...
    processors:
      - type: TransformToAppPayment
        config:
          network_passphrase: "Test SDF Network ; September 2015"
    consumers:
      - type: SaveToMongoDB
        config:
          uri: "mongodb-uri"
          database: "mongodb-db"
          collection: "mongodb-collection"
          connect_timeout: 10  # seconds

Installation

go get github.com/withObsrvr/cdp-pipeline-workflow

Usage

For GCS service account authentication:

export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
./cdp-pipeline-workflow -config /path/to/config.yaml

For S3 authentication:

export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
./cdp-pipeline-workflow -config /path/to/config.yaml
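Both commands rely on credentials exported in the environment. A small preflight check can report missing values before the pipeline starts; the sketch below uses the variable names from the commands above, while requiredEnv and missingEnv are hypothetical. The AWS and Google client libraries can also find credentials elsewhere (for example shared config files or instance metadata), so a check like this is stricter than the SDKs themselves.

```go
package main

import (
	"fmt"
	"os"
)

// requiredEnv lists the variables exported in the Usage commands for each
// backend. The map keys "gcs" and "s3" are hypothetical labels.
var requiredEnv = map[string][]string{
	"gcs": {"GOOGLE_APPLICATION_CREDENTIALS"},
	"s3":  {"AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"},
}

// missingEnv returns the variables for backend that are unset or empty.
// lookup is injected (normally os.LookupEnv) so the check is testable.
func missingEnv(backend string, lookup func(string) (string, bool)) []string {
	var missing []string
	for _, name := range requiredEnv[backend] {
		if v, ok := lookup(name); !ok || v == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	if m := missingEnv("s3", os.LookupEnv); len(m) > 0 {
		fmt.Println("missing environment variables:", m)
		return
	}
	fmt.Println("S3 credentials present")
}
```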

For GCS OAuth authentication, you'll need to:

  1. Create a Google Cloud OAuth 2.0 Client ID:

    • Go to Google Cloud Console -> APIs & Services -> Credentials
    • Create a new OAuth 2.0 Client ID
    • Download the client configuration file
  2. Get an OAuth token using the Google OAuth 2.0 Playground.

  3. Use the token in your configuration:

pipeline:
  source:
    type: GCSBufferedStorageSourceAdapter
    config:
      access_token: "your-access-token"

Note: OAuth tokens are temporary and will expire. For production use, consider using service account authentication instead.
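This README doesn't show how the adapter sends access_token to GCS. As a hedged illustration of what the token is for, the sketch below builds (but does not send) a request to the public GCS JSON API objects-list endpoint, carrying the token in an Authorization: Bearer header. newListRequest is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// newListRequest builds, without sending, a GCS JSON API request that
// lists objects in bucket, authenticated with an OAuth access token.
func newListRequest(bucket, accessToken string) (*http.Request, error) {
	u := "https://storage.googleapis.com/storage/v1/b/" + url.PathEscape(bucket) + "/o"
	req, err := http.NewRequest(http.MethodGet, u, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+accessToken)
	return req, nil
}

func main() {
	req, err := newListRequest("your-bucket", "your-access-token")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.String())
	// Once the token expires, GCS typically rejects such requests with
	// 401 Unauthorized; retrying with the same token will not help.
}
```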

# Packages

3 packages; no descriptions provided by the author.

# Functions

10 functions; no descriptions provided by the author.

# Structs

25 structs, most without a description. The doc comments that are present read:

  • Type definitions.
  • Event represents a Soroban contract event.
  • Event statistics.
  • RPC request/response structures.
  • Error types.
  • Configuration types.

# Interfaces

No description provided by the author