spring-amqp нулевое использование потребителя с не потокобезопасным слушателем

Мы столкнулись с проблемой в производственной среде, когда потребители не используют ресурсы, а очереди продолжают расти, а производительность снижается.

Каждый из потребителей представляет собой контейнер, который содержит один экземпляр не ориентированного на многопотоковое исполнение bean-компонента прослушивателя.

Каждый слушатель должен записывать в свой собственный набор файлов. Чтобы избежать конфликта потоков, я бы хотел, чтобы только один поток писал в свой собственный набор файлов.

Каждый слушатель создается только один раз с помощью @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)

Я использую конфигурацию, подобную той, что описана в этом question< /а>

Каждый контейнер также настроен с советом о повторной попытке, который имеет следующий код:

public class RetryMessageAdvice extends StatelessRetryOperationsInterceptorFactoryBean {
    private static final int DEFAULT_RETRY_COUNT = 5;
    private static final int DEFAULT_BACKOFF_MS = 250;
    private int retryCount;
    private int backOffPeriodInMS;

    public RetryMessageAdvice() {
        this.retryCount = DEFAULT_RETRY_COUNT;
        this.backOffPeriodInMS = DEFAULT_BACKOFF_MS;
        initializeRetryPolicy();
    }

    public RetryMessageAdvice(int retryCount, int backoff) {
        this.retryCount = retryCount;
        this.backOffPeriodInMS = backoff;
        initializeRetryPolicy();
    }

    public void initializeRetryPolicy() {

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(this.retryCount);

        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(backOffPeriodInMS);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        this.setRetryOperations(retryTemplate);
        this.setMessageRecoverer(new RetryMessageRecoverer());
    }

    public int getRetryCount() {
        return retryCount;
    }

    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }
}

Потребитель выглядит примерно так:

@Component("exportListenerImpl")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ExportListenerImpl extends ExportListenerBase {
    private static final Logger LOG = LoggerFactory.getLogger(ExportListenerImpl.class);

    private final ExportMapper exportMapper;
    private final ExportFormatter exportFormatter;

    @Autowired
    public ExportListenerImpl(@Qualifier("exportFormatter") ExportFormatter exportFormatter,
            @Qualifier("exportMapper") ExportedMapper exportedMapper,
            @Value("${export.root.dir}") String exportDirectory) {
        super(exportDirectory);
        this.exportedFormatter = exportFormatter;
        this.exportedMapper = exportedMapper;
    }

    @Override
    public void handle(AnalyticsEvent analyticsEvent) throws Exception {

        ExportedEvent exportedEvent = exportMapper.mapPlace(analyticsEvent);

        File csvFile = getCsvFile(exportedEvent);
        String csvRow = exportFormatter.writeAsString(exportedEvent);
        writeCsvRow(csvRow, csvFile);
    }
}

Что еще нужно отметить

  1. Преобразователь экспорта и средство форматирования экспорта являются потокобезопасными, но не используют @Scope (значение = ConfigurableBeanFactory.SCOPE_PROTOTYPE).
  2. Метод writeCsvRow является синхронизированным.
  3. Существует большое количество ошибок, из-за которых exportMapper выдает исключение и запускает совет повторной попытки.
  4. Скорость входящего сообщения 120/с
  5. Соотношение между входящим и доставленным тарифом обычно составляет 5:1.

Мои теории о том, что не так,

  1. Большое количество ошибок приводит к большому количеству повторных попыток и снижению производительности. Я бы лучше поместил плохое сообщение в очередь ошибок.
  2. Каким-то образом синхронизированный метод в writeCsvRow вызывает проблемы с некоторым потоком более высокого уровня, управляемым spring-amqp.

Мой вопрос в том, какая теория верна? Является ли влияние рекомендации повторной попытки проблемой?


person Fab    schedule 09.02.2016    source источник


Ответы (1)


  1. Если эти bean-компоненты также не потокобезопасны, они также должны быть областью действия прототипа.
  2. Поскольку есть только один поток, синхронизировать этот метод не обязательно, но это не помешает.
  3. Если ошибки неисправимы, вы должны настроить политику повторных попыток, чтобы не повторять эти исключения.

.

  1. С этими настройками повтора вы будете приостанавливать поток контейнера на 250 мс каждый раз, когда получаете сообщение об ошибке. Так да; это повредит производительности.
  2. Не должно быть значительных накладных расходов.
person Gary Russell    schedule 09.02.2016