Skip to content

yaml_workflow.tasks

yaml_workflow.tasks

Task modules for the YAML Workflow Engine.

This package contains various task modules that can be used in workflows. Each module provides specific functionality that can be referenced in workflow YAML files.

Classes

TaskConfig

Configuration class for task handlers with namespace support.

Source code in src/yaml_workflow/tasks/config.py
class TaskConfig:
    """Step configuration passed to task handlers, with namespaced variable access."""

    def __init__(self, step: Dict[str, Any], context: Dict[str, Any], workspace: Path):
        """
        Set up the configuration for one workflow step.

        Args:
            step: Step mapping taken from the workflow definition
            context: Execution context holding the variable namespaces
            workspace: Workspace path
        """
        # Execution environment
        self._context = context
        self.workspace = workspace

        # Step definition; the full mapping is kept because it is attached
        # to the error context when template processing fails.
        self.step = step
        self.name = step.get("name")
        self.type = step.get("task")
        self.inputs = step.get("inputs", {})

        # Template machinery and the cache filled by process_inputs()
        self._processed_inputs: Dict[str, Any] = {}
        self._template_engine = TemplateEngine()

    def get_variable(self, name: str, namespace: Optional[str] = None) -> Any:
        """
        Look up a variable, optionally inside a namespace.

        Args:
            name: Variable name
            namespace: Namespace to search (args, env, steps); when omitted or
                empty the top level of the context is searched instead

        Returns:
            Any: The stored value, or None when nothing is found
        """
        if not namespace:
            return self._context.get(name)
        return self._context.get(namespace, {}).get(name)

    def get_available_variables(self) -> Dict[str, List[str]]:
        """
        List the variable names present in each namespace.

        Returns:
            Dict[str, List[str]]: Names under "args", "env" and "steps", plus
            a "root" entry with every other top-level context key
        """
        namespaces = ("args", "env", "steps")
        listing: Dict[str, List[str]] = {
            ns: list(self._context.get(ns, {}).keys()) for ns in namespaces
        }
        listing["root"] = [
            key for key in self._context.keys() if key not in namespaces
        ]
        return listing

    def process_inputs(self) -> Dict[str, Any]:
        """
        Resolve templates in the step inputs and cache the result.

        Every string in the inputs, including strings inside nested
        dictionaries and lists, is run through the template engine. The
        processed inputs are stored on the instance and reused by later
        calls once they are non-empty.

        Returns:
            Dict[str, Any]: Inputs with templates resolved
        """
        if self._processed_inputs:
            return self._processed_inputs

        # The three namespaces are always present in the template context
        # (empty when missing); all remaining context keys are copied as-is.
        namespaces = ("args", "env", "steps")
        template_context = {ns: self._context.get(ns, {}) for ns in namespaces}
        template_context.update(
            (key, item)
            for key, item in self._context.items()
            if key not in namespaces
        )

        self._processed_inputs = self._process_value(self.inputs, template_context)
        return self._processed_inputs

    def _process_value(self, value: Any, template_context: Dict[str, Any]) -> Any:
        """
        Resolve templates in a value, descending into dicts and lists.

        A rendered string is converted to a Python value when possible:
        "True"/"False" become booleans, anything ``ast.literal_eval`` accepts
        becomes that literal, and otherwise ``float``/``int`` parsing is
        attempted before the rendered text is returned unchanged.

        Args:
            value: Value to process
            template_context: Variables available to the templates

        Returns:
            Any: The processed value

        Raises:
            TaskExecutionError: Presumably raised by ``handle_task_error`` when
                the template references an undefined variable (the handler is
                defined elsewhere).
        """
        if isinstance(value, dict):
            return {
                key: self._process_value(item, template_context)
                for key, item in value.items()
            }
        if isinstance(value, list):
            return [self._process_value(entry, template_context) for entry in value]
        if not isinstance(value, str):
            return value

        try:
            rendered = self._template_engine.process_template(value, template_context)
            # Rendering always yields text; recover the intended type from it
            if rendered == "True":
                return True
            if rendered == "False":
                return False

            import ast

            try:
                try:
                    # Lists, dicts, numbers and other Python literals
                    return ast.literal_eval(rendered)
                except (ValueError, SyntaxError):
                    # Not a literal: fall back to plain numeric parsing
                    return float(rendered) if "." in rendered else int(rendered)
            except (ValueError, TypeError, SyntaxError):
                return rendered
        except UndefinedError as exc:
            # Hand the failure to the centralized error handler
            handle_task_error(
                ErrorContext(
                    step_name=str(self.name),
                    task_type=str(self.type),
                    error=exc,
                    task_config=self.step,
                    template_context=template_context,
                )
            )
        return value

    def _get_undefined_namespace(self, error_msg: str) -> str:
        """
        Work out which namespace an undefined-variable error refers to.

        Args:
            error_msg: Error message from UndefinedError

        Returns:
            str: "args", "env" or "steps", or "root" when none can be identified
        """
        known_namespaces = ("args", "env", "steps")

        # Direct access pattern: the message itself mentions e.g. "args."
        for candidate in known_namespaces:
            if f"{candidate}." in error_msg:
                return candidate

        # Dictionary access pattern, e.g.
        # 'dict object' has no attribute 'undefined'
        found = re.search(r"no attribute '(\w+)'", error_msg)
        if not found:
            return "root"

        # Look for a top-level string input that references
        # "<namespace>.<attribute>" for a namespace present in the context.
        missing_attr = found.group(1)
        for candidate in known_namespaces:
            if candidate not in self._context:
                continue
            reference = f"{candidate}.{missing_attr}"
            if any(
                isinstance(raw, str) and reference in raw
                for raw in self.inputs.values()
            ):
                return candidate

        return "root"
Functions
get_available_variables() -> Dict[str, List[str]]

Get available variables by namespace.

Returns:

Type Description
Dict[str, List[str]]

Dict[str, List[str]]: Available variables in each namespace

Source code in src/yaml_workflow/tasks/config.py
def get_available_variables(self) -> Dict[str, List[str]]:
    """
    List the variable names present in each namespace.

    Returns:
        Dict[str, List[str]]: Names under "args", "env" and "steps", plus
        a "root" entry with every other top-level context key
    """
    namespaces = ("args", "env", "steps")
    listing: Dict[str, List[str]] = {
        ns: list(self._context.get(ns, {}).keys()) for ns in namespaces
    }
    listing["root"] = [
        key for key in self._context.keys() if key not in namespaces
    ]
    return listing
get_variable(name: str, namespace: Optional[str] = None) -> Any

Get a variable with namespace support.

Parameters:

Name Type Description Default
name str

Variable name

required
namespace Optional[str]

Optional namespace (args, env, steps)

None

Returns:

Name Type Description
Any Any

Variable value if found

Source code in src/yaml_workflow/tasks/config.py
def get_variable(self, name: str, namespace: Optional[str] = None) -> Any:
    """
    Look up a variable, optionally inside a namespace.

    Args:
        name: Variable name
        namespace: Namespace to search (args, env, steps); when omitted or
            empty the top level of the context is searched instead

    Returns:
        Any: The stored value, or None when nothing is found
    """
    if not namespace:
        return self._context.get(name)
    return self._context.get(namespace, {}).get(name)
process_inputs() -> Dict[str, Any]

Process task inputs with template resolution.

Recursively processes all string values in the inputs dictionary, including nested dictionaries and lists.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Processed inputs with resolved templates

Source code in src/yaml_workflow/tasks/config.py
def process_inputs(self) -> Dict[str, Any]:
    """
    Resolve templates in the step inputs and cache the result.

    Every string in the inputs, including strings inside nested
    dictionaries and lists, is run through the template engine. The
    processed inputs are stored on the instance and reused by later
    calls once they are non-empty.

    Returns:
        Dict[str, Any]: Inputs with templates resolved
    """
    if self._processed_inputs:
        return self._processed_inputs

    # The three namespaces are always present in the template context
    # (empty when missing); all remaining context keys are copied as-is.
    namespaces = ("args", "env", "steps")
    template_context = {ns: self._context.get(ns, {}) for ns in namespaces}
    template_context.update(
        (key, item)
        for key, item in self._context.items()
        if key not in namespaces
    )

    self._processed_inputs = self._process_value(self.inputs, template_context)
    return self._processed_inputs

Functions

add_numbers(a: float, b: float) -> float

Add two numbers together.

Parameters:

Name Type Description Default
a float

First number

required
b float

Second number

required

Returns:

Name Type Description
float float

Sum of the numbers

Source code in src/yaml_workflow/tasks/basic_tasks.py
@register_task()
def add_numbers(a: float, b: float) -> float:
    """
    Compute the sum of two numbers.

    Args:
        a: Left operand
        b: Right operand

    Returns:
        float: The value of ``a + b``
    """
    total = a + b
    return total

append_file_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Append content to a file.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("append_file")
def append_file_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """Append the ``content`` input to the file named by the ``file`` input.

    Reads ``file``, ``content`` and ``encoding`` (default "utf-8") from the
    processed inputs and delegates the write to ``append_file_direct``.

    Returns:
        A dict with "path" (the value returned by ``append_file_direct``) and
        "content" (the content input as provided), or None if the error
        handler returns instead of raising.

    Any exception, including the ValueError raised for a missing file path
    or missing content, is forwarded to ``handle_task_error``.
    """
    task_name = str(config.name or "append_file")
    task_type = config.type or "append_file"
    logger = get_task_logger(config.workspace, task_name)
    log_task_execution(logger, config.step, config._context, config.workspace)
    try:
        inputs = config.process_inputs()
        target = inputs.get("file")
        text = inputs.get("content")
        encoding = inputs.get("encoding", "utf-8")

        if not target:
            raise ValueError("No file path provided")
        # An empty string is valid content; only a missing value is rejected
        if text is None:
            raise ValueError("No content provided")

        written_path = append_file_direct(
            target, str(text), config.workspace, encoding, task_name
        )
        output = {"path": written_path, "content": text}
        log_task_result(logger, output)
        return output
    except Exception as exc:
        handle_task_error(
            ErrorContext(
                step_name=task_name,
                task_type=task_type,
                error=exc,
                task_config=config.step,
                template_context=config._context,
            )
        )
        return None

batch_task(config: TaskConfig) -> Dict[str, Any]

Process a batch of items using specified task configuration.

This task processes a list of items in parallel chunks using the specified task configuration. Each item is passed to the task as an argument.

Parameters:

Name Type Description Default
config TaskConfig

TaskConfig object containing: `items` (list of items to process); `task` (task configuration for processing each item); `arg_name` (name of the argument to use for each item, default: "item"); `chunk_size` (optional size of chunks, default: 10); `max_workers` (optional maximum number of worker threads).

required

Returns:

Type Description
Dict[str, Any]

Dict containing: `processed` (list of successfully processed items); `failed` (list of failed items with errors); `results` (list of processing results); `stats` (processing statistics).

Example YAML usage
steps:
  - name: process_data
    task: batch
    inputs:
      items: [5, 7, 12]
      arg_name: x  # Name items will be passed as
      chunk_size: 2
      max_workers: 2
      task:
        task: python
        inputs:
          operation: multiply
          factor: 2
Source code in src/yaml_workflow/tasks/batch.py
@register_task("batch")
def batch_task(config: TaskConfig) -> Dict[str, Any]:
    """
    Process a batch of items using specified task configuration.

    The items are split into consecutive chunks of ``chunk_size``. Each chunk
    gets its own thread pool and every item of the chunk is submitted to
    ``process_item``; the next chunk is only started after all futures of the
    current one have completed. A failing item does not abort the batch: the
    failure is recorded under ``failed`` and counted in the statistics.

    Args:
        config: TaskConfig object containing:
            - items: List of items to process
            - task: Task configuration for processing each item
            - arg_name: Name of the argument to use for each item (default: "item")
            - chunk_size: Optional size of chunks (default: 10)
            - max_workers: Optional maximum worker threads
              (default: the smaller of chunk_size and the CPU count)

    Returns:
        Dict containing:
            - processed: List of successfully processed items
            - failed: List of failed items with errors
            - results: List of processing results
            - stats: Processing statistics (total, processed, failed,
              start_time, end_time, success_rate)

        All three lists are ordered by the item's position in ``items``.

    Raises:
        Errors found while validating the inputs (missing or non-list
        ``items``, missing ``task``, non-positive ``chunk_size`` or
        ``max_workers``) and any other exception escaping the batch logic
        are passed to ``handle_task_error``.

    Example YAML usage:
        ```yaml
        steps:
          - name: process_data
            task: batch
            inputs:
              items: [5, 7, 12]
              arg_name: x  # Name items will be passed as
              chunk_size: 2
              max_workers: 2
              task:
                task: python
                inputs:
                  operation: multiply
                  factor: 2
        ```
    """
    task_name = config.name or "batch_task"
    task_type = config.type or "batch"

    try:
        # Process inputs with template resolution
        processed = config.process_inputs()

        # Get required parameters
        items = processed.get("items")
        if items is None:
            raise ValueError("'items' parameter is required for batch task")

        # Ensure items is a list
        if not isinstance(items, list):
            raise ValueError("'items' must resolve to a list after template processing")

        task_config = processed.get("task")
        if not task_config:
            raise ValueError(
                "'task' configuration is required within batch task inputs"
            )

        # Get optional parameters with defaults
        chunk_size = int(processed.get("chunk_size", 10))
        if chunk_size <= 0:
            raise ValueError("'chunk_size' must be greater than 0")

        # Default worker count: never more threads than items in a chunk,
        # and never more than the CPU count (1 when it cannot be determined)
        max_workers = int(
            processed.get("max_workers", min(chunk_size, os.cpu_count() or 1))
        )
        if max_workers <= 0:
            raise ValueError("'max_workers' must be greater than 0")

        # Handle case where items list is empty after processing:
        # report an empty, fully successful batch without starting any threads
        if not items:
            return {
                "processed": [],
                "failed": [],
                "results": [],
                "stats": {
                    "total": 0,
                    "processed": 0,
                    "failed": 0,
                    "start_time": datetime.now().isoformat(),
                    "end_time": datetime.now().isoformat(),
                    "success_rate": 100.0,
                },
            }

        # Get argument name to use for items, defaulting to "item"
        arg_name = processed.get("arg_name", "item")

        # Initialize state
        state: Dict[str, Any] = {
            "processed": [],
            "failed": [],
            "results": [],
            "stats": {
                "total": len(items),
                "processed": 0,
                "failed": 0,
                "start_time": datetime.now().isoformat(),
            },
        }

        # Store results with their indices for ordering: futures finish in
        # arbitrary order, so each outcome is tagged with its item position
        ordered_results: List[Tuple[int, Any]] = []
        ordered_processed: List[Tuple[int, Any]] = []
        ordered_failed: List[Tuple[int, Dict[str, Any]]] = []

        # Process items in chunks
        for chunk_index, chunk_start in enumerate(range(0, len(items), chunk_size)):
            chunk = cast(List[Any], items[chunk_start : chunk_start + chunk_size])

            # One pool per chunk; leaving the ``with`` block waits for its workers
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Maps each future to (item, position of the item in ``items``)
                futures = {}

                # Submit tasks for chunk
                for item_index, item in enumerate(chunk):
                    # Pass the sub-task config, not the main batch config inputs.
                    # Note: the same task_config object is handed to every item.
                    sub_task_config_for_item = task_config
                    future = executor.submit(
                        process_item,
                        item,  # item: Any
                        sub_task_config_for_item,  # task_config: Dict[str, Any]
                        config._context,  # context: Dict[str, Any]
                        config.workspace,  # workspace: Path
                        arg_name,  # arg_name: str
                        chunk_index,  # chunk_index: int
                        chunk_start + item_index,  # item_index: int
                        len(items),  # total: int
                        chunk_size,  # chunk_size: int
                    )
                    futures[future] = (item, chunk_start + item_index)

                # Process completed futures
                for future in as_completed(futures):
                    item, index = futures[future]
                    try:
                        result = future.result()
                        ordered_processed.append((index, item))
                        ordered_results.append((index, result))
                        state["stats"]["processed"] += 1
                    except Exception as e:
                        # Capture the error from process_item (already wrapped if needed)
                        error_info = {"item": item, "error": str(e)}
                        # If it's a TaskExecutionError, add more details if possible
                        if isinstance(e, TaskExecutionError):
                            error_info["step_name"] = e.step_name
                            if e.task_config:
                                error_info["task_config"] = e.task_config
                        ordered_failed.append((index, error_info))
                        state["stats"]["failed"] += 1

        # Sort results by index and extract values, restoring input order
        state["processed"] = [item for _, item in sorted(ordered_processed)]
        state["results"] = [result for _, result in sorted(ordered_results)]
        state["failed"] = [error for _, error in sorted(ordered_failed)]

        # Add completion statistics
        state["stats"]["end_time"] = datetime.now().isoformat()
        total_items = state["stats"]["total"]
        processed_items = state["stats"]["processed"]
        # Percentage of items that completed without raising
        state["stats"]["success_rate"] = (
            (processed_items / total_items) * 100.0
            if total_items > 0
            else 100.0  # Avoid division by zero if total is 0
        )

        return state

    except Exception as e:
        # Centralized error handling for exceptions during batch setup/config
        err_context = ErrorContext(
            step_name=str(task_name),
            task_type=str(task_type),
            error=e,
            task_config=config.step,
            template_context=config._context,
        )
        handle_task_error(err_context)
        # handle_task_error always raises, so return is unreachable but satisfies type checker
        return {}

create_greeting(name: str, context: Dict[str, Any]) -> str

Create a greeting message.

Parameters:

Name Type Description Default
name str

Name to greet

required
context Dict[str, Any]

Template context

required

Returns:

Name Type Description
str str

Greeting message

Raises:

Type Description
TemplateError

If template resolution fails

Source code in src/yaml_workflow/tasks/basic_tasks.py
@register_task()
def create_greeting(name: str, context: Dict[str, Any]) -> str:
    """
    Create a greeting message.

    Renders the fixed template ``Hello {{ name }}!`` with strict handling of
    undefined variables.

    Args:
        name: Name to greet; takes precedence over any "name" entry in context
        context: Template context

    Returns:
        str: Greeting message

    Raises:
        TemplateError: If template resolution fails; the original exception
            is attached as the cause
    """
    try:
        template = Template("Hello {{ name }}!", undefined=StrictUndefined)
        # Render from one merged mapping rather than
        # ``render(name=name, **context)``: a context that already holds a
        # "name" key would otherwise raise a duplicate-keyword TypeError,
        # and keyword expansion also requires every key to be a string.
        return template.render({**context, "name": name})
    except Exception as e:
        raise TemplateError(f"Failed to create greeting: {str(e)}") from e

echo(message: str) -> str

Echo back the input message.

Parameters:

Name Type Description Default
message str

Message to echo

required

Returns:

Name Type Description
str str

The input message

Source code in src/yaml_workflow/tasks/basic_tasks.py
@register_task()
def echo(message: str) -> str:
    """
    Return the given message unchanged.

    Args:
        message: Text to hand back to the caller

    Returns:
        str: The same ``message`` value that was passed in
    """
    return message

fail(message: str = 'Task failed') -> None

A task that always fails.

Parameters:

Name Type Description Default
message str

Error message

'Task failed'

Raises:

Type Description
RuntimeError

Always raises this error

Source code in src/yaml_workflow/tasks/basic_tasks.py
@register_task()
def fail(message: str = "Task failed") -> None:
    """
    Fail unconditionally.

    Args:
        message: Text carried by the raised error

    Raises:
        RuntimeError: On every call, constructed from ``message``
    """
    failure = RuntimeError(message)
    raise failure

get_task_handler(name: str) -> Optional[TaskHandler]

Get a task handler by name.

Parameters:

Name Type Description Default
name str

Task name

required

Returns:

Type Description
Optional[TaskHandler]

Optional[TaskHandler]: Task handler if found

Source code in src/yaml_workflow/tasks/__init__.py
def get_task_handler(name: str) -> Optional[TaskHandler]:
    """Look up a registered task handler.

    Args:
        name: Task name

    Returns:
        Optional[TaskHandler]: The handler stored in the task registry under
        ``name``, or None when no such entry exists
    """
    found = _task_registry.get(name)
    logging.debug(f"Retrieved handler for task '{name}': {found}")
    return found

hello_world(name: str = 'World') -> str

A simple hello world function.

Parameters:

Name Type Description Default
name str

Name to include in greeting. Defaults to "World".

'World'

Returns:

Name Type Description
str str

The greeting message

Source code in src/yaml_workflow/tasks/basic_tasks.py
@register_task()
def hello_world(name: str = "World") -> str:
    """
    Build a "Hello, <name>!" greeting.

    Args:
        name: Who to greet. Defaults to "World".

    Returns:
        str: The greeting text
    """
    greeting = "Hello, {}!".format(name)
    return greeting

join_strings(*strings: str, separator: str = ' ') -> str

Join multiple strings together.

Parameters:

Name Type Description Default
*strings str

Variable number of strings to join

()
separator str

String to use as separator. Defaults to space.

' '

Returns:

Name Type Description
str str

Joined string

Source code in src/yaml_workflow/tasks/basic_tasks.py
@register_task()
def join_strings(*strings: str, separator: str = " ") -> str:
    """
    Concatenate the given strings with a separator between them.

    Args:
        *strings: Strings to concatenate, in order
        separator: Text placed between consecutive strings. Defaults to space.

    Returns:
        str: The concatenated text (empty when no strings are given)
    """
    joined = separator.join(strings)
    return joined

print_vars_task(config: TaskConfig) -> dict

Prints selected variables from the context for debugging.

Source code in src/yaml_workflow/tasks/python_tasks.py
@register_task()
def print_vars_task(config: TaskConfig) -> dict:
    """Print selected context variables and all step results for debugging.

    The optional ``message`` input is used as the heading. Output goes
    straight to stdout, which is flushed before returning.

    Returns:
        dict: ``{"success": True}``
    """
    inputs = config.process_inputs()
    context = config._context
    heading = inputs.get("message", "Current Context Variables:")

    print(f"\n--- {heading} ---")  # Written directly to stdout

    # Top-level context entries to show (extend the tuple as needed)
    print("Workflow Variables:")
    print("==================")
    shown_keys = (
        "args",
        "workflow_name",
        "workspace",
        "output",
        "run_number",
        "timestamp",
    )
    for key in shown_keys:
        print(f"{key}: {context.get(key)}")

    print("\nStep Results:")
    print("=============")
    step_results = context.get("steps", {})
    if not step_results:
        print("  (No step results yet)")
    else:
        # pprint keeps large or nested step results readable
        pprint.pprint(step_results, indent=2)

    print("--------------------\n")
    sys.stdout.flush()  # Make sure everything is emitted before returning
    return {"success": True}  # Signals task success

read_file_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Read content from a file.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("read_file")
def read_file_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """Read the file named by the ``file`` input and return its content.

    Reads ``file`` and ``encoding`` (default "utf-8") from the processed
    inputs, logs diagnostic details about the resolved location, then
    delegates the read to ``read_file_direct``.

    Returns:
        A dict with "path" (the ``file`` input as given, not the resolved
        path) and "content", or None if the error handler returns instead
        of raising.

    Any exception, including the ValueError raised for a missing ``file``
    input, is logged with its traceback and forwarded to ``handle_task_error``.
    """
    task_name = str(config.name or "read_file")
    task_type = config.type or "read_file"
    logger = get_task_logger(config.workspace, task_name)
    log_task_execution(logger, config.step, config._context, config.workspace)
    try:
        inputs = config.process_inputs()
        requested = inputs.get("file")

        # Diagnostics about what the task received
        logger.debug(
            f"Workspace={config.workspace}, exists={config.workspace.exists()}"
        )
        logger.debug(f"file_path param={requested}")
        logger.debug(f"All inputs={inputs}")

        encoding = inputs.get("encoding", "utf-8")

        if not requested:
            error_msg = "No file path provided (param 'file' is missing)"
            logger.error(error_msg)
            raise ValueError(error_msg)

        # Diagnostics only: the read below resolves the path on its own
        resolved_path = str(resolve_path(config.workspace, requested))
        logger.debug(f"Resolved path={resolved_path}")

        target = Path(resolved_path)
        logger.debug(f"File exists={target.exists()}")
        if not target.exists():
            # Log the surroundings to help explain the upcoming failure
            parent_dir = target.parent
            logger.debug(f"Parent directory={parent_dir}, exists={parent_dir.exists()}")
            if parent_dir.exists():
                logger.debug(f"Directory contents: {list(parent_dir.glob('*'))}")

        content = read_file_direct(
            requested, config.workspace, encoding, task_name, logger=logger
        )
        output = {"path": requested, "content": content}
        log_task_result(logger, output)
        return output
    except Exception as exc:
        # Record the exception details before delegating
        logger.error(f"EXCEPTION: {type(exc).__name__}: {str(exc)}")
        import traceback

        logger.error(f"TRACEBACK:\n{traceback.format_exc()}")

        handle_task_error(
            ErrorContext(
                step_name=task_name,
                task_type=task_type,
                error=exc,
                task_config=config.step,
                template_context=config._context,
            )
        )
        return None

read_json_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Task handler for reading JSON files.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("read_json")
def read_json_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """Load the JSON file named by the ``file`` input.

    Reads ``file`` and ``encoding`` (default "utf-8") from the processed
    inputs and delegates parsing to ``read_json``.

    Returns:
        A dict with the parsed document under "data", or None if the error
        handler returns instead of raising.

    Any exception, including the ValueError raised for a missing file path,
    is forwarded to ``handle_task_error``.
    """
    task_name = str(config.name or "read_json")
    task_type = config.type or "read_json"
    logger = get_task_logger(config.workspace, task_name)
    log_task_execution(logger, config.step, config._context, config.workspace)

    try:
        inputs = config.process_inputs()
        source = inputs.get("file")
        encoding = inputs.get("encoding", "utf-8")

        if not source:
            raise ValueError("No file path provided")

        output = {"data": read_json(source, config.workspace, encoding=encoding)}
        log_task_result(logger, output)
        return output
    except Exception as exc:
        handle_task_error(
            ErrorContext(
                step_name=task_name,
                task_type=task_type,
                error=exc,
                task_config=config.step,
                template_context=config._context,
            )
        )
        return None

read_yaml_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Task handler for reading YAML files.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("read_yaml")
def read_yaml_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """Load the YAML file named by the ``file`` input.

    Reads ``file`` and ``encoding`` (default "utf-8") from the processed
    inputs and delegates parsing to ``read_yaml``.

    Returns:
        A dict with the parsed document under "data", or None if the error
        handler returns instead of raising.

    Any exception, including the ValueError raised for a missing file path,
    is forwarded to ``handle_task_error``.
    """
    task_name = str(config.name or "read_yaml")
    task_type = config.type or "read_yaml"
    logger = get_task_logger(config.workspace, task_name)
    log_task_execution(logger, config.step, config._context, config.workspace)

    try:
        inputs = config.process_inputs()
        source = inputs.get("file")
        encoding = inputs.get("encoding", "utf-8")

        if not source:
            raise ValueError("No file path provided")

        output = {"data": read_yaml(source, config.workspace, encoding=encoding)}
        log_task_result(logger, output)
        return output
    except Exception as exc:
        handle_task_error(
            ErrorContext(
                step_name=task_name,
                task_type=task_type,
                error=exc,
                task_config=config.step,
                template_context=config._context,
            )
        )
        return None

register_task(name: Optional[str] = None) -> Callable[..., Callable[[TaskConfig], R]]

Decorator to register a function as a workflow task.

Source code in src/yaml_workflow/tasks/__init__.py
def register_task(
    name: Optional[str] = None,
) -> Callable[..., Callable[[TaskConfig], R]]:
    """Decorator to register a function as a workflow task.

    The decorated function is wrapped so that it can always be invoked with a
    single ``TaskConfig``. The wrapper is stored in ``_task_registry`` under
    ``name`` (or the function's ``__name__`` when ``name`` is not given) and is
    also what the decorator returns.

    How the wrapper builds the call:

    * A function whose only parameter is ``config`` annotated with
      ``TaskConfig`` receives the config object directly; its inputs are not
      processed by the wrapper.
    * Otherwise ``config.process_inputs()`` is called and each processed input
      is passed to the parameter of the same name.
    * Any parameter annotated with ``TaskConfig`` receives ``config``; a
      parameter named ``context`` annotated ``Dict[str, Any]``, ``dict`` or
      ``Any`` receives ``config._context``.
    * ``*args`` is filled only when an input key equals the ``*args``
      parameter name: a list/tuple is expanded, any other value becomes a
      single positional argument.
    * Inputs that match no named parameter are forwarded as individual keyword
      arguments when the function declares ``**kwargs``; without ``**kwargs``
      they are not passed to the function at all.

    Args:
        name: Registry key for the task; defaults to the function name.

    Returns:
        A decorator producing the ``TaskConfig``-accepting wrapper.

    Raises:
        ValueError: (from the wrapper) a parameter without a default has no
            matching input.
        TypeError: (from the wrapper) re-raised after logging when calling the
            task function raises ``TypeError``.
    """

    def task_wrapper(func: Callable[..., R]) -> Callable[[TaskConfig], R]:
        task_name = name or func.__name__

        @wraps(func)
        def wrapper(config: TaskConfig) -> R:
            sig = inspect.signature(func)
            params = sig.parameters

            # Fast path: tasks that take only the TaskConfig handle their own
            # input processing.
            if (
                list(params.keys()) == ["config"]
                and params["config"].annotation is TaskConfig
            ):
                return func(config)

            processed = config.process_inputs()
            kwargs = {}
            pos_args = []
            # Inputs not yet matched to a named parameter.
            unmapped_inputs = processed.copy()

            # Identify special parameters (*args, **kwargs, config, context).
            # The loop variable is deliberately not called ``name`` so it does
            # not shadow the decorator argument.
            var_arg_name: Optional[str] = None
            kw_arg_name: Optional[str] = None
            config_param_name: Optional[str] = None
            context_param_name: Optional[str] = None

            for param_name, param in params.items():
                if param.annotation is TaskConfig:
                    config_param_name = param_name
                elif param_name == "context" and param.annotation in (
                    Dict[str, Any],
                    dict,
                    Any,
                ):
                    context_param_name = param_name
                elif param.kind == param.VAR_POSITIONAL:
                    var_arg_name = param_name
                elif param.kind == param.VAR_KEYWORD:
                    kw_arg_name = param_name

            special_names = {
                config_param_name,
                context_param_name,
                var_arg_name,
                kw_arg_name,
            }

            # Map processed inputs onto the ordinary named parameters.
            for param_name, param in params.items():
                if param_name in special_names:
                    continue  # Handled separately below

                if param_name in processed:
                    kwargs[param_name] = processed[param_name]
                    del unmapped_inputs[param_name]  # Mark as mapped
                elif param.default is inspect.Parameter.empty:
                    raise ValueError(f"Missing required parameter: {param_name}")
                # Otherwise the function's own default applies.

            # *args requires an explicit input key with the same name as the
            # parameter (e.g. join_strings(strings=[...])); guessing from the
            # leftover inputs would be ambiguous.
            if var_arg_name and var_arg_name in processed:
                arg_input = processed[var_arg_name]
                pos_args = (
                    list(arg_input)
                    if isinstance(arg_input, (list, tuple))
                    else [arg_input]
                )
                unmapped_inputs.pop(var_arg_name, None)

            # Forward the remaining inputs through **kwargs. They must be
            # spread as individual keywords: storing the dict under the
            # **kwargs parameter name would hand the function a single
            # nested entry ({"kwargs": {...}}) instead of the inputs.
            if kw_arg_name and unmapped_inputs:
                kwargs.update(unmapped_inputs)

            # Inject config and context last so they always win over an input
            # that happens to use the same key.
            if config_param_name:
                kwargs[config_param_name] = config
            if context_param_name:
                # Pass the full context dict (protected member of TaskConfig).
                kwargs[context_param_name] = config._context

            try:
                return func(*pos_args, **kwargs)
            except TypeError as e:
                arg_summary = f"pos_args={pos_args}, kwargs={list(kwargs.keys())}"
                logging.error(
                    f"TypeError calling task '{task_name}': {e}. Call info: {arg_summary}"
                )
                raise

        _task_registry[task_name] = wrapper
        return wrapper

    return task_wrapper

render_template(config: TaskConfig) -> Dict[str, Any]

Render a template and save it to a file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | TaskConfig | Task configuration object | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Dict[str, Any]: Dictionary containing the path to the output file |

Raises:

| Type | Description |
| --- | --- |
| TaskExecutionError | If template resolution fails or file cannot be written (via handle_task_error) |

Source code in src/yaml_workflow/tasks/template_tasks.py
@register_task("template")
def render_template(config: TaskConfig) -> Dict[str, Any]:
    """
    Render a template and save it to a file.

    Both inputs are taken from the raw step inputs (``config.step["inputs"]``),
    not from ``config.process_inputs()``: the ``template`` text is rendered
    here, once, against the full workflow context, and ``output_file`` is used
    exactly as written in the step.

    Raw inputs read by this task:
        template: Template source; must be a non-empty string.
        output_file: Output path, joined onto the workspace; must be a
            non-empty string. Missing parent directories are created.

    Args:
        config: Task configuration object

    Returns:
        Dict[str, Any]: ``{"path": <output file path as a string>}``

    Raises:
        TaskExecutionError: Wraps any exception raised while validating the
            inputs, rendering the template or writing the file. The original
            exception is passed as ``original_error`` and chained as the cause.
    """
    task_name = str(config.name or "template_task")
    logger = get_task_logger(config.workspace, task_name)

    try:
        log_task_execution(logger, config.step, config._context, config.workspace)

        # Read the raw inputs on purpose; see the docstring.
        raw_inputs = config.step.get("inputs", {})
        template_str = raw_inputs.get("template")
        if not template_str or not isinstance(template_str, str):
            raise ValueError("Input 'template' must be a non-empty string")

        output_file_rel_path = raw_inputs.get("output_file")
        if not output_file_rel_path or not isinstance(output_file_rel_path, str):
            raise ValueError("Input 'output_file' must be a non-empty string")

        # Resolve the output path relative to the workspace and make sure its
        # parent directory exists.
        output_path = config.workspace / output_file_rel_path
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # The workspace is passed as the search path so the template can
        # include other files located there.
        resolved_content = config._template_engine.process_template(
            template_str,
            config._context,
            searchpath=str(config.workspace),
        )

        with open(output_path, "w", encoding="utf-8") as f:
            f.write(resolved_content)

        log_task_result(logger, str(output_path))
        return {"path": str(output_path)}

    except Exception as e:
        # Log with traceback, then surface a single task-level error type that
        # keeps the original exception as its explicit cause.
        logger.error(f"Task '{task_name}' failed: {str(e)}", exc_info=True)
        raise TaskExecutionError(
            step_name=task_name, original_error=e, task_config=config.step
        ) from e

shell_task(config: TaskConfig) -> Dict[str, Any]

Run a shell command with namespace support.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | TaskConfig | Task configuration with namespace support | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Dict[str, Any]: Command execution results |

Raises:

| Type | Description |
| --- | --- |
| TaskExecutionError | If command execution fails or template resolution fails |

Source code in src/yaml_workflow/tasks/shell_tasks.py
@register_task("shell")
def shell_task(config: TaskConfig) -> Dict[str, Any]:
    """
    Run a shell command with namespace support.

    Processed inputs read by this task:
        command: Command to run (required). A string is passed through
            ``process_command`` with the workflow context before execution; a
            list is handed to ``run_command`` unchanged. Any other type is
            rejected.
        working_dir: Directory to run in. A relative value is resolved against
            the workspace; an absolute value is used as is. Defaults to the
            workspace.
        env: Mapping merged over the result of ``get_environment()``.
        shell: Forwarded to ``run_command``; defaults to ``True``.
        timeout: Forwarded to ``run_command``; defaults to ``None``.

    Args:
        config: Task configuration with namespace support

    Returns:
        Dict[str, Any]: ``{"return_code": ..., "stdout": ..., "stderr": ...}``
        for a command that exits with status 0.

    Raises:
        TaskExecutionError: If command execution fails or template resolution fails
            (failures are routed through ``handle_task_error``).
    """
    task_name = str(config.name or "shell_task")
    task_type = str(config.type or "shell")
    logger = get_task_logger(config.workspace, task_name)

    try:
        log_task_execution(logger, config.step, config._context, config.workspace)

        processed = config.process_inputs()
        config._processed_inputs = processed

        if "command" not in processed:
            raise ValueError("command parameter is required")
        command = processed["command"]

        cwd = config.workspace
        if "working_dir" in processed:
            working_dir = processed["working_dir"]
            if not os.path.isabs(working_dir):
                cwd = config.workspace / working_dir
            else:
                cwd = Path(working_dir)

        env = get_environment()
        if "env" in processed:
            env.update(processed["env"])

        shell = processed.get("shell", True)
        timeout = processed.get("timeout", None)

        # Only string commands go through template processing.
        if isinstance(command, str):
            # Extra keys give process_command what it needs for error reporting.
            command_context = {
                **config._context,
                "step_name": task_name,
                "task_type": task_type,
                "task_config": config.step,
            }
            command = process_command(command, command_context)
        elif not isinstance(command, list):
            # Neither a string nor a list: reject before trying to run it.
            raise TaskExecutionError(
                step_name=task_name,
                original_error=TypeError(
                    f"Invalid command type: {type(command).__name__}. Expected string or list."
                ),
            )

        returncode, stdout, stderr = run_command(
            command, cwd=str(cwd), env=env, shell=shell, timeout=timeout
        )

        if returncode != 0:
            # stdout/stderr travel on the exception itself, so no separate
            # message needs to be assembled here.
            raise subprocess.CalledProcessError(
                returncode, cmd=command, output=stdout, stderr=stderr
            )

        result = {"return_code": returncode, "stdout": stdout, "stderr": stderr}
        log_task_result(logger, result)
        return result

    except Exception as e:
        context = ErrorContext(
            step_name=task_name,
            task_type=task_type,
            error=e,
            task_config=config.step,
            template_context=config._context,
        )
        handle_task_error(context)
        return {}  # Unreachable

write_file_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Write content to a file.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("write_file")
def write_file_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """Write text content to a file.

    Processed inputs read by this task:
        file: Destination path (required).
        content: Value to write (required; only ``None`` counts as missing).
            It is converted with ``str()`` before being written.
        encoding: Text encoding for the write; defaults to "utf-8".

    Args:
        config: Task configuration for the current step.

    Returns:
        ``{"path": <value returned by write_file_direct>, "content": <the
        original, unconverted content>}`` on success. ``None`` is returned
        only if ``handle_task_error`` returns instead of raising.
    """
    step_label = str(config.name or "write_file")
    kind = config.type or "write_file"
    task_log = get_task_logger(config.workspace, step_label)
    log_task_execution(task_log, config.step, config._context, config.workspace)
    try:
        inputs = config.process_inputs()
        target = inputs.get("file")
        body = inputs.get("content")
        charset = inputs.get("encoding", "utf-8")

        if not target:
            raise ValueError("No file path provided")
        # Empty strings and other falsy values are valid content.
        if body is None:
            raise ValueError("No content provided")

        written_path = write_file_direct(
            target, str(body), config.workspace, charset, step_label
        )
        summary = {"path": written_path, "content": body}
        log_task_result(task_log, summary)
        return summary
    except Exception as exc:
        # Failures are reported through the shared handler with full context.
        handle_task_error(
            ErrorContext(
                step_name=step_label,
                task_type=kind,
                error=exc,
                task_config=config.step,
                template_context=config._context,
            )
        )
        return None

write_json_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Write JSON data to a file.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("write_json")
def write_json_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """Serialize data as JSON and write it to a file.

    Processed inputs read by this task:
        file: Destination path (required).
        data: Value to serialize (required; only ``None`` counts as missing).
        indent: Indentation width, converted with ``int()``; defaults to 2.
        encoding: Text encoding for the write; defaults to "utf-8".

    The task emits several debug log lines (workspace, inputs, resolved path,
    result path) to help diagnose path problems.

    Args:
        config: Task configuration for the current step.

    Returns:
        ``{"path": <value returned by write_json_direct>}`` on success.
        ``None`` is returned only if ``handle_task_error`` returns instead of
        raising.
    """
    step_label = str(config.name or "write_json")
    kind = config.type or "write_json"
    task_log = get_task_logger(config.workspace, step_label)
    log_task_execution(task_log, config.step, config._context, config.workspace)
    try:
        inputs = config.process_inputs()
        target = inputs.get("file")

        # Diagnostics: workspace state and the inputs as received.
        task_log.debug(
            f"Workspace={config.workspace}, exists={config.workspace.exists()}"
        )
        task_log.debug(f"file_path param={target}")
        task_log.debug(f"All inputs={inputs}")

        payload = inputs.get("data")
        indent_width = int(inputs.get("indent", 2))
        charset = inputs.get("encoding", "utf-8")

        # Validate required inputs; the first problem found is logged and raised.
        problem = None
        if not target:
            problem = "No file path provided (param 'file' is missing)"
        elif payload is None:
            problem = "No data provided"
        if problem is not None:
            task_log.error(problem)
            raise ValueError(problem)

        # Paths mentioning "output/" get the workspace-level output directory
        # created up front.
        if "output/" in target:
            out_dir = config.workspace / "output"
            task_log.debug(
                f"Creating output dir {out_dir}, exists={out_dir.exists()}"
            )
            out_dir.mkdir(exist_ok=True, parents=True)

        # Diagnostics: where the path resolves to inside the workspace.
        resolved = str(resolve_path(config.workspace, target))
        task_log.debug(f"Resolved path={resolved}")

        written = write_json_direct(
            target,
            payload,
            indent=indent_width,
            workspace=config.workspace,
            encoding=charset,
            step_name=step_label,
            logger=task_log,
        )

        # Diagnostics: confirm the file now exists on disk.
        written_file = Path(written)
        task_log.debug(f"Result path={written}, exists={written_file.exists()}")

        summary = {"path": written}
        log_task_result(task_log, summary)
        return summary
    except Exception as exc:
        # Log the exception type/message and the traceback before delegating
        # to the shared error handler.
        task_log.error(f"EXCEPTION: {type(exc).__name__}: {str(exc)}")
        import traceback

        task_log.error(f"TRACEBACK:\n{traceback.format_exc()}")

        handle_task_error(
            ErrorContext(
                step_name=step_label,
                task_type=kind,
                error=exc,
                task_config=config.step,
                template_context=config._context,
            )
        )
        return None

write_yaml_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Write YAML data to a file.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("write_yaml")
def write_yaml_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """Serialize data as YAML and write it to a file.

    Processed inputs read by this task:
        file: Destination path (required).
        data: Value to serialize (required; only ``None`` counts as missing).
        encoding: Text encoding for the write; defaults to "utf-8".

    Args:
        config: Task configuration for the current step.

    Returns:
        ``{"path": <value returned by write_yaml_direct>}`` on success.
        ``None`` is returned only if ``handle_task_error`` returns instead of
        raising.
    """
    step_label = str(config.name or "write_yaml")
    kind = config.type or "write_yaml"
    task_log = get_task_logger(config.workspace, step_label)
    log_task_execution(task_log, config.step, config._context, config.workspace)
    try:
        inputs = config.process_inputs()
        target = inputs.get("file")
        payload = inputs.get("data")
        charset = inputs.get("encoding", "utf-8")

        if not target:
            raise ValueError("No file path provided")
        # Falsy-but-present data (empty dict/list, 0, "") is still written.
        if payload is None:
            raise ValueError("No data provided")

        summary = {
            "path": write_yaml_direct(
                target,
                payload,
                workspace=config.workspace,
                encoding=charset,
                step_name=step_label,
            )
        }
        log_task_result(task_log, summary)
        return summary
    except Exception as exc:
        # Failures are reported through the shared handler with full context.
        handle_task_error(
            ErrorContext(
                step_name=step_label,
                task_type=kind,
                error=exc,
                task_config=config.step,
                template_context=config._context,
            )
        )
        return None