一直困扰于 Airtest 没有观察者函数,所以自己实现了一个,原理简单,python 子线程实现

利用子线程的方式实现:

def loop_watcher(find_element, timeout):
    """
    每隔一秒,循环查找元素是否存在.
    如果元素存在,click操作
    :param find_element: 要查找元素,需要是poco对象
    :param timeout: 超时时间,单位:秒
    :return:
    """
    log('thread %s is running...' % threading.current_thread().name)
    start_time = time.time()
    while True:
        if find_element.exists():
            find_element.click()
            log("观察者线程发现")
            break
        elif (time.time() - start_time) < timeout:
            log("观察者线程等待1秒")
            time.sleep(1)
        else:
            log("观察者线程超时未发现")
            break
    log('thread %s ended.' % threading.current_thread().name)


def watcher(text=None, textMatches=None, timeout=5, poco=None):
    """
    观察者函数: 根据text或textMatches定位元素,用守护线程的方式,增加子线程循环查找元素,直到超时
    :param text: 元素的text
    :param textMatches: 元素的textMatches,正则表达式
    :param timeout: 超时时间
    :param poco: poco实例
    :return:
    """
    # 目标元素
    find_element = None
    if poco is None:
        raise Exception("poco is None")
    if text or textMatches:
        if text:
            find_element = poco(text=text)
        elif textMatches:
            find_element = poco(textMatches=textMatches)

    # 定义子线程: 循环查找目标元素
    t = threading.Thread(target=loop_watcher, name='LoopWatcherThread', args=(find_element, timeout))
    # 增加守护线程,主线程结束,子线程也结束
    t.setDaemon(True)
    t.start()


↙↙↙阅读原文可查看相关链接,并与作者交流