接口测试 如何测试有 dubbo 隐式传参的接口

半闲 · 2020年12月07日 · 最后由 云深不知处 回复于 2023年02月02日 · 2572 次阅读

最近新的项目统一采用网关,我们组处在后台主要提供 dubbo 接口,dubbo 接口的参数🈶️个是通过隐式传参的。做接口测试的时候该如何传这个隐式参数,找过 jmeter 和 python 相关资料都没有头绪

共收到 5 条回复 时间 点赞

RpcContext.getContext().setAttachment("parmerName",value);
应该是这个吧

李海默 回复


不确定是不是,看方案上是用 dubbofilter。这种接口是不是只有通过 dubbo 框架写 Java 代码测试

# -*- coding: utf-8 -*-
# @Time    : 2020/7/1 10:11
# @Author  : grassroadsZ
# @File    : handle_dubbo.py
import random
import urllib
import json
import telnetlib
from urllib.parse import unquote

from kazoo.client import KazooClient

from functools import lru_cache
from loguru import logger


@lru_cache(maxsize=10)
class Dubbo(telnetlib.Telnet):
    prompt = 'dubbo>'
    coding = 'utf-8'

    def __init__(self, host=None, port=0, timeout=10):
        super().__init__(host, port, timeout)
        self.write(b'\n')

    def command(self, flag, str_=""):
        data = self.read_until(flag.encode())
        self.write(str_.encode() + b"\n")
        return data

    def invoke(self, service_name, method_name, arg):
        arg_str = None

        if isinstance(arg, (dict, list)):
            arg_str = json.dumps(arg)
        if isinstance(arg, tuple):
            arg_str = str(arg).replace("(", "").replace(")", "")

        command_str = "invoke {0}.{1}({2})".format(service_name, method_name, arg_str)

        self.command(Dubbo.prompt, command_str)
        data = self.command(Dubbo.prompt, "")
        data = data.decode("utf-8", errors='ignore').split('\n')[1].strip()

        return data


class DubboUtils(object):
    def __init__(self, zk_service, interface):
        self.zk_service = zk_service
        self.interface = interface

    @lru_cache(maxsize=10)
    def _get_dubbo(self, server_name):
        """
        获取单个dubbo服务的
        :param server_name:服务名
        :return:{"service": service, "paths": paths, "method": method}
        """
        zk = KazooClient(hosts="{}".format(self.zk_service))
        zk.start()
        urls = []
        service_list = zk.get_children("dubbo")
        for i in service_list:
            if server_name in i:
                try:
                    # 获取服务发布方
                    providers = zk.get_children("/dubbo/{}/providers".format(i))
                    if providers:
                        for provider in providers:
                            url = urllib.parse.unquote(provider)
                            if url.startswith('dubbo:'):
                                urls.append(url.split('dubbo://')[1])
                except Exception as e:
                    print(e)
        paths = []
        for i in urls:
            try:
                path, temp = i.split('/')
                service = temp.split('?')[0]
                method = temp.split('methods=')[1].split('&')[0].split(',')
                paths.append(path)
            except Exception as e:
                print(e)
        services = {"service": service, "paths": paths, "method": method}
        return services

    @logger.catch
    def requests_dubbo(self, method, param):
        """
        请求dubbo接口
        :param method: dubbo接口的方法
        :param param: 请求参数
        :return:
        """
        res = self._get_dubbo(self.interface)
        methods = res.get("method")
        if method not in methods:
            raise NameError(f"{method} not in {methods}")

        paths = res.get("paths")

        if len(paths) > 1:
            paths = paths[random.randint(0, len(paths) - 1)]
            # paths = paths[-1]
        else:
            paths = paths[0]

        ip, port = paths.split(":")

        con = Dubbo(ip, port)
        logger.info(
            f"开始调用地址ip - {ip} - 端口: {port} - 的 {self.interface} 服务的 接口- {method}, 参数为:\n {json.dumps(param, ensure_ascii=False)} ")
        result = con.invoke(service_name=self.interface, method_name=method, arg=param)
        logger.info(f"接口调用结果: {result}")
        return result


if __name__ == '__main__':
    d = DubboUtils("注册中心地址", '服务名')
    d.requests_dubbo(方法名, python格式的参数

我们用的是 zk 注册中心,不知道能不能帮到你

仅楼主可见

有什么解决方案了吗

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册