"""Read Google Sheet Task - Read data from Google Sheets as 2D arrays."""
from typing import List, Dict, Any, Optional
from typing import override
from aipype.base_task import BaseTask
from aipype.task_dependencies import TaskDependency
from aipype.task_result import TaskResult
from .google_sheets_service import GoogleSheetsService, GoogleSheetsError
from .google_sheets_models import SheetData
[docs]
class ReadGoogleSheetTask(BaseTask):
    """Task that reads data from Google Sheets and returns it as 2D arrays.

    What gets read is chosen from the configuration, in this order:

    1. ``range`` - an explicit A1-notation range.
    2. ``sheet_name`` - a named sheet, optionally bounded by
       ``start_row``/``end_row``/``start_col``/``end_col``.
    3. Neither - all data from the first sheet of the spreadsheet.
    """

    def __init__(
        self,
        name: str,
        config: Dict[str, Any],
        dependencies: Optional[List[TaskDependency]] = None,
    ) -> None:
        """Initialize Read Google Sheet task.

        Args:
            name: Task name
            config: Configuration dictionary containing:
                - spreadsheet_id: Google Spreadsheet ID (required)
                - range: Range in A1 notation (e.g., 'Sheet1!A1:C10') (optional)
                - sheet_name: Name of sheet to read (alternative to range) (optional)
                - start_row: Starting row number (1-based) (optional, default: 1)
                - end_row: Ending row number (1-based) (optional)
                - start_col: Starting column number (1-based) (optional, default: 1)
                - end_col: Ending column number (1-based) (optional)
                - include_headers: Whether first row contains headers (default: True)
                - credentials: Pre-authenticated credentials (can be resolved from dependencies)
                - credentials_file: Path to OAuth2 credentials file (optional)
                - token_file: Path to OAuth2 token file (optional)
                - timeout: Request timeout in seconds (default: 30, allowed: 5-300)
            dependencies: List of task dependencies
        """
        super().__init__(name, config)
        self.dependencies = dependencies or []
        # Declarative rules checked by ``_validate_or_fail`` at the start of run().
        # "credentials" is deliberately absent from "types": it is an opaque
        # object rather than a primitive value.
        self.validation_rules = {
            "defaults": {
                "start_row": 1,
                "start_col": 1,
                "include_headers": True,
                "timeout": 30,
            },
            "required": {
                "spreadsheet_id": str,
            },
            "types": {
                "spreadsheet_id": str,
                "range": str,
                "sheet_name": str,
                "start_row": int,
                "end_row": int,
                "start_col": int,
                "end_col": int,
                "include_headers": bool,
                "credentials_file": str,
                "token_file": str,
                "timeout": int,
            },
            # (min, max) pairs; None means unbounded on that side.
            "ranges": {
                "start_row": (1, None),
                "end_row": (1, None),
                "start_col": (1, None),
                "end_col": (1, None),
                "timeout": (5, 300),
            },
        }

    @override
    def get_dependencies(self) -> List[TaskDependency]:
        """Get the list of task dependencies.

        Returns:
            List of TaskDependency objects
        """
        return self.dependencies

    @override
    def run(self) -> TaskResult:
        """Read data from Google Sheet.

        Returns:
            On success, a TaskResult whose data contains:
                - sheet_data: SheetData object (serialized)
                - values: 2D array of cell values
                - headers: List of header names (if include_headers=True)
                - num_rows: Number of data rows
                - num_cols: Number of columns
                - shape: Tuple of (rows, cols)
                - spreadsheet_info: Metadata about the spreadsheet
                - range_read: The range reported by the returned SheetData
                - include_headers: The include_headers setting that was used

            On validation failure, the result produced by ``_validate_or_fail``.
            On any exception raised while reading, a failure TaskResult whose
            metadata records the error type and the target that was attempted.
        """
        from datetime import datetime

        start_time = datetime.now()

        # Validate configuration using instance validation rules
        validation_failure = self._validate_or_fail(start_time)
        if validation_failure:
            return validation_failure

        # Get configuration values
        spreadsheet_id = self.config["spreadsheet_id"]
        range_a1 = self.config.get("range")
        sheet_name = self.config.get("sheet_name")
        start_row = self.config.get("start_row", 1)
        end_row = self.config.get("end_row")
        start_col = self.config.get("start_col", 1)
        end_col = self.config.get("end_col")
        include_headers = self.config.get("include_headers", True)
        credentials = self.config.get("credentials")
        credentials_file = self.config.get("credentials_file")
        token_file = self.config.get("token_file")
        timeout = self.config.get("timeout", 30)

        # Human-readable description of the read target for failure metadata.
        # Mirrors the branch selection below so the fallback case does not
        # render as the misleading "None(1,1)".
        if range_a1:
            range_attempted = range_a1
        elif sheet_name:
            range_attempted = f"{sheet_name}({start_row},{start_col})"
        else:
            range_attempted = "<first sheet>"

        # Progress callback for detailed operation logging
        def progress_callback(message: str) -> None:
            self.logger.debug(f"[{self.name}] {message}")

        # Only a prefix of the ID is logged.
        self.logger.info(
            f"Starting Google Sheets read task for spreadsheet: {spreadsheet_id[:8]}..."
        )

        try:
            # Initialize Google Sheets service
            progress_callback("Initializing Google Sheets service...")
            sheets_service = GoogleSheetsService(
                credentials=credentials,
                credentials_file=credentials_file,
                token_file=token_file,
                timeout=timeout,
            )

            # Determine what to read
            sheet_data: SheetData
            if range_a1:
                # Read specific range
                progress_callback(f"Reading specific range: {range_a1}")
                sheet_data = sheets_service.read_range(
                    spreadsheet_id=spreadsheet_id,
                    range_a1=range_a1,
                    include_headers=include_headers,
                    progress_callback=progress_callback,
                )
            elif sheet_name:
                # Read sheet with optional bounds
                progress_callback(f"Reading sheet '{sheet_name}' with bounds...")
                sheet_data = sheets_service.read_sheet(
                    spreadsheet_id=spreadsheet_id,
                    sheet_name=sheet_name,
                    start_row=start_row,
                    end_row=end_row,
                    start_col=start_col,
                    end_col=end_col,
                    include_headers=include_headers,
                    progress_callback=progress_callback,
                )
            else:
                # Read all data from first sheet
                progress_callback("Getting sheet names...")
                sheet_names = sheets_service.get_sheet_names(
                    spreadsheet_id=spreadsheet_id,
                    progress_callback=progress_callback,
                )
                if not sheet_names:
                    raise GoogleSheetsError("No sheets found in spreadsheet")
                first_sheet = sheet_names[0]
                progress_callback(
                    f"Reading all data from first sheet: '{first_sheet}'"
                )
                sheet_data = sheets_service.read_all_data(
                    spreadsheet_id=spreadsheet_id,
                    sheet_name=first_sheet,
                    include_headers=include_headers,
                    progress_callback=progress_callback,
                )

            # Get spreadsheet info for metadata
            progress_callback("Getting spreadsheet metadata...")
            spreadsheet_info = sheets_service.get_spreadsheet_info(
                spreadsheet_id=spreadsheet_id,
                progress_callback=progress_callback,
            )

            # Prepare result data
            result_data = {
                "sheet_data": sheet_data.to_dict(),
                "values": sheet_data.values,  # 2D array
                "headers": sheet_data.headers,
                "num_rows": sheet_data.num_rows,
                "num_cols": sheet_data.num_cols,
                "shape": sheet_data.shape,
                "spreadsheet_info": spreadsheet_info.to_dict(),
                "range_read": sheet_data.range_read,
                "include_headers": include_headers,
            }

            execution_time = (datetime.now() - start_time).total_seconds()
            self.logger.info(
                f"Google Sheets read task completed: {sheet_data.num_rows} rows, "
                f"{sheet_data.num_cols} cols from '{sheet_data.sheet_name}'"
            )
            return TaskResult.success(
                data=result_data,
                execution_time=execution_time,
                metadata={
                    "task_type": "read_google_sheet",
                    "spreadsheet_id": spreadsheet_id,
                    "sheet_name": sheet_data.sheet_name,
                    "range_read": sheet_data.range_read,
                    "num_rows": sheet_data.num_rows,
                    "num_cols": sheet_data.num_cols,
                    "include_headers": include_headers,
                },
            )
        except GoogleSheetsError as e:
            execution_time = (datetime.now() - start_time).total_seconds()
            return self._sheets_failure_result(
                error_msg=f"ReadGoogleSheetTask Sheets API operation failed: {str(e)}",
                # Reported under the base name even if a subclass was raised.
                error_type="GoogleSheetsError",
                execution_time=execution_time,
                spreadsheet_id=spreadsheet_id,
                range_attempted=range_attempted,
            )
        except Exception as e:
            # Task boundary: convert any unexpected error into a failed result
            # instead of letting it propagate to the caller.
            execution_time = (datetime.now() - start_time).total_seconds()
            return self._sheets_failure_result(
                error_msg=f"ReadGoogleSheetTask operation failed: {str(e)}",
                error_type=type(e).__name__,
                execution_time=execution_time,
                spreadsheet_id=spreadsheet_id,
                range_attempted=range_attempted,
            )

    def _sheets_failure_result(
        self,
        error_msg: str,
        error_type: str,
        execution_time: float,
        spreadsheet_id: str,
        range_attempted: str,
    ) -> TaskResult:
        """Log ``error_msg`` and wrap it in a failure TaskResult with read-task metadata."""
        self.logger.error(error_msg)
        return TaskResult.failure(
            error_message=error_msg,
            execution_time=execution_time,
            metadata={
                "task_type": "read_google_sheet",
                "spreadsheet_id": spreadsheet_id,
                "error_type": error_type,
                "range_attempted": range_attempted,
            },
        )

    @staticmethod
    def _attach_credential_paths(
        config: Dict[str, Any],
        credentials_file: Optional[str],
        token_file: Optional[str],
    ) -> Dict[str, Any]:
        """Add the OAuth2 file paths to ``config`` when non-empty and return it."""
        # Truthiness is intentional: an empty path is treated like "not given".
        if credentials_file:
            config["credentials_file"] = credentials_file
        if token_file:
            config["token_file"] = token_file
        return config

    @staticmethod
    def create_range_config(
        spreadsheet_id: str,
        range_a1: str,
        include_headers: bool = True,
        credentials_file: Optional[str] = None,
        token_file: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Helper to create configuration for reading a specific range.

        Args:
            spreadsheet_id: Google Spreadsheet ID
            range_a1: Range in A1 notation (e.g., 'Sheet1!A1:C10')
            include_headers: Whether first row contains headers
            credentials_file: Path to OAuth2 credentials file
            token_file: Path to OAuth2 token file

        Returns:
            Configuration dictionary for ReadGoogleSheetTask
        """
        config: Dict[str, Any] = {
            "spreadsheet_id": spreadsheet_id,
            "range": range_a1,
            "include_headers": include_headers,
        }
        return ReadGoogleSheetTask._attach_credential_paths(
            config, credentials_file, token_file
        )

    @staticmethod
    def create_sheet_config(
        spreadsheet_id: str,
        sheet_name: str,
        start_row: int = 1,
        end_row: Optional[int] = None,
        start_col: int = 1,
        end_col: Optional[int] = None,
        include_headers: bool = True,
        credentials_file: Optional[str] = None,
        token_file: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Helper to create configuration for reading a sheet with bounds.

        Args:
            spreadsheet_id: Google Spreadsheet ID
            sheet_name: Name of sheet to read
            start_row: Starting row (1-based)
            end_row: Ending row (1-based, optional)
            start_col: Starting column (1-based)
            end_col: Ending column (1-based, optional)
            include_headers: Whether first row contains headers
            credentials_file: Path to OAuth2 credentials file
            token_file: Path to OAuth2 token file

        Returns:
            Configuration dictionary for ReadGoogleSheetTask
        """
        config: Dict[str, Any] = {
            "spreadsheet_id": spreadsheet_id,
            "sheet_name": sheet_name,
            "start_row": start_row,
            "start_col": start_col,
            "include_headers": include_headers,
        }
        # Compare against None rather than relying on truthiness, so that an
        # explicit (invalid) 0 is passed through to the task's range validation
        # instead of being silently discarded.
        if end_row is not None:
            config["end_row"] = end_row
        if end_col is not None:
            config["end_col"] = end_col
        return ReadGoogleSheetTask._attach_credential_paths(
            config, credentials_file, token_file
        )

    @staticmethod
    def create_full_sheet_config(
        spreadsheet_id: str,
        sheet_name: str,
        include_headers: bool = True,
        credentials_file: Optional[str] = None,
        token_file: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Helper to create configuration for reading an entire sheet.

        Args:
            spreadsheet_id: Google Spreadsheet ID
            sheet_name: Name of sheet to read
            include_headers: Whether first row contains headers
            credentials_file: Path to OAuth2 credentials file
            token_file: Path to OAuth2 token file

        Returns:
            Configuration dictionary for ReadGoogleSheetTask
        """
        config: Dict[str, Any] = {
            "spreadsheet_id": spreadsheet_id,
            "sheet_name": sheet_name,
            "include_headers": include_headers,
        }
        return ReadGoogleSheetTask._attach_credential_paths(
            config, credentials_file, token_file
        )