哈哈,好久没上论坛了,没有开源,在上家公司做的小工具,就是一个 web 的在线编辑器。用 vue 也很容易实现的。
不好意思,最近比较忙,没看帖子。
我应该标注下当时的 mitmproxy 的版本的,但是现在也不记得了,抱歉,这个他们更新了,脚本需要调整下。
我下载的最新的版本 mitmproxy 7.0.2, python 3.8 用下面的代码是可以的。
#coding:utf-8
import json
from datetime import datetime, timezone
from mitmproxy import proxy, options
from mitmproxy import ctx
from mitmproxy.tools.web.master import WebMaster
def flow_to_har(flow):
'''
将flow转换成har格式数据
'''
def fromat_cookies(l):
return [{'name': i[0], 'value': i[1]} for i in l]
def name_value(obj):
return [{"name": k, "value": v} for k, v in obj.items()]
HAR = {}
HAR.update({
"log": {
"version": "1.2",
"creator": {
"name": "mitmproxy har_dump",
"version": "0.1",
"comment": "mitmproxy"
},
"entries": []
}
})
ssl_time = -1
connect_time = -1
if flow.server_conn and flow.server_conn:
connect_time = (flow.server_conn.timestamp_tcp_setup -
flow.server_conn.timestamp_start)
if flow.server_conn.timestamp_tls_setup is not None:
ssl_time = (flow.server_conn.timestamp_tls_setup -
flow.server_conn.timestamp_tcp_setup)
timings_raw = {
'send': flow.request.timestamp_end - flow.request.timestamp_start,
'receive': flow.response.timestamp_end - flow.response.timestamp_start,
'wait': flow.response.timestamp_start - flow.request.timestamp_end,
'connect': connect_time,
'ssl': ssl_time,
}
timings = {
k: int(1000 * v) if v != -1 else -1
for k, v in timings_raw.items()
}
full_time = sum(v for v in timings.values() if v > -1)
started_date_time = datetime.fromtimestamp(flow.request.timestamp_start, timezone.utc).isoformat()
response_body_size = len(flow.response.raw_content) if flow.response.raw_content else 0
response_body_decoded_size = len(flow.response.content) if flow.response.content else 0
response_body_compression = response_body_decoded_size - response_body_size
entry = {
"startedDateTime": started_date_time,
"time": full_time,
"request": {
"method": flow.request.method,
"url": flow.request.url,
"httpVersion": flow.request.http_version,
"cookies": fromat_cookies(flow.request.cookies.fields),
"headers": name_value(flow.request.headers),
"queryString": name_value(flow.request.query or {}),
"headersSize": len(str(flow.request.headers)),
"bodySize": len(flow.request.content),
},
"response": {
"status": flow.response.status_code,
"statusText": flow.response.reason,
"httpVersion": flow.response.http_version,
"cookies": fromat_cookies(flow.response.cookies.fields),
"headers": name_value(flow.response.headers),
"content": {
"size": response_body_size,
"compression": response_body_compression,
"mimeType": flow.response.headers.get('Content-Type', '')
},
"redirectURL": flow.response.headers.get('Location', ''),
"headersSize": len(str(flow.response.headers)),
"bodySize": response_body_size,
},
"cache": {},
"timings": timings,
}
entry["response"]["content"]["text"] = flow.response.get_text(strict=False)
if flow.request.method in ["POST", "PUT", "PATCH"]:
params = [
{"name": a, "value": b}
for a, b in flow.request.urlencoded_form.items(multi=True)
]
entry["request"]["postData"] = {
"mimeType": flow.request.headers.get("Content-Type", ""),
"text": flow.request.get_text(strict=False),
"params": params
}
if flow.server_conn.connected:
entry["serverIPAddress"] = str(flow.server_conn.ip_address[0])
HAR["log"]["entries"].append(entry)
return HAR
class Test:
def response(self, flow):
"""
在response事件中写处理逻辑
"""
msg = json.dumps(flow_to_har(flow))
ctx.log.info('flow转化har格式数据')
ctx.log.info(msg)
if __name__ == "__main__":
opts = options.Options(listen_host='127.0.0.1', listen_port=8080)
opts.add_option("body_size_limit", int, 0, "")
m = WebMaster(opts)
m.addons.add(Test())
try:
m.run()
except KeyboardInterrupt:
m.shutdown()
用 pytest 是支持的,目前用的是 3.0.1.2,没用 pytest 的脚本,所有脚本还是用的 yml。
目前线上没有任何性能问题,因为使用的不多,参考你的建议,结合场景思考下
中台大部分业务其实偏后台,周期调度和门户类的可能并发场景多点。之前组内要求每个子平台都要出一份测试报告,因为交付需要,但是测试的内容全是围绕加工的数据量上,都在数据的量上下功夫,最终都是去测试 hadoop 的底层能力。 觉得没测试到平台的应用层的东西。
对于数据中台的性能测试,大部分其实都在 hadoop 集群的性,但是这个一直疑惑有没有必要测试,还是用它自己的基准测试工具测试就行了
请教下数据库字段映射的逻辑大概是啥样的
1.并发不太懂你是指的什么,抓包的数据吗?这个是个抓包工具和 Fiddler、Charles 类似,客户端本地启动的
2.这个你是说第三阶段的吗?就是选择一个接口,把接口数据发送出去。我这边得接口编写是 web 化的服务,脚本放在服务器,可以本地启动 mimtproxy,抓包后选择需要的请求,通过 web 化工具提供的接口,发送到服务器上,然后生成需要的脚本,落库或者生成文件都行。
3.只负责抓包。 我们 web 化的服务能把接口的依赖数据自动参数化。
metersphere 怎么样,我们组里面也在调研使用,httprunner 脚本编写模式对没编写过自动化的还是难度大些。
是的,我是直接调用 httprunner 执行脚本,把 httprunner 的日志通过 websocket 发送给前端。