逐步讲述回包

直接入正题,第三章已经提到了如何收到回包,当前这个主题网络编程都是讲 Tcp,协议请求方式本身会决定特性,我们这里先讲和发包收包顺序有关的。
Tcp 首先是一个双工的,意思就是客户端 C 端发消息给 S 端,S 端也可以发消息给客户端,传输协议数据类型里面传输在网卡那层都是二进制文件流。为啥是流而不是包可以回顾之前的,也可以选择死记硬背。
那么 C 端是不是发给 S 端消息,S 端一定是要回复的呢,这个是未必的。因为在协议请求/传输方式还会传输协议数据类型,这个类型和业务是直接有关的,大体可以分为需要应答和不需要应答。不需要应答的最常见的就是心跳包。这里为啥要叫包而不是叫心跳流呢,只能说是一种通用的叫法。
当客户端链接上服务后,每隔约一段时间,往服务器固定传输一段信息,就和人的心电图一样,证实当前客户端是活跃的。如果不活跃服务器会把客户端对象断开(Tcp 挥手),这里不活跃一般是指多少次没有收到,才会判断不活跃。
断开的好处,这里可以这样记忆,每个 socket 链接是一定有成本的,保持链接状态也一样,每个 socket 做一些事会产生内存变化(这里不考虑软件缓存,硬件的几级缓存),如果在把一些数据存储到数据库,就会把内存数据交换到数据库或者数据库缓存,交互过程也会产生内存碎片和数据库 IO 开销。只是是活跃的,就会根据这个 socket 做得事情产生一系列的非用户态的一些开销,所以有必要剔除不活跃的请求。
比如 30 分钟没有获得经验/多少分钟没有移动(移动也会产生移动数据包,这个数据包会逐条同步给他周围的活跃用户),然后就会进入挂机状态,这种其实也是类似把 fb 标记为了不活跃 (leave = no alive),不活跃就只需要定期同步一次。
如果是完全下线 (close),这里撇开业务,完全下线也会自动挂机获取经验。
以上主要慢慢展开客户端发了 3 次 100 个字节后等待几百 ms,但是服务器只回了一组包,这个是 Tcp 独有黏包的问题。
但是发现这样跨度有点大,例子比较基础,会先来通过一个 Tcp 心跳的例子(分四,五章节)来慢慢往后延伸,反正最终都会讲清楚的。

心跳学习和设计

写个例子来做一些直观性的推理和设计,例子也会结合之前的一些知识点。
心跳信息:json 字串 # 实战里面是一个 bytes 形式 这个 butes 会比较小,只是为了让服务器计数客户端是活跃的。
详解 服务器那边记录一个客户端连接句柄的管理字典,链接的客户端对象在服务器那边会是一个"fd",是否活跃会是一个字段"status":"alive",""leave","close" 分别代表 活跃,非活跃暂离,关闭。

服务器那边进行开发设计,支持状态为一个双向的顺序 ("alive"<-->"leave"<-->"close")

关于 status:"alive"状态条件 客户端来说是默认的,没在 socket.error 那层被拦截掉,所以链接上了,链接上分为地址正确,写法正确,防火墙允许 (这里测试用不上这个)

需要服务器开发:
1.socket 服务器程序
2.常规接收包
3.判断 4 个字节后的数据在反序列化 json.loads。服务器比客户端多一个 bind 后开启监听的功能,服务器支持多个客户端。
4.匹配功能。服务器收到客户端信息后会进行匹配,有一个游标(第 5 章完成该功能)。
游标比如设置为 3,阈值是 5,多少秒收到一次消息就-1,多少秒没有收到消息就 +1.阈值达到一次 5 计数一次,达到 3 次阈值,修改当前客户端链接管理字典为"leave"。达到阈值小于 3 次那么还是 alive。
5.会用一个 user 实例对象列表去管理添加模拟链接进来的客户端。每 10 秒加一个,客户端是模拟假的,端口号 +1。
需要客户端开发的:
1.socket client 程序
2.Json 发包 + 数据结构
3.间隔每 10 秒发一次。

关于 status:"leave"状态:记录保持 30 分钟(代码会写 30 秒)会到下个状态"close",如果出现心跳稳定多次,没有触发到阈值 5,会回退到"alive"。

关于到达 status:"close"服务器会把客户端踢掉,在 user 的列表管理里面,服务器会存储客户端的 ip:端口。

心跳客户端前置

先写这个例子 只有"alive"状态条件,没有切换条件的。根据过去例子,我们先需要定义一个网络层的数据结构,这次先不用 protobuff。
协议传输用 Tcp 使用通用性高的 struct,传输数据结构为 Json
Tcp 数据结构为包头 4 个字节,4 个字节为里面塞包体长度,那个整个包长度也就是 4 个字节(固定的)+ 包头 4 个字节的取值(动态的包体长度)。
Json C2S {"ip":传递本地 Ip,"status":上面讲的状态使用默认的,"pid":用于客户端管理自己的} pid 这里主要用于自己杀自己进程
这里需要补一个知识,第三章未讲完的,就是如果 4 个字节包头长度 + 客户端传递对象是如何写的

# ip_add给服务器的地址
ip_addr = socket.gethostbyname(socket.gethostname())
# 客户端本地自己管理自己用的pid
pid = os.getpid()
# 状态切换用的列表,列表有序的,通过idx作为状态机索引,默认是alive
status = ["alive", "leave", "close"]
idx = 0
data = {'ip': ip_addr, 'status': status[idx], 'pid': pid}
# data的长度,这里有str只是为了下面+使用,int->str不影响下面结果
pack_len = str(len(data))
print(pack_len) # 输出为3
# i是4个字节 +pack_len struct.calcsize是看fmt区域的
client_len = struct.calcsize("!i" + pack_len + 's')
print(client_len) # 输出为4+3

注意看注释,动态计算长度就这么出来了,但是注意了,struct.pack 压包动态化是面向包头里面有几个对象,如果只有一个对象传入 json 输出应该是这样的
本次代码关于压包实例

buffer = json.dumps(data)
print(len(buffer))
packet_head = struct.pack("!i",len(buffer))
buffer_client = packet_head+buffer.encode("utf-8")
print(buffer_client) # b'\x00\x00\x008{"ip": "192.168.1.105", "status": "alive", "pid": 26464}'

心跳客户端

只包含和服务器进行通信,发送 Tcp-stuct,Json,把 json 数据做为心跳包发送了,注意实际心跳的确会是固定的数据,但不会传入这个。
服务器只是解析字符串那样去判断心跳是哪个客户端发的。状态切换功能下个文章再写。

import datetime
import socket, sys, os
import time, json,struct

class HeartBeatClient:
    interval = 10  #心跳发送间隔

    def __init__(self, addr: tuple):
        try:
            self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.client.connect(addr)
        except socket.error as e:
            print("Error creating socket: %s" % e)
            sys.exit()

    def encode_json(self, conn,data:dict):
        """发包"""
        buffer = json.dumps(data)
        print(f"当前客户端发出去包长度{len(buffer)}")
        packet_head = struct.pack("!i", len(buffer))
        conn.send(packet_head + buffer.encode("utf-8"))

    def loop_client(self, conn: socket.socket):
        """
        循环客户端
        :param conn:
        :return:
        """
        status = ["alive", "leave", "close"]  #状态列表有序
        idx = 0 #状态索引
        while True:
            ip_addr = socket.gethostbyname(socket.gethostname())
            pid = os.getpid()
            data = {"ip": ip_addr, "status": status[idx], "pid": pid}
            buffer = json.dumps(data)
            try:
                self.encode_json(conn,buffer)
            except socket.error:
                print("client send failed")
                break
            else:
                now =datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                print({"time":now,"fd":os.getpid(),"status":status[idx]})
                time.sleep(self.interval)
        self.client_close(conn)

    def client_close(self,conn):
        if conn:
            conn.close()


if __name__ == '__main__':
    client = HeartBeatClient(("127.0.0.1", 14000))
    client.loop_client(client.client)

加固 strcut 传递使用的例子,建议手敲代码。

心跳服务器

主要是对应这次客户端编写的心跳服务器功能,注意调式是先启动服务器,在启动客户端文件。
为了模拟 fd_port 自增 1,等于好多个客户端连接入,这里如果不熟悉 Threading 的,需要稍微自学下。

import socket
import json
from threading import Thread
import ast

class HeartBeatServer:

    def __init__(self, addr: tuple, max: int):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(addr)
        # 最大支持max个链接数
        self.sock.listen(max)
        self.user = []

    def encode_buffer(self, conn: socket.socket, data: str or bytes or dict):
        """
        发送数据
        :param conn: 客户端链接到服务器的句柄
        :param data: 数据
        :return:
        """
        if isinstance(data, str):
            data = data.encode("utf-8")
        elif isinstance(data, dict):
            data = str(data).encode("utf-8")
        conn.send(str(data).encode("utf-8"))

    def match_data(self,recv)->bool:
        """
        匹配反序列化后的数据
        :param recv:
        :return:
        """
        if isinstance(recv,dict):
            return recv.get("ip") and recv.get("status")

    def loop_thread(self, conn: socket.socket, fd_port: int):
        """
        服务器是只判断客户端的包头是否合法。
        :return:
        """
        head_len = 4
        self.user.append("192.168.1.105:88888")
        idx = 1
        while True:
            try:
                # 不停收包
                data = conn.recv(1024 * 4)
                if len(data) > head_len:
                    recv = json.loads(data[4:])
                    recv = ast.literal_eval(recv)
                    if not self.match_data(recv):
                        # pop()列表丢出去并且返回丢出的数据,默认是最后一个
                        print(f"client fd addr:{self.user.pop()} Connected over!")
                        break
                    # 为了模拟fd_port自增1,等于好多个客户端连接入
                    self.user.append(f"{recv.get('ip')}:{fd_port + idx}")
                    print(f"当前服务器管理用户-->{self.user}")
                else:
                    print(f"client fd addr:{self.user[-1]}数据不合法")
            except socket.error:
                print(f"client fd addr:{self.user.pop()} Connected over!")
                break
            else:
                idx += 1
        conn.close()

    def start(self):
        """
        服务启动
        :return:
        """
        while True:
            conn, addr = self.sock.accept()
            t = Thread(target=self.loop_thread, args=(conn, addr[1]))
            t.start()


if __name__ == '__main__':
    server = HeartBeatServer(("127.0.0.1", 14000), 100)
    server.start()

这里有一个重要概念
这里为啥不能用 self.sock 是因为 self.sock 是服务器本身的句柄
conn 是服务器收到客户端对象的句柄,所以这个 conn 才是被管理的,当客户端断开时,在服务器通过 conn 进行挥手
如果是 self.sock 服务就关闭了。

结尾

第五章也写了一部分草稿中,因为这次写得比较快,所以中途看官和使用者有啥问题欢迎留言啊。


↙↙↙阅读原文可查看相关链接,并与作者交流