需要背景

某个城市对接,需要我们支持 MQTT 协议,为此,需要开发一个 MOCK 服务来满足当前需求,将 MOCK 服务作为中间挡板,来模拟政府端订阅消息进行功能验证。MOCK 脚本需要部署在 AWS LINUX 平台,方便开发和测试自测使用。

一、安装依赖

sudo yum install gcc gcc-c++ libstdc++-devel
sudo yum install openssl-devel -y
sudo yum install c-ares-devel -y
sudo yum install uuid-devel -y
sudo yum install libuuid-devel -y

// 通过rpm -qa XXX(依赖名) 检查是否已安装过

二、下载、解压、编译、安装

// 下载
wget https://mosquitto.org/files/source/mosquitto-2.0.8.tar.gz

// 解压
tar -zxvf mosquitto-2.0.8.tar.gz

// 编译安装
cd mosquitto-2.0.8
make
sudo make install

// 拷贝配置
cd /etc/mosquitto
sudo cp mosquitto.conf.example mosquitto.conf

编译时,若提示 fatal error: cjson/cJSON.h: No such file or directory,需要安装 cJSON,然后重新安装 mosquitto。

yum install cmake
yum -y install git    -- 若之前未安装git需执行此步骤通过git --version验证是否安装
cd /usr/local/
git clone https://github.com/DaveGamble/cJSON.git
cd cJSON/
mkdir build
cd build/
cmake ..
make
make install
echo "/usr/local/lib64" >> /etc/ld.so.conf
/sbin/ldconfig

三、创建软连接

sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1
sudo ldconfig

若不添加软连接,发布、订阅消息时会提示"error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory"。

四、启动服务

// 创建用户用户组否则启动时提示"Unable to drop privileges to 'mosquitto' ..."
sudo groupadd mosquitto
sudo useradd -g mosquitto mosquitto
sudo chown -R mosquitto:mosquitto /etc/mosquitto/

mosquitto -c /etc/mosquitto/mosquitto.conf -d

五、订阅、发布测试

服务端发布消息

客户端订阅消息

六、MQTT Client 代码

# !/usr/bin/python
# -*- coding: utf-8 -*-

"""
@Author  : SKS
@Contact : SKS@SKS.com 
@File    : mockHTTPServer.py
@Create Time: 2022-10-25 9:22
@Description: 模拟政府端订阅MQTT Server消息
"""
import paho.mqtt.client as mqtt
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
import threading
import json
import time
import os
import queue

import requests

from logger import Logger
import datetime
from hashlib import md5



log = Logger()


class MQTTMockClient():
    def __init__(self, host, port, topic):
        self.client = mqtt.Client()
        self.client.on_message = self.handle_message
        self.client.connect(host, port)
        # 使用传递进来的topic参数订阅主题
        self.client.subscribe(topic)
        self.client.loop_start()

    def handle_message(self, client, userdata, msg):
        # client 和 userdata 参数在这里不使用,但它们由 MQTT 客户端库调用时必需
        print(f"<客户端> 收到MQTT主题为{msg.topic}的数据: {msg.payload.decode()}")
        self.savepayload(msg.payload)


    def savepayload(self, payload):

        # 创建队列
        global msgQueue
        msgQueue = queue.Queue()

        # 最大暂存最新的200条消息
        while msgQueue.qsize() > 200:
            msgQueue.get()

        msgQueue.put(payload)
        print('<客户端> 暂存数据, 数据是 %s' % payload)
        log.info('After save payload, New queue size is %s, saved payload is %s' % (msgQueue.qsize(), str(payload)))



if __name__ == '__main__':

    mqtt_host = '127.0.0.1'
    mqtt_port = 1883
    # 创建实例时动态传入主题
    mock_mqtt_client = MQTTMockClient(mqtt_host, mqtt_port, "topic")

    try:
        print("======server main begin======")
        print(f"starting MQTT client to listen at {mqtt_host}:{mqtt_port}")

        while True:
            time.sleep(1)  # 主线程保持运行以防止程序退出

    except KeyboardInterrupt:
        print("MQTT MOCK服务已手动停止")
    except Exception as e:
        print(f"发生未知错误: {str(e)}")
    finally:
        # 确保在任何情况下都执行清理操作
        mock_mqtt_client.client.disconnect()
        mock_mqtt_client.client.loop_stop()
        print("MQTT MOCK服务已停止")

七、参考资料

https://blog.csdn.net/sy201707/article/details/126450432
https://zhuanlan.zhihu.com/p/164930347
https://www.jianshu.com/p/1a9159e47a02


↙↙↙阅读原文可查看相关链接,并与作者交流