Установка и настройка Kafka кластера
В данной статье рассмотрим как настроить kafka кластер из 3-х серверов с поддержкой ssl.
Действия описанные ниже выполняем на всех 3-х серверах.
Добавляем правила Firewall для Kafka и Zookeeper
1 |
sudo nano /etc/firewalld/services/zooKeeper.xml |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
<?xml version="1.0" encoding="utf-8"?> <service> <short>ZooKeeper</short> <description>Firewall rule for ZooKeeper ports</description> <port protocol="tcp" port="2888"/> <port protocol="tcp" port="3888"/> <port protocol="tcp" port="2181"/> </service> |
1 |
sudo nano /etc/firewalld/services/kafka.xml |
1 2 3 4 5 6 7 8 9 10 11 |
<?xml version="1.0" encoding="utf-8"?> <service> <short>Kafka</short> <description>Firewall rule for Kafka port</description> <port protocol="tcp" port="9092"/> </service> |
Активируем правила
1 2 3 4 5 6 7 |
sudo service firewalld restart sudo firewall-cmd --permanent --add-service=zooKeeper sudo firewall-cmd --permanent --add-service=kafka sudo service firewalld restart |
Создаем пользователя для kafka
1 2 3 |
sudo adduser kafka sudo passwd kafka |
Устанавливаем Java
1 |
yum install java-1.8.0-openjdk |
Скачиваем последнюю версию Kafka
1 |
wget http://apache-mirror.8birdsvideo.com/kafka/2.3.0/kafka_2.11-2.3.0.tgz |
Распаковываем
1 |
tar -xzf kafka_2.11-2.3.0.tgz |
Перемещаем в /opt
1 |
mv kafka_2.11-2.3.0 /opt/kafka |
Создаем каталоги для логов Kafka и для zooKeeper
1 2 3 |
mkdir -p /opt/kafka/zookeeper/data mkdir -p /opt/kafka/kafka-logs |
Переходим к конфигурации zooKeeper, открываем файл с конфигурацией
1 |
nano /opt/kafka/config/zookeeper.properties |
и указываем:
директорию с данными:
- dataDir=/opt/kafka/zookeeper/data
Сервера и лимиты синхронизации:
- server.1=kafka1.dev.local:2888:3888
- server.2=kafka2.dev.local:2888:3888
- server.3=kafka3.dev.local:2888:3888
- initLimit=5
- syncLimit=2
Далее, на каждом сервере создаем свой id для zooKeeper
1 2 3 4 5 |
echo "1" > /opt/kafka/zookeeper/data/myid (для сервера kafka1.dev.local) echo "2" > /opt/kafka/zookeeper/data/myid (для сервера kafka2.dev.local) echo "3" > /opt/kafka/zookeeper/data/myid (для сервера kafka3.dev.local) |
Переходим к настройке kafka
Редактируем конфиг сервера
1 |
nano /opt/kafka/config/server.properties |
Добавляем:
broker.id=1 (для каждого сервера свой 1,2,3)
директорию с логами
- log.dirs=/opt/kafka/kafka-logs
указываем прослушиватели:
- listeners=PLAINTEXT://:9092
- advertised.listeners=PLAINTEXT://kafka1.dev.local:9092
- listeners — используется для внутреннего траффика между нодами кластера
- advertised.listeners — используется для клиентского траффика
указываем ноды zooKeeper:
- zookeeper.connect=kafka1.dev.local:2181,kafka2.dev.local:2181,kafka3.dev.local:2181
Если вам требуется поддержка удаления топиков, включите опцию ниже:
- delete.topic.enable=true
Далее меняем владельца на пользователя kafka
1 |
chown -R kafka /opt/kafka |
Теперь создадим сервисы systemd для zooKeeper и kafka
Создадим сервис zooKeeper
1 |
nano /etc/systemd/system/zookeeper.service |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
[Unit] Description=ZooKeeper for Kafka After=network.target [Service] Type=forking User=kafka Restart=on-failure LimitNOFILE=16384:163840 ExecStart=/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties [Install] WantedBy=multi-user.target |
Создадим сервис Kafka
1 |
nano /etc/systemd/system/kafka.service |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
[Unit] Description=Kafka Broker After=network.target After=zookeeper.service [Service] Type=forking User=kafka SyslogIdentifier=kafka (%i) Restart=on-failure LimitNOFILE=16384:163840 ExecStart=/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties [Install] WantedBy=multi-user.target |
После создания файлов загружаем информацию о новых сервисах и включаем их
1 2 3 4 5 |
systemctl daemon-reload systemctl enable zookeeper systemctl enable kafka |
Перезагружаем сервера и убеждаемся что сервисы запущены
Логинимся под пользователем kafka
1 |
su kafka |
переходим в каталог с kafka
1 |
cd /opt/kafka |
Проверим что zooKeeper работает корректно и все ноды видят друг друга
1 |
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids |
вывод должен быть таким:
1 2 3 4 5 6 7 |
Connecting to localhost:2181 WATCHER:: WatchedEvent state:SyncConnected type:None path:null [1, 2, 3] |
Теперь проверим корректность работы Kafka, на текущей ноде запустим Producer, обратите внимание, что топик будет создан автоматически
1 |
bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic example-topic |
На другой ноде, например на kafka2, запустим Consumer
1 |
su kafka |
1 |
cd /opt/kafka |
1 |
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example-topic |
Теперь на запущенном Producer вводим сообщения, они должны появится на ноде с запущенным Consumer
Посмотреть список созданных топиков можно следующей командой:
1 |
bin/kafka-topics.sh --list --zookeeper localhost:2181 |
Получить информацию о созданном топике можно следующей командой:
1 |
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic example-topic |
Удалить топик можно следующей командой:
1 |
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic example-topic |
Обратите внимание, что для удаления топика необходимо добавить delete.topic.enable=True в конфиг сервера Kafka
Создать топик с определенными параметрами можно следующей командой:
1 2 3 4 5 6 7 8 9 |
bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --topic <topic-name> \ --partitions <number-of-partitions> \ --replication-factor <number-of-replicating-servers> |
Также можно выполнить более тонкую настройку топика при его создании, например:
1 |
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --config min.insync.replicas=1 --config retention.ms=-1 --config unclean.leader.election.enable=false --topic test-topic |
Изменить параметры топика можно следующей командой (изменим количество партиций и параметр min.insync.replicas):
1 |
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test-topic --partitions 8 --config min.insync.replicas=2 |
Посмотреть кол-во открытых соединений к Kafka можно командой:
1 |
netstat -anp | grep :9092 | grep ESTABLISHED | wc -l |
Узнать размер топика можно командой:
1 |
bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --topic-list 'test-topic' --describe | grep '^{' | jq '[ ..|.size? | numbers ] | add' |
Посмотреть список активных consumer можно командой:
1 |
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092 |
Получив список клиентов можно посмотреть более подробную информацию о consumer, в примере ниже клиент console-consumer-70311
1 |
bin/kafka-consumer-groups.sh --describe --group console-consumer-70311 --bootstrap-server localhost:9092 |
Теперь рассмотрим как добавить подержку SSL, это снизит производительность, но повысит безопасность, т.к. соединения к kafka будут зашифрованы
Создадим сертификаты для Kafka, в данном случае они будут самоподписанные, но ничего не мешает использовать свой CA для подписи, если он у вас есть.
Создаем каталог /opt/kafka/ssl на всех нодах
1 |
mkdir /opt/kafka/ssl |
переходим в него
1 |
cd /opt/kafka/ssl |
На этом этапе создадим ключи для нашего CA
1 |
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 |
Далее импортируем сертификат CA в создаваемые server.truststore.jks и client.truststore.jks
1 2 3 |
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert |
Копируем эти файлы на остальные ноды
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
scp ca-cert root@kafka2:/opt/kafka/ssl/ scp ca-cert root@kafka3:/opt/kafka/ssl/ scp ca-key root@kafka2:/opt/kafka/ssl/ scp ca-key root@kafka3:/opt/kafka/ssl/ scp server.truststore.jks root@kafka2:/opt/kafka/ssl/ scp server.truststore.jks root@kafka3:/opt/kafka/ssl/ scp client.truststore.jks root@kafka2:/opt/kafka/ssl/ scp client.truststore.jks root@kafka3:/opt/kafka/ssl/ |
Создаем Java keystore и CSR запрос для сертификатов брокера, выполняем эти операции каждой ноде
1 |
keytool -genkeypair -alias KafkaServerSSL -keyalg RSA -keystore server.keystore.jks -keysize 2048 -dname "CN=$(hostname -f),OU=IT,O=COMPANY,L=RUSSIA,ST=Moscow,C=RU" -ext san=dns:kafka1.dev.local,dns:kafka2.dev.local,dns:kafka3.dev.local |
1 |
keytool -certreq -alias KafkaServerSSL -keystore server.keystore.jks -file $(hostname -f).csr -ext san=dns:kafka1.dev.local,dns:kafka2.dev.local,dns:kafka3.dev.local -ext EKU=serverAuth,clientAuth |
При создании сертифката убедитесь что CN полностью совпадает с FQDN вашего Kafka сервера
Создаем файл myssl-config.cnf со следующим содержимым
1 2 3 4 5 6 7 8 9 10 11 12 13 |
[ SAN ] extendedKeyUsage = serverAuth, clientAuth subjectAltName = @alt_names [alt_names] DNS.1 = kafka1.dev.local DNS.2 = kafka2.dev.local DNS.3 = kafka3.dev.local |
Подписываем его нашим CA (пароль укажите от своего CA)
1 |
openssl x509 -req -CA ca-cert -CAkey ca-key -extensions SAN -extfile myssl-config.cnf -in $(hostname -f).csr -out cert-signed -CAcreateserial -passin pass:123123 |
Добавляем сертифкаты в server.keystore.jks
1 2 3 |
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert keytool -import -file cert-signed -keystore server.keystore.jks -alias KafkaServerSSL |
Теперь, когда на каждой ноде есть сертификаты, переходим к конфигурации KAFKA, добавляем поддержку SSL.
Открываем server.properties на серверах и добавляем следующие строки в конфиг
1 |
nano /opt/kafka/config/server.properties |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
ssl.keystore.location=/opt/kafka/ssl/server.keystore.jks ssl.keystore.password=123123 ssl.key.password=123123 ssl.truststore.location=/opt/kafka/ssl/server.truststore.jks ssl.truststore.password=123123 ssl.client.auth=none ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 ssl.keystore.type=JKS ssl.truststore.type=JKS security.inter.broker.protocol=SSL ssl.endpoint.identification.algorithm= listeners=PLAINTEXT://:9092,SSL://kafka1.dev.local:9093 advertised.listeners=PLAINTEXT://kafka1.dev.local:9092,SSL://kafka1.dev.local:9093 |
Добавим правила для firewall
1 |
sudo nano /etc/firewalld/services/kafkassl.xml |
1 2 3 4 5 6 7 8 9 10 11 |
<?xml version="1.0" encoding="utf-8"?> <service> <short>Kafkassl</short> <description>Firewall rule for Kafka SSL port</description> <port protocol="tcp" port="9093"/> </service> |
Активируем правила
1 2 3 4 5 |
sudo service firewalld restart sudo firewall-cmd --permanent --add-service=kafkassl sudo service firewalld restart |
Перезапускаем сервисы Kafka
1 |
service kafka restart |
Теперь Kafka доступна по порту SSL 9093
Но чтобы к ней подключиться необходимо добавить конфигурацию для клиента, для этого нужно создать специальный файл client.properties, создаем его
1 |
nano client.properties |
содержимое файла:
1 2 3 4 5 6 7 8 9 |
security.protocol=SSL ssl.truststore.location=/opt/kafka/ssl/client.truststore.jks ssl.truststore.password=123123 ssl.truststore.type=JKS ssl.keystore.type=JKS |
В данном файле указывается путь к файлу truststore, тип хранилища и security protocol
Теперь, чтобы producer или consumer клиенты его использовали, нужно передать им этот конфиг в специальном параметре
Пример запуска producer
1 |
/opt/kafka/bin/kafka-console-producer.sh --producer.config client.properties --broker-list kafka1.dev.local:9093,kafka2.dev.local:9093,kafka3.dev.local:9093 --topic example-topic2 |
Пример запуска consumer
1 |
/opt/kafka/bin/kafka-console-consumer.sh --consumer.config client.properties --bootstrap-server kafka3.dev.local:9093 --topic example-topic2 |
Обратите внимание что при подключении нужно указывать полный FQDN брокеров
Если вы будете использовать свой внутренний CA, то вам понадобятся следующие команды чтобы импортировать PFX сертификаты в keystore:
1 |
keytool -importkeystore -srckeystore cert.pfx -srcstoretype pkcs12 -destkeystore cert.keystore.jks -deststoretype JKS |
Также, обратите внимание что Kafka в параметре ssl.keystore.password (в файле server.properties) ожидает что пароль от приватного ключа и keystore совпадает.
Если это не так, выполните команду по замене пароля приватного ключа:
1 |
keytool -keypasswd -keystore cert.keystore.jks -alias Hnmshuefg7efsjfh3w4hffs |
Узнать алиас сертификата в keystore можно командой:
1 |
keytool -list -v -keystore cert.keystore.jks |
Теперь перейдем к настройке авторизации, рассмотрим мы два варианта, по логину и паролю и по сертификату.
Начнем с первого метода.
Создаем файл /opt/kafka/config/kafka_jaas.conf , в котором будут храниться логины и пароли в SASL/PLAIN формате
1 |
nano /opt/kafka/config/kafka_jaas.conf |
содержимое файла будет следующим:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="password" user_admin="password" user_test="password"; }; |
Итого мы создали 3 пользователя:brokeradmin,admin,test
Далее нужно добавить в строку запуска сервиса kafka параметр Djava.security.auth.login.config , в переменную KAFKA_OPTS, для этого выполняем:
1 |
nano /etc/systemd/system/kafka.service |
Добавляем строку Environment=’KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_jaas.conf’
Должно получиться так:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
[Unit] Description=Kafka Broker After=network.target After=zookeeper.service [Service] Type=forking User=kafka SyslogIdentifier=kafka (%i) Restart=on-failure LimitNOFILE=16384:163840 Environment='KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_jaas.conf' ExecStart=/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties [Install] WantedBy=multi-user.target |
Теперь редактируем файл /opt/kafka/config/server.properties
1 |
nano /opt/kafka/config/server.properties |
Добавляем/редактируем строки:
- listeners=SASL_SSL://kafka1.dev.local:9093
- advertised.listeners=SASL_SSL://kafka1.dev.local:9093
- security.inter.broker.protocol=SASL_SSL
- sasl.mechanism.inter.broker.protocol=PLAIN
- sasl.enabled.mechanisms=PLAIN
- authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
- ssl.endpoint.identification.algorithm=
- super.users=User:admin
Выполняем эти настройки на всех трех нодах kafka1.dev.local/kafka2.dev.local/kafka2.dev.local и перезапускаем сервисы.
1 2 3 |
systemctl daemon-reload systemctl restart kafka |
Теперь, если запустить Producer со старым конфигом мы получим ошибку авторизации.
1 |
/opt/kafka/bin/kafka-console-producer.sh --producer.config client.properties --broker-list kafka1.dev.local:9093,kafka2.dev.local:9093,kafka3.dev.local:9093 --topic example-topic2 |
ошибка — disconnected
Чтобы все заработало, изменим тип подключения в client.properties на SASL_SSL
1 |
nano client.properties |
добавляем/изменяем строки:
- security.protocol=SASL_SSL
- sasl.mechanism=PLAIN
Теперь создаем jaas файл для авторизации
1 |
nano jaas.conf |
содержимое файла:
1 2 3 4 5 6 7 8 9 |
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="password"; }; |
Перед использованием Consumer или producer нужно экспортировать переменную KAFKA_OPTS, в которой будут переданы наши учетные данные
1 |
export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf" |
Теперь запустим команду еще раз, и подключение заработает, но мы получим ошибку TOPIC_AUTHORIZATION_FAILED
1 |
/opt/kafka/bin/kafka-console-producer.sh --producer.config client.properties --broker-list kafka1.dev.local:9093,kafka2.dev.local:9093,kafka3.dev.local:9093 --topic example-topic2 |
Настраиваем по аналогии Consumer на соседней ноде и видим что подключение работает, но сообщения не читаются, ошибка TOPIC_AUTHORIZATION_FAILED
1 |
/opt/kafka/bin/kafka-console-consumer.sh --consumer.config client.properties --bootstrap-server kafka3.dev.local:9093 --topic example-topic2 |
Это связано с тем что мы еще не выдали права на топики Kafka. Исправим это.
Предоставим пользователю test права на запись в топик example-topic2 с любого хоста
1 |
/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Write --topic example-topic2 |
И на чтение
1 |
/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Read --topic example-topic2 |
Также нужно дать права на чтение группы, в моем случае это группа console-consumer-87796
1 |
/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Read --group console-consumer-87796 |
Если топик еще не создан, нужно выдать права на их создание, делается это так:
1 |
/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Create --group console-consumer-87796 |
Если не хотите заморачиваться можете выдавать права сразу на все группы, делается это параметром —group=’*’ , например так:
1 |
/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Read --topic example-topic2 --group='*' |
Теперь если запустить Consumer и Producer, все заработает корректно.
Более подробно ознакомится с правами доступа можно тут — https://docs.confluent.io/current/kafka/authorization.html
Теперь, рассмотрим второй метод, выдачу доступа по SSL сертификатам.
Создаем Java keystore и CSR запрос для сертификата клиента.
1 2 3 |
keytool -genkeypair -alias KafkaClientSSL -keyalg RSA -keystore client.keystore.jks -keysize 2048 -dname "CN=kafka-client.dev.local,OU=IT,O=COMPANY,L=RUSSIA,ST=Moscow,C=RU" keytool -certreq -alias KafkaClientSSL -keystore client.keystore.jks -file client.csr -ext EKU=serverAuth,clientAuth |
При создании сертифката убедитесь что CN полностью совпадает с FQDN вашего Kafka клиента
Создаем файл client-config.cnf со следующим содержимым
1 2 3 |
[ SAN ] extendedKeyUsage = serverAuth, clientAuth |
Подписываем его нашим CA (пароль укажите от своего CA)
1 |
openssl x509 -req -CA ca-cert -CAkey ca-key -extensions SAN -extfile client-config.cnf -in client.csr -out client-cert-signed -CAcreateserial -passin pass:123123 |
Добавляем сертифкат в client.keystore.jks
1 2 3 |
keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert keytool -import -file client-cert-signed -keystore client.keystore.jks -alias KafkaClientSSL |
Создаем следующей файл для подключения
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
security.protocol=SSL sasl.mechanism=PLAIN ssl.truststore.location=/opt/kafka/ssl/client.truststore.jks ssl.truststore.password=123123 #ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 ssl.truststore.type=JKS ssl.keystore.type=JKS ssl.keystore.location=/opt/kafka/ssl/client.keystore.jks ssl.keystore.password=123123 ssl.key.password=123123 |
Проверяем что параметры в server.properties следующие:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
ssl.keystore.location=/opt/kafka/ssl/server.keystore.jks ssl.keystore.password=123123 ssl.key.password=123123 ssl.truststore.location=/opt/kafka/ssl/server.truststore.jks ssl.truststore.password=123123 ssl.client.auth=required ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 ssl.keystore.type=JKS ssl.truststore.type=JKS security.inter.broker.protocol=SSL ssl.endpoint.identification.algorithm= authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer super.users=User:CN=kafka1.dev.local,OU=IT,O=COMPANY,L=RUSSIA,ST=Moscow,C=RU |
Выдадим права на запись в топик example-topic2
1 2 3 |
/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:'CN=kafka-client.dev.local,OU=IT,O=COMPANY,L=RUSSIA,ST=Moscow,C=RU' --operation Write --topic example-topic2 /opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:'CN=kafka-client.dev.local,OU=IT,O=COMPANY,L=RUSSIA,ST=Moscow,C=RU' --operation Create --topic example-topic2 |
Выдадим права на чтение в топик example-topic2
1 |
/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:'CN=kafka-client.dev.local,OU=IT,O=COMPANY,L=RUSSIA,ST=Moscow,C=RU' --operation Read --topic example-topic2 --group '*' |
Теперь запустим Producer
1 |
/opt/kafka/bin/kafka-console-producer.sh --producer.config client.propertiesssl --broker-list kafka1.dev.local:9093,kafka2.dev.local:9093,kafka3.dev.local:9093 --topic example-topic2 |
Запустим Consumer
1 |
/opt/kafka/bin/kafka-console-consumer.sh --consumer.config client.propertiesssl --bootstrap-server kafka3.dev.local:9093 --topic example-topic2 |
Все работает.
Теперь протестируем отказоустойчивость kafka.
Для этого я сгенерирую текстовый файл в 10 000 000 строк, эти строки мы запишем в топик kafka, далее примем сообщения consumer и сохраним их в текстовый файл.
Во время заливки мы отключим одну из 3-х нод kafka и в конце сравним, файл на consumer и оригинальный файл.
Поскольку kafka гарантирует доставку сообщений, то файлы должны совпадать.
Создадим топик million
1 |
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --config min.insync.replicas=2 --config retention.ms=-1 --config unclean.leader.election.enable=false --topic million |
Создадим тестовый файл
1 |
touch generate |
1 |
nano generate |
1 2 3 4 5 6 7 8 |
#/bin/bash for i in {1..10000000}; do echo $i echo $i >> tokafka.txt |
1 |
chmod +x generate |
1 |
./generate |
Тестовый файл с сообщениями готов.
Теперь запускаем команду для заливки данных
1 |
cat tokafka.txt | /opt/kafka/bin/kafka-console-producer.sh --producer.config client.propertiesssl --broker-list kafka1.dev.local:9093,kafka2.dev.local:9093,kafka3.dev.local:9093 --topic million |
теперь на сервере с consumer запускаем сам consumer с выводом в файл
1 |
/opt/kafka/bin/kafka-console-consumer.sh --consumer.config client.propertiesssl --bootstrap-server kafka1.dev.local:9093 --topic million > from-kafka.txt |
Запускаем скрипт Producer-а , видим что пошла заливка данных и через некоторое время отключаем любую из нод сервера kafka
После завершения заливки данных проверим файл from-kafka.txt ,командой:
1 |
wc -l from-kafka.txt |
число строк в выводе должно быть 10000000.
Недоступность одного из брокеров не повлияла на отправку/доставку сообщений.
Проверим состояние топика во время недоступности одной из нод:
1 |
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic million |
Вывод будет следующим:
1 2 3 4 |
Topic:million PartitionCount:3 ReplicationFactor:3 Configs:retention.ms=-1,unclean.leader.election.enable=false,min.insync.replicas=2 Topic: million Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2 Topic: million Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1 Topic: million Partition: 2 Leader: 1 Replicas: 3,1,2 Isr: 1,2 |
Как видим число синхронных реплик — 1 (Isr), все партиции доступны и у каждой из них есть лидер.
В данном случае топик был настроен правильно, т.к. мы указали следующие параметры:
min.insync.replicas — кол-во реплик, на которые должны быть синхронизированы данные, прежде чем записаться.
unclean.leader.election.enable=false отключение возможности провести failover на не синхронную отстающую реплику с потенциальной потерей данных
ReplicationFactor:3 — кол-во реплик, на которые реплицируются данные.
После восстановления отключенной реплики видим что данные успешно синхронизированы
1 2 3 4 |
Topic:million PartitionCount:3 ReplicationFactor:3 Configs:retention.ms=-1,unclean.leader.election.enable=false,min.insync.replicas=2 Topic: million Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: million Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1,3 Topic: million Partition: 2 Leader: 1 Replicas: 3,1,2 Isr: 1,2,3 |
Добавить комментарий