Kafka Sink
Overview
The Kafka sink writes events to a Kafka topic. It can extract the partition key from the record based on sources such as the stream ID, headers, or record key, and it also supports basic authentication and resilience features to handle transient errors.
Quickstart
You can create the Kafka Sink connector as follows:
$JSON = @"
{
"settings": {
"instanceTypeName": "kafka-sink",
"topic": "customers",
"bootstrapServers": "localhost:9092",
"subscription:filter:scope": "stream",
"subscription:filter:filterType": "streamId",
"subscription:filter:expression": "example-stream",
"authentication:username": "your-username",
"authentication:password": "your-password",
"authentication:securityProtocol": "SaslSsl",
"waitForBrokerAck": "true"
}
}
"@ `
curl.exe -X POST `
-H "Content-Type: application/json" `
-d $JSON `
http://localhost:2113/connectors/kafka-sink-connector
JSON='{
"settings": {
"instanceTypeName": "kafka-sink",
"topic": "your-topic",
"bootstrapServers": "your-kafka-cluster-address:9092",
"subscription:filter:scope": "stream",
"subscription:filter:filterType": "streamId",
"subscription:filter:expression": "example-stream",
"authentication:username": "your-username",
"authentication:password": "your-password",
"authentication:securityProtocol": "SaslSsl",
"waitForBrokerAck": "true"
}
}'
curl -X POST \
-H "Content-Type: application/json" \
-d "$JSON" \
http://localhost:2113/connectors/kafka-sink-connector
After creating and starting the Kafka sink connector, every time an event is appended to the example-stream, the Kafka sink connector will send the record to the specified Kafka topic. You can find a list of available management API endpoints in the API Reference.
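For example, once the connector instance has been created, you can read back its settings through the management API (the same /settings endpoint is used in the tutorial later on this page):
curl http://localhost:2113/connectors/kafka-sink-connector/settings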
Settings
Adjust these settings to control how your Kafka sink connector behaves and interacts with EventStoreDB so that it operates according to your requirements.
Tips
The Kafka sink inherits a set of common settings that are used to configure the connector. The settings can be found in the Sink Options page.
The Kafka sink can be configured with the following options:
Name | Details |
---|---|
topic | Required. Type: string. Description: The Kafka topic to produce records to. |
bootstrapServers | Type: string. Description: Comma-separated list of Kafka broker addresses. Default: localhost:9092 |
defaultHeaders | Type: dict<string,string>. Description: Headers included in all produced messages. Default: None |
authentication:securityProtocol | Type: SecurityProtocol. Description: Protocol used for Kafka broker communication. Default: Plaintext |
authentication:username | Type: string. Description: Username for authentication. |
authentication:password | Type: string. Description: Password for authentication. |
Partitioning
Name | Details |
---|---|
partitionKeyExtraction:enabled | Type: boolean. Description: Enables partition key extraction. Default: false |
partitionKeyExtraction:source | Type: Enum. Description: Source for extracting the partition key. Accepted values: stream, streamSuffix, headers. Default: PartitionKey |
partitionKeyExtraction:expression | Type: string. Description: Regular expression for extracting the partition key. |
See the Partitioning section for examples.
Resilience
Besides the common sink settings that can be found in the Resilience Configuration page, the Kafka sink connector supports additional settings related to resilience:
Name | Details |
---|---|
waitForBrokerAck | Type: boolean. Description: Whether the producer waits for broker acknowledgment before considering the send operation complete. Default: true |
resilience:enabled | Type: boolean. Description: Enables resilience features for message handling. Default: true |
resilience:maxRetries | Type: int. Description: Maximum number of retry attempts. Default: -1 (unlimited) |
resilience:transientErrorDelay | Type: TimeSpan. Description: Delay between retries for transient errors. Default: 00:00:00 |
resilience:reconnectBackoffMaxMs | Type: int. Description: Maximum backoff time in milliseconds for reconnection attempts. Default: 20000 |
resilience:messageSendMaxRetries | Type: int. Description: Number of times to retry sending a failing message. Note: Retrying may cause reordering unless enable.idempotence is set to true. Default: 2147483647 |
Miscellaneous
Name | Details |
---|---|
brokerAddressFamily | Type: BrokerAddressFamily. Description: Allowed broker IP address families. Default: V4 |
compression:type | Type: CompressionType. Description: Kafka compression type. Default: Zstd |
compression:level | Type: int. Description: Kafka compression level. Default: 6 |
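For example, here is a minimal sketch of switching the producer from the default Zstd to Gzip compression (Gzip is another compression type supported by the underlying Kafka client), using the same settings format as the other examples on this page:
{
  "compression:type": "Gzip",
  "compression:level": 5
}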
At least once delivery
The Kafka sink guarantees at least once delivery by retrying failed requests based on configurable resilience settings. It will continue to attempt delivery until the event is successfully sent or the maximum number of retries is reached, ensuring each event is delivered at least once.
The Kafka sink currently retries transient errors based on the following error codes:
- Local_AllBrokersDown: All broker connections are down
- OutOfOrderSequenceNumber: Broker received an out of order sequence number
- TransactionCoordinatorFenced: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer
- UnknownProducerId: Unknown Producer Id.
For detailed information on the listed error codes, refer to the Kafka documentation.
Configuration example
{
"resilience:enabled": true,
"resilience:requestTimeoutMs": 3000,
"resilience:maxRetries": -1,
"resilience:transientErrorDelay": "00:00:05",
"resilience:reconnectBackoffMaxMs": 20000,
"resilience:messageSendMaxRetries": 2147483647
}
Broker Acknowledgment
In the Kafka sink connector for EventStoreDB, broker acknowledgment refers to the producer waiting for confirmation from the Kafka broker that a message has been successfully received. When waitForBrokerAck is enabled (which is the default setting), the producer waits for this acknowledgment, ensuring more reliable delivery of messages, which is crucial for systems that require durability and fault tolerance.
While this setting improves reliability, it can slightly increase latency, as the producer must wait for confirmation from Kafka before continuing. If higher throughput is preferred over strict delivery guarantees, you can disable this option.
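For example, a connector tuned for throughput rather than strict delivery guarantees could turn broker acknowledgment off:
{
  "waitForBrokerAck": "false"
}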
For more details about Kafka broker acknowledgment, refer to Kafka's official documentation.
To learn more about authentication in Kafka, see Authentication using SASL
For Kafka client enum types, please refer to the official Kafka .NET client documentation.
Examples
Partitioning
The Kafka sink connector writes events to Kafka topics, and it allows the customization of partition keys. Kafka's partitioning strategy is essential for ensuring that related messages are sent to the same partition, which helps maintain message ordering and effective load distribution. Read more about Kafka Partitions.
Kafka partition keys can be generated from various sources, similar to how document IDs are generated in the MongoDB connector. These sources include the event stream, stream suffix, headers, or other record fields.
By default, it uses the PartitionKey source and takes this value from the EventStoreDB record.
Partition using Stream ID
You can extract part of the stream name using a regular expression (regex) to define the partition key. The expression is optional and can be customized based on your naming convention. In this example, the expression captures the stream name up to _data; for instance, given a hypothetical stream named orders_data, the partition key would be orders.
{
"partitionKeyExtraction:enabled": "true",
"partitionKeyExtraction:source": "stream",
"partitionKeyExtraction:expression": "^(.*)_data$"
}
Alternatively, if you only need the last segment of the stream name (after a hyphen), you can use the streamSuffix source. This doesn't require an expression since it automatically extracts the suffix.
{
"partitionKeyExtraction:enabled": "true",
"partitionKeyExtraction:source": "streamSuffix"
}
The streamSuffix source is useful when stream names follow a structured format and you want to use only the trailing part as the partition key. For example, if the stream is named user-123, the partition key would be 123.
Partition using header values
You can generate the partition key by concatenating values from specific event headers. In this case, two header values (key1 and key2) are combined to form the key.
{
"partitionKeyExtraction:enabled": "true",
"partitionKeyExtraction:source": "headers",
"partitionKeyExtraction:expression": "key1,key2"
}
The headers source allows you to pull values from the event's metadata. The partitionKeyExtraction:expression field lists the header keys (in this case, key1 and key2), and their values are concatenated to generate the partition key. This is useful when headers hold important metadata that should define the record's key, such as a region, user ID, or other identifier.
For example:
{
  "key1": "value1",
  "key2": "value2"
}
// outputs "value1-value2"
Tutorial
Sink to Confluent Cloud
Objectives
In this quickstart, you will learn how to:
- Configure the Kafka Sink connector to write events to a Kafka topic hosted on Confluent Cloud.
- Start the Kafka Sink connector.
- Append events to KurrentDB through the UI.
- View the appended events in the Kafka topic.
Prerequisites
Before starting, ensure you have the following:
- A KurrentDB cluster with the appropriate license key
- This quickstart uses a cluster provisioned on a public network of Kurrent Cloud
- A Kafka cluster, with an API key allowed to write to a topic named loans
- This quickstart uses a public Kafka cluster provisioned on Confluent Cloud
- Familiarity with command-line operations (curl)
Step 1. Configure the Kafka Sink connector
To configure the Kafka Sink connector, you need to create a configuration file that specifies the connection details for the Kafka cluster and the connector instance configuration.
- Create a file named kafka-sink-config.json with the following content:
Note
Replace the values of bootstrapServers, authentication:username, and authentication:password with the appropriate values for your Kafka cluster.
{
"settings": {
"instanceTypeName": "kafka-sink",
"subscription:filter:scope": "stream",
"subscription:filter:filterType": "prefix",
"subscription:filter:expression": "LoanRequest",
"topic": "loans",
"bootstrapServers": "pkc-z9doz.eu-west-1.aws.confluent.cloud:9092",
"authentication:username": "UJ",
"authentication:password": "Nh",
"authentication:securityProtocol": "SaslSsl",
"waitForBrokerAck": "true"
}
}
- Create the Kafka Sink connector instance by sending a POST request to the KurrentDB API:
Note
Replace admin:password with your KurrentDB username and password, and replace the URL https://cu8i0e3tv1lr03.mesdb.eventstore.cloud:2113 with the URL of your KurrentDB cluster.
curl -i -L -u admin:password `
-H "content-type: application/json" -d '@kafka-sink-config.json' `
-X POST https://cu8i0e3tv1lr03.mesdb.eventstore.cloud:2113/connectors/kafka-sink-quickstart
curl -i -L -u admin:password \
-H "content-type: application/json" -d '@kafka-sink-config.json' \
-X POST https://cu8i0e3tv1lr03.mesdb.eventstore.cloud:2113/connectors/kafka-sink-quickstart
- View the configuration of the Kafka Sink connector instance by sending a GET request to the KurrentDB API:
Note
Replace admin:password with your KurrentDB username and password, and replace the URL https://cu8i0e3tv1lr03.mesdb.eventstore.cloud:2113 with the URL of your KurrentDB cluster.
curl -u admin:password https://cu8i0e3tv1lr03.mesdb.eventstore.cloud:2113/connectors/kafka-sink-quickstart/settings
The output will display the configuration of the Kafka Sink connector instance:
{
"settings": {
"instanceTypeName": "kafka-sink",
"subscription:filter:scope": "stream",
"subscription:filter:filterType": "prefix",
"subscription:filter:expression": "LoanRequest",
"topic": "loans",
"bootstrapServers": "pkc-z9doz.eu-west-1.aws.confluent.cloud:9092",
"authentication:username": "UJ",
"authentication:password": "Nh",
"authentication:securityProtocol": "SaslSsl",
"waitForBrokerAck": "true"
},
"timestamp": "2024-08-14T19:08:45.907847700Z"
}
Step 2. Start the Kafka Sink connector
- Start the Kafka Sink connector instance by sending a POST request to the KurrentDB API:
Note
Replace admin:password with your KurrentDB username and password, and replace the URL https://cu8i0e3tv1lr03.mesdb.eventstore.cloud:2113 with the URL of your KurrentDB cluster.
curl -i -L -u admin:password https://cu8i0e3tv1lr03.mesdb.eventstore.cloud:2113/connectors/kafka-sink-quickstart/start
The start request will return an HTTP/1.1 200 OK response code to confirm that the Kafka Sink connector instance has started successfully.
HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Step 3. Append events to KurrentDB through the UI
- Browse to the KurrentDB UI and log in with your credentials.
- Click the Stream Browser tab in the navigation menu.
- Click the Add Event button.
- In the Stream ID field, enter LoanRequest-1.
- In the Event Type field, enter LoanRequested.
- In the Event Body field, enter the following JSON object:
{
  "Amount": 10000,
  "loanTerm": 12
}
- Click the Add button to append the event to the LoanRequest-1 stream.
- Repeat steps 3-7 to append more events to the LoanRequest-2 stream.
Step 4. View the appended events in the Kafka topic
- Browse to the Kafka cluster UI and log in with your credentials.
- Navigate to the loans topic.
- View the appended events in the loans topic.
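If you prefer the command line, you can also read the topic with a Kafka consumer. Below is a minimal sketch using the kafka-console-consumer tool shipped with Apache Kafka; the bootstrap server and SASL credentials are assumed to match the values used in the connector configuration above.
# consumer.properties -- assumed Confluent Cloud credentials; replace with your own API key and secret
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="UJ" password="Nh";

# Read the loans topic from the beginning
kafka-console-consumer.sh \
  --bootstrap-server pkc-z9doz.eu-west-1.aws.confluent.cloud:9092 \
  --topic loans \
  --from-beginning \
  --consumer.config consumer.properties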