某个城市对接,需要我们支持 MQTT 协议,为此,需要开发一个 MOCK 服务来满足当前需求,将 MOCK 服务作为中间挡板,来模拟政府端订阅消息进行功能验证。MOCK 脚本需要部署在 AWS LINUX 平台,方便开发和测试自测使用。
sudo yum install gcc gcc-c++ libstdc++-devel
sudo yum install openssl-devel -y
sudo yum install c-ares-devel -y
sudo yum install uuid-devel -y
sudo yum install libuuid-devel -y
// 通过rpm -qa XXX(依赖名) 检查是否已安装过
// 下载
wget https://mosquitto.org/files/source/mosquitto-2.0.8.tar.gz
// 解压
tar -zxvf mosquitto-2.0.8.tar.gz
// 编译、安装
cd mosquitto-2.0.8
make
sudo make install
// 拷贝配置
cd /etc/mosquitto
sudo cp mosquitto.conf.example mosquitto.conf
编译时,若提示 fatal error: cjson/cJSON.h: No such file or directory,需要安装 cJSON,然后重新安装 mosquitto。
yum install cmake
yum -y install git -- 若之前未安装git需执行此步骤,通过git --version验证是否安装
cd /usr/local/
git clone https://github.com/DaveGamble/cJSON.git
cd cJSON/
mkdir build
cd build/
cmake ..
make
make install
echo "/usr/local/lib64" >> /etc/ld.so.conf
/sbin/ldconfig
sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1
sudo ldconfig
若不添加软连接,发布、订阅消息时会提示"error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory"。
// 创建用户、用户组,否则启动时,提示"Unable to drop privileges to 'mosquitto' ..."
sudo groupadd mosquitto
sudo useradd -g mosquitto mosquitto
sudo chown -R mosquitto:mosquitto /etc/mosquitto/
mosquitto -c /etc/mosquitto/mosquitto.conf -d
# !/usr/bin/python
# -*- coding: utf-8 -*-
"""
@Author : SKS
@Contact : SKS@SKS.com
@File : mockHTTPServer.py
@Create Time: 2022-10-25 9:22
@Description: 模拟政府端订阅MQTT Server消息
"""
import paho.mqtt.client as mqtt
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
import threading
import json
import time
import os
import queue
import requests
from logger import Logger
import datetime
from hashlib import md5
log = Logger()
class MQTTMockClient():
def __init__(self, host, port, topic):
self.client = mqtt.Client()
self.client.on_message = self.handle_message
self.client.connect(host, port)
# 使用传递进来的topic参数订阅主题
self.client.subscribe(topic)
self.client.loop_start()
def handle_message(self, client, userdata, msg):
# client 和 userdata 参数在这里不使用,但它们由 MQTT 客户端库调用时必需
print(f"<客户端> 收到MQTT主题为{msg.topic}的数据: {msg.payload.decode()}")
self.savepayload(msg.payload)
def savepayload(self, payload):
# 创建队列
global msgQueue
msgQueue = queue.Queue()
# 最大暂存最新的200条消息
while msgQueue.qsize() > 200:
msgQueue.get()
msgQueue.put(payload)
print('<客户端> 暂存数据, 数据是 %s' % payload)
log.info('After save payload, New queue size is %s, saved payload is %s' % (msgQueue.qsize(), str(payload)))
if __name__ == '__main__':
mqtt_host = '127.0.0.1'
mqtt_port = 1883
# 创建实例时动态传入主题
mock_mqtt_client = MQTTMockClient(mqtt_host, mqtt_port, "topic")
try:
print("======server main begin======")
print(f"starting MQTT client to listen at {mqtt_host}:{mqtt_port}")
while True:
time.sleep(1) # 主线程保持运行以防止程序退出
except KeyboardInterrupt:
print("MQTT MOCK服务已手动停止")
except Exception as e:
print(f"发生未知错误: {str(e)}")
finally:
# 确保在任何情况下都执行清理操作
mock_mqtt_client.client.disconnect()
mock_mqtt_client.client.loop_stop()
print("MQTT MOCK服务已停止")
https://blog.csdn.net/sy201707/article/details/126450432
https://zhuanlan.zhihu.com/p/164930347
https://www.jianshu.com/p/1a9159e47a02