Стадия жизненного цикла модуля: Preview
У модуля есть требования для установки
Kafka
Kafka — основной ресурс, с которым работает пользователь. Это namespaced ресурс, с помощью которого можно создавать инстансы брокера Kafka в Deckhouse Kubernetes Platform и привязывать их к конкретному KafkaClass.
Класс определяет допустимые размеры инстанса, перечень параметров конфигурации, доступных для переопределения, и правила валидации.
В spec ресурса Kafka есть две основные секции:
instance— требования к вычислительным ресурсам и хранилищу брокера (CPU, память, постоянный том). Должны соответствовать политикам sizing выбранного класса.configuration— настройки брокера, которые вы хотите изменить. Здесь можно указывать только параметры, перечисленные вoverridableConfigurationкласса; остальные настройки управляются значениями по умолчанию класса.
KafkaClassName
Имя KafkaClass, с которым будет связан данный инстанс.
Без существующего KafkaClass вы сможете создать ресурс Kafka, который будет ждать создания класса, чтобы пройти валидацию.
spec:
kafkaClassName: defaultInstance
Описывает требования к вычислительным ресурсам и хранилищу брокера.
Должна пройти валидацию по sizingPolicy соответствующего класса:
spec:
instance:
memory:
size: 1Gi
cpu:
cores: 1
coreFraction: "50%"
persistentVolumeClaim:
size: 10Gi
storageClassName: defaultConfiguration
Необязательные настройки брокера для переопределения значений по умолчанию класса.
Здесь принимаются только параметры, перечисленные в overridableConfiguration класса. Все значения проходят валидацию по правилам класса validations перед применением.
spec:
configuration:
logRetentionHours: 168
logRetentionBytes: -1
messageMaxBytes: "1Mi"
compressionType: producer
autoCreateTopicsEnable: falseПоля конфигурации
logRetentionHours
Тип: integer | Параметр Kafka: log.retention.hours | Мин: 1 | Пример: 168
Количество часов хранения лог-файла до его удаления.
Взаимоисключающий с logRetentionMs — задайте только одно. При указании обоих приоритет имеет logRetentionMs.
logRetentionMs
Тип: integer | Параметр Kafka: log.retention.ms | Мин: -1 | Пример: 604800000
Время хранения логов с точностью до миллисекунды. Имеет приоритет над logRetentionHours и применяется динамически без перезапуска брокера.
Установите -1, чтобы полностью отключить хранение по времени. Взаимоисключающий с logRetentionHours.
logRetentionBytes
Тип: int-or-string | Параметр Kafka: log.retention.bytes | Пример: 1Gi
Максимальный суммарный размер лога на партицию, по достижении которого удаляются старые сегменты.
Используйте -1, чтобы отключить хранение по размеру.
logSegmentBytes
Тип: int-or-string | Параметр Kafka: log.segment.bytes | Пример: 512Mi
Максимальный размер одного файла сегмента лога. По достижении лимита Kafka открывает новый сегмент.
Сообщение должно помещаться в один сегмент (messageMaxBytes ≤ logSegmentBytes).
messageMaxBytes
Тип: int-or-string | Параметр Kafka: message.max.bytes | Пример: 1Mi
Максимальный размер одного сообщения, принимаемого брокером от производителей.
Не должен превышать logSegmentBytes. Не должен превышать socketRequestMaxBytes, определённый в классе.
compressionType
Тип: string | Параметр Kafka: compression.type
Допустимые значения: producer, uncompressed, gzip, snappy, lz4, zstd
Сжатие сообщений на уровне брокера.
producer (рекомендуется) сохраняет то сжатие, которое применил производитель, избегая повторного сжатия и лишних затрат CPU.
autoCreateTopicsEnable
Тип: boolean | Параметр Kafka: auto.create.topics.enable
Управляет автоматическим созданием топиков при первом обращении производителя или потребителя.
Отключайте в production для явного управления жизненным циклом топиков.
logCleanupPolicy
Тип: string | Параметр Kafka: log.cleanup.policy
Допустимые значения: delete, compact, delete,compact
Политика очистки сегментов лога при превышении лимитов хранения.
delete удаляет старые сегменты; compact оставляет только последнее значение для каждого ключа; delete,compact применяет обе политики.
Примечание: Здесь можно указывать только поля, перечисленные в
overridableConfigurationсоответствующегоKafkaClass. Указание поля, отсутствующего в этом списке, приведёт к ошибке валидации.
Поддерживаемые версии Kafka
Единственная поддерживаемая версия Kafka — 4.2.0
Наши образы для запуска Kafka-контейнеров основаны на distroless-архитектуре.
Статус
Статус сервиса Managed Kafka отражается в ресурсе Kafka.
Структура Conditions однозначно показывает текущий статус сервиса.
Значимые типы:
LastValidConfigurationApplied— Агрегирующий тип, который показывает, была ли применена успешно последняя валидная конфигурация хотя бы раз.ConfigurationValid— показывает, прошла ли конфигурация все валидации связанногоKafkaClass.ScaledToLastValidConfiguration— показывает, соответствует ли количество работающих реплик заданной конфигурации.Available— показывает, работает ли брокер и принимает ли он подключения.
conditions:
- lastTransitionTime: '2025-09-22T23:20:36Z'
observedGeneration: 2
status: 'True'
type: Available
- lastTransitionTime: '2025-09-22T14:38:04Z'
observedGeneration: 2
status: 'True'
type: ConfigurationValid
- lastTransitionTime: '2025-09-22T14:38:47Z'
observedGeneration: 2
status: 'True'
type: LastValidConfigurationApplied
- lastTransitionTime: '2025-09-22T23:20:36Z'
observedGeneration: 2
status: 'True'
type: ScaledToLastValidConfigurationСтатус False говорит о проблеме на том или ином этапе либо незавершенной синхронизации состояния.
Для такого состояния будет указан reason и message с описанием.
---
- lastTransitionTime: '2025-09-23T14:53:33Z'
message: Syncing
observedGeneration: 1
reason: Syncing
status: 'False'
type: LastValidConfigurationApplied
- lastTransitionTime: '2025-09-23T14:54:58Z'
message: Not all the instances are running still waiting for 1 to become ready
observedGeneration: 1
reason: ScalingInProgress
status: 'False'
type: ScaledToLastValidConfiguration
---Примеры использования
Базовое использование
Стандартный брокер Kafka с постоянным хранилищем и конфигурацией по умолчанию.
- Создайте namespace с именем
kafka. - Создайте ресурс
Kafkac дефолтными значениями конфигурации:
apiVersion: managed-services.deckhouse.io/v1alpha1
kind: Kafka
metadata:
name: kafka-sample
spec:
kafkaClassName: default
instance:
memory:
size: "1Gi"
cpu:
cores: 2
coreFraction: "50%"
persistentVolumeClaim:
size: "10Gi"kubectl apply -f managed-services_v1alpha1_kafka.yaml -n kafka- Подождите, пока брокер будет готов и все conditions примут значение
True:
kubectl get kafka kafka-sample -n kafka -o wide -w- Подключитесь к брокеру через сервис
d8ms-kfk-kafka-sampleна порту9092:
--bootstrap-server d8ms-kfk-kafka-sample:9092Краткосрочное хранение сообщений
Брокер Kafka с уменьшенным временем хранения логов — подходит для сред разработки или тестирования.
- Создайте namespace с именем
kafka. - Создайте ресурс
Kafka:
kubectl apply -f managed-services_v1alpha1_kafka.yaml -n kafkaapiVersion: managed-services.deckhouse.io/v1alpha1
kind: Kafka
metadata:
name: kafka-dev
spec:
kafkaClassName: default
configuration:
logRetentionHours: 24
autoCreateTopicsEnable: true
instance:
memory:
size: "1Gi"
cpu:
cores: 2
coreFraction: "25%"
persistentVolumeClaim:
size: "5Gi"- Подождите, пока брокер будет готов и все conditions примут значение
True:
kubectl get kafka kafka-dev -n kafka -o wide -w- Подключитесь к брокеру через сервис
d8ms-kfk-kafka-devна порту9092:
--bootstrap-server d8ms-kfk-kafka-dev:9092Пользовательское сжатие и размер сообщений
Брокер Kafka с включённым сжатием gzip и увеличенным максимальным размером сообщения.
- Создайте namespace с именем
kafka. - Создайте ресурс
Kafka:
kubectl apply -f managed-services_v1alpha1_kafka.yaml -n kafkaapiVersion: managed-services.deckhouse.io/v1alpha1
kind: Kafka
metadata:
name: kafka-compressed
spec:
kafkaClassName: default
configuration:
compressionType: gzip
messageMaxBytes: "10Mi"
autoCreateTopicsEnable: false
instance:
memory:
size: "2Gi"
cpu:
cores: 2
coreFraction: "50%"
persistentVolumeClaim:
size: "20Gi"- Подождите, пока брокер будет готов и все conditions примут значение
True:
kubectl get kafka kafka-compressed -n kafka -o wide -w- Подключитесь к брокеру через сервис
d8ms-kfk-kafka-compressedна порту9092:
--bootstrap-server d8ms-kfk-kafka-compressed:9092