# txt中数据为1,2,3,4,5,6
def get_txt_data():
with open("user.txt", "r") as f:
data = f.read()
return data
# 列表形式数据
s = [1, 2, 3, 4, 5, 6]
s1 = [{"user_name": "张三", "password": 123456},
{"user_name": "李四", "password": 456789},
{"user_name": "王五", "password": 123789}]
# 从上下文中取数据
from locust import Locust, TaskSet, task, between
from time import time
from uuid import uuid1
# 返回当前时间戳
def get_time_stamp():
return str(int(time()))
class MyTaskSet(TaskSet):
# 该函数不是task任务的,可以添加函数来进行获取token,方便任务来调用
def get_login_token(self):
return uuid1()
# 使用类属性来进行参数传递
token = None
# TaskSet相当于下面所有task的大脑
@task(1) # 声明任务
def my_task(self):
print("执行task1" + get_txt_data())
MyTaskSet.token = uuid1()
@task(2)
def my_task_2(self):
print("执行task2" + str(s[0])+str(MyTaskSet.token))
class AppUser(Locust):
weight = 1 # 赋值权重 weight = 10 默认为10
task_set = MyTaskSet
wait_time = between(1, 2)
host = "" # 域名host
# 两个UserLocust重复调用一个list数据
from locust import HttpLocust, TaskSet, task, Locust, between
import queue
# from multiprocessing import Queue
s=[1,2,3,4,5,6]
class MyTaskSet(TaskSet):
@task()
def task_test1(self):
print(queue_data_test1[0])
class MyTaskSet2(TaskSet):
@task()
def task_test2(self):
print(queue_data_test2[0])
class UserLocust2(Locust):
task_set = MyTaskSet2
wait_time = between(3, 10)
queue_data_test2 =s
class UserLocust(Locust):
task_set = MyTaskSet
wait_time = between(3, 10)
queue_data_test1 = s
# 两个UserLocust调用同一个队列,数据取值不重复;(存在多批次不重复,可定义多个Queue()对象)
from locust import HttpLocust, TaskSet, task, Locust, between
import queue
# from multiprocessing import Queue
def queue_data():
queue_data = queue.Queue() # 默认为先进先出 该队列为task_set共享
# queue.LifoQueue(),后进先出
for i in range(0, 13):
queue_data.put_nowait(i) # put_nowait 不阻塞
return queue_data
class MyTaskSet(TaskSet):
@task()
def task_test1(self):
print(self.locust.queue_data_test1.get())
class MyTaskSet2(TaskSet):
@task()
def task_test2(self):
print(self.locust.queue_data_test2.get())
class UserLocust2(Locust):
weight =1
task_set = MyTaskSet2
wait_time = between(3, 10)
queue_data_test2 = queue_data() # 默认为先进先出 该队列为task_set共享
class UserLocust(Locust):
weight =3
task_set = MyTaskSet
wait_time = between(3, 10)
queue_data_test1 = queue_data()