• 这是我写的一个小测试框架,case 和 lib 可以自己定义,提供了 log 和 controler 接口。log 用来打印日志,controler 用来控制设备。

  • 针对 android 操作系统 UI 进行自动化测试,可以进行绝大多数的操作,多个 app 之间的交互。还可以在多台设备之间进行交互,譬如互相打电话,接电话等。

  • 自己顶一下😄

  • 厉害厉害!!!

  • 测试用例:

    import os
    import unittest
    import sys
    import pickle
    sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir)))
    from container.base import settings
    from engine import log
    
    
    dir_path = sys.argv[1]
    f = open(dir_path,'rb')
    config_dictory = pickle.load(f)
    f.close()
    name = os.path.basename(config_dictory['TC']).split('.')[0]
    workpath = config_dictory['work_dir']
    logpath = config_dictory['log_path']    
    log_path = os.path.join(workpath,logpath,name)
    l = log.Logger(name,log_path)
    mylogger = l.logger()
    
    class test_setup_wifi_class(unittest.TestCase):
        def setUp(self):
            self.test_class = settings.Settings(config_dictory,mylogger)
        def tearDown(self):
            self.test_class.op.hardware().press_back(3)
            self.test_class.op.hardware().return_home()
        def test_skip_wizard(self):
            self.assertEqual(self.test_class.setup_wifi(),True,'test setup_wifi case fail')    
    
    
    
    
    if __name__ == "__main__":
        my_suite = unittest.TestLoader().loadTestsFromTestCase(test_setup_wifi_class)
        result = unittest.TextTestRunner(verbosity=0).run(my_suite)
        if len(result.failures) != 0 or len(result.errors) != 0:
            flag = 'Fail'
        else:
            flag = 'Pass'
        print flag,(len(result.failures)+len(result.errors))
    
  • 自定义生成测试报告:

    import os
    import sys
    from pyh import *
    import pickle
    
    def main(address,report_path):
        f = open(address,'rb')
        database1 = pickle.load(f)
        f.close()
        case_name_pre = report_path.split('/')[-1]
        report_path_base = os.path.abspath(os.path.join(report_path,os.pardir))
        report_file_path = os.path.join(report_path_base,'report.html')
        content_dic = database1['suite_content']    
        page = PyH('test report')
        page << h1('this is report of yath project')
        order_list = page << ul(type='square')
        order_list << li('device id : %s'%database1['device_id'])
        order_list << li('suite mode : %s'%database1['suite_mode'])
        order_list << li('suite loop : %s'%database1['suite_loop'])
        order_list << li('suite name : %s'%database1['suite_name'])
        page << hr()
        p1_content = 'Loop_'+database1['suite_loop']+':'
        page << p(i(p1_content))
        t = page << table(border='1',bgcolor='#bbbb99')
        t << tr(th('test case',align='left')+th('test loop',align='center')+th('test result',align='center')+th('start time',align='center')+th('stop time',align='center')+th('duration(s)',align='center')+th('crashes',align='center'))
        for k in content_dic:
            item = content_dic[k]
            if len(item) == 2:
                continue
            test_case = item[0]
            test_loop = item[1]    
            if item[1] == '0':
                test_result,start_time,stop_time,duration,crashes = 'NA','NA','NA','NA','0'
            else:
                test_result = item[2]
                start_time = item[3]
                stop_time = item[4]
                duration = item[5]
                crashes = item[6]
            if test_result == 'Pass':
                attr_co_test_result = 'green'
            elif test_result == 'Fail':
                attr_co_test_result = 'red'
            elif test_result == 'NA':
                attr_co_test_result = 'grey'
            if int(crashes) > 0:
                crashes_dic = item[7]
                generate_crashes_page(crashes_dic,test_case,report_path)
                link_crash = 'file://'+os.path.join(report_path,'crash.html')
                t << tr(td(test_case,align='left')+td(test_loop,align='center')+td(test_result,align='center',bgcolor=attr_co_test_result)+td(start_time,align='center')+td(stop_time,align='center')+td(duration,align='center')+td(a(crashes,href=link_crash),align='center'))
            else:              
                t << tr(td(test_case,align='left')+td(test_loop,align='center')+td(test_result,align='center',bgcolor=attr_co_test_result)+td(start_time,align='center')+td(stop_time,align='center')+td(duration,align='center')+td(crashes,align='center'))    
    
        page.printOut(report_file_path)
    
    
    def generate_crashes_page(dictory,case_name,crash_path):
        crash_file_path = os.path.join(crash_path,'crash.html')
        crash_page = PyH('Crash Page')
        crash_page << h1('this is a page of crash.')
        crash_page << p('detail info : ')
        for j in dictory:
            div1 = crash_page << div()
            h2_content = 'Crash %s occurs when execute %s : '%(str(j),case_name)
            div1 << h2(h2_content)
            crash_table = div1 << table(frame='box')
            crash_table << tr(th('key',align='left')+th('value'),align='left')
            crash_table << tr(td('crash info',align='left')+td(dictory[j]['crash_info']),align='left')
            crash_table << tr(td('data 0',align='left')+td(dictory[j]['crash_data0']),align='left')
            crash_table << tr(td('data 1',align='left')+td(dictory[j]['crash_data1']),align='left')
            crash_table << tr(td('data 2',align='left')+td(dictory[j]['crash_data2']),align='left')
        crash_page.printOut(crash_file_path)  
    
    if __name__ == "__main__":
        main()
    
    
    
  • 入口程序:

    import xml.etree.ElementTree as ET
    import os
    import sys
    sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir,os.pardir)))
    import argparse
    import random
    import subprocess as sp
    import pickle
    import datetime
    import re
    import time
    from gramary.runner import iter_event,get_different_list,parse_history,get_crash_path,get_crashfile_conetnt
    import generate_report
    
    
    class Configuration:
    
        def __init__(self):
            self.log_path = ''
            self.execute_main_campaign = ''
            self.device_id = ''
            self.work_dir = os.path.abspath(os.path.join(os.getcwd(),os.pardir))
    
        def load(self,filepath):
            temp_dict = {}
            try:
                execfile(filepath,{},temp_dict)
                for key in temp_dict.keys():
                    setattr(self,key,temp_dict[key])
            except TypeError,errormsg:
                print "Error : No file path given!"
                print "you can add -h option to view more info"
                sys.exit(-1)
            except IOError,msg1:
                print msg1
                print "Error : Please specify a path which contain a config file!"
                sys.exit(-1)
    
    
    def get_testcase_from_xml(config):
        xml_path = os.path.join(config.work_dir,config.execute_main_campaign)
        parser = ET.parse(xml_path)
        root = parser.getroot()
        item_instance_list = root.iter('item')
        return item_instance_list
    
    def get_testsuite_from_xml(config):
        xml_path = os.path.join(config.work_dir,config.execute_main_campaign)
        parser = ET.parse(xml_path)
        root = parser.getroot()
        suite_instance_list = root.iter('suite')
        return suite_instance_list
    
    def get_full_path(config,string):
        return os.path.join(config.work_dir,string) + '.py'
    
    
    def generate_file(config,dictory):
        temporary_path = os.path.join(config.work_dir,'temporary',config.device_id)
        if not os.path.isdir(temporary_path):
            try:
                os.makedirs(temporary_path)
            except OSError:
                pass        
        pickle_dump_path = os.path.join(temporary_path,'settings_temp.pkl')
        f1 = open(pickle_dump_path,'wb')
        pickle.dump(dictory,f1,True)
        f1.close()
        return pickle_dump_path
    
    
    def save_data_for_html(config,dictory):
        temporary_path = os.path.join(config.work_dir,'temporary',config.device_id)
        if not os.path.isdir(temporary_path):
            try:
                os.makedirs(temporary_path)
            except OSError:
                pass        
        pickle_dump_path = os.path.join(temporary_path,'html_raw_data.pkl')
        f1 = open(pickle_dump_path,'wb')
        pickle.dump(dictory,f1,True)
        f1.close()
        return pickle_dump_path
    
    
    def main(config):
        r1=r'Fail|Pass'
        r2=r'(\d+)'
        r_crash_data0 = r'DATA0=(\S+\s)[\s\S]*'
        r_crash_data1 = r'DATA1=(\S+\s)[\s\S]*'
        r_crash_data2 = r'DATA2=(\S+\s)[\s\S]*'
        seq_num = 1
        html_result_data = {}
        settings_list = dir(config)
        for _ in settings_list:
            if _ == '__doc__':
                settings_list.remove(_)
            if _ == '__module__':
                settings_list.remove(_)
            if _ == 'load':
                settings_list.remove(_)    
        settings_list.remove('__init__')
        settings_dictory = {}
        for _ in settings_list:
            settings_dictory[_] = getattr(config,_)
        rs = 'run_set_'+datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')    
        settings_dictory['log_path'] = os.path.join(settings_dictory['log_path'],config.device_id,rs)
        execue_case_sequence = {}
        for testsuite in get_testsuite_from_xml(config):
            execue_case_sequence['suite_name'] = testsuite.get('name')
            execue_case_sequence['suite_loop'] = testsuite.get('loop')
            execue_case_sequence['suite_mode'] = testsuite.get('mode')
            execue_case_sequence['suite_content'] = {}
        execue_case_sequence['device_id'] = settings_dictory['device_id']    
        for testcase in get_testcase_from_xml(config):
            execue_case_sequence['suite_content'][seq_num] = [testcase.text,testcase.get('loop')]   
            seq_num += 1
        suite_loop = int(execue_case_sequence['suite_loop'])
        suite_mode = execue_case_sequence['suite_mode']
        tc_symbol_list = execue_case_sequence['suite_content'].keys()
        suite_content = execue_case_sequence['suite_content']
        if suite_loop == 0 or tc_symbol_list == []:
            sys.exit(0)
            print "there is no loop specify for running testsuite or no cases need to be executed!"
        need_to_change_sequence = False    
        if suite_mode == 'regular':
            need_to_change_sequence = False
        elif suite_mode == 'random':
            need_to_change_sequence = True
        else:
            need_to_change_sequence = False
        html_result_data = execue_case_sequence
        original_history = parse_history(settings_dictory['device_id'])
        for sl in range(1,suite_loop+1):
            print "Start to run testsuite : {0} , loop {1}".format(execue_case_sequence['suite_name'],sl)
            if need_to_change_sequence:
                random.shuffle(tc_symbol_list)
            for i in range(len(tc_symbol_list)):
                item_loop = int(suite_content[tc_symbol_list[i]][1]) if len(suite_content[tc_symbol_list[i]]) == 2 else 1
                item_path = get_full_path(config,suite_content[tc_symbol_list[i]][0])
                case_name = os.path.basename(item_path).split('.')[0]
                report_path = os.path.join(config.work_dir,settings_dictory['log_path'],case_name)
                if item_loop == 0:
                    continue    
                for j in range(1,item_loop+1):
                    print "Start to run case : {0} , loop {1}".format(case_name,j)
                    start_time = time.time()
                    start_time_dateformat = datetime.datetime.now().strftime('%Y/%m/%d_%H:%M:%S')
                    settings_dictory['TC'] = item_path
                    dir_path = generate_file(config,settings_dictory)
                    execute_command = 'python ' + item_path + ' ' + dir_path
                    proc=sp.Popen(execute_command.split(' '),stdout=sp.PIPE)
                    out = proc.stdout.read().strip('\n')
                    proc.wait()
                    current_history = parse_history(settings_dictory['device_id'])
                    stop_time = time.time()
                    stop_time_dateformat = datetime.datetime.now().strftime('%Y/%m/%d_%H:%M:%S')
                    time_taken = stop_time - start_time
                    crash_list = iter_event(get_different_list(original_history,current_history))
                    print_crashfile = ''
                    crashes = {}
                    crash_dic = {}
                    c_count = 1
                    for crash in crash_list:
                        crashlog_path = get_crash_path(crash)
                        if crashlog_path is not None:
                            print_crashfile = get_crashfile_conetnt(settings_dictory['device_id'],crashlog_path)
                        match_data0 = re.findall(r_crash_data0,print_crashfile)
                        match_data1 = re.findall(r_crash_data1,print_crashfile)
                        match_data2 = re.findall(r_crash_data2,print_crashfile)
                        crash_dic['crash_info'] = crash.strip('\r')
                        crash_dic['crash_data0'] = match_data0[0] if match_data0 != [] else ''
                        crash_dic['crash_data1'] = match_data1[0] if match_data1 != [] else '' 
                        crash_dic['crash_data2'] = match_data2[0] if match_data2 != [] else ''
                        crashes[c_count] = crash_dic
                        c_count+=1
                    crash_number = len(crash_list)
                    original_history = current_history
                    html_result_data['suite_content'][i+1].append(re.findall(r1,out)[0])
                    html_result_data['suite_content'][i+1].append(start_time_dateformat)
                    html_result_data['suite_content'][i+1].append(stop_time_dateformat)
                    html_result_data['suite_content'][i+1].append(round(time_taken,3))
                    html_result_data['suite_content'][i+1].append(crash_number)
                    html_result_data['suite_content'][i+1].append(crashes)  
                address = save_data_for_html(config,html_result_data)
                generate_report.main(address,report_path)
    
    
    if __name__ == '__main__':
        parse=argparse.ArgumentParser(usage='%(prog)s [options]')
        parse.add_argument('configuration_file',type=str,nargs='?',help="A path of config file")
        args=parse.parse_args()
        config_file = args.configuration_file
        configuration = Configuration()
        configuration.load(config_file)
        main(configuration)
    
  • 某个库文件如下:

     # coding: utf-8 
    import os
    import time
    import sys
    import subprocess as sp
    sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir)))
    from engine import controler
    max_retry = 30
    
    class Commons:
        def __init__(self,conf_dic,ins_logger=None):
            self.conf_dic = conf_dic
            name = os.path.basename(conf_dic['TC']).split('.')[0]
            workpath = conf_dic['work_dir']
            dev = conf_dic['device_id']
            logpath = conf_dic['log_path']    
            log_path = os.path.join(workpath,logpath,name)
            self.mylogger = ins_logger   
            self.op = controler.Operator(device_id=dev,log_class=self.mylogger,log_path=log_path)
    
        def warm_reboot(self):
            if not self.op.hardware().check_boot_status():
                self.mylogger.error("[Common] Device is not ready for test reboot case. ")
                return False
            self.mylogger.info("[Common] Device is ready. ")    
            self.mylogger.info("[Common] Start to reboot device {0}".format(self.op.device_id))
            self.get_boot_infomation()
            self.op.hardware().reboot_device()
            time.sleep(15)
            self.mylogger.info("[Common] Start to check if execute reboot command? ")
            if not self.op.hardware().check_reboot_progress():
                self.mylogger.error("[Common] unknow error, reboot fail. ")
                return False
            self.mylogger.info("[Common] Device start booting...")    
            while 1:  
                if self.op.hardware().check_boot_status():
                    self.mylogger.info("[Common] Device reboot successfully. ")
                    break       
            return True
    
        def get_boot_infomation(self):
            get_boot_infomation_command = 'adb -s ' + self.op.device_id + ' shell getprop | grep boot'
            out = sp.Popen(get_boot_infomation_command.split(' '),stdout=sp.PIPE).stdout.read().strip('\n\r')
            self.mylogger.info(out)
    
        def log_on_google_account(self):
            self.mylogger.info("[Common] launch Settings app ")
            self.op.hardware().launch_app_with_intent(app='Settings')
            try:
                self.mylogger.info("[Common] click Accounts item ")
                self.op.search(text="Accounts",retry_time="3").click()
            except AttributeError as e:
                self.mylogger.error("[Common] Can't find Accounts item . Got Exception {0}".format(e))
                return False 
            try:
                self.mylogger.info("[Common] click Add account item ")
                self.op.search(text="Add account").click()
            except AttributeError as e:
                self.mylogger.error("[Common] Can't find Add account item . Got Exception {0}".format(e))
                return False
            try:
                self.mylogger.info("[Common] click Google item ")
                self.op.search(text="Google").click()
            except AttributeError as e:
                self.mylogger.error("[Common] Can't find Google item . Got Exception {0}".format(e))
                return False
            retry_count = 0    
            while 1:
                try:
                    self.mylogger.info("[Common] click enter your email input edittext ")
                    self.op.search(description="Enter your email ").click()
                    retry_count = 0
                    break
                except AttributeError as e:
                    if retry_count <= max_retry:
                        self.mylogger.debug("[Common] Can't find Enter your email input edittext . try again")
                        time.sleep(2)
                        retry_count+=1
                    else:
                        self.mylogger.error("[Common] poor network course can't register google account . Got Exception {0}".format(e))
                        return False
            self.mylogger.info("[Common] input google account : {0}".format(self.conf_dic['google_account']))
            self.op.hardware().input_text(self.conf_dic['google_account'])
            try:
                self.mylogger.info("[Common] click NEXT button ")
                self.op.search(description="NEXT").click()
            except AttributeError as e:
                self.mylogger.error("[Common] Can't find NEXT button . Got Exception {0}".format(e))
                return False
            time.sleep(5)    
            try:
                self.mylogger.info("[Common] click password edittext ")
                self.op.search(id="password").click()
            except AttributeError as e:
                self.mylogger.error("[Common] Can't find password edittext . Got Exception {0}".format(e))
                return False
            self.mylogger.info("[Common] input google password : {0}".format(self.conf_dic['google_password']))
            self.op.hardware().input_text(self.conf_dic['google_password'])            
            try:
                self.mylogger.info("[Common] click NEXT button ")
                self.op.search(description="NEXT").click()
            except AttributeError as e:
                self.mylogger.error("[Common] Can't find NEXT button . Got Exception {0}".format(e))
                return False
            while 1:
                try:
                    self.mylogger.info("[Common] click ACCEPT button ")
                    self.op.search(description="ACCEPT").click()
                    retry_count = 0
                    break
                except AttributeError as e:
                    if retry_count <= max_retry:
                        self.mylogger.debug("[Common] Can't find ACCEPT button . try again")
                        time.sleep(2)
                        retry_count+=1
                    else:
                        self.mylogger.error("[Common] poor network course can't register google account . Got Exception {0}".format(e))
                        return False        
            while 1:
                try:
                    self.mylogger.info("[Common] check Google services shown??? ")
                    self.op.search(text="Google services").click()
                    retry_count = 0
                    break
                except AttributeError as e:
                    if retry_count <= max_retry:
                        self.mylogger.debug("[Common] Can't Google services keywords. try again")
                        time.sleep(5)
                        retry_count+=1
                    else:
                        self.mylogger.error("[Common] poor network course can't register google account . Got Exception {0}".format(e))
                        return False
            try:
                self.mylogger.info("[Common] click Next button ")
                self.op.search(text="Next",enabled='true',retry_time='3').click()
            except AttributeError as e:
                self.mylogger.error("[Common] Can't find NEXT button . Got Exception {0}".format(e))
                return False
            while 1:
                try:
                    self.mylogger.info("[Common] click No thanks button ")
                    self.op.search(text="No thanks").click()
                    retry_count = 0
                    break
                except AttributeError as e:
                    if retry_count <= max_retry:
                        self.mylogger.debug("[Common] Can't find No thanks button . try again")
                        time.sleep(2)
                        retry_count+=1
                    else:
                        self.mylogger.error("[Common] poor network course can't register google account . Got Exception {0}".format(e))
                        return False
            try:
                self.mylogger.info("[Common] click Continue button ")
                self.op.search(text="Continue").click()
            except AttributeError as e:
                self.mylogger.error("[Common] Can't find Continue button . Got Exception {0}".format(e))
                return False
            time.sleep(2)
            return True
    
        def check_google_account_added(self):
            self.mylogger.info("[Common] launch Settings app ")
            self.op.hardware().launch_app_with_intent(app='Settings')
            try:
                self.mylogger.info("[Common] click Accounts item ")
                self.op.search(text="Accounts",retry_time="3").click()
            except AttributeError as e:
                self.mylogger.error("[Common] Can't find Accounts item . Got Exception {0}".format(e))
                return False
            if self.op.search(text='Google') is not None:
                self.mylogger.info("[Common] add google account successfully. ")
                return True
            else:
                self.mylogger.error("[Common] add account fail. ")
                return False        
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
  • ui_hooks 模块如下:

    import controler
    import time
    import inspect
    import subprocess as sp
    
    """
    It provides a way to get through kinds of instability pup up box 
    """
    
    
    class Hooks:
        def __init__(self,device_id):   
            self.device_id=device_id
    
        def get_generators(self,generator=None):
            global frame
            frame={}
            a,b,c,d,e,f,g=[],[],[],[],[],[],[]
            for element in generator:
                a.append(element.get('package'))
                b.append(element.get('index'))
                c.append(element.get('text'))
                d.append(element.get('content-desc'))
                e.append(element.get('resource-id'))
                f.append(element.get('bounds'))
                g.append(element.get('enabled'))
    
            for i in range(len(a)):
                frame[i+1]={'package':a[i],'index':b[i],'text':c[i],'description':d[i],'id':e[i],'bounds':f[i],'enabled':g[i]}
    
    
    
        def parse(self):
            status=False
            status=self.divider()
            return status
    
        def divider(self):
            flag=False
            main_key = frame[1]['package']
            seducer001 = inspect.getargspec(self.hook_maps)[3][0]
            seducer002 = inspect.getargspec(self.hook_contact)[3][0]
            seducer003 = inspect.getargspec(self.hook_gms)[3][0]
            seducer004 = inspect.getargspec(self.hook_settings)[3][0]
            seducer005 = inspect.getargspec(self.hook_stk)[3][0]
            seducer006 = inspect.getargspec(self.hook_android)[3][0]        
            if main_key == seducer001:
                flag = self.executer(seducer001,self.hook_maps())       
            if main_key ==  seducer002:
                flag = self.executer(seducer002,self.hook_contact())
            if main_key ==  seducer003:
                flag = self.executer(seducer003,self.hook_gms())
            if main_key ==  seducer004:
                flag = self.executer(seducer004,self.hook_settings())
            if main_key ==  seducer005:
                flag = self.executer(seducer005,self.hook_stk()) 
            if main_key ==  seducer006:
                flag = self.executer(seducer006,self.hook_android())                                            
            return flag        
    
    
        def executer(self,sed,func):
            e_flag=False
            print "this app is %s"% sed
            coodinate=func
            if coodinate is None:
                e_flag = False
            else:
                pointseq=self.get_point(coodinate)
                self.tap(pointseq)  
                e_flag =True
            return e_flag           
    
    
        def hook_maps(self,seducer='com.google.android.apps.maps'):
            coodinate=None
            zipers=[]
            print "hook_maps method is invoked"
            return coodinate
    
    
        def hook_stk(self,seducer='com.android.stk'):
            coodinate=None
            print "hook_stk method is invoked"          
            for j in range(1,len(frame.keys())+1):
                if 'button_ok' in frame[j]['id'].strip('\n'):
                    coodinate=frame[j]['bounds']        
            return coodinate
    
    
        def hook_android(self,seducer='android'):
            coodinate=None
            print "hook_android method is invoked"          
            for j in range(1,len(frame.keys())+1):
                if 'button1' in frame[j]['id'].strip('\n'):
                    coodinate=frame[j]['bounds']        
            return coodinate
    
    
        def hook_settings(self,seducer='com.android.settings'):
            coodinate=None
            zipers=["Update preferred SIM card?"]
            print "hook_settings method is invoked"
            zipers_match=False
            for zi in zipers:
                for i in range(1,len(frame.keys())+1):
                    if zi == frame[i]['text'].strip('\n'):
                        zipers_match=True
                        break
            if zipers_match:          
                for j in range(1,len(frame.keys())+1):
                    if 'NO' in frame[j]['text'].strip('\n'):
                        coodinate=frame[j]['bounds']        
            return coodinate
    
    
        def hook_gms(self,seducer='com.google.android.gms'):
            coodinate=None
            zipers=["Couldn't sign in"]
            print "hook_gms method is invoked"
            zipers_match=False
            for zi in zipers:
                for i in range(1,len(frame.keys())+1):
                    if zi == frame[i]['text'].strip('\n'):
                        zipers_match=True
                        break
            if zipers_match:          
                for j in range(1,len(frame.keys())+1):
                    if 'Next' in frame[j]['text'].strip('\n'):
                        coodinate=frame[j]['bounds']        
            return coodinate
    
        def hook_contact(self,seducer='com.google.android.packageinstaller'):
            coodinate=None  
            zipers = ["Allow Contacts to access photos, media, and files on your device?"]
            print "hook_contact method is invoked"
            zipers_match=False
            for zi in zipers:
                for i in range(1,len(frame.keys())+1):
                    if zi == frame[i]['text'].strip('\n'):
                        zipers_match=True
                        break
            if zipers_match:          
                for j in range(1,len(frame.keys())+1):
                    if 'Allow' in frame[j]['text'].strip('\n'):
                        coodinate=frame[j]['bounds']
                    if 'Next' in frame[j]['text'].strip('\n'):
                        coodinate=frame[j]['bounds']  
            return coodinate
    
        def get_point(self,coodinate):
            s = coodinate
            temp = s.replace('[','').replace(']',',').split(',')
            temp.remove('')
            point_c = []
            co_x1 = temp[0]
            co_x2 = temp[2]
            point_c.append(str((int(co_x1)+int(co_x2))/2))
            co_y1 = temp[1]
            co_y2 = temp[3]
            point_c.append(str((int(co_y1)+int(co_y2))/2))
            return point_c
    
        def tap(self,point_list):
            tap_coordinate = point_list
            tap_command = self.adb_shell() + 'input tap ' + tap_coordinate[0] + ' ' + tap_coordinate[1]
            self.execute_command(tap_command).wait()                      
    
    
        def adb_shell(self):
            adb_shell_command = 'adb -s ' + self.device_id + ' shell '
            return adb_shell_command
    
        def execute_command(self,cmd,ignore_print=True):    
            ccmd = cmd
            if ignore_print:
                print ccmd
            else:
                pass    
            proc = sp.Popen(ccmd.split(' '),stdout=sp.PIPE)    
            return proc
    
  • logger 代码如下:

    #!/usr/bin/python
    import logging
    import os
    
    class Logger():
    
        def __init__(self,name,log_path):
            self.log_path = log_path
            self.name = name
            self.log_file = os.path.join(log_path,'log.txt')
            if not os.path.isdir(log_path):
                try:
                    os.makedirs(log_path)
                except OSError:
                    pass             
    
        def modlogger(self):
            mylogger = logging.getLogger(self.name)
            mylogger.setLevel(logging.DEBUG)
            fh = logging.FileHandler(self.log_file)
            sh = logging.StreamHandler()
            formatter = logging.Formatter("%(asctime)s  %(levelname)s\t%(message)s")
            fh.setFormatter(formatter)
            sh.setFormatter(formatter)
            mylogger.addHandler(fh)
            mylogger.addHandler(sh)
            return mylogger
    
        def get_instance(self):
            logger = Logger(self.name,self.log_path)
            return logger
    
        def logger(self):
            methods = self.get_instance()
            return Methods(methods)  
    
    class Methods(object):
        def __init__(self,obj):
            self.ml = obj.modlogger()
    
        def info(self,string):
            self.ml.info(string)
    
        def debug(self,string):
            self.ml.debug(string)
    
        def warning(self,string):
            self.ml.warning(string)
    
        def error(self,string):
            self.ml.error(string)
    
    
    if __name__=="__main__":
        l = Logger('test0','/home/buildbot/bluesea/reporter')
        m = l.logger()
        m.info('------this is info message-------')
        m.error('tap ok button fail')
        m.debug('-----this is debug message-------')
        m.warning('you have not set timeout')
    
    
  • #1 楼 @264768502
    第一次发,不懂怎么发代码,现在改过来了。

  • controller 代码

    import xml.etree.ElementTree as ET
    import os
    import sys
    import subprocess as sp
    import time
    import logging
    import re
    import codecs
    import datetime
    import ui_hooks
    import socket
    import threading
    
    
    WINDOW_FACTOR = ['index', 'text', 'resource-id', 'class', 'package', 'visible', 'content-desc', 'checkable',
                  'checked', 'clickable', 'enabled', 'focusable', 'focused', 'scrollable', 'long-clickable',
                  'password', 'bottom', 'selected', 'bounds']
    
    RETRY_MAX_TIME = 3
    
    def get_data(address):
        result={}
        temp = ''
        f1 = open(address,'r')
        temp = f1.read()
        result = eval(temp)
        f1.close()
        return result 
    
    
    
    
    class Operator:
        def __init__(self,device_id='',log_class=None,log_path='',**kwargs):
            self.device_id = device_id
            self.log_class = log_class
            self.log_path = log_path
            self.current_path = os.path.dirname(__file__) 
            self.home_dir = os.path.expanduser('~')
            if not os.path.isdir(self.log_path): 
                try:
                    os.makedirs(log_path)
                except OSError:
                    pass         
            self.failure_pic_location = os.path.join(self.log_path,'screenshots')
            if not os.path.isdir(self.failure_pic_location):
                try:
                    os.makedirs(self.failure_pic_location)
                except OSError:
                    pass             
            self.temporary = os.path.join(self.log_path,'temp')
            if not os.path.isdir(self.temporary):
                try:
                    os.makedirs(self.temporary)
                except OSError:
                    pass             
            self.hooks = ui_hooks.Hooks(self.device_id)       
    
    
    
        def get_instance(self):
            op = Operator(self.device_id,self.log_class,self.log_path)
            return op
    
        def decode_utf(self,var):
            utype = codecs.lookup("utf-8")
            return utype.decode(var)[0]
    
        def update_dictory(self,dictory):
            keys_list = dictory.keys()
            values_list = dictory.values()
            temp={}
            for i in range(len(keys_list)):
                temp[keys_list[i]] = self.decode_utf(values_list[i])
            return temp    
    
        def search(self,**kwargs):
            global paras_dictory,retry_time
            temp = kwargs
            paras_dictory = self.update_dictory(temp)
            if paras_dictory.has_key("retry_time"):
                retry_time = paras_dictory["retry_time"]
            else:
                retry_time = None
            return self.judge(paras_dictory,retry_time)
    
        def judge(self,dic,retry_time):
            timer = 0      
            global xml_path    
            xml_path = self.get_xml()
            if xml_path is None:
                print "Error : can not generate window dump file."
                panel = None
                return panel
            else:
                self.native_unlock_screen() 
                if retry_time is not None:
                    while 1:
                        if timer < int(retry_time):
                            if self.parse_window_dump(paras_dictory) is None:
                                print "Warnning : there is no view or many views match your require, retry again!"
                                self.little_swipe()
                                time.sleep(1)
                                timer+=1
                                self.judge(paras_dictory,retry_time)
                            else:
                                op = self.get_instance()
                                panel = Panel(op)
                                return panel
                                break
                        else:
                            panel = None
                            return panel
                else:
                    if self.parse_window_dump(paras_dictory) is None:
                        print "Warnning : there is no view or many views match your require, start to search hooks library...!" 
                        self.hooks.get_generators(self.get_all_nodes())
                        status = self.hooks.parse()
                        # if status is True, it means find corresponding frame and click next button to skip this frame
                        if status:
                            time.sleep(1)
                            self.judge(paras_dictory,retry_time).click()
                        else:
                            panel = None
                            return panel
                    else:
                        op = self.get_instance()
                        panel = Panel(op)
                        return panel
    
    
        def get_intents_dictory(self):
            bluesea_dir = os.path.abspath(os.path.join(self.current_path,os.pardir))
            app_intents_file_path = os.path.join(bluesea_dir,'repository/intent_list.cfg')
            return get_data(app_intents_file_path)
    
    
        def hardware(self,**kwargs):    
            executor = self.get_instance()
            dut = Device(executor)
            return dut
    
    
        def get_all_nodes(self):
            parser = ET.parse(xml_path)
            root = parser.getroot()
            node_instance_list = root.iter('node')
            return node_instance_list
    
        def get_scrollable_view(self):
            for ins in self.get_all_nodes():
                if ins.attrib["scrollable"] == "true":
                    bound = ins.get("bounds")
                    break
                else:
                    bound = None
            return bound
    
        def native_is_screen_lock(self):
            is_screen_lock = False
            for ins in self.get_all_nodes():
                id_collector = ins.attrib['resource-id']
                if "id/lock_icon" in id_collector.strip('\n'):
                    is_screen_lock = True
                    break     
            return is_screen_lock
    
    
        def parse_window_dump(self,parametres=None):
            r1 = r'[A-Za-z]+:id/(\w+)'
            bound = None
            p_counter = []
            bound_list = [] 
            counter_id = 0
            counter_index = 0
            counter_desc = 0
            counter_pack = 0
            counter_text = 0
            counter_enabled = 0
            false_flag = 0
            if parametres.has_key('id'):
                view_id = parametres['id']
                p_counter.append(view_id)
            if parametres.has_key('text'):
                view_text = parametres['text']
                p_counter.append(view_text)
            if parametres.has_key('description'):
                view_description = parametres['description']
                p_counter.append(view_description)
            if parametres.has_key('index'):
                view_index = parametres['index']
                p_counter.append(view_index)
            if parametres.has_key('package'):
                view_package = parametres['package']
                p_counter.append(view_package)
            if parametres.has_key('enabled'):
                view_enabled = parametres['enabled']
                p_counter.append(view_enabled)
    
    
            if len(p_counter) == 0:
                print "Error : invalid input, there is no useful parameter given!!!"
                sys.exit(-1)
    
            for factor in self.get_all_nodes():
                id_collector = factor.attrib['resource-id']
                text_collector = factor.attrib['text']  
                index_collector = factor.attrib['index']
                desc_collector = factor.attrib['content-desc']
                pack_collector = factor.attrib['package']
                enabled_collector = factor.attrib['enabled']
                for your_input in p_counter:
                    id_strs = re.findall(r1,id_collector.strip('\n'))
                    id_string = id_strs
                    if not id_strs == []:
                        id_string=id_strs[0]
                    else:
                        id_string = id_collector.strip('\n\r')    
                    if your_input == id_string:
                        bound_id = factor.get("bounds")
                        counter_id +=1
                    if your_input == text_collector.strip('\n'):
                        bound_text = factor.get("bounds")
                        counter_text +=1
                    if your_input == index_collector.strip('\n'):
                        bound_index = factor.get("bounds")
                        counter_index +=1
                    if your_input == desc_collector.strip('\n'):
                        bound_desc = factor.get("bounds")
                        counter_desc +=1
                    if your_input == pack_collector.strip('\n'):
                        bound_pack = factor.get("bounds")
                        counter_pack +=1
                    if your_input == enabled_collector.strip('\n'):
                        bound_enabled = factor.get("bounds")
                        counter_enabled +=1                    
    
            if counter_id == 0:
                false_flag += 1 
            if counter_index == 0:
                false_flag += 1 
            if counter_pack == 0:
                false_flag += 1            
            if counter_desc == 0:
                false_flag += 1 
            if counter_text == 0:
                false_flag += 1
            if counter_enabled == 0:
                false_flag += 1
    
            if counter_id == 1:
                bound_list.append(bound_id)
            if counter_text == 1:
                bound_list.append(bound_text)
            if counter_desc == 1:
                bound_list.append(bound_desc)
            if counter_pack == 1:
                bound_list.append(bound_pack)
            if counter_index == 1:
                bound_list.append(bound_index)
            if counter_enabled == 1:
                bound_list.append(bound_enabled)
    
            if false_flag <= 6 - len(p_counter):  
                if len(bound_list) == 0:
                    bound = None
                elif len(bound_list) == 1:
                    bound = bound_list[0]
                elif len(bound_list) == 2:
                    temp_bound = bound_list[0]
                    if bound_list[1] == temp_bound:
                        bound = temp_bound
                    else:
                        bound = None        
                elif len(bound_list) > 2:
                    temp_bound = bound_list[0]
                    for bo in range(1,len(bound_list)):
                        if bound_list[bo] != temp_bound:
                            bound = None
                            break   
                        else:
                            bound = temp_bound
            else:
                bound = None                 
            return bound            
    
        def get_xml(self):
            retry_count = 0
            while 1:
                if  retry_count < RETRY_MAX_TIME:
                    self.adb_root()
                    shell_cmd = self.adb_shell()
                    dump_xml_command = shell_cmd + 'uiautomator dump /sdcard/window_dump.xml'
                    self.execute_command(dump_xml_command).wait()
                    xml_file_on_device_path = '/sdcard/window_dump.xml'
                    xml_file_on_server_path = self.temporary
                    xml_file_origin = os.path.join(xml_file_on_server_path,'window_dump.xml')
                    xml_new_name = self.device_id + '_' + 'window_dump.xml'
                    xml_file = os.path.join(xml_file_on_server_path,xml_new_name)
                    get_xml_command = 'adb -s ' + self.device_id + ' pull ' + xml_file_on_device_path + ' ' + xml_file_on_server_path
                    self.execute_command(get_xml_command).wait()
                    try:
                        os.rename(xml_file_origin,xml_file)
                    except OSError as oer:
                        print oer
                        self.get_xml()    
                    if os.path.isfile(xml_file):
                        return xml_file
                        break
                    else:
                        retry_count += 1    
                else:
                    print "Unkown issue of adb, uiautomator dump file failed!"  
                    break
            return          
    
        def adb_root(self):
            root_command = 'adb -s ' + self.device_id + ' root'
            self.execute_command(root_command).wait()   
    
        def generate_folder_locate_picture(self):
            address_picture = self.failure_pic_location
            if not os.path.isdir(address_picture):
                try:
                    os.makedirs(address_picture)
                except OSError:
                    pass             
            return address_picture
    
        def generate_screenshot_name_format(self):
            dt = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
            file_name = 'screenshot_' + dt + '.png'
            return file_name
    
        def adb_command(self):
            adb_command = 'adb -s ' + self.device_id + ' '
            return adb_command
    
    
        def adb_shell(self):
            adb_shell_command = 'adb -s ' + self.device_id + ' shell '
            return adb_shell_command
    
        def execute_command(self,cmd,ignore_print=True):    
            ccmd = cmd
            if ignore_print:
                self.log_class.info("Execute command : {0}".format(ccmd))
            else:
                pass    
            proc = sp.Popen(ccmd.split(' '),stdout=sp.PIPE)    
            return proc
    
        def check_adb_works(self,proc):
            pass    
    
        def get_dumpsys_display(self):
            dumpsys_display_command = self.adb_shell() + 'dumpsys display'
            out = self.execute_command(dumpsys_display_command)
            lines = out.stdout.read().split('\n')
            return lines
    
        def bound_to_list(self):
            s = self.parse_window_dump(paras_dictory)
            temp = s.replace('[','').replace(']',',').split(',')
            temp.remove('')
            return temp
    
        def get_centre_coordinate(self):
            c_list = self.bound_to_list()
            point_c = []
            co_x1 = c_list[0]
            co_x2 = c_list[2]
            point_c.append(str((int(co_x1)+int(co_x2))/2))
            co_y1 = c_list[1]
            co_y2 = c_list[3]
            point_c.append(str((int(co_y1)+int(co_y2))/2))
            return point_c
    
        def native_power_on(self):
            if not self.native_check_screen_on():
                power_on_command = self.adb_shell() + 'input keyevent 26'
                self.execute_command(power_on_command).wait()   
    
        def native_unlock_screen(self):
            self.native_power_on()
            if self.native_is_screen_lock():
                self.native_push_up()
    
    
        def native_check_screen_on(self):
            is_screen_on = False
            for line in self.get_dumpsys_display():
                if 'mScreenState' in line and 'ON' in line:
                    is_screen_on = True
            return is_screen_on 
    
        def native_push_up(self):
            device_resolution = self.get_max_resolution()
            max_w = device_resolution[0]
            max_h = device_resolution[1]
            if max_w =='unknow' or max_h =='unknow':
                print "Error : dumpsys display failed"
                sys.exit(-1)
            else:
                x1 = str(int(max_w)/2)
                x2 = str(int(max_w)/2)
                y1 = str((int(max_h)*4)/5)
                y2 = str(int(max_h)/5)
                push_up_command = self.adb_shell()  + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
                self.execute_command(push_up_command).wait()
    
        def get_max_resolution(self):
            max_resolution = []
            for line in self.get_dumpsys_display():
                if 'mDisplayWidth' in line:
                    max_resolution.append(line.strip('\r').split('=')[-1])
                if 'mDisplayHeight' in line:        
                    max_resolution.append(line.strip('\r').split('=')[-1])     
            return max_resolution
    
        def take_snapshot(self):
            global xml_path
            xml_path=self.get_xml()
            file_path = self.generate_folder_locate_picture()
            picture_name = self.generate_screenshot_name_format()
            uiautomator_dump_file_name = picture_name.split('.')[0] + '.uix'
            uiautomator_dump_file = os.path.join(file_path,uiautomator_dump_file_name)
            file_path_device = os.path.join('/sdcard/' + picture_name)
            take_snapshot_command = self.adb_shell() + '/system/bin/screencap -p ' + file_path_device
            pull_snapshot_command = 'adb -s ' + self.device_id + ' pull ' + file_path_device + ' ' + file_path
            get_uiautomator_dump_command = self.adb_shell() + 'uiautomator dump'
            pull_uiautomator_dump_command = 'adb -s ' + self.device_id + ' pull /sdcard/window_dump.xml' + ' ' + uiautomator_dump_file
            self.native_unlock_screen()
            time.sleep(0.5)
            self.execute_command(take_snapshot_command).wait()
            self.execute_command(pull_snapshot_command).wait()
            self.execute_command(get_uiautomator_dump_command).wait()
            self.execute_command(pull_uiautomator_dump_command).wait()        
    
        def little_swipe(self):
            device_resolution_2 = self.get_max_resolution()
            max_w_2 = device_resolution_2[0]
            max_h_2 = device_resolution_2[1]
            x1_2 = str(int(max_w_2)/2)
            x2_2 = str(int(max_w_2)/2)
            y2_2 = str((int(max_h_2)*2)/5)
            y1_2 = str((int(max_h_2)*4)/5)
            little_swipe_command = self.adb_shell() + 'input swipe ' + x1_2 + ' ' + y1_2 + ' ' + x2_2 + ' ' + y2_2
            self.execute_command(little_swipe_command).wait()
    
    
    class Device(object):
        def __init__(self,executor):
            self.executor = executor
    
        def return_home(self):
            go_home_command = self.executor.adb_shell() + 'input keyevent 3'
            self.executor.execute_command(go_home_command).wait()
    
    
        def press_enter(self):
            press_enter_command = self.executor.adb_shell() + 'input keyevent 66'
            self.executor.execute_command(press_enter_command).wait()
    
        def press_menu(self):
            press_menu_command = self.executor.adb_shell() + 'input keyevent 82'
            self.executor.execute_command(press_menu_command).wait()
    
        def shift_tab(self,counter=1):
            tab_key = '61 '
            shift_tab_command = self.executor.adb_shell() + 'input keyevent ' + (tab_key*int(counter)).strip()
            self.executor.execute_command(shift_tab_command).wait()
    
        def shift_down(self,counter=1):
            down_key = '20 '
            press_down_command = self.executor.adb_shell() + 'input keyevent ' + (down_key*int(counter)).strip()
            self.executor.execute_command(press_down_command).wait()    
    
        def shift_up(self,counter=1):
            up_key = '19 '
            press_up_command = self.executor.adb_shell() + 'input keyevent ' + (up_key*int(counter)).strip()
            self.executor.execute_command(press_up_command).wait()
    
        def shift_left(self,counter=1):
            press_left_command = self.executor.adb_shell() + 'input keyevent 21'
            for j in range(int(counter)):
                self.executor.execute_command(press_left_command).wait()
                time.sleep(1)
    
        def volume_up(self,counter=1):
            volume_up_command = self.executor.adb_shell() + 'input keyevent 24'
            for j in range(int(counter)):
                self.executor.execute_command(volume_up_command).wait()
                time.sleep(0.5)
    
        def volume_down(self,counter=1):
            volume_down_command = self.executor.adb_shell() + 'input keyevent 25'
            for j in range(int(counter)):
                self.executor.execute_command(volume_down_command).wait()
                time.sleep(0.5)
    
        def shift_right(self,counter=1):
            press_right_command = self.executor.adb_shell() + 'input keyevent 22'
            for k in range(int(counter)):
                self.executor.execute_command(press_right_command).wait()
                time.sleep(1)
    
        def push_up(self):
            device_resolution = self.executor.get_max_resolution()
            max_w = device_resolution[0]
            max_h = device_resolution[1]
            if max_w =='unknow' or max_h =='unknow':
                print "Error : dumpsys display failed"
                sys.exit(-1)
            else:
                x1 = str(int(max_w)/2)
                x2 = str(int(max_w)/2)
                y1 = str((int(max_h)*4)/5)
                y2 = str(int(max_h)/5)
                push_up_command = self.executor.adb_shell() + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
                self.executor.execute_command(push_up_command).wait()
    
    
    
        def swipe_right(self):
            device_resolution = self.executor.get_max_resolution()
            max_w = device_resolution[0]
            max_h = device_resolution[1]
            if max_w =='unknow' or max_h =='unknow':
                print "Error : dumpsys display failed"
                sys.exit(-1)
            else:
                x1 = str(int(max_w)/5)
                x2 = str((int(max_w)*4)/5)
                y1 = str(int(max_h)/2)
                y2 = str(int(max_h)/2)
                swipe_right_command = self.executor.adb_shell() + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
                self.executor.execute_command(swipe_right_command).wait()
    
        def check_boot_status(self):
            is_booted = False
            check_boot_status_command = self.executor.adb_shell() + 'getprop'
            grep_command = 'grep bootanim'
            p5 = sp.Popen(check_boot_status_command.split(' '),stdout=sp.PIPE)
            p6 = sp.Popen(grep_command.split(' '),stdin=p5.stdout,stdout=sp.PIPE)
            out = p6.stdout.read().strip('\n')
            if "stopped" and '1' in out:
                is_booted = True
            return is_booted
    
    
        def check_reboot_progress(self):
            reboot_execute_success = False
            if not self.check_boot_status():
                reboot_execute_success = True
            return reboot_execute_success    
    
    
        def reboot_device(self):
            reboot_device_command = self.executor.adb_command() + 'reboot'
            self.executor.execute_command(reboot_device_command).wait()
    
        def input_text(self,string='%s'):
            string = string.strip().replace(' ','%s')    
            input_text_command = self.executor.adb_shell() + 'input text ' + string
            self.executor.execute_command(input_text_command).wait()
    
    
    
        def swipe_left(self):
            device_resolution = self.executor.get_max_resolution()
            max_w = device_resolution[0]
            max_h = device_resolution[1]
            if max_w =='unknow' or max_h =='unknow':
                print "Error : dumpsys display failed"
                sys.exit(-1)
            else:
                x1 = str((int(max_w)*4)/5)
                x2 = str(int(max_w)/5)
                y1 = str(int(max_h)/2)
                y2 = str(int(max_h)/2)
                swipe_left_command = self.executor.adb_shell()  + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
                self.executor.execute_command(swipe_left_command).wait()
    
    
        def pull_down(self):
            device_resolution_1 = self.executor.get_max_resolution()
            max_w_1 = device_resolution_1[0]
            max_h_1 = device_resolution_1[1]
            if max_w_1 =='unknow' or max_h_1 =='unknow':
                print "Error : dumpsys display failed"
                sys.exit(-1)
            else:
                x1_1 = str(int(max_w_1)/2)
                x2_1 = str(int(max_w_1)/2)
                y1_1 = str(int(max_h_1)/5)
                y2_1 = str((int(max_h_1)*4)/5)
                push_up_command_1 = self.executor.adb_shell()   + 'input swipe ' + x1_1 + ' ' + y1_1 + ' ' + x2_1 + ' ' + y2_1
                self.executor.execute_command(push_up_command_1).wait()
    
        def swipe_pull_notification(self):
            device_resolution = self.executor.get_max_resolution()
            max_w = device_resolution[0]
            max_h = device_resolution[1]
            if max_w =='unknow' or max_h =='unknow':
                print "Error : dumpsys display failed"
                sys.exit(-1)
            else:
                x1 = str(int(max_w)/2)
                x2 = str(int(max_w)/2)
                y1 = "0"
                y2 = str(int(max_h)/5)
                swip_noticfi_command = self.executor.adb_shell()    + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
                self.executor.execute_command(swip_noticfi_command).wait()
    
        def get_through_oobe(self):
            device_resolution = self.executor.get_max_resolution()
            max_w = device_resolution[0]
            max_h = device_resolution[1]
            print "[debug] max_w and max_h is ",max_w,max_h
            if max_w =='unknow' or max_h =='unknow':
                print "Error : dumpsys display failed"
                sys.exit(-1)
            else:
                p1 = [str((int(max_w)*2)/10),str((int(max_h)*2)/10)]
                p2 = [str((int(max_w)*8)/10),str((int(max_h)*2)/10)]
                p3 = [str((int(max_w)*8)/10),str((int(max_h)*8)/10)]
                p4 = [str((int(max_w)*2)/10),str((int(max_h)*8)/10)]
                tap_command1 = self.executor.adb_shell() + 'input tap ' + p1[0] + ' ' + p1[1]
                tap_command2 = self.executor.adb_shell() + 'input tap ' + p2[0] + ' ' + p2[1]
                tap_command3 = self.executor.adb_shell() + 'input tap ' + p3[0] + ' ' + p3[1]
                tap_command4 = self.executor.adb_shell() + 'input tap ' + p4[0] + ' ' + p4[1]
                self.executor.execute_command(tap_command1).wait()
                self.executor.execute_command(tap_command2).wait()
                self.executor.execute_command(tap_command3).wait()
                self.executor.execute_command(tap_command4).wait()      
    
        def power_on(self):
            if not self.check_screen_on():
                power_on_command = self.executor.adb_shell() + 'input keyevent 26'
                self.executor.execute_command(power_on_command).wait()
    
    
        def press_back(self,counter=1):
            press_back_command = self.executor.adb_shell() + 'input keyevent 4'
            for i in range(int(counter)):
                self.executor.execute_command(press_back_command).wait()
                time.sleep(1)
    
    
    
        def check_screen_on(self):
            is_screen_on = False
            for line in self.executor.get_dumpsys_display():
                if 'mScreenState' in line and 'ON' in line:
                    is_screen_on = True
            return is_screen_on     
    
        def launch_app_with_intent(self,app="Native_HomePage",**kwargs):
            intent_dictory = self.executor.get_intents_dictory()
            if app in intent_dictory.keys():
                app_intent = intent_dictory[app]['intent']  
                launch_app_command = self.executor.adb_shell() + 'am start -a android.intent.action.MAIN -c android.intent.category.LAUNCHER -f 0x10200000 -n ' + app_intent
                self.executor.execute_command(launch_app_command).wait()
            else:
                print "[Error] there is no intent match your given application, please double check or contact author to add!!!"
                sys.exit(-1) 
    
    
    class Panel(object):
        def __init__(self,panel):
            self.panel = panel
    
        def click(self):
            click_coordinate = self.panel.get_centre_coordinate()
            click_command = self.panel.adb_shell() + 'input tap ' + click_coordinate[0] + ' ' + click_coordinate[1]
            self.panel.execute_command(click_command).wait()
    
        def swipe_up(self,times=1):
            coordinate_list = self.panel.bound_to_list()
            x1 = str((int(coordinate_list[0])+int(coordinate_list[2]))/2)
            x2 = str((int(coordinate_list[0])+int(coordinate_list[2]))/2)
            y1 = str(((int(coordinate_list[1])+int(coordinate_list[3]))*4)/5)
            y2 = str(((int(coordinate_list[1])+int(coordinate_list[3]))*1)/5)
            push_up_command = self.panel.adb_shell() + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
            for loop in range(int(times)):
                self.panel.execute_command(push_up_command).wait()
    
        def long_click(self):
            click_coordinate = self.panel.get_centre_coordinate()
            px = str(int(click_coordinate[0])+1)
            py = str(int(click_coordinate[1])+1)
            during_time = '2000'
            long_click_command = self.panel.adb_shell() + 'input swipe ' + click_coordinate[0] + ' ' + click_coordinate[1] + ' ' + px + ' ' + py + ' ' + during_time
            self.panel.execute_command(long_click_command).wait()
    
        def drag_to(self,dx,dy):
            drag_to_command = []
            down_key_coordinate = self.panel.get_centre_coordinate()
            down_key_px = str(int(down_key_coordinate[0]))
            down_key_py = str(int(down_key_coordinate[1]))
            down_key_command = self.touch_down(down_key_px,down_key_py)
            move_command = self.move(str(dx),str(dy))
            up_key_command = self.touch_up(str(dx),str(dy))
            if self.get_pid() is None:
                self.generate_monkey_process()
            monkey_pid_num = self.get_pid()
            if monkey_pid_num is not None:
                drag_to_command.append(down_key_command)
                drag_to_command.append(move_command)
                drag_to_command.append(up_key_command)
                self.open_socket(drag_to_command)
                self.kill_monkey_process(monkey_pid_num)
            if self.get_pid() is not None:
                self.kill_monkey_process(monkey_pid_num)    
    
        def touch_down(self,dx,dy):
            return 'touch down {0} {1}\n'.format(dx,dy)
    
        def touch_up(self,dx,dy):
            return 'touch up {0} {1}\n'.format(dx,dy)
    
        def move(self,dx,dy):
            return 'touch move {0} {1}\n'.format(dx,dy)
    
        def get_pid(self):
            monkey_pid=None
            r2=r'\S+\s+(\d+)\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s.\scom.android.commands.monkey'
            grep_monkey_command = self.panel.adb_shell() + 'ps'
            o = sp.Popen(grep_monkey_command.split(' '),stdout=sp.PIPE).stdout.read().strip('\n')
            re_result = re.findall(r2,o)
            if re_result == []:
                monkey_pid=None
            else:
                monkey_pid=re_result[0]    
            return monkey_pid
    
        def kill_monkey_process(self,monkey_pid):
            if monkey_pid is not None:
                kill_monkey_process_command = self.panel.adb_shell() + 'kill ' + monkey_pid
                self.panel.execute_command(kill_monkey_process_command).wait()
    
        def generate_monkey_process(self):
            generate_monkey_process_command = self.panel.adb_shell() + 'LD_LIBRARY_PATH=/vendor/lib:/system/lib monkey --port 15555'
            p2 = sp.Popen(generate_monkey_process_command.split(' '),stdout=sp.PIPE)
            t1 = threading.Timer(5,lambda:self.kill(p2))
            t1.setDaemon(True)
            t1.start()
            time.sleep(5)
            if self.get_pid() is not None:
                forwad_command = 'adb -s ' + self.panel.device_id + ' forward tcp:15555 tcp:15555'
                self.panel.execute_command(forwad_command).wait()
    
        def open_socket(self,cmd_list):
            sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
            sk.settimeout(10)
            sk.connect(('127.0.0.1',15555))
            for cmd in cmd_list:
                self.panel.log_class.info("Execute command : {0}".format(cmd)) 
                sk.sendall(cmd)
                time.sleep(1.5)
            sk.send('quit\n')
            sk.close()    
    
        def kill(self,process):
            process.terminate()
            process.kill()
    
    
    
    
    
    
    
    
  • Android 常用 adb 命令总结 at 2017年01月09日

    我们公司自己开发了一个 UI 自动化测试框架,就是基于 python+adb,非常强大。