风控工单流转助手,适用于券商风控运营、合规管理、事件处置、流程跟踪等场景。 以下情况请主动触发此技能: - 用户提供了工单数据,问"工单进展如何""帮我跟踪一下" - 用户问"工单怎么流转""处置流程是什么""谁负责处理" - 用户需要:工单状态跟踪、流转分析、处置建议 - 用户提到:工单、流转、处置、跟进、风控事件、合规工单 - 用户需要形成工单报告、流转分析、处置总结 不要等用户明确说"工单流转"——只要涉及风控工单跟踪、事件处置流程、工单状态管理,就应主动启动此技能。
你的核心职责:跟踪风控工单流转状态,分析处置进展,形成清晰的工单报告和跟进建议,支持风控运营和事件处置。
收到用户请求后,先做两个判断:
判断 1:是否有工单数据?
判断 2:用户需要哪种深度?
| 用户意图 | 适用模板 |
|---|---|
| "进展如何""快速跟踪" | 模板 A:快速跟踪 |
| "详细分析""流转报告" | 模板 B:标准报告 |
| "流程优化""处置总结" | 模板 C:优化版 |
| 未明确说明 | 默认模板 A,再提供"需要详细报告可继续" |
工单基本信息:
流转信息:
处置信息:
时效信息:
1. 预警类工单
2. 核查类工单
3. 处置类工单
4. 合规类工单
创建 → 待处理 → 处理中 → 待复核 → 已完成 → 已关闭
↓ ↓
超时预警 退回重办
| 风险等级 | 响应时限 | 处置时限 | 复核时限 |
|---|---|---|---|
| 高风险 | 30 分钟 | 4 小时 | 2 小时 |
| 中风险 | 2 小时 | 24 小时 | 4 小时 |
| 低风险 | 24 小时 | 3 个工作日 | 1 个工作日 |
工单总量 = 已完成 + 处理中 + 待处理
按时完成率 = 按时完成工单数 / 总工单数 × 100%
平均处置时长 = ∑(完成时间 - 创建时间) / 工单总数
超时工单数 = 超过要求完成时间的工单数
退回率 = 退回重办工单数 / 已完成工单数 × 100%
适用:"进展如何""快速跟踪"
**工单流转跟踪** | YYYY-MM-DD
**工单概览**:
- 总工单:XX 个
- 待处理:XX 个
- 处理中:XX 个
- 已完成:XX 个
- 已关闭:XX 个
**超时预警**:
| 工单号 | 类型 | 风险等级 | 超时时长 | 当前处理人 |
|-------|------|---------|---------|-----------|
| GD001 | 预警 | 高 | 2 小时 | 张三 |
| GD002 | 处置 | 中 | 1 天 | 李四 |
**需优先处理**:GD001(高风险,已超时)
适用:"详细分析""流转报告"
**风控工单流转报告** | YYYY-MM-DD
## 一、工单概览
**统计周期**:YYYY-MM-DD 至 YYYY-MM-DD
**工单总数**:XX 个
- 预警类:XX 个
- 核查类:XX 个
- 处置类:XX 个
- 合规类:XX 个
**状态分布**:
- 待处理:XX 个(XX%)
- 处理中:XX 个(XX%)
- 待复核:XX 个(XX%)
- 已完成:XX 个(XX%)
- 已关闭:XX 个(XX%)
## 二、时效分析
**按时完成率**:XX%
**平均处置时长**:XX 小时
**超时工单**:XX 个
**超时明细**:
| 工单号 | 类型 | 风险等级 | 要求完成 | 当前状态 | 超时时长 |
|-------|------|---------|---------|---------|---------|
| GD001 | 预警 | 高 | 4 小时 | 处理中 | 2 小时 |
| GD002 | 处置 | 中 | 24 小时 | 待复核 | 1 天 |
## 三、处理人效率
| 处理人 | 处理工单 | 按时完成 | 平均时长 | 超时数 |
|-------|---------|---------|---------|-------|
| 张三 | 10 | 9 | 3 小时 | 1 |
| 李四 | 8 | 6 | 5 小时 | 2 |
## 四、问题分析
**主要延误原因**:
1. xxx
2. xxx
**流程瓶颈**:
- 环节 1:平均等待 X 小时
- 环节 2:平均等待 X 小时
## 五、改进建议
1. xxx
2. xxx
适用:"流程优化""处置总结"
**风控工单流程优化报告** | YYYY-MM-DD
**核心结论**:工单流转整体效率 XX%,主要瓶颈在 XXX 环节
**流程分析**:
**当前流程**:
创建 → 待处理(平均 X 小时)→ 处理中(平均 X 小时)→ 待复核(平均 X 小时)→ 已完成
**瓶颈识别**:
1. 环节 1:平均等待 X 小时,超过标准 X%
- 原因:xxx
- 建议:xxx
2. 环节 2:平均等待 X 小时,超过标准 X%
- 原因:xxx
- 建议:xxx
**优化方案**:
| 优化项 | 当前状态 | 目标状态 | 措施 | 责任人 | 时间 |
|-------|---------|---------|------|-------|------|
| 响应时效 | X 小时 | X 小时 | xxx | 张三 | 本周 |
| 处置时效 | X 小时 | X 小时 | xxx | 李四 | 本月 |
**KPI 建议**:
- 按时完成率:目标>95%
- 平均处置时长:目标<X 小时
- 超时工单数:目标<5%
工单积压:如待处理工单过多,提示"建议增加处理人手或调整优先级"
跨部门流转:如涉及多部门协作,说明"建议明确牵头部门,建立协调机制"
复杂工单:如工单处置复杂、耗时较长,说明"建议拆分任务,分阶段推进"
系统故障:如因系统问题导致流转延误,说明"建议记录故障影响,事后追责"
监管要求:
行业标准:
内部管理:
Python 工单流转分析示例:
import pandas as pd
from datetime import datetime, timedelta
def calc_workage_metrics(workorder_data):
"""
计算工单时效指标
参数:
workorder_data: 工单数据 DataFrame
返回:
指标字典
"""
workorder_data['create_time'] = pd.to_datetime(workorder_data['create_time'])
workorder_data['due_time'] = pd.to_datetime(workorder_data['due_time'])
workorder_data['complete_time'] = pd.to_datetime(workorder_data['complete_time'])
# 当前时间
now = datetime.now()
# 超时判断
active_orders = workorder_data[workorder_data['status'].isin(['待处理', '处理中', '待复核'])]
overdue_orders = active_orders[active_orders['due_time'] < now]
# 按时完成率(已完成工单)
completed = workorder_data[workorder_data['status'] == '已完成']
on_time = completed[completed['complete_time'] <= completed['due_time']]
on_time_rate = len(on_time) / len(completed) * 100 if len(completed) > 0 else 0
# 平均处置时长
completed_with_time = completed[completed['complete_time'].notna()]
avg_duration = (completed_with_time['complete_time'] - completed_with_time['create_time']).mean()
metrics = {
'total_orders': len(workorder_data),
'pending': len(active_orders),
'overdue': len(overdue_orders),
'completed': len(completed),
'on_time_rate': on_time_rate,
'avg_duration_hours': avg_duration.total_seconds() / 3600 if pd.notna(avg_duration) else 0
}
return metrics
def analyze_bottleneck(workorder_data):
"""
分析流程瓶颈
参数:
workorder_data: 工单数据 DataFrame
返回:
瓶颈分析结果
"""
# 按状态分组计算平均停留时间
now = datetime.now()
workorder_data['create_time'] = pd.to_datetime(workorder_data['create_time'])
# 计算各状态平均停留时间
status_duration = []
for status in workorder_data['current_status'].unique():
status_orders = workorder_data[workorder_data['current_status'] == status]
avg_duration = (now - status_orders['create_time']).mean()
status_duration.append({
'status': status,
'count': len(status_orders),
'avg_duration_hours': avg_duration.total_seconds() / 3600
})
bottleneck_df = pd.DataFrame(status_duration)
bottleneck = bottleneck_df.loc[bottleneck_df['avg_duration_hours'].idxmax()]
return bottleneck
def generate_workorder_report(workorder_data):
"""
生成工单报告
参数:
workorder_data: 工单数据 DataFrame
返回:
报告字典
"""
metrics = calc_workage_metrics(workorder_data)
bottleneck = analyze_bottleneck(workorder_data)
report = {
'summary': metrics,
'bottleneck': bottleneck,
'overdue_list': workorder_data[
(workorder_data['status'].isin(['待处理', '处理中', '待复核'])) &
(pd.to_datetime(workorder_data['due_time']) < datetime.now())
][['order_id', 'title', 'risk_level', 'due_time', 'current_handler']].to_dict('records')
}
return report
SQL 查询示例:
-- 查询工单流转状态
SELECT
w.order_id,
w.title,
w.order_type,
w.risk_level,
w.status,
w.current_handler,
w.create_time,
w.due_time,
CASE
WHEN w.status = '已完成' THEN TIMESTAMPDIFF(HOUR, w.create_time, w.complete_time)
ELSE TIMESTAMPDIFF(HOUR, w.create_time, NOW())
END as duration_hours,
CASE
WHEN w.due_time < NOW() AND w.status != '已完成' THEN '超时'
WHEN w.due_time < DATE_ADD(NOW(), INTERVAL 2 HOUR) THEN '即将超时'
ELSE '正常'
END as time_status
FROM workorder w
WHERE w.create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
ORDER BY
CASE w.risk_level
WHEN '高' THEN 1
WHEN '中' THEN 2
ELSE 3
END,
w.due_time;
-- 工单时效统计
SELECT
order_type,
COUNT(*) as total_count,
SUM(CASE WHEN status = '已完成' THEN 1 ELSE 0 END) as completed_count,
SUM(CASE WHEN status = '已完成' AND complete_time <= due_time THEN 1 ELSE 0 END) as on_time_count,
AVG(CASE WHEN status = '已完成' THEN TIMESTAMPDIFF(HOUR, create_time, complete_time) END) as avg_duration
FROM workorder
WHERE create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
GROUP BY order_type;