Лучший способ использовать таблицу БД в качестве очереди заданий (она же пакетная очередь или очередь сообщений)

У меня есть таблица баз данных с ~ 50 тыс. строк, каждая строка представляет собой задание, которое необходимо выполнить. У меня есть программа, которая извлекает задание из БД, выполняет задание и возвращает результат в БД. (эта система работает прямо сейчас)

Теперь я хочу разрешить более чем одной задаче обработки выполнять задания, но быть уверенным, что ни одна задача не будет выполняться дважды (из соображений производительности, а не из-за того, что это вызовет другие проблемы). Поскольку доступ осуществляется через хранимую процедуру, мой текущий вопрос заключается в том, чтобы заменить указанную хранимую процедуру чем-то вроде этого

update tbl 
set owner = connection_id() 
where available and owner is null limit 1;

select stuff 
from tbl 
where owner = connection_id();

КСТАТИ; рабочие задачи могут прервать связь между получением работы и отправкой результатов. Кроме того, я не ожидаю, что БД даже приблизится к тому, чтобы стать узким местом, если я не испорчу эту часть (~ 5 заданий в минуту).

Есть ли какие-либо проблемы с этим? Есть лучший способ сделать это?

Примечание: "База данных как антишаблон IPC" здесь лишь немного уместна, потому что

  1. Я не делаю IPC (нет процесса, генерирующего строки, они все уже существуют прямо сейчас) и
  2. основная проблема, описанная для этого анти-шаблона, заключается в том, что он приводит к ненужной нагрузке на БД, поскольку процессы ждут сообщений (в моем случае, если сообщений нет, все может завершиться, когда все будет сделано)

person BCS    schedule 17.11.2008    source источник
comment
Правильно - плохо = синхронный IPC с блокировкой на DBMS SELECT как чтение. Вероятно, вы делаете это как стратегию введения асинхронности.   -  person dkretz    schedule 18.11.2008
comment
Кстати, если вы хотите поставить считыватели на таймер, полезно, чтобы они проверяли нечасто, но если они найдут работу, они могут опустошить очередь перед тем, как снова заснуть.   -  person dkretz    schedule 18.11.2008
comment
Обратите внимание на мое редактирование: если они не найдут работу, они никогда не найдут работу. Но если бы это было не так...   -  person BCS    schedule 18.11.2008


Ответы (6)


Вот что я успешно использовал в прошлом:

Схема таблицы MsgQueue

MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL  
SourceCode varchar(20)  -- process inserting the message -- NULLable  
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL 
CreateTime datetime -- default GETDATE() -- NOT NULL  
Msg varchar(255) -- NULLable  

Типы ваших сообщений соответствуют вашим ожиданиям: сообщения, которые соответствуют контракту между процессом(ами) вставки и процессом(ями) чтения, структурированы с помощью XML или другого выбранного вами представления (JSON может быть удобен в некоторых случаях, для пример).

Затем процессы от 0 до n могут выполнять вставку, а процессы от 0 до n могут читать и обрабатывать сообщения. Каждый процесс чтения обычно обрабатывает один тип сообщения. Несколько экземпляров типа процесса могут быть запущены для балансировки нагрузки.

Читатель извлекает одно сообщение и меняет состояние на «Активно», пока работает с ним. Когда это будет сделано, он изменит состояние на «Завершено». Он может удалить сообщение или нет, в зависимости от того, хотите ли вы сохранить контрольный журнал. Сообщения State = 'N' извлекаются в порядке MsgType/Timestamp, поэтому есть индекс для MsgType + State + CreateTime.

Варианты:
Состояние ошибки.
Столбец для кода процесса чтения.
Временные метки для переходов состояний.

Это обеспечило хороший, масштабируемый, видимый, простой механизм для выполнения ряда действий, подобных описанным вами. Если у вас есть базовые знания о базах данных, это довольно надежно и расширяемо.


Код из комментариев:

CREATE PROCEDURE GetMessage @MsgType VARCHAR(8) ) 
AS 
DECLARE @MsgId INT 

BEGIN TRAN 

SELECT TOP 1 @MsgId = MsgId 
FROM MsgQueue 
WHERE MessageType = @pMessageType AND State = 'N' 
ORDER BY CreateTime


IF @MsgId IS NOT NULL 
BEGIN 

UPDATE MsgQueue 
SET State = 'A' 
WHERE MsgId = @MsgId 

SELECT MsgId, Msg 
FROM MsgQueue 
WHERE MsgId = @MsgId  
END 
ELSE 
BEGIN 
SELECT MsgId = NULL, Msg = NULL 
END 

COMMIT TRAN
person dkretz    schedule 17.11.2008
comment
Часть, описываемая как Читатель, извлекает одно сообщение и меняет состояние на Активно, пока работает с ним. это та часть, которая меня интересует. Как вы это делаете? (кроме того, похоже, что мой такой же, как и ваш, за исключением того, что не нужно для моего случая.) - person BCS; 18.11.2008
comment
Правильно, для этого требуется несколько операторов SQL между BEGIN TRAN и COMMIT TRAN. Сразу после этого — SP для извлечения следующего сообщения — немного подправили, я пропустил перехват ошибок, так как он был написан до TRY/CATCH. - person dkretz; 18.11.2008
comment
-- ЧАСТЬ 1 СОЗДАТЬ ПРОЦЕДУРУ GetMessage @MsgType VARCHAR(8) ) AS DECLARE @MsgId INT BEGIN TRAN SELECT TOP 1 @MsgId = MsgId FROM MsgQueue WHERE MessageType = @pMessageType AND State = 'N' ORDER BY CreateTime - person dkretz; 18.11.2008
comment
ЧАСТЬ 2 ЕСЛИ @MsgId НЕ NULL НАЧНИТЕ ОБНОВЛЕНИЕ MsgQueue SET State = 'A' WHERE MsgId = @MsgId SELECT MsgId, Msg FROM MsgQueue WHERE MsgId = @MsgId END ELSE BEGIN SELECT MsgId = NULL, Msg = NULL END COMMIT TRAN - person dkretz; 18.11.2008
comment
что, если мне нужно выбрать более (несколько) одной строки (строк) за раз? можно обновить все одновременно? - person Amitd; 22.12.2009
comment
Предполагая, что вы отмечаете их общей отметкой времени или идентификатором пакета выбора, вы можете обновить их все в одном выражении, да. Или используйте состояние A, описанное выше, и обновите, где состояние = 'A'. - person dkretz; 22.12.2009

Лучший способ реализовать очередь заданий в системе реляционной базы данных — использовать SKIP LOCKED.

SKIP LOCKED — это вариант получения блокировки, который применяется как к блокировкам чтения/разделения (FOR SHARE), так и к блокировкам записи/исключения (FOR UPDATE) и широко поддерживается в настоящее время:

  • Oracle 10g и выше
  • PostgreSQL 9.5 и выше
  • SQL Server 2005 и более поздние версии
  • MySQL 8.0 и выше

Теперь представьте, что у нас есть следующая таблица post:

стол таблицы

Столбец status используется как Enum со значениями:

  • PENDING (0),
  • APPROVED (1),
  • SPAM (2).

Если у нас есть несколько одновременных пользователей, пытающихся модерировать записи post, нам нужен способ координировать их усилия, чтобы избежать проверки одной и той же строки post двумя модераторами.

Итак, SKIP LOCKED — это именно то, что нам нужно. Если два одновременных пользователя, Алиса и Боб, выполняют следующие запросы SELECT, которые блокируют исключительно записи сообщений, а также добавляют параметр SKIP LOCKED:

[Alice]:
SELECT
    p.id AS id1_0_,1
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED
 
[Bob]:                                                                                                                                                                                                              
SELECT
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

Мы видим, что Алиса может выбрать первые две записи, в то время как Боб выбирает следующие две записи. Без SKIP LOCKED запрос на получение блокировки Боба будет блокироваться до тех пор, пока Алиса не снимет блокировку с первых двух записей.

person Vlad Mihalcea    schedule 16.04.2019

Вместо того, чтобы иметь owner = null, когда он не принадлежит, вы должны вместо этого установить поддельную запись Nobody. Поиск нулевого значения не ограничивает индекс, вы можете в конечном итоге просмотреть таблицу. (это для оракула, SQL-сервер может быть другим)

person Nathan Lee    schedule 17.11.2008

В качестве возможного изменения технологии вы можете рассмотреть возможность использования MSMQ или чего-то подобного.

Каждое из ваших заданий/потоков может запрашивать очередь сообщений, чтобы узнать, доступно ли новое задание. Поскольку акт чтения сообщения удаляет его из стека, вы можете быть уверены, что сообщение получит только одно задание/поток.

Конечно, это при условии, что вы работаете с платформой Microsoft.

person NotMe    schedule 17.11.2008
comment
У меня есть данные в БД, когда я закончу, мне нужны данные в БД. В моем случае я не вижу смысла добавлять еще один компонент в систему. (Кстати microsoft.com/windowsserver2003/technologies/msmq/default.mspx) - person BCS; 18.11.2008

Смотрите ответ Влада для контекста, я просто добавляю эквивалент в Oracle, потому что есть несколько ошибок, о которых нужно знать.

То

SELECT * FROM t order by x limit 2 FOR UPDATE OF t SKIP LOCKED

не будет напрямую транслироваться в Oracle, как вы могли бы ожидать. Если мы рассмотрим несколько вариантов перевода, мы можем попробовать любой из следующих:

SQL> create table t as
  2   select rownum x
  3   from dual
  4   connect by level <= 100;

Table created.

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from t order by x for update skip locked fetch first 2 rows only;
  5  end;
  6  /
  open rc for select * from t order by x for update skip locked fetch first 2 rows only;
                                                                *
ERROR at line 4:
ORA-06550: line 4, column 65:
PL/SQL: ORA-00933: SQL command not properly ended
ORA-06550: line 4, column 15:
PL/SQL: SQL Statement ignored

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from t order by x fetch first 2 rows only for update skip locked ;
  5  end;
  6  /
declare
*
ERROR at line 1:
ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
ORA-06512: at line 4

или, возможно, попробуйте вернуться к опции ROWNUM

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from ( select * from t order by x ) where rownum <= 10 for update skip locked;
  5  end;
  6  /
declare
*
ERROR at line 1:
ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
ORA-06512: at line 4

И не получишь никакой радости. Таким образом, вам нужно самостоятельно контролировать выборку n строк. Таким образом, вы можете написать что-то вроде:

SQL> declare
  2    rc sys_refcursor;
  3    res1 sys.odcinumberlist := sys.odcinumberlist();
  4  begin
  5    open rc for select * from t order by x for update skip locked;
  6    fetch rc bulk collect into res1 limit 10;
  7  end;
  8  /

PL/SQL procedure successfully completed.
person Connor McDonald    schedule 26.10.2020

Вы пытаетесь реализовать антипаттерн "База данных как IPC". Посмотрите его, чтобы понять, почему вам следует подумать о том, чтобы правильно перепроектировать свое программное обеспечение.

person Krunch    schedule 17.11.2008
comment
Откуда вы знаете, что в данном случае это антипаттерн или что дизайн программы неправильный? У вас нет никакого контекста, на котором можно основывать этот комментарий. - person Greg Beech; 18.11.2008
comment
Я назвал это полезным шаблоном для асинхронного IPC. Вы можете настроить его так, чтобы он работал как любая очередь сообщений, и, по моему опыту, они не являются фирменными антипаттернами. - person dkretz; 18.11.2008
comment
Вот ссылка на антипаттерн — tripatlas.com/Database_as_an_IPC. Разница в том, что мы обсуждаем использование базы данных как очередь сообщений, а не как механизм взаимодействия процессов. - person dkretz; 18.11.2008
comment
Использование базы данных в качестве очереди сообщений является антишаблоном. Вы получите конкуренцию за блокировку на уровне инь-ян, и если вы используете систему MVCC с несколькими рабочими процессами, вы получите туманное состояние для любой записи. Вы должны использовать брокер очереди сообщений, такой как RabbitMQ. - person jasonjwwilliams; 09.11.2011