TCP 百万并发 数据连接测试 python+locust

过程笔记和总结

尝试一、locust 测试百万 Tcp 并发

另一种方式是使用 jmeter

基础环境

虚拟机:Centos7.2

jdk 1.8

虚拟机: Centos7.2

python : 3.7.3 Anaconda3

locust : 0.14.5


基础知识:


Q&A

客户端调优

  1. 客户端端口限制导致的 tcp 连接不超过 4000,通过修改打开文件限制来增加可使用的端口数量,注意修改后需要重启生效,具体配置参考
--   /etc/sysctl.conf 

# 系统级别最大打开文件
fs.file-max = 100000

# 单用户进程最大文件打开数
fs.nr_open = 100000

# 是否重用, 快速回收time-wait状态的tcp连接
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1

# 单个tcp连接最大缓存byte单位
net.core.optmem_max = 8192

# 可处理最多孤儿socket数量,超过则警告,每个孤儿socket占用64KB空间
net.ipv4.tcp_max_orphans = 10240

# 最多允许time-wait数量
net.ipv4.tcp_max_tw_buckets = 10240

# 从客户端发起的端口范围,默认是32768 61000,则只能发起2w多连接,改为一下值,可一个IP可发起差不多6.4w连接。
net.ipv4.ip_local_port_range = 1024 65535
--  /etc/security/limits.conf 

# 最大不能超过fs.nr_open值, 分别为单用户进程最大文件打开数,soft指软性限制,hard指硬性限制
* soft nofile 100000
* hard nofile 100000
root soft nofile 100000
root hard nofile 100000

服务端调优

  1. 服务端需要接受的链接数需要超过 100w,根据需要修改虚拟机配置;

查看 ulimit-n

soft 和 hard 为两种限制方式,其中 soft 表示警告的限制,hard 表示真正限制,nofile 表示打开的最大文件数。整体表示任何用户一个进程能够打开 1000000 个文件。注意语句签名有 号 表示任何用户

--  /etc/security/limits.conf
# 文末加两行
*hard nofile 1000000
soft nofile 1000000

shutdown -r now 重启并查看生效

sudo vi /etc/sysctl.conf

# 在文末增加
fs.file-max=1000000

让文件生效

sudo sysctl -p


Locust 分布式说明

因单台 node 最大 tcp 连接 6w,故测试百万连接需要对 Locust 做分布式处理;

Locust 分布式注意点:

每台机器上都要有 client 文件,且必须相同

slave 虚拟机环境、Locust 版本、python 版本必须相同,否则预期出现未知错误

slave 节点较多时需要并行管理工具 pdsh 的使用吗,同时启动多个 slave 节点,减少人工启停时间成本

遇到的问题

locust 分布式执行问题

单机启动可发送 60000,两台 120000

集群启动发送 60000 服务端 ip 无法超过 60000 限制

已解决:

locust master 节点不作为负载机进行发送数据,slave 一个节点最多可以增加 6w 个长连接(linux 共有端口 65535, 一般 1-1023 端口是系统保留的,1024-65535 是用户使用的,可用端口有 6w 多个)

运维管理工具 pdsh

作用:快速分发命令,启动 slave 节点

1、 配置 ssh 信任

修改主机名称 vim /etc/hostname

master 可设为 node0,slave 可设为 node1~16

2、 安装 pdsh

tar -jxvf **.tag.bz2
./configure --with-ssh --with-rsh --with-mrsh--with-mqshell --with-qshell --with-dshgroups--with-machines=/etc/pdsh/machines --without-pam
make  
make install

客户端代码

## locustfile.py
# coding:utf-8
import time
import random
# from socket import socket, AF_INET, SOCK_STREAM
import socket
from locust import Locust, TaskSet, events, task


class TcpSocketClient(socket.socket):
    # locust tcp client
    # author: ShenDu
    def __init__(self, af_inet, socket_type):
        super(TcpSocketClient, self).__init__(af_inet, socket_type)

    def connect(self, addr):
        start_time = time.time()
        try:
            super(TcpSocketClient, self).connect(addr)
        except Exception as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request_failure.fire(request_type="tcpsocket", name="connect", response_time=total_time, exception=e)
        else:
            total_time = int((time.time() - start_time) * 1000)
            events.request_success.fire(request_type="tcpsocket", name="connect", response_time=total_time,
                                        response_length=0)

    def send(self, msg):
        start_time = time.time()
        try:
            super(TcpSocketClient, self).send(msg)
        except Exception as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request_failure.fire(request_type="tcpsocket", name="send", response_time=total_time, exception=e)
        else:
            total_time = int((time.time() - start_time) * 1000)
            events.request_success.fire(request_type="tcpsocket", name="send", response_time=total_time,
                                        response_length=0)

    def recv(self, bufsize):
        recv_data = ''
        start_time = time.time()
        try:
            recv_data = super(TcpSocketClient, self).recv(bufsize)
        except Exception as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request_failure.fire(request_type="tcpsocket", name="recv", response_time=total_time, exception=e)
        else:
            total_time = int((time.time() - start_time) * 1000)
            events.request_success.fire(request_type="tcpsocket", name="recv", response_time=total_time,
                                        response_length=0)
        return recv_data


class TcpSocketLocust(Locust):
    """
    This is the abstract Locust class which should be subclassed. It provides an TCP socket client
    that can be used to make TCP socket requests that will be tracked in Locust's statistics.
    """
    def __init__(self, *args, **kwargs):

        super(TcpSocketLocust, self).__init__(*args, **kwargs)
        self.client = TcpSocketClient(socket.AF_INET, socket.SOCK_STREAM)
        ADDR = (self.host, self.port)
        self.client.connect(ADDR)


class TcpTestUser(TcpSocketLocust):
    host = "192.168.5.58"
    port = 6667
    min_wait = 100
    max_wait = 1000

    class task_set(TaskSet):
        @task
        def send_data(self):
            data = "7e0200003f000004021895000b00000000000400030158ccaa06cb7" \
                   "9f50095000000001601051654150104000069740202000003020000" \
                   "2504000000002b040000000030010031010b3201467c7e"
            # self.client.send(data.encode("utf-8"))
            self.client.send(bytes.fromhex(data))
            data = self.client.recv(2048).decode("utf-8")
            print(data)


if __name__ == "__main__":
    import os
    os.system("locust -f sendTcp.py")

单节点启动

locust -f locustfile.py

分布式启动

# master执行
locust -f locustfile.py --master 
# slave执行 
locust -f locustfile.py --slave --master-host = 127.0.0.1 
# pdsh 同时启动slave节点, node 可使用ip代替
pdsh -w node[1-16] "locust -f /root/loadtest/locustfile.py --slave --master-host=node0"

python 编写的简易服务端

调试程序使用的服务端代码

#coding: utf-8
from __future__ import print_function
from gevent.server import StreamServer
import gevent

# sleeptime = 60
'''
python 编写的简单tcp socket server,作为服务端测试使用
'''
def handle(socket, address):
    # print(address)
    # data = socket.recv(1024)
    # print(data)
    while True:
        gevent.sleep(sleeptime)
        try:
            socket.send("ok")
        except Exception as e:
            print(e)

if __name__ == "__main__":
    import sys
    port = 80
    if len(sys.argv) > 2:
        port = int(sys.argv[1])
        sleeptime = int(sys.argv[2])
    else:
        print("需要两个参数!!")
        sys.exit(1)
    # default backlog is 256
    server = StreamServer(('0.0.0.0', port), handle, backlog=4096)
    server.serve_forever()

声明:看到过的好文章,转载分享,若有侵权,请及时联系,速删。


↙↙↙阅读原文可查看相关链接,并与作者交流