aipype - Core Framework
The core aipype package provides the foundation for building AI agent pipelines with declarative task orchestration and automatic dependency resolution.
Core Components
aipype: A modular AI agent framework with declarative pipeline-based task orchestration.
- class PipelineAgent[source]
Bases: BasePipelineAgent
Declarative agent that discovers tasks from @task decorated methods.
PipelineAgent provides a cleaner, more Pythonic way to define AI pipelines. Instead of manually creating task objects and wiring dependencies, you simply define methods decorated with @task and let the framework infer the execution order from parameter names.
- Key Features:
Automatic task discovery from decorated methods
Dependency inference from function signatures
Support for explicit paths via Annotated[T, Depends("path")]
Full compatibility with existing execution engine
Parallel execution of independent tasks
- How It Works:
On initialization, discovers all @task decorated methods
Analyzes method signatures to infer dependencies
Topologically sorts methods by dependency order
Creates TaskWrapper instances for each method
Returns task list to parent BasePipelineAgent for execution
- The parent BasePipelineAgent handles all execution concerns:
Building execution phases
Resolving dependencies
Parallel execution
Error handling
Result collection
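The discovery, inference, and sorting steps listed under "How It Works" can be sketched with the standard library alone. This is a minimal illustration, not the aipype implementation: the `task` marker, the `_is_task` attribute, `ArticleSketch`, and `infer_order` are all hypothetical names introduced here.

```python
import inspect
from graphlib import TopologicalSorter


def task(func):
    """Mark a function as a task (hypothetical stand-in for aipype's @task)."""
    func._is_task = True
    return func


class ArticleSketch:
    @task
    def search(self):
        return {"results": "raw results"}

    @task
    def summarize(self, search):
        return f"Summary of: {search}"

    @task
    def write(self, summarize):
        return f"Article based on: {summarize}"


def infer_order(cls):
    """Return task names in dependency order, inferred from parameter names."""
    # Step 1: discover the decorated methods.
    tasks = {
        name: fn
        for name, fn in inspect.getmembers(cls, inspect.isfunction)
        if getattr(fn, "_is_task", False)
    }
    # Step 2: a parameter whose name matches another task is a dependency.
    graph = {
        name: {p for p in inspect.signature(fn).parameters if p in tasks}
        for name, fn in tasks.items()
    }
    # Step 3: sort topologically so every dependency comes first.
    return list(TopologicalSorter(graph).static_order())


order = infer_order(ArticleSketch)
```

Here `order` lists `search` before `summarize` before `write`, because each method names the previous task as a parameter.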
Example:
class ArticleAgent(PipelineAgent):
    @task
    def search(self) -> dict:
        # Access config directly for initial inputs
        return {"results": f"Results for {self.config['topic']}"}

    @task
    def summarize(self, search: dict) -> str:
        # 'search' parameter auto-wired from search task
        return f"Summary of: {search}"

    @task
    def write(self, summarize: str) -> str:
        return f"Article based on: {summarize}"

agent = ArticleAgent("writer", {"topic": "AI trends"})
result = agent.run()
- name
Agent identifier
- config
Configuration dictionary
- tasks
List of TaskWrapper instances (populated by setup_tasks())
- context
TaskContext for inter-task communication
- class BasePipelineAgent[source]
Bases: object
Base agent class for building AI workflows with automatic task orchestration.
BasePipelineAgent provides the foundation for creating AI automation workflows. It automatically handles task dependency resolution, parallel execution, error handling, and result collection. Users inherit from this class and implement setup_tasks() to define their workflow.
- Key Features
Automatic Dependency Resolution: Tasks receive data from previous tasks
Parallel Execution: Independent tasks run simultaneously for performance
Error Handling: Graceful handling of task failures with detailed reporting
Context Management: Shared data store for inter-task communication
Result Tracking: Comprehensive execution metrics and status reporting
- Configuration Options:
enable_parallel (bool): Enable parallel task execution (default: True)
max_parallel_tasks (int): Maximum concurrent tasks (default: 5)
stop_on_failure (bool): Stop pipeline on first failure (default: True)
- Lifecycle:
Initialization: Agent created with name and config
Task Setup: setup_tasks() called to define workflow
Execution: run() method orchestrates task execution
Results: AgentRunResult returned with status and metrics
Example
Basic agent implementation:
class DataProcessorAgent(BasePipelineAgent):
    def setup_tasks(self):
        return [
            SearchTask("gather_data", {
                "query": self.config.get("search_query", ""),
                "max_results": 10
            }),
            TransformTask("process_data", {
                "transform_function": process_search_results,
                "input_field": "results"
            }, dependencies=[
                TaskDependency("results", "gather_data.results", REQUIRED)
            ]),
            FileSaveTask("save_results", {
                "file_path": "/tmp/results.json"
            }, dependencies=[
                TaskDependency("data", "process_data.result", REQUIRED)
            ])
        ]

# Usage
agent = DataProcessorAgent("processor", {
    "search_query": "machine learning trends",
    "enable_parallel": True
})
result = agent.run()

if result.is_success():
    print(f"Processed {result.completed_tasks} tasks successfully")
    # Access individual task results
    search_result = agent.context.get_result("gather_data")
    processed_data = agent.context.get_result("process_data")
else:
    print(f"Pipeline failed: {result.error_message}")
Advanced configuration:
agent = DataProcessorAgent("processor", {
    "search_query": "AI research",
    "enable_parallel": False,   # Sequential execution
    "stop_on_failure": False,   # Continue despite failures
    "max_parallel_tasks": 10    # Higher concurrency
})
Return Values
run() returns AgentRunResult with:
status: SUCCESS, PARTIAL, ERROR, or RUNNING
completed_tasks/failed_tasks: Task completion counts
total_phases: Number of execution phases
execution_time: Total runtime in seconds
metadata: Additional execution information
Error Handling
Individual task failures are captured and reported
Pipeline can continue or stop based on stop_on_failure setting
Detailed error information available in execution context
Partial success supported for workflows with optional components
Thread Safety
TaskContext is thread-safe for concurrent task execution
Agent instance should not be shared between threads
Each agent execution creates isolated context
Performance
Tasks in same phase execute in parallel (when enabled)
Dependency resolution minimizes execution phases
ThreadPoolExecutor manages concurrent task execution
Configurable concurrency limits prevent resource exhaustion
- __init__(name, config=None)[source]
Initialize pipeline agent with name and configuration.
Creates a new pipeline agent instance with automatic task setup. The agent will call setup_tasks() during initialization to configure the workflow. TaskContext and DependencyResolver are created automatically.
Attributes Created:
name: Agent identifier
config: Configuration dictionary
tasks: List of tasks (populated by setup_tasks())
context: TaskContext for inter-task communication
dependency_resolver: Handles dependency resolution
execution_plan: Created during run() execution
- Parameters:
name (str) – Unique identifier for this agent instance. Used in logging and result tracking. Should be descriptive of the agent's purpose.
config (Optional[Dict[str, Any]]) – Configuration dictionary with agent settings:
enable_parallel (bool): Enable parallel task execution (default: True)
max_parallel_tasks (int): Maximum concurrent tasks (default: 5)
stop_on_failure (bool): Stop pipeline on first failure (default: True)
Additional config keys are passed to tasks via self.config.
Example:
agent = MyAgent("data_processor", {
    "enable_parallel": True,
    "max_parallel_tasks": 3,
    "input_file": "data.csv"
})
print(f"Agent {agent.name} has {len(agent.tasks)} tasks")
Note
The agent automatically calls setup_tasks() during initialization. If setup_tasks() raises an exception, the agent will log a warning but continue with an empty task list.
See also
BaseTask: Base class for implementing custom tasks
TaskDependency: Declarative dependency specification
TaskContext: Shared data store for inter-task communication
AgentRunResult: Standardized execution result format
- create_context()[source]
Create and return the task context for this agent.
- Return type:
TaskContext
- Returns:
TaskContext instance
- display_results(sections=None)[source]
Display results with configurable sections.
This method provides a simple way to display agent results without writing complex display logic in each agent. Users can choose which sections to show.
- get_context()[source]
Get the task context for this agent.
- Return type:
TaskContext
- Returns:
TaskContext instance
- get_execution_plan()[source]
Get the current execution plan.
- Return type:
Optional[TaskExecutionPlan]
- Returns:
TaskExecutionPlan instance or None if not yet created
- run()[source]
Execute the complete workflow with automatic task orchestration.
This is the main execution method that runs the entire pipeline:
Builds execution plan from task dependencies
Resolves dependencies and updates task configurations
Executes tasks in phases (parallel within phases, sequential between phases)
Collects results and handles errors
Returns comprehensive execution status
The method handles all aspects of task execution including dependency resolution, parallel execution, error propagation, and result collection. Individual task failures are captured and the pipeline can continue based on the stop_on_failure configuration.
The method returns AgentRunResult containing:
status: SUCCESS (all tasks completed), PARTIAL (some failed), ERROR (all failed), or RUNNING (already executing)
completed_tasks: Number of successfully completed tasks
failed_tasks: Number of tasks that failed
total_phases: Number of execution phases used
execution_time: Total runtime in seconds
metadata: Additional execution information
Exceptions
No exceptions are raised directly. All operational errors are captured in the returned AgentRunResult. Only programming errors (like missing setup_tasks implementation) will raise exceptions.
Example
Basic execution and result checking:
agent = MyAgent("processor", config)
result = agent.run()

if result.is_success():
    print(f"All {result.completed_tasks} tasks completed successfully")
    # Access individual task results
    search_data = agent.context.get_result("search_task")
    processed_data = agent.context.get_result("process_task")
elif result.is_partial():
    print(f"Partial success: {result.completed_tasks}/{result.total_tasks}")
    print(f"Failed tasks: {result.failed_tasks}")
else:  # result.is_error()
    print(f"Pipeline failed: {result.error_message}")
Performance monitoring:
result = agent.run()
print(f"Execution time: {result.execution_time:.2f}s")
print(f"Phases used: {result.total_phases}")

# Check if parallel execution was effective
if result.total_phases < len(agent.tasks):
    print("Parallel execution optimized the pipeline")
Execution Phases
Tasks are automatically organized into phases based on dependencies:
Phase 1: Tasks with no dependencies
Phase 2: Tasks depending only on Phase 1 tasks
Phase N: Tasks depending on previous phases only
Within each phase, tasks can execute in parallel (if enabled). Between phases, execution is sequential to maintain dependency order.
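The phase model described above (parallel within a phase, sequential between phases) can be sketched with `concurrent.futures.ThreadPoolExecutor`. This is an illustration under stated assumptions, not the framework's runner: `run_phases` and its input shape (a list of name-to-callable dicts) are hypothetical, and the two keyword arguments only mirror the `max_parallel_tasks` and `stop_on_failure` options documented for the agent.

```python
from concurrent.futures import ThreadPoolExecutor


def run_phases(phases, max_parallel_tasks=5, stop_on_failure=True):
    """Run phases in order; callables within a phase run concurrently.

    `phases` is a list of dicts mapping task name -> zero-argument callable.
    Returns (results, errors), both keyed by task name.
    """
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_parallel_tasks) as pool:
        for phase in phases:
            # Submit every task in the phase, then wait for all of them
            # before moving on, so later phases see finished work.
            futures = {name: pool.submit(fn) for name, fn in phase.items()}
            for name, future in futures.items():
                try:
                    results[name] = future.result()
                except Exception as exc:  # capture instead of propagating
                    errors[name] = str(exc)
            if errors and stop_on_failure:
                break
    return results, errors


def failing():
    raise RuntimeError("boom")


results, errors = run_phases(
    [
        {"a": lambda: 1},
        {"b": lambda: 2, "c": failing},
        {"d": lambda: 4},  # never runs: the previous phase had a failure
    ]
)
```

With `stop_on_failure=False` the same call would continue into the third phase and record `d` alongside the captured error for `c`.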
Error Handling
Individual task errors are captured in TaskResult objects
Pipeline can continue or stop based on stop_on_failure config
Partial success is supported when some tasks succeed
All errors are logged with detailed context information
Failed task dependencies propagate to dependent tasks
Thread Safety
This method is not thread-safe. Each agent instance should only execute one pipeline at a time. Concurrent task execution within the pipeline is handled internally with ThreadPoolExecutor.
Performance
Parallel execution significantly improves performance for independent tasks
Dependency resolution optimizes execution order
Thread pool size controlled by max_parallel_tasks config
Memory usage scales with number of concurrent tasks
Note
If the agent is already running, this method returns immediately with a RUNNING status to prevent concurrent execution conflicts.
See also
AgentRunResult: For detailed result format and status checking
TaskContext: Access via agent.context for individual task results
TaskExecutionPlan: For understanding execution phase organization
- Return type:
AgentRunResult
- Returns:
AgentRunResult
- setup_tasks()[source]
Define the workflow tasks for this agent. Must be implemented by subclasses.
This is the main method where users define their AI workflow by creating and configuring tasks with their dependencies. Tasks are automatically organized into execution phases based on their dependency relationships.
- Task Naming:
Use descriptive names that indicate the task’s purpose
Names must be unique within the agent
Use snake_case for consistency
Good: “fetch_articles”, “analyze_sentiment”, “save_results”
Avoid: “task1”, “t”, “process”
- Dependency Design:
Required dependencies must be satisfied for task to run
Optional dependencies use default values if source unavailable
Circular dependencies will cause pipeline execution to fail
Use transform_func in dependencies for data preprocessing
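The required/optional distinction and the role of a transform function can be illustrated with a small resolver. This is a hedged sketch, not the TaskDependency API: `DependencySketch`, its field names (including `default`), and `resolve` are assumptions made for the example, as is the choice of `KeyError` for an unsatisfied required dependency.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class DependencySketch:
    """Hypothetical stand-in for TaskDependency; field names are assumptions."""
    name: str                      # config key the task will read
    source: str                    # "task_name.field"
    required: bool = True
    default: Any = None            # used only when optional and missing
    transform_func: Optional[Callable[[Any], Any]] = None


def resolve(dep, results):
    """Look up dep.source in a {task_name: {field: value}} mapping."""
    task_name, field = dep.source.split(".", 1)
    task_output = results.get(task_name, {})
    if field not in task_output:
        if dep.required:
            raise KeyError(f"required dependency {dep.source!r} is unavailable")
        return dep.default
    value = task_output[field]
    # Optional preprocessing step before the value reaches the task.
    return dep.transform_func(value) if dep.transform_func else value


results = {"search": {"results": ["a", "b", "c"]}}
count = resolve(DependencySketch("n", "search.results", transform_func=len), results)
fallback = resolve(
    DependencySketch("opts", "config_task.settings", required=False, default={}),
    results,
)
```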
Note
This method is called automatically during agent initialization. Tasks should not be added to self.tasks directly - return them from this method instead.
See also
TaskDependency: For creating task dependencies
BaseTask subclasses: LLMTask, SearchTask, ConditionalTask, etc.
Task-specific documentation for configuration options
- Returns:
List of configured BaseTask instances that define the workflow.
- Return type:
List[BaseTask]
- Example:
Basic search and summarize workflow –
def setup_tasks(self):
    return [
        SearchTask("search", {
            "query": self.config.get("topic", ""),
            "max_results": 5
        }),
        LLMTask("summarize", {
            "prompt": "Summarize these articles: ${articles}",
            "llm_provider": "openai",
            "llm_model": "gpt-4"
        }, dependencies=[
            TaskDependency("articles", "search.results", REQUIRED)
        ])
    ]
Complex workflow with conditional logic –
def setup_tasks(self):
    return [
        SearchTask("search", {"query": "AI news"}),
        TransformTask("filter", {
            "transform_function": filter_recent_articles,
            "input_field": "results"
        }, dependencies=[
            TaskDependency("results", "search.results", REQUIRED)
        ]),
        ConditionalTask("quality_check", {
            "condition_function": lambda articles: len(articles) >= 3,
            "condition_inputs": ["filtered_articles"],
            "action_function": log_action("Quality check passed"),
            "else_function": log_action("Insufficient articles")
        }, dependencies=[
            TaskDependency("filtered_articles", "filter.result", REQUIRED)
        ]),
        LLMTask("summarize", {
            "prompt": "Create summary from: ${content}",
            "llm_provider": "openai"
        }, dependencies=[
            TaskDependency("content", "filter.result", REQUIRED)
        ])
    ]
- Raises:
NotImplementedError – Must be implemented by subclasses
- class TaskExecutionPlan[source]
Bases: object
Execution plan that organizes tasks into phases based on dependencies.
This class analyzes task dependencies and creates an optimal execution plan where:
Tasks with no dependencies run in the first phase
Tasks whose dependencies are satisfied run in subsequent phases
Tasks within the same phase can run in parallel
Phases execute sequentially to maintain dependency ordering
The plan automatically detects and rejects circular dependencies, ensuring all workflows can execute successfully.
- tasks
Original list of tasks to organize
- phases
List of task lists, each representing an execution phase
- Example:
Given tasks – A, B (depends on A), C (depends on A), D (depends on B,C)
Execution plan
* Phase 1 – [A] (no dependencies)
* Phase 2 – [B, C] (depend only on A, can run in parallel)
* Phase 3 – [D] (depends on B and C)
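The A/B/C/D example above can be reproduced with a short layering routine. This is a minimal sketch of the general technique (repeatedly collecting tasks whose prerequisites are complete), not the TaskExecutionPlan source; `build_phases` and its dict-of-sets input are hypothetical.

```python
def build_phases(dependencies):
    """Group task names into phases from a {task: set_of_prerequisites} map."""
    remaining = {task: set(deps) for task, deps in dependencies.items()}
    done, phases = set(), []
    while remaining:
        # A task is ready once all of its prerequisites have completed.
        ready = sorted(t for t, deps in remaining.items() if deps <= done)
        if not ready:
            # Nothing can make progress: a cycle or an unknown prerequisite.
            raise ValueError(
                f"circular or unsatisfiable dependencies: {sorted(remaining)}"
            )
        phases.append(ready)
        done.update(ready)
        for t in ready:
            del remaining[t]
    return phases


phases = build_phases({"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}})
```

The result groups the tasks as `[["A"], ["B", "C"], ["D"]]`; a map such as `{"X": {"Y"}, "Y": {"X"}}` raises `ValueError` because neither task can ever become ready.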
- Parameters:
- __init__(tasks)[source]
Create execution plan from list of tasks.
- Parameters:
tasks (List[BaseTask]) – List of tasks to organize into execution phases
- Raises:
ValueError – If circular dependencies detected or dependencies cannot be satisfied
- class BaseTask[source]
Bases: ABC
Abstract base class for all task implementations in the aipype framework.
BaseTask provides the foundation for creating custom tasks that can be orchestrated by PipelineAgent. It handles common concerns like validation, status tracking, error handling, and dependency management, allowing subclasses to focus on their specific functionality.
All tasks follow the TaskResult pattern where operational errors are returned as structured results rather than raised as exceptions. This enables graceful error handling and pipeline continuation.
- name
Unique identifier for this task instance
- config
Configuration dictionary with task-specific parameters
- dependencies
List of TaskDependency objects for data flow
- validation_rules
Optional validation rules for configuration
- agent_name
Name of the parent agent (set automatically)
- logger
Task-specific logger instance
- Abstract Methods:
run(): Must be implemented by subclasses to perform task work
Common Patterns
Configuration validation:
class MyTask(BaseTask):
    def __init__(self, name, config, dependencies=None):
        super().__init__(name, config, dependencies)
        self.validation_rules = {
            "required": ["api_key", "endpoint"],
            "optional": ["timeout", "retries"],
            "defaults": {"timeout": 30, "retries": 3},
            "types": {
                "api_key": str,
                "endpoint": str,
                "timeout": int,
                "retries": int
            },
            "ranges": {
                "timeout": (1, 300),
                "retries": (0, 10)
            }
        }
Error handling with TaskResult:
def run(self) -> TaskResult:
    start_time = datetime.now()

    # Always validate first
    validation_failure = self._validate_or_fail(start_time)
    if validation_failure:
        return validation_failure

    try:
        # Perform task work
        result = self.do_work()
        execution_time = (datetime.now() - start_time).total_seconds()
        return TaskResult.success(
            data=result,
            execution_time=execution_time,
            metadata={"processed_items": len(result)}
        )
    except ApiError as e:
        # Expected operational error
        execution_time = (datetime.now() - start_time).total_seconds()
        return TaskResult.failure(
            error_message=f"API call failed: {str(e)}",
            execution_time=execution_time,
            metadata={"error_type": "ApiError", "retry_recommended": True}
        )
Working with dependencies:
class ProcessDataTask(BaseTask):
    def __init__(self, name, config, dependencies=None):
        super().__init__(name, config, dependencies)
        # Dependencies will be resolved automatically before run()

    def run(self) -> TaskResult:
        # Access resolved dependency data
        input_data = self.config.get("input_data")            # From dependency
        processing_mode = self.config.get("mode", "default")  # From config
        # Process the data...
Validation Rules
Tasks can define validation_rules to automatically validate their configuration. The validation system supports:
required: Keys that must be present
optional: Keys that are optional
defaults: Default values for missing optional keys
types: Expected Python types for values
ranges: (min, max) tuples for numeric validation
custom: Custom validation functions
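To make the rule categories concrete, here is a small validator that applies them in the order listed. It is a sketch under assumptions rather than the framework's validation code: `validate_config` is a hypothetical name, ranges are treated as inclusive, and `custom` is assumed to map a key to a predicate function. The `optional` list is informational in this sketch and is not checked.

```python
def validate_config(config, rules):
    """Return (config_with_defaults, list_of_error_strings)."""
    # Defaults fill in missing keys; explicit config values win.
    merged = {**rules.get("defaults", {}), **config}
    errors = []
    for key in rules.get("required", []):
        if key not in merged:
            errors.append(f"missing required key: {key}")
    for key, expected in rules.get("types", {}).items():
        if key in merged and not isinstance(merged[key], expected):
            errors.append(f"{key} must be {expected.__name__}")
    for key, (low, high) in rules.get("ranges", {}).items():
        value = merged.get(key)
        if isinstance(value, (int, float)) and not low <= value <= high:
            errors.append(f"{key} must be between {low} and {high}")
    for key, check in rules.get("custom", {}).items():
        if key in merged and not check(merged[key]):
            errors.append(f"{key} failed custom validation")
    return merged, errors


rules = {
    "required": ["api_key", "endpoint"],
    "defaults": {"timeout": 30, "retries": 3},
    "types": {"api_key": str, "timeout": int},
    "ranges": {"timeout": (1, 300), "retries": (0, 10)},
}
merged, errors = validate_config({"api_key": "k", "timeout": 500}, rules)
```

In this call the default for `retries` is applied, while the missing `endpoint` and the out-of-range `timeout` are each reported as an error string.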
Status Management
Task status is managed automatically:
NOT_STARTED: Initial state
STARTED: During execution (set by framework)
SUCCESS: Task completed successfully
ERROR: Task failed with error
PARTIAL: Task completed with partial success
Thread Safety
Individual task instances are not thread-safe and should not be executed concurrently. However, different task instances can run in parallel safely.
- See Also:
TaskResult: Return format for task execution
TaskDependency: For creating task dependencies
PipelineAgent: For orchestrating multiple tasks
Validation documentation in utils.common module
- Parameters:
- __init__(name, config=None, dependencies=None)[source]
Initialize a new task instance with configuration and dependencies.
Creates a new task with the specified configuration. The task will be initialized with default status and logging setup. Dependencies will be resolved automatically by the PipelineAgent before execution.
- Parameters:
name (str) – Unique identifier for this task within the agent. Must be unique across all tasks in the same pipeline. Used for:
* Logging identification
* Dependency references (other_task.field)
* Result storage in TaskContext
* Status tracking and error reporting
config (Optional[Dict[str, Any]]) – Configuration dictionary containing task-specific parameters. The exact keys depend on the task implementation. Common patterns:
* API credentials and endpoints
* File paths and processing options
* Behavioral flags and timeout values
* Data transformation parameters
Will be validated against validation_rules if defined.
dependencies (Optional[List[TaskDependency]]) – List of TaskDependency objects specifying how this task receives data from other tasks. Dependencies are resolved automatically before run() is called, with resolved data added to the config dictionary. Use TaskDependency.create_required() or TaskDependency.create_optional() for convenience.
- Attributes Initialized:
name: Task identifier
config: Configuration dictionary (may be updated by dependencies)
dependencies: List of dependency specifications
validation_rules: Set by subclasses for configuration validation
agent_name: Set automatically by PipelineAgent
logger: Task-specific logger (task.{name})
Status tracking fields for execution monitoring
Example
Basic task creation:
task = MyTask(
    name="process_data",
    config={
        "input_file": "data.csv",
        "output_format": "json",
        "batch_size": 1000
    }
)
Task with dependencies:
task = TransformTask(
    name="process_results",
    config={
        "transform_function": my_transform_func,
        "output_name": "processed_data"
    },
    dependencies=[
        TaskDependency("input_data", "search_task.results", REQUIRED),
        TaskDependency("options", "config_task.settings", OPTIONAL)
    ]
)
Note
* Task names should be descriptive and use snake_case
* Config validation occurs during run(), not during initialization
* Dependencies are resolved by PipelineAgent, not by the task itself
* Subclasses should call super().__init__() and set validation_rules
See also
* TaskDependency: For creating task dependencies
* TaskResult: Return format from run() method
* Validation rules documentation for config validation format
- create_task_result_from_current_state()[source]
Create a TaskResult object from current task state.
This method is useful for backward compatibility and migration purposes.
- Return type:
TaskResult
- get_dependencies()[source]
Get the list of dependencies for this task.
- Return type:
List[TaskDependency]
- Returns:
List of TaskDependency objects for this task.
- abstractmethod run()[source]
Execute the task and return a structured result.
This is the main method that subclasses must implement to perform their specific functionality. The method should follow the TaskResult pattern for error handling and return structured results.
Implementation Guidelines
Validation First: Always validate configuration before work:
def run(self) -> TaskResult:
    start_time = datetime.now()
    validation_failure = self._validate_or_fail(start_time)
    if validation_failure:
        return validation_failure
Error Handling: Use TaskResult pattern, not exceptions:
try:
    result = self.do_work()
    return TaskResult.success(data=result, execution_time=elapsed)
except ExpectedError as e:
    return TaskResult.failure(error_message=str(e), execution_time=elapsed)
Timing: Always include execution timing:
start_time = datetime.now()
# ... do work ...
execution_time = (datetime.now() - start_time).total_seconds()
Metadata: Include useful execution information:
return TaskResult.success(
    data=result,
    execution_time=execution_time,
    metadata={
        "processed_items": len(items),
        "api_calls": call_count,
        "cache_hits": cache_hits
    }
)
Return Patterns
Success: All work completed successfully
Partial: Some work completed, some failed (recoverable)
Failure: Task could not complete due to errors
Skipped: Task was skipped due to conditions not being met
- A returned TaskResult contains:
status: SUCCESS, PARTIAL, ERROR, or SKIPPED
data: Result data (dict) if successful, None if failed
error: Error message if failed, None if successful
execution_time: Time taken to execute in seconds
metadata: Additional information about execution
Example Implementation
def run(self) -> TaskResult:
    start_time = datetime.now()

    # Validate configuration
    validation_failure = self._validate_or_fail(start_time)
    if validation_failure:
        return validation_failure

    try:
        # Extract configuration
        input_file = self.config["input_file"]
        output_format = self.config.get("output_format", "json")

        # Perform work
        data = self.process_file(input_file)
        formatted_data = self.format_output(data, output_format)

        execution_time = (datetime.now() - start_time).total_seconds()
        return TaskResult.success(
            data={"processed_data": formatted_data, "format": output_format},
            execution_time=execution_time,
            metadata={"records_processed": len(data)}
        )
    except FileNotFoundError:
        execution_time = (datetime.now() - start_time).total_seconds()
        return TaskResult.failure(
            error_message=f"Input file not found: {input_file}",
            execution_time=execution_time,
            metadata={"error_type": "FileNotFoundError"}
        )
Note
Never raise exceptions for operational errors
Always calculate and include execution_time
Use descriptive error messages that help users understand issues
Include relevant metadata for debugging and monitoring
- Return type:
TaskResult
- Returns:
TaskResult
See also
TaskResult: For understanding return value structure
_validate_or_fail(): For configuration validation pattern
Task-specific implementations for concrete examples
- set_context(context)[source]
Set the task context for dependency resolution.
- Parameters:
context (TaskContext) – TaskContext instance for resolving dependencies
- Return type:
Note
Default implementation does nothing. Override in subclasses that use context.
- task(func)[source]
Mark a method as a pipeline task with automatic dependency inference.
The @task decorator marks methods in a PipelineAgent as pipeline tasks. Dependencies are automatically inferred from the method’s parameter names - if a parameter name matches another task’s name, it becomes a dependency.
- Parameters:
func (TypeVar(F, bound=Callable[..., Any])) – The method to mark as a task
- Return type:
- Returns:
The decorated method with task metadata attached
Example:
class MyAgent(PipelineAgent):
    @task
    def fetch_data(self) -> dict:
        return search("AI news", max_results=5)

    @task
    def process(self, fetch_data: dict) -> str:
        # fetch_data parameter auto-wired from fetch_data task
        return llm(f"Summarize: {fetch_data}", model="gpt-4o")
- class Depends[source]
Bases: object
Specify an explicit dependency path for a task parameter.
Use with typing.Annotated to specify the exact path to resolve from the task context, rather than using the default inference.
- path
Dot-notation path to the dependency value (e.g., “task_name.field”)
- Example:
from typing import Annotated

@task
def write_article(
    self,
    # Explicitly get .content field instead of full task output
    outline: Annotated[str, Depends("generate_outline.content")],
    summary: Annotated[str, Depends("summarize.content")]
) -> str:
    return llm(f"Write from: {outline}", model="gpt-4o")
- Parameters:
path (str)
- path
- classmethod __class_getitem__(path)[source]
Allow Depends["path"] syntax as alternative to Depends("path").
- __init__(path)[source]
Initialize with a dependency path.
- Parameters:
path (str) – Dot-notation path like "task_name.field.nested"
- Raises:
ValueError – If path is empty or doesn’t contain a dot
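The following sketch shows how a marker like this can be read back out of an `Annotated` hint and resolved against nested results. It is an illustration only: `DependsSketch`, `explicit_paths`, and `resolve_path` are hypothetical names, and the real class may validate or resolve paths differently. The only library calls used are `typing.Annotated` and `typing.get_type_hints(..., include_extras=True)`.

```python
from typing import Annotated, get_type_hints


class DependsSketch:
    """Hypothetical stand-in for aipype's Depends marker."""

    def __init__(self, path: str):
        # Mirrors the documented rule: reject empty paths and paths without a dot.
        if not path or "." not in path:
            raise ValueError("path must look like 'task_name.field'")
        self.path = path

    def __class_getitem__(cls, path: str) -> "DependsSketch":
        # Lets DependsSketch["a.b"] behave like DependsSketch("a.b").
        return cls(path)


def explicit_paths(func):
    """Map parameter names to paths declared via Annotated[..., DependsSketch]."""
    hints = get_type_hints(func, include_extras=True)
    paths = {}
    for name, hint in hints.items():
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, DependsSketch):
                paths[name] = meta.path
    return paths


def resolve_path(results, path):
    """Walk a dot-notation path through nested dicts."""
    value = results
    for part in path.split("."):
        value = value[part]
    return value


def write_article(
    outline: Annotated[str, DependsSketch("generate_outline.content")],
) -> str:
    return f"Write from: {outline}"


paths = explicit_paths(write_article)
outline = resolve_path({"generate_outline": {"content": "1. Intro"}}, paths["outline"])
```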
- class TaskWrapper[source]
Bases: BaseTask
Wraps a decorated @task method as a BaseTask.
TaskWrapper bridges the gap between the new decorator-based syntax and the existing execution engine. It:
Stores a reference to the decorated method
Builds kwargs from resolved dependencies
Calls the method and processes return values
Handles delegation to returned BaseTask instances
This allows the existing TaskExecutionPlan and DependencyResolver to work without modification.
- method
The bound method to call
- agent
Reference to the parent agent
- context_instance
TaskContext for dependency resolution
Example:
class MyAgent(PipelineAgent):
    @task
    def fetch_data(self) -> dict:
        return {"data": "value"}

    @task
    def process(self, fetch_data: dict) -> str:
        return llm(f"Process: {fetch_data}")

# TaskWrapper is created automatically for each @task method
# and handles the execution and return value processing
- run()[source]
Execute the wrapped method and process its return value.
This method:
1. Builds kwargs from resolved dependencies in self.config
2. Calls the decorated method with those kwargs
3. Processes the return value based on its type
- Return type:
TaskResult
- Returns:
TaskResult with the execution result
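The three steps of run() can be approximated as follows. This is a sketch, not the TaskWrapper source: `call_with_config` and `DelegateSketch` are hypothetical, and "delegation" is modelled simply as calling `run()` on any returned object that has one.

```python
import inspect


class DelegateSketch:
    """Hypothetical stand-in for a returned task object with its own run()."""

    def __init__(self, payload):
        self.payload = payload

    def run(self):
        return {"delegated": self.payload}


def call_with_config(method, config):
    """Call `method` with kwargs picked from `config` by parameter name."""
    # Step 1: keep only the config entries the method actually asks for.
    params = inspect.signature(method).parameters
    kwargs = {name: config[name] for name in params if name in config}
    # Step 2: call the method.
    value = method(**kwargs)
    # Step 3: if it returned a task-like object, run it and use its output.
    if hasattr(value, "run") and callable(value.run):
        return value.run()
    return value


def process(fetch_data):
    return f"Process: {fetch_data}"


def delegate(fetch_data):
    return DelegateSketch(fetch_data)


config = {"fetch_data": {"data": "value"}, "unrelated": 1}
plain = call_with_config(process, config)
delegated = call_with_config(delegate, config)
```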
- set_context(context)[source]
Set the task context for dependency resolution.
- Parameters:
context (TaskContext) – TaskContext instance
- Return type:
- llm(prompt, *, model='gpt-4o', provider='openai', system=None, temperature=0.7, max_tokens=1000, tools=None, response_format=None, timeout=60.0, **kwargs)[source]
Create an LLMTask with a clean, intuitive API.
This helper simplifies LLMTask creation by providing sensible defaults and a more intuitive parameter interface. Designed to be used within @task decorated methods - returns LLMTask for delegation by TaskWrapper.
- Parameters:
prompt (str) – The prompt to send to the LLM
model (str) – Model identifier (default: "gpt-4o")
provider (str) – LLM provider (default: "openai")
temperature (float) – Randomness control 0.0-2.0 (default: 0.7)
max_tokens (int) – Maximum response tokens (default: 1000)
tools (Optional[List[Union[Callable[..., Any], Dict[str, Any]]]]) – List of @tool functions and/or MCP server configs
response_format (Union[Type[BaseModel], Dict[str, Any], None]) – Pydantic model or JSON schema for structured output
timeout (float) – Request timeout in seconds (default: 60)
**kwargs (Any) – Additional LLMTask configuration
- Return type:
LLMTask
- Returns:
Configured LLMTask ready for delegation
Example:
@task
def summarize(self, articles: list) -> str:
    return llm(
        prompt=f"Summarize these articles: {articles}",
        model="gpt-4o",
        temperature=0.3,
        max_tokens=500
    )
Example with structured output and tools:
@task
def analyze(self, data: dict) -> AnalysisResult:
    return llm(
        prompt=f"Analyze: {data}",
        model="gpt-4o",
        response_format=AnalysisResult,
        tools=[calculator, mcp_server("brave", "https://mcp.brave.com")]
    )
- search(query, *, max_results=5, **kwargs)[source]
Create a SearchTask with a clean API.
- Parameters:
- Return type:
SearchTask
- Returns:
Configured SearchTask ready for delegation
Example:
@task
def find_articles(self) -> dict:
    topic = self.config.get("topic", "AI")
    return search(f"{topic} latest news", max_results=10)
- mcp_server(label, url, *, allowed_tools=None, require_approval='never')[source]
Create MCP server configuration for use with llm().
Model Context Protocol (MCP) servers provide external tools that can be called by LLMs. This helper creates the configuration dict needed by LLMTask.
- Parameters:
- Return type:
Dict[str, Any]
- Returns:
MCP configuration dict for use in tools parameter
Example:
@task
def research(self, topic: str) -> str:
    return llm(
        prompt=f"Research {topic} thoroughly",
        model="claude-sonnet-4",
        provider="anthropic",
        tools=[
            mcp_server("brave", "https://mcp.brave.com"),
            mcp_server("github", "https://mcp.github.com",
                       allowed_tools=["search_repos", "get_file"])
        ]
    )
- transform(input_data, fn, *, output_name='result')[source]
Apply a transformation function and return result as dict.
Unlike returning a TransformTask, this helper executes the transformation immediately and returns the result. Use this for simple data transformations within @task methods.
- Parameters:
- Return type:
Dict[str, Any]
- Returns:
Dict with transformed data under output_name and ‘data’ keys
Example:
@task
def extract_urls(self, search_results: dict) -> dict:
    results = search_results.get("results", [])
    return transform(
        results,
        fn=lambda r: [item["url"] for item in r]
    )
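Based on the description of the return value (the transformed data under both `output_name` and `'data'`), the helper's behaviour can be pictured with the sketch below. `transform_sketch` is a hypothetical re-creation for illustration; the real helper may add validation or error handling that is not shown here.

```python
def transform_sketch(input_data, fn, *, output_name="result"):
    """Apply fn immediately and expose the output under two keys."""
    output = fn(input_data)
    return {output_name: output, "data": output}


urls = transform_sketch(
    [{"url": "https://example.com/a"}, {"url": "https://example.com/b"}],
    fn=lambda items: [item["url"] for item in items],
    output_name="urls",
)
```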
- class TaskResult[source]
Bases: object
Standardized result format for all task executions.
This class provides a consistent interface for task results across the framework, enabling better error handling, performance tracking, and result processing.
- Parameters:
- __init__(status, data=None, error=None, metadata=<factory>, execution_time=0.0)
- classmethod failure(error_message, metadata=None, execution_time=0.0)[source]
Create an error task result.
- get_legacy_result()[source]
Get result in legacy format for backward compatibility.
This method extracts the data field to maintain compatibility with existing code that expects raw return values.
- Return type:
- Returns:
The data field, or raises RuntimeError if task failed
- has_data()[source]
Check if the result contains data.
- Return type:
bool
- Returns:
True if data is not None
- is_error()[source]
Check if the task failed with an error.
- Return type:
bool
- Returns:
True if status is ERROR
- is_partial()[source]
Check if the task completed with partial success.
- Return type:
bool
- Returns:
True if status is PARTIAL
- is_skipped()[source]
Check if the task was skipped.
- Return type:
bool
- Returns:
True if status is SKIPPED
- is_success()[source]
Check if the task completed successfully.
- Return type:
bool
- Returns:
True if status is SUCCESS
- classmethod partial(data=None, error=None, metadata=None, execution_time=0.0)[source]
Create a partial success task result.
- classmethod skipped(reason, metadata=None, execution_time=0.0)[source]
Create a skipped task result.
- classmethod success(data=None, metadata=None, execution_time=0.0)[source]
Create a successful task result.
- status: TaskStatus
- class TaskStatus[source]
Bases: Enum
Enumeration for task execution status.
- NOT_STARTED = 'not_started'
- STARTED = 'started'
- SUCCESS = 'success'
- ERROR = 'error'
- SKIPPED = 'skipped'
- PARTIAL = 'partial'
- wrap_legacy_result(result, execution_time=0.0)[source]
Wrap a legacy task result in TaskResult format.
This utility function helps with gradual migration by wrapping existing task results in the new standardized format.
- Parameters:
- Return type:
- Returns:
TaskResult with SUCCESS status containing the legacy result
- unwrap_to_legacy(task_result)[source]
Unwrap TaskResult to legacy format for backward compatibility.
- Parameters:
task_result (TaskResult) – TaskResult instance
- Return type:
- Returns:
The data field from the TaskResult
- Raises:
RuntimeError – If the task failed
- class AgentRunResult[source]
Bases: object
Standardized result format for agent pipeline executions.
This class provides a consistent interface for agent execution results, enabling better error handling, status checking, and result processing.
Note: Individual task results are accessible via agent.context.get_result(task_name)
- Parameters:
- __init__(status, agent_name, total_tasks=0, completed_tasks=0, failed_tasks=0, total_phases=0, execution_time=0.0, metadata=<factory>, error_message=None)
- classmethod failure(agent_name, total_tasks, failed_tasks, error_message, total_phases=0, execution_time=0.0, metadata=None)[source]
Create a failed agent execution result.
- Parameters:
agent_name (str) – Name of the agent
total_tasks (int) – Total number of tasks in the pipeline
failed_tasks (int) – Number of tasks that failed
error_message (str) – Description of the error
total_phases (int) – Number of execution phases
execution_time (float) – Total execution time in seconds
metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary
- Return type:
- Returns:
AgentRunResult with ERROR status
- is_error()[source]
Check if the agent execution failed with an error.
- Return type:
- Returns:
True if status is ERROR
- is_partial()[source]
Check if the agent execution completed with partial success.
- Return type:
- Returns:
True if status is PARTIAL
- is_running()[source]
Check if the agent is currently running.
- Return type:
- Returns:
True if status is RUNNING
- is_success()[source]
Check if the agent execution completed successfully.
- Return type:
- Returns:
True if status is SUCCESS
- classmethod partial(agent_name, total_tasks, completed_tasks, failed_tasks, total_phases, execution_time=0.0, metadata=None)[source]
Create a partial success agent execution result.
- Parameters:
agent_name (str) – Name of the agent
total_tasks (int) – Total number of tasks in the pipeline
completed_tasks (int) – Number of tasks that completed successfully
failed_tasks (int) – Number of tasks that failed
total_phases (int) – Number of execution phases
execution_time (float) – Total execution time in seconds
metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary
- Return type:
- Returns:
AgentRunResult with PARTIAL status
- classmethod success(agent_name, total_tasks, completed_tasks, total_phases, execution_time=0.0, metadata=None)[source]
Create a successful agent execution result.
- Parameters:
agent_name (str) – Name of the agent
total_tasks (int) – Total number of tasks in the pipeline
completed_tasks (int) – Number of tasks that completed successfully
total_phases (int) – Number of execution phases
execution_time (float) – Total execution time in seconds
metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary
- Return type:
- Returns:
AgentRunResult with SUCCESS status
- status: AgentRunStatus
- class AgentRunStatus[source]
Bases: Enum
Enumeration for agent execution status.
- SUCCESS = 'success'
- PARTIAL = 'partial'
- ERROR = 'error'
- RUNNING = 'running'
- class TaskContext[source]
Bases: object
Simplified context for storing and accessing task results across pipeline execution.
Focuses on string-based data storage and simple dot notation path access. Designed for text-based automation workflows.
- display_completed_results(task_content_pairs)[source]
Display content for multiple completed tasks in one call.
Ultra-safe: handles all edge cases internally, only displays content for tasks that completed successfully and have valid content.
- display_result_content(task_name, title, field_name='content')[source]
Display task result content in a message box if present.
Ultra-safe: handles all edge cases internally including: - Task not completed - Missing or empty content - Any errors during display
No return value needed - this is a pure convenience method.
- get_path_value(path)[source]
Get a value from context using paths like ‘task.field’, ‘task.array[0]’, or ‘task.array[].field’.
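The three path forms named above can be illustrated with a small standalone resolver. This is a sketch under stated assumptions, not TaskContext's implementation: get_path_value_sketch is a hypothetical name, results are assumed to be nested dicts and lists, and only a single `[]` fan-out per path is handled.

```python
import re
from typing import Any, Dict

# One path segment: a key, optionally followed by [N] or [].
_SEGMENT = re.compile(r"^(?P<key>[^\[\]]+)(?:\[(?P<index>\d*)\])?$")


def get_path_value_sketch(results: Dict[str, Any], path: str) -> Any:
    """Hypothetical resolver for 'task.field', 'task.array[0]', 'task.array[].field'."""
    current: Any = results
    fan_out = False  # True right after an 'array[]' segment
    for segment in path.split("."):
        match = _SEGMENT.match(segment)
        if match is None:
            raise ValueError(f"Invalid path segment: {segment!r}")
        key, index = match.group("key"), match.group("index")
        # After 'array[]', apply the key to every element instead of one object.
        current = [item[key] for item in current] if fan_out else current[key]
        fan_out = False
        if index is None:
            continue
        if index == "":
            fan_out = True  # 'array[]': the next key maps over the list
        else:
            current = current[int(index)]  # 'array[N]': pick one element
    return current


context = {"search": {"query": "ai", "results": [{"url": "u1"}, {"url": "u2"}]}}
query = get_path_value_sketch(context, "search.query")
first = get_path_value_sketch(context, "search.results[0]")
urls = get_path_value_sketch(context, "search.results[].url")
```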
- get_result_content(task_name)[source]
Get the ‘content’ field from a task result as a string.
Ultra-safe: handles all edge cases internally, never throws errors.
- get_result_field(task_name, field_name, expected_type=str, default=None)[source]
Get a specific field from a task result with type checking.
Ultra-safe: handles all edge cases internally, never throws errors.
- Parameters:
- Return type:
- Returns:
The field value if found and correct type, otherwise default
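The "found and correct type, otherwise default" rule can be sketched as a standalone function. The name get_result_field_sketch and the plain-dict result store are assumptions made for illustration; the real method reads from TaskContext.

```python
from typing import Any, Dict, Optional, Type


def get_result_field_sketch(
    results: Dict[str, Any],
    task_name: str,
    field_name: str,
    expected_type: Type[Any] = str,
    default: Optional[Any] = None,
) -> Any:
    """Hypothetical stand-in: return the field only if present and of expected_type."""
    task_result = results.get(task_name)
    if not isinstance(task_result, dict):
        return default  # unknown task or non-dict result
    value = task_result.get(field_name)
    return value if isinstance(value, expected_type) else default


results = {"summarize": {"content": "Short summary", "word_count": 2}}
content = get_result_field_sketch(results, "summarize", "content")
count = get_result_field_sketch(results, "summarize", "word_count", int, 0)
wrong_type = get_result_field_sketch(results, "summarize", "word_count", str, "n/a")
unknown_task = get_result_field_sketch(results, "fetch", "content", str, "")
```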
- get_result_fields(task_name, *field_names)[source]
Get multiple fields from a task result at once.
Ultra-safe: handles all edge cases internally, never throws errors.
- get_task_count()[source]
Get the number of tasks with stored results.
- Return type:
- Returns:
Number of tasks with results
- has_result_content(task_name)[source]
Check if a task has non-empty content.
Ultra-safe: handles all edge cases internally, never throws errors.
- class TaskDependency[source]
Bases: object
Specification for automatic data flow between tasks in a pipeline.
TaskDependency defines how a task receives data from other tasks in the pipeline. Dependencies are resolved automatically by the PipelineAgent before task execution, with resolved data added to the task’s configuration.
The dependency system supports flexible path-based data access, optional data transformation, and both required and optional dependencies with appropriate error handling.
- Core Components:
name: Key in target task’s config where resolved data is stored
source_path: Dot-notation path to source data in TaskContext
dependency_type: REQUIRED or OPTIONAL behavior for missing data
transform_func: Optional function to process resolved data
default_value: Fallback value for optional dependencies
- Path Syntax:
Source paths use dot notation to access nested data
“task.field” - Direct field access
“task.data.nested.value” - Nested object access
“task.results[].url” - Array element extraction
“task.metadata.statistics” - Metadata access
Transform Functions
Optional preprocessing of resolved data before injection:
    def extract_titles(articles):
        return [article.get('title', 'Untitled') for article in articles]

    TaskDependency(
        "titles", "search.results", REQUIRED,
        transform_func=extract_titles
    )
Common Patterns
Basic data passing:
TaskDependency("search_results", "search_task.results", REQUIRED)
Data transformation:
    TaskDependency(
        "article_urls", "search_task.results", REQUIRED,
        transform_func=lambda results: [r['url'] for r in results]
    )
Optional configuration:
    TaskDependency(
        "processing_options", "config_task.settings", OPTIONAL,
        default_value={"batch_size": 100, "timeout": 30}
    )
Nested data access:
TaskDependency("api_endpoint", "config_task.api.endpoints.primary", REQUIRED)
Complex transformation:
    def combine_and_filter(search_results):
        # Filter recent articles and combine text
        recent = [r for r in search_results if is_recent(r)]
        return ' '.join(r.get('content', '') for r in recent)

    TaskDependency(
        "combined_content", "search_task.results", REQUIRED,
        transform_func=combine_and_filter
    )
Error Handling
Required dependencies: Missing data causes task execution failure
Optional dependencies: Missing data uses default_value
Transform errors: Transform function exceptions cause dependency failure
Path errors: Invalid paths cause resolution failure with detailed messages
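The error-handling rules above can be sketched in a few lines of standalone Python. Everything here is illustrative: DependencySketch, lookup and resolve are hypothetical names, the context is a plain nested dict, and returning the default without running the transform is an assumption rather than documented behavior.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional

_MISSING = object()  # sentinel so that a stored None still counts as "found"


@dataclass(frozen=True)
class DependencySketch:
    """Hypothetical stand-in for TaskDependency; field names follow the docs."""
    name: str
    source_path: str
    required: bool = True
    default_value: Any = None
    transform_func: Optional[Callable[[Any], Any]] = None


def lookup(context: Dict[str, Any], path: str) -> Any:
    """Walk a plain dot-notation path; return _MISSING if any key is absent."""
    current: Any = context
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return _MISSING
        current = current[key]
    return current


def resolve(dep: DependencySketch, context: Dict[str, Any]) -> Any:
    value = lookup(context, dep.source_path)
    if value is _MISSING:
        if dep.required:
            raise ValueError(
                f"Required dependency '{dep.name}' not found at '{dep.source_path}'"
            )
        return dep.default_value  # optional: fall back to the default
    # Exceptions raised by transform_func propagate and fail the dependency.
    return dep.transform_func(value) if dep.transform_func else value


context = {"search_task": {"results": [{"url": "u1"}, {"url": "u2"}]}}
urls = resolve(
    DependencySketch(
        "article_urls",
        "search_task.results",
        transform_func=lambda results: [r["url"] for r in results],
    ),
    context,
)
options = resolve(
    DependencySketch(
        "processing_options",
        "config_task.settings",
        required=False,
        default_value={"batch_size": 100},
    ),
    context,
)
```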
Best Practices
Use descriptive names that indicate the data being passed
Prefer specific paths over broad data passing
Use transform functions to shape data for consuming tasks
Provide meaningful default values for optional dependencies
Document complex transform functions for maintainability
Thread Safety
TaskDependency instances are immutable after creation and thread-safe. However, transform functions should be thread-safe if used in parallel execution.
See also
DependencyType: REQUIRED vs OPTIONAL dependency behavior
DependencyResolver: Automatic resolution engine
TaskContext: Source of dependency data
Built-in transform utilities: extract_urls_from_results, etc.
- Parameters:
- __init__(name, source_path, dependency_type, default_value=None, transform_func=None, override_existing=False, description='')[source]
Initialize a task dependency specification.
Creates a dependency that will be resolved automatically by the PipelineAgent before task execution. The resolved data will be injected into the task’s configuration under the specified name.
Note
Dependencies are resolved in the order they are defined
Transform functions should be deterministic and thread-safe
Source task must complete successfully for REQUIRED dependencies
Path resolution supports nested objects and array access
- Parameters:
name (str) – Key name where resolved data will be stored in the target task’s config dictionary. Should be descriptive and follow naming conventions (snake_case recommended). This is how the consuming task will access the dependency data.
source_path (str) – Dot-notation path to the source data in TaskContext. Format: “source_task_name.field_path”. Examples:
“search.results” - Access results field from search task
“fetch.data.articles[].content” - Extract content from article array
“config.api.endpoints.primary” - Access nested configuration
“process.metadata.item_count” - Access metadata fields
dependency_type (DependencyType) – How to handle missing dependencies:
DependencyType.REQUIRED: Task execution fails if unavailable
DependencyType.OPTIONAL: Uses default_value if unavailable
default_value (Any) – Value to use when optional dependency is unavailable. Only relevant for OPTIONAL dependencies. Can be any type that makes sense for the consuming task. Common patterns:
Empty list: []
Empty dict: {}
Default config: {"timeout": 30, "retries": 3}
None: None (explicit null)
transform_func (Optional[Callable[[Any], Any]]) – Optional function to preprocess resolved data before injection. Function signature: (resolved_data: Any) -> Any. Common uses:
Extract specific fields: lambda x: [item['url'] for item in x]
Filter data: lambda x: [item for item in x if item['valid']]
Combine data: lambda x: ' '.join(x)
Format data: lambda x: {"processed": x, "count": len(x)}
override_existing (bool) – Whether to override existing values in task config.
False (default): Only inject if key doesn’t exist in config
True: Always inject, overriding existing config values
Use with caution as it can override user-provided configuration.
description (str) – Human-readable description of this dependency’s purpose. Helpful for documentation and debugging. Should explain what data is being passed and how it will be used.
Example:
Basic dependency:
    # Pass search results to summarization task
    TaskDependency(
        name="search_results",
        source_path="web_search.results",
        dependency_type=DependencyType.REQUIRED
    )
Transformed dependency:
    TaskDependency(
        name="article_urls",
        source_path="search.results",
        dependency_type=DependencyType.REQUIRED,
        transform_func=lambda results: [r['url'] for r in results],
        description="Extract URLs from search results for fetching"
    )
Optional dependency with default:
    TaskDependency(
        name="processing_config",
        source_path="config_loader.settings",
        dependency_type=DependencyType.OPTIONAL,
        default_value={"batch_size": 100, "parallel": True},
        description="Processing configuration with sensible defaults"
    )
- Raises:
ValueError – If name is empty, source_path is empty, or source_path doesn’t contain a dot (invalid format).
See also
DependencyType: For dependency type behavior
DependencyResolver: For resolution implementation details
Built-in transform functions: extract_urls_from_results, etc.
- class DependencyType[source]
Bases: Enum
Enumeration of dependency types for task data flow.
Dependency types control how missing dependencies are handled:
REQUIRED: Task execution fails if dependency cannot be resolved
OPTIONAL: Default value used if dependency unavailable
Example
    # Required dependency - task fails if search_task not available
    TaskDependency("query_results", "search_task.results", REQUIRED)

    # Optional dependency - uses default if config_task unavailable
    TaskDependency("settings", "config_task.options", OPTIONAL, default_value={})
- REQUIRED = 'required'
- OPTIONAL = 'optional'
- class DependencyResolver[source]
Bases: object
Resolves task dependencies from context and builds task configuration.
- Parameters:
context (TaskContext)
- __init__(context)[source]
Initialize dependency resolver with context.
- Parameters:
context (TaskContext) – TaskContext to resolve dependencies from
- resolve_dependencies(task)[source]
Resolve all dependencies for a task and return merged configuration.
- Parameters:
task (BaseTask) – Task instance with get_dependencies() method
- Return type:
- Returns:
Dictionary with resolved dependencies merged with existing config
- Raises:
ValueError – If required dependencies cannot be satisfied
- create_required_dependency(name, source_path, transform_func=None)[source]
Create a required dependency.
- create_optional_dependency(name, source_path, default_value, transform_func=None)[source]
Create an optional dependency with default value.
- combine_article_content(articles, separator='\\n\\n')[source]
Combine content from multiple articles.
- class LLMTask[source]
Bases: BaseTask
Advanced LLM integration task with template substitution and tool calling.
LLMTask provides sophisticated Large Language Model integration for AI pipelines. It supports dynamic prompt generation through template substitution, automatic function/tool calling, and seamless integration with multiple LLM providers.
The task automatically resolves template variables from dependency data and manages the complete LLM interaction lifecycle including request formatting, response parsing, tool execution, and error handling.
- Core Capabilities:
Dynamic Prompts: ${variable} template substitution from dependencies
Tool Integration: Automatic function calling with @tool decorated functions
Multi-Provider: Support for 50+ LLM providers via litellm
Response Processing: Structured response parsing and validation
Error Recovery: Graceful handling of API failures and timeouts
Usage Tracking: Detailed token usage and cost monitoring
- Template Substitution:
- Templates use ${variable_name} syntax and are resolved from:
Task dependencies (via TaskDependency)
Task configuration values
Environment variables (as fallback)
- Template fields support:
prompt/prompt_template: Main user message
context: System/context message
role: User role description
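The ${variable_name} syntax and the resolution order listed above happen to match Python's standard string.Template, so the behavior can be sketched with the standard library alone. This is an illustration, not LLMTask's code; resolve_template_sketch is a hypothetical name, and raising ValueError for a missing variable is an assumption.

```python
import os
from collections import ChainMap
from string import Template
from typing import Any, Mapping


def resolve_template_sketch(
    template: str,
    dependencies: Mapping[str, Any],
    config: Mapping[str, Any],
) -> str:
    """Hypothetical resolver: dependencies first, then config, then environment."""
    # ChainMap searches its maps left to right, which encodes the priority order.
    lookup = ChainMap(dict(dependencies), dict(config), dict(os.environ))
    try:
        return Template(template).substitute(lookup)
    except KeyError as exc:
        raise ValueError(f"Missing template variable: {exc.args[0]}") from exc


prompt = resolve_template_sketch(
    "Write a ${length} article about ${topic}",
    dependencies={"topic": "AI trends"},
    config={"length": "brief", "topic": "shadowed by the dependency"},
)
```

Here the dependency value for topic wins over the config value with the same key, while length falls through to the config.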
- Tool Calling Workflow:
Functions decorated with @tool are automatically registered
LLM decides which tools to call based on prompt
Tools execute with validated parameters
Results injected back into conversation
LLM continues with tool results
Provider Configuration
Different providers require different configuration:
OpenAI:
    {
        "llm_provider": "openai",
        "llm_model": "gpt-4",
        "temperature": 0.7,
        "max_tokens": 1000
    }
Ollama (local):
    {
        "llm_provider": "ollama",
        "llm_model": "llama2",
        "temperature": 0.8
    }
Response Structure
Successful responses include:
    {
        "content": "Generated text response",
        "usage": {
            "prompt_tokens": 150,
            "completion_tokens": 300,
            "total_tokens": 450
        },
        "tool_calls": [
            {
                "tool_name": "search_web",
                "arguments": {"query": "AI trends"},
                "result": {"results": [...]},
                "success": True
            }
        ],
        "model": "gpt-4",
        "provider": "openai",
        "metadata": {
            "response_time": 2.34,
            "template_variables": ["topic", "domain"]
        }
    }
Common Usage Patterns
Content generation with templates:
    LLMTask("writer", {
        "prompt": "Write a ${length} article about ${topic} for ${audience}",
        "context": "You are an expert ${field} writer",
        "llm_provider": "openai",
        "llm_model": "gpt-4",
        "temperature": 0.7
    }, dependencies=[
        TaskDependency("topic", "input.topic", REQUIRED),
        TaskDependency("length", "config.article_length", OPTIONAL, default_value="brief"),
        TaskDependency("audience", "config.target_audience", REQUIRED),
        TaskDependency("field", "config.expertise_field", REQUIRED)
    ])
Research with tool calling:
    @tool
    def search_academic_papers(query: str, limit: int = 5) -> List[Dict]:
        # Search implementation
        return papers

    LLMTask("researcher", {
        "prompt": "Research ${topic} and provide comprehensive analysis",
        "tools": [search_academic_papers],
        "llm_provider": "anthropic",
        "llm_model": "claude-3-opus",
        "tool_choice": "auto",
        "max_tokens": 3000
    })
Data analysis and reasoning:
    LLMTask("analyzer", {
        "prompt": "Analyze this data and provide insights: ${data}",
        "context": "You are a data scientist specializing in ${domain}",
        "llm_provider": "openai",
        "llm_model": "gpt-4",
        "temperature": 0.3,  # Lower temperature for analytical tasks
        "max_tokens": 2000
    }, dependencies=[
        TaskDependency("data", "processor.results", REQUIRED),
        TaskDependency("domain", "config.analysis_domain", REQUIRED)
    ])
Error Handling
Common failure scenarios and recovery:
API Errors: Network failures, authentication issues
Rate Limits: Automatic retry with exponential backoff
Invalid Models: Clear error messages for unsupported models
Tool Failures: Individual tool errors don’t fail entire task
Template Errors: Missing variables result in clear error messages
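The "retry with exponential backoff" behavior listed for rate limits follows a common pattern, sketched below in standalone form. The exception class, function name, attempt count and delays are all assumptions chosen for illustration; the source does not state which values LLMTask uses.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class RateLimitErrorSketch(Exception):
    """Hypothetical exception standing in for a provider's rate-limit error."""


def call_with_backoff(
    request: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry request() on rate limits, doubling the delay after each failure."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return request()
        except RateLimitErrorSketch:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...
    raise RuntimeError("unreachable")  # the loop above always returns or raises


# Usage: a fake request that is rate-limited twice, then succeeds.
delays: List[float] = []
attempts = {"count": 0}


def flaky_request() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RateLimitErrorSketch("429")
    return "ok"


# Passing delays.append as sleep records the waits instead of actually sleeping.
result = call_with_backoff(flaky_request, sleep=delays.append)
```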
Performance Considerations
Use appropriate max_tokens to control costs and latency
Lower temperature (0.1-0.3) for factual/analytical tasks
Higher temperature (0.7-1.0) for creative tasks
Consider model capabilities vs cost (gpt-3.5-turbo vs gpt-4)
Use tool_choice="none" to disable tool calling when not needed
See also
@tool: For creating callable functions
TaskDependency: For template variable injection
litellm: Underlying provider abstraction
Tool calling examples in the examples package
- Parameters:
- DEFAULT_MAX_TOKENS = 1000
- DEFAULT_TEMPERATURE = 0.7
- DEFAULT_TIMEOUT = 60
- REQUIRED_CONFIGS = ['llm_provider', 'llm_model']
- __init__(name, config=None, dependencies=None)[source]
Initialize an advanced LLM task with template and tool support.
Creates a new LLM task that can automatically substitute template variables from dependencies, execute function calls, and interact with multiple LLM providers. The task handles the complete LLM interaction lifecycle.
- Parameters:
name (str) – Unique identifier for this task within the pipeline. Used for logging, dependency references, and result storage.
config (Optional[Dict[str, Any]]) – LLM configuration dictionary with the following keys:
Required:
llm_provider (str): LLM provider name. Supported values include: "openai", "anthropic", "ollama", "azure", "google", "cohere", etc.
llm_model (str): Model identifier specific to the provider:
OpenAI: "gpt-4", "gpt-3.5-turbo", "gpt-4-turbo-preview"
Anthropic: "claude-3-opus", "claude-3-sonnet", "claude-3-haiku"
Ollama: "llama2", "mistral", "codellama"
Google: "gemini-pro", "gemini-pro-vision"
Prompt Configuration:
prompt OR prompt_template (str): Main user message with ${var} templates
context (str, optional): System/context message (supports templates)
role (str, optional): User role description (supports templates)
Generation Parameters:
temperature (float, 0.0-2.0): Randomness control (default: 0.7)
max_tokens (int): Maximum response tokens (default: 1000)
timeout (float): Request timeout in seconds (default: 60)
Tool Calling:
tools (List[Callable | Dict], optional): List of @tool decorated functions and/or MCP server configurations. Each item can be:
Python function: Decorated with @tool for local execution
MCP config dict: {"type": "mcp", "server_label": "…", "server_url": "…"}
tool_choice (str, optional): "auto", "none", or specific tool name
parallel_tool_calls (bool): Enable parallel tool execution (default: True)
max_tool_execution_time (float): Tool timeout in seconds (default: 30)
Provider-Specific:
api_key (str, optional): API key (can also use environment variables)
api_base (str, optional): Custom API endpoint (for Azure, local deployments)
api_version (str, optional): API version (for Azure OpenAI)
dependencies (Optional[List[TaskDependency]]) – List of TaskDependency objects for template variable injection. Template variables in prompt, context, and role fields will be automatically replaced with resolved dependency values.
Examples
Basic text generation
    LLMTask("summarizer", {
        "prompt": "Summarize this text: ${content}",
        "llm_provider": "openai",
        "llm_model": "gpt-4",
        "temperature": 0.3,
        "max_tokens": 500
    }, dependencies=[
        TaskDependency("content", "input.text", REQUIRED)
    ])
Advanced with tools and context:
    @tool
    def web_search(query: str) -> Dict[str, Any]:
        return {"results": search_engine.search(query)}

    LLMTask("researcher", {
        "prompt": "Research ${topic} and provide analysis",
        "context": "You are an expert ${field} researcher with access to web search",
        "llm_provider": "anthropic",
        "llm_model": "claude-3-opus",
        "tools": [web_search],
        "tool_choice": "auto",
        "temperature": 0.5,
        "max_tokens": 3000
    }, dependencies=[
        TaskDependency("topic", "config.research_topic", REQUIRED),
        TaskDependency("field", "config.expertise_field", REQUIRED)
    ])
Local model with Ollama:
    LLMTask("local_chat", {
        "prompt": "Help with this question: ${question}",
        "llm_provider": "ollama",
        "llm_model": "llama2",
        "temperature": 0.8,
        "max_tokens": 2000
    })
Template Variables
Variables in prompt, context, and role fields use ${variable_name} syntax. Resolution order:
Dependency values (from TaskDependency objects)
Direct config values
Environment variables
Error if required variable not found
Tool Integration
Functions decorated with @tool are automatically:
Registered with the LLM provider
Made available to the model during generation
Executed when called by the model
Results injected back into the conversation
Error Handling
Configuration errors are caught during validation:
Missing required fields (llm_provider, llm_model)
Invalid parameter ranges (temperature, max_tokens)
Unsupported tool configurations
Provider-specific validation
Environment Variables
API keys can be provided via environment variables:
OPENAI_API_KEY for OpenAI
GOOGLE_API_KEY for Google
Etc.
See also
@tool: For creating callable functions
TaskDependency: For template variable injection
Provider documentation for model-specific capabilities
- preview_resolved_templates()[source]
Preview what the templates would resolve to with current context.
- set_context(context)[source]
Set the task context for template resolution.
- Parameters:
context (TaskContext) – TaskContext instance
- Return type:
- class SearchTask[source]
Bases: BaseTask
Simple web search task that executes searches and returns results.
- Parameters:
- __init__(name, config=None, dependencies=None)[source]
Initialize search task.
- Parameters:
Config parameters:
query: Search query string (required)
max_results: Maximum number of search results (default: 5)
serper_api_key: Optional API key for Serper search
- class ConditionalTask[source]
Bases: BaseTask
Task that executes actions based on conditional logic using context data.
- Parameters:
- __init__(name, config=None, dependencies=None)[source]
Initialize conditional task.
- Parameters:
Config parameters:
condition_function: Function that evaluates to True/False
condition_inputs: List of input field names for condition function
action_function: Function to execute when condition is True
action_inputs: List of input field names for action function
else_function: Optional function to execute when condition is False
else_inputs: List of input field names for else function
skip_reason: Custom reason message when condition is False
- get_execution_summary(context)[source]
Get a formatted summary of the conditional task execution results.
- Parameters:
context (TaskContext) – TaskContext to retrieve results from
- Return type:
- Returns:
Dictionary containing execution summary with formatted info, or None if no results found
- set_context(context)[source]
Set the task context.
- Parameters:
context (TaskContext) – TaskContext instance
- Return type:
- threshold_condition(threshold, operator='>=')[source]
Create a condition function that checks if a value meets a threshold.
- contains_condition(search_term, case_sensitive=False)[source]
Create a condition function that checks if text contains a term.
- list_size_condition(min_size=None, max_size=None)[source]
Create a condition function that checks list size.
- success_rate_condition(min_success_rate)[source]
Create a condition function that checks success rate from a results dict.
- quality_gate_condition(min_score, score_field='score')[source]
Create a condition function that checks quality score.
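Each of the condition helpers above is a factory: it takes settings and returns a function that is evaluated later. The standalone sketch below shows that pattern for two of them. The *_sketch names are hypothetical, and the set of supported operator strings is an assumption; the source only shows the default '>='.

```python
import operator as op
from typing import Callable

# Assumption: these comparison strings are accepted; only ">=" is documented.
_OPERATORS = {">=": op.ge, ">": op.gt, "<=": op.le, "<": op.lt, "==": op.eq, "!=": op.ne}


def threshold_condition_sketch(threshold: float, operator: str = ">=") -> Callable[[float], bool]:
    """Return a function that compares its input against threshold."""
    if operator not in _OPERATORS:
        raise ValueError(f"Unsupported operator: {operator!r}")
    compare = _OPERATORS[operator]
    return lambda value: compare(value, threshold)


def contains_condition_sketch(search_term: str, case_sensitive: bool = False) -> Callable[[str], bool]:
    """Return a function that checks whether text contains search_term."""
    needle = search_term if case_sensitive else search_term.lower()

    def condition(text: str) -> bool:
        haystack = text if case_sensitive else text.lower()
        return needle in haystack

    return condition


is_good_enough = threshold_condition_sketch(0.8)
mentions_ai = contains_condition_sketch("AI")
mentions_ai_exact = contains_condition_sketch("AI", case_sensitive=True)
```

A function built this way could be passed as condition_function in a ConditionalTask config, with the compared value supplied through condition_inputs.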
- increment_counter_action(counter_name='counter')[source]
Create an action function that increments a counter in context.
- set_flag_action(flag_name, flag_value=True)[source]
Create an action function that sets a flag value.
- class TransformTask[source]
Bases: BaseTask
Task that applies transformations to data from context.
- Parameters:
- __init__(name, config=None, dependencies=None)[source]
Initialize transform task.
- Parameters:
- Config parameters:
transform_function: Function to apply transformation
input_field: Single input field name (for single-input transforms)
input_fields: List of input field names (for multi-input transforms)
output_name: Name for the output in the result
validate_input: Whether to validate input data (default: True)
validate_output: Whether to validate output data (default: False)
- property input_data: Any
Public readonly property to access input data for testing purposes.
- Returns:
Input data for transformation
- set_context(context)[source]
Set the task context.
- Parameters:
context (TaskContext) – TaskContext instance
- Return type:
- extract_field_from_list(field_name)[source]
Create a transformation function that extracts a field from each item in a list.
- combine_text_fields(separator='\\n\\n', title_field='title', content_field='content')[source]
Create a transformation function that combines text from multiple items.
- filter_by_condition(condition_func)[source]
Create a transformation function that filters items by a condition.
- aggregate_numeric_field(field_name, operation='sum')[source]
Create a transformation function that aggregates a numeric field.
- format_as_markdown_list(title_field='title', url_field='url')[source]
Create a transformation function that formats items as a markdown list.
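The transformation helpers above follow the same factory pattern: each call returns a function suitable for transform_function. The sketch below mirrors two of them in standalone form. The *_sketch names are hypothetical, and both skipping items that lack the field and the exact markdown layout are assumptions.

```python
from typing import Any, Callable, Dict, List

Item = Dict[str, Any]


def extract_field_from_list_sketch(field_name: str) -> Callable[[List[Item]], List[Any]]:
    """Return a function that pulls field_name out of each item."""

    def transform(items: List[Item]) -> List[Any]:
        # Assumption: items lacking the field are skipped rather than raising.
        return [item[field_name] for item in items if field_name in item]

    return transform


def format_as_markdown_list_sketch(
    title_field: str = "title", url_field: str = "url"
) -> Callable[[List[Item]], str]:
    """Return a function that renders items as '- [title](url)' lines."""

    def transform(items: List[Item]) -> str:
        return "\n".join(f"- [{item[title_field]}]({item[url_field]})" for item in items)

    return transform


articles = [
    {"title": "First", "url": "https://a.example"},
    {"title": "Second", "url": "https://b.example"},
]
titles = extract_field_from_list_sketch("title")(articles)
markdown = format_as_markdown_list_sketch()(articles)
```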
- tool(func)[source]
Decorator to mark a function as a tool and extract metadata.
- Parameters:
- Return type:
- Returns:
Decorated function with tool metadata attached
- Example:
    @tool
    def calculate(expression: str) -> float:
        '''Perform mathematical calculations.

        :param expression: Mathematical expression to evaluate
        :returns: Calculated result
        '''
        return eval(expression)  # Use safe_eval in production
- class ToolSchemaGenerator[source]
Bases: object
Generates OpenAI-compatible schemas from decorated functions.
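To make "OpenAI-compatible schema" concrete, the sketch below derives one from a function's signature and docstring using only the standard library. It is not ToolSchemaGenerator's implementation: generate_schema_sketch and the type mapping are hypothetical, and unannotated or unmapped parameter types fall back to "string".

```python
import inspect
from typing import Any, Callable, Dict, List, get_type_hints

# Assumption: a simple mapping from Python types to JSON Schema type names.
_JSON_TYPES = {
    str: "string",
    int: "integer",
    float: "number",
    bool: "boolean",
    list: "array",
    dict: "object",
}


def generate_schema_sketch(func: Callable[..., Any]) -> Dict[str, Any]:
    """Build a function-tool schema from func's signature and docstring."""
    hints = get_type_hints(func)
    properties: Dict[str, Any] = {}
    required: List[str] = []
    for name, param in inspect.signature(func).parameters.items():
        properties[name] = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default value means the caller must supply it
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": inspect.getdoc(func) or "",
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


def search_web(query: str, limit: int = 5) -> dict:
    """Search the web for a query."""
    return {"query": query, "limit": limit}


schema = generate_schema_sketch(search_web)
```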
- search_with_content(query, max_results=5, max_content_results=5, content_timeout=15, html_method='readability', skip_pdf=False)[source]
Search the web and automatically fetch content from the top results.
This tool provides enhanced search functionality that not only returns search results but also automatically fetches and extracts readable content from the top URLs. Perfect for research tasks where you need both search results and their content.
- Parameters:
query (str) – The search query string to search for
max_results (int) – Maximum number of search results to return (1-10, default: 5)
max_content_results (int) – Maximum number of results to fetch content from (1-10, default: 5)
content_timeout (int) – Timeout in seconds for fetching content from each URL (default: 15)
html_method (str) – Method for HTML text extraction - “readability” or “basic” (default: “readability”)
skip_pdf (bool) – Whether to skip PDF files when fetching content (default: False)
- Return type:
- Returns:
Dictionary containing search results with fetched content including query, total_results, search_time, results list with title/url/snippet/content for each, and content_stats with attempted/successful/failed/skipped counts.
- class ToolRegistry[source]
Bases: object
Manages tool schemas and implementations with automatic generation.
- __init__(tool_functions)[source]
Initialize tool registry with list of decorated functions.
- Parameters:
tool_functions (List[Callable[..., Any]]) – List of functions decorated with @tool
- Raises:
ValueError – If any function is not decorated with @tool
- generate_tool_context()[source]
Generate context description for system prompt.
- Return type:
- Returns:
Formatted tool context for inclusion in system prompt
- get_tool_count()[source]
Get the number of registered tools.
- Return type:
- Returns:
Number of tools in registry
- get_tool_function(name)[source]
Get tool function by name.
- Parameters:
name (str) – Name of the tool to retrieve
- Return type:
- Returns:
Tool function implementation
- Raises:
ValueError – If tool is not found
- get_tool_metadata(name)[source]
Get tool metadata by name.
- Parameters:
name (str) – Name of the tool
- Return type:
- Returns:
ToolMetadata instance
- Raises:
ValueError – If tool is not found
- class ToolExecutor[source]
Bases: object
Handles tool execution with error handling and logging.
- Parameters:
tool_registry (ToolRegistry)
max_execution_time (float)
- __init__(tool_registry, max_execution_time=30.0)[source]
Initialize tool executor.
- Parameters:
tool_registry (ToolRegistry) – Registry containing tool implementations
max_execution_time (float) – Maximum time (seconds) to allow for tool execution
- execute_tool(tool_name, arguments)[source]
Execute a tool and return result or error.
- Parameters:
- Returns:
Dictionary with execution result containing:
success (bool): Whether execution was successful
result (Any): Tool result if successful
error (str): Error message if failed
execution_time (float): Time taken in seconds
tool_name (str): Name of executed tool
error_type (str): Type of error if failed
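The result dictionary described above can be illustrated with a standalone stand-in that never raises and always reports an outcome. This is a sketch, not ToolExecutor's code: execute_tool_sketch is a hypothetical name, the tools are held in a plain dict instead of a ToolRegistry, and the max_execution_time limit is deliberately left out.

```python
import time
from typing import Any, Callable, Dict


def execute_tool_sketch(
    tools: Dict[str, Callable[..., Any]],
    tool_name: str,
    arguments: Dict[str, Any],
) -> Dict[str, Any]:
    """Run a tool and report success or failure as a dict instead of raising."""
    start = time.perf_counter()
    try:
        if tool_name not in tools:
            raise ValueError(f"Tool '{tool_name}' not found")
        result = tools[tool_name](**arguments)
        return {
            "success": True,
            "result": result,
            "tool_name": tool_name,
            "execution_time": time.perf_counter() - start,
        }
    except Exception as exc:  # any tool error becomes a failed result
        return {
            "success": False,
            "error": str(exc),
            "error_type": type(exc).__name__,
            "tool_name": tool_name,
            "execution_time": time.perf_counter() - start,
        }


def add(a: int, b: int) -> int:
    return a + b


tools = {"add": add}
ok = execute_tool_sketch(tools, "add", {"a": 2, "b": 3})
bad_args = execute_tool_sketch(tools, "add", {"a": 2})
unknown = execute_tool_sketch(tools, "multiply", {"a": 2, "b": 3})
```

Returning a dict for failures is what lets a single tool error be passed back to the model rather than aborting the whole task.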
- class BatchArticleSummarizeTask[source]
Bases: BaseTask
Reusable task that creates individual summaries for each article using separate LLM calls.
This task processes a list of articles and creates summaries with configurable length. It includes content validation, error handling, and detection of generic LLM responses.
- Configuration Options:
summary_length: Target summary length in characters (default: 1000)
content_limit: Maximum content length to send to LLM (default: 3000)
min_content_length: Minimum content length to process (default: 50)
llm_provider: LLM provider to use (default: “openai”)
llm_model: LLM model to use (default: “gpt-4o-mini”)
temperature: LLM temperature (default: 0.3)
max_tokens: Maximum tokens for LLM response (default: 300)
- __init__(name, config, dependencies=None)[source]
Initialize a new task instance with configuration and dependencies.
Creates a new task with the specified configuration. The task will be initialized with default status and logging setup. Dependencies will be resolved automatically by the PipelineAgent before execution.
- Parameters:
name (str) – Unique identifier for this task within the agent. Must be unique across all tasks in the same pipeline. Used for:
Logging identification
Dependency references (other_task.field)
Result storage in TaskContext
Status tracking and error reporting
config (Dict[str,Any]) – Configuration dictionary containing task-specific parameters. The exact keys depend on the task implementation. Common patterns:
API credentials and endpoints
File paths and processing options
Behavioral flags and timeout values
Data transformation parameters
Will be validated against validation_rules if defined.
dependencies (Optional[List[TaskDependency]]) – List of TaskDependency objects specifying how this task receives data from other tasks. Dependencies are resolved automatically before run() is called, with resolved data added to the config dictionary. Use TaskDependency.create_required() or TaskDependency.create_optional() for convenience.
- Attributes Initialized:
name: Task identifier
config: Configuration dictionary (may be updated by dependencies)
dependencies: List of dependency specifications
validation_rules: Set by subclasses for configuration validation
agent_name: Set automatically by PipelineAgent
logger: Task-specific logger (task.{name})
Status tracking fields for execution monitoring
Example
Basic task creation:
task = MyTask(
    name="process_data",
    config={
        "input_file": "data.csv",
        "output_format": "json",
        "batch_size": 1000
    }
)
Task with dependencies:
task = TransformTask(
    name="process_results",
    config={
        "transform_function": my_transform_func,
        "output_name": "processed_data"
    },
    dependencies=[
        TaskDependency("input_data", "search_task.results", REQUIRED),
        TaskDependency("options", "config_task.settings", OPTIONAL)
    ]
)
Note
Task names should be descriptive and use snake_case
Config validation occurs during run(), not during initialization
Dependencies are resolved by PipelineAgent, not by the task itself
Subclasses should call super().__init__() and set validation_rules
See also
TaskDependency: For creating task dependencies
TaskResult: Return format from run() method
Validation rules documentation for config validation format
- get_dependencies()[source]
Get the list of dependencies for this task.
- Return type:
- Returns:
List of TaskDependency objects for this task.
- run()[source]
Execute the task and return a structured result.
This is the main method that subclasses must implement to perform their specific functionality. The method should follow the TaskResult pattern for error handling and return structured results.
Implementation Guidelines
Validation First: Always validate configuration before work:
def run(self) -> TaskResult:
    start_time = datetime.now()
    validation_failure = self._validate_or_fail(start_time)
    if validation_failure:
        return validation_failure
Error Handling: Use TaskResult pattern, not exceptions:
try:
    result = self.do_work()
    return TaskResult.success(data=result, execution_time=elapsed)
except ExpectedError as e:
    return TaskResult.failure(error_message=str(e), execution_time=elapsed)
Timing: Always include execution timing:
start_time = datetime.now()
# ... do work ...
execution_time = (datetime.now() - start_time).total_seconds()
Metadata: Include useful execution information:
return TaskResult.success(
    data=result,
    execution_time=execution_time,
    metadata={
        "processed_items": len(items),
        "api_calls": call_count,
        "cache_hits": cache_hits
    }
)
Return Patterns
Success: All work completed successfully
Partial: Some work completed, some failed (recoverable)
Failure: Task could not complete due to errors
Skipped: Task was skipped due to conditions not being met
- A returned TaskResult contains:
status: SUCCESS, PARTIAL, ERROR, or SKIPPED
data: Result data (dict) if successful, None if failed
error: Error message if failed, None if successful
execution_time: Time taken to execute in seconds
metadata: Additional information about execution
Example Implementation
def run(self) -> TaskResult:
    start_time = datetime.now()

    # Validate configuration
    validation_failure = self._validate_or_fail(start_time)
    if validation_failure:
        return validation_failure

    try:
        # Extract configuration
        input_file = self.config["input_file"]
        output_format = self.config.get("output_format", "json")

        # Perform work
        data = self.process_file(input_file)
        formatted_data = self.format_output(data, output_format)

        execution_time = (datetime.now() - start_time).total_seconds()
        return TaskResult.success(
            data={"processed_data": formatted_data, "format": output_format},
            execution_time=execution_time,
            metadata={"records_processed": len(data)}
        )
    except FileNotFoundError:
        execution_time = (datetime.now() - start_time).total_seconds()
        return TaskResult.failure(
            error_message=f"Input file not found: {input_file}",
            execution_time=execution_time,
            metadata={"error_type": "FileNotFoundError"}
        )
Note
Never raise exceptions for operational errors
Always calculate and include execution_time
Use descriptive error messages that help users understand issues
Include relevant metadata for debugging and monitoring
- Return type:
- Returns:
TaskResult
See also
TaskResult: For understanding return value structure
_validate_or_fail(): For configuration validation pattern
Task-specific implementations for concrete examples
- class FileSaveTask[source]
Bases:
BaseTask
Task that saves content to a file with timestamp suffix.
This is a generic file saving task that can handle various content types and formats. Originally designed for article saving but generalized for broader reuse.
- __init__(name, config, dependencies=None)[source]
Initialize file save task.
- Parameters:
name (str) – Task name
config (Dict[str,Any]) – Configuration dictionary containing:
content: The content to save (optional, can be resolved from dependencies)
title: Title for the file (optional, default: “Generated Content”)
output_dir: Output directory path (default: “output”)
file_format: Output format, such as “txt”, “md”, or “json” (default: “txt”)
filename_prefix: Custom prefix for filename (optional)
dependencies (Optional[List[TaskDependency]]) – List of task dependencies
- class URLFetchTask[source]
Bases:
BaseTask
Task that fetches main text content from a list of URLs.
- __init__(name, config, dependencies=None)[source]
Initialize URL fetch task.
- Parameters:
name (str) – Task name
config (Dict[str,Any]) – Configuration dictionary containing:
urls: List of URLs to fetch (optional, can be resolved from dependencies)
max_urls: Maximum number of URLs to process (default: 5)
timeout: Request timeout in seconds (default: 30)
dependencies (Optional[List[TaskDependency]]) – List of task dependencies
- get_dependencies()[source]
Get the list of task dependencies.
- Return type:
- Returns:
List of TaskDependency objects
- run()[source]
Fetch main text from the provided URLs.
- Returns:
Dictionary containing:
total_urls: Total number of URLs processed
successful_fetches: Number of successful fetches
failed_fetches: Number of failed fetches
articles: List of article content with HTTP 200 status only
all_articles: List of all fetched article content (including non-200)
status_200_count: Number of articles with HTTP 200 status
non_200_count: Number of articles with non-200 HTTP status
errors: List of error messages for failed fetches
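To make the relationship between these keys concrete, here is a minimal sketch that builds a dictionary of the same shape from already-fetched records. The function summarize_fetches and the per-article status_code field are assumptions for illustration, as is the choice to count a non-200 response as a successful fetch; the task's own bookkeeping may differ.

```python
from typing import Any, Dict, List


def summarize_fetches(fetched: List[Dict[str, Any]], errors: List[str]) -> Dict[str, Any]:
    """Hypothetical helper: split fetched articles by HTTP status and count outcomes."""
    ok_articles = [a for a in fetched if a.get("status_code") == 200]
    return {
        "total_urls": len(fetched) + len(errors),
        "successful_fetches": len(fetched),
        "failed_fetches": len(errors),
        "articles": ok_articles,   # HTTP 200 only
        "all_articles": fetched,   # includes non-200 responses
        "status_200_count": len(ok_articles),
        "non_200_count": len(fetched) - len(ok_articles),
        "errors": errors,
    }


fetched = [
    {"url": "https://example.com/a", "status_code": 200, "text": "Article A"},
    {"url": "https://example.com/b", "status_code": 404, "text": ""},
]
summary = summarize_fetches(fetched, errors=["https://example.com/c: timed out"])
print(summary["status_200_count"], summary["non_200_count"], summary["failed_fetches"])
```

Downstream tasks that only want usable content would read articles, while all_articles remains available for diagnostics.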
- class ExtractAudioFromVideoTask[source]
Bases:
BaseTask
Extract audio from video file using moviepy and compress to MP3 format.
- class AudioTranscriptTask[source]
Bases:
BaseTask
Transcribe audio file using OpenAI Whisper API.
- validate_required_fields(data, required_fields)[source]
Validate that all required fields are present in the data.
- class SearchResult[source]
Bases:
object
Represents a single search result.
- class SearchResponse[source]
Bases:
object
Represents the complete search response from a provider.
- Parameters:
- class BaseSearcher[source]
Bases:
ABC
Abstract base class for search providers.
- abstract property api_key: str | None
Get API key from config or environment variables.
- Returns:
API key string or None if not required/found
- abstractmethod search(query, max_results=10, **kwargs)[source]
Perform a search query.
- Parameters:
- Return type:
- Returns:
SearchResponse object containing results
- Raises:
RuntimeError – If the search fails
ValueError – If query is invalid
- validate_query(query)[source]
Validate search query.
- Parameters:
query (str) – The search query to validate
- Raises:
ValueError – If query is invalid
- Return type:
- class SerperSearcher[source]
Bases:
BaseSearcher
Search provider implementation using Serper API.
- API_BASE_URL = 'https://google.serper.dev'
- DEFAULT_TIMEOUT = 30
- MAX_RESULTS_LIMIT = 100
- SEARCH_ENDPOINT = '/search'
- __init__(config=None)[source]
Initialize Serper searcher.
Required config or environment variables:
api_key: Serper API key (or SERPER_API_KEY environment variable)
Optional config parameters:
timeout: Request timeout in seconds (default: 30)
country: Country code for search results (e.g., ‘us’, ‘uk’)
language: Language code for search results (e.g., ‘en’, ‘es’)
safe_search: Safe search setting (‘active’, ‘moderate’, ‘off’)
search_type: Type of search (‘search’, ‘images’, ‘videos’, ‘places’, ‘news’)
- search(query, max_results=10, **kwargs)[source]
Perform a search using Serper API.
- Parameters:
query (str) – The search query string
max_results (int) – Maximum number of results to return (max 100)
**kwargs (Any) – Additional Serper API parameters:
gl: Country code (e.g., ‘us’, ‘uk’)
hl: Language code (e.g., ‘en’, ‘es’)
safe: Safe search (‘active’, ‘moderate’, ‘off’)
type: Search type (‘search’, ‘images’, ‘videos’, ‘places’, ‘news’)
- Return type:
- Returns:
SearchResponse object containing results
- Raises:
RuntimeError – If the search API call fails
ValueError – If query is invalid
- fetch_main_text(url, config=None, **kwargs)[source]
Fetch and extract main text content from a URL.
This function automatically detects content type and extracts readable text:
HTML pages: Extracts main article content using readability algorithm
PDF files: Extracts text from all pages
DOCX files: Extracts text from paragraphs and tables
Plain text: Returns as-is
- Parameters:
url (str) – URL to fetch and extract text from
config (Optional[Dict[str,Any]]) – Optional configuration for URLFetcher, plus text extraction options:
html_method: “readability” (default) or “basic” for HTML extraction
include_metadata: Whether to include extraction metadata (default: True)
**kwargs (Any) – Additional arguments to pass to fetch_url
- Returns:
Dictionary containing:
url: Final URL after redirects
content_type: Detected content type
text: Extracted text content
extraction_method: Method used for text extraction
original_size: Size of original content in bytes
text_size: Size of extracted text in characters
metadata: Additional extraction metadata (if include_metadata=True)
- Raises:
ValueError – If URL is invalid or content type is not supported
RuntimeError – If text extraction fails
- Example:
>>> result = fetch_main_text("https://example.com/article.html")
>>> print(result["text"])
>>> print(f"Extracted {result['text_size']} characters from {result['content_type']}")
>>> pdf_result = fetch_main_text("https://example.com/document.pdf")
>>> print(pdf_result["text"])
- fetch_url(url, config=None, **kwargs)[source]
Convenience function to fetch a URL with Chrome-like headers.
- Parameters:
- Return type:
- Returns:
Dictionary with fetch results (see URLFetcher.fetch for details)
Example:
result = fetch_url("https://example.com")
print(result["content"])
print(f"Status: {result['status_code']}")
- class URLFetcher[source]
Bases:
object
Utility class for fetching web content with Chrome-like behavior.
- __exit__(exc_type, exc_val, exc_tb)[source]
Context manager exit.
- Parameters:
exc_val (Optional[BaseException])
exc_tb (Optional[TracebackType])
- Return type:
- __init__(config=None)[source]
Initialize URL fetcher with optional configuration.
- Parameters:
config (Optional[Dict[str,Any]]) – Optional configuration dictionary with options:
timeout: Request timeout in seconds (default: 30)
user_agent: Custom user agent string
headers: Additional headers to include
follow_redirects: Whether to follow redirects (default: True)
max_redirects: Maximum number of redirects (default: 10)
- fetch(url, **kwargs)[source]
Fetch content from a URL.
- Parameters:
- Returns:
Dictionary containing:
url: Final URL after redirects
status_code: HTTP status code
headers: Response headers
content: Response content as text
content_type: Content type from headers
encoding: Character encoding
size: Content size in bytes
response_time: Time taken for request in seconds
- Raises:
ValueError – If URL is invalid
RuntimeError – If request fails
- fetch_headers_only(url, **kwargs)[source]
Fetch only headers from a URL using HEAD request.
- Parameters:
- Returns:
Dictionary containing:
url: Final URL after redirects
status_code: HTTP status code
headers: Response headers
content_type: Content type from headers
content_length: Content length if available
response_time: Time taken for request in seconds
- print_header(title, width=80, char='=')[source]
Print a formatted header with title centered between separator characters.
- print_message_box(title, content_lines, width=80, char='=', newline_before=False)[source]
Print a message box with title and content lines, with intelligent text processing.
- Parameters:
title (str) – The message box title (will be trimmed if too long)
content_lines (list[str]) – List of content lines (will be wrapped if too long)
width (int) – Total width for the message box (default: 80)
char (str) – Character to use for separators (default: “=”)
newline_before (bool) – Add newline before the message box (default: False)
- Return type:
Declarative Syntax (Recommended)
The declarative syntax provides a clean, Pythonic way to define AI agent pipelines using @task decorators and helper functions.
PipelineAgent (Declarative Syntax)
Declarative pipeline agent with automatic task discovery.
This module provides PipelineAgent, a subclass of BasePipelineAgent that automatically discovers @task decorated methods and builds the task list.
Example:
from aipype import PipelineAgent, task, Depends
from typing import Annotated
class MyAgent(PipelineAgent):
    @task
    def fetch_data(self) -> dict:
        return {"data": "value"}

    @task
    def process(self, fetch_data: dict) -> str:
        # fetch_data parameter auto-wired from fetch_data task
        return f"Processed: {fetch_data}"

    @task
    def with_explicit_path(
        self,
        content: Annotated[str, Depends("process.output")]
    ) -> str:
        return f"Final: {content}"

agent = MyAgent("my_agent", {"config_key": "value"})
result = agent.run()
- class PipelineAgent[source]
Bases:
BasePipelineAgent
Declarative agent that discovers tasks from @task decorated methods.
PipelineAgent provides a cleaner, more Pythonic way to define AI pipelines. Instead of manually creating task objects and wiring dependencies, you simply define methods decorated with @task and let the framework infer the execution order from parameter names.
- Key Features:
Automatic task discovery from decorated methods
Dependency inference from function signatures
Support for explicit paths via Annotated[T, Depends(“path”)]
Full compatibility with existing execution engine
Parallel execution of independent tasks
- How It Works:
On initialization, discovers all @task decorated methods
Analyzes method signatures to infer dependencies
Topologically sorts methods by dependency order
Creates TaskWrapper instances for each method
Returns task list to parent BasePipelineAgent for execution
- The parent BasePipelineAgent handles all execution concerns:
Building execution phases
Resolving dependencies
Parallel execution
Error handling
Result collection
Example:
class ArticleAgent(PipelineAgent):
    @task
    def search(self) -> dict:
        # Access config directly for initial inputs
        return {"results": f"Results for {self.config['topic']}"}

    @task
    def summarize(self, search: dict) -> str:
        # 'search' parameter auto-wired from search task
        return f"Summary of: {search}"

    @task
    def write(self, summarize: str) -> str:
        return f"Article based on: {summarize}"

agent = ArticleAgent("writer", {"topic": "AI trends"})
result = agent.run()
- name
Agent identifier
- config
Configuration dictionary
- tasks
List of TaskWrapper instances (populated by setup_tasks())
- context
TaskContext for inter-task communication
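The discovery and ordering steps listed under How It Works can be sketched without the framework. This is a simplified stand-in, not aipype's code: the _is_task marker attribute and the discover_and_order function are assumptions for illustration, and the standard-library graphlib module does the topological sort here.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List, Set


def task(func: Callable[..., Any]) -> Callable[..., Any]:
    """Stand-in for the @task decorator: tag the function so it can be discovered."""
    func._is_task = True  # hypothetical marker attribute
    return func


def discover_and_order(agent_cls: type) -> List[str]:
    """Return task names in an order that respects parameter-name dependencies."""
    tasks = {
        name: fn
        for name, fn in inspect.getmembers(agent_cls, inspect.isfunction)
        if getattr(fn, "_is_task", False)
    }
    graph: Dict[str, Set[str]] = {}
    for name, fn in tasks.items():
        params = inspect.signature(fn).parameters
        # A parameter whose name matches another task is treated as a dependency.
        graph[name] = {p for p in params if p in tasks and p != name}
    return list(TopologicalSorter(graph).static_order())


class ArticleAgent:
    @task
    def write(self, summarize: str) -> str:
        return f"Article based on: {summarize}"

    @task
    def search(self) -> dict:
        return {"results": "..."}

    @task
    def summarize(self, search: dict) -> str:
        return f"Summary of: {search}"


order = discover_and_order(ArticleAgent)
print(order)
```

The methods are declared out of order on purpose; the ordering comes from the signatures, not from the position in the class body.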
Decorators
Decorators for declarative task definition.
This module provides the @task decorator and Depends class for defining pipeline tasks with automatic dependency inference.
Example:
from aipype import PipelineAgent, task, Depends
from typing import Annotated
class MyAgent(PipelineAgent):
    @task
    def fetch_data(self) -> dict:
        return search("AI news", max_results=5)

    @task
    def process(self, fetch_data: dict) -> str:
        # fetch_data parameter auto-wired from fetch_data task
        return llm(f"Summarize: {fetch_data}", model="gpt-4o")

    @task
    def with_explicit_path(
        self,
        content: Annotated[str, Depends("process.content")]
    ) -> str:
        return llm(f"Expand: {content}", model="gpt-4o")
- task(func)[source]
Mark a method as a pipeline task with automatic dependency inference.
The @task decorator marks methods in a PipelineAgent as pipeline tasks. Dependencies are automatically inferred from the method’s parameter names - if a parameter name matches another task’s name, it becomes a dependency.
- Parameters:
func (TypeVar(F, bound=Callable[...,Any])) – The method to mark as a task
- Return type:
- Returns:
The decorated method with task metadata attached
Example:
class MyAgent(PipelineAgent):
    @task
    def fetch_data(self) -> dict:
        return search("AI news", max_results=5)

    @task
    def process(self, fetch_data: dict) -> str:
        # fetch_data parameter auto-wired from fetch_data task
        return llm(f"Summarize: {fetch_data}", model="gpt-4o")
- class Depends[source]
Bases:
object
Specify an explicit dependency path for a task parameter.
Use with typing.Annotated to specify the exact path to resolve from the task context, rather than using the default inference.
- path
Dot-notation path to the dependency value (e.g., “task_name.field”)
- Example:
from typing import Annotated

@task
def write_article(
    self,
    # Explicitly get .content field instead of full task output
    outline: Annotated[str, Depends("generate_outline.content")],
    summary: Annotated[str, Depends("summarize.content")]
) -> str:
    return llm(f"Write from: {outline}", model="gpt-4o")
- Parameters:
path (
str)
- __init__(path)[source]
Initialize with a dependency path.
- Parameters:
path (str) – Dot-notation path like “task_name.field.nested”
- Raises:
ValueError – If path is empty or doesn’t contain a dot
- path
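The path rule documented for Depends (non-empty, at least one dot) and the idea of resolving a dot-notation path can be illustrated as follows. DependsSketch and resolve are hypothetical stand-ins, and the nested dict standing in for the task context is an assumption; the real TaskContext lookup is not shown in this reference.

```python
from typing import Any, Dict


class DependsSketch:
    """Stand-in for Depends: holds a dot-notation path such as 'task_name.field'."""

    def __init__(self, path: str) -> None:
        # Mirrors the documented rule: reject empty paths and paths without a dot.
        if not path or "." not in path:
            raise ValueError(f"Path must look like 'task_name.field', got {path!r}")
        self.path = path


def resolve(context: Dict[str, Any], dep: DependsSketch) -> Any:
    """Walk a nested dict one path segment at a time (hypothetical context layout)."""
    value: Any = context
    for segment in dep.path.split("."):
        value = value[segment]
    return value


context = {"summarize": {"content": "short text"}}
content = resolve(context, DependsSketch("summarize.content"))
print(content)

try:
    DependsSketch("summarize")  # no dot, so construction fails
except ValueError as exc:
    rejected = str(exc)
    print(rejected)
```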
Helper Functions
Helper functions for creating tasks with clean APIs.
This module provides intuitive helper functions for creating LLM and search tasks within @task decorated methods. These helpers simplify the declarative syntax by providing sensible defaults and a cleaner interface.
Example:
from aipype import PipelineAgent, task, llm, search
class MyAgent(PipelineAgent):
    @task
    def find_articles(self) -> dict:
        return search(self.config["topic"], max_results=10)

    @task
    def summarize(self, find_articles: dict) -> str:
        return llm(
            prompt=f"Summarize: {find_articles}",
            model="gpt-4o",
            temperature=0.3
        )
- llm(prompt, *, model='gpt-4o', provider='openai', system=None, temperature=0.7, max_tokens=1000, tools=None, response_format=None, timeout=60.0, **kwargs)[source]
Create an LLMTask with a clean, intuitive API.
This helper simplifies LLMTask creation by providing sensible defaults and a more intuitive parameter interface. Designed to be used within @task decorated methods - returns LLMTask for delegation by TaskWrapper.
- Parameters:
prompt (str) – The prompt to send to the LLM
model (str) – Model identifier (default: “gpt-4o”)
provider (str) – LLM provider (default: “openai”)
temperature (float) – Randomness control 0.0-2.0 (default: 0.7)
max_tokens (int) – Maximum response tokens (default: 1000)
tools (Optional[List[Union[Callable[...,Any],Dict[str,Any]]]]) – List of @tool functions and/or MCP server configs
response_format (Union[Type[BaseModel],Dict[str,Any],None]) – Pydantic model or JSON schema for structured output
timeout (float) – Request timeout in seconds (default: 60)
**kwargs (Any) – Additional LLMTask configuration
- Return type:
- Returns:
Configured LLMTask ready for delegation
Example:
@task
def summarize(self, articles: list) -> str:
    return llm(
        prompt=f"Summarize these articles: {articles}",
        model="gpt-4o",
        temperature=0.3,
        max_tokens=500
    )
Example with structured output and tools:
@task
def analyze(self, data: dict) -> AnalysisResult:
    return llm(
        prompt=f"Analyze: {data}",
        model="gpt-4o",
        response_format=AnalysisResult,
        tools=[calculator, mcp_server("brave", "https://mcp.brave.com")]
    )
- search(query, *, max_results=5, **kwargs)[source]
Create a SearchTask with a clean API.
- Parameters:
- Return type:
- Returns:
Configured SearchTask ready for delegation
Example:
@task
def find_articles(self) -> dict:
    topic = self.config.get("topic", "AI")
    return search(f"{topic} latest news", max_results=10)
- mcp_server(label, url, *, allowed_tools=None, require_approval='never')[source]
Create MCP server configuration for use with llm().
Model Context Protocol (MCP) servers provide external tools that can be called by LLMs. This helper creates the configuration dict needed by LLMTask.
- Parameters:
- Return type:
- Returns:
MCP configuration dict for use in tools parameter
Example:
@task
def research(self, topic: str) -> str:
    return llm(
        prompt=f"Research {topic} thoroughly",
        model="claude-sonnet-4",
        provider="anthropic",
        tools=[
            mcp_server("brave", "https://mcp.brave.com"),
            mcp_server("github", "https://mcp.github.com",
                       allowed_tools=["search_repos", "get_file"])
        ]
    )
- transform(input_data, fn, *, output_name='result')[source]
Apply a transformation function and return result as dict.
Unlike returning a TransformTask, this helper executes the transformation immediately and returns the result. Use this for simple data transformations within @task methods.
- Parameters:
- Return type:
- Returns:
Dict with transformed data under output_name and ‘data’ keys
Example:
@task
def extract_urls(self, search_results: dict) -> dict:
    results = search_results.get("results", [])
    return transform(
        results,
        fn=lambda r: [item["url"] for item in r]
    )
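The documented return shape of transform (the transformed value under both output_name and 'data') can be shown with a small stand-in. transform_sketch is a hypothetical function written for this illustration; it follows the description above and is not the library's implementation.

```python
from typing import Any, Callable, Dict


def transform_sketch(
    input_data: Any,
    fn: Callable[[Any], Any],
    *,
    output_name: str = "result",
) -> Dict[str, Any]:
    """Apply fn immediately and expose the result under output_name and 'data'."""
    transformed = fn(input_data)
    return {output_name: transformed, "data": transformed}


search_results = {
    "results": [
        {"url": "https://example.com/a", "title": "A"},
        {"url": "https://example.com/b", "title": "B"},
    ]
}
urls = transform_sketch(
    search_results["results"],
    fn=lambda items: [item["url"] for item in items],
    output_name="urls",
)
print(urls["urls"])
```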
Task Wrapper
Task wrapper for decorated @task methods.
This module provides TaskWrapper, a BaseTask subclass that wraps decorated @task methods and enables them to work with the existing pipeline execution engine.
Example:
# TaskWrapper is created automatically by PipelineAgent
# You typically don't create it directly
wrapper = TaskWrapper(
    name="process_data",
    method=agent.process_data,  # Bound method decorated with @task
    agent=agent,
    dependencies=[TaskDependency("fetch_data", "fetch_data.data", REQUIRED)]
)
result = wrapper.run()  # Executes the method and processes return value
- class TaskWrapper[source]
Bases:
BaseTask
Wraps a decorated @task method as a BaseTask.
TaskWrapper bridges the gap between the new decorator-based syntax and the existing execution engine. It:
Stores a reference to the decorated method
Builds kwargs from resolved dependencies
Calls the method and processes return values
Handles delegation to returned BaseTask instances
This allows the existing TaskExecutionPlan and DependencyResolver to work without modification.
- method
The bound method to call
- agent
Reference to the parent agent
- context_instance
TaskContext for dependency resolution
Example:
class MyAgent(PipelineAgent):
    @task
    def fetch_data(self) -> dict:
        return {"data": "value"}

    @task
    def process(self, fetch_data: dict) -> str:
        return llm(f"Process: {fetch_data}")

# TaskWrapper is created automatically for each @task method
# and handles the execution and return value processing
- set_context(context)[source]
Set the task context for dependency resolution.
- Parameters:
context (TaskContext) – TaskContext instance
- Return type:
Dependency Inference
Automatic dependency inference from function signatures.
This module provides utilities for inferring task dependencies from method signatures. It analyzes parameter names and type hints to automatically create TaskDependency objects.
- Example:
@task
def process(
    self,
    fetch_data: dict,  # Inferred: depends on "fetch_data" task
    config: Annotated[str, Depends("settings.api_key")]  # Explicit path
) -> str:
    ...

deps = infer_dependencies_from_signature(process, {"fetch_data"})
# Returns: [TaskDependency("fetch_data", "fetch_data.data", REQUIRED),
#           TaskDependency("config", "settings.api_key", REQUIRED)]
- infer_dependencies_from_signature(method, known_task_names)[source]
Infer TaskDependency objects from a method’s signature.
This function analyzes a method’s parameters and type hints to automatically create TaskDependency objects. Dependencies are inferred when:
A parameter name matches a known task name
A parameter has Annotated[T, Depends(“path”)] type hint
- Parameters:
- Return type:
- Returns:
List of TaskDependency objects
- Example:
@task
def process(
    self,
    fetch_data: dict,  # Inferred: depends on "fetch_data" task
    config: Annotated[str, Depends("settings.api_key")]  # Explicit path
) -> str:
    ...

deps = infer_dependencies_from_signature(process, {"fetch_data"})
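The two inference rules can be sketched with the standard typing and inspect modules. This is an illustrative stand-in: the local Depends class, the infer_dependencies function, and the (parameter, path) tuples it returns are assumptions; the library returns TaskDependency objects, which are not reproduced here. The "name.data" default path follows the example in the module description.

```python
import inspect
from typing import Annotated, Any, Callable, List, Set, Tuple, get_args, get_origin, get_type_hints


class Depends:
    """Minimal stand-in holding an explicit dependency path."""

    def __init__(self, path: str) -> None:
        self.path = path


def infer_dependencies(method: Callable[..., Any], known_task_names: Set[str]) -> List[Tuple[str, str]]:
    """Return (parameter, source_path) pairs inferred from a method signature."""
    hints = get_type_hints(method, include_extras=True)
    deps: List[Tuple[str, str]] = []
    for name in inspect.signature(method).parameters:
        if name == "self":
            continue
        hint = hints.get(name)
        explicit = None
        if get_origin(hint) is Annotated:
            # Look for a Depends marker among the Annotated metadata.
            explicit = next((m for m in get_args(hint)[1:] if isinstance(m, Depends)), None)
        if explicit is not None:
            deps.append((name, explicit.path))       # rule 2: explicit path
        elif name in known_task_names:
            deps.append((name, f"{name}.data"))      # rule 1: name matches a task
    return deps


def process(self, fetch_data: dict, config: Annotated[str, Depends("settings.api_key")]) -> str:
    return f"{fetch_data} {config}"


deps = infer_dependencies(process, {"fetch_data"})
print(deps)
```

Parameters that match neither rule are simply ignored in this sketch; how the library treats them is not stated here.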
BasePipelineAgent (Legacy Syntax)
Pipeline-based AI agent framework with automatic task orchestration.
This module provides BasePipelineAgent, the core component for building AI workflows with automatic dependency resolution and parallel execution. Tasks are organized into execution phases based on their dependencies, enabling optimal performance and data flow between tasks.
Key Concepts:
Dependency Resolution: Tasks automatically receive data from dependent tasks
Execution Phases: Tasks are grouped for optimal parallel/sequential execution
Task Context: Shared data store for inter-task communication
Error Handling: TaskResult pattern for graceful error propagation
Quick Example
class ArticleWriterAgent(BasePipelineAgent):
    def setup_tasks(self):
        return [
            SearchTask("search", {"query": "AI trends", "max_results": 5}),
            LLMTask("summarize", {
                "prompt": "Summarize: ${content}",
                "llm_provider": "openai",
                "llm_model": "gpt-4"
            }, dependencies=[
                TaskDependency("content", "search.results", REQUIRED)
            ])
        ]

agent = ArticleWriterAgent("writer", {})
result = agent.run()
print(f"Status: {result.status}, Tasks: {result.completed_tasks}/{result.total_tasks}")
Execution Flow
Setup: Tasks defined with dependencies in setup_tasks()
Planning: Dependency graph analyzed, phases created
Resolution: Task configs updated with dependency data
Execution: Phases run sequentially, tasks within phases run in parallel
Results: AgentRunResult returned with execution status and metrics
See also
BaseTask: Base class for implementing custom tasks
TaskDependency: Declarative dependency specification
TaskContext: Shared data store for task communication
AgentRunResult: Standardized execution result format
- class TaskExecutionPlan[source]
Bases:
object
Execution plan that organizes tasks into phases based on dependencies.
This class analyzes task dependencies and creates an optimal execution plan where:
Tasks with no dependencies run in the first phase
Tasks whose dependencies are satisfied run in subsequent phases
Tasks within the same phase can run in parallel
Phases execute sequentially to maintain dependency ordering
The plan automatically detects and rejects circular dependencies, ensuring all workflows can execute successfully.
- tasks
Original list of tasks to organize
- phases
List of task lists, each representing an execution phase
- Example:
Given tasks: A, B (depends on A), C (depends on A), D (depends on B, C)
Execution plan:
* Phase 1: [A] (no dependencies)
* Phase 2: [B, C] (depend only on A, can run in parallel)
* Phase 3: [D] (depends on B and C)
- Parameters:
- __init__(tasks)[source]
Create execution plan from list of tasks.
- Parameters:
tasks (List[BaseTask]) – List of tasks to organize into execution phases
- Raises:
ValueError – If circular dependencies detected or dependencies cannot be satisfied
- total_phases()[source]
Get total number of execution phases.
- Return type:
- Returns:
Number of phases
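The phase grouping described for TaskExecutionPlan can be sketched as a layered topological sort. The build_phases function and its plain dict input are assumptions for illustration; the class itself works on BaseTask objects and its internals are not shown in this reference.

```python
from typing import Dict, List, Set


def build_phases(dependencies: Dict[str, Set[str]]) -> List[List[str]]:
    """Group task names into phases; each phase depends only on earlier phases."""
    remaining = {name: set(deps) for name, deps in dependencies.items()}
    for name, deps in remaining.items():
        unknown = deps - set(remaining)
        if unknown:
            raise ValueError(f"Task {name!r} depends on unknown tasks: {sorted(unknown)}")

    completed: Set[str] = set()
    phases: List[List[str]] = []
    while remaining:
        # A task is ready once every dependency has been placed in an earlier phase.
        ready = sorted(name for name, deps in remaining.items() if deps <= completed)
        if not ready:
            raise ValueError(f"Circular dependency among: {sorted(remaining)}")
        phases.append(ready)
        completed.update(ready)
        for name in ready:
            del remaining[name]
    return phases


phases = build_phases({"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}})
print(phases)  # [['A'], ['B', 'C'], ['D']]
```

If no task becomes ready in a pass, the remaining tasks must form a cycle, which is why the sketch raises ValueError at that point.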
- class BasePipelineAgent[source]
Bases:
object
Base agent class for building AI workflows with automatic task orchestration.
BasePipelineAgent provides the foundation for creating AI automation workflows. It automatically handles task dependency resolution, parallel execution, error handling, and result collection. Users inherit from this class and implement setup_tasks() to define their workflow.
- Key Features
Automatic Dependency Resolution: Tasks receive data from previous tasks
Parallel Execution: Independent tasks run simultaneously for performance
Error Handling: Graceful handling of task failures with detailed reporting
Context Management: Shared data store for inter-task communication
Result Tracking: Comprehensive execution metrics and status reporting
- Configuration Options:
enable_parallel (bool): Enable parallel task execution (default: True)
max_parallel_tasks (int): Maximum concurrent tasks (default: 5)
stop_on_failure (bool): Stop pipeline on first failure (default: True)
- Lifecycle:
Initialization: Agent created with name and config
Task Setup: setup_tasks() called to define workflow
Execution: run() method orchestrates task execution
Results: AgentRunResult returned with status and metrics
Example
Basic agent implementation:
class DataProcessorAgent(BasePipelineAgent):
    def setup_tasks(self):
        return [
            SearchTask("gather_data", {
                "query": self.config.get("search_query", ""),
                "max_results": 10
            }),
            TransformTask("process_data", {
                "transform_function": process_search_results,
                "input_field": "results"
            }, dependencies=[
                TaskDependency("results", "gather_data.results", REQUIRED)
            ]),
            FileSaveTask("save_results", {
                "file_path": "/tmp/results.json"
            }, dependencies=[
                TaskDependency("data", "process_data.result", REQUIRED)
            ])
        ]

# Usage
agent = DataProcessorAgent("processor", {
    "search_query": "machine learning trends",
    "enable_parallel": True
})
result = agent.run()
if result.is_success():
    print(f"Processed {result.completed_tasks} tasks successfully")
    # Access individual task results
    search_result = agent.context.get_result("gather_data")
    processed_data = agent.context.get_result("process_data")
else:
    print(f"Pipeline failed: {result.error_message}")
Advanced configuration:
agent = DataProcessorAgent("processor", {
    "search_query": "AI research",
    "enable_parallel": False,   # Sequential execution
    "stop_on_failure": False,   # Continue despite failures
    "max_parallel_tasks": 10    # Higher concurrency
})
Return Values
run() returns AgentRunResult with:
status: SUCCESS, PARTIAL, ERROR, or RUNNING
completed_tasks/failed_tasks: Task completion counts
total_phases: Number of execution phases
execution_time: Total runtime in seconds
metadata: Additional execution information
Error Handling
Individual task failures are captured and reported
Pipeline can continue or stop based on stop_on_failure setting
Detailed error information available in execution context
Partial success supported for workflows with optional components
Thread Safety
TaskContext is thread-safe for concurrent task execution
Agent instance should not be shared between threads
Each agent execution creates isolated context
Performance
Tasks in same phase execute in parallel (when enabled)
Dependency resolution minimizes execution phases
ThreadPoolExecutor manages concurrent task execution
Configurable concurrency limits prevent resource exhaustion
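The execution model described above (phases in sequence, tasks within a phase on a bounded thread pool) can be sketched with concurrent.futures. The run_phases function, the snapshot dict passed to each task, and the lambda tasks are assumptions for illustration; the agent's real execution loop, error handling, and TaskContext are not reproduced.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List

TaskFn = Callable[[Dict[str, Any]], Any]


def run_phases(
    phases: List[List[str]],
    task_fns: Dict[str, TaskFn],
    max_parallel_tasks: int = 5,
) -> Dict[str, Any]:
    """Run phases one after another; tasks inside a phase share a thread pool."""
    results: Dict[str, Any] = {}
    for phase in phases:
        # Tasks in a phase only see results from earlier phases.
        snapshot = dict(results)
        with ThreadPoolExecutor(max_workers=max_parallel_tasks) as pool:
            futures = {name: pool.submit(task_fns[name], snapshot) for name in phase}
            for name, future in futures.items():
                results[name] = future.result()
    return results


task_fns: Dict[str, TaskFn] = {
    "A": lambda ctx: 1,
    "B": lambda ctx: ctx["A"] + 1,
    "C": lambda ctx: ctx["A"] * 10,
    "D": lambda ctx: ctx["B"] + ctx["C"],
}
results = run_phases([["A"], ["B", "C"], ["D"]], task_fns, max_parallel_tasks=2)
print(results)
```

Leaving the with block waits for every task in the phase, which is what keeps the next phase from starting early.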
- __init__(name, config=None)[source]
Initialize pipeline agent with name and configuration.
Creates a new pipeline agent instance with automatic task setup. The agent will call setup_tasks() during initialization to configure the workflow. TaskContext and DependencyResolver are created automatically.
Attributes Created:
name: Agent identifier
config: Configuration dictionary
tasks: List of tasks (populated by setup_tasks())
context: TaskContext for inter-task communication
dependency_resolver: Handles dependency resolution
execution_plan: Created during run() execution
- Parameters:
name (str) – Unique identifier for this agent instance. Used in logging and result tracking. Should be descriptive of the agent’s purpose.
config (Optional[Dict[str, Any]]) – Configuration dictionary with agent settings:
enable_parallel (bool): Enable parallel task execution (default: True)
max_parallel_tasks (int): Maximum concurrent tasks (default: 5)
stop_on_failure (bool): Stop pipeline on first failure (default: True)
Additional config keys are passed to tasks via self.config.
Example:
agent = MyAgent("data_processor", {
    "enable_parallel": True,
    "max_parallel_tasks": 3,
    "input_file": "data.csv"
})
print(f"Agent {agent.name} has {len(agent.tasks)} tasks")
Note
The agent automatically calls setup_tasks() during initialization. If setup_tasks() raises an exception, the agent will log a warning but continue with an empty task list.
See also
BaseTask: Base class for implementing custom tasks
TaskDependency: Declarative dependency specification
TaskContext: Shared data store for inter-task communication
AgentRunResult: Standardized execution result format
- setup_tasks()[source]
Define the workflow tasks for this agent. Must be implemented by subclasses.
This is the main method where users define their AI workflow by creating and configuring tasks with their dependencies. Tasks are automatically organized into execution phases based on their dependency relationships.
- Task Naming:
Use descriptive names that indicate the task’s purpose
Names must be unique within the agent
Use snake_case for consistency
Good: “fetch_articles”, “analyze_sentiment”, “save_results”
Avoid: “task1”, “t”, “process”
- Dependency Design:
Required dependencies must be satisfied for task to run
Optional dependencies use default values if source unavailable
Circular dependencies will cause pipeline execution to fail
Use transform_func in dependencies for data preprocessing
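The required/optional distinction above can be illustrated with a small standalone resolver. This is a sketch under assumptions, not the DependencyResolver itself: `resolve_dependency`, the nested-dict result store, and the choice to return the default without applying the transform are all hypothetical.

```python
from typing import Any, Callable, Dict, Optional

_MISSING = object()  # sentinel so that a stored None still counts as a value


def resolve_dependency(
    results: Dict[str, Dict[str, Any]],
    source_path: str,
    required: bool,
    default_value: Any = None,
    transform_func: Optional[Callable[[Any], Any]] = None,
) -> Any:
    """Look up 'task.field' in results, honouring required/optional semantics."""
    task_name, _, field = source_path.partition(".")
    value = results.get(task_name, {}).get(field, _MISSING)
    if value is _MISSING:
        if required:
            raise KeyError(f"Required dependency not available: {source_path}")
        # Assumption: the default is returned as-is, without transformation.
        return default_value
    return transform_func(value) if transform_func else value


results = {"search": {"results": ["a", "b"]}}
print(resolve_dependency(results, "search.results", required=True, transform_func=len))
print(resolve_dependency(results, "config.domain", required=False, default_value="general"))
```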
Note
This method is called automatically during agent initialization. Tasks should not be added to self.tasks directly - return them from this method instead.
See also
TaskDependency: For creating task dependencies
BaseTask subclasses: LLMTask, SearchTask, ConditionalTask, etc.
Task-specific documentation for configuration options
- Returns:
List of configured BaseTask instances that define the workflow.
- Return type:
List[BaseTask]
- Example:
Basic search and summarize workflow –
def setup_tasks(self):
    return [
        SearchTask("search", {
            "query": self.config.get("topic", ""),
            "max_results": 5
        }),
        LLMTask("summarize", {
            "prompt": "Summarize these articles: ${articles}",
            "llm_provider": "openai",
            "llm_model": "gpt-4"
        }, dependencies=[
            TaskDependency("articles", "search.results", REQUIRED)
        ])
    ]
Complex workflow with conditional logic –
def setup_tasks(self):
    return [
        SearchTask("search", {"query": "AI news"}),
        TransformTask("filter", {
            "transform_function": filter_recent_articles,
            "input_field": "results"
        }, dependencies=[
            TaskDependency("results", "search.results", REQUIRED)
        ]),
        ConditionalTask("quality_check", {
            "condition_function": lambda articles: len(articles) >= 3,
            "condition_inputs": ["filtered_articles"],
            "action_function": log_action("Quality check passed"),
            "else_function": log_action("Insufficient articles")
        }, dependencies=[
            TaskDependency("filtered_articles", "filter.result", REQUIRED)
        ]),
        LLMTask("summarize", {
            "prompt": "Create summary from: ${content}",
            "llm_provider": "openai"
        }, dependencies=[
            TaskDependency("content", "filter.result", REQUIRED)
        ])
    ]
- Raises:
NotImplementedError – Must be implemented by subclasses
- create_context()[source]
Create and return the task context for this agent.
- Return type:
- Returns:
TaskContext instance
- run()[source]
Execute the complete workflow with automatic task orchestration.
This is the main execution method that runs the entire pipeline:
Builds execution plan from task dependencies
Resolves dependencies and updates task configurations
Executes tasks in phases (parallel within phases, sequential between phases)
Collects results and handles errors
Returns comprehensive execution status
The method handles all aspects of task execution including dependency resolution, parallel execution, error propagation, and result collection. Individual task failures are captured and the pipeline can continue based on the stop_on_failure configuration.
The method returns AgentRunResult containing:
status: SUCCESS (all tasks completed), PARTIAL (some failed), ERROR (all failed), or RUNNING (already executing)
completed_tasks: Number of successfully completed tasks
failed_tasks: Number of tasks that failed
total_phases: Number of execution phases used
execution_time: Total runtime in seconds
metadata: Additional execution information
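As an illustration of how the three terminal statuses relate to the task counts, here is a hedged sketch. `derive_status` is a hypothetical helper, not part of aipype; it ignores skipped tasks, the RUNNING state, and the effect of stop_on_failure, none of which are specified in enough detail here to model.

```python
def derive_status(completed_tasks: int, failed_tasks: int) -> str:
    """Map task counts to 'success', 'partial', or 'error'."""
    if failed_tasks == 0:
        return "success"   # all tasks completed
    if completed_tasks == 0:
        return "error"     # all tasks failed
    return "partial"       # some completed, some failed


for counts in [(3, 0), (2, 1), (0, 3)]:
    print(counts, derive_status(*counts))
```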
Exceptions
No exceptions are raised directly. All operational errors are captured in the returned AgentRunResult. Only programming errors (like missing setup_tasks implementation) will raise exceptions.
Example
Basic execution and result checking:
agent = MyAgent("processor", config)
result = agent.run()
if result.is_success():
    print(f"All {result.completed_tasks} tasks completed successfully")
    # Access individual task results
    search_data = agent.context.get_result("search_task")
    processed_data = agent.context.get_result("process_task")
elif result.is_partial():
    print(f"Partial success: {result.completed_tasks}/{result.total_tasks}")
    print(f"Failed tasks: {result.failed_tasks}")
else:  # result.is_error()
    print(f"Pipeline failed: {result.error_message}")
Performance monitoring:
result = agent.run()
print(f"Execution time: {result.execution_time:.2f}s")
print(f"Phases used: {result.total_phases}")
# Check if parallel execution was effective
if result.total_phases < len(agent.tasks):
    print("Parallel execution optimized the pipeline")
Execution Phases
Tasks are automatically organized into phases based on dependencies:
Phase 1: Tasks with no dependencies
Phase 2: Tasks depending only on Phase 1 tasks
Phase N: Tasks depending on previous phases only
Within each phase, tasks can execute in parallel (if enabled). Between phases, execution is sequential to maintain dependency order.
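The phase grouping described above amounts to a layered topological sort. The sketch below shows the idea on plain task names; `build_phases` and its `{task: set_of_dependencies}` input are hypothetical and are not the TaskExecutionPlan API.

```python
from typing import Dict, List, Set


def build_phases(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group task names into phases; each phase depends only on earlier ones."""
    remaining = {name: set(d) for name, d in deps.items()}
    done: Set[str] = set()
    phases: List[List[str]] = []
    while remaining:
        # A task is ready once all of its dependencies are already scheduled.
        ready = sorted(n for n, d in remaining.items() if d <= done)
        if not ready:
            raise ValueError(
                f"Circular or unknown dependency among: {sorted(remaining)}"
            )
        phases.append(ready)
        done.update(ready)
        for name in ready:
            del remaining[name]
    return phases


phases = build_phases({
    "search": set(),
    "filter": {"search"},
    "quality_check": {"filter"},
    "summarize": {"filter"},
})
print(phases)
```

Here `quality_check` and `summarize` land in the same phase because both depend only on `filter`, so they could run in parallel. A cycle leaves no task ready and is reported as an error.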
Error Handling
Individual task errors are captured in TaskResult objects
Pipeline can continue or stop based on stop_on_failure config
Partial success is supported when some tasks succeed
All errors are logged with detailed context information
Failed task dependencies propagate to dependent tasks
Thread Safety
This method is not thread-safe. Each agent instance should only execute one pipeline at a time. Concurrent task execution within the pipeline is handled internally with ThreadPoolExecutor.
Performance
Parallel execution significantly improves performance for independent tasks
Dependency resolution optimizes execution order
Thread pool size controlled by max_parallel_tasks config
Memory usage scales with number of concurrent tasks
Note
If the agent is already running, this method returns immediately with a RUNNING status to prevent concurrent execution conflicts.
See also
AgentRunResult: For detailed result format and status checking
TaskContext: Access via agent.context for individual task results
TaskExecutionPlan: For understanding execution phase organization
- Return type:
- Returns:
AgentRunResult
- get_context()[source]
Get the task context for this agent.
- Return type:
- Returns:
TaskContext instance
- get_execution_plan()[source]
Get the current execution plan.
- Return type:
- Returns:
TaskExecutionPlan instance or None if not yet created
Task Types
LLM Task
Advanced LLM task with context-aware prompts, tool calling, and template substitution.
This module provides LLMTask, a sophisticated task for integrating Large Language Models into AI pipelines. It supports automatic prompt template substitution using dependency data, tool/function calling, multiple LLM providers, and comprehensive response handling.
- Key Features
Template Substitution: Dynamic prompt generation using ${variable} syntax
Tool Calling: Function calling with automatic tool registration and execution
Multi-Provider: Support for OpenAI, Anthropic, Ollama, and other LLM providers
Context Integration: Automatic injection of dependency data into prompts
Response Handling: Structured response parsing and error handling
Logging: Optional detailed logging for debugging and monitoring
Template System
LLMTask uses ${variable} syntax for dynamic prompt generation:
task = LLMTask("summarize", {
    "prompt": "Summarize these articles: ${articles}",
    "context": "You are an expert ${domain} analyst",
    "llm_provider": "openai",
    "llm_model": "gpt-4"
}, dependencies=[
    TaskDependency("articles", "search.results", REQUIRED),
    TaskDependency("domain", "config.analysis_domain", OPTIONAL, default_value="general")
])
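To show what ${variable} substitution does to the fields in the example above, here is a sketch using the standard library's string.Template, which happens to use the same placeholder syntax. Whether LLMTask uses string.Template internally is an assumption this sketch does not rely on; the values dictionary stands in for resolved dependency data.

```python
from string import Template

prompt = Template("Summarize these articles: ${articles}")
context = Template("You are an expert ${domain} analyst")

# Stand-in for values resolved from TaskDependency objects.
values = {"articles": "Article A; Article B", "domain": "general"}

rendered_prompt = prompt.substitute(values)
rendered_context = context.substitute(values)
print(rendered_prompt)
print(rendered_context)

# substitute() raises KeyError when a placeholder has no value.
try:
    Template("Hello ${missing}").substitute(values)
except KeyError as exc:
    print(f"Missing template variable: {exc}")
```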
Tool Calling
Enable function calling with automatic tool management:
@tool
def search_web(query: str) -> Dict[str, Any]:
    return {"results": web_search(query)}

task = LLMTask("research", {
    "prompt": "Research ${topic} and provide analysis",
    "tools": [search_web],
    "llm_provider": "openai",
    "llm_model": "gpt-4"
})
MCP Integration
Use Model Context Protocol servers alongside or instead of Python tools:
# MCP server only
task = LLMTask("web_research", {
    "prompt": "Search for ${topic} and summarize findings",
    "tools": [
        {
            "type": "mcp",
            "server_label": "brave_search",
            "server_url": "https://mcp.brave.com",
            "require_approval": "never"
        }
    ],
    "llm_provider": "anthropic",
    "llm_model": "claude-sonnet-4"
})

# Mixed: Python tools + MCP servers
task = LLMTask("comprehensive_research", {
    "prompt": "Research ${topic} using all available tools",
    "tools": [
        local_calculator_tool,  # Python @tool function
        {
            "type": "mcp",
            "server_label": "web_search",
            "server_url": "https://mcp.example.com"
        }
    ],
    "llm_provider": "openai",
    "llm_model": "gpt-4"
})
Structured Outputs
Get typed, structured responses using Pydantic models or JSON schemas:
from pydantic import BaseModel

class Article(BaseModel):
    title: str
    summary: str
    key_points: list[str]

task = LLMTask("extract", {
    "prompt": "Extract article information from: ${text}",
    "response_format": Article,  # Pass Pydantic model
    "llm_provider": "openai",
    "llm_model": "gpt-4o-mini"
})

# Access parsed structured data
result = task.run()
if result.is_success():
    parsed_data = result.data["parsed_object"]  # Dict matching Article schema
Supported Providers
OpenAI: gpt-4, gpt-3.5-turbo, etc.
Anthropic: claude-3-opus, claude-3-sonnet, etc.
Ollama: Local models (llama2, mistral, etc.)
Google: gemini-pro, gemini-pro-vision
Azure OpenAI: Azure-hosted OpenAI models
And many more via litellm
Configuration Options
Core settings:
llm_provider: Provider name (required)
llm_model: Model identifier (required)
prompt/prompt_template: Main prompt text with templates
temperature: Randomness control (0.0-2.0, default: 0.7)
max_tokens: Maximum response tokens (default: 1000)
timeout: Request timeout in seconds (default: 60)
Tool calling:
tools: List of @tool decorated functions
tool_choice: “auto”, “none”, or specific tool name
parallel_tool_calls: Enable parallel tool execution
max_tool_execution_time: Tool timeout in seconds
Structured outputs:
response_format: Pydantic BaseModel class or JSON schema dict
Pydantic model: Pass model class directly (requires pydantic>=2.0)
JSON schema: Dict with {“type”: “json_schema”, “json_schema”: {…}}
Context and role:
context: System/context message (supports templates)
role: User role description (supports templates)
Response Format
LLMTask returns structured data including:
content: Generated text response
parsed_object: Parsed JSON object (when response_format is used)
usage: Token usage statistics
tool_calls: List of executed tool calls (if any)
model: Model used for generation
provider: Provider used
metadata: Additional response information
Example Usage
Basic text generation:
task = LLMTask("generate", {
    "prompt": "Write a brief summary about ${topic}",
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.3,
    "max_tokens": 500
})
With dependencies and templates:
task = LLMTask("analyze", {
    "prompt": "Analyze these search results about ${topic}: ${results}",
    "context": "You are an expert ${field} researcher",
    "llm_provider": "anthropic",
    "llm_model": "claude-3-opus"
}, dependencies=[
    TaskDependency("topic", "input.topic", REQUIRED),
    TaskDependency("results", "search.results", REQUIRED),
    TaskDependency("field", "config.research_field", OPTIONAL, default_value="general")
])
With tool calling:
@tool
def calculate(expression: str) -> float:
    # Simple calculator. eval() executes arbitrary Python, so do not use it
    # on untrusted model output outside of a demo.
    return eval(expression)

task = LLMTask("math_helper", {
    "prompt": "Help solve this math problem: ${problem}",
    "tools": [calculate],
    "llm_provider": "openai",
    "llm_model": "gpt-4o-mini",
    "tool_choice": "auto"
})
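Because the calculator above passes model-supplied text to eval(), a safer variant may be preferable outside of demos. The sketch below evaluates only basic arithmetic by walking the parsed syntax tree; `safe_calculate` is a hypothetical replacement, shown without the @tool decorator so that it runs standalone.

```python
import ast
import operator

_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.USub: operator.neg, ast.UAdd: operator.pos}


def safe_calculate(expression: str) -> float:
    """Evaluate +, -, *, / on numeric literals; reject everything else."""

    def _eval(node: ast.AST) -> float:
        if (
            isinstance(node, ast.Constant)
            and isinstance(node.value, (int, float))
            and not isinstance(node.value, bool)
        ):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError("Unsupported expression")

    return float(_eval(ast.parse(expression, mode="eval").body))


print(safe_calculate("2 + 3 * 4"))
```

Function calls, names, and attribute access all fall through to the ValueError branch, so input such as `__import__('os')` is rejected rather than executed.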
Error Handling
LLMTask handles various error conditions gracefully:
API failures (network, authentication, rate limits)
Invalid model/provider combinations
Tool execution errors
Template substitution errors
Response parsing failures
All errors are returned as TaskResult.failure() with detailed error messages and relevant metadata for debugging.
See also
BaseTask: Parent class for task implementation patterns
TaskDependency: For injecting data into prompt templates
@tool decorator: For creating tool functions
litellm: Underlying LLM provider abstraction
- class LLMTask[source]
Bases: BaseTask
Advanced LLM integration task with template substitution and tool calling.
LLMTask provides sophisticated Large Language Model integration for AI pipelines. It supports dynamic prompt generation through template substitution, automatic function/tool calling, and seamless integration with multiple LLM providers.
The task automatically resolves template variables from dependency data and manages the complete LLM interaction lifecycle including request formatting, response parsing, tool execution, and error handling.
- Core Capabilities:
Dynamic Prompts: ${variable} template substitution from dependencies
Tool Integration: Automatic function calling with @tool decorated functions
Multi-Provider: Support for 50+ LLM providers via litellm
Response Processing: Structured response parsing and validation
Error Recovery: Graceful handling of API failures and timeouts
Usage Tracking: Detailed token usage and cost monitoring
- Template Substitution:
- Templates use ${variable_name} syntax and are resolved from:
Task dependencies (via TaskDependency)
Task configuration values
Environment variables (as fallback)
- Template fields support:
prompt/prompt_template: Main user message
context: System/context message
role: User role description
- Tool Calling Workflow:
Functions decorated with @tool are automatically registered
LLM decides which tools to call based on prompt
Tools execute with validated parameters
Results injected back into conversation
LLM continues with tool results
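The workflow above can be sketched without any LLM in the loop: register functions, validate the arguments the model proposes, execute, and record the outcome. Everything named here (`register_tool`, `TOOLS`, `execute_tool_call`) is a hypothetical stand-in for aipype's @tool decorator, ToolRegistry, and ToolExecutor; the record keys mirror the tool_calls entries shown in the response structure.

```python
import inspect
from typing import Any, Callable, Dict

TOOLS: Dict[str, Callable[..., Any]] = {}


def register_tool(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Stand-in decorator: make the function discoverable by name."""
    TOOLS[fn.__name__] = fn
    return fn


@register_tool
def add(a: int, b: int) -> int:
    return a + b


def execute_tool_call(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Run one model-requested call; failures are recorded, not raised."""
    record: Dict[str, Any] = {"tool_name": tool_name, "arguments": arguments}
    try:
        fn = TOOLS[tool_name]
        # bind() raises TypeError if arguments do not match the signature.
        inspect.signature(fn).bind(**arguments)
        record.update(result=fn(**arguments), success=True)
    except Exception as exc:
        record.update(error=str(exc), success=False)
    return record


ok = execute_tool_call("add", {"a": 1, "b": 2})
bad = execute_tool_call("add", {"a": 1})
print(ok)
print(bad["success"])
```

Recording failures per call, rather than raising, is one way to keep a single tool error from failing the whole task.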
Provider Configuration
Different providers require different configuration:
OpenAI:
{
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.7,
    "max_tokens": 1000
}
Ollama (local):
{
    "llm_provider": "ollama",
    "llm_model": "llama2",
    "temperature": 0.8
}
Response Structure
Successful responses include:
{
    "content": "Generated text response",
    "usage": {
        "prompt_tokens": 150,
        "completion_tokens": 300,
        "total_tokens": 450
    },
    "tool_calls": [
        {
            "tool_name": "search_web",
            "arguments": {"query": "AI trends"},
            "result": {"results": [...]},
            "success": True
        }
    ],
    "model": "gpt-4",
    "provider": "openai",
    "metadata": {
        "response_time": 2.34,
        "template_variables": ["topic", "domain"]
    }
}
Common Usage Patterns
Content generation with templates:
LLMTask("writer", {
    "prompt": "Write a ${length} article about ${topic} for ${audience}",
    "context": "You are an expert ${field} writer",
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.7
}, dependencies=[
    TaskDependency("topic", "input.topic", REQUIRED),
    TaskDependency("length", "config.article_length", OPTIONAL, default_value="brief"),
    TaskDependency("audience", "config.target_audience", REQUIRED),
    TaskDependency("field", "config.expertise_field", REQUIRED)
])
Research with tool calling:
@tool
def search_academic_papers(query: str, limit: int = 5) -> List[Dict]:
    papers: List[Dict] = []  # Search implementation goes here
    return papers

LLMTask("researcher", {
    "prompt": "Research ${topic} and provide comprehensive analysis",
    "tools": [search_academic_papers],
    "llm_provider": "anthropic",
    "llm_model": "claude-3-opus",
    "tool_choice": "auto",
    "max_tokens": 3000
})
Data analysis and reasoning:
LLMTask("analyzer", {
    "prompt": "Analyze this data and provide insights: ${data}",
    "context": "You are a data scientist specializing in ${domain}",
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.3,  # Lower temperature for analytical tasks
    "max_tokens": 2000
}, dependencies=[
    TaskDependency("data", "processor.results", REQUIRED),
    TaskDependency("domain", "config.analysis_domain", REQUIRED)
])
Error Handling
Common failure scenarios and recovery:
API Errors: Network failures, authentication issues
Rate Limits: Automatic retry with exponential backoff
Invalid Models: Clear error messages for unsupported models
Tool Failures: Individual tool errors don’t fail entire task
Template Errors: Missing variables result in clear error messages
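The retry-with-exponential-backoff behaviour mentioned for rate limits can be sketched as follows. The attempt count, base delay, and the `retry_with_backoff` helper itself are assumptions for illustration; the actual retry policy used by aipype or its provider layer is not documented here.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    call: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry call(), doubling the wait after each failed attempt."""
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError("max_attempts must be at least 1")


# Demo: fail twice, then succeed; record delays instead of sleeping.
attempts: List[int] = []
delays: List[float] = []


def flaky() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("rate limited")
    return "ok"


outcome = retry_with_backoff(flaky, sleep=delays.append)
print(outcome, delays)
```

Passing `sleep` as a parameter keeps the helper testable without real waiting.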
Performance Considerations
Use appropriate max_tokens to control costs and latency
Lower temperature (0.1-0.3) for factual/analytical tasks
Higher temperature (0.7-1.0) for creative tasks
Consider model capabilities vs cost (gpt-3.5-turbo vs gpt-4)
Use tool_choice=”none” to disable tool calling when not needed
See also
@tool: For creating callable functions
TaskDependency: For template variable injection
litellm: Underlying provider abstraction
Tool calling examples in the examples package
- Parameters:
- REQUIRED_CONFIGS = ['llm_provider', 'llm_model']
- DEFAULT_TEMPERATURE = 0.7
- DEFAULT_MAX_TOKENS = 1000
- DEFAULT_TIMEOUT = 60
- __init__(name, config=None, dependencies=None)[source]
Initialize an advanced LLM task with template and tool support.
Creates a new LLM task that can automatically substitute template variables from dependencies, execute function calls, and interact with multiple LLM providers. The task handles the complete LLM interaction lifecycle.
- Parameters:
name (str) – Unique identifier for this task within the pipeline. Used for logging, dependency references, and result storage.
config (Optional[Dict[str, Any]]) – LLM configuration dictionary with the following keys:
Required:
llm_provider (str): LLM provider name. Supported values include: “openai”, “anthropic”, “ollama”, “azure”, “google”, “cohere”, etc.
llm_model (str): Model identifier specific to the provider:
OpenAI: “gpt-4”, “gpt-3.5-turbo”, “gpt-4-turbo-preview”
Anthropic: “claude-3-opus”, “claude-3-sonnet”, “claude-3-haiku”
Ollama: “llama2”, “mistral”, “codellama”
Google: “gemini-pro”, “gemini-pro-vision”
Prompt Configuration:
prompt OR prompt_template (str): Main user message with ${var} templates
context (str, optional): System/context message (supports templates)
role (str, optional): User role description (supports templates)
Generation Parameters:
temperature (float, 0.0-2.0): Randomness control (default: 0.7)
max_tokens (int): Maximum response tokens (default: 1000)
timeout (float): Request timeout in seconds (default: 60)
Tool Calling:
tools (List[Callable | Dict], optional): List of @tool decorated functions and/or MCP server configurations. Each item can be:
Python function: Decorated with @tool for local execution
MCP config dict: {“type”: “mcp”, “server_label”: “…”, “server_url”: “…”}
tool_choice (str, optional): “auto”, “none”, or specific tool name
parallel_tool_calls (bool): Enable parallel tool execution (default: True)
max_tool_execution_time (float): Tool timeout in seconds (default: 30)
Provider-Specific:
api_key (str, optional): API key (can also use environment variables)
api_base (str, optional): Custom API endpoint (for Azure, local deployments)
api_version (str, optional): API version (for Azure OpenAI)
dependencies (Optional[List[TaskDependency]]) – List of TaskDependency objects for template variable injection. Template variables in prompt, context, and role fields will be automatically replaced with resolved dependency values.
Examples
Basic text generation
LLMTask("summarizer", {
    "prompt": "Summarize this text: ${content}",
    "llm_provider": "openai",
    "llm_model": "gpt-4",
    "temperature": 0.3,
    "max_tokens": 500
}, dependencies=[
    TaskDependency("content", "input.text", REQUIRED)
])
Advanced with tools and context:
@tool
def web_search(query: str) -> Dict[str, Any]:
    return {"results": search_engine.search(query)}

LLMTask("researcher", {
    "prompt": "Research ${topic} and provide analysis",
    "context": "You are an expert ${field} researcher with access to web search",
    "llm_provider": "anthropic",
    "llm_model": "claude-3-opus",
    "tools": [web_search],
    "tool_choice": "auto",
    "temperature": 0.5,
    "max_tokens": 3000
}, dependencies=[
    TaskDependency("topic", "config.research_topic", REQUIRED),
    TaskDependency("field", "config.expertise_field", REQUIRED)
])
Local model with Ollama:
LLMTask("local_chat", {
    "prompt": "Help with this question: ${question}",
    "llm_provider": "ollama",
    "llm_model": "llama2",
    "temperature": 0.8,
    "max_tokens": 2000
})
Template Variables
Variables in prompt, context, and role fields use ${variable_name} syntax. Resolution order:
Dependency values (from TaskDependency objects)
Direct config values
Environment variables
Error if required variable not found
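The resolution order above is a first-match lookup across three sources. A minimal sketch, assuming each source behaves like a mapping; `resolve_variable` is a hypothetical helper and the environment is passed in explicitly so the example is deterministic.

```python
import os
from typing import Any, Mapping


def resolve_variable(
    name: str,
    dependencies: Mapping[str, Any],
    config: Mapping[str, Any],
    environ: Mapping[str, str] = os.environ,
) -> Any:
    """Return the first match: dependencies, then config, then environment."""
    for source in (dependencies, config, environ):
        if name in source:
            return source[name]
    raise KeyError(f"Template variable '{name}' not found")


deps = {"topic": "from dependency"}
cfg = {"topic": "from config", "domain": "finance"}
env = {"domain": "from env", "REGION": "eu"}

print(resolve_variable("topic", deps, cfg, env))
print(resolve_variable("domain", deps, cfg, env))
print(resolve_variable("REGION", deps, cfg, env))
```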
Tool Integration
Functions decorated with @tool are automatically:
Registered with the LLM provider
Made available to the model during generation
Executed when called by the model
Results injected back into the conversation
Error Handling
Configuration errors are caught during validation:
Missing required fields (llm_provider, llm_model)
Invalid parameter ranges (temperature, max_tokens)
Unsupported tool configurations
Provider-specific validation
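A rough sketch of the first two checks, using the required keys, defaults, and temperature range documented for LLMTask. `validate_llm_config` is hypothetical, and the rule that max_tokens must be a positive integer is an assumption; tool and provider-specific checks are omitted.

```python
from typing import Any, Dict, List

REQUIRED_CONFIGS = ["llm_provider", "llm_model"]


def validate_llm_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; empty means valid."""
    errors: List[str] = []
    for key in REQUIRED_CONFIGS:
        if not config.get(key):
            errors.append(f"Missing required config: {key}")
    temperature = config.get("temperature", 0.7)
    if not 0.0 <= temperature <= 2.0:
        errors.append("temperature must be between 0.0 and 2.0")
    max_tokens = config.get("max_tokens", 1000)
    # Assumption: max_tokens must be a positive integer.
    if not isinstance(max_tokens, int) or max_tokens <= 0:
        errors.append("max_tokens must be a positive integer")
    return errors


print(validate_llm_config({"llm_provider": "openai", "llm_model": "gpt-4"}))
print(validate_llm_config({"llm_provider": "openai", "temperature": 3.0}))
```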
Environment Variables
API keys can be provided via environment variables:
OPENAI_API_KEY for OpenAI
GOOGLE_API_KEY for Google
Etc.
See also
@tool: For creating callable functions
TaskDependency: For template variable injection
Provider documentation for model-specific capabilities
- context_instance: Optional[TaskContext]
- tool_registry: Optional[ToolRegistry]
- tool_executor: Optional[ToolExecutor]
- set_context(context)[source]
Set the task context for template resolution.
- Parameters:
context (TaskContext) – TaskContext instance
- Return type:
Search Task
SearchTask - Simple web search task.
- class SearchTask[source]
Bases: BaseTask
Simple web search task that executes searches and returns results.
- Parameters:
- __init__(name, config=None, dependencies=None)[source]
Initialize search task.
- Parameters:
- Config parameters:
query: Search query string (required)
max_results: Maximum number of search results (default: 5)
serper_api_key: Optional API key for Serper search
Transform Task
TransformTask - Generic data transformation task.
- class TransformTask[source]
Bases: BaseTask
Task that applies transformations to data from context.
- Parameters:
- __init__(name, config=None, dependencies=None)[source]
Initialize transform task.
- Parameters:
- Config parameters:
transform_function: Function to apply transformation
input_field: Single input field name (for single-input transforms)
input_fields: List of input field names (for multi-input transforms)
output_name: Name for the output in the result
validate_input: Whether to validate input data (default: True)
validate_output: Whether to validate output data (default: False)
- set_context(context)[source]
Set the task context.
- Parameters:
context (TaskContext) – TaskContext instance
- Return type:
- property input_data: Any
Public readonly property to access input data for testing purposes.
- Returns:
Input data for transformation
- extract_field_from_list(field_name)[source]
Create a transformation function that extracts a field from each item in a list.
- combine_text_fields(separator='\\n\\n', title_field='title', content_field='content')[source]
Create a transformation function that combines text from multiple items.
- filter_by_condition(condition_func)[source]
Create a transformation function that filters items by a condition.
- aggregate_numeric_field(field_name, operation='sum')[source]
Create a transformation function that aggregates a numeric field.
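These helpers are factories: each returns a function suitable for use as a transform_function. The sketch below shows the pattern for two of them. The names, the decision to skip items lacking the field, and the set of supported operations beyond the documented default 'sum' are assumptions, not the library's behaviour.

```python
from typing import Any, Callable, Dict, List

Item = Dict[str, Any]


def make_field_extractor(field_name: str) -> Callable[[List[Item]], List[Any]]:
    """Build a function that pulls one field out of every item in a list."""
    def extract(items: List[Item]) -> List[Any]:
        # Assumption: items without the field are skipped.
        return [item[field_name] for item in items if field_name in item]
    return extract


def make_numeric_aggregator(
    field_name: str, operation: str = "sum"
) -> Callable[[List[Item]], Any]:
    """Build a function that aggregates a numeric field across items."""
    operations = {"sum": sum, "min": min, "max": max}
    if operation not in operations:
        raise ValueError(f"Unsupported operation: {operation}")
    aggregate = operations[operation]

    def apply(items: List[Item]) -> Any:
        return aggregate(item[field_name] for item in items)
    return apply


articles = [{"title": "A", "score": 2}, {"title": "B", "score": 5}]
print(make_field_extractor("title")(articles))
print(make_numeric_aggregator("score")(articles))
```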
Conditional Task
ConditionalTask - Task that executes based on context conditions.
- class ConditionalTask[source]
Bases: BaseTask
Task that executes actions based on conditional logic using context data.
- Parameters:
- __init__(name, config=None, dependencies=None)[source]
Initialize conditional task.
- Parameters:
- Config parameters:
condition_function: Function that evaluates to True/False
condition_inputs: List of input field names for condition function
action_function: Function to execute when condition is True
action_inputs: List of input field names for action function
else_function: Optional function to execute when condition is False
else_inputs: List of input field names for else function
skip_reason: Custom reason message when condition is False
- set_context(context)[source]
Set the task context.
- Parameters:
context (TaskContext) – TaskContext instance
- Return type:
- get_execution_summary(context)[source]
Get a formatted summary of the conditional task execution results.
- Parameters:
context (TaskContext) – TaskContext to retrieve results from
- Return type:
- Returns:
Dictionary containing execution summary with formatted info, or None if no results found
- threshold_condition(threshold, operator='>=')[source]
Create a condition function that checks if a value meets a threshold.
- contains_condition(search_term, case_sensitive=False)[source]
Create a condition function that checks if text contains a term.
- list_size_condition(min_size=None, max_size=None)[source]
Create a condition function that checks list size.
- success_rate_condition(min_success_rate)[source]
Create a condition function that checks success rate from a results dict.
- quality_gate_condition(min_score, score_field='score')[source]
Create a condition function that checks quality score.
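The condition helpers above follow a closure pattern: configure once, get back a predicate for condition_function. A sketch of two of them under assumed semantics; the `make_` names and the set of accepted operator strings are hypothetical.

```python
import operator
from typing import Any, Callable, Optional, Sequence

_OPERATORS = {
    ">=": operator.ge,
    ">": operator.gt,
    "<=": operator.le,
    "<": operator.lt,
    "==": operator.eq,
    "!=": operator.ne,
}


def make_threshold_condition(threshold: float, op: str = ">=") -> Callable[[Any], bool]:
    """Build a predicate comparing a value against a fixed threshold."""
    compare = _OPERATORS[op]  # KeyError for an unknown operator string
    return lambda value: compare(value, threshold)


def make_list_size_condition(
    min_size: Optional[int] = None, max_size: Optional[int] = None
) -> Callable[[Sequence[Any]], bool]:
    """Build a predicate checking that len(items) lies within the bounds."""
    def check(items: Sequence[Any]) -> bool:
        size = len(items)
        if min_size is not None and size < min_size:
            return False
        if max_size is not None and size > max_size:
            return False
        return True
    return check


at_least_three = make_list_size_condition(min_size=3)
high_score = make_threshold_condition(0.8)
print(at_least_three(["a", "b"]), high_score(0.9))
```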
Core Infrastructure
Task Result
Task result classes for standardized response format.
- class TaskStatus[source]
Bases: Enum
Enumeration for task execution status.
- NOT_STARTED = 'not_started'
- STARTED = 'started'
- SUCCESS = 'success'
- ERROR = 'error'
- SKIPPED = 'skipped'
- PARTIAL = 'partial'
- class TaskResult[source]
Bases: object
Standardized result format for all task executions.
This class provides a consistent interface for task results across the framework, enabling better error handling, performance tracking, and result processing.
- Parameters:
- status: TaskStatus
- classmethod success(data=None, metadata=None, execution_time=0.0)[source]
Create a successful task result.
- classmethod failure(error_message, metadata=None, execution_time=0.0)[source]
Create an error task result.
- classmethod partial(data=None, error=None, metadata=None, execution_time=0.0)[source]
Create a partial success task result.
- classmethod skipped(reason, metadata=None, execution_time=0.0)[source]
Create a skipped task result.
- is_success()[source]
Check if the task completed successfully.
- Return type:
- Returns:
True if status is SUCCESS
- is_error()[source]
Check if the task failed with an error.
- Return type:
- Returns:
True if status is ERROR
- is_partial()[source]
Check if the task completed with partial success.
- Return type:
- Returns:
True if status is PARTIAL
- is_skipped()[source]
Check if the task was skipped.
- Return type:
- Returns:
True if status is SKIPPED
- has_data()[source]
Check if the result contains data.
- Return type:
- Returns:
True if data is not None
- get_legacy_result()[source]
Get result in legacy format for backward compatibility.
This method extracts the data field to maintain compatibility with existing code that expects raw return values.
- Return type:
- Returns:
The data field, or raises RuntimeError if task failed
- wrap_legacy_result(result, execution_time=0.0)[source]
Wrap a legacy task result in TaskResult format.
This utility function helps with gradual migration by wrapping existing task results in the new standardized format.
- Parameters:
- Return type:
- Returns:
TaskResult with SUCCESS status containing the legacy result
- unwrap_to_legacy(task_result)[source]
Unwrap TaskResult to legacy format for backward compatibility.
- Parameters:
task_result (TaskResult) – TaskResult instance
- Return type:
- Returns:
The data field from the TaskResult
- Raises:
RuntimeError – If the task failed
Agent Run Result
Agent run result classes for standardized pipeline execution response format.
- class AgentRunStatus[source]
Bases: Enum
Enumeration for agent execution status.
- SUCCESS = 'success'
- PARTIAL = 'partial'
- ERROR = 'error'
- RUNNING = 'running'
- class AgentRunResult[source]
Bases: object
Standardized result format for agent pipeline executions.
This class provides a consistent interface for agent execution results, enabling better error handling, status checking, and result processing.
Note: Individual task results are accessible via agent.context.get_result(task_name)
- Parameters:
- status: AgentRunStatus
- classmethod success(agent_name, total_tasks, completed_tasks, total_phases, execution_time=0.0, metadata=None)[source]
Create a successful agent execution result.
- Parameters:
agent_name (str) – Name of the agent
total_tasks (int) – Total number of tasks in the pipeline
completed_tasks (int) – Number of tasks that completed successfully
total_phases (int) – Number of execution phases
execution_time (float) – Total execution time in seconds
metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary
- Return type:
- Returns:
AgentRunResult with SUCCESS status
- classmethod partial(agent_name, total_tasks, completed_tasks, failed_tasks, total_phases, execution_time=0.0, metadata=None)[source]
Create a partial success agent execution result.
- Parameters:
agent_name (str) – Name of the agent
total_tasks (int) – Total number of tasks in the pipeline
completed_tasks (int) – Number of tasks that completed successfully
failed_tasks (int) – Number of tasks that failed
total_phases (int) – Number of execution phases
execution_time (float) – Total execution time in seconds
metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary
- Return type:
- Returns:
AgentRunResult with PARTIAL status
- classmethod failure(agent_name, total_tasks, failed_tasks, error_message, total_phases=0, execution_time=0.0, metadata=None)[source]
Create a failed agent execution result.
- Parameters:
agent_name (str) – Name of the agent
total_tasks (int) – Total number of tasks in the pipeline
failed_tasks (int) – Number of tasks that failed
error_message (str) – Description of the error
total_phases (int) – Number of execution phases
execution_time (float) – Total execution time in seconds
metadata (Optional[Dict[str, Any]]) – Optional metadata dictionary
- Return type:
- Returns:
AgentRunResult with ERROR status
- is_success()[source]
Check if the agent execution completed successfully.
- Return type:
- Returns:
True if status is SUCCESS
- is_error()[source]
Check if the agent execution failed with an error.
- Return type:
- Returns:
True if status is ERROR
- is_partial()[source]
Check if the agent execution completed with partial success.
- Return type:
- Returns:
True if status is PARTIAL
- is_running()[source]
Check if the agent is currently running.
- Return type:
- Returns:
True if status is RUNNING
- __init__(status, agent_name, total_tasks=0, completed_tasks=0, failed_tasks=0, total_phases=0, execution_time=0.0, metadata=<factory>, error_message=None)
Task Context
TaskContext - Simplified shared data store for inter-task communication.
- class TaskContext[source]
Bases:
object
Simplified context for storing and accessing task results across pipeline execution.
Focuses on string-based data storage and simple dot notation path access. Designed for text-based automation workflows.
- get_path_value(path)[source]
Get a value from context using paths like ‘task.field’, ‘task.array[0]’, or ‘task.array[].field’.
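The three path forms can be illustrated with a small resolver over nested dictionaries. This is a sketch under stated assumptions: resolve_path and _walk are hypothetical helpers, results are assumed to be plain dicts keyed by task name, and returning None for a missing step is an assumption rather than documented behavior.

```python
import re
from typing import Any, Dict, List

# One path segment: a name, optionally followed by [] or [N]
_SEGMENT = re.compile(r"^(\w+)(?:\[(\d*)\])?$")


def _walk(value: Any, segments: List[str]) -> Any:
    if not segments:
        return value
    match = _SEGMENT.match(segments[0])
    if match is None or not isinstance(value, dict):
        return None
    key, index = match.group(1), match.group(2)
    if key not in value:
        return None
    value = value[key]
    rest = segments[1:]
    if index is None:
        # 'task.field': plain field access
        return _walk(value, rest)
    if not isinstance(value, list):
        return None
    if index == "":
        # 'task.array[].field': apply the rest of the path to every element
        return [_walk(item, rest) for item in value]
    position = int(index)
    if position >= len(value):
        return None
    # 'task.array[0]': pick a single element, then continue
    return _walk(value[position], rest)


def resolve_path(results: Dict[str, Any], path: str) -> Any:
    """Resolve a dotted path against a dict of task results."""
    return _walk(results, path.split("."))


results = {"search": {"query": "AI", "results": [{"url": "a"}, {"url": "b"}]}}
first = resolve_path(results, "search.results[0]")
urls = resolve_path(results, "search.results[].url")
```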
- get_task_count()[source]
Get the number of tasks with stored results.
- Return type:
int
- Returns:
Number of tasks with results
- get_result_field(task_name, field_name, expected_type=str, default=None)[source]
Get a specific field from a task result with type checking.
Ultra-safe: handles all edge cases internally, never throws errors.
- Parameters:
- Return type:
- Returns:
The field value if found and correct type, otherwise default
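A type-checked, non-raising accessor of this kind might look roughly like the following. It is a sketch, not the library code: the standalone function takes a hypothetical results dict as its first argument and assumes each task result is a plain dict.

```python
from typing import Any, Dict


def get_result_field(results: Dict[str, Any], task_name: str, field_name: str,
                     expected_type: type = str, default: Any = None) -> Any:
    """Return results[task_name][field_name] if it has the expected type."""
    try:
        result = results.get(task_name)
        if not isinstance(result, dict):
            return default
        value = result.get(field_name)
        # A wrong type is treated the same as a missing field
        return value if isinstance(value, expected_type) else default
    except Exception:
        # Never propagate errors to the caller
        return default


stored = {"summarize": {"content": "Short summary", "count": 3}}
content = get_result_field(stored, "summarize", "content")
count = get_result_field(stored, "summarize", "count", int, 0)
```

With the default expected_type of str, asking for the count field would return the default, because the stored value is an int.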
- get_result_content(task_name)[source]
Get the ‘content’ field from a task result as a string.
Ultra-safe: handles all edge cases internally, never throws errors.
- has_result_content(task_name)[source]
Check if a task has non-empty content.
Ultra-safe: handles all edge cases internally, never throws errors.
- display_result_content(task_name, title, field_name='content')[source]
Display task result content in a message box if present.
Ultra-safe: handles all edge cases internally, including:
Task not completed
Missing or empty content
Any errors during display
No return value needed - this is a pure convenience method.
- get_result_fields(task_name, *field_names)[source]
Get multiple fields from a task result at once.
Ultra-safe: handles all edge cases internally, never throws errors.
Task Dependencies
Declarative task dependency system for automated data flow in AI pipelines.
This module provides TaskDependency and DependencyResolver for creating automated data flow between tasks in PipelineAgent workflows. Dependencies are specified declaratively and resolved automatically, enabling complex data transformations and pipeline orchestration without manual data passing.
- Key Features
Declarative Dependencies: Specify data flow using simple path syntax
Automatic Resolution: Dependencies resolved before task execution
Data Transformation: Optional transform functions for data preprocessing
Type Safety: Required vs optional dependencies with validation
Path Access: Flexible dot-notation path access for nested data
- Dependency Flow
Declaration: Tasks declare dependencies using source paths
Resolution: DependencyResolver extracts data from TaskContext
Transformation: Optional transform functions process data
Injection: Resolved data added to task configuration
Execution: Task runs with all dependencies satisfied
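The five steps above can be sketched end to end with plain dictionaries. DependencySketch, lookup, and resolve_into_config are hypothetical stand-ins for illustration; the real TaskDependency and DependencyResolver carry more options than shown here.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass(frozen=True)
class DependencySketch:
    name: str                                   # key injected into the task config
    source_path: str                            # 'task.field' path into the context
    transform_func: Optional[Callable[[Any], Any]] = None


def lookup(context: Dict[str, Any], source_path: str) -> Any:
    # Plain dotted access only; array syntax is left out of this sketch
    value: Any = context
    for part in source_path.split("."):
        value = value[part]
    return value


def resolve_into_config(config: Dict[str, Any],
                        dependencies: List[DependencySketch],
                        context: Dict[str, Any]) -> Dict[str, Any]:
    resolved = dict(config)
    for dep in dependencies:
        value = lookup(context, dep.source_path)      # 2. Resolution
        if dep.transform_func is not None:
            value = dep.transform_func(value)         # 3. Transformation
        resolved[dep.name] = value                    # 4. Injection
    return resolved


# 1. Declaration
deps = [DependencySketch("urls", "search.results",
                         lambda rs: [r["url"] for r in rs])]
context = {"search": {"results": [{"url": "https://example.com/a"},
                                  {"url": "https://example.com/b"}]}}
config = resolve_into_config({"prompt": "Summarize"}, deps, context)
# 5. Execution: the task would now read config["urls"]
```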
Quick Example
# Search task produces results
search_task = SearchTask("search", {"query": "AI news"})
# LLM task depends on search results
llm_task = LLMTask("summarize", {
    "prompt": "Summarize: ${articles}",
    "llm_provider": "openai"
}, dependencies=[
    TaskDependency("articles", "search.results", REQUIRED)
])
# Pipeline automatically resolves search.results -> articles
Path Syntax
Dependencies use dot notation to access nested data:
“task_name.field” - Access field in task result
“task_name.data.nested” - Access nested field
“task_name.results[].url” - Extract URLs from result list
“task_name.metadata.count” - Access metadata fields
Transform Functions
Use transform_func to preprocess dependency data:
TaskDependency(
    "urls",
    "search.results",
    REQUIRED,
    transform_func=lambda results: [r['url'] for r in results]
)
Dependency Types
REQUIRED: Task execution fails if dependency unavailable
OPTIONAL: Uses default_value if dependency unavailable
See also
TaskDependency: Individual dependency specification
DependencyResolver: Automatic dependency resolution engine
TaskContext: Shared data store for inter-task communication
Built-in transform functions: extract_urls_from_results, combine_article_content
- class DependencyType[source]
Bases:
Enum
Enumeration of dependency types for task data flow.
Dependency types control how missing dependencies are handled:
REQUIRED: Task execution fails if dependency cannot be resolved
OPTIONAL: Default value used if dependency unavailable
Example
# Required dependency - task fails if search_task not available
TaskDependency("query_results", "search_task.results", REQUIRED)
# Optional dependency - uses default if config_task unavailable
TaskDependency("settings", "config_task.options", OPTIONAL, default_value={})
- REQUIRED = 'required'
- OPTIONAL = 'optional'
- class TaskDependency[source]
Bases:
object
Specification for automatic data flow between tasks in a pipeline.
TaskDependency defines how a task receives data from other tasks in the pipeline. Dependencies are resolved automatically by the PipelineAgent before task execution, with resolved data added to the task’s configuration.
The dependency system supports flexible path-based data access, optional data transformation, and both required and optional dependencies with appropriate error handling.
- Core Components:
name: Key in target task’s config where resolved data is stored
source_path: Dot-notation path to source data in TaskContext
dependency_type: REQUIRED or OPTIONAL behavior for missing data
transform_func: Optional function to process resolved data
default_value: Fallback value for optional dependencies
- Path Syntax:
Source paths use dot notation to access nested data
“task.field” - Direct field access
“task.data.nested.value” - Nested object access
“task.results[].url” - Array element extraction
“task.metadata.statistics” - Metadata access
Transform Functions
Optional preprocessing of resolved data before injection:
def extract_titles(articles):
    return [article.get('title', 'Untitled') for article in articles]

TaskDependency(
    "titles",
    "search.results",
    REQUIRED,
    transform_func=extract_titles
)
Common Patterns
Basic data passing:
TaskDependency("search_results", "search_task.results", REQUIRED)
Data transformation:
TaskDependency(
    "article_urls",
    "search_task.results",
    REQUIRED,
    transform_func=lambda results: [r['url'] for r in results]
)
Optional configuration:
TaskDependency(
    "processing_options",
    "config_task.settings",
    OPTIONAL,
    default_value={"batch_size": 100, "timeout": 30}
)
Nested data access:
TaskDependency("api_endpoint", "config_task.api.endpoints.primary", REQUIRED)
Complex transformation:
def combine_and_filter(search_results):
    # Filter recent articles and combine text
    recent = [r for r in search_results if is_recent(r)]
    return ' '.join(r.get('content', '') for r in recent)

TaskDependency(
    "combined_content",
    "search_task.results",
    REQUIRED,
    transform_func=combine_and_filter
)
Error Handling
Required dependencies: Missing data causes task execution failure
Optional dependencies: Missing data uses default_value
Transform errors: Transform function exceptions cause dependency failure
Path errors: Invalid paths cause resolution failure with detailed messages
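The four cases above can be sketched in a single resolution function. resolve_one is a hypothetical helper; raising ValueError and returning default_value untransformed are assumptions made for this sketch, not confirmed library behavior.

```python
from typing import Any, Callable, Dict, Optional

_MISSING = object()  # sentinel so that a stored None still counts as present


def resolve_one(context: Dict[str, Any], source_path: str, required: bool,
                default_value: Any = None,
                transform_func: Optional[Callable[[Any], Any]] = None) -> Any:
    value: Any = context
    for part in source_path.split("."):
        if not isinstance(value, dict) or part not in value:
            value = _MISSING
            break
        value = value[part]

    if value is _MISSING:
        if required:
            # Required dependency: missing data is a failure
            raise ValueError(f"Required dependency '{source_path}' could not be resolved")
        # Optional dependency: fall back to the default
        return default_value

    if transform_func is None:
        return value
    try:
        return transform_func(value)
    except Exception as exc:
        # Transform errors surface as a dependency failure
        raise ValueError(f"Transform failed for '{source_path}': {exc}") from exc


ctx = {"search": {"results": [{"url": "a"}]}}
options = resolve_one(ctx, "config.settings", required=False,
                      default_value={"batch_size": 100})
```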
Best Practices
Use descriptive names that indicate the data being passed
Prefer specific paths over broad data passing
Use transform functions to shape data for consuming tasks
Provide meaningful default values for optional dependencies
Document complex transform functions for maintainability
Thread Safety
TaskDependency instances are immutable after creation and thread-safe. However, transform functions should be thread-safe if used in parallel execution.
See also
DependencyType: REQUIRED vs OPTIONAL dependency behavior
DependencyResolver: Automatic resolution engine
TaskContext: Source of dependency data
Built-in transform utilities: extract_urls_from_results, etc.
- Parameters:
- __init__(name, source_path, dependency_type, default_value=None, transform_func=None, override_existing=False, description='')[source]
Initialize a task dependency specification.
Creates a dependency that will be resolved automatically by the PipelineAgent before task execution. The resolved data will be injected into the task’s configuration under the specified name.
Note
Dependencies are resolved in the order they are defined
Transform functions should be deterministic and thread-safe
Source task must complete successfully for REQUIRED dependencies
Path resolution supports nested objects and array access
- Parameters:
name (str) – Key name where resolved data will be stored in the target task’s config dictionary. Should be descriptive and follow naming conventions (snake_case recommended). This is how the consuming task will access the dependency data.
source_path (str) – Dot-notation path to the source data in TaskContext. Format: “source_task_name.field_path”. Examples:
“search.results” - Access results field from search task
“fetch.data.articles[].content” - Extract content from article array
“config.api.endpoints.primary” - Access nested configuration
“process.metadata.item_count” - Access metadata fields
dependency_type (DependencyType) – How to handle missing dependencies:
DependencyType.REQUIRED: Task execution fails if unavailable
DependencyType.OPTIONAL: Uses default_value if unavailable
default_value (Any) – Value to use when optional dependency is unavailable. Only relevant for OPTIONAL dependencies. Can be any type that makes sense for the consuming task. Common patterns:
Empty list: []
Empty dict: {}
Default config: {"timeout": 30, "retries": 3}
None: None (explicit null)
transform_func (Optional[Callable[[Any], Any]]) – Optional function to preprocess resolved data before injection. Function signature: (resolved_data: Any) -> Any. Common uses:
Extract specific fields: lambda x: [item['url'] for item in x]
Filter data: lambda x: [item for item in x if item['valid']]
Combine data: lambda x: ' '.join(x)
Format data: lambda x: {"processed": x, "count": len(x)}
override_existing (bool) – Whether to override existing values in task config.
False (default): Only inject if key doesn’t exist in config
True: Always inject, overriding existing config values
Use with caution as it can override user-provided configuration.
description (str) – Human-readable description of this dependency’s purpose. Helpful for documentation and debugging. Should explain what data is being passed and how it will be used.
Example:
Basic dependency:
# Pass search results to summarization task
TaskDependency(
    name="search_results",
    source_path="web_search.results",
    dependency_type=DependencyType.REQUIRED
)
Transformed dependency:
TaskDependency(
    name="article_urls",
    source_path="search.results",
    dependency_type=DependencyType.REQUIRED,
    transform_func=lambda results: [r['url'] for r in results],
    description="Extract URLs from search results for fetching"
)
Optional dependency with default:
TaskDependency(
    name="processing_config",
    source_path="config_loader.settings",
    dependency_type=DependencyType.OPTIONAL,
    default_value={"batch_size": 100, "parallel": True},
    description="Processing configuration with sensible defaults"
)
- Raises:
ValueError – If name is empty, source_path is empty, or source_path doesn’t contain a dot (invalid format).
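The three conditions listed under Raises amount to a short validation step. The helper below is a hypothetical sketch of such a check; the function name and the error messages are assumptions, not the library's wording.

```python
def validate_dependency(name: str, source_path: str) -> None:
    """Raise ValueError for the three invalid inputs described above."""
    if not name:
        raise ValueError("Dependency name cannot be empty")
    if not source_path:
        raise ValueError("Dependency source_path cannot be empty")
    if "." not in source_path:
        # A valid path needs at least 'task_name.field'
        raise ValueError(
            f"source_path must look like 'task.field', got '{source_path}'"
        )


def is_valid(name: str, source_path: str) -> bool:
    # Convenience wrapper used only for this illustration
    try:
        validate_dependency(name, source_path)
        return True
    except ValueError:
        return False
```

For instance, a source_path of "search" would be rejected because it names a task without naming a field.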
See also
DependencyType: For dependency type behavior
DependencyResolver: For resolution implementation details
Built-in transform functions: extract_urls_from_results, etc.
- class DependencyResolver[source]
Bases:
object
Resolves task dependencies from context and builds task configuration.
- Parameters:
context (TaskContext)
- __init__(context)[source]
Initialize dependency resolver with context.
- Parameters:
context (TaskContext) – TaskContext to resolve dependencies from
- resolve_dependencies(task)[source]
Resolve all dependencies for a task and return merged configuration.
- Parameters:
task (BaseTask) – Task instance with get_dependencies() method
- Return type:
- Returns:
Dictionary with resolved dependencies merged with existing config
- Raises:
ValueError – If required dependencies cannot be satisfied
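How resolved values might be merged with an existing config, following the override_existing rule documented for TaskDependency, can be sketched as below. merge_resolved is a hypothetical helper, and the tuple layout for resolved values is an assumption made for this illustration.

```python
from typing import Any, Dict, List, Tuple


def merge_resolved(config: Dict[str, Any],
                   resolved: List[Tuple[str, Any, bool]]) -> Dict[str, Any]:
    """Merge (name, value, override_existing) triples into a copy of config."""
    merged = dict(config)  # leave the caller's config untouched
    for name, value, override_existing in resolved:
        # Inject when the key is absent, or when overriding is requested
        if override_existing or name not in merged:
            merged[name] = value
    return merged


user_config = {"prompt": "Summarize", "timeout": 10}
merged = merge_resolved(user_config, [
    ("articles", ["a", "b"], False),   # new key: injected
    ("timeout", 30, False),            # existing key kept
    ("prompt", "Rewrite", True),       # existing key overridden
])
```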
- create_required_dependency(name, source_path, transform_func=None)[source]
Create a required dependency.
- create_optional_dependency(name, source_path, default_value, transform_func=None)[source]
Create an optional dependency with default value.
Tools and Utilities
Tool Registry
Tool registry for managing tool schemas and implementations.
- class ToolRegistry[source]
Bases:
object
Manages tool schemas and implementations with automatic generation.
- __init__(tool_functions)[source]
Initialize tool registry with list of decorated functions.
- Parameters:
tool_functions (List[Callable[..., Any]]) – List of functions decorated with @tool
- Raises:
ValueError – If any function is not decorated with @tool
- get_tool_function(name)[source]
Get tool function by name.
- Parameters:
name (str) – Name of the tool to retrieve
- Return type:
- Returns:
Tool function implementation
- Raises:
ValueError – If tool is not found
- get_tool_metadata(name)[source]
Get tool metadata by name.
- Parameters:
name (str) – Name of the tool
- Return type:
- Returns:
ToolMetadata instance
- Raises:
ValueError – If tool is not found
- get_tool_count()[source]
Get the number of registered tools.
- Return type:
int
- Returns:
Number of tools in registry
- generate_tool_context()[source]
Generate context description for system prompt.
- Return type:
str
- Returns:
Formatted tool context for inclusion in system prompt
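The registry behavior described above (reject undecorated functions, look tools up by name, render a description for the system prompt) can be sketched as follows. tool_sketch and ToolRegistrySketch are hypothetical stand-ins; the marker attribute and the layout of the generated text are assumptions.

```python
import inspect
from typing import Any, Callable, Dict, List


def tool_sketch(func: Callable[..., Any]) -> Callable[..., Any]:
    func._is_tool = True  # hypothetical marker standing in for real tool metadata
    return func


class ToolRegistrySketch:
    def __init__(self, tool_functions: List[Callable[..., Any]]) -> None:
        self._tools: Dict[str, Callable[..., Any]] = {}
        for func in tool_functions:
            if not getattr(func, "_is_tool", False):
                raise ValueError(f"{func.__name__} is not decorated as a tool")
            self._tools[func.__name__] = func

    def get_tool_function(self, name: str) -> Callable[..., Any]:
        if name not in self._tools:
            raise ValueError(f"Tool '{name}' not found")
        return self._tools[name]

    def get_tool_count(self) -> int:
        return len(self._tools)

    def generate_tool_context(self) -> str:
        # One line per tool: name, signature, first docstring line
        lines = ["Available tools:"]
        for name, func in self._tools.items():
            summary = (inspect.getdoc(func) or "").split("\n")[0]
            lines.append(f"- {name}{inspect.signature(func)}: {summary}")
        return "\n".join(lines)


@tool_sketch
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b


registry = ToolRegistrySketch([add])
prompt_context = registry.generate_tool_context()
```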
Tool Executor
Tool execution engine with error handling and logging.
- class ToolExecutor[source]
Bases:
object
Handles tool execution with error handling and logging.
- Parameters:
tool_registry (ToolRegistry)
max_execution_time (float)
- __init__(tool_registry, max_execution_time=30.0)[source]
Initialize tool executor.
- Parameters:
tool_registry (ToolRegistry) – Registry containing tool implementations
max_execution_time (float) – Maximum time (seconds) to allow for tool execution
- execute_tool(tool_name, arguments)[source]
Execute a tool and return result or error.
- Parameters:
- Returns:
Dictionary with execution result containing:
success (bool): Whether execution was successful
result (Any): Tool result if successful
error (str): Error message if failed
execution_time (float): Time taken in seconds
tool_name (str): Name of executed tool
error_type (str): Type of error if failed
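The shape of that result dictionary can be illustrated with a minimal executor. This sketch takes a plain dict of callables instead of a ToolRegistry and does not enforce max_execution_time; both simplifications are assumptions made to keep the example self-contained.

```python
import time
from typing import Any, Callable, Dict


def execute_tool(tools: Dict[str, Callable[..., Any]], tool_name: str,
                 arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Run a tool and report the outcome as a dictionary, never raising."""
    start = time.perf_counter()
    outcome: Dict[str, Any] = {"tool_name": tool_name, "success": False}
    try:
        if tool_name not in tools:
            raise ValueError(f"Tool '{tool_name}' not found")
        outcome["result"] = tools[tool_name](**arguments)
        outcome["success"] = True
    except Exception as exc:
        # Failures are reported in the result instead of propagating
        outcome["error"] = str(exc)
        outcome["error_type"] = type(exc).__name__
    outcome["execution_time"] = time.perf_counter() - start
    return outcome


tools = {"add": lambda a, b: a + b}
ok = execute_tool(tools, "add", {"a": 1, "b": 2})
bad_args = execute_tool(tools, "add", {"a": 1})
unknown = execute_tool(tools, "multiply", {"a": 1, "b": 2})
```

Returning a dictionary in every case lets the caller feed errors back to the model as ordinary tool output.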
Tools
Tool decorator and metadata extraction for function calling.
- tool(func)[source]
Decorator to mark a function as a tool and extract metadata.
- Parameters:
- Return type:
- Returns:
Decorated function with tool metadata attached
- Example:
@tool
def calculate(expression: str) -> float:
    '''Perform mathematical calculations.

    :param expression: Mathematical expression to evaluate
    :returns: Calculated result
    '''
    return eval(expression)  # Use safe_eval in production
- class ToolSchemaGenerator[source]
Bases:
object
Generates OpenAI-compatible schemas from decorated functions.
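One way such a schema could be derived from a function signature is sketched below with the standard inspect module. build_schema and the type mapping are hypothetical and may differ from what ToolSchemaGenerator does; the output follows the general shape of OpenAI function-calling tool definitions.

```python
import inspect
from typing import Any, Callable, Dict

# Assumed mapping from Python annotations to JSON Schema type names
_JSON_TYPES = {str: "string", int: "integer", float: "number",
               bool: "boolean", list: "array", dict: "object"}


def build_schema(func: Callable[..., Any]) -> Dict[str, Any]:
    properties: Dict[str, Any] = {}
    required = []
    for name, param in inspect.signature(func).parameters.items():
        # Unannotated or unknown types fall back to "string" in this sketch
        properties[name] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default means the argument is required
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": (inspect.getdoc(func) or "").split("\n")[0],
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


def search(query: str, max_results: int = 5) -> dict:
    """Search the web for a query."""
    return {"query": query, "max_results": max_results}


schema = build_schema(search)
```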
- search_with_content(query, max_results=5, max_content_results=5, content_timeout=15, html_method='readability', skip_pdf=False)[source]
Search the web and automatically fetch content from the top results.
This tool provides enhanced search functionality that not only returns search results but also automatically fetches and extracts readable content from the top URLs. Perfect for research tasks where you need both search results and their content.
- Parameters:
query (str) – The search query string to search for
max_results (int) – Maximum number of search results to return (1-10, default: 5)
max_content_results (int) – Maximum number of results to fetch content from (1-10, default: 5)
content_timeout (int) – Timeout in seconds for fetching content from each URL (default: 15)
html_method (str) – Method for HTML text extraction - “readability” or “basic” (default: “readability”)
skip_pdf (bool) – Whether to skip PDF files when fetching content (default: False)
- Return type:
- Returns:
Dictionary containing search results with fetched content including query, total_results, search_time, results list with title/url/snippet/content for each, and content_stats with attempted/successful/failed/skipped counts.
URL Fetcher
- class URLFetcher[source]
Utility class for fetching web content with Chrome-like behavior.
- __init__(config=None)[source]
Initialize URL fetcher with optional configuration.
- Parameters:
config (Optional[Dict[str, Any]]) – Optional configuration dictionary with options:
timeout: Request timeout in seconds (default: 30)
user_agent: Custom user agent string
headers: Additional headers to include
follow_redirects: Whether to follow redirects (default: True)
max_redirects: Maximum number of redirects (default: 10)
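The documented defaults suggest that a partial config is combined with built-in values. The sketch below shows one plausible merge; merge_fetcher_config is a hypothetical helper, and only the three defaults stated in the list above are included.

```python
from typing import Any, Dict, Optional

# Defaults stated in the option list; user_agent and headers have no
# documented default, so they are left unset in this sketch.
DOCUMENTED_DEFAULTS: Dict[str, Any] = {
    "timeout": 30,
    "follow_redirects": True,
    "max_redirects": 10,
}


def merge_fetcher_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Overlay user-supplied options on top of the documented defaults."""
    merged = dict(DOCUMENTED_DEFAULTS)
    merged.update(config or {})
    return merged


effective = merge_fetcher_config({"timeout": 5, "user_agent": "docs-example/1.0"})
```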
- fetch(url, **kwargs)[source]
Fetch content from a URL.
- Parameters:
- Returns:
Dictionary containing:
url: Final URL after redirects
status_code: HTTP status code
headers: Response headers
content: Response content as text
content_type: Content type from headers
encoding: Character encoding
size: Content size in bytes
response_time: Time taken for request in seconds
- Raises:
ValueError – If URL is invalid
RuntimeError – If request fails
- fetch_headers_only(url, **kwargs)[source]
Fetch only headers from a URL using HEAD request.
- Parameters:
- Returns:
Dictionary containing:
url: Final URL after redirects
status_code: HTTP status code
headers: Response headers
content_type: Content type from headers
content_length: Content length if available
response_time: Time taken for request in seconds
- __exit__(exc_type, exc_val, exc_tb)[source]
Context manager exit.
- Parameters:
exc_val (Optional[BaseException])
exc_tb (Optional[TracebackType])
- Return type: