前端页面(index.html)
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Message Generator</title>
  <link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body>
  <!-- Primary content: the generator form plus its two output areas.
       <main> gives assistive technology a landmark to jump to. -->
  <main class="container">
    <h1>Message Generator</h1>
    <form id="messageForm">
      <div class="form-group">
        <label for="controller">Controller Type</label>
        <select id="controller" class="form-control">
          <option value="LCE">LCE</option>
          <option value="KCE">KCE</option>
          <option value="GCE">GCE</option>
          <option value="STEP">STEP</option>
          <option value="ESC">ESC</option>
          <option value="KSE">KSE</option>
          <option value="DTU">DTU</option>
          <option value="AE">AE</option>
          <option value="SN">SN</option>
        </select>
      </div>
      <div class="form-group">
        <label for="msgType">Message Type</label>
        <select id="msgType" class="form-control">
          <option value="fault">Fault</option>
          <option value="alarm">Alarm</option>
          <option value="faultRecovered">Fault Recovered</option>
          <option value="movementData">Movement Data</option>
          <option value="serviceModeChange">Service Mode Change</option>
        </select>
      </div>
      <!-- Only the first button submits; the other two are wired up by js/main.js. -->
      <button type="submit" class="btn btn-primary">Generate Message</button>
      <button type="button" class="btn btn-secondary" id="clearButton">Clear Messages</button>
      <button type="button" class="btn btn-danger" id="resetButton">Reset All</button>
    </form>
    <!-- Output areas; both are emptied by the script before each request. -->
    <div id="response" class="response" role="status"></div>
    <div id="messageBox" class="message-box"></div>
  </main>
  <!-- Copy-success toast. The script toggles its inline `display` and the
       `show` class; role="status" marks it as a polite live region. -->
  <div id="copyAlert" class="copy-alert" role="status" style="display: none;">Message copied to clipboard!</div>
  <script src="{{ url_for('static', filename='js/main.js') }}"></script>
</body>
</html>
CSS 样式(static/css/style.css)
/* ===== Base page layout ===== */
body {
  margin: 0; /* drop the browser's default outer margin */
  padding: 0; /* ...and any default inner padding */
  background-color: #f5f5f5; /* light grey page backdrop */
  font-family: 'Arial', sans-serif; /* Arial first, generic sans-serif as fallback */
}

/* White card that holds the whole UI. */
.container {
  max-width: 800px; /* cap the width on large screens */
  margin: 50px auto; /* centred horizontally, 50px gap above and below */
  padding: 30px;
  border-radius: 10px;
  background: #fff;
  box-shadow: 0 4px 10px rgba(0, 0, 0, 0.1); /* soft drop shadow for depth */
}

h1 {
  margin-bottom: 30px; /* space between the title and the form */
  color: #333; /* dark grey */
  text-align: center;
}

/* Vertical rhythm between the label/control pairs. */
.form-group {
  margin-bottom: 20px;
}

label {
  display: block; /* label sits on its own line above the control */
  margin-bottom: 8px;
  color: #555; /* mid grey */
  font-weight: 600; /* semi-bold */
}

select.form-control {
  box-sizing: border-box; /* padding and border count towards the 100% width */
  width: 100%;
  padding: 12px;
  border: 1px solid #ddd; /* thin solid light-grey outline */
  border-radius: 5px;
  background: #fff;
  font-size: 16px;
}
/* ===== Buttons ===== */
/* Shared look; blue is the default (primary) colour. */
button.btn {
  padding: 12px 24px; /* 12px vertical, 24px horizontal */
  border: none;
  border-radius: 5px;
  background-color: #007bff;
  color: #fff;
  font-size: 16px;
  cursor: pointer;
  transition: background-color 0.3s ease; /* smooth colour change between states */
}
button.btn:hover {
  background-color: #0056b3; /* darker on hover */
}
button.btn:active {
  background-color: #003d80; /* darkest while pressed */
}

/* "Clear Messages" button: green. These rules have the same specificity as
   the hover/active rules above and win only because they come later. */
button.btn-secondary {
  background-color: #28a745;
  color: #fff;
}
button.btn-secondary:hover {
  background-color: #218838;
}
button.btn-secondary:active {
  background-color: #1d742d;
}

/* "Reset All" button: teal, despite the `danger` class name. */
button.btn-danger {
  background-color: #17a2b8;
  color: #fff;
}
button.btn-danger:hover {
  background-color: #138496;
}
button.btn-danger:active {
  background-color: #106b7d;
}
/* ===== Output areas ===== */
/* Response banner shown under the form. */
.response {
  margin-top: 20px;
  padding: 15px;
  border-radius: 5px;
  background: #e9f5ff; /* pale blue */
  color: #333;
  font-size: 14px;
}
/* The script only ever empties this element, so without this rule the
   padding and background render as a permanent blank bar. */
.response:empty {
  display: none;
}

/* Scrollable box that holds the generated message and its copy button. */
.message-box {
  max-height: 400px; /* taller content scrolls instead of growing the page */
  margin-top: 20px;
  padding: 15px;
  overflow-y: auto;
  border-radius: 5px;
  background: #fff;
  box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);
  color: #555;
  font-size: 14px;
}

/* Pretty-printed JSON block. */
.message-content pre {
  padding: 10px;
  border-radius: 5px;
  background: #f8f9fa; /* very light grey */
  font-family: 'Courier New', monospace;
}
/* ===== Centred pop-up notification ===== */
/* Created by the script, appended to <body>, then given the `show` class. */
.alert {
  position: fixed;
  top: 50%;
  left: 50%;
  z-index: 1000;
  transform: translate(-50%, -50%); /* centre on the viewport */
  padding: 20px 40px;
  border-radius: 8px;
  background: #d4edda; /* pale green; .success / .error below override the colour */
  box-shadow: 0 4px 12px rgba(0, 0, 0, 0.2);
  color: #155724; /* dark green */
  font-size: 18px;
  font-weight: bold;
  text-align: center;
  opacity: 0; /* invisible until `show` is added */
  transition: opacity 0.3s ease-in-out, transform 0.3s ease-in-out;
}
/* Both state classes repeat the base transform, so only opacity changes. */
.alert.show {
  transform: translate(-50%, -50%);
  opacity: 1;
}
.alert.hide {
  transform: translate(-50%, -50%);
  opacity: 0;
}

/* Colour variants, combined with .alert by the script. */
.success {
  border: 1px solid #c3e6cb; /* green border */
  background-color: #d4edda; /* pale green */
  color: #155724; /* dark green text */
}
.error {
  border: 1px solid #f5c6cb; /* red border */
  background-color: #f8d7da; /* pale red */
  color: #721c24; /* dark red text */
}
/* ===== Copy button ===== */
/* The script renders this as <button class="btn copy-btn">. A bare `.copy-btn`
   selector (0,1,0) loses to `button.btn` (0,1,1), which silently discarded the
   padding, font size, colours and transition declared here. The extra
   `.btn.copy-btn` selector (0,2,0) lets them win; the bare selector is kept
   for elements that carry only `copy-btn`. */
.copy-btn,
.btn.copy-btn {
  margin-top: 10px; /* gap between the message and the button */
  padding: 10px 20px; /* slightly more compact than the form buttons */
  border: none;
  border-radius: 5px;
  background-color: #007bff;
  box-shadow: 0 2px 5px rgba(0, 123, 255, 0.3); /* blue-tinted glow */
  color: #fff;
  font-size: 14px;
  cursor: pointer;
  transition: background-color 0.3s ease, box-shadow 0.3s ease;
}
.copy-btn:hover,
.btn.copy-btn:hover {
  background-color: #0056b3;
  box-shadow: 0 4px 10px rgba(0, 123, 255, 0.5); /* stronger glow on hover */
}
.copy-btn:active,
.btn.copy-btn:active {
  background-color: #004085;
  box-shadow: 0 2px 5px rgba(0, 123, 255, 0.3); /* back to the resting glow */
  transform: translateY(1px); /* 1px "pressed" nudge */
}
/* ===== "Copied to clipboard" toast ===== */
/* Full-width bar pinned to the bottom edge of the viewport
   (swap `bottom: 0` for `top: 0` to pin it to the top instead). */
.copy-alert {
  position: fixed;
  bottom: 0;
  left: 0;
  z-index: 1000;
  width: 100%;
  padding: 15px 0;
  background: #d4edda; /* pale green */
  box-shadow: 0 -2px 5px rgba(0, 0, 0, 0.1); /* shadow cast upwards */
  color: #155724; /* dark green */
  font-size: 16px;
  font-weight: bold;
  text-align: center;
  opacity: 0; /* invisible until `show` is added */
  transition: opacity 0.3s ease-in-out, transform 0.3s ease-in-out;
}
.copy-alert.show {
  transform: translateY(0);
  opacity: 1;
}
/* Slides the bar below the viewport while fading it out. */
.copy-alert.hide {
  transform: translateY(100%);
  opacity: 0;
}
/* ===== Message box scrollbar (WebKit/Blink pseudo-elements) ===== */
.message-box::-webkit-scrollbar {
  width: 8px; /* slim vertical bar */
}
.message-box::-webkit-scrollbar-track {
  border-radius: 4px;
  background: #f8f9fa; /* very light grey groove */
}
.message-box::-webkit-scrollbar-thumb {
  border-radius: 4px;
  background: #007bff; /* blue handle, matches the buttons */
}
.message-box::-webkit-scrollbar-thumb:hover {
  background: #0056b3; /* darker while hovered */
}
JS 脚本(static/js/main.js)
// How long a pop-up alert / toast stays visible, and how long the CSS fade
// is given to finish before the element is removed or hidden.
const ALERT_VISIBLE_MS = 3000;
const ALERT_FADE_MS = 300;
const GENERIC_ERROR = 'Unable to generate message. Please try again.';

// Pending timers of the copy toast, so a second click restarts the countdown
// instead of letting the first click's timer hide the toast early.
let copyToastTimers = [];

/**
 * Show a centred pop-up that removes itself after ALERT_VISIBLE_MS.
 *
 * The text is inserted as a text node, never as HTML, so a message coming
 * from the server cannot inject markup into the page.
 *
 * @param {'success'|'error'} kind  CSS variant class added next to `alert`.
 * @param {string} label            Bold prefix, e.g. "Error".
 * @param {string} text             Message body.
 */
function showAlert(kind, label, text) {
  const box = document.createElement('div');
  box.className = `alert ${kind}`;
  // Errors interrupt screen readers; everything else is announced politely.
  box.setAttribute('role', kind === 'error' ? 'alert' : 'status');

  const prefix = document.createElement('strong');
  prefix.textContent = `${label}:`;
  box.append(prefix, ` ${text}`);

  document.body.appendChild(box);
  box.classList.add('show');

  setTimeout(() => {
    box.classList.remove('show');
    setTimeout(() => box.remove(), ALERT_FADE_MS); // wait for the fade-out
  }, ALERT_VISIBLE_MS);
}

/** Empty the message box and the response banner. */
function clearOutput() {
  document.getElementById('messageBox').innerHTML = '';
  document.getElementById('response').innerHTML = '';
}

/**
 * Copy text to the clipboard, using the async Clipboard API when available
 * and the legacy execCommand path otherwise.
 *
 * @param {string} text
 * @returns {Promise<void>} Rejects when neither path manages to copy.
 */
async function copyText(text) {
  if (navigator.clipboard) {
    await navigator.clipboard.writeText(text);
    return;
  }
  console.warn('Your browser does not support async clipboard API. Fallback to execCommand.');
  const scratch = document.createElement('textarea');
  scratch.value = text;
  document.body.appendChild(scratch);
  try {
    scratch.select();
    // execCommand reports failure through its return value, not by throwing.
    if (!document.execCommand('copy')) {
      throw new Error('execCommand("copy") was rejected');
    }
  } finally {
    document.body.removeChild(scratch);
  }
}

/** Reveal the fixed "copied" toast and hide it again after ALERT_VISIBLE_MS. */
function showCopyToast() {
  const toast = document.getElementById('copyAlert');
  if (!toast) {
    console.error('copyAlert element not found in the DOM.');
    return;
  }
  copyToastTimers.forEach(clearTimeout);
  copyToastTimers = [];

  toast.style.display = 'block'; // the template ships it with display: none
  toast.classList.add('show');
  copyToastTimers.push(setTimeout(() => {
    toast.classList.remove('show');
    copyToastTimers.push(setTimeout(() => {
      toast.style.display = 'none';
    }, ALERT_FADE_MS)); // wait for the fade-out
  }, ALERT_VISIBLE_MS));
}

/**
 * Render a generated message as pretty-printed JSON plus a copy button.
 *
 * @param {*} message  JSON-serialisable payload returned by the server.
 */
function renderMessage(message) {
  const wrapper = document.createElement('div');
  wrapper.className = 'message-content';
  const pre = document.createElement('pre');
  // textContent, not innerHTML: template values may contain "<" or "&".
  pre.textContent = JSON.stringify(message, null, 2);
  wrapper.appendChild(pre);

  const copyBtn = document.createElement('button');
  copyBtn.type = 'button';
  copyBtn.className = 'btn copy-btn';
  copyBtn.textContent = 'Copy Message';
  copyBtn.addEventListener('click', async () => {
    const messageContent = pre.textContent;
    if (!messageContent) {
      alert('No message content to copy. Please generate a message first.');
      return;
    }
    try {
      await copyText(messageContent);
      showCopyToast();
    } catch (err) {
      console.error('Failed to copy message:', err);
      alert('Failed to copy message. Please try again.');
    }
  });

  const box = document.getElementById('messageBox');
  box.innerHTML = '';
  box.append(wrapper, copyBtn);
}

// Generate: POST the two selections to /get_template and show the result.
document.getElementById('messageForm').addEventListener('submit', async (e) => {
  e.preventDefault();
  const controller = document.getElementById('controller').value;
  const msgType = document.getElementById('msgType').value;
  clearOutput();

  // After "Reset All" both selects are blank, so this guard can trigger.
  if (!controller || !msgType) {
    showAlert('error', 'Error', 'Please select both Controller Type and Message Type.');
    return;
  }

  try {
    const response = await fetch('/get_template', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ controller, msgType })
    });
    const data = await response.json();
    if (data.status === 'success') {
      renderMessage(data.message);
      showAlert('success', 'Success', 'Message generated successfully!');
    } else if (data.status === 'error') {
      // Prefer the server's explanation, fall back to the generic text.
      showAlert('error', 'Error', data.message || GENERIC_ERROR);
    } else {
      showAlert('error', 'Error', GENERIC_ERROR);
    }
  } catch (error) {
    // Network failure or a non-JSON response body.
    console.error('Error:', error);
    showAlert('error', 'Error', GENERIC_ERROR);
  }

  // Bring the output area into view.
  document.getElementById('messageBox').scrollIntoView({ behavior: 'smooth' });
});

// Clear: empty the output areas only; the selections stay as they are.
document.getElementById('clearButton').addEventListener('click', () => {
  clearOutput();
  showAlert('success', 'Success', 'Messages cleared successfully!');
});

// Reset: blank both selects as well as the output areas.
document.getElementById('resetButton').addEventListener('click', () => {
  document.getElementById('controller').value = '';
  document.getElementById('msgType').value = '';
  clearOutput();
  showAlert('success', 'Success', 'All fields reset successfully!');
});
获取消息模板数据(DataMock.py)
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import os
import json
import uuid
import datetime
import random
import copy
import time, random, queue
from collections import OrderedDict
from threading import Thread
import ast
# 1.创建Kafka消息mock服务类
class KafkaMsgMock:
    """Generate mock Kafka messages from JSON templates held in memory.

    On construction three resources are read from the ``MsgTamplate``
    directory under the current working directory (directory and key names
    are spelled exactly as they exist on disk):

    * ``MsgTamplate/Kafka/<name>`` - one JSON file per controller, mapping
      ``"<msgType>Tamplate"`` to a message template;
    * ``MsgTamplate/EQUIPMETN``    - equipment numbers per controller;
    * ``MsgTamplate/FAULTCODE``    - fault codes per controller.

    The cached templates are never modified: every generated message is a
    deep copy of its template.
    """

    # Message types each generator accepts; the template key is the type
    # name followed by "Tamplate". 'random' is handled separately.
    _COMMON_TYPES = (
        'alarm', 'fault', 'faultRecovered', 'serviceModeChange',
        'movementData', 'usagedata', 'button', 'door', 'realtimestatus',
        'serviceorderv2', 'edge', 'upperpitsensor', 'upperpitnoise',
        'lowerpitsensor', 'lowerpitnoise', 'movementv2',
    )
    _KCE_TYPES = (
        'alarm', 'fault', 'faultRecovered', 'movementData',
        'serviceModeChange', 'usagedata', 'realtimestatus', 'edge',
    )
    _KCECPUC_TYPES = (
        'alarm', 'kcecpucMainfault', 'kcecpucMainfaultRecovered',
        'faultGcd', 'faultGcdRecovered', 'faultGw', 'faultGwRecovered',
        'movementData', 'usagedata', 'button', 'door', 'realtimestatus',
        'ckpicapabilities', 'ckpidailystatistics', 'noservicepredict',
        'toolconnectivity', 'modemaintenancechange',
    )

    def __init__(self):
        """Load templates, equipment numbers and fault codes into memory."""
        self.template = self.loadTemplate()
        self.equipmentNumber = self.loadEquipmentNumber()
        self.faultCode = self.loadFaultCode()
        print('Load resources ...')

    @staticmethod
    def _read_json(path):
        """Read *path* as UTF-8, strip filler characters and parse it as JSON.

        NOTE(review): the cleanup removes the literal two-character sequences
        backslash-n and backslash-t (not real newlines/tabs) and EVERY space,
        including spaces inside string values. It is kept exactly as the
        original code did it; confirm against the template files before
        relaxing it.
        """
        with open(path, 'r', encoding='utf-8') as f:
            raw = f.read()
        cleaned = raw.replace("\\n", "").replace("\\t", "").replace(" ", "")
        return json.loads(cleaned)

    def loadTemplate(self):
        """Load every controller template file from ``./MsgTamplate/Kafka``.

        Entries whose name is longer than 7 characters are skipped.

        Returns:
            dict: file name (e.g. ``'LCE'``) -> parsed JSON content.
        """
        dict_template = dict()
        for name in os.listdir('./MsgTamplate/Kafka'):
            if len(name) > 7:
                continue
            print(f'Load resource for {name}')
            # Same location the directory listing came from (cwd-relative).
            path = os.path.abspath('') + '/MsgTamplate/Kafka/' + name
            dict_template[name] = self._read_json(path)
        return dict_template

    def loadEquipmentNumber(self):
        """Load the equipment-number file.

        Returns:
            dict: ``{'EQUIPMETN': <parsed JSON>}``.
        """
        path = os.path.abspath('.') + '/MsgTamplate/EQUIPMETN'
        return {'EQUIPMETN': self._read_json(path)}

    def loadFaultCode(self):
        """Load the fault-code file.

        Returns:
            dict: ``{'FAULTCODE': <parsed JSON>}``.
        """
        path = os.path.abspath('.') + '/MsgTamplate/FAULTCODE'
        return {'FAULTCODE': self._read_json(path)}

    def utcTime(self):
        """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.mmmZ``.

        ``timespec='milliseconds'`` always emits three fractional digits;
        slicing a plain ``isoformat()`` dropped them whenever the microsecond
        field happened to be 0.
        """
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        return now.isoformat(timespec='milliseconds') + 'Z'

    def uuid(self):
        """Return a fresh random UUID (version 4) as a string."""
        return str(uuid.uuid4())

    def _select_template(self, data, msgType, known_types):
        """Return an independent deep copy of the template for *msgType*.

        Args:
            data: templates of one controller, keyed by ``"<type>Tamplate"``.
            msgType: ``'random'`` or one of *known_types*.
            known_types: message types the calling generator accepts.

        Returns:
            dict | None: the copied template, or ``None`` (after printing a
            notice) when the type is unknown or has no template.
        """
        if msgType == 'random':
            if not data:
                print('Template not found - no templates loaded')
                return None
            key = random.choice(sorted(data.keys()))
        elif msgType in known_types:
            key = msgType + 'Tamplate'
        else:
            print(f'未定义的消息类型 - {msgType}')
            return None
        if key not in data:
            print(f'Template not found - {key}')
            return None
        # Deep copy so that filling nested fields (Param, value) never
        # writes through to the cached template.
        return copy.deepcopy(data[key])

    def comon_msg(self, controller='LCE', msgType='random', count=1):
        """Generate a message for a controller with the common layout.

        Args:
            controller: key into the loaded resources; ``'LCE'`` by default.
            msgType: ``'random'`` or one of ``_COMMON_TYPES``.
            count: number of messages to generate; only the last is returned.

        Returns:
            dict | None: the last generated message, or ``None`` when the
            controller has no loaded resources, the message type is unknown
            or has no template, or *count* is less than 1.
        """
        data = self.template.get(controller)
        equipmentNumbers = self.equipmentNumber['EQUIPMETN'].get(controller)
        faultCodes = self.faultCode['FAULTCODE'].get(controller)
        if not data or not equipmentNumbers or not faultCodes:
            print(f'No resources loaded for controller - {controller}')
            return None

        msg = None
        for _ in range(count):
            # Random draws happen in the original order: equipment number,
            # fault code, then (for 'random') the template.
            equipmentNumber = str(random.choice(equipmentNumbers))
            faultCode = str(random.choice(faultCodes))
            content = self._select_template(data, msgType, self._COMMON_TYPES)
            if content is None:
                return None

            # Only fields already present (and truthy) in the template are
            # refreshed.
            if content.get('EquipmentNumber'):
                content['EquipmentNumber'] = equipmentNumber
            if content.get('Param', {}).get('UUID'):
                content['Param']['UUID'] = self.uuid()
            if content.get('Param', {}).get('FaultCode'):
                content['Param']['FaultCode'] = faultCode
            if content.get('Param', {}).get('Timestamp'):
                content['Param']['Timestamp'] = self.utcTime()
            if content.get('value', {}).get('Timestamp'):
                content['value']['Timestamp'] = self.utcTime()
            if content.get('value', {}).get('Param', {}).get('Timestamp'):
                content['value']['Param']['Timestamp'] = self.utcTime()
            msg = content
        return msg

    def kce_msg(self, msgType='random', count=1):
        """Generate *count* KCE messages.

        Timestamp fields keep their template values; the code that refreshed
        them was disabled in the original implementation.

        Args:
            msgType: ``'random'`` or one of ``_KCE_TYPES``.
            count: number of messages to generate.

        Returns:
            list[tuple]: ``(topic, message)`` pairs, where ``topic`` is the
            message's ``'topic'`` field (``None`` if absent). Empty when the
            message type is unknown or has no template.

        Raises:
            KeyError: when no KCE resources are loaded.
        """
        data = self.template['KCE']
        equipmentNumbers = self.equipmentNumber['EQUIPMETN']['KCE']
        faultCodes = self.faultCode['FAULTCODE']['KCE']

        msages = list()
        for _ in range(count):
            equipmentNumber = str(random.choice(equipmentNumbers))
            faultCode = str(random.choice(faultCodes))
            content = self._select_template(data, msgType, self._KCE_TYPES)
            if content is None:
                break

            if content.get('EquipmentNumber'):
                content['EquipmentNumber'] = equipmentNumber
            if content.get('equipment'):
                content['equipment'] = equipmentNumber
            if content.get('Param', {}).get('UUID'):
                content['Param']['UUID'] = self.uuid()
            if content.get('Param', {}).get('FaultCode'):
                content['Param']['FaultCode'] = faultCode
            msages.append((content.get('topic'), content))
        return msages

    def kcecpuc_msg(self, msgType='random', count=1):
        """Generate *count* KCECPUC messages.

        Timestamp fields keep their template values; the code that refreshed
        them was disabled in the original implementation.

        Args:
            msgType: ``'random'`` or one of ``_KCECPUC_TYPES``.
            count: number of messages to generate.

        Returns:
            list[tuple]: ``(topic, message)`` pairs, where ``topic`` is the
            message's ``'topic'`` field (``None`` if absent). Empty when the
            message type is unknown or has no template.

        Raises:
            KeyError: when no KCECPUC resources are loaded.
        """
        data = self.template['KCECPUC']
        equipmentNumbers = self.equipmentNumber['EQUIPMETN']['KCECPUC']
        faultCodes = self.faultCode['FAULTCODE']['KCECPUC']

        msages = list()
        for _ in range(count):
            equipmentNumber = str(random.choice(equipmentNumbers))
            faultCode = str(random.choice(faultCodes))
            content = self._select_template(data, msgType, self._KCECPUC_TYPES)
            if content is None:
                break

            # The equipment number lives in exactly one of three places.
            if content.get('value', {}).get('equipment'):
                content['value']['equipment'] = equipmentNumber
            elif content.get('value', {}).get('eq'):
                content['value']['eq'] = equipmentNumber
            elif content.get('eq'):
                content['eq'] = equipmentNumber
            # Test the field that is actually written: the original checked
            # Param.UID but assigned value.UID.
            if content.get('value', {}).get('UID'):
                content['value']['UID'] = self.uuid().upper().replace('-', '')
            if content.get('code'):
                content['code'] = faultCode
            msages.append((content.get('topic'), content))
        return msages
# if __name__ == '__main__':
# wss = KafkaMsgMock()
# start = time.time()
# msg = wss.comon_msg( controller = 'LCE', msgType = 'fault',count = 1 )
# print(msg)
# print(time.time() -start)
# while msg:
# #print(msg.pop())
# pass
Flask 服务
from flask import Flask, request, jsonify, render_template
from DataMock import KafkaMsgMock
app = Flask(__name__)
# Created once at import time; the constructor reads every template file.
kafka_mock = KafkaMsgMock()


@app.route('/')
def index():
    """Serve the message generator page."""
    return render_template('index.html')


@app.route('/get_template', methods=['POST'])
def get_template():
    """Return one generated message for the requested controller and type.

    Expects a JSON object body with optional ``controller`` (default
    ``'LCE'``) and ``msgType`` (default ``'fault'``). Every outcome is
    answered with a JSON object carrying ``status`` and ``message``, because
    the page parses the body as JSON regardless of the HTTP status:

    * 200 - ``status: success``, ``message`` is the generated message;
    * 400 - the body is missing or is not a JSON object;
    * 500 - no message could be generated.
    """
    # silent=True yields None instead of raising on a missing/invalid body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"status": "error", "message": "Request body must be a JSON object"}), 400

    controller = data.get('controller', 'LCE')
    msgType = data.get('msgType', 'fault')
    try:
        message = kafka_mock.comon_msg(controller=controller, msgType=msgType)
    except Exception:
        # The generator can raise for controllers or types without loaded
        # templates. Log the traceback server-side and keep the reply generic
        # so internals are not leaked to the client.
        app.logger.exception('Failed to generate message for %r / %r', controller, msgType)
        message = None

    if message:
        return jsonify({"status": "success", "message": message})
    return jsonify({"status": "error", "message": "Message template not found"}), 500


if __name__ == '__main__':
    import os

    # The server listens on all interfaces, and debug mode enables Werkzeug's
    # interactive debugger, which lets any client that can reach the port run
    # code. Debug is therefore opt-in via FLASK_DEBUG=1.
    debug = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(host='0.0.0.0', port=5000, debug=debug)
最终效果图
转载文章时务必注明原作者及原始链接,并注明「发表于 TesterHome 」,并不得对作品进行修改。
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!