为了模拟支付回调的各种情况,这里搭建了一个最简单的 mock 接口环境:只根据所访问的接口路径返回预设的值,不校验传入的参数。
import json
import threading
import socketserver
import time
import random
import string
import socket
import urllib
from http.server import HTTPServer, BaseHTTPRequestHandler
# Address the mock HTTP server binds to; '0.0.0.0' listens on every IPv4 interface.
IP = '0.0.0.0'
# NOTE(review): not referenced by any code visible in this file — presumably
# reserved for a TCP mock; confirm before removing.
TCP_PORT = 8889
# Port handed to HTTPServer in MockHTTPThread.run().
HTTP_PORT = 8890
#global SOCKET1
class TestHTTPHandle(BaseHTTPRequestHandler):
    """Mock HTTP handler that answers GET/POST requests with canned JSON.

    The reply is selected only from the HTTP method and the request path
    (the query string is ignored); request parameters and bodies are never
    validated.  Every reply -- including the one for an unknown path -- is
    sent with status 200 and a JSON body.
    """

    # Canned payloads, keyed first by HTTP method and then by API path.
    _MOCK_REPLIES = {
        "GET": {
            "/aa": {'code': 200,
                    'message': {'a_first': 'addressid_a', 'a_decond': 321, 'a_third': 'phone_a'},
                    'aa_param': 'GET'},
            "/bb": {'code': 200,
                    'message': {'b_first': 'addressid_b', 'b_decond': 321, 'b_third': 'phone_b'},
                    'bb_param': 'GET'},
        },
        "POST": {
            "/aa": {'code': 200,
                    'message': {'a_first': 'address_a_post', 'a_decond': 321, 'a_third': 'phone_a_post'},
                    'aa_param': 'POST'},
            "/bb": {'code': 200,
                    'message': {'b_first': 'address_b_post', 'b_decond': 321, 'b_third': 'phone_b_post'},
                    'bb_param': 'POST'},
        },
    }

    # Payload returned when the path matches no known API.  GET and POST
    # deliberately keep their historical (different) top-level keys.
    _UNKNOWN_REPLIES = {
        "GET": {'message': 'url有误'},
        "POST": {'error': 'url有误'},
    }

    def do_GET(self):
        """Answer a GET request with the canned payload for its path."""
        print("-------------Got HTTP GET Data--------------")
        self._send_mock_reply("GET")

    def do_POST(self):
        """Consume the request body, then answer with the canned payload."""
        print("-------------Got HTTP POST Data--------------")
        self._drain_request_body()
        self._send_mock_reply("POST")

    def _api_path(self):
        """Return the request path with any query string removed."""
        return self.path.split('?', 1)[0]

    def _drain_request_body(self):
        """Read and discard the request body announced by Content-Length.

        A missing, malformed or non-positive Content-Length is treated as
        "no body".  This keeps the mock lenient (it never validates input)
        and avoids ``read(-1)``, which would block until the peer closes the
        connection.
        """
        announced = self.headers.get('content-length')
        try:
            length = int(announced) if announced is not None else 0
        except ValueError:
            length = 0
        if length > 0:
            # The body content is irrelevant to the mock; it is only read so
            # the client's upload is fully consumed before the reply is sent.
            self.rfile.read(length)

    def _send_mock_reply(self, method):
        """Send the canned JSON reply for *method* and the current path.

        Args:
            method: "GET" or "POST"; selects which reply table is used.
        """
        payload = self._MOCK_REPLIES[method].get(self._api_path())
        matched = payload is not None
        if not matched:
            payload = self._UNKNOWN_REPLIES[method]

        body = json.dumps(payload).encode("utf-8")

        self.send_response(200)
        self.send_header("Content-type", "application/json;charset=utf-8")
        # Lets clients know exactly where the body ends instead of relying on
        # the connection being closed.
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()

        if not matched:
            print("NO Match api.")
        self.wfile.write(body)
        print("HTTP response is sent.")
class MockHTTPThread(threading.Thread):
    """Thread that hosts the mock HTTP server until the process ends."""

    def __init__(self, threadID):
        """Create the thread and remember the caller-supplied identifier.

        Args:
            threadID: Arbitrary identifier stored on the instance as
                ``threadID``; it is not used by the thread itself.
        """
        super().__init__()
        self.threadID = threadID

    def run(self):
        """Bind the HTTP server to (IP, HTTP_PORT) and serve requests forever."""
        print("HTTP server start!")
        listen_address = (IP, HTTP_PORT)
        server = HTTPServer(listen_address, TestHTTPHandle)
        server.serve_forever()
if __name__ == "__main__":
    # Script entry point: run the mock HTTP server on its own thread so the
    # main thread is free to report that start-up was requested.
    http_thread = MockHTTPThread(1)
    http_thread.start()
    print("Start HTTP Thread")
经过压力测试,该 mock 服务可以支撑 700+ 的并发请求,且可支撑的并发量与返回值的数据大小线性相关。