物联网测试 使用 python 实现 mqtt 订阅和发布,并且使用到接口自动化中

wqg-tom · November 12, 2019 · Last by wqg-tom replied at November 13, 2019 · 2334 hits

封装订阅的部分,形成公用函数

import threading
import paho.mqtt.client as mqtt
import time
import ast

HOST = "ip地址"
PORT = 1883


class Mqtt_subscribe(threading.Thread):
"""
mqtt thread, 完成订阅功能
"""


def __init__(self, subtopic):
super(Mqtt_subscribe, self).__init__()
self.client_id = time.strftime(
'%Y%m%d%H%M%S', time.localtime(
time.time()))
self.client = mqtt.Client(self.client_id)
self.client.user_data_set(subtopic)
self.client.username_pw_set("admin", "public")
self.tid = None
self.macaddress = None
self.type = None
self.status = None
self.subtype = None
self.info = None
self.answer_result = None

def run(self):
# ClientId不能重复,所以使用当前时间
# 必须设置,否则会返回「Connected with result code 4
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(HOST, PORT, 60)
self.client.loop_forever(timeout=10)

def on_connect(self, client, subtopic, flags, rc):
print("Connected with result code " + str(rc))
print("topic:" + subtopic)
client.subscribe(subtopic, 2)

def on_message(self, client, userdata, msg):
# print(msg.topic + " " + msg.payload.decode("utf-8"))
mess = msg.payload.decode("utf-8")
print(mess)
#在此处处理订阅主题返回的信息
user_dict = ast.literal_eval(mess)
self.answer_result = user_dict
self.tid = user_dict['tid']
self.macaddress = user_dict['macaddr']
try:
self.type = user_dict['type']
except BaseException:
pass
try:
self.subtype = user_dict['subtype']
except BaseException:
pass
try:
self.info = user_dict['info']
except BaseException:
pass
try:
self.status = user_dict['status']
except BaseException:
pass
if self.macaddress and self.tid:
self.client.disconnect()
# print('macaddress:',self.macaddress)
# print('tid:', self.tid)

return self.macaddress, self.tid, self.status, self.type, self.info, self.subtype


if __name__ == "__main__":
subtopic = "com/2000000838/1000237443/answer"
t = Mqtt_subscribe(subtopic)
a = t.client
t.start()
# t.on_connect("com/99:00:AA:AA:00:CS/binding")

封装发布的部分,形成公共函数

import paho.mqtt.publish as publish
import time
from common.operation_yaml import reader_Yaml


def mqtt_Publish(topic,msg):
client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
IP_DNS = reader_Yaml('emq_information','IP_DNS')
port = reader_Yaml('emq_information','port')
publish.single(topic,msg,qos=0,
hostname=IP_DNS,
port=port,
client_id=client_id,
auth={'username': "admin", 'password': "public"})

if __name__ == '__main__':
mqtt_Publish('test','Hello world!')

以下面使用为例,使用订阅和发布

import json
from common.operation_yaml import reader_Yaml
from common.mqtt_publish import mqtt_Publish
from time import sleep
from common.mqtt_subscribe import Mqtt_subscribe
from common.do_mysql import db_connect
import datetime


class Gateway_again_and_again_update:
def __init__(self,update_file_low,update_file_high):
self.update_file_low = update_file_low
self.update_file_high = update_file_high
self.gateway_version_low = self.update_file_low.replace(".bin","")
self.gateway_version_high = self.update_file_high.replace(".bin", "")
self.mac_address = reader_Yaml('gateway_information','mac_address')
self.orignal_sql = "SELECT status FROM gateway WHERE mac_addr='%s'"
self.answer_topic = reader_Yaml('gateway_information','gateway_answer_topic')
self.update_topic = self.answer_topic.replace("answer","update")

def check_gateway_version(self):
"""检测网关的当前版本"""
check_gateway_version_message =json.dumps({"type":"1","macaddr":"99:00:AA:AA:00:BE","tid":1931759568})
mess = Mqtt_subscribe(self.answer_topic)
mess.start()
result = db_connect(self.orignal_sql % (self.mac_address), 'amdb')
gateway_status = str(result).replace("(", "").replace(")", "").replace(",", "").replace("'","")
while True:
if gateway_status =="1":
mqtt_Publish(self.update_topic, check_gateway_version_message)
threads = []
threads.append(mess)
for t in threads:
t.join(20)
break
else:
sleep(20)
continue
answer_info = mess.info
print("网关当前的版本为:" + str(answer_info)) #打印网关检测版本返回的信息

return answer_info #返回网关的当前版本

def push_gateway_update_verion_message(self):
"""升级网关的版本"""
gateway_version_now = self.check_gateway_version().replace(".bin","")
update_version_message = {"filename":"IIG1000E-V1.0.1.4.bin","type":"2","macaddr":"88:00:AA:A0:00:3B","tid":"PBE029EF8C258B848"}
update_version_message['macaddr'] = self.mac_address
mess = Mqtt_subscribe(self.answer_topic)
mess.start()
if gateway_version_now == self.gateway_version_low:
update_version_message["filename"] = self.update_file_high
mqtt_Publish( topic=self.update_topic,msg=json.dumps(update_version_message))
elif gateway_version_now == self.gateway_version_high:
update_version_message["filename"] = self.update_file_low
mqtt_Publish(topic=self.update_topic, msg=json.dumps(update_version_message))
else:
print("返回值错误")
pass
threads = []
threads.append(mess)
for t in threads:
t.join()
result = str(mess.answer_result)
print("网关升级完返回的信息为:"+ result) # 打印网关检测版本返回的信息
answer_info = mess.info
print(answer_info)# 返回网关的当前版本

if __name__ == "__main__":
s = 0
while True:
s += 1
now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(("第%d次升级"%s).ljust(100,"-"))
print(now_time)
main = Gateway_again_and_again_update(update_file_low="IIG1000E-V1.0.1.3.bin",update_file_high="IIG1000E-V1.0.1.4.bin")
main.push_gateway_update_verion_message()
sleep(60)

以上即可完成接口自动化中mqtt协议的使用,有物联网测试的小伙伴可以交流下

最佳回复

from common.operation_yaml import reader_Yaml
请问这个装那个库,装了common库还是提示报错

共收到 5 条回复 时间 点赞

为大佬点赞,虽然我没搞过

并没看出来,怎么使用到接口自动化里了

from common.operation_yaml import reader_Yaml
请问这个装那个库,装了common库还是提示报错

fjqken 回复

这个库是接口自动化框架里面封装的读取yaml文件的方法,如果看不懂的话,我找个时间把这些订阅发布简化

韩将 回复

这里是写成类的方法,然后里面的东西写成这样就是我的业务需要用到的东西。我实际使用的接口自动化发布订阅公共部分就是这个。只是代码没有简化,最简单的发布订阅网上是有资料的,只是想调用的时候,需要你去设置线程阻塞,不然的话。直接订阅完后,线程不阻塞,publish的消息还是订阅不到

wqg-tom 关闭了讨论 14 Nov 18:56
需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up