github.com/streamdal/ibm-sarama
Version: 1.42.2-streamdal1
Repository: https://github.com/streamdal/ibm-sarama.git
Documentation: pkg.go.dev

# README

Sarama: a Go client library for Apache Kafka™ (instrumented with Streamdal)

This library has been instrumented with Streamdal's Go SDK.

Getting Started

The following environment variables must be set before launching a producer or consumer:

  1. STREAMDAL_ADDRESS
    • Address for the streamdal server (Example: localhost:8082)
  2. STREAMDAL_AUTH_TOKEN
    • Authentication token used by the server (Example: 1234)
  3. STREAMDAL_SERVICE_NAME
    • How this application/service will be identified in Streamdal Console (Example: kafkacat)

By default, the library does not have Streamdal instrumentation enabled; to enable it, set EnableStreamdal to true in your sarama Config (see What's changed? below).
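A minimal sketch of enabling the instrumentation, assuming the fork keeps sarama's package name and NewConfig() constructor (the broker address is a placeholder):

```go
package main

import (
	"log"

	sarama "github.com/streamdal/ibm-sarama"
)

func main() {
	cfg := sarama.NewConfig()

	// EnableStreamdal is the field added by this fork (see "What's changed?").
	// The three STREAMDAL_* environment variables must also be set before launch.
	cfg.EnableStreamdal = true

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()
}
```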

🎉 That's it - you're ready to run the example! 🎉

For a more in-depth explanation of the changes and available settings, see What's changed?.

Example

A fully working example is provided in examples/go-kafkacat-streamdal.

To run the example:

  1. Change directory to examples/go-kafkacat-streamdal
  2. Start a local Kafka instance: docker-compose up -d
  3. Install & start Streamdal: curl -sSL https://sh.streamdal.com | sh
  4. Open a browser to verify you can see the Streamdal UI at: http://localhost:8080
    • It should look like the streamdal-console-1 screenshot in the repository
  5. Launch a consumer:
    STREAMDAL_ADDRESS=localhost:8082 \
    STREAMDAL_AUTH_TOKEN=1234 \
    STREAMDAL_SERVICE_NAME=kafkacat \
    go run go-kafkacat-streamdal.go --broker localhost consume --group testgroup test
    
  6. In another terminal, launch a producer:
    STREAMDAL_ADDRESS=localhost:8082 \
    STREAMDAL_AUTH_TOKEN=1234 \
    STREAMDAL_SERVICE_NAME=kafkacat \
    go run go-kafkacat-streamdal.go produce --broker localhost --topic test --key-delim=":"
    
  7. In the producer terminal, produce some data by pasting: testKey:{"email":"foo@bar.com"}
  8. In the consumer terminal, you should see: {"email":"foo@bar.com"}
  9. Open the Streamdal Console in a browser: http://localhost:8080
    • It should look like the streamdal-console-2 screenshot in the repository
  10. Create a pipeline that detects and masks PII fields and attach it to the consumer
    • See the streamdal-console-3 screenshot in the repository
  11. Produce a message in the producer terminal: testKey:{"email":"foo@bar.com"}
  12. You should see a masked message in the consumer terminal: {"email":"fo*********"}
    • Tip: If you detach the pipeline from the consumer and paste the same message again, you will see the original, unmasked message.

Passing "runtime" settings to the shim

By default, the shim will set the ComponentName to "kafka" and the OperationName to the name of the topic you are producing to or consuming from.

Also, by default, if the shim runs into any errors executing streamdal.Process(), it will swallow the errors and return the original value.

When producing, you can set StreamdalRuntimeConfig in the ProducerMessage:

msg := &ProducerMessage{
    Topic: "test",
    Value: ByteEncoder("hello, world"),
    StreamdalRuntimeConfig: &StreamdalRuntimeConfig{
        ComponentName: "kafka",
        OperationName: "produce",
    },
}

// And then pass the msg to the producer.Input() channel as usual

Passing StreamdalRuntimeConfig in the consumer is not implemented yet!
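Putting the producer side together, here is a sketch under the assumption that the fork keeps sarama's AsyncProducer API (the broker address and topic name are placeholders):

```go
package main

import (
	"log"

	sarama "github.com/streamdal/ibm-sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.EnableStreamdal = true // fork-added field; see "What's changed?"

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// StreamdalRuntimeConfig overrides the shim's defaults of
	// ComponentName "kafka" and OperationName <topic name>.
	producer.Input() <- &sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.ByteEncoder("hello, world"),
		StreamdalRuntimeConfig: &sarama.StreamdalRuntimeConfig{
			ComponentName: "kafka",
			OperationName: "produce",
		},
	}
}
```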

What's changed?

The goal of any shim is to make minimally invasive changes so that the original library remains backwards-compatible and does not present any "surprises" at runtime.

NOTE: IBM/sarama is significantly more complex than other Go Kafka libraries, so the integration is a bit more invasive than other shims.

The following changes have been made to the original library:

  1. Added EnableStreamdal bool to the main Config struct
    • This is how you enable the Streamdal instrumentation in the library.
  2. Added Streamdal setup in newAsyncProducer() and newConsumer()
    • newAsyncProducer() is used for both NewSyncProducer() and NewAsyncProducer()
  3. Updated async_producer.go:dispatcher() to call Streamdal's .Process()
    • This function is called for every message produced to Kafka via the Input() channel
  4. Updated consumer.go:responseFeeder() to call Streamdal's .Process()
    • This function feeds messages to Consume()
  5. A new file, ./streamdal.go, has been added to the library; it contains the helper functions, structs, and vars used to simplify Streamdal instrumentation in the core library.
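On the consumer side, messages pass through the responseFeeder() hook described above before they reach your code, so any attached pipelines are applied transparently. A minimal consumer sketch, assuming the fork keeps sarama's partition-consumer API (broker, topic, and partition are placeholders):

```go
package main

import (
	"log"

	sarama "github.com/streamdal/ibm-sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.EnableStreamdal = true // fork-added field; see item 1 above

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	// Messages arriving here have already been run through Streamdal's
	// .Process() by the shim, so masked/transformed values are what you see.
	for msg := range pc.Messages() {
		log.Printf("key=%s value=%s", msg.Key, msg.Value)
	}
}
```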