# README
Golang Client for RabbitMQ (instrumented with Streamdal)
This library has been instrumented with Streamdal's Go SDK.
Getting Started
The following environment variables must be set before launching a producer or consumer:
STREAMDAL_ADDRESS
- Address for the streamdal server (Example:
localhost:8082
)
- Address for the streamdal server (Example:
STREAMDAL_AUTH_TOKEN
- Authentication token used by the server (Example:
1234
)
- Authentication token used by the server (Example:
STREAMDAL_SERVICE_NAME
- How this application/service will be identified in Streamdal Console (Example:
rabbitmq
)
- How this application/service will be identified in Streamdal Console (Example:
By default, the library will NOT have Streamdal instrumentation enabled; to enable it,
you will need to set the above environment variables and set EnableStreamdal
to true
š That's it - you're ready to run the example! š
For more in-depth explanation of the changes and available settings, see What's changed?.
Example
A fully working example is provided in _examples/producer and _examples/consumer.
To run the example:
-
Start a local RabbitMQ instance:
docker-compose -f _examples/docker-compose.yml up -d
-
Install & start Streamdal:
curl -sSL https://sh.streamdal.com | sh
-
Open a browser to verify you can see the streamdal UI at:
http://localhost:8080
- It should look like this:
- It should look like this:
-
Produce a message:
STREAMDAL_ADDRESS=localhost:8082 \ STREAMDAL_AUTH_TOKEN=1234 \ STREAMDAL_SERVICE_NAME=billing-svc \ go run _examples/producer/producer.go -body '{"name": "Demo Person", "email": "[email protected]"}' -key "demo"
-
Run a consume operation:
STREAMDAL_ADDRESS=localhost:8082 \ STREAMDAL_AUTH_TOKEN=1234 \ STREAMDAL_SERVICE_NAME=billing-svc \ go run _examples/consumer/consumer.go -key "demo"
-
Open the Streamdal Console in a browser https://localhost:8080
- It should look like this:
- It should look like this:
-
Create a pipeline that detects and masks PII fields & attach it to the consumer
-
In another terminal, produce a message again:
STREAMDAL_ADDRESS=localhost:8082 \ STREAMDAL_AUTH_TOKEN=1234 \ STREAMDAL_SERVICE_NAME=billing-svc \ go run _examples/producer/producer.go -body '{"name": "Demo Person", "email": "[email protected]"}' -key "demo"
-
Then run the consumer again, you should see a masked message in the consumer terminal:
2024/03/01 10:13:05 [INFO] got 56B delivery: [1] "{\"name\": \"Demo Person\", \"email\": \"demo****************\"}"
Passing "runtime" settings to the shim
By default, the shim will set the ComponentName
to "rabbitmq" and the OperationName
to the name of the exchange you are producing to. For the consumer, the
OperationName
will be set to the name of the exchange+"-"+routing key
.
Also, by default, if the shim runs into any errors executing streamdal.Process()
,
it will swallow the errors and return the original value.
When producing, you can set StreamdalRuntimeConfig
in PublishWithDeferredConfirmWithContext()
:
res, err := channel.PublishWithDeferredConfirmWithContext(
ctx,
"events",
"new-orders",
true,
false,
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "application/json",
ContentEncoding: "",
DeliveryMode: amqp.Persistent,
Priority: 0,
AppId: "sequential-producer",
Body: []byte(`{"email": "[email protected]"}`)
},
amqp.StreamdalRuntimeConfig{
OperationName: "custom-operation-name",
ComponentName: "custom-component-name",
},
)
// This will cause the producer to show up in Streamdal Console with the
// OperationName "custom-operation-name" and it will show as connected to
// "custom-component-name" (instead of the default "rabbitmq").
Passing StreamdalRuntimeConfig
in the consumer is not implemented yet!
What's changed?
The goal of any shim is to make minimally invasive changes so that the original library remains backwards-compatible and does not present any "surprises" at runtime.
The following changes have been made to the original library:
-
Added
EnableStreamdal
bool to the mainConfig
struct- This is how you enable the Streamdal instrumentation in the library.
-
Added streamdal instance to
amqp.Channel
-
Added
StreamdalRuntimeConfig
toPublishWithDeferredConfirmWithContext()
- This is how you can pass runtime settings to the shim. 4A new file ./streamdal.go has been added to the library that
contains helper funcs, structs and vars used for simplifying Streamdal
instrumentation in the core library.
-
Added shim code to
channel.Consume()' and
channel.PublishWithDeferredConfirmWithContext()`