Мы действительно делаем это во многих проектах, даже в инструментах командной строки WordPress, и мы используем эту библиотеку чтобы сделать это вместе с this для транспорта. Он не требует Symfony и может работать с большинством систем очередей, соответствующих общему стандарту.
Общая идея состоит в том, что вы хотите, чтобы что-то (возможно, одноэлементное) возвращало экземпляр Interop\Queue\Context
, и вот что мы используем:
function createContext(): \Interop\Queue\Context
{
$factory = new \Enqueue\Dbal\DbalConnectionFactory(
sprintf(
'mysql://%1$s:%2$s@%3$s/%4$s',
DB_USER,
DB_PASSWORD,
DB_HOST,
DB_NAME
)
);
$context = $factory->createContext();
$context->createDataBaseTable();
return $context;
}
Вам также понадобится что-то для обработки каждого сообщения, и вы захотите передать ему сообщение и потребителя:
function handleMessage($message, $consumer)
{
// Business logic here
if($business_logic_failed) {
$context = createContext();
$failed_queue = $context->createQueue('FAILED_QUEUE_HERE');
$context->createProducer()->send($failed_queue, $message);
} else {
$consumer->acknowledge($message);
}
}
Затем использовать:
$context = createContext();
$queue = $context->createQueue('QUEUE_NAME_HERE');
$consumer = $context->createConsumer($queue);
// This can be an infinite loop, or a loop for 10 messages and exit, whatever your logic
while(true) {
// This command will block unless you pass a timeout, so no sleep is needed
$message = $consumer->receive(/* optional timeout here */);
handleMessage($message, $consumer);
// Do whatever you want with message
}
Посыпьте много попыток / уловок и вокруг этого, и убедитесь, что независимо от того, что вы подтверждаете или каким-либо образом проваливаете сообщение.
person
Chris Haas
schedule
25.11.2020