UiAutomator 【自动安装 Apk】主要 jenkins 自动拉取最新包时安装需要输入密码或者需要点击继续安装等问题

Time · 2022年03月11日 · 4714 次阅读

准备工作

  1. pip install --pre uiautomator2 (安装 u2 的库)
  2. pip install -U weditor (桌面可生成一个可执行的 py)
  3. 根据网上的资料去了解 u2 的使用方法

安装 App 主文件(install_app.py)

import subprocess
import uiautomator2 as u2
import time
import argparse
from threading import Thread
import pymysql.cursors
import pymysql
import sys


def get_serial_and_apk():
    """
    获取设备名和包名
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--serial', type=str)  # 从jenkins获取需要安装的App的手机的uid
    parser.add_argument('--upload_filename', type=str)  # 获取需要安装的Apk的包名
    parser.add_argument('--replace_install', type=int)  # 这边是根据公司需要使用原有Apk构建,还是重新覆盖安装新的包
    args = parser.parse_args()
    serial = args.serial
    upload_file = args.upload_filename
    replace_install = args.replace_install
    print(replace_install)
    apk_path = "/Users/data/upload_apk/" + upload_file  # 安装包的路径,linux为例
    print(apk_path)
    return serial, apk_path, replace_install, upload_file  # 输出设备号,apk的安装路径,是否覆盖安装,包名


def select_install_pwd():
    """
        运行sql语句,主要是去数据库根据uid去查当前设备的安装密码
        :param
        sql_words: sql语句
        :return: 查询数据对应的字典列表
    """
    connect = pymysql.Connect(
        host="192.168.1.1",
        port=3306,
        user="root",
        passwd="123456",
        db="auto_build",
        charset="utf8"
    )
    # 获取游标
    cursor = connect.cursor(cursor=pymysql.cursors.DictCursor)
    try:
        sql = f"""SELECT install_passwd FROM device_info WHERE id =\'{serial}\'"""
        cursor.execute(sql)
        results = cursor.fetchall()
        for row in results:
            print(row['install_passwd'])
            return row['install_passwd']
    except Exception as e:
        print(e)
        connect.rollback()
    cursor.close()
    connect.close()
    return None


def time_out(start_time, time_out=120.0):
    """
    定义一个超时的时间2min
    """
    end_time = time.time()
    difference = round((end_time - start_time), 2)
    if difference - time_out > 0:
        print(f"设定的时间{time_out}到了退出")
        return False
    else:
        return True


def listen_alert(driver):
    """
    监听弹框
    """
    time.sleep(1)
    if driver(textContains='请输入').exists():
        pwd = select_install_pwd()
        driver(textContains='请输入').set_text(pwd)
        print("准备点击确定")
        time.sleep(2)
        if driver(text='确定').exists:
            driver(text='确定').click()
            print("已点击确定")
    if driver(text="继续安装").exists():
        driver(text="继续安装").click()
        print("点击继续安装")
    if driver(text='无视风险安装').exists():
        driver(text='无视风险安装').click()
    if driver(text='安装').exists():
        driver(text='安装').click()


def cmd(cmd, remove_shell=False):
    """
    adb命令头部封装
    """
    head = adb_head
    if remove_shell:
        head = adb_head.replace('shell', '')
    print(head + ' ' + cmd)
    import os
    r = os.popen(head + ' ' + cmd)
    print(r.read())
    return r.read()


def create_dir_if_not_exists(remote_path):
    """
    判断下手机设备中是否有我们上传的一个路径
    为什么需要上传到手机,而不是直接adb install 安装呢,主要是需要获取安装成功的Success的提示,以手机内部安装更为合适
    """
    r = cmd('ls ' + remote_path)
    if 'No such file or directory ' in r:
        cmd(' mkdirs ' + remote_path)


def push_file(file_path, remote_path):
    """
    推Apk到手机中存储
    """
    # create_dir_if_not_exists(remote_path)
    cmd(' push ' + file_path + ' ' + remote_path, remove_shell=True)
    print(' push ' + file_path + ' ' + remote_path)


def install(apk_path, apk_name):
    """
    安装apk
    """
    push_file(apk_path, '/data/local/tmp')  # 上传apk到手机中
    remote_path = '/data/local/tmp/' + apk_name  # 手机中的安装包的路径
    cmd = adb_head + ' pm install  -r ' + remote_path  # 强制安装
    # cmd = "adb -s %s install %s" % (serial, apk_path)
    process = subprocess.Popen(cmd, shell=True,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    process.wait(120)
    for line in process.stdout:
        line_str = line.decode("utf-8")
        print(line_str)
        if 'Success' in str(line_str):
            print("安装包已成功安装".center(70, "-"))
            return True
        else:
            if 'Failure' in str(line_str):
                print(line_str)
                print("请检查当前安装的apk版本是否过低".center(50, "*"))
                print("注意此时apk安装失败,断开测试连接,请检查完毕后重新构建".center(50, "*"))
                sys.exit(1)
            return False


if __name__ == '__main__':
    serial, apk_path, replace_install, upload_file = get_serial_and_apk()
    adb_head = 'adb -s ' + serial + ' shell '
    if replace_install == 3:
        p1 = Thread(target=install, args=(apk_path, upload_file))
        p1.start()
        try:
            driver = u2.connect_usb(serial)
        except RuntimeError as e:
            print("检测到是否是设备断开连接!!!".center(70, "*"))
            sys.exit(1)
        while p1.is_alive():
            listen_alert(driver)
        p1.join()
        print("Ending".center(70, "-"))
        try:
            driver.service("uiautomator").stop()
        except Exception as e:
            print("未能停止uiautomator2的程序")
            print(e)
    else:
        print("使用原有的apk进行构建...")

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册