相信很多做 Android 测试的朋友都会用到 uiautomator 这款由 Google 提供的 UI 自动化测试工具。它提供了丰富的 API,让使用者可以方便地定位桌面元素,并可以在各个应用之间灵活地切换,它的好处这里就不赘述了。但是在实际使用过程中,每次都要开一个窗口运行用例,再开一个窗口抓日志,运行完成后还需要手动把运行截图导出来等等,过程很繁琐。作为一名自动化测试工程师,必须解放双手,让脚本来执行这些繁琐的操作。基于以上考虑,自定义了一个框架来代替手动操作。功能如下:
考虑到运行时间可能比较长,收到的日志文件会比较大,所以该框架设计在 pc 端运行。这里使用 python 来实现。框架主要由以下几部分构成:
首先,uiautomator 有一套特有的日志输出,通过解析该日志可以获取到很多有用的信息。因此需要在 uiautomator 中定义输出信息的规则,以便最后展示结果。这里可以根据 uiautomator 中的 status code 来制定,比如按如下规则:
private int INSTRUMENTATION_STATUS_CODE_OUTPUT = 15; #这个用来记录输出测试过程中的输出信息,比如当前网速等
private int INSTRUMENTATION_STATUS_CODE_STEP = 16; #这个用来记录测试步骤
private int INSTRUMENTATION_STATUS_CODE_TITLE = 20; #这个用来记录测试的中文测试名
public static final String path = "/data/local/tmp/test/"; #这里用来存储uiautomator中的截图
然后定义一个日志相关文件,定义日志等级、日志路径以及一些常量
log.py
#coding=utf-8
import os
import re
import logging
#logging.basicConfig(level=logging.DEBUG)
"""logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S',
filename='debug.log',
filemode='w')"""
#logger = logging.getLogger(__name__)
#logger.setLevel(logging.DEBUG)
# Seconds to sleep between two successive memory-info captures; core.py's
# catMeminfo loop passes this value to time.sleep().
CAPTURE_MEMINFO_SLEEP_GAP = 30
def _format(message, level, tag):
return '\n'.join(['[%d][%s]: %s'%(level, tag, msg) for msg in message.splitlines()])
def debug(message, tag="DEBUG"):
    """Log *message* at DEBUG level, each line prefixed by ``_format``."""
    text = _format(message, logging.DEBUG, tag)
    logging.debug(text)


def warning(message, tag="WARNING"):
    """Log *message* at WARNING level, each line prefixed by ``_format``."""
    text = _format(message, logging.WARNING, tag)
    logging.warning(text)


def info(message, tag="INFO"):
    """Log *message* at INFO level, each line prefixed by ``_format``."""
    text = _format(message, logging.INFO, tag)
    logging.info(text)


def error(message, tag="ERROR"):
    """Log *message* at ERROR level, each line prefixed by ``_format``."""
    text = _format(message, logging.ERROR, tag)
    logging.error(text)
def log_directory():
    """Return the log directory; the default is the current directory."""
    directory = "."
    return directory


def report_directory():
    """Return the report directory; the default is the current directory."""
    directory = "."
    return directory


def procrank():
    """Return the file name used to store ``procrank`` output."""
    name = 'procrank.log'
    return name


def dumpsys_meminfo():
    """Return the file name used to store ``dumpsys meminfo`` output."""
    name = 'dumpsys_meminfo.log'
    return name


def meminfo():
    """Return the file name used to store ``/proc/meminfo`` snapshots."""
    name = 'proc_meminfo.log'
    return name


def kernel():
    """Return the file name used to store the kernel message log."""
    name = 'kmsg.log'
    return name


def logcat():
    """Return the file name used to store logcat output."""
    name = 'logcat_main.log'
    return name


def uiautomator():
    """Return the file name used to store uiautomator output."""
    name = 'uiautomator.log'
    return name


def top():
    """Return the file name used to store ``top`` output."""
    name = 'top.log'
    return name


def save_pic_path():
    """Return the on-device directory that holds test screenshots."""
    device_dir = '/data/local/tmp/test'
    return device_dir
如果设备是 usb 线连接,那比较简单,只要判断连接是否存在即可。 如果是网络连接,需要有一个连接的过程,这里以网络为主。
connect.py
#coding=utf-8
import re
import time
import subprocess
import log
class AdbException(Exception):
    """Error raised for adb failures; a newline is appended to the message."""

    def __init__(self, message):
        terminated = message + '\n'
        super(AdbException, self).__init__(terminated)
# NOTE(review): presumably the intended cap on reconnection attempts, but the
# ADB class in this file never reads it — confirm before relying on it.
RETRY_CONNECTION_TIMES = 5
# Seconds ADB.retry_connection() sleeps after a failed connection attempt.
RETRY_CONNECTION_BETWEEN = 10
class ADB(object):
    """Drive one Android device through the ``adb`` command line tool.

    Constructing an instance checks that ``adb`` can be started and then
    blocks in retry_connection() until the device named by *id* is usable.
    """

    # How many times push()/install() re-run a transfer that did not succeed.
    _TRANSFER_ATTEMPTS = 5
    # adb reports a finished transfer as e.g. "95 KB/s (6332 bytes in 0.064s)".
    # NOTE(review): newer adb builds appear to prefix this with
    # "1 file pushed." and may report MB/s, hence search() + [KM]B.
    _TRANSFER_DONE = re.compile(r'\d+(?:\.\d+)? [KM]B/s \(\d+ bytes in .*s\)')

    def __init__(self, id):
        """Bind to the device whose serial (or ``host[:port]``) is *id*.

        Raises AdbException when the ``adb`` executable cannot be started.
        """
        self.id = id.rstrip()
        log.debug('target devices: %s' % self.id)
        self.adbd = 'adb'
        # check adb is installed
        try:
            subprocess.Popen([self.adbd, 'devices'], stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT).communicate()
        except OSError:
            raise AdbException("Error: is `adb` installed ??")
        self.is_network = self.__isNetworkConnection()
        self.adb_command = [self.adbd, '-s', self.id]
        self.adbsh_command = [self.adbd, '-s', self.id, 'shell']
        self.retry_connection()

    @staticmethod
    def _looksLikeNetworkSerial(serial):
        """Heuristic: ``host``, ``ip`` or ``host:port`` ids contain '.' or ':'."""
        return ':' in serial or '.' in serial

    @staticmethod
    def _remotePng(path, filename):
        """Return the on-device path ``<path>/<filename>.png`` (POSIX style)."""
        return "%s/%s.png" % (path.rstrip("/"), filename)

    def __isNetworkConnection(self):
        """Return True when self.id looks like a TCP/IP target, not a USB serial."""
        return self._looksLikeNetworkSerial(self.id)

    def retry_connection(self):
        """Block until _connection() succeeds, sleeping between attempts."""
        while not self._connection():
            print('Retry Connection')
            time.sleep(RETRY_CONNECTION_BETWEEN)

    def _connection(self):
        """Make sure the device is listed, rooted and remounted.

        Returns False when the device cannot be reached, True otherwise.
        """
        devices = self.__adb('devices')
        if self.id not in str(devices):
            if not self.is_network:
                # A USB device cannot be brought back with `adb connect`.
                print('device %s is not listed by `adb devices`' % self.id)
                return False
            # self.adb('disconnect')
            r = self.__adb(['connect', self.id])
            if 'unable to connect to' in str(r):
                print('unable to connect to %s' % self.id)
                return False
        time.sleep(1)
        r = self.adb('root')
        log.debug('root devices:\n %s' % r)
        if 'adbd is already running as root' not in r:
            # Presumably `adb root` restarts adbd and drops a TCP connection,
            # so wait and reconnect before going on.
            time.sleep(2)
            if self.is_network:
                self.__adb(['connect', self.id])
            time.sleep(1)
        r = self.adb('remount')
        log.debug('remount devices:\n %s' % r)
        return True

    def __run(self, prefix, command):
        """Run ``prefix + command`` and return its combined stdout/stderr."""
        if not isinstance(command, list):
            command = command.split()
        log.debug(str(prefix + command))
        process = subprocess.Popen(prefix + command, stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
        return process.communicate()[0]

    def __adb(self, command):
        """ run plain `adb` command (no -s serial) """
        return self.__run([self.adbd], command)

    def adb(self, command):
        """ run `adb -s serial` command """
        return self.__run(self.adb_command, command)

    def adbshell(self, command):
        """ run `adb -s serial shell` command """
        return self.__run(self.adbsh_command, command)

    def popen(self, cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT):
        """Start an adb shell command without waiting for it.

        Returns the subprocess.Popen instance; the caller waits or kills it.
        """
        if not isinstance(cmd, list):
            cmd = [cmd]
        log.debug("popen command: " + str(self.adbsh_command + cmd))
        return subprocess.Popen(self.adbsh_command + cmd, stdout=stdout, stderr=stderr)

    def screenshot(self, filename, path="/data/local/tmp"):
        """Capture the screen into ``<path>/<filename>.png`` on the device."""
        self.adbshell("screencap -p %s" % self._remotePng(path, filename))

    def __runWithRetry(self, command, succeeded):
        """Run an adb *command*, reconnecting and re-running it on failure.

        *succeeded* is called with the command output. Raises AdbException
        carrying the last output when every attempt failed.
        """
        output = self.adb(command)
        for _attempt in range(self._TRANSFER_ATTEMPTS):
            if succeeded(output):
                return output
            print(output)
            self.retry_connection()
            output = self.adb(command)
        # The output of the final re-run must be checked as well.
        if succeeded(output):
            return output
        raise AdbException(output)

    def push(self, jar, path="/data/local/tmp"):
        """Push local file *jar* into device directory *path*.

        Raises AdbException when the transfer never reports success.
        """
        # 95 KB/s (6332 bytes in 0.064s)
        self.__runWithRetry("push %s %s/" % (jar, path),
                            lambda out: self._TRANSFER_DONE.search(str(out)) is not None)

    def install(self, apkname):
        """Install (or reinstall) *apkname* on the device.

        Raises AdbException when adb never reports 'Success'.
        """
        # 2685 KB/s (2433480 bytes in 0.884s)
        #     pkg: /data/local/tmp/Settings.apk
        # Success
        self.__runWithRetry("install -r %s" % apkname,
                            lambda out: 'Success' in str(out))
上面代码主要提供了 adb 相关的一些功能,通过 adb 实例可以调用。
这个模块定义了两个类,TestCase 定义 case 需要的一些基本信息,比如 case 名字,jar 包,测试结果,步骤,输出等等; 这里需要定义一个 case 管理配置文件,TestSuite 可以通过读取这个配置文件来加载用例。
配置文件中每一行的格式为:jar.casename;例如:smoke.com.test.smoke.Calculator#testAdd
smoke 为编译出来的 jar 包名,后面是被测方法名,跟运行 uiautomator 的时候给的 -c 参数一样。换行来配置多个。
case.py
#coding=utf-8
import os
class TestCase(object):
    """Holds the identity and the collected results of one uiautomator case."""

    def __init__(self, jar, casename):
        # What to run: the jar file name and the value given to `-c`.
        self.name = casename
        self.jar = jar + ".jar"
        # Filled in while and after the case runs.
        self.id = ""
        self.title = ""
        self.runtime = ""
        self.descriptive = ""
        self.result = None
        self.logDirectory = None
        self.steps = []
        self.output = []
        self.errorinfo = []

    def uicommand(self):
        """Return the ``<jar> -c <casename>`` argument string."""
        return " ".join([self.jar, "-c", self.name])
class TestSuite(object):
    """Ordered collection of TestCase objects built from ``jar.casename`` entries."""

    def __init__(self):
        self.tests = []

    @staticmethod
    def _splitEntry(entry):
        """Split ``jar.casename`` at the first dot into ``(jar, casename)``.

        Raises ValueError when either part is missing, instead of the bare
        IndexError an entry without a dot used to cause.
        """
        jar, dot, casename = entry.partition(".")
        if not (dot and jar and casename):
            raise ValueError("malformed case entry %r, expected jar.casename" % entry)
        return jar, casename

    def __loadTestFromName(self, name):
        """ add a TestCase for every '|'-separated entry in *name* """
        for each in name.split("|"):
            each = each.strip()
            if each:
                jar, casename = self._splitEntry(each)
                self.tests.append(TestCase(jar, casename))

    def __loadTestFromFile(self, filename):
        """ add TestCase list for all case in given file """
        # Read-only access is enough; "r+" failed on read-only case lists.
        with open(filename, "r") as fp:
            for line in fp:
                line = line.strip()
                # Blank lines and '#' comment lines are skipped.
                if line and not line.startswith("#"):
                    self.__loadTestFromName(line)

    def addTestCase(self, caselist):
        """Add cases from *caselist*: a case-list file path or an entry string."""
        if os.path.isfile(caselist):
            self.__loadTestFromFile(caselist)
        else:
            self.__loadTestFromName(caselist)

    def _convertCastList(self, casestr):
        """return a tuple contain jar, and castlist

        *casestr* is a comma-separated list of ``jar.casename`` entries; the
        jar of the first entry is returned together with every case name.
        Raises Exception for an empty string and ValueError for a malformed
        entry.
        """
        if casestr == "":
            raise Exception("no case found")
        entries = [self._splitEntry(name.strip()) for name in casestr.split(",")]
        return entries[0][0], [casename for _jar, casename in entries]
这部分为运行的核心部分,创建日志路径,运行 case,记录日志,获取截图,结果分析都在这里处理; 直接上代码
core.py
#coding=utf-8
import os
import re
import time
import threading
import subprocess
from optparse import OptionParser
import log
from case import TestCase
from case import TestSuite
from connect import ADB, AdbException
def hook(c):
    """Return a decorator that installs the decorated function on *c*.

    The function is stored as an attribute of *c* under its own
    ``__name__`` (replacing any attribute of that name) and is handed back
    unchanged.
    """
    def install(func):
        attribute = func.__name__
        setattr(c, attribute, func)
        return func
    return install
# (adb shell command, local log file name) pairs. TextRunner.startTest starts
# one background process per pair for the duration of a test case and
# redirects its stdout to the named file inside the case's log directory.
LOGCAT_RUNTEST = [ ('logcat -c;logcat -v threadtime', log.logcat()),
('cat /proc/kmsg', log.kernel()),]
class TextRunner(object):
    """Run uiautomator cases on a device and collect their logs.

    a: the ADB wrapper used for every device command.
    option: parsed command line options; ``reportdir``, ``reportflag`` and
        ``jarpath`` are read.
    """

    def __init__(self, a, option):
        self.a = a
        self.option = option
        self.test_number = 0
        self.running_process = []
        self._open_logs = []
        self.__flag = False
        stamp = time.strftime("%Y_%m_%d.%H_%M_%S", time.localtime(time.time()))
        self.report_dir = os.path.join(option.reportdir, 'Logs', option.reportflag,
                                       'report.%s' % stamp, 'logs')

        # Make log.report_directory() report this run's directory.
        @hook(log)
        def report_directory():
            return self.report_dir

        try:
            os.makedirs(self.report_dir)
        except OSError:
            # An existing directory is fine; anything else would only fail
            # later, when the first log file is opened.
            if not os.path.isdir(self.report_dir):
                raise
        # Best effort: install procrank when the device does not ship it.
        try:
            have_procrank = self.a.adbshell('procrank')
            if 'not found' in have_procrank:
                self.a.adb('push tools/procrank /system/xbin/')
                self.a.adbshell('chmod 777 /system/xbin/procrank')
        except Exception:
            log.error('fail to push procrank')

    def startTest(self, test):
        """Run one TestCase and store its parsed results on *test*."""
        self.a.retry_connection()
        self.a.push(self.option.jarpath)
        self.test_number += 1
        test.id = '%04d' % self.test_number
        test.logDirectory = os.path.join(self.report_dir, test.id)

        # Make log.log_directory() report the id of the running case.
        @hook(log)
        def log_directory():
            return test.id

        try:
            os.mkdir(test.logDirectory)
        except OSError:
            log.error('mkdir log directory (%s) failed' % test.logDirectory)
        self.clearLog()
        self.running_process = []
        self._open_logs = []
        try:
            for command, filename in LOGCAT_RUNTEST:
                # Keep the handle so __stopTest can close it.
                sink = open(os.path.join(test.logDirectory, filename), "a+")
                self._open_logs.append(sink)
                self.running_process.append(self.a.popen(command, stdout=sink))
            print("running: %s" % (test.uicommand()))
            uilog_path = os.path.join(test.logDirectory, log.uiautomator())
            with open(uilog_path, "a+") as uilog:
                runner = self.a.popen("uiautomator runtest %s" % (test.uicommand()), stdout=uilog)
                runner.wait()
            title, steps, runtime, info, output, result = self.__captureResult(uilog_path)
            print("%s \n" % result)
            test.title = title
            test.result = result
            test.runtime = runtime
            test.steps += steps
            test.output += output
            if info:
                test.errorinfo = info
        finally:
            # Stop the background loggers even when the case blew up.
            self.__stopTest(test)

    def __stopTest(self, test):
        """Stop the background loggers, close their files and pull device logs."""
        for p in self.running_process:
            if isinstance(p, subprocess.Popen):
                try:
                    p.kill()
                except OSError:
                    pass  # already gone
                p.wait()  # reap the child so it does not linger as a zombie
        self.running_process = []
        for sink in self._open_logs:
            sink.close()
        self._open_logs = []
        self.getLog(test.logDirectory)

    def __exists(self, filename):
        """Return True unless `ls filename` on the device reports a missing file."""
        listing = self.a.adbshell('ls %s' % filename)
        return 'No such file or directory' not in str(listing)

    def clearLog(self):
        """Delete old ANR traces, tombstones and screenshots from the device."""
        if self.__exists('/data/anr'):
            log.debug('rm /data/anr/*')
            self.a.adbshell('rm /data/anr/*')
        if self.__exists('/data/tombstones/tombstone_0*'):
            # The listing needs `ls`; the bare path was being run as a command.
            tbs = self.a.adbshell('ls /data/tombstones/tombstone_0*')
            for tombstone in tbs.splitlines():
                tombstone = tombstone.strip()
                if tombstone:
                    log.debug('rm %s' % tombstone)
                    self.a.adbshell('rm %s' % tombstone)
        self.a.adbshell('rm %s/*.png' % log.save_pic_path())

    def __pullDirectory(self, remote, local):
        """Create *local* if needed and `adb pull` the device directory into it."""
        if not os.path.isdir(local):
            os.mkdir(local)
        self.a.adb('pull %s %s' % (remote, local))

    def getLog(self, path):
        """Pull ANR traces, tombstones and screenshots into local dir *path*."""
        # Check the directory that is actually pulled (and that clearLog empties).
        if self.__exists('/data/anr'):
            self.__pullDirectory('/data/anr', os.path.join(path, 'anr'))
        if self.__exists('/data/tombstones/tombstone_0*'):
            self.__pullDirectory('/data/tombstones', os.path.join(path, 'tombstones'))
        pic_dir = log.save_pic_path()
        listing = str(self.a.adbshell('ls %s' % pic_dir))
        # An `ls` error message must not be mistaken for a list of file names.
        if 'No such file or directory' not in listing:
            for pic in listing.splitlines():
                pic = pic.strip()
                if pic:
                    self.a.adb('pull %s/%s %s/' % (pic_dir, pic, path))

    def startSuite(self, suite):
        """Run every test of *suite* while a daemon thread samples memory info."""
        # add capture meminfo such as `procrank`, `dumpsys meminfo`, `cat /proc/meminfo`, `promen pid` here
        # do not block anything
        self.__flag = True
        t = threading.Thread(target=self.catMeminfo)
        t.daemon = True
        t.start()
        try:
            for test in suite.tests:
                self.startTest(test)
        finally:
            # Let the sampling thread finish even when a test raised.
            self.__flag = False

    def __captureResult(self, filename):
        """Analyze a uiautomator log file.

        Returns the tuple ``(title, steps, runtime, info, output, result)``:
        title: value logged before status code 20 ('' when absent).
        steps: values logged before status code 16.
        runtime: text after ``Time:`` ('0.000' when absent).
        info: the distinct ``stack=`` / ``fail file`` status lines, in order
            of first appearance.
        output: values logged before status code 15.
        result: 'PASS', 'FAIL', or 'ERROR' when no ``Time:`` line exists.
        """
        with open(filename, "r") as fp:
            stripped = (line.strip() for line in fp)
            uilog = [line for line in stripped if line]
        return self._parseResult(uilog)

    @staticmethod
    def _parseResult(uilog):
        """Extract the result tuple from the non-empty, stripped log lines."""
        result = "ERROR"
        runtime = "0.000"
        for i, line in enumerate(uilog):
            if line.startswith("Time:"):
                runtime = line.split(":", 1)[1].strip()
                # NOTE(review): as before, a log ending right after the
                # "Time:" line counts as PASS — confirm this is intended.
                if i == len(uilog) - 1 or uilog[i + 1].startswith("OK"):
                    result = 'PASS'
                else:
                    result = 'FAIL'
        info = []
        output = []
        steps = []
        title = ""
        failure_prefixes = ('INSTRUMENTATION_STATUS: stack=',
                            'INSTRUMENTATION_STATUS: fail file')
        for i, line in enumerate(uilog):
            if line.startswith(failure_prefixes):
                if line not in info:
                    info.append(line)
                continue
            if i == 0 or not line.startswith('INSTRUMENTATION_STATUS_CODE: '):
                continue
            # The payload is the "key=value" status line just before the
            # code; split once so values containing '=' stay intact.
            _key, equals, value = uilog[i - 1].partition("=")
            if not equals:
                continue
            if line.startswith('INSTRUMENTATION_STATUS_CODE: 15'):
                output.append(value)
            elif line.startswith('INSTRUMENTATION_STATUS_CODE: 16'):
                steps.append(value)
            elif line.startswith('INSTRUMENTATION_STATUS_CODE: 20'):
                title = value
        return title, steps, runtime, info, output, result

    def catMeminfo(self):
        """Append memory snapshots to the report logs until the suite ends."""
        captures = (('procrank', log.procrank),
                    ('dumpsys meminfo', log.dumpsys_meminfo),
                    ('cat /proc/meminfo', log.meminfo),
                    ('busybox top -n 1', log.top))
        while self.__flag:
            for command, logname in captures:
                # Close each file once its command is done instead of
                # leaking a handle on every sample.
                with open(os.path.join(self.report_dir, logname()), 'a+') as sink:
                    self.a.popen(command, stdout=sink).wait()
            time.sleep(log.CAPTURE_MEMINFO_SLEEP_GAP)
上面的代码除了在运行每个 case 的同时获取日志以外,还会在整个框架运行期间每隔一段时间获取被测机器的内存信息。这主要是为了查看内存在整个测试过程中的走势,后续可以根据这些信息画一个内存走势图,如果有内存泄露便可以一目了然。
到这里,除了报告部分以外,基本功能都已经差不多了,现在需要一个启动文件来配置参数,将上面的功能使用起来。
runner.py
#coding=utf-8
import os
from optparse import OptionParser
import ucore
def main():
    """Parse the command line, run the listed cases and print the log location.

    Raises ucore.AdbException when no serial number is given; exits through
    the option parser's error handling when the case list or the jar is
    missing.
    """
    parse = OptionParser()
    parse.add_option('-f', '--file', dest='listfile', help='case or list to run', action='store')
    parse.add_option('-s', '--serial', dest='serialno', help='which device to run', action='store')
    parse.add_option('-c', '--count', dest='count', help='how many times to run', action='store', default='1')
    parse.add_option('-r', '--reportdir', dest='reportdir', help='where to capture report', action='store', default=".")
    parse.add_option('-j', '--jar', dest='jarpath', help='which jar to run', action='store')
    # TextRunner reads option.reportflag when it builds the report path, so
    # the option has to exist; '' keeps the path at <reportdir>/Logs/report.*
    parse.add_option('-t', '--reportflag', dest='reportflag', help='sub-directory under <reportdir>/Logs for this run', action='store', default='')
    (option, args) = parse.parse_args()
    # Validate before touching the device: ADB() dereferences the serial.
    if option.serialno is None:
        raise ucore.AdbException('Error: MUST specify a serial number!')
    if option.listfile is None:
        parse.error('a case or case list is required (-f)')
    if option.jarpath is None:
        parse.error('the jar to push is required (-j)')
    count = int(option.count)
    a = ucore.ADB(option.serialno)
    suite = ucore.TestSuite()
    for _round in range(count):
        suite.addTestCase(option.listfile)
    # start test
    runner = ucore.TextRunner(a, option)
    runner.startSuite(suite)
    print('\nLog Directory: %s \n' % os.path.dirname(runner.report_dir))


if __name__ == '__main__':
    main()
直接运行 runner.py,如下
python runner.py -s 192.168.1.11:5555 -f caselist.list -j smoke.jar -c 20
按照上述方式运行以后,测试结果所需要的数据都保存在 suite.tests 中,我们可以根据它来生成测试结果。日志展示选用的是 html,这里可以自己写 html 代码,不过推荐使用 django 的 html 模板来做。由于本人审美不咋滴,html 报告就不展示了,只大概说下怎么调用 django 来写:
runner.py 中指定 settings 路径
# DJANGO_SETTINGS_MODULE must be a dotted module path, not a file-system
# path, so make the directory next to this file importable and name the
# settings module that lives there.
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
os.environ['DJANGO_SETTINGS_MODULE'] = "settings"
测试结束以后组织数据,渲染你的 html 模板,并将渲染后的 html 代码写入到你想生成的 html 中
本地环境搭建起来以后,如果想更加方便,可以搭建一台 jenkins 服务器,将代码部署到 jenkins 上,在 jenkins 上创建一个参数化构建的 job,测试结束以后通过 jenkins 将邮件发给测试者。这样每个人都可以通过 jenkins 来部署自己想进行的测试了。