• mitmproxy 生成 httprunner 脚本 at 2021年09月09日

    不好意思,最近比较忙,没看帖子。

    我应该标注下当时的 mitmproxy 的版本的,但是现在也不记得了,抱歉,这个他们更新了,脚本需要调整下。😅

    我下载的最新的版本 mitmproxy 7.0.2, python 3.8 用下面的代码是可以的。

    #coding:utf-8
    import json
    from datetime import datetime, timezone
    
    from mitmproxy import proxy, options
    from mitmproxy import ctx
    from mitmproxy.tools.web.master import WebMaster
    
    
    def flow_to_har(flow):
        '''
        将flow转换成har格式数据
        '''
    
        def fromat_cookies(l):
            return [{'name': i[0], 'value': i[1]} for i in l]
    
        def name_value(obj):
            return [{"name": k, "value": v} for k, v in obj.items()]
    
        HAR = {}
        HAR.update({
            "log": {
                "version": "1.2",
                "creator": {
                    "name": "mitmproxy har_dump",
                    "version": "0.1",
                    "comment": "mitmproxy"
                },
                "entries": []
            }
        })
    
        ssl_time = -1
        connect_time = -1
    
        if flow.server_conn and flow.server_conn:
            connect_time = (flow.server_conn.timestamp_tcp_setup -
                            flow.server_conn.timestamp_start)
    
            if flow.server_conn.timestamp_tls_setup is not None:
                ssl_time = (flow.server_conn.timestamp_tls_setup -
                            flow.server_conn.timestamp_tcp_setup)
    
        timings_raw = {
            'send': flow.request.timestamp_end - flow.request.timestamp_start,
            'receive': flow.response.timestamp_end - flow.response.timestamp_start,
            'wait': flow.response.timestamp_start - flow.request.timestamp_end,
            'connect': connect_time,
            'ssl': ssl_time,
        }
    
        timings = {
            k: int(1000 * v) if v != -1 else -1
            for k, v in timings_raw.items()
        }
    
        full_time = sum(v for v in timings.values() if v > -1)
    
        started_date_time = datetime.fromtimestamp(flow.request.timestamp_start, timezone.utc).isoformat()
    
        response_body_size = len(flow.response.raw_content) if flow.response.raw_content else 0
        response_body_decoded_size = len(flow.response.content) if flow.response.content else 0
        response_body_compression = response_body_decoded_size - response_body_size
    
        entry = {
            "startedDateTime": started_date_time,
            "time": full_time,
            "request": {
                "method": flow.request.method,
                "url": flow.request.url,
                "httpVersion": flow.request.http_version,
                "cookies": fromat_cookies(flow.request.cookies.fields),
                "headers": name_value(flow.request.headers),
                "queryString": name_value(flow.request.query or {}),
                "headersSize": len(str(flow.request.headers)),
                "bodySize": len(flow.request.content),
            },
            "response": {
                "status": flow.response.status_code,
                "statusText": flow.response.reason,
                "httpVersion": flow.response.http_version,
                "cookies": fromat_cookies(flow.response.cookies.fields),
                "headers": name_value(flow.response.headers),
                "content": {
                    "size": response_body_size,
                    "compression": response_body_compression,
                    "mimeType": flow.response.headers.get('Content-Type', '')
                },
                "redirectURL": flow.response.headers.get('Location', ''),
                "headersSize": len(str(flow.response.headers)),
                "bodySize": response_body_size,
            },
            "cache": {},
            "timings": timings,
        }
    
        entry["response"]["content"]["text"] = flow.response.get_text(strict=False)
    
        if flow.request.method in ["POST", "PUT", "PATCH"]:
            params = [
                {"name": a, "value": b}
                for a, b in flow.request.urlencoded_form.items(multi=True)
            ]
            entry["request"]["postData"] = {
                "mimeType": flow.request.headers.get("Content-Type", ""),
                "text": flow.request.get_text(strict=False),
                "params": params
            }
    
        if flow.server_conn.connected:
            entry["serverIPAddress"] = str(flow.server_conn.ip_address[0])
    
        HAR["log"]["entries"].append(entry)
    
        return HAR
    
    
    class Test:
        def response(self, flow):
            """
            在response事件中写处理逻辑
            """
            msg = json.dumps(flow_to_har(flow))
            ctx.log.info('flow转化har格式数据')
            ctx.log.info(msg)
    
    
    if __name__ == "__main__":
    
        opts = options.Options(listen_host='127.0.0.1', listen_port=8080)
        opts.add_option("body_size_limit", int, 0, "")
    
        m = WebMaster(opts)
    
        m.addons.add(Test())
    
        try:
            m.run()
        except KeyboardInterrupt:
            m.shutdown()
    
  • 自动化落地过程记录 at 2021年05月17日

    用 pytest 是支持的,目前用的是 3.0.1.2,没用 pytest 的脚本,所有脚本还是用的 yml。

  • 目前线上没有任何性能问题,因为使用的不多,参考你的建议,结合场景思考下🙌

  • 中台大部分业务其实偏后台,周期调度和门户类的可能并发场景多点。之前组内要求每个子平台都要出一份测试报告,因为交付需要,但是测试的内容全是围绕加工的数据量上,都在数据的量上下功夫,最终都是去测试 hadoop 的底层能力。 觉得没测试到平台的应用层的东西。

  • 对于数据中台的性能测试,大部分其实都在 hadoop 集群的性,但是这个一直疑惑有没有必要测试,还是用它自己的基准测试工具测试就行了

  • mitmproxy 生成 httprunner 脚本 at 2021年04月13日

    请教下数据库字段映射的逻辑大概是啥样的

  • mitmproxy 生成 httprunner 脚本 at 2021年04月13日

    1.并发不太懂你是指的什么,抓包的数据吗?这个是个抓包工具和 Fiddler、Charles 类似,客户端本地启动的
    2.这个你是说第三阶段的吗?就是选择一个接口,把接口数据发送出去。我这边得接口编写是 web 化的服务,脚本放在服务器,可以本地启动 mimtproxy,抓包后选择需要的请求,通过 web 化工具提供的接口,发送到服务器上,然后生成需要的脚本,落库或者生成文件都行。
    3.只负责抓包。 我们 web 化的服务能把接口的依赖数据自动参数化。

  • mitmproxy 生成 httprunner 脚本 at 2021年04月13日

    metersphere 怎么样,我们组里面也在调研使用,httprunner 脚本编写模式对没编写过自动化的还是难度大些。

  • 自动化落地过程记录 at 2021年03月28日

    是的,我是直接调用 httprunner 执行脚本,把 httprunner 的日志通过 websocket 发送给前端。

  • 是组里面去推动开发去关注这些😆 ,之前都是用 pt-query-disgest,mysqldumpslow 工具去看 比较麻烦