Mac mini:10.13.4
Appium:1.7.2
Python:python3.6
链接:https://testerhome.com/topics/8375
https://github.com/Lemonzhulixin/UItest.git
import os
import random
import socket
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor
class myserver(object):
    """Start and stop local Appium servers, one per device under test.

    ``executor`` and ``ports`` are class-level on purpose: callers in this
    file call ``create_pools`` on one instance and then read ``ports`` from a
    fresh instance, which only works because every instance shares them.
    """

    # Inclusive bounds of the TCP port range getport() draws from.
    PORT_MIN = 4723
    PORT_MAX = 4800

    executor = ThreadPoolExecutor(6)  # thread pool shared by all instances
    ports = list()  # ports handed out so far, shared by all instances

    def isOpen(self, ip, port):
        """Report whether something accepts TCP connections on ``ip:port``.

        :param ip: host to probe
        :param port: port number (anything ``int()`` accepts)
        :return: True if the connection succeeded (port in use), else False
        """
        port = int(port)
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            probe.connect((ip, port))
            probe.shutdown(socket.SHUT_RDWR)  # no further reads or writes
        except Exception:
            print('%d is available' % port)
            return False
        else:
            print('%d is used' % port)
            return True
        finally:
            # Always release the descriptor; the probe is never reused.
            probe.close()

    def getport(self):
        """Pick a random port that is neither in use nor already handed out.

        Ports already recorded in ``ports`` are skipped as well: a server
        launched a moment ago may not have bound its port yet, so probing
        alone could hand the same port out twice.

        :return: a port number in ``[PORT_MIN, PORT_MAX]``
        :raises RuntimeError: if no port in the range is available
        """
        candidates = [
            port for port in range(self.PORT_MIN, self.PORT_MAX + 1)
            if port not in self.ports
        ]
        random.shuffle(candidates)
        for port in candidates:
            if not self.isOpen('127.0.0.1', port):
                return port
        raise RuntimeError(
            'no free port between %d and %d' % (self.PORT_MIN, self.PORT_MAX)
        )

    def run(self, port):
        """Launch one Appium server on ``port``, logging its stdout to a file.

        The Android variant of this command additionally passed
        ``--bootstrap-port`` with ``port + 1``.

        :param port: port the Appium server should listen on
        :raises Exception: re-raised after printing if the launch fails
        """
        print('start appium service')
        now_time = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time()))
        cmd_appium = 'appium -p ' + str(port) + " --session-override"
        # The port is part of the name so that servers started within the
        # same second do not truncate each other's log file.
        log_name = now_time + '_' + str(port) + '_log.txt'
        try:
            # The child process inherits its own copy of the descriptor, so
            # the parent's handle can be closed as soon as Popen returns.
            with open(log_name, 'w') as appiumlog:
                subprocess.Popen(cmd_appium, shell=True, stdout=appiumlog)
        except Exception as msg:
            print('error message:', msg)
            raise

    def create_pools(self, device_list_length):
        """Reserve a port and start an Appium server for each device.

        :param device_list_length: number of servers to start
        :return: the string ``'running'``
        """
        for _ in range(device_list_length):
            port = self.getport()
            self.ports.append(port)
            self.executor.submit(self.run, port)
        return ('running')

    def kill_appium(self):
        """Stop the Appium servers by running ``pkill node``.

        Note that this targets every process matching ``node``, not only the
        servers started by this class.
        """
        cmd_kill = 'pkill node'
        os.system(cmd_kill)
        print('close appium service')
from appium import webdriver
import time
from iOS.start_appium import myserver
def iOS_driver(devicename, wdaport, port):
    """Open an Appium session for one iOS device and return the driver.

    :param devicename: value sent as the ``deviceName`` capability
    :param wdaport: value sent as the ``wdaLocalPort`` capability
    :param port: port of the Appium server on localhost
    :return: the object returned by ``webdriver.Remote``
    """
    desired_caps = dict(
        platformName='iOS',
        platformVersion='',
        deviceName=devicename,
        bundleId='com.xxxx.xxxx',
        app='',
        noReset=True,
        automationName='XCUITest',
        udid='auto',
        xcodeOrgId='xxxxx',
        xcodeSigningId='iPhone Developer',
        autoLaunch=True,
        wdaLocalPort=wdaport,
    )
    # Fixed pause before connecting to the server.
    time.sleep(5)
    endpoint = 'http://localhost:{}/wd/hub'.format(port)
    return webdriver.Remote(endpoint, desired_caps)
# (device name, WebDriverAgent local port) for each iOS device under test.
device_list = [('i62078', 8001), ('6s2050', 8005)]

# Start one Appium server per device. `ports` is a class-level list on
# myserver, so it holds every port create_pools() handed out.
server = myserver()
server.create_pools(len(device_list))
port_list = server.ports
time.sleep(5)  # fixed pause, presumably to let the Appium servers come up

# NOTE(review): ThreadPoolExecutor is not among this script's own import
# lines (appium.webdriver, time, myserver) - confirm it is imported.
executor = ThreadPoolExecutor(6)  # worker threads that open the sessions in parallel
futures = []
for (dev, wdaport), port in zip(device_list, port_list):
    print(dev, wdaport, port)
    futures.append(executor.submit(iOS_driver, dev, wdaport, port))

# executor.submit() returns a Future, not a driver. Resolve the futures so
# that `driver` is a usable session object and any session-creation error
# surfaces here instead of staying hidden inside the Future.
drivers = [future.result() for future in futures]
# As before, `driver` refers to the last device in device_list.
driver = drivers[-1] if drivers else None
备注:若 iOS 与 Android 的脚本在同一台 PC 上运行,需要把 Appium 的端口范围改成互不重叠的区间(上文 iOS 使用 4723-4800),例如改为 4801-4820。
import os
from appium import webdriver
import time
from Android.start_appium import myserver
# Build an absolute path from one given relative to this file's directory.
def PATH(p):
    """Return the absolute form of ``p`` resolved against this file's directory."""
    here = os.path.dirname(__file__)
    return os.path.abspath(os.path.join(here, p))
def Android_driver(dev, port):
    """Open an Appium session for one Android device and return the driver.

    :param dev: device serial, sent as the ``udid`` capability
    :param port: port of the Appium server on localhost
    :return: the object returned by ``webdriver.Remote``
    """
    desired_caps = {
        'platformName': 'Android',
        'platformVersion': '',
        'deviceName': 'xxxxt',
        'udid': dev,
        'appPackage': "com.xxx.xxxxx",
        'appActivity': "com.xxxx.xxxx.app.SplashActivity",
        # 'app': ' ',
        'unicodeKeyboard': True,
        'resetKeyboard': True,
        'automationName': 'Appium',
        # NOTE(review): the original text was cut off after "'noR", leaving
        # the dict literal unclosed. Reconstructed as 'noReset': True to
        # mirror the iOS capabilities; any capabilities that followed are
        # lost - confirm against the real script.
        'noReset': True,
    }
    remote_url = 'http://localhost:' + str(port) + '/wd/hub'
    time.sleep(5)  # fixed pause before connecting to the server
    drivers = webdriver.Remote(remote_url, desired_caps)
    return drivers
def get_devices(adb_output=None):
    """Return the serial numbers of the Android devices that are ready for use.

    A line counts as a device only when it has the form
    ``<serial><TAB>device``. The header line, adb daemon start-up messages,
    blank lines and devices in any other state (``offline``,
    ``unauthorized``, ...) are ignored, so the result no longer depends on
    the exact number of lines adb prints.

    :param adb_output: optional iterable of lines to parse instead of
        running ``adb devices``
    :return: list of device serial strings, in the order listed
    """
    if adb_output is None:
        # os.popen() runs the command and exposes its stdout as a file object.
        with os.popen('adb devices') as pipe:
            adb_output = pipe.readlines()

    dev_list = []
    for line in adb_output:
        serial, tab, state = line.strip().partition('\t')
        if tab and state.strip() == 'device':
            dev_list.append(serial)

    print("当前已连接待测手机数为:" + str(len(dev_list)))
    return dev_list
# Serial numbers of the attached Android devices, as returned by get_devices().
device_list = get_devices()

# Ask the Android myserver for one server per device, then read back its
# `aports` list (presumably the ports those servers were given - verify
# against Android.start_appium, which is not shown here).
myserver().create_pools(len(device_list))
aport_list = myserver().aports
time.sleep(5)  # fixed pause before the sessions are requested

# Submit one session-creation job per device to a six-thread pool.
# NOTE(review): `driver` ends up bound to the Future returned by the last
# submit(), not to a driver object - confirm that is what importers expect.
executor = ThreadPoolExecutor(6)
for i, dev in enumerate(device_list):
    port = aport_list[i]
    print(dev, port)
    driver = executor.submit(Android_driver, dev, port)
import os
import time
import logging
import logging.config
from iOS.script_params import driver
def mkdir(path):
    """Create directory ``path`` (including parents) unless it already exists.

    Surrounding whitespace is removed first, then trailing backslashes, then
    trailing forward slashes.

    :param path: directory path to create
    :return: True if this call created the directory, False if the path
        already existed
    """
    path = path.strip()
    path = path.rstrip('\\')
    path = path.rstrip('/')
    # Attempt the creation directly instead of testing os.path.exists()
    # first: a separate check leaves a window in which another thread or
    # process can create the directory and make makedirs() raise.
    try:
        os.makedirs(path)
    except FileExistsError:
        logging.debug(u'%s目录已存在', path)
        return False
    logging.info(u'%s创建成功', path)
    return True
def test_init():
    """Create the per-run output directories and return their paths.

    The directories live under ``./iOS/Report/<timestamp>/`` where the
    timestamp is the local time formatted as ``%Y%m%d%H%M%S``.

    :return: tuple ``(capture_dir, log_dir, report_dir)``; each path ends
        with ``/``
    """
    print('Test init begin!!!')
    stamp = time.strftime('%Y%m%d%H%M%S', time.localtime())
    run_root = './iOS/Report/' + stamp + '/'
    capture_dir = run_root + 'screenshots/'
    log_dir = run_root + 'logs/'
    report_dir = run_root + 'reports/'
    for folder in (capture_dir, log_dir, report_dir):
        mkdir(folder)
    return capture_dir, log_dir, report_dir
def logger_init():
    """Configure logging through ``dictConfig`` and return the 'simple' logger.

    The file handler writes to ``logging.log`` inside ``path_lists[1]``, so
    the module-level ``path_lists`` must be assigned before this is called.

    :return: the logger named ``'simple'``
    """
    formatters = {
        'simple': {
            'format': u'%(asctime)s-%(levelname)s: %(message)s',
        },
    }
    console_handler = {
        'class': 'logging.StreamHandler',
        'level': 'DEBUG',
        'formatter': 'simple'
    }
    file_handler = {
        'class': 'logging.FileHandler',
        'filename': path_lists[1] + 'logging.log',
        'level': 'DEBUG',
        'formatter': 'simple',
        'encoding': 'utf-8'
    }
    # NOTE(review): 'root' is configured as an entry under 'loggers' rather
    # than through dictConfig's top-level 'root' key - confirm that is
    # intended.
    logger_settings = {
        'root': {
            'handlers': ['console'],
            'level': 'DEBUG',
        },
        'simple': {
            'handlers': ['console', 'file'],
            'level': 'INFO',
        }
    }
    logging.config.dictConfig({
        'version': 1,
        'formatters': formatters,
        'handlers': {
            'console': console_handler,
            'file': file_handler,
        },
        'loggers': logger_settings,
    })
    return logging.getLogger('simple')
def capture_screen(fun, path):
    """Save a screenshot of the current screen into directory ``path``.

    The file is named by joining the local timestamp, ``fun`` and ``'.png'``
    with underscores, i.e. ``<timestamp>_<fun>_.png``.

    :param fun: label included in the file name
    :param path: target directory; must end with ``/`` or a backslash
    :return: True if the screenshot was saved, False if ``path`` has no
        trailing separator or the driver reported that saving failed
    """
    if not path.endswith(('/', '\\')):
        logger.error(u'截图路径请使用"\\"或者"/"结尾')
        return False

    mkdir(path)
    local_time = time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime())
    capture_name = '_'.join([local_time, fun, '.png'])
    path_name = path + capture_name
    # get_screenshot_as_file() signals an I/O failure through its return
    # value rather than an exception, so check it instead of always
    # reporting success.
    if not driver.get_screenshot_as_file(path_name):
        logger.error(u'保存截图失败%s', capture_name)
        return False
    logger.info(u'保存截图%s', capture_name)
    return True
def get_size():
    """Return the window size reported by the driver as ``(width, height)``."""
    window = driver.get_window_size()
    width = window['width']
    height = window['height']
    return width, height
def swipe_by_ratio(start_x, start_y, direction, ratio, duration=None):
    """Swipe from a start point by a fraction of the screen size.

    :param start_x: starting x coordinate
    :param start_y: starting y coordinate
    :param direction: one of 'up', 'down', 'left', 'right'
    :param ratio: swipe distance as a fraction of the screen height (for
        up/down) or width (for left/right), in the range 0 to 1
    :param duration: swipe duration in ms, passed through to ``driver.swipe``
    :return: True if the swipe was issued; False if the direction is not
        supported or the end point would pass the screen edge in the
        direction of travel
    """
    # Warning logged when the end point overshoots; the keys double as the
    # set of supported directions.
    overshoot_messages = {
        'up': u'上滑距离过大',
        'down': u'下滑距离过大',
        'left': u'左滑距离过大',
        'right': u'右滑距离过大',
    }
    if direction not in overshoot_messages:
        logger.error(u'滑动方向%s不支持', direction)
        # Stop here: continuing would only fail later with a KeyError.
        return False

    width, height = get_size()
    end_x, end_y = start_x, start_y
    # Only the moving axis changes, and only the edge being moved towards
    # is checked.
    if direction == 'up':
        end_y = start_y - ratio * height
        overshoots = end_y < 0
    elif direction == 'down':
        end_y = start_y + ratio * height
        overshoots = end_y > height
    elif direction == 'left':
        end_x = start_x - ratio * width
        overshoots = end_x < 0
    else:  # 'right'
        end_x = start_x + ratio * width
        overshoots = end_x > width

    if overshoots:
        logger.warning(overshoot_messages[direction])
        return False
    driver.swipe(start_x, start_y, end_x, end_y, duration)
    return True
# Import-time initialisation. test_init() creates this run's screenshot, log
# and report directories and returns their paths as a 3-tuple.
path_lists = test_init()
# logger_init() reads path_lists, so it has to run after the assignment above.
logger = logger_init()
import os
import time
import unittest
from iOS import script_ultils as sc, HTMLTestRunner
from iOS.start_appium import myserver
def test_run():
    """Discover the test cases and write an HTML report of the run.

    Cases are discovered from ``*.py`` files under ``<cwd>/xxxx``. The report
    is written to ``<timestamp>.html`` inside ``sc.path_lists[2]``.
    """
    sc_path = os.path.join(os.getcwd(), "xxxx")  # directory holding the test cases
    suite = unittest.TestLoader().discover(sc_path, pattern="*.py", top_level_dir=None)
    now_time = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time()))
    report_path = sc.path_lists[2]
    filename = report_path + now_time + ".html"
    # The context manager closes the report file even if the run raises.
    with open(filename, 'wb+') as fp:
        runner = HTMLTestRunner.HTMLTestRunner(
            stream=fp,
            title='xxxx UI 测试结果',
            description='详细测试报告'
        )
        runner.run(suite)
if __name__ == '__main__':
    # Script entry point: run the whole suite once, with a marker printed
    # before and after.
    print('start test')
    test_run()
    print('finish test')