Using Kafka Connect

Kafka Connect is an Apache Kafka component for running source and sink connectors that move data between Kafka and external systems such as databases, object storage, search engines, and other messaging systems. The Strimzi operator deploys and manages Kafka Connect clusters on Kubernetes through the KafkaConnect and KafkaConnector custom resources.

This document describes how to create a KafkaConnect cluster, add connector plugins, create KafkaConnector resources, manage the connector lifecycle, and configure logging, metrics, tracing, and other worker settings.

Note

The examples in this document use default as the namespace. In production environments, deploy Kafka Connect in a dedicated namespace. Replace default with the namespace used in your environment.

Prerequisites

Before creating a Kafka Connect cluster, make sure that the following conditions are met:

  1. The Kafka operator is installed and watching the namespace where Kafka Connect will be created.
  2. A Kafka cluster is running and reachable from the Kafka Connect pods.
  3. The connector plugin required by your workload is available in the Kafka Connect image or configured through the Kafka Connect build mechanism.
  4. The target Kafka topics and external systems are reachable.
  5. Kafka users, ACLs, and TLS certificates are prepared if the target Kafka cluster enables authentication or TLS.

Create a Kafka Connect Cluster

The following example creates a Kafka Connect cluster named my-connect-cluster and connects it to a Kafka cluster named my-cluster through the internal plain bootstrap service.

Note

Replace the namespace, bootstrap address, connector image, and resource settings according to your environment.

cat << EOF | kubectl -n default apply -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 4.1.1
  replicas: 2
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    group.id: my-connect-cluster
    config.storage.topic: my-connect-cluster-configs
    offset.storage.topic: my-connect-cluster-offsets
    status.storage.topic: my-connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      cpu: 1
      memory: 2Gi
EOF

The annotation strimzi.io/use-connector-resources: "true" enables connector management through KafkaConnector custom resources. In this mode, the operator reconciles connector instances from KafkaConnector resources, and manual changes made directly through the Kafka Connect REST API are reverted by the operator.

Create a Kafka Connect Cluster with TLS and SCRAM

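If the Kafka cluster requires TLS and SCRAM-SHA-512 authentication, configure spec.tls and spec.authentication in the KafkaConnect resource. The following example connects to the TLS listener on port 9093, trusts the cluster CA certificate stored in the my-cluster-cluster-ca-cert Secret, and reads the password for my-connect-user from the password key of a Secret with the same name, as created by the User Operator for a KafkaUser resource.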

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 4.1.1
  replicas: 2
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: scram-sha-512
    username: my-connect-user
    passwordSecret:
      secretName: my-connect-user
      password: password
  config:
    group.id: my-connect-cluster
    config.storage.topic: my-connect-cluster-configs
    offset.storage.topic: my-connect-cluster-offsets
    status.storage.topic: my-connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3

Add Connector Plugins

Kafka Connect workers need connector plugins before connector instances can run. Strimzi supports several plugin delivery methods. Use the method that matches your image and registry policy.

| Method | Usage |
| --- | --- |
| Custom image | Build a Kafka Connect image that already contains connector plugins, then set spec.image. |
| Kafka Connect build | Let Strimzi build a Kafka Connect image from plugin artifacts and push it to a configured registry. |
| Image-mounted plugins | Upstream Strimzi can mount plugin OCI artifacts through spec.plugins, but validate this carefully in your product build before using it. |

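The following example shows the Strimzi build pattern using a Maven artifact. Replace the output image, push secret, and Maven coordinates with values approved in your environment. For jar, tgz, or zip artifacts, specify url and, optionally, sha512sum instead of Maven coordinates.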

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 4.1.1
  replicas: 2
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  build:
    output:
      type: docker
      image: registry.example.com/kafka/my-connect-cluster:latest
      pushSecret: my-registry-secret
    plugins:
      - name: kafka-connect-file
        artifacts:
          - type: maven
            repository: https://repo1.maven.org/maven2
            group: org.apache.kafka
            artifact: connect-file
            version: 4.1.1
  config:
    group.id: my-connect-cluster
    config.storage.topic: my-connect-cluster-configs
    offset.storage.topic: my-connect-cluster-offsets
    status.storage.topic: my-connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
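For plugins distributed as jar, tgz, or zip files rather than Maven artifacts, the build configuration takes a url and an optional sha512sum that the build uses to verify the download. The following Python sketch computes that checksum for a locally downloaded artifact using only the standard library. It is a convenience helper under that assumption, not part of Strimzi.

```python
import hashlib
from pathlib import Path


def sha512sum(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the hex-encoded SHA-512 digest of the file at path.

    The file is read in chunks so that large plugin archives do not
    have to fit in memory at once.
    """
    digest = hashlib.sha512()
    with Path(path).open("rb") as artifact:
        # iter() with a sentinel keeps reading until read() returns b"".
        for chunk in iter(lambda: artifact.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

For example, sha512sum("my-connector-plugin.tgz") returns the 128-character hexadecimal digest to use as the sha512sum value; the file name is a placeholder. The sha512sum command-line tool produces the same value.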

If you already have an image built outside the cluster, point Kafka Connect to it directly:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 4.1.1
  image: registry.example.com/kafka/my-connect-cluster:latest
  replicas: 2
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    group.id: my-connect-cluster
    config.storage.topic: my-connect-cluster-configs
    offset.storage.topic: my-connect-cluster-offsets
    status.storage.topic: my-connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3

Check Available Connector Plugins

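Before creating a connector, verify that the connector plugin is available in the Kafka Connect cluster. kubectl port-forward keeps running in the foreground, so run it in one terminal and query the Kafka Connect REST API from another: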

kubectl -n default port-forward service/my-connect-cluster-connect-api 8083:8083
curl http://localhost:8083/connector-plugins

The output should include the connector class that you plan to use, for example org.apache.kafka.connect.file.FileStreamSourceConnector or a connector class provided by your plugin.

Only create a connector after its class appears in /connector-plugins. In the current product image, the default plugin list can contain only MirrorMaker 2 connector classes. A connector JAR present in the image classpath does not automatically mean that the connector is available for use through Kafka Connect plugin discovery.
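If you script this check, parse the JSON array returned by /connector-plugins instead of searching the raw text. The following Python sketch assumes that the REST API is reachable at http://localhost:8083 through the port-forward shown earlier; the function names are illustrative.

```python
import json
import urllib.request


def plugin_classes(payload: str) -> set[str]:
    """Return the connector classes listed in a /connector-plugins response.

    The endpoint returns a JSON array of objects with class, type,
    and version fields.
    """
    return {plugin["class"] for plugin in json.loads(payload)}


def has_plugin(payload: str, connector_class: str) -> bool:
    """Return True if connector_class appears in the response."""
    return connector_class in plugin_classes(payload)


def fetch_plugins(base_url: str = "http://localhost:8083") -> str:
    """Fetch /connector-plugins, assuming the REST API is port-forwarded to base_url."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins", timeout=10) as response:
        return response.read().decode("utf-8")
```

For example, has_plugin(fetch_plugins(), "org.apache.kafka.connect.file.FileStreamSourceConnector") returns False on an image that lists only MirrorMaker 2 connectors.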

If the Connector Class Does Not Exist

If the connector class you need is not listed in /connector-plugins, do not create the KafkaConnector yet. Add the plugin to the Kafka Connect runtime first, then check /connector-plugins again.

Use one of the following approaches:

  1. Recommended: use spec.build to let Strimzi build a new Kafka Connect image with the required plugin and push it to your registry. See the build example in Add Connector Plugins.
  2. Use a prebuilt custom image that already contains the required plugin, then set spec.image.
  3. Upstream Strimzi also supports spec.plugins with type: image, which uses Kubernetes Image Volumes to mount an OCI artifact into /opt/kafka/plugins. In the current product test cluster, this path was accepted by the CRD but did not expose the connector class through /connector-plugins, so do not use it as the default procedure unless you validate it successfully in your own environment.

The spec.build approach is the most predictable because the plugin becomes part of the resulting image and can be promoted across environments.

After changing the image source, wait for the Kafka Connect pod rollout to finish and run /connector-plugins again. Only create the KafkaConnector after the required class appears in the returned list.

Create a Source Connector

After the Kafka Connect cluster is ready and the required plugin is available, create a KafkaConnector resource.

The strimzi.io/cluster label must match the name of the KafkaConnect resource.

Example Only

The file connector is useful for validating the Kafka Connect workflow, but only after the org.apache.kafka.connect.file.FileStreamSourceConnector class is available in /connector-plugins, for example by using the Kafka Connect build example above or a custom image. The connector reads the configured file from the file system of the worker pod that runs its task, so /tmp/test.txt must exist in that pod. For production workloads, use the connector plugin required by your data source, such as a change data capture connector or a relational database connector.

cat << EOF | kubectl -n default apply -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-file-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 1
  autoRestart:
    enabled: true
    maxRestarts: 10
  config:
    file: /tmp/test.txt
    topic: my-connect-topic
EOF

The autoRestart field enables automatic restarts of failed connectors and tasks. Set maxRestarts to limit the number of consecutive automatic restarts. If the connector reaches the maximum restart count, it remains in a failed state until you resolve the issue and restart it manually, for example by annotating the KafkaConnector resource with strimzi.io/restart=true.

Create a Sink Connector

The following example shows the structure of a sink connector. Replace the connector class and configuration with the plugin used in your environment.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: com.example.connect.MySinkConnector
  tasksMax: 2
  autoRestart:
    enabled: true
  config:
    topics: my-source-topic
    connection.url: jdbc:postgresql://postgresql.default.svc:5432/app
    connection.user: ${file:/opt/kafka/external-configuration/connector-secrets/db.properties:username}
    connection.password: ${file:/opt/kafka/external-configuration/connector-secrets/db.properties:password}

If the connector needs secrets, mount them into the KafkaConnect cluster through external configuration and reference them from connector configuration. Avoid storing passwords directly in the KafkaConnector resource.

Configure External Secrets for Connectors

To use ${file:...} references in connector configuration, mount secrets into the Kafka Connect pods using spec.externalConfiguration. The following example mounts a Kubernetes Secret as a volume so that connectors can read credentials from a properties file.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 4.1.1
  replicas: 2
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    group.id: my-connect-cluster
    config.storage.topic: my-connect-cluster-configs
    offset.storage.topic: my-connect-cluster-offsets
    status.storage.topic: my-connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: connector-secrets
        secret:
          secretName: my-connector-secrets

Connectors reference mounted secrets using the ${file:/opt/kafka/external-configuration/<volume-name>/<file>:<key>} syntax.
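To see how such a reference becomes a concrete value, the following Python sketch approximates what FileConfigProvider does when the worker resolves connector configuration. It is a simplified model for checking a properties file locally, not the Kafka implementation: it handles only key=value lines and assumes the file path contains no colon.

```python
import re
from pathlib import Path

# ${provider:path:key}; in this sketch the path must not contain ':'.
PLACEHOLDER = re.compile(r"\$\{(\w+):([^:}]+):([^}]+)\}")


def load_properties(path: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and comments.

    Java .properties escapes, ':' separators, and line continuations
    are NOT handled.
    """
    props = {}
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def resolve(value: str) -> str:
    """Replace ${file:<path>:<key>} placeholders with values read from <path>."""

    def substitute(match: re.Match) -> str:
        provider, path, key = match.groups()
        if provider != "file":
            return match.group(0)  # leave other providers untouched
        return load_properties(path)[key]

    return PLACEHOLDER.sub(substitute, value)
```

For example, resolve("${file:/opt/kafka/external-configuration/connector-secrets/db.properties:password}") returns the password entry of db.properties when run where that file exists.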

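You can also pass secrets as environment variables. Connectors can reference such a variable as ${env:DB_PASSWORD} if the worker configuration enables org.apache.kafka.common.config.provider.EnvVarConfigProvider, for example with config.providers: file,env and config.providers.env.class set to that class. The variable itself is defined as follows: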

spec:
  externalConfiguration:
    env:
      - name: DB_PASSWORD
        valueFrom:
          secretKeyRef:
            name: my-connector-secrets
            key: password

Migration Guidance

In newer versions, spec.externalConfiguration is being phased out in favor of standard pod template fields. Existing externalConfiguration setups continue to work, but new deployments should use the following alternatives:

  • For environment variables sourced from a Secret or ConfigMap, use spec.template.connectContainer.env with valueFrom.secretKeyRef or valueFrom.configMapKeyRef.
  • For file-based secrets used with ${file:...} references, mount the Secret or ConfigMap through spec.template.pod.volumes and spec.template.connectContainer.volumeMounts. Choose a mount path that matches your connector configuration and reference files through that path.

The following example mounts a Secret at /mnt/connector-secrets so that connectors can read credentials from a properties file:

spec:
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  template:
    pod:
      volumes:
        - name: connector-secrets
          secret:
            secretName: my-connector-secrets
    connectContainer:
      volumeMounts:
        - name: connector-secrets
          mountPath: /mnt/connector-secrets

Connectors then reference values using ${file:/mnt/connector-secrets/<file>:<key>}.

Configure Logging

Use spec.logging to configure the log levels for Kafka Connect workers. You can use inline logging or reference an external ConfigMap.

Kafka Connect uses log4j2. Both inline and external configuration follow the log4j2 property syntax.

Inline logging:

spec:
  logging:
    type: inline
    loggers:
      rootLogger.level: INFO
      logger.connect.name: org.apache.kafka.connect
      logger.connect.level: DEBUG
      logger.runtime.name: org.apache.kafka.connect.runtime
      logger.runtime.level: WARN

Each custom logger needs a logger.<key>.name line that declares the package it targets, plus a logger.<key>.level line that sets the level. The <key> is an arbitrary identifier used only inside the loggers map.
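A missing .name or .level line is easy to overlook in a long loggers map. The following Python sketch checks an inline loggers map against the pairing rule described above and against the standard log4j2 level names. It is an illustrative pre-apply check, not a Strimzi tool.

```python
LOG4J2_LEVELS = {"OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE", "ALL"}


def check_inline_loggers(loggers: dict[str, str]) -> list[str]:
    """Return a list of problems found in an inline loggers map."""
    names, levels, problems = set(), set(), []
    for prop, value in loggers.items():
        parts = prop.split(".")
        # Every *.level entry, including rootLogger.level, needs a known level.
        if parts[-1] == "level" and value.upper() not in LOG4J2_LEVELS:
            problems.append(f"{prop}: unknown level {value!r}")
        # Collect the <key> part of logger.<key>.name and logger.<key>.level.
        if len(parts) == 3 and parts[0] == "logger":
            if parts[2] == "name":
                names.add(parts[1])
            elif parts[2] == "level":
                levels.add(parts[1])
    for key in sorted(levels - names):
        problems.append(f"logger.{key}.level has no logger.{key}.name")
    for key in sorted(names - levels):
        problems.append(f"logger.{key}.name has no logger.{key}.level")
    return problems
```

Applied to the inline example above, the function returns an empty list.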

External logging using a ConfigMap:

spec:
  logging:
    type: external
    valueFrom:
      configMapKeyRef:
        name: my-connect-logging
        key: log4j2.properties

Create the ConfigMap before applying the KafkaConnect resource. The ConfigMap must contain a full log4j2 configuration:

kubectl -n default create configmap my-connect-logging --from-file=log4j2.properties=./log4j2.properties

Configure Metrics

Enable Prometheus metrics collection using spec.metricsConfig. Metrics are exposed by the Prometheus JMX Exporter, which runs as a Java agent in the Kafka Connect container and serves metrics on port 9404.

spec:
  metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        name: my-connect-metrics
        key: metrics-config.yml

Create the metrics ConfigMap with JMX exporter rules:

kubectl -n default create configmap my-connect-metrics --from-file=metrics-config.yml=./metrics-config.yml

Example metrics-config.yml:

lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
  - pattern: "kafka.connect<type=connect-worker-metrics>([^:]+):"
    name: "kafka_connect_worker_$1"
  - pattern: "kafka.connect<type=connect-metrics, client-id=(.+)><>([^:]+)"
    name: "kafka_connect_$2"
    labels:
      client_id: "$1"

After metrics are enabled, configure a Prometheus ServiceMonitor or PodMonitor to scrape the metrics endpoint. Use Grafana dashboards to visualize Kafka Connect worker and connector metrics.
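Before wiring up Prometheus, you can confirm that the exporter returns Kafka Connect series. The following Python sketch parses the Prometheus text format and keeps samples whose names start with kafka_connect_, the prefix produced by the rules above. It assumes the default metrics port 9404, reached through a port-forward such as kubectl -n default port-forward pod/<worker-pod> 9404:9404, where <worker-pod> is a placeholder. The parser is deliberately simple and does not handle every edge case of the exposition format, such as a } inside a label value.

```python
import re
import urllib.request

# metric_name{optional labels} value [optional timestamp]
SAMPLE = re.compile(r"^([a-zA-Z_:][a-zA-Z0-9_:]*)(\{.*\})?\s+(\S+)")


def connect_samples(text: str, prefix: str = "kafka_connect_") -> dict[str, float]:
    """Return {series: value} for samples whose metric name starts with prefix."""
    samples = {}
    for line in text.splitlines():
        if not line or line.startswith("#"):  # skip HELP/TYPE comments
            continue
        match = SAMPLE.match(line)
        if match and match.group(1).startswith(prefix):
            series = match.group(1) + (match.group(2) or "")
            samples[series] = float(match.group(3))
    return samples


def scrape(url: str = "http://localhost:9404/metrics") -> str:
    """Fetch the metrics page, assuming the port-forward described above."""
    with urllib.request.urlopen(url, timeout=10) as response:
        return response.read().decode("utf-8")
```

An empty result from connect_samples(scrape()) usually means that the rules in metrics-config.yml do not match any Kafka Connect MBeans.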

Configure Distributed Tracing

Enable OpenTelemetry distributed tracing to track messages through Kafka Connect pipelines.

spec:
  tracing:
    type: opentelemetry

Set the OpenTelemetry endpoint through environment variables in the Kafka Connect template:

spec:
  tracing:
    type: opentelemetry
  template:
    connectContainer:
      env:
        - name: OTEL_EXPORTER_OTLP_ENDPOINT
          value: http://otel-collector.observability.svc:4317
        - name: OTEL_SERVICE_NAME
          value: my-connect-cluster

Only messages produced and consumed by Kafka Connect itself are traced. Messages already in Kafka topics before tracing is enabled are not retroactively traced.

Configure Rack Awareness

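Configure rack awareness so that Kafka Connect consumers fetch from the closest Kafka broker replicas. This reduces cross-zone network traffic in multi-zone deployments. Fetching from the closest replica also requires the Kafka brokers to use rack awareness and to set replica.selector.class to org.apache.kafka.common.replica.RackAwareReplicaSelector.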

spec:
  rack:
    topologyKey: topology.kubernetes.io/zone

The topologyKey value must match a node label present on all Kubernetes nodes where Kafka Connect pods can be scheduled.

Configure JVM Options

Tune JVM memory allocation and garbage collection for Kafka Connect workers using spec.jvmOptions.

spec:
  jvmOptions:
    -Xms: 1g
    -Xmx: 2g
    gcLoggingEnabled: true

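Size -Xms and -Xmx against the spec.resources.requests.memory and spec.resources.limits.memory values, and keep -Xmx well below the memory limit so that non-heap memory, such as metaspace, thread stacks, and other native memory, also fits in the container. For example, -Xmx: 2g fills the entire 2Gi memory limit used in the first example; either lower the heap or raise the limit.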
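A quick arithmetic check helps catch a heap that leaves too little room. For example, -Xmx: 2g with the 2Gi memory limit from the first example leaves 0 bytes for non-heap memory. The following Python sketch converts Kubernetes memory quantities and JVM sizes to bytes and compares them. The 25% non-heap margin is an illustrative assumption, not a Strimzi or JVM rule.

```python
import re

# Kubernetes memory suffixes: decimal (k, M, G, T) and binary (Ki, Mi, Gi, Ti).
K8S_UNITS = {"k": 1000, "M": 1000**2, "G": 1000**3, "T": 1000**4,
             "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4}
# JVM -Xms/-Xmx suffixes are binary and case-insensitive.
JVM_UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3}


def k8s_bytes(quantity: str) -> int:
    """Parse a Kubernetes memory quantity such as '2Gi' or '1500M' into bytes."""
    match = re.fullmatch(r"(\d+)(Ki|Mi|Gi|Ti|k|M|G|T)?", quantity)
    if not match:
        raise ValueError(f"unsupported memory quantity: {quantity!r}")
    return int(match.group(1)) * K8S_UNITS.get(match.group(2) or "", 1)


def jvm_bytes(size: str) -> int:
    """Parse a JVM heap size such as '1g' or '1536m' into bytes."""
    match = re.fullmatch(r"(\d+)([kKmMgG])?", size)
    if not match:
        raise ValueError(f"unsupported JVM size: {size!r}")
    return int(match.group(1)) * JVM_UNITS.get((match.group(2) or "").lower(), 1)


def heap_fits(xmx: str, memory_limit: str, min_non_heap_fraction: float = 0.25) -> bool:
    """Return True if at least min_non_heap_fraction of the limit stays outside the heap."""
    limit = k8s_bytes(memory_limit)
    return limit - jvm_bytes(xmx) >= min_non_heap_fraction * limit
```

With this margin, heap_fits("2g", "2Gi") is False, while heap_fits("1536m", "2Gi") is True.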

Configure Health Checks

Customize liveness and readiness probe settings if the default values do not match your environment.

spec:
  livenessProbe:
    initialDelaySeconds: 60
    timeoutSeconds: 5
    periodSeconds: 10
  readinessProbe:
    initialDelaySeconds: 60
    timeoutSeconds: 5
    periodSeconds: 10

Increase initialDelaySeconds if Kafka Connect workers take longer to start, for example when loading large connector plugins.

Verify Kafka Connect and Connectors

Check the Kafka Connect cluster status:

kubectl -n default get kafkaconnect my-connect-cluster
kubectl -n default describe kafkaconnect my-connect-cluster

Check connector status:

kubectl -n default get kafkaconnector
kubectl -n default describe kafkaconnector my-file-source-connector
kubectl -n default get kafkaconnector my-file-source-connector -o yaml
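To check readiness from a script instead of reading YAML, request JSON output and inspect the status. The following Python sketch assumes a KafkaConnector status layout in which the Ready condition is in status.conditions and the connector and task states reported by Kafka Connect are in status.connectorStatus. Verify this layout against the output in your environment.

```python
import json


def summarize_connector(resource_json: str) -> dict:
    """Summarize readiness and connector/task states of a KafkaConnector.

    Expects the output of:
      kubectl -n default get kafkaconnector <name> -o json
    """
    status = json.loads(resource_json).get("status", {})
    ready = any(
        condition.get("type") == "Ready" and condition.get("status") == "True"
        for condition in status.get("conditions", [])
    )
    connector_status = status.get("connectorStatus", {})
    return {
        "ready": ready,
        "connector_state": connector_status.get("connector", {}).get("state"),
        "failed_tasks": [
            task.get("id")
            for task in connector_status.get("tasks", [])
            if task.get("state") == "FAILED"
        ],
    }
```

For example, pass the stdout of subprocess.run(["kubectl", "-n", "default", "get", "kafkaconnector", "my-file-source-connector", "-o", "json"], capture_output=True, text=True, check=True) to summarize_connector.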

Check worker pods:

kubectl -n default get pods -l strimzi.io/name=my-connect-cluster-connect

Check the Kafka Connect worker logs if the connector is not ready:

kubectl -n default logs -l strimzi.io/name=my-connect-cluster-connect

Update a Connector

Update a connector by editing the KafkaConnector resource:

kubectl -n default edit kafkaconnector my-file-source-connector

The operator reconciles the change and updates the connector through the Kafka Connect REST API.

Common updates include:

  • Changing tasksMax
  • Updating connector-specific configuration
  • Adding topic patterns
  • Adjusting connector retry or error-handling parameters

Pause, Resume, and Stop a Connector

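Use spec.state to control whether a connector is running, paused, or stopped. A paused connector keeps its tasks assigned but stops processing records. A stopped connector shuts down its tasks entirely, and its offsets can be altered or reset only while it is stopped.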

Pause a connector by setting spec.state to paused.

kubectl -n default patch kafkaconnector my-file-source-connector --type merge -p '{"spec":{"state":"paused"}}'

Resume the connector by setting spec.state to running.

kubectl -n default patch kafkaconnector my-file-source-connector --type merge -p '{"spec":{"state":"running"}}'

Stop the connector by setting spec.state to stopped.

kubectl -n default patch kafkaconnector my-file-source-connector --type merge -p '{"spec":{"state":"stopped"}}'
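When state changes are scripted, for example in a maintenance job, building the patch in code and validating the state first avoids applying an unsupported value. The following Python sketch produces the same merge patch as the commands above; the helper names are illustrative.

```python
import json

ALLOWED_STATES = {"running", "paused", "stopped"}


def state_patch(state: str) -> str:
    """Build the JSON merge patch that sets spec.state on a KafkaConnector."""
    if state not in ALLOWED_STATES:
        raise ValueError(f"state must be one of {sorted(ALLOWED_STATES)}, got {state!r}")
    return json.dumps({"spec": {"state": state}})


def patch_command(connector: str, state: str, namespace: str = "default") -> list[str]:
    """Return the kubectl argument list equivalent to the commands above."""
    return ["kubectl", "-n", namespace, "patch", "kafkaconnector", connector,
            "--type", "merge", "-p", state_patch(state)]
```

For example, subprocess.run(patch_command("my-file-source-connector", "paused"), check=True) pauses the connector.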

Manage Connector Offsets

You can list, alter, and reset connector offsets through the KafkaConnector resource. The operator performs the corresponding Kafka Connect REST API calls for you: it writes listed offsets to the ConfigMap named in spec.listOffsets, reads new offsets from the ConfigMap named in spec.alterOffsets, and needs no ConfigMap for a reset. Offset management therefore stays declarative, and you do not need to call the REST API directly.

Offset operations are driven by annotations on the KafkaConnector resource:

  • strimzi.io/connector-offsets=list triggers a list operation.
  • strimzi.io/connector-offsets=alter triggers an alter operation. The connector must be in the stopped state first.
  • strimzi.io/connector-offsets=reset triggers a reset operation. The connector must also be stopped.

List Offsets

Declare the target ConfigMap in spec.listOffsets, then annotate the resource to trigger the list:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-file-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 1
  listOffsets:
    toConfigMap:
      name: my-file-source-connector-offsets
  config:
    file: /tmp/test.txt
    topic: my-connect-topic

Apply the updated resource, then trigger the list operation and read the result:

kubectl -n default annotate kafkaconnector my-file-source-connector \
  strimzi.io/connector-offsets=list --overwrite
kubectl -n default get configmap my-file-source-connector-offsets -o yaml
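The following Python sketch reads the listed offsets from that ConfigMap as JSON rather than YAML. It assumes that the operator stores the offsets under the offsets.json key, in the Kafka Connect REST API layout {"offsets": [{"partition": {...}, "offset": {...}}]}; adjust data_key if your ConfigMap differs.

```python
import json


def listed_offsets(configmap_json: str, data_key: str = "offsets.json") -> list[tuple[dict, dict]]:
    """Return (partition, offset) pairs from the list-offsets ConfigMap.

    Expects the output of:
      kubectl -n default get configmap my-file-source-connector-offsets -o json
    """
    configmap = json.loads(configmap_json)
    # The ConfigMap value is itself a JSON document stored as a string.
    payload = json.loads(configmap["data"][data_key])
    return [(entry["partition"], entry["offset"]) for entry in payload["offsets"]]
```

For the file source connector, each pair identifies the source file and the position reached in it.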

Alter Offsets

To alter offsets, stop the connector, provide the new offsets in a ConfigMap referenced by spec.alterOffsets, and annotate the resource:

spec:
  state: stopped
  alterOffsets:
    fromConfigMap:
      name: my-file-source-connector-new-offsets

Apply the change, then trigger the alter operation:

kubectl -n default annotate kafkaconnector my-file-source-connector \
  strimzi.io/connector-offsets=alter --overwrite
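The ConfigMap referenced by spec.alterOffsets must exist before you trigger the alter operation. The following Python sketch builds one for the file source connector example. It assumes the offsets.json key and the same Kafka Connect offsets layout used by the list operation, and that the file source connector identifies its partition by filename and its offset by position. Partition and offset fields differ between connectors, so check the listed offsets of your own connector first.

```python
import json


def alter_offsets_configmap(name: str, filename: str, position: int,
                            namespace: str = "default") -> dict:
    """Build a ConfigMap manifest carrying new offsets for a file source connector."""
    offsets = {"offsets": [{"partition": {"filename": filename},
                            "offset": {"position": position}}]}
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {"name": name, "namespace": namespace},
        # The offsets document is stored as a JSON string value.
        "data": {"offsets.json": json.dumps(offsets, indent=2)},
    }
```

For example, print json.dumps(alter_offsets_configmap("my-file-source-connector-new-offsets", "/tmp/test.txt", 0)) and pipe the output to kubectl -n default apply -f -, because kubectl accepts JSON manifests.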

Reset Offsets

To reset offsets, stop the connector and annotate the resource:

kubectl -n default patch kafkaconnector my-file-source-connector \
  --type merge -p '{"spec":{"state":"stopped"}}'
kubectl -n default annotate kafkaconnector my-file-source-connector \
  strimzi.io/connector-offsets=reset --overwrite

After the reset or alter operation completes, set spec.state back to running. The connector resumes processing from the new offset position or, after a reset, from the connector's default starting position.

Warning

Altering or resetting offsets can cause data to be reprocessed or skipped. Only do this when you understand the impact on downstream consumers and the external system.

Delete a Connector

Delete the KafkaConnector resource to remove the connector instance from Kafka Connect:

kubectl -n default delete kafkaconnector my-file-source-connector

Deleting a connector does not automatically delete Kafka topics, committed offsets, or data in the external system.

Key Configuration Parameters

KafkaConnect Resource

| Parameter | Description |
| --- | --- |
| spec.replicas | Number of Kafka Connect worker pods. |
| spec.version | Kafka Connect version. Defaults to the latest version supported by the operator. |
| spec.image | Custom container image for Kafka Connect pods. |
| spec.bootstrapServers | Kafka bootstrap server used by Kafka Connect. |
| spec.tls | TLS configuration for encrypted Kafka connections. |
| spec.authentication | Authentication configuration, such as SCRAM-SHA-512, TLS, or OAuth. |
| spec.config.group.id | Kafka Connect worker group ID. Use a unique value per Kafka Connect cluster. |
| spec.config.config.storage.topic | Topic used to store connector configurations. |
| spec.config.offset.storage.topic | Topic used to store connector offsets. |
| spec.config.status.storage.topic | Topic used to store connector and task status. |
| spec.build | Optional build configuration for adding connector plugins. |
| spec.externalConfiguration | Mount Secrets or ConfigMaps into Kafka Connect pods for connector credential access. Being phased out; use spec.template.pod.volumes with spec.template.connectContainer.volumeMounts for file-based secrets, or spec.template.connectContainer.env for environment variables. |
| spec.logging | Logging configuration. Supports inline loggers or an external ConfigMap reference. |
| spec.metricsConfig | Prometheus JMX exporter configuration for metrics collection. |
| spec.tracing | Distributed tracing configuration. Supports OpenTelemetry. |
| spec.rack | Rack awareness configuration for consuming from the closest replicas. |
| spec.jvmOptions | JVM memory and GC options for Kafka Connect pods. |
| spec.resources | CPU and memory resource requests and limits. |
| spec.livenessProbe | Liveness probe configuration for worker pods. |
| spec.readinessProbe | Readiness probe configuration for worker pods. |
| spec.template | Template for customizing pod labels, annotations, affinity rules, and container environment variables. |

KafkaConnector Resource

| Parameter | Description |
| --- | --- |
| metadata.labels.strimzi.io/cluster | Name of the KafkaConnect cluster that runs the connector. |
| spec.class | Connector implementation class. The class must be listed by the /connector-plugins endpoint of the Kafka Connect cluster. |
| spec.tasksMax | Maximum number of tasks for the connector. Actual parallelism depends on the connector implementation and on the capabilities of the source or sink system. |
| spec.config | Connector-specific configuration. |
| spec.state | Desired connector state. Supported values are running, paused, and stopped. |
| spec.autoRestart | Automatic restart policy for failed connectors or tasks. Set enabled: true and, optionally, maxRestarts to limit consecutive restarts. |
| spec.listOffsets | Target ConfigMap for list-offsets operations. Triggered by the strimzi.io/connector-offsets=list annotation. |
| spec.alterOffsets | Source ConfigMap for alter-offsets operations. Triggered by the strimzi.io/connector-offsets=alter annotation. The connector must be in the stopped state. |

Best Practices

  1. Use KafkaConnector resources for connector lifecycle management instead of manually creating connectors through the REST API.
  2. Use at least 2 Kafka Connect replicas for production workloads.
  3. Set the internal topic replication factors no higher than the number of Kafka brokers. A value of 3 requires at least 3 brokers.
  4. Keep connector plugin versions compatible with the Kafka Connect runtime version.
  5. Keep one KafkaConnector resource per connector instance.
  6. Avoid storing secrets directly in spec.config. Prefer spec.template.connectContainer.env for environment variables and spec.template.pod.volumes with spec.template.connectContainer.volumeMounts for file-based secrets. Use externalConfiguration only when you need compatibility with existing deployments.
  7. Set tasksMax according to connector capabilities and workload size.
  8. Enable autoRestart on all production connectors to recover from transient failures automatically.
  9. Enable Prometheus metrics and configure Grafana dashboards to monitor connector task status, restart count, lag, and error rates.
  10. Configure logging at INFO level by default. Use DEBUG only for troubleshooting specific connector issues.
  11. Use rack awareness in multi-zone clusters to reduce cross-zone network traffic.
  12. Use spec.template to set pod affinity and anti-affinity rules for high availability.

Troubleshooting

| Symptom | Check |
| --- | --- |
| KafkaConnect is not ready | Check kubectl describe kafkaconnect, worker pod logs, and Kafka bootstrap connectivity. |
| Connector class not found | Verify that the plugin is installed and listed by /connector-plugins. |
| Connector is not assigned to a Connect cluster | Verify the strimzi.io/cluster label. |
| Connector stays in NotReady | Check whether the connector class is listed by /connector-plugins, then review the conditions in the KafkaConnector status. |
| Connector tasks fail or fail repeatedly | Check connector-specific configuration, worker logs, external system connectivity, and task status. Enable autoRestart to recover from transient failures. |
| Connector does not process new data | Check source offsets, topic subscription, task status, and external system permissions. |
| Authentication fails | Verify spec.authentication, Kafka ACLs, connector credentials, mounted secrets, and TLS configuration. |
| Build fails | Verify registry access, plugin artifact URLs, push secret permissions, and image pull permissions. |
| Metrics not available | Verify that the metrics ConfigMap exists and that spec.metricsConfig references the correct key. Check that the metrics port (9404) on the worker pods is reachable. |
| Tracing spans not visible | Verify that spec.tracing is set, that the OpenTelemetry collector is reachable, and that the OTEL_EXPORTER_OTLP_ENDPOINT environment variable is correct. |