Задержка приёма следующего JMS-сообщения после ошибки
Вариант реализации JmsListenerContainerFactory (на основе DefaultJmsListenerContainerFactory), который задерживает приём следующего JMS-сообщения, если при обработке было выброшено необработанное исключение:
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerEndpoint;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.ErrorHandler;
import javax.annotation.PostConstruct;
import javax.jms.*;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.time.temporal.ChronoUnit.MINUTES;
/**
* Делает задержку равную afterErrorSleepDurationMinutes перед следующим приёмом
* сообщения, если при обработке сообщения выброшено необработанное исключение
*/
@Slf4j
public class PauseOnErrorJmsListenerContainerFactory implements JmsListenerContainerFactory<MessageListenerContainer> {
@Autowired
private TaskScheduler taskScheduler;
@Autowired
private JmsMessageFormat jmsMessageFormat;
@Setter
private int afterErrorSleepDurationMinutes;
@Setter
private ConnectionFactory connectionFactory;
@Setter
private MessageConverter messageConverter;
@Setter
private PlatformTransactionManager transactionManager;
// ---- //
private DefaultJmsListenerContainerFactory delegate;
private ConcurrentMap<Throwable, Message> exceptions2messages;
@PostConstruct
public void init() {
delegate = new FactoryDelegate();
delegate.setConnectionFactory(connectionFactory);
delegate.setMessageConverter(messageConverter);
delegate.setTransactionManager(transactionManager);
exceptions2messages = new ConcurrentHashMap<>();
}
// ---- //
@Override
public @NonNull
MessageListenerContainer createListenerContainer(@NonNull JmsListenerEndpoint endpoint) {
DefaultMessageListenerContainer result =
delegate.createListenerContainer(endpoint);
ErrorHandler csErrorHandler = makeErrorHandler(result);
result.setErrorHandler(csErrorHandler);
result.setExceptionListener(csErrorHandler::handleError);
return result;
}
@NonNull
private ContainerStoppingErrorHandler makeErrorHandler(
@NonNull DefaultMessageListenerContainer messageListenerContainer
) {
return new ContainerStoppingErrorHandler(
taskScheduler,
messageListenerContainer,
afterErrorSleepDurationMinutes
);
}
private class FactoryDelegate extends DefaultJmsListenerContainerFactory {
@NonNull
@Override
protected DefaultMessageListenerContainer createContainerInstance() {
return new CustomMessageListenerContainer();
}
}
private class CustomMessageListenerContainer extends DefaultMessageListenerContainer {
@Override
protected void invokeListener(@NonNull Session session, @NonNull Message message) throws JMSException {
if (log.isInfoEnabled()) {
String dn = super.getDestinationName();
String messageStr = jmsMessageFormat.format(message);
log.info("New message received from {}: {}.", dn, messageStr);
}
try {
super.invokeListener(session, message);
} catch (JMSException | RuntimeException e) {
exceptions2messages.put(e, message);
throw e;
}
}
}
@RequiredArgsConstructor
public class ContainerStoppingErrorHandler implements ErrorHandler {
private final TaskScheduler taskScheduler;
private final DefaultMessageListenerContainer mlc;
private final int durationAfterErrorMinutes;
private final AtomicBoolean currentlyStopping = new AtomicBoolean(false);
private final Set<Throwable> exceptions = ConcurrentHashMap.newKeySet();
@Override
public void handleError(@NonNull Throwable throwable) {
exceptions.add(throwable);
if (currentlyStopping.compareAndSet(false, true)) {
mlc.stop(() -> {
currentlyStopping.set(false);
Instant startTime = calcNextStartTime();
for (Throwable exception : exceptions) {
Message message = exceptions2messages.remove(exception);
logStoppingDuoError(mlc, startTime, exception, message);
}
exceptions.clear();
taskScheduler.schedule(mlc::start, startTime);
});
}
}
private @NonNull
Instant calcNextStartTime() {
return Instant.now().plus(durationAfterErrorMinutes, MINUTES);
}
private void logStoppingDuoError(@NonNull DefaultMessageListenerContainer mlc,
@NonNull Instant startTime,
@NonNull Throwable throwable,
@Nullable Message inboundMessage) {
if (!log.isErrorEnabled()) {
return;
}
ZoneId zone = ZoneId.systemDefault();
ZonedDateTime startTimeLdt = ZonedDateTime.ofInstant(startTime, zone);
if (inboundMessage != null) {
String inboundMessageStr = jmsMessageFormat.format(inboundMessage);
log.error(""
+ "Stopping listening '{}' duo message processing "
+ "error until {}. Input message: {}.",
getDestinationName(mlc), startTimeLdt, inboundMessageStr,
throwable
);
} else {
log.error(
"Stopping listening '{}' duo queue access error until {}.",
getDestinationName(mlc), startTimeLdt,
throwable
);
}
}
private @NonNull
String getDestinationName(@NonNull DefaultMessageListenerContainer mlc) {
try {
String result = mlc.getDestinationName();
if (result != null) {
return result;
}
Destination destination = mlc.getDestination();
if (destination instanceof Queue) {
return ((Queue) destination).getQueueName();
}
if (destination != null) {
return destination.toString();
}
} catch (Exception e) {
log.error("Destination name extraction error", e);
}
return "Unknown destination";
}
}
}
Инициализация выглядит следующим образом:
/**
 * Registers the pausing listener container factory.
 *
 * @param connectionFactory  the JMS connection factory handed to the factory
 * @param transactionManager the transaction manager handed to the factory
 * @return a factory configured with {@code afterErrorDurationMinutes} as its pause duration
 */
@Bean
public PauseOnErrorJmsListenerContainerFactory jmsListenerContainerFactory(
        ConnectionFactory connectionFactory,
        PlatformTransactionManager transactionManager
) {
    final PauseOnErrorJmsListenerContainerFactory pausingFactory = new PauseOnErrorJmsListenerContainerFactory();
    pausingFactory.setConnectionFactory(connectionFactory);
    pausingFactory.setTransactionManager(transactionManager);
    pausingFactory.setAfterErrorSleepDurationMinutes(afterErrorDurationMinutes);
    return pausingFactory;
}
(Просмотрено 471 раз, 1 раз за сегодня)