想问问, 玩转 ai 系列啥时候可以公开学习
需要替换的数据全是有逻辑的,有什么好的建议吗?
我去尝试一下
已经试过了,不行 ,连鼠标的托拉拽貌似都识别不出来
我是要实现这种业务的自动化,不是要做这个的需求
# -*- coding: utf-8 -*-
# @Time    : 2020/7/1 10:11
# @Author  : grassroadsZ
# @File    : handle_dubbo.py
import random
import urllib
import json
import telnetlib
from urllib.parse import unquote
from kazoo.client import KazooClient
from functools import lru_cache
from loguru import logger
@lru_cache(maxsize=10)
class Dubbo(telnetlib.Telnet):
    prompt = 'dubbo>'
    coding = 'utf-8'
    def __init__(self, host=None, port=0, timeout=10):
        super().__init__(host, port, timeout)
        self.write(b'\n')
    def command(self, flag, str_=""):
        data = self.read_until(flag.encode())
        self.write(str_.encode() + b"\n")
        return data
    def invoke(self, service_name, method_name, arg):
        arg_str = None
        if isinstance(arg, (dict, list)):
            arg_str = json.dumps(arg)
        if isinstance(arg, tuple):
            arg_str = str(arg).replace("(", "").replace(")", "")
        command_str = "invoke {0}.{1}({2})".format(service_name, method_name, arg_str)
        self.command(Dubbo.prompt, command_str)
        data = self.command(Dubbo.prompt, "")
        data = data.decode("utf-8", errors='ignore').split('\n')[1].strip()
        return data
class DubboUtils(object):
    def __init__(self, zk_service, interface):
        self.zk_service = zk_service
        self.interface = interface
    @lru_cache(maxsize=10)
    def _get_dubbo(self, server_name):
        """
        获取单个dubbo服务的
        :param server_name:服务名
        :return:{"service": service, "paths": paths, "method": method}
        """
        zk = KazooClient(hosts="{}".format(self.zk_service))
        zk.start()
        urls = []
        service_list = zk.get_children("dubbo")
        for i in service_list:
            if server_name in i:
                try:
                    # 获取服务发布方
                    providers = zk.get_children("/dubbo/{}/providers".format(i))
                    if providers:
                        for provider in providers:
                            url = urllib.parse.unquote(provider)
                            if url.startswith('dubbo:'):
                                urls.append(url.split('dubbo://')[1])
                except Exception as e:
                    print(e)
        paths = []
        for i in urls:
            try:
                path, temp = i.split('/')
                service = temp.split('?')[0]
                method = temp.split('methods=')[1].split('&')[0].split(',')
                paths.append(path)
            except Exception as e:
                print(e)
        services = {"service": service, "paths": paths, "method": method}
        return services
    @logger.catch
    def requests_dubbo(self, method, param):
        """
        请求dubbo接口
        :param method: dubbo接口的方法
        :param param: 请求参数
        :return:
        """
        res = self._get_dubbo(self.interface)
        methods = res.get("method")
        if method not in methods:
            raise NameError(f"{method} not in {methods}")
        paths = res.get("paths")
        if len(paths) > 1:
            paths = paths[random.randint(0, len(paths) - 1)]
            # paths = paths[-1]
        else:
            paths = paths[0]
        ip, port = paths.split(":")
        con = Dubbo(ip, port)
        logger.info(
            f"开始调用地址ip - {ip} - 端口: {port} - 的 {self.interface} 服务的 接口- {method}, 参数为:\n {json.dumps(param, ensure_ascii=False)} ")
        result = con.invoke(service_name=self.interface, method_name=method, arg=param)
        logger.info(f"接口调用结果: {result}")
        return result
if __name__ == '__main__':
    d = DubboUtils("注册中心地址", '服务名')
    d.requests_dubbo(方法名, python格式的参数)
我们用的是 zk 注册中心,不知道能不能帮到你
 呜呜呜。。。。。。。。上车上车
伸手党的我请问什么时候开源~
不知道为什么通过服务器外网 ip+ 端口无法访问控制台,使用 vnc 同样无法连接阿里服务器上的容器镜像内部,安全组已开,防火墙以关,请问楼主有遇到过吗
这种问题我也在纠结,学完了但是想拿公司实际演练一下,发现公司的业务主要依赖定时器工程,真正的核心业务流程全部都是定时任务。然后就是接口关联的,上接口的部分响应数据做下接口的入参数据,如果全部使用 python 中的动态属性去做的话感觉总是差了那么点,但是如果使用 mock 只是为了做接口而做接口的话感觉没有必要,个人感觉核心的业务流程来做自动化回归更重要。不知道有没有大佬有好的答案,期待 ing、、、