Java + Apache Kafka = Первый проект
Статья разбита на 2 части: Установка Kafka на сервер и создание SpringBoot приложения.
Если вы вообще не в курсе за брокеров сообщений, то советую посмотреть два видосика про Apache Kafka и Rabbit MQ. Они дадут понимание как там что работает и чем отличается:
I. Установка Apache Kafka
В качестве базовой системы у меня CentOS 7 x86_64 Minimal 1804. Для кафки рекомендуется иметь 4 Gb оперативки.
Перед началом работы ставлю нужные мне программы:
sudo yum install mc nano net-tools wget -y
далее, согласно инструкции https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-centos-7:
1 Установка OpenJDK8
1 | sudo yum install java-1.8.0-openjdk |
2 Создание нового пользователя
1 | sudo useradd kafka -m |
флаг -m означает, что также будет создана домашняя папка этого пользователя (/home/kafka).
Устанавливаем пароль:
1 | sudo passwd kafka |
Добовляем пользователя в группу wheel чтобы у него были права устанавливать зависимости Kafka:
1 | sudo usermod -aG wheel kafka |
Теперь войдем из-под этого пользователя:
1 | su -l kafka |
3 Загрузка и установка Kafka Binaries
Для начала создадим папку для загрузок
1 | mkdir ~/downloads |
Теперь идём на сайт и достаём оттуда ссылку на свежий дистрибутив кафки https://kafka.apache.org/downloads. Только мой вам совет — удостоверьтесь что эта ссылка ведёт непосредственно на файл, а не на какую-то другую страницу, а то я так целый час тупил и не мог распаковать скаченный архив .tgz.
Далее качаем этот файл себе в папку:
1 | cd ~/downloads |
1 | wget "https://apache-mirror.rbc.ru/pub/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz" |
Создаем папку под кафку и распаковываем его туда:
1 | mkdir ~/kafka |
1 | cd ~/kafka |
1 | tar -xvzf ~/Downloads/kafka.tgz --strip 1 |
флаг —strip 1 просит архиватор не создавать родительскую папку типа ~/kafka/kafka_2.13-2.6.0, а распаковывать содержимое сразю в ~/kafka.
4 Настройка сервера Kafka
Поведение Kafka по умолчанию не позволяет нам удалить тему, категорию, группу или название канала, в котором можно публиковать сообщения. Чтобы исправить это, давайте отредактируем файл конфигурации:
1 | nano ~/kafka/config/server.properties |
и добавим это в конец:
1 | delete.topic.enable = true |
5 Создание Systemd Unit — файлов и запуск сервера Kafka
Создаем файл для зукипера (необходим для работы Kafka)
1 | sudo nano /etc/systemd/system/zookeeper.service |
со следующим содержанием
1 2 3 4 | <br> [Unit]<br> Requires=network.target remote-fs.target<br> After=network.target remote-fs.target |
1 |
1 2 3 4 5 6 | <p>[Service]<br> Type=simple<br> User=kafka<br> ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties<br> ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh<br> Restart=on-abnormal</p> |
1 |
1 2 | [Install]<br> WantedBy=multi-user.target<br> |
секция Unit требует чтобы перед запуском zookeeper сеть и файловая система уже были готовы
секция Service назначает используемые при старте и остановке zookeeper скрипты
Теперь создаем то же самое для Kafka:
1 | sudo nano /etc/systemd/system/kafka.service |
со следующим содержанием
1 2 3 4 | <br> [Unit]<br> Requires=zookeeper.service<br> After=zookeeper.service |
1 |
1 2 3 4 5 6 | <p>[Service]<br> Type=simple<br> User=kafka<br> ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'<br> ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh<br> Restart=on-abnormal</p> |
1 |
1 2 | [Install]<br> WantedBy=multi-user.target<br> |
секция Unit требует чтобы перед запуском kafka — zookeeper уже был готов.
Запускаем:
1 | sudo systemctl start kafka |
для проверки успешности запуска, смотрим журнал:
1 | journalctl -u kafka |
вывод должен содержать что-то подобное: Jul 17 18:38:59 kafka-centos systemd[1]: Started kafka.service.
Теперь по идее кафка работает на 9092 порту. Для проверки можно взглянуть на прослушиваемые порты:
1 | netstat -tulpn |
Теперь добавим кафку в автозагрузку:
1 | sudo systemctl enable kafka |
6 Настройка фаерволла
Хоть порт и прослушивается, но фаерволл не даёт подключиться к серверу извне. Исправляем это следующими командами:
1 | firewall-cmd --permanent --zone=public --add-port=9092/tcp |
1 | firewall-cmd --permanent --zone=public --add-port=9092/udp |
1 | firewall-cmd --reload |
Теперь подключение извне должно быть доступно. Возможно потребуется перезапуск.
7 Тестирование работы Kafka
Можете пропустить этот шаг если он вам не нужен. Но можно и потестировать в консоли:
7.1 Создайте новый топик
1 | ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic |
вывод должен быть Created topic «TutorialTopic».
7.2 Создайте продюссера и пошлите сообщение:
1 | echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null |
7.3 Создайте консьюмера и запустите приём сообщений:
1 | ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning |
флаг —from-beginning даст возможность получить сообщения отправленные до запуска консьюмера. Вывод должен быть Hello, World.
Этот скрипт будет выполняться и принимать сообщения в реальном времени. Можете подключиться через другой терминал и попробовать отправить ещё что-нибудь в этот топик.
Также рекомендую скачать и установить клёвый десктопный клиент https://www.conduktor.io/ и законнектиться к серверу Kafka снаружи.
II. Создание Java SpringBoot проекта
Источником инфотмации послужила статья https://habr.com/ru/post/496182/, поэтому если что непонятно — вэлком туда.
Для начала создадим новый SpringBoot проект с зависимостями: Spring Web и Spring for Apache Kafka.
Мы не будем юзать Spring Web, он тут нужен только для того чтобы подтянулся Jackson. Либо можно не включать его в зависимости, а в файле pom.xml, прописать ручками
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
Создадим класс который будет слать сообщения каждые 3 секунды:
@Component
public static class Runner implements CommandLineRunner{
@Autowired
//it's ok. https://stackoverflow.com/questions/55280173/the-correct-way-for-creation-of-kafkatemplate-in-spring-boot
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
private KafkaTemplate<String, String> kafkaTemplate;
@Override
public void run(String... args) throws Exception {
Thread thread = new Thread(() -> {
try {
while (true) {
sleep(3000);
kafkaTemplate.send("msg", "currentTime", (new Date()).toString());
}
}catch (InterruptedException e){}
});
thread.setDaemon(true);
thread.start();
}
}
SuppressWarnings здесь потому что идея упорно говорит что такого бина не существует. Но он есть.
Теперь создадим класс консьюмера:
package ru.knastnt.kafkatest;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@EnableKafka
@Component
public class Listener {
@KafkaListener(topics = "msg", groupId = "app.1")
public void messageListener(String msg) {
System.out.println(msg);
}
}
метод messageListener будет вызываться как поступит новое сообщение.
Група консьюмеров — это группа в рамках которой доставляется один экземпляр сообщения. Например, у Вас есть три консьюмера в одной группе, и все они слушают одну тему. Как только на сервере появляется новое сообщение с данной темой, оно доставляется кому-то одному из группы. Остальные два консьюмера сообщение не получают.
Ну и напоследок отредактируем файл application.properties:
#адрес сервера Kafka
spring.kafka.bootstrap-servers=192.168.0.239:9092
Запускаем и тестируем! Можно юзать, например, Postman, либо встроенный в идею инструмент Tools > HTTP Client > Test RESTful Web Service.
Эксперименты
Продюссер при отправке сообщения может вернуть ответ об успешности или ошибке. Настрою коллбэк и буду выводить в консоль результат отправки сообщения. Дополним вызов kafkaTemplate.send:
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("msg", "currentTime", (new Date()).toString());
future.addCallback(System.out::println, System.err::println);
Теперь попробую перезагружать сервер Kafka. Что из этого вышло:
Если приложение запускается при недоступном сервере Kafka, То возникает исключение org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic msg not present in metadata after 60000 ms.
Если сервер Kafka становится недоступным в процессе работы, то ошибки обрабатываются в возвращаемом future объекте метода kafkaTemplate.send.
Пока Kafka недоступен, Spring копит отправленные но недоставленные сообщения внутни себя
- Если сервер недоступен менее 2-х минут, то как только Kafka станет доступен, спринг выплюнет все сообщения туда прикрепив к ним правильное время отправки.
- Если сервер недоступен более 2-х минут, то вызовется коллбэк с ошибкой доставки. Типа такого org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 40 record(s) for msg-0:120000 ms has passed since batch creation и такого org.apache.kafka.common.errors.TimeoutException: Expiring 40 record(s) for msg-0:120000 ms has passed since batch creation. Причем пачкой для нескольких сообщений. В итоге получится такой цикл: накопление сообщений — пачка ошибок — накопление сообщений — следующая пачка ошибок. Не разбирался по какому принципу, но как-то так. При появлении сервера Kafka, накопленные будут отправлены, а те, что с ошибками, — потерялись навсегда.
Отправка и получение кастомных объектов
Сначала реализуем продюссер. Для отправки объектов будем использовать сериализацию в Json. Для каждого кастомного класса нам нужен свой KafkaTemplate. Прежде чем приступить к их созданию, сделаем пару кастомных классов: UserDTO и UserDTO.Address:
package ru.knastnt.kafkatest;
public class UserDTO {
private String name;
private int age;
private Address address;
public static UserDTO getTestInstance(){
UserDTO u = new UserDTO();
UserDTO.Address a = new UserDTO.Address();
a.setStreet("Ленина");
a.setHouse(16);
u.setName("Иван");
u.setAge(25);
u.setAddress(a);
return u;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public Address getAddress() {
return address;
}
public void setAddress(Address address) {
this.address = address;
}
public static class Address {
private String street;
private int house;
public String getStreet() {
return street;
}
public void setStreet(String street) {
this.street = street;
}
public int getHouse() {
return house;
}
public void setHouse(int house) {
this.house = house;
}
}
}
Теперь настроим кастомные шаблоны. Всё поместим в отдельный класс KafkaProducerConfig:
package ru.knastnt.kafkatest;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String kafkaSrv;
@Bean
public <T> KafkaTemplate<String, T> kafkaStringJsonTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSrv);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}
}
Основное что мы тут делаем — назначаем сериализаторы для ключа и значения сообщений.
Дополним наш класс Runner. Добавим пару @Autowired полей:
@Autowired
private KafkaTemplate<String, UserDTO> kafkaUserTemplate;
@Autowired
private KafkaTemplate<String, UserDTO.Address> kafkaAddressTemplate;
А внутрь цикла в методе run, допишем действия по отправке наших кастомных объектов:
//Шлём объекты через кастомные шаблоны
sleep(2000);
ListenableFuture<SendResult<String, UserDTO>> userFuture = kafkaUserTemplate.send("msg2", "user", UserDTO.getTestInstance());
userFuture.addCallback(System.out::println, System.err::println);
sleep(2000);
ListenableFuture<SendResult<String, UserDTO.Address>> userFuture2 = kafkaAddressTemplate.send("msg3", "addr", UserDTO.getTestInstance().getAddress());
userFuture2.addCallback(System.out::println, System.err::println);
Предлагаю запуститься и посмотреть как объекты улетают в кафку (это будет видно в консоли). К тому же можете воспользоваться https://www.conduktor.io/ и убедиться что топики пополняются новыми сообщениями.
Сделаем теперь несколько консьюмеров чтобы получать эти сообщения из кафки. Это несколько сложнее, но принцип тот же.
Создаем класс KafkaConsumerConfig:
package ru.knastnt.kafkatest;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
//Этот класс сейчас используется только для настройки приёма кастомных объектов через JSON
@Value("${spring.kafka.bootstrap-servers}")
private String kafkaSrv;
public <T> ConsumerFactory<String, T> consumerFactory(Class<T> clazz) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSrv);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(clazz));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserDTO> userKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, UserDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(UserDTO.class));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserDTO.Address> addresKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, UserDTO.Address> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(UserDTO.Address.class));
return factory;
}
}
Собственно для каждого кастомного класса нужно сделать по параметризованному ConcurrentKafkaListenerContainerFactory — это спринговая потокобезопасная обёртка.
В каждую из них нужно заинджектить конфигурацию консьюмера. Я реализовал её в виде параметризованного метода, в котором ключ — строка, а значение — объект передаваемого класса в json формате.
Теперь добавим пару слушающих методов в наш класс Listener:
//Кастомный Json слушатель для класса UserDTO
@KafkaListener(topics = "msg2", containerFactory = "userKafkaListenerContainerFactory", groupId = "usersConsumers")
public void messageListener(@Payload UserDTO userDTO, @Headers MessageHeaders headers) {
System.out.println(headers);
System.out.println(userDTO);
}
//Кастомный Json слушатель для класса Address
@KafkaListener(topics = "msg3", containerFactory = "addresKafkaListenerContainerFactory", groupId = "addressesConsumers")
public void messageListener(@Payload UserDTO.Address s, @Headers MessageHeaders headers) {
System.out.println(headers);
System.out.println(s);
}
Каждому @KafkaListener нужно присвоить свой containerFactory, чтобы всё работало. Значение равно имени метода.
Запускайтесь, всё должно работать! Можете скачать мой проект — https://github.com/knastnt/kafkatest, там ничего лишнего — разберётесь 😉