Установка и настройка Kafka кластера

Дата: 28.10.2019 Автор Admin

В данной статье рассмотрим как настроить kafka кластер из 3-х серверов с поддержкой ssl.

Действия описанные ниже выполняем на всех 3-х серверах.

 

Добавляем правила Firewall для Kafka и Zookeeper

sudo nano /etc/firewalld/services/zooKeeper.xml
 
<?xml version="1.0" encoding="utf-8"?>
 
<service>
 
<short>ZooKeeper</short>
 
<description>Firewall rule for ZooKeeper ports</description>
 
<port protocol="tcp" port="2888"/>
 
<port protocol="tcp" port="3888"/>
 
<port protocol="tcp" port="2181"/>
 
</service>

 

sudo nano /etc/firewalld/services/kafka.xml
<?xml version="1.0" encoding="utf-8"?>
 
<service>
 
<short>Kafka</short>
 
<description>Firewall rule for Kafka port</description>
 
<port protocol="tcp" port="9092"/>
 
</service>

 

Активируем правила

sudo service firewalld restart

sudo firewall-cmd --permanent --add-service=zooKeeper

sudo firewall-cmd --permanent --add-service=kafka

sudo service firewalld restart

 

Создаем пользователя для kafka

sudo adduser kafka

sudo passwd kafka

 

Устанавливаем Java

yum install java-1.8.0-openjdk

 

Скачиваем последнюю версию Kafka

wget http://apache-mirror.8birdsvideo.com/kafka/2.3.0/kafka_2.11-2.3.0.tgz

 

Распаковываем

tar -xzf kafka_2.11-2.3.0.tgz

 

Перемещаем в /opt

mv kafka_2.11-2.3.0 /opt/kafka

 

Создаем каталоги для логов Kafka и для zooKeeper

mkdir -p /opt/kafka/zookeeper/data

mkdir -p /opt/kafka/kafka-logs

 

Переходим к конфигурации zooKeeper, открываем файл с конфигурацией

nano /opt/kafka/config/zookeeper.properties

 

и указываем:

директорию с данными:

  • dataDir=/opt/kafka/zookeeper/data

Сервера и лимиты синхронизации:

  • server.1=kafka1.dev.local:2888:3888
  • server.2=kafka2.dev.local:2888:3888
  • server.3=kafka3.dev.local:2888:3888
  • initLimit=5
  • syncLimit=2

Далее, на каждом сервере создаем свой id для zooKeeper

echo "1" > /opt/kafka/zookeeper/data/myid (для сервера kafka1.dev.local)

echo "2" > /opt/kafka/zookeeper/data/myid (для сервера kafka2.dev.local)

echo "3" > /opt/kafka/zookeeper/data/myid (для сервера kafka3.dev.local)

 

Переходим к настройке kafka

Редактируем конфиг сервера

nano /opt/kafka/config/server.properties

 

Добавляем:

broker.id=1 (для каждого сервера свой 1,2,3)

директорию с логами

  • log.dirs=/opt/kafka/kafka-logs

указываем прослушиватели:

  • listeners=PLAINTEXT://:9092
  • advertised.listeners=PLAINTEXT://kafka1.dev.local:9092
  • listeners — используется для внутреннего траффика между нодами кластера
  • advertised.listeners — используется для клиентского траффика

указываем ноды zooKeeper:

  • zookeeper.connect=kafka1.dev.local:2181,kafka2.dev.local:2181,kafka3.dev.local:2181

Если вам требуется поддержка удаления топиков, включите опцию ниже:

  • delete.topic.enable=true

Далее меняем владельца на пользователя kafka

chown -R kafka /opt/kafka

 

Теперь создадим сервисы systemd для zooKeeper и kafka

Создадим сервис zooKeeper

nano /etc/systemd/system/zookeeper.service

 

[Unit]

Description=ZooKeeper for Kafka

After=network.target

[Service]

Type=forking

User=kafka

Restart=on-failure

LimitNOFILE=16384:163840

ExecStart=/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

[Install]

WantedBy=multi-user.target

 

 

Создадим сервис Kafka

nano /etc/systemd/system/kafka.service

 

[Unit]

Description=Kafka Broker

After=network.target

After=zookeeper.service

[Service]

Type=forking

User=kafka

SyslogIdentifier=kafka (%i)

Restart=on-failure

LimitNOFILE=16384:163840

ExecStart=/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

[Install]

WantedBy=multi-user.target

 

После создания файлов загружаем информацию о новых сервисах и включаем их

systemctl daemon-reload

systemctl enable zookeeper

systemctl enable kafka

 

Перезагружаем сервера и убеждаемся что сервисы запущены

Логинимся под пользователем kafka

su kafka

переходим в каталог с kafka

cd /opt/kafka

 

Проверим что zooKeeper работает корректно и все ноды видят друг друга

bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

 

вывод должен быть таким:

Connecting to localhost:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

[1, 2, 3]

 

Теперь проверим корректность работы Kafka, на текущей ноде запустим Producer, обратите внимание, что топик будет создан автоматически

bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic example-topic

 

На другой ноде, например на kafka2, запустим Consumer

su kafka
cd /opt/kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example-topic

 

Теперь на запущенном Producer вводим сообщения, они должны появится на ноде с запущенным Consumer

Посмотреть список созданных топиков можно следующей командой:

bin/kafka-topics.sh --list --zookeeper localhost:2181

Получить информацию о созданном топике можно следующей командой:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic example-topic

 

Удалить топик можно следующей командой:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic example-topic

Обратите внимание, что для удаления топика необходимо добавить delete.topic.enable=True в конфиг сервера Kafka

Создать топик с определенными параметрами можно следующей командой:

bin/kafka-topics.sh --create \

--zookeeper localhost:2181 \

--topic <topic-name> \

--partitions <number-of-partitions> \

--replication-factor <number-of-replicating-servers>

 

Также можно выполнить более тонкую настройку топика при его создании, например:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --config min.insync.replicas=1 --config retention.ms=-1 --config unclean.leader.election.enable=false --topic test-topic

Изменить параметры топика можно следующей командой (изменим количество партиций и параметр min.insync.replicas):

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test-topic --partitions 8 --config min.insync.replicas=2

 

Посмотреть кол-во открытых соединений к Kafka можно командой:

netstat -anp | grep :9092 | grep ESTABLISHED | wc -l

 

Узнать размер топика можно командой:

bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --topic-list 'test-topic' --describe | grep '^{' | jq '[ ..|.size? | numbers ] | add'

 

Посмотреть список активных consumer можно командой:

bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

Получив список клиентов можно посмотреть более подробную информацию о consumer, в примере ниже клиент console-consumer-70311

bin/kafka-consumer-groups.sh --describe --group console-consumer-70311 --bootstrap-server localhost:9092

Теперь рассмотрим как добавить подержку SSL, это снизит производительность, но повысит безопасность, т.к. соединения к kafka будут зашифрованы

Создадим сертификаты для Kafka, в данном случае они будут самоподписанные, но ничего не мешает использовать свой CA для подписи, если он у вас есть.

Создаем каталог /opt/kafka/ssl на всех нодах

mkdir /opt/kafka/ssl

переходим в него

cd /opt/kafka/ssl

На этом этапе создадим ключи для нашего CA

openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

 

Далее импортируем сертификат CA в создаваемые server.truststore.jks и client.truststore.jks

keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert

keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

Копируем эти файлы на остальные ноды

scp ca-cert root@kafka2:/opt/kafka/ssl/

scp ca-cert root@kafka3:/opt/kafka/ssl/

scp ca-key root@kafka2:/opt/kafka/ssl/

scp ca-key root@kafka3:/opt/kafka/ssl/

scp server.truststore.jks root@kafka2:/opt/kafka/ssl/

scp server.truststore.jks root@kafka3:/opt/kafka/ssl/

scp client.truststore.jks root@kafka2:/opt/kafka/ssl/

scp client.truststore.jks root@kafka3:/opt/kafka/ssl/

 

Создаем Java keystore и CSR запрос для сертификатов брокера, выполняем эти операции каждой ноде

keytool -genkeypair -alias KafkaServerSSL -keyalg RSA -keystore server.keystore.jks -keysize 2048 -dname "CN=$(hostname -f),OU=IT,O=COMPANY,L=RUSSIA,ST=Moscow,C=RU" -ext san=dns:kafka1.dev.local,dns:kafka2.dev.local,dns:kafka3.dev.local
keytool -certreq -alias KafkaServerSSL -keystore server.keystore.jks -file $(hostname -f).csr -ext san=dns:kafka1.dev.local,dns:kafka2.dev.local,dns:kafka3.dev.local -ext EKU=serverAuth,clientAuth

 

При создании сертифката убедитесь что CN полностью совпадает с FQDN вашего Kafka сервера

Создаем файл myssl-config.cnf со следующим содержимым

[ SAN ]

extendedKeyUsage = serverAuth, clientAuth

subjectAltName = @alt_names

[alt_names]

DNS.1 = kafka1.dev.local

DNS.2 = kafka2.dev.local

DNS.3 = kafka3.dev.local

 

Подписываем его нашим CA (пароль укажите от своего CA)

openssl x509 -req -CA ca-cert -CAkey ca-key -extensions SAN -extfile myssl-config.cnf -in $(hostname -f).csr -out cert-signed -CAcreateserial -passin pass:123123

Добавляем сертифкаты в server.keystore.jks

keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert

keytool -import -file cert-signed -keystore server.keystore.jks -alias KafkaServerSSL

Теперь, когда на каждой ноде есть сертификаты, переходим к конфигурации KAFKA, добавляем поддержку SSL.

Открываем server.properties на серверах и добавляем следующие строки в конфиг

nano /opt/kafka/config/server.properties
ssl.keystore.location=/opt/kafka/ssl/server.keystore.jks

ssl.keystore.password=123123

ssl.key.password=123123

ssl.truststore.location=/opt/kafka/ssl/server.truststore.jks

ssl.truststore.password=123123

ssl.client.auth=none

ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

ssl.keystore.type=JKS

ssl.truststore.type=JKS

security.inter.broker.protocol=SSL

ssl.endpoint.identification.algorithm=

listeners=PLAINTEXT://:9092,SSL://kafka1.dev.local:9093

advertised.listeners=PLAINTEXT://kafka1.dev.local:9092,SSL://kafka1.dev.local:9093

 

Добавим правила для firewall

sudo nano /etc/firewalld/services/kafkassl.xml
<?xml version="1.0" encoding="utf-8"?>

<service>

<short>Kafkassl</short>

<description>Firewall rule for Kafka SSL port</description>

<port protocol="tcp" port="9093"/>

</service>

 

Активируем правила

sudo service firewalld restart

sudo firewall-cmd --permanent --add-service=kafkassl

sudo service firewalld restart

 

Перезапускаем сервисы Kafka

service kafka restart

 

Теперь Kafka доступна по порту SSL 9093

Но чтобы к ней подключиться необходимо добавить конфигурацию для клиента, для этого нужно создать специальный файл client.properties, создаем его

nano client.properties

 

содержимое файла:

security.protocol=SSL

ssl.truststore.location=/opt/kafka/ssl/client.truststore.jks

ssl.truststore.password=123123

ssl.truststore.type=JKS

ssl.keystore.type=JKS

 

В данном файле указывается путь к файлу truststore, тип хранилища и security protocol

Теперь, чтобы producer или consumer клиенты его использовали, нужно передать им этот конфиг в специальном параметре

Пример запуска producer

/opt/kafka/bin/kafka-console-producer.sh --producer.config client.properties --broker-list kafka1.dev.local:9093,kafka2.dev.local:9093,kafka3.dev.local:9093 --topic example-topic2

 

Пример запуска consumer

/opt/kafka/bin/kafka-console-consumer.sh --consumer.config client.properties --bootstrap-server kafka3.dev.local:9093 --topic example-topic2

 

Обратите внимание что при подключении нужно указывать полный FQDN брокеров

Если вы будете использовать свой внутренний CA, то вам понадобятся следующие команды чтобы импортировать PFX сертификаты в keystore:

keytool -importkeystore -srckeystore cert.pfx -srcstoretype pkcs12 -destkeystore cert.keystore.jks -deststoretype JKS

 

Также, обратите внимание что Kafka в параметре ssl.keystore.password (в файле server.properties) ожидает что пароль от приватного ключа и keystore совпадает.
Если это не так, выполните команду по замене пароля приватного ключа:

keytool -keypasswd -keystore cert.keystore.jks -alias Hnmshuefg7efsjfh3w4hffs

Узнать алиас сертификата в keystore можно командой:

keytool -list -v -keystore cert.keystore.jks

 

Теперь перейдем к настройке авторизации, рассмотрим мы два варианта, по логину и паролю и по сертификату.

Начнем с первого метода.

Создаем файл /opt/kafka/config/kafka_jaas.conf , в котором будут храниться логины и пароли в SASL/PLAIN формате

nano /opt/kafka/config/kafka_jaas.conf

содержимое файла будет следующим:

KafkaServer {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="password"

user_admin="password"

user_test="password";

};

 

Итого мы создали 3 пользователя:brokeradmin,admin,test

Далее нужно добавить в строку запуска сервиса kafka параметр Djava.security.auth.login.config , в переменную KAFKA_OPTS, для этого выполняем:

nano /etc/systemd/system/kafka.service

Добавляем строку Environment=’KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_jaas.conf’

Должно получиться так:

[Unit]

Description=Kafka Broker

After=network.target

After=zookeeper.service

[Service]

Type=forking

User=kafka

SyslogIdentifier=kafka (%i)

Restart=on-failure

LimitNOFILE=16384:163840

Environment='KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_jaas.conf'

ExecStart=/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

[Install]

WantedBy=multi-user.target

 

Теперь редактируем файл /opt/kafka/config/server.properties

nano /opt/kafka/config/server.properties

 

Добавляем/редактируем строки:

  • listeners=SASL_SSL://kafka1.dev.local:9093
  • advertised.listeners=SASL_SSL://kafka1.dev.local:9093
  • security.inter.broker.protocol=SASL_SSL
  • sasl.mechanism.inter.broker.protocol=PLAIN
  • sasl.enabled.mechanisms=PLAIN
  • authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  • ssl.endpoint.identification.algorithm=
  • super.users=User:admin

Выполняем эти настройки на всех трех нодах kafka1.dev.local/kafka2.dev.local/kafka2.dev.local и перезапускаем сервисы.

systemctl daemon-reload

systemctl restart kafka

Теперь, если запустить Producer со старым конфигом мы получим ошибку авторизации.

/opt/kafka/bin/kafka-console-producer.sh --producer.config client.properties --broker-list kafka1.dev.local:9093,kafka2.dev.local:9093,kafka3.dev.local:9093 --topic example-topic2

ошибка — disconnected

Чтобы все заработало, изменим тип подключения в client.properties на SASL_SSL

nano client.properties

 

добавляем/изменяем строки:

  • security.protocol=SASL_SSL
  • sasl.mechanism=PLAIN

Теперь создаем jaas файл для авторизации

nano jaas.conf

 

содержимое файла:

KafkaClient {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="test"

password="password";

};

 

Перед использованием Consumer или producer нужно экспортировать переменную KAFKA_OPTS, в которой будут переданы наши учетные данные

export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"

Теперь запустим команду еще раз, и подключение заработает, но мы получим ошибку TOPIC_AUTHORIZATION_FAILED

/opt/kafka/bin/kafka-console-producer.sh --producer.config client.properties --broker-list kafka1.dev.local:9093,kafka2.dev.local:9093,kafka3.dev.local:9093 --topic example-topic2

 

Настраиваем по аналогии Consumer на соседней ноде и видим что подключение работает, но сообщения не читаются, ошибка TOPIC_AUTHORIZATION_FAILED

/opt/kafka/bin/kafka-console-consumer.sh --consumer.config client.properties --bootstrap-server kafka3.dev.local:9093 --topic example-topic2

 

Это связано с тем что мы еще не выдали права на топики Kafka. Исправим это.

Предоставим пользователю test права на запись в топик example-topic2 с любого хоста

/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Write --topic example-topic2

 

И на чтение

/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Read --topic example-topic2

 

Также нужно дать права на чтение группы, в моем случае это группа console-consumer-87796

/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Read --group console-consumer-87796

 

Если топик еще не создан, нужно выдать права на их создание, делается это так:

/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Create --group console-consumer-87796

 

Если не хотите заморачиваться можете выдавать права сразу на все группы, делается это параметром —group=’*’ , например так:

/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Read --topic example-topic2 --group='*'

 

Теперь если запустить Consumer и Producer, все заработает корректно.

Более подробно ознакомится с правами доступа можно тут — https://docs.confluent.io/current/kafka/authorization.html

 

Теперь, рассмотрим второй метод, выдачу доступа по SSL сертификатам.

Создаем Java keystore и CSR запрос для сертификата клиента.

keytool -genkeypair -alias KafkaClientSSL -keyalg RSA -keystore client.keystore.jks -keysize 2048 -dname "CN=kafka-client.dev.local,OU=IT,O=COMPANY,L=RUSSIA,ST=Moscow,C=RU"

keytool -certreq -alias KafkaClientSSL -keystore client.keystore.jks -file client.csr -ext EKU=serverAuth,clientAuth

 

При создании сертифката убедитесь что CN полностью совпадает с FQDN вашего Kafka клиента

Создаем файл client-config.cnf со следующим содержимым

[ SAN ]

extendedKeyUsage = serverAuth, clientAuth

 

Подписываем его нашим CA (пароль укажите от своего CA)

openssl x509 -req -CA ca-cert -CAkey ca-key -extensions SAN -extfile client-config.cnf -in client.csr -out client-cert-signed -CAcreateserial -passin pass:123123

 

Добавляем сертифкат в client.keystore.jks

keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert

keytool -import -file client-cert-signed -keystore client.keystore.jks -alias KafkaClientSSL

 

Создаем следующей файл для подключения

security.protocol=SSL

sasl.mechanism=PLAIN

ssl.truststore.location=/opt/kafka/ssl/client.truststore.jks

ssl.truststore.password=123123

#ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

ssl.truststore.type=JKS

ssl.keystore.type=JKS

ssl.keystore.location=/opt/kafka/ssl/client.keystore.jks

ssl.keystore.password=123123

ssl.key.password=123123

 

Проверяем что параметры в server.properties следующие:

ssl.keystore.location=/opt/kafka/ssl/server.keystore.jks

ssl.keystore.password=123123

ssl.key.password=123123

ssl.truststore.location=/opt/kafka/ssl/server.truststore.jks

ssl.truststore.password=123123

ssl.client.auth=required

ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

ssl.keystore.type=JKS

ssl.truststore.type=JKS

security.inter.broker.protocol=SSL

ssl.endpoint.identification.algorithm=
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:CN=kafka1.dev.local,OU=IT,O=COMPANY,L=RUSSIA,ST=Moscow,C=RU

 

Выдадим права на запись в топик example-topic2

/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:'CN=kafka-client.dev.local,OU=IT,O=COMPANY,L=RUSSIA,ST=Moscow,C=RU' --operation Write --topic example-topic2

/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:'CN=kafka-client.dev.local,OU=IT,O=COMPANY,L=RUSSIA,ST=Moscow,C=RU' --operation Create --topic example-topic2

 

Выдадим права на чтение в топик example-topic2

/opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:'CN=kafka-client.dev.local,OU=IT,O=COMPANY,L=RUSSIA,ST=Moscow,C=RU' --operation Read --topic example-topic2 --group '*'

Теперь запустим Producer

/opt/kafka/bin/kafka-console-producer.sh --producer.config client.propertiesssl --broker-list kafka1.dev.local:9093,kafka2.dev.local:9093,kafka3.dev.local:9093 --topic example-topic2

 

Запустим Consumer

/opt/kafka/bin/kafka-console-consumer.sh --consumer.config client.propertiesssl --bootstrap-server kafka3.dev.local:9093 --topic example-topic2

 

Все работает.

Теперь протестируем отказоустойчивость kafka.

Для этого я сгенерирую текстовый файл в 10 000 000 строк, эти строки мы запишем в топик kafka, далее примем сообщения consumer и сохраним их в текстовый файл.
Во время заливки мы отключим одну из 3-х нод kafka и в конце сравним, файл на consumer и оригинальный файл.
Поскольку kafka гарантирует доставку сообщений, то файлы должны совпадать.

Создадим топик million

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --config min.insync.replicas=2 --config retention.ms=-1 --config unclean.leader.election.enable=false --topic million

 

Создадим тестовый файл

touch generate
nano generate
#/bin/bash

for i in {1..10000000};

do

echo $i
echo $i >> tokafka.txt
chmod +x generate
./generate

 

Тестовый файл с сообщениями готов.

Теперь запускаем команду для заливки данных

cat tokafka.txt | /opt/kafka/bin/kafka-console-producer.sh --producer.config client.propertiesssl --broker-list kafka1.dev.local:9093,kafka2.dev.local:9093,kafka3.dev.local:9093 --topic million

 

теперь на сервере с consumer запускаем сам consumer с выводом в файл

/opt/kafka/bin/kafka-console-consumer.sh --consumer.config client.propertiesssl --bootstrap-server kafka1.dev.local:9093 --topic million > from-kafka.txt

 

Запускаем скрипт Producer-а , видим что пошла заливка данных и через некоторое время отключаем любую из нод сервера kafka

После завершения заливки данных проверим файл from-kafka.txt ,командой:

wc -l from-kafka.txt

число строк в выводе должно быть 10000000.

Недоступность одного из брокеров не повлияла на отправку/доставку сообщений.

Проверим состояние топика во время недоступности одной из нод:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic million

 

Вывод будет следующим:

Topic:million PartitionCount:3 ReplicationFactor:3 Configs:retention.ms=-1,unclean.leader.election.enable=false,min.insync.replicas=2
Topic: million Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2
Topic: million Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1
Topic: million Partition: 2 Leader: 1 Replicas: 3,1,2 Isr: 1,2

 

Как видим число синхронных реплик — 1 (Isr), все партиции доступны и у каждой из них есть лидер.

В данном случае топик был настроен правильно, т.к. мы указали следующие параметры:

min.insync.replicas — кол-во реплик, на которые должны быть синхронизированы данные, прежде чем записаться.
unclean.leader.election.enable=false отключение возможности провести failover на не синхронную отстающую реплику с потенциальной потерей данных
ReplicationFactor:3 — кол-во реплик, на которые реплицируются данные.

После восстановления отключенной реплики видим что данные успешно синхронизированы

Topic:million PartitionCount:3 ReplicationFactor:3 Configs:retention.ms=-1,unclean.leader.election.enable=false,min.insync.replicas=2
Topic: million Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: million Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1,3
Topic: million Partition: 2 Leader: 1 Replicas: 3,1,2 Isr: 1,2,3

 


Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *