import subprocess
import uiautomator2 as u2
import time
import argparse
from threading import Thread
import pymysql.cursors
import pymysql
import sys
def get_serial_and_apk():
"""
获取设备名和包名
"""
parser = argparse.ArgumentParser()
parser.add_argument('--serial', type=str) # 从jenkins获取需要安装的App的手机的uid
parser.add_argument('--upload_filename', type=str) # 获取需要安装的Apk的包名
parser.add_argument('--replace_install', type=int) # 这边是根据公司需要使用原有Apk构建,还是重新覆盖安装新的包
args = parser.parse_args()
serial = args.serial
upload_file = args.upload_filename
replace_install = args.replace_install
print(replace_install)
apk_path = "/Users/data/upload_apk/" + upload_file # 安装包的路径,linux为例
print(apk_path)
return serial, apk_path, replace_install, upload_file # 输出设备号,apk的安装路径,是否覆盖安装,包名
def select_install_pwd():
"""
运行sql语句,主要是去数据库根据uid去查当前设备的安装密码
:param
sql_words: sql语句
:return: 查询数据对应的字典列表
"""
connect = pymysql.Connect(
host="192.168.1.1",
port=3306,
user="root",
passwd="123456",
db="auto_build",
charset="utf8"
)
# 获取游标
cursor = connect.cursor(cursor=pymysql.cursors.DictCursor)
try:
sql = f"""SELECT install_passwd FROM device_info WHERE id =\'{serial}\'"""
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
print(row['install_passwd'])
return row['install_passwd']
except Exception as e:
print(e)
connect.rollback()
cursor.close()
connect.close()
return None
def time_out(start_time, time_out=120.0):
"""
定义一个超时的时间2min
"""
end_time = time.time()
difference = round((end_time - start_time), 2)
if difference - time_out > 0:
print(f"设定的时间{time_out}到了退出")
return False
else:
return True
def listen_alert(driver):
"""
监听弹框
"""
time.sleep(1)
if driver(textContains='请输入').exists():
pwd = select_install_pwd()
driver(textContains='请输入').set_text(pwd)
print("准备点击确定")
time.sleep(2)
if driver(text='确定').exists:
driver(text='确定').click()
print("已点击确定")
if driver(text="继续安装").exists():
driver(text="继续安装").click()
print("点击继续安装")
if driver(text='无视风险安装').exists():
driver(text='无视风险安装').click()
if driver(text='安装').exists():
driver(text='安装').click()
def cmd(cmd, remove_shell=False):
"""
adb命令头部封装
"""
head = adb_head
if remove_shell:
head = adb_head.replace('shell', '')
print(head + ' ' + cmd)
import os
r = os.popen(head + ' ' + cmd)
print(r.read())
return r.read()
def create_dir_if_not_exists(remote_path):
"""
判断下手机设备中是否有我们上传的一个路径
为什么需要上传到手机,而不是直接adb install 安装呢,主要是需要获取安装成功的Success的提示,以手机内部安装更为合适
"""
r = cmd('ls ' + remote_path)
if 'No such file or directory ' in r:
cmd(' mkdirs ' + remote_path)
def push_file(file_path, remote_path):
"""
推Apk到手机中存储
"""
# create_dir_if_not_exists(remote_path)
cmd(' push ' + file_path + ' ' + remote_path, remove_shell=True)
print(' push ' + file_path + ' ' + remote_path)
def install(apk_path, apk_name):
"""
安装apk
"""
push_file(apk_path, '/data/local/tmp') # 上传apk到手机中
remote_path = '/data/local/tmp/' + apk_name # 手机中的安装包的路径
cmd = adb_head + ' pm install -r ' + remote_path # 强制安装
# cmd = "adb -s %s install %s" % (serial, apk_path)
process = subprocess.Popen(cmd, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
process.wait(120)
for line in process.stdout:
line_str = line.decode("utf-8")
print(line_str)
if 'Success' in str(line_str):
print("安装包已成功安装".center(70, "-"))
return True
else:
if 'Failure' in str(line_str):
print(line_str)
print("请检查当前安装的apk版本是否过低".center(50, "*"))
print("注意此时apk安装失败,断开测试连接,请检查完毕后重新构建".center(50, "*"))
sys.exit(1)
return False
if __name__ == '__main__':
serial, apk_path, replace_install, upload_file = get_serial_and_apk()
adb_head = 'adb -s ' + serial + ' shell '
if replace_install == 3:
p1 = Thread(target=install, args=(apk_path, upload_file))
p1.start()
try:
driver = u2.connect_usb(serial)
except RuntimeError as e:
print("检测到是否是设备断开连接!!!".center(70, "*"))
sys.exit(1)
while p1.is_alive():
listen_alert(driver)
p1.join()
print("Ending".center(70, "-"))
try:
driver.service("uiautomator").stop()
except Exception as e:
print("未能停止uiautomator2的程序")
print(e)
else:
print("使用原有的apk进行构建...")