Source code for aipype_g.tasklib.gmail_list_emails_task

"""Gmail List Emails Task - List Gmail messages with filters and queries."""

from collections.abc import Callable
from typing import List, Dict, Any, Optional
from typing import override

from aipype.base_task import BaseTask
from aipype.task_dependencies import TaskDependency
from aipype.task_result import TaskResult

from .gmail_service import GmailService, GmailServiceError
from .gmail_models import GmailMessage, GmailSearchResult, GmailSearchOperators


class GmailListEmailsTask(BaseTask):
    """Task that lists Gmail messages with optional filters and search queries."""

    def __init__(
        self,
        name: str,
        config: Dict[str, Any],
        dependencies: Optional[List[TaskDependency]] = None,
    ):
        """Initialize Gmail list emails task.

        Args:
            name: Task name
            config: Configuration dictionary containing:
                - query: Gmail search query (optional, can be resolved from dependencies)
                - max_results: Maximum number of messages to return (default: 10, range 1-100)
                - include_spam_trash: Include spam and trash messages (default: False)
                - label_ids: List of label IDs to filter by (optional)
                - credentials: Pre-authenticated Google credentials object, e.g. from
                  GoogleOAuthTask (optional; passed to GmailService ahead of the file paths)
                - credentials_file: Path to OAuth2 credentials file (optional)
                - token_file: Path to OAuth2 token file (optional)
                - parse_messages: Whether to parse full message content (default: True)
                - timeout: Timeout passed to GmailService (default: 30, range 5-300)
            dependencies: List of task dependencies
        """
        super().__init__(name, config)
        self.dependencies = dependencies or []
        self.validation_rules = {
            "defaults": {
                "query": "",
                "max_results": 10,
                "include_spam_trash": False,
                "parse_messages": True,
                "timeout": 30,
            },
            "types": {
                "query": str,
                "max_results": int,
                "include_spam_trash": bool,
                "label_ids": list,
                "credentials_file": str,
                "token_file": str,
                "parse_messages": bool,
                "timeout": int,
                # Note: credentials is not validated by type as it's a Google credentials object
            },
            "ranges": {
                "max_results": (1, 100),
                "timeout": (5, 300),
            },
        }

    @override
    def get_dependencies(self) -> List[TaskDependency]:
        """Get the list of task dependencies.

        Returns:
            List of TaskDependency objects
        """
        return self.dependencies

    @override
    def run(self) -> TaskResult:
        """List Gmail messages based on query and filters.

        Messages that fail to fetch or parse are skipped with a warning
        (best-effort). In that case ``retrieved_count`` is smaller than
        ``total_found``.

        Returns:
            TaskResult containing:
                - messages: List of GmailMessage objects (empty if parse_messages=False)
                - message_ids: IDs of the retrieved messages (only successfully
                  parsed ones when parse_messages=True)
                - search_result: GmailSearchResult (as dict) with metadata
                - query_used: The Gmail query that was executed
                - total_found: Number of messages returned by the list call
                - retrieved_count: Number of messages actually retrieved
                - parse_messages: Whether full message content was parsed
                - search_metadata: include_spam_trash, label_ids, max_results_requested
            On Gmail or unexpected errors, a failure TaskResult is returned instead.
        """
        from datetime import datetime

        start_time = datetime.now()

        # Validate configuration using instance validation rules
        validation_failure = self._validate_or_fail(start_time)
        if validation_failure:
            return validation_failure

        # Read config at run time: dependencies may have updated it after __init__.
        query = self.config.get("query", "")
        max_results = self.config.get("max_results", 10)
        include_spam_trash = self.config.get("include_spam_trash", False)
        label_ids = self.config.get("label_ids")
        # Pre-authenticated credentials from GoogleOAuthTask
        credentials = self.config.get("credentials")
        credentials_file = self.config.get("credentials_file")
        token_file = self.config.get("token_file")
        parse_messages = self.config.get("parse_messages", True)
        timeout = self.config.get("timeout", 30)

        def progress_callback(message: str) -> None:
            """Log detailed operation progress at debug level."""
            self.logger.debug(f"[{self.name}] {message}")

        self.logger.info(
            f"Starting Gmail list emails task with query: '{query}', max_results: {max_results}"
        )

        try:
            progress_callback("Initializing Gmail service...")
            gmail_service = GmailService(
                credentials=credentials,  # Pre-authenticated credentials take priority
                credentials_file=credentials_file,
                token_file=token_file,
                timeout=timeout,
            )

            progress_callback(
                f"Listing messages (query: '{query}', max: {max_results})..."
            )
            # Normalise a falsy result (e.g. None) to [] so len()/iteration below are safe.
            message_list = (
                gmail_service.list_messages(
                    query=query,
                    max_results=max_results,
                    label_ids=label_ids,
                    include_spam_trash=include_spam_trash,
                    progress_callback=progress_callback,
                )
                or []
            )

            messages: List[GmailMessage] = []
            if parse_messages and message_list:
                messages, message_ids = self._fetch_and_parse_messages(
                    gmail_service, message_list, progress_callback
                )
            else:
                # Just collect message IDs
                message_ids = [msg["id"] for msg in message_list]
                progress_callback(
                    f"Retrieved {len(message_ids)} message IDs without parsing (faster mode)"
                )

            # total_found counts everything the list call returned, including
            # messages that were later skipped because they failed to parse.
            total_found = len(message_list)
            retrieved_count = len(messages) if parse_messages else len(message_ids)

            search_result = GmailSearchResult(
                query=query,
                total_count=total_found,
                messages=messages,
                next_page_token=None,  # Note: Gmail API pagination would require additional implementation
                estimated_result_size=total_found,
            )

            result_data = {
                # messages is only populated when parse_messages is True.
                "messages": messages,
                "message_ids": message_ids,
                "search_result": search_result.to_dict(),
                "query_used": query,
                "total_found": total_found,
                "retrieved_count": retrieved_count,
                "parse_messages": parse_messages,
                "search_metadata": {
                    "include_spam_trash": include_spam_trash,
                    "label_ids": label_ids,
                    "max_results_requested": max_results,
                },
            }

            execution_time = (datetime.now() - start_time).total_seconds()

            if total_found:
                if retrieved_count < total_found:
                    self.logger.warning(
                        f"{total_found - retrieved_count} of {total_found} messages "
                        f"could not be retrieved and were skipped"
                    )
                self.logger.info(
                    f"Gmail list emails task completed: {total_found} messages found, "
                    f"{retrieved_count} retrieved"
                )
            else:
                # No messages found - this is still a success, just empty results
                self.logger.info(
                    f"Gmail list emails task completed: No messages found for query '{query}'"
                )

            return TaskResult.success(
                data=result_data,
                execution_time=execution_time,
                metadata={
                    "task_type": "gmail_list_emails",
                    "query": query,
                    "total_found": total_found,
                    "retrieved_count": retrieved_count,
                    "parse_messages": parse_messages,
                },
            )

        except GmailServiceError as e:
            return self._failure_result(
                error_msg=f"GmailListEmailsTask Gmail operation failed: {str(e)}",
                error_type="GmailServiceError",
                execution_time=(datetime.now() - start_time).total_seconds(),
                query=query,
                max_results=max_results,
            )
        except Exception as e:
            return self._failure_result(
                error_msg=f"GmailListEmailsTask operation failed: {str(e)}",
                error_type=type(e).__name__,
                execution_time=(datetime.now() - start_time).total_seconds(),
                query=query,
                max_results=max_results,
            )

    def _fetch_and_parse_messages(
        self,
        gmail_service: GmailService,
        message_list: List[Dict[str, Any]],
        progress_callback: Callable[[str], None],
    ) -> tuple[List[GmailMessage], List[str]]:
        """Fetch and parse each listed message, skipping ones that fail.

        Args:
            gmail_service: Initialised Gmail service used for get/parse calls.
            message_list: Entries from ``list_messages``; each is expected to
                carry an ``"id"`` key.
            progress_callback: Receives human-readable progress messages.

        Returns:
            (parsed messages, their IDs), in list order, for successes only.
        """
        messages: List[GmailMessage] = []
        message_ids: List[str] = []
        total = len(message_list)

        progress_callback(f"Starting to parse {total} messages...")
        for i, msg_data in enumerate(message_list, 1):
            try:
                message_id = msg_data["id"]
                progress_callback(
                    f"Parsing message {i}/{total}: {message_id[:8]}..."
                )
                full_message_data = gmail_service.get_message(
                    message_id, progress_callback=progress_callback
                )
                parsed_message = gmail_service.parse_message(full_message_data)
            except GmailServiceError as e:
                error_msg = f"Gmail API error parsing message {msg_data.get('id', 'unknown')}: {str(e)}"
                self.logger.warning(error_msg)
                progress_callback(f"WARNING: {error_msg}")
                # Best-effort: continue processing other messages
                continue
            except Exception as e:
                error_msg = f"Unexpected error parsing message {msg_data.get('id', 'unknown')}: {str(e)}"
                self.logger.warning(error_msg)
                progress_callback(f"WARNING: {error_msg}")
                # Best-effort: continue processing other messages
                continue

            messages.append(parsed_message)
            message_ids.append(message_id)

        progress_callback(f"Completed parsing {len(messages)} messages successfully")
        return messages, message_ids

    def _failure_result(
        self,
        error_msg: str,
        error_type: str,
        execution_time: float,
        query: str,
        max_results: int,
    ) -> TaskResult:
        """Log ``error_msg`` and wrap it in a failure TaskResult with task metadata."""
        self.logger.error(error_msg)
        return TaskResult.failure(
            error_message=error_msg,
            execution_time=execution_time,
            metadata={
                "task_type": "gmail_list_emails",
                "query": query,
                "error_type": error_type,
                "max_results": max_results,
            },
        )

    @staticmethod
    def create_search_query(**kwargs: Any) -> str:
        """Helper method to create Gmail search queries using common patterns.

        Supported kwargs:
            - from_sender: str - Filter by sender email
            - to_recipient: str - Filter by recipient email
            - subject: str - Filter by subject
            - newer_than_days: int - Messages newer than X days
            - older_than_days: int - Messages older than X days
            - has_attachment: bool - Messages with attachments
            - is_unread: bool - Unread messages
            - is_important: bool - Important messages
            - is_starred: bool - Starred messages
            - label: str - Messages with specific label
            - custom_query: str - Additional custom query terms

        Value filters are added when their key is present. Boolean flags are
        added only when truthy.

        Returns:
            Gmail search query string, or "" if no filters were given.
        """
        query_parts: List[str] = []

        if "from_sender" in kwargs:
            query_parts.append(GmailSearchOperators.from_sender(kwargs["from_sender"]))
        if "to_recipient" in kwargs:
            query_parts.append(
                GmailSearchOperators.to_recipient(kwargs["to_recipient"])
            )
        if "subject" in kwargs:
            query_parts.append(GmailSearchOperators.with_subject(kwargs["subject"]))
        if "newer_than_days" in kwargs:
            query_parts.append(
                GmailSearchOperators.newer_than(kwargs["newer_than_days"])
            )
        if "older_than_days" in kwargs:
            query_parts.append(
                GmailSearchOperators.older_than(kwargs["older_than_days"])
            )
        if kwargs.get("has_attachment"):
            query_parts.append(GmailSearchOperators.has_attachment())
        if kwargs.get("is_unread"):
            query_parts.append(GmailSearchOperators.is_unread())
        if kwargs.get("is_important"):
            query_parts.append(GmailSearchOperators.is_important())
        if kwargs.get("is_starred"):
            query_parts.append(GmailSearchOperators.is_starred())
        if "label" in kwargs:
            query_parts.append(GmailSearchOperators.with_label(kwargs["label"]))
        if "custom_query" in kwargs:
            query_parts.append(kwargs["custom_query"])

        # Combine all query parts with AND operator
        return GmailSearchOperators.combine_and(*query_parts) if query_parts else ""