Задержка приёма следующего JMS-сообщения после ошибки
Вариант реализации JmsListenerContainerFactory (на основе DefaultJmsListenerContainerFactory), который задерживает приём следующего JMS-сообщения, если при обработке предыдущего было выброшено необработанное исключение:
import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerEndpoint; import org.springframework.jms.listener.DefaultMessageListenerContainer; import org.springframework.jms.listener.MessageListenerContainer; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.springframework.scheduling.TaskScheduler; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.util.ErrorHandler; import javax.annotation.PostConstruct; import javax.jms.*; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import static java.time.temporal.ChronoUnit.MINUTES; /** * Делает задержку равную afterErrorSleepDurationMinutes перед следующим приёмом * сообщения, если при обработке сообщения выброшено необработанное исключение */ @Slf4j public class PauseOnErrorJmsListenerContainerFactory implements JmsListenerContainerFactory<MessageListenerContainer> { @Autowired private TaskScheduler taskScheduler; @Autowired private JmsMessageFormat jmsMessageFormat; @Setter private int afterErrorSleepDurationMinutes; @Setter private ConnectionFactory connectionFactory; @Setter private MessageConverter messageConverter; @Setter private PlatformTransactionManager transactionManager; // ---- // private DefaultJmsListenerContainerFactory delegate; private ConcurrentMap<Throwable, Message> exceptions2messages; @PostConstruct public void init() { delegate = new FactoryDelegate(); 
delegate.setConnectionFactory(connectionFactory); delegate.setMessageConverter(messageConverter); delegate.setTransactionManager(transactionManager); exceptions2messages = new ConcurrentHashMap<>(); } // ---- // @Override public @NonNull MessageListenerContainer createListenerContainer( @NonNull JmsListenerEndpoint endpoint) { DefaultMessageListenerContainer result = delegate.createListenerContainer(endpoint); ErrorHandler csErrorHandler = makeErrorHandler(result); result.setErrorHandler(csErrorHandler); result.setExceptionListener(csErrorHandler::handleError); return result; } @NonNull private ContainerStoppingErrorHandler makeErrorHandler( @NonNull DefaultMessageListenerContainer messageListenerContainer ) { return new ContainerStoppingErrorHandler( taskScheduler, messageListenerContainer, afterErrorSleepDurationMinutes ); } private class FactoryDelegate extends DefaultJmsListenerContainerFactory { @NonNull @Override protected DefaultMessageListenerContainer createContainerInstance() { return new CustomMessageListenerContainer(); } } private class CustomMessageListenerContainer extends DefaultMessageListenerContainer { @Override protected void invokeListener( @NonNull Session session, @NonNull Message message) throws JMSException { if (log.isInfoEnabled()) { String dn = super .getDestinationName(); String messageStr = jmsMessageFormat.format(message); log.info( "New message received from {}: {}." 
, dn, messageStr); } try { super .invokeListener(session, message); } catch (JMSException | RuntimeException e) { exceptions2messages.put(e, message); throw e; } } } @RequiredArgsConstructor public class ContainerStoppingErrorHandler implements ErrorHandler { private final TaskScheduler taskScheduler; private final DefaultMessageListenerContainer mlc; private final int durationAfterErrorMinutes; private final AtomicBoolean currentlyStopping = new AtomicBoolean( false ); private final Set<Throwable> exceptions = ConcurrentHashMap.newKeySet(); @Override public void handleError( @NonNull Throwable throwable) { exceptions.add(throwable); if (currentlyStopping.compareAndSet( false , true )) { mlc.stop(() -> { currentlyStopping.set( false ); Instant startTime = calcNextStartTime(); for (Throwable exception : exceptions) { Message message = exceptions2messages.remove(exception); logStoppingDuoError(mlc, startTime, exception, message); } exceptions.clear(); taskScheduler.schedule(mlc::start, startTime); }); } } private @NonNull Instant calcNextStartTime() { return Instant.now().plus(durationAfterErrorMinutes, MINUTES); } private void logStoppingDuoError( @NonNull DefaultMessageListenerContainer mlc, @NonNull Instant startTime, @NonNull Throwable throwable, @Nullable Message inboundMessage) { if (!log.isErrorEnabled()) { return ; } ZoneId zone = ZoneId.systemDefault(); ZonedDateTime startTimeLdt = ZonedDateTime.ofInstant(startTime, zone); if (inboundMessage != null ) { String inboundMessageStr = jmsMessageFormat.format(inboundMessage); log.error( "" + "Stopping listening '{}' duo message processing " + "error until {}. Input message: {}." , getDestinationName(mlc), startTimeLdt, inboundMessageStr, throwable ); } else { log.error( "Stopping listening '{}' duo queue access error until {}." 
, getDestinationName(mlc), startTimeLdt, throwable ); } } private @NonNull String getDestinationName( @NonNull DefaultMessageListenerContainer mlc) { try { String result = mlc.getDestinationName(); if (result != null ) { return result; } Destination destination = mlc.getDestination(); if (destination instanceof Queue) { return ((Queue) destination).getQueueName(); } if (destination != null ) { return destination.toString(); } } catch (Exception e) { log.error( "Destination name extraction error" , e); } return "Unknown destination" ; } } } |
Инициализация выглядит следующим образом:
@Bean public PauseOnErrorJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager) { PauseOnErrorJmsListenerContainerFactory factory = new PauseOnErrorJmsListenerContainerFactory(); factory.setAfterErrorSleepDurationMinutes(afterErrorDurationMinutes); factory.setConnectionFactory(connectionFactory); factory.setTransactionManager(transactionManager); return factory; } |
(Просмотрено 491 раз, 1 раз за сегодня)