aipype - Core Framework

The core aipype package provides the foundation for building AI agent pipelines with declarative task orchestration and automatic dependency resolution.

Core Components

aipype: A modular AI agent framework with declarative pipeline-based task orchestration.

class PipelineAgent

Bases: BasePipelineAgent

Declarative agent that discovers tasks from @task decorated methods.

PipelineAgent provides a cleaner, more Pythonic way to define AI pipelines. Instead of manually creating task objects and wiring dependencies, you simply define methods decorated with @task and let the framework infer the execution order from parameter names.

Key Features:
  • Automatic task discovery from decorated methods

  • Dependency inference from function signatures

  • Support for explicit paths via Annotated[T, Depends("path")]

  • Full compatibility with existing execution engine

  • Parallel execution of independent tasks

How It Works:
  1. On initialization, discovers all @task decorated methods

  2. Analyzes method signatures to infer dependencies

  3. Topologically sorts methods by dependency order

  4. Creates TaskWrapper instances for each method

  5. Returns task list to parent BasePipelineAgent for execution

The parent BasePipelineAgent handles all execution concerns:
  • Building execution phases

  • Resolving dependencies

  • Parallel execution

  • Error handling

  • Result collection
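Steps 1 to 3 of How It Works can be approximated with the standard library alone. The sketch below is hypothetical: `mini_task`, `discover_tasks`, `execution_order`, the `_is_task` marker, and `ArticleSketch` are illustrative names, not aipype's API. It tags methods with a decorator, reads parameter names with `inspect.signature`, treats a parameter as a dependency only when it names another tagged method (parameters that match no task are simply ignored here, which is an assumption), and orders the tasks with `graphlib.TopologicalSorter` (Python 3.9+).

```python
import inspect
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List


def mini_task(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical stand-in for @task: tag the function so it can be found later."""
    setattr(func, "_is_task", True)  # illustrative marker attribute, not aipype's
    return func


def discover_tasks(agent: Any) -> Dict[str, List[str]]:
    """Map each tagged method name to the task names it depends on."""
    methods = {
        name: member
        for name, member in inspect.getmembers(agent, predicate=inspect.ismethod)
        if getattr(member, "_is_task", False)
    }
    deps: Dict[str, List[str]] = {}
    for name, method in methods.items():
        # The signature of a bound method already excludes `self`.
        params = inspect.signature(method).parameters
        # Only parameters that name another task become dependencies.
        deps[name] = [p for p in params if p in methods]
    return deps


def execution_order(deps: Dict[str, List[str]]) -> List[str]:
    """Topologically sort tasks; graphlib raises CycleError on a cycle."""
    return list(TopologicalSorter(deps).static_order())


class ArticleSketch:
    @mini_task
    def search(self) -> dict:
        return {"results": "..."}

    @mini_task
    def summarize(self, search: dict) -> str:
        return f"Summary of: {search}"

    @mini_task
    def write(self, summarize: str) -> str:
        return f"Article based on: {summarize}"


deps = discover_tasks(ArticleSketch())
print(deps)                   # {'search': [], 'summarize': ['search'], 'write': ['summarize']}
print(execution_order(deps))  # ['search', 'summarize', 'write']
```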

Example:

from aipype import PipelineAgent, task

class ArticleAgent(PipelineAgent):
    @task
    def search(self) -> dict:
        # Access config directly for initial inputs
        return {"results": f"Results for {self.config['topic']}"}

    @task
    def summarize(self, search: dict) -> str:
        # 'search' parameter auto-wired from search task
        return f"Summary of: {search}"

    @task
    def write(self, summarize: str) -> str:
        return f"Article based on: {summarize}"

agent = ArticleAgent("writer", {"topic": "AI trends"})
result = agent.run()

name

Agent identifier

config

Configuration dictionary

tasks

List of TaskWrapper instances (populated by setup_tasks())

context

TaskContext for inter-task communication

__init__(name, config=None)

Initialize the pipeline agent.

Parameters:
  • name (str) – Unique identifier for this agent

  • config (Optional[Dict[str, Any]]) – Configuration dictionary passed to agent and tasks

__str__()

String representation of the agent.

Return type:

str

setup_tasks()

Auto-discover @task methods and build the task list.

This method:
  1. Finds all methods decorated with @task

  2. Infers dependencies from method signatures

  3. Sorts methods by dependency order

  4. Creates TaskWrapper instances

Return type:

List[BaseTask]

Returns:

List of BaseTask instances ready for execution

class BasePipelineAgent

Bases: object

Base agent class for building AI workflows with automatic task orchestration.

BasePipelineAgent provides the foundation for creating AI automation workflows. It automatically handles task dependency resolution, parallel execution, error handling, and result collection. Users inherit from this class and implement setup_tasks() to define their workflow.

Key Features
  • Automatic Dependency Resolution: Tasks receive data from previous tasks

  • Parallel Execution: Independent tasks run simultaneously for performance

  • Error Handling: Graceful handling of task failures with detailed reporting

  • Context Management: Shared data store for inter-task communication

  • Result Tracking: Comprehensive execution metrics and status reporting

Configuration Options:
  • enable_parallel (bool): Enable parallel task execution (default: True)

  • max_parallel_tasks (int): Maximum concurrent tasks (default: 5)

  • stop_on_failure (bool): Stop pipeline on first failure (default: True)

Lifecycle:
  1. Initialization: Agent created with name and config

  2. Task Setup: setup_tasks() called to define workflow

  3. Execution: run() method orchestrates task execution

  4. Results: AgentRunResult returned with status and metrics

Example

Basic agent implementation:

class DataProcessorAgent(BasePipelineAgent):
    def setup_tasks(self):
        return [
            SearchTask("gather_data", {
                "query": self.config.get("search_query", ""),
                "max_results": 10
            }),
            TransformTask("process_data", {
                "transform_function": process_search_results,
                "input_field": "results"
            }, dependencies=[
                TaskDependency("results", "gather_data.results", REQUIRED)
            ]),
            FileSaveTask("save_results", {
                "file_path": "/tmp/results.json"
            }, dependencies=[
                TaskDependency("data", "process_data.result", REQUIRED)
            ])
        ]

# Usage
agent = DataProcessorAgent("processor", {
    "search_query": "machine learning trends",
    "enable_parallel": True
})
result = agent.run()

if result.is_success():
    print(f"Processed {result.completed_tasks} tasks successfully")
    # Access individual task results
    search_result = agent.context.get_result("gather_data")
    processed_data = agent.context.get_result("process_data")
else:
    print(f"Pipeline failed: {result.error_message}")

Advanced configuration:

agent = DataProcessorAgent("processor", {
    "search_query": "AI research",
    "enable_parallel": False,  # Sequential execution
    "stop_on_failure": False,  # Continue despite failures
    "max_parallel_tasks": 10   # Only takes effect when enable_parallel is True
})

Return Values

run() returns AgentRunResult with:

  • status: SUCCESS, PARTIAL, ERROR, or RUNNING

  • completed_tasks/failed_tasks: Task completion counts

  • total_phases: Number of execution phases

  • execution_time: Total runtime in seconds

  • metadata: Additional execution information

Error Handling

  • Individual task failures are captured and reported

  • Pipeline can continue or stop based on stop_on_failure setting

  • Detailed error information available in execution context

  • Partial success supported for workflows with optional components

Thread Safety

  • TaskContext is thread-safe for concurrent task execution

  • Agent instance should not be shared between threads

  • Each agent execution creates isolated context

Performance

  • Tasks in same phase execute in parallel (when enabled)

  • Dependency resolution minimizes execution phases

  • ThreadPoolExecutor manages concurrent task execution

  • Configurable concurrency limits prevent resource exhaustion

__init__(name, config=None)

Initialize pipeline agent with name and configuration.

Creates a new pipeline agent instance with automatic task setup. The agent will call setup_tasks() during initialization to configure the workflow. TaskContext and DependencyResolver are created automatically.

Attributes Created:

  • name: Agent identifier

  • config: Configuration dictionary

  • tasks: List of tasks (populated by setup_tasks())

  • context: TaskContext for inter-task communication

  • dependency_resolver: Handles dependency resolution

  • execution_plan: Created during run() execution

Parameters:
  • name (str) – Unique identifier for this agent instance. Used in logging and result tracking. Should be descriptive of the agent’s purpose.

  • config (Optional[Dict[str, Any]]) –

    Configuration dictionary with agent settings:

    • enable_parallel (bool): Enable parallel task execution (default: True)

    • max_parallel_tasks (int): Maximum concurrent tasks (default: 5)

    • stop_on_failure (bool): Stop pipeline on first failure (default: True)

    Additional config keys are passed to tasks via self.config.

Example:

agent = MyAgent("data_processor", {
    "enable_parallel": True,
    "max_parallel_tasks": 3,
    "input_file": "data.csv"
})
print(f"Agent {agent.name} has {len(agent.tasks)} tasks")

Note

The agent automatically calls setup_tasks() during initialization. If setup_tasks() raises an exception, the agent will log a warning but continue with an empty task list.

See also

  • BaseTask: Base class for implementing custom tasks

  • TaskDependency: Declarative dependency specification

  • TaskContext: Shared data store for inter-task communication

  • AgentRunResult: Standardized execution result format

__str__()

String representation of the agent.

Return type:

str

add_tasks(tasks)

Add tasks to the agent.

Parameters:

tasks (List[BaseTask]) – List of tasks to add

Return type:

None

create_context()

Create and return the task context for this agent.

Return type:

TaskContext

Returns:

TaskContext instance

display_results(sections=None)

Display results with configurable sections.

This method provides a simple way to display agent results without writing complex display logic in each agent. Users can choose which sections to show.

Parameters:

sections (Optional[List[str]]) – List of sections to display. Options: "summary", "tasks", "errors". Defaults to ["summary", "tasks", "errors"] (show all sections).

Return type:

None

get_context()

Get the task context for this agent.

Return type:

TaskContext

Returns:

TaskContext instance

get_dependency_info()

Get dependency information for all tasks.

Return type:

Dict[str, List[Dict[str, Any]]]

Returns:

Dictionary mapping task names to their dependency info

get_execution_plan()

Get the current execution plan.

Return type:

Optional[TaskExecutionPlan]

Returns:

TaskExecutionPlan instance or None if not yet created

reset()

Reset the agent and clear context.

Return type:

None

run()

Execute the complete workflow with automatic task orchestration.

This is the main execution method that runs the entire pipeline:

  1. Builds execution plan from task dependencies

  2. Resolves dependencies and updates task configurations

  3. Executes tasks in phases (parallel within phases, sequential between phases)

  4. Collects results and handles errors

  5. Returns comprehensive execution status

The method handles all aspects of task execution including dependency resolution, parallel execution, error propagation, and result collection. Individual task failures are captured and the pipeline can continue based on the stop_on_failure configuration.

The method returns AgentRunResult containing:

  • status:
    • SUCCESS (all tasks completed)

    • PARTIAL (some failed)

    • ERROR (all failed)

    • RUNNING (already executing)

  • completed_tasks: Number of successfully completed tasks

  • failed_tasks: Number of tasks that failed

  • total_phases: Number of execution phases used

  • execution_time: Total runtime in seconds

  • metadata: Additional execution information

Exceptions

No exceptions are raised directly. All operational errors are captured in the returned AgentRunResult. Only programming errors (like missing setup_tasks implementation) will raise exceptions.

Example

Basic execution and result checking:

agent = MyAgent("processor", config)
result = agent.run()

if result.is_success():
    print(f"All {result.completed_tasks} tasks completed successfully")

    # Access individual task results
    search_data = agent.context.get_result("search_task")
    processed_data = agent.context.get_result("process_task")

elif result.is_partial():
    print(f"Partial success: {result.completed_tasks}/{result.total_tasks}")
    print(f"Failed tasks: {result.failed_tasks}")

else:  # result.is_error()
    print(f"Pipeline failed: {result.error_message}")

Performance monitoring:

result = agent.run()
print(f"Execution time: {result.execution_time:.2f}s")
print(f"Phases used: {result.total_phases}")

# Fewer phases than tasks means at least one phase grouped several
# independent tasks, which run concurrently when enable_parallel is True
if result.total_phases < len(agent.tasks):
    print("Parallel execution optimized the pipeline")

Execution Phases

Tasks are automatically organized into phases based on dependencies:

  • Phase 1: Tasks with no dependencies

  • Phase 2: Tasks depending only on Phase 1 tasks

  • Phase N: Tasks depending on previous phases only

Within each phase, tasks can execute in parallel (if enabled). Between phases, execution is sequential to maintain dependency order.
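A minimal sketch of the phase rule, assuming phases are already computed and each task is a zero-argument callable. `run_phases` and `make_task` are hypothetical helpers, not part of aipype. Phases run strictly in order; inside a phase, tasks are submitted to a `concurrent.futures.ThreadPoolExecutor` whose `max_workers` stands in for `max_parallel_tasks`, and `enable_parallel=False` falls back to a plain loop.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List


def run_phases(
    phases: List[List[str]],
    tasks: Dict[str, Callable[[], Any]],
    enable_parallel: bool = True,
    max_parallel_tasks: int = 5,
) -> Dict[str, Any]:
    """Run phases in order; tasks inside one phase may run concurrently."""
    results: Dict[str, Any] = {}
    for phase in phases:
        if enable_parallel and len(phase) > 1:
            # Exiting the `with` block waits for every future in this phase,
            # so no task from the next phase starts early.
            with ThreadPoolExecutor(max_workers=max_parallel_tasks) as pool:
                futures = {name: pool.submit(tasks[name]) for name in phase}
            for name, future in futures.items():
                results[name] = future.result()  # re-raises a task's exception
        else:
            for name in phase:
                results[name] = tasks[name]()
    return results


order: List[str] = []


def make_task(name: str) -> Callable[[], str]:
    def run() -> str:
        order.append(name)  # list.append is safe across threads in CPython
        return name.lower()
    return run


results = run_phases([["A"], ["B", "C"], ["D"]], {n: make_task(n) for n in "ABCD"})
print(results)  # {'A': 'a', 'B': 'b', 'C': 'c', 'D': 'd'}
```

Unlike the engine described here, which captures failures in TaskResult objects, this sketch lets `future.result()` re-raise a task's exception.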

Error Handling

  • Individual task errors are captured in TaskResult objects

  • Pipeline can continue or stop based on stop_on_failure config

  • Partial success is supported when some tasks succeed

  • All errors are logged with detailed context information

  • Failed task dependencies propagate to dependent tasks
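To make the last bullet concrete, assume that "propagate" means a task cannot run once any of its direct or transitive dependencies has failed. The hypothetical helper `tasks_to_skip` below computes that set from a plain `{task: [dependencies]}` map; it illustrates the idea and is not aipype's code.

```python
from typing import Dict, List, Set


def tasks_to_skip(deps: Dict[str, List[str]], failed: Set[str]) -> Set[str]:
    """Return every task that depends, directly or transitively, on a failed task."""
    blocked: Set[str] = set()
    changed = True
    while changed:  # repeat until no new task gets blocked
        changed = False
        for task, parents in deps.items():
            if task in failed or task in blocked:
                continue
            if any(p in failed or p in blocked for p in parents):
                blocked.add(task)
                changed = True
    return blocked


deps = {"A": [], "B": ["A"], "C": ["A"], "D": ["B", "C"], "E": []}
print(sorted(tasks_to_skip(deps, {"B"})))  # ['D']
print(sorted(tasks_to_skip(deps, {"A"})))  # ['B', 'C', 'D']
```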

Thread Safety

This method is not thread-safe. Each agent instance should only execute one pipeline at a time. Concurrent task execution within the pipeline is handled internally with ThreadPoolExecutor.

Performance

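  • Parallel execution can substantially reduce wall-clock time for independent tasks, especially I/O-bound work such as API or LLM calls; on standard CPython builds, threads share the GIL, so CPU-bound tasks gain little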

  • Dependency resolution optimizes execution order

  • Thread pool size controlled by max_parallel_tasks config

  • Memory usage scales with number of concurrent tasks

Note

If the agent is already running, this method returns immediately with a RUNNING status to prevent concurrent execution conflicts.
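The early return in the note can be modeled with a non-blocking lock. `GuardedRunner` and the plain strings "RUNNING" and "SUCCESS" are hypothetical stand-ins for the agent and AgentRunResult statuses; how aipype actually tracks the running state is not documented here. The example re-enters `run()` from inside the work function to trigger the guard without needing a second thread.

```python
import threading
from typing import Callable


class GuardedRunner:
    """Hypothetical illustration of the 'already running' guard."""

    def __init__(self, work: Callable[[], None]) -> None:
        self._work = work
        self._lock = threading.Lock()

    def run(self) -> str:
        # acquire(blocking=False) returns False at once if a run holds the lock.
        if not self._lock.acquire(blocking=False):
            return "RUNNING"
        try:
            self._work()
            return "SUCCESS"
        finally:
            self._lock.release()


seen = []
runner = GuardedRunner(lambda: seen.append(runner.run()))  # re-enters run() mid-run
print(runner.run(), seen)  # SUCCESS ['RUNNING']
```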

See also

  • AgentRunResult: For detailed result format and status checking

  • TaskContext: Access via agent.context for individual task results

  • TaskExecutionPlan: For understanding execution phase organization

Return type:

AgentRunResult

Returns:

AgentRunResult

setup_tasks()

Define the workflow tasks for this agent. Must be implemented by subclasses.

This is the main method where users define their AI workflow by creating and configuring tasks with their dependencies. Tasks are automatically organized into execution phases based on their dependency relationships.

Task Naming:
  • Use descriptive names that indicate the task’s purpose

  • Names must be unique within the agent

  • Use snake_case for consistency

  • Good: “fetch_articles”, “analyze_sentiment”, “save_results”

  • Avoid: “task1”, “t”, “process”

Dependency Design:
  • Required dependencies must be satisfied for task to run

  • Optional dependencies use default values if source unavailable

  • Circular dependencies will cause pipeline execution to fail

  • Use transform_func in dependencies for data preprocessing

Note

This method is called automatically during agent initialization. Tasks should not be added to self.tasks directly - return them from this method instead.

See also

  • TaskDependency: For creating task dependencies

  • BaseTask subclasses: LLMTask, SearchTask, ConditionalTask, etc.

  • Task-specific documentation for configuration options

Returns:

List of configured BaseTask instances that define the workflow.

Return type:

List[BaseTask]

Example:
  • Basic search and summarize workflow

    def setup_tasks(self):
        return [
            SearchTask("search", {
                "query": self.config.get("topic", ""),
                "max_results": 5
            }),
            LLMTask("summarize", {
                "prompt": "Summarize these articles: ${articles}",
                "llm_provider": "openai",
                "llm_model": "gpt-4"
            }, dependencies=[
                TaskDependency("articles", "search.results", REQUIRED)
            ])
        ]
    
  • Complex workflow with conditional logic

    def setup_tasks(self):
        return [
            SearchTask("search", {"query": "AI news"}),
            TransformTask("filter", {
                "transform_function": filter_recent_articles,
                "input_field": "results"
            }, dependencies=[
                TaskDependency("results", "search.results", REQUIRED)
            ]),
            ConditionalTask("quality_check", {
                "condition_function": lambda articles: len(articles) >= 3,
                "condition_inputs": ["filtered_articles"],
                "action_function": log_action("Quality check passed"),
                "else_function": log_action("Insufficient articles")
            }, dependencies=[
                TaskDependency("filtered_articles", "filter.result", REQUIRED)
            ]),
            LLMTask("summarize", {
                "prompt": "Create summary from: ${content}",
                "llm_provider": "openai"
            }, dependencies=[
                TaskDependency("content", "filter.result", REQUIRED)
            ])
        ]
    
Raises:

NotImplementedError – Must be implemented by subclasses

validate_dependencies()

Validate that all task dependencies can be satisfied.

Return type:

List[str]

Returns:

List of validation error messages (empty if all valid)

class TaskExecutionPlan

Bases: object

Execution plan that organizes tasks into phases based on dependencies.

This class analyzes task dependencies and creates an optimal execution plan where:

  • Tasks with no dependencies run in the first phase

  • Tasks whose dependencies are satisfied run in subsequent phases

  • Tasks within the same phase can run in parallel

  • Phases execute sequentially to maintain dependency ordering

The plan detects circular dependencies and rejects them with a ValueError, so any plan that is successfully constructed has a valid execution order.

tasks

Original list of tasks to organize

phases

List of task lists, each representing an execution phase

Example:
  Given tasks A, B (depends on A), C (depends on A), and D (depends on B and C), the execution plan is:

  • Phase 1: [A] (no dependencies)

  • Phase 2: [B, C] (depend only on A, can run in parallel)

  • Phase 3: [D] (depends on B and C)
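The phase assignment in this example is a topological sort processed level by level (a layered form of Kahn's algorithm). A hypothetical sketch over a plain `{task: [dependencies]}` map, not aipype's implementation: each round collects every task whose dependencies have all finished, and a round that finds nothing while tasks remain means the rest form or depend on a cycle, so a `ValueError` is raised as documented for `__init__`.

```python
from typing import Dict, List, Set


def build_phases(deps: Dict[str, List[str]]) -> List[List[str]]:
    """Group tasks into phases; each phase depends only on earlier phases."""
    unknown = {d for ds in deps.values() for d in ds if d not in deps}
    if unknown:
        raise ValueError(f"Unknown dependencies: {sorted(unknown)}")

    done: Set[str] = set()
    remaining = dict(deps)
    phases: List[List[str]] = []
    while remaining:
        # Every task whose dependencies have all finished joins this phase.
        ready = [name for name, ds in remaining.items() if all(d in done for d in ds)]
        if not ready:
            raise ValueError(f"Cycle detected; unresolved tasks: {sorted(remaining)}")
        phases.append(ready)
        done.update(ready)
        for name in ready:
            del remaining[name]
    return phases


print(build_phases({"A": [], "B": ["A"], "C": ["A"], "D": ["B", "C"]}))
# [['A'], ['B', 'C'], ['D']]
```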

Parameters:

tasks (List[BaseTask])

__init__(tasks)

Create execution plan from list of tasks.

Parameters:

tasks (List[BaseTask]) – List of tasks to organize into execution phases

Raises:

ValueError – If circular dependencies detected or dependencies cannot be satisfied

get_phase(phase_index)

Get tasks in a specific phase.

Parameters:

phase_index (int) – Index of the phase

Return type:

List[BaseTask]

Returns:

List of tasks in the phase

get_total_tasks()

Get total number of tasks in the plan.

Return type:

int

Returns:

Total number of tasks

total_phases()

Get total number of execution phases.

Return type:

int

Returns:

Number of phases

class BaseTask

Bases: ABC

Abstract base class for all task implementations in the aipype framework.

BaseTask provides the foundation for creating custom tasks that can be orchestrated by PipelineAgent. It handles common concerns like validation, status tracking, error handling, and dependency management, allowing subclasses to focus on their specific functionality.

All tasks follow the TaskResult pattern where operational errors are returned as structured results rather than raised as exceptions. This enables graceful error handling and pipeline continuation.

name

Unique identifier for this task instance

config

Configuration dictionary with task-specific parameters

dependencies

List of TaskDependency objects for data flow

validation_rules

Optional validation rules for configuration

agent_name

Name of the parent agent (set automatically)

logger

Task-specific logger instance

Abstract Methods:

run(): Must be implemented by subclasses to perform task work

Common Patterns

Configuration validation:

class MyTask(BaseTask):
    def __init__(self, name, config, dependencies=None):
        super().__init__(name, config, dependencies)
        self.validation_rules = {
            "required": ["api_key", "endpoint"],
            "optional": ["timeout", "retries"],
            "defaults": {"timeout": 30, "retries": 3},
            "types": {
                "api_key": str,
                "endpoint": str,
                "timeout": int,
                "retries": int
            },
            "ranges": {
                "timeout": (1, 300),
                "retries": (0, 10)
            }
        }

Error handling with TaskResult:

from datetime import datetime

def run(self) -> TaskResult:
    start_time = datetime.now()

    # Always validate first
    validation_failure = self._validate_or_fail(start_time)
    if validation_failure:
        return validation_failure

    try:
        # Perform task work
        result = self.do_work()
        execution_time = (datetime.now() - start_time).total_seconds()

        return TaskResult.success(
            data=result,
            execution_time=execution_time,
            metadata={"processed_items": len(result)}
        )

    except ApiError as e:
        # Expected operational error
        execution_time = (datetime.now() - start_time).total_seconds()
        return TaskResult.failure(
            error_message=f"API call failed: {str(e)}",
            execution_time=execution_time,
            metadata={"error_type": "ApiError", "retry_recommended": True}
        )

Working with dependencies:

class ProcessDataTask(BaseTask):
    def __init__(self, name, config, dependencies=None):
        super().__init__(name, config, dependencies)
        # Dependencies will be resolved automatically before run()

    def run(self) -> TaskResult:
        # Access resolved dependency data
        input_data = self.config.get("input_data")  # From dependency
        processing_mode = self.config.get("mode", "default")  # From config

        # Process the data...

Validation Rules

Tasks can define validation_rules to automatically validate their configuration. The validation system supports:

  • required: Keys that must be present

  • optional: Keys that are optional

  • defaults: Default values for missing optional keys

  • types: Expected Python types for values

  • ranges: (min, max) tuples for numeric validation

  • custom: Custom validation functions
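The rule keys can be illustrated with a hypothetical validator, `check_config`, that applies `defaults`, `required`, `types`, `ranges`, and `custom` in that order and returns the merged config plus a list of error messages. This is a sketch of the semantics listed above, not the validator in utils.common. Two assumptions: `custom` maps a key to a predicate, and `optional` only documents keys, so the sketch does not read it.

```python
from typing import Any, Callable, Dict, List, Tuple


def check_config(config: Dict[str, Any], rules: Dict[str, Any]) -> Tuple[Dict[str, Any], List[str]]:
    """Apply validation_rules-style checks; return (config with defaults, errors)."""
    # Defaults fill only keys the caller did not supply.
    merged = {**rules.get("defaults", {}), **config}
    errors: List[str] = []

    for key in rules.get("required", []):
        if key not in merged:
            errors.append(f"Missing required key: {key}")

    for key, expected in rules.get("types", {}).items():
        if key in merged and not isinstance(merged[key], expected):
            errors.append(f"{key} must be of type {expected.__name__}")

    for key, (low, high) in rules.get("ranges", {}).items():
        value = merged.get(key)
        if isinstance(value, (int, float)) and not low <= value <= high:
            errors.append(f"{key} must be between {low} and {high}")

    # Assumed shape for "custom": {key: predicate}; a falsy result is an error.
    custom: Dict[str, Callable[[Any], bool]] = rules.get("custom", {})
    for key, predicate in custom.items():
        if key in merged and not predicate(merged[key]):
            errors.append(f"{key} failed custom validation")

    return merged, errors


rules = {
    "required": ["api_key", "endpoint"],
    "defaults": {"timeout": 30, "retries": 3},
    "types": {"api_key": str, "endpoint": str, "timeout": int, "retries": int},
    "ranges": {"timeout": (1, 300), "retries": (0, 10)},
}
merged, errors = check_config({"api_key": "k", "timeout": 500}, rules)
print(errors)  # ['Missing required key: endpoint', 'timeout must be between 1 and 300']
```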

Status Management

Task status is managed automatically:

  • NOT_STARTED: Initial state

  • RUNNING: During execution (set by framework)

  • SUCCESS: Task completed successfully

  • ERROR: Task failed with error

  • PARTIAL: Task completed with partial success

Thread Safety

Individual task instances are not thread-safe and should not be executed concurrently. However, different task instances can run in parallel safely.

See Also:
  • TaskResult: Return format for task execution

  • TaskDependency: For creating task dependencies

  • PipelineAgent: For orchestrating multiple tasks

  • Validation documentation in utils.common module

__init__(name, config=None, dependencies=None)

Initialize a new task instance with configuration and dependencies.

Creates a new task with the specified configuration. The task will be initialized with default status and logging setup. Dependencies will be resolved automatically by the PipelineAgent before execution.

Parameters:
  • name (str) – Unique identifier for this task within the agent. Must be unique across all tasks in the same pipeline. Used for:

    • Logging identification

    • Dependency references (other_task.field)

    • Result storage in TaskContext

    • Status tracking and error reporting

  • config (Optional[Dict[str, Any]]) – Configuration dictionary containing task-specific parameters. The exact keys depend on the task implementation. Will be validated against validation_rules if defined. Common patterns:

    • API credentials and endpoints

    • File paths and processing options

    • Behavioral flags and timeout values

    • Data transformation parameters

  • dependencies (Optional[List[TaskDependency]]) – List of TaskDependency objects specifying how this task receives data from other tasks. Dependencies are resolved automatically before run() is called, with resolved data added to the config dictionary. Use TaskDependency.create_required() or TaskDependency.create_optional() for convenience.

Attributes Initialized:
  • name: Task identifier

  • config: Configuration dictionary (may be updated by dependencies)

  • dependencies: List of dependency specifications

  • validation_rules: Set by subclasses for configuration validation

  • agent_name: Set automatically by PipelineAgent

  • logger: Task-specific logger (task.{name})

  • Status tracking fields for execution monitoring

Example

Basic task creation:

task = MyTask(
    name="process_data",
    config={
        "input_file": "data.csv",
        "output_format": "json",
        "batch_size": 1000
    }
)

Task with dependencies:

task = TransformTask(
    name="process_results",
    config={
        "transform_function": my_transform_func,
        "output_name": "processed_data"
    },
    dependencies=[
        TaskDependency("input_data", "search_task.results", REQUIRED),
        TaskDependency("options", "config_task.settings", OPTIONAL)
    ]
)

Note

  • Task names should be descriptive and use snake_case

  • Config validation occurs during run(), not during initialization

  • Dependencies are resolved by PipelineAgent, not by the task itself

  • Subclasses should call super().__init__() and set validation_rules

See also

  • TaskDependency: For creating task dependencies

  • TaskResult: Return format from run() method

  • Validation rules documentation for config validation format

__str__()

String representation of the task.

Return type:

str

create_task_result_from_current_state()

Create a TaskResult object from current task state.

This method is useful for backward compatibility and migration purposes.

Return type:

TaskResult

get_dependencies()

Get the list of dependencies for this task.

Return type:

List[TaskDependency]

Returns:

List of TaskDependency objects for this task.

get_error()

Get the error message if task failed.

Return type:

Optional[str]

get_execution_time()

Get the task execution time in seconds.

Return type:

float

get_result()

Get the result of the task if completed successfully.

Return type:

Optional[Any]

get_status()

Get the current status of the task.

Return type:

TaskStatus

has_error()

Check if the task has encountered an error.

Return type:

bool

is_completed()

Check if the task has been completed successfully.

Return type:

bool

mark_error(error)

Mark the task as failed with an error message.

Parameters:

error (str)

Return type:

None

mark_started()

Mark the task as started.

Return type:

None

mark_success(result=None)

Mark the task as successfully completed with an optional result.

Parameters:

result (Any)

Return type:

None

reset()

Reset the task to its initial state.

Return type:

None

abstractmethod run()

Execute the task and return a structured result.

This is the main method that subclasses must implement to perform their specific functionality. The method should follow the TaskResult pattern for error handling and return structured results.

Implementation Guidelines

  1. Validation First: Always validate configuration before work:

    def run(self) -> TaskResult:
        start_time = datetime.now()
    
        validation_failure = self._validate_or_fail(start_time)
        if validation_failure:
            return validation_failure
    
  2. Error Handling: Use TaskResult pattern, not exceptions:

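    try:
        result = self.do_work()
        elapsed = (datetime.now() - start_time).total_seconds()
        return TaskResult.success(data=result, execution_time=elapsed)
    except ExpectedError as e:
        elapsed = (datetime.now() - start_time).total_seconds()
        return TaskResult.failure(error_message=str(e), execution_time=elapsed)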
    
  3. Timing: Always include execution timing:

    start_time = datetime.now()
    # ... do work ...
    execution_time = (datetime.now() - start_time).total_seconds()
    
  4. Metadata: Include useful execution information:

    return TaskResult.success(
        data=result,
        execution_time=execution_time,
        metadata={
            "processed_items": len(items),
            "api_calls": call_count,
            "cache_hits": cache_hits
        }
    )
    

Return Patterns

  • Success: All work completed successfully

  • Partial: Some work completed, some failed (recoverable)

  • Failure: Task could not complete due to errors

  • Skipped: Task was skipped due to conditions not being met

A returned TaskResult contains:
  • status: SUCCESS, PARTIAL, ERROR, or SKIPPED

  • data: Result data (dict) if successful, None if failed

  • error: Error message if failed, None if successful

  • execution_time: Time taken to execute in seconds

  • metadata: Additional information about execution
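A small stand-in that mirrors these fields and the `TaskResult.success(...)` / `TaskResult.failure(...)` calls used in this section's examples. `SketchTaskResult` is hypothetical, and storing the status as a plain string is a simplification; the real TaskResult may use an enum and provide additional constructors (for example for partial or skipped results).

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class SketchTaskResult:
    """Hypothetical stand-in for TaskResult with the fields listed above."""

    status: str  # "SUCCESS", "PARTIAL", "ERROR", or "SKIPPED"
    data: Optional[Any] = None
    error: Optional[str] = None
    execution_time: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def success(
        cls, data: Any, execution_time: float = 0.0, metadata: Optional[Dict[str, Any]] = None
    ) -> "SketchTaskResult":
        return cls("SUCCESS", data=data, execution_time=execution_time, metadata=metadata or {})

    @classmethod
    def failure(
        cls, error_message: str, execution_time: float = 0.0, metadata: Optional[Dict[str, Any]] = None
    ) -> "SketchTaskResult":
        # Failed results carry an error message and no data.
        return cls("ERROR", error=error_message, execution_time=execution_time, metadata=metadata or {})


ok = SketchTaskResult.success(data={"rows": 3}, execution_time=0.2, metadata={"processed_items": 3})
bad = SketchTaskResult.failure(error_message="API call failed", execution_time=0.1)
print(ok.status, bad.status, bad.error)  # SUCCESS ERROR API call failed
```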

Example Implementation

from datetime import datetime

def run(self) -> TaskResult:
    start_time = datetime.now()

    # Validate configuration
    validation_failure = self._validate_or_fail(start_time)
    if validation_failure:
        return validation_failure

    try:
        # Extract configuration
        input_file = self.config["input_file"]
        output_format = self.config.get("output_format", "json")

        # Perform work
        data = self.process_file(input_file)
        formatted_data = self.format_output(data, output_format)

        execution_time = (datetime.now() - start_time).total_seconds()

        return TaskResult.success(
            data={"processed_data": formatted_data, "format": output_format},
            execution_time=execution_time,
            metadata={"records_processed": len(data)}
        )

    except FileNotFoundError:
        execution_time = (datetime.now() - start_time).total_seconds()
        return TaskResult.failure(
            error_message=f"Input file not found: {input_file}",
            execution_time=execution_time,
            metadata={"error_type": "FileNotFoundError"}
        )

Note

  • Never raise exceptions for operational errors

  • Always calculate and include execution_time

  • Use descriptive error messages that help users understand issues

  • Include relevant metadata for debugging and monitoring

Return type:

TaskResult

Returns:

TaskResult

See also

  • TaskResult: For understanding return value structure

  • _validate_or_fail(): For configuration validation pattern

  • Task-specific implementations for concrete examples

set_agent_name(agent_name)

Set the name of the agent that owns this task.

Parameters:

agent_name (str) – Name of the agent that owns this task

Return type:

None

set_context(context)

Set the task context for dependency resolution.

Parameters:

context (TaskContext) – TaskContext instance for resolving dependencies

Return type:

None

Note

Default implementation does nothing. Override in subclasses that use context.

property status_changed_at: datetime

Get the timestamp when the status was last changed.

task(func)

Mark a method as a pipeline task with automatic dependency inference.

The @task decorator marks methods in a PipelineAgent as pipeline tasks. Dependencies are automatically inferred from the method’s parameter names - if a parameter name matches another task’s name, it becomes a dependency.

Parameters:

func (TypeVar(F, bound=Callable[..., Any])) – The method to mark as a task

Return type:

TypeVar(F, bound=Callable[..., Any])

Returns:

The decorated method with task metadata attached

Example:

class MyAgent(PipelineAgent):
    @task
    def fetch_data(self) -> dict:
        return search("AI news", max_results=5)

    @task
    def process(self, fetch_data: dict) -> str:
        # fetch_data parameter auto-wired from fetch_data task
        return llm(f"Summarize: {fetch_data}", model="gpt-4o")

class Depends

Bases: object

Specify an explicit dependency path for a task parameter.

Use with typing.Annotated to specify the exact path to resolve from the task context, rather than using the default inference.

path

Dot-notation path to the dependency value (e.g., "task_name.field")

Example:

from typing import Annotated

@task
def write_article(
    self,
    # Explicitly get .content field instead of full task output
    outline: Annotated[str, Depends("generate_outline.content")],
    summary: Annotated[str, Depends("summarize.content")]
) -> str:
    return llm(f"Write from: {outline}", model="gpt-4o")

Parameters:

path (str)

classmethod __class_getitem__(path)[source]

Allow Depends[“path”] syntax as alternative to Depends(“path”).

Parameters:

path (str) – Dot-notation path like “task_name.field”

Return type:

Depends

Returns:

Depends instance with the specified path

Example:

param: Annotated[str, Depends["task.field"]]

__init__(path)[source]

Initialize with a dependency path.

Parameters:

path (str) – Dot-notation path like “task_name.field.nested”

Raises:

ValueError – If path is empty or doesn’t contain a dot

__repr__()[source]

Return string representation.

Return type:

str

class TaskWrapper[source]

Bases: BaseTask

Wraps a decorated @task method as a BaseTask.

TaskWrapper bridges the gap between the new decorator-based syntax and the existing execution engine. It:

  1. Stores a reference to the decorated method

  2. Builds kwargs from resolved dependencies

  3. Calls the method and processes return values

  4. Handles delegation to returned BaseTask instances

This allows the existing TaskExecutionPlan and DependencyResolver to work without modification.
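The four steps above can be sketched in plain Python. This illustrates them under stated assumptions and is not TaskWrapper itself. Resolved dependencies are assumed to be a dict keyed by parameter name, since the run() description says they live in self.config. "Delegation" is modeled as calling run() on any returned object that has one. The real wrapper, by contrast, works with BaseTask instances and returns a TaskResult. `call_with_resolved`, `EchoTask`, and `Agent` are hypothetical names.

```python
import inspect
from typing import Any, Callable, Dict


def call_with_resolved(method: Callable[..., Any], config: Dict[str, Any]) -> Any:
    """Call a bound method with kwargs taken from config, then process the result."""
    # For a bound method, signature() already omits 'self'.
    params = inspect.signature(method).parameters
    kwargs = {name: config[name] for name in params if name in config}
    result = method(**kwargs)
    # Task-like return value (has a callable run()): delegate to it.
    run = getattr(result, "run", None)
    if callable(run):
        return run()
    # Plain value: use it directly.
    return result


class EchoTask:
    """Hypothetical task object returned by a method, standing in for a BaseTask."""

    def __init__(self, text: str) -> None:
        self.text = text

    def run(self) -> dict:
        return {"content": self.text.upper()}


class Agent:
    def process(self, fetch_data: dict) -> EchoTask:
        return EchoTask(f"process: {fetch_data['data']}")

    def count(self, fetch_data: dict) -> int:
        return len(fetch_data)


agent = Agent()
config = {"fetch_data": {"data": "value"}, "unrelated": 1}
print(call_with_resolved(agent.process, config))  # {'content': 'PROCESS: VALUE'}
print(call_with_resolved(agent.count, config))    # 1
```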

method

The bound method to call

agent

Reference to the parent agent

context_instance

TaskContext for dependency resolution

Example:

class MyAgent(PipelineAgent):
    @task
    def fetch_data(self) -> dict:
        return {"data": "value"}

    @task
    def process(self, fetch_data: dict) -> str:
        return llm(f"Process: {fetch_data}")

# TaskWrapper is created automatically for each @task method
# and handles the execution and return value processing

__init__(name, method, agent, dependencies)[source]

Initialize a task wrapper.

Parameters:
  • name (str) – Task name (usually the method name)

  • method (Callable[..., Any]) – The bound method to call

  • agent (Any) – Reference to the parent agent

  • dependencies (List[TaskDependency]) – Inferred TaskDependency objects

__str__()[source]

String representation.

Return type:

str

run()[source]

Execute the wrapped method and process its return value.

This method:

  1. Builds kwargs from resolved dependencies in self.config

  2. Calls the decorated method with those kwargs

  3. Processes the return value based on its type

Return type:

TaskResult

Returns:

TaskResult with the execution result

set_context(context)[source]

Set the task context for dependency resolution.

Parameters:

context (TaskContext) – TaskContext instance

Return type:

None

llm(prompt, *, model='gpt-4o', provider='openai', system=None, temperature=0.7, max_tokens=1000, tools=None, response_format=None, timeout=60.0, **kwargs)[source]

Create an LLMTask with a clean, intuitive API.

This helper simplifies LLMTask creation with sensible defaults and a more intuitive parameter interface. It is meant to be called inside @task-decorated methods: it returns an LLMTask, which TaskWrapper then runs by delegation.

Parameters:
  • prompt (str) – The prompt to send to the LLM

  • model (str) – Model identifier (default: “gpt-4o”)

  • provider (str) – LLM provider (default: “openai”)

  • system (Optional[str]) – System/context message

  • temperature (float) – Randomness control 0.0-2.0 (default: 0.7)

  • max_tokens (int) – Maximum response tokens (default: 1000)

  • tools (Optional[List[Union[Callable[..., Any], Dict[str, Any]]]]) – List of @tool functions and/or MCP server configs

  • response_format (Union[Type[BaseModel], Dict[str, Any], None]) – Pydantic model or JSON schema for structured output

  • timeout (float) – Request timeout in seconds (default: 60)

  • **kwargs (Any) – Additional LLMTask configuration

Return type:

LLMTask

Returns:

Configured LLMTask ready for delegation

Example:

@task
def summarize(self, articles: list) -> str:
    return llm(
        prompt=f"Summarize these articles: {articles}",
        model="gpt-4o",
        temperature=0.3,
        max_tokens=500
    )

Example with structured output and tools:

# AnalysisResult is a Pydantic model and calculator a @tool function, both defined elsewhere
@task
def analyze(self, data: dict) -> AnalysisResult:
    return llm(
        prompt=f"Analyze: {data}",
        model="gpt-4o",
        response_format=AnalysisResult,
        tools=[calculator, mcp_server("brave", "https://mcp.brave.com")]
    )
search(query, *, max_results=5, **kwargs)[source]

Create a SearchTask with a clean API.

Parameters:
  • query (str) – Search query string

  • max_results (int) – Maximum number of results (default: 5)

  • **kwargs (Any) – Additional SearchTask configuration

Return type:

SearchTask

Returns:

Configured SearchTask ready for delegation

Example:

@task
def find_articles(self) -> dict:
    topic = self.config.get("topic", "AI")
    return search(f"{topic} latest news", max_results=10)
mcp_server(label, url, *, allowed_tools=None, require_approval='never')[source]

Create MCP server configuration for use with llm().

Model Context Protocol (MCP) servers provide external tools that can be called by LLMs. This helper creates the configuration dict needed by LLMTask.

Parameters:
  • label (str) – Server label/name for identification

  • url (str) – MCP server URL endpoint

  • allowed_tools (Optional[List[str]]) – Optional list of specific tools to enable

  • require_approval (str) – Approval mode - “never”, “always”, or “once”

Return type:

Dict[str, Any]

Returns:

MCP configuration dict for use in tools parameter

Example:

@task
def research(self, topic: str) -> str:
    return llm(
        prompt=f"Research {topic} thoroughly",
        model="claude-sonnet-4",
        provider="anthropic",
        tools=[
            mcp_server("brave", "https://mcp.brave.com"),
            mcp_server("github", "https://mcp.github.com",
                      allowed_tools=["search_repos", "get_file"])
        ]
    )
transform(input_data, fn, *, output_name='result')[source]

Apply a transformation function and return result as dict.

Unlike returning a TransformTask, this helper executes the transformation immediately and returns the result. Use this for simple data transformations within @task methods.

Parameters:
  • input_data (Any) – Data to transform

  • fn (Callable[[Any], Any]) – Transformation function

  • output_name (str) – Key name for the result in output dict

Return type:

Dict[str, Any]

Returns:

Dict containing the transformed data under both the output_name key and the ‘data’ key

Example:

@task
def extract_urls(self, search_results: dict) -> dict:
    results = search_results.get("results", [])
    return transform(
        results,
        fn=lambda r: [item["url"] for item in r]
    )
class TaskResult[source]

Bases: object

Standardized result format for all task executions.

This class provides a consistent interface for task results across the framework, enabling better error handling, performance tracking, and result processing.
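The `metadata=<factory>` default in the constructor signature is how a dataclass field with a `default_factory` appears, so the shape of the class can be illustrated with a small dataclass. `ResultSketch` is a simplified stand-in, not the library class. It covers only the success/failure factories and the documented rule that the legacy unwrap raises `RuntimeError` for a failed task.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class Status(Enum):
    # Subset of the documented TaskStatus values, enough for this sketch.
    SUCCESS = "success"
    ERROR = "error"


@dataclass
class ResultSketch:
    """Simplified stand-in for TaskResult (not the library class)."""

    status: Status
    data: Any = None
    error: Optional[str] = None
    # default_factory gives each instance its own dict; the generated
    # __init__ signature shows this default as <factory>.
    metadata: Dict[str, Any] = field(default_factory=dict)
    execution_time: float = 0.0

    @classmethod
    def success(cls, data: Any = None, execution_time: float = 0.0) -> "ResultSketch":
        return cls(Status.SUCCESS, data=data, execution_time=execution_time)

    @classmethod
    def failure(cls, error_message: str, execution_time: float = 0.0) -> "ResultSketch":
        return cls(Status.ERROR, error=error_message, execution_time=execution_time)

    def is_success(self) -> bool:
        return self.status is Status.SUCCESS

    def get_legacy_result(self) -> Any:
        # Legacy callers expect the raw value, so a failed task raises instead.
        if self.status is Status.ERROR:
            raise RuntimeError(self.error or "task failed")
        return self.data


ok = ResultSketch.success({"content": "hi"}, execution_time=0.2)
bad = ResultSketch.failure("timeout")
ok.metadata["model"] = "gpt-4o"
print(ok.is_success(), ok.get_legacy_result())  # True {'content': 'hi'}
print(bad.is_success(), bad.metadata)           # False {}
```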

__init__(status, data=None, error=None, metadata=<factory>, execution_time=0.0)

__str__()[source]

String representation of TaskResult.

Return type:

str

add_metadata(key, value)[source]

Add metadata entry.

Parameters:
  • key (str) – Metadata key

  • value (Any) – Metadata value

Return type:

None

data: Any = None
error: Optional[str] = None
execution_time: float = 0.0
classmethod failure(error_message, metadata=None, execution_time=0.0)[source]

Create an error task result.

Parameters:
  • error_message (str) – Description of the error

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

  • execution_time (float) – Task execution time in seconds

Return type:

TaskResult

Returns:

TaskResult with ERROR status

get_legacy_result()[source]

Get result in legacy format for backward compatibility.

This method extracts the data field to maintain compatibility with existing code that expects raw return values.

Return type:

Any

Returns:

The data field, or raises RuntimeError if task failed

get_metadata(key, default=None)[source]

Get metadata value.

Parameters:
  • key (str) – Metadata key

  • default (Any) – Default value if key not found

Return type:

Any

Returns:

Metadata value or default

has_data()[source]

Check if the result contains data.

Return type:

bool

Returns:

True if data is not None

is_error()[source]

Check if the task failed with an error.

Return type:

bool

Returns:

True if status is ERROR

is_partial()[source]

Check if the task completed with partial success.

Return type:

bool

Returns:

True if status is PARTIAL

is_skipped()[source]

Check if the task was skipped.

Return type:

bool

Returns:

True if status is SKIPPED

is_success()[source]

Check if the task completed successfully.

Return type:

bool

Returns:

True if status is SUCCESS

classmethod partial(data=None, error=None, metadata=None, execution_time=0.0)[source]

Create a partial success task result.

Parameters:
  • data (Any) – The partial result data

  • error (Optional[str]) – Optional description of what failed

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

  • execution_time (float) – Task execution time in seconds

Return type:

TaskResult

Returns:

TaskResult with PARTIAL status

classmethod skipped(reason, metadata=None, execution_time=0.0)[source]

Create a skipped task result.

Parameters:
  • reason (str) – Reason why the task was skipped

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

  • execution_time (float) – Task execution time in seconds

Return type:

TaskResult

Returns:

TaskResult with SKIPPED status

classmethod success(data=None, metadata=None, execution_time=0.0)[source]

Create a successful task result.

Parameters:
  • data (Any) – The result data

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

  • execution_time (float) – Task execution time in seconds

Return type:

TaskResult

Returns:

TaskResult with SUCCESS status

status: TaskStatus
metadata: Dict[str, Any]
class TaskStatus[source]

Bases: Enum

Enumeration for task execution status.

NOT_STARTED = 'not_started'
STARTED = 'started'
SUCCESS = 'success'
ERROR = 'error'
SKIPPED = 'skipped'
PARTIAL = 'partial'
wrap_legacy_result(result, execution_time=0.0)[source]

Wrap a legacy task result in TaskResult format.

This utility function helps with gradual migration by wrapping existing task results in the new standardized format.

Parameters:
  • result (Any) – Legacy task result (any type)

  • execution_time (float) – Task execution time in seconds

Return type:

TaskResult

Returns:

TaskResult with SUCCESS status containing the legacy result

unwrap_to_legacy(task_result)[source]

Unwrap TaskResult to legacy format for backward compatibility.

Parameters:

task_result (TaskResult) – TaskResult instance

Return type:

Any

Returns:

The data field from the TaskResult

Raises:

RuntimeError – If the task failed

class AgentRunResult[source]

Bases: object

Standardized result format for agent pipeline executions.

This class provides a consistent interface for agent execution results, enabling better error handling, status checking, and result processing.

Note: Individual task results are accessible via agent.context.get_result(task_name)

__init__(status, agent_name, total_tasks=0, completed_tasks=0, failed_tasks=0, total_phases=0, execution_time=0.0, metadata=<factory>, error_message=None)

__str__()[source]

String representation of AgentRunResult.

Return type:

str

add_metadata(key, value)[source]

Add metadata entry.

Parameters:
  • key (str) – Metadata key

  • value (Any) – Metadata value

Return type:

None

completed_tasks: int = 0
error_message: Optional[str] = None
execution_time: float = 0.0
failed_tasks: int = 0
classmethod failure(agent_name, total_tasks, failed_tasks, error_message, total_phases=0, execution_time=0.0, metadata=None)[source]

Create a failed agent execution result.

Parameters:
  • agent_name (str) – Name of the agent

  • total_tasks (int) – Total number of tasks in the pipeline

  • failed_tasks (int) – Number of tasks that failed

  • error_message (str) – Description of the error

  • total_phases (int) – Number of execution phases

  • execution_time (float) – Total execution time in seconds

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

Return type:

AgentRunResult

Returns:

AgentRunResult with ERROR status

get_metadata(key, default=None)[source]

Get metadata value.

Parameters:
  • key (str) – Metadata key

  • default (Any) – Default value if key not found

Return type:

Any

Returns:

Metadata value or default

is_error()[source]

Check if the agent execution failed with an error.

Return type:

bool

Returns:

True if status is ERROR

is_partial()[source]

Check if the agent execution completed with partial success.

Return type:

bool

Returns:

True if status is PARTIAL

is_running()[source]

Check if the agent is currently running.

Return type:

bool

Returns:

True if status is RUNNING

is_success()[source]

Check if the agent execution completed successfully.

Return type:

bool

Returns:

True if status is SUCCESS

classmethod partial(agent_name, total_tasks, completed_tasks, failed_tasks, total_phases, execution_time=0.0, metadata=None)[source]

Create a partial success agent execution result.

Parameters:
  • agent_name (str) – Name of the agent

  • total_tasks (int) – Total number of tasks in the pipeline

  • completed_tasks (int) – Number of tasks that completed successfully

  • failed_tasks (int) – Number of tasks that failed

  • total_phases (int) – Number of execution phases

  • execution_time (float) – Total execution time in seconds

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

Return type:

AgentRunResult

Returns:

AgentRunResult with PARTIAL status

classmethod running(agent_name, metadata=None)[source]

Create a running agent execution result.

Parameters:
  • agent_name (str) – Name of the agent

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

Return type:

AgentRunResult

Returns:

AgentRunResult with RUNNING status

classmethod success(agent_name, total_tasks, completed_tasks, total_phases, execution_time=0.0, metadata=None)[source]

Create a successful agent execution result.

Parameters:
  • agent_name (str) – Name of the agent

  • total_tasks (int) – Total number of tasks in the pipeline

  • completed_tasks (int) – Number of tasks that completed successfully

  • total_phases (int) – Number of execution phases

  • execution_time (float) – Total execution time in seconds

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

Return type:

AgentRunResult

Returns:

AgentRunResult with SUCCESS status

total_phases: int = 0
total_tasks: int = 0
status: AgentRunStatus
agent_name: str
metadata: Dict[str, Any]
class AgentRunStatus[source]

Bases: Enum

Enumeration for agent execution status.

SUCCESS = 'success'
PARTIAL = 'partial'
ERROR = 'error'
RUNNING = 'running'
class TaskContext[source]

Bases: object

Simplified context for storing and accessing task results across pipeline execution.

Focuses on string-based data storage and simple dot notation path access. Designed for text-based automation workflows.

__init__()[source]

Initialize empty task context.

__repr__()[source]

Detailed representation of the context.

Return type:

str

__str__()[source]

String representation of the context.

Return type:

str

clear()[source]

Clear all stored results and execution history.

Return type:

None

display_completed_results(task_content_pairs)[source]

Display content for multiple completed tasks in one call.

Never raises: edge cases are handled internally, and content is displayed only for tasks that completed successfully and have valid content.

Parameters:

task_content_pairs (List[tuple[str, str]]) – List of (task_name, title) pairs to display

Return type:

None

display_result_content(task_name, title, field_name='content')[source]

Display task result content in a message box if present.

Never raises. Edge cases handled internally include:
  • Task not completed

  • Missing or empty content

  • Any errors during display

Returns nothing; this is purely a convenience method.

Parameters:
  • task_name (str) – Name of the task to display content from

  • title (str) – Title for the message box

  • field_name (str) – Name of the field to display (default: “content”)

Return type:

None

get_all_results()[source]

Get all stored task results.

Return type:

Dict[str, Dict[str, Any]]

Returns:

Dictionary mapping task names to their result dictionaries

get_completed_tasks()[source]

Get names of all completed tasks.

Return type:

List[str]

Returns:

List of task names that completed successfully

get_data(task_name)[source]

Get data for a task (alias for get_result for compatibility).

Parameters:

task_name (str) – Name of the task

Return type:

Optional[Dict[str, Any]]

Returns:

Task data dictionary or None if not found

get_execution_history()[source]

Get the execution history of all tasks.

Return type:

List[Dict[str, Any]]

Returns:

List of execution history entries

get_failed_tasks()[source]

Get names of all failed tasks.

Return type:

List[str]

Returns:

List of task names that failed

get_path_value(path)[source]

Get a value from context using paths like ‘task.field’, ‘task.array[0]’, or ‘task.array[].field’.

Parameters:

path (str) – The path to resolve (e.g., ‘search.query’, ‘search.results[0].title’, ‘search.results[].url’)

Return type:

Optional[Any]

Returns:

The resolved value or None if path cannot be resolved
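To make the three path forms concrete, here is a small self-contained resolver over a plain dict of task results. It is a sketch, not TaskContext's implementation. It takes the results dict as an argument, whereas the real method reads the context's own storage. It supports only word-character keys and returns None for anything it cannot resolve, matching the documented return contract. `resolve_path` is a hypothetical name.

```python
import re
from typing import Any, Dict, List, Optional

# One path segment: a key, optionally followed by [N] (index) or [] (fan-out).
_SEGMENT = re.compile(r"^(\w+)(\[(\d*)\])?$")


def resolve_path(results: Dict[str, Any], path: str) -> Optional[Any]:
    """Resolve 'task.field', 'task.items[0]' or 'task.items[].field' in results."""
    task_name, _, rest = path.partition(".")
    if task_name not in results or not rest:
        return None
    values: List[Any] = [results[task_name]]
    fan_out = False
    for segment in rest.split("."):
        match = _SEGMENT.match(segment)
        if match is None:
            return None
        key, bracket, index = match.groups()
        next_values: List[Any] = []
        for value in values:
            if not isinstance(value, dict) or key not in value:
                return None
            value = value[key]
            if bracket is not None:
                if not isinstance(value, list):
                    return None
                if index == "":
                    # '[]' fans out: keep resolving against every element.
                    next_values.extend(value)
                    fan_out = True
                    continue
                if int(index) >= len(value):
                    return None
                value = value[int(index)]
            next_values.append(value)
        values = next_values
    return values if fan_out else values[0]


results = {
    "search": {
        "query": "AI news",
        "results": [
            {"title": "A", "url": "https://a.example"},
            {"title": "B", "url": "https://b.example"},
        ],
    }
}
print(resolve_path(results, "search.query"))             # AI news
print(resolve_path(results, "search.results[0].title"))  # A
print(resolve_path(results, "search.results[].url"))     # ['https://a.example', 'https://b.example']
print(resolve_path(results, "search.missing"))           # None
```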

get_result(task_name)[source]

Get a task result from the context.

Parameters:

task_name (str) – Name of the task to get result for

Return type:

Optional[Dict[str, Any]]

Returns:

The stored result dictionary or None if not found

get_result_content(task_name)[source]

Get the ‘content’ field from a task result as a string.

Never raises: returns None instead of throwing when the task is missing or has no valid content.

Parameters:

task_name (str) – Name of the task to get content from

Return type:

Optional[str]

Returns:

The content string if found and valid, otherwise None

get_result_field(task_name, field_name, expected_type=str, default=None)[source]

Get a specific field from a task result with type checking.

Never raises: returns default instead of throwing when the task or field is missing or the value has the wrong type.

Parameters:
  • task_name (str) – Name of the task to get result from

  • field_name (str) – Name of the field to extract

  • expected_type (Type[TypeVar(T)]) – Expected type of the field value (default: str)

  • default (Optional[TypeVar(T)]) – Default value to return if field not found or wrong type

Return type:

Optional[TypeVar(T)]

Returns:

The field value if found and correct type, otherwise default
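The "never raises, fall back to default" contract can be pictured with a few lines of plain Python. This is a sketch over a plain results dict, not the TaskContext method. The free function below takes `results` explicitly, which is an assumption made to keep it self-contained.

```python
from typing import Any, Dict, Optional, Type, TypeVar

T = TypeVar("T")


def get_result_field(
    results: Dict[str, Any],
    task_name: str,
    field_name: str,
    expected_type: Type[T] = str,  # type: ignore[assignment]
    default: Optional[T] = None,
) -> Optional[T]:
    """Return results[task_name][field_name] if it has the expected type, else default."""
    task_result = results.get(task_name)
    if not isinstance(task_result, dict):
        return default  # task missing, or its result is not a dict
    value = task_result.get(field_name)
    # A missing field yields None, which fails the isinstance check for
    # typical expected types such as str or int.
    return value if isinstance(value, expected_type) else default


results = {"summarize": {"content": "Short summary", "tokens": 42}}
print(get_result_field(results, "summarize", "content"))              # Short summary
print(get_result_field(results, "summarize", "tokens", int))          # 42
print(get_result_field(results, "summarize", "tokens"))               # None (int, not str)
print(get_result_field(results, "missing", "content", default=""))    # prints an empty line
```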

get_result_fields(task_name, *field_names)[source]

Get multiple fields from a task result at once.

Never raises: all edge cases are handled internally.

Parameters:
  • task_name (str) – Name of the task to get fields from

  • *field_names (str) – Names of the fields to extract

Return type:

tuple[Any, ...]

Returns:

Tuple of field values in the same order as field_names

get_task_count()[source]

Get the number of tasks with stored results.

Return type:

int

Returns:

Number of tasks with results

has_result(task_name)[source]

Check if a task result exists in the context.

Parameters:

task_name (str) – Name of the task to check

Return type:

bool

Returns:

True if result exists, False otherwise

has_result_content(task_name)[source]

Check if a task has non-empty content.

Never raises: returns False instead of throwing when the task is missing or its content is empty or invalid.

Parameters:

task_name (str) – Name of the task to check

Return type:

bool

Returns:

True if task has non-empty content field, False otherwise

record_task_completed(task_name, result)[source]

Record that a task has completed successfully.

Parameters:
  • task_name (str) – Name of the task that completed

  • result (Dict[str, Any]) – The result produced by the task

Return type:

None

record_task_failed(task_name, error)[source]

Record that a task has failed.

Parameters:
  • task_name (str) – Name of the task that failed

  • error (str) – Error message describing the failure

Return type:

None

record_task_started(task_name)[source]

Record that a task has started execution.

Parameters:

task_name (str) – Name of the task that started

Return type:

None

set_data(task_name, data)[source]

Set data for a task (alias for store_result for compatibility).

Parameters:
  • task_name (str) – Name of the task

  • data (Dict[str, Any]) – Data dictionary to store

Return type:

None

store_result(task_name, result)[source]

Store a task result in the context.

Parameters:
  • task_name (str) – Name of the task that produced the result

  • result (Dict[str, Any]) – The result data to store (dictionary with string keys)

Return type:

None

class TaskDependency[source]

Bases: object

Specification for automatic data flow between tasks in a pipeline.

TaskDependency defines how a task receives data from other tasks in the pipeline. Dependencies are resolved automatically by the PipelineAgent before task execution, with resolved data added to the task’s configuration.

The dependency system supports flexible path-based data access, optional data transformation, and both required and optional dependencies with appropriate error handling.

Core Components:
  • name: Key in target task’s config where resolved data is stored

  • source_path: Dot-notation path to source data in TaskContext

  • dependency_type: REQUIRED or OPTIONAL behavior for missing data

  • transform_func: Optional function to process resolved data

  • default_value: Fallback value for optional dependencies

Path Syntax:
  • Source paths use dot notation to access nested data

  • “task.field” - Direct field access

  • “task.data.nested.value” - Nested object access

  • “task.results[].url” - Array element extraction

  • “task.metadata.statistics” - Metadata access

Transform Functions

Optional preprocessing of resolved data before injection:

def extract_titles(articles):
    return [article.get('title', 'Untitled') for article in articles]

TaskDependency(
    "titles",
    "search.results",
    DependencyType.REQUIRED,
    transform_func=extract_titles
)

Common Patterns

Basic data passing:

TaskDependency("search_results", "search_task.results", DependencyType.REQUIRED)

Data transformation:

TaskDependency(
    "article_urls",
    "search_task.results",
    DependencyType.REQUIRED,
    transform_func=lambda results: [r['url'] for r in results]
)

Optional configuration:

TaskDependency(
    "processing_options",
    "config_task.settings",
    DependencyType.OPTIONAL,
    default_value={"batch_size": 100, "timeout": 30}
)

Nested data access:

TaskDependency("api_endpoint", "config_task.api.endpoints.primary", DependencyType.REQUIRED)

Complex transformation:

def combine_and_filter(search_results):
    # Keep recent articles and join their text;
    # is_recent() stands for your own date filter
    recent = [r for r in search_results if is_recent(r)]
    return ' '.join(r.get('content', '') for r in recent)

TaskDependency(
    "combined_content",
    "search_task.results",
    DependencyType.REQUIRED,
    transform_func=combine_and_filter
)

Error Handling

  • Required dependencies: Missing data causes task execution failure

  • Optional dependencies: Missing data uses default_value

  • Transform errors: Transform function exceptions cause dependency failure

  • Path errors: Invalid paths cause resolution failure with detailed messages
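The rules above, together with the constructor's override_existing flag and the "resolved in the order they are defined" note, can be condensed into one small resolution loop. This is a sketch under stated assumptions, not DependencyResolver's code. `Dep` and `DepType` are hypothetical stand-ins for TaskDependency and DependencyType. Source paths are looked up in a flat `available` dict instead of being resolved against TaskContext. When a config key already exists and override_existing is False, the sketch skips the dependency entirely.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional


class DepType(Enum):
    REQUIRED = "required"
    OPTIONAL = "optional"


@dataclass(frozen=True)
class Dep:
    """Simplified stand-in for TaskDependency (not the library class)."""

    name: str
    source_path: str
    dependency_type: DepType
    default_value: Any = None
    transform_func: Optional[Callable[[Any], Any]] = None
    override_existing: bool = False


_MISSING = object()


def resolve(config: Dict[str, Any], deps: List[Dep], available: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of config with each dependency injected under dep.name."""
    merged = dict(config)
    for dep in deps:  # processed in definition order
        if dep.name in merged and not dep.override_existing:
            continue  # keep the user-provided config value
        value = available.get(dep.source_path, _MISSING)
        if value is _MISSING:
            if dep.dependency_type is DepType.REQUIRED:
                raise ValueError(f"required dependency {dep.name!r} unavailable: {dep.source_path}")
            merged[dep.name] = dep.default_value  # OPTIONAL: fall back to the default
            continue
        if dep.transform_func is not None:
            # An exception raised here propagates, failing the dependency.
            value = dep.transform_func(value)
        merged[dep.name] = value
    return merged


available = {"search.results": [{"url": "https://a.example"}, {"url": "https://b.example"}]}
deps = [
    Dep("article_urls", "search.results", DepType.REQUIRED,
        transform_func=lambda rs: [r["url"] for r in rs]),
    Dep("settings", "config_task.options", DepType.OPTIONAL, default_value={"timeout": 30}),
]
print(resolve({"topic": "AI"}, deps, available))
# {'topic': 'AI', 'article_urls': ['https://a.example', 'https://b.example'], 'settings': {'timeout': 30}}
```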

Best Practices

  • Use descriptive names that indicate the data being passed

  • Prefer specific paths over broad data passing

  • Use transform functions to shape data for consuming tasks

  • Provide meaningful default values for optional dependencies

  • Document complex transform functions for maintainability

Thread Safety

TaskDependency instances are immutable after creation and thread-safe. However, transform functions should be thread-safe if used in parallel execution.

See also

  • DependencyType: REQUIRED vs OPTIONAL dependency behavior

  • DependencyResolver: Automatic resolution engine

  • TaskContext: Source of dependency data

  • Built-in transform utilities: extract_urls_from_results, etc.

__init__(name, source_path, dependency_type, default_value=None, transform_func=None, override_existing=False, description='')[source]

Initialize a task dependency specification.

Creates a dependency that will be resolved automatically by the PipelineAgent before task execution. The resolved data will be injected into the task’s configuration under the specified name.

Note

  • Dependencies are resolved in the order they are defined

  • Transform functions should be deterministic and thread-safe

  • Source task must complete successfully for REQUIRED dependencies

  • Path resolution supports nested objects and array access

Parameters:
  • name (str) – Key name where resolved data will be stored in the target task’s config dictionary. Should be descriptive and follow naming conventions (snake_case recommended). This is how the consuming task will access the dependency data.

  • source_path (str) –

    Dot-notation path to the source data in TaskContext, in the format “source_task_name.field_path”. Examples:

    • “search.results” - Access results field from search task

    • “fetch.data.articles[].content” - Extract content from article array

    • “config.api.endpoints.primary” - Access nested configuration

    • “process.metadata.item_count” - Access metadata fields

  • dependency_type (DependencyType) –

    How to handle missing dependencies:

    • DependencyType.REQUIRED: Task execution fails if unavailable

    • DependencyType.OPTIONAL: Uses default_value if unavailable

  • default_value (Any) –

    Value to use when optional dependency is unavailable. Only relevant for OPTIONAL dependencies. Can be any type that makes sense for the consuming task. Common patterns:

    • Empty list: []

    • Empty dict: {}

    • Default config: {"timeout": 30, "retries": 3}

    • None: None (explicit null)

  • transform_func (Optional[Callable[[Any], Any]]) –

    Optional function to preprocess resolved data before injection. Function signature: (resolved_data: Any) -> Any. Common uses:

    • Extract specific fields: lambda x: [item['url'] for item in x]

    • Filter data: lambda x: [item for item in x if item['valid']]

    • Combine data: lambda x: ' '.join(x)

    • Format data: lambda x: {“processed”: x, “count”: len(x)}

  • override_existing (bool) –

    Whether to override existing values in task config.

    • False (default): Only inject if key doesn’t exist in config

    • True: Always inject, overriding existing config values

    Use with caution as it can override user-provided configuration.

  • description (str) – Human-readable description of this dependency’s purpose. Helpful for documentation and debugging. Should explain what data is being passed and how it will be used.

Example:

Basic dependency:

# Pass search results to summarization task

TaskDependency(
    name="search_results",
    source_path="web_search.results",
    dependency_type=DependencyType.REQUIRED
)

Transformed dependency:

TaskDependency(
    name="article_urls",
    source_path="search.results",
    dependency_type=DependencyType.REQUIRED,
    transform_func=lambda results: [r['url'] for r in results],
    description="Extract URLs from search results for fetching"
)

Optional dependency with default:

TaskDependency(
    name="processing_config",
    source_path="config_loader.settings",
    dependency_type=DependencyType.OPTIONAL,
    default_value={"batch_size": 100, "parallel": True},
    description="Processing configuration with sensible defaults"
)
Raises:

ValueError – If name is empty, source_path is empty, or source_path doesn’t contain a dot (invalid format).

See also

  • DependencyType: For dependency type behavior

  • DependencyResolver: For resolution implementation details

  • Built-in transform functions: extract_urls_from_results, etc.

__repr__()[source]

Detailed representation of the dependency.

Return type:

str

__str__()[source]

String representation of the dependency.

Return type:

str

is_optional()[source]

Check if this is an optional dependency.

Return type:

bool

is_required()[source]

Check if this is a required dependency.

Return type:

bool

class DependencyType[source]

Bases: Enum

Enumeration of dependency types for task data flow.

Dependency types control how missing dependencies are handled:

  • REQUIRED: Task execution fails if dependency cannot be resolved

  • OPTIONAL: Default value used if dependency unavailable

Example

# Required dependency - task fails if search_task not available
TaskDependency("query_results", "search_task.results", DependencyType.REQUIRED)

# Optional dependency - uses default if config_task unavailable
TaskDependency("settings", "config_task.options", DependencyType.OPTIONAL, default_value={})

REQUIRED = 'required'
OPTIONAL = 'optional'
class DependencyResolver[source]

Bases: object

Resolves task dependencies from context and builds task configuration.

Parameters:

context (TaskContext)

__init__(context)[source]

Initialize dependency resolver with context.

Parameters:

context (TaskContext) – TaskContext to resolve dependencies from

get_dependency_info(task)[source]

Get information about all dependencies for a task.

Parameters:

task (BaseTask) – Task to get dependency info for

Return type:

List[Dict[str, Any]]

Returns:

List of dependency information dictionaries

resolve_dependencies(task)[source]

Resolve all dependencies for a task and return merged configuration.

Parameters:

task (BaseTask) – Task instance with get_dependencies() method

Return type:

Dict[str, Any]

Returns:

Dictionary with resolved dependencies merged with existing config

Raises:

ValueError – If required dependencies cannot be satisfied

validate_dependencies(task)[source]

Validate that all dependencies for a task can be satisfied.

Parameters:

task (BaseTask) – Task to validate dependencies for

Return type:

List[str]

Returns:

List of validation error messages (empty if all valid)

create_required_dependency(name, source_path, transform_func=None)[source]

Create a required dependency.

Parameters:
  • name (str) – Name of the dependency

  • source_path (str) – Context path to resolve

  • transform_func (Optional[Callable[[Any], Any]]) – Optional transformation function

Return type:

TaskDependency

Returns:

TaskDependency instance

create_optional_dependency(name, source_path, default_value, transform_func=None)[source]

Create an optional dependency with default value.

Parameters:
  • name (str) – Name of the dependency

  • source_path (str) – Context path to resolve

  • default_value (Any) – Default value if resolution fails

  • transform_func (Optional[Callable[[Any], Any]]) – Optional transformation function

Return type:

TaskDependency

Returns:

TaskDependency instance

extract_urls_from_results(search_results)[source]

Extract URLs from search results.

Parameters:

search_results (Dict[str, Any]) – Search results dictionary

Return type:

List[str]

Returns:

List of URLs

combine_article_content(articles, separator='\n\n')[source]

Combine content from multiple articles.

Parameters:
  • articles (List[Dict[str, Any]]) – List of article dictionaries

  • separator (str) – Separator between articles

Return type:

str

Returns:

Combined content string
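The input shapes for these two helpers are not spelled out here. Assuming search results look like {"results": [{"url": ...}, ...]}, which is the shape implied by paths such as "task.results[].url" elsewhere on this page, and that each article keeps its text under "content", a minimal sketch of what they might do is shown below. The `_sketch` functions are hypothetical and are not the library's code.

```python
from typing import Any, Dict, List


def extract_urls_sketch(search_results: Dict[str, Any]) -> List[str]:
    # Assumed shape: {"results": [{"url": ...}, ...]}; entries without a URL are skipped.
    return [r["url"] for r in search_results.get("results", []) if r.get("url")]


def combine_article_content_sketch(articles: List[Dict[str, Any]], separator: str = "\n\n") -> str:
    # Assumed shape: each article stores its text under "content"; empty entries are dropped.
    return separator.join(a["content"] for a in articles if a.get("content"))


urls = extract_urls_sketch({"results": [{"url": "u1"}, {"title": "no url"}, {"url": "u2"}]})
text = combine_article_content_sketch([{"content": "A"}, {"content": ""}, {"content": "B"}])
print(urls)        # ['u1', 'u2']
print(repr(text))  # 'A\n\nB'
```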

format_search_query(keywords, filters=None)[source]

Format search query with optional filters.

Parameters:
  • keywords (str) – Base search keywords

  • filters (Optional[Dict[str, Any]]) – Optional filters to apply

Return type:

str

Returns:

Formatted search query

class LLMTask[source]

Bases: BaseTask

Advanced LLM integration task with template substitution and tool calling.

LLMTask provides sophisticated Large Language Model integration for AI pipelines. It supports dynamic prompt generation through template substitution, automatic function/tool calling, and seamless integration with multiple LLM providers.

The task automatically resolves template variables from dependency data and manages the complete LLM interaction lifecycle including request formatting, response parsing, tool execution, and error handling.

Core Capabilities:
  • Dynamic Prompts: ${variable} template substitution from dependencies

  • Tool Integration: Automatic function calling with @tool decorated functions

  • Multi-Provider: Support for 50+ LLM providers via litellm

  • Response Processing: Structured response parsing and validation

  • Error Recovery: Graceful handling of API failures and timeouts

  • Usage Tracking: Detailed token usage and cost monitoring

Template Substitution:
Templates use ${variable_name} syntax and are resolved from:
  1. Task dependencies (via TaskDependency)

  2. Task configuration values

  3. Environment variables (as fallback)

Template fields support:
  • prompt/prompt_template: Main user message

  • context: System/context message

  • role: User role description

Tool Calling Workflow:
  1. Functions decorated with @tool are automatically registered

  2. LLM decides which tools to call based on prompt

  3. Tools execute with validated parameters

  4. Results injected back into conversation

  5. LLM continues with tool results

Provider Configuration

Different providers require different configuration:

OpenAI:

{
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.7,
    "max_tokens": 1000
}

Ollama (local):

{
    "llm_provider": "ollama",
    "llm_model": "llama2",
    "temperature": 0.8
}

Response Structure

Successful responses include:

{
    "content": "Generated text response",
    "usage": {
        "prompt_tokens": 150,
        "completion_tokens": 300,
        "total_tokens": 450
    },
    "tool_calls": [
        {
            "tool_name": "search_web",
            "arguments": {"query": "AI trends"},
            "result": {"results": [...]},
            "success": True
        }
    ],
    "model": "gpt-4",
    "provider": "openai",
    "metadata": {
        "response_time": 2.34,
        "template_variables": ["topic", "domain"]
    }
}

Common Usage Patterns

Content generation with templates:

LLMTask("writer", {
    "prompt": "Write a ${length} article about ${topic} for ${audience}",
    "context": "You are an expert ${field} writer",
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.7
}, dependencies=[
    TaskDependency("topic", "input.topic", REQUIRED),
    TaskDependency("length", "config.article_length", OPTIONAL, default_value="brief"),
    TaskDependency("audience", "config.target_audience", REQUIRED),
    TaskDependency("field", "config.expertise_field", REQUIRED)
])

Research with tool calling:

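@tool
def search_academic_papers(query: str, limit: int = 5) -> List[Dict]:
    """Search academic papers matching the query."""
    papers: List[Dict] = []  # Replace with a real search implementation
    return papers[:limit]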

LLMTask("researcher", {
    "prompt": "Research ${topic} and provide comprehensive analysis",
    "tools": [search_academic_papers],
    "llm_provider": "anthropic",
    "llm_model": "claude-3-opus",
    "tool_choice": "auto",
    "max_tokens": 3000
})

Data analysis and reasoning:

LLMTask("analyzer", {
    "prompt": "Analyze this data and provide insights: ${data}",
    "context": "You are a data scientist specializing in ${domain}",
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.3,  # Lower temperature for analytical tasks
    "max_tokens": 2000
}, dependencies=[
    TaskDependency("data", "processor.results", REQUIRED),
    TaskDependency("domain", "config.analysis_domain", REQUIRED)
])

Error Handling

Common failure scenarios and recovery:

  • API Errors: Network failures, authentication issues

  • Rate Limits: Automatic retry with exponential backoff

  • Invalid Models: Clear error messages for unsupported models

  • Tool Failures: Individual tool errors don’t fail entire task

  • Template Errors: Missing variables result in clear error messages

Performance Considerations

  • Use appropriate max_tokens to control costs and latency

  • Lower temperature (0.1-0.3) for factual/analytical tasks

  • Higher temperature (0.7-1.0) for creative tasks

  • Consider model capabilities vs cost (gpt-3.5-turbo vs gpt-4)

  • Use tool_choice="none" to disable tool calling when not needed

See also

  • @tool: For creating callable functions

  • TaskDependency: For template variable injection

  • litellm: Underlying provider abstraction

  • Tool calling examples in the examples package

DEFAULT_MAX_TOKENS = 1000
DEFAULT_TEMPERATURE = 0.7
DEFAULT_TIMEOUT = 60
REQUIRED_CONFIGS = ['llm_provider', 'llm_model']
__init__(name, config=None, dependencies=None)[source]

Initialize an advanced LLM task with template and tool support.

Creates a new LLM task that can automatically substitute template variables from dependencies, execute function calls, and interact with multiple LLM providers. The task handles the complete LLM interaction lifecycle.

Parameters:
  • name (str) – Unique identifier for this task within the pipeline. Used for logging, dependency references, and result storage.

  • config (Optional[Dict[str, Any]]) –

    LLM configuration dictionary with the following keys:

    Required:

    • llm_provider (str): LLM provider name. Supported values include: “openai”, “anthropic”, “ollama”, “azure”, “google”, “cohere”, etc.

    • llm_model (str): Model identifier specific to the provider:

      • OpenAI: “gpt-4”, “gpt-3.5-turbo”, “gpt-4-turbo-preview”

      • Anthropic: “claude-3-opus”, “claude-3-sonnet”, “claude-3-haiku”

      • Ollama: “llama2”, “mistral”, “codellama”

      • Google: “gemini-pro”, “gemini-pro-vision”

    Prompt Configuration:

    • prompt OR prompt_template (str): Main user message with ${var} templates

    • context (str, optional): System/context message (supports templates)

    • role (str, optional): User role description (supports templates)

    Generation Parameters:

    • temperature (float, 0.0-2.0): Randomness control (default: 0.7)

    • max_tokens (int): Maximum response tokens (default: 1000)

    • timeout (float): Request timeout in seconds (default: 60)

    Tool Calling:

    • tools (List[Callable | Dict], optional): List of @tool decorated functions and/or MCP server configurations. Each item can be:

      • Python function: Decorated with @tool for local execution

      • MCP config dict: {"type": "mcp", "server_label": "…", "server_url": "…"}

    • tool_choice (str, optional): “auto”, “none”, or specific tool name

    • parallel_tool_calls (bool): Enable parallel tool execution (default: True)

    • max_tool_execution_time (float): Tool timeout in seconds (default: 30)

    Provider-Specific:

    • api_key (str, optional): API key (can also use environment variables)

    • api_base (str, optional): Custom API endpoint (for Azure, local deployments)

    • api_version (str, optional): API version (for Azure OpenAI)

  • dependencies (Optional[List[TaskDependency]]) – List of TaskDependency objects for template variable injection. Template variables in prompt, context, and role fields will be automatically replaced with resolved dependency values.

Examples

Basic text generation:

LLMTask("summarizer", {
    "prompt": "Summarize this text: ${content}",
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.3,
    "max_tokens": 500
}, dependencies=[
    TaskDependency("content", "input.text", REQUIRED)
])

Advanced with tools and context:

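@tool
def web_search(query: str) -> Dict[str, Any]:
    """Search the web for the given query."""
    # search_engine is a placeholder for your own search client
    return {"results": search_engine.search(query)}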

LLMTask("researcher", {
    "prompt": "Research ${topic} and provide analysis",
    "context": "You are an expert ${field} researcher with access to web search",
    "llm_provider": "anthropic",
    "llm_model": "claude-3-opus",
    "tools": [web_search],
    "tool_choice": "auto",
    "temperature": 0.5,
    "max_tokens": 3000
}, dependencies=[
    TaskDependency("topic", "config.research_topic", REQUIRED),
    TaskDependency("field", "config.expertise_field", REQUIRED)
])

Local model with Ollama:

LLMTask("local_chat", {
    "prompt": "Help with this question: ${question}",
    "llm_provider": "ollama",
    "llm_model": "llama2",
    "temperature": 0.8,
    "max_tokens": 2000
}, dependencies=[
    TaskDependency("question", "input.question", REQUIRED)
])

Template Variables

Variables in prompt, context, and role fields use ${variable_name} syntax. Resolution order:

  1. Dependency values (from TaskDependency objects)

  2. Direct config values

  3. Environment variables

  4. Error if required variable not found
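The resolution order above can be sketched with Python's `string.Template`, which uses the same `${name}` syntax, and a `ChainMap` that searches dependencies, then config, then the environment. This is an approximation rather than LLMTask's actual resolver. `string.Template` also accepts bare `$name`, which the library may not. `resolve_template` and `DEMO_AUDIENCE` are hypothetical names.

```python
import os
from collections import ChainMap
from string import Template
from typing import Any, Dict


def resolve_template(template: str, dependencies: Dict[str, Any], config: Dict[str, Any]) -> str:
    """Sketch: resolve ${var} from dependencies, then config, then os.environ."""
    # ChainMap searches its maps left to right, mirroring the documented order
    lookup = ChainMap(dependencies, config, os.environ)
    # Template.substitute raises KeyError when a variable is found in none of them
    return Template(template).substitute(lookup)


os.environ["DEMO_AUDIENCE"] = "engineers"
prompt = resolve_template(
    "Write a ${length} article about ${topic} for ${DEMO_AUDIENCE}",
    dependencies={"topic": "AI trends"},
    config={"length": "brief", "topic": "ignored: the dependency value wins"},
)
print(prompt)  # Write a brief article about AI trends for engineers
```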

Tool Integration

Functions decorated with @tool are automatically:

  1. Registered with the LLM provider

  2. Made available to the model during generation

  3. Executed when called by the model

  4. Results injected back into the conversation

Error Handling

Configuration errors are caught during validation:

  • Missing required fields (llm_provider, llm_model)

  • Invalid parameter ranges (temperature, max_tokens)

  • Unsupported tool configurations

  • Provider-specific validation
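The sketch below shows what such validation might look like. It takes REQUIRED_CONFIGS, the 0.0-2.0 temperature range, and the defaults from the parameter list on this page. The rule that max_tokens must be a positive integer is an assumption. It returns a list of error messages, matching the "empty if all valid" convention used elsewhere in this module. `validate_llm_config` is a hypothetical name.

```python
from typing import Any, Dict, List

REQUIRED_CONFIGS = ["llm_provider", "llm_model"]


def validate_llm_config(config: Dict[str, Any]) -> List[str]:
    """Sketch: return validation error messages (empty if the config is valid)."""
    errors = [f"Missing required config: {key}" for key in REQUIRED_CONFIGS if key not in config]
    temperature = config.get("temperature", 0.7)
    if not isinstance(temperature, (int, float)) or not 0.0 <= temperature <= 2.0:
        errors.append(f"temperature must be between 0.0 and 2.0, got {temperature!r}")
    max_tokens = config.get("max_tokens", 1000)
    if not isinstance(max_tokens, int) or max_tokens <= 0:  # assumed rule
        errors.append(f"max_tokens must be a positive integer, got {max_tokens!r}")
    return errors


print(validate_llm_config({"llm_provider": "openai", "llm_model": "gpt-4"}))  # []
print(validate_llm_config({"llm_provider": "openai", "temperature": 3.5}))
```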

Environment Variables

API keys can be provided via environment variables:

  • OPENAI_API_KEY for OpenAI

  • GOOGLE_API_KEY for Google

  • Other providers use the environment variables documented by litellm

See also

  • @tool: For creating callable functions

  • TaskDependency: For template variable injection

  • Provider documentation for model-specific capabilities

__str__()[source]

Enhanced string representation including template info.

Return type:

str

get_resolved_context()[source]

Get the resolved context after template substitution.

Return type:

Optional[str]

Returns:

Resolved context string or None if not yet resolved

get_resolved_prompt()[source]

Get the resolved prompt after template substitution.

Return type:

Optional[str]

Returns:

Resolved prompt string or None if not yet resolved

get_resolved_role()[source]

Get the resolved role after template substitution.

Return type:

Optional[str]

Returns:

Resolved role string or None if not yet resolved

preview_resolved_templates()[source]

Preview what the templates would resolve to with current context.

Return type:

Dict[str, str]

Returns:

Dictionary with resolved template previews

run()[source]

Execute the LLM task with context-resolved prompts and logging.

Return type:

TaskResult

set_context(context)[source]

Set the task context for template resolution.

Parameters:

context (TaskContext) – TaskContext instance

Return type:

None

class SearchTask[source]

Bases: BaseTask

Simple web search task that executes searches and returns results.

__init__(name, config=None, dependencies=None)[source]

Initialize search task.

Parameters:
Config parameters:
  • query: Search query string (required)

  • max_results: Maximum number of search results (default: 5)

  • serper_api_key: Optional API key for Serper search

__str__()[source]

String representation of the search task.

Return type:

str

run()[source]

Execute the search and return results.

Return type:

TaskResult

class ConditionalTask[source]

Bases: BaseTask

Task that executes actions based on conditional logic using context data.

__init__(name, config=None, dependencies=None)[source]

Initialize conditional task.

Parameters:
Config parameters:
  • condition_function: Function that evaluates to True/False

  • condition_inputs: List of input field names for condition function

  • action_function: Function to execute when condition is True

  • action_inputs: List of input field names for action function

  • else_function: Optional function to execute when condition is False

  • else_inputs: List of input field names for else function

  • skip_reason: Custom reason message when condition is False
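The config keys above describe a condition, then action or else, then skip flow. The hypothetical `run_conditional` below is a minimal sketch of that flow, not ConditionalTask itself. It assumes inputs come from a plain dict and that each `*_inputs` list names fields passed positionally in order. The shape of the returned dict is also an assumption.

```python
from typing import Any, Callable, Dict, List, Optional


def run_conditional(
    inputs: Dict[str, Any],
    condition_function: Callable[..., bool],
    condition_inputs: List[str],
    action_function: Callable[..., Any],
    action_inputs: List[str],
    else_function: Optional[Callable[..., Any]] = None,
    else_inputs: Optional[List[str]] = None,
    skip_reason: str = "Condition not met",
) -> Dict[str, Any]:
    """Hypothetical sketch of the condition -> action / else flow."""
    # Each *_inputs list names the fields passed positionally, in order
    if condition_function(*(inputs[name] for name in condition_inputs)):
        result = action_function(*(inputs[name] for name in action_inputs))
        return {"condition_met": True, "result": result}
    if else_function is not None:
        result = else_function(*(inputs[name] for name in else_inputs or []))
        return {"condition_met": False, "result": result}
    # No else branch: report the skip instead of failing
    return {"condition_met": False, "skipped": True, "reason": skip_reason}


data = {"score": 0.82, "title": "Draft"}
print(run_conditional(data, lambda s: s >= 0.8, ["score"], lambda t: f"Publish {t}", ["title"]))
# {'condition_met': True, 'result': 'Publish Draft'}
```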

__str__()[source]

String representation of the conditional task.

Return type:

str

get_execution_summary(context)[source]

Get a formatted summary of the conditional task execution results.

Parameters:

context (TaskContext) – TaskContext to retrieve results from

Return type:

Optional[Dict[str, Any]]

Returns:

Dictionary containing execution summary with formatted info, or None if no results found

preview_condition()[source]

Preview what the condition would evaluate to with current input.

Return type:

Dict[str, Any]

Returns:

Preview information about the condition

run()[source]

Execute the conditional task.

Return type:

TaskResult

set_context(context)[source]

Set the task context.

Parameters:

context (TaskContext) – TaskContext instance

Return type:

None

threshold_condition(threshold, operator='>=')[source]

Create a condition function that checks if a value meets a threshold.

Parameters:
  • threshold (float) – Threshold value to compare against

  • operator (str) – Comparison operator (>=, >, <=, <, ==, !=)

Return type:

Callable[[float], bool]

Returns:

Condition function
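A threshold condition factory can be sketched with the standard `operator` module, which maps each listed comparison symbol to a function. The parameter is named `op` here to avoid shadowing that module. Rejecting unknown operators when the factory is called is a design choice of this sketch. `make_threshold_condition` is a hypothetical name.

```python
import operator
from typing import Callable

_OPERATORS = {
    ">=": operator.ge, ">": operator.gt, "<=": operator.le,
    "<": operator.lt, "==": operator.eq, "!=": operator.ne,
}


def make_threshold_condition(threshold: float, op: str = ">=") -> Callable[[float], bool]:
    """Sketch: build a predicate that compares a value against a fixed threshold."""
    if op not in _OPERATORS:
        raise ValueError(f"Unsupported operator: {op}")
    compare = _OPERATORS[op]
    return lambda value: compare(value, threshold)


is_high = make_threshold_condition(0.8)
print(is_high(0.85), is_high(0.5))  # True False
```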

contains_condition(search_term, case_sensitive=False)[source]

Create a condition function that checks if text contains a term.

Parameters:
  • search_term (str) – Term to search for

  • case_sensitive (bool) – Whether search is case sensitive

Return type:

Callable[[str], bool]

Returns:

Condition function

list_size_condition(min_size=None, max_size=None)[source]

Create a condition function that checks list size.

Parameters:
  • min_size (Optional[int]) – Minimum required size (inclusive)

  • max_size (Optional[int]) – Maximum allowed size (inclusive)

Return type:

Callable[[List[Any]], bool]

Returns:

Condition function

success_rate_condition(min_success_rate)[source]

Create a condition function that checks success rate from a results dict.

Parameters:

min_success_rate (float) – Minimum success rate (0.0 to 1.0)

Return type:

Callable[[Dict[str, Any]], bool]

Returns:

Condition function

quality_gate_condition(min_score, score_field='score')[source]

Create a condition function that checks quality score.

Parameters:
  • min_score (float) – Minimum quality score

  • score_field (str) – Field name containing the score

Return type:

Callable[[Dict[str, Any]], bool]

Returns:

Condition function
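As a sketch, a quality gate can read `score_field` from a result dict and compare it against `min_score`. In this version, a missing or non-numeric score fails the gate rather than raising. That behavior is an assumption, and `make_quality_gate` is a hypothetical name.

```python
from typing import Any, Callable, Dict


def make_quality_gate(min_score: float, score_field: str = "score") -> Callable[[Dict[str, Any]], bool]:
    """Sketch: pass only when result[score_field] is numeric and >= min_score."""
    def condition(result: Dict[str, Any]) -> bool:
        score = result.get(score_field)
        # A missing or non-numeric score fails the gate instead of raising
        return isinstance(score, (int, float)) and score >= min_score

    return condition


gate = make_quality_gate(0.7, score_field="quality")
print(gate({"quality": 0.9}), gate({"quality": 0.4}), gate({}))  # True False False
```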

log_action(message, level='info')[source]

Create an action function that logs a message.

Parameters:
  • message (str) – Message to log

  • level (str) – Log level (info, warning, error)

Return type:

Callable[[], Dict[str, Any]]

Returns:

Action function

increment_counter_action(counter_name='counter')[source]

Create an action function that increments a counter in context.

Parameters:

counter_name (str) – Name of the counter in context

Return type:

Callable[[TaskContext], Dict[str, Any]]

Returns:

Action function

set_flag_action(flag_name, flag_value=True)[source]

Create an action function that sets a flag value.

Parameters:
  • flag_name (str) – Name of the flag

  • flag_value (Any) – Value to set

Return type:

Callable[[], Dict[str, Any]]

Returns:

Action function
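The action factories return closures. The sketch below illustrates two of them. It substitutes a plain dict for TaskContext, which is an assumption made so the example runs standalone. The `make_*` names and the returned dict fields are hypothetical.

```python
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger("actions_sketch")


def make_log_action(message: str, level: str = "info") -> Callable[[], Dict[str, Any]]:
    """Sketch: zero-argument action that logs a message and reports what it did."""
    def action() -> Dict[str, Any]:
        getattr(logger, level)(message)  # level must name a logger method: info, warning, error
        return {"action": "log", "message": message, "level": level}

    return action


def make_increment_counter(counter_name: str = "counter") -> Callable[[Dict[str, Any]], Dict[str, Any]]:
    """Sketch: action that increments a counter stored in a dict standing in for TaskContext."""
    def action(context: Dict[str, Any]) -> Dict[str, Any]:
        context[counter_name] = context.get(counter_name, 0) + 1
        return {"action": "increment", "counter": counter_name, "value": context[counter_name]}

    return action


ctx: Dict[str, Any] = {}
bump = make_increment_counter("retries")
bump(ctx)
print(bump(ctx))  # {'action': 'increment', 'counter': 'retries', 'value': 2}
```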

class TransformTask[source]

Bases: BaseTask

Task that applies transformations to data from context.

__init__(name, config=None, dependencies=None)[source]

Initialize transform task.

Parameters:
Config parameters:
  • transform_function: Function to apply transformation

  • input_field: Single input field name (for single-input transforms)

  • input_fields: List of input field names (for multi-input transforms)

  • output_name: Name for the output in the result

  • validate_input: Whether to validate input data (default: True)

  • validate_output: Whether to validate output data (default: False)

__str__()[source]

String representation of the transform task.

Return type:

str

property input_data: Any

Public readonly property to access input data for testing purposes.

Returns:

Input data for transformation

preview_transformation()[source]

Preview what the transformation would do with current input.

Return type:

Dict[str, Any]

Returns:

Preview information about the transformation

run()[source]

Execute the transformation.

Return type:

TaskResult

set_context(context)[source]

Set the task context.

Parameters:

context (TaskContext) – TaskContext instance

Return type:

None

extract_field_from_list(field_name)[source]

Create a transformation function that extracts a field from each item in a list.

Parameters:

field_name (str) – Name of the field to extract

Return type:

Callable[[List[Dict[str, Any]]], List[Any]]

Returns:

Transformation function

combine_text_fields(separator='\n\n', title_field='title', content_field='content')[source]

Create a transformation function that combines text from multiple items.

Parameters:
  • separator (str) – Separator between items

  • title_field (str) – Field name for titles

  • content_field (str) – Field name for content

Return type:

Callable[[List[Dict[str, Any]]], str]

Returns:

Transformation function

filter_by_condition(condition_func)[source]

Create a transformation function that filters items by a condition.

Parameters:

condition_func (Callable[[Any], bool]) – Function that returns True for items to keep

Return type:

Callable[[List[Any]], List[Any]]

Returns:

Transformation function

aggregate_numeric_field(field_name, operation='sum')[source]

Create a transformation function that aggregates a numeric field.

Parameters:
  • field_name (str) – Name of the numeric field

  • operation (str) – Aggregation operation (sum, avg, min, max, count)

Return type:

Callable[[List[Dict[str, Any]]], float]

Returns:

Transformation function
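A minimal sketch of a numeric aggregator covering the five listed operations follows. Two behaviors are assumptions of this sketch: items whose field is missing or non-numeric are ignored, and an empty input aggregates to 0.0. `make_aggregator` is a hypothetical name.

```python
from typing import Any, Callable, Dict, List

_OPS: Dict[str, Callable[[List[float]], float]] = {
    "sum": lambda v: float(sum(v)),
    "avg": lambda v: sum(v) / len(v) if v else 0.0,
    "min": lambda v: min(v) if v else 0.0,
    "max": lambda v: max(v) if v else 0.0,
    "count": lambda v: float(len(v)),
}


def make_aggregator(field_name: str, operation: str = "sum") -> Callable[[List[Dict[str, Any]]], float]:
    """Sketch: reduce one numeric field across a list of dicts."""
    if operation not in _OPS:
        raise ValueError(f"Unsupported operation: {operation}")
    reduce_values = _OPS[operation]

    def aggregate(items: List[Dict[str, Any]]) -> float:
        # Items whose field is missing or non-numeric are ignored
        values = [float(i[field_name]) for i in items if isinstance(i.get(field_name), (int, float))]
        return reduce_values(values)

    return aggregate


items = [{"words": 1200}, {"words": 800}, {"title": "no count"}]
print(make_aggregator("words", "avg")(items))  # 1000.0
```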

format_as_markdown_list(title_field='title', url_field='url')[source]

Create a transformation function that formats items as a markdown list.

Parameters:
  • title_field (str) – Field name for titles

  • url_field (str) – Field name for URLs

Return type:

Callable[[List[Dict[str, Any]]], str]

Returns:

Transformation function
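The formatter can be sketched as a closure that renders each item as a `- [title](url)` bullet. The "Untitled" fallback and the plain bullet for items without a URL are assumptions of this sketch. `make_markdown_list_formatter` is a hypothetical name.

```python
from typing import Any, Callable, Dict, List


def make_markdown_list_formatter(
    title_field: str = "title", url_field: str = "url"
) -> Callable[[List[Dict[str, Any]]], str]:
    """Sketch: render each item as a "- [title](url)" markdown bullet."""
    def format_items(items: List[Dict[str, Any]]) -> str:
        lines = []
        for item in items:
            title = item.get(title_field) or "Untitled"  # assumed fallback text
            url = item.get(url_field)
            # Items without a URL become plain bullets rather than broken links
            lines.append(f"- [{title}]({url})" if url else f"- {title}")
        return "\n".join(lines)

    return format_items


fmt = make_markdown_list_formatter()
print(fmt([{"title": "aipype", "url": "https://example.com"}, {"title": "Notes"}]))
```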

tool(func)[source]

Decorator to mark a function as a tool and extract metadata.

Parameters:

func (Callable[..., Any]) – Function to be marked as a tool

Return type:

Callable[..., Any]

Returns:

Decorated function with tool metadata attached

Example:

@tool
def calculate(expression: str) -> float:
    """Perform mathematical calculations.

    Args:
        expression: Mathematical expression to evaluate

    Returns:
        Calculated result
    """
    return eval(expression)  # Use safe_eval in production

class ToolMetadata[source]

Bases: object

Stores metadata for a tool function.

__init__(func, name, description)[source]
class ToolSchemaGenerator[source]

Bases: object

Generates OpenAI-compatible schemas from decorated functions.

static generate_schema(tool_func)[source]

Generate OpenAI function schema from tool metadata.

Parameters:

tool_func (Callable[..., Any]) – Function decorated with @tool

Return type:

Dict[str, Any]

Returns:

OpenAI-compatible function schema

Raises:

ValueError – If function is not decorated with @tool
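To show how a schema can be derived from a decorated function, here is a sketch that uses `inspect.signature`, `typing.get_type_hints`, and the docstring to build an OpenAI-style function schema. This is not ToolSchemaGenerator's code. `sketch_tool` is a hypothetical stand-in for @tool that only flags the function. Generic annotations such as List[str] fall back to "string" in this simplified type mapping.

```python
import inspect
from typing import Any, Callable, Dict, List, get_type_hints

_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean", list: "array", dict: "object"}


def sketch_tool(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical stand-in for @tool: only marks the function."""
    func._is_tool = True  # type: ignore[attr-defined]
    return func


def generate_schema(func: Callable[..., Any]) -> Dict[str, Any]:
    """Sketch: derive an OpenAI-style function schema from a marked function."""
    if not getattr(func, "_is_tool", False):
        raise ValueError(f"{func.__name__} is not decorated as a tool")
    hints = get_type_hints(func)
    properties: Dict[str, Any] = {}
    required: List[str] = []
    for name, param in inspect.signature(func).parameters.items():
        properties[name] = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # parameters without defaults are required
    doc = inspect.getdoc(func) or ""
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": doc.split("\n")[0],  # first docstring line only
            "parameters": {"type": "object", "properties": properties, "required": required},
        },
    }


@sketch_tool
def search_papers(query: str, limit: int = 5) -> list:
    """Search academic papers matching a query."""
    return []


print(generate_schema(search_papers)["function"]["parameters"]["required"])  # ['query']
```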

static validate_tool_function(tool_func)[source]

Validate that a function is properly decorated as a tool.

Parameters:

tool_func (Callable[..., Any]) – Function to validate

Return type:

bool

Returns:

True if function is a valid tool, False otherwise

search_with_content(query, max_results=5, max_content_results=5, content_timeout=15, html_method='readability', skip_pdf=False)[source]

Search the web and automatically fetch content from the top results.

In addition to returning search results, this tool fetches and extracts readable content from the top URLs. This is useful for research tasks that need both the results and their content.

Parameters:
  • query (str) – The search query string to search for

  • max_results (int) – Maximum number of search results to return (1-10, default: 5)

  • max_content_results (int) – Maximum number of results to fetch content from (1-10, default: 5)

  • content_timeout (int) – Timeout in seconds for fetching content from each URL (default: 15)

  • html_method (str) – Method for HTML text extraction - “readability” or “basic” (default: “readability”)

  • skip_pdf (bool) – Whether to skip PDF files when fetching content (default: False)

Return type:

Dict[str, Any]

Returns:

Dictionary containing search results with fetched content including query, total_results, search_time, results list with title/url/snippet/content for each, and content_stats with attempted/successful/failed/skipped counts.

class ToolRegistry[source]

Bases: object

Manages tool schemas and implementations with automatic generation.

Parameters:

tool_functions (List[Callable[..., Any]])

__init__(tool_functions)[source]

Initialize tool registry with list of decorated functions.

Parameters:

tool_functions (List[Callable[..., Any]]) – List of functions decorated with @tool

Raises:

ValueError – If any function is not decorated with @tool

__repr__()[source]

Detailed string representation of the tool registry.

Return type:

str

__str__()[source]

String representation of the tool registry.

Return type:

str

generate_tool_context()[source]

Generate context description for system prompt.

Return type:

str

Returns:

Formatted tool context for inclusion in system prompt

get_tool_count()[source]

Get the number of registered tools.

Return type:

int

Returns:

Number of tools in registry

get_tool_function(name)[source]

Get tool function by name.

Parameters:

name (str) – Name of the tool to retrieve

Return type:

Callable[..., Any]

Returns:

Tool function implementation

Raises:

ValueError – If tool is not found

get_tool_metadata(name)[source]

Get tool metadata by name.

Parameters:

name (str) – Name of the tool

Return type:

ToolMetadata

Returns:

ToolMetadata instance

Raises:

ValueError – If tool is not found

get_tool_schemas()[source]

Get all tool schemas for LLM API.

Return type:

List[Dict[str, Any]]

Returns:

List of OpenAI-compatible tool schemas

has_tool(name)[source]

Check if a tool exists in the registry.

Parameters:

name (str) – Name of the tool to check

Return type:

bool

Returns:

True if tool exists, False otherwise

list_tool_names()[source]

Get list of all registered tool names.

Return type:

List[str]

Returns:

List of tool names

validate_all_tools()[source]

Validate all registered tools.

Return type:

Dict[str, bool]

Returns:

Dictionary mapping tool names to validation status

class ToolExecutor[source]

Bases: object

Handles tool execution with error handling and logging.

__init__(tool_registry, max_execution_time=30.0)[source]

Initialize tool executor.

Parameters:
  • tool_registry (ToolRegistry) – Registry containing tool implementations

  • max_execution_time (float) – Maximum time (seconds) to allow for tool execution

__str__()[source]

String representation of the tool executor.

Return type:

str

execute_multiple_tools(tool_calls)[source]

Execute multiple tools in sequence.

Parameters:

tool_calls (List[Any]) – List of tool call objects (expected to be dictionaries with ‘name’ and ‘arguments’)

Return type:

List[Dict[str, Any]]

Returns:

List of execution results for each tool call

execute_tool(tool_name, arguments)[source]

Execute a tool and return result or error.

Parameters:
  • tool_name (str) – Name of the tool to execute

  • arguments (Dict[str, Any]) – Dictionary of arguments to pass to the tool

Returns:

Dictionary with execution result containing:

  • success (bool): Whether execution was successful

  • result (Any): Tool result if successful

  • error (str): Error message if failed

  • execution_time (float): Time taken in seconds

  • tool_name (str): Name of executed tool

  • error_type (str): Type of error if failed
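The result dictionary described above can be produced by a simple wrapper that catches tool errors instead of propagating them. The sketch below is an illustration under two simplifying assumptions: a plain dict of name to callable replaces ToolRegistry, and max_execution_time is not enforced. `execute_tool` here is a standalone hypothetical function, not the ToolExecutor method.

```python
import time
from typing import Any, Callable, Dict


def execute_tool(registry: Dict[str, Callable[..., Any]], tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Sketch: run a tool and report the outcome as a dict instead of raising."""
    start = time.perf_counter()
    outcome: Dict[str, Any] = {"tool_name": tool_name, "success": False}
    try:
        if tool_name not in registry:
            raise ValueError(f"Tool not found: {tool_name}")
        outcome["result"] = registry[tool_name](**arguments)
        outcome["success"] = True
    except Exception as exc:  # tool errors are reported, not propagated
        outcome["error"] = str(exc)
        outcome["error_type"] = type(exc).__name__
    outcome["execution_time"] = time.perf_counter() - start
    return outcome


tools = {"add": lambda a, b: a + b}
print(execute_tool(tools, "add", {"a": 2, "b": 3})["result"])  # 5
print(execute_tool(tools, "add", {"a": 2})["error_type"])       # TypeError
```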

get_execution_stats()[source]

Get execution statistics for monitoring.

Return type:

Dict[str, Any]

Returns:

Dictionary with execution statistics

class BatchArticleSummarizeTask[source]

Bases: BaseTask

Reusable task that creates individual summaries for each article using separate LLM calls.

This task processes a list of articles and creates summaries with configurable length. It includes content validation, error handling, and detection of generic LLM responses.

Configuration Options:
  • summary_length: Target summary length in characters (default: 1000)

  • content_limit: Maximum content length to send to LLM (default: 3000)

  • min_content_length: Minimum content length to process (default: 50)

  • llm_provider: LLM provider to use (default: “openai”)

  • llm_model: LLM model to use (default: “gpt-4o-mini”)

  • temperature: LLM temperature (default: 0.3)

  • max_tokens: Maximum tokens for LLM response (default: 300)
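The sketch below shows how content_limit and min_content_length might be applied, with one summarize call per article. It does not call an LLM: `summarize` is any callable, and `fake_llm` is a hypothetical stand-in. The "content" and "url" keys and the shape of each output row are also assumptions.

```python
from typing import Any, Callable, Dict, List


def summarize_articles(
    articles: List[Dict[str, Any]],
    summarize: Callable[[str], str],
    content_limit: int = 3000,
    min_content_length: int = 50,
) -> List[Dict[str, Any]]:
    """Sketch: one summarize() call per article, after length checks."""
    rows: List[Dict[str, Any]] = []
    for article in articles:
        content = (article.get("content") or "").strip()
        if len(content) < min_content_length:
            rows.append({"url": article.get("url"), "skipped": True, "reason": "content too short"})
            continue
        # Truncate so each request stays within the configured content budget
        rows.append({"url": article.get("url"), "skipped": False,
                     "summary": summarize(content[:content_limit])})
    return rows


def fake_llm(text: str) -> str:
    """Stand-in for a real LLM call; reports how much text it received."""
    return f"{len(text)} chars summarized"


docs = [{"url": "https://a.example", "content": "x" * 5000},
        {"url": "https://b.example", "content": "too short"}]
for row in summarize_articles(docs, fake_llm):
    print(row)
```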

__init__(name, config, dependencies=None)[source]

Initialize a new task instance with configuration and dependencies.

Creates a new task with the specified configuration. The task will be initialized with default status and logging setup. Dependencies will be resolved automatically by the PipelineAgent before execution.

Parameters:
  • name (str) – Unique identifier for this task within the agent. Must be unique across all tasks in the same pipeline. Used for:

    • Logging identification

    • Dependency references (other_task.field)

    • Result storage in TaskContext

    • Status tracking and error reporting

  • config (Dict[str, Any]) – Configuration dictionary containing task-specific parameters. The exact keys depend on the task implementation. Common patterns:

    • API credentials and endpoints

    • File paths and processing options

    • Behavioral flags and timeout values

    • Data transformation parameters

    The config is validated against validation_rules, if defined.

  • dependencies (Optional[List[TaskDependency]]) – List of TaskDependency objects specifying how this task receives data from other tasks. Dependencies are resolved automatically before run() is called, with resolved data added to the config dictionary. Use TaskDependency.create_required() or TaskDependency.create_optional() for convenience.

Attributes Initialized:
  • name: Task identifier

  • config: Configuration dictionary (may be updated by dependencies)

  • dependencies: List of dependency specifications

  • validation_rules: Set by subclasses for configuration validation

  • agent_name: Set automatically by PipelineAgent

  • logger: Task-specific logger (task.{name})

  • Status tracking fields for execution monitoring

Example

Basic task creation:

task = MyTask(
    name="process_data",
    config={
        "input_file": "data.csv",
        "output_format": "json",
        "batch_size": 1000
    }
)

Task with dependencies:

task = TransformTask(
    name="process_results",
    config={
        "transform_function": my_transform_func,
        "output_name": "processed_data"
    },
    dependencies=[
        TaskDependency("input_data", "search_task.results", REQUIRED),
        TaskDependency("options", "config_task.settings", OPTIONAL)
    ]
)

Note

  • Task names should be descriptive and use snake_case

  • Config validation occurs during run(), not during initialization

  • Dependencies are resolved by PipelineAgent, not by the task itself

  • Subclasses should call super().__init__() and set validation_rules

See also

  • TaskDependency: For creating task dependencies

  • TaskResult: Return format from run() method

  • Validation rules documentation for config validation format

get_dependencies()[source]

Get the list of dependencies for this task.

Return type:

List[TaskDependency]

Returns:

List of TaskDependency objects for this task.

run()[source]

Execute the task and return a structured result.

This is the main method that subclasses must implement to perform their specific functionality. The method should follow the TaskResult pattern for error handling and return structured results.

Implementation Guidelines

  1. Validation First: Always validate configuration before work:

    def run(self) -> TaskResult:
        start_time = datetime.now()
    
        validation_failure = self._validate_or_fail(start_time)
        if validation_failure:
            return validation_failure
    
  2. Error Handling: Use TaskResult pattern, not exceptions:

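    try:
        result = self.do_work()
        elapsed = (datetime.now() - start_time).total_seconds()
        return TaskResult.success(data=result, execution_time=elapsed)
    except ExpectedError as e:
        elapsed = (datetime.now() - start_time).total_seconds()
        return TaskResult.failure(error_message=str(e), execution_time=elapsed)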
    
  3. Timing: Always include execution timing:

    start_time = datetime.now()
    # ... do work ...
    execution_time = (datetime.now() - start_time).total_seconds()
    
  4. Metadata: Include useful execution information:

    return TaskResult.success(
        data=result,
        execution_time=execution_time,
        metadata={
            "processed_items": len(items),
            "api_calls": call_count,
            "cache_hits": cache_hits
        }
    )
    

Return Patterns

  • Success: All work completed successfully

  • Partial: Some work completed, some failed (recoverable)

  • Failure: Task could not complete due to errors

  • Skipped: Task was skipped due to conditions not being met

A returned TaskResult contains:
  • status: SUCCESS, PARTIAL, ERROR, or SKIPPED

  • data: Result data (dict) if successful, None if failed

  • error: Error message if failed, None if successful

  • execution_time: Time taken to execute in seconds

  • metadata: Additional information about execution

Example Implementation

def run(self) -> TaskResult:
    start_time = datetime.now()

    # Validate configuration
    validation_failure = self._validate_or_fail(start_time)
    if validation_failure:
        return validation_failure

    try:
        # Extract configuration
        input_file = self.config["input_file"]
        output_format = self.config.get("output_format", "json")

        # Perform work
        data = self.process_file(input_file)
        formatted_data = self.format_output(data, output_format)

        execution_time = (datetime.now() - start_time).total_seconds()

        return TaskResult.success(
            data={"processed_data": formatted_data, "format": output_format},
            execution_time=execution_time,
            metadata={"records_processed": len(data)}
        )

    except FileNotFoundError:
        execution_time = (datetime.now() - start_time).total_seconds()
        return TaskResult.failure(
            error_message=f"Input file not found: {input_file}",
            execution_time=execution_time,
            metadata={"error_type": "FileNotFoundError"}
        )

Note

  • Never raise exceptions for operational errors

  • Always calculate and include execution_time

  • Use descriptive error messages that help users understand issues

  • Include relevant metadata for debugging and monitoring

Return type:

TaskResult

Returns:

TaskResult

See also

  • TaskResult: For understanding return value structure

  • _validate_or_fail(): For configuration validation pattern

  • Task-specific implementations for concrete examples

class FileSaveTask[source]

Bases: BaseTask

Task that saves content to a file with timestamp suffix.

This is a generic file saving task that can handle various content types and formats. Originally designed for article saving but generalized for broader reuse.

__init__(name, config, dependencies=None)[source]

Initialize file save task.

Parameters:
  • name (str) – Task name

  • config (Dict[str, Any]) –

    Configuration dictionary containing:

    • content: The content to save (optional, can be resolved from dependencies)

    • title: Title for the file (optional, default: “Generated Content”)

    • output_dir: Output directory path (default: “output”)

    • file_format: Output format: “txt”, “md”, “json”, etc. (default: “txt”)

    • filename_prefix: Custom prefix for filename (optional)

  • dependencies (Optional[List[TaskDependency]]) – List of task dependencies

get_dependencies()[source]

Get the list of task dependencies.

Return type:

List[TaskDependency]

Returns:

List of TaskDependency objects

run()[source]

Save the content to a file with a timestamp suffix.

Returns:

Dictionary containing:

  • file_path: Path to the saved file

  • file_size: Size of the saved file in bytes

  • timestamp: Timestamp used in filename

  • output_dir: Output directory used

  • filename: Name of the created file
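The following is a minimal, standard-library sketch of what saving with a timestamp suffix can look like. The `save_with_timestamp` helper, the `<prefix>_<YYYYMMDD_HHMMSS>.<format>` naming scheme, and deriving the prefix from `title` when no `filename_prefix` is given are assumptions for illustration, not FileSaveTask's actual implementation; only the returned keys mirror the list above.

```python
import os
from datetime import datetime
from typing import Any, Dict, Optional


def save_with_timestamp(
    content: str,
    title: str = "Generated Content",
    output_dir: str = "output",
    file_format: str = "txt",
    filename_prefix: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: write content to <prefix>_<timestamp>.<format>."""
    # Assumed naming: explicit prefix, else a lowercase slug built from the title.
    prefix = filename_prefix or "_".join(title.lower().split())
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{prefix}_{stamp}.{file_format}"

    os.makedirs(output_dir, exist_ok=True)
    file_path = os.path.join(output_dir, filename)
    with open(file_path, "w", encoding="utf-8") as fh:
        fh.write(content)

    return {
        "file_path": file_path,
        "file_size": os.path.getsize(file_path),  # bytes on disk
        "timestamp": stamp,
        "output_dir": output_dir,
        "filename": filename,
    }
```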

class URLFetchTask[source]

Bases: BaseTask

Task that fetches main text content from a list of URLs.

__init__(name, config, dependencies=None)[source]

Initialize URL fetch task.

Parameters:
  • name (str) – Task name

  • config (Dict[str, Any]) –

    Configuration dictionary containing:

    • urls: List of URLs to fetch (optional, can be resolved from dependencies)

    • max_urls: Maximum number of URLs to process (default: 5)

    • timeout: Request timeout in seconds (default: 30)

  • dependencies (Optional[List[TaskDependency]]) – List of task dependencies

get_dependencies()[source]

Get the list of task dependencies.

Return type:

List[TaskDependency]

Returns:

List of TaskDependency objects

run()[source]

Fetch main text from the provided URLs.

Returns:

Dictionary containing:

  • total_urls: Total number of URLs processed

  • successful_fetches: Number of successful fetches

  • failed_fetches: Number of failed fetches

  • articles: List of article content with HTTP 200 status only

  • all_articles: List of all fetched article content (including non-200)

  • status_200_count: Number of articles with HTTP 200 status

  • non_200_count: Number of articles with non-200 HTTP status

  • errors: List of error messages for failed fetches
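To show how the counters above relate to each other, here is a sketch that aggregates per-URL outcomes. The `summarize_fetches` name and its input shape (a dict with a `status_code` for every URL that returned a response, an error string otherwise) are hypothetical, and counting any response, whatever its status, as a "successful fetch" is an assumption; the real task performs the fetching itself.

```python
from typing import Any, Dict, List, Union

FetchOutcome = Union[Dict[str, Any], str]  # dict if a response arrived, error message otherwise


def summarize_fetches(outcomes: List[FetchOutcome], max_urls: int = 5) -> Dict[str, Any]:
    """Hypothetical aggregation mirroring URLFetchTask's documented result keys."""
    outcomes = outcomes[:max_urls]  # only the first max_urls URLs are processed
    all_articles = [o for o in outcomes if isinstance(o, dict)]
    errors = [o for o in outcomes if isinstance(o, str)]
    articles = [a for a in all_articles if a.get("status_code") == 200]
    return {
        "total_urls": len(outcomes),
        "successful_fetches": len(all_articles),
        "failed_fetches": len(errors),
        "articles": articles,
        "all_articles": all_articles,
        "status_200_count": len(articles),
        "non_200_count": len(all_articles) - len(articles),
        "errors": errors,
    }
```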

class ExtractAudioFromVideoTask[source]

Bases: BaseTask

Extract audio from a video file using moviepy and compress it to MP3 format.

__init__(name, config)[source]

Initialize video audio extraction task.

Parameters:
  • name (str) – Task name

  • config (Dict[str, Any]) – Task configuration

run()[source]

Extract audio from the video file and compress it to MP3.

Return type:

TaskResult

class AudioTranscriptTask[source]

Bases: BaseTask

Transcribe an audio file using the OpenAI Whisper API.

__init__(name, config, dependencies=None)[source]

Initialize audio transcription task.

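Parameters:
  • name – Task name

  • config – Task configuration

  • dependencies – Optional list of task dependencies (default: None)
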
get_dependencies()[source]

Get task dependencies.

Return type:

List[TaskDependency]

run()[source]

Transcribe the audio file using OpenAI Whisper.

Return type:

TaskResult

setup_logger(name, level=logging.INFO)[source]

Set up a logger with the specified name and level.

Parameters:
  • name – Name of the logger

  • level – Logging level (default: logging.INFO)

Return type:

Logger

timestamp()[source]

Return current timestamp as ISO format string.

Return type:

str

safe_dict_get(data, key, default=None)[source]

Safely get a value from a dictionary.

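Parameters:
  • data – Dictionary to read from

  • key – Key to look up

  • default – Value returned when the key is missing (default: None)
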
Return type:

Any

flatten_list(nested_list)[source]

Flatten a list of lists into a single list.

Parameters:

nested_list (List[List[Any]])

Return type:

List[Any]

validate_required_fields(data, required_fields)[source]

Validate that all required fields are present in the data.

Parameters:
  • data – Data to validate

  • required_fields – Names of the fields that must be present

Return type:

bool
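For orientation, here are minimal sketches of three of the helpers above. The behavior is inferred from the one-line descriptions (for example, that validate_required_fields checks key presence only and that safe_dict_get tolerates a non-dict input); the real functions may differ in such details.

```python
from itertools import chain
from typing import Any, Dict, List


def flatten_list(nested_list: List[List[Any]]) -> List[Any]:
    # Concatenate the inner lists in order; only one level is flattened.
    return list(chain.from_iterable(nested_list))


def safe_dict_get(data: Any, key: str, default: Any = None) -> Any:
    # Assumed: return the default instead of raising for a missing key or non-dict data.
    if not isinstance(data, dict):
        return default
    return data.get(key, default)


def validate_required_fields(data: Dict[str, Any], required_fields: List[str]) -> bool:
    # Assumed: presence check only; a key mapped to None still counts as present.
    return all(field in data for field in required_fields)
```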

class SearchResult[source]

Bases: object

Represents a single search result.

__init__(title, url, snippet, position=0, metadata=None)[source]

Initialize search result.

Parameters:
  • title (str) – The title of the search result

  • url (str) – The URL of the search result

  • snippet (str) – A brief description/snippet of the result

  • position (int) – Position in search results (0-based)

  • metadata (Optional[Dict[str, Any]]) – Additional metadata from the search provider

__repr__()[source]

Detailed string representation.

Return type:

str

__str__()[source]

String representation of search result.

Return type:

str

to_dict()[source]

Convert search result to dictionary.

Return type:

Dict[str, Any]

class SearchResponse[source]

Bases: object

Represents the complete search response from a provider.

__init__(query, results, total_results=0, search_time=0.0, metadata=None)[source]

Initialize search response.

Parameters:
  • query (str) – The original search query

  • results (List[SearchResult]) – List of search results

  • total_results (int) – Total number of results available

  • search_time (float) – Time taken for the search in seconds

  • metadata (Optional[Dict[str, Any]]) – Additional metadata from the search provider

__repr__()[source]

Detailed string representation.

Return type:

str

__str__()[source]

String representation of search response.

Return type:

str

to_dict()[source]

Convert search response to dictionary.

Return type:

Dict[str, Any]
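To make the shape of these two containers concrete, here is a dataclass sketch with the same constructor arguments. The `*Sketch` class names are hypothetical, metadata defaults to an empty dict rather than None, and using `dataclasses.asdict` for `to_dict()` is an assumption; the real classes are plain objects and may serialize differently.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class SearchResultSketch:
    title: str
    url: str
    snippet: str
    position: int = 0  # 0-based rank in the result list
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)


@dataclass
class SearchResponseSketch:
    query: str
    results: List[SearchResultSketch]
    total_results: int = 0
    search_time: float = 0.0  # seconds
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        # asdict recurses into the nested SearchResultSketch instances.
        return asdict(self)
```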

class BaseSearcher[source]

Bases: ABC

Abstract base class for search providers.

Parameters:

config (Optional[Dict[str, Any]])

__init__(config=None)[source]

Initialize the searcher with configuration.

Parameters:

config (Optional[Dict[str, Any]]) – Configuration dictionary for the search provider

__repr__()[source]

Detailed string representation.

Return type:

str

__str__()[source]

String representation of the searcher.

Return type:

str

abstract property api_key: str | None

Get API key from config or environment variables.

Returns:

API key string or None if not required/found

abstractmethod search(query, max_results=10, **kwargs)[source]

Perform a search query.

Parameters:
  • query (str) – The search query string

  • max_results (int) – Maximum number of results to return

  • **kwargs (Any) – Additional search parameters specific to the provider

Return type:

SearchResponse

Returns:

SearchResponse object containing results

validate_query(query)[source]

Validate search query.

Parameters:

query (str) – The search query to validate

Raises:

ValueError – If query is invalid

Return type:

None
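The contract above (an abstract `api_key` property, an abstract `search()`, and a concrete `validate_query()`) can be illustrated with a self-contained stand-in. `SearcherContract`, `StaticSearcher`, and the specific validation rule (reject non-strings and blank strings) are hypothetical; in real code you would subclass `BaseSearcher` and return a `SearchResponse` rather than a list of strings.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class SearcherContract(ABC):
    def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
        self.config = config or {}

    @property
    @abstractmethod
    def api_key(self) -> Optional[str]: ...

    @abstractmethod
    def search(self, query: str, max_results: int = 10, **kwargs: Any) -> List[str]: ...

    def validate_query(self, query: str) -> None:
        # Assumed rule: the query must be a non-blank string.
        if not isinstance(query, str) or not query.strip():
            raise ValueError("Search query must be a non-empty string")


class StaticSearcher(SearcherContract):
    """Toy provider that needs no API key and searches a fixed list."""

    @property
    def api_key(self) -> Optional[str]:
        return None

    def search(self, query: str, max_results: int = 10, **kwargs: Any) -> List[str]:
        self.validate_query(query)
        corpus = self.config.get("corpus", [])
        hits = [doc for doc in corpus if query.lower() in doc.lower()]
        return hits[:max_results]
```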

class SerperSearcher[source]

Bases: BaseSearcher

Search provider implementation using the Serper API.

Parameters:

config (Optional[Dict[str, Any]])

API_BASE_URL = 'https://google.serper.dev'
DEFAULT_TIMEOUT = 30
MAX_RESULTS_LIMIT = 100
SEARCH_ENDPOINT = '/search'
__init__(config=None)[source]

Initialize Serper searcher.

Required config or environment variables:

  • api_key: Serper API key (or the SERPER_API_KEY environment variable)

Optional config parameters:

  • timeout: Request timeout in seconds (default: 30)

  • country: Country code for search results (e.g., ‘us’, ‘uk’)

  • language: Language code for search results (e.g., ‘en’, ‘es’)

  • safe_search: Safe search setting (‘active’, ‘moderate’, ‘off’)

  • search_type: Type of search (‘search’, ‘images’, ‘videos’, ‘places’, ‘news’)

Parameters:

config (Optional[Dict[str, Any]])

__str__()[source]

String representation of the Serper searcher.

Return type:

str

property api_key: str | None

Get Serper API key from config or environment variables.

search(query, max_results=10, **kwargs)[source]

Perform a search using the Serper API.

Parameters:
  • query (str) – The search query string

  • max_results (int) – Maximum number of results to return (max 100)

  • **kwargs (Any) –

    Additional Serper API parameters:

    • gl: Country code (e.g., ‘us’, ‘uk’)

    • hl: Language code (e.g., ‘en’, ‘es’)

    • safe: Safe search (‘active’, ‘moderate’, ‘off’)

    • type: Search type (‘search’, ‘images’, ‘videos’, ‘places’, ‘news’)

Return type:

SearchResponse

Returns:

SearchResponse object containing results
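The sketch below assembles a Serper request without sending it, to show how the documented pieces (API_BASE_URL, SEARCH_ENDPOINT, MAX_RESULTS_LIMIT, and an API key from config or SERPER_API_KEY) fit together. The `q`/`num` body fields and the `X-API-KEY` header follow Serper's public API as we understand it; the config-before-environment precedence, the clamping to 1..100, and the helper names are assumptions, not SerperSearcher's actual code.

```python
import os
from typing import Any, Dict, Optional, Tuple

API_BASE_URL = "https://google.serper.dev"
SEARCH_ENDPOINT = "/search"
MAX_RESULTS_LIMIT = 100


def resolve_api_key(config: Dict[str, Any]) -> Optional[str]:
    # Assumed precedence: explicit config wins over the environment variable.
    return config.get("api_key") or os.environ.get("SERPER_API_KEY")


def build_serper_request(
    query: str, config: Dict[str, Any], max_results: int = 10, **kwargs: Any
) -> Tuple[str, Dict[str, str], Dict[str, Any]]:
    """Hypothetical helper returning (url, headers, json_body) without sending anything."""
    api_key = resolve_api_key(config)
    if not api_key:
        raise ValueError("Serper API key missing: set config['api_key'] or SERPER_API_KEY")
    body: Dict[str, Any] = {
        "q": query,
        "num": max(1, min(max_results, MAX_RESULTS_LIMIT)),  # clamp to 1..100
    }
    body.update(kwargs)  # e.g. gl="us", hl="en"
    headers = {"X-API-KEY": api_key, "Content-Type": "application/json"}
    return API_BASE_URL + SEARCH_ENDPOINT, headers, body
```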

fetch_main_text(url, config=None, **kwargs)[source]

Fetch and extract main text content from a URL.

This function automatically detects the content type and extracts readable text:

  • HTML pages: Extracts main article content using a readability algorithm

  • PDF files: Extracts text from all pages

  • DOCX files: Extracts text from paragraphs and tables

  • Plain text: Returns as-is

Parameters:
  • url (str) – URL to fetch and extract text from

  • config (Optional[Dict[str, Any]]) –

    Optional configuration for URLFetcher, plus text extraction options:

    • html_method: “readability” (default) or “basic” for HTML extraction

    • include_metadata: Whether to include extraction metadata (default: True)

  • **kwargs (Any) – Additional arguments to pass to fetch_url

Returns:

Dictionary containing:

  • url: Final URL after redirects

  • content_type: Detected content type

  • text: Extracted text content

  • extraction_method: Method used for text extraction

  • original_size: Size of original content in bytes

  • text_size: Size of extracted text in characters

  • metadata: Additional extraction metadata (if include_metadata=True)

Example:

>>> result = fetch_main_text("https://example.com/article.html")
>>> print(result["text"])
>>> print(f"Extracted {result['text_size']} characters from {result['content_type']}")
>>> pdf_result = fetch_main_text("https://example.com/document.pdf")
>>> print(pdf_result["text"])
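Choosing an extractor from the response's content type, as described above, can be sketched as a small dispatch function. The MIME strings are standard; the URL-suffix fallback, the `pick_extraction_method` name, and the method labels are assumptions and need not match the `extraction_method` values fetch_main_text actually reports.

```python
from typing import Optional
from urllib.parse import urlparse

DOCX_MIME = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"

BY_MIME = {
    "application/pdf": "pdf",
    DOCX_MIME: "docx",
    "text/html": "html",
    "application/xhtml+xml": "html",
    "text/plain": "text",
}


def pick_extraction_method(content_type: Optional[str], url: str = "") -> str:
    """Hypothetical dispatch from Content-Type (or URL suffix) to an extractor label."""
    # Drop parameters such as "; charset=utf-8" and normalize case.
    mime = (content_type or "").split(";")[0].strip().lower()
    if mime in BY_MIME:
        return BY_MIME[mime]
    # Fall back to the URL suffix when the header is missing or generic.
    path = urlparse(url).path.lower()
    for suffix, method in ((".pdf", "pdf"), (".docx", "docx"), (".html", "html"), (".htm", "html")):
        if path.endswith(suffix):
            return method
    return "text"  # plain text (and anything unrecognized) is returned as-is
```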

fetch_url(url, config=None, **kwargs)[source]

Convenience function to fetch a URL with Chrome-like headers.

Parameters:
  • url (str) – URL to fetch

  • config (Optional[Dict[str, Any]]) – Optional configuration for URLFetcher

  • **kwargs (Any) – Additional arguments to pass to the fetch method

Return type:

Dict[str, Any]

Returns:

Dictionary with fetch results (see URLFetcher.fetch for details)

Example:

result = fetch_url("https://example.com")
print(result["content"])
print(f"Status: {result['status_code']}")

class URLFetcher[source]

Bases: object

Utility class for fetching web content with Chrome-like behavior.

Parameters:

config (Optional[Dict[str, Any]])

__enter__()[source]

Context manager entry.

Return type:

URLFetcher

__exit__(exc_type, exc_val, exc_tb)[source]

Context manager exit.

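Parameters:
  • exc_type – Exception type if the with block raised, otherwise None

  • exc_val – Exception instance if the with block raised, otherwise None

  • exc_tb – Traceback if the with block raised, otherwise None
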
Return type:

None

__init__(config=None)[source]

Initialize URL fetcher with optional configuration.

Parameters:

config (Optional[Dict[str, Any]]) –

Optional configuration dictionary with options:

  • timeout: Request timeout in seconds (default: 30)

  • user_agent: Custom user agent string

  • headers: Additional headers to include

  • follow_redirects: Whether to follow redirects (default: True)

  • max_redirects: Maximum number of redirects (default: 10)
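A sketch of how the options above might be merged with their documented defaults. The default browser-like headers are placeholders we made up, and the precedence (a `user_agent` option replaces the default User-Agent, then entries from `headers` are applied last) is an assumption about URLFetcher, as is the `build_fetch_settings` name.

```python
from typing import Any, Dict, Optional

# Placeholder browser-like defaults; the real URLFetcher ships its own values.
DEFAULT_HEADERS = {
    "User-Agent": "Mozilla/5.0 (placeholder Chrome-like UA)",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
}


def build_fetch_settings(config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Hypothetical merge of URLFetcher-style options with documented defaults."""
    config = config or {}
    headers = dict(DEFAULT_HEADERS)  # copy so the defaults are never mutated
    if config.get("user_agent"):
        headers["User-Agent"] = config["user_agent"]
    headers.update(config.get("headers", {}))  # explicit headers applied last
    return {
        "timeout": config.get("timeout", 30),
        "follow_redirects": config.get("follow_redirects", True),
        "max_redirects": config.get("max_redirects", 10),
        "headers": headers,
    }
```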

__str__()[source]

String representation.

Return type:

str

close()[source]

Close the session to free up resources.

Return type:

None

fetch(url, **kwargs)[source]

Fetch content from a URL.

Parameters:
  • url (str) – URL to fetch

  • **kwargs (Any) – Additional arguments to pass to requests.get()

Returns:

Dictionary containing:

  • url: Final URL after redirects

  • status_code: HTTP status code

  • headers: Response headers

  • content: Response content as text

  • content_type: Content type from headers

  • encoding: Character encoding

  • size: Content size in bytes

  • response_time: Time taken for request in seconds

fetch_headers_only(url, **kwargs)[source]

Fetch only headers from a URL using HEAD request.

Parameters:
  • url (str) – URL to fetch headers from

  • **kwargs (Any) – Additional arguments to pass to requests.head()

Returns:

Dictionary containing:

  • url: Final URL after redirects

  • status_code: HTTP status code

  • headers: Response headers

  • content_type: Content type from headers

  • content_length: Content length if available

  • response_time: Time taken for request in seconds

print_header(title, width=80, char='=')[source]

Print a formatted header with title centered between separator characters.

Parameters:
  • title (str) – The header title to display

  • width (int) – Total width of the header line (default: 80)

  • char (str) – Character to use for the separator (default: “=”)

Return type:

None
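One plausible reading of "title centered between separator characters" is a single line built with `str.center`. The exact layout print_header produces (for example, whether it also prints full separator lines above and below) is not specified here, so treat this as an illustration; `format_header` is a hypothetical name that returns the string instead of printing it.

```python
def format_header(title: str, width: int = 80, char: str = "=") -> str:
    # Pad the title with single spaces, then fill both sides with `char` up to `width`.
    # If the padded title is already wider than `width`, str.center returns it unchanged.
    return f" {title} ".center(width, char)


print(format_header("Results", width=20))
```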

print_message_box(title, content_lines, width=80, char='=', newline_before=False)[source]

Print a message box with a title and content lines, trimming a title that is too long and wrapping content lines that exceed the width.

Parameters:
  • title (str) – The message box title (will be trimmed if too long)

  • content_lines (list[str]) – List of content lines (will be wrapped if too long)

  • width (int) – Total width for the message box (default: 80)

  • char (str) – Character to use for separators (default: “=”)

  • newline_before (bool) – Add newline before the message box (default: False)

Return type:

None
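The trimming and wrapping described above can be sketched with the standard `textwrap` module. `format_message_box` is hypothetical and returns lines instead of printing them; the `...` truncation marker and the closing separator line are assumptions about the real layout.

```python
import textwrap
from typing import List


def format_message_box(title: str, content_lines: List[str], width: int = 80, char: str = "=") -> List[str]:
    """Hypothetical: build the box as a list of lines no wider than `width`."""
    # Trim an overlong title so " title " still fits within the width.
    max_title = width - 2
    if len(title) > max_title:
        title = title[: max_title - 3] + "..."
    box = [f" {title} ".center(width, char)]
    for line in content_lines:
        # Wrap long lines; keep intentionally blank lines as blank.
        box.extend(textwrap.wrap(line, width=width) or [""])
    box.append(char * width)
    return box
```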

BasePipelineAgent (Legacy Syntax)

Pipeline-based AI agent framework with automatic task orchestration.

This module provides BasePipelineAgent, the core component for building AI workflows with automatic dependency resolution and parallel execution. Tasks are organized into execution phases based on their dependencies, so independent tasks can run in parallel and each task receives the data produced by the tasks it depends on.

Key Concepts:

  • Dependency Resolution: Tasks automatically receive data from dependent tasks

  • Execution Phases: Tasks are grouped for optimal parallel/sequential execution

  • Task Context: Shared data store for inter-task communication

  • Error Handling: TaskResult pattern for graceful error propagation

Quick Example

class ArticleWriterAgent(BasePipelineAgent):
    def setup_tasks(self):
        return [
            SearchTask("search", {"query": "AI trends", "max_results": 5}),
            LLMTask("summarize", {
                "prompt": "Summarize: ${content}",
                "llm_provider": "openai",
                "llm_model": "gpt-4"
            }, dependencies=[
                TaskDependency("content", "search.results", REQUIRED)
            ])
        ]

agent = ArticleWriterAgent("writer", {})
result = agent.run()
print(f"Status: {result.status}, Tasks: {result.completed_tasks}/{result.total_tasks}")

Execution Flow

  1. Setup: Tasks defined with dependencies in setup_tasks()

  2. Planning: Dependency graph analyzed, phases created

  3. Resolution: Task configs updated with dependency data

  4. Execution: Phases run sequentially, tasks within phases run in parallel

  5. Results: AgentRunResult returned with execution status and metrics
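Step 3 (resolution) can be illustrated with a small dotted-path lookup over completed task results. The `resolve_dependency` helper, the plain-dict results store, and the `required` flag standing in for REQUIRED/OPTIONAL are assumptions for illustration; the real DependencyResolver and TaskContext have their own APIs.

```python
from typing import Any, Dict

_MISSING = object()  # sentinel distinguishing "absent" from a stored None


def resolve_dependency(
    results: Dict[str, Any], source_path: str, required: bool = True, default_value: Any = None
) -> Any:
    """Hypothetical lookup of a path like "search.results" in {task_name: result_data}."""
    current: Any = results
    for part in source_path.split("."):
        if isinstance(current, dict) and part in current:
            current = current[part]
        else:
            current = _MISSING
            break
    if current is _MISSING:
        if required:
            raise KeyError(f"Required dependency not available: {source_path}")
        return default_value  # optional dependency falls back to its default
    return current
```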

See also

  • BaseTask: Base class for implementing custom tasks

  • TaskDependency: Declarative dependency specification

  • TaskContext: Shared data store for task communication

  • AgentRunResult: Standardized execution result format

class TaskExecutionPlan[source]

Bases: object

Execution plan that organizes tasks into phases based on dependencies.

This class analyzes task dependencies and creates an optimal execution plan where:

  • Tasks with no dependencies run in the first phase

  • Tasks whose dependencies are satisfied run in subsequent phases

  • Tasks within the same phase can run in parallel

  • Phases execute sequentially to maintain dependency ordering

The plan detects and rejects circular dependencies, so any plan that is successfully created can be executed in dependency order.

tasks

Original list of tasks to organize

phases

List of task lists, each representing an execution phase

Example:

Given tasks A, B (depends on A), C (depends on A), and D (depends on B and C), the execution plan is:

  • Phase 1: [A] (no dependencies)

  • Phase 2: [B, C] (depend only on A, can run in parallel)

  • Phase 3: [D] (depends on B and C)
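The phase construction above amounts to a level-by-level topological sort (Kahn's algorithm applied one layer at a time). The sketch works on a plain mapping from task name to the names it depends on; `build_phases` and that input shape are assumptions, not TaskExecutionPlan's internals, but it raises ValueError in the two situations the constructor documents.

```python
from typing import Dict, List, Set


def build_phases(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group task names into phases; every task runs after all of its dependencies."""
    known = set(deps)
    for task, needed in deps.items():
        unknown = needed - known
        if unknown:
            raise ValueError(f"Task {task!r} depends on unknown task(s): {sorted(unknown)}")

    remaining = {task: set(needed) for task, needed in deps.items()}
    phases: List[List[str]] = []
    while remaining:
        # Tasks whose dependencies have all been placed in earlier phases.
        ready = sorted(t for t, needed in remaining.items() if not needed)
        if not ready:
            raise ValueError(f"Circular dependency among: {sorted(remaining)}")
        phases.append(ready)
        for t in ready:
            del remaining[t]
        for needed in remaining.values():
            needed.difference_update(ready)
    return phases


print(build_phases({"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}}))
# [['A'], ['B', 'C'], ['D']]
```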

Parameters:

tasks (List[BaseTask])

__init__(tasks)[source]

Create execution plan from list of tasks.

Parameters:

tasks (List[BaseTask]) – List of tasks to organize into execution phases

Raises:

ValueError – If circular dependencies detected or dependencies cannot be satisfied

total_phases()[source]

Get total number of execution phases.

Return type:

int

Returns:

Number of phases

get_phase(phase_index)[source]

Get tasks in a specific phase.

Parameters:

phase_index (int) – Index of the phase

Return type:

List[BaseTask]

Returns:

List of tasks in the phase

get_total_tasks()[source]

Get total number of tasks in the plan.

Return type:

int

Returns:

Total number of tasks

class BasePipelineAgent[source]

Bases: object

Base agent class for building AI workflows with automatic task orchestration.

BasePipelineAgent provides the foundation for creating AI automation workflows. It automatically handles task dependency resolution, parallel execution, error handling, and result collection. Users inherit from this class and implement setup_tasks() to define their workflow.

Key Features
  • Automatic Dependency Resolution: Tasks receive data from previous tasks

  • Parallel Execution: Independent tasks run simultaneously for performance

  • Error Handling: Graceful handling of task failures with detailed reporting

  • Context Management: Shared data store for inter-task communication

  • Result Tracking: Comprehensive execution metrics and status reporting

Configuration Options:
  • enable_parallel (bool): Enable parallel task execution (default: True)

  • max_parallel_tasks (int): Maximum concurrent tasks (default: 5)

  • stop_on_failure (bool): Stop pipeline on first failure (default: True)

Lifecycle:
  1. Initialization: Agent created with name and config

  2. Task Setup: setup_tasks() called to define workflow

  3. Execution: run() method orchestrates task execution

  4. Results: AgentRunResult returned with status and metrics

Example

Basic agent implementation:

class DataProcessorAgent(BasePipelineAgent):
    def setup_tasks(self):
        return [
            SearchTask("gather_data", {
                "query": self.config.get("search_query", ""),
                "max_results": 10
            }),
            TransformTask("process_data", {
                "transform_function": process_search_results,
                "input_field": "results"
            }, dependencies=[
                TaskDependency("results", "gather_data.results", REQUIRED)
            ]),
            FileSaveTask("save_results", {
                "output_dir": "/tmp",
                "file_format": "json",
                "filename_prefix": "results"
            }, dependencies=[
                TaskDependency("content", "process_data.result", REQUIRED)
            ])
        ]

# Usage
agent = DataProcessorAgent("processor", {
    "search_query": "machine learning trends",
    "enable_parallel": True
})
result = agent.run()

if result.is_success():
    print(f"Processed {result.completed_tasks} tasks successfully")
    # Access individual task results
    search_result = agent.context.get_result("gather_data")
    processed_data = agent.context.get_result("process_data")
else:
    print(f"Pipeline failed: {result.error_message}")

Advanced configuration:

agent = DataProcessorAgent("processor", {
    "search_query": "AI research",
    "enable_parallel": False,  # Sequential execution
    "stop_on_failure": False,  # Continue despite failures
    "max_parallel_tasks": 10   # Concurrency cap; only relevant when enable_parallel is True
})

Return Values

run() returns AgentRunResult with:

  • status: SUCCESS, PARTIAL, ERROR, or RUNNING

  • completed_tasks/failed_tasks: Task completion counts

  • total_phases: Number of execution phases

  • execution_time: Total runtime in seconds

  • metadata: Additional execution information

Error Handling

  • Individual task failures are captured and reported

  • Pipeline can continue or stop based on stop_on_failure setting

  • Detailed error information available in execution context

  • Partial success supported for workflows with optional components

Thread Safety

  • TaskContext is thread-safe for concurrent task execution

  • Agent instance should not be shared between threads

  • Each agent execution creates isolated context

Performance

  • Tasks in same phase execute in parallel (when enabled)

  • Dependency resolution minimizes execution phases

  • ThreadPoolExecutor manages concurrent task execution

  • Configurable concurrency limits prevent resource exhaustion
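A minimal sketch of the execution model described in this section: phases run one after another, tasks within a phase are submitted to a `ThreadPoolExecutor` capped at `max_parallel_tasks`, and `stop_on_failure` skips the remaining phases. Tasks are modeled as plain callables returning True on success; `run_phases` and that convention are hypothetical, the failure check happens at phase boundaries, and dependency resolution and result storage are left out.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

Task = Callable[[], bool]  # hypothetical: returns True on success


def run_phases(
    phases: List[List[str]],
    tasks: Dict[str, Task],
    enable_parallel: bool = True,
    max_parallel_tasks: int = 5,
    stop_on_failure: bool = True,
) -> Dict[str, bool]:
    outcomes: Dict[str, bool] = {}
    for phase in phases:
        if enable_parallel and len(phase) > 1:
            workers = min(max_parallel_tasks, len(phase))
            with ThreadPoolExecutor(max_workers=workers) as pool:
                futures = {name: pool.submit(tasks[name]) for name in phase}
            # Leaving the `with` block waits for every task in this phase.
            for name, fut in futures.items():
                outcomes[name] = fut.exception() is None and bool(fut.result())
        else:
            for name in phase:
                try:
                    outcomes[name] = bool(tasks[name]())
                except Exception:
                    outcomes[name] = False  # an exception counts as a failure
        if stop_on_failure and not all(outcomes[name] for name in phase):
            break  # later phases are skipped
    return outcomes
```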

__init__(name, config=None)[source]

Initialize pipeline agent with name and configuration.

Creates a new pipeline agent instance with automatic task setup. The agent will call setup_tasks() during initialization to configure the workflow. TaskContext and DependencyResolver are created automatically.

Attributes Created:

  • name: Agent identifier

  • config: Configuration dictionary

  • tasks: List of tasks (populated by setup_tasks())

  • context: TaskContext for inter-task communication

  • dependency_resolver: Handles dependency resolution

  • execution_plan: Created during run() execution

Parameters:
  • name (str) – Unique identifier for this agent instance. Used in logging and result tracking. Should be descriptive of the agent’s purpose.

  • config (Optional[Dict[str, Any]]) –

    Configuration dictionary with agent settings:

    • enable_parallel (bool): Enable parallel task execution (default: True)

    • max_parallel_tasks (int): Maximum concurrent tasks (default: 5)

    • stop_on_failure (bool): Stop pipeline on first failure (default: True)

    Additional config keys are passed to tasks via self.config.

Example:

agent = MyAgent("data_processor", {
    "enable_parallel": True,
    "max_parallel_tasks": 3,
    "input_file": "data.csv"
})
print(f"Agent {agent.name} has {len(agent.tasks)} tasks")

Note

The agent automatically calls setup_tasks() during initialization. If setup_tasks() raises an exception, the agent will log a warning but continue with an empty task list.

See also

  • BaseTask: Base class for implementing custom tasks

  • TaskDependency: Declarative dependency specification

  • TaskContext: Shared data store for inter-task communication

  • AgentRunResult: Standardized execution result format

add_tasks(tasks)[source]

Add tasks to the agent.

Parameters:

tasks (List[BaseTask]) – List of tasks to add

Return type:

None

setup_tasks()[source]

Define the workflow tasks for this agent. Must be implemented by subclasses.

This is the main method where users define their AI workflow by creating and configuring tasks with their dependencies. Tasks are automatically organized into execution phases based on their dependency relationships.

Task Naming:
  • Use descriptive names that indicate the task’s purpose

  • Names must be unique within the agent

  • Use snake_case for consistency

  • Good: “fetch_articles”, “analyze_sentiment”, “save_results”

  • Avoid: “task1”, “t”, “process”
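The uniqueness and snake_case rules above are easy to check mechanically; descriptiveness is not. This hypothetical `check_task_names` helper flags duplicates and names that are not snake_case. The regular expression is our own definition of snake_case (lowercase letters, digits, and single underscores, starting with a letter), not a rule enforced by aipype.

```python
import re
from collections import Counter
from typing import List

SNAKE_CASE = re.compile(r"^[a-z][a-z0-9]*(?:_[a-z0-9]+)*$")


def check_task_names(names: List[str]) -> List[str]:
    problems: List[str] = []
    for name, count in Counter(names).items():
        if count > 1:
            problems.append(f"duplicate task name: {name!r} ({count}x)")
    for name in dict.fromkeys(names):  # unique names, original order
        if not SNAKE_CASE.match(name):
            problems.append(f"not snake_case: {name!r}")
    return problems
```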

Dependency Design:
  • Required dependencies must be satisfied for task to run

  • Optional dependencies use default values if source unavailable

  • Circular dependencies will cause pipeline execution to fail

  • Use transform_func in dependencies for data preprocessing

Note

This method is called automatically during agent initialization. Do not add tasks to self.tasks directly; return them from this method instead.

See also

  • TaskDependency: For creating task dependencies

  • BaseTask subclasses: LLMTask, SearchTask, ConditionalTask, etc.

  • Task-specific documentation for configuration options

Returns:

List of configured BaseTask instances that define the workflow.

Return type:

List[BaseTask]

Example:
  • Basic search and summarize workflow

    def setup_tasks(self):
        return [
            SearchTask("search", {
                "query": self.config.get("topic", ""),
                "max_results": 5
            }),
            LLMTask("summarize", {
                "prompt": "Summarize these articles: ${articles}",
                "llm_provider": "openai",
                "llm_model": "gpt-4"
            }, dependencies=[
                TaskDependency("articles", "search.results", REQUIRED)
            ])
        ]
    
  • Complex workflow with conditional logic

    def setup_tasks(self):
        return [
            SearchTask("search", {"query": "AI news"}),
            TransformTask("filter", {
                "transform_function": filter_recent_articles,
                "input_field": "results"
            }, dependencies=[
                TaskDependency("results", "search.results", REQUIRED)
            ]),
            ConditionalTask("quality_check", {
                "condition_function": lambda articles: len(articles) >= 3,
                "condition_inputs": ["filtered_articles"],
                "action_function": log_action("Quality check passed"),
                "else_function": log_action("Insufficient articles")
            }, dependencies=[
                TaskDependency("filtered_articles", "filter.result", REQUIRED)
            ]),
            LLMTask("summarize", {
                "prompt": "Create summary from: ${content}",
                "llm_provider": "openai",
                "llm_model": "gpt-4"
            }, dependencies=[
                TaskDependency("content", "filter.result", REQUIRED)
            ])
        ]
    
Raises:

NotImplementedError – Must be implemented by subclasses

create_context()[source]

Create and return the task context for this agent.

Return type:

TaskContext

Returns:

TaskContext instance

run()[source]

Execute the complete workflow with automatic task orchestration.

This is the main execution method that runs the entire pipeline:

  1. Builds execution plan from task dependencies

  2. Resolves dependencies and updates task configurations

  3. Executes tasks in phases (parallel within phases, sequential between phases)

  4. Collects results and handles errors

  5. Returns comprehensive execution status

The method handles all aspects of task execution including dependency resolution, parallel execution, error propagation, and result collection. Individual task failures are captured and the pipeline can continue based on the stop_on_failure configuration.

The method returns AgentRunResult containing:

  • status:
    • SUCCESS: all tasks completed

    • PARTIAL: some tasks failed

    • ERROR: all tasks failed

    • RUNNING: the agent is already executing

  • completed_tasks: Number of successfully completed tasks

  • failed_tasks: Number of tasks that failed

  • total_phases: Number of execution phases used

  • execution_time: Total runtime in seconds

  • metadata: Additional execution information
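The status rules listed above can be written as a small function. `Status` and `derive_status` are hypothetical stand-ins for the real AgentRunResult status values, and how the real agent classifies edge cases (for example, a pipeline with no tasks) is not stated, so the zero-task branch here is an assumption.

```python
from enum import Enum, auto


class Status(Enum):
    SUCCESS = auto()
    PARTIAL = auto()
    ERROR = auto()
    RUNNING = auto()


def derive_status(completed: int, failed: int, already_running: bool = False) -> Status:
    if already_running:
        return Status.RUNNING  # a second run() call while one is in progress
    if failed == 0:
        return Status.SUCCESS  # assumed to include the zero-task case
    if completed == 0:
        return Status.ERROR  # every task that ran failed
    return Status.PARTIAL
```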

Exceptions

No exceptions are raised directly. All operational errors are captured in the returned AgentRunResult. Only programming errors (like missing setup_tasks implementation) will raise exceptions.

Example

Basic execution and result checking:

agent = MyAgent("processor", config)
result = agent.run()

if result.is_success():
    print(f"All {result.completed_tasks} tasks completed successfully")

    # Access individual task results
    search_data = agent.context.get_result("search_task")
    processed_data = agent.context.get_result("process_task")

elif result.is_partial():
    print(f"Partial success: {result.completed_tasks}/{result.total_tasks}")
    print(f"Failed tasks: {result.failed_tasks}")

else:  # result.is_error()
    print(f"Pipeline failed: {result.error_message}")

Performance monitoring:

result = agent.run()
print(f"Execution time: {result.execution_time:.2f}s")
print(f"Phases used: {result.total_phases}")

# Fewer phases than tasks means at least one phase held several independent tasks
if result.total_phases < len(agent.tasks):
    print("Some phases contained multiple tasks that could run in parallel")

Execution Phases

Tasks are automatically organized into phases based on dependencies:

  • Phase 1: Tasks with no dependencies

  • Phase 2: Tasks depending only on Phase 1 tasks

  • Phase N: Tasks depending on previous phases only

Within each phase, tasks can execute in parallel (if enabled). Between phases, execution is sequential to maintain dependency order.

Error Handling

  • Individual task errors are captured in TaskResult objects

  • Pipeline can continue or stop based on stop_on_failure config

  • Partial success is supported when some tasks succeed

  • All errors are logged with detailed context information

  • Failed task dependencies propagate to dependent tasks

Thread Safety

This method is not thread-safe. Each agent instance should only execute one pipeline at a time. Concurrent task execution within the pipeline is handled internally with ThreadPoolExecutor.

Performance

  • Parallel execution can substantially reduce total runtime when a phase contains several independent tasks

  • Dependency resolution optimizes execution order

  • Thread pool size controlled by max_parallel_tasks config

  • Memory usage scales with number of concurrent tasks

Note

If the agent is already running, this method returns immediately with a RUNNING status to prevent concurrent execution conflicts.

See also

  • AgentRunResult: For detailed result format and status checking

  • TaskContext: Access via agent.context for individual task results

  • TaskExecutionPlan: For understanding execution phase organization

Return type:

AgentRunResult

Returns:

AgentRunResult

get_context()[source]

Get the task context for this agent.

Return type:

TaskContext

Returns:

TaskContext instance

get_execution_plan()[source]

Get the current execution plan.

Return type:

Optional[TaskExecutionPlan]

Returns:

TaskExecutionPlan instance or None if not yet created

validate_dependencies()[source]

Validate that all task dependencies can be satisfied.

Return type:

List[str]

Returns:

List of validation error messages (empty if all valid)
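A sketch of the kind of check validate_dependencies performs, collecting messages instead of raising. It assumes every source path starts with a task name (as in "search.results") and that only required dependencies must point at an existing task; both the input shape and the `validate_dependency_map` name are hypothetical, and non-task sources such as a "config." path are not modeled.

```python
from typing import Dict, List, Tuple

# Hypothetical input: task name -> list of (source_path, is_required)
DependencyMap = Dict[str, List[Tuple[str, bool]]]


def validate_dependency_map(deps: DependencyMap) -> List[str]:
    errors: List[str] = []
    for task, entries in deps.items():
        for source_path, is_required in entries:
            source_task = source_path.split(".", 1)[0]
            if source_task == task:
                errors.append(f"{task}: depends on itself via {source_path!r}")
            elif is_required and source_task not in deps:
                errors.append(f"{task}: required source task {source_task!r} not found")
    return errors  # empty list means every required dependency can be satisfied
```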

get_dependency_info()[source]

Get dependency information for all tasks.

Return type:

Dict[str, List[Dict[str, Any]]]

Returns:

Dictionary mapping task names to their dependency info

reset()[source]

Reset the agent and clear context.

Return type:

None

display_results(sections=None)[source]

Display results with configurable sections.

This method provides a simple way to display agent results without writing complex display logic in each agent. Users can choose which sections to show.

Parameters:

sections (Optional[List[str]]) – List of sections to display. Options: “summary”, “tasks”, “errors”. Defaults to [“summary”, “tasks”, “errors”] (show all sections).

Return type:

None

__str__()[source]

String representation of the agent.

Return type:

str

Task Types

LLM Task

Advanced LLM task with context-aware prompts, tool calling, and template substitution.

This module provides LLMTask, a sophisticated task for integrating Large Language Models into AI pipelines. It supports automatic prompt template substitution using dependency data, tool/function calling, multiple LLM providers, and comprehensive response handling.

Key Features
  • Template Substitution: Dynamic prompt generation using ${variable} syntax

  • Tool Calling: Function calling with automatic tool registration and execution

  • Multi-Provider: Support for OpenAI, Anthropic, Ollama, and other LLM providers

  • Context Integration: Automatic injection of dependency data into prompts

  • Response Handling: Structured response parsing and error handling

  • Logging: Optional detailed logging for debugging and monitoring

Template System

LLMTask uses ${variable} syntax for dynamic prompt generation:

task = LLMTask("summarize", {
    "prompt": "Summarize these articles: ${articles}",
    "context": "You are an expert ${domain} analyst",
    "llm_provider": "openai",
    "llm_model": "gpt-4"
}, dependencies=[
    TaskDependency("articles", "search.results", REQUIRED),
    TaskDependency("domain", "config.analysis_domain", OPTIONAL, default_value="general")
])
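Python's standard `string.Template` uses the same `${variable}` syntax, so it is a convenient way to picture the substitution step. Whether LLMTask uses `string.Template` internally, and how it formats non-string values such as lists of search results, are assumptions; this hypothetical `render_prompt` simply passes every value through `str()`.

```python
from string import Template
from typing import Any, Dict


def render_prompt(template: str, values: Dict[str, Any]) -> str:
    # safe_substitute leaves unknown placeholders such as ${missing} untouched
    # instead of raising KeyError, which makes unresolved inputs easy to spot.
    return Template(template).safe_substitute({k: str(v) for k, v in values.items()})


print(render_prompt("You are an expert ${domain} analyst", {"domain": "general"}))
# You are an expert general analyst
```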

Tool Calling

Enable function calling with automatic tool management:

from typing import Any, Dict

@tool
def search_web(query: str) -> Dict[str, Any]:
    # web_search is a placeholder for your own search implementation
    return {"results": web_search(query)}

task = LLMTask("research", {
    "prompt": "Research ${topic} and provide analysis",
    "tools": [search_web],
    "llm_provider": "openai",
    "llm_model": "gpt-4"
})
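To picture what tool registration involves, here is a self-contained sketch of a decorator that records a function's name, docstring, and parameter names, plus a dispatcher that executes a model-issued call whose arguments arrive as a JSON string. `tool_sketch`, `REGISTRY`, and `dispatch` are hypothetical stand-ins; aipype's `@tool` decorator and the schema it generates are not shown here.

```python
import inspect
import json
from typing import Any, Callable, Dict

REGISTRY: Dict[str, Dict[str, Any]] = {}


def tool_sketch(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical stand-in for @tool: register the function under its name."""
    params = inspect.signature(func).parameters
    REGISTRY[func.__name__] = {
        "func": func,
        "description": inspect.getdoc(func) or "",
        "parameters": list(params),  # parameter names only; no type schema
    }
    return func


def dispatch(name: str, arguments_json: str) -> Any:
    # Tool calls from a model typically carry their arguments as a JSON string.
    entry = REGISTRY.get(name)
    if entry is None:
        return {"error": f"unknown tool: {name}"}
    return entry["func"](**json.loads(arguments_json))


@tool_sketch
def add(a: float, b: float) -> float:
    """Add two numbers."""
    return a + b
```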

MCP Integration

Use Model Context Protocol servers alongside or instead of Python tools:

# MCP server only
task = LLMTask("web_research", {
    "prompt": "Search for ${topic} and summarize findings",
    "tools": [
        {
            "type": "mcp",
            "server_label": "brave_search",
            "server_url": "https://mcp.brave.com",
            "require_approval": "never"
        }
    ],
    "llm_provider": "anthropic",
    "llm_model": "claude-sonnet-4"
})

# Mixed: Python tools + MCP servers
task = LLMTask("comprehensive_research", {
    "prompt": "Research ${topic} using all available tools",
    "tools": [
        local_calculator_tool,  # Python @tool function
        {
            "type": "mcp",
            "server_label": "web_search",
            "server_url": "https://mcp.example.com"
        }
    ],
    "llm_provider": "openai",
    "llm_model": "gpt-4"
})

Structured Outputs

Get typed, structured responses using Pydantic models or JSON schemas:

from pydantic import BaseModel

class Article(BaseModel):
    title: str
    summary: str
    key_points: list[str]

task = LLMTask("extract", {
    "prompt": "Extract article information from: ${text}",
    "response_format": Article,  # Pass Pydantic model
    "llm_provider": "openai",
    "llm_model": "gpt-4o-mini"
})

# Access parsed structured data
result = task.run()
if result.is_success():
    parsed_data = result.data["parsed_object"]  # Dict matching Article schema
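The same Article shape can be expressed as a JSON schema dict for the response_format option. The inner name, schema, and strict keys follow OpenAI's structured-output format and are an assumption about what LLMTask forwards. check_required is a hypothetical helper that verifies a raw JSON reply before it is used.

```python
import json
from typing import Any, Dict

# Hand-written JSON schema equivalent to the Article Pydantic model above.
ARTICLE_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "title": {"type": "string"},
        "summary": {"type": "string"},
        "key_points": {"type": "array", "items": {"type": "string"}},
    },
    "required": ["title", "summary", "key_points"],
    "additionalProperties": False,
}

# Assumed response_format shape (OpenAI-style structured outputs).
response_format = {
    "type": "json_schema",
    "json_schema": {"name": "article", "schema": ARTICLE_SCHEMA, "strict": True},
}


def check_required(raw_reply: str, schema: Dict[str, Any]) -> Dict[str, Any]:
    """Parse a JSON reply and confirm every required key is present."""
    parsed = json.loads(raw_reply)
    missing = [key for key in schema["required"] if key not in parsed]
    if missing:
        raise ValueError(f"Reply is missing required keys: {missing}")
    return parsed


reply = '{"title": "T", "summary": "S", "key_points": ["a", "b"]}'
print(check_required(reply, ARTICLE_SCHEMA)["key_points"])  # ['a', 'b']
```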

Supported Providers

  • OpenAI: gpt-4, gpt-3.5-turbo, etc.

  • Anthropic: claude-3-opus, claude-3-sonnet, etc.

  • Ollama: Local models (llama2, mistral, etc.)

  • Google: gemini-pro, gemini-pro-vision

  • Azure OpenAI: Azure-hosted OpenAI models

  • And many more via litellm

Configuration Options

Core settings:

  • llm_provider: Provider name (required)

  • llm_model: Model identifier (required)

  • prompt/prompt_template: Main prompt text with templates

  • temperature: Randomness control (0.0-2.0, default: 0.7)

  • max_tokens: Maximum response tokens (default: 1000)

  • timeout: Request timeout in seconds (default: 60)

Tool calling:

  • tools: List of @tool decorated functions

  • tool_choice: “auto”, “none”, or specific tool name

  • parallel_tool_calls: Enable parallel tool execution

  • max_tool_execution_time: Tool timeout in seconds

Structured outputs:

  • response_format: Pydantic BaseModel class or JSON schema dict

    • Pydantic model: Pass model class directly (requires pydantic>=2.0)

    • JSON schema: Dict with {"type": "json_schema", "json_schema": {…}}

Context and role:

  • context: System/context message (supports templates)

  • role: User role description (supports templates)

Response Format

LLMTask returns structured data including:

  • content: Generated text response

  • parsed_object: Parsed JSON object (when response_format is used)

  • usage: Token usage statistics

  • tool_calls: List of executed tool calls (if any)

  • model: Model used for generation

  • provider: Provider used

  • metadata: Additional response information

Example Usage

Basic text generation:

task = LLMTask("generate", {
    "prompt": "Write a brief summary about ${topic}",
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.3,
    "max_tokens": 500
})

With dependencies and templates:

task = LLMTask("analyze", {
    "prompt": "Analyze these search results about ${topic}: ${results}",
    "context": "You are an expert ${field} researcher",
    "llm_provider": "anthropic",
    "llm_model": "claude-3-opus"
}, dependencies=[
    TaskDependency("topic", "input.topic", REQUIRED),
    TaskDependency("results", "search.results", REQUIRED),
    TaskDependency("field", "config.research_field", OPTIONAL, default_value="general")
])

With tool calling:

@tool
def calculate(expression: str) -> float:
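    # Demo only: eval runs arbitrary code, so never pass it untrusted model output.
    # Stripping builtins reduces, but does not remove, that risk.
    return float(eval(expression, {"__builtins__": {}}, {}))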

task = LLMTask("math_helper", {
    "prompt": "Help solve this math problem: ${problem}",
    "tools": [calculate],
    "llm_provider": "openai",
    "llm_model": "gpt-4o-mini",
    "tool_choice": "auto"
})

Error Handling

LLMTask handles various error conditions gracefully:

  • API failures (network, authentication, rate limits)

  • Invalid model/provider combinations

  • Tool execution errors

  • Template substitution errors

  • Response parsing failures

All errors are returned as TaskResult.failure() with detailed error messages and relevant metadata for debugging.

See also

  • BaseTask: Parent class for task implementation patterns

  • TaskDependency: For injecting data into prompt templates

  • @tool decorator: For creating tool functions

  • litellm: Underlying LLM provider abstraction

class ChatMessage[source]

Bases: TypedDict

Type definition for chat messages.

role: str
content: str
tool_calls: Optional[List[Any]]
class LLMTask[source]

Bases: BaseTask

Advanced LLM integration task with template substitution and tool calling.

LLMTask provides sophisticated Large Language Model integration for AI pipelines. It supports dynamic prompt generation through template substitution, automatic function/tool calling, and seamless integration with multiple LLM providers.

The task automatically resolves template variables from dependency data and manages the complete LLM interaction lifecycle including request formatting, response parsing, tool execution, and error handling.

Core Capabilities:
  • Dynamic Prompts: ${variable} template substitution from dependencies

  • Tool Integration: Automatic function calling with @tool decorated functions

  • Multi-Provider: Support for 50+ LLM providers via litellm

  • Response Processing: Structured response parsing and validation

  • Error Recovery: Graceful handling of API failures and timeouts

  • Usage Tracking: Detailed token usage and cost monitoring

Template Substitution:
Templates use ${variable_name} syntax and are resolved from:
  1. Task dependencies (via TaskDependency)

  2. Task configuration values

  3. Environment variables (as fallback)

Template fields support:
  • prompt/prompt_template: Main user message

  • context: System/context message

  • role: User role description

Tool Calling Workflow:
  1. Functions decorated with @tool are automatically registered

  2. LLM decides which tools to call based on prompt

  3. Tools execute with validated parameters

  4. Results injected back into conversation

  5. LLM continues with tool results
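The five steps above amount to a loop. The sketch below runs that loop against a stub model so it works offline. fake_model, run_with_tools, and the message and tool-call dict shapes are hypothetical, loosely modeled on the ChatMessage fields (role, content, tool_calls).

```python
from typing import Any, Callable, Dict, List

Message = Dict[str, Any]


def fake_model(messages: List[Message]) -> Message:
    """Stand-in for an LLM: requests one tool call, then answers using its result."""
    tool_msgs = [m for m in messages if m["role"] == "tool"]
    if not tool_msgs:
        return {"role": "assistant", "content": "",
                "tool_calls": [{"name": "add", "arguments": {"a": 2, "b": 3}}]}
    return {"role": "assistant", "content": f"The sum is {tool_msgs[-1]['content']}"}


def run_with_tools(prompt: str, tools: Dict[str, Callable[..., Any]],
                   model: Callable[[List[Message]], Message], max_rounds: int = 5) -> str:
    """Ask the model, execute requested tools, feed results back, repeat."""
    messages: List[Message] = [{"role": "user", "content": prompt}]
    for _ in range(max_rounds):
        reply = model(messages)                  # step 2: model decides
        messages.append(reply)
        calls = reply.get("tool_calls") or []
        if not calls:
            return reply["content"]              # step 5: final answer
        for call in calls:                       # step 3: run the registered tool
            try:
                result = tools[call["name"]](**call["arguments"])
            except Exception as exc:             # a failing tool is reported, not fatal
                result = f"error: {exc}"
            messages.append({"role": "tool", "content": str(result)})  # step 4
    raise RuntimeError("Tool loop did not finish within max_rounds")


# Step 1 (registration) is just the tools dict here.
print(run_with_tools("What is 2 + 3?", {"add": lambda a, b: a + b}, fake_model))  # The sum is 5
```

Capping the loop with max_rounds keeps a model that keeps requesting tools from running forever.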

Provider Configuration

Different providers require different configuration:

OpenAI:

{
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.7,
    "max_tokens": 1000
}

Ollama (local):

{
    "llm_provider": "ollama",
    "llm_model": "llama2",
    "temperature": 0.8
}

Response Structure

Successful responses include:

{
    "content": "Generated text response",
    "usage": {
        "prompt_tokens": 150,
        "completion_tokens": 300,
        "total_tokens": 450
    },
    "tool_calls": [
        {
            "tool_name": "search_web",
            "arguments": {"query": "AI trends"},
            "result": {"results": [...]},
            "success": True
        }
    ],
    "model": "gpt-4",
    "provider": "openai",
    "metadata": {
        "response_time": 2.34,
        "template_variables": ["topic", "domain"]
    }
}
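Downstream code usually needs only a few numbers from this structure. The hypothetical helper below reads them defensively with .get(), since tool_calls is present only when tools were used. The example dict copies the values shown above, with the elided tool result replaced by an empty list.

```python
from typing import Any, Dict


def summarize_response(data: Dict[str, Any]) -> Dict[str, Any]:
    """Extract headline numbers from a response dict shaped like the one above."""
    calls = data.get("tool_calls") or []
    return {
        "model": f"{data.get('provider')}/{data.get('model')}",
        "total_tokens": data.get("usage", {}).get("total_tokens", 0),
        "tool_calls": len(calls),
        "failed_tools": [c["tool_name"] for c in calls if not c.get("success", False)],
    }


example = {
    "content": "Generated text response",
    "usage": {"prompt_tokens": 150, "completion_tokens": 300, "total_tokens": 450},
    "tool_calls": [
        {"tool_name": "search_web", "arguments": {"query": "AI trends"},
         "result": {"results": []}, "success": True},
    ],
    "model": "gpt-4",
    "provider": "openai",
}

print(summarize_response(example))
# {'model': 'openai/gpt-4', 'total_tokens': 450, 'tool_calls': 1, 'failed_tools': []}
```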

Common Usage Patterns

Content generation with templates:

LLMTask("writer", {
    "prompt": "Write a ${length} article about ${topic} for ${audience}",
    "context": "You are an expert ${field} writer",
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.7
}, dependencies=[
    TaskDependency("topic", "input.topic", REQUIRED),
    TaskDependency("length", "config.article_length", OPTIONAL, default_value="brief"),
    TaskDependency("audience", "config.target_audience", REQUIRED),
    TaskDependency("field", "config.expertise_field", REQUIRED)
])

Research with tool calling:

from typing import Dict, List

@tool
def search_academic_papers(query: str, limit: int = 5) -> List[Dict]:
    papers: List[Dict] = []  # placeholder: populate from your paper search backend
    return papers[:limit]

LLMTask("researcher", {
    "prompt": "Research ${topic} and provide comprehensive analysis",
    "tools": [search_academic_papers],
    "llm_provider": "anthropic",
    "llm_model": "claude-3-opus",
    "tool_choice": "auto",
    "max_tokens": 3000
})

Data analysis and reasoning:

LLMTask("analyzer", {
    "prompt": "Analyze this data and provide insights: ${data}",
    "context": "You are a data scientist specializing in ${domain}",
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.3,  # Lower temperature for analytical tasks
    "max_tokens": 2000
}, dependencies=[
    TaskDependency("data", "processor.results", REQUIRED),
    TaskDependency("domain", "config.analysis_domain", REQUIRED)
])

Error Handling

Common failure scenarios and recovery:

  • API Errors: Network failures, authentication issues

  • Rate Limits: Automatic retry with exponential backoff

  • Invalid Models: Clear error messages for unsupported models

  • Tool Failures: Individual tool errors don’t fail entire task

  • Template Errors: Missing variables result in clear error messages

Performance Considerations

  • Use appropriate max_tokens to control costs and latency

  • Lower temperature (0.1-0.3) for factual/analytical tasks

  • Higher temperature (0.7-1.0) for creative tasks

  • Consider model capabilities vs cost (gpt-3.5-turbo vs gpt-4)

  • Use tool_choice="none" to disable tool calling when not needed

See also

  • @tool: For creating callable functions

  • TaskDependency: For template variable injection

  • litellm: Underlying provider abstraction

  • Tool calling examples in the examples package

REQUIRED_CONFIGS = ['llm_provider', 'llm_model']
DEFAULT_TEMPERATURE = 0.7
DEFAULT_MAX_TOKENS = 1000
DEFAULT_TIMEOUT = 60
__init__(name, config=None, dependencies=None)[source]

Initialize an advanced LLM task with template and tool support.

Creates a new LLM task that can automatically substitute template variables from dependencies, execute function calls, and interact with multiple LLM providers. The task handles the complete LLM interaction lifecycle.

Parameters:
  • name (str) – Unique identifier for this task within the pipeline. Used for logging, dependency references, and result storage.

  • config (Optional[Dict[str, Any]]) –

    LLM configuration dictionary with the following keys:

    Required:

    • llm_provider (str): LLM provider name. Supported values include: “openai”, “anthropic”, “ollama”, “azure”, “google”, “cohere”, etc.

    • llm_model (str): Model identifier specific to the provider:

      • OpenAI: “gpt-4”, “gpt-3.5-turbo”, “gpt-4-turbo-preview”

      • Anthropic: “claude-3-opus”, “claude-3-sonnet”, “claude-3-haiku”

      • Ollama: “llama2”, “mistral”, “codellama”

      • Google: “gemini-pro”, “gemini-pro-vision”

    Prompt Configuration:

    • prompt OR prompt_template (str): Main user message with ${var} templates

    • context (str, optional): System/context message (supports templates)

    • role (str, optional): User role description (supports templates)

    Generation Parameters:

    • temperature (float, 0.0-2.0): Randomness control (default: 0.7)

    • max_tokens (int): Maximum response tokens (default: 1000)

    • timeout (float): Request timeout in seconds (default: 60)

    Tool Calling:

    • tools (List[Callable | Dict], optional): List of @tool decorated functions and/or MCP server configurations. Each item can be:

      • Python function: Decorated with @tool for local execution

      • MCP config dict: {"type": "mcp", "server_label": "…", "server_url": "…"}

    • tool_choice (str, optional): “auto”, “none”, or specific tool name

    • parallel_tool_calls (bool): Enable parallel tool execution (default: True)

    • max_tool_execution_time (float): Tool timeout in seconds (default: 30)

    Provider-Specific:

    • api_key (str, optional): API key (can also use environment variables)

    • api_base (str, optional): Custom API endpoint (for Azure, local deployments)

    • api_version (str, optional): API version (for Azure OpenAI)

  • dependencies (Optional[List[TaskDependency]]) – List of TaskDependency objects for template variable injection. Template variables in prompt, context, and role fields will be automatically replaced with resolved dependency values.

Examples

Basic text generation:

LLMTask("summarizer", {
    "prompt": "Summarize this text: ${content}",
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.3,
    "max_tokens": 500
}, dependencies=[
    TaskDependency("content", "input.text", REQUIRED)
])

Advanced with tools and context:

from typing import Any, Dict

@tool
def web_search(query: str) -> Dict[str, Any]:
    return {"results": search_engine.search(query)}  # search_engine: placeholder client

LLMTask("researcher", {
    "prompt": "Research ${topic} and provide analysis",
    "context": "You are an expert ${field} researcher with access to web search",
    "llm_provider": "anthropic",
    "llm_model": "claude-3-opus",
    "tools": [web_search],
    "tool_choice": "auto",
    "temperature": 0.5,
    "max_tokens": 3000
}, dependencies=[
    TaskDependency("topic", "config.research_topic", REQUIRED),
    TaskDependency("field", "config.expertise_field", REQUIRED)
])

Local model with Ollama:

LLMTask("local_chat", {
    "prompt": "Help with this question: ${question}",
    "llm_provider": "ollama",
    "llm_model": "llama2",
    "temperature": 0.8,
    "max_tokens": 2000
})

Template Variables

Variables in prompt, context, and role fields use ${variable_name} syntax. Resolution order:

  1. Dependency values (from TaskDependency objects)

  2. Direct config values

  3. Environment variables

  4. Error if required variable not found
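This resolution order can be modeled with collections.ChainMap, where the first mapping that contains a name wins. This is a sketch under assumptions. resolve is a hypothetical function, and it assumes config values are looked up by their top-level key name.

```python
import os
import re
from collections import ChainMap
from typing import Any, Mapping, Optional

PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def resolve(template: str, dependencies: Mapping[str, Any], config: Mapping[str, Any],
            env: Optional[Mapping[str, str]] = None) -> str:
    """Fill ${name} from dependencies, then config, then environment variables."""
    environment = os.environ if env is None else env
    # ChainMap searches its maps left to right, so earlier sources take priority.
    lookup = ChainMap(dict(dependencies), dict(config), dict(environment))

    def replace(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in lookup:  # step 4: nothing matched, so fail loudly
            raise KeyError(f"Template variable '{name}' could not be resolved")
        return str(lookup[name])

    return PLACEHOLDER.sub(replace, template)


print(resolve(
    "Research ${topic} for ${audience}",
    dependencies={"topic": "AI trends"},
    config={"topic": "ignored", "audience": "engineers"},
    env={},
))  # Research AI trends for engineers
```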

Tool Integration

Functions decorated with @tool are automatically:

  1. Registered with the LLM provider

  2. Made available to the model during generation

  3. Executed when called by the model

  4. Results injected back into the conversation

Error Handling

Configuration errors are caught during validation:

  • Missing required fields (llm_provider, llm_model)

  • Invalid parameter ranges (temperature, max_tokens)

  • Unsupported tool configurations

  • Provider-specific validation
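The validation described above, combined with the class defaults (REQUIRED_CONFIGS, DEFAULT_TEMPERATURE, DEFAULT_MAX_TOKENS, DEFAULT_TIMEOUT), might look roughly like this. validate_llm_config is hypothetical. Only the required keys, the 0.0-2.0 temperature range, and the default values come from this page; the positive max_tokens rule is an assumption.

```python
from typing import Any, Dict

# Values mirror the LLMTask class attributes documented above.
REQUIRED_CONFIGS = ["llm_provider", "llm_model"]
DEFAULT_TEMPERATURE = 0.7
DEFAULT_MAX_TOKENS = 1000
DEFAULT_TIMEOUT = 60


def validate_llm_config(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of config with defaults applied; raise ValueError on bad input."""
    missing = [key for key in REQUIRED_CONFIGS if not config.get(key)]
    if missing:
        raise ValueError(f"Missing required config: {', '.join(missing)}")
    # Defaults first, so explicit config values override them.
    merged = {
        "temperature": DEFAULT_TEMPERATURE,
        "max_tokens": DEFAULT_MAX_TOKENS,
        "timeout": DEFAULT_TIMEOUT,
        **config,
    }
    if not 0.0 <= merged["temperature"] <= 2.0:
        raise ValueError("temperature must be between 0.0 and 2.0")
    if merged["max_tokens"] <= 0:
        raise ValueError("max_tokens must be a positive integer")
    return merged


cfg = validate_llm_config({"llm_provider": "openai", "llm_model": "gpt-4", "temperature": 0.3})
print(cfg["temperature"], cfg["max_tokens"])  # 0.3 1000
```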

Environment Variables

API keys can be provided via environment variables:

  • OPENAI_API_KEY for OpenAI

  • GOOGLE_API_KEY for Google

  • Other providers use their own standard variables (e.g., ANTHROPIC_API_KEY for Anthropic)

See also

  • @tool: For creating callable functions

  • TaskDependency: For template variable injection

  • Provider documentation for model-specific capabilities

validation_rules: Optional[Dict[str, Any]]
context_instance: Optional[TaskContext]
resolved_prompt: Optional[str]
resolved_context: Optional[str]
resolved_role: Optional[str]
tool_registry: Optional[ToolRegistry]
tool_executor: Optional[ToolExecutor]
supports_tools: bool
tool_setup_error: Optional[str]
mcp_tools: List[Dict[str, Any]]
response_format_type: Optional[str]
response_format_config: Optional[Dict[str, Any]]
supports_response_format: bool
set_context(context)[source]

Set the task context for template resolution.

Parameters:

context (TaskContext) – TaskContext instance

Return type:

None

run()[source]

Execute the LLM task with context-resolved prompts and logging.

Return type:

TaskResult

get_resolved_prompt()[source]

Get the resolved prompt after template substitution.

Return type:

Optional[str]

Returns:

Resolved prompt string or None if not yet resolved

get_resolved_context()[source]

Get the resolved context after template substitution.

Return type:

Optional[str]

Returns:

Resolved context string or None if not yet resolved

get_resolved_role()[source]

Get the resolved role after template substitution.

Return type:

Optional[str]

Returns:

Resolved role string or None if not yet resolved

preview_resolved_templates()[source]

Preview what the templates would resolve to with current context.

Return type:

Dict[str, str]

Returns:

Dictionary with resolved template previews

__str__()[source]

Enhanced string representation including template info.

Return type:

str

agent_name: Optional[str]

Search Task

SearchTask - Simple web search task.

class SearchTask[source]

Bases: BaseTask

Simple web search task that executes searches and returns results.

__init__(name, config=None, dependencies=None)[source]

Initialize search task.

Parameters:
Config parameters:
  • query: Search query string (required)

  • max_results: Maximum number of search results (default: 5)

  • serper_api_key: Optional API key for Serper search

run()[source]

Execute the search and return results.

Return type:

TaskResult

__str__()[source]

String representation of the search task.

Return type:

str

Transform Task

TransformTask - Generic data transformation task.

class TransformTask[source]

Bases: BaseTask

Task that applies transformations to data from context.

__init__(name, config=None, dependencies=None)[source]

Initialize transform task.

Parameters:
Config parameters:
  • transform_function: Function to apply transformation

  • input_field: Single input field name (for single-input transforms)

  • input_fields: List of input field names (for multi-input transforms)

  • output_name: Name for the output in the result

  • validate_input: Whether to validate input data (default: True)

  • validate_output: Whether to validate output data (default: False)

set_context(context)[source]

Set the task context.

Parameters:

context (TaskContext) – TaskContext instance

Return type:

None

run()[source]

Execute the transformation.

Return type:

TaskResult

property input_data: Any

Public read-only property to access input data for testing purposes.

Returns:

Input data for transformation

preview_transformation()[source]

Preview what the transformation would do with current input.

Return type:

Dict[str, Any]

Returns:

Preview information about the transformation

__str__()[source]

String representation of the transform task.

Return type:

str

extract_field_from_list(field_name)[source]

Create a transformation function that extracts a field from each item in a list.

Parameters:

field_name (str) – Name of the field to extract

Return type:

Callable[[List[Dict[str, Any]]], List[Any]]

Returns:

Transformation function

combine_text_fields(separator='\n\n', title_field='title', content_field='content')[source]

Create a transformation function that combines text from multiple items.

Parameters:
  • separator (str) – Separator between items

  • title_field (str) – Field name for titles

  • content_field (str) – Field name for content

Return type:

Callable[[List[Dict[str, Any]]], str]

Returns:

Transformation function
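Here is one plausible shape for the function that combine_text_fields returns. The per-item layout (title on one line, content on the next, items joined by separator) is an assumption; the real helper may format items differently.

```python
from typing import Any, Callable, Dict, List


def combine_text_fields(
    separator: str = "\n\n", title_field: str = "title", content_field: str = "content"
) -> Callable[[List[Dict[str, Any]]], str]:
    """Build a function that joins each item's title and content into one string."""

    def combine(items: List[Dict[str, Any]]) -> str:
        blocks = []
        for item in items:
            title = item.get(title_field)
            content = str(item.get(content_field, ""))
            # Assumed layout: title line, then content; untitled items keep only content.
            blocks.append(f"{title}\n{content}" if title else content)
        return separator.join(blocks)

    return combine


join_articles = combine_text_fields()
print(join_articles([{"title": "A", "content": "x"}, {"title": "B", "content": "y"}]))
```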

filter_by_condition(condition_func)[source]

Create a transformation function that filters items by a condition.

Parameters:

condition_func (Callable[[Any], bool]) – Function that returns True for items to keep

Return type:

Callable[[List[Any]], List[Any]]

Returns:

Transformation function

aggregate_numeric_field(field_name, operation='sum')[source]

Create a transformation function that aggregates a numeric field.

Parameters:
  • field_name (str) – Name of the numeric field

  • operation (str) – Aggregation operation (sum, avg, min, max, count)

Return type:

Callable[[List[Dict[str, Any]]], float]

Returns:

Transformation function
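aggregate_numeric_field can be approximated with a lookup table of aggregation functions keyed by the documented operation names. Skipping items that lack a numeric value and returning 0.0 for an empty input are assumptions of this sketch, not documented behavior.

```python
from statistics import mean
from typing import Any, Callable, Dict, List


def _is_number(value: Any) -> bool:
    # bool is a subclass of int, so exclude it explicitly.
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def aggregate_numeric_field(
    field_name: str, operation: str = "sum"
) -> Callable[[List[Dict[str, Any]]], float]:
    """Build a function that aggregates item[field_name] across a list of dicts."""
    operations: Dict[str, Callable[[List[float]], float]] = {
        "sum": lambda v: float(sum(v)),
        "avg": lambda v: float(mean(v)) if v else 0.0,
        "min": lambda v: float(min(v)) if v else 0.0,
        "max": lambda v: float(max(v)) if v else 0.0,
        "count": lambda v: float(len(v)),
    }
    if operation not in operations:
        raise ValueError(f"Unsupported operation: {operation}")
    aggregate_values = operations[operation]

    def aggregate(items: List[Dict[str, Any]]) -> float:
        # Items without a numeric value for field_name are skipped (assumption).
        values = [float(item[field_name]) for item in items if _is_number(item.get(field_name))]
        return aggregate_values(values)

    return aggregate


scores = [{"score": 3}, {"score": 5}, {"title": "no score"}]
print(aggregate_numeric_field("score", "avg")(scores))  # 4.0
print(aggregate_numeric_field("score", "count")(scores))  # 2.0
```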

format_as_markdown_list(title_field='title', url_field='url')[source]

Create a transformation function that formats items as a markdown list.

Parameters:
  • title_field (str) – Field name for titles

  • url_field (str) – Field name for URLs

Return type:

Callable[[List[Dict[str, Any]]], str]

Returns:

Transformation function
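One plausible rendering for format_as_markdown_list is one bullet per item, linked when a URL is present. The "Untitled" fallback and the unlinked form for items without a URL are assumptions.

```python
from typing import Any, Callable, Dict, List


def format_as_markdown_list(
    title_field: str = "title", url_field: str = "url"
) -> Callable[[List[Dict[str, Any]]], str]:
    """Build a function that renders items as a markdown bullet list."""

    def to_markdown(items: List[Dict[str, Any]]) -> str:
        lines = []
        for item in items:
            title = item.get(title_field) or "Untitled"  # fallback label is an assumption
            url = item.get(url_field)
            lines.append(f"- [{title}]({url})" if url else f"- {title}")
        return "\n".join(lines)

    return to_markdown


links = format_as_markdown_list()
print(links([{"title": "Docs", "url": "https://example.com"}, {"title": "Notes"}]))
```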

Conditional Task

ConditionalTask - Task that executes based on context conditions.

class ConditionalTask[source]

Bases: BaseTask

Task that executes actions based on conditional logic using context data.

__init__(name, config=None, dependencies=None)[source]

Initialize conditional task.

Parameters:
Config parameters:
  • condition_function: Function that evaluates to True/False

  • condition_inputs: List of input field names for condition function

  • action_function: Function to execute when condition is True

  • action_inputs: List of input field names for action function

  • else_function: Optional function to execute when condition is False

  • else_inputs: List of input field names for else function

  • skip_reason: Custom reason message when condition is False
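The branching these config keys describe can be sketched as a plain function. run_conditional is hypothetical. It passes the named inputs positionally in the listed order and returns a small dict, whereas the real task returns a TaskResult.

```python
from typing import Any, Callable, Dict, List, Optional


def run_conditional(
    inputs: Dict[str, Any],
    condition_function: Callable[..., bool],
    condition_inputs: List[str],
    action_function: Callable[..., Any],
    action_inputs: List[str],
    else_function: Optional[Callable[..., Any]] = None,
    else_inputs: Optional[List[str]] = None,
    skip_reason: str = "Condition not met",
) -> Dict[str, Any]:
    """Evaluate the condition, then run the action, the else branch, or skip."""

    def pick(names: List[str]) -> List[Any]:
        # Named inputs are passed positionally, in the order listed (assumption).
        return [inputs[name] for name in names]

    if condition_function(*pick(condition_inputs)):
        return {"branch": "action", "result": action_function(*pick(action_inputs))}
    if else_function is not None:
        return {"branch": "else", "result": else_function(*pick(else_inputs or []))}
    return {"branch": "skipped", "reason": skip_reason}


inputs = {"score": 0.9, "draft": "ship it"}
print(run_conditional(inputs, lambda s: s >= 0.8, ["score"], str.upper, ["draft"]))
# {'branch': 'action', 'result': 'SHIP IT'}
```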

set_context(context)[source]

Set the task context.

Parameters:

context (TaskContext) – TaskContext instance

Return type:

None

run()[source]

Execute the conditional task.

Return type:

TaskResult

preview_condition()[source]

Preview what the condition would evaluate to with current input.

Return type:

Dict[str, Any]

Returns:

Preview information about the condition

get_execution_summary(context)[source]

Get a formatted summary of the conditional task execution results.

Parameters:

context (TaskContext) – TaskContext to retrieve results from

Return type:

Optional[Dict[str, Any]]

Returns:

Dictionary containing execution summary with formatted info, or None if no results found

__str__()[source]

String representation of the conditional task.

Return type:

str

threshold_condition(threshold, operator='>=')[source]

Create a condition function that checks if a value meets a threshold.

Parameters:
  • threshold (float) – Threshold value to compare against

  • operator (str) – Comparison operator (>=, >, <=, <, ==, !=)

Return type:

Callable[[float], bool]

Returns:

Condition function
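threshold_condition maps naturally onto Python's operator module. The sketch below mirrors the documented signature and operator strings; the ValueError for an unknown operator is an assumption.

```python
import operator as op
from typing import Callable

# Documented operator strings mapped to their comparison functions.
_COMPARATORS = {
    ">=": op.ge, ">": op.gt, "<=": op.le,
    "<": op.lt, "==": op.eq, "!=": op.ne,
}


def threshold_condition(threshold: float, operator: str = ">=") -> Callable[[float], bool]:
    """Build a predicate that compares a value against threshold."""
    if operator not in _COMPARATORS:
        raise ValueError(f"Unsupported operator: {operator}")
    compare = _COMPARATORS[operator]

    def condition(value: float) -> bool:
        return bool(compare(value, threshold))

    return condition


is_confident = threshold_condition(0.8)
print(is_confident(0.85), is_confident(0.5))  # True False
```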

contains_condition(search_term, case_sensitive=False)[source]

Create a condition function that checks if text contains a term.

Parameters:
  • search_term (str) – Term to search for

  • case_sensitive (bool) – Whether search is case sensitive

Return type:

Callable[[str], bool]

Returns:

Condition function

list_size_condition(min_size=None, max_size=None)[source]

Create a condition function that checks list size.

Parameters:
  • min_size (Optional[int]) – Minimum required size (inclusive)

  • max_size (Optional[int]) – Maximum allowed size (inclusive)

Return type:

Callable[[List[Any]], bool]

Returns:

Condition function

success_rate_condition(min_success_rate)[source]

Create a condition function that checks success rate from a results dict.

Parameters:

min_success_rate (float) – Minimum success rate (0.0 to 1.0)

Return type:

Callable[[Dict[str, Any]], bool]

Returns:

Condition function

quality_gate_condition(min_score, score_field='score')[source]

Create a condition function that checks quality score.

Parameters:
  • min_score (float) – Minimum quality score

  • score_field (str) – Field name containing the score

Return type:

Callable[[Dict[str, Any]], bool]

Returns:

Condition function

log_action(message, level='info')[source]

Create an action function that logs a message.

Parameters:
  • message (str) – Message to log

  • level (str) – Log level (info, warning, error)

Return type:

Callable[[], Dict[str, Any]]

Returns:

Action function

increment_counter_action(counter_name='counter')[source]

Create an action function that increments a counter in context.

Parameters:

counter_name (str) – Name of the counter in context

Return type:

Callable[[TaskContext], Dict[str, Any]]

Returns:

Action function

set_flag_action(flag_name, flag_value=True)[source]

Create an action function that sets a flag value.

Parameters:
  • flag_name (str) – Name of the flag

  • flag_value (Any) – Value to set

Return type:

Callable[[], Dict[str, Any]]

Returns:

Action function

Core Infrastructure

Task Result

Task result classes for standardized response format.

class TaskStatus[source]

Bases: Enum

Enumeration for task execution status.

NOT_STARTED = 'not_started'
STARTED = 'started'
SUCCESS = 'success'
ERROR = 'error'
SKIPPED = 'skipped'
PARTIAL = 'partial'
class TaskResult[source]

Bases: object

Standardized result format for all task executions.

This class provides a consistent interface for task results across the framework, enabling better error handling, performance tracking, and result processing.

status: TaskStatus
data: Any = None
error: Optional[str] = None
metadata: Dict[str, Any]
execution_time: float = 0.0
classmethod success(data=None, metadata=None, execution_time=0.0)[source]

Create a successful task result.

Parameters:
  • data (Any) – The result data

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

  • execution_time (float) – Task execution time in seconds

Return type:

TaskResult

Returns:

TaskResult with SUCCESS status

classmethod failure(error_message, metadata=None, execution_time=0.0)[source]

Create an error task result.

Parameters:
  • error_message (str) – Description of the error

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

  • execution_time (float) – Task execution time in seconds

Return type:

TaskResult

Returns:

TaskResult with ERROR status

classmethod partial(data=None, error=None, metadata=None, execution_time=0.0)[source]

Create a partial success task result.

Parameters:
  • data (Any) – The partial result data

  • error (Optional[str]) – Optional description of what failed

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

  • execution_time (float) – Task execution time in seconds

Return type:

TaskResult

Returns:

TaskResult with PARTIAL status

classmethod skipped(reason, metadata=None, execution_time=0.0)[source]

Create a skipped task result.

Parameters:
  • reason (str) – Reason why the task was skipped

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

  • execution_time (float) – Task execution time in seconds

Return type:

TaskResult

Returns:

TaskResult with SKIPPED status

is_success()[source]

Check if the task completed successfully.

Return type:

bool

Returns:

True if status is SUCCESS

is_error()[source]

Check if the task failed with an error.

Return type:

bool

Returns:

True if status is ERROR

is_partial()[source]

Check if the task completed with partial success.

Return type:

bool

Returns:

True if status is PARTIAL

is_skipped()[source]

Check if the task was skipped.

Return type:

bool

Returns:

True if status is SKIPPED

has_data()[source]

Check if the result contains data.

Return type:

bool

Returns:

True if data is not None

get_legacy_result()[source]

Get result in legacy format for backward compatibility.

This method extracts the data field to maintain compatibility with existing code that expects raw return values.

Return type:

Any

Returns:

The data field; raises RuntimeError if the task failed

add_metadata(key, value)[source]

Add metadata entry.

Parameters:
  • key (str) – Metadata key

  • value (Any) – Metadata value

Return type:

None

get_metadata(key, default=None)[source]

Get metadata value.

Parameters:
  • key (str) – Metadata key

  • default (Any) – Default value if key not found

Return type:

Any

Returns:

Metadata value or default

__str__()[source]

String representation of TaskResult.

Return type:

str

__init__(status, data=None, error=None, metadata=<factory>, execution_time=0.0)
wrap_legacy_result(result, execution_time=0.0)[source]

Wrap a legacy task result in TaskResult format.

This utility function helps with gradual migration by wrapping existing task results in the new standardized format.

Parameters:
  • result (Any) – Legacy task result (any type)

  • execution_time (float) – Task execution time in seconds

Return type:

TaskResult

Returns:

TaskResult with SUCCESS status containing the legacy result

unwrap_to_legacy(task_result)[source]

Unwrap TaskResult to legacy format for backward compatibility.

Parameters:

task_result (TaskResult) – TaskResult instance

Return type:

Any

Returns:

The data field from the TaskResult

Raises:

RuntimeError – If the task failed
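The round trip between legacy values and TaskResult can be shown with a stripped-down stand-in. The TaskStatus and TaskResult classes below are simplified mirrors of the ones documented above (only SUCCESS and ERROR), not imports from aipype.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class TaskStatus(Enum):
    SUCCESS = "success"
    ERROR = "error"


@dataclass
class TaskResult:
    """Simplified stand-in for aipype's TaskResult (SUCCESS and ERROR only)."""
    status: TaskStatus
    data: Any = None
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    execution_time: float = 0.0


def wrap_legacy_result(result: Any, execution_time: float = 0.0) -> TaskResult:
    """Any raw value becomes a SUCCESS result carrying it as data."""
    return TaskResult(TaskStatus.SUCCESS, data=result, execution_time=execution_time)


def unwrap_to_legacy(task_result: TaskResult) -> Any:
    """Return the raw data, raising RuntimeError for a failed task."""
    if task_result.status is TaskStatus.ERROR:
        raise RuntimeError(f"Task failed: {task_result.error}")
    return task_result.data


print(unwrap_to_legacy(wrap_legacy_result({"answer": 42})))  # {'answer': 42}
```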

Agent Run Result

Agent run result classes for standardized pipeline execution response format.

class AgentRunStatus[source]

Bases: Enum

Enumeration for agent execution status.

SUCCESS = 'success'
PARTIAL = 'partial'
ERROR = 'error'
RUNNING = 'running'
class AgentRunResult[source]

Bases: object

Standardized result format for agent pipeline executions.

This class provides a consistent interface for agent execution results, enabling better error handling, status checking, and result processing.

Note: Individual task results are accessible via agent.context.get_result(task_name)

status: AgentRunStatus
agent_name: str
total_tasks: int = 0
completed_tasks: int = 0
failed_tasks: int = 0
total_phases: int = 0
execution_time: float = 0.0
metadata: Dict[str, Any]
error_message: Optional[str] = None
classmethod success(agent_name, total_tasks, completed_tasks, total_phases, execution_time=0.0, metadata=None)[source]

Create a successful agent execution result.

Parameters:
  • agent_name (str) – Name of the agent

  • total_tasks (int) – Total number of tasks in the pipeline

  • completed_tasks (int) – Number of tasks that completed successfully

  • total_phases (int) – Number of execution phases

  • execution_time (float) – Total execution time in seconds

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

Return type:

AgentRunResult

Returns:

AgentRunResult with SUCCESS status

classmethod partial(agent_name, total_tasks, completed_tasks, failed_tasks, total_phases, execution_time=0.0, metadata=None)[source]

Create a partial success agent execution result.

Parameters:
  • agent_name (str) – Name of the agent

  • total_tasks (int) – Total number of tasks in the pipeline

  • completed_tasks (int) – Number of tasks that completed successfully

  • failed_tasks (int) – Number of tasks that failed

  • total_phases (int) – Number of execution phases

  • execution_time (float) – Total execution time in seconds

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

Return type:

AgentRunResult

Returns:

AgentRunResult with PARTIAL status

classmethod failure(agent_name, total_tasks, failed_tasks, error_message, total_phases=0, execution_time=0.0, metadata=None)[source]

Create a failed agent execution result.

Parameters:
  • agent_name (str) – Name of the agent

  • total_tasks (int) – Total number of tasks in the pipeline

  • failed_tasks (int) – Number of tasks that failed

  • error_message (str) – Description of the error

  • total_phases (int) – Number of execution phases

  • execution_time (float) – Total execution time in seconds

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

Return type:

AgentRunResult

Returns:

AgentRunResult with ERROR status

classmethod running(agent_name, metadata=None)[source]

Create a running agent execution result.

Parameters:
  • agent_name (str) – Name of the agent

  • metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary

Return type:

AgentRunResult

Returns:

AgentRunResult with RUNNING status

is_success()[source]

Check if the agent execution completed successfully.

Return type:

bool

Returns:

True if status is SUCCESS

is_error()[source]

Check if the agent execution failed with an error.

Return type:

bool

Returns:

True if status is ERROR

is_partial()[source]

Check if the agent execution completed with partial success.

Return type:

bool

Returns:

True if status is PARTIAL

is_running()[source]

Check if the agent is currently running.

Return type:

bool

Returns:

True if status is RUNNING

add_metadata(key, value)[source]

Add metadata entry.

Parameters:
  • key (str) – Metadata key

  • value (Any) – Metadata value

Return type:

None

get_metadata(key, default=None)[source]

Get metadata value.

Parameters:
  • key (str) – Metadata key

  • default (Any) – Default value if key not found

Return type:

Any

Returns:

Metadata value or default

__str__()[source]

String representation of AgentRunResult.

Return type:

str

__init__(status, agent_name, total_tasks=0, completed_tasks=0, failed_tasks=0, total_phases=0, execution_time=0.0, metadata=<factory>, error_message=None)

Task Context

TaskContext - Simplified shared data store for inter-task communication.

class TaskContext[source]

Bases: object

Simplified context for storing and accessing task results across pipeline execution.

Focuses on string-based data storage and simple dot notation path access. Designed for text-based automation workflows.

__init__()[source]

Initialize empty task context.

store_result(task_name, result)[source]

Store a task result in the context.

Parameters:
  • task_name (str) – Name of the task that produced the result

  • result (Dict[str, Any]) – The result data to store (dictionary with string keys)

Return type:

None

get_result(task_name)[source]

Get a task result from the context.

Parameters:

task_name (str) – Name of the task to get result for

Return type:

Optional[Dict[str, Any]]

Returns:

The stored result dictionary or None if not found

has_result(task_name)[source]

Check if a task result exists in the context.

Parameters:

task_name (str) – Name of the task to check

Return type:

bool

Returns:

True if result exists, False otherwise

get_path_value(path)[source]

Get a value from context using paths like ‘task.field’, ‘task.array[0]’, or ‘task.array[].field’.

Parameters:

path (str) – The path to resolve (e.g., ‘search.query’, ‘search.results[0].title’, ‘search.results[].url’)

Return type:

Optional[Any]

Returns:

The resolved value or None if path cannot be resolved
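The three path forms accepted by get_path_value can be resolved with a short recursive walk. The sketch below is a standalone approximation over a plain dict of task results, not the library's code. Skipping unresolvable elements under `[]` is an assumption of this sketch.

```python
import re
from typing import Any, Dict, List, Optional

# One path segment: a field name, optionally followed by "[N]" (index) or "[]" (every element)
_SEGMENT = re.compile(r"^(\w+)(?:\[(\d*)\])?$")


def get_path_value(results: Dict[str, Dict[str, Any]], path: str) -> Optional[Any]:
    """Resolve 'task.field', 'task.array[0]' or 'task.array[].field'; None if unresolvable."""
    task_name, _, rest = path.partition(".")
    if task_name not in results or not rest:
        return None
    return _walk(results[task_name], rest.split("."))


def _walk(value: Any, segments: List[str]) -> Optional[Any]:
    if not segments:
        return value
    match = _SEGMENT.match(segments[0])
    if match is None or not isinstance(value, dict) or match.group(1) not in value:
        return None
    value = value[match.group(1)]
    index = match.group(2)
    if index is None:  # plain field access
        return _walk(value, segments[1:])
    if not isinstance(value, list):
        return None
    if index == "":  # "[]": apply the rest of the path to every element, skipping misses
        return [v for v in (_walk(item, segments[1:]) for item in value) if v is not None]
    position = int(index)
    return _walk(value[position], segments[1:]) if position < len(value) else None


results = {"search": {"query": "AI news",
                      "results": [{"title": "A", "url": "u1"}, {"title": "B", "url": "u2"}]}}
print(get_path_value(results, "search.results[0].title"))  # A
print(get_path_value(results, "search.results[].url"))     # ['u1', 'u2']
print(get_path_value(results, "search.missing"))           # None
```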

set_data(task_name, data)[source]

Set data for a task (alias for store_result for compatibility).

Parameters:
  • task_name (str) – Name of the task

  • data (Dict[str, Any]) – Data dictionary to store

Return type:

None

get_data(task_name)[source]

Get data for a task (alias for get_result for compatibility).

Parameters:

task_name (str) – Name of the task

Return type:

Optional[Dict[str, Any]]

Returns:

Task data dictionary or None if not found

record_task_started(task_name)[source]

Record that a task has started execution.

Parameters:

task_name (str) – Name of the task that started

Return type:

None

record_task_completed(task_name, result)[source]

Record that a task has completed successfully.

Parameters:
  • task_name (str) – Name of the task that completed

  • result (Dict[str, Any]) – The result produced by the task

Return type:

None

record_task_failed(task_name, error)[source]

Record that a task has failed.

Parameters:
  • task_name (str) – Name of the task that failed

  • error (str) – Error message describing the failure

Return type:

None

get_execution_history()[source]

Get the execution history of all tasks.

Return type:

List[Dict[str, Any]]

Returns:

List of execution history entries

get_completed_tasks()[source]

Get names of all completed tasks.

Return type:

List[str]

Returns:

List of task names that completed successfully

get_failed_tasks()[source]

Get names of all failed tasks.

Return type:

List[str]

Returns:

List of task names that failed
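The record_task_* methods and the history queries fit together roughly as in the sketch below. `ExecutionTrackerSketch` is a hypothetical stand-in. It assumes, without confirmation from this reference, that record_task_completed also stores the result and that history entries are dictionaries with "task" and "event" keys.

```python
import time
from typing import Any, Dict, List, Optional


class ExecutionTrackerSketch:
    """Hypothetical stand-in for TaskContext's result store and execution history."""

    def __init__(self) -> None:
        self._results: Dict[str, Dict[str, Any]] = {}
        self._history: List[Dict[str, Any]] = []

    def _log(self, task_name: str, event: str, **extra: Any) -> None:
        self._history.append({"task": task_name, "event": event, "timestamp": time.time(), **extra})

    def record_task_started(self, task_name: str) -> None:
        self._log(task_name, "started")

    def record_task_completed(self, task_name: str, result: Dict[str, Any]) -> None:
        self._results[task_name] = result  # assumption: completing also stores the result
        self._log(task_name, "completed")

    def record_task_failed(self, task_name: str, error: str) -> None:
        self._log(task_name, "failed", error=error)

    def get_result(self, task_name: str) -> Optional[Dict[str, Any]]:
        return self._results.get(task_name)

    def get_execution_history(self) -> List[Dict[str, Any]]:
        return list(self._history)  # a copy, so callers cannot rewrite the history

    def get_completed_tasks(self) -> List[str]:
        return [e["task"] for e in self._history if e["event"] == "completed"]

    def get_failed_tasks(self) -> List[str]:
        return [e["task"] for e in self._history if e["event"] == "failed"]


tracker = ExecutionTrackerSketch()
tracker.record_task_started("search")
tracker.record_task_completed("search", {"results": []})
tracker.record_task_started("fetch")
tracker.record_task_failed("fetch", "timeout")
print(tracker.get_completed_tasks(), tracker.get_failed_tasks())  # ['search'] ['fetch']
```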

clear()[source]

Clear all stored results and execution history.

Return type:

None

get_all_results()[source]

Get all stored task results.

Return type:

Dict[str, Dict[str, Any]]

Returns:

Dictionary mapping task names to their result dictionaries

get_task_count()[source]

Get the number of tasks with stored results.

Return type:

int

Returns:

Number of tasks with results

__str__()[source]

String representation of the context.

Return type:

str

get_result_field(task_name, field_name, expected_type=str, default=None)[source]

Get a specific field from a task result with type checking.

Ultra-safe: handles all edge cases internally and never raises exceptions.

Parameters:
  • task_name (str) – Name of the task to get result from

  • field_name (str) – Name of the field to extract

  • expected_type (Type[TypeVar(T)]) – Expected type of the field value (default: str)

  • default (Optional[TypeVar(T)]) – Default value to return if field not found or wrong type

Return type:

Optional[TypeVar(T)]

Returns:

The field value if found and correct type, otherwise default
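The type-checked lookup described above amounts to an `isinstance` check wrapped in a catch-all. Here is a standalone sketch over a plain results dict; it approximates the documented contract rather than reproducing aipype's code.

```python
from typing import Any, Dict, Optional, Type, TypeVar

T = TypeVar("T")


def get_result_field(results: Dict[str, Any], task_name: str, field_name: str,
                     expected_type: Type[T] = str,
                     default: Optional[T] = None) -> Optional[T]:
    """Return results[task_name][field_name] if it is an expected_type instance, else default."""
    try:
        task_result = results.get(task_name)
        if not isinstance(task_result, dict):
            return default
        value = task_result.get(field_name)
        return value if isinstance(value, expected_type) else default
    except Exception:
        # Honour the "never raises" contract even for odd inputs (e.g. a non-dict results store)
        return default


results = {"summarize": {"content": "Short summary", "word_count": 2}}
print(get_result_field(results, "summarize", "content"))               # Short summary
print(get_result_field(results, "summarize", "word_count", int))       # 2
print(get_result_field(results, "summarize", "word_count", str, "?"))  # ?
```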

get_result_content(task_name)[source]

Get the ‘content’ field from a task result as a string.

Ultra-safe: handles all edge cases internally and never raises exceptions.

Parameters:

task_name (str) – Name of the task to get content from

Return type:

Optional[str]

Returns:

The content string if found and valid, otherwise None

has_result_content(task_name)[source]

Check if a task has non-empty content.

Ultra-safe: handles all edge cases internally and never raises exceptions.

Parameters:

task_name (str) – Name of the task to check

Return type:

bool

Returns:

True if task has non-empty content field, False otherwise

display_result_content(task_name, title, field_name='content')[source]

Display task result content in a message box if present.

Ultra-safe: handles all edge cases internally, including:

  • Task not completed

  • Missing or empty content

  • Any errors during display

No return value; this is purely a convenience method.

Parameters:
  • task_name (str) – Name of the task to display content from

  • title (str) – Title for the message box

  • field_name (str) – Name of the field to display (default: “content”)

Return type:

None

get_result_fields(task_name, *field_names)[source]

Get multiple fields from a task result at once.

Ultra-safe: handles all edge cases internally and never raises exceptions.

Parameters:
  • task_name (str) – Name of the task to get fields from

  • *field_names (str) – Names of the fields to extract

Return type:

tuple[Any, ...]

Returns:

Tuple of field values in the same order as field_names
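A multi-field lookup in this style returns one slot per requested name, which makes tuple unpacking convenient. In the standalone sketch below, filling missing fields (or every field of a missing task) with `None` is an assumption.

```python
from typing import Any, Dict, Tuple


def get_result_fields(results: Dict[str, Any], task_name: str, *field_names: str) -> Tuple[Any, ...]:
    """Return the named fields in order; a missing task or field yields None in its slot."""
    task_result = results.get(task_name)
    if not isinstance(task_result, dict):
        return tuple(None for _ in field_names)
    return tuple(task_result.get(name) for name in field_names)


results = {"search": {"query": "AI news", "total_results": 2}}
query, total, missing = get_result_fields(results, "search", "query", "total_results", "search_time")
print(query, total, missing)  # AI news 2 None
```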

display_completed_results(task_content_pairs)[source]

Display content for multiple completed tasks in one call.

Ultra-safe: handles all edge cases internally and only displays content for tasks that completed successfully and have valid content.

Parameters:

task_content_pairs (List[tuple[str, str]]) – List of (task_name, title) pairs to display

Return type:

None

__repr__()[source]

Detailed representation of the context.

Return type:

str

Task Dependencies

Declarative task dependency system for automated data flow in AI pipelines.

This module provides TaskDependency and DependencyResolver for creating automated data flow between tasks in PipelineAgent workflows. Dependencies are specified declaratively and resolved automatically, enabling complex data transformations and pipeline orchestration without manual data passing.

Key Features
  • Declarative Dependencies: Specify data flow using simple path syntax

  • Automatic Resolution: Dependencies resolved before task execution

  • Data Transformation: Optional transform functions for data preprocessing

  • Type Safety: Required vs optional dependencies with validation

  • Path Access: Flexible dot-notation path access for nested data

Dependency Flow
  1. Declaration: Tasks declare dependencies using source paths

  2. Resolution: DependencyResolver extracts data from TaskContext

  3. Transformation: Optional transform functions process data

  4. Injection: Resolved data added to task configuration

  5. Execution: Task runs with all dependencies satisfied
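Steps 2 to 4 of the flow above can be sketched in a few dozen lines. `DepType`, `Dep`, and `resolve` are hypothetical stand-ins for DependencyType, TaskDependency, and DependencyResolver, and the lookup is simplified (no `[]` array syntax). The sketch also assumes that transforms run only on resolved data, not on defaults, and that an existing config key is never overridden.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional


class DepType(Enum):
    REQUIRED = "required"
    OPTIONAL = "optional"


@dataclass(frozen=True)
class Dep:
    name: str                 # key injected into the consuming task's config
    source_path: str          # "task.field.nested" (no [] array syntax in this sketch)
    dep_type: DepType
    default_value: Any = None
    transform_func: Optional[Callable[[Any], Any]] = None


_MISSING = object()  # sentinel, so a stored None still counts as "found"


def _lookup(context: Dict[str, Any], path: str) -> Any:
    value: Any = context
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return _MISSING
        value = value[key]
    return value


def resolve(config: Dict[str, Any], deps: List[Dep], context: Dict[str, Any]) -> Dict[str, Any]:
    resolved = dict(config)                        # leave the caller's config untouched
    for dep in deps:                               # dependencies resolved in declaration order
        value = _lookup(context, dep.source_path)  # step 2: resolution
        if value is _MISSING:
            if dep.dep_type is DepType.REQUIRED:
                raise ValueError(f"Required dependency '{dep.name}' unavailable at '{dep.source_path}'")
            value = dep.default_value              # OPTIONAL: fall back to the default
        elif dep.transform_func is not None:
            value = dep.transform_func(value)      # step 3: transformation
        resolved.setdefault(dep.name, value)       # step 4: injection without overriding
    return resolved


context = {"search": {"results": [{"url": "https://a.example"}, {"url": "https://b.example"}]}}
deps = [
    Dep("urls", "search.results", DepType.REQUIRED, transform_func=lambda rs: [r["url"] for r in rs]),
    Dep("settings", "config_task.options", DepType.OPTIONAL, default_value={}),
]
print(resolve({"prompt": "Summarize"}, deps, context))
```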

Quick Example

# Search task produces results
search_task = SearchTask("search", {"query": "AI news"})

# LLM task depends on search results
llm_task = LLMTask("summarize", {
    "prompt": "Summarize: ${articles}",
    "llm_provider": "openai"
}, dependencies=[
    TaskDependency("articles", "search.results", REQUIRED)
])

# Pipeline automatically resolves search.results -> articles

Path Syntax

Dependencies use dot notation to access nested data:

  • “task_name.field” - Access field in task result

  • “task_name.data.nested” - Access nested field

  • “task_name.results[].url” - Extract URLs from result list

  • “task_name.metadata.count” - Access metadata fields

Transform Functions

Use transform_func to preprocess dependency data:

TaskDependency(
    "urls",
    "search.results",
    REQUIRED,
    transform_func=lambda results: [r['url'] for r in results]
)

Dependency Types

  • REQUIRED: Task execution fails if dependency unavailable

  • OPTIONAL: Uses default_value if dependency unavailable

See also

  • TaskDependency: Individual dependency specification

  • DependencyResolver: Automatic dependency resolution engine

  • TaskContext: Shared data store for inter-task communication

  • Built-in transform functions: extract_urls_from_results, combine_article_content

class DependencyType[source]

Bases: Enum

Enumeration of dependency types for task data flow.

Dependency types control how missing dependencies are handled:

  • REQUIRED: Task execution fails if dependency cannot be resolved

  • OPTIONAL: Default value used if dependency unavailable

Example

# Required dependency - task fails if search_task not available
TaskDependency("query_results", "search_task.results", REQUIRED)

# Optional dependency - uses default if config_task unavailable
TaskDependency("settings", "config_task.options", OPTIONAL, default_value={})

REQUIRED = 'required'

OPTIONAL = 'optional'

class TaskDependency[source]

Bases: object

Specification for automatic data flow between tasks in a pipeline.

TaskDependency defines how a task receives data from other tasks in the pipeline. Dependencies are resolved automatically by the PipelineAgent before task execution, with resolved data added to the task’s configuration.

The dependency system supports flexible path-based data access, optional data transformation, and both required and optional dependencies with appropriate error handling.

Core Components:
  • name: Key in target task’s config where resolved data is stored

  • source_path: Dot-notation path to source data in TaskContext

  • dependency_type: REQUIRED or OPTIONAL behavior for missing data

  • transform_func: Optional function to process resolved data

  • default_value: Fallback value for optional dependencies

Path Syntax:
  • Source paths use dot notation to access nested data

  • “task.field” - Direct field access

  • “task.data.nested.value” - Nested object access

  • “task.results[].url” - Array element extraction

  • “task.metadata.statistics” - Metadata access

Transform Functions

Optional preprocessing of resolved data before injection:

def extract_titles(articles):
    return [article.get('title', 'Untitled') for article in articles]

TaskDependency(
    "titles",
    "search.results",
    REQUIRED,
    transform_func=extract_titles
)

Common Patterns

Basic data passing:

TaskDependency("search_results", "search_task.results", REQUIRED)

Data transformation:

TaskDependency(
    "article_urls",
    "search_task.results",
    REQUIRED,
    transform_func=lambda results: [r['url'] for r in results]
)

Optional configuration:

TaskDependency(
    "processing_options",
    "config_task.settings",
    OPTIONAL,
    default_value={"batch_size": 100, "timeout": 30}
)

Nested data access:

TaskDependency("api_endpoint", "config_task.api.endpoints.primary", REQUIRED)

Complex transformation:

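# is_recent() is assumed to be a user-defined predicate; it is not defined here
def combine_and_filter(search_results):
    # Keep only recent articles, then join their text into one string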
    recent = [r for r in search_results if is_recent(r)]
    return ' '.join(r.get('content', '') for r in recent)

TaskDependency(
    "combined_content",
    "search_task.results",
    REQUIRED,
    transform_func=combine_and_filter
)

Error Handling

  • Required dependencies: Missing data causes task execution failure

  • Optional dependencies: Missing data uses default_value

  • Transform errors: Transform function exceptions cause dependency failure

  • Path errors: Invalid paths cause resolution failure with detailed messages

Best Practices

  • Use descriptive names that indicate the data being passed

  • Prefer specific paths over broad data passing

  • Use transform functions to shape data for consuming tasks

  • Provide meaningful default values for optional dependencies

  • Document complex transform functions for maintainability

Thread Safety

TaskDependency instances are immutable after creation and thread-safe. However, transform functions should be thread-safe if used in parallel execution.

See also

  • DependencyType: REQUIRED vs OPTIONAL dependency behavior

  • DependencyResolver: Automatic resolution engine

  • TaskContext: Source of dependency data

  • Built-in transform utilities: extract_urls_from_results, etc.

__init__(name, source_path, dependency_type, default_value=None, transform_func=None, override_existing=False, description='')[source]

Initialize a task dependency specification.

Creates a dependency that will be resolved automatically by the PipelineAgent before task execution. The resolved data will be injected into the task’s configuration under the specified name.

Note

  • Dependencies are resolved in the order they are defined

  • Transform functions should be deterministic and thread-safe

  • Source task must complete successfully for REQUIRED dependencies

  • Path resolution supports nested objects and array access

Parameters:
  • name (str) – Key name where resolved data will be stored in the target task’s config dictionary. Should be descriptive and follow naming conventions (snake_case recommended). This is how the consuming task will access the dependency data.

  • source_path (str) –

    Dot-notation path to the source data in TaskContext, in the format “source_task_name.field_path”. Examples:

    • “search.results” - Access the results field from the search task

    • “fetch.data.articles[].content” - Extract content from each element of the articles array

    • “config.api.endpoints.primary” - Access nested configuration

    • “process.metadata.item_count” - Access metadata fields

  • dependency_type (DependencyType) –

    How to handle missing dependencies:

    • DependencyType.REQUIRED: Task execution fails if unavailable

    • DependencyType.OPTIONAL: Uses default_value if unavailable

  • default_value (Any) –

    Value to use when optional dependency is unavailable. Only relevant for OPTIONAL dependencies. Can be any type that makes sense for the consuming task. Common patterns:

    • Empty list: []

    • Empty dict: {}

    • Default config: {"timeout": 30, "retries": 3}

    • None: None (explicit null)

  • transform_func (Optional[Callable[[Any], Any]]) –

    Optional function to preprocess resolved data before injection. Function signature: (resolved_data: Any) -> Any. Common uses:

    • Extract specific fields: lambda x: [item['url'] for item in x]

    • Filter data: lambda x: [item for item in x if item['valid']]

    • Combine data: lambda x: ' '.join(x)

    • Format data: lambda x: {"processed": x, "count": len(x)}

  • override_existing (bool) –

    Whether to override existing values in task config.

    • False (default): Only inject if key doesn’t exist in config

    • True: Always inject, overriding existing config values

    Use with caution as it can override user-provided configuration.

  • description (str) – Human-readable description of this dependency’s purpose. Helpful for documentation and debugging. Should explain what data is being passed and how it will be used.
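The override_existing rule reduces to a single condition. The helper below, `inject_dependency`, is a hypothetical standalone illustration of that rule and not part of aipype.

```python
from typing import Any, Dict


def inject_dependency(config: Dict[str, Any], name: str, value: Any,
                      override_existing: bool = False) -> Dict[str, Any]:
    """Return a copy of config with value stored under name, honouring override_existing."""
    updated = dict(config)
    if override_existing or name not in updated:
        updated[name] = value
    return updated


user_config = {"search_results": ["hand-picked result"]}
print(inject_dependency(user_config, "search_results", ["resolved"]))        # keeps the user value
print(inject_dependency(user_config, "search_results", ["resolved"], True))  # overrides it
```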

Example:

Basic dependency:

# Pass search results to summarization task

TaskDependency(
    name="search_results",
    source_path="web_search.results",
    dependency_type=DependencyType.REQUIRED
)

Transformed dependency:

TaskDependency(
    name="article_urls",
    source_path="search.results",
    dependency_type=DependencyType.REQUIRED,
    transform_func=lambda results: [r['url'] for r in results],
    description="Extract URLs from search results for fetching"
)

Optional dependency with default:

TaskDependency(
    name="processing_config",
    source_path="config_loader.settings",
    dependency_type=DependencyType.OPTIONAL,
    default_value={"batch_size": 100, "parallel": True},
    description="Processing configuration with sensible defaults"
)

Raises:

ValueError – If name is empty, source_path is empty, or source_path doesn’t contain a dot (invalid format).
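The three validation rules listed under Raises can be expressed as a standalone check. `validate_dependency_spec` is a hypothetical helper, and its error messages are illustrative rather than aipype's actual wording.

```python
def validate_dependency_spec(name: str, source_path: str) -> None:
    """Raise ValueError for the invalid inputs listed under Raises (hypothetical helper)."""
    if not name:
        raise ValueError("Dependency name cannot be empty")
    if not source_path:
        raise ValueError("Source path cannot be empty")
    if "." not in source_path:
        # A bare task name has no field to resolve, e.g. "search" instead of "search.results"
        raise ValueError(f"Invalid source path '{source_path}': expected 'task_name.field_path'")


validate_dependency_spec("search_results", "web_search.results")  # passes silently
for bad_name, bad_path in [("", "search.results"), ("urls", ""), ("urls", "search")]:
    try:
        validate_dependency_spec(bad_name, bad_path)
    except ValueError as exc:
        print(exc)
```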

See also

  • DependencyType: For dependency type behavior

  • DependencyResolver: For resolution implementation details

  • Built-in transform functions: extract_urls_from_results, etc.

is_required()[source]

Check if this is a required dependency.

Return type:

bool

is_optional()[source]

Check if this is an optional dependency.

Return type:

bool

__str__()[source]

String representation of the dependency.

Return type:

str

__repr__()[source]

Detailed representation of the dependency.

Return type:

str

class DependencyResolver[source]

Bases: object

Resolves task dependencies from context and builds task configuration.

Parameters:

context (TaskContext)

__init__(context)[source]

Initialize dependency resolver with context.

Parameters:

context (TaskContext) – TaskContext to resolve dependencies from

resolve_dependencies(task)[source]

Resolve all dependencies for a task and return merged configuration.

Parameters:

task (BaseTask) – Task instance with get_dependencies() method

Return type:

Dict[str, Any]

Returns:

Dictionary with resolved dependencies merged with existing config

Raises:

ValueError – If required dependencies cannot be satisfied

validate_dependencies(task)[source]

Validate that all dependencies for a task can be satisfied.

Parameters:

task (BaseTask) – Task to validate dependencies for

Return type:

List[str]

Returns:

List of validation error messages (empty if all valid)

get_dependency_info(task)[source]

Get information about all dependencies for a task.

Parameters:

task (BaseTask) – Task to get dependency info for

Return type:

List[Dict[str, Any]]

Returns:

List of dependency information dictionaries

create_required_dependency(name, source_path, transform_func=None)[source]

Create a required dependency.

Parameters:
  • name (str) – Name of the dependency

  • source_path (str) – Context path to resolve

  • transform_func (Optional[Callable[[Any], Any]]) – Optional transformation function

Return type:

TaskDependency

Returns:

TaskDependency instance

create_optional_dependency(name, source_path, default_value, transform_func=None)[source]

Create an optional dependency with default value.

Parameters:
  • name (str) – Name of the dependency

  • source_path (str) – Context path to resolve

  • default_value (Any) – Default value if resolution fails

  • transform_func (Optional[Callable[[Any], Any]]) – Optional transformation function

Return type:

TaskDependency

Returns:

TaskDependency instance

extract_urls_from_results(search_results)[source]

Extract URLs from search results.

Parameters:

search_results (Dict[str, Any]) – Search results dictionary

Return type:

List[str]

Returns:

List of URLs
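A stand-in with the same name shows one plausible behaviour. The input shape `{"results": [{"url": ...}, ...]}` is an assumption, as is silently skipping entries without a URL. The real helper may differ on both points.

```python
from typing import Any, Dict, List


def extract_urls_from_results(search_results: Dict[str, Any]) -> List[str]:
    # Assumed input shape: {"results": [{"url": ...}, ...]}; entries without a URL are skipped
    return [
        item["url"]
        for item in search_results.get("results", [])
        if isinstance(item, dict) and item.get("url")
    ]


search_output = {"query": "AI news", "results": [{"url": "https://a.example"}, {"title": "no url"}]}
print(extract_urls_from_results(search_output))  # ['https://a.example']
```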

combine_article_content(articles, separator='\n\n')[source]

Combine content from multiple articles.

Parameters:
  • articles (List[Dict[str, Any]]) – List of article dictionaries

  • separator (str) – Separator between articles

Return type:

str

Returns:

Combined content string
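The combining step can be sketched with `str.join`. This stand-in assumes that article text lives under a "content" key, that articles without content are skipped, and that the default separator is two newlines.

```python
from typing import Any, Dict, List


def combine_article_content(articles: List[Dict[str, Any]], separator: str = "\n\n") -> str:
    # Assumes article text lives under "content"; articles with missing or empty content are skipped
    return separator.join(str(a["content"]) for a in articles if a.get("content"))


articles = [{"title": "A", "content": "First."}, {"title": "B"}, {"title": "C", "content": "Third."}]
print(combine_article_content(articles))                   # "First." and "Third." separated by a blank line
print(combine_article_content(articles, separator=" | "))  # First. | Third.
```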

format_search_query(keywords, filters=None)[source]

Format search query with optional filters.

Parameters:
  • keywords (str) – Base search keywords

  • filters (Optional[Dict[str, Any]]) – Optional filters to apply

Return type:

str

Returns:

Formatted search query

Tools and Utilities

Tool Registry

Tool registry for managing tool schemas and implementations.

class ToolRegistry[source]

Bases: object

Manages tool schemas and implementations, generating the schemas automatically from decorated functions.

Parameters:

tool_functions (List[Callable[..., Any]])

__init__(tool_functions)[source]

Initialize tool registry with list of decorated functions.

Parameters:

tool_functions (List[Callable[..., Any]]) – List of functions decorated with @tool

Raises:

ValueError – If any function is not decorated with @tool

get_tool_schemas()[source]

Get all tool schemas for the LLM API.

Return type:

List[Dict[str, Any]]

Returns:

List of OpenAI-compatible tool schemas

get_tool_function(name)[source]

Get tool function by name.

Parameters:

name (str) – Name of the tool to retrieve

Return type:

Callable[..., Any]

Returns:

Tool function implementation

Raises:

ValueError – If tool is not found

get_tool_metadata(name)[source]

Get tool metadata by name.

Parameters:

name (str) – Name of the tool

Return type:

ToolMetadata

Returns:

ToolMetadata instance

Raises:

ValueError – If tool is not found

has_tool(name)[source]

Check if a tool exists in the registry.

Parameters:

name (str) – Name of the tool to check

Return type:

bool

Returns:

True if tool exists, False otherwise

list_tool_names()[source]

Get list of all registered tool names.

Return type:

List[str]

Returns:

List of tool names

get_tool_count()[source]

Get the number of registered tools.

Return type:

int

Returns:

Number of tools in registry

generate_tool_context()[source]

Generate a description of the registered tools for inclusion in the system prompt.

Return type:

str

Returns:

Formatted tool context for inclusion in system prompt

validate_all_tools()[source]

Validate all registered tools.

Return type:

Dict[str, bool]

Returns:

Dictionary mapping tool names to validation status

__str__()[source]

String representation of the tool registry.

Return type:

str

__repr__()[source]

Detailed string representation of the tool registry.

Return type:

str

Tool Executor

Tool execution engine with error handling and logging.

class ToolExecutor[source]

Bases: object

Handles tool execution with error handling and logging.

__init__(tool_registry, max_execution_time=30.0)[source]

Initialize tool executor.

Parameters:
  • tool_registry (ToolRegistry) – Registry containing tool implementations

  • max_execution_time (float) – Maximum time (seconds) to allow for tool execution

execute_tool(tool_name, arguments)[source]

Execute a tool and return result or error.

Parameters:
  • tool_name (str) – Name of the tool to execute

  • arguments (Dict[str, Any]) – Dictionary of arguments to pass to the tool

Returns:

Dictionary with the execution result, containing:

  • success (bool): Whether execution was successful

  • result (Any): Tool result if successful

  • error (str): Error message if failed

  • execution_time (float): Time taken in seconds

  • tool_name (str): Name of executed tool

  • error_type (str): Type of error if failed
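A single-tool execution that reports failures through the result dictionary, instead of raising, might look like the sketch below. `execute_tool` here is a standalone function over a plain name-to-callable mapping, not the ToolExecutor method. It does not enforce max_execution_time.

```python
import time
from typing import Any, Callable, Dict


def execute_tool(tools: Dict[str, Callable[..., Any]], tool_name: str,
                 arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Run one tool and report the outcome using the keys listed above."""
    outcome: Dict[str, Any] = {"tool_name": tool_name, "success": False,
                               "result": None, "error": None, "error_type": None}
    start = time.perf_counter()
    try:
        func = tools.get(tool_name)
        if func is None:
            raise ValueError(f"Tool '{tool_name}' not found")
        outcome["result"] = func(**arguments)
        outcome["success"] = True
    except Exception as exc:  # failures are reported in the dict, not raised
        outcome["error"] = str(exc)
        outcome["error_type"] = type(exc).__name__
    outcome["execution_time"] = time.perf_counter() - start
    return outcome


tools = {"divide": lambda a, b: a / b}
print(execute_tool(tools, "divide", {"a": 6, "b": 3})["result"])      # 2.0
print(execute_tool(tools, "divide", {"a": 1, "b": 0})["error_type"])  # ZeroDivisionError
print(execute_tool(tools, "missing", {})["error"])                    # Tool 'missing' not found
```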

execute_multiple_tools(tool_calls)[source]

Execute multiple tools in sequence.

Parameters:

tool_calls (List[Any]) – List of tool call objects (expected to be dictionaries with ‘name’ and ‘arguments’)

Return type:

List[Dict[str, Any]]

Returns:

List of execution results for each tool call

get_execution_stats()[source]

Get execution statistics for monitoring.

Return type:

Dict[str, Any]

Returns:

Dictionary with execution statistics

__str__()[source]

String representation of the tool executor.

Return type:

str

Tools

Tool decorator and metadata extraction for function calling.

class ToolMetadata[source]

Bases: object

Stores metadata for a tool function.

__init__(func, name, description)[source]

tool(func)[source]

Decorator to mark a function as a tool and extract metadata.

Parameters:

func (Callable[..., Any]) – Function to be marked as a tool

Return type:

Callable[..., Any]

Returns:

Decorated function with tool metadata attached

Example:

@tool
def calculate(expression: str) -> float:
    '''Perform mathematical calculations.

    Args:
        expression: Mathematical expression to evaluate

    Returns:
        Calculated result
    '''
    return eval(expression)  # Use safe_eval in production

class ToolSchemaGenerator[source]

Bases: object

Generates OpenAI-compatible schemas from decorated functions.

static generate_schema(tool_func)[source]

Generate OpenAI function schema from tool metadata.

Parameters:

tool_func (Callable[..., Any]) – Function decorated with @tool

Return type:

Dict[str, Any]

Returns:

OpenAI-compatible function schema

Raises:

ValueError – If function is not decorated with @tool
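Schema generation of this kind can be approximated with `inspect.signature`. This sketch is not aipype's implementation. The `tool` decorator below is a hypothetical stand-in that stores metadata in a `_tool_metadata` attribute, and the Python-to-JSON-Schema type mapping is an assumption. The output follows the OpenAI "function" tool layout: name, description, and an object-typed `parameters` schema with `required`.

```python
import inspect
import json
from typing import Any, Callable, Dict, List

# Rough mapping from Python annotations to JSON Schema type names (assumption)
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean", list: "array", dict: "object"}


def tool(func: Callable[..., Any]) -> Callable[..., Any]:
    # Hypothetical stand-in: mark the function and keep the first docstring line as its description
    func._tool_metadata = {
        "name": func.__name__,
        "description": (inspect.getdoc(func) or "").split("\n")[0],
    }
    return func


def generate_schema(tool_func: Callable[..., Any]) -> Dict[str, Any]:
    meta = getattr(tool_func, "_tool_metadata", None)
    if meta is None:
        raise ValueError(f"{tool_func.__name__} is not decorated with @tool")
    properties: Dict[str, Any] = {}
    required: List[str] = []
    for name, param in inspect.signature(tool_func).parameters.items():
        properties[name] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # parameters without defaults are mandatory
    return {
        "type": "function",
        "function": {
            "name": meta["name"],
            "description": meta["description"],
            "parameters": {"type": "object", "properties": properties, "required": required},
        },
    }


@tool
def word_count(text: str, min_length: int = 1) -> int:
    """Count words of at least min_length characters."""
    return sum(1 for w in text.split() if len(w) >= min_length)


print(json.dumps(generate_schema(word_count), indent=2))
```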

static validate_tool_function(tool_func)[source]

Validate that a function is properly decorated as a tool.

Parameters:

tool_func (Callable[..., Any]) – Function to validate

Return type:

bool

Returns:

True if function is a valid tool, False otherwise

search_with_content(query, max_results=5, max_content_results=5, content_timeout=15, html_method='readability', skip_pdf=False)[source]

Search the web and automatically fetch content from the top results.

This tool provides enhanced search functionality: it returns search results and also automatically fetches and extracts readable content from the top URLs. It is useful for research tasks that need both the search results and their page content.

Parameters:
  • query (str) – The search query string to search for

  • max_results (int) – Maximum number of search results to return (1-10, default: 5)

  • max_content_results (int) – Maximum number of results to fetch content from (1-10, default: 5)

  • content_timeout (int) – Timeout in seconds for fetching content from each URL (default: 15)

  • html_method (str) – Method for HTML text extraction - “readability” or “basic” (default: “readability”)

  • skip_pdf (bool) – Whether to skip PDF files when fetching content (default: False)

Return type:

Dict[str, Any]

Returns:

Dictionary containing the search results with fetched content:

  • query

  • total_results

  • search_time

  • results: list of entries, each with title, url, snippet, and content

  • content_stats: attempted, successful, failed, and skipped counts

URL Fetcher

class URLFetcher[source]

Utility class for fetching web content with Chrome-like behavior.

Parameters:

config (Optional[Dict[str, Any]])

__init__(config=None)[source]

Initialize URL fetcher with optional configuration.

Parameters:

config (Optional[Dict[str, Any]]) –

Optional configuration dictionary with options:

  • timeout: Request timeout in seconds (default: 30)

  • user_agent: Custom user agent string

  • headers: Additional headers to include

  • follow_redirects: Whether to follow redirects (default: True)

  • max_redirects: Maximum number of redirects (default: 10)

fetch(url, **kwargs)[source]

Fetch content from a URL.

Parameters:
  • url (str) – URL to fetch

  • **kwargs (Any) – Additional arguments to pass to requests.get()

Returns:

Dictionary containing:

  • url: Final URL after redirects

  • status_code: HTTP status code

  • headers: Response headers

  • content: Response content as text

  • content_type: Content type from headers

  • encoding: Character encoding

  • size: Content size in bytes

  • response_time: Time taken for request in seconds

fetch_headers_only(url, **kwargs)[source]

Fetch only headers from a URL using HEAD request.

Parameters:
  • url (str) – URL to fetch headers from

  • **kwargs (Any) – Additional arguments to pass to requests.head()

Returns:

Dictionary containing:

  • url: Final URL after redirects

  • status_code: HTTP status code

  • headers: Response headers

  • content_type: Content type from headers

  • content_length: Content length if available

  • response_time: Time taken for request in seconds

close()[source]

Close the session to free up resources.

Return type:

None

__enter__()[source]

Context manager entry.

Return type:

URLFetcher

__exit__(exc_type, exc_val, exc_tb)[source]

Context manager exit.

Parameters:
Return type:

None

__str__()[source]

String representation.

Return type:

str