Build Temporal applications in Python using the temporalio SDK. Use when creating workflows, activities, workers, clients, signals, queries, updates, child workflows, timers, retry policies, saga/compensation patterns, testing, or any durable execution pattern in Python.
Build durable, fault-tolerant distributed applications in Python. Temporal guarantees workflow completion even through process crashes, network failures, and server outages.
pip install temporalio
Requires Python >= 3.9. The SDK is a single package with no heavy dependencies.
my_temporal_app/
workflows.py # Workflow definitions
activities.py # Activity definitions
worker.py # Worker startup
client.py # Client code (start/signal/query workflows)
models.py # Shared dataclasses for inputs/outputs
tests/
conftest.py # Test fixtures (WorkflowEnvironment)
test_workflows.py
test_activities.py
# Workflow module -- used inside workflow definitions
from temporalio import workflow
# Activity module -- used inside activity definitions
from temporalio import activity
# Client -- used to connect to Temporal and interact with workflows
from temporalio.client import Client
# Worker -- used to run workflows and activities
from temporalio.worker import Worker
# Common types -- RetryPolicy, SearchAttributes, etc.
from temporalio.common import RetryPolicy
# Exceptions
from temporalio.exceptions import (
ApplicationError, # Raise from activities/workflows for business errors
ActivityError, # Caught in workflows when an activity fails
ChildWorkflowError, # Caught when a child workflow fails
CancelledError, # Caught when workflow/activity is cancelled
FailureError, # Base class for all failure errors
)
# Testing
from temporalio.testing import WorkflowEnvironment, ActivityEnvironment
Workflow code runs in a sandbox that intercepts non-deterministic calls. When importing third-party modules or your own activity/model modules inside a workflow file, use the pass-through pattern:
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
from my_app.activities import my_activity
from my_app.models import MyInput, MyOutput
This prevents the sandbox from interfering with imports that are only used as type references or activity references.
Temporal strongly encourages using a single dataclass parameter for activities and workflows instead of multiple parameters. This allows backwards-compatible field additions.
from dataclasses import dataclass
from typing import Optional
@dataclass
class OrderInput:
    """Single input parameter for the order workflow/activities.

    Using one dataclass instead of positional parameters lets new
    fields be added later with defaults, keeping old callers compatible.
    """

    order_id: str
    customer_name: str
    amount: float
    # Defaulted field: existing serialized payloads without it still deserialize.
    currency: str = "USD"
@dataclass
class OrderResult:
    """Result payload returned by the order workflow."""

    order_id: str
    status: str
    # Optional: only populated when a confirmation exists.
    confirmation_number: Optional[str] = None
from temporalio import activity
@activity.defn
async def process_payment(input: PaymentInput) -> PaymentResult:
    """Async activity: charge the payment gateway for one order.

    Async (``async def``) activities run on the worker's event loop, so
    the gateway call must be awaitable rather than blocking.
    """
    # activity.logger automatically tags entries with activity/workflow context.
    activity.logger.info("Processing payment for order %s", input.order_id)
    result = await payment_gateway.charge(input.amount, input.card_token)
    return PaymentResult(transaction_id=result.id, status="completed")
import time
from temporalio import activity
@activity.defn
def send_email(input: EmailInput) -> None:
    """Synchronous activity: send one email over SMTP.

    Sync (``def``, not ``async def``) activities require the worker to be
    created with an ``activity_executor`` (e.g. a ThreadPoolExecutor).
    """
    activity.logger.info("Sending email to %s", input.recipient)
    # Blocking I/O is fine in sync activities
    smtp_client.send(to=input.recipient, subject=input.subject, body=input.body)
import time
from temporalio import activity
@activity.defn
def process_large_dataset(input: DatasetInput) -> DatasetResult:
    """Long-running activity that heartbeats its progress once per batch."""
    rows_processed = 0
    for batch in read_batches(input.file_path):
        # Heartbeat to tell the server this activity is still alive.
        # If heartbeat_timeout passes without a heartbeat, the server
        # considers the activity failed and retries it.
        # The payload (rows_processed) is saved as heartbeat details and
        # is available on retry via activity.info().heartbeat_details.
        activity.heartbeat(rows_processed)
        process_batch(batch)
        rows_processed += len(batch)
    return DatasetResult(total_rows=rows_processed)
When the activity is retried, you can retrieve the last heartbeat details:
@activity.defn
def resumable_activity(input: ProcessInput) -> ProcessResult:
    """Resume from the last heartbeated index when the activity is retried.

    NOTE: the heartbeat is sent *before* the item is processed, so the item
    at the last heartbeated index may be processed twice on retry —
    process_item should therefore be idempotent (at-least-once semantics).
    """
    # On retry, pick up from where we left off
    start_index = 0
    if activity.info().heartbeat_details:
        # Details are positional; index 0 holds the value passed to heartbeat().
        start_index = activity.info().heartbeat_details[0]
    for i in range(start_index, len(input.items)):
        activity.heartbeat(i)
        process_item(input.items[i])
    return ProcessResult(processed=len(input.items))
from temporalio import activity
class MyActivities:
    """Activities defined as methods so they can share injected dependencies.

    The worker registers bound methods of one instance, so every activity
    invocation reuses the same db_client.
    """

    def __init__(self, db_client: DatabaseClient) -> None:
        # Shared dependency injected once at worker startup.
        self.db_client = db_client

    @activity.defn
    async def fetch_record(self, input: FetchInput) -> Record:
        return await self.db_client.get(input.record_id)

    @activity.defn
    async def save_record(self, input: SaveInput) -> None:
        await self.db_client.save(input.record)
When registering with the worker, instantiate the class and pass the bound methods:
# One shared instance; its *bound methods* (not the class) are registered.
my_activities = MyActivities(db_client)
worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activities.fetch_record, my_activities.save_record],
)
When calling from a workflow, use workflow.execute_activity_method:
# Reference the *unbound* class method for a type-safe call; the worker
# dispatches to the bound method it was registered with.
result = await workflow.execute_activity_method(
    MyActivities.fetch_record,
    FetchInput(record_id="123"),
    start_to_close_timeout=timedelta(seconds=10),
)
# Override the registered activity type name (defaults to the function name).
@activity.defn(name="custom-activity-name")
async def my_activity(input: MyInput) -> MyOutput:
    ...
from typing import Sequence
from temporalio.common import RawValue
@activity.defn(dynamic=True)
async def dynamic_activity(args: Sequence[RawValue]) -> str:
    """Catch-all handler invoked for activity types with no named registration."""
    # Arguments arrive as raw payloads; decode them explicitly with the converter.
    arg1 = activity.payload_converter().from_payload(args[0].payload, MyDataClass)
    # The activity type name the caller requested is in activity info.
    activity_type = activity.info().activity_type
    return f"Dynamic activity {activity_type} received: {arg1}"
from temporalio import activity
class MyProcessor:
    """Async-completion pattern: the activity hands work to an external
    system and is finished later via its task token."""

    def __init__(self, client: Client) -> None:
        self.client = client

    @activity.defn
    async def start_processing(self, input: ProcessInput) -> str:
        task_token = activity.info().task_token
        # Hand off to external system, passing the task_token
        await self.external_queue.send(task_token, input)
        # Signal that this activity will be completed externally
        activity.raise_complete_async()

    # The three methods below are NOT activities — they are called by the
    # external system (or a callback handler) holding the task token.
    async def complete_from_external(self, task_token: bytes, result: str) -> None:
        handle = self.client.get_async_activity_handle(task_token=task_token)
        await handle.complete(result)

    async def fail_from_external(self, task_token: bytes, error: Exception) -> None:
        handle = self.client.get_async_activity_handle(task_token=task_token)
        await handle.fail(error)

    async def heartbeat_from_external(self, task_token: bytes) -> None:
        # External heartbeats keep the pending activity from timing out.
        handle = self.client.get_async_activity_handle(task_token=task_token)
        await handle.heartbeat()
Access activity metadata inside an activity:
@activity.defn
async def my_activity(input: MyInput) -> MyOutput:
    """Reference listing of the metadata exposed by activity.info()."""
    info = activity.info()
    info.activity_id  # Unique activity ID
    info.activity_type  # Activity type name
    info.attempt  # Current attempt number (starts at 1)
    info.workflow_id  # Parent workflow ID
    info.workflow_run_id  # Parent workflow run ID
    info.task_token  # Token for async completion
    info.heartbeat_details  # Details from last heartbeat (on retry)
    info.scheduled_time  # When the activity was scheduled
    ...
from datetime import timedelta
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
from my_app.activities import process_order
from my_app.models import OrderInput, OrderResult
@workflow.defn
class OrderWorkflow:
    """Minimal workflow: run a single activity and return its result."""

    @workflow.run
    async def run(self, input: OrderInput) -> OrderResult:
        # workflow.logger is replay-safe and adds workflow context to entries.
        workflow.logger.info("Processing order %s", input.order_id)
        # start_to_close_timeout bounds one attempt of the activity.
        result = await workflow.execute_activity(
            process_order,
            input,
            start_to_close_timeout=timedelta(seconds=30),
        )
        return result
Inside @workflow.run and all signal/query/update handlers, you must NOT:
- Use datetime.now(), time.time(), random, or uuid4() — use workflow.now(), workflow.random(), or workflow.uuid4() instead.
- Perform raw I/O or use non-asyncio threading primitives — use asyncio primitives only.

You CAN use:
- asyncio.sleep() — becomes a durable timer
- asyncio.gather() — run activities in parallel
- asyncio.Lock() — protect shared workflow state from interleaved handler execution
- workflow.wait_condition() — wait for a boolean condition to become true

from dataclasses import dataclass
from typing import List, Optional
@dataclass
class ApprovalInput:
    """Payload delivered by the approval signal."""

    approver: str
    approved: bool
@workflow.defn
class ApprovalWorkflow:
    """Blocks until an external signal delivers an approval decision."""

    def __init__(self) -> None:
        # None means "no decision yet"; set by the approve() signal handler.
        self._approved: Optional[bool] = None
        self._approver: Optional[str] = None

    @workflow.run
    async def run(self, input: OrderInput) -> str:
        # Wait until we receive the approval signal
        await workflow.wait_condition(lambda: self._approved is not None)
        if self._approved:
            return f"Order {input.order_id} approved by {self._approver}"
        else:
            return f"Order {input.order_id} rejected by {self._approver}"

    @workflow.signal
    def approve(self, input: ApprovalInput) -> None:
        # Signal handlers only record state; run() reacts via wait_condition.
        self._approved = input.approved
        self._approver = input.approver
@workflow.defn
class OrderWorkflow:
    """Exposes read-only queries over in-flight workflow state."""

    def __init__(self) -> None:
        self._status = "pending"
        self._items_processed = 0

    @workflow.run
    async def run(self, input: OrderInput) -> OrderResult:
        self._status = "processing"
        # ... do work ...
        self._status = "completed"
        return result

    @workflow.query
    def get_status(self) -> str:
        # Queries must be read-only: no state mutation, no activities.
        return self._status

    @workflow.query
    def get_progress(self) -> int:
        return self._items_processed
Query handlers must NOT modify workflow state or call activities. They are read-only.
from temporalio.exceptions import ApplicationError
@workflow.defn
class ShoppingCartWorkflow:
    """Cart entity driven by updates; completes when checked out."""

    def __init__(self) -> None:
        self._items: dict[str, int] = {}
        self._checked_out = False

    @workflow.run
    async def run(self) -> dict:
        await workflow.wait_condition(lambda: self._checked_out)
        return self._items

    @workflow.update
    async def add_item(self, item_id: str, quantity: int) -> dict:
        # Unlike signals, updates return a value to the caller.
        self._items[item_id] = self._items.get(item_id, 0) + quantity
        return self._items

    @add_item.validator
    def validate_add_item(self, item_id: str, quantity: int) -> None:
        # Runs before the handler; raising rejects the update.
        # Validators must not modify workflow state.
        if self._checked_out:
            raise ApplicationError("Cannot add items after checkout")
        if quantity <= 0:
            raise ApplicationError("Quantity must be positive")

    @workflow.update
    async def checkout(self) -> dict:
        self._checked_out = True
        return self._items
Update validators run before the update handler and can reject the update by raising an exception. Validators must NOT modify workflow state.
@workflow.run
async def run(self, input: OrderInput) -> str:
    """Wait for an approval decision, giving up after 24 hours."""
    try:
        # Wait up to 24 hours for approval
        await workflow.wait_condition(
            lambda: self._approved is not None,
            timeout=timedelta(hours=24),
        )
        return "approved" if self._approved else "rejected"
    except asyncio.TimeoutError:
        # wait_condition raises asyncio.TimeoutError when the timeout
        # elapses before the condition becomes true.
        return "timed_out"
Before completing a workflow, ensure all signal and update handlers have finished:
@workflow.run
async def run(self, input: MyInput) -> str:
    # ... main workflow logic ...
    # Wait for any in-progress signal/update handlers to complete,
    # so their work is not cut short by the workflow returning.
    await workflow.wait_condition(workflow.all_handlers_finished)
    return "done"
@workflow.run
async def run(self, input: ReminderInput) -> None:
    """Sleep durably for an hour, then send a reminder."""
    # This sleep is durable -- if the worker crashes, the timer
    # continues on the server and fires when it expires
    await asyncio.sleep(3600)  # Wait 1 hour
    await workflow.execute_activity(
        send_reminder,
        input,
        start_to_close_timeout=timedelta(seconds=10),
    )
@workflow.run
async def run(self, users: list[str]) -> list[str]:
    """Fan out one activity per user; results keep the input order."""
    # Run multiple activities concurrently using asyncio.gather
    results = await asyncio.gather(
        *[
            workflow.execute_activity(
                greet_user,
                user,
                start_to_close_timeout=timedelta(seconds=5),
            )
            for user in users
        ]
    )
    return list(results)
@workflow.defn
class ParentWorkflow:
    """Runs a child workflow and blocks until the child finishes."""

    @workflow.run
    async def run(self, input: ParentInput) -> str:
        # Execute and wait for child workflow result
        child_result = await workflow.execute_child_workflow(
            ChildWorkflow.run,
            ChildInput(data=input.data),
            # Child workflow ID; derived from the parent for traceability.
            id=f"child-{input.parent_id}",
        )
        return f"Parent got: {child_result}"
@workflow.defn
class ChildWorkflow:
    """Child workflow: delegates to a single activity."""

    @workflow.run
    async def run(self, input: ChildInput) -> str:
        return await workflow.execute_activity(
            process_data,
            input,
            start_to_close_timeout=timedelta(seconds=30),
        )
Start a child workflow without waiting for its result:
# start_child_workflow returns a handle once the child has started,
# without waiting for it to finish.
child_handle = await workflow.start_child_workflow(
    ChildWorkflow.run,
    ChildInput(data=input.data),
    id=f"child-{input.parent_id}",
)
# Can signal the child
await child_handle.signal(ChildWorkflow.some_signal, signal_data)
# Wait for result later -- awaiting the handle yields the child's result
result = await child_handle
Parent close policy controls what happens to the child when the parent completes:
from temporalio.workflow import ParentClosePolicy

child_result = await workflow.execute_child_workflow(
    ChildWorkflow.run,
    input,
    id="child-id",
    # ABANDON detaches the child's lifetime from the parent's.
    parent_close_policy=ParentClosePolicy.ABANDON,  # Child keeps running
    # Other options: TERMINATE (default), REQUEST_CANCEL
)
Workflows accumulate event history. For workflows that run indefinitely or for very long periods, use continue-as-new to reset history while preserving logical state:
@workflow.defn
class EntityWorkflow:
    """Long-lived entity workflow that keeps its event history bounded
    via continue-as-new.

    The run loop wakes when work arrives, when the entity is done, or at
    least every 10 minutes, and hands off to a fresh run (same workflow
    ID) whenever the server suggests it.
    """

    @workflow.run
    async def run(self, state: EntityState) -> EntityResult:
        while not state.is_done:
            try:
                await workflow.wait_condition(
                    lambda: state.has_pending_work or state.is_done,
                    timeout=timedelta(minutes=10),
                )
            except asyncio.TimeoutError:
                # The timeout is only a periodic wake-up so we get a chance
                # to check is_continue_as_new_suggested() below.
                # wait_condition raises asyncio.TimeoutError when the timeout
                # elapses; left uncaught it would fail the workflow task.
                pass
            if state.has_pending_work:
                await self.process_work(state)
            # Check if Temporal suggests continuing as new
            if workflow.info().is_continue_as_new_suggested():
                # Finish any in-progress handlers first
                await workflow.wait_condition(workflow.all_handlers_finished)
                # Raises internally and starts a new run; never returns.
                workflow.continue_as_new(state)
        return EntityResult(...)
workflow.continue_as_new() raises an internal exception that stops the current workflow execution and starts a new one with the same workflow ID, passing the provided arguments.
@dataclass
class MyWorkflowInput:
    """Input passed to both __init__ (via @workflow.init) and run()."""

    name: str
@workflow.defn
class MyWorkflow:
    """Demonstrates @workflow.init: the constructor receives the workflow input."""

    @workflow.init
    def __init__(self, input: MyWorkflowInput) -> None:
        # Set up state before run() is called.
        # Same input is passed to both __init__ and run.
        self.greeting_prefix = f"Hello, {input.name}"

    @workflow.run
    async def run(self, input: MyWorkflowInput) -> str:
        return self.greeting_prefix
@workflow.defn
class WorkflowB:
    """Signals a separate, already-running workflow by its workflow ID."""

    @workflow.run
    async def run(self) -> None:
        # Get a handle to another running workflow and signal it
        handle = workflow.get_external_workflow_handle_for(
            WorkflowA.run, "workflow-a-id"
        )
        await handle.signal(WorkflowA.some_signal, "signal data")
from typing import Sequence
from temporalio.common import RawValue
@workflow.defn
class FlexibleWorkflow:
    """Handles signals/queries whose names are not known in advance."""

    @workflow.signal(dynamic=True)
    async def dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None:
        # Called for any signal that does not match a named handler
        # Arguments arrive as raw payloads; decode with the converter.
        payload = workflow.payload_converter().from_payload(args[0].payload, str)
        workflow.logger.info(f"Received dynamic signal '{name}': {payload}")

    @workflow.query(dynamic=True)
    def dynamic_query(self, name: str, args: Sequence[RawValue]) -> str:
        # Catch-all for queries with no named handler; read-only like any query.
        return f"Dynamic query '{name}' handled"
@workflow.defn(dynamic=True)
class DynamicWorkflow:
    """Catch-all workflow invoked for workflow types with no named registration."""

    @workflow.run
    async def run(self, args: Sequence[RawValue]) -> str:
        # Arguments arrive as raw payloads; decode explicitly.
        name = workflow.payload_converter().from_payload(args[0].payload, str)
        return f"Hello, {name}!"
When multiple signal or update handlers might modify the same state and include await points, use an asyncio.Lock to prevent interleaving:
@workflow.defn
class SafeWorkflow:
    """Serializes concurrent update handlers with an asyncio.Lock so the
    check-then-act on balance cannot interleave across await points."""

    def __init__(self) -> None:
        self.lock = asyncio.Lock()
        self.balance = 0

    @workflow.update
    async def transfer(self, amount: float) -> float:
        async with self.lock:
            # Check balance
            if self.balance < amount:
                raise ApplicationError("Insufficient funds")
            # This await yields control, but the lock prevents interleaving
            await workflow.execute_activity(
                record_transfer,
                amount,
                start_to_close_timeout=timedelta(seconds=10),
            )
            # Deduct only after the transfer has been recorded.
            self.balance -= amount
            return self.balance
# Inside a workflow:
workflow.info() # WorkflowInfo with id, run_id, task_queue, etc.
workflow.now() # Current time (deterministic)
workflow.random() # Random instance (deterministic, seeded)
workflow.uuid4() # Deterministic UUID generation
workflow.logger # Logger that includes workflow context
workflow.memo() # Access workflow memo
workflow.all_handlers_finished() # True when all handlers are done
workflow.info().is_continue_as_new_suggested() # Server recommends continue-as-new
workflow.info().get_current_history_length() # Current event history size
import asyncio
from concurrent.futures import ThreadPoolExecutor
from temporalio.client import Client
from temporalio.worker import Worker
async def main():
    """Connect to a local Temporal server and run a worker until shutdown."""
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[activity_a, activity_b],
        # Required for synchronous (non-async) activities:
        activity_executor=ThreadPoolExecutor(10),
    )
    # Blocks, polling the task queue, until the worker is shut down.
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
# Run the worker as an async context manager alongside other work.
async with Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    activity_executor=ThreadPoolExecutor(10),
) as worker:
    # Worker is running, do other things...
    await some_other_task()
# Worker shuts down gracefully when exiting the context
# Dependency injection: build shared clients once, hand them to the
# activity class, and register that instance's bound methods.
db_client = DatabaseClient(connection_string)
my_activities = MyActivities(db_client)
worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activities.fetch_record, my_activities.save_record],
)
# Common Worker tuning knobs:
Worker(
    client,
    task_queue="my-task-queue",
    workflows=[...],
    activities=[...],
    activity_executor=ThreadPoolExecutor(10),  # For sync activities
    max_concurrent_activities=100,  # Limit concurrent activities
    max_concurrent_workflow_tasks=100,  # Limit concurrent workflow tasks
    max_cached_workflows=100,  # Workflow cache size
)
from temporalio.client import Client

# Local development
client = await Client.connect("localhost:7233")

# With namespace
client = await Client.connect("localhost:7233", namespace="my-namespace")

# Temporal Cloud with mTLS
# (also requires: from temporalio.client import TLSConfig
#  and: from pathlib import Path)
client = await Client.connect(
    "my-namespace.a1b2c.tmprl.cloud:7233",
    namespace="my-namespace.a1b2c",
    tls=TLSConfig(
        client_cert=Path("client.pem").read_bytes(),
        client_private_key=Path("client.key").read_bytes(),
    ),
)
# Start and wait for result
result = await client.execute_workflow(
    MyWorkflow.run,
    MyInput(data="hello"),
    # Workflow ID must be unique among currently running workflows.
    id="my-workflow-id",
    task_queue="my-task-queue",
)

# Start without waiting (returns handle)
handle = await client.start_workflow(
    MyWorkflow.run,
    MyInput(data="hello"),
    id="my-workflow-id",
    task_queue="my-task-queue",
)
# Get result later
result = await handle.result()
# Get a handle to an existing workflow
handle = client.get_workflow_handle("my-workflow-id")

# Signal the workflow
await handle.signal(MyWorkflow.my_signal, SignalInput(data="value"))

# Query the workflow
status = await handle.query(MyWorkflow.get_status)

# Execute an update (send + wait for result)
update_result = await handle.execute_update(
    MyWorkflow.my_update, UpdateInput(data="value")
)

# Cancel the workflow (cooperative -- the workflow can react/clean up)
await handle.cancel()

# Terminate the workflow (immediate, no cleanup)
await handle.terminate("reason for termination")

# Describe the workflow (get execution info)
description = await handle.describe()
print(description.status)  # WorkflowExecutionStatus.RUNNING, COMPLETED, etc.
# Get the result (will raise WorkflowFailureError if workflow failed)
from temporalio.client import WorkflowFailureError