最近新的项目统一采用网关,我们组处在后台主要提供 dubbo 接口,dubbo 接口的参数🈶️个是通过隐式传参的。做接口测试的时候该如何传这个隐式参数,找过 jmeter 和 python 相关资料都没有头绪
RpcContext.getContext().setAttachment("parmerName",value);
应该是这个吧
# -*- coding: utf-8 -*-
# @Time : 2020/7/1 10:11
# @Author : grassroadsZ
# @File : handle_dubbo.py
import random
import urllib
import json
import telnetlib
from urllib.parse import unquote
from kazoo.client import KazooClient
from functools import lru_cache
from loguru import logger
@lru_cache(maxsize=10)
class Dubbo(telnetlib.Telnet):
prompt = 'dubbo>'
coding = 'utf-8'
def __init__(self, host=None, port=0, timeout=10):
super().__init__(host, port, timeout)
self.write(b'\n')
def command(self, flag, str_=""):
data = self.read_until(flag.encode())
self.write(str_.encode() + b"\n")
return data
def invoke(self, service_name, method_name, arg):
arg_str = None
if isinstance(arg, (dict, list)):
arg_str = json.dumps(arg)
if isinstance(arg, tuple):
arg_str = str(arg).replace("(", "").replace(")", "")
command_str = "invoke {0}.{1}({2})".format(service_name, method_name, arg_str)
self.command(Dubbo.prompt, command_str)
data = self.command(Dubbo.prompt, "")
data = data.decode("utf-8", errors='ignore').split('\n')[1].strip()
return data
class DubboUtils(object):
def __init__(self, zk_service, interface):
self.zk_service = zk_service
self.interface = interface
@lru_cache(maxsize=10)
def _get_dubbo(self, server_name):
"""
获取单个dubbo服务的
:param server_name:服务名
:return:{"service": service, "paths": paths, "method": method}
"""
zk = KazooClient(hosts="{}".format(self.zk_service))
zk.start()
urls = []
service_list = zk.get_children("dubbo")
for i in service_list:
if server_name in i:
try:
# 获取服务发布方
providers = zk.get_children("/dubbo/{}/providers".format(i))
if providers:
for provider in providers:
url = urllib.parse.unquote(provider)
if url.startswith('dubbo:'):
urls.append(url.split('dubbo://')[1])
except Exception as e:
print(e)
paths = []
for i in urls:
try:
path, temp = i.split('/')
service = temp.split('?')[0]
method = temp.split('methods=')[1].split('&')[0].split(',')
paths.append(path)
except Exception as e:
print(e)
services = {"service": service, "paths": paths, "method": method}
return services
@logger.catch
def requests_dubbo(self, method, param):
"""
请求dubbo接口
:param method: dubbo接口的方法
:param param: 请求参数
:return:
"""
res = self._get_dubbo(self.interface)
methods = res.get("method")
if method not in methods:
raise NameError(f"{method} not in {methods}")
paths = res.get("paths")
if len(paths) > 1:
paths = paths[random.randint(0, len(paths) - 1)]
# paths = paths[-1]
else:
paths = paths[0]
ip, port = paths.split(":")
con = Dubbo(ip, port)
logger.info(
f"开始调用地址ip - {ip} - 端口: {port} - 的 {self.interface} 服务的 接口- {method}, 参数为:\n {json.dumps(param, ensure_ascii=False)} ")
result = con.invoke(service_name=self.interface, method_name=method, arg=param)
logger.info(f"接口调用结果: {result}")
return result
if __name__ == '__main__':
d = DubboUtils("注册中心地址", '服务名')
d.requests_dubbo(方法名, python格式的参数)
我们用的是 zk 注册中心,不知道能不能帮到你
有什么解决方案了吗