性能测试工具 goreplay python plugin 修改原始流量的参数增加测试标签

margaret2022 · 2023年10月26日 · 2698 次阅读
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import binascii
import fileinput
import json
import sys
from typing import Any, Dict, Optional

sys.path.append('/home/services')

import XXXX as rcp
import XXXX as pb
from google.protobuf import text_format
# Blank-line marker (CRLF CRLF) used to find the end of the headers section
EMPTY_LINE = b'\r\n\r\n'

def log(msg):
    """
    Logging to STDERR as STDOUT and STDIN used for data transfer.

    The message is converted with str(); if that conversion fails, repr()
    is used instead so that logging itself does not raise a TypeError.
    A newline is appended and STDERR is flushed after every message.

    @type msg: any object (typically str or byte string)
    @param msg: Message to log to STDERR
    """
    try:
        text = str(msg)
    except Exception:
        # Best effort: an object with a broken __str__ is still worth logging.
        text = repr(msg)
    sys.stderr.write(text + '\n')
    sys.stderr.flush()

def http_header(payload: bytes, name: str) -> Optional[Dict[str, Any]]:
    """
    Locate a single header in a raw HTTP payload (case-insensitive match).

    The payload is scanned byte by byte, one line at a time. The first line
    (which starts at offset 0) can never match, so the request/status line
    is skipped. Scanning stops at the first blank line that follows a
    non-blank line, so bytes of the body are never mistaken for a header.
    A header line must be terminated by a newline to be found.

    :param payload: the http payload to inspect
    :param name: the header name to look for
    :return: None when the header is absent, otherwise a dict with
             'start' (offset of the header name), 'value_start' (offset
             just past the first ':'), 'end' (offset just past the line's
             newline), 'value' (stripped, decoded value) and 'name'
             (lower-cased name)
    """
    lf, cr, colon = ord('\n'), ord('\r'), ord(':')
    # Lower-case on the bytes side, exactly like the comparison below.
    wanted = name.encode().lower()

    start = -1        # offset of the first content byte of the current line
    value_start = -1  # offset just past the first ':' of the current line
    lines_seen = 0    # number of completed lines so far

    for idx, c in enumerate(payload):
        if c == lf:
            end = idx + 1
            if start == -1 and value_start == -1 and lines_seen > 0:
                # Blank line: the header section is over, the rest is body.
                return None
            lines_seen += 1
            if (start > 0 and value_start > 0
                    and payload[start:value_start - 1].lower() == wanted):
                return {
                    'start': start,
                    'end': end,
                    'value_start': value_start,
                    'value': payload[value_start:end].strip().decode(),
                    'name': wanted.decode(),
                }
            # Not the header we want: reset the per-line state.
            start = -1
            value_start = -1
        elif c == cr:
            # Carriage returns are never content.
            pass
        elif c == colon and value_start == -1:
            # Only the first colon on a line separates name from value.
            value_start = idx + 1
        elif start == -1:
            start = idx
    return None

def http_headers(payload: bytes) -> Dict[str, str]:
    """
    Parse the payload and return http headers in a map

    The first line (request/status line) is skipped. Header parsing stops
    at the first blank line, or at the end of the payload when there is no
    blank line. Lines that contain no ':' are ignored.

    :param payload: the http payload to inspect
    :return: a map mapping from key to value of each http header item;
             empty when the payload has no CRLF-terminated first line or
             no headers
    """
    # bytes.find returns -1 when the marker is missing (bytes.index raises).
    first_eol = payload.find(b"\r\n")
    if first_eol == -1:
        return {}
    end_index = payload.find(b"\r\n\r\n", first_eol)
    if end_index == -1:
        end_index = len(payload)

    headers = {}
    for item in payload[first_eol + 2:end_index].split(b"\r\n"):
        key, sep, value = item.partition(b":")
        if not sep:
            # Empty header section or malformed line: nothing to record.
            continue
        headers[key.decode()] = value.lstrip().decode()
    return headers

def set_http_header(payload: bytes, name: str, value: str) -> bytes:
    """
    Return a copy of the payload in which header `name` has value `value`.

    If http_header() finds the header, everything from just after its
    colon to the end of its line is replaced by ' <value>\\r\\n'.
    Otherwise a new 'name: value\\r\\n' line is inserted directly after
    the first line of the payload.

    :param payload: the http payload to modify
    :param name: the header name
    :param value: the new header value
    :return: the rewritten payload
    """
    found = http_header(payload, name)
    if found is not None:
        # Overwrite the existing value in place.
        before_value = payload[:found['value_start']]
        after_line = payload[found['end']:]
        return b''.join((before_value, b' ', value.encode(), b'\r\n', after_line))

    # Header absent: insert it right after the first line.
    insert_at = payload.index(b'\n') + 1
    return b''.join((
        payload[:insert_at],
        name.encode(),
        b': ',
        value.encode(),
        b'\r\n',
        payload[insert_at:],
    ))

def find_end_of_headers(byte_data):
    """
    Return the offset of the first byte after the blank line that closes
    the header portion, i.e. where the content portion begins.

    @type byte_data: byte string
    @param byte_data: Hex decoded req or resp
    @raise ValueError: if the EMPTY_LINE marker is not present
    """
    marker_at = byte_data.index(EMPTY_LINE)
    return marker_at + len(EMPTY_LINE)


def process_stdin(my_dict=None):
    """
    Process STDIN and output to STDOUT

    Each non-empty input line is a hex-encoded message made of a metadata
    line, a newline, and the HTTP payload (headers + body). The body is
    parsed as a protobuf message, the 'from' field of its base_params is
    set to 'qa', Content-Length is updated to the new body size, and the
    re-assembled message is written hex-encoded to STDOUT.

    @param my_dict: Unused; optional so the function can be called with
                    or without it.
    """
    for raw_line in fileinput.input():
        line = raw_line.rstrip()
        if not line:
            # Nothing to decode on a blank line.
            continue

        # Decode hex encoded line
        decoded = bytes.fromhex(line)

        # Split into metadata and payload, the payload is headers + body
        (raw_metadata, payload) = decoded.split(b'\n', 1)

        # Split into headers and payload
        headers_pos = find_end_of_headers(payload)
        raw_headers = payload[:headers_pos]
        raw_content = payload[headers_pos:]

        # Instantiate the protobuf message and parse the body into it
        message = pb.XXXXX()
        message.ParseFromString(raw_content)

        # Mark the traffic as coming from QA. 'from' is a Python keyword,
        # so the field has to be assigned through setattr().
        setattr(message.base_params, 'from', 'qa')

        # The body size changed, so Content-Length must be rewritten too.
        raw_content = message.SerializeToString()
        new_content_length = len(raw_content)
        new_raw_headers = set_http_header(raw_headers, 'Content-Length', str(new_content_length))

        encoded = binascii.hexlify(raw_metadata + b'\n' + new_raw_headers + raw_content).decode('ascii')

        sys.stdout.write(encoded + '\n')
        sys.stdout.flush()



if __name__ == '__main__':
    # process_stdin takes a my_dict argument that its body never reads;
    # pass an empty dict so the call matches the signature.
    process_stdin({})
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册