在人工智能大模型的测评领域,我们往往关注模型的准确率、响应速度、成本等指标,但有一个同样重要却容易被忽视的维度——稳定性(Stability)。一个模型可能在某一次测试中表现出色,但面对相同问题的多次询问,是否能保持一致的高质量输出?这正是稳定性测评需要回答的核心问题。
本文将深入探讨大模型稳定性测评的意义、挑战以及完整的技术实现方案,希望为从事大模型测评和应用的工程师提供参考。
大语言模型(LLM)基于概率统计的生成机制,同一个问题的多次询问,即使在相同参数下,也可能产生不同的答案。这种随机性来源于:
这种不确定性在某些场景下是优势(如创意写作),但在需要稳定输出的场景(如金融分析、医疗诊断、法律咨询)则是严重的风险。
在实际生产环境中,我们遇到过多个因模型稳定性问题导致的事故:
案例 1:客服机器人的"心情不稳定"
案例 2:金融风险评估的"摇摆判断"
案例 3:代码生成的"随机 bug"
这些案例表明,稳定性是模型可用性的前提。一个准确率 95% 但稳定性差的模型,在实际应用中的价值可能远低于准确率 90% 但高度稳定的模型。
传统的模型测评方法(如单次测试、随机抽样)存在明显的局限性:
维度 | 传统方法 | 问题 |
---|---|---|
测试次数 | 每题测试 1 次 | 无法发现不稳定性 |
数据覆盖 | 随机抽样 | 关键场景可能遗漏 |
结果分析 | 平均准确率 | 掩盖了个体差异 |
风险识别 | 事后发现 | 生产环境出问题才暴露 |
我们需要一个能够系统性地量化和分析模型稳定性的测评方法。
稳定性测评的核心思想是:对同一个问题进行 N 轮重复测试,分析模型答案的一致性和正确率分布。
具体来说:
我们设计了一套完整的稳定性指标体系:
1. 题目级指标
2. 整体分布指标
json
{
"0次对": 5题 (5%),
"1次对": 3题 (3%),
...
"10次全对": 70题 (70%)
}
3. 业务级指标
维度 | 传统评测 | 稳定性评测 |
---|---|---|
测试次数 | 每题 1 次 | 每题 N 次(5-100) |
结果维度 | 对/错 | 成功率分布 |
风险发现 | 平均准确率 | 不稳定题目识别 |
API 调用量 | M 题 × K 模型 | M 题 × 1 模型 × N 轮 |
适用场景 | 多模型横向对比 | 单模型深度分析 |
成本 | 较低 | 较高(N 倍) |
实现一个生产级的稳定性测评系统,面临以下技术挑战:
针对上述挑战,我们设计了一套完整的解决方案:
核心思想:多道题同时评测,但每道题的 N 轮按顺序执行
传统串行方案:
题1轮1 → 题1轮2 → ... → 题1轮N → 题2轮1 → ... → 题M轮N
耗时:M × N × T(T为单次调用耗时)
轮次级并发方案(方案二):
(题1轮1, 题1轮2, ..., 题1轮N, 题2轮1, ...) 全部并发
问题:破坏了轮次顺序,可能影响测评结果
题目级并发方案(方案一,我们的选择):
并发组1: (题1轮1→轮2→...→轮N)
并发组2: (题2轮1→轮2→...→轮N)
...
并发组C: (题C轮1→轮2→...→轮N)
耗时:约 (M / C) × N × T(C为并发数)
优势:
代码实现:
# 创建信号量控制并发
sem_question = asyncio.Semaphore(concurrency_limit)
async def evaluate_single_question(idx, question):
"""评测单个题目的所有轮次(串行)"""
async with sem_question: # 限制题目并发数
round_results = []
# 串行执行N轮
for round_num in range(1, repetition_count + 1):
# 调用模型获取答案
answer = await fetch_model_answer(question)
# 裁判模型评分
is_correct, score, reason = await judge_answer(question, answer)
round_results.append({
'round': round_num,
'answer': answer,
'correct': is_correct,
'score': score,
'reason': reason
})
return idx, round_results
# 使用 asyncio.gather 并发执行所有题目
tasks = [evaluate_single_question(idx, q) for idx, q in enumerate(questions)]
results = await asyncio.gather(*tasks)
核心思想:从配置文件动态读取每个模型的并发限制,而不是硬编码
# 从模型工厂获取并发限制(配置驱动)
concurrency_limit = model_factory.get_model_concurrent_limit(model_name)
# 配置示例(config.py)
MODEL_CONCURRENT_LIMITS = {
'HKGAI-V1': 3, # 港话通并发限制较低
'HKGAI-V2': 5, # 新版本提高并发
'Gemini': 10, # Google API并发较高
'GPT-5': 20, # OpenAI API并发很高
'Doubao': 8 # 豆包中等并发
}
优势:
核心思想:复用 HTTP 连接,减少连接建立和销毁的开销
# 创建优化的连接池
connector = aiohttp.TCPConnector(
limit=50, # 全局最大连接数
limit_per_host=20, # 单个主机最大连接数
ttl_dns_cache=300 # DNS缓存5分钟
)
# 设置合理的超时时间
timeout = aiohttp.ClientTimeout(
total=600, # 总超时10分钟
connect=30, # 连接超时30秒
sock_read=300 # 读取超时5分钟
)
# 在整个测评过程中复用session
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
# 所有API调用共享这个session
results = await asyncio.gather(*tasks)
性能提升:
核心思想:使用线程锁保护共享变量,避免并发更新冲突
import threading
# 进度追踪
completed_steps = 0
progress_lock = threading.Lock()
async def update_progress():
"""线程安全的进度更新"""
nonlocal completed_steps
with progress_lock:
completed_steps += 1
if task_id in task_status:
task_status[task_id].progress = completed_steps
task_status[task_id].current_step = f"已完成 {completed_steps}/{total}"
核心思想:单个请求失败不应该导致整个测评中断
# 使用 return_exceptions=True 捕获异常
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
for result in results:
if isinstance(result, Exception):
print(f"❌ 题目评测异常: {result}")
# 记录错误日志,但继续处理其他结果
continue
if isinstance(result, tuple) and len(result) == 2:
idx, result_data = result
stability_results[idx] = result_data
稳定性测评系统采用分层架构,各模块职责清晰:
文件位置:utils/evaluation_engine.py
核心函数:evaluate_stability
async def evaluate_stability(
data: List[Dict], # 题目列表
model_name: str, # 模型名称(单个)
judge_model: str, # 裁判模型
repetition_count: int, # 重复轮次 (1-100)
task_id: str, # 任务ID
filename: str = None, # 文件名
results_folder: str = None # 结果文件夹
) -> Tuple[str, Dict]:
"""
稳定性评测核心引擎
返回:(结果文件路径, 元数据字典)
"""
执行流程:
函数:judge_answer_by_score
核心逻辑:
(is_correct: bool, score: int, reason: str)
评分提示词设计:
prompt = f"""你是一个严格的评分裁判。请对被测模型的答案进行评分。
问题:{query}
被测模型答案:{answer}
评分标准(0-1分制):
- 1分:答案完全正确、准确、完整,符合问题要求
- 0分:答案错误、不准确、不完整、答非所问或无法理解
请严格按照以下JSON格式输出:
{{
"score": 1, // 必须是0或1
"reason": "评分理由"
}}
"""
为什么选择 0-1 分制?
函数:calculate_stability_distribution
核心逻辑:
def calculate_stability_distribution(stability_results: dict, repetition_count: int) -> dict:
"""计算稳定性分布"""
# 动态生成0到N的所有分类
distribution_counts = {i: 0 for i in range(repetition_count + 1)}
# 统计每个正确次数的题目数
for result in stability_results.values():
correct_count = result['correct_count']
if 0 <= correct_count <= repetition_count:
distribution_counts[correct_count] += 1
# 计算百分比
distribution_percent = {
count: round(num / total * 100, 2)
for count, num in distribution_counts.items()
}
return {
'distribution_counts': distribution_counts,
'distribution_percent': distribution_percent,
'total_questions': total,
'repetition_count': repetition_count
}
输出示例:
{
"distribution_counts": {
"0": 2, // 2道题全错
"1": 1, // 1道题只对1次
"2": 3, // 3道题对2次
...
"10": 70 // 70道题全对
},
"distribution_percent": {
"0": 2.0,
"1": 1.0,
"2": 3.0,
...
"10": 70.0
},
"total_questions": 100,
"repetition_count": 10
}
函数:generate_stability_csv
CSV 结构设计:
题目编号,问题,标准答案,类型,第1轮_答案,第1轮_评分,第1轮_理由,第2轮_答案,第2轮_评分,第2轮_理由,...,正确次数,成功率
1,1+1等于几?,2,客观题,2,1,答案正确,2,1,答案正确,...,10,100%
2,中国的首都是?,北京,客观题,北京,1,正确,上海,0,错误,...,8,80%
动态列生成:
repetition_count
动态生成列数新增字段:
ALTER TABLE evaluation_results
ADD COLUMN repetition_count INT DEFAULT 1
COMMENT '重复轮次:1=普通评测,N=稳定性评测N轮';
ADD INDEX idx_results_repetition (repetition_count);
新增字段:
ALTER TABLE running_tasks
ADD COLUMN repetition_count INT DEFAULT 1
COMMENT '重复轮次:1=普通评测,N=稳定性评测N轮';
使用 JSON 格式存储详细结果:
{
"stability_results": {
"0": {
"question": "问题内容",
"standard_answer": "标准答案",
"type": "题目类型",
"rounds": [
{"round": 1, "answer": "答案1", "correct": true, "score": 1, "reason": "正确"},
{"round": 2, "answer": "答案2", "correct": false, "score": 0, "reason": "错误"}
],
"correct_count": 8,
"success_rate": 0.8
}
},
"distribution": {
"distribution_counts": {...},
"distribution_percent": {...},
"total_questions": 100,
"repetition_count": 10
},
"model": "HKGAI-V2",
"judge_model": "gemini"
}
我们使用了两层信号量(Semaphore)进行并发控制:
sem_model = asyncio.Semaphore(10) # 模型调用并发(全局)
sem_question = asyncio.Semaphore(concurrency_limit) # 题目级并发(可配置)
为什么需要两层?
sem_model
:限制同时进行的模型 API 调用总数(防止资源耗尽)sem_question
:限制同时评测的题目数量(符合厂商限流策略)举例说明:
concurrency_limit=5
(题目级并发)挑战:异步任务中如何实时更新进度给前端?
解决方案:
task_status
字典存储任务状态/progress/<task_id>
接口progress
和current_step
# 后端更新
with progress_lock:
completed_steps += 1
task_status[task_id].progress = completed_steps
task_status[task_id].current_step = f"题目{idx+1} 第{round_num}轮"
# 前端轮询(每2秒)
setInterval(async () => {
const response = await fetch(`/progress/${task_id}`);
const data = await response.json();
updateProgressBar(data.progress, data.total);
updateStatusText(data.current_step);
}, 2000);
层级 1:单次 API 调用失败
try:
answer = await fetch_model_answer(query)
except Exception as e:
print(f"⚠️ API调用失败: {e}")
answer = "[调用失败]"
# 记录为错误答案,继续执行
层级 2:单轮评分失败
try:
is_correct, score, reason = await judge_answer(query, answer)
except Exception as e:
print(f"⚠️ 评分失败: {e}")
is_correct, score, reason = False, 0, f"评分失败: {str(e)}"
# 记录为0分,继续执行
层级 3:整个题目评测失败
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
print(f"❌ 题目评测异常: {result}")
# 跳过这道题,继续其他题目
continue
设计原则:局部失败不影响全局,最大程度完成评测任务。
挑战:如何直观展示稳定性分布?
解决方案:卡片式分布统计
前端页面(templates/results.html
)动态生成分布卡片:
// 检测稳定性评测结果
const scoreColumns = columns.filter(col => col.includes('轮_评分'));
if (scoreColumns.length > 0) {
// 统计每道题的正确轮数
const distribution = {};
filteredData.forEach(row => {
let correctCount = 0;
scoreColumns.forEach(col => {
if (parseFloat(row[col]) === 1) correctCount++;
});
distribution[correctCount] = (distribution[correctCount] || 0) + 1;
});
// 生成卡片(从高到低)
for (let i = totalRounds; i >= 0; i--) {
const count = distribution[i] || 0;
const percent = (count / totalQuestions * 100).toFixed(1);
// 根据稳定性等级设置颜色
let color = i === totalRounds ? '#4CAF50' : // 全对-绿色
i >= totalRounds * 0.8 ? '#8BC34A' : // 高稳定-浅绿
i >= totalRounds * 0.5 ? '#FFC107' : // 不稳定-黄色
'#FF5722'; // 低稳定-红色
statsHTML += `
<div class="distribution-card" style="border-left: 4px solid ${color};">
<div class="distribution-label">${i}次全对</div>
<div class="distribution-value">${count}题 (${percent}%)</div>
</div>
`;
}
}
效果:
我们进行了详细的性能对比测试:
测试环境:
测试结果:
方案 | 耗时 | 加速比 | 说明 |
---|---|---|---|
串行执行 | 33 分 20 秒 | 1.0x | 基准方案 |
轮次级并发(并发 10) | 3 分 34 秒 | 9.3x | 破坏轮次顺序 |
题目级并发(并发 5) | 6 分 40 秒 | 5.0x | 推荐方案 |
题目级并发(并发 10) | 3 分 35 秒 | 9.3x | 资源消耗高 |
结论:
优化前:
优化后:
配置优化:
connector = aiohttp.TCPConnector(
limit=50, # 全局最大连接数(根据并发需求调整)
limit_per_host=20, # 单个主机最大连接数(避免压垮服务器)
ttl_dns_cache=300, # DNS缓存5分钟(减少DNS查询)
enable_cleanup_closed=True # 自动清理关闭的连接
)
挑战:1000 次调用产生大量中间数据(答案、评分、理由),如何避免内存溢出?
解决方案:
# 流式写入CSV
with open(output_file, 'w', newline='', encoding='utf-8-sig') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(headers) # 写入表头
# 边评测边写入
for idx, result_data in stability_results.items():
row = build_csv_row(result_data)
writer.writerow(row)
# result_data用完后立即释放
del result_data
多层超时设计:
# 1. 全局超时(整个测评任务)
timeout = aiohttp.ClientTimeout(total=600) # 10分钟
# 2. 单次API调用超时
async with asyncio.timeout(120): # 2分钟
response = await fetch_model_answer(query)
# 3. 连接超时
connector = aiohttp.TCPConnector(connect_timeout=30) # 30秒
超时后的处理:
自稳定性测评功能上线以来,我们收集了以下数据:
使用情况(截至 2025 年 1 月):
发现的问题:
典型案例:
案例 1:数学推理的不稳定性
案例 2:开放性问题的极端不稳定
案例 3:知识类问题的高稳定性
我们对比了稳定性和传统准确率指标:
模型 | 传统准确率 | 平均稳定性 | 完美题目占比 | 评价 |
---|---|---|---|---|
模型 A | 95% | 88% | 65% | 高准确率 + 高稳定性 ⭐⭐⭐⭐⭐ |
模型 B | 90% | 92% | 70% | 中准确率 + 高稳定性 ⭐⭐⭐⭐ |
模型 C | 93% | 75% | 45% | 高准确率 + 低稳定性 ⭐⭐⭐ |
模型 D | 85% | 83% | 50% | 中准确率 + 中稳定性 ⭐⭐⭐ |
发现:
大模型稳定性测评是模型评估体系中不可或缺的一环。通过本文介绍的完整技术方案,我们可以: