"""Compute engine.

Runs (optionally parallel) computations by applying execution plans to data.
"""

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Set, Union

import polars as pl

from src.factors.engine.data_spec import ExecutionPlan


class ComputeEngine:
    """Compute engine that applies execution plans to a DataFrame.

    Plans can be applied one at a time, sequentially as a batch, or with
    the mutually independent plans fanned out to a worker pool.

    Attributes:
        max_workers: Maximum number of pool workers.
        use_processes: Whether to use a process pool instead of a thread
            pool (intended for CPU-bound work).
    """

    def __init__(
        self,
        max_workers: int = 4,
        use_processes: bool = False,
    ) -> None:
        """Initialise the compute engine.

        Args:
            max_workers: Maximum number of pool workers.
            use_processes: Use a process pool instead of a thread pool.
        """
        self.max_workers = max_workers
        self.use_processes = use_processes

    def execute(
        self,
        plan: ExecutionPlan,
        data: pl.DataFrame,
    ) -> pl.DataFrame:
        """Execute a single plan.

        Args:
            plan: The execution plan.
            data: Input data (the core wide table).

        Returns:
            ``data`` with the plan's result added as column
            ``plan.output_name``.

        Raises:
            ValueError: If any field in ``plan.dependencies`` is not a
                column of ``data``.
        """
        # Fail early with a clear message when a dependency is missing.
        missing_cols = plan.dependencies - set(data.columns)
        if missing_cols:
            raise ValueError(f"数据缺少必要的字段: {missing_cols}")

        return data.with_columns([plan.polars_expr.alias(plan.output_name)])

    def execute_batch(
        self,
        plans: List[ExecutionPlan],
        data: pl.DataFrame,
    ) -> pl.DataFrame:
        """Execute several plans sequentially, in the given order.

        Each plan sees the columns produced by the plans before it.

        Args:
            plans: The execution plans.
            data: Input data.

        Returns:
            A DataFrame containing the results of all plans.

        Raises:
            ValueError: If a plan's dependencies are not available when
                it is reached.
        """
        result = data
        for plan in plans:
            result = self.execute(plan, result)
        return result

    def execute_parallel(
        self,
        plans: List[ExecutionPlan],
        data: pl.DataFrame,
    ) -> pl.DataFrame:
        """Execute several plans, running independent ones in parallel.

        A plan is *independent* when every dependency is a column of the
        original ``data`` and none of its dependencies is produced (or
        overwritten) by an earlier plan in ``plans``. Independent plans
        are evaluated concurrently against the original ``data``; all
        remaining plans run sequentially afterwards, in their original
        order, so they can see previously computed columns.

        Args:
            plans: The execution plans. Plans that consume another plan's
                output are expected to come after that plan.
            data: Input data.

        Returns:
            A DataFrame containing the results of all plans.

        Raises:
            RuntimeError: If an independent plan fails in the pool.
            ValueError: If a sequentially executed plan has dependencies
                that are still missing when it is reached.
        """
        independent_plans: List[ExecutionPlan] = []
        dependent_plans: List[ExecutionPlan] = []

        base_cols = set(data.columns)
        produced: Set[str] = set()

        for plan in plans:
            deps = plan.dependencies
            # Parallel workers only ever see the original ``data``, so a
            # plan that reads a column written by an earlier plan must be
            # deferred to the sequential phase.
            if deps <= base_cols and produced.isdisjoint(deps):
                independent_plans.append(plan)
            else:
                dependent_plans.append(plan)
            produced.add(plan.output_name)

        if independent_plans:
            executor_cls = (
                ProcessPoolExecutor if self.use_processes else ThreadPoolExecutor
            )
            results = []
            with executor_cls(max_workers=self.max_workers) as executor:
                futures = {
                    executor.submit(self._execute_single, plan, data): plan
                    for plan in independent_plans
                }
                # Collect in submission order so that the output column
                # order is deterministic.
                for future, plan in futures.items():
                    try:
                        result_col = future.result()
                    except Exception as e:
                        raise RuntimeError(
                            f"计算因子 {plan.output_name} 失败: {e}"
                        ) from e
                    results.append((plan.output_name, result_col))

            # Merge the computed columns back into the frame.
            for name, series in results:
                data = data.with_columns([series.alias(name)])

        # Remaining plans run one by one on the enriched frame.
        for plan in dependent_plans:
            data = self.execute(plan, data)

        return data

    def _execute_single(
        self,
        plan: ExecutionPlan,
        data: pl.DataFrame,
    ) -> pl.Series:
        """Execute one plan and return only its result column.

        Args:
            plan: The execution plan.
            data: Input data.

        Returns:
            The computed result as a Series.

        Raises:
            ValueError: If ``data`` lacks a field the plan depends on.
        """
        result = self.execute(plan, data)
        return result[plan.output_name]