Мы столкнулись с проблемой в производственной среде, когда потребители не используют ресурсы, а очереди продолжают расти, а производительность снижается.
Каждый из потребителей представляет собой контейнер, который содержит один экземпляр не ориентированного на многопотоковое исполнение bean-компонента прослушивателя.
Каждый слушатель должен записывать в свой собственный набор файлов. Чтобы избежать конфликта потоков, я бы хотел, чтобы только один поток писал в свой собственный набор файлов.
Каждый слушатель создается только один раз с помощью @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
Я использую конфигурацию, подобную той, что описана в этом question< /а>
Каждый контейнер также настроен с советом о повторной попытке, который имеет следующий код:
public class RetryMessageAdvice extends StatelessRetryOperationsInterceptorFactoryBean {
private static final int DEFAULT_RETRY_COUNT = 5;
private static final int DEFAULT_BACKOFF_MS = 250;
private int retryCount;
private int backOffPeriodInMS;
public RetryMessageAdvice() {
this.retryCount = DEFAULT_RETRY_COUNT;
this.backOffPeriodInMS = DEFAULT_BACKOFF_MS;
initializeRetryPolicy();
}
public RetryMessageAdvice(int retryCount, int backoff) {
this.retryCount = retryCount;
this.backOffPeriodInMS = backoff;
initializeRetryPolicy();
}
public void initializeRetryPolicy() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(this.retryCount);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(backOffPeriodInMS);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
this.setRetryOperations(retryTemplate);
this.setMessageRecoverer(new RetryMessageRecoverer());
}
public int getRetryCount() {
return retryCount;
}
public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}
}
Потребитель выглядит примерно так:
@Component("exportListenerImpl")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ExportListenerImpl extends ExportListenerBase {
private static final Logger LOG = LoggerFactory.getLogger(ExportListenerImpl.class);
private final ExportMapper exportMapper;
private final ExportFormatter exportFormatter;
@Autowired
public ExportListenerImpl(@Qualifier("exportFormatter") ExportFormatter exportFormatter,
@Qualifier("exportMapper") ExportedMapper exportedMapper,
@Value("${export.root.dir}") String exportDirectory) {
super(exportDirectory);
this.exportedFormatter = exportFormatter;
this.exportedMapper = exportedMapper;
}
@Override
public void handle(AnalyticsEvent analyticsEvent) throws Exception {
ExportedEvent exportedEvent = exportMapper.mapPlace(analyticsEvent);
File csvFile = getCsvFile(exportedEvent);
String csvRow = exportFormatter.writeAsString(exportedEvent);
writeCsvRow(csvRow, csvFile);
}
}
Что еще нужно отметить
- Преобразователь экспорта и средство форматирования экспорта являются потокобезопасными, но не используют @Scope (значение = ConfigurableBeanFactory.SCOPE_PROTOTYPE).
- Метод writeCsvRow является синхронизированным.
- Существует большое количество ошибок, из-за которых exportMapper выдает исключение и запускает совет повторной попытки.
- Скорость входящего сообщения 120/с
- Соотношение между входящим и доставленным тарифом обычно составляет 5:1.
Мои теории о том, что не так,
- Большое количество ошибок приводит к большому количеству повторных попыток и снижению производительности. Я бы лучше поместил плохое сообщение в очередь ошибок.
- Каким-то образом синхронизированный метод в writeCsvRow вызывает проблемы с некоторым потоком более высокого уровня, управляемым spring-amqp.
Мой вопрос в том, какая теория верна? Является ли влияние рекомендации повторной попытки проблемой?