• # -*- coding: utf-8 -*-
    # @Time    : 2020/7/1 10:11
    # @Author  : grassroadsZ
    # @File    : handle_dubbo.py
    import random
    import urllib
    import json
    import telnetlib
    from urllib.parse import unquote
    
    from kazoo.client import KazooClient
    
    from functools import lru_cache
    from loguru import logger
    
    
    @lru_cache(maxsize=10)
    class Dubbo(telnetlib.Telnet):
        prompt = 'dubbo>'
        coding = 'utf-8'
    
        def __init__(self, host=None, port=0, timeout=10):
            super().__init__(host, port, timeout)
            self.write(b'\n')
    
        def command(self, flag, str_=""):
            data = self.read_until(flag.encode())
            self.write(str_.encode() + b"\n")
            return data
    
        def invoke(self, service_name, method_name, arg):
            arg_str = None
    
            if isinstance(arg, (dict, list)):
                arg_str = json.dumps(arg)
            if isinstance(arg, tuple):
                arg_str = str(arg).replace("(", "").replace(")", "")
    
            command_str = "invoke {0}.{1}({2})".format(service_name, method_name, arg_str)
    
            self.command(Dubbo.prompt, command_str)
            data = self.command(Dubbo.prompt, "")
            data = data.decode("utf-8", errors='ignore').split('\n')[1].strip()
    
            return data
    
    
    class DubboUtils(object):
        def __init__(self, zk_service, interface):
            self.zk_service = zk_service
            self.interface = interface
    
        @lru_cache(maxsize=10)
        def _get_dubbo(self, server_name):
            """
            获取单个dubbo服务的
            :param server_name:服务名
            :return:{"service": service, "paths": paths, "method": method}
            """
            zk = KazooClient(hosts="{}".format(self.zk_service))
            zk.start()
            urls = []
            service_list = zk.get_children("dubbo")
            for i in service_list:
                if server_name in i:
                    try:
                        # 获取服务发布方
                        providers = zk.get_children("/dubbo/{}/providers".format(i))
                        if providers:
                            for provider in providers:
                                url = urllib.parse.unquote(provider)
                                if url.startswith('dubbo:'):
                                    urls.append(url.split('dubbo://')[1])
                    except Exception as e:
                        print(e)
            paths = []
            for i in urls:
                try:
                    path, temp = i.split('/')
                    service = temp.split('?')[0]
                    method = temp.split('methods=')[1].split('&')[0].split(',')
                    paths.append(path)
                except Exception as e:
                    print(e)
            services = {"service": service, "paths": paths, "method": method}
            return services
    
        @logger.catch
        def requests_dubbo(self, method, param):
            """
            请求dubbo接口
            :param method: dubbo接口的方法
            :param param: 请求参数
            :return:
            """
            res = self._get_dubbo(self.interface)
            methods = res.get("method")
            if method not in methods:
                raise NameError(f"{method} not in {methods}")
    
            paths = res.get("paths")
    
            if len(paths) > 1:
                paths = paths[random.randint(0, len(paths) - 1)]
                # paths = paths[-1]
            else:
                paths = paths[0]
    
            ip, port = paths.split(":")
    
            con = Dubbo(ip, port)
            logger.info(
                f"开始调用地址ip - {ip} - 端口: {port} - 的 {self.interface} 服务的 接口- {method}, 参数为:\n {json.dumps(param, ensure_ascii=False)} ")
            result = con.invoke(service_name=self.interface, method_name=method, arg=param)
            logger.info(f"接口调用结果: {result}")
            return result
    
    
    if __name__ == '__main__':
        d = DubboUtils("注册中心地址", '服务名')
        d.requests_dubbo(方法名, python格式的参数
    
    

    我们用的是 zk 注册中心,不知道能不能帮到你

  • 🚃 呜呜呜。。。。。。。。上车上车

  • 伸手党的我请问什么时候开源~

  • 不知道为什么通过服务器外网 ip+ 端口无法访问控制台,使用 vnc 同样无法连接阿里服务器上的容器镜像内部,安全组已开,防火墙以关,请问楼主有遇到过吗

  • 接口测试用例设计 at 2019年10月29日

    这种问题我也在纠结,学完了但是想拿公司实际演练一下,发现公司的业务主要依赖定时器工程,真正的核心业务流程全部都是定时任务。然后就是接口关联的,上接口的部分响应数据做下接口的入参数据,如果全部使用 python 中的动态属性去做的话感觉总是差了那么点,但是如果使用 mock 只是为了做接口而做接口的话感觉没有必要,个人感觉核心的业务流程来做自动化回归更重要。不知道有没有大佬有好的答案,期待 ing、、、

  • docker 搭建 mysql at 2019年10月23日

    宿主机端口是外网访问的端口,昨天为了用费 3306 端口外网连接数据库,结果 containter 删了启,启了删,最后发现是两边端口反了,留下了没有技术的眼泪😔

  • 昨天看到一个新的连接数据库的
    db = records.Database('mysql+pymysql://user:pwd@ip:port/db_name')
    # 查询
    rows = db.query('SELECT * FROM xxx;')
    print(rows.all(as_dict=True))

    requests 作者封装的,查询同样一条 sql,未封装的大概是 0.1s,这个封装过后的 records 大概是 0.4s 左右

  • appium 启动做的事情感觉说的不是特别清楚~

  • 在 init 连接的时候加个这个参数更方便操作 cursorclass = pymysql.cursors.DictCursor
    即可转换为字典

  • 僅樓主可見
  • 测试开发之路----概要 at 2019年10月15日
    僅樓主可見
  • 我想实现的也是这种,之前浅显的觉得要测试平台来做,结果后面发现测试平台并不是想的那么简单,不过我也同样需要页面上传参进行调用,jmeter 的脚本,python 的脚本都有。

  • 测试开发之路----概要 at 2019年10月04日
    僅樓主可見
  • Python 接口自动化测试 at 2019年09月30日
    僅樓主可見
  • Python 接口自动化测试 at 2019年09月30日
    僅樓主可見
  • Python 接口自动化测试 at 2019年09月29日
    僅樓主可見
  • Python 接口自动化测试 at 2019年09月29日

    感谢大佬,最近在开始看 django 的教程,想尝试着试一下怎么做,发现了整个帖子,非常感谢