Cycle.js — Драйвер — PhoenixJS (веб-сокеты)

В настоящее время у нас есть приложение VueJS, и я рассматриваю возможность его переноса на Cycle.js (первый крупный проект).

Я понимаю, что в Cycle.JS у нас есть SI и SO для драйверов (используя адаптацию()); естественно, реализация WebSocket подходит для этого, поскольку она имеет эффекты чтения и записи.

Мы используем Phoenix (Elixir) в качестве нашего бэкэнда, используя каналы для мягкого общения в реальном времени. Наша клиентская библиотека WS называется Phoenix здесьhttps://www.npmjs.com/package/phoenix.

пример на Cycle.js.org идеален, если вы знаете, как подключить.

В нашем случае мы аутентифицируемся с помощью конечной точки REST, которая возвращает токен (JWT), который используется для инициализации WebSocket (параметр токена). Этот токен нельзя просто передать в драйвер, так как драйвер инициализируется при запуске приложения Cycle.js.

Пример (не фактический код) того, что у нас есть сейчас (в нашем приложении VueJS):

// Code ommited for brevity 
socketHandler = new vueInstance.$phoenix.Socket(FQDN, {
    _token: token
});
socketHandler.onOpen(() => VueBus.$emit('SOCKET_OPEN'));

//...... Vue component (example)
VueBus.$on('SOCKET_OPEN', function () {
    let chan = VueStore.socketHandler.channel('PRIV_CHANNEL', {
        _token: token
    });

    chan.join()
        .receive('ok', () => {
            //... code
        })
})

В приведенном выше примере у нас есть хранилище Vuex для глобального состояния (сокет и т. д.), централизованная шина сообщений (приложение Vue) для связи между компонентами и настройками канала, которые поступают из созданного экземпляра сокета Phoenix.

Наша настройка канала основана на аутентифицированном сокет-соединении, которое требует аутентификации для присоединения к этому конкретному каналу.

Вопрос в том, возможно ли это вообще с Cycle.js?

  1. Инициализировать соединение WebSocket с параметрами токена из вызова REST (ответ токена JWT) — мы реализовали это частично
  2. Создавать каналы на основе этого сокета и токена (потоки каналов от драйвера?)
  3. Доступ к многоканальным потокам (предполагаю, что это может работать как sources.HTTP.select(CATEGORY))

У нас здесь зависимость 1: N, что, я не уверен, возможно с драйверами.

Заранее спасибо,

Обновление от 17 декабря 2018 г.

По сути, я пытаюсь имитировать следующее (из Cycle.js.org):

Драйвер принимает приемник для выполнения эффектов записи (отправки сообщений по определенным каналам), но также может возвращать источник; это означает, что есть два асинхронных потока? Это означает, что создание сокета во время выполнения может привести к тому, что один поток получит доступ к «сокету» до того, как он будет создан; см. комментарии во фрагменте ниже.

import {adapt} from '@cycle/run/lib/adapt';

function makeSockDriver(peerId) {
  // This socket may be created at an unknown period
  //let socket = new Sock(peerId);
  let socket = undefined;

  // Sending is perfect
  function sockDriver(sink$) {
    sink$.addListener({
      next: listener => {

        sink$.addListener({
                next: ({ channel, data }) => {
                    if(channel === 'OPEN_SOCKET' && socket === null) {
                        token = data;

                        // Initialising the socket
                        socket = new phoenix.Socket(FQDN, { token });
                        socketHandler.onOpen(() => listener.next({
                            channel: 'SOCKET_OPEN'
                        }));
                    } else {
                        if(channels[channel] === undefined) {
                            channels[channel] = new Channel(channel, { token });
                        }
                        channels[channel].join()
                            .receive('ok', () => {
                                sendData(data);
                            });
                    }
                }
            });
      },
      error: () => {},
      complete: () => {},
    });

    const source$ = xs.create({
      start: listener => {
        sock.onReceive(function (msg) {
            // There is no guarantee that "socket" is defined here, as this may fire before the socket is actually created 
            socket.on('some_event'); // undefined

            // This works however because a call has been placed back onto the browser stack which probably gives the other blocking thread chance to write to the local stack variable "socket". But this is far from ideal
            setTimeout(() => socket.on('some_event'));
        });
      },
      stop: () => {},
    });

    return adapt(source$);
  }

  return sockDriver;
}

Ян ван Брюгге, решение, которое вы предоставили, идеально (спасибо), за исключением того, что у меня возникли проблемы с ответной частью. См. приведенный выше пример.

Например, то, что я пытаюсь достичь, выглядит примерно так:

// login component
return {
    DOM: ...
    WS: xs.of({
        channel: "OPEN_CHANNEL",
        data: {
            _token: 'Bearer 123'
        }
    })
}

//////////////////////////////////////
// Some authenticated component

// Intent
const intent$ = sources.WS.select(CHANNEL_NAME).startWith(null)

// Model
const model$ = intent$.map(resp => {
    if (resp.some_response !== undefined) {
        return {...}; // some model
    }
    return resp;
})

return {
    DOM: model$.map(resp => {
        // Use response from websocket to create UI of some sort
    })
}

person CCG_Intranet    schedule 14.12.2018    source источник


Ответы (1)


во-первых, да, это возможно с драйвером, и мое предложение приведет к тому, что драйвер будет очень похож на драйвер HTTP.

Прежде всего, чтобы иметь какой-то грубый псевдокод, где я могу все объяснить, я мог неправильно понять части вашего вопроса, поэтому это может быть неправильно.

interface WebsocketMessage {
    channel: string;
    data: any;
}

function makeWebSocketDriver() {
    let socket = null;
    let token = null;
    let channels = {}
    return function websocketDriver(sink$: Stream<WebsocketMessage> {
        return xs.create({
            start: listener => {
                sink$.addListener({
                    next: ({ channel, data }) => {
                        if(channel === 'OPEN_SOCKET' && socket === null) {
                            token = data;
                            socket = new phoenix.Socket(FQDN, { token });
                            socketHandler.onOpen(() => listener.next({
                                channel: 'SOCKET_OPEN'
                            }));
                        } else {
                            if(channels[channel] === undefined) {
                                channels[channel] = new Channel(channel, { token });
                            }
                            channels[channel].join()
                                .receive('ok', () => {
                                    sendData(data);
                                });
                        }
                    }
                });
            }
        });
    };
}

Это будет грубая структура такого драйвера. Вы видите, что он ждет сообщения с токеном, а затем открывает сокет. Он также отслеживает открытые каналы и отправляет/получает их в зависимости от категории сообщения. Этот метод просто требует, чтобы все каналы имели уникальные имена, я не уверен, как работает протокол вашего канала в этом отношении или что вы хотите в частности.

Я надеюсь, что этого достаточно, чтобы вы начали, если вы уточните API канала отправки / получения и сокета, я мог бы помочь больше. Вы также всегда можете задать вопросы на нашем канале gitter.

person Jan van Brügge    schedule 14.12.2018
comment
Спасибо за ответ. Это похоже на конкретную отправную точку; оценил. Допустимо ли возвращать обещание при получении данных?; таким образом я могу объединять потоки, которые вызывают побочные эффекты; например пузыриться, чтобы изменить DOM? Допустимо ли по стандартам Cycle.js смешивать такие эффекты чтения и записи? Моя первая попытка перенастроила драйвер‹SI, SO›, но инициализация сокета была асинхронными потоками RO; не гарантировалось, что экземпляр драйвера сокета был определен, когда на этот поток воздействовали; Я полагаю, что на Github ведется обсуждение таких проблем параллелизма. - person CCG_Intranet; 17.12.2018
comment
Зачем вам возвращать поток обещаний? Поток уже может делать все, что может обещание. Обычно вы используете then в драйвере и вызываете listener.next с данными, поэтому приложению не приходится иметь дело с промисами, а только с потоками. - person Jan van Brügge; 17.12.2018
comment
Я не понял, что вы имеете в виду с остальными - person Jan van Brügge; 17.12.2018
comment
Спасибо за ответ. Извиняюсь за мой вопрос. Я имею в виду, как мне обрабатывать асинхронные обратные вызовы, используя вышеуказанный метод? Так, например, когда канал получает сообщение, такое как channel.on('some_event_name', () => ..code), я не уверен, как это будет использоваться как намерение. В драйвере HTTP у нас есть sources.HTTP.select(CATEGORY).startWith(undefined), что позволяет мне привязать ответ XHR к просмотру; есть ли аналогичный подход с решением, которое вы разместили? Возможно, я не так хорошо понимаю потоковую систему, как я думал. Спасибо. - person CCG_Intranet; 17.12.2018
comment
Пожалуйста, смотрите обновление в вопросе :) Спасибо за вашу помощь! - person CCG_Intranet; 17.12.2018