官方文档链接:https://www.rabbitmq.com/tutorials
下方以官方 Python 版本教程为例
前置准备:已经安装了 rabbitMQ(默认本地 localhost 运行,且端口为 5672)
RabbitMQ 是一个消息代理,即 message broker,用途就是接收和分发消息。可以把它类比为邮局:当你把一封信放到邮箱,那么邮递员最终就会把信件送到指定的接收人。RabbitMQ 扮演的就是邮箱、邮局、邮递员的角色。
当然 RabbitMQ 处理的不是纸质信件,它接收、存储、分发的是二进制的数据。
学习 RabbitMQ 和消息传送总的来说需要了解一些术语:
python -m pip install pika --upgrade
下面的例子中,将用两段代码分别实现生产者和消费者之间最简单的一种消息传送模式:
创建send.py
,用来编写生产者代码
其中 host 如果不是本地 IP 根据自己的实际配置修改即可
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
通过第一步已经跟代理服务器创建了连接,根据上述模型,我们像队列发送消息,所以先创建一个名为hello
的队列
channel.queue_declare(queue='hello')
之前的 RabbitMQ 管理后台的使用教程中,介绍到完整的消息传送模型是生产者->交换机->队列->消费者,这里我们只做最简单的消息收发测试,所以不用指定交换机,exchange
使用空串,表示使用默认交换机
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
通过上述操作,可以看出一个标准生产者的流程非常类似 Python 操作数据库的流程
把二者的完整代码放在一起对比:
import pika
# 创建连接和频道
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='hello')
# 发送消息,exchange为空串,表示使用默认的exchange, routing_key为队列名称, body为发送的消息内容
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
connection.close()
import mysql.connector
# 创建连接
connection = mysql.connector.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database'
)
# 创建游标
cursor = connection.cursor()
# 定义插入数据的SQL语句
sql = "INSERT INTO your_table (column1, column2) VALUES (%s, %s)"
values = ('value1', 'value2')
# 执行插入操作
cursor.execute(sql, values)
# 提交事务
connection.commit()
# 关闭游标和连接
cursor.close()
connection.close()
创建一个 receive.py 来实现简单的消费者代码,其中第一步建立连接和生产者的代码完全一样
这里官方建议对于消费者最好也要声明队列,这里的声明是为了确保队列存在,因为我们不知道实际上是先运行了生产者还是消费者,当然此处创建队列是幂等的,不会发生重复创建的问题
channel.queue_declare(queue='hello')
消费者从队列接收消息,要稍复杂一点。此处的实现是通过向队列订阅一个回调函数实现,每当接收一条消息,Pika 这个库就会调用回调函数,在官方 Demo 中,每次回调会打印一条消息:
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
接下来告诉 RabbitMQ 从哪个队列中监听消息,并且指明当收到消息时回调哪个函数来处理消息
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
创建一个无线循环的函数,保持监听队列消息,并且监听用户关闭程序的行为,允许用户通过键入命令主动关闭程序
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
step1:执行消费者代码,控制台会打印开始语句
step2:执行生产者代码,发送消息
顺利的话,就可以看到消费者已经成功收到了消息并且调用了callback()
函数打印了消息体
以上就是一个最简单的使用 Python 通过 RabbitMQ 队列分发消息的案例。