1. JMS + IBM MQ — чтение из топика (паттерн Publisher/Subscriber)

1. JMS + IBM MQ — чтение из топика (паттерн Publisher/Subscriber)

Для начала нам понадобится сервис IBM MQ. Поднимаем его локально с помощью Docker. Для этого выполним команду в консоли:

docker pull ibmcom/mq
docker run --env LICENSE=accept --env MQ_QMGR_NAME=QM1 --publish 11414:1414 --publish 9443:9443 --detach ibmcom/mq

В результате скачается и запустится образ. Будут проброшены 2 порта — 11414 (для взаимодействия с MQ) и 9443 (web-интерфейс).
Web-интерфейс поднимется не сразу, у меня он начинает работать только минуты через 4 после запуска контейнера. До этого то соединение сброшено, то ошибка установки защищённого соединения, то неожиданный ответ. В итоге, когда web-интерфейс — таки поднимется, зайти в него можно используя логин и пароль

Подключение к web: https://localhost:9443/
Логин/пароль: admin passw0rd

Настройки по умолчанию:

Имя администратора очередей: QM1
Канал: DEV.ADMIN.SVRCONN

Внутри Web-интерфейса можно ничего не делать.

Теперь нам нужно будет иметь возможность кидать тестовые сообщения (например, в web-интерфейсе я не нашёл возможности этого делать). Поэтому устанавливаем программу MQ Explorer View.

Установка MQ Explorer View:
https://www.ibm.com/docs/en/ibm-mq/9.1?topic=windows-installing-stand-alone-mq-explorer
Для скачивания нужно будет зарегистрироваться на сайте.
В итоге у меня скачался файл 9.1.5.0-IBM-MQ-Explorer-Win64.zip размером около 400Мб. Установка не вызывает затруднений. Но потом следует подключиться к запущенному IBM MQ:
1. Добавляем администратор очередей:

2. Указываем имя администратора очередей QM1 (из стандартных параметров), соединяемся Напрямую

3. Вводим localhost, порт и канал из стандартных параметров:

4. Жмём далее, далее,… и доходим до ввода пользователя. Логин и пароль такие же как для подключения к web-интерфейсу:

5. Далее, далее,…, готово. Вводим пароль и получаем установленное соединение:

Реализую подписку на топик (паттерн Publisher/Subscriber)

Пример для подписки на топики:
https://github.com/cococomeon/demo/blob/bdf303f58299e7619b32f8fe219aed07dc57a81e/demo-mq/demo-mq-ibmmq/src/main/java/com/lml/test/ibmmq/wmqjava/MQPubSubApiSample.java
Пример для подписки на топики в спринге:
https://www.concretepage.com/spring-5/spring-jms-topic-listener
Ещё неплохая статья про транзакционность:
https://coderlessons.com/articles/java/spring-jms-obrabotka-soobshchenii-v-tranzaktsiiakh

Напишем приложение для получения сообщений из топика

Свойства:

client.publ.jms.conn-name=localhost(11414)
client.publ.jms.channel=DEV.ADMIN.SVRCONN
client.publ.jms.queue-manager=QM1
#так будет называться наш топик:
client.publ.jms.topic-name=hellotopic

Нам понадобится зависимость для работы с IBM MQ в pom.xml:

<dependency>
	<groupId>com.ibm.mq</groupId>
	<artifactId>mq-jms-spring-boot-starter</artifactId>
	<version>2.4.4</version>
</dependency>

Включаем JMS проставив аннотацию @EnableJms в корневом классе приложения рядом со @SpringBootApplication

Создадим конфигурацию:

package ru.knastnt.springjmsibmmq.config;

import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.mq.spring.boot.MQConfigurationProperties;
import com.ibm.mq.spring.boot.MQConnectionFactoryFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.support.converter.MessageConverter;

@Slf4j
@Configuration
public class IbmMqConfiguration {

    /**
     * Бин для чтения конфигурации соединения с брокером очередей
     * @return
     */
    @Bean
    @ConfigurationProperties(prefix = "client.publ.jms")
    public MQConfigurationProperties configProperties() {
        return new MQConfigurationProperties();
    }

    /**
     * Бин для создания соединения с брокером очередей
     * @param configProperties
     * @return
     */
    @Bean
    public MQConnectionFactory connectionFactory(MQConfigurationProperties configProperties) {
        return new MQConnectionFactoryFactory(configProperties, null)
                .createConnectionFactory(MQConnectionFactory.class);
    }

    /**
     * Бин для настройки контекста обмена сообщениями (тип, конвертация, транзакционность, кэширование, таймауты, обработка ошибок)
     * @param connectionFactory
     * @return
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsContainer(MQConnectionFactory connectionFactory) {

        DefaultJmsListenerContainerFactory listenerContainer = new DefaultJmsListenerContainerFactory();
        //Включаем использование паттерна publisher/subscriber
        listenerContainer.setPubSubDomain(true);
        //Устанавливаем connectionFactory
        listenerContainer.setConnectionFactory(connectionFactory);
        //Обозначаем логику обработки ошибок
        listenerContainer.setErrorHandler(error -> log.error("Error!!! has occurred in jms listener", error));
        //Можно указать конвертер для автоматического биндинга сообщения в нужный формат, но пока сделаем без него
        //listenerContainer.setMessageConverter(messageConverter);

        return listenerContainer;
    }
}

Теперь создадим класс для получения входящих сообщений:

package ru.knastnt.springjmsibmmq.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class Receiver {
    @JmsListener(destination = "${client.publ.jms.topic-name}", containerFactory = "jmsContainer")
    public void receiveMessage(final Message message) {
        try {
            log.debug("Message has been received. Id: {}; content: {}", message.getHeaders().getId(), message.getPayload());

        } catch (Exception e) {
            log.error("Error receiving of message. Message: {}. Error:", message, e);
        }
    }
}

Теперь осталось запустить и проверить работу! Исходный код здесь.

После запуска приложения в MQ Explorer View в разделе Подписки появится один подписчик:

Для целей тестирования, интересно создать ещё одного подписчика, непосредственно средствами MQ Explorer View. Для этого зайти в раздел Темы, кликнуть правой кнопкой на элементе и выбрать Проверить подписку… и подписаться на наш hellotopic

В это окошка будут прилетать сообщения из топика точно так же как и в наше приложение. Осталось только отправить тестовое сообщение в топик. Это делается в том же меню, но вызвав пункт Проверить публикацию…
Отправляем тестовое сообщение, указав Строку раздела hellotopic

Всё отлично, ещё есть варианты рассмотреть:

  • Написание теста
  • Автоматическую десериализацию сообщений JSON-формата
  • Автоматический демаршаллинг сообщений XML-формата в соответствии с XSD схемой
  • Приём сообщений в формате SOAP
  • Реализация паттерна Point-to-Point
  • Реализация паттерна Запрос/Ответ
  • Обработка ошибок и передача сведений в очередь ошибок
  • Рассмотрение транзакционности

Но это в последующих статьях

(Просмотрено 2 150 раз, 1 раз за сегодня)

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *