Step-by-step guide for creating Temporal workflows in Dust. Use when adding background jobs, async processing, durable workflows, or task queues.
This skill guides you through creating Temporal workflows for durable background processing.
temporal/your_queue/
├── config.ts # Queue name and version
├── helpers.ts # Workflow ID generators
├── activities.ts # Activity implementations (DB, API calls)
├── workflows.ts # Workflow orchestration
├── worker.ts # Worker setup
└── client.ts # Workflow launcher functions
Create temporal/your_queue/config.ts:
const QUEUE_VERSION = 1;
export const QUEUE_NAME = `your-queue-v${QUEUE_VERSION}`;
Create temporal/your_queue/helpers.ts:
export function makeYourWorkflowId({ entityId }: { entityId: string }): string {
  return `your-workflow-${entityId}`;
}
Important: Workflow IDs must be deterministic (same inputs = same ID) for idempotency.
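The effect of a deterministic ID can be sketched without a Temporal server. The example below is a minimal illustration, assuming a hypothetical in-memory `FakeWorkflowStore` (not part of Temporal or Dust) that refuses a second start for an ID that is already in use, which is roughly the behavior the launcher relies on.

```typescript
// Hypothetical in-memory stand-in for "one running workflow per ID".
// This is an illustration only, not how Temporal stores executions.
class FakeWorkflowStore {
  private running: Record<string, boolean> = {};

  // Returns true if the workflow was started, false if the ID is already in use.
  start(workflowId: string): boolean {
    if (this.running[workflowId]) {
      return false;
    }
    this.running[workflowId] = true;
    return true;
  }
}

// Same shape as the helper above: the ID depends only on the input.
function makeYourWorkflowId({ entityId }: { entityId: string }): string {
  return `your-workflow-${entityId}`;
}

const store = new FakeWorkflowStore();
const firstStart = store.start(makeYourWorkflowId({ entityId: "abc" }));
const duplicateStart = store.start(makeYourWorkflowId({ entityId: "abc" }));
const otherStart = store.start(makeYourWorkflowId({ entityId: "xyz" }));

console.log(firstStart, duplicateStart, otherStart); // true false true
```

An ID built from a timestamp or a random value would make every call look like a new workflow, so duplicate launches for the same entity would not be detected.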
Create temporal/your_queue/activities.ts:
import { YourResource } from "@app/lib/resources/your_resource";
import logger from "@app/logger/logger";
export async function yourActivity({
  entityId,
  workspaceId,
}: {
  entityId: string;
  workspaceId: number;
}): Promise<void> {
  const entity = await YourResource.fetchById(entityId);
  if (!entity) {
    throw new Error(`Entity not found: ${entityId}`);
  }
  const result = await entity.doSomething();
  if (result.isErr()) {
    logger.error({ entityId, error: result.error }, "Failed to process entity");
    throw new Error(`Failed to process: ${result.error.message}`);
  }
}
Guidelines: Activities perform the side effects (DB writes, API calls). They can throw, in which case Temporal retries them, so they should be idempotent.
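Because a retried activity runs its body again, the side effect needs a guard. The sketch below shows one possible guard, assuming a hypothetical in-memory table (`toyEntities`) and a `processed` flag. A real activity would be async and would check state through a resource class instead.

```typescript
// Hypothetical in-memory "table"; names and fields are illustrative only.
type ToyEntity = { id: string; processed: boolean };

const toyEntities: Record<string, ToyEntity> = {
  abc: { id: "abc", processed: false },
};

// Counts how many times the side effect actually ran.
let sideEffectCount = 0;

function processEntity(entityId: string): void {
  const entity = toyEntities[entityId];
  if (!entity) {
    // Throwing signals failure; in Temporal this would trigger a retry.
    throw new Error(`Entity not found: ${entityId}`);
  }
  // Idempotency guard: a previous attempt already did the work.
  if (entity.processed) {
    return;
  }
  sideEffectCount += 1;
  entity.processed = true;
}

// Simulate the same activity being executed twice (first attempt plus a retry).
processEntity("abc");
processEntity("abc");

console.log(sideEffectCount); // 1
```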
Create temporal/your_queue/workflows.ts:
import { proxyActivities } from "@temporalio/workflow";
import type * as activities from "@app/temporal/your_queue/activities";
const { yourActivity } = proxyActivities<typeof activities>({
  startToCloseTimeout: "5 minutes",
});
export async function yourWorkflow({
  entityId,
  workspaceId,
}: {
  entityId: string;
  workspaceId: number;
}): Promise<void> {
  await yourActivity({ entityId, workspaceId });
}
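Guidelines: Workflow code must be deterministic because Temporal replays it from recorded history. Avoid non-deterministic sources such as Math.random() or Date.now(), and keep all I/O in activities.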
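The reason for the determinism rule is easier to see with a toy model of replay. The sketch below is not Temporal's implementation. It assumes a hypothetical `makeActivityRunner` that records each activity result in a log and returns the recorded value when the same workflow function runs again.

```typescript
// Toy model of replay (illustration only): activity results are recorded
// in a log and reused when the workflow function is re-run against that log.
type ActivityLog = unknown[];

function makeActivityRunner(log: ActivityLog) {
  let cursor = 0;
  return function runActivity<T>(fn: () => T): T {
    if (cursor < log.length) {
      // Replay: return the recorded result instead of running fn again.
      return log[cursor++] as T;
    }
    const result = fn();
    log.push(result);
    cursor++;
    return result;
  };
}

// Workflow logic: every non-deterministic value comes in through runActivity.
function toyWorkflow(runActivity: <T>(fn: () => T) => T): string {
  const now = runActivity(() => Date.now());
  const roll = runActivity(() => Math.random());
  return `${now}:${roll}`;
}

const recorded: ActivityLog = [];
const firstRun = toyWorkflow(makeActivityRunner(recorded));
const replayed = toyWorkflow(makeActivityRunner(recorded));

console.log(firstRun === replayed); // true
```

If `toyWorkflow` read the clock or a random number directly, the second run would take a different path than the recorded one, which is the failure mode the guideline is meant to prevent.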
Create temporal/your_queue/client.ts:
import { WorkflowExecutionAlreadyStartedError } from "@temporalio/client";
import { getTemporalClientForFrontNamespace } from "@app/lib/temporal";
import logger from "@app/logger/logger";
import { QUEUE_NAME } from "@app/temporal/your_queue/config";
import { makeYourWorkflowId } from "@app/temporal/your_queue/helpers";
import { yourWorkflow } from "@app/temporal/your_queue/workflows";
import type { Result } from "@app/types";
import { Err, normalizeError, Ok } from "@app/types";
export async function launchYourWorkflow({
  entityId,
  workspaceId,
}: {
  entityId: string;
  workspaceId: number;
}): Promise<Result<undefined, Error>> {
  const client = await getTemporalClientForFrontNamespace();
  const workflowId = makeYourWorkflowId({ entityId });
  try {
    await client.workflow.start(yourWorkflow, {
      args: [{ entityId, workspaceId }],
      taskQueue: QUEUE_NAME,
      workflowId,
      memo: { entityId, workspaceId },
    });
    return new Ok(undefined);
  } catch (e) {
    if (!(e instanceof WorkflowExecutionAlreadyStartedError)) {
      logger.error(
        { workflowId, entityId, workspaceId, error: e },
        "Failed starting workflow"
      );
    }
    return new Err(normalizeError(e));
  }
}
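Callers of the launcher usually want to tell "already running" apart from a real failure, since the launcher returns an error in both cases. The sketch below is one way to do that, under stated assumptions: `ToyResult` is a simplified stand-in for the Result type from "@app/types", `toyLaunch` stands in for launchYourWorkflow, and recognizing the error by its `name` is an assumption (real code could use `instanceof WorkflowExecutionAlreadyStartedError` if the original error object is preserved).

```typescript
// Simplified stand-in for Dust's Result type so the sketch runs on its own.
type ToyResult<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Stand-in for launchYourWorkflow: fails with the given error, or succeeds.
function toyLaunch(failure: Error | null): ToyResult<undefined, Error> {
  if (failure === null) {
    return { ok: true, value: undefined };
  }
  return { ok: false, error: failure };
}

type LaunchOutcome = "started" | "already_running" | "failed";

function classifyLaunch(result: ToyResult<undefined, Error>): LaunchOutcome {
  if (result.ok) {
    return "started";
  }
  // Assumption: the already-started error can be recognized by its name.
  if (result.error.name === "WorkflowExecutionAlreadyStartedError") {
    return "already_running";
  }
  return "failed";
}

const alreadyStarted = new Error("Workflow execution already started");
alreadyStarted.name = "WorkflowExecutionAlreadyStartedError";

const outcomes = [
  classifyLaunch(toyLaunch(null)),
  classifyLaunch(toyLaunch(alreadyStarted)),
  classifyLaunch(toyLaunch(new Error("connection refused"))),
];

console.log(outcomes); // [ 'started', 'already_running', 'failed' ]
```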
Create temporal/your_queue/worker.ts:
import type { Context } from "@temporalio/activity";
import { Worker } from "@temporalio/worker";
import { getTemporalWorkerConnection, TEMPORAL_MAXED_CACHED_WORKFLOWS } from "@app/lib/temporal";
import { ActivityInboundLogInterceptor } from "@app/lib/temporal_monitoring";
import logger from "@app/logger/logger";
import * as activities from "@app/temporal/your_queue/activities";
import { getWorkflowConfig } from "@app/temporal/bundle_helper";
import { QUEUE_NAME } from "@app/temporal/your_queue/config";
export async function runYourQueueWorker() {
  const { connection, namespace } = await getTemporalWorkerConnection();
  const worker = await Worker.create({
    ...getWorkflowConfig({
      workerName: "your_queue",
      getWorkflowsPath: () => require.resolve("./workflows"),
    }),
    activities,
    taskQueue: QUEUE_NAME,
    maxCachedWorkflows: TEMPORAL_MAXED_CACHED_WORKFLOWS,
    maxConcurrentActivityTaskExecutions: 16,
    connection,
    namespace,
    interceptors: {
      activityInbound: [(ctx: Context) => new ActivityInboundLogInterceptor(ctx, logger)],
    },
  });
  await worker.run();
}
Edit temporal/worker_registry.ts:
// 1. Add import
import { runYourQueueWorker } from "@app/temporal/your_queue/worker";
// 2. Add to WorkerName type
export type WorkerName =
  | "agent_loop"
  // ... existing workers
  | "your_queue"; // <- Add this
// 3. Add to workerFunctions mapping
export const workerFunctions: Record<WorkerName, () => Promise<void>> = {
  // ... existing workers
  your_queue: runYourQueueWorker, // <- Add this
};
Without this registration, no worker polls the queue and its workflows will never execute!
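The registry pattern can be sketched in isolation. How worker_registry.ts is consumed in Dust is not shown here, so the dispatcher below (`runToyWorkers`) and all `Toy*` names are hypothetical. The point is that the `Record<WorkerName, ...>` type makes the compiler demand an entry for every name in the union, while a name missing from the registry is simply never started.

```typescript
// Hypothetical miniature of the registry; names are illustrative only.
type ToyWorkerName = "agent_loop" | "your_queue";

// Records which workers were started, in order.
const startedWorkers: string[] = [];

// Record<ToyWorkerName, ...> fails to compile if a name in the union has no entry.
const toyWorkerFunctions: Record<ToyWorkerName, () => void> = {
  agent_loop: () => {
    startedWorkers.push("agent_loop");
  },
  your_queue: () => {
    startedWorkers.push("your_queue");
  },
};

function isToyWorkerName(name: string): name is ToyWorkerName {
  return Object.prototype.hasOwnProperty.call(toyWorkerFunctions, name);
}

// Starts every requested worker that is registered; returns the names that are not.
function runToyWorkers(names: string[]): string[] {
  const missing: string[] = [];
  for (const name of names) {
    if (isToyWorkerName(name)) {
      toyWorkerFunctions[name]();
    } else {
      missing.push(name);
    }
  }
  return missing;
}

const missingWorkers = runToyWorkers(["your_queue", "not_registered"]);

console.log(startedWorkers, missingWorkers); // [ 'your_queue' ] [ 'not_registered' ]
```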
To tune retries, pass a retry policy to proxyActivities in workflows.ts:
const { yourActivity } = proxyActivities<typeof activities>({
  startToCloseTimeout: "5 minutes",
  retry: {
    maximumAttempts: 3,
    initialInterval: "1s",
    backoffCoefficient: 2,
    maximumInterval: "1m",
    nonRetryableErrorTypes: ["ValidationError"],
  },
});
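To see what this policy means in practice, the wait before each retry can be computed by hand: the interval starts at initialInterval, is multiplied by backoffCoefficient after each attempt, and is capped at maximumInterval. The helper below (`retryDelaysMs`, a hypothetical name, with intervals expressed in milliseconds for simplicity) is a minimal sketch of that arithmetic, not a Temporal API.

```typescript
// Hypothetical mirror of the retry options above, with durations in milliseconds.
type ToyRetryPolicy = {
  maximumAttempts: number;
  initialIntervalMs: number;
  backoffCoefficient: number;
  maximumIntervalMs: number;
};

// Returns the wait before each retry. N attempts means at most N - 1 waits,
// because maximumAttempts counts the first attempt too.
function retryDelaysMs(policy: ToyRetryPolicy): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < policy.maximumAttempts - 1; retry++) {
    const raw =
      policy.initialIntervalMs * Math.pow(policy.backoffCoefficient, retry);
    delays.push(Math.min(raw, policy.maximumIntervalMs));
  }
  return delays;
}

// The policy from the example above: 3 attempts, 1s initial, x2 backoff, 1m cap.
const delaysForExample = retryDelaysMs({
  maximumAttempts: 3,
  initialIntervalMs: 1000,
  backoffCoefficient: 2,
  maximumIntervalMs: 60000,
});

// Same policy with 10 attempts, to show the cap taking effect.
const delaysForTenAttempts = retryDelaysMs({
  maximumAttempts: 10,
  initialIntervalMs: 1000,
  backoffCoefficient: 2,
  maximumIntervalMs: 60000,
});

console.log(delaysForExample); // [ 1000, 2000 ]
console.log(delaysForTenAttempts.join(","));
// 1000,2000,4000,8000,16000,32000,60000,60000,60000
```

With maximumAttempts set to 3, an activity that keeps failing is given up on after roughly three seconds of waiting plus execution time, so raise the limit for dependencies that can be down for longer.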
Workflows use proxyActivities with appropriate timeouts.
The client returns Result<> and handles WorkflowExecutionAlreadyStartedError.
The worker is registered in worker_registry.ts.
See the temporal/ directory for existing implementations.