「原创声明:保留所有权利,禁止转载」
背景介绍
领导要求打通一条 “平台生成 SN 消息 → Kafka → 应用消费 → 自动创建并执行工单” 的端到端链路,实现全流程闭环验证。当前已实现 SN 的端到端的数据链路打通流程。
WEB 端
- 用户在界面一键 “Generate Messages”;
- 点击 “Send to Kafka” 后,平台立即把消息写入指定的 Kafka Topic;
- 发送结果即时回显:展示 “发送成功” 或 “发送失败”。
应用端
- CKFM 端实时订阅指定的 Kafka Topic,当收到 “发送成功” 的 SN 消息后,自动解析并转换为标准工单信息;
- CKFM 端将生成的工单指派给维保工的 APP 端,维保工通过 APP 端接到维保工单,进行工单处置流程操作。
平台整体代码结构
读取预制模板数据
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
'消息生成类'
__author__ = 'data mock'
__date__ = '2022-10-09'
import os
import json
import uuid
import datetime
import random
import copy
import time, random, queue
from collections import OrderedDict
# 1.创建Kafka消息mock服务类
class KafkaMsgMock:
def __init__(self, data_source='Kafka'):
'''
类初始化
'''
self.data_source = data_source
self.template = self.loadTemplate()
self.equipmentNumber = self.loadEquipmentNumber()
self.faultCode = self.loadFaultCode()
print(f'Load resources from {data_source} folder...')
def loadTemplate(self):
'''
读取模板文件,为快速处理,所有模板均加载到内存
'''
dict_template = dict()
template_dir = f'./MsgTemplate/{self.data_source}/'
for i in os.listdir(template_dir):
if len(i) > 7: continue;
print(f'Load resource for {i} in {self.data_source} folder')
file_path = os.path.abspath(os.path.join(template_dir, i))
if not os.path.isfile(file_path):
continue
with open(file_path, 'r', encoding='utf-8') as f:
file_content = f.read().replace("\\n", "").replace("\\t", "").replace(" ", "")
dict_template[i] = json.loads(file_content)
return dict_template
def loadEquipmentNumber(self):
'''
读取ken文件,为快速处理,所有模板均加载到内存
'''
dict_template = dict()
with open(os.path.abspath(os.path.dirname('./')) + '/MsgTemplate/EQUIPMENT', 'r', encoding='utf-8') as f:
dict_template.update(
{'EQUIPMENT': json.loads(f.read().replace("\\n", "").replace("\\t", "").replace(" ", ""))})
return dict_template
def loadFaultCode(self):
'''
读取fault code文件,为快速处理,所有模板均加载到内存
'''
dict_template = dict()
with open(os.path.abspath(os.path.dirname('./')) + '/MsgTemplate/FAULTCODE', 'r', encoding='utf-8') as f:
dict_template.update(
{'FAULTCODE': json.loads(f.read().replace("\\n", "").replace("\\t", "").replace(" ", ""))})
return dict_template
def utcTime(self):
'''
make utc time for timestemp
'''
a = datetime.datetime.utcnow()
return datetime.datetime.utcnow().isoformat()[0:23] + 'Z'
def uuid(self):
'''
make uuid for timestemp
'''
return str(uuid.uuid4())
def generate_ordered_message(self, template_content):
"""
生成保持原始顺序的消息(适用于所有消息类型)
参数:
template_content: 原始模板字典
返回:
有序JSON字符串
"""
# 创建有序字典
ordered_content = OrderedDict()
# 按原始模板顺序填充字段
for key in template_content.keys():
ordered_content[key] = template_content[key]
# 序列化为JSON字符串(保持中文可读性)
return json.dumps(ordered_content, indent=2, ensure_ascii=False)
def comon_msg(self, controller='LCE', msgType='random', count=1, equipmentNumber=None, SNCode=None):
'''
生成消息(支持手动输入设备号和ServiceNeedCode)
'''
# 1. 获取模板数据
data = self.template.get(controller, {})
if not data:
print(f"错误:未找到控制器 {controller} 的模板")
return None
# 2. 设备号和故障码处理
equipment_numbers = self.equipmentNumber.get('EQUIPMENT', {}).get(controller, [30359546])
# 确保设备号是字符串类型
equipment_numbers = [str(eq) for eq in equipment_numbers]
fault_codes = self.faultCode.get('FAULTCODE', {}).get(controller, ['21'])
# 3. 根据消息类型选择模板
template_map = {
'fault': 'faultTamplate',
'faultRecovered': 'faultRecoveredTamplate',
'alarm': 'alarmTamplate',
'serviceModeChange': 'serviceModeChangeTamplate',
'movementData': 'movementDataTamplate',
'serviceNeed': 'serviceNeedTamplate', # 关键模板
'usagedata': 'usagedataTamplate',
'button': 'buttonTamplate',
'door': 'doorTamplate',
'realtimestatus': 'realtimestatusTamplate',
'serviceorderv2': 'serviceorderv2Tamplate',
'edge': 'edgeTamplate',
'upperpitsensor': 'upperpitsensorTamplate',
'upperpitnoise': 'upperpitnoiseTamplate',
'lowerpitsensor': 'lowerpitsensorTamplate',
'lowerpitnoise': 'lowerpitnoiseTamplate',
'movementv2': 'movementv2Tamplate',
'kcecpucMainfault': 'kcecpucMainfaultTamplate',
'kcecpucMainfaultRecovered': 'kcecpucMainfaultRecoveredTamplate',
'faultGcd': 'faultGcdTamplate',
'faultGcdRecovered': 'faultGcdRecoveredTamplate',
'faultGw': 'faultGwTamplate',
'faultGwRecovered': 'faultGwRecoveredTamplate',
'modemaintenancechange': 'modemaintenancechangeTamplate',
'ckpicapabilities': 'ckpicapabilitiesTamplate',
'ckpidailystatistics': 'ckpidailystatisticsTamplate',
'noservicepredict': 'noservicepredictTamplate',
'toolconnectivity': 'toolconnectivityTamplate'
}
# 4. 获取具体模板
template_key = template_map.get(msgType)
if not template_key or template_key not in data:
print(f"错误:未找到消息类型 {msgType} 的模板")
return None
messages = []
for _ in range(count):
# 使用深拷贝避免修改原始模板
content = copy.deepcopy(data[template_key])
# 5. 确定最终设备号
if equipmentNumber:
final_eq = str(equipmentNumber) # 确保是字符串
else:
final_eq = str(random.choice(equipment_numbers))
# 6. 替换所有设备号字段
if 'EquipmentNumber' in content:
content['EquipmentNumber'] = final_eq
print(f"设置顶层设备号为: {final_eq}")
# 替换IotRaw中的eq字段
if 'IotRaw' in content:
iot_raw = content['IotRaw']
print(f"处理IotRaw中的eq字段...")
# 替换Car中的eq
if 'Car' in iot_raw and isinstance(iot_raw['Car'], list):
for car in iot_raw['Car']:
if isinstance(car, dict) and 'eq' in car:
print(f"替换Car设备号: {car['eq']} -> {final_eq}")
car['eq'] = final_eq
# 替换Fault中的eq
if 'Fault' in iot_raw and isinstance(iot_raw['Fault'], list):
for fault in iot_raw['Fault']:
if isinstance(fault, dict) and 'eq' in fault:
print(f"替换Fault设备号: {fault['eq']} -> {final_eq}")
fault['eq'] = final_eq
# 7. 替换ServiceNeedCode(仅针对serviceNeed类型)
if msgType == 'serviceNeed' and SNCode:
if 'ServiceNeedCode' in content:
content['ServiceNeedCode'] = SNCode
print(f"设置ServiceNeedCode为: {SNCode}")
else:
print("警告:serviceNeed模板中缺少ServiceNeedCode字段")
# 8. 替换其他动态字段
if 'Param' in content:
if 'UUID' in content['Param']:
content['Param']['UUID'] = self.uuid()
if 'Timestamp' in content['Param']:
content['Param']['Timestamp'] = self.utcTime()
if 'FaultCode' in content['Param']:
content['Param']['FaultCode'] = random.choice(fault_codes)
else:
# 如果没有param,则直接替换uuid
if 'UUID' in content:
content['UUID'] = self.uuid()
# 5.6 将消息添加到列表
messages.append(content)
# 8. 返回结果
if count == 1:
return messages
else:
return [msg for msg in messages]
平台服务主入口
import sys
import os
import tempfile
import subprocess
import json
import time
import traceback
from flask import Flask, request, jsonify, render_template
from DataMock import KafkaMsgMock
app = Flask(__name__)
# 启用详细日志记录
app.logger.setLevel('DEBUG')
@app.route('/')
def index():
return render_template('index.html')
@app.route('/get_template', methods=['POST'])
def get_template():
data = request.json
# 获取请求参数并设置默认值
controller = data.get('controller', 'LCE')
msgType = data.get('msgType', 'ServiceNeed')
dataSource = data.get('dataSource', 'Kafka')
equipmentNumber = data.get('equipmentNumber')
serviceNeedCode = data.get('serviceNeedCode')
message_count = int(data.get('messageCount', 1)) # 默认值为1
# 添加详细日志
app.logger.debug(f"请求参数: controller={controller}, msgType={msgType}, dataSource={dataSource}, "
f"equipmentNumber={equipmentNumber}, serviceNeedCode={serviceNeedCode}, messageCount={message_count}")
try:
# 创建对应数据源的mock生成器
mock_generator = KafkaMsgMock(data_source=dataSource)
# 生成消息时直接传递所有参数
result = mock_generator.comon_msg(
controller=controller,
msgType=msgType,
equipmentNumber=equipmentNumber,
SNCode=serviceNeedCode,
count=message_count # 添加count参数
)
if not result:
return jsonify({"status": "error", "message": "Message template not found"}), 500
# 返回结果(根据生成数量返回单个对象或数组)
app.logger.debug(f"生成消息成功: {json.dumps(result, indent=2)[:500]}...") # 只记录前500个字符
return jsonify({
"status": "success",
"message": result
})
except Exception as e:
error_msg = f"Error generating message: {str(e)}"
app.logger.error(error_msg)
app.logger.error(traceback.format_exc())
return jsonify({"status": "error", "message": error_msg}), 500
@app.route('/send_to_kafka', methods=['POST'])
def send_to_kafka():
app.logger.info("收到 /send_to_kafka 请求")
try:
data = request.json
kafka_messages = data.get('messages', [])
if not kafka_messages:
app.logger.warning("没有提供消息")
return jsonify({"status": "error", "message": "No messages provided"}), 400
# 添加详细日志 - 记录接收到的消息
app.logger.debug(f"收到 {len(kafka_messages)} 条消息")
for i, msg in enumerate(kafka_messages, 1):
topic = msg.get('topic', 'enriched_service_needs')
delay = msg.get('delay', 0)
message_str = json.dumps(msg.get('message', {}))
app.logger.debug(f"消息 #{i}: 主题={topic}, 延迟={delay}, 消息长度={len(message_str)}")
if len(message_str) < 1000: # 避免记录太长的消息
app.logger.debug(f"消息内容: {message_str}")
# 创建临时文件
temp_file_path = None
try:
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', encoding='utf-8') as temp_file:
# 按照格式写入每一行
for msg in kafka_messages:
topic = msg.get('topic', 'enriched_service_needs')
delay = msg.get('delay', 0)
message = msg.get('message', {})
# 关键修复1:确保消息是有效的JSON字符串
if isinstance(message, dict):
message_str = json.dumps(message)
elif isinstance(message, str):
# 尝试解析字符串是否为JSON
try:
json.loads(message)
message_str = message
except json.JSONDecodeError:
# 如果不是有效的JSON,则作为普通字符串处理
message_str = message
else:
message_str = str(message)
# 关键修复2:去除外层多余的双引号
if message_str.startswith('"') and message_str.endswith('"'):
message_str = message_str[1:-1]
# 写入格式: topic|delay|message
line = f"{topic}|{delay}|{message_str}\n"
temp_file.write(line)
temp_file_path = temp_file.name
app.logger.info(f"创建临时文件: {temp_file_path}")
# 记录临时文件内容
if app.config.get('DEBUG', False):
with open(temp_file_path, 'r') as f:
temp_content = f.read()
app.logger.debug(f"临时文件内容:\n{temp_content}")
except Exception as e:
app.logger.error(f"创建临时文件失败: {str(e)}")
app.logger.error(traceback.format_exc())
return jsonify({
"status": "error",
"message": f"创建临时文件失败: {str(e)}"
}), 500
# 获取脚本路径
script_dir = os.path.dirname(os.path.abspath(__file__))
producer_script = os.path.join(script_dir, 'Producer.py')
app.logger.info(f"Producer脚本路径: {producer_script}")
# 检查脚本是否存在
if not os.path.exists(producer_script):
app.logger.error(f"Producer.py 不存在于 {producer_script}")
return jsonify({
"status": "error",
"message": f"Producer script not found at {producer_script}"
}), 500
# 获取当前Python解释器的绝对路径
python_executable = sys.executable
app.logger.info(f"Python解释器路径: {python_executable}")
# 记录要执行的命令
cmd = [python_executable, producer_script, temp_file_path]
app.logger.info(f"执行命令: {' '.join(cmd)}")
# 调用Producer.py脚本
process_output = ""
process_error = ""
return_code = None
try:
start_time = time.time()
# 使用subprocess.Popen并进行超时处理
with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding='utf-8',
bufsize=1 # 行缓冲
) as process:
try:
# 尝试在60秒内完成
stdout, stderr = process.communicate(timeout=60)
return_code = process.returncode
except subprocess.TimeoutExpired:
process.kill()
stdout, stderr = process.communicate()
return_code = -1
app.logger.warning("Producer脚本执行超时")
elapsed_time = time.time() - start_time
app.logger.info(f"Producer脚本执行完成,用时 {elapsed_time:.2f} 秒")
# 整理输出
process_output = stdout.strip() or ""
process_error = stderr.strip() or ""
# 记录完整的脚本输出
app.logger.debug(f"Producer标准输出:\n{process_output}")
app.logger.debug(f"Producer标准错误:\n{process_error}")
# 从输出中提取转换后的消息
processed_messages = []
if process_output:
# 从脚本输出中查找时间戳替换后的消息
for line in process_output.split('\n'):
if "Processed message:" in line:
message_content = line.split("Processed message: ", 1)[-1]
processed_messages.append(message_content)
app.logger.debug(f"找到转换后消息: {message_content[:100]}...")
else:
app.logger.warning("Producer脚本没有输出")
except Exception as e:
app.logger.error(f"执行Producer脚本失败: {str(e)}")
app.logger.error(traceback.format_exc())
return_code = -2
process_error = str(e)
finally:
# 确保删除临时文件
try:
if temp_file_path and os.path.exists(temp_file_path):
os.unlink(temp_file_path)
app.logger.info(f"已删除临时文件 {temp_file_path}")
except Exception as e:
app.logger.error(f"删除临时文件失败: {str(e)}")
# 返回结果
if return_code == 0:
result = {
"status": "success",
"message": f"{len(kafka_messages)} 条消息已发送",
"output": process_output,
"processed_messages": processed_messages
}
# 如果有处理后的消息,在前端显示
if processed_messages:
result["message"] += f",包含 {len(processed_messages)} 条转换后消息"
app.logger.info(f"成功捕获 {len(processed_messages)} 条转换后消息")
return jsonify(result)
else:
error_msg = f"Producer脚本执行失败"
if return_code is not None:
error_msg += f",返回码 {return_code}"
if process_error:
error_msg += f" - {process_error}"
app.logger.error(error_msg)
return jsonify({
"status": "error",
"message": error_msg,
"error": process_error,
"output": process_output,
"return_code": return_code
}), 500
except Exception as e:
app.logger.error(f"/send_to_kafka 请求处理失败: {str(e)}")
app.logger.error(traceback.format_exc())
return jsonify({
"status": "error",
"message": f"Internal server error: {str(e)}"
}), 500
if __name__ == '__main__':
# 确保工作目录正确
working_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(working_dir)
app.logger.info(f"工作目录: {working_dir}")
# 运行应用
app.run(host='0.0.0.0', port=9090, debug=True)
将生成的消息发到 IOT-Kafka 中
import sys
import datetime
import pytz
import re
import time
import traceback
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError, NoBrokersAvailable
# 配置立即刷新输出
sys.stdout = open(sys.stdout.fileno(), 'w', encoding='utf-8', buffering=1)
sys.stderr = open(sys.stderr.fileno(), 'w', encoding='utf-8', buffering=1)
def log_info(message):
"""记录信息级别日志"""
print(f"[INFO] {message}", flush=True)
def log_debug(message):
"""记录调试级别日志"""
print(f"[DEBUG] {message}", flush=True)
def log_error(message):
"""记录错误级别日志"""
print(f"[ERROR] {message}", file=sys.stderr, flush=True)
def log_warning(message):
"""记录警告级别日志"""
print(f"[WARNING] {message}", file=sys.stderr, flush=True)
def is_valid_json(message):
"""检查是否为有效的JSON"""
try:
json.loads(message)
return True
except json.JSONDecodeError:
return False
def main():
log_info("=" * 60)
log_info("启动 Producer 脚本")
# 检查参数
if len(sys.argv) != 2:
log_error("使用方法: python Producer.py <文件路径>")
log_error(f"参数数量错误: {len(sys.argv)}")
return 1
file_path = sys.argv[1]
log_info(f"处理文件: {file_path}")
# Kafka配置
bootstrap_servers = 'XXXXXX'
security_protocol = 'SASL_SSL'
sasl_mechanism = 'SCRAM-SHA-512'
sasl_plain_username = 'XXXXXX'
sasl_plain_password = 'XXXXXX'
# 生产者配置
producer_config = {
'bootstrap_servers': bootstrap_servers,
'security_protocol': security_protocol,
'sasl_mechanism': sasl_mechanism,
'sasl_plain_username': sasl_plain_username,
'sasl_plain_password': sasl_plain_password
}
# 创建生产者
try:
log_info("尝试连接 Kafka 集群...")
producer = KafkaProducer(**producer_config)
log_info("Kafka 生产者创建成功")
except Exception as e:
log_error(f"创建 Kafka 生产者失败: {type(e).__name__}: {str(e)}")
return 1
# 处理文件内容
processed_count = 0
try:
with open(file_path, "r", encoding='utf-8') as file:
log_info(f"打开文件: {file_path}")
for line_num, line in enumerate(file, 1):
# 清理行
raw_line = line.rstrip('\n')
if not raw_line:
continue
log_info(f"处理第 {line_num} 行: {raw_line[:100]}...")
try:
parts = raw_line.split('|', 2)
if len(parts) < 3:
log_error(f"无效格式 - 需要至少3个部分,但找到 {len(parts)}")
log_debug(f"行内容: {raw_line}")
continue
topic, delay_str, original_message = parts
log_debug(f"主题: {topic}, 延迟: {delay_str}, 消息长度: {len(original_message)}")
# 关键修复1:检查消息格式
if not is_valid_json(original_message):
log_warning("原始消息不是有效的JSON格式")
# 尝试修复:去除外层多余的双引号
if original_message.startswith('"') and original_message.endswith('"'):
original_message = original_message[1:-1]
log_info("已移除外层多余的双引号")
if is_valid_json(original_message):
log_info("修复后消息格式有效")
else:
log_error("修复后消息格式仍然无效")
continue
else:
log_error("无法修复消息格式,跳过")
continue
# 解析延迟
try:
delay = int(delay_str) if delay_str.isdigit() else 0
log_debug(f"延迟设置: {delay}秒")
except ValueError:
log_error(f"无效的延迟值: '{delay_str}', 使用默认值 0")
delay = 0
log_debug(f"原始消息: {original_message}...")
# 处理延迟
if delay > 0:
log_info(f"延迟 {delay} 秒...")
time.sleep(delay)
# 生成时间戳
try:
timestamp = datetime.datetime.now().astimezone(pytz.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]
log_info(f"生成的时间戳: {timestamp}")
except Exception as e:
log_error(f"生成时间戳失败: {type(e).__name__}: {str(e)}")
return 1
# 定义时间戳模式
pattern1 = r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}'
pattern2 = r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}\+\d{2}:\d{2}'
pattern3 = r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}\+\d{4}'
# 执行替换
processed_message = re.sub(pattern1, timestamp, original_message)
processed_message = re.sub(pattern2, timestamp + 'Z', processed_message)
processed_message = re.sub(pattern3, timestamp + 'Z', processed_message)
log_info(f"转换后消息: {processed_message}...")
# 关键修复2:再次验证消息格式
if not is_valid_json(processed_message):
log_error("转换后消息不是有效的JSON格式")
log_debug(f"完整消息内容: {processed_message}")
continue
# 发送消息
try:
log_info(f"发送消息到主题 {topic}...")
producer.send(topic, processed_message.encode('utf-8'))
producer.flush()
log_info("消息发送成功")
processed_count += 1
except Exception as e:
log_error(f"发送消息失败: {type(e).__name__}: {str(e)}")
except Exception as e:
log_error(f"处理第 {line_num} 行失败: {type(e).__name__}: {str(e)}")
log_error(traceback.format_exc())
except Exception as e:
log_error(f"处理文件失败: {type(e).__name__}: {str(e)}")
log_error(traceback.format_exc())
return 1
# 清理资源
log_info(f"处理完成,成功发送 {processed_count} 条消息")
try:
producer.close()
log_info("Kafka 生产者已关闭")
except:
pass
return 0
if __name__ == "__main__":
try:
exit_code = main()
log_info(f"脚本退出码: {exit_code}")
sys.exit(exit_code)
except Exception as e:
log_error(f"未处理的异常: {type(e).__name__}: {str(e)}")
log_error(traceback.format_exc())
sys.exit(2)
WEB 端网页代码
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Elevator Data Simulation Platform</title>
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body>
<div class="container">
<h1>Elevator Data Simulation Platform</h1>
<form id="messageForm">
<div class="form-row">
<div class="form-col">
<div class="form-group">
<label for="controller">Controller Type</label>
<select id="controller" class="form-control">
<option value="LCE" selected>LCE</option>
<option value="KCE">KCE</option>
<option value="KCECPUC">KCECPUC</option>
<option value="GCE">GCE</option>
<option value="STEP">STEP</option>
<option value="ESC">ESC</option>
<option value="KSE">KSE</option>
<option value="DTU">DTU</option>
<option value="AE">AE</option>
</select>
</div>
</div>
<div class="form-col">
<div class="form-group">
<label for="msgType">Message Type</label>
<select id="msgType" class="form-control">
<option value="serviceNeed" selected>ServiceNeed</option>
<option value="fault">Fault</option>
<option value="alarm">Alarm</option>
<option value="faultRecovered">Fault Recovered</option>
<option value="movementData">Movement Data</option>
<option value="serviceModeChange">Service Mode Change</option>
<option value="usagedata">Routine Call</option>
<option value="button">Button Event</option>
<option value="door">Door Operation</option>
<option value="realtimestatus">Online Status</option>
<option value="serviceorderv2">Service Order v2</option>
<option value="edge">Edge</option>
<option value="upperpitsensor">AnyEscalator Sensor Stats</option>
<option value="upperpitnoise">AnyEscalator Sound Stats</option>
<option value="lowerpitsensor">AnyEscalator Sensor Stats2</option>
<option value="lowerpitnoise">AnyEscalator Sound Stats2</option>
<option value="movementv2">Movement v2</option>
<option value="modemaintenancechange">Main Fault</option>
<option value="ckpicapabilities">Main FaultRecovered</option>
<option value="ckpidailystatistics">Fault Gcd</option>
<option value="noservicepredict">Fault GcdRecovered</option>
<option value="toolconnectivity">Fault Gw</option>
<option value="toolconnectivity">Fault GwRecovered</option>
<option value="modemaintenancechange">Mode Maintenance Change</option>
<option value="ckpicapabilities">CKPI Capabilities</option>
<option value="ckpidailystatistics">CKPI Daily Statistics</option>
<option value="noservicepredict">Noser Vice Predict</option>
<option value="toolconnectivity">Tool Connectivity</option>
</select>
</div>
</div>
<div class="form-col">
<div class="form-group">
<label for="dataSource">Data Source</label>
<select id="dataSource" class="form-control">
<option value="Kafka" selected>Kafka</option>
<option value="DTU">DTU</option>
</select>
</div>
</div>
</div>
<div class="form-row">
<div class="form-col">
<div class="form-group">
<label for="equipmentNumber">Equipment Number</label>
<input type="text" id="equipmentNumber" class="form-control" placeholder="e.g., 32009970">
<div class="input-hint">Optional - leave blank for random</div>
</div>
</div>
<div class="form-col">
<div class="form-group">
<label for="serviceNeedCode">ServiceNeed Code</label>
<input type="text" id="serviceNeedCode" class="form-control" placeholder="e.g., GBRM12">
<div class="input-hint">Required for ServiceNeed type</div>
</div>
</div>
<div class="form-col">
<div class="form-group">
<label for="messageCount">Number of Messages</label>
<input type="number" id="messageCount" class="form-control" placeholder="e.g., 1" value="1" min="1" max="100">
<div class="input-hint">How many messages to generate</div>
</div>
</div>
</div>
<div class="button-container">
<button type="submit" class="btn btn-primary">Generate Messages</button>
<button type="button" class="btn btn-secondary" id="clearButton">Clear Messages</button>
<button type="button" class="btn btn-danger" id="resetButton">Reset All</button>
<button type="button" class="btn btn-success" id="kafkaButton">Send to Kafka</button>
</div>
</form>
<div id="response" class="response"></div>
<div id="messageBox" class="message-box"></div>
</div>
<!-- 复制提示框 -->
<div id="copyAlert" class="alert copy-alert">
<strong>Message copied to clipboard successfully!</strong>
</div>
<script src="{{ url_for('static', filename='js/main.js') }}"></script>
</body>
</html>
JavaScript 脚本代码
// 在脚本最开头添加全局变量声明
let currentMessageContent = null; // 存储当前生成的消息
let alertTimeout = null; // 用于提示框的定时器
let isKafkaSending = false; // 标记Kafka是否正在发送中
let activeAlert = null; // 跟踪当前活动的提示框
// 表单提交事件
document.getElementById('messageForm').addEventListener('submit', async (e) => {
e.preventDefault();
const controller = document.getElementById('controller').value;
const msgType = document.getElementById('msgType').value;
const dataSource = document.getElementById('dataSource').value;
const equipmentNumber = document.getElementById('equipmentNumber').value;
const serviceNeedCode = document.getElementById('serviceNeedCode').value;
const messageCount = parseInt(document.getElementById('messageCount').value) || 1;
document.getElementById('messageBox').innerHTML = '';
document.getElementById('response').innerHTML = '';
if (!controller || !msgType || !dataSource) {
showAlert('Please select Controller Type, Message Type, and Data Source!', 'error');
return;
}
try {
const response = await fetch('/get_template', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
controller,
msgType,
dataSource,
equipmentNumber,
serviceNeedCode,
messageCount
})
});
const data = await response.json();
if (data.status === 'success') {
let messageContent = data.message;
// 存储当前消息
currentMessageContent = messageContent;
// 处理DTU数据源的特殊要求:移除topic字段
if (dataSource === 'DTU') {
if (Array.isArray(messageContent)) {
messageContent = messageContent.map(msg => {
const { topic, ...rest } = msg;
return rest;
});
} else {
const { topic, ...rest } = messageContent;
messageContent = rest;
}
}
// 渲染消息区域
document.getElementById('messageBox').innerHTML = `
<div class="message-content">
<pre>${JSON.stringify(messageContent, null, 2)}</pre>
</div>
<button class="btn copy-btn">Copy Message</button>
`;
// 根据生成的消息数量显示不同的提示信息
const successMessage = messageCount > 1
? `${messageCount} messages generated Successfully.`
: `Message generated successfully!`;
showAlert(successMessage, 'success');
// 绑定复制按钮事件
const copyBtn = document.querySelector('.copy-btn');
if (copyBtn) {
copyBtn.addEventListener('click', async () => {
try {
const messageText = document.querySelector('.message-content pre').textContent;
if (!messageText) {
throw new Error('No message content to copy.');
}
if (navigator.clipboard) {
await navigator.clipboard.writeText(messageText);
} else {
// 回退方案
const tempTextArea = document.createElement('textarea');
tempTextArea.value = messageText;
document.body.appendChild(tempTextArea);
tempTextArea.select();
document.execCommand('copy');
document.body.removeChild(tempTextArea);
}
showCopyAlert();
} catch (err) {
console.error('Failed to copy message:', err);
showAlert('Failed to copy message.', 'error');
}
});
}
} else {
showAlert(data.message || 'Failed to generate message.', 'error');
}
} catch (error) {
console.error('Error:', error);
showAlert('Failed to communicate with the server', 'error');
}
// 滚动到消息区域
document.getElementById('messageBox').scrollIntoView({ behavior: 'smooth' });
});
// Kafka按钮事件绑定
document.getElementById('kafkaButton').addEventListener('click', async () => {
// 防止多次点击导致重复请求
if (isKafkaSending) return;
if (!currentMessageContent) {
showAlert('Please generate a message first!', 'error');
return;
}
try {
// 标记为正在发送中
isKafkaSending = true;
// 更新按钮状态
const kafkaBtn = document.getElementById('kafkaButton');
kafkaBtn.disabled = true;
kafkaBtn.textContent = 'Sending...';
// 显示发送中的提示
showKafkaAlert('Sending message to Kafka...', 'info');
// 然后发送到Kafka
await sendToKafka(currentMessageContent);
} catch (error) {
console.error('Kafka send error:', error);
// 优化错误信息显示
let errorMessage;
// 处理 AbortError 特殊类型
if (error.name === 'AbortError') {
errorMessage = 'Request timed out. Please check server status.';
}
// 处理网络错误
else if (error.name === 'TypeError' && error.message.includes('Failed to fetch')) {
errorMessage = 'Network error: Failed to connect to server';
}
// 处理连接重置错误
else if (error.message.includes('ERR_CONNECTION_RESET')) {
errorMessage = 'Connection reset by server. Please try again later.';
}
// 其他错误
else {
errorMessage = error.message;
}
// 限制错误信息长度
if (errorMessage.length > 200) {
errorMessage = errorMessage.substring(0, 200) + '...';
}
showKafkaAlert(`Failed to send to Kafka: ${errorMessage}`, 'error');
} finally {
// 无论成功与否,都恢复按钮状态
const kafkaBtn = document.getElementById('kafkaButton');
kafkaBtn.disabled = false;
kafkaBtn.textContent = 'Send to Kafka';
isKafkaSending = false;
}
});
// 清除按钮功能
document.getElementById('clearButton').addEventListener('click', () => {
document.getElementById('messageBox').innerHTML = '';
document.getElementById('response').innerHTML = '';
currentMessageContent = null;
showAlert('Messages cleared successfully!', 'success');
});
// 重置按钮功能
document.getElementById('resetButton').addEventListener('click', () => {
// 重置表单字段
document.getElementById('controller').value = '';
document.getElementById('msgType').value = '';
document.getElementById('dataSource').value = '';
document.getElementById('equipmentNumber').value = '';
document.getElementById('serviceNeedCode').value = '';
document.getElementById('messageCount').value = '';
// 清除消息显示
document.getElementById('messageBox').innerHTML = '';
document.getElementById('response').innerHTML = '';
currentMessageContent = null;
showAlert('All fields reset successfully!', 'success');
});
/**
* 发送消息到Kafka
* @param {Object} messageContent 消息内容
*/
async function sendToKafka(messageContent) {
// 创建AbortController用于超时处理
const controller = new AbortController();
let timeoutId;
try {
// 处理消息:如果是数组则处理每个消息,否则包装成数组
const messages = Array.isArray(messageContent) ? messageContent : [messageContent];
// 创建要发送的数据
const payload = {
messages: messages.map(msg => {
// 确保消息是完整JSON字符串
try {
return {
topic: 'enriched_service_needs',
delay: 0,
message: JSON.stringify(msg)
};
} catch (e) {
console.error("Failed to stringify message:", msg);
throw new Error("Invalid message content");
}
})
};
// 设置超时
timeoutId = setTimeout(() => {
// 添加明确的超时原因
controller.abort(new Error('Request timed out after 30 seconds'));
}, 30000); // 30秒超时
// 发送到后端
const response = await fetch('/send_to_kafka', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(payload),
signal: controller.signal
});
// 清除超时定时器
clearTimeout(timeoutId);
// 检查响应状态
if (!response.ok) {
const text = await response.text();
throw new Error(`Server returned ${response.status}: ${text}`);
}
// 解析JSON响应
const result = await response.json();
// 根据后端响应显示结果
if (result.status === 'success') {
const successMessage = result.message || 'Message sent to Kafka successfully!';
showKafkaAlert(successMessage, 'success');
} else {
throw new Error(result.message || 'Failed to send message to Kafka');
}
} catch (error) {
// 清除超时定时器
if (timeoutId) clearTimeout(timeoutId);
console.error('Kafka send error:', error);
// 重新抛出错误以便上层处理
throw error;
}
}
/**
* 清除现有提示框
*/
function clearExistingAlert() {
// 清除定时器
if (alertTimeout) {
clearTimeout(alertTimeout);
alertTimeout = null;
}
// 移除现有提示框
if (activeAlert) {
// 添加淡出动画
activeAlert.classList.remove('show');
// 等待动画完成后再移除元素
setTimeout(() => {
// 安全移除:检查父节点是否存在
if (activeAlert && activeAlert.parentNode) {
activeAlert.remove();
}
activeAlert = null;
}, 300);
}
}
/**
* 显示提示框
* @param {string} message 消息内容
* @param {string} type 消息类型(success, error)
*/
function showAlert(message, type = 'default') {
// 清除现有提示框
clearExistingAlert();
// 创建提示框元素
const alertDiv = document.createElement('div');
alertDiv.className = `alert ${type}`;
alertDiv.innerHTML = `<strong>${message}</strong>`;
// 添加到页面
document.body.appendChild(alertDiv);
// 存储当前活动提示框
activeAlert = alertDiv;
// 强制触发重绘以便动画生效
void alertDiv.offsetWidth;
// 添加show类触发动画
alertDiv.classList.add('show');
// 设置自动消失时间
const delay = type === 'success' ? 5000 : type === 'error' ? 10000 : 3000;
alertTimeout = setTimeout(() => {
// 添加淡出动画
alertDiv.classList.remove('show');
// 等待动画完成后再移除元素
setTimeout(() => {
if (alertDiv.parentNode) {
alertDiv.remove();
}
activeAlert = null;
}, 300);
}, delay);
}
/**
* 显示Kafka状态提示
* @param {string} message 消息文本
* @param {string} type 消息类型(info, success, error)
*/
function showKafkaAlert(message, type = 'info') {
// 清除现有提示框
clearExistingAlert();
// 创建提示框元素
const kafkaAlert = document.createElement('div');
kafkaAlert.className = `alert kafka-alert ${type}`;
kafkaAlert.innerHTML = `<strong>Kafka Status:</strong> ${message}`;
// 添加到页面
document.body.appendChild(kafkaAlert);
// 存储当前活动提示框
activeAlert = kafkaAlert;
// 强制触发重绘以便动画生效
void kafkaAlert.offsetWidth;
// 添加show类触发动画
kafkaAlert.classList.add('show');
// 设置自动消失时间
const delay = type === 'success' ? 5000 : type === 'info' ? 3000 : 10000;
alertTimeout = setTimeout(() => {
// 添加淡出动画
kafkaAlert.classList.remove('show');
// 等待动画完成后再移除元素
setTimeout(() => {
if (kafkaAlert.parentNode) {
kafkaAlert.remove();
}
activeAlert = null;
}, 300);
}, delay);
}
/**
* 显示复制提示
*/
function showCopyAlert() {
// 清除现有提示框
clearExistingAlert();
// 创建提示框元素
const copyAlert = document.createElement('div');
copyAlert.className = 'alert copy-alert';
copyAlert.innerHTML = '<strong>Message copied to clipboard successfully!</strong>';
// 添加到页面
document.body.appendChild(copyAlert);
// 存储当前活动提示框
activeAlert = copyAlert;
// 强制触发重绘以便动画生效
void copyAlert.offsetWidth;
// 添加show类触发动画
copyAlert.classList.add('show');
// 3秒后自动清除
alertTimeout = setTimeout(() => {
// 添加淡出动画
copyAlert.classList.remove('show');
// 等待动画完成后再移除元素
setTimeout(() => {
if (copyAlert.parentNode) {
copyAlert.remove();
}
activeAlert = null;
}, 300);
}, 3000);
}
CSS 样式代码
:root {
--primary-color: #3498db;
--secondary-color: #2c3e50;
--light-bg: #f8f9fa;
--dark-text: #2c3e50;
--border-color: #e1e5e9;
--success-color: #27ae60;
--error-color: #e74c3c;
--info-color: #3498db;
--kafka-color: #9b59b6;
}
* {
box-sizing: border-box;
margin: 0;
padding: 0;
}
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
line-height: 1.6;
color: var(--dark-text);
background-color: #f5f7fa;
padding: 20px;
}
.container {
max-width: 1000px;
margin: 0 auto;
background: white;
border-radius: 10px;
padding: 30px;
box-shadow: 0 5px 15px rgba(0, 0, 0, 0.1);
}
h1 {
text-align: center;
color: var(--secondary-color);
margin-bottom: 20px;
padding-bottom: 15px;
border-bottom: 1px solid var(--border-color);
}
.form-row {
display: flex;
flex-wrap: wrap;
margin: 0 -10px;
margin-bottom: 20px;
}
.form-col {
padding: 0 10px;
flex: 1;
min-width: 200px;
margin-bottom: 15px;
}
.form-group {
display: flex;
flex-direction: column;
height: 100%;
}
label {
margin-bottom: 8px;
font-weight: 600;
color: var(--secondary-color);
}
.form-control {
padding: 12px;
font-size: 1rem;
border: 1px solid var(--border-color);
border-radius: 6px;
width: 100%;
transition: all 0.3s;
}
.form-control:focus {
border-color: var(--primary-color);
outline: none;
box-shadow: 0 0 0 3px rgba(52, 152, 219, 0.15);
}
.input-hint {
font-size: 0.85rem;
color: #7f8c8d;
margin-top: 6px;
}
.button-container {
display: flex;
justify-content: center;
flex-wrap: wrap;
gap: 15px;
margin-top: 20px;
}
.btn {
padding: 12px 25px;
font-size: 1rem;
font-weight: 600;
border: none;
border-radius: 8px;
cursor: pointer;
transition: all 0.3s ease;
}
.btn-primary {
background-color: var(--primary-color);
color: white;
}
.btn-primary:hover {
background-color: #2980b9;
transform: translateY(-2px);
box-shadow: 0 4px 8px rgba(52, 152, 219, 0.25);
}
.btn-secondary {
background-color: #95a5a6;
color: white;
}
.btn-secondary:hover {
background-color: #7f8c8d;
transform: translateY(-2px);
}
.btn-danger {
background-color: #e74c3c;
color: white;
}
.btn-danger:hover {
background-color: #c0392b;
transform: translateY(-2px);
}
.btn-success {
background-color: var(--success-color);
color: white;
}
.btn-success:hover {
background-color: #219653;
transform: translateY(-2px);
box-shadow: 0 4px 8px rgba(39, 174, 96, 0.25);
}
.message-box {
margin-top: 30px;
padding: 20px;
border: 1px solid var(--border-color);
border-radius: 8px;
background-color: var(--light-bg);
}
.message-content pre {
white-space: pre-wrap;
word-break: break-word;
padding: 15px;
border-radius: 6px;
background-color: white;
max-height: 500px;
overflow: auto;
font-size: 0.95rem;
line-height: 1.5;
}
.copy-btn {
width: 100%;
padding: 12px;
color: white;
border: none;
border-radius: 6px;
margin-top: 15px;
font-weight: 600;
cursor: pointer;
transition: background-color 0.3s;
background-color: var(--secondary-color);
}
.copy-btn:hover {
background-color: #1a252f;
}
/* =================== 提示框统一样式 =================== */
.alert {
position: fixed;
top: 20px;
left: 50%;
transform: translateX(-50%);
padding: 15px 25px;
border-radius: 8px;
font-weight: 500;
color: white; /* 确保文字为白色 */
display: flex;
align-items: center;
justify-content: center;
z-index: 1000;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.15);
opacity: 0;
transition: opacity 0.3s, transform 0.3s;
transform: translateX(-50%) translateY(-20px);
max-width: 90%;
text-align: center;
}
.alert.show {
opacity: 1;
transform: translateX(-50%) translateY(0);
}
/* 成功提示框 */
.alert.success {
background-color: var(--success-color);
}
/* 错误提示框 */
.alert.error {
background-color: var(--error-color);
}
/* 信息提示框 */
.alert.info {
background-color: var(--info-color);
}
/* Kafka提示框 */
.alert.kafka-alert {
background-color: var(--kafka-color); /* 紫色背景 */
}
/* 复制提示框 */
.alert.copy-alert {
background-color: var(--success-color);
}
/* =================== 响应式设计 =================== */
@media (max-width: 768px) {
.form-col {
flex: 1 1 100%;
}
.button-container {
flex-direction: column;
align-items: center;
}
.btn {
width: 100%;
max-width: 300px;
margin-bottom: 10px;
}
.alert {
width: 90%;
padding: 12px 20px;
font-size: 0.9rem;
}
}
@media (max-width: 480px) {
.container {
padding: 15px;
}
.form-control {
padding: 10px;
}
.btn {
padding: 10px 15px;
font-size: 0.9rem;
}
.message-content pre {
padding: 10px;
font-size: 0.85rem;
}
.alert {
padding: 12px 15px;
font-size: 0.85rem;
}
}
实际效果
TesterHome 为用户提供「保留所有权利,禁止转载」的选项。
除非获得原作者的单独授权,任何第三方不得转载标注了「原创声明:保留所有权利,禁止转载」的内容,否则均视为侵权。
具体请参见TesterHome 知识产权保护协议。
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暫無回覆。