Java + Apache Kafka = Первый проект
Статья разбита на 2 части: Установка Kafka на сервер и создание SpringBoot приложения.
Если вы вообще не в курсе за брокеров сообщений, то советую посмотреть два видосика про Apache Kafka и Rabbit MQ. Они дадут понимание как там что работает и чем отличается:
I. Установка Apache Kafka
В качестве базовой системы у меня CentOS 7 x86_64 Minimal 1804. Для кафки рекомендуется иметь 4 Gb оперативки.
Перед началом работы ставлю нужные мне программы:
sudo yum install mc nano net-tools wget -y
далее, согласно инструкции https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-centos-7:
1 Установка OpenJDK8
1 | sudo yum install java-1.8.0-openjdk |
2 Создание нового пользователя
1 | sudo useradd kafka -m |
флаг -m означает, что также будет создана домашняя папка этого пользователя (/home/kafka).
Устанавливаем пароль:
1 | sudo passwd kafka |
Добовляем пользователя в группу wheel чтобы у него были права устанавливать зависимости Kafka:
1 | sudo usermod -aG wheel kafka |
Теперь войдем из-под этого пользователя:
1 | su -l kafka |
3 Загрузка и установка Kafka Binaries
Для начала создадим папку для загрузок
1 | mkdir ~/downloads |
Теперь идём на сайт и достаём оттуда ссылку на свежий дистрибутив кафки https://kafka.apache.org/downloads. Только мой вам совет — удостоверьтесь что эта ссылка ведёт непосредственно на файл, а не на какую-то другую страницу, а то я так целый час тупил и не мог распаковать скаченный архив .tgz.
Далее качаем этот файл себе в папку:
1 | cd ~/downloads |
1 | wget "https://apache-mirror.rbc.ru/pub/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz" |
Создаем папку под кафку и распаковываем его туда:
1 | mkdir ~/kafka |
1 | cd ~/kafka |
1 | tar -xvzf ~/Downloads/kafka.tgz --strip 1 |
флаг —strip 1 просит архиватор не создавать родительскую папку типа ~/kafka/kafka_2.13-2.6.0, а распаковывать содержимое сразю в ~/kafka.
4 Настройка сервера Kafka
Поведение Kafka по умолчанию не позволяет нам удалить тему, категорию, группу или название канала, в котором можно публиковать сообщения. Чтобы исправить это, давайте отредактируем файл конфигурации:
1 | nano ~/kafka/config/server.properties |
и добавим это в конец:
1 | delete.topic.enable = true |
5 Создание Systemd Unit — файлов и запуск сервера Kafka
Создаем файл для зукипера (необходим для работы Kafka)
1 | sudo nano /etc/systemd/system/zookeeper.service |
со следующим содержанием
1 2 3 4 | <br> [Unit]<br> Requires=network.target remote-fs.target<br> After=network.target remote-fs.target |
1 |
1 2 3 4 5 6 | <p>[Service]<br> Type=simple<br> User=kafka<br> ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties<br> ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh<br> Restart=on-abnormal</p> |
1 |
1 2 | [Install]<br> WantedBy=multi-user.target<br> |
секция Unit требует чтобы перед запуском zookeeper сеть и файловая система уже были готовы
секция Service назначает используемые при старте и остановке zookeeper скрипты
Теперь создаем то же самое для Kafka:
1 | sudo nano /etc/systemd/system/kafka.service |
со следующим содержанием
1 2 3 4 | <br> [Unit]<br> Requires=zookeeper.service<br> After=zookeeper.service |
1 |
1 2 3 4 5 6 | <p>[Service]<br> Type=simple<br> User=kafka<br> ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'<br> ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh<br> Restart=on-abnormal</p> |
1 |
1 2 | [Install]<br> WantedBy=multi-user.target<br> |
секция Unit требует чтобы перед запуском kafka — zookeeper уже был готов.
Запускаем:
1 | sudo systemctl start kafka |
для проверки успешности запуска, смотрим журнал:
1 | journalctl -u kafka |
вывод должен содержать что-то подобное: Jul 17 18:38:59 kafka-centos systemd[1]: Started kafka.service.
Теперь по идее кафка работает на 9092 порту. Для проверки можно взглянуть на прослушиваемые порты:
1 | netstat -tulpn |
Теперь добавим кафку в автозагрузку:
1 | sudo systemctl enable kafka |
6 Настройка фаерволла
Хоть порт и прослушивается, но фаерволл не даёт подключиться к серверу извне. Исправляем это следующими командами:
1 | firewall-cmd --permanent --zone=public --add-port=9092/tcp |
1 | firewall-cmd --permanent --zone=public --add-port=9092/udp |
1 | firewall-cmd --reload |
Теперь подключение извне должно быть доступно. Возможно потребуется перезапуск.
7 Тестирование работы Kafka
Можете пропустить этот шаг если он вам не нужен. Но можно и потестировать в консоли:
7.1 Создайте новый топик
1 | ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic |
вывод должен быть Created topic «TutorialTopic».
7.2 Создайте продюссера и пошлите сообщение:
1 | echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null |
7.3 Создайте консьюмера и запустите приём сообщений:
1 | ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning |
флаг —from-beginning даст возможность получить сообщения отправленные до запуска консьюмера. Вывод должен быть Hello, World.
Этот скрипт будет выполняться и принимать сообщения в реальном времени. Можете подключиться через другой терминал и попробовать отправить ещё что-нибудь в этот топик.
Также рекомендую скачать и установить клёвый десктопный клиент https://www.conduktor.io/ и законнектиться к серверу Kafka снаружи.

II. Создание Java SpringBoot проекта
Источником инфотмации послужила статья https://habr.com/ru/post/496182/, поэтому если что непонятно — вэлком туда.
Для начала создадим новый SpringBoot проект с зависимостями: Spring Web и Spring for Apache Kafka.
Мы не будем юзать Spring Web, он тут нужен только для того чтобы подтянулся Jackson. Либо можно не включать его в зависимости, а в файле pom.xml, прописать ручками
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> |
Создадим класс который будет слать сообщения каждые 3 секунды:
@Component public static class Runner implements CommandLineRunner{ @Autowired @SuppressWarnings ( "SpringJavaInjectionPointsAutowiringInspection" ) private KafkaTemplate<String, String> kafkaTemplate; @Override public void run(String... args) throws Exception { Thread thread = new Thread(() -> { try { while ( true ) { sleep( 3000 ); kafkaTemplate.send( "msg" , "currentTime" , ( new Date()).toString()); } } catch (InterruptedException e){} }); thread.setDaemon( true ); thread.start(); } } |
SuppressWarnings здесь потому что идея упорно говорит что такого бина не существует. Но он есть.
Теперь создадим класс консьюмера:
package ru.knastnt.kafkatest; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @EnableKafka @Component public class Listener { @KafkaListener (topics = "msg" , groupId = "app.1" ) public void messageListener(String msg) { System.out.println(msg); } } |
метод messageListener будет вызываться как поступит новое сообщение.
Група консьюмеров — это группа в рамках которой доставляется один экземпляр сообщения. Например, у Вас есть три консьюмера в одной группе, и все они слушают одну тему. Как только на сервере появляется новое сообщение с данной темой, оно доставляется кому-то одному из группы. Остальные два консьюмера сообщение не получают.
Ну и напоследок отредактируем файл application.properties:
#адрес сервера Kafka spring.kafka.bootstrap-servers= 192.168 . 0.239 : 9092 |
Запускаем и тестируем! Можно юзать, например, Postman, либо встроенный в идею инструмент Tools > HTTP Client > Test RESTful Web Service.
Эксперименты
Продюссер при отправке сообщения может вернуть ответ об успешности или ошибке. Настрою коллбэк и буду выводить в консоль результат отправки сообщения. Дополним вызов kafkaTemplate.send:
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send( "msg" , "currentTime" , ( new Date()).toString()); future.addCallback(System.out::println, System.err::println); |
Теперь попробую перезагружать сервер Kafka. Что из этого вышло:
Если приложение запускается при недоступном сервере Kafka, То возникает исключение org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic msg not present in metadata after 60000 ms.
Если сервер Kafka становится недоступным в процессе работы, то ошибки обрабатываются в возвращаемом future объекте метода kafkaTemplate.send.
Пока Kafka недоступен, Spring копит отправленные но недоставленные сообщения внутни себя
- Если сервер недоступен менее 2-х минут, то как только Kafka станет доступен, спринг выплюнет все сообщения туда прикрепив к ним правильное время отправки.
- Если сервер недоступен более 2-х минут, то вызовется коллбэк с ошибкой доставки. Типа такого org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 40 record(s) for msg-0:120000 ms has passed since batch creation и такого org.apache.kafka.common.errors.TimeoutException: Expiring 40 record(s) for msg-0:120000 ms has passed since batch creation. Причем пачкой для нескольких сообщений. В итоге получится такой цикл: накопление сообщений — пачка ошибок — накопление сообщений — следующая пачка ошибок. Не разбирался по какому принципу, но как-то так. При появлении сервера Kafka, накопленные будут отправлены, а те, что с ошибками, — потерялись навсегда.
Отправка и получение кастомных объектов
Сначала реализуем продюссер. Для отправки объектов будем использовать сериализацию в Json. Для каждого кастомного класса нам нужен свой KafkaTemplate. Прежде чем приступить к их созданию, сделаем пару кастомных классов: UserDTO и UserDTO.Address:
package ru.knastnt.kafkatest; public class UserDTO { private String name; private int age; private Address address; public static UserDTO getTestInstance(){ UserDTO u = new UserDTO(); UserDTO.Address a = new UserDTO.Address(); a.setStreet( "Ленина" ); a.setHouse( 16 ); u.setName( "Иван" ); u.setAge( 25 ); u.setAddress(a); return u; } public String getName() { return name; } public void setName(String name) { this .name = name; } public int getAge() { return age; } public void setAge( int age) { this .age = age; } public Address getAddress() { return address; } public void setAddress(Address address) { this .address = address; } public static class Address { private String street; private int house; public String getStreet() { return street; } public void setStreet(String street) { this .street = street; } public int getHouse() { return house; } public void setHouse( int house) { this .house = house; } } } |
Теперь настроим кастомные шаблоны. Всё поместим в отдельный класс KafkaProducerConfig:
package ru.knastnt.kafkatest; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaProducerConfig { @Value ( "${spring.kafka.bootstrap-servers}" ) private String kafkaSrv; @Bean public <T> KafkaTemplate<String, T> kafkaStringJsonTemplate() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSrv); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer. class ); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer. class ); return new KafkaTemplate<>( new DefaultKafkaProducerFactory<>(props)); } } |
Основное что мы тут делаем — назначаем сериализаторы для ключа и значения сообщений.
Дополним наш класс Runner. Добавим пару @Autowired полей:
@Autowired private KafkaTemplate<String, UserDTO> kafkaUserTemplate; @Autowired private KafkaTemplate<String, UserDTO.Address> kafkaAddressTemplate; |
А внутрь цикла в методе run, допишем действия по отправке наших кастомных объектов:
//Шлём объекты через кастомные шаблоны sleep( 2000 ); ListenableFuture<SendResult<String, UserDTO>> userFuture = kafkaUserTemplate.send( "msg2" , "user" , UserDTO.getTestInstance()); userFuture.addCallback(System.out::println, System.err::println); sleep( 2000 ); ListenableFuture<SendResult<String, UserDTO.Address>> userFuture2 = kafkaAddressTemplate.send( "msg3" , "addr" , UserDTO.getTestInstance().getAddress()); userFuture2.addCallback(System.out::println, System.err::println); |
Предлагаю запуститься и посмотреть как объекты улетают в кафку (это будет видно в консоли). К тому же можете воспользоваться https://www.conduktor.io/ и убедиться что топики пополняются новыми сообщениями.
Сделаем теперь несколько консьюмеров чтобы получать эти сообщения из кафки. Это несколько сложнее, но принцип тот же.
Создаем класс KafkaConsumerConfig:
package ru.knastnt.kafkatest; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.support.serializer.JsonDeserializer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConsumerConfig { //Этот класс сейчас используется только для настройки приёма кастомных объектов через JSON @Value ( "${spring.kafka.bootstrap-servers}" ) private String kafkaSrv; public <T> ConsumerFactory<String, T> consumerFactory(Class<T> clazz) { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSrv); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer. class ); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer. class ); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" ); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(clazz)); } @Bean public ConcurrentKafkaListenerContainerFactory<String, UserDTO> userKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, UserDTO> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory(UserDTO. class )); return factory; } @Bean public ConcurrentKafkaListenerContainerFactory<String, UserDTO.Address> addresKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, UserDTO.Address> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory(UserDTO.Address. class )); return factory; } } |
Собственно для каждого кастомного класса нужно сделать по параметризованному ConcurrentKafkaListenerContainerFactory — это спринговая потокобезопасная обёртка.
В каждую из них нужно заинджектить конфигурацию консьюмера. Я реализовал её в виде параметризованного метода, в котором ключ — строка, а значение — объект передаваемого класса в json формате.
Теперь добавим пару слушающих методов в наш класс Listener:
//Кастомный Json слушатель для класса UserDTO @KafkaListener (topics = "msg2" , containerFactory = "userKafkaListenerContainerFactory" , groupId = "usersConsumers" ) public void messageListener( @Payload UserDTO userDTO, @Headers MessageHeaders headers) { System.out.println(headers); System.out.println(userDTO); } //Кастомный Json слушатель для класса Address @KafkaListener (topics = "msg3" , containerFactory = "addresKafkaListenerContainerFactory" , groupId = "addressesConsumers" ) public void messageListener( @Payload UserDTO.Address s, @Headers MessageHeaders headers) { System.out.println(headers); System.out.println(s); } |
Каждому @KafkaListener нужно присвоить свой containerFactory, чтобы всё работало. Значение равно имени метода.
Запускайтесь, всё должно работать! Можете скачать мой проект — https://github.com/knastnt/kafkatest, там ничего лишнего — разберётесь