使用 Python 的 paho-mqtt 库,如下:
class MqttClient:
mqtt_bridge = None
message_id = None
def __init__(self, client_id, clean_session=True, userdata=None):
self.client = mqtt.Client(client_id=client_id, clean_session=clean_session, userdata=userdata)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_publish = self.on_publish
self.client.on_disconnect = self.on_disconnect
self.client.on_subscribe = self.on_subscribe
def connect(self, username, password, host, port=1883, keepalive=60):
self.client.username_pw_set(username, password)
self.client.connect(host=host, port=port, keepalive=keepalive)
# 在connect*()之前或之后调用loop_start()一次,会在后台运行一个线程来自动调用loop()。
# 这释放了可能阻塞的其他工作的主线程。这个调用也处理重新连接到代理。
# 调用loop_stop()来停止后台线程。
self.client.loop_start()
def disconnect(self):
self.client.loop_stop()
self.client.disconnect()
def subscribe(self, topic, qos=0, **kwargs):
return self.client.subscribe(topic, qos, **kwargs)
def publish(self, topic, message, qos=0, **kwargs):
return self.client.publish(topic, json.dumps(message, ensure_ascii=False), qos=qos, **kwargs)
def unsubscribe(self, topic, properties=None):
"""unsubscribe topic
:param topic:
:param properties:
:return: 返回一个元组(result, mid)
"""
return self.client.unsubscribe(topic, properties=properties)
def on_connect(self, client, userdata, flags, rc):
logger.info("向代理申请连接后 客户端收到来自代理的响应.")
logger.info(f"Client: {str(client)}")
logger.info(f"User Data: {str(userdata)}")
logger.info(f"Flags: {flags}")
logger.info(f"Connection returned: {mqtt.connack_string(rc)}")
if rc == 0:
self.mqtt_bridge = True
def on_disconnect(self, client: mqtt.Client, userdata, rc):
logger.info("与代理已断开连接.")
logger.info(f"Client: {client.__dict__}")
logger.info(f"User Data: {str(userdata)}")
print(rc)
logger.info(f"""
①MQTT_ERR_SUCCESS 成功
②MQTT_ERR_NO_CONN 客户端当前未连接
③MQTT_ERR_QUEUE_SIZE 当使用max_queued_messages_set来指示消息既不排队也不发送。
Disconnection Status: {mqtt.error_string(rc)}
""")
if rc == 0:
self.mqtt_bridge = False
def on_message(self, client, userdata, message):
logger.info("收到关于客户订阅的主题的消息.")
logger.info(f"Client: {str(client)}")
logger.info(f"User Data: {str(userdata)}")
logger.info(f"Topic: {message.topic}")
logger.info(f"Message: {message.payload.decode('utf-8')}")
def on_publish(self, client, userdata, mid):
logger.info("使用publish()发送的消息已经传输到代理.")
logger.info(f"Client: {client.__dict__}")
logger.info(f"User Data: {str(userdata)}")
logger.info(f"Mid: {mid}")
print(mid)
self.message_id = mid
def on_subscribe(self, client, userdata, mid, granted_qos):
logger.info("代理已响应订阅请求.")
logger.info(f"Client: {str(client)}")
logger.info(f"User Data: {str(userdata)}")
logger.info(f"Mid: {mid}")
logger.info(f"Granted qos: {granted_qos}")
发现了一个问题,当我先实例化该类,并且使用 connect 连接 mqtt 代理服务器后,使用 publish() 想要发消息到代理时,发现会先调用 on_publish,然后是 on_subscribe,接下来会调用 on_disconnect,最后还会调用 on_connect。
其中 on_disconnect 的 rc 为 1(应该是有错误)
我的问题是:
因为网上都是对于这个库的 pypi 的翻译,我找不到具体的实例和详细的信息,想问一下有没有了解的大佬或者遇到过相同问题的同学。