UiAutomator python+uiautomator 自动义测试框架

Jerry · 2016年02月23日 · 最后由 ldd 回复于 2017年04月12日 · 3226 次阅读

相信很多做 Android 测试朋友都会用到 uiautomator 这款由 google 提供的 UI 自动化测试工具。他提供了丰富的 api 让使用者方便的定位桌面元素,并可以在各个应用之间灵活的切换。他的好处就不赘述。但是在实际使用过程中,每次都是运行开一个窗口,然后抓日志再开一个窗口,运行完成还需要手动把运行截图弄出来等等,过程很繁琐。作为一名自动化测试,必须解放双手让脚本来执行这些繁琐的操作。基于以上考虑自定义了一个框架,来代替手动操作。功能如下:

  • 判断 adb 的连接状态并能重连
  • 可以在 case 运行过程中抓取日志和截图
  • 可以选择一个或者多个 case
  • 可以选择运行次数
  • 运行完成以后相关日志截图能归档
  • 最后能生成一个测试报告

考虑到运行时间可能比较长,收到的日志文件会比较大,所以该框架设计在 pc 端运行。这里使用 python 来实现。框架主要由以下几部分构成:

  • 连接
  • case 管理
  • 运行过程管理
  • 日志展示

首先 uiautomator`有一套特有日志输出,通过解析该日志可以获取到很多有用的信息。因此需要在 uiautomator 中定义输出的信息规则,以便最后结果展示。这里可以根据 uiautomator 中的 status code 来制定,比如按如下规则:

private int INSTRUMENTATION_STATUS_CODE_OUTPUT = 15; #这个用来记录输出测试过程中的输出信息比如当前网速等
private int INSTRUMENTATION_STATUS_CODE_STEP = 16; #这个用来记录测试步骤
private int INSTRUMENTATION_STATUS_CODE_TITLE = 20; #这个用来记录测试的中文测试名
public static final String path = "/data/local/tmp/test/"; #这里用来存储uiautomator中的截图

然后定义一个日志相关文件,定义日志等级,日志路径已经一些常量
log.py

#coding=utf-8
import os
import re
import logging

#logging.basicConfig(level=logging.DEBUG)

"""logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='debug.log',
    filemode='w')"""
#logger = logging.getLogger(__name__)
#logger.setLevel(logging.DEBUG)

CAPTURE_MEMINFO_SLEEP_GAP = 30

def _format(message, level, tag):
    return '\n'.join(['[%d][%s]: %s'%(level, tag, msg) for msg in message.splitlines()])

def debug(message, tag="DEBUG"):
    logging.debug(_format(message, logging.DEBUG, tag))

def warning(message, tag="WARNING"):
    logging.warning(_format(message, logging.WARNING, tag))

def info(message, tag="INFO"):
    logging.info(_format(message, logging.INFO, tag))

def error(message, tag="ERROR"):
    logging.error(_format(message, logging.ERROR, tag))

def log_directory():
    return "."

def report_directory():
    return "."

def procrank():
    return 'procrank.log'

def dumpsys_meminfo():
    return 'dumpsys_meminfo.log'

def meminfo():
    return 'proc_meminfo.log'

def kernel():
    return 'kmsg.log'

def logcat():
    return 'logcat_main.log'

def uiautomator():
    return 'uiautomator.log'

def top():
    return 'top.log'

def save_pic_path():
    return '/data/local/tmp/test'

连接

如果设备是 usb 线连接,那比较简单,只要判断连接是否存在即可。 如果是网络连接,需要有一个连接的过程,这里以网络为主。
connect.py

#coding=utf-8
import re
import time
import subprocess
import log

class AdbException(Exception):
    def __init__(self, message):
        Exception.__init__(self, message+'\n')

RETRY_CONNECTION_TIMES = 5
RETRY_CONNECTION_BETWEEN = 10

class ADB(object):
    def __init__(self, id):
        self.id = id.rstrip()
        log.debug('target devices: %s' %self.id)
        self.adbd = 'adb'
        #check adb is installed
        try:
            subprocess.Popen( [ self.adbd, 'devices' ], stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read(1)
            self.adb_command = [self.adbd]
        except:
            raise AdbException("Error: is `adb` installed ??")

        self.__isNetworkConnection()
        self.adb_command = [ self.adbd, '-s', self.id ]
        self.adbsh_command = [ self.adbd, '-s', self.id, 'shell' ]
        self.retry_connection()

    def retry_connection(self):
        while not self._connection():
            print 'Retry Connection'
            time.sleep(RETRY_CONNECTION_BETWEEN)

    def _connection(self):
        devices = self.__adb('devices')

        if not str(devices).__contains__(self.id):
            # self.adb('disconnect')
            r = self.__adb('connect %s' %self.id.split(":")[0] if ":" in self.id else self.id)
            if str(r).__contains__('unable to connect to'):
                print 'unable to connect to %s' %self.id
                return False
            time.sleep(1)

        r = self.adb('root')
        log.debug('root devices:\n %s' %r)
        if 'adbd is already running as root' not in r:
            time.sleep(2)
            self.__adb('connect %s' %self.id.split(":")[0] if ":" in self.id else self.id)
            time.sleep(1)
        self.adb('remount')
        log.debug('remount devices:\n %s' %r)
        return True

    def __adb(self, command):
        if not isinstance(command, list): command = command.split()
        log.debug(str([self.adbd] + command))
        return subprocess.Popen([self.adbd]+command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()[0]

    def adb(self, command):
        """ run `adb -s serial` command """
        if not isinstance(command, list): command = command.split()
        log.debug(str(self.adb_command + command))
        return subprocess.Popen(self.adb_command + command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()[0]

    def adbshell(self, command):
        """ run `adb -s serial shell` command """
        if not isinstance(command, list): command = command.split()
        log.debug(str(self.adbsh_command + command))
        return subprocess.Popen(self.adbsh_command + command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()[0]

    def popen(self, cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT):
        """run adb shell command as nohup, return subprocess.Popen instance"""
        if not isinstance(cmd, list): cmd = [cmd]
        log.debug("popen command: "+str(self.adbsh_command+cmd))
        return subprocess.Popen(self.adbsh_command+cmd, stdout=stdout,stderr=stderr)

    def screenshot(self, filename, path="/data/local/tmp"):
        self.adbshell("screencap -p %s" %os.path.join(path,filename, ".png"))

    def push(self, jar, path="/data/local/tmp"):
        #95 KB/s (6332 bytes in 0.064s)
        r = self.adb("push %s %s/" %(jar, path))
        pflag = False
        for _retry_times in range(1,6):
            if re.compile(r'\d+ KB/s \(\d+ bytes in .*s\)').match(r):
                pflag = True
                break;
            print r
            self.retry_connection()
            r = self.adb("push %s %s/" %(jar, path))
        if not pflag: raise AdbException(r)

    def install(self, apkname):
        #2685 KB/s (2433480 bytes in 0.884s)
        #    pkg: /data/local/tmp/Settings.apk
        #Success
        r = self.adb("install -r %s" %apkname)
        pflag = False
        for _retry_times in range(1,6):
            if str(r).__contains__('Success'):
                pflag = True
                break;
            print r
            self.retry_connection()
            r = self.adb("push %s %s/" %(jar, path))
        if not pflag: raise AdbException(r)

上面代码主要提供了 adb 相关的一些功能,通过 adb 实例可以调用。

case 管理

这个模块定义了两个类,TestCase 定义 case 需要的一些基本信息,比如 case 名字,jar 包,测试结果,步骤,输出等等; 这里需要定义一个 case 管理配置文件,TestSuite 可以通过读取这个配置文件来加载用例。
配置文件的格式命名为:jar.casename; 例如:smoke.com.test.smoke.Calculator#testAdd

smoke 为编译出来的 jar 报名,后面是被测方法名,跟运行 uiautomator 的时候给的-c 参数一样。换行来配置多个。
case.py

#coding=utf-8
import os

class TestCase(object):
    def __init__(self, jar, casename):
        self.jar = jar+".jar"
        self.name = casename
        self.id = ""
        self.title = ""
        self.steps = []
        self.result = None
        self.output = []
        self.errorinfo = []
        self.runtime = ""
        self.descriptive = ""
        self.logDirectory = None

    def uicommand(self):
        """return a uiautomator string command """
        uicmd = self.jar
        uicmd = uicmd + " " + "-c " + self.name
        return uicmd

class TestSuite(object):
    def __init__(self):
        self.tests = []

    def __loadTestFromName(self, name):
        """ add single TestCase instance """
        for each in name.split("|"):
            if each != "":
                case = each.split(".", 1)
                test = TestCase(case[0], case[1])
                self.tests.append(test)

    def __loadTestFromFile(self, filename):
        """ add TestCase list for all case in given file """
        with open(filename, "r+") as fp:
            for line in fp:
                line = line.strip()
                if line != "" and not line.startswith("#"):
                    self.__loadTestFromName(line)

    def addTestCase(self, caselist):
        if os.path.isfile(caselist):
            self.__loadTestFromFile(caselist)
        else:
            self.__loadTestFromName(caselist)

    def _convertCastList(self, casestr):
        """return a tuple contain jar, and castlist"""
        castlist = []
        if casestr == "":
            raise Exception("no case found")
        names = casestr.split(",")
        for name in names:
            castlist.append(name.split(".", 1)[1])
        return names[0].split(".", 1)[0], castlist

运行过程管理

这部分为运行的核心部分,创建日志路径,运行 case,记录日志,获取截图,结果分析都在这里处理; 直接上代码
core.py

#coding=utf-8
import os
import re
import time
import threading
import subprocess
from optparse import OptionParser

import log
from case import TestCase
from case import TestSuite
from connect import ADB, AdbException


def hook(c):
    def deco(f):
        setattr(c, f.__name__, f)
        return f
    return deco

LOGCAT_RUNTEST = [  ('logcat -c;logcat -v threadtime', log.logcat()),
                    ('cat /proc/kmsg', log.kernel()),]

class TextRunner(object):

    def __init__(self, a, option):
        self.a = a
        self.option = option
        self.test_number = 0
        self.report_dir = os.path.join(option.reportdir, 'Logs',option.reportflag, 'report.%s'%time.strftime("%Y_%m_%d.%H_%M_%S",time.localtime(time.time())), 'logs')

        @hook(log)
        def report_directory():
            return self.report_dir
        try:
            os.makedirs(self.report_dir)
        except:
            pass

        try:
            have_procrank = self.a.adbshell('procrank')
            if 'not found' in have_procrank:
                self.a.adb('push tools/procrank /system/xbin/')
                self.a.adbshell('chmod 777 /system/xbin/procrank')
        except:
            log.error('fail to push procrank')

    def startTest(self, test):
        self.a.retry_connection()
        self.a.push(self.option.jarpath)
        self.test_number += 1
        test.id = '%04d' %self.test_number
        test.logDirectory = os.path.join(self.report_dir, test.id)

        @hook(log)
        def log_directory():
            return test.id
        try:
            os.mkdir(test.logDirectory)
        except:
            log.error('mkdir log directory (%s) failed' %test.logDirectory)

        self.clearLog()
        self.running_process = [self.a.popen(command,stdout=open(os.path.join(test.logDirectory,filename), "a+")) for command, filename in LOGCAT_RUNTEST]
        print "running: %s" %(test.uicommand())
        runner = self.a.popen("uiautomator runtest %s" %(test.uicommand()),stdout=open(os.path.join(test.logDirectory, log.uiautomator()),"a+"))
        runner.wait()
        title, steps, runtime, info, output, result = self.__captureResult(os.path.join(test.logDirectory, log.uiautomator()))
        print result,"\n"
        test.title = title
        test.result = result
        test.runtime = runtime
        test.steps += steps
        test.output += output
        if info != []:test.errorinfo = info

        self.__stopTest(test)

    def __stopTest(self, test):
        for p in self.running_process:
            if isinstance(p, subprocess.Popen):
                p.kill()
        self.getLog(test.logDirectory)

    def clearLog(self):
        def isExists(filename):
            ishave = self.a.adbshell('ls %s' %filename)
            if 'No such file or directory' in str(ishave):
                return False
            else:
                return True
        if(isExists('/data/anr')):
            log.debug('rm /data/anr/*')
            self.a.adbshell('rm /data/anr/*')
        if(isExists('/data/tombstones/tombstone_0*')):
            tbs = self.a.adbshell('/data/tombstones/tombstone_0*')
            for tombstone in tbs.splitlines():
                log.debug('rm %s' %tombstone)
                self.a.adbshell('rm %s' %tombstone)
        self.a.adbshell('rm %s/*.png' %log.save_pic_path())

    def getLog(self, path):
        def isExists(filename):
            ishave = self.a.adbshell('ls %s' %filename)
            if 'No such file or directory' in str(ishave):
                return False
            else:
                return True
        if(isExists('/data/Logs/Log.0/anr')):
            os.mkdir(os.path.join(path, 'anr'))
            self.a.adb('pull /data/anr %s' %os.path.join(path, 'anr'))
        if(isExists('/data/tombstones/tombstone_0*')):
            os.mkdir(os.path.join(path, 'tombstones'))
            self.a.adb('pull /data/tombstones %s' %os.path.join(path, 'tombstones'))

        s = self.a.adbshell('ls %s' %(log.save_pic_path()))
        if s != "":
            for pic in s.splitlines():
                self.a.adb('pull %s/%s %s/' %(log.save_pic_path(), pic, path))

    def startSuite(self, suite):
        # add capture meminfo such as `procrank`, `dumpsys meminfo`, `cat /proc/meminfo`, `promen pid` here
        # do not block anything

        self.__flag = True
        t = threading.Thread(target=self.catMeminfo)
        t.setDaemon(True)
        t.start()
        for test in suite.tests:
            self.startTest(test)

        self.__flag = False

    def __captureResult(self, filename):
        """analyze uiautomator log
            return a tuple as (result, info)
            result: PASS or FAIL
            info: [] if PASS otherwise failed info
            output: [] if PASS otherwise output info
        """
        result = "ERROR"
        runtime = "0.000"
        uilog = []
        with open(filename, "rb+") as fp:
            for line in fp:
                # line = line.split('\r\n')
                l = line.strip()
                if l:
                    uilog.append(l)
        for i in range(len(uilog)):
            if uilog[i].startswith("Time:"):
                runtime = uilog[i].strip().split(":")[1]
                if i== len(uilog)-1:
                    result = 'PASS'
                else:
                    if uilog[i+1].startswith("OK"):
                        result = 'PASS'
                    else:
                        result = 'FAIL'
        info = []
        output = []
        title = ""
        steps = []
        for i in range(len(uilog)):
            if uilog[i].startswith('INSTRUMENTATION_STATUS: stack='):
                info.append(uilog[i].rstrip())
            if uilog[i].startswith("INSTRUMENTATION_STATUS: fail file"):
                info.append(uilog[i].rstrip())
            if uilog[i].startswith('INSTRUMENTATION_STATUS_CODE: 15'):
                output.append(uilog[i-1].split("=")[1].rstrip())
            if uilog[i].startswith('INSTRUMENTATION_STATUS_CODE: 16'):
                steps.append(uilog[i-1].split("=")[1].rstrip())
            if uilog[i].startswith('INSTRUMENTATION_STATUS_CODE: 20'):
                title = uilog[i-1].split("=")[1]

        for line in uilog:
            if line.startswith('INSTRUMENTATION_STATUS: stack='):
                info.append(line.rstrip())
            if line.startswith("INSTRUMENTATION_STATUS: fail file"):
                info.append(line.rstrip())
        return title, steps, runtime, list(set(info)), output, result

    def catMeminfo(self):
        while self.__flag:
            self.a.popen('procrank', stdout=open(os.path.join(self.report_dir, log.procrank()), 'a+')).wait()

            self.a.popen('dumpsys meminfo', stdout=open(os.path.join(self.report_dir, log.dumpsys_meminfo()), 'a+')).wait()
            self.a.popen('cat /proc/meminfo', stdout=open(os.path.join(self.report_dir, log.meminfo()), 'a+')).wait()
            self.a.popen('busybox top -n 1', stdout=open(os.path.join(self.report_dir, log.top()), 'a+')).wait()
            time.sleep(log.CAPTURE_MEMINFO_SLEEP_GAP)

上面的代码在正常运行每个 case 的同时获取日志以外又在整个框架运行的同时每隔一段时间获取被测机器的内存信息,这主要是为了查看内存在整个测试过程中的走势,后续可以根据这些信息画一个内存走势图,如果有内存泄露可以一目了然。

到这里除了报告部分以后,基本功能都已经差不多,现在需要一个启动文件来配置参数,将上面的功能使用起来。
runner.py

#coding=utf-8
import os
from optparse import OptionParser
import ucore

def main():
    parse = OptionParser()
    parse.add_option('-f', '--file', dest='listfile', help='case or list to run', action='store')
    parse.add_option('-s', '--serial', dest='serialno', help='which device to run', action='store')
    parse.add_option('-c', '--count', dest='count', help='how many times to run', action='store', default='1')
    parse.add_option('-r', '--reportdir', dest='reportdir', help='where to capture report', action='store', default=".")
    parse.add_option('-j', '--jar', dest='jarpath', help='which jar to run', action='store')

    (option, args) = parse.parse_args()

    a = ucore.ADB(option.serialno)
    attribute = a.adbshell('getprop')

    if option.serialno == None:
        raise ucore.AdbException('Error: MUST specify a serial number!')
    count = int(option.count)
    suite = ucore.TestSuite()
    for i in range(count):
        suite.addTestCase(option.listfile)

    #start test
    runner = ucore.TextRunner(a, option)
    runner.startSuite(suite)

    print '\nLog Directory: %s \n' %os.path.dirname(runner.report_dir)

if __name__ == '__main__':
    main()

直接运行 runner.py,如下

python runner.py -s 192.168.1.11:5555 -f caselist.list -j smoke.jar -c 20

日志展示

按照上述以后,测试结果所需要的数据都保存在 suite.tests 中,我们可以根据他来生成测试结果。日志展示选用的是 html,这里可以自己写 html 代码,不过推荐使用 django 的 html 模板来做。由于本人审美不咋滴,html 报告就不展示了,只大概说下怎么调用 django 来写

  1. 将 django 项目中的 settings.py 文件拷贝到 runner.py 文件同级目录下,修改 TEMPLATE_DIRS 为你存放模板的路径
  2. runner.py 中指定 settings 路径

    os.environ['DJANGO_SETTINGS_MODULE'] = os.path.join(os.path.dirname(file), "settings")

  3. 测试结束以后组织数据,渲染你的 html 模板,并将渲染后的 html 代码写入到你想生成的 html 中

结束语

本地环境搭建起来以后,如果想更加方便,可以搭建一台 jenkins 服务器,将代码部署到 jenkins 上,在 jenkins 上创建一个参数化构建的 job,测试结束以后通过 jenkins 将邮件发给测试者。这样每个人都可以通过 jenkins 来部署自己想进行的测试了。

共收到 12 条回复 时间 点赞

支持下,这轮子造的可以啊

牛!点赞!能否把代码放在 github 上呢?

感觉棒棒的

哦哦你们用 python 管理,之前没关注你们部门怎么做的,测试中心的自动化是将连接, case 管理,运行过程管理用 bash 实现的,只有出监控图和 HTML 报告是 python 完成。bash 和监控是我搞的,uiautmator 部分和 HTML 报告部分没参与。

我们这边 caselist 的控制就是一个 caselist.txt 文本,每个 case 的参数独立一行逗号分隔,bash 自动逐行拼接成 uiautomator 执行语句执行。

学习中

你好,我想问一下 uiautmator 识别不了 app 中的控件是哪里出现了问题

又一个轮子,展示下报告呢

#6 楼 @tiancai 有些控件需要在源码中添加控件属性

这个看起来不错,先收藏了,后续研究研究

—— 来自 TesterHome 官方 安卓客户端

runner.py 中的 import ucore,ucore 模块是自定义的吗?

import ucore
ImportError: No module named ucore
这里的 ucore 是自己定义的模块吗?

首先 uiautomator`有一套特有日志输出,通过解析该日志可以获取到很多有用的信息。因此需要在 uiautomator 中定义输出的信息规则,以便最后结果展示。这里可以根据 uiautomator 中的 status code 来制定,比如按如下规则
----请问下这个在哪定义规则?

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册