Использование каналов Django для отправки информации всем подключенным клиентам

Я создаю систему, в которой Twitter Live Streaming API получает прямой поток данных твита по заданным ключевым словам. Каждый раз, когда приходит твит, я хочу отправить эти новые данные через WebSockets (используя каналы Django) всем подключенным клиентам. Вот моя текущая процедура:

  1. Код прямой трансляции Twitter вызывает initiateHandshake() функцию в файле consumer.py всякий раз, когда получает новые данные
  2. Каналы Django отправляют текст «рукопожатие» всем клиентам.
  3. Клиент получает сообщение о рукопожатии и отправляет сообщение обратно
  4. Функция websocket.recieve принимает это сообщение и на основе информации о фильтре, которая хранится в сеансе подключения клиента, отправляет обратно необходимые данные (вот почему требуется рукопожатие - потому что каждый клиент имеет свой собственный фильтр, который находится в его сеанс подключения)

Прямо сейчас проблема в том, что сообщение «рукопожатие» в initiateHandshake() не отправляется. Почему это так и как это исправить? Большое спасибо! Мой код ниже.

WebSockets работает правильно (если клиент отправляет сообщение, я могу использовать reply_channel, чтобы ответить на него - все работает)

routing.py

from channels.routing import route
from tweetrunner.consumers import *

channel_routing = [
    route("websocket.connect", ws_connect),
    route("websocket.receive", ws_message),
    route("websocket.disconnect", ws_disconnect),
]

потребители.py

# In consumers.py
from channels import Group, Channel
from channels.sessions import channel_session
from .models import InputTweet
from django.shortcuts import render

# Outside world connection
def initiateHandshake():
    Group("table").send({"text": "handshake"})


# Connected to websocket.connect
@channel_session
def ws_connect(message):
    # Accept connection
    message.reply_channel.send({"accept": True})
    message.channel_session["sort"] = "none"
    # Add to group
    Group("table").add(message.reply_channel)


# Connected to websocket.receive
@channel_session
def ws_message(message):
    definition = message.content['text'][:1]

    if definition == "1":
        emotion = message.content['text'][1:]
        message.channel_session["sort"] = message.content['text'][1:]

        sendback = ""

        if emotion == 'none':
            given_tweets = InputTweet.objects.all()
            given_tweets = given_tweets[(len(given_tweets) - 1250):]
            print("before render")
            sendback = render(None, 'tweetrunner/get_table_update.html', {'given_tweets': given_tweets})
            print("rendered")
        else:
            given_tweets = InputTweet.objects.filter(emotion__startswith=emotion).order_by('score')
            given_tweets = given_tweets[(len(given_tweets) - 1250):]
            sendback = render(None, 'tweetrunner/get_table_update.html', {'given_tweets': given_tweets})

        print("about to send")
        message.reply_channel.send({
            "text": sendback.content.decode('utf-8'),
        })
        print("sent -- END CONNECTION LOGS")

    elif definition == "3":
        print("Keep alive!")

    else:

        emotion = message.channel_session["sort"]

        sendback = ""

        print("entered 2")

        if emotion == 'none':
            given_tweets = InputTweet.objects.all()
            given_tweets = given_tweets[(len(given_tweets) - 1250):]
            print("before render")
            sendback = render(None, 'tweetrunner/get_table_update.html', {'given_tweets': given_tweets})
            print("rendered")
        else:
            given_tweets = InputTweet.objects.filter(emotion__startswith=emotion).order_by('score')     
            given_tweets = given_tweets[(len(given_tweets) - 1250):]
            sendback = render(None, 'tweetrunner/get_table_update.html', {'given_tweets': given_tweets})

        print("about to send")

        #content = sendback.read()

        message.reply_channel.send({
            "text": sendback.content.decode('utf-8'),
        })

        print("sent -- END CONNECTION LOGS")


# Connected to websocket.disconnect
@channel_session
def ws_disconnect(message):
    Group("table").discard(message.reply_channel)

Мой код, которого нет ни в одном приложении django, инициализирует django следующим образом (я могу получить доступ к базе данных и использовать вызовы django, поэтому он работает):

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "analyticsui.settings")

import django
django.setup()

JavaScript на клиенте:

socket = new WebSocket("ws://" + window.location.host + "/");

socket.onmessage = function(e) {
alert("Got message: " + e.data)
    if (e.data == "handshake") {
        socket.send("2handshake");
        alert("sent");
    } else {
        document.getElementById("tweetTable").innerHTML = e.data;
        $('.tooltiphere').tooltip('remove');
        $('.tooltiphere').tooltip({delay: 10});
    }
}

person Nisala    schedule 14.07.2017    source источник


Ответы (1)


initiateHandshake() не будет отправлять никаких сообщений, когда вы используете слой каналов в памяти.

Из документа: Уровень в памяти не поддерживает межпроцессное взаимодействие

Пожалуйста, используйте, в частности, другой тип слоя канала Redis channel layer. И все будет работать как положено.

person Raja Simon    schedule 14.07.2017
comment
Не могли бы вы уточнить? Что мне нужно сделать, чтобы решить мою проблему? - person Nisala; 14.07.2017
comment
Обновлено. Воспользуйтесь этой ссылкой, чтобы узнать, как настроить уровень каналов Redis - person Raja Simon; 14.07.2017
comment
Эта ссылка только что сломалась, поэтому вот новая ссылка: channels.readthedocs.io/en /stable/topics/channel_layers.html - person Nisala; 09.08.2018