Установка и настройка Kafka кластера

Дата: 28.10.2019 Автор Admin

В данной статье рассмотрим как настроить kafka кластер из 3-х серверов с поддержкой ssl.

Действия описанные ниже выполняем на всех 3-х серверах.

 

Добавляем правила Firewall для Kafka и Zookeeper

 

 

Активируем правила

 

Создаем пользователя для kafka

 

Устанавливаем Java

 

Скачиваем последнюю версию Kafka

 

Распаковываем

 

Перемещаем в /opt

 

Создаем каталоги для логов Kafka и для zooKeeper

 

Переходим к конфигурации zooKeeper, открываем файл с конфигурацией

 

и указываем:

директорию с данными:

  • dataDir=/opt/kafka/zookeeper/data

Сервера и лимиты синхронизации:

  • server.1=kafka1.dev.local:2888:3888
  • server.2=kafka2.dev.local:2888:3888
  • server.3=kafka3.dev.local:2888:3888
  • initLimit=5
  • syncLimit=2

Далее, на каждом сервере создаем свой id для zooKeeper

 

Переходим к настройке kafka

Редактируем конфиг сервера

 

Добавляем:

broker.id=1 (для каждого сервера свой 1,2,3)

директорию с логами

  • log.dirs=/opt/kafka/kafka-logs

указываем прослушиватели:

  • listeners=PLAINTEXT://:9092
  • advertised.listeners=PLAINTEXT://kafka1.dev.local:9092
  • listeners — используется для внутреннего траффика между нодами кластера
  • advertised.listeners — используется для клиентского траффика

указываем ноды zooKeeper:

  • zookeeper.connect=kafka1.dev.local:2181,kafka2.dev.local:2181,kafka3.dev.local:2181

Если вам требуется поддержка удаления топиков, включите опцию ниже:

  • delete.topic.enable=true

Далее меняем владельца на пользователя kafka

 

Теперь создадим сервисы systemd для zooKeeper и kafka

Создадим сервис zooKeeper

 

 

 

Создадим сервис Kafka

 

 

После создания файлов загружаем информацию о новых сервисах и включаем их

 

Перезагружаем сервера и убеждаемся что сервисы запущены

Логинимся под пользователем kafka

переходим в каталог с kafka

 

Проверим что zooKeeper работает корректно и все ноды видят друг друга

 

вывод должен быть таким:

 

Теперь проверим корректность работы Kafka, на текущей ноде запустим Producer, обратите внимание, что топик будет создан автоматически

 

На другой ноде, например на kafka2, запустим Consumer

 

Теперь на запущенном Producer вводим сообщения, они должны появится на ноде с запущенным Consumer

Посмотреть список созданных топиков можно следующей командой:

Получить информацию о созданном топике можно следующей командой:

 

Удалить топик можно следующей командой:

Обратите внимание, что для удаления топика необходимо добавить delete.topic.enable=True в конфиг сервера Kafka

Создать топик с определенными параметрами можно следующей командой:

 

Также можно выполнить более тонкую настройку топика при его создании, например:

Изменить параметры топика можно следующей командой (изменим количество партиций и параметр min.insync.replicas):

 

Посмотреть кол-во открытых соединений к Kafka можно командой:

 

Узнать размер топика можно командой:

 

Посмотреть список активных consumer можно командой:

Получив список клиентов можно посмотреть более подробную информацию о consumer, в примере ниже клиент console-consumer-70311

Теперь рассмотрим как добавить подержку SSL, это снизит производительность, но повысит безопасность, т.к. соединения к kafka будут зашифрованы

Создадим сертификаты для Kafka, в данном случае они будут самоподписанные, но ничего не мешает использовать свой CA для подписи, если он у вас есть.

Создаем каталог /opt/kafka/ssl на всех нодах

переходим в него

На этом этапе создадим ключи для нашего CA

 

Далее импортируем сертификат CA в создаваемые server.truststore.jks и client.truststore.jks

Копируем эти файлы на остальные ноды

 

Создаем Java keystore и CSR запрос для сертификатов брокера, выполняем эти операции каждой ноде

 

При создании сертифката убедитесь что CN полностью совпадает с FQDN вашего Kafka сервера

Создаем файл myssl-config.cnf со следующим содержимым

 

Подписываем его нашим CA (пароль укажите от своего CA)

Добавляем сертифкаты в server.keystore.jks

Теперь, когда на каждой ноде есть сертификаты, переходим к конфигурации KAFKA, добавляем поддержку SSL.

Открываем server.properties на серверах и добавляем следующие строки в конфиг

 

Добавим правила для firewall

 

Активируем правила

 

Перезапускаем сервисы Kafka

 

Теперь Kafka доступна по порту SSL 9093

Но чтобы к ней подключиться необходимо добавить конфигурацию для клиента, для этого нужно создать специальный файл client.properties, создаем его

 

содержимое файла:

 

В данном файле указывается путь к файлу truststore, тип хранилища и security protocol

Теперь, чтобы producer или consumer клиенты его использовали, нужно передать им этот конфиг в специальном параметре

Пример запуска producer

 

Пример запуска consumer

 

Обратите внимание что при подключении нужно указывать полный FQDN брокеров

Если вы будете использовать свой внутренний CA, то вам понадобятся следующие команды чтобы импортировать PFX сертификаты в keystore:

 

Также, обратите внимание что Kafka в параметре ssl.keystore.password (в файле server.properties) ожидает что пароль от приватного ключа и keystore совпадает.
Если это не так, выполните команду по замене пароля приватного ключа:

Узнать алиас сертификата в keystore можно командой:

 

Теперь перейдем к настройке авторизации, рассмотрим мы два варианта, по логину и паролю и по сертификату.

Начнем с первого метода.

Создаем файл /opt/kafka/config/kafka_jaas.conf , в котором будут храниться логины и пароли в SASL/PLAIN формате

содержимое файла будет следующим:

 

Итого мы создали 3 пользователя:brokeradmin,admin,test

Далее нужно добавить в строку запуска сервиса kafka параметр Djava.security.auth.login.config , в переменную KAFKA_OPTS, для этого выполняем:

Добавляем строку Environment=’KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_jaas.conf’

Должно получиться так:

 

Теперь редактируем файл /opt/kafka/config/server.properties

 

Добавляем/редактируем строки:

  • listeners=SASL_SSL://kafka1.dev.local:9093
  • advertised.listeners=SASL_SSL://kafka1.dev.local:9093
  • security.inter.broker.protocol=SASL_SSL
  • sasl.mechanism.inter.broker.protocol=PLAIN
  • sasl.enabled.mechanisms=PLAIN
  • authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  • ssl.endpoint.identification.algorithm=
  • super.users=User:admin

Выполняем эти настройки на всех трех нодах kafka1.dev.local/kafka2.dev.local/kafka2.dev.local и перезапускаем сервисы.

Теперь, если запустить Producer со старым конфигом мы получим ошибку авторизации.

ошибка — disconnected

Чтобы все заработало, изменим тип подключения в client.properties на SASL_SSL

 

добавляем/изменяем строки:

  • security.protocol=SASL_SSL
  • sasl.mechanism=PLAIN

Теперь создаем jaas файл для авторизации

 

содержимое файла:

 

Перед использованием Consumer или producer нужно экспортировать переменную KAFKA_OPTS, в которой будут переданы наши учетные данные

Теперь запустим команду еще раз, и подключение заработает, но мы получим ошибку TOPIC_AUTHORIZATION_FAILED

 

Настраиваем по аналогии Consumer на соседней ноде и видим что подключение работает, но сообщения не читаются, ошибка TOPIC_AUTHORIZATION_FAILED

 

Это связано с тем что мы еще не выдали права на топики Kafka. Исправим это.

Предоставим пользователю test права на запись в топик example-topic2 с любого хоста

 

И на чтение

 

Также нужно дать права на чтение группы, в моем случае это группа console-consumer-87796

 

Если топик еще не создан, нужно выдать права на их создание, делается это так:

 

Если не хотите заморачиваться можете выдавать права сразу на все группы, делается это параметром —group=’*’ , например так:

 

Теперь если запустить Consumer и Producer, все заработает корректно.

Более подробно ознакомится с правами доступа можно тут — https://docs.confluent.io/current/kafka/authorization.html

 

Теперь, рассмотрим второй метод, выдачу доступа по SSL сертификатам.

Создаем Java keystore и CSR запрос для сертификата клиента.

 

При создании сертифката убедитесь что CN полностью совпадает с FQDN вашего Kafka клиента

Создаем файл client-config.cnf со следующим содержимым

 

Подписываем его нашим CA (пароль укажите от своего CA)

 

Добавляем сертифкат в client.keystore.jks

 

Создаем следующей файл для подключения

 

Проверяем что параметры в server.properties следующие:

 

Выдадим права на запись в топик example-topic2

 

Выдадим права на чтение в топик example-topic2

Теперь запустим Producer

 

Запустим Consumer

 

Все работает.

Теперь протестируем отказоустойчивость kafka.

Для этого я сгенерирую текстовый файл в 10 000 000 строк, эти строки мы запишем в топик kafka, далее примем сообщения consumer и сохраним их в текстовый файл.
Во время заливки мы отключим одну из 3-х нод kafka и в конце сравним, файл на consumer и оригинальный файл.
Поскольку kafka гарантирует доставку сообщений, то файлы должны совпадать.

Создадим топик million

 

Создадим тестовый файл

 

Тестовый файл с сообщениями готов.

Теперь запускаем команду для заливки данных

 

теперь на сервере с consumer запускаем сам consumer с выводом в файл

 

Запускаем скрипт Producer-а , видим что пошла заливка данных и через некоторое время отключаем любую из нод сервера kafka

После завершения заливки данных проверим файл from-kafka.txt ,командой:

число строк в выводе должно быть 10000000.

Недоступность одного из брокеров не повлияла на отправку/доставку сообщений.

Проверим состояние топика во время недоступности одной из нод:

 

Вывод будет следующим:

 

Как видим число синхронных реплик — 1 (Isr), все партиции доступны и у каждой из них есть лидер.

В данном случае топик был настроен правильно, т.к. мы указали следующие параметры:

min.insync.replicas — кол-во реплик, на которые должны быть синхронизированы данные, прежде чем записаться.
unclean.leader.election.enable=false отключение возможности провести failover на не синхронную отстающую реплику с потенциальной потерей данных
ReplicationFactor:3 — кол-во реплик, на которые реплицируются данные.

После восстановления отключенной реплики видим что данные успешно синхронизированы

 


Добавить комментарий

Ваш адрес email не будет опубликован.