# README
Confluent's Golang Client for Apache Kafka™ (instrumented with Streamdal)
This library has been instrumented with Streamdal's Go SDK.
Getting Started
The following environment variables must be set before launching a producer or consumer:
- STREAMDAL_ADDRESS - Address for the Streamdal server (Example: localhost:8082)
- STREAMDAL_AUTH_TOKEN - Authentication token used by the server (Example: 1234)
- STREAMDAL_SERVICE_NAME - How this application/service will be identified in Streamdal Console (Example: kafkacat)
By default, Streamdal instrumentation is disabled in the library; to enable it, set EnableStreamdal to true in the main Config struct (see What's changed?).
That's it - you're ready to run the example!
For more in-depth explanation of the changes and available settings, see What's changed?.
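Because all three variables are required, a launcher can check for them before starting a producer or consumer. The following is a minimal sketch of such a pre-flight check, not part of the shim: `requiredEnv` and `missingEnv` are hypothetical names, and treating a whitespace-only value as missing is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// requiredEnv lists the variables this README says must be set.
var requiredEnv = []string{
	"STREAMDAL_ADDRESS",
	"STREAMDAL_AUTH_TOKEN",
	"STREAMDAL_SERVICE_NAME",
}

// missingEnv (hypothetical helper) returns the required variables that are
// unset or blank. lookup is injected so the check can be tested without
// touching the real environment; pass os.Getenv in normal use.
func missingEnv(lookup func(string) string) []string {
	var missing []string
	for _, name := range requiredEnv {
		if strings.TrimSpace(lookup(name)) == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	if missing := missingEnv(os.Getenv); len(missing) > 0 {
		// A real launcher would probably stop here with a non-zero exit code.
		fmt.Println("missing required environment variables:", strings.Join(missing, ", "))
		return
	}
	fmt.Println("all Streamdal environment variables are set")
}
```

Injecting the lookup function keeps the check independent of the process environment, which makes it easy to exercise from a unit test.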
Example
A fully working example is provided in examples/go-kafkacat-streamdal.
To run the example:
- Change directory to examples/go-kafkacat-streamdal
- Start a local Kafka instance: docker-compose up -d
- Install & start Streamdal: curl -sSL https://sh.streamdal.com | sh
- Open a browser to verify you can see the Streamdal UI at: http://localhost:8080
  - It should look like this: [screenshot of the Streamdal UI]
- Launch a consumer:
    STREAMDAL_ADDRESS=localhost:8082 \
    STREAMDAL_AUTH_TOKEN=1234 \
    STREAMDAL_SERVICE_NAME=kafkacat \
    go run go-kafkacat-streamdal.go --broker localhost consume --group testgroup test
- In another terminal, launch a producer:
    STREAMDAL_ADDRESS=localhost:8082 \
    STREAMDAL_AUTH_TOKEN=1234 \
    STREAMDAL_SERVICE_NAME=kafkacat \
    go run go-kafkacat-streamdal.go produce --broker localhost --topic test --key-delim=":"
- In the producer terminal, produce some data by pasting:
  testKey:{"email":"[email protected]"}
- In the consumer terminal, you should see:
  {"email":"[email protected]"}
- Open the Streamdal Console in a browser: http://localhost:8080
  - It should look like this: [screenshot of the Streamdal Console]
- Create a pipeline that detects and masks PII fields & attach it to the consumer
- Produce a message in producer terminal:
testKey:{"email":"[email protected]"}
- You should see a masked message in the consumer terminal:
{"email":"fo*********"}
- Tip: If you detach the pipeline from the consumer and paste the same message again, you will see the original, unmasked message.
Passing "runtime" settings to the shim
By default, the shim sets the ComponentName to "kafka" and the OperationName to the name of the topic you are producing to or consuming from.
Also by default, if the shim runs into any errors executing streamdal.Process(), it swallows the errors and returns the original value.
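The error handling described above is a fail-open pattern: a processing failure never blocks or alters the message. Here is a minimal sketch of that idea, assuming nothing about the shim's internals. `processFunc`, `processOrOriginal` and `errPipelineUnavailable` are hypothetical names used only for illustration and are not part of the shim or the Streamdal SDK.

```go
package main

import (
	"errors"
	"fmt"
)

// errPipelineUnavailable is a made-up error used to simulate a failure.
var errPipelineUnavailable = errors.New("pipeline unavailable")

// processFunc is a hypothetical stand-in for a call into the Streamdal SDK.
type processFunc func(payload []byte) ([]byte, error)

// processOrOriginal runs process on payload. If process returns an error,
// the error is swallowed and the original payload is returned unchanged.
func processOrOriginal(process processFunc, payload []byte) []byte {
	out, err := process(payload)
	if err != nil {
		return payload
	}
	return out
}

func main() {
	failing := func([]byte) ([]byte, error) { return nil, errPipelineUnavailable }

	// The failure is invisible to the caller: the original bytes come back.
	fmt.Printf("%s\n", processOrOriginal(failing, []byte("hello, world")))
}
```

The trade-off of failing open is that a misconfigured or unreachable pipeline goes unnoticed by the application, so a message that should have been masked is delivered as-is.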
When producing, you can set StreamdalRuntimeConfig in the ProducerMessage:
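    msg := &ProducerMessage{
        Topic: "test",
        // ProducerMessage.Value is an Encoder in sarama, so wrap the payload.
        Value: StringEncoder("hello, world"),
        // Override the shim's defaults for this message only.
        StreamdalRuntimeConfig: &StreamdalRuntimeConfig{
            ComponentName: "kafka",
            OperationName: "produce",
        },
    }
    // And then pass the msg to the producer.Input() channel as usual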
Passing StreamdalRuntimeConfig in the consumer is not implemented yet!
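The interaction between the defaults and a per-message override can be sketched as follows. This is an illustration rather than the shim's code: `runtimeConfig` is a local stand-in that mirrors the two fields shown in this section, `resolveNames` is a hypothetical helper, and filling in a partially set config field by field is an assumption.

```go
package main

import "fmt"

// runtimeConfig is a local stand-in for StreamdalRuntimeConfig, limited to
// the two fields mentioned in this README.
type runtimeConfig struct {
	ComponentName string
	OperationName string
}

// resolveNames (hypothetical) starts from the documented defaults, "kafka"
// and the topic name, and applies any non-empty overrides from cfg.
func resolveNames(cfg *runtimeConfig, topic string) (component, operation string) {
	component, operation = "kafka", topic
	if cfg == nil {
		return component, operation
	}
	if cfg.ComponentName != "" {
		component = cfg.ComponentName
	}
	if cfg.OperationName != "" {
		operation = cfg.OperationName
	}
	return component, operation
}

func main() {
	component, operation := resolveNames(nil, "test")
	fmt.Println(component, operation) // defaults only

	component, operation = resolveNames(&runtimeConfig{OperationName: "produce"}, "test")
	fmt.Println(component, operation) // operation name overridden
}
```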
What's changed?
The goal of any shim is to make minimally invasive changes so that the original library remains backwards-compatible and does not present any "surprises" at runtime.
NOTE: IBM/sarama is significantly more complex than other Go Kafka libraries, so the integration is a bit more invasive than other shims.
The following changes have been made to the original library:
- Added EnableStreamdal bool to the main Config struct
  - This is how you enable the Streamdal instrumentation in the library.
- Added Streamdal setup in newAsyncProducer() and newConsumer()
  - newAsyncProducer() is used for both NewSyncProducer() and NewAsyncProducer()
- Updated async_producer.go:dispatcher() to call Streamdal's .Process()
  - This function is called for every message produced to Kafka via the Input() channel
- Updated consumer.go:responseFeeder() to call Streamdal's .Process()
  - This function feeds messages to Consume()
- A new file ./streamdal.go has been added to the library that contains helper funcs, structs and vars used for simplifying Streamdal instrumentation in the core library.
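Conceptually, the dispatcher() and responseFeeder() changes insert one processing step into a channel pipeline that already exists. The sketch below shows that shape in isolation. It is not sarama's code: `message`, `processFunc` and `dispatch` are hypothetical names, and the uppercase transform stands in for whatever a Streamdal pipeline would do.

```go
package main

import (
	"bytes"
	"fmt"
)

// message is a simplified stand-in for a Kafka message.
type message struct {
	Topic string
	Value []byte
}

// processFunc is a hypothetical stand-in for Streamdal's processing call.
type processFunc func(topic string, payload []byte) ([]byte, error)

// dispatch reads every message from input, runs process on its value, and
// forwards the message. On error the original value is kept, matching the
// fail-open default described earlier in this README.
func dispatch(input <-chan *message, process processFunc) <-chan *message {
	out := make(chan *message)
	go func() {
		defer close(out)
		for msg := range input {
			if processed, err := process(msg.Topic, msg.Value); err == nil {
				msg.Value = processed
			}
			out <- msg
		}
	}()
	return out
}

func main() {
	input := make(chan *message, 1)
	upper := func(_ string, p []byte) ([]byte, error) { return bytes.ToUpper(p), nil }

	out := dispatch(input, upper)
	input <- &message{Topic: "test", Value: []byte("hello, world")}
	close(input)

	for msg := range out {
		fmt.Printf("%s: %s\n", msg.Topic, msg.Value)
	}
}
```

Placing the step inside the existing per-message loop is what lets the instrumentation stay invisible to callers: they keep writing to and reading from the same channels as before.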