Google Cloud IoT — несколько сообщений конфигурации

В моем коде я подписываюсь на 3 разные темы:

/devices/{}/config
/devices/{}/events
/devices/{}/state

Если я не подписан в /devices/{}/config, я не получаю никаких сообщений о конфигурации, и это нормально. Но если я подписан в /devices/{}/config, я получаю одно сообщение для каждой моей подписки.

Пример:

Подписавшись на "/devices/{}/config" и "/devices/{}/events", я получаю 2 сообщения о конфигурации.

Подписавшись на "/devices/{}/config" и "/devices/{}/state", я получаю 2 сообщения о конфигурации.

Подписавшись на "/devices/{}/config", "/devices/{}/state" и "/devices/{}/events", я получаю 3 сообщения о конфигурации.

Подписан в "/devices/{}/events" и "/devices/{}/state" и получаю 0 сообщений о конфигурации.

Это вызывает ошибку в ядре IoT: mqtt:

Устройство "xxxxxxxx" не может быть обновлено. Состояние устройства может обновляться только один раз в 1 с.

На самом деле я хочу и нужно только одно сообщение конфигурации. Что я делаю не так?

Это мой код:

# [START iot_mqtt_includes]
import argparse
import datetime
import os
import random
import ssl
import time
import log
import updateConfig
import jwt
import paho.mqtt.client as mqtt
import payload
# [END iot_mqtt_includes]

# The initial backoff time after a disconnection occurs, in seconds.
minimum_backoff_time = 1

# The maximum backoff time before giving up, in seconds.
MAXIMUM_BACKOFF_TIME = 32

# Whether to wait with exponential backoff before publishing.
should_backoff = False


# [START iot_mqtt_jwt]

def create_jwt(project_id, private_key_file, algorithm):

    token = {
            # The time that the token was issued at
            'iat': datetime.datetime.utcnow(),
            # The time the token expires.
            'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=60),
            # The audience field should always be set to the GCP project id.
            'aud': project_id
    }

    # Read the private key file.
    with open(private_key_file, 'r') as f:
        private_key = f.read()
        f.close()

    return jwt.encode(token, private_key, algorithm=algorithm)
# [END iot_mqtt_jwt]


# [START iot_mqtt_config]
def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return '{}: {}'.format(rc, mqtt.error_string(rc))


def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    log.append_log('ao_conectar - ' +  mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1


def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    append_log('ao_desconectar - '+ error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True


def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""

def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    config = str(message.payload)
    retorno = updateConfig.update(config)

def get_client(
        project_id, cloud_region, registry_id, device_id, private_key_file,
        algorithm, ca_certs, mqtt_bridge_hostname, mqtt_bridge_port):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client = mqtt.Client(
            client_id=('projects/{}/locations/{}/registries/{}/devices/{}'
                       .format(
                               project_id,
                               cloud_region,
                               registry_id,
                               device_id)))

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
            username='unused',
            password=create_jwt(
                    project_id, private_key_file, algorithm))

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = '/devices/{}/config'.format(device_id)

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    return client
# [END iot_mqtt_config]


def parse_command_line_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description=(
            'Example Google Cloud IoT Core MQTT device connection code.'))
    parser.add_argument(
            '--project_id',
            default=os.environ.get('GOOGLE_CLOUD_PROJECT'),
            help='GCP cloud project name')
    parser.add_argument(
            '--registry_id', required=True, help='Cloud IoT Core registry id')
    parser.add_argument(
            '--device_id', required=True, help='Cloud IoT Core device id')
    parser.add_argument(
            '--private_key_file',
            required=True, help='Path to private key file.')
    parser.add_argument(
            '--algorithm',
            choices=('RS256', 'ES256'),
            required=True,
            help='Which encryption algorithm to use to generate the JWT.')
    parser.add_argument(
            '--cloud_region', default='us-central1', help='GCP cloud region')
    parser.add_argument(
            '--ca_certs',
            default='roots.pem',
            help=('CA root from https://pki.google.com/roots.pem'))
    parser.add_argument(
            '--message_type',
            choices=('event', 'state'),
            default='event',
            help=('Indicates whether the message to be published is a '
                  'telemetry event or a device state message.'))
    parser.add_argument(
            '--mqtt_bridge_hostname',
            default='mqtt.googleapis.com',
            help='MQTT bridge hostname.')
    parser.add_argument(
            '--mqtt_bridge_port',
            choices=(8883, 443),
            default=8883,
            type=int,
            help='MQTT bridge port.')
    parser.add_argument(
            '--jwt_expires_minutes',
            default=20,
            type=int,
            help=('Expiration time, in minutes, for JWT tokens.'))

    return parser.parse_args()


# [START iot_mqtt_run]
def main():
    log.append_log("Iniciando uma nova conexao com o Google IoT.")
    global minimum_backoff_time

    args = parse_command_line_args()

    # Publish to the events or state topic based on the flag.

    jwt_iat = datetime.datetime.utcnow()
    jwt_exp_mins = args.jwt_expires_minutes
    client = get_client(
        args.project_id, args.cloud_region, args.registry_id, args.device_id,
        args.private_key_file, args.algorithm, args.ca_certs,
        args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    # Publish num_messages mesages to the MQTT bridge once per second.
    while True:
        # Process network events.
        client.loop()
        # Wait if backoff is required.
        if should_backoff:
            # If backoff time is too large, give up.
            if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
                log.append_log('Tempo maximo de backoff excedido. Desistindo.')
                break

            # Otherwise, wait and connect again.
            delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
            log.append_log('Esperando {} segundos antes de reconectar.'.format(delay))
            time.sleep(delay)
            minimum_backoff_time *= 2
            client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

        # [START iot_mqtt_jwt_refresh]
        seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
        if seconds_since_issue > 60 * jwt_exp_mins:
            log.append_log('Atualizando token de acesso depois de {} segundos'.format(seconds_since_issue))
            client.loop_stop()
            jwt_iat = datetime.datetime.utcnow()
            client = get_client(
                args.project_id, args.cloud_region,
                args.registry_id, args.device_id, args.private_key_file,
                args.algorithm, args.ca_certs, args.mqtt_bridge_hostname,
                args.mqtt_bridge_port)
        # [END iot_mqtt_jwt_refresh]
        # Publish "payload" to the MQTT topic. qos=1 means at least once
        # delivery. Cloud IoT Core also supports qos=0 for at most once
        # delivery.

        payloadToPublish = payload.lerPayload()

        if payloadToPublish != 'sem payload':
            if payloadToPublish[0] == 'event':
                mqtt_topic = '/devices/{}/{}'.format(args.device_id, 'events')
                log.append_log('publicando [' + payloadToPublish[1] + ']')
            else:
                mqtt_topic = '/devices/{}/{}'.format(args.device_id, 'state')

            client.publish(mqtt_topic, payloadToPublish[1], qos=1)
# [END iot_mqtt_run]


if __name__ == '__main__':
    main()

person Thiago Martins    schedule 28.06.2018    source источник


Ответы (1)


Это ожидаемое поведение:

  • Тема события и состояния используется для связи устройства с облаком, а тема конфигурации используется для отправки данных конфигурации на устройство IoT. Подписка на темы событий/состояний фактически является NOOP.
  • Cloud IoT Core поддерживает только QoS 1, то есть «по крайней мере один раз» для передачи сообщения, вы можете попробовать QoS 0, при котором передача сообщения не повторяется до подтверждения, но я не думаю, что это то, что вам нужно.
  • Передача STATE и CONFIG ограничена одним сообщением в секунду; передача изменений конфигурации быстрее, чем это, вызовет ошибку, которую вы видите
person class    schedule 14.08.2018