Массовое обновление и одиночное асинхронное обновление с Java, распределенное приложение

Я хочу задать вам вопрос об архитектурной проблеме, с которой мы столкнулись на самом деле.

  • у нас есть приложение, распределенное по нескольким серверам
  • мы потребляем данные через activeMQ.
  • у нас есть объект Price, который необходимо синхронизировать.
  • этот объект может быть получен несколько раз.
  • Цена имеет поле под названием «lastUpdate», и нам нужно сохранить/обновить цену с помощью самого последнего «lastUpdate».
  • нет никаких гарантий, что последняя полученная цена имеет lastUpdate более поздний, чем предыдущий.

Чтобы синхронизировать все, у нас есть 3 способа (мы должны использовать их все)

  • мы можем получить асинхронное обновление/вставку цены (используя прослушиватель)
  • Каждые 15 минут мы будем просить производителя присылать нам все цены, которые были изменены за последние 15 минут (scheduledTask)
  • Каждый час мы обновляем ВСЕ цены(schduledTask)

Используемые технологии:

  • ActiveMQ как шина
  • Hazelcast как распределенная карта для синхронизации перед вставкой
  • JPA

Как это работает сейчас: каждый раз, когда приходит асинхронная цена/обновление, мы выполняем:

это то, что мы делаем для каждой цены, которую мы хотим обновить / сохранить (этот метод вызывает как одиночное, так и массовое обновление / вставка)

private void priceAddedOrUpdated(Price retrievedPrice)
{
    Date dateInTheMap = hazelcastPriceMap.get(retrievedPrice.getId());
    if(retrievedPrice.getLastUpdate>dateInTheMap(
    {
        //doInTransacion
        try{
            hazelcastPriceMap.lock(retrievedPrice.getId())
            //do some logic including
            persist the price
            hazelcastPriceMap.put(retrievedPrice.getId(),retrievedPrice.getLastUpdate())
        }
        finally{
             //release the lock
        }
    }
}

проблема в том, что для выполнения задач требуется время (30/40 секунд), в то время как мы хотим сделать это за 4/5 секунд (обрабатывая в среднем 100 тысяч цен). Логика, которую мы используем, кажется, не имеет никакого способа улучшить производительность, поэтому я думаю, что нам нужно изменить способ синхронизации данных... есть предложения?

отредактировано:

следуя предложению pveentjer, я расширил класс EntryProcessor, чтобы можно было передавать карту цен для обновления в конструкторе:

    public class PriceEntryProcessor implements EntryProcessor, EntryBackupProcessor, Serializable {

    Map<Long, Price> priceMap;

    public PriceEntryProcessor(Map<Long, Price> priceMap)
    {
        super();
        this.priceMap = priceMap;
    }
    public Object process(Map.Entry entry)
    {//get the price from the map for the entry and do the logic/db insertion

}

я видел, что в EntryProcessor мы можем просто отправить значения ключей. Но мы используем в качестве ключа только цену.getId().


person user1974059    schedule 07.11.2015    source источник
comment
Измеряли ли вы производительность, чтобы найти точки доступа?   -  person Kayaman    schedule 07.11.2015


Ответы (1)


Одним из потенциальных ускорений может быть использование EntryProcessor.

В EntryProcessor вы получаете блокировку бесплатно, поскольку вы получаете гарантию того, что никакой другой процесс не работает в том же разделе. Логика базы данных, которую вы также перемещаете в EntryProcessor. Это сократит удаленность вдвое.

Вы занимаетесь пакетированием? Потому что это может иметь огромное значение. Таким образом, пакет, например, 100 изменений для одного раздела, как только они будут получены, используйте процессор ввода для обработки всего этого пакета за один раз.

Без пакетной обработки у вас было бы 100 x (1 блокировка, 1 разблокировка, 1 получение, 1 размещение) = 400 операций. При пакетной обработке 100 у вас будет 1 удаленная операция. Так что в 400 раз меньше (без учета репликации).

person pveentjer    schedule 08.11.2015
comment
спасибо пвентджер! я читаю учебник для EntryProcessor. Действительно, это ускорит наше удаленное взаимодействие обновлений вдвое! - person user1974059; 08.11.2015
comment
Также подумайте о пакетировании. Пакетная обработка может существенно повлиять на производительность. - person pveentjer; 08.11.2015
comment
В нашей общей карте мы храним только priceId (ключ), lastUpdate (значение). Мы делаем это, чтобы не хранить слишком много объектов в памяти. Таким образом, решение может быть таким: создайте EntryProcessorPrice, который расширяет EntryProcessor и имеет поле Map‹Long,Price›. создайте экземпляр, передающий карту с ценами, которые должны быть обработаны. вызовите executeOnEntries, который выполнит обновление только цен, содержащихся в карте. - person user1974059; 08.11.2015
comment
есть возможность (или пример) передать некоторый параметр в EntryProcessor, а не единственную раскладку клавиатуры? Мне нужно выполнить некоторую операцию, но это должно быть сделано на основе полученных цен. - person user1974059; 12.11.2015
comment
что ты имеешь в виду? Вы можете передать параметры EntryProcecessor, используя, например. конструктор вашего собственного созданного класса/экземпляра EntryProcessor. - person pveentjer; 13.11.2015