Стадия жизненного цикла модуля: Preview
У модуля есть требования для установки
KafkaClass
KafkaClass — основной ресурс, с которым работает администратор. Модуль managed-kafka управляет инстансами Kafka в Deckhouse Kubernetes Platform, а KafkaClass — это cluster-scoped ресурс, который управляет всеми инстансами Kafka, привязанными к нему: определяет допустимые комбинации размеров, задаёт значения конфигурации брокера по умолчанию, контролирует, какие параметры пользователи могут переопределять, и применяет правила валидации.
Каждый ресурс Kafka обязательно должен ссылаться на существующий KafkaClass. Вся конфигурация проверяется на соответствие указанному классу перед развёртыванием сервиса.
Вместе с модулем поставляется KafkaClass с именем default, содержащий готовые базовые настройки, подходящие для большинства задач. Его можно использовать как есть, либо скопировать и изменить под требования конкретных команд или окружений.
Sizing Policies
Структура позволяет создавать набор политик определения размера для связанных ресурсов Kafka.
Это позволит избегать неравномерного распределения ресурсов CPU и Memory на нодах кластера.
Определяющим фактором выбора той или иной политики является попадание в интервал cores.
Далее будет проверено соответствие остальных полей конкретной политики.
spec:
sizingPolicies:
- cores:
min: 1
max: 3
memory:
min: "0.5Gi"
max: "8Gi"
step: "512Mi"
coreFractions: ["25%", "50%", "100%"]
- cores:
min: 4
max: 16
memory:
min: "8Gi"
max: "64Gi"
step: "1Gi"
coreFractions: ["50%", "100%"]Validation Rules
В качестве синтаксиса используется CEL (Common Expression Language) для создания гибких механизмов валидации.
Мы предоставляем набор предопределённых переменных, которые можно использовать в rule:
instance.memory.sizeint— лимит памяти пода в байтахinstance.cpu.coresint— количество ядер CPUconfiguration.logRetentionHoursint— время хранения логов в часахconfiguration.logRetentionMsint— время хранения логов в миллисекундах (-1= отключено)configuration.logRetentionBytesint— размер хранения логов в байтах (-1= отключено)configuration.logSegmentBytesint— максимальный размер сегмента лога в байтахconfiguration.messageMaxBytesint— максимальный размер сообщения в байтахconfiguration.compressionTypestring— тип сжатия брокераconfiguration.numNetworkThreadsint— количество сетевых потоковconfiguration.numIoThreadsint— количество I/O потоковconfiguration.logCleanerThreadsint— количество потоков очистки логовconfiguration.socketRequestMaxBytesint— максимальный размер запроса через сокет в байтах
spec:
validations:
- message: "messageMaxBytes must not exceed logSegmentBytes (a message must fit in one segment)"
rule: "configuration.messageMaxBytes <= configuration.logSegmentBytes"
- message: "logRetentionHours must be at least 1"
rule: "configuration.logRetentionHours >= 1"
- message: "logRetentionMs and logRetentionHours are mutually exclusive — set only one"
rule: "configuration.logRetentionMs == 0 || configuration.logRetentionHours == 0"
- message: "socketRequestMaxBytes must be >= messageMaxBytes (otherwise the broker rejects the request)"
rule: "configuration.socketRequestMaxBytes >= configuration.messageMaxBytes"
- message: "numNetworkThreads must not exceed 2 × CPU cores"
rule: "configuration.numNetworkThreads <= instance.cpu.cores * 2"
- message: "numIoThreads must not exceed 4 × CPU cores"
rule: "configuration.numIoThreads <= instance.cpu.cores * 4"
- message: "logCleanerThreads must not exceed the number of CPU cores"
rule: "configuration.logCleanerThreads <= instance.cpu.cores"Overridable Configuration
Белый список параметров конфигурации, которые могут быть переопределены в ресурсе Kafka.
Список всех возможных параметров.
spec:
overridableConfiguration:
- logRetentionHours
- logRetentionMs
- logRetentionBytes
- messageMaxBytes
- compressionType
- autoCreateTopicsEnableConfiguration
Параметры конфигурации, которые могут быть определены на уровне KafkaClass.
Значения этих параметров переопределят defaults для всех связанных ресурсов Kafka.
Note: Параметры, которые были разрешены и переопределены в overridableConfiguration, будут являться приоритетными.
spec:
configuration:
logRetentionHours: 168
logSegmentBytes: "512Mi"
logCleanupPolicy: delete
compressionType: producer
numNetworkThreads: 3
numIoThreads: 8
numRecoveryThreadsPerDataDir: 1
socketSendBufferBytes: "1Mi"
socketReceiveBufferBytes: "1Mi"
socketRequestMaxBytes: "100Mi"
logCleanerThreads: 1
logCleanerDeleteRetentionMs: 86400000
logRetentionCheckIntervalMs: 300000
deleteTopicEnable: true
groupInitialRebalanceDelayMs: 3000Поля конфигурации
logRetentionHours
Тип: integer | Параметр Kafka: log.retention.hours | Мин: 1 | Пример: 168
Количество часов хранения лог-файла по умолчанию. Взаимоисключающий с logRetentionMs.
logRetentionMs
Тип: integer | Параметр Kafka: log.retention.ms | Мин: -1 | Пример: 604800000
Время хранения логов с точностью до миллисекунды. Имеет приоритет над logRetentionHours.
Установите -1, чтобы отключить хранение по времени. Взаимоисключающий с logRetentionHours.
logRetentionBytes
Тип: int-or-string | Параметр Kafka: log.retention.bytes | Пример: 1Gi
Максимальный размер лога на партицию по умолчанию. Старые сегменты удаляются при превышении лимита.
Используйте -1, чтобы отключить хранение по размеру.
logSegmentBytes
Тип: int-or-string | Параметр Kafka: log.segment.bytes | Пример: 512Mi
Максимальный размер одного файла сегмента лога по умолчанию. Сообщение должно помещаться в один сегмент.
messageMaxBytes
Тип: int-or-string | Параметр Kafka: message.max.bytes | Пример: 1Mi
Максимальный размер сообщения, принимаемого брокером, по умолчанию.
Не должен превышать logSegmentBytes или socketRequestMaxBytes.
compressionType
Тип: string | Параметр Kafka: compression.type
Допустимые значения: producer, uncompressed, gzip, snappy, lz4, zstd
Сжатие сообщений на уровне брокера по умолчанию.
producer (рекомендуется) сохраняет то сжатие, которое применил производитель, избегая повторного сжатия.
autoCreateTopicsEnable
Тип: boolean | Параметр Kafka: auto.create.topics.enable
Управляет автоматическим созданием топиков при первом обращении. Значение по умолчанию для всех связанных экземпляров.
Отключайте в production для явного управления жизненным циклом топиков.
logCleanupPolicy
Тип: string | Параметр Kafka: log.cleanup.policy
Допустимые значения: delete, compact, delete,compact
Политика очистки сегментов лога по умолчанию при превышении лимитов хранения.
numNetworkThreads
Тип: integer | Параметр Kafka: num.network.threads | Мин: 1 | Пример: 3
Количество потоков, обрабатывающих сетевые запросы.
Увеличивайте, если метрика брокера NetworkProcessorAvgIdlePercent постоянно высока.
Валидация: не должно превышать 2 × количество ядер CPU.
numIoThreads
Тип: integer | Параметр Kafka: num.io.threads | Мин: 1 | Пример: 8
Количество потоков, выполняющих дисковые операции ввода-вывода для запросов.
Хорошая отправная точка — 8 × количество дисков (обычно 1 диск на брокер).
Валидация: не должно превышать 4 × количество ядер CPU.
numRecoveryThreadsPerDataDir
Тип: integer | Параметр Kafka: num.recovery.threads.per.data.dir | Мин: 1 | Пример: 1
Количество потоков для восстановления логов при запуске и сброса при остановке на каждую директорию данных.
socketSendBufferBytes
Тип: int-or-string | Параметр Kafka: socket.send.buffer.bytes | Пример: 1Mi
Размер буфера SO_SNDBUF для операций отправки через сокет. Увеличивайте для высоколатентных или высоконагруженных соединений.
socketReceiveBufferBytes
Тип: int-or-string | Параметр Kafka: socket.receive.buffer.bytes | Пример: 1Mi
Размер буфера SO_RCVBUF для операций приёма через сокет. Увеличивайте для высоколатентных или высоконагруженных соединений.
socketRequestMaxBytes
Тип: int-or-string | Параметр Kafka: socket.request.max.bytes | Пример: 100Mi
Максимальный размер запроса через сокет.
Должен быть ≥ messageMaxBytes, иначе брокер не сможет принять сообщение максимального размера.
logCleanerThreads
Тип: integer | Параметр Kafka: log.cleaner.threads | Мин: 1 | Пример: 1
Количество фоновых потоков для компактизации логов.
Валидация: не должно превышать количество ядер CPU.
logCleanerDeleteRetentionMs
Тип: integer | Параметр Kafka: log.cleaner.delete.retention.ms | Мин: 0 | Пример: 86400000
Время хранения маркера удаления (tombstone) после удаления связанного ключа.
Даёт потребителям время зафиксировать удаление до того, как маркер будет удалён.
logRetentionCheckIntervalMs
Тип: integer | Параметр Kafka: log.retention.check.interval.ms | Мин: 1000 | Пример: 300000
Как часто (в миллисекундах) очиститель логов проверяет наличие сегментов, подлежащих удалению.
deleteTopicEnable
Тип: boolean | Параметр Kafka: delete.topic.enable
Управляет возможностью удаления топиков через admin API.
Отключайте для предотвращения случайной потери данных в production.
groupInitialRebalanceDelayMs
Тип: integer | Параметр Kafka: group.initial.rebalance.delay.ms | Мин: 0 | Макс: 300000 | Пример: 3000
Время ожидания координатора группы перед инициацией первой перебалансировки, пока новые потребители могут присоединиться.
Меньшее значение сокращает время старта; большее значение даёт более равномерное распределение партиций.
Значения по умолчанию, которые устанавливаются Kafka Operator:
logRetentionHours:168(7 дней)compressionType:producerautoCreateTopicsEnable:false
Affinity
Стандартный механизм Kubernetes для управления планированием разворачиваемых подов.
spec:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: "node.deckhouse.io/group"
operator: "In"
values:
- "kafka"Tolerations
Стандартный механизм Kubernetes для управления планированием разворачиваемых подов.
spec:
tolerations:
- key: primary-role
operator: Equal
value: kafka
effect: NoScheduleNode Selector
Стандартный механизм Kubernetes для управления планированием разворачиваемых подов.
spec:
nodeSelector:
"node.deckhouse.io/group": "kafka"Примеры использования
Базовое использование
apiVersion: managed-services.deckhouse.io/v1alpha1
kind: KafkaClass
metadata:
name: default
spec:
overridableConfiguration:
- logRetentionHours
- logRetentionMs
- logRetentionBytes
- messageMaxBytes
- compressionType
- autoCreateTopicsEnable
configuration:
logRetentionHours: 168
logSegmentBytes: "512Mi"
logCleanupPolicy: delete
compressionType: producer
numNetworkThreads: 3
numIoThreads: 8
numRecoveryThreadsPerDataDir: 1
socketSendBufferBytes: "1Mi"
socketReceiveBufferBytes: "1Mi"
socketRequestMaxBytes: "100Mi"
logCleanerThreads: 1
logCleanerDeleteRetentionMs: 86400000
logRetentionCheckIntervalMs: 300000
deleteTopicEnable: true
groupInitialRebalanceDelayMs: 3000
sizingPolicies:
- cores:
min: 1
max: 3
memory:
min: "0.5Gi"
max: "8Gi"
step: "512Mi"
coreFractions: ["25%", "50%", "100%"]
- cores:
min: 4
max: 16
memory:
min: "8Gi"
max: "64Gi"
step: "1Gi"
coreFractions: ["50%", "100%"]
validations:
- message: "messageMaxBytes must not exceed logSegmentBytes (a message must fit in one segment)"
rule: "configuration.messageMaxBytes <= configuration.logSegmentBytes"
- message: "logRetentionHours must be at least 1"
rule: "configuration.logRetentionHours >= 1"
- message: "logRetentionMs and logRetentionHours are mutually exclusive — set only one"
rule: "configuration.logRetentionMs == 0 || configuration.logRetentionHours == 0"
- message: "socketRequestMaxBytes must be >= messageMaxBytes (otherwise the broker rejects the request)"
rule: "configuration.socketRequestMaxBytes >= configuration.messageMaxBytes"
- message: "numNetworkThreads must not exceed 2 × CPU cores"
rule: "configuration.numNetworkThreads <= instance.cpu.cores * 2"
- message: "numIoThreads must not exceed 4 × CPU cores"
rule: "configuration.numIoThreads <= instance.cpu.cores * 4"
- message: "logCleanerThreads must not exceed the number of CPU cores"
rule: "configuration.logCleanerThreads <= instance.cpu.cores"