Kafka Sink
Overview
The Kafka sink writes events to a Kafka topic. It can extract the partition key from the record based on sources such as the stream ID, headers, or record key, and it also supports basic authentication and resilience features to handle transient errors.
Quickstart
You can create the Kafka Sink connector as follows. Replace id with a unique connector name or ID:
POST /connectors/{{id}}
Host: localhost:2113
Content-Type: application/json
{
"settings": {
"instanceTypeName": "kafka-sink",
"bootstrapServers": "localhost:9092",
"topic": "customers",
"subscription:filter:scope": "stream",
"subscription:filter:filterType": "streamId",
"subscription:filter:expression": "example-stream"
}
}
After creating and starting the Kafka sink connector, every time an event is appended to the example-stream, the Kafka sink connector will send the record to the specified Kafka topic. You can find a list of available management API endpoints in the API Reference.
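If the connector is created but not yet running, it can be started through the management API before appending events. The request below is an illustrative sketch that assumes a start endpoint of the shape shown; check the API Reference for the exact paths.
POST /connectors/{{id}}/start
Host: localhost:2113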
Settings
Adjust these settings to specify the behavior and interaction of your Kafka sink connector with KurrentDB, ensuring it operates according to your requirements and preferences.
Tips
The Kafka sink inherits a set of common settings that are used to configure the connector. These settings can be found on the Sink Options page.
The Kafka sink can be configured with the following options:
Name | Details |
---|---|
topic | required Description: The Kafka topic to produce records to. |
bootstrapServers | Description: Comma-separated list of Kafka broker addresses. Default: "localhost:9092" |
defaultHeaders | Description: Headers included in all produced messages. Default: Empty |
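As a starting point, the sketch below combines the options above into a single settings update; the broker list, topic, and header value are placeholder examples, not required names.
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"bootstrapServers": "broker-1:9092,broker-2:9092",
"topic": "customers",
"defaultHeaders:X-Source-System": "KurrentDB"
}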
Authentication
Name | Details |
---|---|
authentication:securityProtocol | Description: Protocol used for Kafka broker communication. Default: "plaintext" Accepted Values: "plaintext", "saslPlaintext", or "saslSsl" |
authentication:saslMechanism | Description: SASL mechanism to use for authentication. Default: "plain" Accepted Values: "plain", "scramSha256", or "scramSha512" |
authentication:username | Description: SASL username |
authentication:password | Description: SASL password |
Partitioning
Name | Details |
---|---|
partitionKeyExtraction:enabled | Description: Enables partition key extraction. Default: "false" |
partitionKeyExtraction:source | Description: Source for extracting the partition key. See Partitioning. Default: "partitionKey" Accepted Values: "partitionKey", "stream", "streamSuffix", or "headers" |
partitionKeyExtraction:expression | Description: Regular expression for extracting the partition key. |
See the Partitioning section for examples.
Resilience
The Kafka sink connector relies on Kafka's own retry mechanism and does not use the shared settings described in Resilience configuration.
Name | Details |
---|---|
waitForBrokerAck | Description: Whether the producer waits for broker acknowledgment before considering the send operation complete. See Broker Acknowledgment. Default: "true" |
resilience:reconnectBackoffMaxMs | Description: The maximum time to wait before reconnecting to a broker after the connection has been closed. Default: "20000" |
resilience:messageSendMaxRetries | Description: The maximum number of times to retry sending a failing message. Default: "2147483647" |
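For example, the sketch below caps the reconnect backoff and limits send retries; the values are illustrative and should be tuned to your workload.
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"waitForBrokerAck": "true",
"resilience:reconnectBackoffMaxMs": "10000",
"resilience:messageSendMaxRetries": "10"
}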
Miscellaneous
Name | Details |
---|---|
brokerAddressFamily | Description: Allowed broker IP address families. Default: "V4" Accepted Values: "Any", "V4", or "V6" |
compression:type | Description: Kafka compression type. Default: "Zstd" Accepted Values: "None", "Gzip", "Lz4", "Zstd", or "Snappy" |
compression:level | Description: Kafka compression level. Default: "6" |
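For example, to restrict broker connections to IPv4 and switch to Gzip compression at a lower level (values shown are illustrative):
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"brokerAddressFamily": "V4",
"compression:type": "Gzip",
"compression:level": "4"
}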
Delivery Guarantees
The Kafka sink guarantees at least once delivery through Kafka's built-in idempotent producer mechanism and configurable retry settings. Messages are only checkpointed after successful delivery confirmation from Kafka.
The waitForBrokerAck setting controls delivery behavior:
- If enabled, the connector blocks until the broker confirms delivery before advancing its checkpoint, trading throughput for stronger delivery guarantees.
- If disabled, messages are sent asynchronously and checkpointed after confirmed delivery, yielding higher throughput at the cost of weaker ordering guarantees.
If a failure occurs before acknowledgment, the retry mechanism will attempt redelivery. If the connector restarts, it will resume from the last successfully checkpointed position and may reprocess messages that were sent but not yet checkpointed.
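For throughput-oriented workloads where the weaker guarantees described above are acceptable, broker acknowledgment can be disabled; treat this as a sketch rather than a general recommendation.
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"waitForBrokerAck": "false"
}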
Headers
The Kafka sink connector lets you include custom headers in the messages it sends to your topic. To add custom headers, use the defaultHeaders setting in your connector configuration. Each custom header should be specified with the prefix defaultHeaders: followed by the header name.
Example:
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"defaultHeaders:X-API-Key": "your-api-key-here",
"defaultHeaders:X-Tenant-ID": "production-tenant",
"defaultHeaders:X-Source-System": "KurrentDB"
}
These headers will be included in every message sent by the connector, in addition to the default headers automatically added by the connector's plugin.
Examples
Authentication
The Kafka sink connector supports secure communication with Kafka brokers using SASL authentication. By default, the connector communicates in plaintext without authentication. However, you can configure it to use SASL with different security protocols and authentication mechanisms.
Note
When using saslSsl, the connector uses your system's trusted CA certificates for SSL/TLS encryption. This works with managed services like AWS MSK, Confluent Cloud, and Azure Event Hubs. For self-signed or private CA certificates, add them to your system's trust store first.
SASL/PLAINTEXT with PLAIN Authentication
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"authentication:securityProtocol": "saslPlaintext",
"authentication:saslMechanism": "plain",
"authentication:username": "my-username",
"authentication:password": "my-password"
}
SASL/PLAINTEXT with SCRAM-SHA-256 Authentication
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"authentication:securityProtocol": "saslPlaintext",
"authentication:saslMechanism": "scramSha256",
"authentication:username": "my-username",
"authentication:password": "my-password"
}
SASL/PLAINTEXT with SCRAM-SHA-512 Authentication
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"authentication:securityProtocol": "saslPlaintext",
"authentication:saslMechanism": "scramSha512",
"authentication:username": "my-username",
"authentication:password": "my-password"
}
SASL/SSL with PLAIN Authentication
For production environments with encryption (recommended for managed Kafka services):
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"authentication:securityProtocol": "saslSsl",
"authentication:saslMechanism": "plain",
"authentication:username": "my-username",
"authentication:password": "my-password"
}
SASL/SSL with SCRAM-SHA-256 Authentication
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"authentication:securityProtocol": "saslSsl",
"authentication:saslMechanism": "scramSha256",
"authentication:username": "my-username",
"authentication:password": "my-password"
}
SASL/SSL with SCRAM-SHA-512 Authentication
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"authentication:securityProtocol": "saslSsl",
"authentication:saslMechanism": "scramSha512",
"authentication:username": "my-username",
"authentication:password": "my-password"
}
Additional resources
- Azure Event Hub Security and Authentication
- Set up SASL/SCRAM authentication for an Amazon MSK cluster
- Use SASL/SCRAM authentication in Confluent Platform
Partitioning
The Kafka sink connector allows you to customize the partition key sent with each message. By default, it uses "partitionKey", and messages are distributed using round-robin partitioning across the available partitions in the topic.
Partition using Stream ID
You can extract part of the stream name using a regular expression (regex) to define the partition key. The expression is optional and can be customized based on your naming convention. In this example, the expression captures the stream name up to _data.
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"partitionKeyExtraction:enabled": "true",
"partitionKeyExtraction:source": "stream",
"partitionKeyExtraction:expression": "^(.*)_data$"
}
Alternatively, if you only need the last segment of the stream name (after a hyphen), you can use the streamSuffix source. This doesn't require an expression since it automatically extracts the suffix.
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"partitionKeyExtraction:enabled": "true",
"partitionKeyExtraction:source": "streamSuffix"
}
The streamSuffix source is useful when stream names follow a structured format, and you want to use only the trailing part as the partition key. For example, if the stream is named user-123, the partition key would be 123.
Partition using header values
You can create partition keys by combining values from a record's metadata.
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json
{
"partitionKeyExtraction:enabled": "true",
"partitionKeyExtraction:source": "headers",
"partitionKeyExtraction:expression": "key1,key2"
}
Specify the header keys you want to use in the partitionKeyExtraction:expression field (e.g., key1,key2). The connector will concatenate the header values with a hyphen (-) to create the partition key.
For example, if your event has headers key1: regionA and key2: zone1, the partition key will be regionA-zone1.
Tutorial
Learn how to set up and use a Kafka Sink connector in KurrentDB through a tutorial.