# README
Kafka
The Kafka input component enable consuming messages from Kafka topics with configurable settings such as partition fetch size, offset reset policies, and balancing strategies.
Config Definition
class Kafka extends Input {
fixed sourceName = "kafka"
common: Common.Kafka
autoCommitEnabled: Boolean = true
consumerGroupID: String
autoOffsetReset: AutoOffsetReset = "earliest"
balancerStrategy: Listing<Strategy> = new Listing<Strategy> {
"cooperative-sticky"
}
maxPartitionFetchBytes: DataSize(validateBuffersSizes) = 1.mib
fetchMaxBytes: DataSize(validateBuffersSizes) = 50.mib
}
Common.Kafka Definition
The common
attribute of the Kafka input component references the Common.Kafka class, which defines essential configurations for connecting to Kafka brokers and interacting with topics.
Common.Kafka Config
class KafkaAuth {
saslMechanism: SASLMechanism
saslUsername: String
saslPassword: String
}
class Kafka {
saslAuth: KafkaAuth?
brokers: Listing<String>
version: String?
topics: Listing<String>
}
Common.Kafka Attributes
Attribute | Type | Description | Default Value |
---|---|---|---|
saslMechanism | SASLMechanism | SASL mechanism to use (SCRAM-SHA-512 or SCRAM-SHA-256 ). | null |
saslUsername | String | SASL authentication username. | null |
saslPassword | String | SASL authentication password. | null |
brokers | Listing<String> | List of Kafka broker addresses. | Required |
version | String | Kafka protocol version (optional). | null |
topics | Listing<String> | List of Kafka topics to subscribe to. | Required |
Validations
-
SASL Mechanism Validation
- If
saslAuth
is not null andsaslMechanism
is not set, the following error is thrown:'saslMechanism' can not be null
- If
-
SASL Credentials Validation
- If
saslAuth
is not null, bothsaslUsername
andsaslPassword
must be provided. Otherwise, the following error is thrown:
'saslUsername' and 'saslPassword' can not be empty string or null
- If
Kafka Input Attributes
Attribute | Type | Description | Default Value |
---|---|---|---|
common | Common.Kafka | Reusable Kafka connection settings (e.g., brokers, SASL). | Required |
autoCommitEnabled | Boolean | Enables or disables auto-commit for consumer offsets. | true |
consumerGroupID | String | Consumer group ID for managing Kafka consumers and partition ownership. | Required |
autoOffsetReset | AutoOffsetReset | Behavior when there is no initial offset or when the offset is invalid (earliest or latest ). | "earliest" |
balancerStrategy | Listing<Strategy> | Strategy used for partition assignment during rebalancing. | ["cooperative-sticky"] |
maxPartitionFetchBytes | DataSize | Maximum data fetched per partition per request. | 1.mib |
fetchMaxBytes | DataSize | Maximum data fetched across all partitions per request. | 50.mib |
Validations
Buffer Size Validation
- Ensures
fetchMaxBytes
is greater than or equal tomaxPartitionFetchBytes
. - If the validation fails, an exception is thrown:
'fetchMaxBytes' should be more than 'maxPartitionFetchBytes'
Pkl Configuration Example
Basic Kafka Input
new Inputs.Kafka {
common = new Common.Kafka {
brokers = {
"broker1:9092"
"broker2:9092"
}
topics = {
"example-topic"
}
}
consumerGroupID = "example-consumer-group"
}
Kafka Input with SASL Authentication
new Inputs.Kafka {
common = new Common.Kafka {
saslEnabled = true
saslMechanism = "SCRAM-SHA-512"
saslUsername = "example-user"
saslPassword = "example-password"
brokers = {
"broker1:9092"
"broker2:9092"
}
topics = {
"secure-topic"
}
}
consumerGroupID = "example-secure-consumer"
}
Attributes in Detail
Common Kafka Settings (common
)
- Defines reusable Kafka connection attributes (e.g., brokers, topics, SASL authentication).
Auto Commit Enabled (autoCommitEnabled
)
- If
true
, offsets are automatically committed to Kafka. - If
false
, manual offset commit is required.
Consumer Group ID (consumerGroupID
)
- Identifies the group of Kafka consumers that share load and maintain offset tracking.
Offset Reset Behavior (autoOffsetReset
)
earliest
: Start consuming from the earliest available message.latest
: Start consuming from the latest message.
Partition Balancing Strategy (balancerStrategy
)
- Default:
cooperative-sticky
ensures minimal partition movement during rebalancing. - Custom strategies can be added for advanced partitioning needs.
Use Cases
-
Basic Kafka Consumption
- Use the
common
attribute to specify brokers and topics, along with a consumer group ID for basic use cases.
- Use the
-
Secure Kafka Consumption
- Use
saslEnabled
,saslMechanism
,saslUsername
, andsaslPassword
to secure the Kafka connection.
- Use
-
Optimized Data Transfer
- Adjust
maxPartitionFetchBytes
andfetchMaxBytes
to fine-tune data fetching and improve performance.
- Adjust
Notes
- Ensure
Common.Kafka
is configured with valid brokers and topics. - Validate buffer sizes (
fetchMaxBytes
andmaxPartitionFetchBytes
) to avoid runtime errors.
# Functions
No description provided by the author