前段时间大版本对账功能,项目过程踩了不少坑,和开发讨论结果,得出更好的测试方法,由于测试时间短简单做了后端,最近有时间把功能完善,记录下自己的想法和思路
1.对账系统场景复杂,涉及代扣、代付、退款等不同机构对账文件,对出差异需差错处理,通过正常的手段造不出所需场景
2.开发冒烟难度增加,无法覆盖测试提供的冒烟用例
3.bug 复现和回归难,通过正常跑数据改数据的手段,时间和产出不正比
4.后续换个人测试也需要对此块业务熟悉才能进行测试
5.大批量造数据
不 mock 返回值,mock 系统数据流和机构对账文件
import django
django.setup()
from celery.task import Task
from MockServer.settings import BASE_DIR
from mock_task.models import PaymentMockTask
from mock_data.recon.mockOrder import MockOrder
from mock_data.recon.mockReconFile import MockReconFile
from utils.OPMysql import OPMysql
from utils.easylogger import getEasyLogger
from utils.toolutils import ToolsUtils
import os
from concurrent.futures import ThreadPoolExecutor, wait
logger = getEasyLogger(targetName='MockTask')
class MockTask(Task):
"""
Mock任务
"""
name = 'mock-work-task'
def run(self, *args, **kwargs):
"""
异步任务
:param order_no:
:param env:
:return:
"""
kwargs['db'] = OPMysql(kwargs['mockEnv'])
kwargs['tools'] = ToolsUtils()
records = PaymentMockTask.objects.filter(task_no=kwargs['taskNo'])
record = records[0]
errorStatus = self._createOrderThreadPoolExecutor(kwargs['uploadPath'], kwargs['resPath'], kwargs['mockType'],
kwargs['db'], kwargs['tools']
)
if errorStatus:
record.mock_status = 2
record.save()
else:
record.mock_status = 3
record.save()
def _createOrderThreadPoolExecutor(self, uploadPath, resPath, mockType, db, tools):
"""
创建线程池
:return:
"""
allTask = []
errorStatus = True
with ThreadPoolExecutor(max_workers=5) as executor:
with open(os.path.join(BASE_DIR, "resource", uploadPath), "r", encoding="utf8") as fr:
with open(os.path.join(BASE_DIR, "resource", resPath), "a", encoding="utf8") as fw:
fw.seek(0)
fw.truncate()
self._preconditionMock(mockType, fw)
for line in fr:
oneTask = self._startMockTask(line, executor, mockType, fw, db, tools)
if oneTask.exception():
print("{}".format(oneTask.exception()))
errorStatus = False
break
allTask.append(oneTask)
wait(allTask)
db.release_conn()
return errorStatus
def _startMockTask(self, line, executor, mockType, fw, db, tools):
"""
开始mock任务
:return:
"""
jsonline = ToolsUtils.fileJsonFromt(line)
small_task = executor.submit(self._runMcckTask, jsonData=jsonline, mockType=mockType, fw=fw,
db=db, tools=tools)
return small_task
def _runMcckTask(self, jsonData, mockType, fw, db, tools):
"""
执行mock任务
:param orderDataList, mockType, fw:
:return:
"""
mockOrder = MockOrder()
mockReconFile = MockReconFile()
switcher = {"MOCK_UNI": mockOrder.mcokV2Order, "MOCK_V1_AGREE": mockOrder.mcokV2Order,
"MOCK_V1_PAY": mockOrder.mcokV2Order, "MOCK_REMIT": mockOrder.mockV1Order,
"MOCK_REFUND": mockOrder.mockRefundOrder, "MOCK_PAYREMIT": mockOrder.mockPayRemitOrder,
"MOCK_RECON_PAYREMIT_SHARELINK_FILE": mockReconFile.mcokShareLinkPayRemitFile}
mockres = switcher.get(mockType)(jsonData, db, tools)
fw.write(str(mockres))
fw.write("\n")
效果
主界面
创建 mock 任务
mock 文件格式(mock 不同业务流不同文件格式)
上传 mock 文件
下载 mock 结果(结果包含关键信息便于在业务系统里面找数据,文件格式和上传 mock 文件类似,都是 json)