//引入amqplib这个模块
const amqp = require("amqplib/callback_api");
生产者()
function 生产者(){
var rabbitmq = {
hostname: "",
port: "5672",
username: "",
password: "",
authMechanism: "AMQPLAIN",
pathname: "/",
ssl: {
enabled: false
}
};
var conn = amqp.connect(rabbitmq, function(error1, connection) {
connection.on("error", err => {
生产者();
});
// 创建一个信道
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var queue = "Test";
var msg =
'{"a": "123456"}'
//创建or连上队列
channel.assertQueue(queue, {
durable: true //队列持久化
});
channel.prefetch(1);
//将消息塞入队列
channel.sendToQueue(queue, Buffer.from(msg), {
persistent: true //消息持久化
});
console.log("生成者开始发送消息:", queue, msg);
});
//关闭连接,并且退出
setTimeout(function() {
connection.close();
process.exit(0);
}, 500);
});
}
const amqp = require("amqplib/callback_api");
function 消费者() {
// 1. 创建链接对象
var rabbitmq = {
hostname: "",
port: "5672",
username: "admin",
password: "",
authMechanism: "AMQPLAIN",
pathname: "/",
ssl: {
enabled: false
}
};
amqp.connect(rabbitmq, function(error, connection) {
/**
*根据实际业务出发,消费者是要持续监听的,所以这里不直接断开
*一定要加上链接的报错事件处理,否则一旦报error错,如果不处理这个错误,程序就会崩溃
* error是个特别的事件,务必要处理的
*报错就直接去重连
*/
connection.on("error", err => {
console.log("断了 " + err.message, err);
test_appType01_registno();
});
console.log("连上啦,开始监听......");
// 2. 获取信道
const channel = connection.createChannel();
// 3. 声明队列名称test
const queueName = "test";
// 4. 创建并连接到队列
channel.assertQueue(queueName, {
durable: true
});
// 5. 从队列里取出消息
channel.consume(queueName, function(msg) {
console.log("消费者接收到的消息:", msg);
channel.ack(msg);
});
// });
//关闭连接,并且退出
// setTimeout(function() {
// connection.close();
// process.exit(0);
// }, 500);
});
}
try {
test_appType01_registno();
} catch (e) {
console.log("发现异常", e);
console.log(e.code);
消费者();
}
module.exports = {
消费者
};