1. JMS + IBM MQ — чтение из топика (паттерн Publisher/Subscriber)
Для начала нам понадобится сервис IBM MQ. Поднимаем его локально с помощью Docker. Для этого выполним команду в консоли:
docker pull ibmcom/mq docker run --env LICENSE=accept --env MQ_QMGR_NAME=QM1 --publish 11414:1414 --publish 9443:9443 --detach ibmcom/mq |
В результате скачается и запустится образ. Будут проброшены 2 порта — 11414 (для взаимодействия с MQ) и 9443 (web-интерфейс).
Web-интерфейс поднимется не сразу, у меня он начинает работать только минуты через 4 после запуска контейнера. До этого то соединение сброшено, то ошибка установки защищённого соединения, то неожиданный ответ. В итоге, когда web-интерфейс — таки поднимется, зайти в него можно используя логин и пароль
Подключение к web: https://localhost:9443/
Логин/пароль: admin passw0rd
Настройки по умолчанию:
Имя администратора очередей: QM1
Канал: DEV.ADMIN.SVRCONN
Внутри Web-интерфейса можно ничего не делать.
Теперь нам нужно будет иметь возможность кидать тестовые сообщения (например, в web-интерфейсе я не нашёл возможности этого делать). Поэтому устанавливаем программу MQ Explorer View.
Установка MQ Explorer View:
https://www.ibm.com/docs/en/ibm-mq/9.1?topic=windows-installing-stand-alone-mq-explorer
Для скачивания нужно будет зарегистрироваться на сайте.
В итоге у меня скачался файл 9.1.5.0-IBM-MQ-Explorer-Win64.zip размером около 400Мб. Установка не вызывает затруднений. Но потом следует подключиться к запущенному IBM MQ:
1. Добавляем администратор очередей:

2. Указываем имя администратора очередей QM1 (из стандартных параметров), соединяемся Напрямую

3. Вводим localhost, порт и канал из стандартных параметров:

4. Жмём далее, далее,… и доходим до ввода пользователя. Логин и пароль такие же как для подключения к web-интерфейсу:

5. Далее, далее,…, готово. Вводим пароль и получаем установленное соединение:

Реализую подписку на топик (паттерн Publisher/Subscriber)
Пример для подписки на топики:
https://github.com/cococomeon/demo/blob/bdf303f58299e7619b32f8fe219aed07dc57a81e/demo-mq/demo-mq-ibmmq/src/main/java/com/lml/test/ibmmq/wmqjava/MQPubSubApiSample.java
Пример для подписки на топики в спринге:
https://www.concretepage.com/spring-5/spring-jms-topic-listener
Ещё неплохая статья про транзакционность:
https://coderlessons.com/articles/java/spring-jms-obrabotka-soobshchenii-v-tranzaktsiiakh
Напишем приложение для получения сообщений из топика
Свойства:
client.publ.jms.conn-name=localhost(11414) client.publ.jms.channel=DEV.ADMIN.SVRCONN client.publ.jms.queue-manager=QM1 #так будет называться наш топик: client.publ.jms.topic-name=hellotopic |
Нам понадобится зависимость для работы с IBM MQ в pom.xml:
< dependency > < groupId >com.ibm.mq</ groupId > < artifactId >mq-jms-spring-boot-starter</ artifactId > < version >2.4.4</ version > </ dependency > |
Включаем JMS проставив аннотацию @EnableJms в корневом классе приложения рядом со @SpringBootApplication
Создадим конфигурацию:
package ru.knastnt.springjmsibmmq.config; import com.ibm.mq.jms.MQConnectionFactory; import com.ibm.mq.spring.boot.MQConfigurationProperties; import com.ibm.mq.spring.boot.MQConnectionFactoryFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.support.converter.MessageConverter; @Slf4j @Configuration public class IbmMqConfiguration { /** * Бин для чтения конфигурации соединения с брокером очередей * @return */ @Bean @ConfigurationProperties (prefix = "client.publ.jms" ) public MQConfigurationProperties configProperties() { return new MQConfigurationProperties(); } /** * Бин для создания соединения с брокером очередей * @param configProperties * @return */ @Bean public MQConnectionFactory connectionFactory(MQConfigurationProperties configProperties) { return new MQConnectionFactoryFactory(configProperties, null ) .createConnectionFactory(MQConnectionFactory. class ); } /** * Бин для настройки контекста обмена сообщениями (тип, конвертация, транзакционность, кэширование, таймауты, обработка ошибок) * @param connectionFactory * @return */ @Bean public DefaultJmsListenerContainerFactory jmsContainer(MQConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory listenerContainer = new DefaultJmsListenerContainerFactory(); //Включаем использование паттерна publisher/subscriber listenerContainer.setPubSubDomain( true ); //Устанавливаем connectionFactory listenerContainer.setConnectionFactory(connectionFactory); //Обозначаем логику обработки ошибок listenerContainer.setErrorHandler(error -> log.error( "Error!!! has occurred in jms listener" , error)); //Можно указать конвертер для автоматического биндинга сообщения в нужный формат, но пока сделаем без него //listenerContainer.setMessageConverter(messageConverter); return listenerContainer; } } |
Теперь создадим класс для получения входящих сообщений:
package ru.knastnt.springjmsibmmq.mq; import lombok.extern.slf4j.Slf4j; import org.springframework.jms.annotation.JmsListener; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; @Slf4j @Component public class Receiver { @JmsListener (destination = "${client.publ.jms.topic-name}" , containerFactory = "jmsContainer" ) public void receiveMessage( final Message message) { try { log.debug( "Message has been received. Id: {}; content: {}" , message.getHeaders().getId(), message.getPayload()); } catch (Exception e) { log.error( "Error receiving of message. Message: {}. Error:" , message, e); } } } |
Теперь осталось запустить и проверить работу! Исходный код здесь.
После запуска приложения в MQ Explorer View в разделе Подписки появится один подписчик:

Для целей тестирования, интересно создать ещё одного подписчика, непосредственно средствами MQ Explorer View. Для этого зайти в раздел Темы, кликнуть правой кнопкой на элементе и выбрать Проверить подписку… и подписаться на наш hellotopic


В это окошка будут прилетать сообщения из топика точно так же как и в наше приложение. Осталось только отправить тестовое сообщение в топик. Это делается в том же меню, но вызвав пункт Проверить публикацию…
Отправляем тестовое сообщение, указав Строку раздела hellotopic

Всё отлично, ещё есть варианты рассмотреть:
- Написание теста
- Автоматическую десериализацию сообщений JSON-формата
- Автоматический демаршаллинг сообщений XML-формата в соответствии с XSD схемой
- Приём сообщений в формате SOAP
- Реализация паттерна Point-to-Point
- Реализация паттерна Запрос/Ответ
- Обработка ошибок и передача сведений в очередь ошибок
- Рассмотрение транзакционности
Но это в последующих статьях