压测绕不开机器人的实现,游戏里面的压测相对软测来说需要保持长连接。大部分游戏都是采用 protobuf,google 提供了反射接口,对于处理消息是比较方便的,简单实现了一个 python 版本的机器人,你需要掌握以下的技能:
1、基础的 python 语法
2、tcp/ip
3、并发相关知识 (process, coroutine)
4、protobuf 相关知识
1、处理 protobuf
实现一个 pb 的管理器,提供注册、反射的方法,google 已经提供了现成的反射接口,只需要封装到管理器即可,
定义一个 Manager 类
init
msg_table: 用 dict 实现,存放了消息 - 函数指针的键值对
messageFactor(https://googleapis.dev/python/protobuf/latest/google/protobuf/message_factory.html)
desriptor_pool(https://googleapis.dev/python/protobuf/latest/google/protobuf/descriptor_pool.html)
上面两个对象用到了反射接口,文档 比较清晰的讲了用法,本文主要讲在机器人上的应用,protobuf 的反射原理的话可以参考这篇文章: https://blog.csdn.net/iopoint/article/details/118218789
reg
将 descriptor: func 存到 msg_table 中,
例:
定义一个 guide.ShowMap 协议,实现一个 TestGuide 类、reg_msg 方法
前向声明 Robot,robot 中定义了 manager,TestGuide 初始化的时候注册了 guide.ShowMap 与对应的回调方法,将 showMap: case_ShowMap 写入 msg_table,这里用到了 eval 方法 (https://docs.python.org/zh-cn/3/library/functions.html#eval)
get_des
通过协议名字返回 des 对象
get_func
通过 des 获取执行函数指针
exec
执行 msg_table 中的函数
reflect(名字起的有点随便)
传入 robot 的指针,conn, 协议名字, 收到的 buffer
通过协议名字拿到 des,通过 des 在 msg_table 中拿到回调函数指针, 进入下一层处理
manager(名字起的随便)
通过反射实例化 des,传入 buffer,既可反序化出来 protobuf 里面的消息,通过 protobuf 中 MessageToJson (https://googleapis.dev/python/protobuf/latest/google/protobuf/json_format.html,可以把 protobuf 里面的内容打印成 json 的格式输出,拿到消息后,通过 partial(https://docs.python.org/zh-cn/3/library/functools.html 绑定参数,传入 exec)
manager 代码:
import google.protobuf.message
import google.protobuf.descriptor
import google.protobuf.descriptor_pool
import google.protobuf.message_factory
import google.protobuf.json_format
import typing
from functools import partial
from robotSocket.conn import Conn
class Manager:
def __init__(self) -> None:
self.__msg_table = {} # key: google.protobuf.descriptor.Descript, value: func
self.__messageFactory = google.protobuf.message_factory.MessageFactory()
self.__descriptor_pool = google.protobuf.descriptor_pool.DescriptorPool()
def reg(self, descriptor: google.protobuf.descriptor.Descriptor, func: typing.Callable):
"""
:param: descriptor: descritpor
:param: func: function pointer
:return: None
"""
if descriptor not in self.__msg_table.keys():
self.__msg_table[id(descriptor)] = func
self.__descriptor_pool.AddDescriptor(descriptor)
def exec(self, func: partial, msg: google.protobuf.message.Message) -> bool:
"""
:param: func: function pointer
:param: msg: Message
:return: bool
"""
return func(msg)
def get_func(self, descriptor: google.protobuf.descriptor.Descriptor) -> typing.Callable | None:
"""
:param: descriptor: descritoir
:return: function pointer | None
"""
func = None
# map 比较需要__hash__, descriptor没必要重载
if id(descriptor) in self.__msg_table.keys():
func = self.__msg_table[id(descriptor)]
return func
def get_des(self, msg_name: str) -> google.protobuf.descriptor.Descriptor:
"""
:return: descriptor
"""
return self.__descriptor_pool.FindMessageTypeByName(msg_name)
def reflect(self, user: object, c: Conn, msg_name: str, buffer: bytearray, flag: bool) -> bool:
"""
:param: user: object
:param: c: Conn
:param: msg_name: str
:param: buffer: bytearray
:param: bool
:return: bool
"""
des = self.get_des(msg_name)
if not des:
return False
call_func = self.get_func(des)
print(f"call func: {call_func}")
if not call_func:
return False
return self.manager(user, c, call_func, des, buffer, flag)
def manager(self, user: typing.Callable, c: Conn, func: typing.Callable, descriptor: google.protobuf.descriptor.Descriptor, buffer: bytearray, flag: bool) -> bool:
"""
:param: user: class pointer
:param: c: Conn
:param: func: function pointer
:param: descriptor: descriptor
:param: buffer: bytearray
:param: flag: bool
:return: bool
"""
msg_type = self.__messageFactory.GetPrototype(descriptor)
if not msg_type:
return False
msg = msg_type()
if not msg:
return False
if not msg.ParseFromString(buffer):
return False
# if flag:
# print(google.protobuf.json_format.MessageToJson(msg))
# return False
print(google.protobuf.json_format.MessageToJson(msg))
ret = self.exec(partial(func, user, c), msg)
return ret
TestGuide 代码:
import google.protobuf.message
from robotSocket.conn import Conn
from robot.robot import Robot
from pb import guide_pb2
def reg_msg(pkg, msg):
Robot.manager.reg(eval(f"{pkg}.{msg}.DESCRIPTOR"), eval(f"TestGuide.case_{msg}"))
class TestGuide:
def __init__(self) -> None:
reg_msg("guide_pb2", "ShowMap")
pass
@staticmethod
def case_ShowMap(robot, c: Conn, msg: google.protobuf.message.Message) -> bool:
# 收到消息写逻辑
if robot.get_terimate():
c.send(b"123")
print("call function case_ShowMap succ!!!")
return True
内容有点多,暂时先介绍一下管理器实现,后续更新~~~~