问题描述
在测试报告导入功能中,当上一批数据导入完成后触发后台计算任务,此时再导入大量 Excel 文件会导致:
请求超时:导入请求无法及时响应
文件上传解析慢:
python_multipart.multipart日志显示解析很慢负载均衡无效:虽然使用了
--workers=3,但似乎没有作用
问题分析
1. 架构问题
1.1 多进程 + 线程池的配置
# main.py
uvicorn.run("src.app:app", host="127.0.0.1", port=8000, workers=3)
# src/test_report/service.py
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=4, thread_name_prefix="calculate_worker")
问题:
使用
--workers=3启动了 3 个独立的进程每个进程都有自己的
ThreadPoolExecutor(max_workers=4)实际上有 3 × 4 = 12 个线程在执行后台任务
每个进程内的线程池都在执行 CPU 密集型任务
1.2 资源竞争
进程 1 (Worker 1)
├── 请求处理线程 (处理导入请求)
├── 后台任务线程 1 (计算 SPC)
├── 后台任务线程 2 (计算 SPC)
├── 后台任务线程 3 (计算 SPC)
└── 后台任务线程 4 (计算 SPC)
进程 2 (Worker 2)
├── 请求处理线程 (处理导入请求)
├── 后台任务线程 1 (计算 SPC)
├── 后台任务线程 2 (计算 SPC)
├── 后台任务线程 3 (计算 SPC)
└── 后台任务线程 4 (计算 SPC)
进程 3 (Worker 3)
├── 请求处理线程 (处理导入请求)
├── 后台任务线程 1 (计算 SPC)
├── 后台任务线程 2 (计算 SPC)
├── 后台任务线程 3 (计算 SPC)
└── 后台任务线程 4 (计算 SPC)
问题:
当后台任务在执行时,会占用同一进程内的 CPU 资源
即使请求被路由到不同的进程,如果该进程的线程池正在执行任务,也会受影响
**GIL(全局解释器锁)**导致 CPU 密集型任务会影响同一进程内的其他操作
2. CPU 密集型任务
2.1 后台任务内容
async def calculate_entry(db: AsyncSessionDep, entry_id: int):
# 计算x_ucl、x_lcl、cpk、fixed_x_cl、fixed_sigma
# ...
agent_x_bar_r = getAgent(df, entry, spc_data_x_bar_r) # CPU 密集型
agent_cpk = getAgent(df, entry, spc_data_cpk) # CPU 密集型
fixed_x_cl, x_ucl, x_lcl = agent_x_bar_r.get_x_control_limits() # CPU 密集型
fixed_sigma = agent_x_bar_r.get_sigma() # CPU 密集型
cpk = agent_cpk.get_cpk() # CPU 密集型
问题:
XRSControlChart和CapabilityAnalysis的计算是 CPU 密集型任务涉及大量数学计算(pandas、numpy 操作)
每个 entry 的计算可能需要几百毫秒到几秒
2.2 任务执行流程
# 导入完成后立即触发后台任务
if successful_report_ids:
unique_report_ids = list(dict.fromkeys(successful_report_ids))
loop = asyncio.get_running_loop()
calculate_reports_entries_in_thread_pool(unique_report_ids, loop) # 立即提交到线程池
# 线程池执行
def _calculate_reports_entries_sync(report_ids: list[int]):
asyncio.run(calculate_reports_entries(report_ids)) # 创建新的事件循环
# 遍历所有 entries,每个都执行 CPU 密集型计算
问题:
导入完成后立即触发后台任务
如果有多个 report,每个 report 有多个 entries,会创建大量任务
线程池的 4 个 worker 会立即被占用
3. 文件上传解析慢
3.1 Multipart 解析机制
FastAPI 的文件上传处理:
FastAPI 使用 python-multipart 库来解析 multipart/form-data 格式的文件上传请求。当客户端通过 HTTP POST 请求上传文件时:
@router.post("/import")
async def import_test_report(db: AsyncSessionDep, files: list[UploadFile]):
# FastAPI 自动调用 python-multipart 解析上传的文件
# files: list[UploadFile] 中的每个 UploadFile 对象已经包含了解析后的文件流
解析过程:
接收 HTTP 请求体(I/O 操作)
从网络套接字读取原始字节流
数据以流式方式接收,不是一次性加载到内存
解析 Multipart 边界(CPU 密集型)
python_multipart.multipart - DEBUG - Calling on_part_data with data[0:262144]python-multipart需要解析 HTTP 请求体中的 multipart 边界标记识别每个文件部分的开始和结束位置
提取文件元数据(文件名、Content-Type 等)
这是同步操作,需要 CPU 资源来处理字节流
提取文件内容(CPU + I/O)
从 multipart 数据流中提取文件的实际内容
将文件内容存储到临时缓冲区或文件对象
对于大文件,需要多次读取和解析(每次 262144 字节,即 256KB)
问题分析:
Multipart 解析是同步操作:
python-multipart的解析过程是同步的,会阻塞事件循环虽然 FastAPI 使用异步框架,但底层的 multipart 解析是同步的
大文件上传时,解析过程会占用 CPU 资源
CPU 资源竞争:
解析 multipart 数据需要 CPU 来处理字节流、查找边界、提取内容
当同一进程内的线程池正在执行 CPU 密集型后台任务时,会竞争 CPU 资源
**GIL(全局解释器锁)**导致 CPU 密集型任务会影响其他操作
即使后台任务在独立进程中执行,文件解析仍需要 CPU 资源
大文件的影响:
文件越大,解析时间越长
多个文件同时上传时,解析过程会累积
解析过程中,事件循环可能被阻塞,影响其他请求的处理
示例场景:
场景:上传 10 个 Excel 文件,每个 5MB
时间线:
T1: 开始接收第 1 个文件(multipart 解析开始,占用 CPU)
T2: 解析第 1 个文件完成(耗时 200ms,期间事件循环可能被阻塞)
T3: 开始接收第 2 个文件(继续解析,占用 CPU)
...
T10: 所有文件解析完成(总耗时约 2 秒)
问题:
- 解析过程中,CPU 被占用
- 如果同时有其他请求到达,响应会变慢
- 事件循环可能被阻塞,导致其他接口卡慢
3.2 请求处理流程
1. 接收 HTTP 请求(I/O)
2. 解析 multipart 数据(需要 CPU)
3. 提取 Excel 内容(CPU + I/O)
4. 导入数据到数据库(I/O)
5. 提交事务(I/O)
6. 触发后台任务(CPU 密集型)
问题:
步骤 2-3 需要 CPU 资源
如果同一进程内的线程池正在执行后台任务,会竞争 CPU 资源
导致文件解析变慢,请求超时
4. 负载均衡无效的原因
4.1 进程级别的负载均衡
Uvicorn 的
--workers=3使用进程级别的负载均衡请求会被路由到不同的进程
但每个进程都有自己的线程池
4.2 问题场景
时间线:
T1: 进程 1 处理导入请求 A,完成后触发后台任务
T2: 进程 1 的线程池开始执行后台任务(占用 CPU)
T3: 新请求 B 被路由到进程 1
T4: 进程 1 的 CPU 被后台任务占用,请求 B 处理变慢
问题:
即使有空闲进程(进程 2、3),如果请求被路由到正在执行后台任务的进程,也会受影响
负载均衡是进程级别的,无法感知进程内的线程状态
5. 根本原因总结
线程池配置过大:
max_workers=4导致每个进程有 4 个线程执行 CPU 密集型任务资源竞争:CPU 密集型任务和请求处理在同一进程内竞争 CPU 资源
GIL 限制:Python 的 GIL 导致 CPU 密集型任务会影响同一进程内的其他操作
立即执行:导入完成后立即触发后台任务,没有延迟或限流机制
无优先级:后台任务和请求处理没有优先级区分
解决方案对比
问题约束
不能延迟执行:后台任务必须立即执行
不能影响请求处理:导入请求不能超时
现有架构:多进程(workers=3)+ 线程池(max_workers=4)
方案 1:减少线程池 Worker 数量
实现
# src/test_report/service.py
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="calculate_worker")
优点
✅ 实现简单:只需修改一行代码
✅ 立即生效:无需其他改动
✅ 减少资源竞争:每个进程只有 1 个线程执行后台任务
✅ 风险低:不会引入新的问题
缺点
❌ 后台任务执行变慢:从并行 4 个任务变为串行 1 个任务
❌ 吞吐量降低:如果有多个 report 需要计算,会排队执行
❌ 不能充分利用多核:CPU 密集型任务无法并行执行
❌ 治标不治本:仍然在同一进程内竞争资源
性能影响
请求处理:✅ 改善(减少 CPU 竞争)
后台任务速度:❌ 变慢(串行执行)
总体吞吐量:⚠️ 可能降低
方案 2:使用进程池(ProcessPoolExecutor)绕过 GIL ⭐⭐⭐⭐
实现
# src/test_report/service.py
import multiprocessing
import sys
# 使用进程池而不是线程池
_mp_context = multiprocessing.get_context("spawn" if sys.platform == "win32" else "fork")
_executor = concurrent.futures.ProcessPoolExecutor(
max_workers=2, # 每个进程 2 个 worker
mp_context=_mp_context
)
def _calculate_reports_entries_sync(report_ids: list[int]):
"""同步包装函数:在独立进程中运行异步的 calculate_reports_entries"""
try:
import asyncio
# 在 spawn 模式下,子进程会重新导入整个模块,所以原来的函数可以直接使用
asyncio.run(calculate_reports_entries(report_ids))
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.error(f"后台计算任务失败 (report_ids={report_ids}): {e}", exc_info=True)
def calculate_reports_entries_in_process_pool(report_ids: list[int], loop: asyncio.AbstractEventLoop):
"""将计算任务提交到进程池执行,不等待结果"""
executor = _get_process_pool_executor()
loop.run_in_executor(executor, _calculate_reports_entries_sync, report_ids)
优点
✅ 绕过 GIL:进程间不共享 GIL,真正的并行执行
✅ 隔离性好:后台任务在独立进程中,不影响请求处理
✅ 充分利用多核:可以并行执行多个 CPU 密集型任务
✅ 资源隔离:进程间内存隔离,更安全
✅ 代码复用:在 spawn 模式下,子进程会重新导入整个模块,可以直接使用原来的函数
缺点
❌ 进程开销大:创建进程比创建线程开销大
❌ 数据序列化:进程间通信需要 pickle 序列化
❌ 数据库连接:每个进程需要独立的数据库连接
❌ Windows 兼容性:需要使用
spawn模式,启动较慢❌ 代码复杂度:需要处理进程间通信和错误处理
注意事项
数据库连接不能跨进程共享,需要在每个进程中重新创建
函数参数需要可序列化(pickle)
Windows 上进程启动较慢
在 spawn 模式下,子进程会重新导入整个模块,所以原来的函数可以直接使用,无需重新定义
性能影响
请求处理:✅ 显著改善(完全隔离)
后台任务速度:✅ 可能更快(真正的并行)
总体吞吐量:✅ 可能提升
方案 3:使用 Semaphore 限制并发任务数
实现
# src/test_report/service.py
import asyncio
# 全局信号量,限制并发任务数
_calculate_semaphore = asyncio.Semaphore(1) # 最多 1 个并发任务
async def calculate_reports_entries_with_limit(report_ids: list[int]):
"""带并发限制的计算任务"""
async with _calculate_semaphore:
await calculate_reports_entries(report_ids)
def _calculate_reports_entries_sync(report_ids: list[int]):
"""同步包装函数"""
try:
asyncio.run(calculate_reports_entries_with_limit(report_ids))
except Exception as e:
logger.error(f"后台计算任务失败 (report_ids={report_ids}): {e}", exc_info=True)
def calculate_reports_entries_in_thread_pool(report_ids: list[int], loop: asyncio.AbstractEventLoop):
"""将计算任务提交到线程池执行"""
loop.run_in_executor(_executor, _calculate_reports_entries_sync, report_ids)
优点
✅ 精确控制并发:可以限制同时执行的任务数
✅ 灵活调整:可以根据实际情况调整并发数
✅ 代码改动小:只需添加信号量控制
✅ 保持线程池:仍然使用线程池,兼容性好
缺点
❌ 仍然受 GIL 影响:线程池仍然受 GIL 限制
❌ 不能完全隔离:任务仍在同一进程内
❌ 复杂度增加:需要管理信号量
性能影响
请求处理:✅ 改善(限制并发任务)
后台任务速度:⚠️ 可能变慢(限制并发)
总体吞吐量:⚠️ 取决于并发数设置
方案 4:任务队列 + 限流机制
实现
# src/test_report/service.py
import asyncio
from collections import deque
from threading import Lock
from typing import Optional
# 任务队列和限流
_task_queue: deque[list[int]] = deque()
_queue_lock = Lock()
_max_concurrent_tasks = 1 # 限制并发任务数
_current_running_tasks = 0
_task_event = asyncio.Event()
async def _task_processor():
"""后台任务处理器"""
global _current_running_tasks
while True:
# 等待有任务或事件触发
await _task_event.wait()
_task_event.clear()
while True:
# 检查是否可以执行新任务
with _queue_lock:
if _current_running_tasks >= _max_concurrent_tasks:
break
if not _task_queue:
break
report_ids = _task_queue.popleft()
_current_running_tasks += 1
# 执行任务
loop = asyncio.get_running_loop()
try:
await loop.run_in_executor(_executor, _calculate_reports_entries_sync, report_ids)
finally:
with _queue_lock:
_current_running_tasks -= 1
# 通知可以处理下一个任务
_task_event.set()
# 在应用启动时启动任务处理器
_task_processor_task: Optional[asyncio.Task] = None
def start_task_processor():
"""启动任务处理器"""
global _task_processor_task
if _task_processor_task is None:
loop = asyncio.get_event_loop()
_task_processor_task = loop.create_task(_task_processor())
def submit_calculate_task(report_ids: list[int]):
"""提交计算任务到队列"""
with _queue_lock:
_task_queue.append(report_ids)
_task_event.set()
优点
✅ 精确控制:可以精确控制并发任务数
✅ 队列缓冲:任务可以排队,不会丢失
✅ 灵活调整:可以动态调整并发数和队列大小
✅ 可监控:可以监控队列长度和执行状态
缺点
❌ 实现复杂:需要管理队列和事件
❌ 仍然受 GIL 影响:使用线程池仍然受 GIL 限制
❌ 需要启动任务:需要在应用启动时启动处理器
性能影响
请求处理:✅ 显著改善(精确控制)
后台任务速度:⚠️ 取决于并发数设置
总体吞吐量:✅ 可控
方案 5:优化计算任务本身(分批处理)
实现
# src/test_report/service.py
async def calculate_entries(entries: list[ReportEntry]):
"""分批处理 entries,每批之间 yield 控制权"""
BATCH_SIZE = 10 # 每批处理 10 个 entries
for i in range(0, len(entries), BATCH_SIZE):
batch = entries[i:i + BATCH_SIZE]
# 处理当前批次
for entry in batch:
async with async_session_maker() as db:
await calculate_entry(db, entry.id)
await db.commit()
# 每批处理后 yield,让其他协程有机会执行
await asyncio.sleep(0) # 让出控制权
优点
✅ 减少阻塞:分批处理,每批之间让出控制权
✅ 改善响应性:其他协程有机会执行
✅ 代码改动小:只需修改计算逻辑
缺点
❌ 不能完全解决问题:仍然在同一进程内
❌ 执行时间可能变长:分批处理可能增加总时间
❌ 需要仔细调优:批次大小需要根据实际情况调整
性能影响
请求处理:✅ 改善(减少长时间阻塞)
后台任务速度:⚠️ 可能变慢(分批处理)
总体吞吐量:⚠️ 取决于批次大小
方案 6:使用独立的 Worker 进程(长期方案)
实现
使用消息队列(如 Redis)或任务队列(如 Celery):
# 使用 Redis 作为消息队列
import redis
import json
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def submit_calculate_task(report_ids: list[int]):
"""提交任务到 Redis 队列"""
redis_client.lpush('calculate_queue', json.dumps(report_ids))
# 独立的 worker 进程(单独运行)
def worker_process():
"""独立的 worker 进程"""
while True:
task = redis_client.brpop('calculate_queue', timeout=1)
if task:
report_ids = json.loads(task[1])
_calculate_reports_entries_sync(report_ids)
优点
✅ 完全隔离:后台任务在独立进程中,不影响请求处理
✅ 可扩展:可以启动多个 worker 进程
✅ 高可用:worker 进程崩溃不影响主服务
✅ 监控友好:可以独立监控 worker 进程
缺点
❌ 架构复杂:需要引入消息队列
❌ 额外依赖:需要 Redis 或其他消息队列
❌ 部署复杂:需要管理多个进程
❌ 延迟增加:任务需要序列化和网络传输
性能影响
请求处理:✅ 完全隔离,不影响
后台任务速度:✅ 可以并行执行
总体吞吐量:✅ 可扩展
方案对比总结
推荐方案
已实施方案:方案 2(进程池)⭐⭐⭐⭐
当前实现:
✅ 使用
ProcessPoolExecutor替代ThreadPoolExecutor✅ 绕过 GIL,实现真正的并行执行
✅ 完全隔离后台任务和请求处理
✅ 在 spawn 模式下,子进程会重新导入整个模块,可以直接使用原来的函数
优势:
请求处理性能显著改善(完全隔离)
后台任务可以真正并行执行
代码简洁,复用原有函数逻辑
注意事项:
Windows 上进程启动较慢(spawn 模式)
需要确保数据库连接在子进程中正确初始化
应用关闭时需要正确清理进程池
短期方案(快速修复)
方案 1:减少线程池 worker 数量到 1
# src/test_report/service.py
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="calculate_worker")
效果:
快速减少资源竞争
但后台任务会串行执行,速度变慢
中期方案(进一步优化)
方案 4:实现任务队列机制
精确控制并发任务数
任务可以排队,不会丢失
可以监控队列长度和执行状态
长期方案(架构优化)
方案 6:独立的 Worker 进程
使用消息队列(Redis/Celery)
完全解耦后台任务和请求处理
可以独立扩展和监控
实施建议
第一步:已实施(方案 2)
✅ 已使用进程池替代线程池,实现完全隔离
第二步:监控和测试
监控请求响应时间
监控后台任务执行时间
测试并发导入场景
验证 Windows 上的性能表现
第三步:根据实际情况优化
如果进程池方案效果良好,保持现状
如果仍有性能问题,考虑实现任务队列机制(方案 4)
如果后台任务量大,考虑使用独立的 worker 进程(方案 6)
相关文件
src/test_report/router.py- 导入接口src/test_report/service.py- 后台任务实现(已使用进程池)src/app.py- 应用生命周期管理(进程池清理)main.py- 启动配置
技术细节
进程池实现要点
进程上下文:
_mp_context = multiprocessing.get_context("spawn" if sys.platform == "win32" else "fork")延迟初始化:
def _get_process_pool_executor() -> concurrent.futures.ProcessPoolExecutor: global _executor if _executor is None: _executor = concurrent.futures.ProcessPoolExecutor( max_workers=2, mp_context=_mp_context, ) return _executor函数复用:
在 spawn 模式下,子进程会重新导入整个模块
原来的函数(
calculate_reports_entries、calculate_entries、calculate_entry、getAgent)可以直接使用无需重新定义或重新导入
优雅关闭:
def shutdown_process_pool(): """关闭进程池执行器(在应用关闭时调用)""" global _executor if _executor is not None: _executor.shutdown(wait=True) _executor = None
SQLite 写入锁优化
问题分析
在使用进程池(max_workers=2)执行后台任务时,如果多个进程同时执行写入操作,会导致 SQLite 数据库写入锁竞争。
SQLite 锁机制:
SQLite 使用文件锁来控制并发访问,锁的级别包括:
SHARED:允许读取,不允许写入
RESERVED:允许读取,准备写入
PENDING:等待其他连接释放 SHARED 锁
EXCLUSIVE:独占访问,不允许其他任何操作
问题场景:
# 原代码:CPU 密集型计算在事务内
async def calculate_entries(entries: list[ReportEntry]):
for entry in entries:
async with async_session_maker() as db: # 事务开始
await calculate_entry(db, entry.id) # 包含 CPU 密集型计算(几秒)
await db.commit() # 事务结束(几秒后)
问题:
事务持续时间长:CPU 密集型计算(几秒)在事务内执行
锁持有时间长:写入操作需要 EXCLUSIVE 锁,多个进程同时写入会导致锁等待
锁竞争:
max_workers=2意味着最多 2 个进程同时执行,如果它们同时写入,会出现锁竞争可能出现错误:
database is locked错误
时间线示例(原代码):
async def calculate_entry(db: AsyncSessionDep, entry_id: int):
# T1: 开始事务(DEFERRED 模式,不获取锁)
entry = await db.get(...) # T2: 第一次查询,获取 SHARED 锁
rows = await db.execute(...) # T3: 继续查询,仍持有 SHARED 锁
# T4-T8: CPU 密集型计算(几秒,仍持有 SHARED 锁)
agent_x_bar_r = getAgent(...) # 耗时操作
cpk = agent_cpk.get_cpk() # 耗时操作
# T9: 修改对象属性(SQLAlchemy 标记为 dirty,但锁仍为 SHARED)
entry.cpk = cpk # 只是标记 dirty,还没有执行 SQL UPDATE
# T10: commit() 时,SQLAlchemy flush pending changes
# 这时才执行 SQL UPDATE,锁从 SHARED 升级到 RESERVED → EXCLUSIVE
await db.commit() # T11: 提交,释放所有锁
关键点:
cpk = calculate_cpk(...)这一步本身不会持有 RESERVED 锁CPU 计算是纯 Python 代码,不涉及数据库操作
此时事务仍持有 SHARED 锁(只读锁)
锁升级的时机:
当执行
entry.cpk = cpk时,SQLAlchemy 只是标记对象为"dirty"实际的 SQL UPDATE 语句还没有执行
锁仍然是 SHARED 锁(因为还没有实际的写入操作)
RESERVED 锁的获取时机:
在
db.commit()时,SQLAlchemy 会先 flush 所有 pending 的更改这时才执行实际的 SQL UPDATE 语句
此时锁才从 SHARED 升级到 RESERVED,然后到 EXCLUSIVE
问题所在:
虽然 CPU 计算时只持有 SHARED 锁(多个连接可以同时持有)
但事务持续时间长(几秒),锁一直不释放
当多个进程同时执行时,虽然可以同时持有 SHARED 锁,但在 commit 时会出现锁竞争
更准确的问题描述:
进程 1:
T1: BEGIN(不获取锁)
T2: SELECT(获取 SHARED 锁)
T3-T7: CPU 计算(几秒,仍持有 SHARED 锁)
T8: entry.cpk = cpk(标记 dirty,锁仍为 SHARED)
T9: commit() → flush → UPDATE(锁升级:SHARED → RESERVED → EXCLUSIVE)
T10: 提交完成,释放锁
进程 2(同时执行):
T1: BEGIN(不获取锁)
T2: SELECT(获取 SHARED 锁,可以与进程 1 并行)
T3-T7: CPU 计算(几秒,仍持有 SHARED 锁,可以与进程 1 并行)
T8: entry.cpk = cpk(标记 dirty,锁仍为 SHARED)
T9: commit() → flush → UPDATE(尝试获取 RESERVED 锁)
- 如果进程 1 还在 T9,需要等待进程 1 完成
- 如果进程 1 已完成,可以立即获取锁
T10: 提交完成,释放锁
真正的问题:
虽然 CPU 计算时只持有 SHARED 锁(可以并发)
但事务持续时间长,锁一直不释放
在 commit 时,多个进程会竞争 RESERVED/EXCLUSIVE 锁
如果事务持续时间短,锁竞争的时间窗口就小,问题不明显
如果事务持续时间长(包含 CPU 计算),锁竞争的时间窗口就大,更容易出现问题
优化方案
核心思想:将 CPU 密集型计算移到事务外,只在事务内执行数据库操作。
优化后的流程:
async def calculate_entries(entries: list[ReportEntry]):
for entry in entries:
# 1. 查询数据(只读事务,快速完成)
async with async_session_maker() as db:
entry_obj = await db.get(ReportEntry, entry.id)
values_result = await db.execute(...)
rows = values_result.scalars().all()
spc_data_x_bar_r = ...
spc_data_cpk = ...
# 2. CPU 密集型计算(在事务外,不持有数据库锁)
target_variable = spc_data_x_bar_r.params["target_variable"]
df = pd.DataFrame([...])
agent_x_bar_r = getAgent(df, entry_obj, spc_data_x_bar_r)
agent_cpk = getAgent(df, entry_obj, spc_data_cpk)
fixed_x_cl, x_ucl, x_lcl = agent_x_bar_r.get_x_control_limits()
fixed_sigma = agent_x_bar_r.get_sigma()
cpk = agent_cpk.get_cpk()
# 3. 更新数据库(独立写入事务,快速提交)
async with async_session_maker() as db:
entry_to_update = await db.get(ReportEntry, entry.id)
entry_to_update.x_ucl = list(dict.fromkeys(x_ucl))
entry_to_update.x_lcl = list(dict.fromkeys(x_lcl))
entry_to_update.cpk = float(cpk) if cpk is not None else None
entry_to_update.fixed_x_cl = float(fixed_x_cl) if fixed_x_cl is not None else None
entry_to_update.fixed_sigma = float(fixed_sigma) if fixed_sigma is not None else None
await db.commit() # 快速提交,释放锁
优化效果:
事务持续时间缩短:从几秒缩短到毫秒级
锁持有时间缩短:只在查询和更新时短暂持有锁
减少锁竞争:多个进程可以并行执行 CPU 计算,只在写入时短暂持有锁
提高并发性能:减少
database is locked错误
优化后的时间线:
进程 1:
T1: BEGIN(不获取锁)
T2: SELECT(获取 SHARED 锁,几毫秒)
T3: commit()(释放 SHARED 锁,事务结束)
T4-T8: CPU 计算(几秒,不持有任何锁!)
T9: BEGIN(不获取锁)
T10: SELECT(获取 SHARED 锁,几毫秒)
T11: entry.cpk = cpk(标记 dirty)
T12: commit() → UPDATE(锁升级:SHARED → RESERVED → EXCLUSIVE,几毫秒)
T13: 提交完成,释放锁
进程 2(同时执行):
T1: BEGIN(不获取锁)
T2: SELECT(获取 SHARED 锁,可以与进程 1 的 T2 并行)
T3: commit()(释放 SHARED 锁)
T4-T8: CPU 计算(几秒,不持有任何锁,可以与进程 1 的 T4-T8 完全并行!)
T9: BEGIN(不获取锁)
T10: SELECT(获取 SHARED 锁)
T11: entry.cpk = cpk(标记 dirty)
T12: commit() → UPDATE(如果进程 1 在 T12,需要等待;否则立即获取锁)
T13: 提交完成,释放锁
优化效果对比:
关键改进:
CPU 计算阶段不持有锁:
原代码:CPU 计算时仍持有 SHARED 锁(虽然可以并发,但锁不释放)
优化后:CPU 计算时完全不持有任何锁,多个进程可以完全并行计算
锁持有时间大幅缩短:
原代码:锁持有时间 = 查询时间 + CPU 计算时间 + 写入时间 ≈ 几秒
优化后:锁持有时间 = 查询时间 + 写入时间 ≈ 几十毫秒
减少锁竞争:
原代码:多个进程在 commit 时竞争,竞争窗口 = 事务持续时间(几秒)
优化后:多个进程在 commit 时竞争,竞争窗口 = 写入事务时间(几毫秒)
提高并发性能:
原代码:虽然可以并发读取,但事务持续时间长,锁竞争更频繁
优化后:CPU 计算完全并行,只在写入时短暂串行,锁竞争大幅减少
注意事项
数据一致性:在 CPU 计算期间,如果其他进程修改了数据,可能导致计算结果不准确。但在当前场景中,后台任务是计算统计值,数据在导入后不会立即修改,所以风险较低。
进程池配置:如果仍有锁竞争,可以考虑:
将
max_workers调整为 1(串行执行,完全避免锁竞争)或进一步优化,使用批量更新减少写入次数
错误处理:在写入时如果出现锁竞争,SQLAlchemy 会自动重试,但建议添加适当的错误处理和重试机制。
总结
通过使用进程池(ProcessPoolExecutor)替代线程池(ThreadPoolExecutor),我们成功解决了后台任务与请求处理之间的资源竞争问题。进程池方案的优势在于:
完全隔离:后台任务在独立进程中执行,不影响请求处理
真正并行:绕过 GIL,实现真正的并行执行
代码简洁:在 spawn 模式下可以直接复用原有函数,无需重新定义
性能提升:请求处理性能显著改善,后台任务可以真正并行执行
通过优化事务范围,将 CPU 密集型计算移到事务外,我们进一步解决了 SQLite 写入锁竞争问题:
事务持续时间缩短:从几秒缩短到毫秒级
锁持有时间缩短:只在查询和更新时短暂持有锁
减少锁竞争:多个进程可以并行执行 CPU 计算,只在写入时短暂持有锁
提高并发性能:减少
database is locked错误
这是一个平衡了实现复杂度、性能提升和维护成本的优秀方案。