# kafka

The Kafka connectors `kafka_consumer` and `kafka_producer` provide integration with Apache Kafka and compatible products such as Confluent Kafka and Redpanda. Consuming from Kafka and producing to Kafka are handled by two separate connectors.

Both Kafka connectors in Tremor are built on top of librdkafka version 1.8.2 and expose its full set of configuration settings. Care SHOULD be taken when configuring Kafka with Tremor to ensure that the configuration settings make sense given the logic required of the resulting system.

## kafka_consumer

To consume from Kafka, define a connector from the `kafka_consumer` connector type.

### Configuration

It supports the following configuration options:

| Option | Description | Type | Required | Default Value |
|--------|-------------|------|----------|---------------|
| `group_id` | The consumer group id to register with the Kafka cluster. Corresponds to the librdkafka `group.id` setting. | string | yes | |
| `topics` | The topics to consume from. | list of strings | yes | |
| `brokers` | URLs of the cluster bootstrap servers to connect to. Corresponds to the librdkafka `bootstrap.servers` setting. | list of strings | yes | |
| `mode` | Determines the working mode of the connector. The following modes are supported: `performance` (default), `transactional` and `custom`. See below for more details. | see below | no | `"performance"` |

### Mode

#### Performance

The mode determines how the connector consumes from Kafka. In the `"performance"` mode, the connector automatically stores the offsets of all messages it receives and commits them to the broker every 5 seconds. This mode has the lowest overhead and is best suited for performance-sensitive applications where a single missed or failed message is not a big deal. This is the default mode.

This mode essentially sets the following librdkafka configuration options:
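
- `enable.auto.commit`: `true`
- `enable.auto.offset.store`: `true`
- `auto.commit.interval.ms`: `5000`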

Example:

```troy
define connector perf_consumer from kafka_consumer
with
    codec = "json",
    config = {
        "brokers": [
            "localhost:9092"
        ],
        "group_id": "my_consumer_group",
        "topics": [
            "my_topic"
        ],
        "mode": "performance"
    }
end;
```
#### Transactional

The mode "transactional" is for workloads where each and every kafka message needs to be handled successfully in order to make progress. In this mode, the offset for every message is only stored once it has been handled successfully by a downstream connector. The setting commit_interval of the mode determines how often the offsets are committed to the kafka cluster. The default is 5 seconds. If this setting is set to 0, the connector will immediately commit every single event offset directly to the kafka cluster. This will lead to lots of traffic towards the group-coordinator, and should be avoided for high volume loads. This is the safest setting though.

Failed events will be replayed. If an event fails, the partition offset of the event is reset, so the consumer will start consuming again from that point. This can lead to consuming Kafka messages more than once, but it guarantees at-least-once delivery in the face of failing events.

Example:

```troy
use std::time::nanos;

define connector transactional_consumer from kafka_consumer
with
    codec = "json",
    config = {
        "brokers": [
            "localhost:9092"
        ],
        "group_id": "my_consumer_group",
        "topics": [
            "my_topic"
        ],
        "mode": {
            "transactional": {
                # this setting can be omitted and defaults to 5 seconds
                # if set to `0`, the connector commits offsets immediately
                "commit_interval": nanos::from_seconds(1)
            }
        }
    }
end;
```
#### Custom

The mode `custom` allows custom configuration of the connector and the underlying librdkafka client. It contains two settings:

- `rdkafka_options`: librdkafka configuration options. For possible options consult the librdkafka configuration options. The options `group.id`, `client.id` and `bootstrap.servers` are set from the connector config and cannot be overwritten.
- `retry_failed_events`: If set to `true`, this connector will behave as in transactional mode and reset the offset for every failed Kafka message, effectively retrying it. Default: `false`.

To customize your settings, it is useful to know how the built-in modes map onto the `custom` mode, so you can adapt them to your needs. Transactional mode translates to the following `rdkafka_options`:

```json
{
  "mode": {
    "custom": {
      "rdkafka_options": {
        "enable.auto.commit": true,
        "enable.auto.offset.store": false,
        "auto.commit.interval.ms": 5000
      },
      "retry_failed_events": true
    }
  }
}
```

Performance mode translates to the following `rdkafka_options`:

```json
{
  "mode": {
    "custom": {
      "rdkafka_options": {
        "enable.auto.commit": true,
        "enable.auto.offset.store": true,
        "auto.commit.interval.ms": 5000
      },
      "retry_failed_events": false
    }
  }
}
```

For detailed semantics on how the consumer behaves with which settings, please consult the librdkafka configuration options.

Example configuration for `kafka_consumer`:

```troy title="config.troy"
use std::time::nanos;

define connector consumer from kafka_consumer
with
    metrics_interval_s = 1,
    reconnect = {
        "retry": {
            "interval_ms": 3000,
            "max_retries": 10
        }
    },
    codec = "json",
    # Kafka specific consumer configuration
    config = {
        # List of broker bootstrap servers
        "brokers": [
            "127.0.0.1:9092"
        ],
        # Consumer group id
        "group_id": "test1",

        # required - list of subscription topics to register with
        "topics": [
            "tremor_test"
        ],
        "mode": {
            "custom": {
                # Whether or not to retry failed attempts
                # When true - resets the offset to a failed message for retry
                #   Warning: where persistent failure is expected, this will lead to persistent errors
                # When false - only commits offsets for successful acknowledgements
                "retry_failed_events": false,

                # librdkafka configuration settings (indicative illustrative example)
                "rdkafka_options": {
                    "enable.auto.commit": "false",   # only commit a message offset once the event has been handled successfully
                    "auto.commit.interval.ms": 5000, # auto-commit stored offsets every 5s
                    "enable.auto.offset.store": true,
                    "debug": "consumer",             # enable librdkafka debug logging of the consumer
                    "enable.partition.eof": false,   # do not send an EOF event if a partition becomes empty
                    "auto.offset.reset": "beginning" # always start consuming from the beginning of all partitions
                }
            }
        }
    }
end;
```
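
As a purely illustrative sketch of how such a consumer definition can be deployed, the following flow wires it to the `passthrough` pipeline and the `console` connector from the Tremor standard library. The flow name, broker address and topic are assumptions for the example:

```troy
# Illustrative deployment sketch: consume from Kafka and print each event to the console.
define flow kafka_to_console
flow
  use tremor::pipelines;
  use tremor::connectors;

  define connector consumer from kafka_consumer
  with
    codec = "json",
    config = {
      "brokers": ["127.0.0.1:9092"],
      "group_id": "test1",
      "topics": ["tremor_test"],
      "mode": "performance"
    }
  end;

  create connector consumer;
  create connector console from connectors::console;
  create pipeline passthrough from pipelines::passthrough;

  connect /connector/consumer to /pipeline/passthrough;
  connect /pipeline/passthrough to /connector/console;
end;

deploy flow kafka_to_console;
```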

### Event Metadata

Events consumed from a `kafka_consumer` connector will have the following event metadata:

```
{
  "kafka_consumer": {
    "key": ..., # binary message key
    "headers": {
      "kafka_message_header": ..., # binary header value
      ...
    },
    "topic": "my_topic",     # topic name
    "partition": 1,          # numeric partition id of the message
    "offset": 12,            # numeric message offset in the given partition
    "timestamp": 1230000000  # optional message timestamp in nanoseconds
  }
}
```

It can be accessed in scripts or select statements in the following way:

```tremor
match $ of
  case %{ present kafka_consumer } => $kafka_consumer.partition
  default => -1 # invalid partition
end;
```
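
For instance, a minimal (illustrative) pipeline could copy parts of this metadata into the outgoing event; the pipeline name and the resulting record layout are assumptions for the example:

```troy
define pipeline enrich_with_kafka_meta
pipeline
  # copy topic, partition and offset from the consumer metadata into the event
  select {
    "topic": $kafka_consumer.topic,
    "partition": $kafka_consumer.partition,
    "offset": $kafka_consumer.offset,
    "payload": event
  } from in into out;
end;
```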

## kafka_producer

To produce events as Kafka messages, a connector needs to be defined from the `kafka_producer` connector type.

### Configuration

It supports the following configuration options:

| Option | Description | Type | Required | Default Value |
|--------|-------------|------|----------|---------------|
| `brokers` | URLs of the cluster bootstrap servers to connect to. Corresponds to the librdkafka `bootstrap.servers` setting. | list of strings | yes | |
| `topic` | The topic to produce events to. | string | yes | |
| `key` | The message key to add to the produced Kafka messages. Can be overwritten by the event metadata value `$kafka_producer.key`. | string | no | |
| `rdkafka_options` | librdkafka configuration. For possible options consult the librdkafka configuration docs. | json record with string values | no | By default only `client.id` and `bootstrap.servers` are set. |

Example configuration for `kafka_producer`:

```troy
define connector producer from kafka_producer
with
    # Enables metrics at a 1 second interval
    metrics_interval_s = 1,
    # event payload is serialized to JSON
    codec = "json",

    # Kafka specific producer configuration
    config = {
        # List of broker bootstrap servers
        "brokers": [
            "127.0.0.1:9092"
        ],
        # the topic to send to
        "topic": "tremor_test"
    }
end;
```
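
The `key` and `rdkafka_options` settings from the table above can be combined in the same definition. The following variant is a sketch; the connector name and option values are illustrative, not recommendations:

```troy
define connector keyed_producer from kafka_producer
with
    codec = "json",
    config = {
        "brokers": [
            "127.0.0.1:9092"
        ],
        "topic": "tremor_test",
        # static message key, can be overridden per event via `$kafka_producer.key`
        "key": "default_key",
        # illustrative librdkafka producer options
        "rdkafka_options": {
            "message.timeout.ms": "5000",
            "queue.buffering.max.ms": "0"
        }
    }
end;
```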

### Event Metadata

To control how the `kafka_producer` produces events as Kafka messages, the following metadata options are available:

```tremor
let $kafka_producer = {
  "key": "message_key", # kafka message key as string or bytes
  "headers": {          # message headers
    "my_bytes_header": <<"badger"/binary>>,
    "my_string_header": "string"
  },
  "timestamp": 12345,   # message timestamp
  "partition": 3        # numeric partition id to publish the message on
};
```
:::note

It is important to provide the metadata options underneath the key `kafka_producer`, otherwise they will be ignored.

:::
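
As a sketch of how this metadata is typically set per event, the following hypothetical pipeline uses each event's `id` field (assumed to be a string) as the message key before the event reaches the producer:

```troy
define pipeline with_message_key
pipeline
  define script set_key
  script
    # hypothetical: use the event's `id` field as the Kafka message key
    let $kafka_producer = {"key": event.id};
    event
  end;
  create script set_key;

  select event from in into set_key;
  select event from set_key into out;
end;
```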