# txt中数据为1,2,3,4,5,6
def get_txt_data():
    with open("user.txt", "r") as f:
        data = f.read()
        return data


# 列表形式数据
s = [1, 2, 3, 4, 5, 6]
s1 = [{"user_name": "张三", "password": 123456},
      {"user_name": "李四", "password": 456789},
      {"user_name": "王五", "password": 123789}]

# 从上下文中取数据
from locust import Locust, TaskSet, task, between
from time import time
from uuid import uuid1


# 返回当前时间戳
def get_time_stamp():
    return str(int(time()))


class MyTaskSet(TaskSet):
    # 该函数不是task任务的,可以添加函数来进行获取token,方便任务来调用
    def get_login_token(self):
        return uuid1()

    # 使用类属性来进行参数传递
    token = None

    # TaskSet相当于下面所有task的大脑
    @task(1)  # 声明任务
    def my_task(self):
        print("执行task1" + get_txt_data())
        MyTaskSet.token = uuid1()

    @task(2)
    def my_task_2(self):
        print("执行task2" + str(s[0])+str(MyTaskSet.token))


class AppUser(Locust):
    weight = 1  # 赋值权重  weight = 10 默认为10
    task_set = MyTaskSet
    wait_time = between(1, 2)
    host = ""  # 域名host
# 两个UserLocust重复调用一个list数据
from locust import HttpLocust, TaskSet, task, Locust, between
import queue
# from multiprocessing import Queue


s=[1,2,3,4,5,6]


class MyTaskSet(TaskSet):
    @task()
    def task_test1(self):
        print(queue_data_test1[0])


class MyTaskSet2(TaskSet):
    @task()
    def task_test2(self):
        print(queue_data_test2[0])


class UserLocust2(Locust):
    task_set = MyTaskSet2
    wait_time = between(3, 10)
    queue_data_test2 =s


class UserLocust(Locust):
    task_set = MyTaskSet
    wait_time = between(3, 10)
    queue_data_test1 = s
# 两个UserLocust调用同一个队列,数据取值不重复;(存在多批次不重复,可定义多个Queue()对象)
from locust import HttpLocust, TaskSet, task, Locust, between
import queue
# from multiprocessing import Queue


def queue_data():
    queue_data = queue.Queue()  # 默认为先进先出  该队列为task_set共享
    # queue.LifoQueue(),后进先出
    for i in range(0, 13):
        queue_data.put_nowait(i)  # put_nowait 不阻塞
    return queue_data


class MyTaskSet(TaskSet):
    @task()
    def task_test1(self):
        print(self.locust.queue_data_test1.get())


class MyTaskSet2(TaskSet):
    @task()
    def task_test2(self):
        print(self.locust.queue_data_test2.get())


class UserLocust2(Locust):
    weight =1
    task_set = MyTaskSet2
    wait_time = between(3, 10)
    queue_data_test2 = queue_data()  # 默认为先进先出  该队列为task_set共享


class UserLocust(Locust):
    weight =3
    task_set = MyTaskSet
    wait_time = between(3, 10)
    queue_data_test1 = queue_data()


↙↙↙阅读原文可查看相关链接,并与作者交流