1. JMS + IBM MQ — чтение из топика (паттерн Publisher/Subscriber)
Для начала нам понадобится сервис IBM MQ. Поднимаем его локально с помощью Docker. Для этого выполним команду в консоли:
docker pull ibmcom/mq
docker run --env LICENSE=accept --env MQ_QMGR_NAME=QM1 --publish 11414:1414 --publish 9443:9443 --detach ibmcom/mq
В результате скачается и запустится образ. Будут проброшены 2 порта — 11414 (для взаимодействия с MQ) и 9443 (web-интерфейс).
Web-интерфейс поднимется не сразу, у меня он начинает работать только минуты через 4 после запуска контейнера. До этого то соединение сброшено, то ошибка установки защищённого соединения, то неожиданный ответ. В итоге, когда web-интерфейс — таки поднимется, зайти в него можно используя логин и пароль
Подключение к web: https://localhost:9443/
Логин/пароль: admin passw0rd
Настройки по умолчанию:
Имя администратора очередей: QM1
Канал: DEV.ADMIN.SVRCONN
Внутри Web-интерфейса можно ничего не делать.
Теперь нам нужно будет иметь возможность кидать тестовые сообщения (например, в web-интерфейсе я не нашёл возможности этого делать). Поэтому устанавливаем программу MQ Explorer View.
Установка MQ Explorer View:
https://www.ibm.com/docs/en/ibm-mq/9.1?topic=windows-installing-stand-alone-mq-explorer
Для скачивания нужно будет зарегистрироваться на сайте.
В итоге у меня скачался файл 9.1.5.0-IBM-MQ-Explorer-Win64.zip размером около 400Мб. Установка не вызывает затруднений. Но потом следует подключиться к запущенному IBM MQ:
1. Добавляем администратор очередей:
2. Указываем имя администратора очередей QM1 (из стандартных параметров), соединяемся Напрямую
3. Вводим localhost, порт и канал из стандартных параметров:
4. Жмём далее, далее,… и доходим до ввода пользователя. Логин и пароль такие же как для подключения к web-интерфейсу:
5. Далее, далее,…, готово. Вводим пароль и получаем установленное соединение:
Реализую подписку на топик (паттерн Publisher/Subscriber)
Пример для подписки на топики:
https://github.com/cococomeon/demo/blob/bdf303f58299e7619b32f8fe219aed07dc57a81e/demo-mq/demo-mq-ibmmq/src/main/java/com/lml/test/ibmmq/wmqjava/MQPubSubApiSample.java
Пример для подписки на топики в спринге:
https://www.concretepage.com/spring-5/spring-jms-topic-listener
Ещё неплохая статья про транзакционность:
https://coderlessons.com/articles/java/spring-jms-obrabotka-soobshchenii-v-tranzaktsiiakh
Напишем приложение для получения сообщений из топика
Свойства:
client.publ.jms.conn-name=localhost(11414)
client.publ.jms.channel=DEV.ADMIN.SVRCONN
client.publ.jms.queue-manager=QM1
#так будет называться наш топик:
client.publ.jms.topic-name=hellotopic
Нам понадобится зависимость для работы с IBM MQ в pom.xml:
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>mq-jms-spring-boot-starter</artifactId>
<version>2.4.4</version>
</dependency>
Включаем JMS проставив аннотацию @EnableJms в корневом классе приложения рядом со @SpringBootApplication
Создадим конфигурацию:
package ru.knastnt.springjmsibmmq.config;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.mq.spring.boot.MQConfigurationProperties;
import com.ibm.mq.spring.boot.MQConnectionFactoryFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.support.converter.MessageConverter;
@Slf4j
@Configuration
public class IbmMqConfiguration {
/**
* Бин для чтения конфигурации соединения с брокером очередей
* @return
*/
@Bean
@ConfigurationProperties(prefix = "client.publ.jms")
public MQConfigurationProperties configProperties() {
return new MQConfigurationProperties();
}
/**
* Бин для создания соединения с брокером очередей
* @param configProperties
* @return
*/
@Bean
public MQConnectionFactory connectionFactory(MQConfigurationProperties configProperties) {
return new MQConnectionFactoryFactory(configProperties, null)
.createConnectionFactory(MQConnectionFactory.class);
}
/**
* Бин для настройки контекста обмена сообщениями (тип, конвертация, транзакционность, кэширование, таймауты, обработка ошибок)
* @param connectionFactory
* @return
*/
@Bean
public DefaultJmsListenerContainerFactory jmsContainer(MQConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory listenerContainer = new DefaultJmsListenerContainerFactory();
//Включаем использование паттерна publisher/subscriber
listenerContainer.setPubSubDomain(true);
//Устанавливаем connectionFactory
listenerContainer.setConnectionFactory(connectionFactory);
//Обозначаем логику обработки ошибок
listenerContainer.setErrorHandler(error -> log.error("Error!!! has occurred in jms listener", error));
//Можно указать конвертер для автоматического биндинга сообщения в нужный формат, но пока сделаем без него
//listenerContainer.setMessageConverter(messageConverter);
return listenerContainer;
}
}
Теперь создадим класс для получения входящих сообщений:
package ru.knastnt.springjmsibmmq.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class Receiver {
@JmsListener(destination = "${client.publ.jms.topic-name}", containerFactory = "jmsContainer")
public void receiveMessage(final Message message) {
try {
log.debug("Message has been received. Id: {}; content: {}", message.getHeaders().getId(), message.getPayload());
} catch (Exception e) {
log.error("Error receiving of message. Message: {}. Error:", message, e);
}
}
}
Теперь осталось запустить и проверить работу! Исходный код здесь.
После запуска приложения в MQ Explorer View в разделе Подписки появится один подписчик:
Для целей тестирования, интересно создать ещё одного подписчика, непосредственно средствами MQ Explorer View. Для этого зайти в раздел Темы, кликнуть правой кнопкой на элементе и выбрать Проверить подписку… и подписаться на наш hellotopic
В это окошка будут прилетать сообщения из топика точно так же как и в наше приложение. Осталось только отправить тестовое сообщение в топик. Это делается в том же меню, но вызвав пункт Проверить публикацию…
Отправляем тестовое сообщение, указав Строку раздела hellotopic
Всё отлично, ещё есть варианты рассмотреть:
- Написание теста
- Автоматическую десериализацию сообщений JSON-формата
- Автоматический демаршаллинг сообщений XML-формата в соответствии с XSD схемой
- Приём сообщений в формате SOAP
- Реализация паттерна Point-to-Point
- Реализация паттерна Запрос/Ответ
- Обработка ошибок и передача сведений в очередь ошибок
- Рассмотрение транзакционности
Но это в последующих статьях