简介

最近复习 python 相关知识和学习 httprunner 的源代码,从中学习了很多知识.想着把这些知识穿起来,做一个小的压测工具.

poetry 构建工具

了解 poetry 构建工具,还是从 httprunner 最新版本了解到,使用 poetry 作为构建工具.poetry 工具要比 distutils、setuptools 等工具使用简洁、功能强大.

https://github.com/python-poetry/poetry

安装

curl -sSL https://raw.githubusercontent.com/sdispater/poetry/master/get-poetry.py | python3
pip install --user poetry

命令

初始化项目

poetry init

创建新项目

poetry new project_name (项目名字)

查看依赖

poetry show --tree

安装依赖

poetry install

激活虚拟环境

poetry shell

增加安装包

poetry add redis

添加--dev 参数为开发依赖:

poetry add pytest --dev

pyproject.toml

pyproject.toml 文件中维护所有构建项目需要的数据,
例如:版本号、依赖库、命令入口

image

统计代码行数工具

安装

brew install cloc

运行

在项目根目录下

cloc ./ 

统计结果只有不到 100 行

image

程序相关知识

最近也重新复习了一下进程、线程、协程相关知识,这次使用网络库也是基于协程的.

进程

进程是表示资源分配的的基本概念,又是调度运行的基本单位,是系统中的并发执行的单位.

import os
import time
import requests
from random import randint
from multiprocessing import Process
from multiprocessing import Pool


def coding():
    while True:
        print('开始撸代码,PID是%s' % os.getpid())
        time.sleep(randint(1, 3))
        print('写累了,不撸了,PID是%s' % os.getpid())


def play_weixin():
    while True:
        print('玩一会微信,PID是%s' % os.getpid())
        time.sleep(randint(1,2))
        print('不玩微信了,开始撸代码,PID是%s' % os.getpid())

if __name__ == '__main__':
    p1 = Process(target=coding)
    p2 = Process(target=coding)
    # 启动进程
    p1.start()
    # 阻塞进程p1
    p1.join()

    # 启动进程
    p2.start()
    p2.join()

    # 主进程
    while True:
        time.sleep(3)
        print('我是主进程,PID是%s' % os.getpid())       

线程

一个线程只能属于一个进程,但是一个进程可以拥有多个线程.多线程处理就是允许一个进程中在同一时刻执行多个任务.

import time
import threading

class DataCopy(threading.Thread):

    def __init__(self, dbname):
        super(DataCopy, self).__init__()
        self.dbName = dbname

    def run(self):

        print('Thread %s is running' % threading.current_thread().name)
        print('开始备份数据库:%s' % self.dbName)

        time.sleep(5)

        print('数据库%s备份结束' % self.dbName)
        print('Thread %s is ended' % threading.current_thread().name)



thread1 = DataCopy('database1')
thread2 = DataCopy('database2')

thread1.daemon = True
# 当定义子线程为守护线程的话,当主线程结束了,不管子线程是否执行完,都会被直接给暂停掉。默认daemon为False
thread1.start()
# start() 方法是启动一个子线程,线程名就是我们定义的name,或者默认的线程名Thread-1, Thread-2......

thread2.run()
# run() 方法并不启动一个新线程,就是在主线程中调用了一个普通函数而已。
# 线程阻塞
# 在你的子线程没有中止或者运行完之前,你的主线程都不会结束
#thread1.join()

# 线程执行结束的输出提示
print('备份结束')

协程

协程切换任务资源很小,效率高.因为协程适中都在一个线程中,也不涉及切换任务的消耗.

import time

def consumer():
    r = '1xx'
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] 吃鸡翅 %s...' % n)
        time.sleep(1)
        r = '吃完啦,饱饱的了'


def produce(customer):
    # 启动迭代器
    customer.__next__()
    # 设置变量参数为0
    n = 0
    while n < 3:
        n = n + 1
        print('[PRODUCER] 做鸡翅 %s...' % n)
        # 想customer中传递变量n,直接跳到consumer中执行
        r = customer.send(n)
        print('[PRODUCER] 吃鸡翅状态 return: %s' % r)
    # 关闭消费者
    customer.close()

if __name__ == '__main__':
    print('开始协程')
    customer = consumer()
    print(customer)
    produce(customer)
    print('结束协程')    

协程相关拓展

asyncio

asyncio 是是 Python 3.4 版本引入的标准库,直接内置了对异步 IO 的支持.

通过@asyncio.coroutine注解把方法标记协程方法

import asyncio


@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")


# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
print("#####################")
loop.run_until_complete(hello())
loop.close()

async/await

async/await 是 Python 3.5 开始引入了新的语法 async 和 await.

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
print("#####################")
loop.run_until_complete(hello())
loop.close()    

网络库

requests

requests 是一个同步请求网络库,不支持异步.用的比较多,代码就不贴了.

httpx

支持异步请求网络库,也可以使用同步请求

https://pypi.org/project/httpx/

pip3 install httpx

异步请求脚本:

import asyncio
import httpx

async def post_httpx():
    async with httpx.AsyncClient() as client:
        response = await client.get('https://www.example.org/')
        print(response)


loop = asyncio.get_event_loop()
loop.run_until_complete(post_httpx())

aiohttp

也是一个异步网络请求库,同时支持服务端编程和 WebSocket.

安装

https://pypi.org/project/aiohttp/

pip3 install aiohttp

异步请求脚本:

import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        print(html)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

脚本开发

解析命令行

可以传入三个参数,并发数、持续时间、url 地址

image

创建 task 任务

根据入参并发数创建 task 任务,传给 looop 执行.

image

发送网络请求

使用 aiohttp 发送网络请求同时把请求挂起.

image

数据统计

使用 asyncio.gather(*tasks) 拿到所有返回数据,使用 numpy 计算 95 分位、99 分位响应耗时.

image

image

测试命令

poetry_run -m poetry_project --thread-count 100 --load-time 1  --load-url http://www.igetcool.com

同时 100 个并发请求,大致使用 3s 左右

image

测试服务

准备自己搭建一套测试环境,这个架构和我们公司的服务架构基本相似.

架构: java + eureka + gateway + app
框架: springclound

eureka 注册中心

把所以微服务注册到 eureka 中,方便后续的服务调度.

把 gateway 和子服务都注册到 eureka 注册中心上.

image

gateway 是网关

gateway 是所有的服务的入口,通过路由转发到具体微服务上.

app 是具体的应用

app 是具体的一个微服务应用,具体业务中有会有 N 个微服务.

测试接口

http://127.0.0.1:8763/hi

代码地址

https://github.com/xinxi1990/backendSpring.git

服务处理请求

想验证服务端是否接收到了这么多请求,可以使用 skywalking 这种服务监控系统,查看服务每秒的 qps

实际在压测的时候,一般会用压测工具的 qps 和服务端 api 的 qps 对比,看压力是否打到的被测应用.

image

项目地址

https://github.com/xinxi1990/poetry_project

小结

通过一个小的迷你项目,学会了如下几点:

  1. poetry 构建工具
  2. 同步、异步、阻塞、非阻塞
  3. 多进程、多线程、协程
  4. 学习多个网络库
  5. 搭建被测应用
  6. 提交代码并且 release 版本
  7. 学习优秀开源框架

最后说一下 locust 工具,locust 是一个用于可扩展的,分布式的,性能测试的,开源的,用 Python 编写框架/工具,它非常容易使用,也非常好学.

在 locust 中使用 gevent,gevent 是一种基于协程的 Python 网络库,它用到 Greenlet 提供的,封装了 libevent 事件循环的高层同步 API.


↙↙↙阅读原文可查看相关链接,并与作者交流