Skip to content

yaml_workflow.tasks.python_tasks

yaml_workflow.tasks.python_tasks

Python task implementations for executing Python functions.

Classes

Functions

print_message_task(config: TaskConfig) -> dict

Prints a templated message to the console.

Source code in src/yaml_workflow/tasks/python_tasks.py
@register_task(name="print_message")  # Explicitly register with desired name
def print_message_task(config: TaskConfig) -> dict:
    """Print a templated message to the console.

    The ``message`` input is taken from ``config.process_inputs()`` and
    printed as-is; no further rendering happens here.

    Args:
        config: Task configuration whose processed inputs may contain a
            ``message`` entry.

    Returns:
        ``{"success": True, "printed_length": n}`` where ``n`` is the length
        of the text that was printed. An empty or missing message is still
        reported as success (only a warning is logged).
    """
    inputs = config.process_inputs()  # Render inputs using context
    raw_message = inputs.get("message", "")

    # The processed input is not necessarily a string (it could be a number
    # or None). Normalise it so that len() below cannot raise TypeError
    # after the message has already been printed.
    message = "" if raw_message is None else str(raw_message)

    if not message:
        # Deliberately not a failure: an empty line is printed instead.
        logger.warning("print_message task called with no message.")

    print(message)  # Prints directly to runner's stdout
    sys.stdout.flush()  # Make the output visible immediately
    return {"success": True, "printed_length": len(message)}

print_vars_task(config: TaskConfig) -> dict

Prints selected variables from the context for debugging.

Source code in src/yaml_workflow/tasks/python_tasks.py
@register_task()
def print_vars_task(config: TaskConfig) -> dict:
    """Print selected context variables and step results for debugging.

    Writes a header (the optional ``message`` input), a fixed list of
    top-level context entries, and the ``steps`` entry of the context to
    stdout, then flushes stdout.

    Args:
        config: Task configuration; ``config._context`` is the mapping that
            gets inspected.

    Returns:
        ``{"success": True}`` in all cases.
    """
    rendered = config.process_inputs()
    ctx = config._context
    heading = rendered.get("message", "Current Context Variables:")

    print(f"\n--- {heading} ---")  # Goes straight to the runner's stdout

    # Fixed set of top-level context entries worth showing.
    print("Workflow Variables:")
    print("==================")
    for key in (
        "args",
        "workflow_name",
        "workspace",
        "output",
        "run_number",
        "timestamp",
    ):
        print(f"{key}: {ctx.get(key)}")

    print("\nStep Results:")
    print("=============")
    step_results = ctx.get("steps", {})
    if not step_results:
        print("  (No step results yet)")
    else:
        # pprint keeps large or nested step results readable.
        pprint.pprint(step_results, indent=2)

    print("--------------------\n")
    sys.stdout.flush()
    return {"success": True}

python_code(config: TaskConfig) -> Dict[str, Any]

Execute a snippet of Python code.

Source code in src/yaml_workflow/tasks/python_tasks.py
@register_task()
def python_code(config: TaskConfig) -> Dict[str, Any]:
    """Execute a snippet of Python code.

    Reads ``code`` (required, non-empty string) and ``result_variable``
    (optional string) from the processed inputs, hands them to
    ``_execute_code`` and returns whatever that helper returns, unwrapped.

    Any exception raised along the way, including the validation errors, is
    packed into an ``ErrorContext`` and passed to ``handle_task_error``.
    """
    step_label = str(config.name or "python_code_task")
    kind = str(config.type or "python_code")
    task_logger = get_task_logger(config.workspace, step_label)

    try:
        log_task_execution(
            task_logger, config.step, config._context, config.workspace
        )
        inputs = config.process_inputs()
        config._processed_inputs = inputs

        snippet = inputs.get("code")
        target_name = inputs.get("result_variable")

        if not (snippet and isinstance(snippet, str)):
            raise ValueError("Input 'code' (string) is required.")
        if not (target_name is None or isinstance(target_name, str)):
            raise ValueError("Input 'result_variable' must be a string.")

        outcome = _execute_code(snippet, config, target_name)

        # Record the raw value in the task log before handing it back.
        log_task_result(task_logger, {"result_value_from_code": outcome})
        return outcome  # Raw value; wrapping is left to the caller

    except Exception as exc:
        handle_task_error(
            ErrorContext(
                step_name=step_label,
                task_type=kind,
                error=exc,
                task_config=config.step,
                template_context=config._context,
            )
        )
        return {}  # Unreachable

python_function(config: TaskConfig) -> Dict[str, Any]

Execute a Python function from a specified module.

Source code in src/yaml_workflow/tasks/python_tasks.py
@register_task()
def python_function(config: TaskConfig) -> Dict[str, Any]:
    """Execute a Python function from a specified module.

    Reads ``module`` and ``function`` (both required, non-empty strings)
    from the processed inputs, resolves the callable with
    ``_load_function`` and runs it through ``_execute_python_function``.
    The helper's return value is returned unwrapped.

    Any exception raised along the way, including the validation errors, is
    packed into an ``ErrorContext`` and passed to ``handle_task_error``.
    """
    step_label = str(config.name or "python_function_task")
    kind = str(config.type or "python_function")
    task_logger = get_task_logger(config.workspace, step_label)

    try:
        log_task_execution(
            task_logger, config.step, config._context, config.workspace
        )
        inputs = config.process_inputs()
        config._processed_inputs = inputs  # Kept on config for the helpers

        target_module = inputs.get("module")
        target_function = inputs.get("function")

        if not (target_module and isinstance(target_module, str)):
            raise ValueError("Input 'module' (string) is required.")
        if not (target_function and isinstance(target_function, str)):
            raise ValueError("Input 'function' (string) is required.")

        # Resolve the callable, then invoke it.
        outcome = _execute_python_function(
            _load_function(target_module, target_function), config
        )

        # Logged as a dict for consistency with other tasks' log entries.
        log_task_result(task_logger, result={"result": outcome})
        return outcome  # Raw value; wrapping is left to the caller

    except Exception as exc:
        handle_task_error(
            ErrorContext(
                step_name=step_label,
                task_type=kind,
                error=exc,
                task_config=config.step,
                template_context=config._context,
            )
        )
        return {}  # Unreachable

python_module(config: TaskConfig) -> Dict[str, Any]

Execute a Python module as a script.

Source code in src/yaml_workflow/tasks/python_tasks.py
@register_task()
def python_module(config: TaskConfig) -> Dict[str, Any]:
    """Execute a Python module as a script.

    Inputs (read from ``config.process_inputs()``):
        module: Required, non-empty string naming the module to run.
        args: Optional list, forwarded to ``_execute_module``.
        cwd: Optional string, forwarded to ``_execute_module``.
        timeout: Optional value that must be convertible with ``float()``.
        check: When truthy (default ``True``), a non-zero return code is
            turned into a ``TaskExecutionError``.

    Returns:
        A dict with the keys ``returncode``, ``stdout`` and ``stderr``.

    Any exception raised in the body, including the validation errors, is
    packed into an ``ErrorContext`` and passed to ``handle_task_error``.
    """
    task_name = str(config.name or "python_module_task")
    task_type = str(config.type or "python_module")
    logger = get_task_logger(config.workspace, task_name)

    try:
        log_task_execution(logger, config.step, config._context, config.workspace)
        processed = config.process_inputs()
        config._processed_inputs = processed

        module_name = processed.get("module")
        args = processed.get("args")
        cwd = processed.get("cwd")
        timeout = processed.get("timeout")

        if not module_name or not isinstance(module_name, str):
            raise ValueError("Input 'module' (string) is required.")
        if args is not None and not isinstance(args, list):
            raise ValueError("Input 'args' must be a list of strings.")
        if cwd is not None and not isinstance(cwd, str):
            raise ValueError("Input 'cwd' must be a string.")
        if timeout is not None:
            try:
                timeout = float(timeout)
            except (TypeError, ValueError) as err:
                # float() raises TypeError, not ValueError, for values such
                # as lists or dicts; report both with the same message and
                # keep the original exception as the cause.
                raise ValueError("Input 'timeout' must be a number.") from err

        returncode, stdout, stderr = _execute_module(
            module_name=module_name,
            args=args,
            cwd=cwd,
            timeout=timeout,
            workspace=config.workspace,  # Pass workspace path
        )

        result = {
            "returncode": returncode,
            "stdout": stdout,
            "stderr": stderr,
        }

        # Optionally raise error on non-zero exit code
        check = processed.get("check", True)
        if check and returncode != 0:
            error_message = f"Module '{module_name}' failed with exit code {returncode}.\nStderr:\n{stderr}"
            # Wrap the failure reason in a standard error type.
            # NOTE(review): this is raised inside the try block, so
            # (assuming TaskExecutionError derives from Exception) it also
            # passes through handle_task_error below.
            raise TaskExecutionError(
                step_name=task_name, original_error=RuntimeError(error_message)
            )

        log_task_result(logger, result)
        return result

    except Exception as e:
        context = ErrorContext(
            step_name=task_name,
            task_type=task_type,
            error=e,
            task_config=config.step,
            template_context=config._context,
        )
        handle_task_error(context)
        return {}  # Unreachable

python_script(config: TaskConfig) -> Dict[str, Any]

Execute an external Python script.

Source code in src/yaml_workflow/tasks/python_tasks.py
@register_task()
def python_script(config: TaskConfig) -> Dict[str, Any]:
    """Execute an external Python script.

    Inputs (read from ``config.process_inputs()``):
        script_path: Required, non-empty string; resolved with
            ``_find_script`` against ``config.workspace``.
        args: Optional list, forwarded to ``_execute_script``.
        cwd: Optional string, forwarded to ``_execute_script``.
        timeout: Optional value that must be convertible with ``float()``.
        check: When truthy (default ``True``), a non-zero return code is
            turned into a ``TaskExecutionError``.

    Returns:
        A dict with the keys ``returncode``, ``stdout`` and ``stderr``.

    Any exception raised in the body, including the validation errors, is
    packed into an ``ErrorContext`` and passed to ``handle_task_error``.
    """
    task_name = str(config.name or "python_script_task")
    task_type = str(config.type or "python_script")
    logger = get_task_logger(config.workspace, task_name)

    try:
        log_task_execution(logger, config.step, config._context, config.workspace)
        processed = config.process_inputs()
        config._processed_inputs = processed

        script_path_in = processed.get("script_path")
        args = processed.get("args")  # Should be list or None
        cwd = processed.get("cwd")
        timeout = processed.get("timeout")

        if not script_path_in or not isinstance(script_path_in, str):
            raise ValueError("Input 'script_path' (string) is required.")
        if args is not None and not isinstance(args, list):
            raise ValueError("Input 'args' must be a list of strings.")
        if cwd is not None and not isinstance(cwd, str):
            raise ValueError("Input 'cwd' must be a string.")
        if timeout is not None:
            try:
                timeout = float(timeout)
            except (TypeError, ValueError) as err:
                # float() raises TypeError, not ValueError, for values such
                # as lists or dicts; report both with the same message and
                # keep the original exception as the cause.
                raise ValueError("Input 'timeout' must be a number.") from err

        script_path = _find_script(script_path_in, config.workspace)
        returncode, stdout, stderr = _execute_script(
            script_path=script_path, args=args, cwd=cwd, timeout=timeout
        )

        result = {
            "returncode": returncode,
            "stdout": stdout,
            "stderr": stderr,
        }

        # Optionally raise error on non-zero exit code
        check = processed.get("check", True)  # Default to True
        if check and returncode != 0:
            error_message = f"Script '{script_path}' failed with exit code {returncode}.\nStderr:\n{stderr}"
            # Wrap the failure reason in a standard error type.
            # NOTE(review): this is raised inside the try block, so
            # (assuming TaskExecutionError derives from Exception) it also
            # passes through handle_task_error below.
            raise TaskExecutionError(
                step_name=task_name, original_error=RuntimeError(error_message)
            )

        log_task_result(logger, result)
        return result

    except Exception as e:
        context = ErrorContext(
            step_name=task_name,
            task_type=task_type,
            error=e,
            task_config=config.step,
            template_context=config._context,
        )
        handle_task_error(context)
        return {}  # Unreachable