使用 Trigger.dev v4 设计和实现生产级后台任务系统。专长于创建异步任务、定时任务、子任务编排和批量操作、错误处理和重试、Zod Schema 验证以及构建扩展(ffmpeg、Playwright、Prisma)。在需要架构后台任务、实现定时任务、构建 AI 工作流或处理长时间运行的异步操作时使用。
import { task } from "@trigger.dev/sdk"
export const processData = task({
id: "process-data",
retry: { maxAttempts: 3 },
run: async (payload: { userId: string }) => {
// 任务逻辑 - 无超时限制,可长时间运行
return { processed: true }
},
})
import { tasks } from "@trigger.dev/sdk"
import type { processData } from "./trigger/tasks"
// 从 API 路由触发
const handle = await tasks.trigger<typeof processData>("process-data", {
userId: "123",
})
import { schemaTask } from "@trigger.dev/sdk"
import { z } from "zod"
export const validatedTask = schemaTask({
id: "validated-task",
schema: z.object({
email: z.string().email(),
age: z.number().min(0),
}),
run: async (payload) => {
// payload 自动验证和类型推断
},
})
// ✅ 正确 - v4 SDK
import { task } from "@trigger.dev/sdk"
// ❌ 错误 - v2 API (会破坏应用)
client.defineJob({ id: "job-id", run: async () => {} })
// triggerAndWait 返回 Result 对象,不是直接输出
const result = await childTask.triggerAndWait({ data: "value" })
if (result.ok) {
console.log(result.output) // ✅ 正确 - 访问实际输出
} else {
console.error(result.error)
}
// 或使用 unwrap 快速访问(失败时抛出错误)
const output = await childTask.triggerAndWait({ data: "value" }).unwrap()
// ❌ 错误 - 网络问题时可能重复执行
const result = await childTask.trigger({ payload })
// ✅ 正确 - 使用幂等性 key
const result = await childTask.trigger(
{ payload },
{ idempotencyKey: `${ctx.run.id}-child-1` }
)
为什么重要: 如果网络故障,任务可能被触发多次。使用 idempotencyKey 确保相同操作只执行一次。
// ❌ 错误 - Result 对象不是直接输出
const output = await childTask.triggerAndWait({ data: "value" })
console.log(output.userId) // undefined!
// ✅ 正确 - 检查 ok 状态
const result = await childTask.triggerAndWait({ data: "value" })
if (result.ok) {
console.log(result.output.userId) // ✅ 成功
} else {
console.error(result.error) // 处理错误
}
// ✅ 或者使用 unwrap(简洁但失败时抛出)
const output = (await childTask.triggerAndWait({ data: "value" })).unwrap()
为什么重要: 参考官方指南 "⚠️ 重要提醒" > "Result vs Output" 部分。
// ❌ 反模式 - 太多细碎的任务
const result1 = await validateTask.triggerAndWait({ user })
const result2 = await sendEmailTask.triggerAndWait({ email })
const result3 = await logTask.triggerAndWait({ userId })
// 这会导致 3 个独立的执行和调用开销
// ✅ 最佳实践 - 合理的任务粒度
export const processUser = schemaTask({
id: "users.process.complete",
schema: z.object({ userId: z.string() }),
run: async (payload) => {
// 验证
const user = await validateUser(payload.userId)
// 发送邮件
await sendEmail(user.email)
// 日志记录
logger.log("User processed", { userId: payload.userId })
return { processed: true }
},
})
为什么重要: 任务拆分应该基于可独立重试的边界,不是为了代码重用。
// ❌ 反模式 - 不必要的高成本配置
export const expensiveTask = task({
id: "expensive",
machine: "large-2x", // 😅 不必要的大机器
retry: { maxAttempts: 100 }, // 😅 过多重试
run: async (payload) => {
/* ... */
},
})
// ✅ 最佳实践 - 选择合适的资源
export const optimizedTask = schemaTask({
id: "optimized",
machine: "small-1x", // 够用即可
retry: { maxAttempts: 3 }, // 合理的重试
schema: z.object({
/* ... */
}),
run: async (payload) => {
/* ... */
},
})
为什么重要: 参考 machines.md 了解各机器预设的成本和性能对比。
// ❌ 错误 - 不支持
await Promise.all([
task1.triggerAndWait(payload1),
task2.triggerAndWait(payload2),
])
// ❌ 错误 - 不支持
await Promise.all([wait.for({ seconds: 30 }), wait.for({ seconds: 60 })])
// ✅ 正确 - 使用 batchTriggerAndWait
await parentTask.batchTriggerAndWait([
{ payload: payload1 },
{ payload: payload2 },
])
domain.action.target 模式trigger/
├── index.ts # 导出所有任务
├── tasks/ # 任务定义
│ ├── users.ts # 用户相关任务
│ ├── payments.ts # 支付任务
│ └── ...
└── shared/ # 共享工具
├── queues.ts # 队列定义
└── streams.ts # Stream 定义
| 任务 | 阅读文档 |
|---|---|
| 第一次使用 | getting-started.md |
| 创建基础任务 | task-basics.md → basic-task.md |
| 实现定时任务 | scheduled-tasks.md |
| 构建复杂工作流 | advanced-tasks.md → workflow-example.md |
| 添加实时监控 | realtime.md → realtime-example.md |
| 配置构建扩展 | configuration.md |
| 快速查 API | reference/api-reference.md |
| 最佳实践 | reference/best-practices.md |