物联网测试 使用 python 实现 mqtt 订阅和发布,并且使用到接口自动化中

wqg-tom · 2019年11月12日 · 最后由 快乐天空 回复于 2020年11月18日 · 5538 次阅读

安装依赖库

pip install paho-mqtt

封装订阅的部分,形成公用函数 (subscribe.py)

import threading
import paho.mqtt.client as mqtt
import time

HOST = "xxxxx" #emq服务器地址
PORT = 1883

class Mqtt_subscribe(threading.Thread):
    """
    mqtt thread, 完成订阅功能
    """

    def __init__(self, subtopic):
        super(Mqtt_subscribe, self).__init__()
        self.client_id = time.strftime(
            '%Y%m%d%H%M%S', time.localtime(
                time.time()))
        self.client = mqtt.Client(self.client_id)
        self.client.user_data_set(subtopic)
        self.client.username_pw_set("admin", "public")
        self.message = None

    def run(self):
        # ClientId不能重复,所以使用当前时间
        # 必须设置,否则会返回「Connected with result code 4」
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect(HOST, PORT, 60)
        self.client.loop_forever(timeout=60)

    def on_connect(self, client, subtopic, flags, rc):
        print("Connected with result code " + str(rc))
        print("topic:" + subtopic)
        client.subscribe(subtopic, 2)

    def on_message(self, client, userdata, msg):
        # print(msg.topic + " " + msg.payload.decode("utf-8"))
        self.mess_age = msg.payload.decode("utf-8")
        if 'test' in self.mess_age:   #根据订阅内容判断,如果含有想要的值则会停止订阅
            self.client.disconnect()
        return self.mess_age

if __name__ == "__main__":
    subtopic = "test"
    t = Mqtt_subscribe(subtopic)
    t.start()

封装发布的部分 (mqtt_publish.py)

import paho.mqtt.publish as publish
import time

def mqtt_Publish(topic,msg):
    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    publish.single(topic,msg,qos=0,
                   hostname="xxxx", #emq地址
                   port=1883, 
                   client_id=client_id,
                   auth={'username': "admin", 'password': "public"}) #登录密码,官方是amdin,public

if __name__ == '__main__':
    topic = 'test'
    message = "this is a test"
    mqtt_Publish(topic,message)

以下面使用为例,使用订阅和发布(received.py,先运行该文件,然后往该主题中发布订阅 - 工具或者 mqtt_publish.py 文件都可以,即可把订阅到的变量值取出用于断言)

from testcase.test_04 import subscribe
topic = 'test'
mess = subscribe.Mqtt_subscribe(topic)
mess.start()
threads =[]
threads.append(mess)
for t in threads:
    t.join(20) #超时20S不会再线程阻塞,如果没有接受到内容需要一直阻塞,则t.join()即可
a = mess.mess_age
print("这是变量信息",a)
共收到 6 条回复 时间 点赞

为大佬点赞,虽然我没搞过

并没看出来,怎么使用到接口自动化里了

from common.operation_yaml import reader_Yaml
请问这个装那个库,装了 common 库还是提示报错

fjqken 回复

这个库是接口自动化框架里面封装的读取 yaml 文件的方法,如果看不懂的话,我找个时间把这些订阅发布简化

韩将 回复

最近有小伙伴问到我,我就重新更新了下,这个会很方便

wqg-tom 关闭了讨论 11月14日 18:56
wqg-tom 重新开启了讨论 08月20日 10:39

如果必须是先订阅,再 publish ,订阅如何阻塞式的接受信息

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册