自动化工具 基于 python+adb 的部分自动化测试代码

yaboandroid · 2017年01月13日 · 最后由 HSB2 回复于 2019年12月09日 · 29 次阅读

源代码格式错误,已在楼下重发!!!

共收到 19 条回复 时间 点赞

小小的建议
没缩进不能看,不如贴 gist
单纯的 adb 的封装有很多人写了
比如我 (# 厚脸皮) https://github.com/264768502/adb_wrapper
比如这贴: https://testerhome.com/topics/6938

如果要处理 UI 的话,其实有现成的,比如 pyuiautomator 或者 Appium

controller 代码

import xml.etree.ElementTree as ET
import os
import sys
import subprocess as sp
import time
import logging
import re
import codecs
import datetime
import ui_hooks
import socket
import threading


WINDOW_FACTOR = ['index', 'text', 'resource-id', 'class', 'package', 'visible', 'content-desc', 'checkable',
              'checked', 'clickable', 'enabled', 'focusable', 'focused', 'scrollable', 'long-clickable',
              'password', 'bottom', 'selected', 'bounds']

RETRY_MAX_TIME = 3

def get_data(address):
    result={}
    temp = ''
    f1 = open(address,'r')
    temp = f1.read()
    result = eval(temp)
    f1.close()
    return result 




class Operator:
    def __init__(self,device_id='',log_class=None,log_path='',**kwargs):
        self.device_id = device_id
        self.log_class = log_class
        self.log_path = log_path
        self.current_path = os.path.dirname(__file__) 
        self.home_dir = os.path.expanduser('~')
        if not os.path.isdir(self.log_path): 
            try:
                os.makedirs(log_path)
            except OSError:
                pass         
        self.failure_pic_location = os.path.join(self.log_path,'screenshots')
        if not os.path.isdir(self.failure_pic_location):
            try:
                os.makedirs(self.failure_pic_location)
            except OSError:
                pass             
        self.temporary = os.path.join(self.log_path,'temp')
        if not os.path.isdir(self.temporary):
            try:
                os.makedirs(self.temporary)
            except OSError:
                pass             
        self.hooks = ui_hooks.Hooks(self.device_id)       



    def get_instance(self):
        op = Operator(self.device_id,self.log_class,self.log_path)
        return op

    def decode_utf(self,var):
        utype = codecs.lookup("utf-8")
        return utype.decode(var)[0]

    def update_dictory(self,dictory):
        keys_list = dictory.keys()
        values_list = dictory.values()
        temp={}
        for i in range(len(keys_list)):
            temp[keys_list[i]] = self.decode_utf(values_list[i])
        return temp    

    def search(self,**kwargs):
        global paras_dictory,retry_time
        temp = kwargs
        paras_dictory = self.update_dictory(temp)
        if paras_dictory.has_key("retry_time"):
            retry_time = paras_dictory["retry_time"]
        else:
            retry_time = None
        return self.judge(paras_dictory,retry_time)

    def judge(self,dic,retry_time):
        timer = 0      
        global xml_path    
        xml_path = self.get_xml()
        if xml_path is None:
            print "Error : can not generate window dump file."
            panel = None
            return panel
        else:
            self.native_unlock_screen() 
            if retry_time is not None:
                while 1:
                    if timer < int(retry_time):
                        if self.parse_window_dump(paras_dictory) is None:
                            print "Warnning : there is no view or many views match your require, retry again!"
                            self.little_swipe()
                            time.sleep(1)
                            timer+=1
                            self.judge(paras_dictory,retry_time)
                        else:
                            op = self.get_instance()
                            panel = Panel(op)
                            return panel
                            break
                    else:
                        panel = None
                        return panel
            else:
                if self.parse_window_dump(paras_dictory) is None:
                    print "Warnning : there is no view or many views match your require, start to search hooks library...!" 
                    self.hooks.get_generators(self.get_all_nodes())
                    status = self.hooks.parse()
                    # if status is True, it means find corresponding frame and click next button to skip this frame
                    if status:
                        time.sleep(1)
                        self.judge(paras_dictory,retry_time).click()
                    else:
                        panel = None
                        return panel
                else:
                    op = self.get_instance()
                    panel = Panel(op)
                    return panel


    def get_intents_dictory(self):
        bluesea_dir = os.path.abspath(os.path.join(self.current_path,os.pardir))
        app_intents_file_path = os.path.join(bluesea_dir,'repository/intent_list.cfg')
        return get_data(app_intents_file_path)


    def hardware(self,**kwargs):    
        executor = self.get_instance()
        dut = Device(executor)
        return dut


    def get_all_nodes(self):
        parser = ET.parse(xml_path)
        root = parser.getroot()
        node_instance_list = root.iter('node')
        return node_instance_list

    def get_scrollable_view(self):
        for ins in self.get_all_nodes():
            if ins.attrib["scrollable"] == "true":
                bound = ins.get("bounds")
                break
            else:
                bound = None
        return bound

    def native_is_screen_lock(self):
        is_screen_lock = False
        for ins in self.get_all_nodes():
            id_collector = ins.attrib['resource-id']
            if "id/lock_icon" in id_collector.strip('\n'):
                is_screen_lock = True
                break     
        return is_screen_lock


    def parse_window_dump(self,parametres=None):
        r1 = r'[A-Za-z]+:id/(\w+)'
        bound = None
        p_counter = []
        bound_list = [] 
        counter_id = 0
        counter_index = 0
        counter_desc = 0
        counter_pack = 0
        counter_text = 0
        counter_enabled = 0
        false_flag = 0
        if parametres.has_key('id'):
            view_id = parametres['id']
            p_counter.append(view_id)
        if parametres.has_key('text'):
            view_text = parametres['text']
            p_counter.append(view_text)
        if parametres.has_key('description'):
            view_description = parametres['description']
            p_counter.append(view_description)
        if parametres.has_key('index'):
            view_index = parametres['index']
            p_counter.append(view_index)
        if parametres.has_key('package'):
            view_package = parametres['package']
            p_counter.append(view_package)
        if parametres.has_key('enabled'):
            view_enabled = parametres['enabled']
            p_counter.append(view_enabled)


        if len(p_counter) == 0:
            print "Error : invalid input, there is no useful parameter given!!!"
            sys.exit(-1)

        for factor in self.get_all_nodes():
            id_collector = factor.attrib['resource-id']
            text_collector = factor.attrib['text']  
            index_collector = factor.attrib['index']
            desc_collector = factor.attrib['content-desc']
            pack_collector = factor.attrib['package']
            enabled_collector = factor.attrib['enabled']
            for your_input in p_counter:
                id_strs = re.findall(r1,id_collector.strip('\n'))
                id_string = id_strs
                if not id_strs == []:
                    id_string=id_strs[0]
                else:
                    id_string = id_collector.strip('\n\r')    
                if your_input == id_string:
                    bound_id = factor.get("bounds")
                    counter_id +=1
                if your_input == text_collector.strip('\n'):
                    bound_text = factor.get("bounds")
                    counter_text +=1
                if your_input == index_collector.strip('\n'):
                    bound_index = factor.get("bounds")
                    counter_index +=1
                if your_input == desc_collector.strip('\n'):
                    bound_desc = factor.get("bounds")
                    counter_desc +=1
                if your_input == pack_collector.strip('\n'):
                    bound_pack = factor.get("bounds")
                    counter_pack +=1
                if your_input == enabled_collector.strip('\n'):
                    bound_enabled = factor.get("bounds")
                    counter_enabled +=1                    

        if counter_id == 0:
            false_flag += 1 
        if counter_index == 0:
            false_flag += 1 
        if counter_pack == 0:
            false_flag += 1            
        if counter_desc == 0:
            false_flag += 1 
        if counter_text == 0:
            false_flag += 1
        if counter_enabled == 0:
            false_flag += 1

        if counter_id == 1:
            bound_list.append(bound_id)
        if counter_text == 1:
            bound_list.append(bound_text)
        if counter_desc == 1:
            bound_list.append(bound_desc)
        if counter_pack == 1:
            bound_list.append(bound_pack)
        if counter_index == 1:
            bound_list.append(bound_index)
        if counter_enabled == 1:
            bound_list.append(bound_enabled)

        if false_flag <= 6 - len(p_counter):  
            if len(bound_list) == 0:
                bound = None
            elif len(bound_list) == 1:
                bound = bound_list[0]
            elif len(bound_list) == 2:
                temp_bound = bound_list[0]
                if bound_list[1] == temp_bound:
                    bound = temp_bound
                else:
                    bound = None        
            elif len(bound_list) > 2:
                temp_bound = bound_list[0]
                for bo in range(1,len(bound_list)):
                    if bound_list[bo] != temp_bound:
                        bound = None
                        break   
                    else:
                        bound = temp_bound
        else:
            bound = None                 
        return bound            

    def get_xml(self):
        retry_count = 0
        while 1:
            if  retry_count < RETRY_MAX_TIME:
                self.adb_root()
                shell_cmd = self.adb_shell()
                dump_xml_command = shell_cmd + 'uiautomator dump /sdcard/window_dump.xml'
                self.execute_command(dump_xml_command).wait()
                xml_file_on_device_path = '/sdcard/window_dump.xml'
                xml_file_on_server_path = self.temporary
                xml_file_origin = os.path.join(xml_file_on_server_path,'window_dump.xml')
                xml_new_name = self.device_id + '_' + 'window_dump.xml'
                xml_file = os.path.join(xml_file_on_server_path,xml_new_name)
                get_xml_command = 'adb -s ' + self.device_id + ' pull ' + xml_file_on_device_path + ' ' + xml_file_on_server_path
                self.execute_command(get_xml_command).wait()
                try:
                    os.rename(xml_file_origin,xml_file)
                except OSError as oer:
                    print oer
                    self.get_xml()    
                if os.path.isfile(xml_file):
                    return xml_file
                    break
                else:
                    retry_count += 1    
            else:
                print "Unkown issue of adb, uiautomator dump file failed!"  
                break
        return          

    def adb_root(self):
        root_command = 'adb -s ' + self.device_id + ' root'
        self.execute_command(root_command).wait()   

    def generate_folder_locate_picture(self):
        address_picture = self.failure_pic_location
        if not os.path.isdir(address_picture):
            try:
                os.makedirs(address_picture)
            except OSError:
                pass             
        return address_picture

    def generate_screenshot_name_format(self):
        dt = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
        file_name = 'screenshot_' + dt + '.png'
        return file_name

    def adb_command(self):
        adb_command = 'adb -s ' + self.device_id + ' '
        return adb_command


    def adb_shell(self):
        adb_shell_command = 'adb -s ' + self.device_id + ' shell '
        return adb_shell_command

    def execute_command(self,cmd,ignore_print=True):    
        ccmd = cmd
        if ignore_print:
            self.log_class.info("Execute command : {0}".format(ccmd))
        else:
            pass    
        proc = sp.Popen(ccmd.split(' '),stdout=sp.PIPE)    
        return proc

    def check_adb_works(self,proc):
        pass    

    def get_dumpsys_display(self):
        dumpsys_display_command = self.adb_shell() + 'dumpsys display'
        out = self.execute_command(dumpsys_display_command)
        lines = out.stdout.read().split('\n')
        return lines

    def bound_to_list(self):
        s = self.parse_window_dump(paras_dictory)
        temp = s.replace('[','').replace(']',',').split(',')
        temp.remove('')
        return temp

    def get_centre_coordinate(self):
        c_list = self.bound_to_list()
        point_c = []
        co_x1 = c_list[0]
        co_x2 = c_list[2]
        point_c.append(str((int(co_x1)+int(co_x2))/2))
        co_y1 = c_list[1]
        co_y2 = c_list[3]
        point_c.append(str((int(co_y1)+int(co_y2))/2))
        return point_c

    def native_power_on(self):
        if not self.native_check_screen_on():
            power_on_command = self.adb_shell() + 'input keyevent 26'
            self.execute_command(power_on_command).wait()   

    def native_unlock_screen(self):
        self.native_power_on()
        if self.native_is_screen_lock():
            self.native_push_up()


    def native_check_screen_on(self):
        is_screen_on = False
        for line in self.get_dumpsys_display():
            if 'mScreenState' in line and 'ON' in line:
                is_screen_on = True
        return is_screen_on 

    def native_push_up(self):
        device_resolution = self.get_max_resolution()
        max_w = device_resolution[0]
        max_h = device_resolution[1]
        if max_w =='unknow' or max_h =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            x1 = str(int(max_w)/2)
            x2 = str(int(max_w)/2)
            y1 = str((int(max_h)*4)/5)
            y2 = str(int(max_h)/5)
            push_up_command = self.adb_shell()  + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
            self.execute_command(push_up_command).wait()

    def get_max_resolution(self):
        max_resolution = []
        for line in self.get_dumpsys_display():
            if 'mDisplayWidth' in line:
                max_resolution.append(line.strip('\r').split('=')[-1])
            if 'mDisplayHeight' in line:        
                max_resolution.append(line.strip('\r').split('=')[-1])     
        return max_resolution

    def take_snapshot(self):
        global xml_path
        xml_path=self.get_xml()
        file_path = self.generate_folder_locate_picture()
        picture_name = self.generate_screenshot_name_format()
        uiautomator_dump_file_name = picture_name.split('.')[0] + '.uix'
        uiautomator_dump_file = os.path.join(file_path,uiautomator_dump_file_name)
        file_path_device = os.path.join('/sdcard/' + picture_name)
        take_snapshot_command = self.adb_shell() + '/system/bin/screencap -p ' + file_path_device
        pull_snapshot_command = 'adb -s ' + self.device_id + ' pull ' + file_path_device + ' ' + file_path
        get_uiautomator_dump_command = self.adb_shell() + 'uiautomator dump'
        pull_uiautomator_dump_command = 'adb -s ' + self.device_id + ' pull /sdcard/window_dump.xml' + ' ' + uiautomator_dump_file
        self.native_unlock_screen()
        time.sleep(0.5)
        self.execute_command(take_snapshot_command).wait()
        self.execute_command(pull_snapshot_command).wait()
        self.execute_command(get_uiautomator_dump_command).wait()
        self.execute_command(pull_uiautomator_dump_command).wait()        

    def little_swipe(self):
        device_resolution_2 = self.get_max_resolution()
        max_w_2 = device_resolution_2[0]
        max_h_2 = device_resolution_2[1]
        x1_2 = str(int(max_w_2)/2)
        x2_2 = str(int(max_w_2)/2)
        y2_2 = str((int(max_h_2)*2)/5)
        y1_2 = str((int(max_h_2)*4)/5)
        little_swipe_command = self.adb_shell() + 'input swipe ' + x1_2 + ' ' + y1_2 + ' ' + x2_2 + ' ' + y2_2
        self.execute_command(little_swipe_command).wait()


class Device(object):
    def __init__(self,executor):
        self.executor = executor

    def return_home(self):
        go_home_command = self.executor.adb_shell() + 'input keyevent 3'
        self.executor.execute_command(go_home_command).wait()


    def press_enter(self):
        press_enter_command = self.executor.adb_shell() + 'input keyevent 66'
        self.executor.execute_command(press_enter_command).wait()

    def press_menu(self):
        press_menu_command = self.executor.adb_shell() + 'input keyevent 82'
        self.executor.execute_command(press_menu_command).wait()

    def shift_tab(self,counter=1):
        tab_key = '61 '
        shift_tab_command = self.executor.adb_shell() + 'input keyevent ' + (tab_key*int(counter)).strip()
        self.executor.execute_command(shift_tab_command).wait()

    def shift_down(self,counter=1):
        down_key = '20 '
        press_down_command = self.executor.adb_shell() + 'input keyevent ' + (down_key*int(counter)).strip()
        self.executor.execute_command(press_down_command).wait()    

    def shift_up(self,counter=1):
        up_key = '19 '
        press_up_command = self.executor.adb_shell() + 'input keyevent ' + (up_key*int(counter)).strip()
        self.executor.execute_command(press_up_command).wait()

    def shift_left(self,counter=1):
        press_left_command = self.executor.adb_shell() + 'input keyevent 21'
        for j in range(int(counter)):
            self.executor.execute_command(press_left_command).wait()
            time.sleep(1)

    def volume_up(self,counter=1):
        volume_up_command = self.executor.adb_shell() + 'input keyevent 24'
        for j in range(int(counter)):
            self.executor.execute_command(volume_up_command).wait()
            time.sleep(0.5)

    def volume_down(self,counter=1):
        volume_down_command = self.executor.adb_shell() + 'input keyevent 25'
        for j in range(int(counter)):
            self.executor.execute_command(volume_down_command).wait()
            time.sleep(0.5)

    def shift_right(self,counter=1):
        press_right_command = self.executor.adb_shell() + 'input keyevent 22'
        for k in range(int(counter)):
            self.executor.execute_command(press_right_command).wait()
            time.sleep(1)

    def push_up(self):
        device_resolution = self.executor.get_max_resolution()
        max_w = device_resolution[0]
        max_h = device_resolution[1]
        if max_w =='unknow' or max_h =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            x1 = str(int(max_w)/2)
            x2 = str(int(max_w)/2)
            y1 = str((int(max_h)*4)/5)
            y2 = str(int(max_h)/5)
            push_up_command = self.executor.adb_shell() + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
            self.executor.execute_command(push_up_command).wait()



    def swipe_right(self):
        device_resolution = self.executor.get_max_resolution()
        max_w = device_resolution[0]
        max_h = device_resolution[1]
        if max_w =='unknow' or max_h =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            x1 = str(int(max_w)/5)
            x2 = str((int(max_w)*4)/5)
            y1 = str(int(max_h)/2)
            y2 = str(int(max_h)/2)
            swipe_right_command = self.executor.adb_shell() + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
            self.executor.execute_command(swipe_right_command).wait()

    def check_boot_status(self):
        is_booted = False
        check_boot_status_command = self.executor.adb_shell() + 'getprop'
        grep_command = 'grep bootanim'
        p5 = sp.Popen(check_boot_status_command.split(' '),stdout=sp.PIPE)
        p6 = sp.Popen(grep_command.split(' '),stdin=p5.stdout,stdout=sp.PIPE)
        out = p6.stdout.read().strip('\n')
        if "stopped" and '1' in out:
            is_booted = True
        return is_booted


    def check_reboot_progress(self):
        reboot_execute_success = False
        if not self.check_boot_status():
            reboot_execute_success = True
        return reboot_execute_success    


    def reboot_device(self):
        reboot_device_command = self.executor.adb_command() + 'reboot'
        self.executor.execute_command(reboot_device_command).wait()

    def input_text(self,string='%s'):
        string = string.strip().replace(' ','%s')    
        input_text_command = self.executor.adb_shell() + 'input text ' + string
        self.executor.execute_command(input_text_command).wait()



    def swipe_left(self):
        device_resolution = self.executor.get_max_resolution()
        max_w = device_resolution[0]
        max_h = device_resolution[1]
        if max_w =='unknow' or max_h =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            x1 = str((int(max_w)*4)/5)
            x2 = str(int(max_w)/5)
            y1 = str(int(max_h)/2)
            y2 = str(int(max_h)/2)
            swipe_left_command = self.executor.adb_shell()  + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
            self.executor.execute_command(swipe_left_command).wait()


    def pull_down(self):
        device_resolution_1 = self.executor.get_max_resolution()
        max_w_1 = device_resolution_1[0]
        max_h_1 = device_resolution_1[1]
        if max_w_1 =='unknow' or max_h_1 =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            x1_1 = str(int(max_w_1)/2)
            x2_1 = str(int(max_w_1)/2)
            y1_1 = str(int(max_h_1)/5)
            y2_1 = str((int(max_h_1)*4)/5)
            push_up_command_1 = self.executor.adb_shell()   + 'input swipe ' + x1_1 + ' ' + y1_1 + ' ' + x2_1 + ' ' + y2_1
            self.executor.execute_command(push_up_command_1).wait()

    def swipe_pull_notification(self):
        device_resolution = self.executor.get_max_resolution()
        max_w = device_resolution[0]
        max_h = device_resolution[1]
        if max_w =='unknow' or max_h =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            x1 = str(int(max_w)/2)
            x2 = str(int(max_w)/2)
            y1 = "0"
            y2 = str(int(max_h)/5)
            swip_noticfi_command = self.executor.adb_shell()    + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
            self.executor.execute_command(swip_noticfi_command).wait()

    def get_through_oobe(self):
        device_resolution = self.executor.get_max_resolution()
        max_w = device_resolution[0]
        max_h = device_resolution[1]
        print "[debug] max_w and max_h is ",max_w,max_h
        if max_w =='unknow' or max_h =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            p1 = [str((int(max_w)*2)/10),str((int(max_h)*2)/10)]
            p2 = [str((int(max_w)*8)/10),str((int(max_h)*2)/10)]
            p3 = [str((int(max_w)*8)/10),str((int(max_h)*8)/10)]
            p4 = [str((int(max_w)*2)/10),str((int(max_h)*8)/10)]
            tap_command1 = self.executor.adb_shell() + 'input tap ' + p1[0] + ' ' + p1[1]
            tap_command2 = self.executor.adb_shell() + 'input tap ' + p2[0] + ' ' + p2[1]
            tap_command3 = self.executor.adb_shell() + 'input tap ' + p3[0] + ' ' + p3[1]
            tap_command4 = self.executor.adb_shell() + 'input tap ' + p4[0] + ' ' + p4[1]
            self.executor.execute_command(tap_command1).wait()
            self.executor.execute_command(tap_command2).wait()
            self.executor.execute_command(tap_command3).wait()
            self.executor.execute_command(tap_command4).wait()      

    def power_on(self):
        if not self.check_screen_on():
            power_on_command = self.executor.adb_shell() + 'input keyevent 26'
            self.executor.execute_command(power_on_command).wait()


    def press_back(self,counter=1):
        press_back_command = self.executor.adb_shell() + 'input keyevent 4'
        for i in range(int(counter)):
            self.executor.execute_command(press_back_command).wait()
            time.sleep(1)



    def check_screen_on(self):
        is_screen_on = False
        for line in self.executor.get_dumpsys_display():
            if 'mScreenState' in line and 'ON' in line:
                is_screen_on = True
        return is_screen_on     

    def launch_app_with_intent(self,app="Native_HomePage",**kwargs):
        intent_dictory = self.executor.get_intents_dictory()
        if app in intent_dictory.keys():
            app_intent = intent_dictory[app]['intent']  
            launch_app_command = self.executor.adb_shell() + 'am start -a android.intent.action.MAIN -c android.intent.category.LAUNCHER -f 0x10200000 -n ' + app_intent
            self.executor.execute_command(launch_app_command).wait()
        else:
            print "[Error] there is no intent match your given application, please double check or contact author to add!!!"
            sys.exit(-1) 


class Panel(object):
    def __init__(self,panel):
        self.panel = panel

    def click(self):
        click_coordinate = self.panel.get_centre_coordinate()
        click_command = self.panel.adb_shell() + 'input tap ' + click_coordinate[0] + ' ' + click_coordinate[1]
        self.panel.execute_command(click_command).wait()

    def swipe_up(self,times=1):
        coordinate_list = self.panel.bound_to_list()
        x1 = str((int(coordinate_list[0])+int(coordinate_list[2]))/2)
        x2 = str((int(coordinate_list[0])+int(coordinate_list[2]))/2)
        y1 = str(((int(coordinate_list[1])+int(coordinate_list[3]))*4)/5)
        y2 = str(((int(coordinate_list[1])+int(coordinate_list[3]))*1)/5)
        push_up_command = self.panel.adb_shell() + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
        for loop in range(int(times)):
            self.panel.execute_command(push_up_command).wait()

    def long_click(self):
        click_coordinate = self.panel.get_centre_coordinate()
        px = str(int(click_coordinate[0])+1)
        py = str(int(click_coordinate[1])+1)
        during_time = '2000'
        long_click_command = self.panel.adb_shell() + 'input swipe ' + click_coordinate[0] + ' ' + click_coordinate[1] + ' ' + px + ' ' + py + ' ' + during_time
        self.panel.execute_command(long_click_command).wait()

    def drag_to(self,dx,dy):
        drag_to_command = []
        down_key_coordinate = self.panel.get_centre_coordinate()
        down_key_px = str(int(down_key_coordinate[0]))
        down_key_py = str(int(down_key_coordinate[1]))
        down_key_command = self.touch_down(down_key_px,down_key_py)
        move_command = self.move(str(dx),str(dy))
        up_key_command = self.touch_up(str(dx),str(dy))
        if self.get_pid() is None:
            self.generate_monkey_process()
        monkey_pid_num = self.get_pid()
        if monkey_pid_num is not None:
            drag_to_command.append(down_key_command)
            drag_to_command.append(move_command)
            drag_to_command.append(up_key_command)
            self.open_socket(drag_to_command)
            self.kill_monkey_process(monkey_pid_num)
        if self.get_pid() is not None:
            self.kill_monkey_process(monkey_pid_num)    

    def touch_down(self,dx,dy):
        return 'touch down {0} {1}\n'.format(dx,dy)

    def touch_up(self,dx,dy):
        return 'touch up {0} {1}\n'.format(dx,dy)

    def move(self,dx,dy):
        return 'touch move {0} {1}\n'.format(dx,dy)

    def get_pid(self):
        monkey_pid=None
        r2=r'\S+\s+(\d+)\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s.\scom.android.commands.monkey'
        grep_monkey_command = self.panel.adb_shell() + 'ps'
        o = sp.Popen(grep_monkey_command.split(' '),stdout=sp.PIPE).stdout.read().strip('\n')
        re_result = re.findall(r2,o)
        if re_result == []:
            monkey_pid=None
        else:
            monkey_pid=re_result[0]    
        return monkey_pid

    def kill_monkey_process(self,monkey_pid):
        if monkey_pid is not None:
            kill_monkey_process_command = self.panel.adb_shell() + 'kill ' + monkey_pid
            self.panel.execute_command(kill_monkey_process_command).wait()

    def generate_monkey_process(self):
        generate_monkey_process_command = self.panel.adb_shell() + 'LD_LIBRARY_PATH=/vendor/lib:/system/lib monkey --port 15555'
        p2 = sp.Popen(generate_monkey_process_command.split(' '),stdout=sp.PIPE)
        t1 = threading.Timer(5,lambda:self.kill(p2))
        t1.setDaemon(True)
        t1.start()
        time.sleep(5)
        if self.get_pid() is not None:
            forwad_command = 'adb -s ' + self.panel.device_id + ' forward tcp:15555 tcp:15555'
            self.panel.execute_command(forwad_command).wait()

    def open_socket(self,cmd_list):
        sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        sk.settimeout(10)
        sk.connect(('127.0.0.1',15555))
        for cmd in cmd_list:
            self.panel.log_class.info("Execute command : {0}".format(cmd)) 
            sk.sendall(cmd)
            time.sleep(1.5)
        sk.send('quit\n')
        sk.close()    

    def kill(self,process):
        process.terminate()
        process.kill()







#1 楼 @264768502
第一次发,不懂怎么发代码,现在改过来了。

logger 代码如下:

#!/usr/bin/python
import logging
import os

class Logger():

    def __init__(self,name,log_path):
        self.log_path = log_path
        self.name = name
        self.log_file = os.path.join(log_path,'log.txt')
        if not os.path.isdir(log_path):
            try:
                os.makedirs(log_path)
            except OSError:
                pass             

    def modlogger(self):
        mylogger = logging.getLogger(self.name)
        mylogger.setLevel(logging.DEBUG)
        fh = logging.FileHandler(self.log_file)
        sh = logging.StreamHandler()
        formatter = logging.Formatter("%(asctime)s  %(levelname)s\t%(message)s")
        fh.setFormatter(formatter)
        sh.setFormatter(formatter)
        mylogger.addHandler(fh)
        mylogger.addHandler(sh)
        return mylogger

    def get_instance(self):
        logger = Logger(self.name,self.log_path)
        return logger

    def logger(self):
        methods = self.get_instance()
        return Methods(methods)  

class Methods(object):
    def __init__(self,obj):
        self.ml = obj.modlogger()

    def info(self,string):
        self.ml.info(string)

    def debug(self,string):
        self.ml.debug(string)

    def warning(self,string):
        self.ml.warning(string)

    def error(self,string):
        self.ml.error(string)


if __name__=="__main__":
    l = Logger('test0','/home/buildbot/bluesea/reporter')
    m = l.logger()
    m.info('------this is info message-------')
    m.error('tap ok button fail')
    m.debug('-----this is debug message-------')
    m.warning('you have not set timeout')

ui_hooks 模块如下:

import controler
import time
import inspect
import subprocess as sp

"""
It provides a way to get through kinds of instability pup up box 
"""


class Hooks:
    def __init__(self,device_id):   
        self.device_id=device_id

    def get_generators(self,generator=None):
        global frame
        frame={}
        a,b,c,d,e,f,g=[],[],[],[],[],[],[]
        for element in generator:
            a.append(element.get('package'))
            b.append(element.get('index'))
            c.append(element.get('text'))
            d.append(element.get('content-desc'))
            e.append(element.get('resource-id'))
            f.append(element.get('bounds'))
            g.append(element.get('enabled'))

        for i in range(len(a)):
            frame[i+1]={'package':a[i],'index':b[i],'text':c[i],'description':d[i],'id':e[i],'bounds':f[i],'enabled':g[i]}



    def parse(self):
        status=False
        status=self.divider()
        return status

    def divider(self):
        flag=False
        main_key = frame[1]['package']
        seducer001 = inspect.getargspec(self.hook_maps)[3][0]
        seducer002 = inspect.getargspec(self.hook_contact)[3][0]
        seducer003 = inspect.getargspec(self.hook_gms)[3][0]
        seducer004 = inspect.getargspec(self.hook_settings)[3][0]
        seducer005 = inspect.getargspec(self.hook_stk)[3][0]
        seducer006 = inspect.getargspec(self.hook_android)[3][0]        
        if main_key == seducer001:
            flag = self.executer(seducer001,self.hook_maps())       
        if main_key ==  seducer002:
            flag = self.executer(seducer002,self.hook_contact())
        if main_key ==  seducer003:
            flag = self.executer(seducer003,self.hook_gms())
        if main_key ==  seducer004:
            flag = self.executer(seducer004,self.hook_settings())
        if main_key ==  seducer005:
            flag = self.executer(seducer005,self.hook_stk()) 
        if main_key ==  seducer006:
            flag = self.executer(seducer006,self.hook_android())                                            
        return flag        


    def executer(self,sed,func):
        e_flag=False
        print "this app is %s"% sed
        coodinate=func
        if coodinate is None:
            e_flag = False
        else:
            pointseq=self.get_point(coodinate)
            self.tap(pointseq)  
            e_flag =True
        return e_flag           


    def hook_maps(self,seducer='com.google.android.apps.maps'):
        coodinate=None
        zipers=[]
        print "hook_maps method is invoked"
        return coodinate


    def hook_stk(self,seducer='com.android.stk'):
        coodinate=None
        print "hook_stk method is invoked"          
        for j in range(1,len(frame.keys())+1):
            if 'button_ok' in frame[j]['id'].strip('\n'):
                coodinate=frame[j]['bounds']        
        return coodinate


    def hook_android(self,seducer='android'):
        coodinate=None
        print "hook_android method is invoked"          
        for j in range(1,len(frame.keys())+1):
            if 'button1' in frame[j]['id'].strip('\n'):
                coodinate=frame[j]['bounds']        
        return coodinate


    def hook_settings(self,seducer='com.android.settings'):
        coodinate=None
        zipers=["Update preferred SIM card?"]
        print "hook_settings method is invoked"
        zipers_match=False
        for zi in zipers:
            for i in range(1,len(frame.keys())+1):
                if zi == frame[i]['text'].strip('\n'):
                    zipers_match=True
                    break
        if zipers_match:          
            for j in range(1,len(frame.keys())+1):
                if 'NO' in frame[j]['text'].strip('\n'):
                    coodinate=frame[j]['bounds']        
        return coodinate


    def hook_gms(self,seducer='com.google.android.gms'):
        coodinate=None
        zipers=["Couldn't sign in"]
        print "hook_gms method is invoked"
        zipers_match=False
        for zi in zipers:
            for i in range(1,len(frame.keys())+1):
                if zi == frame[i]['text'].strip('\n'):
                    zipers_match=True
                    break
        if zipers_match:          
            for j in range(1,len(frame.keys())+1):
                if 'Next' in frame[j]['text'].strip('\n'):
                    coodinate=frame[j]['bounds']        
        return coodinate

    def hook_contact(self,seducer='com.google.android.packageinstaller'):
        coodinate=None  
        zipers = ["Allow Contacts to access photos, media, and files on your device?"]
        print "hook_contact method is invoked"
        zipers_match=False
        for zi in zipers:
            for i in range(1,len(frame.keys())+1):
                if zi == frame[i]['text'].strip('\n'):
                    zipers_match=True
                    break
        if zipers_match:          
            for j in range(1,len(frame.keys())+1):
                if 'Allow' in frame[j]['text'].strip('\n'):
                    coodinate=frame[j]['bounds']
                if 'Next' in frame[j]['text'].strip('\n'):
                    coodinate=frame[j]['bounds']  
        return coodinate

    def get_point(self,coodinate):
        s = coodinate
        temp = s.replace('[','').replace(']',',').split(',')
        temp.remove('')
        point_c = []
        co_x1 = temp[0]
        co_x2 = temp[2]
        point_c.append(str((int(co_x1)+int(co_x2))/2))
        co_y1 = temp[1]
        co_y2 = temp[3]
        point_c.append(str((int(co_y1)+int(co_y2))/2))
        return point_c

    def tap(self,point_list):
        tap_coordinate = point_list
        tap_command = self.adb_shell() + 'input tap ' + tap_coordinate[0] + ' ' + tap_coordinate[1]
        self.execute_command(tap_command).wait()                      


    def adb_shell(self):
        adb_shell_command = 'adb -s ' + self.device_id + ' shell '
        return adb_shell_command

    def execute_command(self,cmd,ignore_print=True):    
        ccmd = cmd
        if ignore_print:
            print ccmd
        else:
            pass    
        proc = sp.Popen(ccmd.split(' '),stdout=sp.PIPE)    
        return proc

某个库文件如下:

 # coding: utf-8 
import os
import time
import sys
import subprocess as sp
sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir)))
from engine import controler
max_retry = 30

class Commons:
    def __init__(self,conf_dic,ins_logger=None):
        self.conf_dic = conf_dic
        name = os.path.basename(conf_dic['TC']).split('.')[0]
        workpath = conf_dic['work_dir']
        dev = conf_dic['device_id']
        logpath = conf_dic['log_path']    
        log_path = os.path.join(workpath,logpath,name)
        self.mylogger = ins_logger   
        self.op = controler.Operator(device_id=dev,log_class=self.mylogger,log_path=log_path)

    def warm_reboot(self):
        if not self.op.hardware().check_boot_status():
            self.mylogger.error("[Common] Device is not ready for test reboot case. ")
            return False
        self.mylogger.info("[Common] Device is ready. ")    
        self.mylogger.info("[Common] Start to reboot device {0}".format(self.op.device_id))
        self.get_boot_infomation()
        self.op.hardware().reboot_device()
        time.sleep(15)
        self.mylogger.info("[Common] Start to check if execute reboot command? ")
        if not self.op.hardware().check_reboot_progress():
            self.mylogger.error("[Common] unknow error, reboot fail. ")
            return False
        self.mylogger.info("[Common] Device start booting...")    
        while 1:  
            if self.op.hardware().check_boot_status():
                self.mylogger.info("[Common] Device reboot successfully. ")
                break       
        return True

    def get_boot_infomation(self):
        get_boot_infomation_command = 'adb -s ' + self.op.device_id + ' shell getprop | grep boot'
        out = sp.Popen(get_boot_infomation_command.split(' '),stdout=sp.PIPE).stdout.read().strip('\n\r')
        self.mylogger.info(out)

    def log_on_google_account(self):
        self.mylogger.info("[Common] launch Settings app ")
        self.op.hardware().launch_app_with_intent(app='Settings')
        try:
            self.mylogger.info("[Common] click Accounts item ")
            self.op.search(text="Accounts",retry_time="3").click()
        except AttributeError as e:
            self.mylogger.error("[Common] Can't find Accounts item . Got Exception {0}".format(e))
            return False 
        try:
            self.mylogger.info("[Common] click Add account item ")
            self.op.search(text="Add account").click()
        except AttributeError as e:
            self.mylogger.error("[Common] Can't find Add account item . Got Exception {0}".format(e))
            return False
        try:
            self.mylogger.info("[Common] click Google item ")
            self.op.search(text="Google").click()
        except AttributeError as e:
            self.mylogger.error("[Common] Can't find Google item . Got Exception {0}".format(e))
            return False
        retry_count = 0    
        while 1:
            try:
                self.mylogger.info("[Common] click enter your email input edittext ")
                self.op.search(description="Enter your email ").click()
                retry_count = 0
                break
            except AttributeError as e:
                if retry_count <= max_retry:
                    self.mylogger.debug("[Common] Can't find Enter your email input edittext . try again")
                    time.sleep(2)
                    retry_count+=1
                else:
                    self.mylogger.error("[Common] poor network course can't register google account . Got Exception {0}".format(e))
                    return False
        self.mylogger.info("[Common] input google account : {0}".format(self.conf_dic['google_account']))
        self.op.hardware().input_text(self.conf_dic['google_account'])
        try:
            self.mylogger.info("[Common] click NEXT button ")
            self.op.search(description="NEXT").click()
        except AttributeError as e:
            self.mylogger.error("[Common] Can't find NEXT button . Got Exception {0}".format(e))
            return False
        time.sleep(5)    
        try:
            self.mylogger.info("[Common] click password edittext ")
            self.op.search(id="password").click()
        except AttributeError as e:
            self.mylogger.error("[Common] Can't find password edittext . Got Exception {0}".format(e))
            return False
        self.mylogger.info("[Common] input google password : {0}".format(self.conf_dic['google_password']))
        self.op.hardware().input_text(self.conf_dic['google_password'])            
        try:
            self.mylogger.info("[Common] click NEXT button ")
            self.op.search(description="NEXT").click()
        except AttributeError as e:
            self.mylogger.error("[Common] Can't find NEXT button . Got Exception {0}".format(e))
            return False
        while 1:
            try:
                self.mylogger.info("[Common] click ACCEPT button ")
                self.op.search(description="ACCEPT").click()
                retry_count = 0
                break
            except AttributeError as e:
                if retry_count <= max_retry:
                    self.mylogger.debug("[Common] Can't find ACCEPT button . try again")
                    time.sleep(2)
                    retry_count+=1
                else:
                    self.mylogger.error("[Common] poor network course can't register google account . Got Exception {0}".format(e))
                    return False        
        while 1:
            try:
                self.mylogger.info("[Common] check Google services shown??? ")
                self.op.search(text="Google services").click()
                retry_count = 0
                break
            except AttributeError as e:
                if retry_count <= max_retry:
                    self.mylogger.debug("[Common] Can't Google services keywords. try again")
                    time.sleep(5)
                    retry_count+=1
                else:
                    self.mylogger.error("[Common] poor network course can't register google account . Got Exception {0}".format(e))
                    return False
        try:
            self.mylogger.info("[Common] click Next button ")
            self.op.search(text="Next",enabled='true',retry_time='3').click()
        except AttributeError as e:
            self.mylogger.error("[Common] Can't find NEXT button . Got Exception {0}".format(e))
            return False
        while 1:
            try:
                self.mylogger.info("[Common] click No thanks button ")
                self.op.search(text="No thanks").click()
                retry_count = 0
                break
            except AttributeError as e:
                if retry_count <= max_retry:
                    self.mylogger.debug("[Common] Can't find No thanks button . try again")
                    time.sleep(2)
                    retry_count+=1
                else:
                    self.mylogger.error("[Common] poor network course can't register google account . Got Exception {0}".format(e))
                    return False
        try:
            self.mylogger.info("[Common] click Continue button ")
            self.op.search(text="Continue").click()
        except AttributeError as e:
            self.mylogger.error("[Common] Can't find Continue button . Got Exception {0}".format(e))
            return False
        time.sleep(2)
        return True

    def check_google_account_added(self):
        self.mylogger.info("[Common] launch Settings app ")
        self.op.hardware().launch_app_with_intent(app='Settings')
        try:
            self.mylogger.info("[Common] click Accounts item ")
            self.op.search(text="Accounts",retry_time="3").click()
        except AttributeError as e:
            self.mylogger.error("[Common] Can't find Accounts item . Got Exception {0}".format(e))
            return False
        if self.op.search(text='Google') is not None:
            self.mylogger.info("[Common] add google account successfully. ")
            return True
        else:
            self.mylogger.error("[Common] add account fail. ")
            return False        

















入口程序:

import xml.etree.ElementTree as ET
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir,os.pardir)))
import argparse
import random
import subprocess as sp
import pickle
import datetime
import re
import time
from gramary.runner import iter_event,get_different_list,parse_history,get_crash_path,get_crashfile_conetnt
import generate_report


class Configuration:

    def __init__(self):
        self.log_path = ''
        self.execute_main_campaign = ''
        self.device_id = ''
        self.work_dir = os.path.abspath(os.path.join(os.getcwd(),os.pardir))

    def load(self,filepath):
        temp_dict = {}
        try:
            execfile(filepath,{},temp_dict)
            for key in temp_dict.keys():
                setattr(self,key,temp_dict[key])
        except TypeError,errormsg:
            print "Error : No file path given!"
            print "you can add -h option to view more info"
            sys.exit(-1)
        except IOError,msg1:
            print msg1
            print "Error : Please specify a path which contain a config file!"
            sys.exit(-1)


def get_testcase_from_xml(config):
    xml_path = os.path.join(config.work_dir,config.execute_main_campaign)
    parser = ET.parse(xml_path)
    root = parser.getroot()
    item_instance_list = root.iter('item')
    return item_instance_list

def get_testsuite_from_xml(config):
    xml_path = os.path.join(config.work_dir,config.execute_main_campaign)
    parser = ET.parse(xml_path)
    root = parser.getroot()
    suite_instance_list = root.iter('suite')
    return suite_instance_list

def get_full_path(config,string):
    return os.path.join(config.work_dir,string) + '.py'


def generate_file(config,dictory):
    temporary_path = os.path.join(config.work_dir,'temporary',config.device_id)
    if not os.path.isdir(temporary_path):
        try:
            os.makedirs(temporary_path)
        except OSError:
            pass        
    pickle_dump_path = os.path.join(temporary_path,'settings_temp.pkl')
    f1 = open(pickle_dump_path,'wb')
    pickle.dump(dictory,f1,True)
    f1.close()
    return pickle_dump_path


def save_data_for_html(config,dictory):
    temporary_path = os.path.join(config.work_dir,'temporary',config.device_id)
    if not os.path.isdir(temporary_path):
        try:
            os.makedirs(temporary_path)
        except OSError:
            pass        
    pickle_dump_path = os.path.join(temporary_path,'html_raw_data.pkl')
    f1 = open(pickle_dump_path,'wb')
    pickle.dump(dictory,f1,True)
    f1.close()
    return pickle_dump_path


def main(config):
    r1=r'Fail|Pass'
    r2=r'(\d+)'
    r_crash_data0 = r'DATA0=(\S+\s)[\s\S]*'
    r_crash_data1 = r'DATA1=(\S+\s)[\s\S]*'
    r_crash_data2 = r'DATA2=(\S+\s)[\s\S]*'
    seq_num = 1
    html_result_data = {}
    settings_list = dir(config)
    for _ in settings_list:
        if _ == '__doc__':
            settings_list.remove(_)
        if _ == '__module__':
            settings_list.remove(_)
        if _ == 'load':
            settings_list.remove(_)    
    settings_list.remove('__init__')
    settings_dictory = {}
    for _ in settings_list:
        settings_dictory[_] = getattr(config,_)
    rs = 'run_set_'+datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')    
    settings_dictory['log_path'] = os.path.join(settings_dictory['log_path'],config.device_id,rs)
    execue_case_sequence = {}
    for testsuite in get_testsuite_from_xml(config):
        execue_case_sequence['suite_name'] = testsuite.get('name')
        execue_case_sequence['suite_loop'] = testsuite.get('loop')
        execue_case_sequence['suite_mode'] = testsuite.get('mode')
        execue_case_sequence['suite_content'] = {}
    execue_case_sequence['device_id'] = settings_dictory['device_id']    
    for testcase in get_testcase_from_xml(config):
        execue_case_sequence['suite_content'][seq_num] = [testcase.text,testcase.get('loop')]   
        seq_num += 1
    suite_loop = int(execue_case_sequence['suite_loop'])
    suite_mode = execue_case_sequence['suite_mode']
    tc_symbol_list = execue_case_sequence['suite_content'].keys()
    suite_content = execue_case_sequence['suite_content']
    if suite_loop == 0 or tc_symbol_list == []:
        sys.exit(0)
        print "there is no loop specify for running testsuite or no cases need to be executed!"
    need_to_change_sequence = False    
    if suite_mode == 'regular':
        need_to_change_sequence = False
    elif suite_mode == 'random':
        need_to_change_sequence = True
    else:
        need_to_change_sequence = False
    html_result_data = execue_case_sequence
    original_history = parse_history(settings_dictory['device_id'])
    for sl in range(1,suite_loop+1):
        print "Start to run testsuite : {0} , loop {1}".format(execue_case_sequence['suite_name'],sl)
        if need_to_change_sequence:
            random.shuffle(tc_symbol_list)
        for i in range(len(tc_symbol_list)):
            item_loop = int(suite_content[tc_symbol_list[i]][1]) if len(suite_content[tc_symbol_list[i]]) == 2 else 1
            item_path = get_full_path(config,suite_content[tc_symbol_list[i]][0])
            case_name = os.path.basename(item_path).split('.')[0]
            report_path = os.path.join(config.work_dir,settings_dictory['log_path'],case_name)
            if item_loop == 0:
                continue    
            for j in range(1,item_loop+1):
                print "Start to run case : {0} , loop {1}".format(case_name,j)
                start_time = time.time()
                start_time_dateformat = datetime.datetime.now().strftime('%Y/%m/%d_%H:%M:%S')
                settings_dictory['TC'] = item_path
                dir_path = generate_file(config,settings_dictory)
                execute_command = 'python ' + item_path + ' ' + dir_path
                proc=sp.Popen(execute_command.split(' '),stdout=sp.PIPE)
                out = proc.stdout.read().strip('\n')
                proc.wait()
                current_history = parse_history(settings_dictory['device_id'])
                stop_time = time.time()
                stop_time_dateformat = datetime.datetime.now().strftime('%Y/%m/%d_%H:%M:%S')
                time_taken = stop_time - start_time
                crash_list = iter_event(get_different_list(original_history,current_history))
                print_crashfile = ''
                crashes = {}
                crash_dic = {}
                c_count = 1
                for crash in crash_list:
                    crashlog_path = get_crash_path(crash)
                    if crashlog_path is not None:
                        print_crashfile = get_crashfile_conetnt(settings_dictory['device_id'],crashlog_path)
                    match_data0 = re.findall(r_crash_data0,print_crashfile)
                    match_data1 = re.findall(r_crash_data1,print_crashfile)
                    match_data2 = re.findall(r_crash_data2,print_crashfile)
                    crash_dic['crash_info'] = crash.strip('\r')
                    crash_dic['crash_data0'] = match_data0[0] if match_data0 != [] else ''
                    crash_dic['crash_data1'] = match_data1[0] if match_data1 != [] else '' 
                    crash_dic['crash_data2'] = match_data2[0] if match_data2 != [] else ''
                    crashes[c_count] = crash_dic
                    c_count+=1
                crash_number = len(crash_list)
                original_history = current_history
                html_result_data['suite_content'][i+1].append(re.findall(r1,out)[0])
                html_result_data['suite_content'][i+1].append(start_time_dateformat)
                html_result_data['suite_content'][i+1].append(stop_time_dateformat)
                html_result_data['suite_content'][i+1].append(round(time_taken,3))
                html_result_data['suite_content'][i+1].append(crash_number)
                html_result_data['suite_content'][i+1].append(crashes)  
            address = save_data_for_html(config,html_result_data)
            generate_report.main(address,report_path)


if __name__ == '__main__':
    parse=argparse.ArgumentParser(usage='%(prog)s [options]')
    parse.add_argument('configuration_file',type=str,nargs='?',help="A path of config file")
    args=parse.parse_args()
    config_file = args.configuration_file
    configuration = Configuration()
    configuration.load(config_file)
    main(configuration)

自定义生成测试报告:

import os
import sys
from pyh import *
import pickle

def main(address,report_path):
    f = open(address,'rb')
    database1 = pickle.load(f)
    f.close()
    case_name_pre = report_path.split('/')[-1]
    report_path_base = os.path.abspath(os.path.join(report_path,os.pardir))
    report_file_path = os.path.join(report_path_base,'report.html')
    content_dic = database1['suite_content']    
    page = PyH('test report')
    page << h1('this is report of yath project')
    order_list = page << ul(type='square')
    order_list << li('device id : %s'%database1['device_id'])
    order_list << li('suite mode : %s'%database1['suite_mode'])
    order_list << li('suite loop : %s'%database1['suite_loop'])
    order_list << li('suite name : %s'%database1['suite_name'])
    page << hr()
    p1_content = 'Loop_'+database1['suite_loop']+':'
    page << p(i(p1_content))
    t = page << table(border='1',bgcolor='#bbbb99')
    t << tr(th('test case',align='left')+th('test loop',align='center')+th('test result',align='center')+th('start time',align='center')+th('stop time',align='center')+th('duration(s)',align='center')+th('crashes',align='center'))
    for k in content_dic:
        item = content_dic[k]
        if len(item) == 2:
            continue
        test_case = item[0]
        test_loop = item[1]    
        if item[1] == '0':
            test_result,start_time,stop_time,duration,crashes = 'NA','NA','NA','NA','0'
        else:
            test_result = item[2]
            start_time = item[3]
            stop_time = item[4]
            duration = item[5]
            crashes = item[6]
        if test_result == 'Pass':
            attr_co_test_result = 'green'
        elif test_result == 'Fail':
            attr_co_test_result = 'red'
        elif test_result == 'NA':
            attr_co_test_result = 'grey'
        if int(crashes) > 0:
            crashes_dic = item[7]
            generate_crashes_page(crashes_dic,test_case,report_path)
            link_crash = 'file://'+os.path.join(report_path,'crash.html')
            t << tr(td(test_case,align='left')+td(test_loop,align='center')+td(test_result,align='center',bgcolor=attr_co_test_result)+td(start_time,align='center')+td(stop_time,align='center')+td(duration,align='center')+td(a(crashes,href=link_crash),align='center'))
        else:              
            t << tr(td(test_case,align='left')+td(test_loop,align='center')+td(test_result,align='center',bgcolor=attr_co_test_result)+td(start_time,align='center')+td(stop_time,align='center')+td(duration,align='center')+td(crashes,align='center'))    

    page.printOut(report_file_path)


def generate_crashes_page(dictory,case_name,crash_path):
    crash_file_path = os.path.join(crash_path,'crash.html')
    crash_page = PyH('Crash Page')
    crash_page << h1('this is a page of crash.')
    crash_page << p('detail info : ')
    for j in dictory:
        div1 = crash_page << div()
        h2_content = 'Crash %s occurs when execute %s : '%(str(j),case_name)
        div1 << h2(h2_content)
        crash_table = div1 << table(frame='box')
        crash_table << tr(th('key',align='left')+th('value'),align='left')
        crash_table << tr(td('crash info',align='left')+td(dictory[j]['crash_info']),align='left')
        crash_table << tr(td('data 0',align='left')+td(dictory[j]['crash_data0']),align='left')
        crash_table << tr(td('data 1',align='left')+td(dictory[j]['crash_data1']),align='left')
        crash_table << tr(td('data 2',align='left')+td(dictory[j]['crash_data2']),align='left')
    crash_page.printOut(crash_file_path)  

if __name__ == "__main__":
    main()


测试用例:

import os
import unittest
import sys
import pickle
sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir)))
from container.base import settings
from engine import log


dir_path = sys.argv[1]
f = open(dir_path,'rb')
config_dictory = pickle.load(f)
f.close()
name = os.path.basename(config_dictory['TC']).split('.')[0]
workpath = config_dictory['work_dir']
logpath = config_dictory['log_path']    
log_path = os.path.join(workpath,logpath,name)
l = log.Logger(name,log_path)
mylogger = l.logger()

class test_setup_wifi_class(unittest.TestCase):
    def setUp(self):
        self.test_class = settings.Settings(config_dictory,mylogger)
    def tearDown(self):
        self.test_class.op.hardware().press_back(3)
        self.test_class.op.hardware().return_home()
    def test_skip_wizard(self):
        self.assertEqual(self.test_class.setup_wifi(),True,'test setup_wifi case fail')    




if __name__ == "__main__":
    my_suite = unittest.TestLoader().loadTestsFromTestCase(test_setup_wifi_class)
    result = unittest.TextTestRunner(verbosity=0).run(my_suite)
    if len(result.failures) != 0 or len(result.errors) != 0:
        flag = 'Fail'
    else:
        flag = 'Pass'
    print flag,(len(result.failures)+len(result.errors))

自己顶一下😄

你这个代码干嘛用的,陈述一下

???干嘛用的???封装了哪里?有什么优势?

针对 android 操作系统 UI 进行自动化测试,可以进行绝大多数的操作,多个 app 之间的交互。还可以在多台设备之间进行交互,譬如互相打电话,接电话等。

这是我写的一个小测试框架,case 和 lib 可以自己定义,提供了 log 和 controler 接口。log 用来打印日志,controler 用来控制设备。

层主啊,你的 github 的说明文档,能写成中文的不??没必要整个英文的,我估计外国人也看不到。。。

楼主,你写的挺多的,看来也是费了不少时间吧?既然想让大家一起用起来,何不整理下,写详情的使用说明文档呢?

这是自己写的关键词库吗??

建议友好的注释说明

那些问干嘛用的,是没看源码么😀 简单点说,就是和 python uiautomator 的功能差不多,只不过:1,他自己用 socket 实现了 pc 和手机端的通信,不需要安装 atx 之类的。所以,稳定性无法保证;2,功能可能没 uiautomator 齐全;3,实例化和调用方式一步到位,直接调用模块,一般人难以理解,不熟悉方法的话,写起用例比较麻烦。

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册