Apollo GraphQL: MQTT Подпишитесь на брокера, чтобы просто предоставлять опубликованные данные

Сценарий

У меня есть сенсорный узел, который публикует информацию по определенной теме MQTT (отправляется брокеру Mosquitto). Отправляемые данные представляют собой чистую строку.

Бэкенд

в настоящее время я использую apollo-server-express для создания сервера GraphQL. Я хочу использовать `graphql-mqtt-subscriptions, чтобы:

  • Подпишитесь на брокера MQTT
  • Прочитайте информацию по определенной теме и просто верните ее в graphiql интерфейс.

dependencies

"dependencies": {
    "apollo-server-express": "^2.8.1",
    "express": "^4.17.1",
    "graphql": "^14.4.2",
    "graphql-mqtt-subscriptions": "^1.1.0",
    "graphql-subscriptions": "^1.1.0",
    "graphql-tools": "^4.0.5",
    "mqtt": "^3.0.0",
    "subscriptions-transport-ws": "^0.9.16"
  },

Фрагменты кода

код точки входа server.js:

import express from 'express';
import {ApolloServer } from 'apollo-server-express';
import { typeDefs } from './graphql/schema';
import { resolvers } from './graphql/resolvers';
import { createServer } from 'http';


const server = new ApolloServer({ typeDefs, resolvers});

const app = express();

server.applyMiddleware({ app });

const httpServer = createServer(app);

server.installSubscriptionHandlers(httpServer);

httpServer.listen({port: 4000}, () => {
    console.log(`???? Server ready at http://localhost:4000/${server.graphqlPath}`)
    console.log(`???? Subscriptions ready at ws://localhost:4000/${server.subscriptionsPath}`)
});

схема typeDefs для GraphQL следующая:


type Result {
        data: String
}

type Subscription {
        siteAdded(topic: String): Result
    }

schema {
  query: Query
  mutation: Mutation
  subscription: Subscription
}

где siteAdded(topic: String) возьмет тему, на которую MQTT должен подписаться. Пример:

subscription {
   siteAdded(topic: "test/1/env") {
       data
   }

resolvers.js выглядит следующим образом (как указано в майской документации):

import { MQTTPubSub } from 'graphql-mqtt-subscriptions';
import { connect } from 'mqtt';

const client = connect('mqtt://my.mqtt.broker.ip.address', {
    reconnectPeriod: 1000,
});

const pubsub = new MQTTPubSub({
    client
});

export const resolvers: {
Subscription: {
        siteAdded: {
            subscribe: (_, args) => {
                console.log(args.topic); // to check if this gets called or not.
                pubsub.asyncIterator([args.topic]);

            }
        }
    }
};

Вывод

вызывается console.log на args.topic, но после этого следующая ошибка в graphiql:

{
  "error": {
    "message": "Subscription field must return Async Iterable. Received: undefined"
  }
}

Если я выполняю return pubsub.asyncIterator():

Он предоставляет своевременные данные от брокера, но вывод null:

{
  "data": {
    "siteAdded": null
  }
}

Я добавил промежуточное ПО Websockets в server.js, упомянутое выше, в соответствии с Документами Apollo< /а>

Где я ошибаюсь и как просто добавить данные, поступающие из подписанной темы, в graphiql?


person Shan-Desai    schedule 08.08.2019    source источник
comment
Вы забыли вызвать return в преобразователе подписки. return pubsub.asyncIterator([args.topic]);   -  person Dom    schedule 08.08.2019
comment
@Dom Я пробовал, не работает. Пользовательский интерфейс продолжает отправлять HTTP POST, но без данных   -  person Shan-Desai    schedule 08.08.2019
comment
Вам определенно нужно return результат вызова pubsub.asyncIterator(), как сказал @Dom. Как вы тестируете подписку? После того, как вы подпишетесь на Playground, вам нужно будет каким-то образом инициировать вызов publish (например, открыв Playground на другой вкладке и отправив мутацию, вызывающую publish).   -  person Daniel Rearden    schedule 08.08.2019
comment
@DanielRearden, но, поскольку датчик уже публикует эту тему, не даст ли это мне что-нибудь? На самом деле у меня нет мутации, которая вызывает publish. У меня уже есть датчик, который публикуется в теме test/1/env, и поскольку я использую подстановочный знак test/+/env, данные должны быть доступны.   -  person Shan-Desai    schedule 08.08.2019
comment
Ах я вижу. Я бы создал фиктивную мутацию, которая вызывает publish, и по крайней мере посмотрел бы, работает ли она так, как ожидалось. Предполагая, что это так, вы можете отлаживать оттуда. Может несоответствие в названиях тем?   -  person Daniel Rearden    schedule 08.08.2019


Ответы (1)


Резюме

Обновлять

Предостережения

  • Подстановочные знаки, такие как + и # в graphql-mqtt-subscriptions v1.1.0 в реестре NPM, недоступны. Однако в репозитории уже есть реализации. Владельцу репозитория необходимо обновить реестр. См. Открытая проблема для graphql-mqtt-subscriptions.

  • В настоящее время я использую полную тему для подписки MQTT, чтобы получить данные с датчика, например. test/1/env вместо test/+/env

Обновление разработки

  • Раньше я отправлял данные с датчика в формате необработанной строки (обычный текст), поэтому я обновил прошивку для отправки данных в формате JSON String следующим образом:

      {"data": "temp=23,humid=56 1500394302"}
    

Решение

  1. Как упоминалось в комментариях @Dom и @DanielRearden, я изначально забыл добавить return, если использовал фигурные скобки {}. например.:

        Subscription: {
        siteAdded: {
            subscribe: (_, args) => {
                console.log(args.topic); // to check if this gets called or not.
                return pubsub.asyncIterator([args.topic]);
    
            }
        }
    }
    

    или я просто удалил скобки и return, написав преобразователь следующим образом:

        Subscription: {
           siteAdded: {
               subscribe: (_, args) => pubsub.asyncIterator([args.topic]),
           }
        }
    

    Это все еще возвращало мне null, как указано в запросе.

  2. Мне удалось получить данные из подписки, выполнив Преобразование полезной нагрузки для Apollo, где в моих преобразователях я сделал следующее:

       Subscription: {
        siteAdded: {
            resolve: (payload) => {
                return {
                    data: payload.data,
                };
            },
            subscribe: (_, args) => pubsub.asyncIterator([args.topic]),
        }
    }
    

    Полезная нагрузка должна была быть разрешена соответствующим образом для схемы.

Результат

Теперь подписка следующим образом работает как шарм:

    subscription {
        siteAdded(topic: "test/1/env") {
            data
        }
    }

дает следующий результат:

{
  "data": {
    "siteAdded": {
      "data": "temp=27.13,humid=43.33 1565345004"
    }
  }
}
person Shan-Desai    schedule 09.08.2019