Java + Apache Kafka = Первый проект

Java + Apache Kafka = Первый проект

Статья разбита на 2 части: Установка Kafka на сервер и создание SpringBoot приложения.

Если вы вообще не в курсе за брокеров сообщений, то советую посмотреть два видосика про Apache Kafka и Rabbit MQ. Они дадут понимание как там что работает и чем отличается:


I. Установка Apache Kafka

В качестве базовой системы у меня CentOS 7 x86_64 Minimal 1804. Для кафки рекомендуется иметь 4 Gb оперативки.

Перед началом работы ставлю нужные мне программы:

sudo yum install mc nano net-tools wget -y

далее, согласно инструкции https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-centos-7:

1 Установка OpenJDK8

1
sudo yum install java-1.8.0-openjdk

2 Создание нового пользователя

1
sudo useradd kafka -m

флаг -m означает, что также будет создана домашняя папка этого пользователя (/home/kafka).

Устанавливаем пароль:

1
sudo passwd kafka

Добовляем пользователя в группу wheel чтобы у него были права устанавливать зависимости Kafka:

1
sudo usermod -aG wheel kafka

Теперь войдем из-под этого пользователя:

1
su -l kafka

3 Загрузка и установка Kafka Binaries

Для начала создадим папку для загрузок

1
mkdir ~/downloads

Теперь идём на сайт и достаём оттуда ссылку на свежий дистрибутив кафки https://kafka.apache.org/downloads. Только мой вам совет — удостоверьтесь что эта ссылка ведёт непосредственно на файл, а не на какую-то другую страницу, а то я так целый час тупил и не мог распаковать скаченный архив .tgz.

Далее качаем этот файл себе в папку:

1
cd ~/downloads
1
wget "https://apache-mirror.rbc.ru/pub/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz"

Создаем папку под кафку и распаковываем его туда:

1
mkdir ~/kafka
1
cd ~/kafka
1
tar -xvzf ~/Downloads/kafka.tgz --strip 1

флаг —strip 1 просит архиватор не создавать родительскую папку типа ~/kafka/kafka_2.13-2.6.0, а распаковывать содержимое сразю в ~/kafka.

4 Настройка сервера Kafka

Поведение Kafka по умолчанию не позволяет нам удалить тему, категорию, группу или название канала, в котором можно публиковать сообщения. Чтобы исправить это, давайте отредактируем файл конфигурации:

1
nano ~/kafka/config/server.properties

и добавим это в конец:

1
delete.topic.enable = true

5 Создание Systemd Unit — файлов и запуск сервера Kafka

Создаем файл для зукипера (необходим для работы Kafka)

1
sudo nano /etc/systemd/system/zookeeper.service

со следующим содержанием

1
2
3
4
<br>
[Unit]<br>
Requires=network.target remote-fs.target<br>
After=network.target remote-fs.target

1
 

1
2
3
4
5
6
<p>[Service]<br>
Type=simple<br>
User=kafka<br>
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties<br>
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh<br>
Restart=on-abnormal</p>

1
 

1
2
[Install]<br>
WantedBy=multi-user.target<br>

секция Unit требует чтобы перед запуском zookeeper сеть и файловая система уже были готовы

секция Service назначает используемые при старте и остановке zookeeper скрипты

Теперь создаем то же самое для Kafka:

1
sudo nano /etc/systemd/system/kafka.service

со следующим содержанием

1
2
3
4
<br>
[Unit]<br>
Requires=zookeeper.service<br>
After=zookeeper.service

1
 

1
2
3
4
5
6
<p>[Service]<br>
Type=simple<br>
User=kafka<br>
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties &gt; /home/kafka/kafka/kafka.log 2&gt;&amp;1'<br>
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh<br>
Restart=on-abnormal</p>

1
 

1
2
[Install]<br>
WantedBy=multi-user.target<br>

секция Unit требует чтобы перед запуском kafka — zookeeper уже был готов.

Запускаем:

1
sudo systemctl start kafka

для проверки успешности запуска, смотрим журнал:

1
journalctl -u kafka

вывод должен содержать что-то подобное: Jul 17 18:38:59 kafka-centos systemd[1]: Started kafka.service.

Теперь по идее кафка работает на 9092 порту. Для проверки можно взглянуть на прослушиваемые порты:

1
netstat -tulpn

Теперь добавим кафку в автозагрузку:

1
sudo systemctl enable kafka

6 Настройка фаерволла

Хоть порт и прослушивается, но фаерволл не даёт подключиться к серверу извне. Исправляем это следующими командами:

1
firewall-cmd --permanent --zone=public --add-port=9092/tcp

1
firewall-cmd --permanent --zone=public --add-port=9092/udp

1
firewall-cmd --reload

Теперь подключение извне должно быть доступно. Возможно потребуется перезапуск.

7 Тестирование работы Kafka

Можете пропустить этот шаг если он вам не нужен. Но можно и потестировать в консоли:

7.1 Создайте новый топик

1
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

вывод должен быть Created topic «TutorialTopic».

7.2 Создайте продюссера и пошлите сообщение:

1
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic &gt; /dev/null

7.3 Создайте консьюмера и запустите приём сообщений:

1
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

флаг —from-beginning даст возможность получить сообщения отправленные до запуска консьюмера. Вывод должен быть Hello, World.

Этот скрипт будет выполняться и принимать сообщения в реальном времени. Можете подключиться через другой терминал и попробовать отправить ещё что-нибудь в этот топик.

Также рекомендую скачать и установить клёвый десктопный клиент  https://www.conduktor.io/ и законнектиться к серверу Kafka снаружи.


II. Создание Java SpringBoot проекта

Источником инфотмации послужила статья https://habr.com/ru/post/496182/, поэтому если что непонятно — вэлком туда.

Для начала создадим новый SpringBoot проект с зависимостями: Spring Web и Spring for Apache Kafka.

Мы не будем юзать Spring Web, он тут нужен только для того чтобы подтянулся Jackson. Либо можно не включать его в зависимости, а в файле pom.xml, прописать ручками

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

Создадим класс который будет слать сообщения каждые 3 секунды:

@Component
    public static class Runner implements CommandLineRunner{
        @Autowired
        //it's ok. https://stackoverflow.com/questions/55280173/the-correct-way-for-creation-of-kafkatemplate-in-spring-boot
        @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
        private KafkaTemplate<String, String> kafkaTemplate;

        @Override
        public void run(String... args) throws Exception {
            Thread thread = new Thread(() -> {
                try {
                    while (true) {
                        sleep(3000);
                        kafkaTemplate.send("msg", "currentTime", (new Date()).toString());
                    }
                }catch (InterruptedException e){}
            });
            thread.setDaemon(true);
            thread.start();
        }
    }

SuppressWarnings здесь потому что идея упорно говорит что такого бина не существует. Но он есть.

Теперь создадим класс консьюмера:

package ru.knastnt.kafkatest;

import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@EnableKafka
@Component
public class Listener {
    @KafkaListener(topics = "msg", groupId = "app.1")
    public void messageListener(String msg) {
        System.out.println(msg);
    }
}

метод messageListener будет вызываться как поступит новое сообщение.

Група консьюмеров — это группа в рамках которой доставляется один экземпляр сообщения. Например, у Вас есть три консьюмера в одной группе, и все они слушают одну тему. Как только на сервере появляется новое сообщение с данной темой, оно доставляется кому-то одному из группы. Остальные два консьюмера сообщение не получают.

Ну и напоследок отредактируем файл application.properties:

#адрес сервера Kafka
spring.kafka.bootstrap-servers=192.168.0.239:9092

Запускаем и тестируем! Можно юзать, например, Postman, либо встроенный в идею инструмент Tools > HTTP Client > Test RESTful Web Service.


Эксперименты

Продюссер при отправке сообщения может вернуть ответ об успешности или ошибке. Настрою коллбэк и буду выводить в консоль результат отправки сообщения. Дополним вызов kafkaTemplate.send:

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("msg", "currentTime", (new Date()).toString());
future.addCallback(System.out::println, System.err::println);

Теперь попробую перезагружать сервер Kafka. Что из этого вышло:

Если приложение запускается при недоступном сервере Kafka, То возникает исключение org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic msg not present in metadata after 60000 ms.

Если сервер Kafka становится недоступным в процессе работы, то ошибки обрабатываются в возвращаемом future объекте метода kafkaTemplate.send.

Пока Kafka недоступен, Spring копит отправленные но недоставленные сообщения внутни себя

  1. Если сервер недоступен менее 2-х минут, то как только Kafka станет доступен, спринг выплюнет все сообщения туда прикрепив к ним правильное время отправки.
  2. Если сервер недоступен более 2-х минут, то вызовется коллбэк с ошибкой доставки. Типа такого org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 40 record(s) for msg-0:120000 ms has passed since batch creation и такого org.apache.kafka.common.errors.TimeoutException: Expiring 40 record(s) for msg-0:120000 ms has passed since batch creation. Причем пачкой для нескольких сообщений. В итоге получится такой цикл: накопление сообщений — пачка ошибок — накопление сообщений — следующая пачка ошибок. Не разбирался по какому принципу, но как-то так. При появлении сервера Kafka, накопленные будут отправлены, а те, что с ошибками, — потерялись навсегда.

Отправка и получение кастомных объектов

Сначала реализуем продюссер. Для отправки объектов будем использовать сериализацию в Json. Для каждого кастомного класса нам нужен свой KafkaTemplate. Прежде чем приступить к их созданию, сделаем пару кастомных классов: UserDTO и UserDTO.Address:

package ru.knastnt.kafkatest;

public class UserDTO {
    private String name;
    private int age;
    private Address address;

    public static UserDTO getTestInstance(){
        UserDTO u = new UserDTO();
        UserDTO.Address a = new UserDTO.Address();

        a.setStreet("Ленина");
        a.setHouse(16);

        u.setName("Иван");
        u.setAge(25);
        u.setAddress(a);

        return u;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Address getAddress() {
        return address;
    }

    public void setAddress(Address address) {
        this.address = address;
    }

    public static class Address {
        private String street;
        private int house;

        public String getStreet() {
            return street;
        }

        public void setStreet(String street) {
            this.street = street;
        }

        public int getHouse() {
            return house;
        }

        public void setHouse(int house) {
            this.house = house;
        }
    }
}

Теперь настроим кастомные шаблоны. Всё поместим в отдельный класс KafkaProducerConfig:

package ru.knastnt.kafkatest;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaSrv;

    @Bean
    public <T> KafkaTemplate<String, T> kafkaStringJsonTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSrv);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }
}

Основное что мы тут делаем — назначаем сериализаторы для ключа и значения сообщений.

Дополним наш класс Runner. Добавим пару @Autowired полей:

@Autowired
private KafkaTemplate<String, UserDTO> kafkaUserTemplate;

@Autowired
private KafkaTemplate<String, UserDTO.Address> kafkaAddressTemplate;

А внутрь цикла в методе run, допишем действия по отправке наших кастомных объектов:

//Шлём объекты через кастомные шаблоны
sleep(2000);
ListenableFuture<SendResult<String, UserDTO>> userFuture = kafkaUserTemplate.send("msg2", "user", UserDTO.getTestInstance());
userFuture.addCallback(System.out::println, System.err::println);

sleep(2000);
ListenableFuture<SendResult<String, UserDTO.Address>> userFuture2 = kafkaAddressTemplate.send("msg3", "addr", UserDTO.getTestInstance().getAddress());
userFuture2.addCallback(System.out::println, System.err::println);

Предлагаю запуститься и посмотреть как объекты улетают в кафку (это будет видно в консоли). К тому же можете воспользоваться  https://www.conduktor.io/ и убедиться что топики пополняются новыми сообщениями.

Сделаем теперь несколько консьюмеров чтобы получать эти сообщения из кафки. Это несколько сложнее, но принцип тот же.

Создаем класс KafkaConsumerConfig:

package ru.knastnt.kafkatest;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    //Этот класс сейчас используется только для настройки приёма кастомных объектов через JSON


    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaSrv;


    public <T> ConsumerFactory<String, T> consumerFactory(Class<T> clazz) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSrv);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(clazz));
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserDTO> userKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(UserDTO.class));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserDTO.Address> addresKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserDTO.Address> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(UserDTO.Address.class));
        return factory;
    }

}

Собственно для каждого кастомного класса нужно сделать по параметризованному ConcurrentKafkaListenerContainerFactory — это спринговая потокобезопасная обёртка.

В каждую из них нужно заинджектить конфигурацию консьюмера. Я реализовал её в виде параметризованного метода, в котором ключ — строка, а значение — объект передаваемого класса в json формате.

Теперь добавим пару слушающих методов в наш класс Listener:

//Кастомный Json слушатель для класса UserDTO
@KafkaListener(topics = "msg2", containerFactory = "userKafkaListenerContainerFactory", groupId = "usersConsumers")
public void messageListener(@Payload UserDTO userDTO, @Headers MessageHeaders headers) {
        System.out.println(headers);
        System.out.println(userDTO);
    }

//Кастомный Json слушатель для класса Address
@KafkaListener(topics = "msg3", containerFactory = "addresKafkaListenerContainerFactory", groupId = "addressesConsumers")
public void messageListener(@Payload UserDTO.Address s, @Headers MessageHeaders headers) {
        System.out.println(headers);
        System.out.println(s);
    }

Каждому @KafkaListener нужно присвоить свой containerFactory, чтобы всё работало. Значение равно имени метода.

Запускайтесь, всё должно работать! Можете скачать мой проект — https://github.com/knastnt/kafkatest, там ничего лишнего — разберётесь 😉

(Просмотрено 17 184 раз, 1 раз за сегодня)

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *