Categorygithub.com/nicholasjackson/pipe
modulepackage
2.0.0-alpha.3+incompatible
Repository: https://github.com/nicholasjackson/pipe.git
Documentation: pkg.go.dev

# README

Pipe - Event Grid and Message Router

Docker Repository on Quay CircleCI Maintainability

This project allows you to listen to a variety of message sources and perform an action when a message is received. The documentation and the project is currently work in progress however curretly supported providers are:

  • Nats.io - read and write to nats streaming
  • HTTP - receive and send events over http

The project is built around a provider model where plugable elements can be added to the server to allow support for a variety of message sources.

Planned providers:

  • Log files - read and write to log files
  • SQS - AWS Simple Message Queue
  • PubSub - Google pub sub
  • Kafka
  • And more.

Configuration

To configure pipes HCL configuration file is used...

# Input block, will listen for nats messages on defined queue
input "nats_queue" "nq_in" {
  server = "nats://nats.service.consul:4222"
  cluster_id = "test-cluster"
  queue = "testmessagequeue"
}

# Output block, defines a http output
output "http" "nq_out" {
  protocol = "http"
  server = "localhost"
  port = 8080
  path = "/message"
}

pipe "accept_nats" {
  # Name of the input block
  input = "nq_in"

  # Do not handle messages older than
  expiration = "1h"

  # Action to perform when a new message is received
  action {
    # Name of the output
    output = "nq_out"

    # Transform the initial message
    template = <<EOF
      {
        "text": "Hey a picture from selfi drone",
        "image": "{{ .JSON.Data }}"
      }
    EOF

  }

  # Called when action succeedes
  on_success {
    output = "success"
  }
 
  on_success {
    output = "success"
  }

  # Called when the action fails
  on_fail {
    output = "fail"
  }
}

Template values

.Raw

Return raw binary data as an array of bytes from the message

.JSON

If the message type is application/json return an object which allows access to elements i.e.
Given:

{
  "Pets": [
    {"name": "fido"}
  ]
}

Then:

  {{ .JSON.Pets[0].name }} // fido

Note .JSON does not convert the output to JSON format, writing the direct output of .JSON.Pets would produce a go formatted object. To output json see the template function tojson.

Template functions

base64encode

Base64 encode []byte

input_template: |
{
  "image": "{{ base64encode .Raw }}"
}

base64decode

Base64 decode a string

input_template: |
  {{ base64decode .JSON.Image }}

tojson

Convert to valid json

input_template: |
  {{ tojson .JSON.Pets }}

Metrics

Metrics are exported using StatsD to import metrics into Prometheus please use the prometheus StatsD exporter https://hub.docker.com/r/prom/statsd-exporter/

Running the queue

To run the listener you can use the build docker container and provide a configuration file as a volume mount.

docker run -it \
  -v $(shell pwd)/examples:/etc/config \
  quay.io/nicholasjackson/faas-nats:latest \
  -config /etc/config/examples

Testing

There is a simple test harness in ./testharness/main.go which can be used to validate the subscription and transformations.

TODO

[x] Implement monitoring and metrics with StatsD
[ ] Finish documentation [ ] Write more examples [ ] Finish basic provider implementation

# Packages

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author