自动化工具 mitmproxy + python 实现游戏协议测试

自然生长 · July 01, 2021 · Last by 自然生长 replied at September 28, 2022 · 16434 hits
本帖已被设为精华帖!

mitmproxy + python 实现游戏协议测试

本文侧重介绍如何使用 python 和 mitmproxy 实现拦截数据包、重发数据包,以及解析 protobuf 数据内容,对于相关依赖的安装不做介绍。

一、游戏协议安全测试内容

参考https://testerhome.com/topics/29053,这篇文章讲的很清楚。


二、实现原理

想直接使用的同学可以跳到第三部分。

mitmproxy 作为代理,可以获取客户端与服务端通信的数据,并且可以拦截、修改和自主发送数据。当配合其证书使用时,还可以解密 wss 连接中的 websocket 数据。

  • Websotcket 数据处理源码分析

在 http 代理的过程中若发现 upgrade websocket 请求,则创建 WebSocketLayer 实例,并调用其_call_方法。

# mitmproxy/proxy/protocol/http.py
"""以下为Httplayer的_process_flow方法的部分代码"""
if f.response.status_code == 101:
    # Handle a successful HTTP 101 Switching Protocols Response,
    # received after e.g. a WebSocket upgrade request.
    # Check for WebSocket handshake
    is_websocket = (
        websockets.check_handshake(f.request.headers) and
        websockets.check_handshake(f.response.headers)
    )
    if is_websocket and not self.config.options.websocket:
        self.log(
          "Client requested WebSocket connection, but the protocol is disabled.",
          "info"
        )

    if is_websocket and self.config.options.websocket:
        layer = WebSocketLayer(self, f)
    else:
        layer = self.ctx.next_layer(self)
    layer()

WebSocketLayer 初始化时会创建用于此次 websocket 通信的编解码器。

# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer类的init方法,省略部分代码"""
def __init__(self, ctx, handshake_flow):
    super().__init__(ctx)
    self.handshake_flow = handshake_flow

    self.connections: dict[object, WSConnection] = {}

    client_extensions = []
    server_extensions = []
    # 判断交互数据是否使用deflate压缩
    if 'Sec-WebSocket-Extensions' in handshake_flow.response.headers:
        if PerMessageDeflate.name in handshake_flow.response.headers['Sec-WebSocket-Extensions']:
            client_extensions = [PerMessageDeflate()]
            server_extensions = [PerMessageDeflate()]
    # self.client_conn和self.server_conn继承自ctx,即原http的client和server,原理为父类的__getattr__(self, name)方法返回的是getattr(self.ctx, name)。WSConnection是一个websocket协议编解码器,实际不会发送任何网络IO,文档地址:https://python-hyper.org/projects/wsproto/en/latest/basic-usage.html
    # 负责和解码server收到信息和编码server发送的信息
    self.connections[self.client_conn] = WSConnection(ConnectionType.SERVER)
    # 负责和解码client收到信息和编码client发送的信息
    self.connections[self.server_conn] = WSConnection(ConnectionType.CLIENT)

    # 构造发送给Server的websocket的握手请求
    request = Request(extensions=client_extensions,host=handshake_flow.request.host,target=handshake_flow.request.path)
    # send()方法只会构造一个适用于对应conn的数据,并不会真正发送数据,recv_data()会将信息解码,需要通过next(conn.events())获取解码后数据
    # 按上所说,下面两行代码的操作是将握手请求按client编码后发送给server编码器,然后让server编码器解码
    data = self.connections[self.server_conn].send(request)
    self.connections[self.client_conn].receive_data(data)

    event = next(self.connections[self.client_conn].events())
    assert isinstance(event, events.Request)
    # 返回给客户端接受连接响应
    data = self.connections[self.client_conn].send(AcceptConnection(extensions=server_extensions))
    self.connections[self.server_conn].receive_data(data)
    assert isinstance(next(self.connections[self.server_conn].events()), events.AcceptConnection)

WebSocketLayer 实例的_call_方法负责处理后续 websocket 通信

# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer类的call方法,省略部分代码"""
def __call__(self):
    self.flow = WebSocketFlow(self.client_conn, self.server_conn, self.handshake_flow)
    self.flow.metadata['websocket_handshake'] = self.handshake_flow.id
    self.handshake_flow.metadata['websocket_flow'] = self.flow.id
    # 调用addons中的websocket_start(self, flow)对flow进行处理
    self.channel.ask("websocket_start", self.flow)

    conns = [c.connection for c in self.connections.keys()]
    close_received = False

    try:
        while not self.channel.should_exit.is_set():
            # 往client或server插入信息,self.flow._inject_messages_client/self.flow._inject_messages_server是队列,后续实现在连接中主动发消息就是通过往队列中插入数据实现
            self._inject_messages(self.client_conn, self.flow._inject_messages_client)
            self._inject_messages(self.server_conn, self.flow._inject_messages_server)
        # select监视原http的client和server连接的可读事件
            r = tcp.ssl_read_select(conns, 0.1)
            for conn in r:
                source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn
                other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn
                is_server = (source_conn == self.server_conn)

                frame = websockets.Frame.from_file(source_conn.rfile)
                # 将从conn中获取的数据放入编解码器,此方法并没有返回值,所以data是None
                data = self.connections[source_conn].receive_data(bytes(frame))
                # data是None,不解此举有何意义
                source_conn.send(data)

                if close_received:
                    return
        # 处理编解码器中解码后的数据,event由pop取出,后续不会再用到。
                for event in self.connections[source_conn].events():
                    if not self._handle_event(event, source_conn, other_conn, is_server):
                        if not close_received:
                            close_received = True
    except (socket.error, exceptions.TcpException, SSL.Error) as e:
        s = 'server' if is_server else 'client'
        self.flow.error = flow.Error("WebSocket connection closed unexpectedly by {}: {}".format(s, repr(e)))
        # 调用addons中的websocket_start(self, flow)对flow进行处理
        self.channel.tell("websocket_start", self.flow)
    finally:
        self.flow.ended = True
        # 调用addons中的websocket_end(self, flow)对flow进行处理
        self.channel.tell("websocket_end", self.flow)

WebSocketLayer 实例中处理 Message Event 的方法

# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer类的_handle_message方法,_handle_event中,若isinstance(event, events.Message),则会调用此函数"""
def _handle_message(self, event, source_conn, other_conn, is_server):
    fb = self.server_frame_buffer if is_server else self.client_frame_buffer
    fb.append(event.data)

    if event.message_finished:
        original_chunk_sizes = [len(f) for f in fb]

        if isinstance(event, events.TextMessage):
            message_type = wsproto.frame_protocol.Opcode.TEXT
            payload = ''.join(fb)
        else:
            message_type = wsproto.frame_protocol.Opcode.BINARY
            payload = b''.join(fb)

        fb.clear()

        websocket_message = WebSocketMessage(message_type, not is_server, payload)
        length = len(websocket_message.content)
        self.flow.messages.append(websocket_message)
        # 调用addons中的websocket_message(self, flow)对flow进行处理
        self.channel.ask("websocket_message", self.flow)

        # WebsocketMessage的属性killed用于判断该信息是否需要被转发,可在websocket_message函数中调用message的kill()方法置为True
        if not self.flow.stream and not websocket_message.killed:
            def get_chunk(payload):
                if len(payload) == length:
                    # message has the same length, we can reuse the same sizes
                    pos = 0
                    for s in original_chunk_sizes:
                        yield (payload[pos:pos + s], True if pos + s == length else False)
                        pos += s
                else:
                    # just re-chunk everything into 4kB frames
                    # header len = 4 bytes without masking key and 8 bytes with masking key
                    chunk_size = 4092 if is_server else 4088
                    chunks = range(0, len(payload), chunk_size)
                    for i in chunks:
                        yield (payload[i:i + chunk_size], True if i + chunk_size >= len(payload) else False)

            # 将收到的信息重新编码后向对端发送
            for chunk, final in get_chunk(websocket_message.content):
                data = self.connections[other_conn].send(Message(data=chunk, message_finished=final))
                other_conn.send(data)

    if self.flow.stream:
        data = self.connections[other_conn].send(Message(data=event.data, message_finished=event.message_finished))
        other_conn.send(data)
    return True
  • Tcp 数据处理源码分析

TCP 数据处理触发条件

# mitmproxy/proxy/root_context.py
"""RootContext类_next_layer方法,省略部分代码"""
"""
4. Check for --tcp
判断Option中tcp_hosts, 类型是一个列表,包含需要转换成tcp流信息的server address正则表达式,例如['192\.168\.\d+\.\d+']
"""
if self.config.check_tcp(top_layer.server_conn.address):
    return protocol.RawTCPLayer(top_layer)

"""
6. Check for raw tcp mode
判断Option中rawtcp,类型是bool,若为true,则将不能处理的流转换成tcp流处理,建议开启,默认是false
"""
is_ascii = (
    len(d) == 3 and
    # expect A-Za-z
    all(65 <= x <= 90 or 97 <= x <= 122 for x in d)
)
if self.config.options.rawtcp and not is_ascii:
    return protocol.RawTCPLayer(top_layer)

TCP 信息处理 RawTCPLayer 类源码

class RawTCPLayer(base.Layer):
    chunk_size = 4096

    def __init__(self, ctx, ignore=False):
        self.ignore = ignore
        super().__init__(ctx)

    def __call__(self):
        self.connect()

        if not self.ignore:
            f = tcp.TCPFlow(self.client_conn, self.server_conn, self)
            # 调用addons中的tcp_start(self, flow)对flow进行处理
            self.channel.ask("tcp_start", f)

        # 创建一个长度为4096的空bytearray
        buf = memoryview(bytearray(self.chunk_size))

        client = self.client_conn.connection
        server = self.server_conn.connection
        conns = [client, server]

        # https://github.com/openssl/openssl/issues/6234
        for conn in conns:
            if isinstance(conn, SSL.Connection) and hasattr(SSL._lib, "SSL_clear_mode"):
                SSL._lib.SSL_clear_mode(conn._ssl, SSL._lib.SSL_MODE_AUTO_RETRY)

        try:
            while not self.channel.should_exit.is_set():
                r = mitmproxy.net.tcp.ssl_read_select(conns, 10)
                for conn in r:
                    dst = server if conn == client else client
                    try:
                        # 将从conn中recv的数据存入buf,返回size
                        size = conn.recv_into(buf, self.chunk_size)
                    except (SSL.WantReadError, SSL.WantWriteError):
                        continue
                    if not size:
                        conns.remove(conn)
                        # Shutdown connection to the other peer
                        if isinstance(conn, SSL.Connection):
                            # We can't half-close a connection, so we just close everything here.
                            # Sockets will be cleaned up on a higher level.
                            return
                        else:
                            dst.shutdown(socket.SHUT_WR)

                        if len(conns) == 0:
                            return
                        continue
            # 将recv的数据转成TCPMessage
                    tcp_message = tcp.TCPMessage(dst == server, buf[:size].tobytes())
                    if not self.ignore:
                        f.messages.append(tcp_message)
                        # 调用addons中的tcp_message(self, flow)对flow进行处理
                        self.channel.ask("tcp_message", f)
                    # 发送tcp_message中的content
                    dst.sendall(tcp_message.content)

        except (socket.error, exceptions.TcpException, SSL.Error) as e:
            if not self.ignore:
                f.error = flow.Error("TCP connection closed unexpectedly: {}".format(repr(e)))
                # 调用addons中的tcp_error(self, flow)对flow进行处理
                self.channel.tell("tcp_error", f)
        finally:
            if not self.ignore:
                # 调用addons中的tcp_end(self, flow)对flow进行处理
                self.channel.tell("tcp_end", f)

三、开启 mitmproxy 并加载 addon

首先需要安装两个库:mitmproxy 和 mitmdump

1、编写 websocket 的 addon
"""
简略版用于websocket的Addon
后续改进可以增加判断host,避免拦截到不需要处理的连接,或者将Queue改成redis
"""
import asyncio
from multiprocessing import Queue

import mitmproxy.websocket

class WebsocketAddon:
    def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()):
        self._input_q = input_q
        self._output_q = output_q

    async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):
        while not flow.ended and not flow.error:
            # 增加间隔,否则会阻塞event
            await asyncio.sleep(0.5)
            while not self._input_q.empty():
                # WebSocketFlow的内置方法,用于主动插入信息,这里我只主动插入client->server的信息
                flow.inject_message(flow.server_conn, self._input_q.get())

    def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):
        # 加入发送websocket消息的task,参考了官方的示例脚本,地址:https://docs.mitmproxy.org/stable/addons-examples/#websocket-inject-message
        asyncio.get_event_loop().create_task(self.inject(flow))


    def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):
        message = flow.messages[-1]
        self._output_q.put({
          'from_client': message.from_client, 
          'data': message.content
        })
        # message.kill()可以让Layer不转发该条信息,我这里的目的是拦截掉所有客户端发送的数据,由自己编辑后再发送
        if message.from_client:
            message.kill()
2、编写 socket 的 addon
"""
简略版用于socket的Addon
和websocket版差别不大,插入数据和拦截数据有区别
"""
import asyncio
from multiprocessing import Queue

import mitmproxy.tcp

class SocketAddon:
    def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()):
        self._input_q = input_q
        self._output_q = output_q

    async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):
        while flow.live and not flow.error:
            await asyncio.sleep(0.5)
            while not self._input_q.empty():
                    # 直接向对端发送socket信息完成插入
                    flow.server_conn.connection.sendall(payload)

    def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):
        asyncio.get_event_loop().create_task(self.inject(flow))


    def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):
        message = flow.messages[-1]
        self._output_q.put({
          'from_client': message.from_client, 
          'data': message.content
        })
        if message.from_client:
           # socket发送0字节,conn.sendall(b'')将不会发送任何数据
           message.content = b''
3、开启 mitmproxy 并完成处理函数
import multiprocessing

from mitmdump import Options, DumpMaster

def start_proxy(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()):
        addons = [
        # 自主选择是使用Websocket还是Socket
            WebsocketAddon(input_q, output_q)
        # SocketAddon(input_q, output_q)
        ]
        opts = Options(listen_host='0.0.0.0', listen_port=1080, scripts=None, mode='socks5',
                     rawtcp=True,
                   # 需要转换tcp数据成的ip正则
                   tcp_hosts=['.*'],
                   flow_detail=0, termlog_verbosity='error', show_clientconnect_log=True, )
        m = DumpMaster(opts)
        m.addons.add(*addons)
        m.run()

def deal_client_message_func(client_message: [bytes, str]):
        if type(client_message) is bytes:
        return client_message.decode('utf-8').encode('gbk')
    elif type(client_message) is str:
        return f"test {client_message}"

def simple_handel_message_func(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()):
    while True:
            if not output_q.empty()
            message = output_q.get()
                print(f"{'客户端' if message['from_client'] else '服务端'} 包内容:{message['data']}")
            if message['from_client']:
                input_q.push(deal_client_message_func(message['data']))

def main():
    input_queue = multiprocessing.Queue()
    output_queue = multiprocessing.Queue()
    # 使用子进程启动proxy
    multiprocessing.Process(target=start_proxy, args=(input_queue, output_queue)).start()
    simple_handel_message_func(input_queue, output_queue)

四、总结

​ 对于想实现开头文中所提到的功能还需要实现客户端,以及对于 protobuf 协议的编解码,这里限于篇幅不再讨论,后续有机会再更新。

​ 另外,之所以 mitmproxy 选择 socks5 模式,是因为 socks 协议支持代理除了 http、https 以外更多种类的协议,windows 开启 socks5 代理的工具:proxifer,android 开启 socks5 代理工具:postern。

共收到 7 条回复 时间 点赞
陈子昂 将本帖设为了精华贴 02 Jul 11:24


https://docs.mitmproxy.org/stable/concepts-protocols/
看了下文档,发现 V7 文档写着还不能修改重播?有点奇怪。另外看 issue 问题似乎也挺多的。

陈随想 回复

应该是命令行工具没法修改重播吧,官方 addon example 里还有修改 websocket 的例子呢,但原始 tcp 官方没有例子,但通过看源代码也能实现。

Author only

想了解一下是 websocket+ protobuf 这种私有协议怎么去进行抓包解析,望楼主回复或出帖子分享,万分感谢!

Cuinn 回复

需要自己对数据进行分包,然后用 pb 文件反序列化成结构化数据。

自然生长 回复

Mitmproxy: 8.1.1 已经没有 WebSocketFlow

Mitmproxy used to have its own WebSocketFlow type until mitmproxy 6, but now WebSocket connections now are represented
as HTTP flows as well. They can be distinguished from regular HTTP requests by having the
mitmproxy.http.HTTPFlow.websocket attribute set.

This module only defines the classes for individual WebSocketMessages and the WebSocketData container.

Cuinn 回复

目前项目中 mitmproxy 版本固定,不会更新,另外这个新特性只是把 websocketFlow 归类到 HTTPFlow 里,对数据做相应处理就行。

需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up