Guide for adding a new RolloutWorkflow to AReaL. Use when the user wants to create a new workflow.
Add a new RolloutWorkflow implementation to AReaL.
This skill is triggered when:
Before starting, ensure you understand:
Create areal/workflow/<name>.py:
import uuid
from typing import Any, Callable
import torch
from areal.api.cli_args import GenerationHyperparameters
from areal.api.engine_api import InferenceEngine
from areal.api.io_struct import ModelRequest, ModelResponse
from areal.api.reward_api import AsyncRewardWrapper
from areal.api.workflow_api import RolloutWorkflow
from areal.utils import logging
# Module-level logger; its name matches the workflow class defined below.
logger = logging.getLogger("MyWorkflow")
class MyWorkflow(RolloutWorkflow):
    """Single-turn rollout workflow template.

    Each episode renders the chat prompt, awaits one completion from the
    inference engine, scores it with the wrapped reward function, and
    returns the token ids and the reward as tensors.
    """

    def __init__(
        self,
        gconfig: GenerationHyperparameters,
        tokenizer,
        reward_fn: Callable,
    ):
        """Keep the tokenizer, the generation config, and the reward wrapper.

        Args:
            gconfig: Generation hyperparameters; the object returned by
                ``new_with_stop_and_pad_token_ids(tokenizer)`` is stored.
            tokenizer: Tokenizer used for the chat template and for decoding.
            reward_fn: Reward callable, wrapped in ``AsyncRewardWrapper`` so
                that ``arun_episode`` can await it.
        """
        self.tokenizer = tokenizer
        self.gconfig = gconfig.new_with_stop_and_pad_token_ids(tokenizer)
        self.async_reward_fn = AsyncRewardWrapper(reward_fn)

    async def arun_episode(
        self,
        engine: InferenceEngine,
        data: dict[str, Any],
    ) -> dict[str, torch.Tensor]:
        """Run one episode. This coroutine MUST stay async and non-blocking.

        Args:
            engine: Inference engine whose ``agenerate`` coroutine is awaited.
            data: Sample dict. ``data["messages"]`` is fed to the chat
                template, and the whole dict is forwarded to the reward
                function as keyword arguments.

        Returns:
            A dict with ``input_ids``, ``output_ids`` and ``reward`` tensors.
        """
        # Tokenize the conversation, ending with the generation prompt.
        prompt_ids = self.tokenizer.apply_chat_template(
            data["messages"],
            tokenize=True,
            add_generation_prompt=True,
        )

        # A single sample per request, tagged with a fresh random request id.
        request = ModelRequest(
            rid=uuid.uuid4().hex,
            input_ids=list(prompt_ids),
            gconfig=self.gconfig.new(n_samples=1),
            tokenizer=self.tokenizer,
        )
        response: ModelResponse = await engine.agenerate(request)

        # The reward function gets the decoded text alongside the raw tokens.
        prompt_text = self.tokenizer.decode(prompt_ids)
        completion_text = self.tokenizer.decode(response.output_tokens)
        score = await self.async_reward_fn(
            prompt_text,
            completion_text,
            response.input_tokens,
            response.output_tokens,
            **data,
        )

        # Convert every output field to a tensor, preserving the key order.
        fields = (
            ("input_ids", response.input_tokens),
            ("output_ids", response.output_tokens),
            ("reward", score),
        )
        return {key: torch.tensor(value) for key, value in fields}
<!--
================================================================================
MAINTAINER GUIDE
================================================================================
Location: .claude/skills/add-workflow/SKILL.md
Invocation: /add-workflow <name>
## Purpose
Step-by-step guide for adding new RolloutWorkflow implementations.
## How to Update
### When Workflow API Changes
1. Update the code template in Step 1
2. Update the required imports
3. Update the method signature if changed
### When New Patterns Emerge
1. Add to "Reference Implementations" table
2. Update "Key Requirements" if new requirements added
================================================================================
-->
Add to areal/workflow/__init__.py:
from areal.workflow.<name> import MyWorkflow
# Add the new workflow class to the package's public export list.
__all__ = [
    # ... existing exports
    "MyWorkflow",
]
Update your training script to use the new workflow:
# The workflow is passed as a dotted import-path string, not a class object.
trainer.train(
    workflow="areal.workflow.<name>.MyWorkflow",
    # ... other args
)
Create tests/test_<name>_workflow.py:
import pytest
from areal.workflow.<name> import MyWorkflow
@pytest.mark.asyncio
async def test_workflow_basic():
    """Placeholder for the workflow's basic-functionality test."""
    # Intentionally empty: replace with assertions for your workflow.
    return None
| Workflow | File | Description |
|---|---|---|
| MultiTurnWorkflow | areal/workflow/multi_turn.py | Multi-turn conversation |
| RLVRWorkflow | areal/workflow/rlvr.py | RL with verifiable rewards |
| VisionRLVRWorkflow | areal/workflow/vision_rlvr.py | Vision + RLVR |
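Key requirements:

- `arun_episode` must be `async def` and non-blocking.
- Use `aiofiles` for file operations.
- Use `AsyncRewardWrapper` for reward functions.
- Output tensors follow the `[batch, seq_len, ...]` shape convention.
- Use `concat_padded_tensors` for combining outputs.

Common mistakes:

- Using `open()` instead of `aiofiles.open()`.
- Forgetting to `await` async calls.
- Not wrapping the reward function in `AsyncRewardWrapper`.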