Как сделать простой Pika SelectConnection для отправки сообщения в python?

Я пытаюсь преобразовать свой код для отправки сообщений rabbitmq через Pika. У меня много проблем с пониманием того, как отправить простое сообщение с использованием асинхронного соединения (например, SelectConnection).

В моем старом коде, в котором я использую библиотеку amqp, я просто объявляю такой класс:

import amqp as amqp

class MQ():

    mqConn = None
    channel = None

    def __init__(self):
        self.connect()

    def connect(self):
        if self.mqConn is None:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

        elif not self.mqConn.connected:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

    def sendMQ(self, message):
        self.connect()
        lMessage = amqp.Message(message)
        self.channel.basic_publish(lMessage, exchange="DevMatrixE", routing_key="dev_matrix_q") 

Затем в другом месте своего кода я вызываю sendMQ("это мое сообщение"), после чего код продолжается. Мне не нужно слушать подтверждения и т. д.

Может ли кто-нибудь написать простой класс, использующий pika и SelectConnection, который также будет работать, чтобы просто отправить сообщение с помощью sendMQ («это мое сообщение»)? Я просмотрел примеры pika, но не знаю, как обойти ioloop и KeyboardInterrupt. Я думаю, я просто не уверен, как заставить мой код продолжать работать без всех этих попыток/исключений... Кроме того, я не совсем уверен, как я могу передать свое сообщение через все обратные вызовы...

Любая помощь приветствуется!

Спасибо.


person TheBear    schedule 19.05.2015    source источник


Ответы (2)


Все это управляется обратным вызовом, так как это асинхронный способ ведения дел. Асинхронный потребитель легко понять, мы можем получить сообщение, предоставив функцию обратного вызова. Однако часть издателя немного сложна для понимания, по крайней мере, для новичка.

Обычно нам нужна очередь для связи, и издатель периодически получает от нее данные.

Ключевым моментом использования SelectConnection является регистрация функции публикации сообщений в цикле обработки событий, что может быть выполнено с помощью connection.add_timeout. После того, как вы закончите публикацию, зарегистрируйте следующий раунд публикации.

Следующий вопрос, где поставить первоначальную регистрацию. Первоначальную регистрацию можно сделать в обратном вызове открытия канала.

Ниже приведен фрагмент кода для лучшего понимания. Имейте в виду, что это не готово к производству. Потому что он публикует сообщения только с максимальной скоростью 10 в секунду. Вам нужно настроить интервал публикации и опубликовать больше сообщений за один обратный звонок.

class MQ(Object):
    def __init___(self, queue):
        self.queue = queue
    def on_channel_open(self, chn):
        self.channel = chn
        self.connection.add_timeout(0.1, self.schedule_next_message)
    def schedule_next_message(self):
        try:
            msg = self.queue.get(True, 0.01)
            self.channel.basic_publish('YOUR EXCHANGE','YOUR ROUTING KEY',msg)
        except Queue.Empty:
            pass
        self.connection.add_timeout(0.1, self.schedule_next_message)
    def on_open(self, conn):
        self.connection = conn
        self.connection.channel(on_open_callback=self.on_channel_open)
    def run(self):
        # create a connection
        self.connection = pika.SelectConnection(pika.ConnectionParameters(heartbeat=600,host=args.mq_ip),self.on_open)
        try:
            self.connection.ioloop.start()
        except Exception:
            print("exception in publisher")
            self.connection.close()
            self.connection.ioloop.start()

Поместите MQ(queue).run() в отдельный поток, и всякий раз, когда вы хотите поместить сообщение в mq, просто поместите его в объект очереди.

person TerrenceSun    schedule 15.08.2019

В качестве первого подхода я рекомендую вам начать с этих примеров pub/sub, приведенных в конце поста. Как только вы поймете этот простой пример, начните следовать руководству, представленному прямо перед блоками кода в конце. Учебник, который имеет 6 различных вариантов использования, с примерами Python. С помощью 5 первых шагов вы поймете, как это работает. У вас должна быть четкая концепция обмена (сущность, которая направляет сообщения в каждую очередь), ключ привязки (ключ, используемый для соединения обмена и очереди), ключ маршрутизации (ключ, который отправляется вместе с сообщением от издателя и который используется обменом для направления сообщения в ту или иную очередь) и очередь (буфер, который может хранить сообщения, может иметь более 1 (или 1, если требуется) подписчика и который может получать сообщения от более чем 1 обмена и на основе разных обязательные ключи). Кроме того, существует более одного типа обмена (разветвление, тема (вероятно, это то, что вам нужно)...).

Если все это звучит по-новому, пожалуйста, следуйте руководству, предоставленному RabbitMQ:

https://www.rabbitmq.com/tutorials/tutorial-one-python.html

паб.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                  routing_key='hello',
                  body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

sub.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                  queue='hello',
                  no_ack=True)

channel.start_consuming()
person ederollora    schedule 26.05.2015
comment
Здорово, что вы пытаетесь помочь, но он спрашивал об адаптере SelectConnection. Упомянутые вами примеры используют BlockingConnection. - person eandersson; 04.06.2015
comment
Пример кода с SelectConnection был бы более наглядным. - person mprat; 28.10.2015