鉴于有些较早的帖子被翻出来,有些 tester 还在纠结这个方面的内容,于是我整理了下当时的解决的思路
http://testerhome.com/topics/30210#reply40
内容输出,写在自己的 blog 了,也方便自己做个记录:http://www.kikicyo.com/article/29/
博客的内容也搬过来帖子正文呗?跳来跳去看起来不方便。
from concurrent.futures.process import ProcessPoolExecutor
import os
import time
import pytest
import yaml
from meet.public import *
from meet.concurrent_startup import *
devices_list = ['Your device uid', 'Your device uid']
def runnerPool():
with open('kyb_caps.yaml', 'r', encoding="utf-8")as file:
data = yaml.load(file, Loader=yaml.FullLoader)
print(data)
devices_Pool = data
with ProcessPoolExecutor(len(devices_Pool)) as pool:
pool.map(runPytest, devices_Pool)
def runPytest(device):
print(f"cmdopt is {device}")
report = f"report-{device['deviceName']}".split(":", 1)[0]
report_dir = mkdir_dir(report)
now_time = time.strftime("%H-%M-%S", time.localtime())
print(f"pool run device is {device['deviceName']}")
pytest.main(["-s", "./TestCases/", f"--cmdopt={device}", "--alluredir", f"./{report_dir}/{now_time}/xml"])
time.sleep(1)
os.system(f"allure generate ./{report_dir}/{now_time}/xml -o ./{report_dir}/{now_time}/html")
if __name__ == '__main__':
"""
获取服务的端口
并发启动appium的服务
多设备启动执行用例
release端口
"""
ports = []
appium_start_sync()
runnerPool()
for i in range(len(devices_list)):
port_num = 4723 + 2 * i
ports.append(port_num)
print(ports)
for t in ports:
release_port(t)
import time
def mkdir(path):
"""创建目录函数"""
import os
path = path.strip() # 去除首位空格
path = path.rstrip("\\") # 去除尾部 \ 符号
is_exists = os.path.exists(path) # 判断路径是否存在
if not is_exists:
os.makedirs(path) # 不存在目录则创建
print(path + "创建成功")
return True
else:
print(path + "目录已存在") # 如果目录存在则不创建,并提示目录已存在
return False
def mkdir_dir(path):
"""创建当天相应的目录"""
now_date = time.strftime("%Y-%m-%d", time.localtime())
main_path = f"{path}"
global dir_path
dir_path = (main_path + '/' + now_date)
mkdir(dir_path)
return dir_path
def get_failed_screenshot(self, current_case_name):
"""获取失败截图函数"""
now_date = time.strftime("%Y-%m-%d", time.localtime())
now_time = time.strftime("%Y-%m-%d %H-%M-%S", time.localtime())
global dir_path
self.driver.get_screenshot_as_file(dir_path + "/" + now_time + " " + current_case_name + ".png")
import pytest
from appium import webdriver
def pytest_addoption(parser):
parser.addoption("--cmdopt", action="store", default="device", help="None")
@pytest.fixture(scope="session")
def cmdopt(request):
return request.config.getoption("--cmdopt")
@pytest.fixture
def connectDevice(cmdopt):
device = eval(cmdopt)
device_caps = {}
device_caps["platformVersion"] = device["platformVersion"]
device_caps["platformName"] = device['platformName']
device_caps["deviceName"] = device['deviceName']
device_caps["udid"] = device['udid']
device_caps["appPackage"] = device['appPackage']
device_caps["appActivity"] = device['appActivity']
device_caps["newCommandTimeout"] = device["newCommandTimeout"]
device_caps["noReset"] = True
device_caps["noSign"] = True
device_caps["unicodeKeyboard"] = True
device_caps["resetKeyboard"] = True
device_caps["systemPort"] = device["systemPort"]
remote = "http://127.0.0.1:" + str(device["port"]) + "/wd/hub"
driver = webdriver.Remote(remote, device_caps)
yield driver
driver.close_app()
driver.quit()
# coding=utf-8
import subprocess
import socket
from time import ctime
import multiprocessing
import os
from meet.public import *
def check_port(host, port):
"""
检测指定的端口是否被占用
:param host:
:param port:
:return:
"""
# 创建socket对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((host, port))
s.shutdown(2)
except OSError as msg:
print('----------port %s is available! ----------' % port)
print(msg)
return True
else:
print('----------port %s already be in use !-----------' % port)
return False
def release_port(port):
"""
释放指定的端口
:param port:
:return:
"""
# 查找对应端口的pid
cmd_find = 'netstat -aon | findstr %s' % port
print(cmd_find)
# 返回命令执行后的结果
result = os.popen(cmd_find).read()
print(result)
if str(port) and 'LISTENING' in result:
# 获取端口对应的pid进程
i = result.index('LISTENING')
start = i + len('LISTENING') + 7
end = result.index('\n')
pid = result[start:end]
# 关闭被占用端口的pid
cmd_kill = 'taskkill -f -pid %s' % pid
print(cmd_kill)
os.popen(cmd_kill)
else:
print('----------port %s is available !-----------' % port)
def appium_start(host, port):
"""
启动appium服务
:param host:
:param port:
:return:
"""
bootstrap_port = str(port + 1)
# cmd = 'start /b appium -a ' + host + ' -p ' + str(port) + ' --bootstrap-port ' + str(bootstrap_port) # linux下
cmd = 'start node G:\\Appium\\resources\\app\\node_modules\\appium\\build\lib\\main.js -a ' + host + ' -p ' + str(
port) + ' --bootstrap-port ' + str(bootstrap_port) # win调试
print('%s at %s' % (cmd, ctime()))
mkdir("appium_log")
subprocess.Popen(cmd, shell=True, stdout=open('./appium_log/' + str(port) + '.log', 'a'), stderr=subprocess.STDOUT)
def start_appium_action(host, port):
"""
检测端口是否被占用,如果没有被占用则启动appium服务
:param host:
:param port:
:return:
"""
if check_port(host, port):
appium_start(host, port)
return True
else:
print("----------appium %s already start!----------" % port)
return True
def appium_start_sync(devices_list):
"""
并发启动appium服务
:return:
"""
appium_process = []
# 加载appium进程
for i in range(len(devices_list)):
host = '127.0.0.1'
port = 4723 + 2 * i
appium = multiprocessing.Process(target=start_appium_action, args=(host, port))
appium_process.append(appium)
# 启动appium服务
for appium in appium_process:
appium.start()
for appium in appium_process:
appium.join()
import allure
from selenium.webdriver.support.wait import WebDriverWait
from unittest import TestCase
import pytest
import time
from appium import webdriver
class TestMyConcurrent(object):
@allure.feature("group_Call")
@allure.story("search_Case")
def test_001(self, connectDevice):
"""
测试用例一
connectDevice = driver
:param connectDevice:
:return:
"""
time.sleep(5)
connectDevice.find_element_by_id("yiyayiyayo").click()
time.sleep(2)
print("已点击咿呀咿呀哟")
time.sleep(5)
@allure.feature("group_Call")
@allure.story("search_Case")
def test_002(self, connectDevice):
"""
测试用例二
:return:
"""
time.sleep(5)
connectDevice.find_element_by_id("sousuokuang").click()
time.sleep(2)
connectDevice.find_element_by_id("sousuokuang").send_keys("天气")
time.sleep(2)
connectDevice.find_element_by_id("sousuo_btn").click() # 点击搜索
print("已完成点击搜索")
time.sleep(5)
connectDevice.keyevent(4)
time.sleep(2)
connectDevice.keyevent(4)
print("已返回主页")
@allure.feature("group_Call")
@allure.story("search_Case")
def test003(self, connectDevice):
"""测试用例三"""
while True:
try:
connectDevice.find_element_by_xpath("//*[contains(@text,'更多')]").click()
break
except:
time.sleep(5)
print("点击完成更多")
time.sleep(5)
connectDevice.keyevent(4)
print("从更多页面返回主页")
time.sleep(4)
- platformName: Android
platformVersion: "10"
deviceName: 123456gf
appPackage: package_name
appActivity: package_home_name
udid: 123456gf
systemPort: 8208
newCommandTimeout: 6000
port: 4723
- platformName: Android
platformVersion: "11"
deviceName: sdakla9312
appPackage: package_name
appActivity: package_home_name
udid: sdakla9312
systemPort: 8209
newCommandTimeout: 6000
port: 4725
无论是单点执行还是并发执行,测试结果的验证都是测试用例必要的要素