通用技术 中间件学习—RabbitMQ:hello world

爱老的虎油 · June 17, 2025 · Last by 爱老的虎油 replied at June 17, 2025 · 621 hits

官方文档链接:https://www.rabbitmq.com/tutorials

下方以官方 Python 版本教程为例

第一条消息:Hello World

前置准备:已经安装了 rabbitMQ(默认本地 localhost 运行,且端口为 5672)

RabbitMQ 是一个消息代理,即 message broker,用途就是接收和分发消息。可以把它类比为邮局:当你把一封信放到邮箱,那么邮递员最终就会把信件送到指定的接收人。RabbitMQ 扮演的就是邮箱、邮局、邮递员的角色。

当然 RabbitMQ 处理的不是纸质信件,它接收、存储、分发的是二进制的数据。

学习 RabbitMQ 和消息传送总的来说需要了解一些术语:

  1. 生产者-Producer:消息生成就是发送消息,一个发送消息的程序就称为生产者 Producer
  2. 队列-Queue:队列就是 RabbitMQ 中扮演邮箱的角色。虽然消息流产生在生产者和消费者之间,但是消息的存储只能在队列中,队列可以看作一个大型消息缓冲区,只受主机内存和磁盘大小限制
  3. 消费者-Consumer:消费者接收消息,是一段等着接收消息的程序

环境准备

  1. 安装 Python
  2. 安装 pikapython -m pip install pika --upgrade

下面的例子中,将用两段代码分别实现生产者和消费者之间最简单的一种消息传送模式:

发送消息

创建send.py,用来编写生产者代码

建立连接

其中 host 如果不是本地 IP 根据自己的实际配置修改即可

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

创建队列

通过第一步已经跟代理服务器创建了连接,根据上述模型,我们像队列发送消息,所以先创建一个名为hello的队列

channel.queue_declare(queue='hello')

发送消息

之前的 RabbitMQ 管理后台的使用教程中,介绍到完整的消息传送模型是生产者->交换机->队列->消费者,这里我们只做最简单的消息收发测试,所以不用指定交换机,exchange使用空串,表示使用默认交换机

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

关闭连接

connection.close()

类比 Python 操作 Mysql

通过上述操作,可以看出一个标准生产者的流程非常类似 Python 操作数据库的流程

把二者的完整代码放在一起对比:

import pika
# 创建连接和频道
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='hello')
# 发送消息,exchange为空串,表示使用默认的exchange, routing_key为队列名称, body为发送的消息内容
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
connection.close()
import mysql.connector
# 创建连接
connection = mysql.connector.connect(
    host='localhost',
    user='your_username',
    password='your_password',
    database='your_database'
)

# 创建游标
cursor = connection.cursor()

# 定义插入数据的SQL语句
sql = "INSERT INTO your_table (column1, column2) VALUES (%s, %s)"
values = ('value1', 'value2')

# 执行插入操作
cursor.execute(sql, values)

# 提交事务
connection.commit()

# 关闭游标和连接
cursor.close()
connection.close()

接收消息

建立连接

创建一个 receive.py 来实现简单的消费者代码,其中第一步建立连接和生产者的代码完全一样

创建队列

这里官方建议对于消费者最好也要声明队列,这里的声明是为了确保队列存在,因为我们不知道实际上是先运行了生产者还是消费者,当然此处创建队列是幂等的,不会发生重复创建的问题

channel.queue_declare(queue='hello')

创建回调函数

消费者从队列接收消息,要稍复杂一点。此处的实现是通过向队列订阅一个回调函数实现,每当接收一条消息,Pika 这个库就会调用回调函数,在官方 Demo 中,每次回调会打印一条消息:

def callback(ch, method, properties, body):
    print(f" [x] Received {body}")

声明监听队列

接下来告诉 RabbitMQ 从哪个队列中监听消息,并且指明当收到消息时回调哪个函数来处理消息

channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)

保持监听

创建一个无线循环的函数,保持监听队列消息,并且监听用户关闭程序的行为,允许用户通过键入命令主动关闭程序

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

消费者完整代码

import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(f" [x] Received {body}")

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

测试演练

step1:执行消费者代码,控制台会打印开始语句

step2:执行生产者代码,发送消息

顺利的话,就可以看到消费者已经成功收到了消息并且调用了callback()函数打印了消息体

以上就是一个最简单的使用 Python 通过 RabbitMQ 队列分发消息的案例。

最佳回复

所以这样的中间件怎么测试保障?

共收到 2 条回复 时间 点赞

所以这样的中间件怎么测试保障?

恒温 回复

我没有专门接触过中间件的测试,不过我能想到的,对中间件的测试大概思路就是:

  1. 功能测试:根据具体业务验证生产 - 消费链路(比如有的要求异步任务,有的要求定时任务等等)
  2. 性能/压力测试:我们是零售行业,以这个为例,大促期间需要大量发放权益,就需要验证大流量下的性能跟压力表现
  3. 之前好像在论坛看到过有大佬专门做混沌工程,应该可以注入一些故障,模拟一下服务挂掉、弱网等这种极端情况来看一下系统的容灾能力 实际上应该还是根据业务特性还有具体需求来指定测试计划。 佬可以指正补充一下🤓 🤓
需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up