Задержка приёма следующего JMS-сообщения после ошибки

Задержка приёма следующего JMS-сообщения после ошибки

Вариант реализации JmsListenerContainerFactory (на основе DefaultJmsListenerContainerFactory), который задерживает приём следующего JMS-сообщения, если при обработке предыдущего было выброшено необработанное исключение:

import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerEndpoint;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.ErrorHandler;

import javax.annotation.PostConstruct;
import javax.jms.*;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.time.temporal.ChronoUnit.MINUTES;

/**
 * {@link JmsListenerContainerFactory} that pauses a listener container for
 * {@code afterErrorSleepDurationMinutes} minutes before the next message is
 * received whenever an unhandled exception is thrown while a message is
 * being processed.
 *
 * <p>Containers are built by an internal {@link DefaultJmsListenerContainerFactory}
 * and then equipped with a {@link ContainerStoppingErrorHandler}, which is
 * registered both as the container's {@link ErrorHandler} and as its JMS
 * exception listener. The handler stops the container and schedules its
 * restart through the injected {@link TaskScheduler}.
 */
@Slf4j
public class PauseOnErrorJmsListenerContainerFactory implements JmsListenerContainerFactory<MessageListenerContainer> {
    @Autowired
    private TaskScheduler taskScheduler;

    @Autowired
    private JmsMessageFormat jmsMessageFormat;

    /** Length of the pause, in minutes, between stopping a container and restarting it. */
    @Setter
    private int afterErrorSleepDurationMinutes;

    @Setter
    private ConnectionFactory connectionFactory;

    @Setter
    private MessageConverter messageConverter;

    @Setter
    private PlatformTransactionManager transactionManager;

    // ---- //

    private DefaultJmsListenerContainerFactory delegate;

    /**
     * Maps an exception thrown by a listener to the message that was being
     * processed, so the error handler can log the message next to the error.
     * Initialised eagerly so it never depends on {@link #init()} having run.
     */
    private final ConcurrentMap<Throwable, Message> exceptions2messages = new ConcurrentHashMap<>();

    /**
     * Creates the delegate factory and copies the configured connection
     * factory, message converter and transaction manager into it.
     */
    @PostConstruct
    public void init() {
        delegate = new FactoryDelegate();
        delegate.setConnectionFactory(connectionFactory);
        delegate.setMessageConverter(messageConverter);
        delegate.setTransactionManager(transactionManager);
    }

    // ---- //

    /**
     * Creates a container through the delegate and installs a
     * {@link ContainerStoppingErrorHandler} on it as both error handler and
     * exception listener.
     *
     * @param endpoint the endpoint to create a container for
     * @return the configured container
     */
    @Override
    public @NonNull
    MessageListenerContainer createListenerContainer(@NonNull JmsListenerEndpoint endpoint) {
        DefaultMessageListenerContainer result =
            delegate.createListenerContainer(endpoint);

        ErrorHandler csErrorHandler = makeErrorHandler(result);

        result.setErrorHandler(csErrorHandler);
        result.setExceptionListener(csErrorHandler::handleError);

        return result;
    }

    /**
     * Builds the error handler bound to the given container.
     *
     * @param messageListenerContainer the container the handler will stop and restart
     * @return a new handler using this factory's scheduler and pause duration
     */
    @NonNull
    private ContainerStoppingErrorHandler makeErrorHandler(
        @NonNull DefaultMessageListenerContainer messageListenerContainer
    ) {
        return new ContainerStoppingErrorHandler(
            taskScheduler,
            messageListenerContainer,
            afterErrorSleepDurationMinutes
        );
    }

    /** Delegate factory that produces {@link CustomMessageListenerContainer} instances. */
    private class FactoryDelegate extends DefaultJmsListenerContainerFactory {
        @NonNull
        @Override
        protected DefaultMessageListenerContainer createContainerInstance() {
            return new CustomMessageListenerContainer();
        }
    }

    /**
     * Container that logs every incoming message and remembers which message
     * caused which listener exception.
     */
    private class CustomMessageListenerContainer extends DefaultMessageListenerContainer {
        /**
         * Logs the incoming message at INFO level, then invokes the listener.
         * A {@link JMSException} or {@link RuntimeException} thrown by the
         * listener is recorded together with the message and rethrown.
         */
        @Override
        protected void invokeListener(@NonNull Session session, @NonNull Message message) throws JMSException {
            if (log.isInfoEnabled()) {
                String dn = super.getDestinationName();
                String messageStr = jmsMessageFormat.format(message);
                log.info("New message received from {}: {}.", dn, messageStr);
            }

            try {
                super.invokeListener(session, message);

            } catch (JMSException | RuntimeException e) {
                // Remember the offending message; the error handler looks it up by exception.
                exceptions2messages.put(e, message);
                throw e;
            }
        }
    }

    /**
     * Error handler that stops its container on the first reported error and
     * schedules the container's restart after the configured pause.
     */
    @RequiredArgsConstructor
    public class ContainerStoppingErrorHandler implements ErrorHandler {
        private final TaskScheduler taskScheduler;
        private final DefaultMessageListenerContainer mlc;
        private final int durationAfterErrorMinutes;
        /** Set while a stop has been requested and its callback has not yet started. */
        private final AtomicBoolean currentlyStopping = new AtomicBoolean(false);
        /** Errors reported so far that have not been logged yet. */
        private final Set<Throwable> exceptions = ConcurrentHashMap.newKeySet();

        /**
         * Records the error and, unless a stop is already in progress,
         * requests a container stop followed by a delayed restart.
         *
         * @param throwable the reported error
         */
        @Override
        public void handleError(@NonNull Throwable throwable) {
            exceptions.add(throwable);

            if (currentlyStopping.compareAndSet(false, true)) {
                mlc.stop(this::onContainerStopped);
            }
        }

        /**
         * Callback passed to {@code mlc.stop(...)}: logs every pending error
         * and schedules the restart of the container.
         */
        private void onContainerStopped() {
            // Reset first so that an error reported from now on requests a new stop
            // instead of waiting in the pending set.
            currentlyStopping.set(false);

            Instant startTime = calcNextStartTime();

            try {
                // Remove entries one by one instead of clear()-ing afterwards: an error added
                // concurrently must not be dropped unlogged, nor leave its message behind
                // in exceptions2messages.
                for (Throwable exception : exceptions) {
                    if (exceptions.remove(exception)) {
                        Message message = exceptions2messages.remove(exception);
                        logStoppingDuoError(mlc, startTime, exception, message);
                    }
                }
            } finally {
                // Always schedule the restart, even if logging failed; otherwise the
                // container would stay stopped indefinitely.
                taskScheduler.schedule(mlc::start, startTime);
            }
        }

        /**
         * @return the current instant plus the configured pause in minutes
         */
        private @NonNull
        Instant calcNextStartTime() {
            return Instant.now().plus(durationAfterErrorMinutes, MINUTES);
        }

        /**
         * Logs at ERROR level that the container is being stopped until
         * {@code startTime}. Does nothing when ERROR logging is disabled.
         *
         * @param mlc            the container being stopped
         * @param startTime      the instant the restart is scheduled for
         * @param throwable      the error that caused the stop
         * @param inboundMessage the message being processed when the error occurred,
         *                       or {@code null} when no message is associated with it
         */
        private void logStoppingDuoError(@NonNull DefaultMessageListenerContainer mlc,
                                         @NonNull Instant startTime,
                                         @NonNull Throwable throwable,
                                         @Nullable Message inboundMessage) {
            if (!log.isErrorEnabled()) {
                return;
            }

            // The restart time is reported in the system default time zone.
            ZoneId zone = ZoneId.systemDefault();
            ZonedDateTime startTimeLdt = ZonedDateTime.ofInstant(startTime, zone);

            if (inboundMessage != null) {
                String inboundMessageStr = jmsMessageFormat.format(inboundMessage);
                log.error(""
                        + "Stopping listening '{}' duo message processing "
                        + "error until {}. Input message: {}.",
                    getDestinationName(mlc), startTimeLdt, inboundMessageStr,
                    throwable
                );

            } else {
                log.error(
                    "Stopping listening '{}' duo queue access error until {}.",
                    getDestinationName(mlc), startTimeLdt,
                    throwable
                );
            }
        }

        /**
         * Resolves a printable destination name for the container: the
         * configured destination name if set, otherwise the queue name or the
         * destination's string form.
         *
         * @param mlc the container to describe
         * @return the destination name, or {@code "Unknown destination"} when it
         *         cannot be determined or its extraction fails
         */
        private @NonNull
        String getDestinationName(@NonNull DefaultMessageListenerContainer mlc) {
            try {
                String result = mlc.getDestinationName();
                if (result != null) {
                    return result;
                }

                Destination destination = mlc.getDestination();
                if (destination instanceof Queue) {
                    return ((Queue) destination).getQueueName();
                }
                if (destination != null) {
                    return destination.toString();
                }

            } catch (Exception e) {
                // Best effort: name resolution must never break error handling.
                log.error("Destination name extraction error", e);
            }

            return "Unknown destination";
        }
    }
}

Инициализация выглядит следующим образом:

    /**
     * Registers the pausing listener container factory, configured with the
     * given connection factory and transaction manager and with the pause
     * duration taken from {@code afterErrorDurationMinutes}.
     *
     * @param connectionFactory  JMS connection factory handed to the factory
     * @param transactionManager transaction manager handed to the factory
     * @return the configured factory
     */
    @Bean
    public PauseOnErrorJmsListenerContainerFactory jmsListenerContainerFactory(
        ConnectionFactory connectionFactory,
        PlatformTransactionManager transactionManager
    ) {
        PauseOnErrorJmsListenerContainerFactory pausingFactory = new PauseOnErrorJmsListenerContainerFactory();

        pausingFactory.setConnectionFactory(connectionFactory);
        pausingFactory.setTransactionManager(transactionManager);
        pausingFactory.setAfterErrorSleepDurationMinutes(afterErrorDurationMinutes);

        return pausingFactory;
    }
(Просмотрено 414 раз, 1 раз за сегодня)

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *