背景
公司业务处于上升期,但是服务端却 low 的像个 demo,于是乎重构服务端开始了;
关于测试,测试这个行业在大多数互联网公司比较失败,做测试的应该都有这种感觉。但是我感觉我很幸运,因为 CTO 很重视测试,他的口头禅就是不可测试的程序都是不可靠的,所以我们公司的所有程序都会有配套的测试工具。这篇文章中的工具就是专门测试服务端而模拟的客户端 TSP。
业务需求
重构服务端,达到接入百万客户端级别
实现原理
- 服务端监听 3 个端口,这三个端口全部是设备发起
- tsp 模拟客户端连接服务端逻辑,从而达到设备、用户上线
- 采用进程池嵌套线程池方式,达到高并发的目的
- 代码中统计发包时间平均值、收包时间平均值、收包字节大小平均值、收包错误率;
- 服务端采集硬件配置、cpu 负载、内存负载、磁盘 io 等等
具体实现
1.构建主流程,完成基于 socket 请求的收发包
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
from socket import socket,AF_INET,SOCK_STREAM
import struct
from log import log
log=log()
class socket_main:
def __init__(self,who,conn_data):
self.conn_data = conn_data
self.who = who
self.__conn(conn_data)
def __conn(self,addr):
try:
log.debug('{} _conn data:{}'.format(self.who,addr))
self.tcpCliSock = socket(AF_INET, SOCK_STREAM)
self.tcpCliSock.connect(addr)
self.tcpCliSock.setblocking(1)
self.tcpCliSock.settimeout(1)
# self.tcpCliSock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,8192)
log.debug('{} socket data:{}'.format(self.who,'success'))
except BaseException as msg:
log.error('{} socket conn_result:{}'.format(self.who,msg))
def send(self,data):
try:
try:
result = self.tcpCliSock.send(data)
except BaseException as msg:
# self.__conn(self.conn_data)
# result = self.tcpCliSock.send(data)
log.error(msg)
raise msg
log.debug('{} socket send_len:{}'.format(self.who, result))
return result
except BaseException as msg:
raise log.error('{} socket send_error:{}'.format(self.who,msg))
def recv(self,size,code):
result =''
try:
try:
result = self.tcpCliSock.recv(int(size))
while len(result)<int(size):
result +=self.tcpCliSock.recv(int(size)-len(result))
except:
pass
if not result:
log.error('{} socket recv_result ERROR:result is null'.format(self.who))
log.debug('{} socket recv_result:{}'.format(self.who, len(result)))
data_struct = struct.unpack(code, result)
log.debug('{} socket recv_result_unpack:{}'.format(self.who, data_struct))
return data_struct
except BaseException as msg:
log.error('{} socket recv_error:{}'.format(self.who,msg))
raise msg
def only_recv(self,size):
try:
result = self.tcpCliSock.recv(int(size))
while len(result) < int(size):
result += self.tcpCliSock.recv(int(size) - len(result))
log.debug('{} socket only_recv_result_len:{}'.format(self.who, len(result)))
log.debug('only_recv:{}'.format(result))
return result
except BaseException as msg:
log.error('{} socket recv_error:{}'.format(self.who,msg))
raise msg
def close(self):
try:
log.debug('{} socket has been closed'.format(self.who))
self.tcpCliSock.close()
except BaseException as msg:
log.debug('{} socket close_error:{}'.format(self.who,msg))
- 建立进程池、线程池套餐
def main():
print '业务代码'
class MyThread(threading.Thread):
def __init__(self, func, args, name=''):
threading.Thread.__init__(self)
self.name = name
self.func = func
self.args = args
def run(self):
apply(self.func, self.args)
def main_thread():
global sn
threads = []
nloops = xrange(thread_num)# thread_num并发线程数
for i in nloops:
mac, mac_real, sn = getMacSn()
t = MyThread(main, (mac,mac_real,sn))
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
if __name__=='__main__':
result = ''
pool = multiprocessing.Pool(processes=proc)# processes进程池数量
log.info("main process(%d) running..." % os.getpid())
for i in xrange(proc_num):# proc_num 并发进程数量
result = pool.apply_async(main_thread)
pool.close()
pool.join()
if not result.successful():
log.error('主进程异常:{}'.format(result.successful()))
else:
log.info('goodbye:主进程({})执行完毕'.format(os.getpid()))
- 通过在业务代码中计算统计参数,然后放在内存中累计计算,输出到指定级别日志中,用到的主要方法是通过设置全局变量进行统一计算:
class globalMap:
# 拼装成字典构造全局变量 借鉴map 包含变量的增删改查
map = {}
def set_map(self, key, value):
if(isinstance(value,dict)):
value = json.dumps(value)
self.map[key] = value
log.debug(key + ":" + str(value))
def set(self, **keys):
try:
for key_, value_ in keys.items():
self.map[key_] = str(value_)
log.debug(key_+":"+str(value_))
except BaseException as msg:
log.error(msg)
raise msg
def del_map(self, key):
try:
del self.map[key]
return self.map
except KeyError:
log.error("key:'" + str(key) + "' 不存在")
def get(self,*args):
try:
dic = {}
for key in args:
if len(args)==1:
dic = self.map[key]
log.debug(key+":"+str(dic))
elif len(args)==1 and args[0]=='all':
dic = self.map
else:
dic[key]=self.map[key]
return dic
except KeyError:
log.warning("key:'" + str(key) + "' 不存在")
return 'Null_'
总结
多进程的方式和多线程进行对比,虽然 Python 全局锁的限制导致线程有点瑕疵,但是跟进程比起并发,还是能甩进程几条街的。通过这种方式进行高密度业务逻辑操作,可以很轻松找到服务端瓶颈。
声明:看到过的好文章,转载分享,若有侵权,请及时联系,速删。
转载文章时务必注明原作者及原始链接,并注明「发表于 TesterHome 」,并不得对作品进行修改。