问题描述

在测试报告导入功能中,当上一批数据导入完成后触发后台计算任务,此时再导入大量 Excel 文件会导致:

  1. 请求超时:导入请求无法及时响应

  2. 文件上传解析慢python_multipart.multipart 日志显示解析很慢

  3. 负载均衡无效:虽然使用了 --workers=3,但似乎没有作用

问题分析

1. 架构问题

1.1 多进程 + 线程池的配置

# main.py
uvicorn.run("src.app:app", host="127.0.0.1", port=8000, workers=3)

# src/test_report/service.py
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=4, thread_name_prefix="calculate_worker")

问题:

  • 使用 --workers=3 启动了 3 个独立的进程

  • 每个进程都有自己的 ThreadPoolExecutor(max_workers=4)

  • 实际上有 3 × 4 = 12 个线程在执行后台任务

  • 每个进程内的线程池都在执行 CPU 密集型任务

1.2 资源竞争

进程 1 (Worker 1)
├── 请求处理线程 (处理导入请求)
├── 后台任务线程 1 (计算 SPC)
├── 后台任务线程 2 (计算 SPC)
├── 后台任务线程 3 (计算 SPC)
└── 后台任务线程 4 (计算 SPC)

进程 2 (Worker 2)
├── 请求处理线程 (处理导入请求)
├── 后台任务线程 1 (计算 SPC)
├── 后台任务线程 2 (计算 SPC)
├── 后台任务线程 3 (计算 SPC)
└── 后台任务线程 4 (计算 SPC)

进程 3 (Worker 3)
├── 请求处理线程 (处理导入请求)
├── 后台任务线程 1 (计算 SPC)
├── 后台任务线程 2 (计算 SPC)
├── 后台任务线程 3 (计算 SPC)
└── 后台任务线程 4 (计算 SPC)

问题:

  • 当后台任务在执行时,会占用同一进程内的 CPU 资源

  • 即使请求被路由到不同的进程,如果该进程的线程池正在执行任务,也会受影响

  • **GIL(全局解释器锁)**导致 CPU 密集型任务会影响同一进程内的其他操作

2. CPU 密集型任务

2.1 后台任务内容

async def calculate_entry(db: AsyncSessionDep, entry_id: int):
    # 计算x_ucl、x_lcl、cpk、fixed_x_cl、fixed_sigma
    # ...
    agent_x_bar_r = getAgent(df, entry, spc_data_x_bar_r)  # CPU 密集型
    agent_cpk = getAgent(df, entry, spc_data_cpk)  # CPU 密集型
    fixed_x_cl, x_ucl, x_lcl = agent_x_bar_r.get_x_control_limits()  # CPU 密集型
    fixed_sigma = agent_x_bar_r.get_sigma()  # CPU 密集型
    cpk = agent_cpk.get_cpk()  # CPU 密集型

问题:

  • XRSControlChartCapabilityAnalysis 的计算是 CPU 密集型任务

  • 涉及大量数学计算(pandas、numpy 操作)

  • 每个 entry 的计算可能需要几百毫秒到几秒

2.2 任务执行流程

# 导入完成后立即触发后台任务
if successful_report_ids:
    unique_report_ids = list(dict.fromkeys(successful_report_ids))
    loop = asyncio.get_running_loop()
    calculate_reports_entries_in_thread_pool(unique_report_ids, loop)  # 立即提交到线程池

# 线程池执行
def _calculate_reports_entries_sync(report_ids: list[int]):
    asyncio.run(calculate_reports_entries(report_ids))  # 创建新的事件循环
    # 遍历所有 entries,每个都执行 CPU 密集型计算

问题:

  • 导入完成后立即触发后台任务

  • 如果有多个 report,每个 report 有多个 entries,会创建大量任务

  • 线程池的 4 个 worker 会立即被占用

3. 文件上传解析慢

3.1 Multipart 解析机制

FastAPI 的文件上传处理:

FastAPI 使用 python-multipart 库来解析 multipart/form-data 格式的文件上传请求。当客户端通过 HTTP POST 请求上传文件时:

@router.post("/import")
async def import_test_report(db: AsyncSessionDep, files: list[UploadFile]):
    # FastAPI 自动调用 python-multipart 解析上传的文件
    # files: list[UploadFile] 中的每个 UploadFile 对象已经包含了解析后的文件流

解析过程:

  1. 接收 HTTP 请求体(I/O 操作)

    • 从网络套接字读取原始字节流

    • 数据以流式方式接收,不是一次性加载到内存

  2. 解析 Multipart 边界(CPU 密集型)

    python_multipart.multipart - DEBUG - Calling on_part_data with data[0:262144]
    
    • python-multipart 需要解析 HTTP 请求体中的 multipart 边界标记

    • 识别每个文件部分的开始和结束位置

    • 提取文件元数据(文件名、Content-Type 等)

    • 这是同步操作,需要 CPU 资源来处理字节流

  3. 提取文件内容(CPU + I/O)

    • 从 multipart 数据流中提取文件的实际内容

    • 将文件内容存储到临时缓冲区或文件对象

    • 对于大文件,需要多次读取和解析(每次 262144 字节,即 256KB)

问题分析:

  • Multipart 解析是同步操作

    • python-multipart 的解析过程是同步的,会阻塞事件循环

    • 虽然 FastAPI 使用异步框架,但底层的 multipart 解析是同步的

    • 大文件上传时,解析过程会占用 CPU 资源

  • CPU 资源竞争

    • 解析 multipart 数据需要 CPU 来处理字节流、查找边界、提取内容

    • 当同一进程内的线程池正在执行 CPU 密集型后台任务时,会竞争 CPU 资源

    • **GIL(全局解释器锁)**导致 CPU 密集型任务会影响其他操作

    • 即使后台任务在独立进程中执行,文件解析仍需要 CPU 资源

  • 大文件的影响

    • 文件越大,解析时间越长

    • 多个文件同时上传时,解析过程会累积

    • 解析过程中,事件循环可能被阻塞,影响其他请求的处理

示例场景:

场景:上传 10 个 Excel 文件,每个 5MB

时间线:
T1: 开始接收第 1 个文件(multipart 解析开始,占用 CPU)
T2: 解析第 1 个文件完成(耗时 200ms,期间事件循环可能被阻塞)
T3: 开始接收第 2 个文件(继续解析,占用 CPU)
...
T10: 所有文件解析完成(总耗时约 2 秒)

问题:
- 解析过程中,CPU 被占用
- 如果同时有其他请求到达,响应会变慢
- 事件循环可能被阻塞,导致其他接口卡慢

3.2 请求处理流程

1. 接收 HTTP 请求(I/O)
2. 解析 multipart 数据(需要 CPU)
3. 提取 Excel 内容(CPU + I/O)
4. 导入数据到数据库(I/O)
5. 提交事务(I/O)
6. 触发后台任务(CPU 密集型)

问题:

  • 步骤 2-3 需要 CPU 资源

  • 如果同一进程内的线程池正在执行后台任务,会竞争 CPU 资源

  • 导致文件解析变慢,请求超时

4. 负载均衡无效的原因

4.1 进程级别的负载均衡

  • Uvicorn 的 --workers=3 使用进程级别的负载均衡

  • 请求会被路由到不同的进程

  • 但每个进程都有自己的线程池

4.2 问题场景

时间线:
T1: 进程 1 处理导入请求 A,完成后触发后台任务
T2: 进程 1 的线程池开始执行后台任务(占用 CPU)
T3: 新请求 B 被路由到进程 1
T4: 进程 1 的 CPU 被后台任务占用,请求 B 处理变慢

问题:

  • 即使有空闲进程(进程 2、3),如果请求被路由到正在执行后台任务的进程,也会受影响

  • 负载均衡是进程级别的,无法感知进程内的线程状态

5. 根本原因总结

  1. 线程池配置过大max_workers=4 导致每个进程有 4 个线程执行 CPU 密集型任务

  2. 资源竞争:CPU 密集型任务和请求处理在同一进程内竞争 CPU 资源

  3. GIL 限制:Python 的 GIL 导致 CPU 密集型任务会影响同一进程内的其他操作

  4. 立即执行:导入完成后立即触发后台任务,没有延迟或限流机制

  5. 无优先级:后台任务和请求处理没有优先级区分

解决方案对比

问题约束

  • 不能延迟执行:后台任务必须立即执行

  • 不能影响请求处理:导入请求不能超时

  • 现有架构:多进程(workers=3)+ 线程池(max_workers=4)

方案 1:减少线程池 Worker 数量

实现

# src/test_report/service.py
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="calculate_worker")

优点

  • 实现简单:只需修改一行代码

  • 立即生效:无需其他改动

  • 减少资源竞争:每个进程只有 1 个线程执行后台任务

  • 风险低:不会引入新的问题

缺点

  • 后台任务执行变慢:从并行 4 个任务变为串行 1 个任务

  • 吞吐量降低:如果有多个 report 需要计算,会排队执行

  • 不能充分利用多核:CPU 密集型任务无法并行执行

  • 治标不治本:仍然在同一进程内竞争资源

性能影响

  • 请求处理:✅ 改善(减少 CPU 竞争)

  • 后台任务速度:❌ 变慢(串行执行)

  • 总体吞吐量:⚠️ 可能降低


方案 2:使用进程池(ProcessPoolExecutor)绕过 GIL ⭐⭐⭐⭐

实现

# src/test_report/service.py
import multiprocessing
import sys

# 使用进程池而不是线程池
_mp_context = multiprocessing.get_context("spawn" if sys.platform == "win32" else "fork")
_executor = concurrent.futures.ProcessPoolExecutor(
    max_workers=2,  # 每个进程 2 个 worker
    mp_context=_mp_context
)

def _calculate_reports_entries_sync(report_ids: list[int]):
    """同步包装函数:在独立进程中运行异步的 calculate_reports_entries"""
    try:
        import asyncio
        # 在 spawn 模式下,子进程会重新导入整个模块,所以原来的函数可以直接使用
        asyncio.run(calculate_reports_entries(report_ids))
    except Exception as e:
        import logging
        logger = logging.getLogger(__name__)
        logger.error(f"后台计算任务失败 (report_ids={report_ids}): {e}", exc_info=True)

def calculate_reports_entries_in_process_pool(report_ids: list[int], loop: asyncio.AbstractEventLoop):
    """将计算任务提交到进程池执行,不等待结果"""
    executor = _get_process_pool_executor()
    loop.run_in_executor(executor, _calculate_reports_entries_sync, report_ids)

优点

  • 绕过 GIL:进程间不共享 GIL,真正的并行执行

  • 隔离性好:后台任务在独立进程中,不影响请求处理

  • 充分利用多核:可以并行执行多个 CPU 密集型任务

  • 资源隔离:进程间内存隔离,更安全

  • 代码复用:在 spawn 模式下,子进程会重新导入整个模块,可以直接使用原来的函数

缺点

  • 进程开销大:创建进程比创建线程开销大

  • 数据序列化:进程间通信需要 pickle 序列化

  • 数据库连接:每个进程需要独立的数据库连接

  • Windows 兼容性:需要使用 spawn 模式,启动较慢

  • 代码复杂度:需要处理进程间通信和错误处理

注意事项

  • 数据库连接不能跨进程共享,需要在每个进程中重新创建

  • 函数参数需要可序列化(pickle)

  • Windows 上进程启动较慢

  • 在 spawn 模式下,子进程会重新导入整个模块,所以原来的函数可以直接使用,无需重新定义

性能影响

  • 请求处理:✅ 显著改善(完全隔离)

  • 后台任务速度:✅ 可能更快(真正的并行)

  • 总体吞吐量:✅ 可能提升


方案 3:使用 Semaphore 限制并发任务数

实现

# src/test_report/service.py
import asyncio

# 全局信号量,限制并发任务数
_calculate_semaphore = asyncio.Semaphore(1)  # 最多 1 个并发任务

async def calculate_reports_entries_with_limit(report_ids: list[int]):
    """带并发限制的计算任务"""
    async with _calculate_semaphore:
        await calculate_reports_entries(report_ids)

def _calculate_reports_entries_sync(report_ids: list[int]):
    """同步包装函数"""
    try:
        asyncio.run(calculate_reports_entries_with_limit(report_ids))
    except Exception as e:
        logger.error(f"后台计算任务失败 (report_ids={report_ids}): {e}", exc_info=True)

def calculate_reports_entries_in_thread_pool(report_ids: list[int], loop: asyncio.AbstractEventLoop):
    """将计算任务提交到线程池执行"""
    loop.run_in_executor(_executor, _calculate_reports_entries_sync, report_ids)

优点

  • 精确控制并发:可以限制同时执行的任务数

  • 灵活调整:可以根据实际情况调整并发数

  • 代码改动小:只需添加信号量控制

  • 保持线程池:仍然使用线程池,兼容性好

缺点

  • 仍然受 GIL 影响:线程池仍然受 GIL 限制

  • 不能完全隔离:任务仍在同一进程内

  • 复杂度增加:需要管理信号量

性能影响

  • 请求处理:✅ 改善(限制并发任务)

  • 后台任务速度:⚠️ 可能变慢(限制并发)

  • 总体吞吐量:⚠️ 取决于并发数设置


方案 4:任务队列 + 限流机制

实现

# src/test_report/service.py
import asyncio
from collections import deque
from threading import Lock
from typing import Optional

# 任务队列和限流
_task_queue: deque[list[int]] = deque()
_queue_lock = Lock()
_max_concurrent_tasks = 1  # 限制并发任务数
_current_running_tasks = 0
_task_event = asyncio.Event()

async def _task_processor():
    """后台任务处理器"""
    global _current_running_tasks
    
    while True:
        # 等待有任务或事件触发
        await _task_event.wait()
        _task_event.clear()
        
        while True:
            # 检查是否可以执行新任务
            with _queue_lock:
                if _current_running_tasks >= _max_concurrent_tasks:
                    break
                
                if not _task_queue:
                    break
                
                report_ids = _task_queue.popleft()
                _current_running_tasks += 1
            
            # 执行任务
            loop = asyncio.get_running_loop()
            try:
                await loop.run_in_executor(_executor, _calculate_reports_entries_sync, report_ids)
            finally:
                with _queue_lock:
                    _current_running_tasks -= 1
                    # 通知可以处理下一个任务
                    _task_event.set()

# 在应用启动时启动任务处理器
_task_processor_task: Optional[asyncio.Task] = None

def start_task_processor():
    """启动任务处理器"""
    global _task_processor_task
    if _task_processor_task is None:
        loop = asyncio.get_event_loop()
        _task_processor_task = loop.create_task(_task_processor())

def submit_calculate_task(report_ids: list[int]):
    """提交计算任务到队列"""
    with _queue_lock:
        _task_queue.append(report_ids)
        _task_event.set()

优点

  • 精确控制:可以精确控制并发任务数

  • 队列缓冲:任务可以排队,不会丢失

  • 灵活调整:可以动态调整并发数和队列大小

  • 可监控:可以监控队列长度和执行状态

缺点

  • 实现复杂:需要管理队列和事件

  • 仍然受 GIL 影响:使用线程池仍然受 GIL 限制

  • 需要启动任务:需要在应用启动时启动处理器

性能影响

  • 请求处理:✅ 显著改善(精确控制)

  • 后台任务速度:⚠️ 取决于并发数设置

  • 总体吞吐量:✅ 可控


方案 5:优化计算任务本身(分批处理)

实现

# src/test_report/service.py
async def calculate_entries(entries: list[ReportEntry]):
    """分批处理 entries,每批之间 yield 控制权"""
    BATCH_SIZE = 10  # 每批处理 10 个 entries
    
    for i in range(0, len(entries), BATCH_SIZE):
        batch = entries[i:i + BATCH_SIZE]
        
        # 处理当前批次
        for entry in batch:
            async with async_session_maker() as db:
                await calculate_entry(db, entry.id)
                await db.commit()
        
        # 每批处理后 yield,让其他协程有机会执行
        await asyncio.sleep(0)  # 让出控制权

优点

  • 减少阻塞:分批处理,每批之间让出控制权

  • 改善响应性:其他协程有机会执行

  • 代码改动小:只需修改计算逻辑

缺点

  • 不能完全解决问题:仍然在同一进程内

  • 执行时间可能变长:分批处理可能增加总时间

  • 需要仔细调优:批次大小需要根据实际情况调整

性能影响

  • 请求处理:✅ 改善(减少长时间阻塞)

  • 后台任务速度:⚠️ 可能变慢(分批处理)

  • 总体吞吐量:⚠️ 取决于批次大小


方案 6:使用独立的 Worker 进程(长期方案)

实现

使用消息队列(如 Redis)或任务队列(如 Celery):

# 使用 Redis 作为消息队列
import redis
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def submit_calculate_task(report_ids: list[int]):
    """提交任务到 Redis 队列"""
    redis_client.lpush('calculate_queue', json.dumps(report_ids))

# 独立的 worker 进程(单独运行)
def worker_process():
    """独立的 worker 进程"""
    while True:
        task = redis_client.brpop('calculate_queue', timeout=1)
        if task:
            report_ids = json.loads(task[1])
            _calculate_reports_entries_sync(report_ids)

优点

  • 完全隔离:后台任务在独立进程中,不影响请求处理

  • 可扩展:可以启动多个 worker 进程

  • 高可用:worker 进程崩溃不影响主服务

  • 监控友好:可以独立监控 worker 进程

缺点

  • 架构复杂:需要引入消息队列

  • 额外依赖:需要 Redis 或其他消息队列

  • 部署复杂:需要管理多个进程

  • 延迟增加:任务需要序列化和网络传输

性能影响

  • 请求处理:✅ 完全隔离,不影响

  • 后台任务速度:✅ 可以并行执行

  • 总体吞吐量:✅ 可扩展


方案对比总结

方案

实现难度

资源隔离

性能影响

推荐度

方案 1:减少 Worker

⭐ 简单

⚠️ 部分隔离

⚠️ 后台任务变慢

⭐⭐⭐

方案 2:进程池

⭐⭐⭐ 中等

✅ 完全隔离

✅ 可能更快

⭐⭐⭐⭐

方案 3:Semaphore

⭐⭐ 中等

⚠️ 部分隔离

⚠️ 取决于设置

⭐⭐⭐

方案 4:任务队列

⭐⭐⭐ 复杂

✅ 良好控制

✅ 可控

⭐⭐⭐⭐⭐

方案 5:分批处理

⭐⭐ 中等

❌ 无隔离

⚠️ 可能变慢

⭐⭐

方案 6:独立 Worker

⭐⭐⭐⭐ 复杂

✅ 完全隔离

✅ 可扩展

⭐⭐⭐⭐

推荐方案

已实施方案:方案 2(进程池)⭐⭐⭐⭐

当前实现:

  • ✅ 使用 ProcessPoolExecutor 替代 ThreadPoolExecutor

  • ✅ 绕过 GIL,实现真正的并行执行

  • ✅ 完全隔离后台任务和请求处理

  • ✅ 在 spawn 模式下,子进程会重新导入整个模块,可以直接使用原来的函数

优势:

  • 请求处理性能显著改善(完全隔离)

  • 后台任务可以真正并行执行

  • 代码简洁,复用原有函数逻辑

注意事项:

  • Windows 上进程启动较慢(spawn 模式)

  • 需要确保数据库连接在子进程中正确初始化

  • 应用关闭时需要正确清理进程池

短期方案(快速修复)

方案 1:减少线程池 worker 数量到 1

# src/test_report/service.py
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="calculate_worker")

效果:

  • 快速减少资源竞争

  • 但后台任务会串行执行,速度变慢

中期方案(进一步优化)

方案 4:实现任务队列机制

  • 精确控制并发任务数

  • 任务可以排队,不会丢失

  • 可以监控队列长度和执行状态

长期方案(架构优化)

方案 6:独立的 Worker 进程

  • 使用消息队列(Redis/Celery)

  • 完全解耦后台任务和请求处理

  • 可以独立扩展和监控

实施建议

第一步:已实施(方案 2)

✅ 已使用进程池替代线程池,实现完全隔离

第二步:监控和测试

  • 监控请求响应时间

  • 监控后台任务执行时间

  • 测试并发导入场景

  • 验证 Windows 上的性能表现

第三步:根据实际情况优化

  • 如果进程池方案效果良好,保持现状

  • 如果仍有性能问题,考虑实现任务队列机制(方案 4)

  • 如果后台任务量大,考虑使用独立的 worker 进程(方案 6)

相关文件

  • src/test_report/router.py - 导入接口

  • src/test_report/service.py - 后台任务实现(已使用进程池)

  • src/app.py - 应用生命周期管理(进程池清理)

  • main.py - 启动配置

技术细节

进程池实现要点

  1. 进程上下文

    _mp_context = multiprocessing.get_context("spawn" if sys.platform == "win32" else "fork")
    
  2. 延迟初始化

    def _get_process_pool_executor() -> concurrent.futures.ProcessPoolExecutor:
        global _executor
        if _executor is None:
            _executor = concurrent.futures.ProcessPoolExecutor(
                max_workers=2,
                mp_context=_mp_context,
            )
        return _executor
    
  3. 函数复用

    • 在 spawn 模式下,子进程会重新导入整个模块

    • 原来的函数(calculate_reports_entriescalculate_entriescalculate_entrygetAgent)可以直接使用

    • 无需重新定义或重新导入

  4. 优雅关闭

    def shutdown_process_pool():
        """关闭进程池执行器(在应用关闭时调用)"""
        global _executor
        if _executor is not None:
            _executor.shutdown(wait=True)
            _executor = None
    

SQLite 写入锁优化

问题分析

在使用进程池(max_workers=2)执行后台任务时,如果多个进程同时执行写入操作,会导致 SQLite 数据库写入锁竞争。

SQLite 锁机制:

SQLite 使用文件锁来控制并发访问,锁的级别包括:

  • SHARED:允许读取,不允许写入

  • RESERVED:允许读取,准备写入

  • PENDING:等待其他连接释放 SHARED 锁

  • EXCLUSIVE:独占访问,不允许其他任何操作

问题场景:

# 原代码:CPU 密集型计算在事务内
async def calculate_entries(entries: list[ReportEntry]):
    for entry in entries:
        async with async_session_maker() as db:  # 事务开始
            await calculate_entry(db, entry.id)  # 包含 CPU 密集型计算(几秒)
            await db.commit()  # 事务结束(几秒后)

问题:

  1. 事务持续时间长:CPU 密集型计算(几秒)在事务内执行

  2. 锁持有时间长:写入操作需要 EXCLUSIVE 锁,多个进程同时写入会导致锁等待

  3. 锁竞争max_workers=2 意味着最多 2 个进程同时执行,如果它们同时写入,会出现锁竞争

  4. 可能出现错误database is locked 错误

时间线示例(原代码):

async def calculate_entry(db: AsyncSessionDep, entry_id: int):
    # T1: 开始事务(DEFERRED 模式,不获取锁)
    entry = await db.get(...)  # T2: 第一次查询,获取 SHARED 锁
    rows = await db.execute(...)  # T3: 继续查询,仍持有 SHARED 锁
    
    # T4-T8: CPU 密集型计算(几秒,仍持有 SHARED 锁)
    agent_x_bar_r = getAgent(...)  # 耗时操作
    cpk = agent_cpk.get_cpk()  # 耗时操作
    
    # T9: 修改对象属性(SQLAlchemy 标记为 dirty,但锁仍为 SHARED)
    entry.cpk = cpk  # 只是标记 dirty,还没有执行 SQL UPDATE
    
    # T10: commit() 时,SQLAlchemy flush pending changes
    # 这时才执行 SQL UPDATE,锁从 SHARED 升级到 RESERVED → EXCLUSIVE
    await db.commit()  # T11: 提交,释放所有锁

关键点:

  1. cpk = calculate_cpk(...) 这一步本身不会持有 RESERVED 锁

    • CPU 计算是纯 Python 代码,不涉及数据库操作

    • 此时事务仍持有 SHARED 锁(只读锁)

  2. 锁升级的时机

    • 当执行 entry.cpk = cpk 时,SQLAlchemy 只是标记对象为"dirty"

    • 实际的 SQL UPDATE 语句还没有执行

    • 锁仍然是 SHARED 锁(因为还没有实际的写入操作)

  3. RESERVED 锁的获取时机

    • db.commit() 时,SQLAlchemy 会先 flush 所有 pending 的更改

    • 这时才执行实际的 SQL UPDATE 语句

    • 此时锁才从 SHARED 升级到 RESERVED,然后到 EXCLUSIVE

  4. 问题所在

    • 虽然 CPU 计算时只持有 SHARED 锁(多个连接可以同时持有)

    • 事务持续时间长(几秒),锁一直不释放

    • 当多个进程同时执行时,虽然可以同时持有 SHARED 锁,但在 commit 时会出现锁竞争

更准确的问题描述:

进程 1:
T1: BEGIN(不获取锁)
T2: SELECT(获取 SHARED 锁)
T3-T7: CPU 计算(几秒,仍持有 SHARED 锁)
T8: entry.cpk = cpk(标记 dirty,锁仍为 SHARED)
T9: commit() → flush → UPDATE(锁升级:SHARED → RESERVED → EXCLUSIVE)
T10: 提交完成,释放锁

进程 2(同时执行):
T1: BEGIN(不获取锁)
T2: SELECT(获取 SHARED 锁,可以与进程 1 并行)
T3-T7: CPU 计算(几秒,仍持有 SHARED 锁,可以与进程 1 并行)
T8: entry.cpk = cpk(标记 dirty,锁仍为 SHARED)
T9: commit() → flush → UPDATE(尝试获取 RESERVED 锁)
    - 如果进程 1 还在 T9,需要等待进程 1 完成
    - 如果进程 1 已完成,可以立即获取锁
T10: 提交完成,释放锁

真正的问题:

  • 虽然 CPU 计算时只持有 SHARED 锁(可以并发)

  • 事务持续时间长,锁一直不释放

  • 在 commit 时,多个进程会竞争 RESERVED/EXCLUSIVE 锁

  • 如果事务持续时间短,锁竞争的时间窗口就小,问题不明显

  • 如果事务持续时间长(包含 CPU 计算),锁竞争的时间窗口就大,更容易出现问题

优化方案

核心思想:将 CPU 密集型计算移到事务外,只在事务内执行数据库操作。

优化后的流程:

async def calculate_entries(entries: list[ReportEntry]):
    for entry in entries:
        # 1. 查询数据(只读事务,快速完成)
        async with async_session_maker() as db:
            entry_obj = await db.get(ReportEntry, entry.id)
            values_result = await db.execute(...)
            rows = values_result.scalars().all()
            spc_data_x_bar_r = ...
            spc_data_cpk = ...
        
        # 2. CPU 密集型计算(在事务外,不持有数据库锁)
        target_variable = spc_data_x_bar_r.params["target_variable"]
        df = pd.DataFrame([...])
        agent_x_bar_r = getAgent(df, entry_obj, spc_data_x_bar_r)
        agent_cpk = getAgent(df, entry_obj, spc_data_cpk)
        fixed_x_cl, x_ucl, x_lcl = agent_x_bar_r.get_x_control_limits()
        fixed_sigma = agent_x_bar_r.get_sigma()
        cpk = agent_cpk.get_cpk()
        
        # 3. 更新数据库(独立写入事务,快速提交)
        async with async_session_maker() as db:
            entry_to_update = await db.get(ReportEntry, entry.id)
            entry_to_update.x_ucl = list(dict.fromkeys(x_ucl))
            entry_to_update.x_lcl = list(dict.fromkeys(x_lcl))
            entry_to_update.cpk = float(cpk) if cpk is not None else None
            entry_to_update.fixed_x_cl = float(fixed_x_cl) if fixed_x_cl is not None else None
            entry_to_update.fixed_sigma = float(fixed_sigma) if fixed_sigma is not None else None
            await db.commit()  # 快速提交,释放锁

优化效果:

  1. 事务持续时间缩短:从几秒缩短到毫秒级

  2. 锁持有时间缩短:只在查询和更新时短暂持有锁

  3. 减少锁竞争:多个进程可以并行执行 CPU 计算,只在写入时短暂持有锁

  4. 提高并发性能:减少 database is locked 错误

优化后的时间线:

进程 1:
T1: BEGIN(不获取锁)
T2: SELECT(获取 SHARED 锁,几毫秒)
T3: commit()(释放 SHARED 锁,事务结束)
T4-T8: CPU 计算(几秒,不持有任何锁!)
T9: BEGIN(不获取锁)
T10: SELECT(获取 SHARED 锁,几毫秒)
T11: entry.cpk = cpk(标记 dirty)
T12: commit() → UPDATE(锁升级:SHARED → RESERVED → EXCLUSIVE,几毫秒)
T13: 提交完成,释放锁

进程 2(同时执行):
T1: BEGIN(不获取锁)
T2: SELECT(获取 SHARED 锁,可以与进程 1 的 T2 并行)
T3: commit()(释放 SHARED 锁)
T4-T8: CPU 计算(几秒,不持有任何锁,可以与进程 1 的 T4-T8 完全并行!)
T9: BEGIN(不获取锁)
T10: SELECT(获取 SHARED 锁)
T11: entry.cpk = cpk(标记 dirty)
T12: commit() → UPDATE(如果进程 1 在 T12,需要等待;否则立即获取锁)
T13: 提交完成,释放锁

优化效果对比:

阶段

原代码

优化后

查询阶段

持有 SHARED 锁(几秒)

持有 SHARED 锁(几毫秒)

CPU 计算阶段

持有 SHARED 锁(几秒)

不持有任何锁(几秒)

写入阶段

在长事务中,锁竞争窗口大

在短事务中,锁竞争窗口小

总锁持有时间

几秒(查询 + 计算 + 写入)

几十毫秒(查询 + 写入)

关键改进:

  1. CPU 计算阶段不持有锁

    • 原代码:CPU 计算时仍持有 SHARED 锁(虽然可以并发,但锁不释放)

    • 优化后:CPU 计算时完全不持有任何锁,多个进程可以完全并行计算

  2. 锁持有时间大幅缩短

    • 原代码:锁持有时间 = 查询时间 + CPU 计算时间 + 写入时间 ≈ 几秒

    • 优化后:锁持有时间 = 查询时间 + 写入时间 ≈ 几十毫秒

  3. 减少锁竞争

    • 原代码:多个进程在 commit 时竞争,竞争窗口 = 事务持续时间(几秒)

    • 优化后:多个进程在 commit 时竞争,竞争窗口 = 写入事务时间(几毫秒)

  4. 提高并发性能

    • 原代码:虽然可以并发读取,但事务持续时间长,锁竞争更频繁

    • 优化后:CPU 计算完全并行,只在写入时短暂串行,锁竞争大幅减少

注意事项

  1. 数据一致性:在 CPU 计算期间,如果其他进程修改了数据,可能导致计算结果不准确。但在当前场景中,后台任务是计算统计值,数据在导入后不会立即修改,所以风险较低。

  2. 进程池配置:如果仍有锁竞争,可以考虑:

    • max_workers 调整为 1(串行执行,完全避免锁竞争)

    • 或进一步优化,使用批量更新减少写入次数

  3. 错误处理:在写入时如果出现锁竞争,SQLAlchemy 会自动重试,但建议添加适当的错误处理和重试机制。

总结

通过使用进程池(ProcessPoolExecutor)替代线程池(ThreadPoolExecutor),我们成功解决了后台任务与请求处理之间的资源竞争问题。进程池方案的优势在于:

  1. 完全隔离:后台任务在独立进程中执行,不影响请求处理

  2. 真正并行:绕过 GIL,实现真正的并行执行

  3. 代码简洁:在 spawn 模式下可以直接复用原有函数,无需重新定义

  4. 性能提升:请求处理性能显著改善,后台任务可以真正并行执行

通过优化事务范围,将 CPU 密集型计算移到事务外,我们进一步解决了 SQLite 写入锁竞争问题:

  1. 事务持续时间缩短:从几秒缩短到毫秒级

  2. 锁持有时间缩短:只在查询和更新时短暂持有锁

  3. 减少锁竞争:多个进程可以并行执行 CPU 计算,只在写入时短暂持有锁

  4. 提高并发性能:减少 database is locked 错误

这是一个平衡了实现复杂度、性能提升和维护成本的优秀方案。