对于嵌入式设备而言,每一个厂商或者公司来说,协议文档都是不一样的,所以具体的协议解析和组包在这里就不便说明,只记录一个基础框架:命令构造器、协议解析器、数据提取器(目前只写了这三个,后期再更新)
相对于我司而言,具体的通信协议帧格式如下:
帧头:2byte
帧长度:2byte
端口:1byte
指令类型:2byte
帧载荷:nbyte
校验和:1byte
帧尾:2byte
这些通讯协议数据的解析以及封装均为二进制,中间还采用了低字节序(LITTLE-ENDIAN)也就是小端模式,这里除了固定的帧头和帧尾外,其余部分均采用了小端模式
校验和:是指进行的 CRC 校验
还对数据帧中进行了特征字的定义,当出现特征字时就需要进行转义
从串口接收到的数据,我们需要进行一些处理,因为避免无效的数据带来的影响,FrameExtractor 是为了提取到一个符合规则正确的数据帧
当从串口接收数据时,我们需要去建立一个缓冲区来存放接收到的数据
因为可能会发生以下几种情况:
1.串口当前接收到的数据不全,不是一个完整的数据帧
2.接收到了多个数据帧,当最后一个数据帧不是完整的
class FrameExtractor:
def __init__(self):
self.buffer = b''
self.frames = []
所以建立缓冲区来对数据进行处理:
1.当缓冲区数据不足一个完整的数据帧时,等待更多的数据到达
2.若有完整的数据帧,则提取出来
对数据帧的提取,也就是解析,以下是解析的流程:
查找帧头--------->查找帧尾--------->进行校验和判断------->校验通过就行转义处理(还原原始数据)----->提取正确的数据帧
def extract_frames(self, data):
frames = []
frame_start_found = False # 新增标志变量,指示是否找到了帧头
# 将当前数据缓冲区中的数据与新接收到的数据合并
self.buffer +=data
start = None # 记录帧头的起始位置
# 处理数据缓冲区中的数据,提取完整的数据帧
while True:
# 如果缓冲区中的数据不足一个完整的数据帧,则退出循环等待更多数据到达
if len(self.buffer) < 6:
break
if not frame_start_found:
# 查找帧头
try:
start = self.buffer.index(b'\x5a\x55')
if start != None:
self.buffer = self.buffer[start:]
frame_start_found = True # 找到帧头,设置标志变量为True
continue
else:
frame_start_found = True # 找到帧头,设置标志变量为True
except ValueError as e:
self.buffer = []
print(f'error:{e}')
break
else:
# 查找帧尾
try:
end = self.buffer.index(b'\x6a\x69', start+2)
except ValueError:
break # 未找到帧尾,退出循环等待更多数据
# 提取完整的数据帧
try:
# 进行校验和计算判断
if self.caclc_check(self.buffer[start:end - 1]) != self.buffer[end - 1]:
# CRC校验不符,抛弃数据,重新寻找数据帧
self.buffer = self.buffer[start + 2:]
continue
#还原转义
data_frame = self.unescape(self.buffer[start+2:end-1])
#计算数据帧长度
length = struct.unpack('<H', data_frame[:2])[0]
#判断是否为完整的数据帧
if end - start == length + 2:
#添加完整的数据帧到列表中
frame = self.buffer[start:start+2] + data_frame +self.buffer[end-1:end+2]
frames.append(frame)
#重置标志变量
frame_start_found = False
#从缓冲区移出已提取到数据帧
self.buffer = self.buffer[end+2:]
break
else:
#不是完整的数据帧,需要重新查找帧头
frame_start_found = False
self.buffer = self.buffer[start+2:]
except struct.error:
# 未找到帧尾,等待下一个数据
self.buffer = self.buffer[start+2:]
frame_start_found = False
pass # 解析数据帧出错,继续查找下一个数据帧
return frames
获取到正确的数据帧之后,需要去判断这个数据帧对应的协议回复内容是什么,那么就需要去通过数据帧中的指令去判断
以下代码中我举出一个实例:
中止操作命令下发之后的回复内容,指令为 0x0011 时,载负荷内容为 1 时是中止操作成功,为 0 时是中止操作失败
class CommandAnalyzer:
def __init__(self):
self.frame = None
self.port = None
self.command_type = None
self.payload = None
self.checksum = None
def analyze_frame(self,data):
self.frame=data
# 解析帧长度
frame_length = int.from_bytes(self.frame[2:4], byteorder='little')
# 解析端口号
self.port = self.frame[4]
# 解析指令类型
self.command_type = int.from_bytes(self.frame[5:7], byteorder='little')
# 解析帧载荷,根据帧长度判断
if frame_length > 6:
self.payload = self.frame[7:frame_length+1]
# 解析校验和
self.checksum = self.frame[frame_length +1]
# 根据不同的指令类型进行不同的操作
if self.command_type == 0x0000:
self.handle_command_0000()
elif self.command_type == 0x0001:
self.handle_command_0001()
elif self.command_type == 0x0002:
self.handle_command_0002()
elif self.command_type == 0x0003:
self.handle_command_0003()
elif self.command_type == 0x0011:
self.handle_command_0011()
# 可以继续添加其他指令类型的处理函数
def handle_command_0011(self):
# 中止操作
# 处理指令类型为0x0011的指令
if int.from_bytes(self.payload,byteorder='little'):
print(f'中止操作成功:{self.payload}')
else:
print(f'中止操作失败')
def handle_command_0000(self):
#Command_Begin指令
# 处理指令类型为0x0000的指令
pass
def handle_command_0001(self):
# 处理指令类型为0x00的指令
# Command_End指令
pass
def handle_command_0002(self):
pass
def handle_command_0004(self):
pass
def handle_command_0003(self):
pass
我们要给设备发送指令时,需要根据相关的协议填写具体的载负荷内容(载负荷也可以为空),然后通过组包完成之后,再发送给设备
class CommandBuilder:
def __init__(self):
self.header = b'\x5a\x55'
self.footer = b'\x6a\x69'
self.port = b'\x0d'
def build_oem_read_command(self,register_address,register_value):
"""
:param register_address: 寄存器地址2byte
:param register_value: 寄存器值4byte
:return: OEM寄存器读取指令
"""
def build_oem_write_command(self, register_address, register_value):
"""
Host发送获取OEM寄存器写入指令
HOST_PACKET_TYPE_OEM_WRITE_COMMAND
:param register_address: 寄存器地址2byte
:param register_value:寄存器值4byte
:return:OEM寄存器写入指令
"""
oem_write_command = b'\x50\x54'
# construct payload
payload = register_address.to_bytes(2, byteorder='little')
payload += register_value.to_bytes(4, byteorder='little')
oem_write_command_len = b'\x00\x0c'
# construct full command
command = self.header
command += oem_write_command_len[::-1]
command += self.port
command += oem_write_command[::-1] # command type little
command += payload
# calculate checksum
checksum = self._calculate_checksum(command)
command += checksum.to_bytes(1, byteorder='little')
command += self.footer
return command
def build_abort_command(self):
"""
操作中止:HOST_PACKET_TYPE_ABORT_COMMAND
:return:操作中止指令
"""
# construct payload
payload = b''
abort_len = b'\x00\x06'
abort_command = b'\x00\x03'
# construct full command
command = self.header
command += abort_len[::-1]
command += self.port
command += abort_command[::-1] # command type little
command += payload
# calculate checksum
checksum = self._calculate_checksum(command)
command += checksum.to_bytes(1, byteorder='little')
command += self.footer
return command
def _calculate_checksum(self, payload):
checksum = 0
for byte in payload:
checksum += byte
return checksum % 0xff
当然这只是粗略的框架,如果需要实际运用就需要根据实际的协议去填充这里面的信息或者更改,对于去查看设备的寄存器的值,我认为是相当有效的,因为在使用过程中,客户可能对于某个寄存器的错误设置,导致设备的性能下降,在排查时,可能因此会误导排查的方向,在这基础上,可以通过该框架去把一个初始设备的寄存器的值进行读取保存,后期有问题时,可以通过自动化测试进行寄存器排查问题因素,提升效率