комбу не подключается к RabbitMQ

У меня есть два сервера, назовем их A и B. B запускает RabbitMQ, а A подключается к RabbitMQ через Kombu. Если я перезапущу RabbitMQ на B, соединение kombu разорвется, и сообщения больше не будут доставляться. Затем мне нужно сбросить процесс на A, чтобы восстановить соединение. Есть ли лучший подход, то есть есть ли способ автоматического повторного подключения Kombu, даже если процесс RabbitMQ перезапущен?

Моя базовая реализация кода приведена ниже, заранее спасибо! :)

def start_consumer(routing_key, incoming_exchange_name, outgoing_exchange_name):
    global rabbitmq_producer

    incoming_exchange = kombu.Exchange(name=incoming_exchange_name, type='direct')
    incoming_queue = kombu.Queue(name=routing_key+'_'+incoming_exchange_name, exchange=incoming_exchange, routing_key=routing_key)#, auto_delete=True)

    outgoing_exchange = kombu.Exchange(name=outgoing_exchange_name, type='direct')
    rabbitmq_producer = kombu.Producer(settings.rabbitmq_connection0, exchange=outgoing_exchange, serializer='json', compression=None, auto_declare=True)

    settings.rabbitmq_connection0.connect()
    if settings.rabbitmq_connection0.connected:
        callbacks=[]
        queues=[]

        callbacks.append(callback)
        # if push_queue:
        #   callbacks.append(push_message_callback)
        queues.append(incoming_queue)

        print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (incoming_exchange.name, incoming_queue.name)
        incoming_exchange(settings.rabbitmq_connection0).declare()
        incoming_queue(settings.rabbitmq_connection0).declare()

        print 'opening a new *outgoing* rabbitmq connection to the %s exchange' % outgoing_exchange.name
        outgoing_exchange(settings.rabbitmq_connection0).declare()

        with settings.rabbitmq_connection0.Consumer(queues=queues, callbacks=callbacks) as consumer:
            while True:
                settings.rabbitmq_connection0.drain_events()

person vgoklani    schedule 01.06.2016    source источник
comment
Это не для комбу, но недавно я написал надежный потребительский пример для своей собственной библиотеки. Во всяком случае, вы, вероятно, могли бы использовать его для комбу/пика. github.com/eanandersson/amqpstorm/blob/stable/examples/   -  person eandersson    schedule 02.06.2016


Ответы (1)


Со стороны потребителя комбу. mixins.ConsumerMixin обрабатывает повторное подключение, когда соединение прерывается (а также выполняет пульсацию и т. д., что позволяет писать меньше кода). К сожалению, похоже, что ProducerMixin нет, но вы могли бы покопаться в коде и адаптировать его...?

person gimboland    schedule 09.08.2016