# Aipype Framework

## Introduction

The **Aipype** framework is a modular, declarative, pipeline-based architecture for orchestrating AI tasks. At its core is the **PipelineAgent**, which automatically discovers and executes tasks defined by methods decorated with `@task`. This lets developers focus on task logic rather than the underlying orchestration mechanics.

The framework infers task dependencies from method signatures, enabling a seamless flow of data between tasks. This is achieved through **TaskContext**, a shared data store for efficient inter-task communication and data sharing.

To manage dependencies, Aipype introduces the **TaskDependency** class, which specifies how tasks are interconnected. It supports both required and optional dependencies, allowing flexible data flow and error handling. Before execution, the framework builds an execution plan that organizes tasks into phases, so independent tasks can run in parallel while dependent tasks still execute in the correct order.

For error handling, Aipype employs the **TaskResult** pattern: tasks return structured results that indicate success, partial success, or failure without raising exceptions. This design promotes robust error management and improves overall pipeline reliability.

Aipype also supports advanced features such as parallel execution of independent tasks, making it suitable for high-performance applications. Developers can configure execution parameters such as the maximum number of concurrent tasks and whether to stop the pipeline on the first failure.
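The TaskResult pattern described above can be pictured with a minimal standalone sketch. The names below (`TaskStatus`, `parse_number`) are illustrative stand-ins, not aipype's actual classes:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class TaskStatus(Enum):
    SUCCESS = "success"
    PARTIAL = "partial"
    ERROR = "error"


@dataclass
class TaskResult:
    """Structured task outcome: failures travel as data, not exceptions."""

    status: TaskStatus
    data: Optional[Any] = None
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def is_success(self) -> bool:
        return self.status is TaskStatus.SUCCESS


def parse_number(raw: str) -> TaskResult:
    """Example task body: report failure in the result instead of raising."""
    try:
        return TaskResult(TaskStatus.SUCCESS, data=float(raw))
    except ValueError as exc:
        return TaskResult(TaskStatus.ERROR, error=str(exc))
```

Because every task returns a `TaskResult`-shaped value, the orchestrator can inspect `status` uniformly and decide whether to continue, skip dependents, or stop the pipeline.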
By leveraging these capabilities, Aipype provides a powerful, flexible environment for building complex AI workflows, enabling developers to create efficient and maintainable pipelines with ease.

## Aipype Usage Examples

### 01 Basic Print Agent

This minimal example demonstrates core framework concepts from the local `aipype` package using the recommended declarative syntax.

```python
from aipype import PipelineAgent, task, print_header

DEFAULT_MESSAGE = "Hello from my first agent!"


class BasicAgent(PipelineAgent):
    """An agent that prints a message using the declarative @task syntax."""

    @task
    def print_message(self) -> dict[str, str]:
        """A simple task that reads a message from config and prints it."""
        message = self.config.get("message", DEFAULT_MESSAGE)
        print(message)
        return {"message": message}


if __name__ == "__main__":
    print_header("BASIC PRINT AGENT TUTORIAL")
    agent = BasicAgent(name="basic-agent", config={"message": DEFAULT_MESSAGE})
    agent.run()
    agent.display_results()
```

### 02 LLM Task

This example demonstrates how to use the `llm()` helper function to generate an article outline using the declarative `@task` syntax.

```python
from aipype import LLMTask, PipelineAgent, task, llm, print_header

DEFAULT_TOPIC = "AI agent frameworks"


class OutlineAgent(PipelineAgent):
    """Agent that generates an article outline using the declarative syntax."""

    @task
    def outline_article(self) -> LLMTask:
        """Generate an article outline using the llm() helper."""
        topic = self.config.get("topic", DEFAULT_TOPIC)
        return llm(
            prompt=(
                f"Create a concise, hierarchical outline for an article about {topic}.\n"
                "- Use 5-7 top-level sections with clear headings.\n"
                "- Add 2-3 bullet points under each section.\n"
                "- Optimize for technical readers and clarity.\n"
                "Return the outline in markdown."
            ),
            model="gemma3:4b",
            provider="ollama",
            temperature=0.2,
            max_tokens=800,
        )

    def display_results(self) -> None:
        """Display formatted outline after pipeline execution."""
        super().display_results()
        self.context.display_result_content("outline_article", "GENERATED OUTLINE (MARKDOWN)")


if __name__ == "__main__":
    print_header("LLM TASK TUTORIAL - OUTLINE GENERATION")
    agent = OutlineAgent(name="outline-agent", config={"topic": DEFAULT_TOPIC})
    agent.run()
    agent.display_results()
```

### 03 Dependent Tasks

This tutorial shows how one task can depend on another using the declarative `@task` syntax with automatic dependency inference.

```python
from typing import Annotated

from aipype import Depends, LLMTask, PipelineAgent, task, llm, print_header

DEFAULT_TOPIC = "AI agent frameworks"


class OutlineToArticleAgent(PipelineAgent):
    """Agent with two dependent LLM tasks: outline -> article."""

    @task
    def outline_article(self) -> LLMTask:
        """Generate an article outline."""
        topic = self.config.get("topic", DEFAULT_TOPIC)
        return llm(
            prompt=(
                f"Create a detailed outline for an article about {topic}.\n"
                "- Use 4-6 main sections with descriptive headings\n"
                "- Add 2-3 bullet points under each section\n"
                "- Return only the outline in markdown format."
            ),
            model="gemma3:4b",
            provider="ollama",
            temperature=0.3,
            max_tokens=600,
        )

    @task
    def write_article(
        self,
        outline_article: Annotated[str, Depends("outline_article.content")],
    ) -> LLMTask:
        """Write an article based on the outline."""
        return llm(
            prompt=(
                "Write a short, well-structured article based on the following outline:\n"
                f"{outline_article}\n"
                "Output only the article, no other text."
            ),
            model="gemma3:4b",
            provider="ollama",
            temperature=0.5,
            max_tokens=1200,
        )

    def display_results(self) -> None:
        """Display results using the framework display methods."""
        super().display_results()
        self.context.display_completed_results(
            [
                ("outline_article", "GENERATED OUTLINE"),
                ("write_article", "GENERATED ARTICLE"),
            ]
        )


if __name__ == "__main__":
    print_header("DEPENDENT TASKS TUTORIAL - OUTLINE TO ARTICLE")
    agent = OutlineToArticleAgent(name="outline-to-article-agent", config={"topic": DEFAULT_TOPIC})
    agent.run()
    agent.display_results()
```

### 04 Conditional Task

This tutorial demonstrates how to implement conditional logic in your agent pipelines using the declarative `@task` syntax.

```python
from typing import Annotated, Any, Dict

from aipype import Depends, LLMTask, PipelineAgent, task, llm, print_header

DEFAULT_TOPIC = "machine learning fundamentals"
MIN_WORD_COUNT = 50


class QualityCheckerAgent(PipelineAgent):
    """Agent that generates content and checks it against quality criteria."""

    @task
    def generate_outline(self) -> LLMTask:
        """Generate an article outline."""
        topic = self.config.get("topic", DEFAULT_TOPIC)
        return llm(
            prompt=(
                f"Create a detailed outline for an article about {topic}.\n"
                "- Use 4-6 main sections with descriptive headings\n"
                "- Add 2-3 bullet points under each section\n"
                "- Return only the outline in markdown format."
            ),
            model="gemma3:4b",
            provider="ollama",
            temperature=0.3,
            max_tokens=600,
        )

    @task
    def quality_check(
        self,
        generate_outline: Annotated[str, Depends("generate_outline.content")],
    ) -> Dict[str, Any]:
        """Check if the outline meets quality standards."""
        word_count = len(generate_outline.split())
        if word_count >= MIN_WORD_COUNT:
            return {
                "status": "approved",
                "message": "Outline meets quality standards.",
                "word_count": word_count,
            }
        return {
            "status": "rejected",
            "message": "Outline needs expansion.",
            "word_count": word_count,
        }

    def display_results(self) -> None:
        """Display the outline and quality check results."""
        super().display_results()
        outline_result = self.context.get_result("generate_outline")
        quality_result = self.context.get_result("quality_check")
        print(f"Outline: {outline_result['content']}")
        print(f"Quality Check: {quality_result['status']} - {quality_result['message']}")


if __name__ == "__main__":
    print_header("CONDITIONAL LOGIC TUTORIAL - QUALITY CHECKER")
    agent = QualityCheckerAgent(name="quality-checker-agent", config={"topic": DEFAULT_TOPIC})
    agent.run()
    agent.display_results()
```

### Structured Response Example

This example demonstrates LLMTask structured response support with Pydantic models.
```python
import json
from typing import List

from pydantic import BaseModel, Field

from aipype import LLMTask, PipelineAgent, task, llm, print_header


class CompanyInfo(BaseModel):
    """Structured information about a company."""

    name: str = Field(description="Company name")
    industry: str = Field(description="Primary industry sector")
    founded_year: int = Field(description="Year the company was founded")
    headquarters: str = Field(description="Headquarters location")
    key_products: List[str] = Field(description="Main products or services")


class DataExtractionAgent(PipelineAgent):
    """Agent that extracts structured data from unstructured text using @task syntax."""

    @task
    def extract_company(self) -> LLMTask:
        """Extract company information from text using structured output."""
        company_text = (
            "Tesla Inc. is an American electric vehicle and clean energy company "
            "founded in 2003. Headquartered in Austin, Texas."
        )
        return llm(
            prompt=f"Extract company information from this text: {company_text}",
            model="gpt-4",
            provider="openai",
            response_format=CompanyInfo,
            temperature=0.1,
        )


if __name__ == "__main__":
    print_header("STRUCTURED RESPONSE EXTRACTION EXAMPLE")
    agent = DataExtractionAgent(name="data_extractor")
    agent.run()
    company_result = agent.context.get_result("extract_company")
    print(json.dumps(company_result, indent=2))
```

## API Reference

### PipelineAgent

Declarative agent that discovers tasks from `@task`-decorated methods. It provides a cleaner, more Pythonic way to define AI pipelines.

**Methods:**

- `__init__(name: str, config: Optional[Dict[str, Any]] = None)` - Initialize the pipeline agent.
- `setup_tasks() -> List[BaseTask]` - Auto-discover `@task` methods and build the task list.
- `run() -> AgentRunResult` - Execute the complete workflow with automatic task orchestration.
- `add_tasks(tasks: List[BaseTask]) -> None` - Add tasks to the agent.
- `create_context() -> TaskContext` - Create and return the task context for this agent.
- `get_context() -> TaskContext` - Get the task context for this agent.
- `get_execution_plan() -> Optional[TaskExecutionPlan]` - Get the current execution plan.
- `validate_dependencies() -> List[str]` - Validate that all task dependencies can be satisfied.
- `display_results(sections: Optional[List[str]] = None) -> None` - Display results with configurable sections.

**Attributes:**

- `name: str` - Agent identifier.
- `config: Optional[Dict[str, Any]]` - Configuration dictionary.
- `tasks: List[BaseTask]` - List of TaskWrapper instances (populated by `setup_tasks()`).
- `context: TaskContext` - TaskContext for inter-task communication.

**Configuration Parameters:**

- `name` (str) - Unique identifier for this agent.
- `config` (Optional[Dict[str, Any]]) - Configuration dictionary passed to agent and tasks.

---

### BasePipelineAgent

Base agent class for building AI workflows with automatic task orchestration.

**Methods:**

- `__init__(name: str, config: Optional[Dict[str, Any]] = None)` - Initialize pipeline agent with name and configuration.
- `setup_tasks() -> List[BaseTask]` - Define the workflow tasks for this agent. Must be implemented by subclasses.
- `run() -> AgentRunResult` - Execute the complete workflow with automatic task orchestration.
- `reset() -> None` - Reset the agent and clear context.

**Attributes:**

- `name: str` - Unique identifier for this agent instance.
- `config: Optional[Dict[str, Any]]` - Configuration dictionary with agent settings.
- `tasks: List[BaseTask]` - List of tasks (populated by `setup_tasks()`).
- `context: TaskContext` - TaskContext for inter-task communication.

**Configuration Parameters:**

- `enable_parallel` (bool) - Enable parallel task execution (default: True).
- `max_parallel_tasks` (int) - Maximum concurrent tasks (default: 5).
- `stop_on_failure` (bool) - Stop pipeline on first failure (default: True).

---

### TaskExecutionPlan

Execution plan that organizes tasks into phases based on dependencies.
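Phase construction can be pictured as a level-order topological sort over the dependency graph: each phase holds every task whose dependencies are already satisfied, so tasks within a phase are independent and can run in parallel. The sketch below is a standalone illustration of the idea (hypothetical code, not aipype's implementation):

```python
from typing import Dict, List, Set


def build_phases(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group task names into execution phases.

    `deps` maps each task name to the set of task names it depends on.
    A task joins the first phase in which all of its dependencies have
    already completed; a cycle makes no task ready and raises an error.
    """
    remaining = dict(deps)
    done: Set[str] = set()
    phases: List[List[str]] = []
    while remaining:
        # Every task whose dependencies are all satisfied joins this phase.
        ready = sorted(t for t, d in remaining.items() if d <= done)
        if not ready:
            raise ValueError("Dependency cycle detected")
        phases.append(ready)
        done.update(ready)
        for t in ready:
            del remaining[t]
    return phases
```

For example, `build_phases({"outline": set(), "article": {"outline"}})` would place `outline` in the first phase and `article` in the second, mirroring the outline-to-article pipeline from tutorial 03.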
**Methods:**

- `__init__(tasks: List[BaseTask])` - Create execution plan from list of tasks.
- `get_phase(phase_index: int) -> List[BaseTask]` - Get tasks in a specific phase.
- `get_total_tasks() -> int` - Get total number of tasks in the plan.
- `total_phases() -> int` - Get total number of execution phases.

**Attributes:**

- `tasks: List[BaseTask]` - Original list of tasks to organize.
- `phases: List[List[BaseTask]]` - List of task lists, each representing an execution phase.

---

### BaseTask

Abstract base class for all task implementations in the aipype framework.

**Methods:**

- `__init__(name: str, config: Optional[Dict[str, Any]] = None, dependencies: Optional[List[TaskDependency]] = None)` - Initialize a new task instance with configuration and dependencies.
- `run() -> TaskResult` - Must be implemented by subclasses to perform task work.
- `get_dependencies() -> List[TaskDependency]` - Get the list of dependencies for this task.
- `mark_started() -> None` - Mark the task as started.
- `mark_success(result: Optional[Any] = None) -> None` - Mark the task as successfully completed with an optional result.
- `mark_error(error: str) -> None` - Mark the task as failed with an error message.

**Attributes:**

- `name: str` - Unique identifier for this task instance.
- `config: Optional[Dict[str, Any]]` - Configuration dictionary with task-specific parameters.
- `dependencies: List[TaskDependency]` - List of TaskDependency objects for data flow.
- `validation_rules: Optional[Dict[str, Any]]` - Optional validation rules for configuration.

---

### TaskResult

Standardized result format for all task executions.

**Methods:**

- `__init__(status: TaskStatus, data: Optional[Any] = None, error: Optional[str] = None, metadata: Dict[str, Any] = {}, execution_time: float = 0.0)` - Initialize task result.
- `is_success() -> bool` - Check if the task completed successfully.
- `is_error() -> bool` - Check if the task failed with an error.
- `is_partial() -> bool` - Check if the task completed with partial success.
- `add_metadata(key: str, value: Any) -> None` - Add metadata entry.

**Attributes:**

- `status: TaskStatus` - Status of the task execution.
- `data: Optional[Any]` - Result data if successful.
- `error: Optional[str]` - Error message if failed.
- `metadata: Dict[str, Any]` - Additional metadata about the execution.
- `execution_time: float` - Time taken to execute in seconds.

---

### AgentRunResult

Standardized result format for agent pipeline executions.

**Methods:**

- `__init__(status: AgentRunStatus, agent_name: str, total_tasks: int = 0, completed_tasks: int = 0, failed_tasks: int = 0, total_phases: int = 0, execution_time: float = 0.0, metadata: Dict[str, Any] = {}, error_message: Optional[str] = None)` - Initialize agent run result.
- `is_success() -> bool` - Check if the agent execution completed successfully.
- `is_error() -> bool` - Check if the agent execution failed with an error.
- `is_partial() -> bool` - Check if the agent execution completed with partial success.

**Attributes:**

- `status: AgentRunStatus` - Status of the agent execution.
- `agent_name: str` - Name of the agent.
- `total_tasks: int` - Total number of tasks in the pipeline.
- `completed_tasks: int` - Number of tasks that completed successfully.
- `failed_tasks: int` - Number of tasks that failed.
- `total_phases: int` - Number of execution phases.
- `execution_time: float` - Total execution time in seconds.
- `metadata: Dict[str, Any]` - Additional metadata about the execution.
- `error_message: Optional[str]` - Error message if execution failed.

---

### TaskContext

Simplified context for storing and accessing task results across pipeline execution.

**Methods:**

- `__init__()` - Initialize empty task context.
- `store_result(task_name: str, result: Dict[str, Any]) -> None` - Store a task result in the context.
- `get_result(task_name: str) -> Optional[Dict[str, Any]]` - Get a task result from the context.
- `has_result(task_name: str) -> bool` - Check if a task result exists in the context.
- `clear() -> None` - Clear all stored results and execution history.

**Attributes:**

- `execution_history: List[Dict[str, Any]]` - History of task executions.
- `results: Dict[str, Dict[str, Any]]` - Stored task results.

---

### TaskDependency

Specification for automatic data flow between tasks in a pipeline.

**Methods:**

- `__init__(name: str, source_path: str, dependency_type: DependencyType, default_value: Optional[Any] = None, transform_func: Optional[Callable[[Any], Any]] = None, override_existing: bool = False, description: str = '')` - Initialize a task dependency specification.
- `is_required() -> bool` - Check if this is a required dependency.
- `is_optional() -> bool` - Check if this is an optional dependency.

**Attributes:**

- `name: str` - Key in target task's config where resolved data is stored.
- `source_path: str` - Dot-notation path to source data in TaskContext.
- `dependency_type: DependencyType` - REQUIRED or OPTIONAL behavior for missing data.
- `transform_func: Optional[Callable[[Any], Any]]` - Optional function to process resolved data.
- `default_value: Optional[Any]` - Fallback value for optional dependencies.

---

### DependencyResolver

Resolves task dependencies from context and builds task configuration.

**Methods:**

- `__init__(context: TaskContext)` - Initialize dependency resolver with context.
- `resolve_dependencies(task: BaseTask) -> Dict[str, Any]` - Resolve all dependencies for a task and return merged configuration.
- `validate_dependencies(task: BaseTask) -> List[str]` - Validate that all dependencies for a task can be satisfied.

---

### LLMTask

Advanced LLM integration task with template substitution and tool calling.

**Methods:**

- `__init__(name: str, config: Optional[Dict[str, Any]] = None, dependencies: Optional[List[TaskDependency]] = None)` - Initialize an advanced LLM task with template and tool support.
- `run() -> TaskResult` - Execute the LLM task with context-resolved prompts and logging.
- `set_context(context: TaskContext) -> None` - Set the task context for template resolution.
- `get_resolved_prompt() -> Optional[str]` - Get the resolved prompt after template substitution.
- `get_resolved_context() -> Optional[str]` - Get the resolved context after template substitution.

**Attributes:**

- `name: str` - Unique identifier for this task within the pipeline.
- `config: Optional[Dict[str, Any]]` - LLM configuration dictionary.
- `dependencies: Optional[List[TaskDependency]]` - List of TaskDependency objects for template variable injection.

---

### SearchTask

Simple web search task that executes searches and returns results.

**Methods:**

- `__init__(name: str, config: Optional[Dict[str, Any]] = None, dependencies: Optional[List[TaskDependency]] = None)` - Initialize search task.
- `run() -> TaskResult` - Execute the search and return results.
- `__str__() -> str` - String representation of the search task.

**Attributes:**

- `name: str` - Task name.
- `config: Optional[Dict[str, Any]]` - Task configuration.

---

### TransformTask

Task that applies transformations to data from context.

**Methods:**

- `__init__(name: str, config: Optional[Dict[str, Any]] = None, dependencies: Optional[List[TaskDependency]] = None)` - Initialize transform task.
- `run() -> TaskResult` - Execute the transformation.
- `set_context(context: TaskContext) -> None` - Set the task context.
- `preview_transformation() -> Dict[str, Any]` - Preview what the transformation would do with current input.

**Attributes:**

- `name: str` - Task name.
- `config: Optional[Dict[str, Any]]` - Task configuration.

---

### ConditionalTask

Task that executes actions based on conditional logic using context data.

**Methods:**

- `__init__(name: str, config: Optional[Dict[str, Any]] = None, dependencies: Optional[List[TaskDependency]] = None)` - Initialize conditional task.
- `run() -> TaskResult` - Execute the conditional task.
- `set_context(context: TaskContext) -> None` - Set the task context.
- `preview_condition() -> Dict[str, Any]` - Preview what the condition would evaluate to with current input.

**Attributes:**

- `name: str` - Task name.
- `config: Optional[Dict[str, Any]]` - Task configuration.

---

### Utility Functions

#### llm(...)

`llm(prompt: str, *, model: str = 'gpt-4o', provider: str = 'openai', system: Optional[str] = None, temperature: float = 0.7, max_tokens: int = 1000, tools: Optional[List[Union[Callable[..., Any], Dict[str, Any]]]] = None, response_format: Optional[Union[Type[BaseModel], Dict[str, Any]]] = None, timeout: float = 60.0, **kwargs) -> LLMTask`

Create an LLMTask with a clean, intuitive API.

#### search(...)

`search(query: str, *, max_results: int = 5, **kwargs) -> SearchTask`

Create a SearchTask with a clean API.

#### transform(...)

`transform(input_data: Any, fn: Callable[[Any], Any], *, output_name: str = 'result') -> Dict[str, Any]`

Apply a transformation function and return the result as a dict.

---

This API reference provides a comprehensive overview of the key classes and methods in the `aipype` framework, enabling users to effectively utilize it for building AI agent pipelines.
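As a closing illustration, the dot-notation `source_path` convention used by `TaskDependency` (e.g. `"outline_article.content"`) can be sketched with a small standalone resolver. The function below is hypothetical and only mimics the lookup behavior described in the TaskDependency section; it is not aipype's `DependencyResolver`:

```python
from typing import Any, Dict, Optional


def resolve_path(
    results: Dict[str, Dict[str, Any]],
    source_path: str,
    default: Optional[Any] = None,
) -> Any:
    """Walk a dot-notation path like 'task_name.field.subfield' through
    stored task results, returning `default` when any segment is missing
    (the optional-dependency case)."""
    node: Any = results
    for segment in source_path.split("."):
        if isinstance(node, dict) and segment in node:
            node = node[segment]
        else:
            return default
    return node


# Example: a context's stored results after an outline task has run.
results = {
    "outline_article": {
        "content": "# Outline\n- Intro",
        "usage": {"tokens": 120},
    }
}
```

A required dependency would treat a `None` resolution as a validation error, while an optional dependency would fall back to its `default_value`, matching the REQUIRED/OPTIONAL semantics documented above.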