使用 Python 的 paho-mqtt 库,如下:

class MqttClient:
    mqtt_bridge = None
    message_id = None

    def __init__(self, client_id, clean_session=True, userdata=None):
        self.client = mqtt.Client(client_id=client_id, clean_session=clean_session, userdata=userdata)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client.on_disconnect = self.on_disconnect
        self.client.on_subscribe = self.on_subscribe

    def connect(self, username, password, host, port=1883, keepalive=60):
        self.client.username_pw_set(username, password)
        self.client.connect(host=host, port=port, keepalive=keepalive)
        # 在connect*()之前或之后调用loop_start()一次,会在后台运行一个线程来自动调用loop()。
        # 这释放了可能阻塞的其他工作的主线程。这个调用也处理重新连接到代理。
        # 调用loop_stop()来停止后台线程。
        self.client.loop_start()

    def disconnect(self):
        self.client.loop_stop()
        self.client.disconnect()

    def subscribe(self, topic, qos=0, **kwargs):
        return self.client.subscribe(topic, qos, **kwargs)

    def publish(self, topic, message, qos=0, **kwargs):
        return self.client.publish(topic, json.dumps(message, ensure_ascii=False), qos=qos, **kwargs)

    def unsubscribe(self, topic, properties=None):
        """unsubscribe topic

        :param topic:
        :param properties:
        :return: 返回一个元组(result, mid)
        """
        return self.client.unsubscribe(topic, properties=properties)

    def on_connect(self, client, userdata, flags, rc):
        logger.info("向代理申请连接后 客户端收到来自代理的响应.")
        logger.info(f"Client: {str(client)}")
        logger.info(f"User Data: {str(userdata)}")
        logger.info(f"Flags: {flags}")
        logger.info(f"Connection returned: {mqtt.connack_string(rc)}")
        if rc == 0:
            self.mqtt_bridge = True

    def on_disconnect(self, client: mqtt.Client, userdata, rc):
        logger.info("与代理已断开连接.")
        logger.info(f"Client: {client.__dict__}")
        logger.info(f"User Data: {str(userdata)}")
        print(rc)
        logger.info(f"""
        ①MQTT_ERR_SUCCESS 成功
        ②MQTT_ERR_NO_CONN 客户端当前未连接
        ③MQTT_ERR_QUEUE_SIZE  当使用max_queued_messages_set来指示消息既不排队也不发送。

        Disconnection Status: {mqtt.error_string(rc)}
        """)
        if rc == 0:
            self.mqtt_bridge = False

    def on_message(self, client, userdata, message):
        logger.info("收到关于客户订阅的主题的消息.")
        logger.info(f"Client: {str(client)}")
        logger.info(f"User Data: {str(userdata)}")
        logger.info(f"Topic: {message.topic}")
        logger.info(f"Message: {message.payload.decode('utf-8')}")

    def on_publish(self, client, userdata, mid):
        logger.info("使用publish()发送的消息已经传输到代理.")
        logger.info(f"Client: {client.__dict__}")
        logger.info(f"User Data: {str(userdata)}")
        logger.info(f"Mid: {mid}")
        print(mid)
        self.message_id = mid

    def on_subscribe(self, client, userdata, mid, granted_qos):
        logger.info("代理已响应订阅请求.")
        logger.info(f"Client: {str(client)}")
        logger.info(f"User Data: {str(userdata)}")
        logger.info(f"Mid: {mid}")
        logger.info(f"Granted qos: {granted_qos}")

发现了一个问题,当我先实例化该类,并且使用 connect 连接 mqtt 代理服务器后,使用 publish() 想要发消息到代理时,发现会先调用 on_publish,然后是 on_subscribe,接下来会调用 on_disconnect,最后还会调用 on_connect。
其中 on_disconnect 的 rc 为 1(应该是有错误)

我的问题是:

因为网上都是对于这个库的 pypi 的翻译,我找不到具体的实例和详细的信息,想问一下有没有了解的大佬或者遇到过相同问题的同学。


↙↙↙阅读原文可查看相关链接,并与作者交流