数据测试 流量回放系统的设计与实现 -- 流量录制模块

81—1 · 2020年11月10日 · 最后由 李晓 回复于 2022年11月25日 · 7136 次阅读

背景

    互联网应用提供服务,与用户的操作产生交互,其行为产生的流量蕴含着许多可挖掘的价值,如性能分析、数据对比、行为分析等等。用户流量就如同是现实世界的石油,由其中我们可以提炼的演化的用法非常多。
    本系列从软件测试角度对测试人员的流量展开了应用设计,形成一个集流量录制、流量回放、实时对比、流量用例编辑、用户管理于一体的流量管理平台,以提高测试工作效率,多方面的验证软件产品质量。

    本系统采用前后端分离架构,由于系统架构复杂,本文仅就 流量录制 模块的设计与实现,做分析讲解,其它模块将在后续文章中拆分讲述。

架构设计

流量录制功能数据流转示意图

实现效果

录制任务创建,代理并监控日志:

根据流量生成用例:

功能设计

流量录制模块主要包含的功能是:

  • 任务信息列表
  • 创建录制任务
  • 修改任务信息
  • 执行任务
  • 实时查看流量日志
  • 查看详情信息
  • 自定义生成流量用例并存储
  • 用例列表
  • 用例详情、编辑

    系统所有的列表和详情数据,都离不开 “增、删、改、查” 4 个接口,系统的删除目前都是逻辑删除,通过置状态值来控制显示,以备后续的数据回复和问题追踪。
    其中的比较特殊的是 实时流量日志展示 功能,由于 http 接口的异步特性,很难保证日志数据的实时性和序列化。因此这里采用的 websocket 长连接为主,http 接口为辅的双重保证。也是得益 mitmproxy 项目的开源,我结合系统 api,对其接口方法进行了二次自定义开发,以适应本系统的数据需求,保持数据的准确性。

问题与解决方案

一、多任务不同的配置需求如何隔离?

解决方案:
在系统首次启动前,需先执行已经封装的 DockerFile,创建我们所需的镜像。系统通过 docker service,根据任务配置的信息(host、代理等),创建 docker 容器。实际此时的 docker 镜像,已经封装了容器运行时所需的工具和基本的配置。每个容器创建挂载在各自独立的端口下,以达到隔离。

二、如何获取实时流量日志?

解决方案:
容器中的 mitmproxy 有多种启动方式,各有不同的使用场景,其中 mitmweb 方式启动的代理会有一个 socket 服务,通过对该接口的自定义开发,让其支持我们的跨域连接,并解析其报文信息,再通过前端渲染,即可达到实时日志的展现功能。

三、日志太多界面卡死怎么办?

解决方案:
日志内容大小最好根据实际的使用场景做限制,同时前端采用虚拟列表,只渲染在可视区域及少量即将可视的 DOM 数据,根据数据长度计算容器的高度。可以自己写组件去创建、销毁 Dom,本系统中使用的是结合 react-virtualized 和 antd 的 virtualizedtableforantd4 组件,这样做到日志展示模块的稳定与性能优化。

四、流量生成用例时,数据量很大怎么处理?

解决方案:
对于大量数据的录制用例,在生成用例时,会根据勾选的流量 ID,通过请求传值给自定义的 dump 接口,生成可以用于回放的 dump 文件并保存。同时为了便于前台展示,和后续的编辑,流量需要入库到用例表。数据量超过限制大小时,后台会舍弃部分冗余字段值,切片入库,保证系统的稳定性。

五、https 流量如何处理?

解决方案:
https 流量需要通过安装 mitmproxy 的中间人证书,才能解析获取。这里有个注意点,安卓移动设备 6.0 后,无法通过伪造证书去解析 https。简单的做法是,拿到服务器证书,用 openssl 生成 pem 证书。在系统中针对项目进行 pem 证书的配置,当该项目的代理启动时,以此证书去解析指定域名的 https 请求,这样就是解决所有来源请求的 https 数据了。

核心代码实现

本功能模块的核心部分是围绕着容器管理脚本,进行数据准备、配置管理和容器管理。

前端虚拟表组件设计

import React, { useRef, useEffect, useMemo } from 'react';
import { Table } from 'antd';
import { useVT } from 'virtualizedtableforantd4';

const MyRow = React.forwardRef((props, ref) => {
  const { children, ...rest } = props;
  return <tr {...rest} ref={ref}>{children}</tr>;
});
function CustomRowsHooks(props) {
  const columns = useRef(props.columns);
  const [VT, setVT] = useVT(() => ({ scroll: { y: 600 }, debug: true }));
  useMemo(() => setVT({ body: { row: MyRow } }), [setVT]);
  return (
    <Table
      {...props}
      components={VT}
      scroll={{ y: 600 }}
      dataSource={props.dataSource}
      columns={columns.current}
    />
  );
}

export default CustomRowsHooks;

服务端核心脚本

完整脚本代码如下:

#-*-coding:utf-8-*-
__author__="orion-c"

def buildContainer(taskInfo, mountPort, wsMountPort, projectDir):
    for i in range(5):
        if len(os.listdir(projectDir)) > 0:
            break
        time.sleep(1)
    if sys.platform.startswith('win'):
        projectDir = "/taskFile/"+projectDir.split('taskFile\\')[1]
    formatHost = {}
    try:
        if taskInfo:
            if taskInfo['hosts']:
                hosts = taskInfo['hosts'].split('\n')
                for host in hosts:
                    host = ' '.join(host.split())
                    host = host.split(' ')
                    if len(host) == 2:
                        formatHost[host[1]] = host[0]
            try:
                container = docker_client.containers.get(str(mountPort))
                if container.status == 'exited':
                    container.remove()
                    logger.info('移除容器成功:{}'.format(str(mountPort)))
            except:
                logger.info('can build')
            docker_client.containers.run(
                name=mountPort,
                image='myproxy:0.0.13',
                command='/bin/bash -c "sh /root/script/start_sync.sh"',
                volumes={
                    projectDir: {'bind': '/root/script', 'mode': 'rw'}
                },
                ports={'8080': mountPort, '8081': wsMountPort},
                extra_hosts=formatHost,
                stderr=True,
                detach=True
            )
            logger.info('任务 {} 创建容器 {} 成功,ws端口 {}'.format(taskInfo['id'], mountPort, wsMountPort))
            return str(mountPort)
    except Exception as e:
        logger.error(e)
        logger.error('创建容器:{} 失败'.format(mountPort))
        logger.error('请检查你的docker服务')

def copyDumpFile(projectDir, content):
    dumpFilePath = content['suiteInfo']['dumpFile']
    dumpFilePath = '../../' + dumpFilePath if os.path.isfile('../../' + dumpFilePath) else dumpFilePath
    replayDumpFile = projectDir + '/replayDumpFile'
    shutil.copyfile(dumpFilePath, replayDumpFile)

def makeSyncReplayShell(projectDir, mountPort, content):
    shellPath = '../../templte/start_sync.sh' if os.path.isfile('../../templte/start_sync.sh') else 'templte/start_sync.sh'
    with open(shellPath, 'r', encoding='utf-8') as shellFile:
        shellTemplte = shellFile.read()
        cmd = "mitmdump -nC /root/script/replayDumpFile -s /root/script/getReplayResponse.py --ssl-insecure --set ssl_version_client=all --set ssl_version_server=all"
        if content['proxySetting']:
            cmd = cmd + " --set http2=false --set mode=upstream:{proxySetting}".format(proxySetting=content['proxySetting'])
        if content['cerPem'] and content['domain']:
            cmd = cmd + " --certs {domain}=/root/script/cer.pem".format(domain= content['domain'])
        shellConfig = shellTemplte.format(cmd= cmd)
        with open(projectDir + '/start_sync.sh', 'w', encoding='utf-8') as file:
            file.write(shellConfig)

def makePem(content,projectDir):
    if content['cerPem'] and content['domain']:
        with open(projectDir + '/cer.pem', 'w', encoding='utf-8') as file:
            file.write(content['cerPem'])

def makeMiddlareScript(content,projectDir):
    responsePath = '../../templte/getReplayResponse.py' if os.path.isfile('../../templte/getReplayResponse.py') else 'templte/getReplayResponse.py'
    with open(responsePath, 'r', encoding='utf-8') as middlareFile:
        middlareTemplte = middlareFile.read()
        middlareConfig = middlareTemplte.format(appHost= app.config['HOST'],taskId=content['id'])
        with open(projectDir + '/getReplayResponse.py', 'w', encoding='utf-8') as file:
            file.write(middlareConfig)

def makeConfig(projectDir, mountPort, content):
    copyDumpFile(projectDir,content)
    makeSyncReplayShell(projectDir, mountPort,content)
    makePem(content, projectDir)
    makeMiddlareScript(content, projectDir)

@manager.option('-i','--taskId',dest='taskId',default='')
def runScript(taskId):
    now = datetime.now().strftime('%Y-%m-%d_%H_%M_%S')
    taskRootPath = encrypt_name(now)
    content = getTaskInfo(taskId)
    for i in range(2):
        if i == 0:
            proxyMountPort = freeport.get()
        else:
            wsMountPort = freeport.get()
    projectDir = createProjectDir(taskRootPath)
    makeConfig(projectDir, proxyMountPort, content)
    setTaskStatus(taskId, 2, taskRootPath)
    containName = buildContainer(content, proxyMountPort, wsMountPort, projectDir)
    if containName:
        setTaskStatus(taskId, 3, taskRootPath, containName, wsMountPort)
    else:
        setTaskStatus(taskId, 6)
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
共收到 9 条回复 时间 点赞

期待后续文章

多谢关注,敬请期待

这个有实际应用场景么,与 fiddler,wireshark 抓包工具对比有什么优点呢?

cool 回复

实际可以理解为一个深度定制的云代理工具,平台化的运行,可将流量信息归档复用。还有个实时对比功能,可以实时对比不同版本,同样操作的返回信息,这部分已经开发,还没写帖子。

仅楼主可见

想请教下楼主,你这个是相当于单独部署一个代理服务器,测试人员同绑定该代理,产生实时数据,然后将这些数据推送到前端展示?
前面创建任务是开启代理服务吗?

李晓 回复

是的,理解正确。核心点将通过这个代理的流量转发 + 对比。

81—1 回复

好的,这套方案目前还在使用吗,在项目中有实际落地收益吗,我也最近也在研究接口录制上传这块,这个 mitmproxy 中间人代理部署落地确实比较简单,快捷,就是不知道实际的用处大不大

请教下楼主,mitmweb 的 socket 传输有相关文档吗?数据存档到库是定时任务吗?感谢

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册