零碎知识 模拟政府平台接收上游数据信息

大海 · 2023年02月16日 · 3159 次阅读

背景描述

在和政府进行数据对接的实际场景中,政府是调用我们的物联网接口后,我们将物联网数据发送给到他们的政府平台上。在测试环境中,我们不可能直接将上游的数据直接发给政府平台,必须造一个假的政府平台用来接收数据。

设计思路

因此,在测试环境中,我们需要 Mock 一个政府平台,用来接收上游推送的数据信息。

代码实现

# !/usr/bin/python
# -*- coding: utf-8 -*-

"""
@File    : mockHTTPServer.py
@Create Time: 2022-10-25 9:22
@Description:
"""

from http.server import HTTPServer, BaseHTTPRequestHandler

import json
import time
import os
import queue
from logger import Logger
import datetime
from hashlib import md5

# 创建队列
global msgQueue
msgQueue = queue.Queue()
log = Logger()


class Resquest(BaseHTTPRequestHandler):

    def handler(self):
        print("data:", self.rfile.readline().decode())
        self.wfile.write(self.rfile.readline())

    def do_GET(self):

        # 接受XX的数据, 理论上XX应该以post方式发送
        # 接受数据查询
        if self.path == '/getpayload':

            print('<脚本> 请求暂存服务器的数据 ...')
            payload = self.getpayload()

            # 将数据发送给 <脚本>
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(payload)
            print('<脚本> 成功发送数据给 脚本')
        elif self.path == '/getHistoryLog':
            logContent = self.get_historyLog()
            # 将数据发送给 <脚本>
            self.send_response(200)
            self.send_header('Content-type', self.headers['Content-type'])
            self.end_headers()
            self.wfile.write(logContent.encode())
        elif self.path == '/getplayloadsize':
            logContent = self.getplayloadsize()
            # 将数据发送给 <脚本>
            self.send_response(200)
            self.send_header('Content-type', self.headers['Content-type'])
            self.end_headers()
            self.wfile.write(logContent.encode())
        # 默认行为
        else:
            self.send_error(500, "Method is not supported!")
            return

    def do_POST(self):
        # 接受XX的数据, 理论上XX应该以post方式发送
        if self.path == '/gov':
            req_datas = self.rfile.read(int(self.headers['content-length']))
            # 此处为http client的请求payload
            print("<服务器> Gov paltfrom 请求的方法为 %s" % self.command)
            # 保存playload
            self.savepayload(req_datas)

            session = md5(str(time.time()).encode('utf-8'))
            session = session.hexdigest()
            data = {
                'status_code': '200',
                'message': 'Success',
                'timestamp': '',
                'id': session
            }
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps(data).encode('utf-8'))
        else:
            self.send_error(500, "Method is not supported!")
        return

    def getplayloadsize(self):
        global msgQueue
        print('<服务器> 读取队列长度, 队列长度为 %s' % msgQueue.qsize())
        size = msgQueue.qsize()
        return "{'size': %s}" % size

    def getpayload(self):
        global msgQueue
        size = msgQueue.qsize()
        # 如果队列非空
        if size == 0:
            msgs = "{}".encode('utf-8')
        else:
            msgs = msgQueue.get()

        return msgs

    def savepayload(self, payload):
        global msgQueue
        msgQueue.put(payload)
        print('<服务器> 暂存数据')
        log.info('%s' % (str(payload)))

    def get_historyLog(self):
        cur_path = os.path.dirname(os.path.realpath(__file__))
        logPath = cur_path + '/logs/all.log'
        with open(logPath, 'r', encoding='utf-8') as f:
            logContent = f.read()
        return logContent


if __name__ == '__main__':
    host = ('127.0.0.1', 9009)
    server = HTTPServer(host, Resquest)
    print("======server main begin======")
    print("starting server, listen at %s:%s" % host)
    server.serve_forever()
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册