在和政府进行数据对接的实际场景中,政府是调用我们的物联网接口后,我们将物联网数据发送给到他们的政府平台上。在测试环境中,我们不可能直接将上游的数据直接发给政府平台,必须造一个假的政府平台用来接收数据。
因此,在测试环境中,我们需要 Mock 一个政府平台,用来接收上游推送的数据信息。
# !/usr/bin/python
# -*- coding: utf-8 -*-
"""
@File : mockHTTPServer.py
@Create Time: 2022-10-25 9:22
@Description:
"""
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import time
import os
import queue
from logger import Logger
import datetime
from hashlib import md5
# 创建队列
global msgQueue
msgQueue = queue.Queue()
log = Logger()
class Resquest(BaseHTTPRequestHandler):
def handler(self):
print("data:", self.rfile.readline().decode())
self.wfile.write(self.rfile.readline())
def do_GET(self):
# 接受XX的数据, 理论上XX应该以post方式发送
# 接受数据查询
if self.path == '/getpayload':
print('<脚本> 请求暂存服务器的数据 ...')
payload = self.getpayload()
# 将数据发送给 <脚本>
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(payload)
print('<脚本> 成功发送数据给 脚本')
elif self.path == '/getHistoryLog':
logContent = self.get_historyLog()
# 将数据发送给 <脚本>
self.send_response(200)
self.send_header('Content-type', self.headers['Content-type'])
self.end_headers()
self.wfile.write(logContent.encode())
elif self.path == '/getplayloadsize':
logContent = self.getplayloadsize()
# 将数据发送给 <脚本>
self.send_response(200)
self.send_header('Content-type', self.headers['Content-type'])
self.end_headers()
self.wfile.write(logContent.encode())
# 默认行为
else:
self.send_error(500, "Method is not supported!")
return
def do_POST(self):
# 接受XX的数据, 理论上XX应该以post方式发送
if self.path == '/gov':
req_datas = self.rfile.read(int(self.headers['content-length']))
# 此处为http client的请求payload
print("<服务器> Gov paltfrom 请求的方法为 %s" % self.command)
# 保存playload
self.savepayload(req_datas)
session = md5(str(time.time()).encode('utf-8'))
session = session.hexdigest()
data = {
'status_code': '200',
'message': 'Success',
'timestamp': '',
'id': session
}
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(data).encode('utf-8'))
else:
self.send_error(500, "Method is not supported!")
return
def getplayloadsize(self):
global msgQueue
print('<服务器> 读取队列长度, 队列长度为 %s' % msgQueue.qsize())
size = msgQueue.qsize()
return "{'size': %s}" % size
def getpayload(self):
global msgQueue
size = msgQueue.qsize()
# 如果队列非空
if size == 0:
msgs = "{}".encode('utf-8')
else:
msgs = msgQueue.get()
return msgs
def savepayload(self, payload):
global msgQueue
msgQueue.put(payload)
print('<服务器> 暂存数据')
log.info('%s' % (str(payload)))
def get_historyLog(self):
cur_path = os.path.dirname(os.path.realpath(__file__))
logPath = cur_path + '/logs/all.log'
with open(logPath, 'r', encoding='utf-8') as f:
logContent = f.read()
return logContent
if __name__ == '__main__':
host = ('127.0.0.1', 9009)
server = HTTPServer(host, Resquest)
print("======server main begin======")
print("starting server, listen at %s:%s" % host)
server.serve_forever()