Как заставить очередь Rabbit MQ работать в режиме хранения и пересылки?

Я экспериментирую с Rabbit MQ, используя клиент Python Pika. Я хотел бы, чтобы мой отправитель AMQP работал в режиме сохранения и пересылки, т. е. имел возможность начинать ставить сообщения в очередь, если сервер или сеть не работают, и надежно доставлять их позже. Как я могу это сделать? Мой код amqp-sender.py ниже:



    import pika
    import psutil
    import time
    import datetime
    import log
    import json
    import logging
    import uuid
    from dateutil.tz import tzlocal

    logging.basicConfig()
    logger = log.setup_custom_logger('amqp_send', 'amqp_send.log')

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='54.191.161.213'))
    channel = connection.channel()
    channel.confirm_delivery()

    channel.queue_declare(queue='ems.data')

    def get_mac_address():
        return ':'.join(['{:02x}'.format((uuid.getnode() >> i) & 0xff) for i in range(0,8*6,8)][::-1])

    while True:
        now = datetime.datetime.now(tzlocal())
        timestamp = now.strftime('%Y-%m-%d %H:%M:%S.%f %z')
        data = {
            'timestamp':timestamp,
            'systemId':get_mac_address(),
            'cpuPct':psutil.cpu_percent(),
            'memoryUsed':psutil.virtual_memory().used
        }
        msg=json.dumps(data)
        delivered=channel.basic_publish(exchange='', routing_key='abc', body=msg, mandatory=True)
        if delivered:
            logger.info("delivered %s" % msg)
        else:
            logger.error('failed to deliver %s' % msg)
        time.sleep(1)

    connection.close()


person user43995    schedule 17.08.2014    source источник


Ответы (1)


Вам нужно использовать channel.confirm_delivery() с mandatory=True

confirm_delivery будет возвращать логическое значение в зависимости от того, правильно ли сообщение было обработано Rabbit.

mandatory флаг:

Этот флаг указывает серверу, как реагировать, если сообщение не может быть направлено в очередь. В частности, если установлено обязательное значение и после выполнения привязок сообщение было помещено в нулевые очереди, то сообщение возвращается отправителю (с помощью basic.return). Если бы обязательный не был установлен при тех же обстоятельствах, сервер просто удалил бы сообщение.

Итак, у вас будет что-то вроде этого:

channel.confirm_delivery()
delivered = channel.basic_publish(exchange='', routing_key='ems.data', body=msg, mandatory=True)
if not delivered:
    # store message for later reprocessing
person Vor    schedule 18.08.2014
comment
Спасибо за ответ, я попытался реализовать ваши предложения, но доставлено по-прежнему True, даже несмотря на то, что мой сервер rabbitmq выключен. - person user43995; 19.08.2014