import time
from scapy.all import *
from scapy.layers.inet import TCP, IP, getmacbyip
from scapy.config import conf
# 显式导入 getmacbyip 和 Ether
from scapy.layers.l2 import Ether
# 全局变量,用于存储捕获到的 WebSocket 消息
captured_messages = []
# 配置变量,指定要监听的 WebSocket 服务器的 URL
TARGET_WS_URL = "ws://10.2.2.147:8765"
# 配置变量,指定网络接口名称,若为 None 则自动选择合适的网络接口
NETWORK_INTERFACE = None
# conf.sniff_promisc = 1 # 在文件开头添加
# 在会话对象中维护状态
session_pool = {} # 初始化会话池字典
class TCPSession:
def __init__(self):
self.client_next_seq = 1 # 客户端下一个要发送的SEQ
self.client_last_ack = 0 # 客户端最后发送的ACK
self.server_next_seq = 1 # 服务端下一个要发送的SEQ
self.server_last_ack = 0 # 服务端最后发送的ACK
def get_or_create_session(src_ip, src_port, dst_ip, dst_port):
"""
获取或创建TCP会话
:return: TCPSession对象
"""
key = (src_ip, src_port, dst_ip, dst_port)
if key not in session_pool:
session_pool[key] = TCPSession()
return session_pool[key]
def parse_websocket_frame(data):
"""
解析 WebSocket 帧数据,处理文本消息和二进制消息。
:param data: 待解析的 WebSocket 帧字节数据
:return: 若解析成功返回解码后的字符串或字节数据,否则返回 None
"""
try:
if len(data) < 2:
return None
fin = (data[0] & 0x80) == 0x80
opcode = data[0] & 0x0F
masked = (data[1] & 0x80) == 0x80
payload_len = data[1] & 0x7F
offset = 2
if payload_len == 126:
payload_len = (data[2] << 8) + data[3]
offset = 4
elif payload_len == 127:
payload_len = 0
for i in range(8):
payload_len = (payload_len << 8) + data[2 + i]
offset = 10
if masked:
mask_key = data[offset:offset + 4]
offset += 4
else:
mask_key = None
if len(data) < offset + payload_len:
return None
payload = data[offset:offset + payload_len]
if masked:
decoded = bytearray()
for i in range(len(payload)):
decoded.append(payload[i] ^ mask_key[i % 4])
payload = decoded
if opcode == 0x01: # 文本帧
try:
return payload.decode('utf-8')
except UnicodeDecodeError:
return payload
elif opcode == 0x02: # 二进制帧
return payload
except Exception as e:
print(f"Error parsing WebSocket frame: {e}")
return None
def packet_handler(pkt):
global session_pool
if pkt.haslayer(IP) and pkt.haslayer(TCP):
ip_layer = pkt[IP]
tcp_layer = pkt[TCP]
# 获取或创建会话
key = (ip_layer.src, tcp_layer.sport, ip_layer.dst, tcp_layer.dport)
session = get_or_create_session(ip_layer.src, tcp_layer.sport, ip_layer.dst, tcp_layer.dport)
# 更新服务端状态(假设服务端发送了数据)
if pkt.haslayer(Raw):
payload_len = len(pkt[Raw].load)
session.server_next_seq = tcp_layer.seq + payload_len
session.server_last_ack = tcp_layer.ack
print(
f"src_ip: {ip_layer.src}, src_port: {tcp_layer.sport}, dst_ip: {ip_layer.dst}, dst_port: {tcp_layer.dport}")
if pkt.haslayer(Raw):
raw_data = bytes(pkt[Raw].load)
print("原始数据: ", raw_data)
ws_message = parse_websocket_frame(raw_data)
if ws_message:
if isinstance(ws_message, str):
print("解析后的明文: ", ws_message)
if ws_message == "23":
time.sleep(1)
print(222222)
client_ip = ip_layer.src
server_ip = ip_layer.dst
client_port = tcp_layer.sport
server_port = tcp_layer.dport
msg = "33"
ws_frame = build_websocket_frame(msg, opcode=0x01, masked=True)
print("ws_frame: ", ws_frame)
# 构造IP和TCP层
ip = IP(src=client_ip, dst=server_ip)
tcp = TCP(
sport=client_port,
dport=server_port,
flags="PA",
seq=session.client_next_seq,
ack=session.server_last_ack
)
# MAC地址处理
try:
server_mac = getmacbyip(server_ip)
local_mac = get_if_hwaddr(conf.iface if not NETWORK_INTERFACE else NETWORK_INTERFACE)
print("local_mac: ", local_mac)
print("server_mac: ", server_mac)
eth = Ether(src=local_mac, dst=server_mac)
except Exception as e:
print(f"MAC地址获取错误: {e}")
return
packet = eth / ip / tcp / Raw(load=ws_frame)
print("构造的数据包: ", packet)
print("构造的数据包: ", packet.show())
print("[DEBUG] 数据包详情:")
print(f" - 以太网层: src={eth.src}, dst={eth.dst}")
print(f" - IP层: {ip.src} -> {ip.dst}")
print(f" - TCP层: {tcp.sport} -> {tcp.dport}, seq={tcp.seq}, ack={tcp.ack}")
print(f" - WebSocket帧: {ws_frame[:20].hex()}... (len={len(ws_frame)})")
try:
send_result = sendp(packet, iface=NETWORK_INTERFACE, verbose=0, return_packets=True)
if send_result:
sent_packet = send_result[0]
print(f"目标: {sent_packet[IP].dst}:{sent_packet[TCP].dport}")
print(f"数据长度: {len(sent_packet[Raw].load)}字节")
print(f"sent_packet: {sent_packet}")
from scapy.layers.l2 import arping
print(f"ARP验证结果: {arping(server_ip, verbose=0, timeout=2)[0]}")
# 关键修复:更新客户端序列号
session.client_next_seq += len(ws_frame)
# 添加响应验证(可选)
response = sniff(
filter=f"tcp and host {server_ip}",
count=1,
timeout=2,
iface=NETWORK_INTERFACE
)
if response:
print(f"[+] 收到服务端响应: {response[0].summary()}")
else:
print("发送失败:数据包未实际发出")
except PermissionError:
print("发送失败:权限不足,请尝试使用sudo或以管理员身份运行")
except OSError as e:
if "No such device" in str(e):
print(f"发送失败:网络接口 {NETWORK_INTERFACE} 不存在")
elif "Network is down" in str(e):
print("发送失败:网络连接已断开")
else:
print(f"发送失败:系统错误 - {str(e)}")
except Scapy_Exception as e:
print(f"发送失败:Scapy内部错误 - {str(e)}")
except Exception as e:
print(f"发送失败:未知错误 - {str(e)}")
else:
print("解析后的二进制数据十六进制表示: ", ws_message.hex())
def start_sniffer(interface=None):
"""
启动抓包程序,开始监听指定 WebSocket URL 的网络流量。
:param interface: 网络接口名称,若为 None 则自动选择
"""
filter_rule = "tcp port 8765"
print(f"[*] Starting WebSocket sniffer for URL: {TARGET_WS_URL}")
print("[*] Press Ctrl+C to stop...")
try:
sniff(iface=interface, filter=filter_rule, prn=packet_handler, store=0)
except KeyboardInterrupt:
print("\n[*] Stopping sniffer...")
except Exception as e:
print(f"[!] Error: {e}")
finally:
print("\n=== Captured WebSocket Messages Summary ===")
for i, msg in enumerate(captured_messages, 1):
print(f"{i}. {msg['direction']} - {msg['source']} -> {msg['destination']}")
print(f" Time: {msg['timestamp']}")
if isinstance(msg['message'], str):
if len(msg['message']) > 50:
print(f" Message: {msg['message'][:50]}...")
else:
print(f" Message: {msg['message']}")
else:
print(f" Message (Binary): {msg['message'].hex()}")
def build_websocket_frame(data, opcode=0x01, masked=False):
"""
将数据重新封装成 WebSocket 帧。
:param data: 待封装的数据,可以是字符串或字节类型
:param opcode: 操作码,0x01 表示文本帧,0x02 表示二进制帧,默认为 0x01
:param masked: 是否对数据进行掩码处理,默认为 False
:return: 封装后的 WebSocket 帧字节数据
"""
if isinstance(data, str):
payload = data.encode('utf-8')
else:
payload = data
payload_len = len(payload)
# 构建帧头部
fin = 0x80 # 表示这是最后一帧
# 构建帧头部
header = bytearray([fin | opcode])
# 处理负载长度
if payload_len <= 125:
header.append(payload_len if not masked else (0x80 | payload_len))
elif payload_len <= 65535:
header.append(126 if not masked else (0x80 | 126))
header.extend(payload_len.to_bytes(2, 'big'))
else:
header.append(127 if not masked else (0x80 | 127))
header.extend(payload_len.to_bytes(8, 'big'))
# 如果是客户端发送,通常需要掩码
if masked:
mask_key = os.urandom(4)
header.extend(mask_key)
masked_payload = bytearray()
for i in range(len(payload)):
masked_payload.append(payload[i] ^ mask_key[i % 4])
payload = masked_payload
return bytes(header + payload)
if __name__ == "__main__":
if not (TARGET_WS_URL.startswith("ws://") or TARGET_WS_URL.startswith("wss://")):
print("[!] Error: Target URL must start with ws:// or wss://")
exit(1)
start_sniffer(interface=NETWORK_INTERFACE)
```py