一直困扰于 Airtest 没有观察者函数,所以自己实现了一个,原理简单,python 子线程实现
利用子线程的方式实现:
def loop_watcher(find_element, timeout):
"""
每隔一秒,循环查找元素是否存在.
如果元素存在,click操作
:param find_element: 要查找元素,需要是poco对象
:param timeout: 超时时间,单位:秒
:return:
"""
log('thread %s is running...' % threading.current_thread().name)
start_time = time.time()
while True:
if find_element.exists():
find_element.click()
log("观察者线程发现")
break
elif (time.time() - start_time) < timeout:
log("观察者线程等待1秒")
time.sleep(1)
else:
log("观察者线程超时未发现")
break
log('thread %s ended.' % threading.current_thread().name)
def watcher(text=None, textMatches=None, timeout=5, poco=None):
"""
观察者函数: 根据text或textMatches定位元素,用守护线程的方式,增加子线程循环查找元素,直到超时
:param text: 元素的text
:param textMatches: 元素的textMatches,正则表达式
:param timeout: 超时时间
:param poco: poco实例
:return:
"""
# 目标元素
find_element = None
if poco is None:
raise Exception("poco is None")
if text or textMatches:
if text:
find_element = poco(text=text)
elif textMatches:
find_element = poco(textMatches=textMatches)
# 定义子线程: 循环查找目标元素
t = threading.Thread(target=loop_watcher, name='LoopWatcherThread', args=(find_element, timeout))
# 增加守护线程,主线程结束,子线程也结束
t.setDaemon(True)
t.start()