Использование сигналов django в классах потребителей каналов

Я пытаюсь разработать систему типа аукциона, когда покупатель делает заказ, а затем разные магазины могут предлагать цену для этого заказа.

Интересной частью этой системы является то, что при первоначальном создании заказа у доступных магазинов будет 60 секунд, чтобы сделать соответствующее предложение. Когда первый магазин делает свое предложение, у «аукциона» теперь есть только следующие 20 секунд, чтобы другие магазины сделали свое собственное предложение. Если они все же сделают другое предложение за это меньшее выделенное время, то эти 20 секунд будут обновлены. Предложения могут поступать до тех пор, пока есть достаточно времени, которое не может превышать начальные предоставленные 60 секунд.

class Order(models.Model):
    customer = models.ForeignKey(Customer)
    create_time = models.DateTimeField(auto_now_add=True)
    update_time = models.DateTimeField(auto_now_add=True)
    total = models.FloatField(default=0)
    status = models.IntegerField(default=0)
    delivery_address = models.ForeignKey(DeliveryAddress)
    store = models.ForeignKey(Store, null=True, blank=True, related_name='orders', on_delete=models.CASCADE)
    credit_card = models.ForeignKey(CreditCard, null=True, blank=True, related_name='orders')

class OrderOffer(models.Model):
    store = models.ForeignKey(Store, related_name="offers", on_delete=models.CASCADE)
    order = models.ForeignKey(Order, related_name="offers", on_delete=models.CASCADE)
    create_time = models.DateTimeField(auto_now_add=True)

Помимо этих требований, я также хочу обновлять клиента при поступлении новых предложений в режиме реального времени. Для этого я использую django-channels реализацию WebSockets.

У меня есть следующий consumers.pyfile:

from channels.generic.websockets import WebsocketConsumer
from threading import Timer
from api.models import Order, OrderOffer
from django.db.models.signals import post_save
from django.dispatch import receiver

class OrderConsumer(WebsocketConsumer):

    def connect(self, message, **kwargs):
        """
        Initialize objects here.
        """
        order_id = int(kwargs['order_id'])
        self.order = Order.objects.get(id=order_id)
        self.timer = Timer(60, self.sendDone)
        self.timer.start()
        self.message.reply_channel.send({"accept": True})

    def sendDone(self):
        self.send(text="Done")

    # How do I bind self to onOffer?
    @receiver(post_save, sender=OrderOffer)
    def onOffer(self, sender, **kwargs):
        self.send(text="Offer received!")
        if (len(self.offers) == 0):
            self.offerTimer = Timer(20, self.sendDone)
            self.offers = [kwargs['instance'],]
        else:
            self.offerTimer = Timer(20, self.sendDone)

        self.offers.append(kwargs['instance'])


    def receive(self, text=None, bytes=None, **kwargs):
        # Echo
        self.send(text=text, bytes=bytes)

    def disconnect(self, message, **kwargs):
        """
        Perform necessary disconnect operations.
        """
        pass

Мне удалось установить канал связи WebSocket между моим клиентом и сервером. Я протестировал отправку сообщений, и все вроде нормально. Теперь я хочу обнаружить создание новых OrderOffer и отправить уведомление клиенту. Для этого мне нужен доступ к переменной self, чтобы использовать self.send, что невозможно, так как декоратор сигналов не отправляет этот параметр. Я попытался заставить его объявить onOffer с помощью self, но получаю следующую ошибку:

TypeError: onOffer() missing 1 required positional argument: 'self'

Если бы я мог каким-то образом получить доступ к аргументам ключевого слова, которые устанавливают сигналы, я мог бы сделать что-то вроде: context = self.

Буду признателен за любую помощь или даже альтернативные решения моей первоначальной проблемы.


person jhc    schedule 06.10.2017    source источник


Ответы (3)


Если кто-то наткнется на это, я решил это в signals.py. У меня есть Job, и мне нужно отправлять его status клиенту каждый раз, когда он изменяется. Это мой signals.py:

import channels.layers
from asgiref.sync import async_to_sync

from django.db.models.signals import post_save
from django.dispatch import receiver

from .models import Job


def send_message(event):
    '''
    Call back function to send message to the browser
    '''
    message = event['text']
    channel_layer = channels.layers.get_channel_layer()
    # Send message to WebSocket
    async_to_sync(channel_layer.send)(text_data=json.dumps(
        message
    ))


@receiver(post_save, sender=Job, dispatch_uid='update_job_status_listeners')
def update_job_status_listeners(sender, instance, **kwargs):
    '''
    Sends job status to the browser when a Job is modified
    '''

    user = instance.owner
    group_name = 'job-user-{}'.format(user.username)

    message = {
        'job_id': instance.id,
        'title': instance.title,
        'status': instance.status,
        'modified': instance.modified.isoformat(),
    }

    channel_layer = channels.layers.get_channel_layer()

    async_to_sync(channel_layer.group_send)(
        group_name,
        {
            'type': 'send_message',
            'text': message
        }
    )

Кстати, у меня есть Consumer class JobUserConsumer(AsyncWebsocketConsumer), где я определяю группы:

async def connect(self):

    user = self.scope["user"]
    self.group_name = 'job-user-{}'.format(user.username)

    await self.channel_layer.group_add(
        self.group_name,
        self.channel_name
    )

    await self.accept()

Проект, который я использовал, находится здесь: https://github.com/ornl-ndav/django-remote-submission/tree/master/django_remote_submission

person RicLeal    schedule 19.04.2018
comment
Я должен сказать, что мне пришлось бороться с этой реализацией. Вот ссылка на мой репозиторий, который успешно реализует этот пример - Django сигнализирует о завершении WebSockets - person Lukasz Dynowski; 17.05.2021

Для тех, у кого все еще есть проблемы с веб-сокетами, это может быть полезно:

from api.models import Order, OrderOffer
from asgiref.sync import async_to_sync
import channels.layers
from channels.generic.websocket import JsonWebsocketConsumer
from django.db.models import signals
from django.dispatch import receiver


class OrderOfferConsumer(JsonWebsocketConsumer):
    def connect(self):
        async_to_sync(self.channel_layer.group_add)(
            'order_offer_group',
            self.channel_name
        )
        self.accept()

    def disconnect(self, close_code):
        async_to_sync(self.channel_layer.group_discard)(
            'order_offer_group',
            self.channel_name
        )
        self.close()

    def receive_json(self, content, **kwargs):
        print(f"Received event: {content}")

    def events_alarm(self, event):
        self.send_json(event['data'])

    @staticmethod
    @receiver(signals.post_save, sender=OrderOffer)
    def order_offer_observer(sender, instance, **kwargs):
        layer = channels.layers.get_channel_layer()
        async_to_sync(layer.group_send)('order_offer_group', {
            'type': 'events.alarm',
            'data': {
                'text': 'Offer received',
                'id': instance.pk
            }
        })

В urls.py вам нужно зарегистрировать новый маршрут webscoket:

websocket_urlpatterns = [url(r'^order_offer$', OrderOfferConsumer)]
person Tavy    schedule 07.06.2019
comment
Спасибо! Этот ответ был очень полезным. - person Ajay Lingayat; 17.12.2020
comment
Я должен сказать, что мне пришлось бороться с этой реализацией. Вот ссылка на мой репозиторий, который успешно реализует этот пример - Django сигнализирует о завершении WebSockets - person Lukasz Dynowski; 17.05.2021

Если вы хотите поговорить с потребителем «извне» - в данном случае из метода сохранения модели - вам необходимо использовать уровень канала, чтобы поговорить с ним: http://channels.readthedocs.io/en/latest/themes/channel_layers.html

По сути, вам необходимо:

  • Добавить потребителя в группу при запуске (возможно, на основе его идентификатора заказа)
  • Отправлять сообщение Группе всякий раз, когда появляется новый OrderOffer с настраиваемым type - например, {"type": "order.new_offer", "order_offer_id": 45}
  • Определите обработчик на Consumer, который обрабатывает это - он соответствует имени типа, поэтому в этом случае это будет def order_new_offer(self, event):
  • В этом обработчике вы можете затем использовать self.send, чтобы отключить сокет (и запросить базу данных, если вам нужна дополнительная информация для отправки клиенту, который вы не поместили в сообщение о событии).

Вы можете увидеть вариант этого в примере проекта MultiChat: https://github.com/andrewgodwin/channels-examples/tree/master/multichat.

person Andrew Godwin    schedule 14.02.2018