Kafka Source
Overview
The Kafka Source Connector enables you to consume messages from an existing Kafka topic and append them into a KurrentDB stream.
Quick Start
You can create the Kafka Source connector as follows. Replace {id} with your desired connector ID:
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json
{
"settings": {
"instanceTypeName": "kafka-source",
"topic": "{{topic}}",
"consumer:bootstrapServers": "localhost:9092",
"schemaName": "{{schemaName}}"
}
}Settings
Core Options
| Name | Details |
|---|---|
topic | required Description: Kafka topic to consume from. |
schemaName | required Description: Name of the schema used for message. |
partition | Description: Specific partition to consume from (leave empty for all partitions). |
offset | Description: Starting offset position (leave empty for default behavior). |
preserveMagicByte | Description: Whether to preserve Kafka's schema registry magic byte. Default: false |
Concurrency Configuration
The Kafka Source Connector processes messages in parallel with multiple consumer tasks. Each task has its own consumer and buffer, all sharing the group kurrentdb-{ConnectorId}. Their channels run independently and merge into one output stream for efficient multi-partition handling.
| Name | Details |
|---|---|
concurrency:channelCapacity | Description: Capacity of each consumer's bounded channel. Higher values provide more buffering but consume more memory. Default: 10000 |
concurrency:tasks | Description: The maximum number of tasks to create for this connector. Default: 1 |
Consumer Configuration
The Consumer section accepts standard Confluent Kafka consumer configuration options. Key options include:
| Name | Details |
|---|---|
consumer:bootstrapServers | Description: Comma-separated list of Kafka broker addresses. Default: "localhost:9092" |
consumer:autoOffsetReset | Description: Offset reset behavior when there is no initial offset. Default: Earliest |
consumer:enableAutoCommit | Description: Enable automatic offset commits. Default: true |
consumer:autoCommitIntervalMs | Description: Auto-commit interval in milliseconds. Default: 5000 |
consumer:sessionTimeoutMs | Description: Session timeout in milliseconds. Default: 45000 |
consumer:enablePartitionEof | Description: Enable end-of-partition notifications. Default: true |
consumer:securityProtocol | Description: SASL mechanism to use for authentication. Accepted Values: plaintext, ssl, saslPlaintextDefault: plaintext |
consumer:saslMechanism | Description: Protocol used to communicate with brokers. Accepted Values: plainDefault: "" |
consumer:sslCaPem | Description: CA certificate string (PEM format) for verifying the broker's key. |
consumer:sslCertificatePem | Description: Client's public key string (PEM format) used for authentication. |
consumer:sslKeyPem | Description: Client's private key string (PEM format) used for authentication. |
consumer:saslUsername | Description: SASL username for use with the PLAIN. |
consumer:saslPassword | Description: SASL password for use with the PLAIN. |
Stream Routing Configuration
The stream section configures how Kafka messages are routed to KurrentDB streams:
| Name | Details |
|---|---|
stream:strategy | Description: Stream routing strategy. Accepted Values: - topic: Route to stream named after the Kafka topic- partitionKey: Route based on Kafka message key- fixed: Route all messages to a single stream- header: Extract stream name from message headersDefault: topic |
stream:expression | Description: Expression for custom routing, depending on strategy. - For fixed: stream name (defaults to topic name if not provided)- For header: comma-separated header names to check- Otherwise: not used |
Authentication
The default authentication method is plaintext. You can configure the authentication method to use by setting the authentication:securityProtocol option.
SSL/TLS
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json
{
"instanceTypeName": "kafka-source",
"topic": "customers",
"consumer:securityProtocol": "ssl",
"consumer:sslCaPem": "...",
"consumer:sslCertificatePem": "...",
"consumer:sslKeyPem": "..."
}SASL/PLAIN
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json
{
"instanceTypeName": "kafka-source",
"topic": "customers",
"consumer:securityProtocol": "saslPlaintext",
"consumer:saslMechanism": "plain",
"consumer:saslUsername": "user",
"consumer:saslPassword": "pass"
}