背景

为了实时监控企业SPC流程链路的异常情况(如 CPK 不达标或检测值超限),系统集成了企业微信 Webhook 推送功能。当 OQC 或 IQC 报告在导入或计算后出现“异常”状态时,会自动向指定的企业微信群发送 Markdown 格式的预警消息,并提供直接跳转至前端详情页的链接,实现“发现即通知,通知即处理”的闭环。


参考文档

https://developer.work.weixin.qq.com/document/path/99110

前置条件

申请企业微信Webhook的key

实现

1. 核心推送工具类 (wechat_push.py)

统一封装了异步和同步两个版本的推送函数,强制使用 Markdown 格式以支持丰富的样式和超链接。

# src/utils/wechat_push.py
import httpx
from src.conf import settings

async def send_wechat_notification(content: str, webhook_key: str):
    """异步推送:用于 Web 异步协程环境"""
    url = f"{settings.WEBHOOK_BASE_URL}{webhook_key}"
    data = {"msgtype": "markdown", "markdown": {"content": content}}
    async with httpx.AsyncClient() as client:
        await client.post(url, json=data, timeout=10.0)

def send_wechat_notification_sync(content: str, webhook_key: str):
    """同步推送:用于多进程 ProcessPoolExecutor 环境"""
    url = f"{settings.WEBHOOK_BASE_URL}{webhook_key}"
    data = {"msgtype": "markdown", "markdown": {"content": content}}
    with httpx.Client() as client:
        client.post(url, json=data, timeout=10.0)

2. 消息模板构造 (service.py)

根据业务需求(单条推送 vs 汇总推送)构造 Markdown 文本,并处理 URL 穿插。

# 核心逻辑示例:汇总推送 (IQC 模块)
def _check_and_push_iqc_summary_sync(db, report_id: int):
    report = db.get(IQCReport, report_id)
    # 仅查询状态异常的条目
    abnormal_entries = db.query(IQCReportEntry).filter(
        IQCReportEntry.report_id == report_id,
        IQCReportEntry.status == "ABNORMAL"
    ).all()
    
    if not abnormal_entries: return

    # 构造 URL 穿插展示的异常项
    base_url = f"{settings.FRONTEND_URL}/IQC/{report_id}"
    entry_links = [f"{e.name} [{base_url}/{e.id}]({base_url}/{e.id})" for e in abnormal_entries]
    entry_names_str = "\n- **异常项**: ".join(entry_links)

    msg = (
        f"### IQC 异常汇总\n"
        f"- **供应商**: {report.supplier}\n"
        f"- **品名**: {report.name}\n"
        f"- **异常项**: {entry_names_str}"
    )
    send_wechat_notification_sync(msg, settings.IQC_WEBHOOK_KEY)

3. 调用执行逻辑

系统采用 “即时单项预警” + “后台异步汇总预警” 双重机制。

流程图

graph TD
    A[用户操作: 导入Excel/手动更新数据] --> B{操作类型}
    
    B -- 手动更新单个条目 --> C[Router 实时计算状态]
    C --> D[异步推送 push_abnormal_notification]
    D --> E((企业微信群))

    B -- 批量导入Excel --> F[提交进程池异步计算]
    F --> G[子进程执行 _calculate_entries_sync]
    G --> H{判断来源 origin}
    H -- 供应商 SUPPLIER --> I[同步汇总推送 _check_and_push_summary_sync]
    H -- 天禄自己 SELF --> J[结束-不预警]
    I --> E

4. 关键策略点

  • 跨进程处理:在 ProcessPoolExecutor 中,由于无法直接共享父进程的数据库 Session 和事件循环,因此在子进程中使用 session_maker() 创建新的同步 Session,并调用 send_wechat_notification_sync

  • 环境配置:Webhook 地址前缀(WEBHOOK_BASE_URL)和前端跳转基准地址(FRONTEND_URL)均统一在 src/conf.py 中维护,方便多环境切换。

  • 异常处理:推送逻辑使用 try...except 包裹,确保推送服务的偶发异常不会中断主业务流程(如数据导入或计算)。

  • 预警过滤

    • OQC:全量预警。

    • IQC:通过查询 IQCReportImportLog 确认来源,仅对供应商(SUPPLIER)上传的数据进行预警,天禄自检数据产生的异常不发送通知,避免干扰。

  • 消息交互:URL 格式严格遵循 模块/报告ID/条目ID,确保用户点击后能精准定位到异常图表。


实现效果

源码与目录结构

点击展开查看目录结构
.
├── src/
│   ├── conf.py              # 全局配置(Webhook Key, Frontend URL)
│   ├── utils/
│   │   └── wechat_push.py   # 核心推送工具(同步/异步实现)
│   ├── iqc/
│   │   ├── router.py        # IQC 路由(触发单项推送)
│   │   ├── service.py       # IQC 服务(逻辑处理、汇总推送、进程池任务)
│   │   └── models.py        # IQC 模型(导入日志记录)
│   └── oqc/
│       ├── router.py        # OQC 路由(触发单项推送)
│       └── service.py       # OQC 服务(逻辑处理、汇总推送、进程池任务)
点击展开查看核心源码

1. 推送工具类 (src/utils/wechat_push.py)

import logging
import httpx
from src.conf import settings

logger = logging.getLogger(__name__)

def send_wechat_notification_sync(content: str, webhook_key: str | None = None) -> bool:
    """同步版本:通过企业微信 Webhook 发送消息"""
    key = webhook_key or settings.OQC_WEBHOOK_KEY
    if not key:
        logger.warning("未配置微信 Webhook Key,跳过消息推送")
        return False

    url = f"{settings.WEBHOOK_BASE_URL}{key}"
    data = {"msgtype": "markdown", "markdown": {"content": content}}

    try:
        with httpx.Client() as client:
            response = client.post(url, json=data, timeout=10.0)
            response.raise_for_status()
            return True
    except Exception as e:
        logger.error(f"同步微信消息推送异常: {str(e)}")
        return False

async def send_wechat_notification(content: str, webhook_key: str | None = None) -> bool:
    """异步版本:通过企业微信 Webhook 发送消息"""
    key = webhook_key or settings.OQC_WEBHOOK_KEY
    if not key:
        logger.warning("未配置微信 Webhook Key,跳过消息推送")
        return False

    url = f"{settings.WEBHOOK_BASE_URL}{key}"
    data = {"msgtype": "markdown", "markdown": {"content": content}}

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(url, json=data, timeout=10.0)
            response.raise_for_status()
            return True
    except Exception as e:
        logger.error(f"微信消息推送异常: {str(e)}")
        return False

2. OQC 服务逻辑 (src/oqc/service.py)

async def push_oqc_abnormal_notification(db: AsyncSessionDep, entry: OQCReportEntry):
    """单项异常推送"""
    if entry.status == EntryStatusEnum.ABNORMAL.value:
        try:
            report = await db.get(OQCReport, entry.report_id)
            # ... 构造 msg ...
            await send_wechat_notification(msg, settings.OQC_WEBHOOK_KEY)
        except Exception as e:
            logger.error(f"推送 OQC 异常消息失败: {e}")

def _check_and_push_oqc_summary_sync(db, report_id: int):
    """汇总异常推送 (同步)"""
    # ... 查询异常项 ...
    # ... 构造汇总 msg ...
    send_wechat_notification_sync(msg, settings.OQC_WEBHOOK_KEY)

3. IQC 服务逻辑 (src/iqc/service.py)

def _calculate_entries_sync(report_ids: list[int]):
    """子进程计算逻辑"""
    # ... 计算过程 ...
    for report_id in report_ids:
        # 仅对供应商来源的数据进行汇总预警
        last_log = db.query(IQCReportImportLog).filter(...).first()
        if last_log and last_log.origin == OriginType.SUPPLIER.value:
            _check_and_push_iqc_summary_sync(db, report_id)