Module: github.com/mcgillowen/arkime-kafka-indexer (v0.2.0)
Repository: https://github.com/mcgillowen/arkime-kafka-indexer.git

# README


Arkime Kafka Indexer

An Elasticsearch indexer for Arkime SPI sent through Kafka using the Arkime Kafka plugin.

This "processor" is part of an ecosystem of "processors" that we are slowly open sourcing. It is the first one and the most important, since it is necessary for Arkime to function correctly. For an introduction to this ecosystem and why to use Kafka with Arkime, see the Arkimeet 2023 - Arkime Stream Processing with Kafka talk (slides).

Usage

This indexer is designed mainly for use with Arkime and its Kafka plugin but can also be used for indexing any Kafka messages that are formatted in the ES Bulk format.

The Kafka plugin needs to be configured with either kafkaMsgFormat=bulk or kafkaMsgFormat=bulk1, and the dbBulkSize setting should be less than 1000000, since Kafka by default doesn't allow messages larger than 1 MB. If you intend to enrich the SPIs with a lot of extra information, either set dbBulkSize accordingly or use bulk1, though bulk1 adds some extra processing overhead to the Kafka cluster.

plugins = kafka.so
kafkaMsgFormat = bulk/bulk1
dbBulkSize = 500000

The indexer can handle both bulk and bulk1. Since the ES Bulk API is optimized for bulks larger than 1 MB, the SPIs get bulked together and then flushed when a time, byte size or SPI count limit is reached; these limits are set using the BULKER_FLUSH_INTERVAL, BULKER_MAX_BYTES and BULKER_MAX_MESSAGES environment variables respectively. These ENV vars should be used to tune the bulking to maximize performance, and the bulk_flush_reason_total metric can be used to deduce which values to change.
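
For example, a sketch of a bulker configuration using the default values from the Configuration table below; tune them based on the bulk_flush_reason_total metric:

# A bulk is flushed to ES when the first of these limits is reached
export BULKER_FLUSH_INTERVAL=10s     # maximum time to buffer SPIs
export BULKER_MAX_BYTES=10485760     # maximum buffered bytes (roughly 10 MB)
export BULKER_MAX_MESSAGES=100       # maximum number of buffered SPIs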

The indexer can also republish SPIs that failed to be indexed into ES back to Kafka, for reprocessing or further analysis. This is enabled by setting a producer topic using the KAFKA_PRODUCER_TOPIC ENV var; the other KAFKA_PRODUCER_* ENV vars should be set accordingly, as sketched below.
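
A minimal sketch of enabling republishing; the topic name is hypothetical and the broker value shown is the default (the remaining KAFKA_PRODUCER_* variables are listed in the Configuration table):

export KAFKA_PRODUCER_BROKERS=localhost:9092      # broker(s) to produce to
export KAFKA_PRODUCER_TOPIC=arkime-failed-spis    # hypothetical topic for failed SPIs; setting it enables republishing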

Kafka with SSL/TLS

Encrypted Kafka communication using SSL/TLS requires the following 4 ENV variables to be set:

  • KAFKA_SSL_CA_LOCATION: Path to the CA cert used for signing certs
  • KAFKA_SSL_CERT_LOCATION: Path to the client certificate
  • KAFKA_SSL_KEY_LOCATION: Path to the client key
  • KAFKA_SSL_KEY_PASSWORD: Password of the client key

Only if all 4 variables are set will SSL/TLS communication be enabled.
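
For example, a sketch with hypothetical paths and password:

export KAFKA_SSL_CA_LOCATION=/etc/kafka/ssl/ca.pem          # CA cert used for signing certs
export KAFKA_SSL_CERT_LOCATION=/etc/kafka/ssl/client.pem    # client certificate
export KAFKA_SSL_KEY_LOCATION=/etc/kafka/ssl/client.key     # client key
export KAFKA_SSL_KEY_PASSWORD=changeit                      # password of the client key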

The Docker Compose stack is set up for using SSL/TLS, but the required certificates, keys and stores need to be generated before starting it. To generate the required files, run the gen-aki-compose-certs.sh script in the ssl folder.

For more information about using SSL with librdkafka (the underlying Kafka library used), check out https://github.com/confluentinc/librdkafka/wiki/Using-SSL-with-librdkafka

Architecture

This processor is architected in the following way (see the diagram below); the main components are:

  • Consumer: consumes messages from a Kafka topic as part of a consumer group and then sends them onwards using a buffered Go channel
  • Indexer: takes the contents of the Kafka messages that are in the Go channel and bulks them together, for ES performance reasons, and sends the bulked messages to ES using the Bulk API. The response is then checked for errors and any SPIs that failed to index are sent into a Go channel.
  • Producer: takes the failed SPIs from the Go channel and produces them back into a Kafka topic, for further analysis and reprocessing if desired.

The number of indexers can be configured; this enables parallel sends to the Elasticsearch cluster, which increases throughput.
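
For example, a sketch of running four indexers in parallel (the variable and its default of 1 are listed in the Configuration table below):

export ELASTIC_INDEXER_INSTANCES=4   # number of parallel indexers sending bulks to ES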

[Architecture diagram]

Installation

The easiest way to install the indexer is to use the prebuilt Docker images available at ghcr.io/mcgillowen/arkime-kafka-indexer. These can be run using:

docker run -e KAFKA_CONSUMER_TOPIC=<topic> -e KAFKA_CONSUMER_GROUP_NAME=<group name> \
           -e ELASTIC_SERVICE=<ES URL without http:// or port> ghcr.io/mcgillowen/arkime-kafka-indexer:latest

or take a look at the Docker Compose file in this repository.

There are also binaries for linux/amd64 and darwin/amd64 on the Releases page; we don't build other OS/arch combinations since we don't have systems to test them on.

It can also be installed the normal Go way:

go install github.com/mcgillowen/arkime-kafka-indexer@v0.2.0

Configuration

Configuration of the indexer is done using environment variables following the 12-factor app approach, documented at 12factor.net.

| Name | Variable | Type | Default | Description |
| ---- | -------- | ---- | ------- | ----------- |
| Port | PORT | string | 8080 | Port for the HTTP server |
| LogLevel | LOG_LEVEL | string | debug | The level to log at |
| KafkaConsumerBrokers | KAFKA_CONSUMER_BROKERS | string | localhost:9092 | Kafka broker to consume from |
| KafkaConsumerTopic | KAFKA_CONSUMER_TOPIC | string | | Kafka topic to consume from |
| KafkaConsumerGroupName | KAFKA_CONSUMER_GROUP_NAME | string | | Name of the Kafka consumer group |
| KafkaConsumerIncrementalRebalance | KAFKA_CONSUMER_INCREMENTAL_REBALANCE | bool | false | If the cooperative rebalancing strategy should be used |
| KafkaProducerBrokers | KAFKA_PRODUCER_BROKERS | string | localhost:9092 | Kafka broker to produce to |
| KafkaProducerTopic | KAFKA_PRODUCER_TOPIC | string | | Kafka topic to produce to |
| KafkaSSLCALocation | KAFKA_SSL_CA_LOCATION | string | | Path to the CA cert used for signing certs |
| KafkaSSLCertLocation | KAFKA_SSL_CERT_LOCATION | string | | Path to the client certificate |
| KafkaSSLKeyLocation | KAFKA_SSL_KEY_LOCATION | string | | Path to the client key |
| KafkaSSLKeyPassword | KAFKA_SSL_KEY_PASSWORD | string | | Password of the client key |
| KafkaProducerMessageTimeout | KAFKA_PRODUCER_MSG_TIMEOUT | time.Duration | 30s | Produced message timeout |
| KafkaProducerMessageRetries | KAFKA_PRODUCER_MSG_RETRIES | int | 100 | Maximum number of retries for a produced message |
| KafkaProducerQueueFullCooldown | KAFKA_PRODUCER_FULL_QUEUE_COOLDOWN | time.Duration | 1s | How long to wait after a producer full queue error before retrying |
| KafkaProducerLogDeliveryReports | KAFKA_PRODUCER_LOG_DELIVERY_REPORTS | bool | true | Should the delivery reports be logged |
| KafkaSessionTimeout | KAFKA_SESSION_TIMEOUT | time.Duration | 6000ms | Kafka session timeout length |
| KafkaPollTimeout | KAFKA_POLL_TIMEOUT | time.Duration | 100ms | Consumer polling timeout |
| KafkaFlushInterval | KAFKA_FLUSH_INTERVAL | time.Duration | 100ms | Timeout length when flushing Kafka messages at shutdown |
| BulkerFlushInterval | BULKER_FLUSH_INTERVAL | time.Duration | 10s | Maximum amount of time to buffer messages before sending them to ES |
| BulkerMaxMessages | BULKER_MAX_MESSAGES | int | 100 | Maximum number of messages to buffer before sending them to ES |
| BulkerMaxBytes | BULKER_MAX_BYTES | int | 10_485_760 | Maximum number of bytes to buffer before sending them to ES |
| ElasticService | ELASTIC_SERVICE | string | | The address of an Elasticsearch node; the client will discover the rest of the nodes |
| ElasticServicePort | ELASTIC_SERVICE_PORT | string | 9200 | The ES HTTP port |
| ElasticUseHTTPS | ELASTIC_USE_HTTPS | bool | false | Should the ES client communicate using HTTPS |
| ElasticUsername | ELASTIC_USERNAME | string | | The username for ES basic authentication |
| ElasticPassword | ELASTIC_PASSWORD | string | | The password for ES basic authentication |
| ElasticAPIKey | ELASTIC_API_KEY | string | | The API key for ES authentication |
| ElasticServiceToken | ELASTIC_SERVICE_TOKEN | string | | The service token for ES authentication |
| ElasticIndexerInstances | ELASTIC_INDEXER_INSTANCES | int | 1 | The number of parallel indexers to use |
| ElasticClientMaxRetries | ELASTIC_CLIENT_MAX_RETRIES | int | 10 | Number of retries when communicating with ES |
| ElasticClientRetryStatuses | ELASTIC_CLIENT_RETRY_STATUSES | []int | 502,503,504,429 | Which HTTP status codes to retry |
| ElasticClientDiscoverNodes | ELASTIC_CLIENT_DISCOVER_NODES | bool | true | Should the client discover the other ES nodes |
| ElasticClientDiscoverInterval | ELASTIC_CLIENT_DISCOVER_INTERVAL | time.Duration | 1h | Interval between updates of the list of ES connections |
| ElasticClientBulkTimeout | ELASTIC_CLIENT_BULK_TIMEOUT | time.Duration | 500ms | The timeout duration for the Bulk call |
| ElasticClientMaxDeadPercentage | ELASTIC_CLIENT_MAX_DEAD_PERCENTAGE | int | 20 | The maximum percentage of dead ES connections |
| ElasticTransportDialTimeout | ELASTIC_TRANSPORT_DIAL_TIMEOUT | time.Duration | 2s | |
| ElasticTransportDialKeepAlive | ELASTIC_TRANSPORT_DIAL_KEEPALIVE | time.Duration | 5s | |
| ElasticTransportMaxIdleConns | ELASTIC_TRANSPORT_MAX_IDLE_CONNS | int | 100 | |
| ElasticTransportMaxIdleConnsPerHost | ELASTIC_TRANSPORT_MAX_IDLE_CONNS_PER_HOST | int | 100 | |
| ElasticTransportMaxConnsPerHost | ELASTIC_TRANSPORT_MAX_CONNS_PER_HOST | int | 100 | |
| ElasticTransportIdleConnTimeout | ELASTIC_TRANSPORT_IDLE_CONN_TIMEOUT | time.Duration | 10s | |
| ElasticTransportExpectContinueTimeout | ELASTIC_TRANSPORT_EXPECT_CONTINUE_TIMEOUT | time.Duration | 1s | |
| ConsumerChannelBufferSize | CONSUMER_CHANNEL_BUFFER_SIZE | int | 10 | |
| ErrorChannelBufferSize | ERROR_CHANNEL_BUFFER_SIZE | int | 10 | |
| ProducerChannelBufferSize | PRODUCER_CHANNEL_BUFFER_SIZE | int | 10 | |
| MetricsNamespace | METRICS_NAMESPACE | string | arkime | |
| MetricsSubsystem | METRICS_SUBSYSTEM | string | kafkaindexer | |
| MetricsPath | METRICS_PATH | string | /metrics | |
| FlushedBytesBuckets | METRICS_FLUSHED_BYTES_BUCKETS | []float64 | 50_000,100_000,500_000,1_000_000,5_000_000,25_000_000,50_000_000,75_000_000 | |
| FlushedMsgsBuckets | METRICS_FLUSHED_MSGS_BUCKETS | []float64 | 2,4,8,16,32,64,128,256 | |
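
A minimal configuration sketch combining variables from the table; the broker, topic, group and ES host names are hypothetical, and the remaining variables keep their defaults:

# Kafka consumer (topic and group name have no defaults)
export KAFKA_CONSUMER_BROKERS=kafka-1:9092
export KAFKA_CONSUMER_TOPIC=arkime-spi
export KAFKA_CONSUMER_GROUP_NAME=arkime-kafka-indexer
# Elasticsearch node to bootstrap from (the client discovers the other nodes)
export ELASTIC_SERVICE=es-node-1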

Tasks

The following subsections define tasks that can be run using the xc tool. This tool executes the commands within the code block of each subsection, with the name of the subsection being the name of the command, e.g. xc lint runs the commands contained within the code block of the "Lint" subsection.
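
For example, following the xc lint pattern described above:

xc deps    # installs gofumpt, golangci-lint and govulncheck
xc lint    # runs govulncheck and golangci-lint
xc test    # runs the unit tests with the race detector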

Deps

Installs the dependencies for building, formatting, linting, etc.

# Installs gofumpt for formatting the source code with extra rules
go install mvdan.cc/gofumpt@latest
# Installs golangci-lint for linting, uses the .golangci.yaml
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.62.2
# Installs the Go vulnerability checking tool
go install golang.org/x/vuln/cmd/govulncheck@latest

Precommit

Sets up pre-commit to run when committing.

Requires pre-commit to be installed beforehand; see pre-commit/Installation

pre-commit run --all-files
pre-commit install

Format

Formats the code to ensure consistent formatting

gofumpt -l -w .

Lint

Lints the source code according to the configuration in the .golangci.yaml file

govulncheck ./...
golangci-lint run ./...

Test

Runs the unit-tests

go test -race github.com/mcgillowen/arkime-kafka-indexer/...

Snapshot-Dry-Run

Creates a snapshot release using GoReleaser and GoReleaser-Cross but without publishing it.

docker run \
  --rm \
  -e CGO_ENABLED=1 \
  -v /var/run/docker.sock:/var/run/docker.sock \
  -v `pwd`:/go/src/github.com/mcgillowen/arkime-kafka-indexer \
  -w /go/src/github.com/mcgillowen/arkime-kafka-indexer \
  ghcr.io/goreleaser/goreleaser-cross:v1.22.2 \
  release --clean --skip=publish --snapshot

Snapshot

Creates a snapshot release using GoReleaser and GoReleaser-Cross.

docker run \
  --rm \
  -e CGO_ENABLED=1 \
  -v $HOME/.docker/config.json:/root/.docker/config.json \
  -v /var/run/docker.sock:/var/run/docker.sock \
  -v `pwd`:/go/src/github.com/mcgillowen/arkime-kafka-indexer \
  -w /go/src/github.com/mcgillowen/arkime-kafka-indexer \
  ghcr.io/goreleaser/goreleaser-cross:v1.22.2 \
  release --clean --snapshot

Release-Dry-Run

Creates a normal release using GoReleaser and GoReleaser-Cross but without publishing it.

docker run \
  --rm \
  -e CGO_ENABLED=1 \
  -v /var/run/docker.sock:/var/run/docker.sock \
  -v `pwd`:/go/src/github.com/mcgillowen/arkime-kafka-indexer \
  -w /go/src/github.com/mcgillowen/arkime-kafka-indexer \
  ghcr.io/goreleaser/goreleaser-cross:v1.22.2 \
  release --clean --skip=publish

Release

Creates a release using GoReleaser and GoReleaser-Cross.

Inputs: GITHUB_TOKEN

docker run \
  --rm \
  -e CGO_ENABLED=1 \
  -e GITHUB_TOKEN=$GITHUB_TOKEN \
  -v $HOME/.docker/config.json:/root/.docker/config.json \
  -v /var/run/docker.sock:/var/run/docker.sock \
  -v `pwd`:/go/src/github.com/mcgillowen/arkime-kafka-indexer \
  -w /go/src/github.com/mcgillowen/arkime-kafka-indexer \
  ghcr.io/goreleaser/goreleaser-cross:v1.22.2 \
  release --clean

# Packages

Package es contains all the components required to communicate with Elasticsearch.
Package kafka contains all the components required to consume and produce Kafka messages.
Package metrics contains the struct used for exposing Prometheus metrics and a collector that collects metrics from the ES client.