Обработка параллелизма: синхронизация против нескольких вызовов БД

Представьте себе банк MNC, который хочет реализовать API перевода счета, используя только базовую Java, а API будет использоваться в многопоточной среде и поддерживать согласованность суммы счета все время без тупика, конечно.

Я имею в виду два подхода к реализации этого и знаю их плюсы и минусы, как описано ниже, но не могу измерить, какие плюсы здесь более хороши в случае параллелизма или когда у нас больше нагрузки. Или какие минусы должны влиять больше в случае параллелизма или когда у нас больше нагрузки.

Пожалуйста, посоветуйте и предоставьте свое предложение.

Ссылка на репозиторий Github с полным кодом https://github.com/sharmama07/money-transfer

Прототип API: public boolean transferAmount(Integer fromAccountId, Integer toAccountId, Integer amount);

Подход 1: обработка параллелизма с помощью цикла while и оператора SQL для проверки предыдущего баланса в предложении where. Если предыдущий баланс не совпадает, вызов суммы обновления в учетной записи завершится ошибкой, и он извлечет последнюю сумму учетной записи из БД и попытается обновить ее снова, пока не обновит ее успешно. Здесь ни один поток не будет заблокирован, что означает отсутствие вероятности взаимоблокировки, отсутствие накладных расходов на приостановку потока и отсутствие задержки потока. Но у него может быть еще несколько вызовов БД

public boolean transferAmount(Integer fromAccountId, Integer toAccountId, Double amount) {

        boolean updated = false;

        try {
            while(!updated) {

                Account fromAccount = accountDAO.getAccount(fromAccountId);

                if(fromAccount.getAmount()-amount < 0) {throw new OperationCannotBePerformedException("Insufficient balance!");}

                int recordsUpdated = accountDAO.updateAccountAmount(fromAccount.getId(), fromAccount.getAmount(), 
                        fromAccount.getAmount()-amount);
                updated = (recordsUpdated==1);

            }
        }catch (OperationCannotBePerformedException e) {
            LOG.log(Level.SEVERE, "Debit Operation cannot be performed, because " + e.getMessage());
        }

        if(updated) {
            updated = false;
            try {
                while(!updated) {
                    Account toAccount = accountDAO.getAccount(toAccountId);
                    int recordsUpdated = accountDAO.updateAccountAmount(toAccount.getId(), toAccount.getAmount(), toAccount.getAmount()+amount);
                    updated = (recordsUpdated==1);
                }
            }catch (OperationCannotBePerformedException e) {
                LOG.log(Level.SEVERE, "Credit Operation cannot be performed, because " + e.getMessage());
                revertDebittransaction(fromAccountId, amount);
            }
        }


        return updated;
    }

// Account DAO call
@Override
    public Account getAccount(Integer accountId) throws OperationCannotBePerformedException {
        String SQL = "select id, amount from ACCOUNT where id="+accountId+"";
        ResultSet rs;
        try {
            rs = statement.executeQuery(SQL);
            if (rs.next()) {
                return new Account(rs.getInt(1), rs.getDouble(2));
            }
            return null;
        } catch (SQLException e) {
            LOG.error("Cannot retrieve account from DB, reason: "+ e.getMessage());
            throw new OperationCannotBePerformedException("Cannot retrieve account from DB, reason: "+ e.getMessage(), e);
        }

    }

    @Override
    public int updateAccountAmount(Integer accountId, Double currentAmount, Double newAmount) throws OperationCannotBePerformedException {
        String SQL = "update ACCOUNT set amount=" + newAmount +" where id="+accountId+" and amount="+currentAmount+"";
        int rs;
        try {
            rs = statement.executeUpdate(SQL);
            return rs;
        } catch (SQLException e) {
            LOG.error("Cannot update account amount, reason: "+ e.getMessage());
            throw new OperationCannotBePerformedException("Cannot update account amount, reason: "+ e.getMessage(), e);
        }
    }

Подход 2: Здесь другие потоки будут заблокированы, если одна и та же учетная запись находится в двух транзакциях в разных потоках,
но будет меньше вызовов БД

    public boolean transferAmount1(Integer fromAccountId, Integer toAccountId, Double amount) {

            boolean updated = false;
            Integer smallerAccountId = (fromAccountId<toAccountId)? fromAccountId: toAccountId;
            Integer largerAccountId =  (fromAccountId<toAccountId)? toAccountId:fromAccountId;

            synchronized(smallerAccountId) {
                synchronized(largerAccountId) {
                    try {
                        Account fromAccount = accountDAO.getAccount(fromAccountId);
                        if(fromAccount.getAmount()-amount < 0) {
                            throw new OperationCannotBePerformedException("Insufficient balance!");
                        }
                        int recordsUpdated = accountDAO.updateAccountAmount(fromAccount.getId(),
                            fromAccount.getAmount(), fromAccount.getAmount()-amount);
                        updated = (recordsUpdated==1);
                    }catch (OperationCannotBePerformedException e) {
                        LOG.log(Level.SEVERE, "Debit Operation cannot be performed, because " + e.getMessage());
                    }
                    // credit operation
                    if(updated) {
                        try {
                            updated = false;
                            Account toAccount = accountDAO.getAccount(toAccountId);
                            int recordsUpdated = accountDAO.updateAccountAmount(toAccount.getId(),
                                toAccount.getAmount(), toAccount.getAmount()+amount);
                            updated = (recordsUpdated==1);
                        }catch (OperationCannotBePerformedException e) {
                            LOG.log(Level.SEVERE, "Credit Operation cannot be performed, because " + e.getMessage());
                            revertDebittransaction(fromAccountId, amount);
                        }
                    }
                }
            }

            return updated;
    }

person Mayur Sharma    schedule 27.01.2020    source источник
comment
Я думаю, что вы слишком много думаете об этом. Вам нужно только проверить баланс для снятия средств - в т.ч. отрицательная часть передачи. Все операции с БД должны быть атомарными, и вы можете использовать select ... for update см. stackoverflow.com/questions/46995155/   -  person Scary Wombat    schedule 27.01.2020
comment
Если БД - это SQL, то я ожидаю, что одна транзакция SQL справится с этим, возможно, в то время как объекты памяти, представляющие эти банковские счета, заблокированы (отдельно).   -  person root    schedule 08.02.2020


Ответы (1)


Я полагаю, мы предполагаем, что по какой-то причине транзакции БД непригодны для этой схемы. Потому что с транзакциями БД ничего из этого не нужно.

Первая схема должна работать, но она небезопасна. Если первый цикл обновления работает, но второй терпит неудачу, первый аккаунт теряет деньги, не внося их на второй. Однако есть лучшие способы справиться с этим. Циклы чтения-операции-обновления по своей сути являются колоритными, поэтому вы можете изменить функцию updateAccountAmount, чтобы получить дельту вместо конечного значения, и обновить сумму, если конечная сумма >=0. Это set amount=amount+delta where amount+delta>0. Это будет как проверять, так и обновлять. Вам не понадобятся циклы while, потому что если первая операция завершится ошибкой, то нет смысла повторять попытку.

Вторая реализация, как она написана, неверна. Вам нужно будет синхронизировать общий объект, соответствующий номеру исходящей учетной записи и номеру учетной записи, чтобы предотвратить одновременные обновления. Вы можете использовать HashMap, чтобы получить объект блокировки для идентификатора, а затем заблокировать этот объект. Вы должны синхронизировать доступ к хэш-карте, и как только вы получите два объекта, вы должны заблокировать идентификаторы в одном и том же порядке для всех потоков, чтобы предотвратить взаимоблокировки (что вы, кажется, уже сделали, но ваша синхронизация находится в стеке переменная, следовательно, не будет препятствовать параллельному доступу). Однако, как только вы это исправите, он должен работать, пока у вас работает только один экземпляр этого приложения. Тогда у вас возникнет проблема с постоянно растущим хэш-картой.

Более простая схема заключается в использовании блокировки. Вы можете использовать набор блокировок, совместно используемый потоками, с целочисленными ключами, соответствующими номерам учетных записей. Поскольку метод блокировки является синхронизированным, вам не нужно беспокоиться о блокировке или другом. Затем:

public boolean lock(Integer id1, Integer id2) {
  synchronized(lockSet) {
    if(!lockSet.contains(id1) && !lockSet.contains(id2)) {
       lockSet.put(id1)
       lockSet.put(id2)
       return true
    }
    return false
  }
}

public void unlock(Integer id1, Integer id2) {
  synchronized(lockSet) {
     lockSet.remove(id1)
     lockSet.remove(id2)
  }
}

Затем блокируйте перед операциями с базой данных и разблокируйте после них. Это предотвращает вход любого другого потока, работающего с этими идентификаторами учетных записей. Эта схема страдает от тех же проблем, что и другие: если первое обновление пройдет успешно, а второе не удастся, вы потеряете деньги.

person Burak Serdar    schedule 03.02.2020