Using Kafka Connect
Kafka Connect, as deployed and managed by the Strimzi operator, runs Kafka source and sink connectors on Kubernetes. You can use it to move data between Kafka and external systems such as databases, object storage, search engines, and other messaging systems.
This document describes how to create a KafkaConnect cluster, add connector plugins, create KafkaConnector resources, and manage connector lifecycle.
The examples in this document use default as the namespace. In production environments, deploy Kafka Connect in a dedicated namespace. Replace default with the namespace used in your environment.
Prerequisites
Before creating Kafka Connect, ensure that the following conditions are met:
- The Kafka operator is installed and watching the namespace where Kafka Connect will be created.
- A Kafka cluster is running and reachable from the Kafka Connect pods.
- The connector plugin required by your workload is available in the Kafka Connect image or configured through the Kafka Connect build mechanism.
- The target Kafka topics and external systems are reachable.
- Kafka users, ACLs, and TLS certificates are prepared if the target Kafka cluster enables authentication or TLS.
Create a Kafka Connect Cluster
The following example creates a Kafka Connect cluster named my-connect-cluster and connects it to a Kafka cluster named my-cluster through the internal plain bootstrap service.
Replace the namespace, bootstrap address, connector image, and resource settings according to your environment.
The annotation strimzi.io/use-connector-resources: "true" enables connector management through KafkaConnector custom resources. With this mode enabled, connector instances are reconciled by the operator instead of being managed only through the Kafka Connect REST API.
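A minimal sketch of the resource described above might look as follows. The apiVersion, the bootstrap address my-cluster-kafka-bootstrap:9092, the internal topic names, and the resource sizes are assumptions based on common Strimzi conventions; adjust them to the API version and listeners available in your environment.

```yaml
# Sketch only: values below are illustrative assumptions, not product defaults.
apiVersion: kafka.strimzi.io/v1beta2   # assumed API version; use the one served by your operator
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: default
  annotations:
    # Lets the operator manage connectors through KafkaConnector resources
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  # Assumed internal plain listener of the Kafka cluster named my-cluster
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    group.id: my-connect-cluster
    # Internal topics used by the workers (names are hypothetical)
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
    # -1 means "use the broker default replication factor"
    offset.storage.replication.factor: -1
    config.storage.replication.factor: -1
    status.storage.replication.factor: -1
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      memory: 1Gi
```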
Create a Kafka Connect Cluster with TLS and SCRAM
If the Kafka cluster requires TLS and SCRAM-SHA-512 authentication, configure tls and authentication in the KafkaConnect resource.
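The fragment below sketches how tls and authentication could be set. It assumes a TLS listener on port 9093, a cluster CA Secret named my-cluster-cluster-ca-cert, and a hypothetical KafkaUser named my-connect-user whose Secret stores the password under the key password.

```yaml
# Fragment: merge into the spec of an existing KafkaConnect resource.
spec:
  # Assumed TLS listener of the Kafka cluster
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      # Secret holding the CA certificate that signed the broker certificates
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: scram-sha-512
    username: my-connect-user          # hypothetical user name
    passwordSecret:
      secretName: my-connect-user      # Secret that contains the SCRAM password
      password: password               # key inside the Secret
```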
Add Connector Plugins
Kafka Connect workers need connector plugins before connector instances can run. Strimzi supports several plugin delivery methods. Use the method that matches your image and registry policy.
The following example shows the Strimzi build pattern using a Maven artifact. Replace the registry, artifact URL, and checksum with values approved in your environment.
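As a sketch of the build pattern, the fragment below downloads a JAR from a Maven repository URL and verifies its checksum. The registry, push Secret, plugin name, URL, and checksum are placeholders, not tested values.

```yaml
# Fragment: merge into the spec of an existing KafkaConnect resource.
spec:
  build:
    output:
      type: docker
      # Placeholder registry and image name
      image: registry.example.com/my-org/my-connect-cluster:latest
      # Hypothetical Secret with registry credentials
      pushSecret: my-registry-credentials
    plugins:
      - name: file-connector           # arbitrary plugin name
        artifacts:
          - type: jar
            # Placeholder: URL of the artifact in an approved Maven repository
            url: https://repo.example.com/maven2/org/apache/kafka/connect-file/<version>/connect-file-<version>.jar
            # Placeholder: SHA-512 checksum of the artifact
            sha512sum: <sha512-checksum>
```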
If you already have an image built outside the cluster, point Kafka Connect to it directly:
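For instance, assuming a hypothetical image published at registry.example.com:

```yaml
# Fragment: merge into the spec of an existing KafkaConnect resource.
spec:
  # Placeholder image that already contains the required connector plugins
  image: registry.example.com/my-org/my-connect-with-plugins:1.0.0
```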
Check Available Connector Plugins
Before creating a connector, verify that the connector plugin is available in the Kafka Connect cluster.
The output should include the connector class that you plan to use, for example org.apache.kafka.connect.file.FileStreamSourceConnector or a connector class provided by your plugin.
Only create a connector after its class appears in /connector-plugins. In the current product image, the default plugin list can contain only MirrorMaker 2 connector classes. A connector JAR present in the image classpath does not automatically mean that the connector is available for use through Kafka Connect plugin discovery.
If the Connector Class Does Not Exist
If the connector class you need is not listed in /connector-plugins, do not create the KafkaConnector yet. Add the plugin to the Kafka Connect runtime first, then check /connector-plugins again.
Use one of the following approaches:
- Recommended: use spec.build to let Strimzi build a new Kafka Connect image with the required plugin and push it to your registry. See the build example in Add Connector Plugins.
- Use a prebuilt custom image that already contains the required plugin, then set spec.image.
- Upstream Strimzi also supports spec.plugins with type: image, which uses Kubernetes Image Volumes to mount an OCI artifact into /opt/kafka/plugins. In the current product test cluster, this path was accepted by the CRD but did not expose the connector class through /connector-plugins, so do not use it as the default procedure unless you validate it successfully in your own environment.
The spec.build approach is the most predictable because the plugin becomes part of the resulting image and can be promoted across environments.
After changing the image source, wait for the Kafka Connect pod rollout to finish and run /connector-plugins again. Only create the KafkaConnector after the required class appears in the returned list.
Create a Source Connector
After the Kafka Connect cluster is ready and the required plugin is available, create a KafkaConnector resource.
The strimzi.io/cluster label must match the name of the KafkaConnect resource.
The file connector is useful for validating the Kafka Connect workflow, but only after the org.apache.kafka.connect.file.FileStreamSourceConnector class is available in /connector-plugins, for example by using the Kafka Connect build example above or a custom image. For production workloads, use the connector plugin required by your data source, such as a change data capture connector or a relational database connector.
The autoRestart field enables automatic restart for failed connectors and tasks. Set maxRestarts to limit the number of consecutive automatic restarts. If the connector exceeds the maximum restart count, it remains in a failed state until the issue is resolved and the connector is manually restarted.
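The points above can be sketched as a single resource. The connector name, source file path, topic name, and restart limit are illustrative assumptions.

```yaml
apiVersion: kafka.strimzi.io/v1beta2   # assumed API version
kind: KafkaConnector
metadata:
  name: my-source-connector            # hypothetical connector name
  namespace: default
  labels:
    # Must match the name of the KafkaConnect resource
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 1
  autoRestart:
    enabled: true
    maxRestarts: 10                    # illustrative limit
  config:
    file: /opt/kafka/LICENSE           # assumed file readable inside the worker pod
    topic: my-topic                    # hypothetical target topic
```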
Create a Sink Connector
The following example shows the structure of a sink connector. Replace the connector class and configuration with the plugin used in your environment.
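A sketch using the file sink connector as a stand-in; the connector name, topic, and output path are assumptions, and a real deployment would use the class and configuration keys of its own plugin.

```yaml
apiVersion: kafka.strimzi.io/v1beta2   # assumed API version
kind: KafkaConnector
metadata:
  name: my-sink-connector              # hypothetical connector name
  namespace: default
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  # Stand-in class; replace with the sink connector class of your plugin
  class: org.apache.kafka.connect.file.FileStreamSinkConnector
  tasksMax: 1
  config:
    topics: my-topic                   # sink connectors read from "topics"
    file: /tmp/my-sink-output.txt      # assumed writable path in the worker pod
```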
If the connector needs secrets, mount them into the KafkaConnect cluster through external configuration and reference them from connector configuration. Avoid storing passwords directly in the KafkaConnector resource.
Configure External Secrets for Connectors
To use ${file:...} references in connector configuration, mount secrets into the Kafka Connect pods using spec.externalConfiguration. The following example mounts a Kubernetes Secret as a volume so that connectors can read credentials from a properties file.
Connectors reference mounted secrets using the ${file:/opt/kafka/external-configuration/<volume-name>/<file>:<key>} syntax.
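A sketch of this setup, assuming a Secret named my-connector-credentials that contains a file credentials.properties with a key named password. The connector class and the connection.password setting are hypothetical. The worker also needs the FileConfigProvider enabled for ${file:...} references to resolve.

```yaml
# Fragment: merge into the spec of an existing KafkaConnect resource.
spec:
  config:
    # Enable the file config provider so ${file:...} references are resolved
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: connector-config         # becomes <volume-name> in the mount path
        secret:
          secretName: my-connector-credentials
---
# Fragment: merge into the spec of a KafkaConnector resource.
spec:
  class: com.example.MySinkConnector   # hypothetical connector class
  config:
    # Hypothetical connector setting that reads the password from the mounted file
    connection.password: ${file:/opt/kafka/external-configuration/connector-config/credentials.properties:password}
```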
You can also pass secrets as environment variables:
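For example, assuming the same hypothetical Secret my-connector-credentials with a key named password:

```yaml
# Fragment: merge into the spec of an existing KafkaConnect resource.
spec:
  externalConfiguration:
    env:
      - name: DB_PASSWORD              # hypothetical variable name
        valueFrom:
          secretKeyRef:
            name: my-connector-credentials
            key: password
```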
In newer versions, spec.externalConfiguration is being phased out in favor of standard pod template fields. Existing externalConfiguration setups continue to work, but new deployments should use the following alternatives:
- For environment variables sourced from a Secret or ConfigMap, use spec.template.connectContainer.env with valueFrom.secretKeyRef or valueFrom.configMapKeyRef.
- For file-based secrets used with ${file:...} references, mount the Secret or ConfigMap through spec.template.pod.volumes and spec.template.connectContainer.volumeMounts. Choose a mount path that matches your connector configuration and reference files through that path.
The following example mounts a Secret at /mnt/connector-secrets so that connectors can read credentials from a properties file:
Connectors then reference values using ${file:/mnt/connector-secrets/<file>:<key>}.
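A sketch of the pod template approach, assuming a hypothetical Secret named my-connector-credentials; the volume name is arbitrary.

```yaml
# Fragment: merge into the spec of an existing KafkaConnect resource.
spec:
  config:
    # Enable the file config provider so ${file:...} references are resolved
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  template:
    pod:
      volumes:
        - name: connector-secrets
          secret:
            secretName: my-connector-credentials
    connectContainer:
      volumeMounts:
        - name: connector-secrets
          mountPath: /mnt/connector-secrets
```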
Configure Logging
Use spec.logging to configure the log levels for Kafka Connect workers. You can use inline logging or reference an external ConfigMap.
Kafka Connect uses log4j2. Both inline and external configuration follow the log4j2 property syntax.
Inline logging:
Each custom logger needs a logger.<key>.name line that declares the package it targets, plus a logger.<key>.level line that sets the level. The <key> is an arbitrary identifier used only inside the loggers map.
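As an illustration of the inline form, the fragment below keeps the root logger at INFO and raises one package to DEBUG. The key sourcetask is an arbitrary identifier chosen for this sketch.

```yaml
# Fragment: merge into the spec of an existing KafkaConnect resource.
spec:
  logging:
    type: inline
    loggers:
      rootLogger.level: INFO
      # logger.<key>.name declares the target, logger.<key>.level sets its level
      logger.sourcetask.name: org.apache.kafka.connect.runtime.WorkerSourceTask
      logger.sourcetask.level: DEBUG
```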
External logging using a ConfigMap:
Create the ConfigMap before applying the KafkaConnect resource. The ConfigMap must contain a full log4j2 configuration:
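A sketch of both pieces, assuming a ConfigMap named connect-logging with the key log4j2.properties. The appender layout is a plain console example, not the product default.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: connect-logging                # hypothetical ConfigMap name
  namespace: default
data:
  log4j2.properties: |
    name = ConnectConfig
    appender.console.type = Console
    appender.console.name = STDOUT
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{ISO8601} %p %m (%c) [%t]%n
    rootLogger.level = INFO
    rootLogger.appenderRefs = console
    rootLogger.appenderRef.console.ref = STDOUT
---
# Fragment: merge into the spec of an existing KafkaConnect resource.
spec:
  logging:
    type: external
    valueFrom:
      configMapKeyRef:
        name: connect-logging
        key: log4j2.properties
```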
Configure Metrics
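Enable Prometheus metrics collection using spec.metricsConfig. Metrics are exported by the Prometheus JMX Exporter, which Strimzi runs as a Java agent inside the Kafka Connect container rather than as a separate sidecar container.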
Create the metrics ConfigMap with JMX exporter rules:
Example metrics-config.yml:
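The sketch below assumes a ConfigMap named connect-metrics and shows a single illustrative rule for worker-level metrics. A production rule set normally covers connector and task metrics as well.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: connect-metrics                # hypothetical ConfigMap name
  namespace: default
data:
  metrics-config.yml: |
    lowercaseOutputName: true
    lowercaseOutputLabelNames: true
    rules:
      # Illustrative rule: expose connect-worker-metrics attributes as gauges
      - pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+)
        name: kafka_connect_worker_$1
        type: GAUGE
---
# Fragment: merge into the spec of an existing KafkaConnect resource.
spec:
  metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        name: connect-metrics
        key: metrics-config.yml
```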
After metrics are enabled, configure a Prometheus ServiceMonitor or PodMonitor to scrape the metrics endpoint. Use Grafana dashboards to visualize Kafka Connect worker and connector metrics.
Configure Distributed Tracing
Enable OpenTelemetry distributed tracing to track messages through Kafka Connect pipelines.
Set the OpenTelemetry endpoint through environment variables in the Kafka Connect template:
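A sketch of the tracing settings, assuming an OTLP collector reachable at a hypothetical in-cluster address; the service name is also an assumption.

```yaml
# Fragment: merge into the spec of an existing KafkaConnect resource.
spec:
  tracing:
    type: opentelemetry
  template:
    connectContainer:
      env:
        # Name under which traces from this Connect cluster are reported
        - name: OTEL_SERVICE_NAME
          value: my-connect-cluster
        # Placeholder collector endpoint
        - name: OTEL_EXPORTER_OTLP_ENDPOINT
          value: http://otel-collector.observability.svc:4317
```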
Only messages produced and consumed by Kafka Connect itself are traced. Messages already in Kafka topics before tracing is enabled are not retroactively traced.
Configure Rack Awareness
Configure rack awareness so that Kafka Connect workers consume from the closest Kafka broker replicas. This reduces cross-zone network traffic in multi-zone deployments.
The topologyKey value must match a node label present on all Kubernetes nodes where Kafka Connect pods can be scheduled.
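For example, assuming nodes carry the standard Kubernetes zone label:

```yaml
# Fragment: merge into the spec of an existing KafkaConnect resource.
spec:
  rack:
    # Node label whose value identifies the zone of each worker
    topologyKey: topology.kubernetes.io/zone
```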
Configure JVM Options
Tune JVM memory allocation and garbage collection for Kafka Connect workers using spec.jvmOptions.
Set -Xms and -Xmx in proportion to the spec.resources.requests.memory and spec.resources.limits.memory values, and keep the maximum heap below the container memory limit. Leave enough memory for non-heap usage such as thread stacks and native memory.
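As a sketch, the fragment below gives the heap half of a 2Gi container limit. The sizes are illustrative assumptions, not a sizing recommendation.

```yaml
# Fragment: merge into the spec of an existing KafkaConnect resource.
spec:
  resources:
    requests:
      memory: 2Gi
    limits:
      memory: 2Gi
  jvmOptions:
    # Heap is set well below the container limit to leave room for non-heap memory
    "-Xms": 1g
    "-Xmx": 1g
    gcLoggingEnabled: false
```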
Configure Health Checks
Customize liveness and readiness probe settings if the default values do not match your environment.
Increase initialDelaySeconds if Kafka Connect workers take longer to start, for example when loading large connector plugins.
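A sketch with illustrative timings; tune them to the startup time observed in your environment.

```yaml
# Fragment: merge into the spec of an existing KafkaConnect resource.
spec:
  livenessProbe:
    initialDelaySeconds: 60            # illustrative value
    timeoutSeconds: 5
  readinessProbe:
    initialDelaySeconds: 60            # illustrative value
    timeoutSeconds: 5
```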
Verify Kafka Connect and Connectors
Check the Kafka Connect cluster status:
Check connector status:
Check worker pods:
Check the Kafka Connect worker logs if the connector is not ready:
Update a Connector
Update a connector by editing the KafkaConnector resource:
The operator reconciles the change and updates the connector through the Kafka Connect REST API.
Common updates include:
- Changing tasksMax
- Updating connector-specific configuration
- Adding topic patterns
- Adjusting connector retry or error-handling parameters
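As an illustration, an edited sink connector spec could combine several of these updates. The topic pattern and task count are hypothetical, and topics.regex cannot be set together with topics.

```yaml
# Fragment: edited spec of an existing KafkaConnector resource (sink connector).
spec:
  tasksMax: 2                          # raised from 1
  config:
    # Subscribe by pattern instead of a fixed topic list
    topics.regex: "orders-.*"
    # Kafka Connect error-handling settings
    errors.tolerance: all
    errors.log.enable: true
```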
Pause, Resume, and Stop a Connector
Use spec.state to control whether a connector is running, paused, or stopped.
Pause a connector by setting spec.state to paused.
Resume the connector by setting spec.state to running.
Stop the connector by setting spec.state to stopped.
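For example, to pause a connector:

```yaml
# Fragment: merge into the spec of an existing KafkaConnector resource.
spec:
  state: paused                        # one of: running, paused, stopped
```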
Manage Connector Offsets
You can list, alter, and reset connector offsets through the KafkaConnector resource using spec.listOffsets and spec.alterOffsets. The operator writes the results to a ConfigMap and reconciles the requested changes, so offset management stays declarative and does not require calling the Kafka Connect REST API.
Offset operations are driven by annotations on the KafkaConnector resource:
- strimzi.io/connector-offsets=list triggers a list operation.
- strimzi.io/connector-offsets=alter triggers an alter operation. The connector must be in the stopped state first.
- strimzi.io/connector-offsets=reset triggers a reset operation. The connector must also be stopped.
List Offsets
Declare the target ConfigMap in spec.listOffsets, then annotate the resource to trigger the list:
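A sketch of the list operation, assuming a hypothetical target ConfigMap named my-connector-offsets. The annotation is shown in the manifest here; it can equally be added to the live resource.

```yaml
# Fragment: merge into an existing KafkaConnector resource.
metadata:
  annotations:
    strimzi.io/connector-offsets: list
spec:
  listOffsets:
    toConfigMap:
      name: my-connector-offsets       # ConfigMap that receives the offsets
```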
Alter Offsets
To alter offsets, stop the connector, provide the new offsets in a ConfigMap referenced by spec.alterOffsets, and annotate the resource:
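A sketch of the alter operation for a file source connector. It assumes the ConfigMap stores the offsets under the key offsets.json in the same JSON structure that the Kafka Connect offsets API uses; the partition and offset keys shown are those of the file source connector and differ for other connectors.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-connector-offsets           # hypothetical ConfigMap name
  namespace: default
data:
  # Assumed key name and payload format
  offsets.json: |
    {
      "offsets": [
        {
          "partition": { "filename": "/opt/kafka/LICENSE" },
          "offset": { "position": 0 }
        }
      ]
    }
---
# Fragment: merge into an existing KafkaConnector resource.
metadata:
  annotations:
    strimzi.io/connector-offsets: alter
spec:
  state: stopped                       # offsets can only be altered while stopped
  alterOffsets:
    fromConfigMap:
      name: my-connector-offsets
```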
Reset Offsets
To reset offsets, stop the connector and annotate the resource:
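A sketch of the reset operation, with the annotation shown in the manifest:

```yaml
# Fragment: merge into an existing KafkaConnector resource.
metadata:
  annotations:
    strimzi.io/connector-offsets: reset
spec:
  state: stopped                       # offsets can only be reset while stopped
```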
After the reset or alter operation completes, set spec.state back to running. The connector resumes processing from the new offset position or from the position defined by its offset reset policy.
Altering or resetting offsets can cause data to be reprocessed or skipped. Only do this when you understand the impact on downstream consumers and the external system.
Delete a Connector
Delete the KafkaConnector resource to remove the connector instance from Kafka Connect:
Deleting a connector does not automatically delete Kafka topics, committed offsets, or data in the external system.
Key Configuration Parameters
KafkaConnect Resource
KafkaConnector Resource
Best Practices
- Use KafkaConnector resources for connector lifecycle management instead of manually creating connectors through the REST API.
- Use at least 2 Kafka Connect replicas for production workloads.
- Configure internal topic replication factors based on the Kafka cluster size.
- Keep connector plugin versions compatible with the Kafka Connect runtime version.
- Keep one KafkaConnector resource per connector instance.
- Avoid storing secrets directly in spec.config. Prefer spec.template.connectContainer.env for environment variables and spec.template.pod.volumes with spec.template.connectContainer.volumeMounts for file-based secrets. Use externalConfiguration only when you need compatibility with existing deployments.
- Set tasksMax according to connector capabilities and workload size.
- Enable autoRestart on all production connectors to recover from transient failures automatically.
- Enable Prometheus metrics and configure Grafana dashboards to monitor connector task status, restart count, lag, and error rates.
- Configure logging at INFO level by default. Use DEBUG only for troubleshooting specific connector issues.
- Use rack awareness in multi-zone clusters to reduce cross-zone network traffic.
- Use spec.template to set pod affinity and anti-affinity rules for high availability.