本次教程举例模型:
工作队列也叫任务队列,核心思路就是通过异步处理方式,把原本非常耗时的一些任务封装成消息发送到队列,后台的消费者们监听队列消息,从中取出任务并执行。如果同时运行多个工作进程,这些任务在这些进程之间是共享的,即队列中的任务不会固定分配给某个进程,而是由所有空闲进程竞争获取,从而实现任务的动态分配、负载均衡和高效处理。
理解最后一句话:对于一个餐厅点餐系统,顾客可以通过点餐队列下单,后厨多个厨师(工作进程)负责接单做菜。在这个场景中,顾客下单后,订单进入队列,那么对于队列中的这些订单,任何一个空闲的厨师都可以从中取走订单处理,而不是每一个订单必须分配给固定的某个厨师。
举个例子:
- 订单 1 番茄炒蛋被厨师 A 接单
- 订单 2 辣椒炒肉被厨师 B 接单
- 订单 3 炒青菜可能又被 A 接单(假设 A 做完了番茄炒蛋空闲了)
所以这里共享的本质是多个厨师共同竞相接单,也就是多个工作进程并行竞争队列中的任务,任务是根据进程的忙闲状态动态分配的。
在之前的例子中,发送的是一个简单的字符串hello world
消息。这次发送代表复杂任务的字符串消息,通过time.sleep()
函数,让这些任务 “看起来很忙 “,通过简单的字符串消息模拟实际的复杂场景。
其中用字符串中的.
的数量来代表任务复杂程度,每个点代表一秒工作量。
新建一个new_task.py
文件,在原来send.py
的基础上,对发送消息的部分进行修改:
import sys
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print(f" [x] Sent {message}")
新建一个worker.py
文件,同样在原来的receive.py
基础上,对回调函数进行一些修改:为消息中的每一个点都加上模拟一秒的工作量,回调函数的作用是从队列中取出来消息并执行任务。(比如取出来的消息是 hello world..,那么执行任务的时候就要等待两秒)
import time
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
使用任务队列的好处之一就是可以轻松实现任务并行。如果积累了大量待处理的任务, 那么这时候提高进程数量就可以轻松实现扩展。
为了模拟两个不同的进程,我们打开两个终端,同时运行worker.py
,再打开一个终端运行我们的生产者new_task.py
在生产者窗口中,执行:
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....
检查两个消费者的消息接收情况:
上述我们使用的是默认的分发模式,默认情况下,RabbitMQ 会按照顺序把消息发送给下一个消费者,所以平均下来每一个进程接收的消息数量应该是相同的,这种方式就是轮询
问题:当消息成功被消费者接收,那么该条消息就会被立即标记为待删除,但是此时我们无法知道这项任务是否已经被成功执行完毕。
风险:如果遇到复杂任务,执行一条任务可能需要耗时很久,在这个过程中,如果执行该任务的进程服务挂了,那么这条消息就被浪费掉了。
思路:如果一条任务被进程认领并执行,此时如果该进程未执行完任务就终止,那么这条任务可以继续被另一个进程认领并执行。
实现:RabbitMQ 支持消息确认策略。
auto_ack=true
是关闭了手动确认)继续改造call_back()
函数验证 RabbitMQ 的消息确认功能:
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.') )
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(queue='hello', on_message_callback=callback)
:::info
坑:如果漏掉了basic_ack
,假如服务执行任务的时候挂了,消息仍然会在队列中,在被重新分配之前,会一直占用内存,这样就会有一个隐患,如果等待被消费的消息越来越多,会一直占用内存,造成很大的内存压力。
:::
windows 中查看积压消息的命令:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
验证:
同样打开两个终端分别运行新的消费者,接着发送一条耗时十秒钟的任务,当其中一个消费者接收任务之后,处理完成之前终止进程,此时 RabbitMQ 会把消息自动分配给另一个消费者:
Message durability
前边的消息确认是为了确保消费者如果挂了,任务不会丢失,但是如果 RabbitMQ 服务器挂了,任务其实还是会丢失,此时 RabbitMQ 会忘记队列和消息,所以需要保证队列和消息都能够被标记为持久化。
首先确保队列持久化,使用的参数是durable=True
,但是如果在原来的代码上这样改:
channel.queue_declare(queue='hello', durable=True)
还是不生效的,因为之前已经创建了一个非持久化的队列hello
,RabbitMQ 不允许对一个已经存在的队列重定义,会报错,所以我们重新声明一个新的队列task_queue
,并且将其持久化:
channel.queue_declare(queue='task_queue', durable=True)
注意生产者和消费者的代码都要改
消息持久化通过修改生产者的参数delivery_mode
实现:
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = pika.DeliveryMode.Persistent
))
将消息标记为持久化并不能完全保证消息不会丢失。尽管它会告诉 RabbitMQ 将消息保存到磁盘,但在 RabbitMQ 接收到消息但尚未保存的这段时间内,仍存在一个短暂的窗口期。此外,RabbitMQ 不会对每条消息都执行 fsync(2) 操作——消息可能只是被保存到缓存中,而没有真正写入磁盘。这种持久化保证并不强,但对于我们的简单任务队列来说已经足够了。如果你需要更强的保证,那么可以使用发布者确认。
首先不改变原来的代码,生产者发送一条消息后,被消费者接收,在消费者未处理完之前,重启 RabbitMQ 服务,此时会发现队列已经丢失:
当重启 RabbitMQ 服务时,消费者会报错,重启消费者会发现虽然重新创建了队列,但是没有接收到任何消息,说明消息已经丢失了:
现在修改代码,实现消息和队列持久化:
import sys
import pika
# 创建连接和频道
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
)
)
print(f" [x] Sent {message}")
connection.close()
import pika, sys, os, time
def main():
# 创建连接和频道
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建队列(重复创建队列是幂等的)
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
# 确认消息
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='task_queue',
# auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
重复上边的操作:
使用上述代码重新测试之后,就会发现当重启了 RabbitMQ,队列仍然存在,同样存在一条未消费的消息:
此时重启消费者,会发现可以成功接收消息: