前言

从 appium 迁移到了 macaca,研究了一下多机并行跑 case,总结一下,和大家一起讨论一下~~~

1. 获取连接到电脑上的机器

def is_using(port):
    """
    判断端口号是否被占用
    :param port:
    :return:
    """
    cmd = "netstat -an | grep %s" % port

    if os.popen(cmd).readlines():
        return True
    else:
        return False

def get_port(count):
    """
    获得3456端口后一系列free port
    :param count:
    :return:
    """
    port = 3456
    port_list = []
    while True:
        if len(port_list) == count:
            break

        if not is_using(port) and (port not in port_list):
            port_list.append(port)
        else:
            port += 1

    return port_list

3. 开启 macaca 服务,为每一个 service 动态分布一个端口

class macacaServer():
    def __init__(self, devices):

        self.devices = devices
        self.count = len(devices)
        self.url = 'http://127.0.0.1:%s/wd/hub/status'

    def start_server(self):

        pool = Pool(processes=self.count)
        port_list = get_port(self.count)

        for i in range(self.count):
            pool.apply_async(self.run_server, args=(self.devices[i], port_list[i]))

        pool.close()
        pool.join()

    def run_server(self, device, port):

        r = RunServer(port)
        r.start()

        while not self.is_running(port):
            sleep(1)

        server_url = {
            'hostname': "ununtrium.local",
            'port': port,
        }
        driver = WebDriver(device, server_url)
        driver.init()

        DRIVER.set_driver(driver)
        DRIVER.set_OS(device.get("platformName"))

        self.run_test()

    def run_test(self):
        """运行测试
        """
        all_test = AllTests()
        all_test.run_case()

    def is_running(self, port):
        """Determine whether server is running
        :return:True or False
        """
        url = self.url % port
        response = None
        try:
            response = requests.get(url, timeout=0.01)

            if str(response.status_code).startswith('2'):

                # data = json.loads((response.content).decode("utf-8"))

                # if data.get("staus") == 0:
                return True

            return False
        except requests.exceptions.ConnectionError:
            return False
        except ReadTimeout:
            return False
        finally:
            if response:
                response.close()


class RunServer(threading.Thread):

    def __init__(self, port):
        threading.Thread.__init__(self)
        self.cmd = 'macaca server -p %s --verbose' % port

    def run(self):
        os.system(self.cmd)

最后

本来想放个视频看看效果的,但是发现不会搞~~~ 我还是早点睡觉吧。。。。


↙↙↙阅读原文可查看相关链接,并与作者交流