压测绕不开机器人的实现,游戏里面的压测相对软测来说需要保持长连接。大部分游戏都是采用 protobuf,google 提供了反射接口,对于处理消息是比较方便的,简单实现了一个 python 版本的机器人,你需要掌握以下的技能:
1、基础的 python 语法
2、tcp/ip
3、并发相关知识 (process, coroutine)
4、protobuf 相关知识

1、处理 protobuf
实现一个 pb 的管理器,提供注册、反射的方法,google 已经提供了现成的反射接口,只需要封装到管理器即可,
定义一个 Manager 类
init

msg_table: 用 dict 实现,存放了消息 - 函数指针的键值对
messageFactor(https://googleapis.dev/python/protobuf/latest/google/protobuf/message_factory.html)
desriptor_pool(https://googleapis.dev/python/protobuf/latest/google/protobuf/descriptor_pool.html)
上面两个对象用到了反射接口,文档 比较清晰的讲了用法,本文主要讲在机器人上的应用,protobuf 的反射原理的话可以参考这篇文章: https://blog.csdn.net/iopoint/article/details/118218789

reg

将 descriptor: func 存到 msg_table 中,
例:

定义一个 guide.ShowMap 协议,实现一个 TestGuide 类、reg_msg 方法

前向声明 Robot,robot 中定义了 manager,TestGuide 初始化的时候注册了 guide.ShowMap 与对应的回调方法,将 showMap: case_ShowMap 写入 msg_table,这里用到了 eval 方法 (https://docs.python.org/zh-cn/3/library/functions.html#eval)

get_des

通过协议名字返回 des 对象

get_func

通过 des 获取执行函数指针

exec

执行 msg_table 中的函数

reflect(名字起的有点随便)

传入 robot 的指针,conn, 协议名字, 收到的 buffer
通过协议名字拿到 des,通过 des 在 msg_table 中拿到回调函数指针, 进入下一层处理

manager(名字起的随便)

通过反射实例化 des,传入 buffer,既可反序化出来 protobuf 里面的消息,通过 protobuf 中 MessageToJson (https://googleapis.dev/python/protobuf/latest/google/protobuf/json_format.html,可以把 protobuf 里面的内容打印成 json 的格式输出,拿到消息后,通过 partial(https://docs.python.org/zh-cn/3/library/functools.html 绑定参数,传入 exec)

manager 代码:

import google.protobuf.message
import google.protobuf.descriptor
import google.protobuf.descriptor_pool
import google.protobuf.message_factory
import google.protobuf.json_format
import typing
from functools import partial
from robotSocket.conn import Conn

class Manager:

    def __init__(self) -> None:

        self.__msg_table = {} # key: google.protobuf.descriptor.Descript, value: func
        self.__messageFactory = google.protobuf.message_factory.MessageFactory()
        self.__descriptor_pool = google.protobuf.descriptor_pool.DescriptorPool()


    def reg(self, descriptor: google.protobuf.descriptor.Descriptor, func: typing.Callable):

        """
        :param: descriptor: descritpor
        :param: func: function pointer
        :return: None
        """
        if descriptor not in self.__msg_table.keys():
            self.__msg_table[id(descriptor)] = func
            self.__descriptor_pool.AddDescriptor(descriptor)


    def exec(self, func: partial, msg: google.protobuf.message.Message) -> bool:

        """

        :param: func: function pointer
        :param: msg: Message
        :return: bool 
        """
        return func(msg)

    def get_func(self, descriptor: google.protobuf.descriptor.Descriptor) -> typing.Callable | None:

        """

        :param: descriptor: descritoir
        :return: function pointer | None
        """
        func = None

        # map 比较需要__hash__, descriptor没必要重载
        if id(descriptor) in self.__msg_table.keys():
            func = self.__msg_table[id(descriptor)]

        return func

    def get_des(self, msg_name: str) -> google.protobuf.descriptor.Descriptor:

        """

        :return: descriptor
        """
        return self.__descriptor_pool.FindMessageTypeByName(msg_name)

    def reflect(self, user: object, c: Conn, msg_name: str, buffer: bytearray, flag: bool) -> bool:

        """
        :param: user: object
        :param: c: Conn
        :param: msg_name: str
        :param: buffer: bytearray
        :param: bool
        :return: bool
        """
        des = self.get_des(msg_name)

        if not des:

            return False

        call_func = self.get_func(des)
        print(f"call func: {call_func}")
        if not call_func:
            return False

        return self.manager(user, c, call_func, des, buffer, flag)

    def manager(self, user: typing.Callable, c: Conn, func: typing.Callable, descriptor: google.protobuf.descriptor.Descriptor, buffer: bytearray, flag: bool) -> bool:
        """

        :param: user: class pointer
        :param: c: Conn
        :param: func: function pointer
        :param: descriptor: descriptor
        :param: buffer: bytearray
        :param: flag: bool
        :return: bool
        """
        msg_type = self.__messageFactory.GetPrototype(descriptor)

        if not msg_type:
            return False

        msg = msg_type()

        if not msg:
            return False

        if not msg.ParseFromString(buffer):
            return False

        # if flag:
        #     print(google.protobuf.json_format.MessageToJson(msg))
        #     return False

        print(google.protobuf.json_format.MessageToJson(msg))
        ret = self.exec(partial(func, user, c), msg)

        return ret




TestGuide 代码:

import google.protobuf.message
from robotSocket.conn import Conn
from robot.robot import Robot
from pb import guide_pb2 



def reg_msg(pkg, msg):
    Robot.manager.reg(eval(f"{pkg}.{msg}.DESCRIPTOR"), eval(f"TestGuide.case_{msg}"))

class TestGuide:

    def __init__(self) -> None:
        reg_msg("guide_pb2", "ShowMap")
        pass

    @staticmethod
    def case_ShowMap(robot, c: Conn, msg: google.protobuf.message.Message) -> bool: 
        # 收到消息写逻辑
        if robot.get_terimate():

            c.send(b"123")
            print("call function case_ShowMap succ!!!")

        return True


内容有点多,暂时先介绍一下管理器实现,后续更新~~~~


↙↙↙阅读原文可查看相关链接,并与作者交流