背景

公司业务处于上升期,但服务端却 low 得像个 demo,于是服务端重构就此开始。

关于测试:测试这个行业在大多数互联网公司都不太成功,做测试的应该都有这种感觉。但我感觉自己很幸运,因为 CTO 很重视测试,他的口头禅是“不可测试的程序都是不可靠的”,所以我们公司的所有程序都有配套的测试工具。本文介绍的工具就是专门为测试服务端而编写的模拟客户端 TSP。

业务需求

重构服务端,使其能够支撑百万级客户端接入

实现原理

  1. 服务端监听 3 个端口,这 3 个端口上的连接全部由设备发起;
  2. TSP 模拟客户端连接服务端的逻辑,从而实现设备、用户上线;
  3. 采用进程池嵌套线程池的方式,达到高并发的目的;
  4. 在代码中统计发包时间平均值、收包时间平均值、收包字节大小平均值和收包错误率;
  5. 在服务端采集硬件配置、CPU 负载、内存负载、磁盘 IO 等指标。

具体实现

1. 构建主流程,完成基于 socket 的收发包

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
from socket import socket,AF_INET,SOCK_STREAM

import struct

from log import log
log=log()
class socket_main:
    """Blocking TCP client used by TSP to simulate one device connection.

    `who` is a label prefixed to every log line; `conn_data` is the address
    handed to ``socket.connect`` (presumably a ``(host, port)`` tuple since
    the socket is AF_INET -- confirm with callers).
    """

    def __init__(self,who,conn_data):
        self.conn_data = conn_data
        self.who = who
        self.__conn(conn_data)

    def __conn(self,addr):
        """Open the TCP connection to `addr` with a 1 second I/O timeout.

        Connection failures are logged rather than raised, so constructing a
        client never throws; later send/recv calls fail on the unusable
        socket instead. The failed socket is closed so that a load test with
        many failing connections does not leak file descriptors.
        """
        # Pre-set so callers see None rather than a missing attribute if
        # even socket() fails.
        self.tcpCliSock = None
        try:
            log.debug('{} _conn data:{}'.format(self.who,addr))
            self.tcpCliSock = socket(AF_INET, SOCK_STREAM)
            self.tcpCliSock.connect(addr)
            # settimeout(1) overrides setblocking(1); the net effect is a
            # blocking socket whose send/recv time out after 1 second.
            self.tcpCliSock.setblocking(1)
            self.tcpCliSock.settimeout(1)
            log.debug('{} socket data:{}'.format(self.who,'success'))
        except Exception as msg:
            # Exception (not BaseException) so Ctrl-C during a hanging
            # connect() still interrupts the program.
            log.error('{} socket conn_result:{}'.format(self.who,msg))
            if self.tcpCliSock is not None:
                self.tcpCliSock.close()

    def send(self,data):
        """Send all of `data`; return the number of bytes sent.

        sendall() is used because send() may transmit only part of the
        buffer, which would corrupt the packet framing. Errors are logged
        and re-raised unchanged.
        """
        try:
            self.tcpCliSock.sendall(data)
            result = len(data)
            log.debug('{} socket send_len:{}'.format(self.who, result))
            return result
        except BaseException as msg:
            log.error('{} socket send_error:{}'.format(self.who,msg))
            raise

    def _recv_exact(self, size, tolerate_errors=False):
        """Read from the socket until `size` bytes arrive or the peer closes.

        Returns fewer than `size` bytes only when the peer closed the
        connection, or -- with `tolerate_errors` -- when a socket error such
        as the 1 s timeout interrupted the read (the error is then logged).
        Without `tolerate_errors` socket errors propagate.
        """
        size = int(size)
        chunks = []
        received = 0
        while received < size:
            try:
                chunk = self.tcpCliSock.recv(size - received)
            except (IOError, OSError) as err:
                # socket.error/socket.timeout derive from IOError (Py2) and
                # OSError (Py3).
                if not tolerate_errors:
                    raise
                log.error('{} socket recv interrupted:{}'.format(self.who, err))
                break
            if not chunk:
                # recv() returns empty data on EOF; looping again would spin
                # forever.
                log.error('{} socket peer closed after {}/{} bytes'.format(
                    self.who, received, size))
                break
            chunks.append(chunk)
            received += len(chunk)
        return b''.join(chunks)

    def recv(self,size,code):
        """Receive `size` bytes and decode them with struct format `code`.

        A socket error in the middle of the read (e.g. the timeout) is logged
        and the bytes received so far are still passed to struct.unpack,
        which raises struct.error if their length does not match `code`.
        Every error is logged and re-raised.

        :returns: the tuple produced by ``struct.unpack(code, data)``.
        """
        try:
            result = self._recv_exact(size, tolerate_errors=True)
            if not result:
                log.error('{} socket recv_result ERROR:result is null'.format(self.who))
            log.debug('{} socket recv_result:{}'.format(self.who, len(result)))
            data_struct = struct.unpack(code, result)
            log.debug('{} socket recv_result_unpack:{}'.format(self.who, data_struct))
            return data_struct
        except BaseException as msg:
            log.error('{} socket recv_error:{}'.format(self.who,msg))
            raise

    def only_recv(self,size):
        """Receive exactly `size` raw bytes and return them undecoded.

        Raises IOError if the peer closes before `size` bytes arrive; socket
        errors (including timeouts) propagate. Errors are logged first.
        """
        try:
            result = self._recv_exact(size)
            if len(result) < int(size):
                raise IOError('connection closed after {} of {} bytes'.format(
                    len(result), int(size)))
            log.debug('{} socket only_recv_result_len:{}'.format(self.who, len(result)))
            log.debug('only_recv:{}'.format(result))
            return result
        except BaseException as msg:
            log.error('{} socket recv_error:{}'.format(self.who,msg))
            raise

    def close(self):
        """Close the socket; errors (including a never-created socket) are only logged."""
        try:
            log.debug('{} socket has been closed'.format(self.who))
            self.tcpCliSock.close()
        except BaseException as msg:
            log.debug('{} socket close_error:{}'.format(self.who,msg))
  2. 建立进程池嵌套线程池的并发结构
def main(mac=None, mac_real=None, sn=None):
    """Placeholder for the per-device business logic run in each thread.

    main_thread starts this with three positional arguments
    ``(mac, mac_real, sn)``, so the signature must accept them; the original
    zero-argument version made every worker thread die with a TypeError.
    The meaning of the arguments presumably is the simulated device's
    identity (MAC address and serial number) -- confirm against getMacSn.
    """
    print('业务代码')

class MyThread(threading.Thread):
    """Thread that calls ``func(*args)`` when started.

    :param func: callable to run in the thread.
    :param args: positional arguments passed to `func`.
    :param name: thread name; an empty name falls back to threading's own
        generated name instead of naming the thread ''.
    """

    def __init__(self, func, args, name=''):
        # threading substitutes a generated name for a falsy `name`, so
        # unnamed worker threads stay distinguishable in logs.
        threading.Thread.__init__(self, name=name)
        self.func = func
        self.args = args

    def run(self):
        # Equivalent to the deprecated (removed in Python 3) apply(func, args).
        self.func(*self.args)

def main_thread():
    """Run `thread_num` concurrent main() threads and wait for all of them.

    Each thread gets its own (mac, mac_real, sn) triple from getMacSn().
    The sn of the last thread created is left in the module-level `sn`.
    """
    global sn
    workers = []
    # thread_num: number of concurrent threads per process
    for _ in range(thread_num):
        mac, mac_real, sn = getMacSn()
        workers.append(MyThread(main, (mac, mac_real, sn)))
    # Create every thread first, then start them together, then wait.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


if __name__ == '__main__':
    # proc: number of processes in the pool;
    # proc_num: number of main_thread tasks submitted to it.
    pool = multiprocessing.Pool(processes=proc)
    log.info("main process(%d) running..." % os.getpid())
    # Keep every AsyncResult. Overwriting a single variable meant only the
    # last task's outcome was checked, and with proc_num == 0 the
    # placeholder '' had no .successful() method.
    results = [pool.apply_async(main_thread) for _ in range(proc_num)]
    pool.close()
    pool.join()

    failed = 0
    for res in results:
        try:
            # get() re-raises the exception the worker task died with, so the
            # log shows the real cause rather than just "False".
            res.get()
        except Exception as err:
            failed += 1
            log.error('主进程异常:{}'.format(err))
    if not failed:
        log.info('goodbye:主进程({})执行完毕'.format(os.getpid()))
  3. 在业务代码中计算统计参数,在内存中累计,并输出到指定级别的日志中。主要方法是通过全局变量进行统一计算:
class globalMap:
    """Dict-backed key/value store used to accumulate test statistics.

    `map` is a class attribute, so every globalMap() instance in a process
    reads and writes the same dict. NOTE(review): worker processes created by
    multiprocessing each get their own copy -- values are not shared across
    processes.
    """
    # Backing dict shared by all instances (class attribute, not per-instance).
    map = {}

    def set_map(self, key, value):
        """Store `value` under `key`; dict values are stored as JSON strings."""
        if(isinstance(value,dict)):
            value = json.dumps(value)
        self.map[key] = value
        log.debug(key + ":" + str(value))


    def set(self, **keys):
        """Store each keyword argument, converting its value with str()."""
        try:
            for key_, value_ in keys.items():
                self.map[key_] = str(value_)
                log.debug(key_+":"+str(value_))
        except BaseException as msg:
            log.error(msg)
            # Bare raise keeps the original traceback (``raise msg`` loses it on Py2).
            raise

    def del_map(self, key):
        """Remove `key` and return the map; log and return None if it is absent."""
        try:
            del self.map[key]
            return self.map
        except KeyError:
            log.error("key:'" + str(key) + "'  不存在")


    def get(self,*args):
        """Look up one or more keys.

        - ``get('all')`` returns the whole backing dict (live reference).
        - ``get(key)`` returns that key's value.
        - ``get(k1, k2, ...)`` returns a dict of those keys; ``get()`` returns {}.

        A missing key is logged as a warning and ``'Null_'`` is returned.
        The 'all' check comes first; previously it sat behind the
        single-key branch and could never be reached.
        """
        if len(args) == 1 and args[0] == 'all':
            return self.map
        key = None
        try:
            if len(args) == 1:
                key = args[0]
                value = self.map[key]
                log.debug(key+":"+str(value))
                return value
            dic = {}
            for key in args:
                dic[key] = self.map[key]
            return dic
        except KeyError:
            log.warning("key:'" + str(key) + "'  不存在")
            return 'Null_'

总结

将多进程与多线程进行对比:虽然受 Python 全局解释器锁(GIL)的限制,线程有些瑕疵,但本工具以网络收发为主,线程在阻塞 I/O 期间会释放 GIL,所以论并发能力,线程还是能甩进程几条街。用这种方式进行高密度的业务逻辑操作,可以很轻松地找到服务端瓶颈。

声明:看到过的好文章,转载分享,若有侵权,请及时联系,速删。


↙↙↙阅读原文可查看相关链接,并与作者交流