Стадия жизненного цикла модуля: Preview
У модуля есть требования для установки

KafkaClass

KafkaClass — основной ресурс, с которым работает администратор. Модуль managed-kafka управляет инстансами Kafka в Deckhouse Kubernetes Platform, а KafkaClass — это cluster-scoped ресурс, который управляет всеми инстансами Kafka, привязанными к нему: определяет допустимые комбинации размеров, задаёт значения конфигурации брокера по умолчанию, контролирует, какие параметры пользователи могут переопределять, и применяет правила валидации.

Каждый ресурс Kafka обязательно должен ссылаться на существующий KafkaClass. Вся конфигурация проверяется на соответствие указанному классу перед развёртыванием сервиса.

Вместе с модулем поставляется KafkaClass с именем default, содержащий готовые базовые настройки, подходящие для большинства задач. Его можно использовать как есть, либо скопировать и изменить под требования конкретных команд или окружений.

Sizing Policies

Структура позволяет создавать набор политик определения размера для связанных ресурсов Kafka.
Это позволит избегать неравномерного распределения ресурсов CPU и Memory на нодах кластера.
Определяющим фактором выбора той или иной политики является попадание в интервал cores.
Далее будет проверено соответствие остальных полей конкретной политики.

spec:
  sizingPolicies:
    - cores:
        min: 1
        max: 3
      memory:
        min: "0.5Gi"
        max: "8Gi"
        step: "512Mi"
      coreFractions: ["25%", "50%", "100%"]
    - cores:
        min: 4
        max: 16
      memory:
        min: "8Gi"
        max: "64Gi"
        step: "1Gi"
      coreFractions: ["50%", "100%"]

Validation Rules

В качестве синтаксиса используется CEL (Common Expression Language) для создания гибких механизмов валидации.
Мы предоставляем набор предопределённых переменных, которые можно использовать в rule:

  • instance.memory.size int — лимит памяти пода в байтах
  • instance.cpu.cores int — количество ядер CPU
  • configuration.logRetentionHours int — время хранения логов в часах
  • configuration.logRetentionMs int — время хранения логов в миллисекундах (-1 = отключено)
  • configuration.logRetentionBytes int — размер хранения логов в байтах (-1 = отключено)
  • configuration.logSegmentBytes int — максимальный размер сегмента лога в байтах
  • configuration.messageMaxBytes int — максимальный размер сообщения в байтах
  • configuration.compressionType string — тип сжатия брокера
  • configuration.numNetworkThreads int — количество сетевых потоков
  • configuration.numIoThreads int — количество I/O потоков
  • configuration.logCleanerThreads int — количество потоков очистки логов
  • configuration.socketRequestMaxBytes int — максимальный размер запроса через сокет в байтах
spec:
  validations:
    - message: "messageMaxBytes must not exceed logSegmentBytes (a message must fit in one segment)"
      rule: "configuration.messageMaxBytes <= configuration.logSegmentBytes"
    - message: "logRetentionHours must be at least 1"
      rule: "configuration.logRetentionHours >= 1"
    - message: "logRetentionMs and logRetentionHours are mutually exclusive — set only one"
      rule: "configuration.logRetentionMs == 0 || configuration.logRetentionHours == 0"
    - message: "socketRequestMaxBytes must be >= messageMaxBytes (otherwise the broker rejects the request)"
      rule: "configuration.socketRequestMaxBytes >= configuration.messageMaxBytes"
    - message: "numNetworkThreads must not exceed 2 × CPU cores"
      rule: "configuration.numNetworkThreads <= instance.cpu.cores * 2"
    - message: "numIoThreads must not exceed 4 × CPU cores"
      rule: "configuration.numIoThreads <= instance.cpu.cores * 4"
    - message: "logCleanerThreads must not exceed the number of CPU cores"
      rule: "configuration.logCleanerThreads <= instance.cpu.cores"

Overridable Configuration

Белый список параметров конфигурации, которые могут быть переопределены в ресурсе Kafka.
Список всех возможных параметров.

spec:
  overridableConfiguration:
    - logRetentionHours
    - logRetentionMs
    - logRetentionBytes
    - messageMaxBytes
    - compressionType
    - autoCreateTopicsEnable

Configuration

Параметры конфигурации, которые могут быть определены на уровне KafkaClass.
Значения этих параметров переопределят defaults для всех связанных ресурсов Kafka.
Note: Параметры, которые были разрешены и переопределены в overridableConfiguration, будут являться приоритетными.

spec:
  configuration:
    logRetentionHours: 168
    logSegmentBytes: "512Mi"
    logCleanupPolicy: delete
    compressionType: producer
    numNetworkThreads: 3
    numIoThreads: 8
    numRecoveryThreadsPerDataDir: 1
    socketSendBufferBytes: "1Mi"
    socketReceiveBufferBytes: "1Mi"
    socketRequestMaxBytes: "100Mi"
    logCleanerThreads: 1
    logCleanerDeleteRetentionMs: 86400000
    logRetentionCheckIntervalMs: 300000
    deleteTopicEnable: true
    groupInitialRebalanceDelayMs: 3000

Поля конфигурации

logRetentionHours

Тип: integer | Параметр Kafka: log.retention.hours | Мин: 1 | Пример: 168

Количество часов хранения лог-файла по умолчанию. Взаимоисключающий с logRetentionMs.

logRetentionMs

Тип: integer | Параметр Kafka: log.retention.ms | Мин: -1 | Пример: 604800000

Время хранения логов с точностью до миллисекунды. Имеет приоритет над logRetentionHours.
Установите -1, чтобы отключить хранение по времени. Взаимоисключающий с logRetentionHours.

logRetentionBytes

Тип: int-or-string | Параметр Kafka: log.retention.bytes | Пример: 1Gi

Максимальный размер лога на партицию по умолчанию. Старые сегменты удаляются при превышении лимита.
Используйте -1, чтобы отключить хранение по размеру.

logSegmentBytes

Тип: int-or-string | Параметр Kafka: log.segment.bytes | Пример: 512Mi

Максимальный размер одного файла сегмента лога по умолчанию. Сообщение должно помещаться в один сегмент.

messageMaxBytes

Тип: int-or-string | Параметр Kafka: message.max.bytes | Пример: 1Mi

Максимальный размер сообщения, принимаемого брокером, по умолчанию.
Не должен превышать logSegmentBytes или socketRequestMaxBytes.

compressionType

Тип: string | Параметр Kafka: compression.type
Допустимые значения: producer, uncompressed, gzip, snappy, lz4, zstd

Сжатие сообщений на уровне брокера по умолчанию.
producer (рекомендуется) сохраняет то сжатие, которое применил производитель, избегая повторного сжатия.

autoCreateTopicsEnable

Тип: boolean | Параметр Kafka: auto.create.topics.enable

Управляет автоматическим созданием топиков при первом обращении. Значение по умолчанию для всех связанных экземпляров.
Отключайте в production для явного управления жизненным циклом топиков.

logCleanupPolicy

Тип: string | Параметр Kafka: log.cleanup.policy
Допустимые значения: delete, compact, delete,compact

Политика очистки сегментов лога по умолчанию при превышении лимитов хранения.

numNetworkThreads

Тип: integer | Параметр Kafka: num.network.threads | Мин: 1 | Пример: 3

Количество потоков, обрабатывающих сетевые запросы.
Увеличивайте, если метрика брокера NetworkProcessorAvgIdlePercent постоянно высока.
Валидация: не должно превышать 2 × количество ядер CPU.

numIoThreads

Тип: integer | Параметр Kafka: num.io.threads | Мин: 1 | Пример: 8

Количество потоков, выполняющих дисковые операции ввода-вывода для запросов.
Хорошая отправная точка — 8 × количество дисков (обычно 1 диск на брокер).
Валидация: не должно превышать 4 × количество ядер CPU.

numRecoveryThreadsPerDataDir

Тип: integer | Параметр Kafka: num.recovery.threads.per.data.dir | Мин: 1 | Пример: 1

Количество потоков для восстановления логов при запуске и сброса при остановке на каждую директорию данных.

socketSendBufferBytes

Тип: int-or-string | Параметр Kafka: socket.send.buffer.bytes | Пример: 1Mi

Размер буфера SO_SNDBUF для операций отправки через сокет. Увеличивайте для высоколатентных или высоконагруженных соединений.

socketReceiveBufferBytes

Тип: int-or-string | Параметр Kafka: socket.receive.buffer.bytes | Пример: 1Mi

Размер буфера SO_RCVBUF для операций приёма через сокет. Увеличивайте для высоколатентных или высоконагруженных соединений.

socketRequestMaxBytes

Тип: int-or-string | Параметр Kafka: socket.request.max.bytes | Пример: 100Mi

Максимальный размер запроса через сокет.
Должен быть ≥ messageMaxBytes, иначе брокер не сможет принять сообщение максимального размера.

logCleanerThreads

Тип: integer | Параметр Kafka: log.cleaner.threads | Мин: 1 | Пример: 1

Количество фоновых потоков для компактизации логов.
Валидация: не должно превышать количество ядер CPU.

logCleanerDeleteRetentionMs

Тип: integer | Параметр Kafka: log.cleaner.delete.retention.ms | Мин: 0 | Пример: 86400000

Время хранения маркера удаления (tombstone) после удаления связанного ключа.
Даёт потребителям время зафиксировать удаление до того, как маркер будет удалён.

logRetentionCheckIntervalMs

Тип: integer | Параметр Kafka: log.retention.check.interval.ms | Мин: 1000 | Пример: 300000

Как часто (в миллисекундах) очиститель логов проверяет наличие сегментов, подлежащих удалению.

deleteTopicEnable

Тип: boolean | Параметр Kafka: delete.topic.enable

Управляет возможностью удаления топиков через admin API.
Отключайте для предотвращения случайной потери данных в production.

groupInitialRebalanceDelayMs

Тип: integer | Параметр Kafka: group.initial.rebalance.delay.ms | Мин: 0 | Макс: 300000 | Пример: 3000

Время ожидания координатора группы перед инициацией первой перебалансировки, пока новые потребители могут присоединиться.
Меньшее значение сокращает время старта; большее значение даёт более равномерное распределение партиций.

Значения по умолчанию, которые устанавливаются Kafka Operator:

  • logRetentionHours: 168 (7 дней)
  • compressionType: producer
  • autoCreateTopicsEnable: false

Affinity

Стандартный механизм Kubernetes для управления планированием разворачиваемых подов.

spec:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
      - matchExpressions:
        - key: "node.deckhouse.io/group"
          operator: "In"
          values:
          - "kafka"

Tolerations

Стандартный механизм Kubernetes для управления планированием разворачиваемых подов.

spec:
  tolerations:
  - key: primary-role
    operator: Equal
    value: kafka
    effect: NoSchedule

Node Selector

Стандартный механизм Kubernetes для управления планированием разворачиваемых подов.

spec:
  nodeSelector:
    "node.deckhouse.io/group": "kafka"

Примеры использования

Базовое использование

apiVersion: managed-services.deckhouse.io/v1alpha1
kind: KafkaClass
metadata:
  name: default
spec:
  overridableConfiguration:
    - logRetentionHours
    - logRetentionMs
    - logRetentionBytes
    - messageMaxBytes
    - compressionType
    - autoCreateTopicsEnable

  configuration:
    logRetentionHours: 168
    logSegmentBytes: "512Mi"
    logCleanupPolicy: delete
    compressionType: producer
    numNetworkThreads: 3
    numIoThreads: 8
    numRecoveryThreadsPerDataDir: 1
    socketSendBufferBytes: "1Mi"
    socketReceiveBufferBytes: "1Mi"
    socketRequestMaxBytes: "100Mi"
    logCleanerThreads: 1
    logCleanerDeleteRetentionMs: 86400000
    logRetentionCheckIntervalMs: 300000
    deleteTopicEnable: true
    groupInitialRebalanceDelayMs: 3000

  sizingPolicies:
    - cores:
        min: 1
        max: 3
      memory:
        min: "0.5Gi"
        max: "8Gi"
        step: "512Mi"
      coreFractions: ["25%", "50%", "100%"]
    - cores:
        min: 4
        max: 16
      memory:
        min: "8Gi"
        max: "64Gi"
        step: "1Gi"
      coreFractions: ["50%", "100%"]

  validations:
    - message: "messageMaxBytes must not exceed logSegmentBytes (a message must fit in one segment)"
      rule: "configuration.messageMaxBytes <= configuration.logSegmentBytes"
    - message: "logRetentionHours must be at least 1"
      rule: "configuration.logRetentionHours >= 1"
    - message: "logRetentionMs and logRetentionHours are mutually exclusive — set only one"
      rule: "configuration.logRetentionMs == 0 || configuration.logRetentionHours == 0"
    - message: "socketRequestMaxBytes must be >= messageMaxBytes (otherwise the broker rejects the request)"
      rule: "configuration.socketRequestMaxBytes >= configuration.messageMaxBytes"
    - message: "numNetworkThreads must not exceed 2 × CPU cores"
      rule: "configuration.numNetworkThreads <= instance.cpu.cores * 2"
    - message: "numIoThreads must not exceed 4 × CPU cores"
      rule: "configuration.numIoThreads <= instance.cpu.cores * 4"
    - message: "logCleanerThreads must not exceed the number of CPU cores"
      rule: "configuration.logCleanerThreads <= instance.cpu.cores"