Я хочу задать вам вопрос об архитектурной проблеме, с которой мы столкнулись на самом деле.
- у нас есть приложение, распределенное по нескольким серверам
- мы потребляем данные через activeMQ.
- у нас есть объект Price, который необходимо синхронизировать.
- этот объект может быть получен несколько раз.
- Цена имеет поле под названием «lastUpdate», и нам нужно сохранить/обновить цену с помощью самого последнего «lastUpdate».
- нет никаких гарантий, что последняя полученная цена имеет lastUpdate более поздний, чем предыдущий.
Чтобы синхронизировать все, у нас есть 3 способа (мы должны использовать их все)
- мы можем получить асинхронное обновление/вставку цены (используя прослушиватель)
- Каждые 15 минут мы будем просить производителя присылать нам все цены, которые были изменены за последние 15 минут (scheduledTask)
- Каждый час мы обновляем ВСЕ цены(schduledTask)
Используемые технологии:
- ActiveMQ как шина
- Hazelcast как распределенная карта для синхронизации перед вставкой
- JPA
Как это работает сейчас: каждый раз, когда приходит асинхронная цена/обновление, мы выполняем:
это то, что мы делаем для каждой цены, которую мы хотим обновить / сохранить (этот метод вызывает как одиночное, так и массовое обновление / вставка)
private void priceAddedOrUpdated(Price retrievedPrice)
{
Date dateInTheMap = hazelcastPriceMap.get(retrievedPrice.getId());
if(retrievedPrice.getLastUpdate>dateInTheMap(
{
//doInTransacion
try{
hazelcastPriceMap.lock(retrievedPrice.getId())
//do some logic including
persist the price
hazelcastPriceMap.put(retrievedPrice.getId(),retrievedPrice.getLastUpdate())
}
finally{
//release the lock
}
}
}
проблема в том, что для выполнения задач требуется время (30/40 секунд), в то время как мы хотим сделать это за 4/5 секунд (обрабатывая в среднем 100 тысяч цен). Логика, которую мы используем, кажется, не имеет никакого способа улучшить производительность, поэтому я думаю, что нам нужно изменить способ синхронизации данных... есть предложения?
отредактировано:
следуя предложению pveentjer, я расширил класс EntryProcessor, чтобы можно было передавать карту цен для обновления в конструкторе:
public class PriceEntryProcessor implements EntryProcessor, EntryBackupProcessor, Serializable {
Map<Long, Price> priceMap;
public PriceEntryProcessor(Map<Long, Price> priceMap)
{
super();
this.priceMap = priceMap;
}
public Object process(Map.Entry entry)
{//get the price from the map for the entry and do the logic/db insertion
}
я видел, что в EntryProcessor мы можем просто отправить значения ключей. Но мы используем в качестве ключа только цену.getId().