Skip to content

yaml_workflow.tasks

yaml_workflow.tasks

Task modules for the YAML Workflow Engine.

This package contains various task modules that can be used in workflows. Each module provides specific functionality that can be referenced in workflow YAML files.

Classes

TaskConfig

Configuration class for task handlers with namespace support.

Source code in src/yaml_workflow/tasks/config.py
class TaskConfig:
    """Per-step configuration handed to task handlers, with namespaced variable lookup."""

    def __init__(self, step: Dict[str, Any], context: Dict[str, Any], workspace: Path):
        """
        Capture the step definition and the environment it runs in.

        Args:
            step: Step configuration from workflow
            context: Execution context with namespaces
            workspace: Workspace path
        """
        # The complete step mapping is kept so error reports can include it.
        self.step = step
        self.name = step.get("name")
        self.type = step.get("task")
        self.inputs = step.get("inputs", {})
        self._context = context
        self.workspace = workspace
        # Filled lazily by process_inputs(); an empty dict means "not processed yet".
        self._processed_inputs: Dict[str, Any] = {}
        self._template_engine = TemplateEngine()

    @property
    def context(self) -> Dict[str, Any]:
        """The execution context this step was created with (exposed without a setter)."""
        return self._context

    @property
    def processed_inputs(self) -> Dict[str, Any]:
        """The template-resolved inputs, or an empty dict if none have been stored."""
        return self._processed_inputs

    @processed_inputs.setter
    def processed_inputs(self, value: Dict[str, Any]) -> None:
        self._processed_inputs = value

    def get_variable(self, name: str, namespace: Optional[str] = None) -> Any:
        """
        Look up a variable, optionally inside a namespace.

        Args:
            name: Variable name
            namespace: Optional namespace (args, env, steps)

        Returns:
            Any: The stored value, or None when the name (or namespace) is absent
        """
        if not namespace:
            return self._context.get(name)
        scope = self._context.get(namespace, {})
        return scope.get(name)

    def get_available_variables(self) -> Dict[str, List[str]]:
        """
        List the variable names that can be referenced, grouped by namespace.

        Returns:
            Dict[str, List[str]]: Names under "args", "env" and "steps", plus
            every other top-level context key under "root"
        """
        reserved = ["args", "env", "steps"]
        available = {ns: list(self._context.get(ns, {}).keys()) for ns in reserved}
        available["root"] = [
            key for key in self._context.keys() if key not in reserved
        ]
        return available

    def process_inputs(self) -> Dict[str, Any]:
        """
        Resolve templates in the step inputs and cache the result.

        Every string found in the inputs (at any depth of nested dicts and
        lists) is rendered. A non-empty cached result is returned as-is on
        later calls.

        Returns:
            Dict[str, Any]: Processed inputs with resolved templates
        """
        if self._processed_inputs:
            return self._processed_inputs

        # The three namespaces are always present in the template context,
        # followed by whatever other top-level keys the context carries.
        reserved = ("args", "env", "steps")
        template_context = {ns: self._context.get(ns, {}) for ns in reserved}
        for key, entry in self._context.items():
            if key not in reserved:
                template_context[key] = entry

        self._processed_inputs = self._process_value(self.inputs, template_context)
        return self._processed_inputs

    @staticmethod
    def _coerce_rendered(rendered: Any) -> Any:
        """
        Convert a rendered template string back into a Python value when possible.

        Args:
            rendered: Output of the template engine

        Returns:
            Any: A bool for "True"/"False", a parsed Python literal, a number,
            or the rendered value unchanged when no conversion applies
        """
        if rendered == "True":
            return True
        if rendered == "False":
            return False

        import ast

        try:
            try:
                # Covers lists, dicts, quoted strings, numbers, None, ...
                return ast.literal_eval(rendered)
            except (ValueError, SyntaxError):
                # Not a literal: fall back to plain numeric parsing.
                return float(rendered) if "." in rendered else int(rendered)
        except (ValueError, TypeError, SyntaxError):
            return rendered

    def _process_value(self, value: Any, template_context: Dict[str, Any]) -> Any:
        """
        Render one input value, descending into dicts and lists.

        Args:
            value: Value to process
            template_context: Template context for variable resolution

        Returns:
            Any: Processed value with resolved templates

        Raises:
            TaskExecutionError: If template processing fails due to undefined variables.
        """
        if isinstance(value, str):
            try:
                rendered = self._template_engine.process_template(
                    value, template_context
                )
                return self._coerce_rendered(rendered)
            except UndefinedError as exc:
                # Undefined variables are reported through the shared error handler.
                handle_task_error(
                    ErrorContext(
                        step_name=str(self.name),
                        task_type=str(self.type),
                        error=exc,
                        task_config=self.step,
                        template_context=template_context,
                    )
                )
            return value

        if isinstance(value, dict):
            return {
                key: self._process_value(entry, template_context)
                for key, entry in value.items()
            }

        if isinstance(value, list):
            return [self._process_value(entry, template_context) for entry in value]

        return value

    def _get_undefined_namespace(self, error_msg: str) -> str:
        """
        Work out which namespace an undefined-variable error refers to.

        Args:
            error_msg: Error message from UndefinedError

        Returns:
            str: Namespace name or 'root' if not found
        """
        known = ["args", "env", "steps"]

        # Direct access pattern, e.g. the message mentions "args.undefined".
        for ns in known:
            if f"{ns}." in error_msg:
                return ns

        # Dictionary access pattern, e.g. "'dict object' has no attribute 'undefined'".
        found = re.search(r"no attribute '(\w+)'", error_msg)
        if not found:
            return "root"

        missing = found.group(1)
        for ns in known:
            if ns not in self._context:
                continue
            # The namespace is the culprit if some top-level string input
            # references "<namespace>.<missing attribute>".
            needle = f"{ns}.{missing}"
            if any(
                isinstance(raw, str) and needle in raw for raw in self.inputs.values()
            ):
                return ns

        return "root"
Attributes
context: Dict[str, Any] property

Read-only access to the execution context.

processed_inputs: Dict[str, Any] property writable

Access to processed (template-resolved) inputs.

Functions
get_available_variables() -> Dict[str, List[str]]

Get available variables by namespace.

Returns:

Type Description
Dict[str, List[str]]

Dict[str, List[str]]: Available variables in each namespace

Source code in src/yaml_workflow/tasks/config.py
def get_available_variables(self) -> Dict[str, List[str]]:
    """
    List the variable names that can be referenced, grouped by namespace.

    Returns:
        Dict[str, List[str]]: Names under "args", "env" and "steps", plus
        every other top-level context key under "root"
    """
    reserved = ["args", "env", "steps"]
    available = {ns: list(self._context.get(ns, {}).keys()) for ns in reserved}
    available["root"] = [key for key in self._context.keys() if key not in reserved]
    return available
get_variable(name: str, namespace: Optional[str] = None) -> Any

Get a variable with namespace support.

Parameters:

Name Type Description Default
name str

Variable name

required
namespace Optional[str]

Optional namespace (args, env, steps)

None

Returns:

Name Type Description
Any Any

Variable value if found

Source code in src/yaml_workflow/tasks/config.py
def get_variable(self, name: str, namespace: Optional[str] = None) -> Any:
    """
    Look up a variable, optionally inside a namespace.

    Args:
        name: Variable name
        namespace: Optional namespace (args, env, steps)

    Returns:
        Any: The stored value, or None when the name (or namespace) is absent
    """
    if not namespace:
        # No namespace given: read straight from the top level of the context.
        return self._context.get(name)
    scope = self._context.get(namespace, {})
    return scope.get(name)
process_inputs() -> Dict[str, Any]

Process task inputs with template resolution.

Recursively processes all string values in the inputs dictionary, including nested dictionaries and lists.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Processed inputs with resolved templates

Source code in src/yaml_workflow/tasks/config.py
def process_inputs(self) -> Dict[str, Any]:
    """
    Resolve templates in the step inputs and cache the result.

    Every string found in the inputs (at any depth of nested dicts and
    lists) is rendered. A non-empty cached result is returned as-is on
    later calls.

    Returns:
        Dict[str, Any]: Processed inputs with resolved templates
    """
    if self._processed_inputs:
        return self._processed_inputs

    # The three namespaces are always present in the template context,
    # followed by whatever other top-level keys the context carries.
    reserved = ("args", "env", "steps")
    template_context = {ns: self._context.get(ns, {}) for ns in reserved}
    for key, entry in self._context.items():
        if key not in reserved:
            template_context[key] = entry

    self._processed_inputs = self._process_value(self.inputs, template_context)
    return self._processed_inputs

Functions

add_numbers(a: float, b: float) -> float

Add two numbers together.

Parameters:

Name Type Description Default
a float

First number

required
b float

Second number

required

Returns:

Name Type Description
float float

Sum of the numbers

Source code in src/yaml_workflow/tasks/basic_tasks.py
@register_task()
def add_numbers(a: float, b: float) -> float:
    """
    Compute the sum of two numeric operands.

    Args:
        a: Left operand
        b: Right operand

    Returns:
        float: The result of ``a + b``
    """
    total = a + b
    return total

append_file_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Append content to a file.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("append_file")
def append_file_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """
    Append content to a file named in the step inputs.

    Reads ``file``, ``content`` and ``encoding`` (default ``utf-8``) from the
    processed inputs and hands them to ``append_file_direct``.

    Args:
        config: Task configuration for this step

    Returns:
        Optional[Dict[str, Any]]: ``{"path": ..., "content": ...}`` on success;
        ``None`` only if the error handler returns instead of raising
    """
    task_name = str(config.name or "append_file")
    task_type = config.type or "append_file"
    logger = get_task_logger(config.workspace, task_name)
    log_task_execution(logger, config.step, config.context, config.workspace)
    try:
        inputs = config.process_inputs()
        target = inputs.get("file")
        payload = inputs.get("content")
        encoding = inputs.get("encoding", "utf-8")

        # An empty path is rejected; empty content is allowed, missing content is not.
        if not target:
            raise ValueError("No file path provided")
        if payload is None:
            raise ValueError("No content provided")

        written_path = append_file_direct(
            target, str(payload), config.workspace, encoding, task_name
        )
        output = {"path": written_path, "content": payload}
        log_task_result(logger, output)
        return output
    except (
        ValueError,
        OSError,
        UnicodeEncodeError,
        TaskExecutionError,
        TemplateError,
    ) as exc:
        handle_task_error(
            ErrorContext(
                step_name=task_name,
                task_type=task_type,
                error=exc,
                task_config=config.step,
                template_context=config.context,
            )
        )
        return None

batch_task(config: TaskConfig) -> Dict[str, Any]

Process a batch of items using specified task configuration.

This task processes a list of items in parallel chunks using the specified task configuration. Each item is passed to the task as an argument.

Parameters:

Name Type Description Default
config TaskConfig

TaskConfig object containing:

- items: List of items to process
- task: Task configuration for processing each item
- arg_name: Name of the argument to use for each item (default: "item")
- chunk_size: Optional size of chunks (default: 10)
- max_workers: Optional maximum worker threads

required

Returns:

Type Description
Dict[str, Any]

Dict containing:

- processed: List of successfully processed items
- failed: List of failed items with errors
- results: List of processing results
- stats: Processing statistics

Example YAML usage
steps:
  - name: process_data
    task: batch
    inputs:
      items: [5, 7, 12]
      arg_name: x  # Name items will be passed as
      chunk_size: 2
      max_workers: 2
      task:
        task: python
        inputs:
          operation: multiply
          factor: 2
Source code in src/yaml_workflow/tasks/batch.py
@register_task("batch")
def batch_task(config: TaskConfig) -> Dict[str, Any]:
    """
    Run a sub-task once per item, chunk by chunk, on a thread pool.

    The item list is cut into consecutive chunks of ``chunk_size``. Each chunk
    is submitted to its own thread pool and must finish before the next chunk
    starts. Outputs are reported in the original item order regardless of the
    order in which the workers complete.

    Args:
        config: TaskConfig object containing:
            - items: List of items to process
            - task: Task configuration for processing each item
            - arg_name: Name of the argument to use for each item (default: "item")
            - chunk_size: Optional size of chunks (default: 10)
            - max_workers: Optional maximum worker threads

    Returns:
        Dict containing:
            - processed: List of successfully processed items
            - failed: List of failed items with errors
            - results: List of processing results
            - stats: Processing statistics

    Example YAML usage:
        ```yaml
        steps:
          - name: process_data
            task: batch
            inputs:
              items: [5, 7, 12]
              arg_name: x  # Name items will be passed as
              chunk_size: 2
              max_workers: 2
              task:
                task: python
                inputs:
                  operation: multiply
                  factor: 2
        ```
    """
    task_name = config.name or "batch_task"
    task_type = config.type or "batch"

    try:
        inputs = config.process_inputs()

        # --- required parameters -------------------------------------------
        items = inputs.get("items")
        if items is None:
            raise ValueError("'items' parameter is required for batch task")
        if not isinstance(items, list):
            raise ValueError("'items' must resolve to a list after template processing")

        sub_task = inputs.get("task")
        if not sub_task:
            raise ValueError(
                "'task' configuration is required within batch task inputs"
            )

        # --- optional parameters -------------------------------------------
        chunk_size = int(inputs.get("chunk_size", 10))
        if chunk_size <= 0:
            raise ValueError("'chunk_size' must be greater than 0")

        default_workers = min(chunk_size, os.cpu_count() or 1)
        max_workers = int(inputs.get("max_workers", default_workers))
        if max_workers <= 0:
            raise ValueError("'max_workers' must be greater than 0")

        # Nothing to do: report an empty, fully successful run.
        if not items:
            return {
                "processed": [],
                "failed": [],
                "results": [],
                "stats": {
                    "total": 0,
                    "processed": 0,
                    "failed": 0,
                    "start_time": datetime.now().isoformat(),
                    "end_time": datetime.now().isoformat(),
                    "success_rate": 100.0,
                },
            }

        arg_name = inputs.get("arg_name", "item")

        stats: Dict[str, Any] = {
            "total": len(items),
            "processed": 0,
            "failed": 0,
            "start_time": datetime.now().isoformat(),
        }
        state: Dict[str, Any] = {
            "processed": [],
            "failed": [],
            "results": [],
            "stats": stats,
        }

        # Outcomes are tagged with the item's global index so they can be
        # put back in input order once all workers have finished.
        ordered_results: List[Tuple[int, Any]] = []
        ordered_processed: List[Tuple[int, Any]] = []
        ordered_failed: List[Tuple[int, Dict[str, Any]]] = []

        for chunk_index, chunk_start in enumerate(range(0, len(items), chunk_size)):
            chunk = items[chunk_start : chunk_start + chunk_size]

            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                # Each worker receives the sub-task config, not the batch inputs.
                pending = {
                    pool.submit(
                        process_item,
                        item,
                        sub_task,
                        config.context,
                        config.workspace,
                        arg_name,
                        chunk_index,
                        position,
                        len(items),
                        chunk_size,
                    ): (item, position)
                    for position, item in enumerate(chunk, start=chunk_start)
                }

                for finished in as_completed(pending):
                    item, index = pending[finished]
                    try:
                        outcome = finished.result()
                    except (
                        Exception
                    ) as exc:  # noqa: BLE001 - broad catch required: futures may propagate arbitrary user task errors
                        failure = {"item": item, "error": str(exc)}
                        # TaskExecutionError carries extra detail worth surfacing.
                        if isinstance(exc, TaskExecutionError):
                            failure["step_name"] = exc.step_name
                            if exc.task_config:
                                failure["task_config"] = exc.task_config
                        ordered_failed.append((index, failure))
                        stats["failed"] += 1
                    else:
                        ordered_processed.append((index, item))
                        ordered_results.append((index, outcome))
                        stats["processed"] += 1

        # Restore input order and drop the index tags.
        state["processed"] = [item for _, item in sorted(ordered_processed)]
        state["results"] = [outcome for _, outcome in sorted(ordered_results)]
        state["failed"] = [failure for _, failure in sorted(ordered_failed)]

        stats["end_time"] = datetime.now().isoformat()
        total_items = stats["total"]
        processed_items = stats["processed"]
        # Guard against dividing by zero should the total ever be 0.
        stats["success_rate"] = (
            (processed_items / total_items) * 100.0 if total_items > 0 else 100.0
        )

        return state

    except (TaskExecutionError, ValueError, TypeError) as exc:
        # Setup/configuration failures go through the shared error handler.
        handle_task_error(
            ErrorContext(
                step_name=str(task_name),
                task_type=str(task_type),
                error=exc,
                task_config=config.step,
                template_context=config.context,
            )
        )
        # handle_task_error always raises, so return is unreachable but satisfies type checker
        return {}

create_greeting(name: str, context: Dict[str, Any]) -> str

Create a greeting message.

Parameters:

Name Type Description Default
name str

Name to greet

required
context Dict[str, Any]

Template context

required

Returns:

Name Type Description
str str

Greeting message

Raises:

Type Description
TemplateError

If template resolution fails

Source code in src/yaml_workflow/tasks/basic_tasks.py
@register_task()
def create_greeting(name: str, context: Dict[str, Any]) -> str:
    """
    Create a greeting message.

    Args:
        name: Name to greet
        context: Template context; a ``name`` entry in it is overridden by
            the explicit ``name`` argument

    Returns:
        str: Greeting message

    Raises:
        TemplateError: If template resolution fails
    """
    try:
        template = Template("Hello {{ name }}!", undefined=StrictUndefined)
        # Merge into a single mapping instead of render(name=name, **context):
        # the keyword form raises "got multiple values for keyword argument
        # 'name'" whenever the context already carries a "name" key.
        return template.render({**context, "name": name})
    except (UndefinedError, TemplateSyntaxError, TypeError, ValueError) as e:
        # Chain explicitly so the original cause stays attached to the error.
        raise TemplateError(f"Failed to create greeting: {str(e)}") from e

echo(message: str) -> str

Echo back the input message.

Parameters:

Name Type Description Default
message str

Message to echo

required

Returns:

Name Type Description
str str

The input message

Source code in src/yaml_workflow/tasks/basic_tasks.py
@register_task()
def echo(message: str) -> str:
    """
    Return the given message unchanged.

    Args:
        message: Text to hand back

    Returns:
        str: The same object that was passed in
    """
    echoed = message
    return echoed

fail(message: str = 'Task failed') -> None

A task that always fails.

Parameters:

Name Type Description Default
message str

Error message

'Task failed'

Raises:

Type Description
RuntimeError

Always raises this error

Source code in src/yaml_workflow/tasks/basic_tasks.py
@register_task()
def fail(message: str = "Task failed") -> None:
    """
    Fail unconditionally.

    Args:
        message: Text carried by the raised error

    Raises:
        RuntimeError: On every call
    """
    failure = RuntimeError(message)
    raise failure

get_task_handler(name: str) -> Optional[TaskHandler]

Get a task handler by name.

Parameters:

Name Type Description Default
name str

Task name

required

Returns:

Type Description
Optional[TaskHandler]

Optional[TaskHandler]: Task handler if found

Source code in src/yaml_workflow/tasks/__init__.py
def get_task_handler(name: str) -> Optional[TaskHandler]:
    """Get a task handler by name.

    Args:
        name: Task name

    Returns:
        Optional[TaskHandler]: The handler stored in the task registry under
        ``name``, or None if the registry has no such entry
    """
    handler = _task_registry.get(name)
    # Lazy %-style arguments: the message is only formatted when DEBUG is enabled.
    logging.debug("Retrieved handler for task '%s': %s", name, handler)
    return handler

hello_world(name: str = 'World') -> str

A simple hello world function.

Parameters:

Name Type Description Default
name str

Name to include in greeting. Defaults to "World".

'World'

Returns:

Name Type Description
str str

The greeting message

Source code in src/yaml_workflow/tasks/basic_tasks.py
@register_task()
def hello_world(name: str = "World") -> str:
    """
    Build a "Hello, <name>!" greeting.

    Args:
        name: Who to greet. Defaults to "World".

    Returns:
        str: The formatted greeting
    """
    greeting = f"Hello, {name}!"
    return greeting

http_request_task(config: TaskConfig) -> Dict[str, Any]

Make an HTTP request to a URL.

Parameters:

Name Type Description Default
config TaskConfig

Task configuration with namespace support

required
Inputs

- url (str): Target URL. Required.
- method (str): HTTP method. Default GET.
- headers (dict): Extra request headers. Default {}.
- body (str | dict | bytes): Request body. When a dict is provided it is serialised to JSON and Content-Type: application/json is set automatically (unless already present in headers).
- timeout (int | float): Request timeout in seconds. Default 30.
- token (str): Shorthand Bearer token. Sets Authorization: Bearer <token>. Ignored when auth is also provided.
- auth (dict): Authentication configuration. Supports the following types:

bearer::

    auth:
      type: bearer
      token: "{{ env.API_TOKEN }}"
      # OR read from environment:
      token_env: API_TOKEN

api_key::

    auth:
      type: api_key
      key: "{{ env.API_KEY }}"
      header: X-API-Key   # optional, default: X-API-Key

basic::

    auth:
      type: basic
      username: "{{ env.API_USER }}"
      password: "{{ env.API_PASS }}"

- verify_ssl (bool): Verify the server's TLS certificate. Default True. Set to False to disable certificate verification (equivalent to curl --insecure).
- retry (dict): Retry configuration:

    retry:
      max_attempts: 3          # total attempts, default: 1
      delay: 1.0               # seconds between retries, default: 1.0
      status_codes: [429, 503] # retry on these codes,
                               # default: [429, 500, 502, 503, 504]

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Response with keys status_code, headers, body, and json.

Raises:

Type Description
TaskExecutionError

If the request fails or authentication config is invalid.

Source code in src/yaml_workflow/tasks/http_tasks.py
@register_task("http.request")
def http_request_task(config: TaskConfig) -> Dict[str, Any]:
    """
    Make an HTTP request to a URL.

    Args:
        config: Task configuration with namespace support

    Inputs:
        url (str): Target URL. Required.
        method (str): HTTP method. Default ``GET``.
        headers (dict): Extra request headers. Default ``{}``.
        body (str | dict | list | bytes): Request body. When a dict or list is
            provided it is serialised to JSON and
            ``Content-Type: application/json`` is set automatically (unless a
            Content-Type header, in any letter case, is already present in
            *headers*). Bodies of any other type are not sent; a warning is
            logged instead.
        timeout (int | float): Request timeout in seconds. Default ``30``.
        token (str): Shorthand Bearer token. Sets
            ``Authorization: Bearer <token>``.  Ignored when *auth* is also
            provided.
        auth (dict): Authentication configuration.  Supports the following
            types:

            bearer::

                auth:
                  type: bearer
                  token: "{{ env.API_TOKEN }}"
                  # OR read from environment:
                  token_env: API_TOKEN

            api_key::

                auth:
                  type: api_key
                  key: "{{ env.API_KEY }}"
                  header: X-API-Key   # optional, default: X-API-Key

            basic::

                auth:
                  type: basic
                  username: "{{ env.API_USER }}"
                  password: "{{ env.API_PASS }}"

        verify_ssl (bool): Verify the server's TLS certificate. Default
            ``True``.  Set to ``False`` to disable certificate verification
            (equivalent to ``curl --insecure``).
        retry (dict): Retry configuration::

                retry:
                  max_attempts: 3          # total attempts, default: 1
                  delay: 1.0               # seconds between retries, default: 1.0
                  status_codes: [429, 503] # retry on these codes,
                                           # default: [429, 500, 502, 503, 504]

    Returns:
        Dict[str, Any]: Response with keys ``status_code``, ``headers``,
        ``body``, and ``json``.

    Raises:
        TaskExecutionError: If the request fails or authentication config is
            invalid.
    """
    task_name = str(config.name or "http_request_task")
    task_type = str(config.type or "http.request")
    logger = get_task_logger(config.workspace, task_name)

    try:
        log_task_execution(logger, config.step, config.context, config.workspace)

        processed = config.process_inputs()
        config.processed_inputs = processed

        # Validate required inputs
        if "url" not in processed:
            raise ValueError("url parameter is required")

        url = processed["url"]
        method = processed.get("method", "GET").upper()
        headers = processed.get("headers", {})
        body = processed.get("body", None)
        timeout = processed.get("timeout", 30)
        token = processed.get("token", None)
        auth = processed.get("auth", None)
        verify_ssl = processed.get("verify_ssl", True)

        # Retry configuration
        retry_cfg = processed.get("retry", {}) or {}
        max_attempts = int(retry_cfg.get("max_attempts", 1))
        retry_delay = float(retry_cfg.get("delay", 1.0))
        retry_status_codes = list(
            retry_cfg.get("status_codes", [429, 500, 502, 503, 504])
        )

        # Apply authentication
        headers = _apply_auth(headers, auth, token)

        # Prepare request body
        data = None
        if body is not None:
            if isinstance(body, (dict, list)):
                # Lists are valid JSON documents too; they used to be dropped.
                data = json.dumps(body).encode("utf-8")
                # Header names are case-insensitive, so a caller-supplied
                # "content-type" must also suppress the automatic header.
                if not any(str(key).lower() == "content-type" for key in headers):
                    headers["Content-Type"] = "application/json"
            elif isinstance(body, str):
                data = body.encode("utf-8")
            elif isinstance(body, bytes):
                data = body
            else:
                # Make the dropped body visible instead of silently ignoring it.
                logger.warning(
                    f"Unsupported body type {type(body).__name__}; "
                    f"request will be sent without a body"
                )

        # SSL context
        ssl_context: Optional[ssl.SSLContext] = None
        if not verify_ssl:
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

        # Retry loop
        last_exception: Optional[Exception] = None
        result: Optional[Dict[str, Any]] = None

        for attempt in range(1, max_attempts + 1):
            try:
                # Build request
                req = urllib.request.Request(url, data=data, method=method)
                for key, value in headers.items():
                    req.add_header(key, value)

                # Execute request; the context manager closes the connection
                # even when reading or decoding the body fails.
                with urllib.request.urlopen(
                    req, timeout=timeout, context=ssl_context
                ) as response:
                    response_body = response.read().decode("utf-8")
                    response_headers = dict(response.headers)
                    status_code = response.status

                # Try to parse JSON; a non-JSON body simply leaves "json" as None
                json_body = None
                try:
                    json_body = json.loads(response_body)
                except (json.JSONDecodeError, ValueError):
                    pass

                result = {
                    "status_code": status_code,
                    "headers": response_headers,
                    "body": response_body,
                    "json": json_body,
                }
                # Success — exit retry loop
                break

            except urllib.error.HTTPError as http_err:
                if http_err.code in retry_status_codes and attempt < max_attempts:
                    logger.warning(
                        f"HTTP {http_err.code} on attempt {attempt}/{max_attempts}; "
                        f"retrying in {retry_delay}s…"
                    )
                    last_exception = http_err
                    time.sleep(retry_delay)
                    continue
                # Not retryable or last attempt — re-raise to outer handler
                raise

            except (urllib.error.URLError, socket.timeout, OSError) as net_err:
                if attempt < max_attempts:
                    logger.warning(
                        f"Network error on attempt {attempt}/{max_attempts}: {net_err}; "
                        f"retrying in {retry_delay}s…"
                    )
                    last_exception = net_err
                    time.sleep(retry_delay)
                    continue
                raise

        if result is None:
            # All attempts exhausted via retry loop without raising — shouldn't
            # normally happen, but guard defensively.
            raise last_exception or RuntimeError("All retry attempts exhausted.")

        log_task_result(logger, result)
        return result

    except (
        TaskExecutionError,
        TemplateError,
        urllib.error.HTTPError,
        urllib.error.URLError,
        socket.timeout,
        OSError,
        ValueError,
    ) as e:
        # For HTTPError, extract response details before wrapping
        if isinstance(e, urllib.error.HTTPError):
            try:
                error_body = e.read().decode("utf-8")
            except Exception:
                # The error body is best-effort detail only.
                error_body = ""
            error_msg = f"HTTP {e.code} {e.reason} for {method} {url}: {error_body}"
            e = ValueError(error_msg)

        context = ErrorContext(
            step_name=task_name,
            task_type=task_type,
            error=e,
            task_config=config.step,
            template_context=config.context,
        )
        handle_task_error(context)
        return {}  # Unreachable

join_strings(*strings: str, separator: str = ' ') -> str

Join multiple strings together.

Parameters:

Name Type Description Default
*strings str

Variable number of strings to join

()
separator str

String to use as separator. Defaults to space.

' '

Returns:

Name Type Description
str str

Joined string

Source code in src/yaml_workflow/tasks/basic_tasks.py
@register_task()
def join_strings(*strings: str, separator: str = " ") -> str:
    """
    Concatenate the given strings, placing ``separator`` between neighbours.

    Args:
        *strings: The strings to concatenate, in the order given.
        separator: Text inserted between adjacent strings. Defaults to a
            single space.

    Returns:
        str: The concatenated result.
    """
    joined = separator.join(strings)
    return joined

noop_task(config: TaskConfig) -> Dict[str, Any]

No-operation task that returns its inputs and metadata.

This task is useful for testing and demonstrating the workflow engine's features without performing any actual operations.

Parameters:

Name Type Description Default
config TaskConfig

Task configuration. Supported input: should_fail, an optional boolean that simulates a task failure.

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Task inputs and metadata

Raises:

Type Description
TaskExecutionError

If should_fail is True (via handle_task_error)

Source code in src/yaml_workflow/tasks/noop.py
@register_task("noop")
def noop_task(config: TaskConfig) -> Dict[str, Any]:
    """
    Echo the step's processed inputs together with some task metadata.

    The task performs no real work, which makes it handy for tests and for
    demonstrating engine features (input processing, outputs, error handling).

    Args:
        config: Task configuration with:
            - should_fail: Optional boolean; when truthy the task reports a
              failure instead of returning a result.

    Returns:
        Dict[str, Any]: The processed inputs, the task name and type, and the
        variables available to the task.

    Raises:
        TaskExecutionError: If should_fail is True (via handle_task_error)
    """
    task_name = str(config.name or "noop_task")
    task_type = str(config.type or "noop")
    logger = get_task_logger(config.workspace, task_name)

    def _error_context(err: Exception) -> ErrorContext:
        # Both failure paths report the same step/task details; only the
        # triggering error differs.
        return ErrorContext(
            step_name=task_name,
            task_type=task_type,
            error=err,
            task_config=config.step,
            template_context=config.context,
        )

    try:
        log_task_execution(logger, config.step, config.context, config.workspace)

        inputs = config.process_inputs()

        # Simulated failure, requested explicitly by the workflow author.
        if inputs.get("should_fail", False):
            handle_task_error(_error_context(Exception("Task failed as requested")))
            # handle_task_error always raises; the return keeps type checkers happy.
            return {}

        # Normal path: hand back the inputs plus metadata so that output
        # handling can be observed downstream.
        outcome = {
            "processed_inputs": inputs,
            "task_name": task_name,
            "task_type": config.type,
            "available_variables": config.get_available_variables(),
        }
        log_task_result(logger, outcome)
        return outcome

    except (TaskExecutionError, TemplateError, ValueError, TypeError, KeyError) as exc:
        handle_task_error(_error_context(exc))
        return {}  # Unreachable

notify_task(config: TaskConfig) -> Dict[str, Any]

Send a notification via one of several supported channels.

Parameters:

Name Type Description Default
config TaskConfig

Task configuration with namespace support.

required
Inputs

channel (str): Notification channel. Required. One of webhook, log, slack, file.

webhook channel::

channel: webhook
url: "https://hooks.example.com/..."
message: "Workflow completed"
method: POST          # optional, default POST
headers:              # optional extra headers
  X-Custom: value
payload:              # optional; used as-is; otherwise {"text": message}
  text: "..."

log channel::

channel: log
message: "Step completed"
level: info           # debug|info|warning|error, default: info
file: "notify.log"    # optional; appended as plain text

slack channel::

channel: slack
webhook_url: "{{ env.SLACK_WEBHOOK_URL }}"
message: "{{ workflow.name }} finished"
username: "yaml-workflow"     # optional
icon_emoji: ":robot_face:"    # optional
color: good                   # optional: good|warning|danger|#hex

file channel::

channel: file
file: "notifications.jsonl"
message: "Completed"
append: true          # default true
format: jsonl         # jsonl|text, default: jsonl

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: {"channel": channel, "status": "sent", "message": message}

Raises:

Type Description
TaskExecutionError

If the notification cannot be delivered.

Source code in src/yaml_workflow/tasks/notify_tasks.py
@register_task("notify")
def notify_task(config: TaskConfig) -> Dict[str, Any]:
    """
    Deliver a notification message through the configured channel.

    Args:
        config: Task configuration with namespace support.

    Inputs:
        channel (str): Required. Selects the delivery mechanism; one of
            ``webhook``, ``log``, ``slack``, ``file``.
        message (str): Notification text. Defaults to an empty string.

        **webhook channel**::

            channel: webhook
            url: "https://hooks.example.com/..."
            message: "Workflow completed"
            method: POST          # optional, default POST
            headers:              # optional extra headers
              X-Custom: value
            payload:              # optional; used as-is; otherwise {"text": message}
              text: "..."

        **log channel**::

            channel: log
            message: "Step completed"
            level: info           # debug|info|warning|error, default: info
            file: "notify.log"    # optional; appended as plain text

        **slack channel**::

            channel: slack
            webhook_url: "{{ env.SLACK_WEBHOOK_URL }}"
            message: "{{ workflow.name }} finished"
            username: "yaml-workflow"     # optional
            icon_emoji: ":robot_face:"    # optional
            color: good                   # optional: good|warning|danger|#hex

        **file channel**::

            channel: file
            file: "notifications.jsonl"
            message: "Completed"
            append: true          # default true
            format: jsonl         # jsonl|text, default: jsonl

    Returns:
        Dict[str, Any]: ``{"channel": channel, "status": "sent", "message": message}``;
        the ``webhook`` and ``slack`` channels additionally include
        ``status_code``.

    Raises:
        TaskExecutionError: If the notification cannot be delivered.
    """
    task_name = str(config.name or "notify_task")
    task_type = str(config.type or "notify")
    logger = get_task_logger(config.workspace, task_name)

    try:
        log_task_execution(logger, config.step, config.context, config.workspace)

        inputs = config.process_inputs()
        config.processed_inputs = inputs

        channel = inputs.get("channel", "")
        if not channel:
            raise ValueError("'channel' input is required for the notify task.")

        message = str(inputs.get("message", ""))

        # The workflow name is only used by the structured file output.
        workflow_name = ""
        if config.context:
            workflow_info = config.context.get("workflow", {})
            if isinstance(workflow_info, dict):
                workflow_name = str(workflow_info.get("name", ""))

        # Base result; HTTP-backed channels append their status code below.
        result: Dict[str, Any] = {
            "channel": channel,
            "status": "sent",
            "message": message,
        }

        if channel == "webhook":
            url = inputs.get("url")
            if not url:
                raise ValueError("webhook channel requires 'url'.")
            response = _notify_webhook(
                url,
                message,
                method=inputs.get("method", "POST"),
                extra_headers=inputs.get("headers", {}) or {},
                payload=inputs.get("payload", None),
            )
            result["status_code"] = response["status_code"]

        elif channel == "log":
            _notify_log(
                message,
                level=inputs.get("level", "info"),
                file=inputs.get("file", None),
            )

        elif channel == "slack":
            webhook_url = inputs.get("webhook_url")
            if not webhook_url:
                raise ValueError("slack channel requires 'webhook_url'.")
            response = _notify_slack(
                webhook_url,
                message,
                username=inputs.get("username", "yaml-workflow"),
                icon_emoji=inputs.get("icon_emoji", ":robot_face:"),
                color=inputs.get("color", None),
            )
            result["status_code"] = response["status_code"]

        elif channel == "file":
            destination = inputs.get("file")
            if not destination:
                raise ValueError("file channel requires 'file'.")
            _notify_file(
                destination,
                message,
                workflow_name=workflow_name,
                append=bool(inputs.get("append", True)),
                fmt=inputs.get("format", "jsonl"),
            )

        else:
            raise ValueError(
                f"Unsupported notification channel: '{channel}'. "
                "Supported channels: webhook, log, slack, file."
            )

        log_task_result(logger, result)
        return result

    except (
        TaskExecutionError,
        TemplateError,
        urllib.error.HTTPError,
        urllib.error.URLError,
        OSError,
        ValueError,
    ) as exc:
        failure: Exception = exc
        if isinstance(exc, urllib.error.HTTPError):
            # Replace the HTTPError with a ValueError whose message carries the
            # status line, the target URL and the response body.
            try:
                body_text = exc.read().decode("utf-8")
            except Exception:
                body_text = ""
            failure = ValueError(
                f"HTTP {exc.code} {exc.reason} sending notification to "
                f"{inputs.get('url') or inputs.get('webhook_url', '?')}: "
                f"{body_text}"
            )

        handle_task_error(
            ErrorContext(
                step_name=task_name,
                task_type=task_type,
                error=failure,
                task_config=config.step,
                template_context=config.context,
            )
        )
        return {}  # Unreachable

print_vars_task(config: TaskConfig) -> dict

Prints selected variables from the context for debugging.

Source code in src/yaml_workflow/tasks/python_tasks.py
@register_task()
def print_vars_task(config: TaskConfig) -> dict:
    """
    Dump a fixed selection of context variables to stdout for debugging.

    Inputs:
        message: Optional heading for the dump; defaults to
            "Current Context Variables:".

    Returns:
        dict: ``{"success": True}`` once everything has been printed.
    """
    inputs = config.process_inputs()
    context = config.context
    heading = inputs.get("message", "Current Context Variables:")

    print(f"\n--- {heading} ---")  # Prints directly to runner's stdout

    # Top-level context entries, shown in a fixed order.
    print("Workflow Variables:")
    print("==================")
    for key in (
        "args",
        "workflow_name",
        "workspace",
        "output",
        "run_number",
        "timestamp",
    ):
        print(f"{key}: {context.get(key)}")

    print("\nStep Results:")
    print("=============")
    step_results = context.get("steps", {})
    if not step_results:
        print("  (No step results yet)")
    else:
        # pprint keeps large or deeply nested step results readable.
        pprint.pprint(step_results, indent=2)

    print("--------------------\n")
    sys.stdout.flush()  # Make sure the dump appears before later output
    return {"success": True}

read_file_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Read content from a file.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("read_file")
def read_file_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """
    Read a text file and return its content.

    Inputs:
        file: Path of the file to read. Required.
        encoding: Text encoding; defaults to "utf-8".

    Returns:
        Optional[Dict[str, Any]]: ``{"path": <file input>, "content": <text>}``.
        Failures are routed to ``handle_task_error``.
    """
    task_name = str(config.name or "read_file")
    task_type = config.type or "read_file"
    logger = get_task_logger(config.workspace, task_name)
    log_task_execution(logger, config.step, config.context, config.workspace)
    try:
        inputs = config.process_inputs()
        requested_path = inputs.get("file")

        # Diagnostics about where the task is looking.
        logger.debug(
            f"Workspace={config.workspace}, exists={config.workspace.exists()}"
        )
        logger.debug(f"file_path param={requested_path}")
        logger.debug(f"All inputs={inputs}")

        encoding = inputs.get("encoding", "utf-8")

        if not requested_path:
            missing_msg = "No file path provided (param 'file' is missing)"
            logger.error(missing_msg)
            raise ValueError(missing_msg)

        resolved_path = str(resolve_path(config.workspace, requested_path))
        logger.debug(f"Resolved path={resolved_path}")

        # Purely diagnostic: when the file is missing, log what its parent
        # directory holds. The read below is attempted regardless.
        candidate = Path(resolved_path)
        logger.debug(f"File exists={candidate.exists()}")
        if not candidate.exists():
            parent_dir = candidate.parent
            logger.debug(f"Parent directory={parent_dir}, exists={parent_dir.exists()}")
            if parent_dir.exists():
                logger.debug(f"Directory contents: {list(parent_dir.glob('*'))}")

        text = read_file_direct(
            requested_path, config.workspace, encoding, task_name, logger=logger
        )
        output = {"path": requested_path, "content": text}
        log_task_result(logger, output)
        return output
    except (
        ValueError,
        OSError,
        UnicodeDecodeError,
        TaskExecutionError,
        TemplateError,
    ) as exc:
        # Log the exception and its traceback before delegating.
        logger.error(f"EXCEPTION: {type(exc).__name__}: {str(exc)}")
        import traceback

        logger.error(f"TRACEBACK:\n{traceback.format_exc()}")

        handle_task_error(
            ErrorContext(
                step_name=task_name,
                task_type=task_type,
                error=exc,
                task_config=config.step,
                template_context=config.context,
            )
        )
        return None

read_json_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Task handler for reading JSON files.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("read_json")
def read_json_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """
    Load a JSON file and return the parsed data.

    Inputs:
        file: Path of the JSON file. Required.
        encoding: Text encoding; defaults to "utf-8".

    Returns:
        Optional[Dict[str, Any]]: ``{"data": <parsed JSON>}``. Failures are
        routed to ``handle_task_error``.
    """
    task_name = str(config.name or "read_json")
    task_type = config.type or "read_json"
    logger = get_task_logger(config.workspace, task_name)
    log_task_execution(logger, config.step, config.context, config.workspace)

    try:
        inputs = config.process_inputs()
        source = inputs.get("file")
        if not source:
            raise ValueError("No file path provided")

        parsed = read_json(
            source, config.workspace, encoding=inputs.get("encoding", "utf-8")
        )
        output = {"data": parsed}
        log_task_result(logger, output)
        return output
    except (
        ValueError,
        OSError,
        json.JSONDecodeError,
        TaskExecutionError,
        TemplateError,
    ) as exc:
        handle_task_error(
            ErrorContext(
                step_name=task_name,
                task_type=task_type,
                error=exc,
                task_config=config.step,
                template_context=config.context,
            )
        )
        return None

read_yaml_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Task handler for reading YAML files.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("read_yaml")
def read_yaml_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """
    Load a YAML file and return the parsed data.

    Inputs:
        file: Path of the YAML file. Required.
        encoding: Text encoding; defaults to "utf-8".

    Returns:
        Optional[Dict[str, Any]]: ``{"data": <parsed YAML>}``. Failures are
        routed to ``handle_task_error``.
    """
    task_name = str(config.name or "read_yaml")
    task_type = config.type or "read_yaml"
    logger = get_task_logger(config.workspace, task_name)
    log_task_execution(logger, config.step, config.context, config.workspace)

    try:
        inputs = config.process_inputs()
        source = inputs.get("file")
        if not source:
            raise ValueError("No file path provided")

        parsed = read_yaml(
            source, config.workspace, encoding=inputs.get("encoding", "utf-8")
        )
        output = {"data": parsed}
        log_task_result(logger, output)
        return output
    except (
        ValueError,
        OSError,
        yaml.YAMLError,
        TaskExecutionError,
        TemplateError,
    ) as exc:
        handle_task_error(
            ErrorContext(
                step_name=task_name,
                task_type=task_type,
                error=exc,
                task_config=config.step,
                template_context=config.context,
            )
        )
        return None

register_task(name: Optional[str] = None) -> Callable[..., Callable[[TaskConfig], R]]

Decorator to register a function as a workflow task.

Source code in src/yaml_workflow/tasks/__init__.py
def register_task(
    name: Optional[str] = None,
) -> Callable[..., Callable[[TaskConfig], R]]:
    """
    Decorator to register a function as a workflow task.

    The decorated function is stored in the task registry under ``name`` (or
    the function's own ``__name__`` when ``name`` is omitted). The registered
    callable always takes a single ``TaskConfig`` and adapts it to the
    function's real signature:

    * A function whose only parameter is ``config: TaskConfig`` is called
      with the config directly; its inputs are not processed here.
    * Otherwise the step's processed inputs are mapped onto parameters by
      name. A parameter annotated ``TaskConfig`` receives the config, and a
      parameter named ``context`` (annotated ``Dict[str, Any]``, ``dict`` or
      ``Any``) receives the execution context.
    * ``*args`` is filled only from an input whose key equals the ``*args``
      parameter name; a list/tuple is spread, any other value becomes a
      single positional argument.
    * Inputs that match no named parameter are forwarded as individual
      keyword arguments when the function declares ``**kwargs``; without
      ``**kwargs`` they are silently dropped.

    Args:
        name: Registry name for the task. Defaults to the function name.

    Returns:
        A decorator that registers the function and returns its wrapper.

    Raises:
        ValueError: At call time, when a parameter without a default has no
            matching input.
        TypeError: At call time, re-raised (after logging) when calling the
            function raises TypeError.
    """

    def task_wrapper(func: Callable[..., R]) -> Callable[[TaskConfig], R]:
        task_name = name or func.__name__
        # A function's signature is fixed once it is defined, so inspect it
        # once at registration time rather than on every task invocation.
        params = inspect.signature(func).parameters

        @wraps(func)
        def wrapper(config: TaskConfig) -> R:
            # Fast path: handlers that take only ``config: TaskConfig``.
            if (
                list(params.keys()) == ["config"]
                and params["config"].annotation is TaskConfig
            ):
                return func(config)

            processed = config.process_inputs()
            kwargs: Dict[str, Any] = {}
            pos_args = []
            # Inputs not yet claimed by a named parameter.
            unmapped_inputs = processed.copy()

            # Identify special parameters (*args, **kwargs, config, context).
            var_arg_name: Optional[str] = None
            kw_arg_name: Optional[str] = None
            config_param_name: Optional[str] = None
            context_param_name: Optional[str] = None

            for param_name, param in params.items():
                if param.annotation is TaskConfig:
                    config_param_name = param_name
                elif param_name == "context" and param.annotation in (
                    Dict[str, Any],
                    dict,
                    Any,
                ):
                    context_param_name = param_name
                elif param.kind == param.VAR_POSITIONAL:
                    var_arg_name = param_name
                elif param.kind == param.VAR_KEYWORD:
                    kw_arg_name = param_name

            # Map processed inputs onto the ordinary named parameters.
            special_names = (
                config_param_name,
                context_param_name,
                var_arg_name,
                kw_arg_name,
            )
            for param_name, param in params.items():
                if param_name in special_names:
                    continue  # Handled separately below

                if param_name in processed:
                    kwargs[param_name] = processed[param_name]
                    del unmapped_inputs[param_name]  # Mark as mapped
                elif param.default is inspect.Parameter.empty:
                    raise ValueError(f"Missing required parameter: {param_name}")
                # else: the function's own default applies

            # *args requires an explicit input keyed by the *args parameter
            # name (e.g. ``strings: [...]`` for ``join_strings(*strings)``);
            # guessing from leftover inputs would be ambiguous.
            if var_arg_name and var_arg_name in processed:
                arg_input = processed[var_arg_name]
                pos_args = (
                    list(arg_input)
                    if isinstance(arg_input, (list, tuple))
                    else [arg_input]
                )
                unmapped_inputs.pop(var_arg_name, None)

            # Forward leftover inputs through **kwargs. They must be spread as
            # individual keyword arguments: storing the dict under the
            # parameter's own name would make the function receive
            # ``{"<kwargs name>": {...}}`` instead of the inputs themselves.
            # Without **kwargs the leftovers are simply not passed.
            if kw_arg_name and unmapped_inputs:
                kwargs.update(unmapped_inputs)

            # Inject config and context last so a same-named input cannot
            # override them.
            if config_param_name:
                kwargs[config_param_name] = config
            if context_param_name:
                kwargs[context_param_name] = config.context

            try:
                return func(*pos_args, **kwargs)
            except TypeError as e:
                arg_summary = f"pos_args={pos_args}, kwargs={list(kwargs.keys())}"
                logging.error(
                    f"TypeError calling task '{task_name}': {e}. Call info: {arg_summary}"
                )
                raise

        _task_registry[task_name] = wrapper
        return wrapper

    return task_wrapper

render_template(config: TaskConfig) -> Dict[str, Any]

Render a template and save it to a file.

Parameters:

Name Type Description Default
config TaskConfig

Task configuration object

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Dictionary containing the path to the output file

Raises:

Type Description
TaskExecutionError

If template resolution fails or the output file cannot be written (raised directly by the task, wrapping the original error).

Source code in src/yaml_workflow/tasks/template_tasks.py
@register_task("template")
def render_template(config: TaskConfig) -> Dict[str, Any]:
    """
    Render a template string against the workflow context and write it to a file.

    The ``template`` and ``output_file`` inputs are read from the raw step
    configuration rather than from ``config.process_inputs()``. The template
    text is rendered by the single ``process_template`` call below, and
    ``output_file`` is used verbatim (it is not template-rendered).

    Inputs:
        template: Template text to render. Must be a non-empty string.
        output_file: Destination path, joined onto the workspace. Must be a
            non-empty string. Missing parent directories are created.

    Args:
        config: Task configuration object

    Returns:
        Dict[str, Any]: ``{"path": <output file path as a string>}``

    Raises:
        TaskExecutionError: If an input is invalid, template resolution fails
            or the file cannot be written. The triggering error is attached
            both as ``original_error`` and as the exception's ``__cause__``.
    """
    task_name = str(config.name or "template_task")
    task_type = str(config.type or "template")
    logger = get_task_logger(config.workspace, task_name)

    try:
        log_task_execution(logger, config.step, config.context, config.workspace)

        # Get raw inputs directly, do not process them here
        raw_inputs = config.step.get("inputs", {})
        template_str = raw_inputs.get("template")
        if not template_str or not isinstance(template_str, str):
            raise ValueError("Input 'template' must be a non-empty string")

        output_file_rel_path = raw_inputs.get("output_file")
        if not output_file_rel_path or not isinstance(output_file_rel_path, str):
            raise ValueError("Input 'output_file' must be a non-empty string")

        # Resolve the full output path relative to the workspace
        output_path = config.workspace / output_file_rel_path

        # Ensure parent directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Pass the workspace as searchpath so the template can use includes
        resolved_content = config._template_engine.process_template(
            template_str,
            config.context,
            searchpath=str(config.workspace),
        )

        with open(output_path, "w", encoding="utf-8") as f:
            f.write(resolved_content)

        log_task_result(logger, str(output_path))
        return {"path": str(output_path)}

    except (
        TaskExecutionError,
        TemplateSyntaxError,
        UndefinedError,
        OSError,
        ValueError,
    ) as e:
        logger.error(f"Task '{task_name}' failed: {str(e)}", exc_info=True)
        # Chain explicitly so tracebacks show the original error as the
        # direct cause rather than as an error raised during handling.
        raise TaskExecutionError(
            step_name=task_name, original_error=e, task_config=config.step
        ) from e

shell_task(config: TaskConfig) -> Dict[str, Any]

Run a shell command with namespace support.

Parameters:

Name Type Description Default
config TaskConfig

Task configuration with namespace support

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Command execution results

Raises:

Type Description
TaskExecutionError

If command execution fails or template resolution fails

Source code in src/yaml_workflow/tasks/shell_tasks.py
@register_task("shell")
def shell_task(config: TaskConfig) -> Dict[str, Any]:
    """
    Run a shell command with namespace support.

    Inputs:
        command: Required. A string (template-processed before execution)
            or a list.
        working_dir: Optional directory to run in; a relative path is taken
            relative to the workspace. Defaults to the workspace itself.
        env: Optional mapping merged over the base environment.
        shell: Forwarded to ``run_command``; defaults to True.
        timeout: Optional timeout forwarded to ``run_command``.

    Args:
        config: Task configuration with namespace support

    Returns:
        Dict[str, Any]: ``{"return_code": ..., "stdout": ..., "stderr": ...}``
        for a command that exits with code 0.

    Raises:
        TaskExecutionError: If command execution fails or template resolution fails
    """
    task_name = str(config.name or "shell_task")
    task_type = str(config.type or "shell")
    logger = get_task_logger(config.workspace, task_name)

    try:
        log_task_execution(logger, config.step, config.context, config.workspace)

        processed = config.process_inputs()
        config.processed_inputs = processed

        if "command" not in processed:
            raise ValueError("command parameter is required")
        command = resolve_platform_command(processed["command"])

        # Working directory: the workspace unless overridden.
        cwd = config.workspace
        if "working_dir" in processed:
            working_dir = processed["working_dir"]
            if not os.path.isabs(working_dir):
                cwd = config.workspace / working_dir
            else:
                cwd = Path(working_dir)

        env = get_environment()
        if "env" in processed:
            env.update(processed["env"])

        shell = processed.get("shell", True)
        timeout = processed.get("timeout", None)

        # Only string commands go through template processing; list commands
        # are passed to run_command as given.
        if isinstance(command, str):
            # Extra keys give process_command what it needs for error reporting
            command_context = {
                **config.context,
                "step_name": task_name,
                "task_type": task_type,
                "task_config": config.step,
            }
            command = process_command(command, command_context)
            if shell:
                _warn_if_bash_syntax(command, task_name)
        elif not isinstance(command, list):
            invalid_type_error = TypeError(
                f"Invalid command type: {type(command).__name__}. Expected string or list."
            )
            raise TaskExecutionError(
                step_name=task_name, original_error=invalid_type_error
            )

        returncode, stdout, stderr = run_command(
            command, cwd=str(cwd), env=env, shell=shell, timeout=timeout
        )

        if returncode != 0:
            # CalledProcessError already carries the exit code, stdout and
            # stderr, so no separate message needs to be assembled here.
            raise subprocess.CalledProcessError(
                returncode, cmd=command, output=stdout, stderr=stderr
            )

        result = {"return_code": returncode, "stdout": stdout, "stderr": stderr}
        log_task_result(logger, result)
        return result

    except (
        TaskExecutionError,
        TemplateError,
        subprocess.CalledProcessError,
        subprocess.TimeoutExpired,
        OSError,
        ValueError,
        TypeError,
    ) as e:
        context = ErrorContext(
            step_name=task_name,
            task_type=task_type,
            error=e,
            task_config=config.step,
            template_context=config.context,
        )
        handle_task_error(context)
        return {}  # Unreachable

write_file_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Write content to a file.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("write_file")
def write_file_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """
    Write text content to a file.

    Inputs:
        file: Destination path. Required.
        content: Value to write; converted with ``str()`` before writing.
            Required (``None`` is rejected).
        encoding: Text encoding; defaults to "utf-8".

    Returns:
        Optional[Dict[str, Any]]: ``{"path": <value returned by the writer>,
        "content": <content input, unconverted>}``. Failures are routed to
        ``handle_task_error``.
    """
    task_name = str(config.name or "write_file")
    task_type = config.type or "write_file"
    logger = get_task_logger(config.workspace, task_name)
    log_task_execution(logger, config.step, config.context, config.workspace)
    try:
        inputs = config.process_inputs()
        destination = inputs.get("file")
        content = inputs.get("content")
        encoding = inputs.get("encoding", "utf-8")

        # Validate in the same order the error messages are documented.
        if not destination:
            raise ValueError("No file path provided")
        if content is None:
            raise ValueError("No content provided")

        written_path = write_file_direct(
            destination, str(content), config.workspace, encoding, task_name
        )
        output = {"path": written_path, "content": content}
        log_task_result(logger, output)
        return output
    except (ValueError, OSError, TaskExecutionError, TemplateError) as exc:
        handle_task_error(
            ErrorContext(
                step_name=task_name,
                task_type=task_type,
                error=exc,
                task_config=config.step,
                template_context=config.context,
            )
        )
        return None

write_json_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Write JSON data to a file.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("write_json")
def write_json_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """
    Task handler that writes JSON data to a file.

    Reads the following keys from the processed inputs:
        file: Destination path (required, must be truthy)
        data: Value to serialize (required, must not be None)
        indent: Indentation width, converted with ``int()``; defaults to 2
        encoding: Text encoding, defaults to "utf-8"

    Args:
        config: Task configuration supplying the inputs, workspace and context

    Returns:
        Optional[Dict[str, Any]]: ``{"path": ...}`` holding the value returned
        by ``write_json_direct``. ``None`` is returned only if
        ``handle_task_error`` returns instead of raising.
    """
    step_label = str(config.name or "write_json")
    kind = config.type or "write_json"
    task_logger = get_task_logger(config.workspace, step_label)
    log_task_execution(task_logger, config.step, config.context, config.workspace)
    try:
        inputs = config.process_inputs()
        target = inputs.get("file")

        # Diagnostic output: workspace state and the raw inputs received.
        workspace = config.workspace
        task_logger.debug(f"Workspace={workspace}, exists={workspace.exists()}")
        task_logger.debug(f"file_path param={target}")
        task_logger.debug(f"All inputs={inputs}")

        payload = inputs.get("data")
        indent_width = int(inputs.get("indent", 2))
        charset = inputs.get("encoding", "utf-8")

        # Pick the first validation failure (if any), then log and raise it.
        problem = None
        if not target:
            problem = "No file path provided (param 'file' is missing)"
        elif payload is None:
            problem = "No data provided"
        if problem is not None:
            task_logger.error(problem)
            raise ValueError(problem)

        # When the path mentions "output/", make sure <workspace>/output exists.
        if "output/" in target:
            out_dir = config.workspace / "output"
            out_dir_present = out_dir.exists()
            task_logger.debug(
                f"Creating output dir {out_dir}, exists={out_dir_present}"
            )
            out_dir.mkdir(parents=True, exist_ok=True)

        # Diagnostic output: the path after workspace resolution.
        resolved = str(resolve_path(config.workspace, target))
        task_logger.debug(f"Resolved path={resolved}")

        written_path = write_json_direct(
            target,
            payload,
            indent=indent_width,
            workspace=config.workspace,
            encoding=charset,
            step_name=step_label,
            logger=task_logger,
        )

        # Diagnostic output: confirm whether the file is present afterwards.
        was_created = Path(written_path).exists()
        task_logger.debug(f"Result path={written_path}, exists={was_created}")

        outcome = {"path": written_path}
        log_task_result(task_logger, outcome)
        return outcome
    except (ValueError, TypeError, OSError, TaskExecutionError, TemplateError) as exc:
        # Record the exception type, message and traceback before delegating.
        exc_kind = type(exc).__name__
        task_logger.error(f"EXCEPTION: {exc_kind}: {exc!s}")
        import traceback

        trace_text = traceback.format_exc()
        task_logger.error(f"TRACEBACK:\n{trace_text}")

        # NOTE(review): handle_task_error presumably raises - confirm.
        handle_task_error(
            ErrorContext(
                step_name=step_label,
                task_type=kind,
                error=exc,
                task_config=config.step,
                template_context=config.context,
            )
        )
        return None

write_yaml_task(config: TaskConfig) -> Optional[Dict[str, Any]]

Write YAML data to a file.

Source code in src/yaml_workflow/tasks/file_tasks.py
@register_task("write_yaml")
def write_yaml_task(config: TaskConfig) -> Optional[Dict[str, Any]]:
    """
    Task handler that writes YAML data to a file.

    Reads the following keys from the processed inputs:
        file: Destination path (required, must be truthy)
        data: Value to serialize (required, must not be None)
        encoding: Text encoding, defaults to "utf-8"

    Args:
        config: Task configuration supplying the inputs, workspace and context

    Returns:
        Optional[Dict[str, Any]]: ``{"path": ...}`` holding the value returned
        by ``write_yaml_direct``. ``None`` is returned only if
        ``handle_task_error`` returns instead of raising.
    """
    step_label = str(config.name or "write_yaml")
    kind = config.type or "write_yaml"
    task_logger = get_task_logger(config.workspace, step_label)
    log_task_execution(task_logger, config.step, config.context, config.workspace)

    handled_errors = (
        ValueError,
        OSError,
        yaml.YAMLError,
        TaskExecutionError,
        TemplateError,
    )
    try:
        inputs = config.process_inputs()
        target = inputs.get("file")
        payload = inputs.get("data")
        charset = inputs.get("encoding", "utf-8")

        # Validate required inputs before attempting the write.
        if not target:
            raise ValueError("No file path provided")
        if payload is None:
            raise ValueError("No data provided")

        written_path = write_yaml_direct(
            target,
            payload,
            workspace=config.workspace,
            encoding=charset,
            step_name=step_label,
        )
        outcome = {"path": written_path}
        log_task_result(task_logger, outcome)
        return outcome
    except handled_errors as exc:
        # Delegate reporting to the shared error handler.
        # NOTE(review): handle_task_error presumably raises - confirm.
        handle_task_error(
            ErrorContext(
                step_name=step_label,
                task_type=kind,
                error=exc,
                task_config=config.step,
                template_context=config.context,
            )
        )
        return None