Стадия жизненного цикла модуля: Preview
У модуля есть требования для установки

Kafka

Kafka — основной ресурс, с которым работает пользователь. Это namespaced ресурс, с помощью которого можно создавать инстансы брокера Kafka в Deckhouse Kubernetes Platform и привязывать их к конкретному KafkaClass.
Класс определяет допустимые размеры инстанса, перечень параметров конфигурации, доступных для переопределения, и правила валидации.

В spec ресурса Kafka есть две основные секции:

  • instance — требования к вычислительным ресурсам и хранилищу брокера (CPU, память, постоянный том). Должны соответствовать политикам sizing выбранного класса.
  • configuration — настройки брокера, которые вы хотите изменить. Здесь можно указывать только параметры, перечисленные в overridableConfiguration класса; остальные настройки управляются значениями по умолчанию класса.

KafkaClassName

Имя KafkaClass, с которым будет связан данный инстанс.
Без существующего KafkaClass вы сможете создать ресурс Kafka, который будет ждать создания класса, чтобы пройти валидацию.

spec:
  kafkaClassName: default

Instance

Описывает требования к вычислительным ресурсам и хранилищу брокера.
Должна пройти валидацию по sizingPolicy соответствующего класса:

spec:
  instance:
    memory:
      size: 1Gi
    cpu:
      cores: 1
      coreFraction: "50%"
    persistentVolumeClaim:
      size: 10Gi
      storageClassName: default

Configuration

Необязательные настройки брокера для переопределения значений по умолчанию класса.
Здесь принимаются только параметры, перечисленные в overridableConfiguration класса. Все значения проходят валидацию по правилам класса validations перед применением.

spec:
  configuration:
    logRetentionHours: 168
    logRetentionBytes: -1
    messageMaxBytes: "1Mi"
    compressionType: producer
    autoCreateTopicsEnable: false

Поля конфигурации

logRetentionHours

Тип: integer | Параметр Kafka: log.retention.hours | Мин: 1 | Пример: 168

Количество часов хранения лог-файла до его удаления.
Взаимоисключающий с logRetentionMs — задайте только одно. При указании обоих приоритет имеет logRetentionMs.

logRetentionMs

Тип: integer | Параметр Kafka: log.retention.ms | Мин: -1 | Пример: 604800000

Время хранения логов с точностью до миллисекунды. Имеет приоритет над logRetentionHours и применяется динамически без перезапуска брокера.
Установите -1, чтобы полностью отключить хранение по времени. Взаимоисключающий с logRetentionHours.

logRetentionBytes

Тип: int-or-string | Параметр Kafka: log.retention.bytes | Пример: 1Gi

Максимальный суммарный размер лога на партицию, по достижении которого удаляются старые сегменты.
Используйте -1, чтобы отключить хранение по размеру.

logSegmentBytes

Тип: int-or-string | Параметр Kafka: log.segment.bytes | Пример: 512Mi

Максимальный размер одного файла сегмента лога. По достижении лимита Kafka открывает новый сегмент.
Сообщение должно помещаться в один сегмент (messageMaxByteslogSegmentBytes).

messageMaxBytes

Тип: int-or-string | Параметр Kafka: message.max.bytes | Пример: 1Mi

Максимальный размер одного сообщения, принимаемого брокером от производителей.
Не должен превышать logSegmentBytes. Не должен превышать socketRequestMaxBytes, определённый в классе.

compressionType

Тип: string | Параметр Kafka: compression.type
Допустимые значения: producer, uncompressed, gzip, snappy, lz4, zstd

Сжатие сообщений на уровне брокера.
producer (рекомендуется) сохраняет то сжатие, которое применил производитель, избегая повторного сжатия и лишних затрат CPU.

autoCreateTopicsEnable

Тип: boolean | Параметр Kafka: auto.create.topics.enable

Управляет автоматическим созданием топиков при первом обращении производителя или потребителя.
Отключайте в production для явного управления жизненным циклом топиков.

logCleanupPolicy

Тип: string | Параметр Kafka: log.cleanup.policy
Допустимые значения: delete, compact, delete,compact

Политика очистки сегментов лога при превышении лимитов хранения.
delete удаляет старые сегменты; compact оставляет только последнее значение для каждого ключа; delete,compact применяет обе политики.

Примечание: Здесь можно указывать только поля, перечисленные в overridableConfiguration соответствующего KafkaClass. Указание поля, отсутствующего в этом списке, приведёт к ошибке валидации.

Поддерживаемые версии Kafka

Единственная поддерживаемая версия Kafka — 4.2.0

Наши образы для запуска Kafka-контейнеров основаны на distroless-архитектуре.

Статус

Статус сервиса Managed Kafka отражается в ресурсе Kafka.
Структура Conditions однозначно показывает текущий статус сервиса.

Значимые типы:

  • LastValidConfigurationApplied — Агрегирующий тип, который показывает, была ли применена успешно последняя валидная конфигурация хотя бы раз.
  • ConfigurationValid — показывает, прошла ли конфигурация все валидации связанного KafkaClass.
  • ScaledToLastValidConfiguration — показывает, соответствует ли количество работающих реплик заданной конфигурации.
  • Available — показывает, работает ли брокер и принимает ли он подключения.
conditions:
    - lastTransitionTime: '2025-09-22T23:20:36Z'
      observedGeneration: 2
      status: 'True'
      type: Available
    - lastTransitionTime: '2025-09-22T14:38:04Z'
      observedGeneration: 2
      status: 'True'
      type: ConfigurationValid
    - lastTransitionTime: '2025-09-22T14:38:47Z'
      observedGeneration: 2
      status: 'True'
      type: LastValidConfigurationApplied
    - lastTransitionTime: '2025-09-22T23:20:36Z'
      observedGeneration: 2
      status: 'True'
      type: ScaledToLastValidConfiguration

Статус False говорит о проблеме на том или ином этапе либо незавершенной синхронизации состояния.
Для такого состояния будет указан reason и message с описанием.

---
    - lastTransitionTime: '2025-09-23T14:53:33Z'
      message: Syncing
      observedGeneration: 1
      reason: Syncing
      status: 'False'
      type: LastValidConfigurationApplied
    - lastTransitionTime: '2025-09-23T14:54:58Z'
      message: Not all the instances are running still waiting for 1 to become ready
      observedGeneration: 1
      reason: ScalingInProgress
      status: 'False'
      type: ScaledToLastValidConfiguration
---

Примеры использования

Базовое использование

Стандартный брокер Kafka с постоянным хранилищем и конфигурацией по умолчанию.

  1. Создайте namespace с именем kafka.
  2. Создайте ресурс Kafka c дефолтными значениями конфигурации:
apiVersion: managed-services.deckhouse.io/v1alpha1
kind: Kafka
metadata:
  name: kafka-sample
spec:
  kafkaClassName: default
  instance:
    memory:
      size: "1Gi"
    cpu:
      cores: 2
      coreFraction: "50%"
    persistentVolumeClaim:
      size: "10Gi"
kubectl apply -f managed-services_v1alpha1_kafka.yaml -n kafka
  1. Подождите, пока брокер будет готов и все conditions примут значение True:
kubectl get kafka kafka-sample -n kafka -o wide -w
  1. Подключитесь к брокеру через сервис d8ms-kfk-kafka-sample на порту 9092:
--bootstrap-server d8ms-kfk-kafka-sample:9092

Краткосрочное хранение сообщений

Брокер Kafka с уменьшенным временем хранения логов — подходит для сред разработки или тестирования.

  1. Создайте namespace с именем kafka.
  2. Создайте ресурс Kafka:
kubectl apply -f managed-services_v1alpha1_kafka.yaml -n kafka
apiVersion: managed-services.deckhouse.io/v1alpha1
kind: Kafka
metadata:
  name: kafka-dev
spec:
  kafkaClassName: default
  configuration:
    logRetentionHours: 24
    autoCreateTopicsEnable: true
  instance:
    memory:
      size: "1Gi"
    cpu:
      cores: 2
      coreFraction: "25%"
    persistentVolumeClaim:
      size: "5Gi"
  1. Подождите, пока брокер будет готов и все conditions примут значение True:
kubectl get kafka kafka-dev -n kafka -o wide -w
  1. Подключитесь к брокеру через сервис d8ms-kfk-kafka-dev на порту 9092:
--bootstrap-server d8ms-kfk-kafka-dev:9092

Пользовательское сжатие и размер сообщений

Брокер Kafka с включённым сжатием gzip и увеличенным максимальным размером сообщения.

  1. Создайте namespace с именем kafka.
  2. Создайте ресурс Kafka:
kubectl apply -f managed-services_v1alpha1_kafka.yaml -n kafka
apiVersion: managed-services.deckhouse.io/v1alpha1
kind: Kafka
metadata:
  name: kafka-compressed
spec:
  kafkaClassName: default
  configuration:
    compressionType: gzip
    messageMaxBytes: "10Mi"
    autoCreateTopicsEnable: false
  instance:
    memory:
      size: "2Gi"
    cpu:
      cores: 2
      coreFraction: "50%"
    persistentVolumeClaim:
      size: "20Gi"
  1. Подождите, пока брокер будет готов и все conditions примут значение True:
kubectl get kafka kafka-compressed -n kafka -o wide -w
  1. Подключитесь к брокеру через сервис d8ms-kfk-kafka-compressed на порту 9092:
--bootstrap-server d8ms-kfk-kafka-compressed:9092