游戏测试机器人搭建 - Recv Actor

Recv Actor

Recv Actor 初始化

class RecvActor(pykka.ThreadingActor):
    def __init__(self, player=None, sock=None):
        super(RecvActor, self).__init__()
        self.player = player
        self.socket = sock

启动 Recv Actor

def on_start(self):
    self.on_loop()

持续接收服务端数据

def on_receive(self, msg):
    self.on_loop()
    # sleep一下不然耗性能
    time.sleep(0.01)

接收服务端数据进行反序列化

def on_loop(self):
    data = self.socket.recv()
    data = list(ijson.items(data,''))[0]
    proto_id,proto_bin = data['cmd'],data
    proto_module = protoFile
    proto_cls = getattr(proto_module, str(proto_id).capitalize())
    proto_cls_ins = proto_cls()
    if hasattr(proto_cls_ins, 'response'):
        getattr(proto_cls_ins,'response')(proto_bin)
        self.player.send_msg(MSG_PROTO, (proto_id, proto_cls_ins))
    else:
        self.player.send_msg(MSG_PROTO, (proto_id, proto_cls_ins))
    self.actor_ref.tell({'msg': 'loop'})

停止 RecvActor

def on_stop(self):
    print('RecvActor stop')

双是 log 收集

@GetLog(level='error')
def on_failure(self, exception_type, exception_value, traceback):
    logging.error(f'RecvActor fail -> {exception_type, exception_value, tb.print_tb(traceback)}')

到这里 3 个核心的 Actor 都搭建完毕了,接下来演示下python 编写游戏测试机器人客户端 (四) 玩家登录

最后的最后,各位的关注、点赞、收藏、碎银子打赏是对我最大的支持,谢谢大家!
需要源码的小伙伴关注微信公众号 ID:gameTesterGz
或扫描二维码回复机器人脚本即可


↙↙↙阅读原文可查看相关链接,并与作者交流