Categorygithub.com/streamdal/rabbitmq-amqp091-go
repositorypackage
0.0.0-20240502191843-eaead84377d6
Repository: https://github.com/streamdal/rabbitmq-amqp091-go.git
Documentation: pkg.go.dev

# README

Golang Client for RabbitMQ (instrumented with Streamdal)

This library has been instrumented with Streamdal's Go SDK.

Getting Started

The following environment variables must be set before launching a producer or consumer:

  1. STREAMDAL_ADDRESS
    • Address for the streamdal server (Example: localhost:8082)
  2. STREAMDAL_AUTH_TOKEN
    • Authentication token used by the server (Example: 1234)
  3. STREAMDAL_SERVICE_NAME
    • How this application/service will be identified in Streamdal Console (Example: rabbitmq)

By default, the library will NOT have Streamdal instrumentation enabled; to enable it, you will need to set the above environment variables and set EnableStreamdal to true

šŸŽ‰ That's it - you're ready to run the example! šŸŽ‰

For more in-depth explanation of the changes and available settings, see What's changed?.

Example

A fully working example is provided in _examples/producer and _examples/consumer.

To run the example:

  1. Start a local RabbitMQ instance: docker-compose -f _examples/docker-compose.yml up -d

  2. Install & start Streamdal: curl -sSL https://sh.streamdal.com | sh

  3. Open a browser to verify you can see the streamdal UI at: http://localhost:8080

    • It should look like this: streamdal-console-1
  4. Produce a message:

    STREAMDAL_ADDRESS=localhost:8082 \
    STREAMDAL_AUTH_TOKEN=1234 \
    STREAMDAL_SERVICE_NAME=billing-svc \
    go run _examples/producer/producer.go -body '{"name": "Demo Person", "email": "[email protected]"}' -key "demo"
    
  5. Run a consume operation:

    STREAMDAL_ADDRESS=localhost:8082 \
    STREAMDAL_AUTH_TOKEN=1234 \
    STREAMDAL_SERVICE_NAME=billing-svc \
    go run _examples/consumer/consumer.go -key "demo"
    
  6. Open the Streamdal Console in a browser https://localhost:8080

    • It should look like this: streamdal-console-2
  7. Create a pipeline that detects and masks PII fields & attach it to the consumer

    • streamdal-console-3
  8. In another terminal, produce a message again:

    STREAMDAL_ADDRESS=localhost:8082 \
    STREAMDAL_AUTH_TOKEN=1234 \
    STREAMDAL_SERVICE_NAME=billing-svc \
    go run _examples/producer/producer.go -body '{"name": "Demo Person", "email": "[email protected]"}' -key "demo"
    
  9. Then run the consumer again, you should see a masked message in the consumer terminal:

    2024/03/01 10:13:05 [INFO] got 56B delivery: [1] "{\"name\": \"Demo Person\", \"email\": \"demo****************\"}"
    

Passing "runtime" settings to the shim

By default, the shim will set the ComponentName to "rabbitmq" and the OperationName to the name of the exchange you are producing to. For the consumer, the OperationName will be set to the name of the exchange+"-"+routing key.

Also, by default, if the shim runs into any errors executing streamdal.Process(), it will swallow the errors and return the original value.

When producing, you can set StreamdalRuntimeConfig in PublishWithDeferredConfirmWithContext():

res, err := channel.PublishWithDeferredConfirmWithContext(
   ctx,
   "events",
   "new-orders",
   true,
   false,
   amqp.Publishing{
      Headers:         amqp.Table{},
      ContentType:     "application/json",
      ContentEncoding: "",
      DeliveryMode:    amqp.Persistent,
      Priority:        0,
      AppId:           "sequential-producer",
      Body:            []byte(`{"email": "[email protected]"}`)
   },
   amqp.StreamdalRuntimeConfig{
      OperationName: "custom-operation-name",
      ComponentName: "custom-component-name",
   },
)

// This will cause the producer to show up in Streamdal Console with the
// OperationName "custom-operation-name" and it will show as connected to
// "custom-component-name" (instead of the default "rabbitmq").

Passing StreamdalRuntimeConfig in the consumer is not implemented yet!

What's changed?

The goal of any shim is to make minimally invasive changes so that the original library remains backwards-compatible and does not present any "surprises" at runtime.

The following changes have been made to the original library:

  1. Added EnableStreamdal bool to the main Config struct

    • This is how you enable the Streamdal instrumentation in the library.
  2. Added streamdal instance to amqp.Channel

  3. Added StreamdalRuntimeConfig to PublishWithDeferredConfirmWithContext()

    • This is how you can pass runtime settings to the shim. 4A new file ./streamdal.go has been added to the library that

    contains helper funcs, structs and vars used for simplifying Streamdal

    instrumentation in the core library.

  4. Added shim code to channel.Consume()' and channel.PublishWithDeferredConfirmWithContext()`