背景
为了实时监控企业SPC流程链路的异常情况(如 CPK 不达标或检测值超限),系统集成了企业微信 Webhook 推送功能。当 OQC 或 IQC 报告在导入或计算后出现“异常”状态时,会自动向指定的企业微信群发送 Markdown 格式的预警消息,并提供直接跳转至前端详情页的链接,实现“发现即通知,通知即处理”的闭环。
参考文档
https://developer.work.weixin.qq.com/document/path/99110
前置条件
申请企业微信Webhook的key

实现
1. 核心推送工具类 (wechat_push.py)
统一封装了异步和同步两个版本的推送函数,强制使用 Markdown 格式以支持丰富的样式和超链接。
# src/utils/wechat_push.py
import httpx
from src.conf import settings
async def send_wechat_notification(content: str, webhook_key: str):
"""异步推送:用于 Web 异步协程环境"""
url = f"{settings.WEBHOOK_BASE_URL}{webhook_key}"
data = {"msgtype": "markdown", "markdown": {"content": content}}
async with httpx.AsyncClient() as client:
await client.post(url, json=data, timeout=10.0)
def send_wechat_notification_sync(content: str, webhook_key: str):
"""同步推送:用于多进程 ProcessPoolExecutor 环境"""
url = f"{settings.WEBHOOK_BASE_URL}{webhook_key}"
data = {"msgtype": "markdown", "markdown": {"content": content}}
with httpx.Client() as client:
client.post(url, json=data, timeout=10.0)
2. 消息模板构造 (service.py)
根据业务需求(单条推送 vs 汇总推送)构造 Markdown 文本,并处理 URL 穿插。
# 核心逻辑示例:汇总推送 (IQC 模块)
def _check_and_push_iqc_summary_sync(db, report_id: int):
report = db.get(IQCReport, report_id)
# 仅查询状态异常的条目
abnormal_entries = db.query(IQCReportEntry).filter(
IQCReportEntry.report_id == report_id,
IQCReportEntry.status == "ABNORMAL"
).all()
if not abnormal_entries: return
# 构造 URL 穿插展示的异常项
base_url = f"{settings.FRONTEND_URL}/IQC/{report_id}"
entry_links = [f"{e.name} [{base_url}/{e.id}]({base_url}/{e.id})" for e in abnormal_entries]
entry_names_str = "\n- **异常项**: ".join(entry_links)
msg = (
f"### IQC 异常汇总\n"
f"- **供应商**: {report.supplier}\n"
f"- **品名**: {report.name}\n"
f"- **异常项**: {entry_names_str}"
)
send_wechat_notification_sync(msg, settings.IQC_WEBHOOK_KEY)
3. 调用执行逻辑
系统采用 “即时单项预警” + “后台异步汇总预警” 双重机制。
流程图
graph TD
A[用户操作: 导入Excel/手动更新数据] --> B{操作类型}
B -- 手动更新单个条目 --> C[Router 实时计算状态]
C --> D[异步推送 push_abnormal_notification]
D --> E((企业微信群))
B -- 批量导入Excel --> F[提交进程池异步计算]
F --> G[子进程执行 _calculate_entries_sync]
G --> H{判断来源 origin}
H -- 供应商 SUPPLIER --> I[同步汇总推送 _check_and_push_summary_sync]
H -- 天禄自己 SELF --> J[结束-不预警]
I --> E
4. 关键策略点
跨进程处理:在
ProcessPoolExecutor中,由于无法直接共享父进程的数据库 Session 和事件循环,因此在子进程中使用session_maker()创建新的同步 Session,并调用send_wechat_notification_sync。环境配置:Webhook 地址前缀(
WEBHOOK_BASE_URL)和前端跳转基准地址(FRONTEND_URL)均统一在src/conf.py中维护,方便多环境切换。异常处理:推送逻辑使用
try...except包裹,确保推送服务的偶发异常不会中断主业务流程(如数据导入或计算)。预警过滤:
OQC:全量预警。
IQC:通过查询
IQCReportImportLog确认来源,仅对供应商(SUPPLIER)上传的数据进行预警,天禄自检数据产生的异常不发送通知,避免干扰。
消息交互:URL 格式严格遵循
模块/报告ID/条目ID,确保用户点击后能精准定位到异常图表。
实现效果


源码与目录结构
点击展开查看目录结构
.
├── src/
│ ├── conf.py # 全局配置(Webhook Key, Frontend URL)
│ ├── utils/
│ │ └── wechat_push.py # 核心推送工具(同步/异步实现)
│ ├── iqc/
│ │ ├── router.py # IQC 路由(触发单项推送)
│ │ ├── service.py # IQC 服务(逻辑处理、汇总推送、进程池任务)
│ │ └── models.py # IQC 模型(导入日志记录)
│ └── oqc/
│ ├── router.py # OQC 路由(触发单项推送)
│ └── service.py # OQC 服务(逻辑处理、汇总推送、进程池任务)
点击展开查看核心源码
1. 推送工具类 (src/utils/wechat_push.py)
import logging
import httpx
from src.conf import settings
logger = logging.getLogger(__name__)
def send_wechat_notification_sync(content: str, webhook_key: str | None = None) -> bool:
"""同步版本:通过企业微信 Webhook 发送消息"""
key = webhook_key or settings.OQC_WEBHOOK_KEY
if not key:
logger.warning("未配置微信 Webhook Key,跳过消息推送")
return False
url = f"{settings.WEBHOOK_BASE_URL}{key}"
data = {"msgtype": "markdown", "markdown": {"content": content}}
try:
with httpx.Client() as client:
response = client.post(url, json=data, timeout=10.0)
response.raise_for_status()
return True
except Exception as e:
logger.error(f"同步微信消息推送异常: {str(e)}")
return False
async def send_wechat_notification(content: str, webhook_key: str | None = None) -> bool:
"""异步版本:通过企业微信 Webhook 发送消息"""
key = webhook_key or settings.OQC_WEBHOOK_KEY
if not key:
logger.warning("未配置微信 Webhook Key,跳过消息推送")
return False
url = f"{settings.WEBHOOK_BASE_URL}{key}"
data = {"msgtype": "markdown", "markdown": {"content": content}}
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data, timeout=10.0)
response.raise_for_status()
return True
except Exception as e:
logger.error(f"微信消息推送异常: {str(e)}")
return False
2. OQC 服务逻辑 (src/oqc/service.py)
async def push_oqc_abnormal_notification(db: AsyncSessionDep, entry: OQCReportEntry):
"""单项异常推送"""
if entry.status == EntryStatusEnum.ABNORMAL.value:
try:
report = await db.get(OQCReport, entry.report_id)
# ... 构造 msg ...
await send_wechat_notification(msg, settings.OQC_WEBHOOK_KEY)
except Exception as e:
logger.error(f"推送 OQC 异常消息失败: {e}")
def _check_and_push_oqc_summary_sync(db, report_id: int):
"""汇总异常推送 (同步)"""
# ... 查询异常项 ...
# ... 构造汇总 msg ...
send_wechat_notification_sync(msg, settings.OQC_WEBHOOK_KEY)
3. IQC 服务逻辑 (src/iqc/service.py)
def _calculate_entries_sync(report_ids: list[int]):
"""子进程计算逻辑"""
# ... 计算过程 ...
for report_id in report_ids:
# 仅对供应商来源的数据进行汇总预警
last_log = db.query(IQCReportImportLog).filter(...).first()
if last_log and last_log.origin == OriginType.SUPPLIER.value:
_check_and_push_iqc_summary_sync(db, report_id)