Architecture¶
This page describes the internal architecture of the YAML Workflow Engine.
Overview¶
+-----------+
| CLI |
| (cli.py) |
+-----+-----+
|
args, workflow path
|
v
+----------+-----------+
| WorkflowEngine |
| (engine.py) |
| |
| - Load YAML |
| - Process imports |
| - Validate params |
| - Resolve flows |
| - Execute steps |
+---+------+-------+---+
| | |
+------------+ | +------------+
| | |
v v v
+------+------+ +-------+-------+ +-------+-------+
| TemplateEng | | WorkflowState | | Workspace |
| (template) | | (state.py) | | (workspace.py)|
| | | | | |
| Jinja2 | | Persistence | | Directories |
| resolution | | Resume/retry | | Logging |
+-------------+ +---------------+ +---------------+
+----------+-----------+
| Step |
| (step.py) |
| |
| - Condition eval |
| - Input rendering |
| - Error handling |
+----------+-----------+
|
v
+----------+-----------+
| Task Registry |
| (tasks/__init__.py) |
| |
| @register_task() |
| get_task_handler() |
| Plugin discovery |
+---+--+--+--+--+--+--+
| | | | | |
+------+-----+ | | | +-----+------+
| | | | | | |
v v v v v v v
shell file python template http batch
tasks tasks tasks tasks tasks tasks
Execution Flow¶
When you run yaml-workflow run workflow.yaml name=Alice:
- CLI (
cli.py) parses arguments, extractsname=Aliceas a parameter - WorkflowEngine (
engine.py) initializes:- Loads the YAML file
- Processes
imports(merges steps/params from imported files) - Validates the workflow structure and parameters
- Creates a workspace directory for this run
- Sets up logging
- Initializes execution context with namespaces (
args,env,steps)
- Run loop iterates through steps:
- For each step, creates a Step object
- Evaluates the
condition(if any) via TemplateEngine - Resolves input templates (e.g.,
{{ args.name }}becomesAlice) - Looks up the task handler via Task Registry
- Executes the handler with a TaskConfig object
- Stores the result in
context["steps"][step_name] - Updates WorkflowState (for resume capability)
- On error: retries, skips, or jumps to error handler based on
on_errorconfig - On completion: marks workflow as completed, returns results
Key Components¶
WorkflowEngine (engine.py)¶
The central orchestrator. Responsibilities:
- YAML loading and validation
- Import processing (merging steps/params from other files)
- Parameter validation (required, type, min/max length, allowed values)
- Flow resolution (determining which steps to execute in which order)
- Step execution loop with retry/error handling
- Dry-run mode (preview without executing)
- Context management (namespaces for variable access)
Step (step.py)¶
Represents a single workflow step. Handles:
- Condition evaluation: Jinja2 expression that must resolve to
"True" - Input rendering: Recursive template processing of all input values
- Error handling:
on_errorconfiguration (fail, continue, retry, next)
TaskConfig (tasks/config.py)¶
The interface between the engine and task handlers:
class TaskConfig:
name: str # Step name
type: str # Task type (e.g., "shell")
inputs: dict # Raw inputs from YAML
workspace: Path # Workspace directory
context: Dict[str, Any] # Read-only execution context
processed_inputs: Dict # Template-resolved inputs
def process_inputs() -> dict # Resolve templates in inputs
def get_variable(name, namespace) -> Any
def get_available_variables() -> dict
Task Registry (tasks/__init__.py)¶
Manages task handler registration and lookup:
@register_task("name")decorator registers a functionget_task_handler("name")returns the handler_discover_plugins()loads tasks from entry points at startup
Template Engine (template.py)¶
Jinja2 wrapper with:
StrictUndefinedmode (errors on undefined variables)AttrDictfor dot-notation access (e.g.,steps.my_step.result)- Recursive processing of nested dicts/lists
- Helpful error messages showing available variables
Workflow State (state.py)¶
Manages execution persistence:
- Tracks completed/failed/skipped steps
- Stores step outputs for resume capability
- Manages retry counts per step
- Saves to
.workflow_metadata.jsonin workspace
Workspace (workspace.py)¶
File system management:
- Creates run-specific directories (
workflow_name_run_N/) - Manages logs directory
- Provides workspace info (size, file count, creation time)
Variable Namespaces¶
The execution context provides these namespaces:
| Namespace | Access | Description |
|---|---|---|
args |
{{ args.name }} |
CLI parameters and workflow params |
env |
{{ env.PATH }} |
Environment variables |
steps |
{{ steps.step_name.result }} |
Results from completed steps |
workflow |
{{ workflow.name }} |
Workflow metadata |
settings |
{{ settings.key }} |
Workflow settings |
batch |
{{ batch.item }} |
Current item in batch processing (only available inside batch tasks) |
Plugin System¶
External packages can register tasks via Python entry points:
# In plugin's pyproject.toml
[project.entry-points."yaml_workflow.tasks"]
my_tasks = "my_plugin.tasks"
When my_plugin.tasks is imported, any @register_task() decorated functions are automatically registered. The engine discovers plugins at import time via importlib.metadata.entry_points().