From 181994f063bd50d9a9746d634f011bd90db3048a Mon Sep 17 00:00:00 2001 From: liaozhaorun <1300336796@qq.com> Date: Sat, 14 Mar 2026 01:24:52 +0800 Subject: [PATCH] =?UTF-8?q?perf(factors/engine):=20=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E8=AE=A1=E7=AE=97=E5=BC=95=E6=93=8E=E4=BD=BF=E7=94=A8=20Polars?= =?UTF-8?q?=20=E5=8E=9F=E7=94=9F=E5=B9=B6=E8=A1=8C=20-=20=E7=A7=BB?= =?UTF-8?q?=E9=99=A4=20Python=20=E5=A4=9A=E8=BF=9B=E7=A8=8B/=E5=A4=9A?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=EF=BC=8C=E6=B6=88=E9=99=A4=20DataFr?= =?UTF-8?q?ame=20=E5=BA=8F=E5=88=97=E5=8C=96=E5=BC=80=E9=94=80=20-=20?= =?UTF-8?q?=E9=87=87=E7=94=A8=20BFS=20=E5=88=86=E5=B1=82=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E7=AD=96=E7=95=A5=EF=BC=8C=E6=AF=8F=E5=B1=82=E8=A1=A8=E8=BE=BE?= =?UTF-8?q?=E5=BC=8F=E9=80=9A=E8=BF=87=E5=8D=95=E6=AC=A1=20with=5Fcolumns?= =?UTF-8?q?=20=E6=8F=90=E4=BA=A4=20-=20=E5=88=A9=E7=94=A8=20Polars=20Rust?= =?UTF-8?q?=20=E5=BC=95=E6=93=8E=E5=AE=9E=E7=8E=B0=E9=9B=B6=E6=8B=B7?= =?UTF-8?q?=E8=B4=9D=E5=B9=B6=E8=A1=8C=E8=AE=A1=E7=AE=97=20-=20=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E6=AD=BB=E9=94=81=E6=A3=80=E6=B5=8B=E6=9C=BA=E5=88=B6?= =?UTF-8?q?=E5=A4=84=E7=90=86=E4=BE=9D=E8=B5=96=E7=8E=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/factors/engine/compute_engine.py | 147 ++++++++++++++------------- src/factors/engine/factor_engine.py | 6 +- tests/test_factor_engine.py | 2 +- 3 files changed, 77 insertions(+), 78 deletions(-) diff --git a/src/factors/engine/compute_engine.py b/src/factors/engine/compute_engine.py index 8d1da46..d0e7b14 100644 --- a/src/factors/engine/compute_engine.py +++ b/src/factors/engine/compute_engine.py @@ -1,10 +1,12 @@ """计算引擎。 执行并行运算,负责将执行计划应用到数据上。 + +利用 Polars 底层 Rust 引擎的原生并行能力,通过 BFS 分层执行策略 +避免 Python 层面的多进程/多线程开销。 """ -from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor -from typing import Any, Dict, List, Optional, Set, Union +from typing import Dict, List, Set import polars as pl @@ -14,33 +16,25 @@ from src.factors.engine.data_spec import ExecutionPlan class ComputeEngine: """计算引擎 - 执行并行运算。 - 负责将执行计划应用到数据上,支持并行计算。 + 负责将执行计划应用到数据上,利用 Polars 底层 Rust 引擎的原生并行能力。 - Attributes: - max_workers: 最大并行工作线程数 - use_processes: 是否使用进程池(CPU 密集型任务) + 采用 BFS 分层执行策略: + 1. 构建依赖图,识别各计划间的依赖关系 + 2. 按拓扑排序分层,每层包含互不依赖的计划 + 3. 将每层计划打包为表达式列表,通过单次 with_columns 提交 + 4. Polars 自动在所有 CPU 核心上并行计算,零拷贝内存 """ - def __init__( - self, - max_workers: int = 4, - use_processes: bool = False, - ) -> None: - """初始化计算引擎。 - - Args: - max_workers: 最大并行工作线程数 - use_processes: 是否使用进程池代替线程池 - """ - self.max_workers = max_workers - self.use_processes = use_processes + def __init__(self) -> None: + """初始化计算引擎。""" + pass def execute( self, plan: ExecutionPlan, data: pl.DataFrame, ) -> pl.DataFrame: - """执行计算计划。 + """执行单个计算计划。 Args: plan: 执行计划 @@ -55,16 +49,14 @@ class ComputeEngine: raise ValueError(f"数据缺少必要的字段: {missing_cols}") # 执行计算 - result = data.with_columns([plan.polars_expr.alias(plan.output_name)]) - - return result + return data.with_columns([plan.polars_expr.alias(plan.output_name)]) def execute_batch( self, plans: List[ExecutionPlan], data: pl.DataFrame, ) -> pl.DataFrame: - """批量执行多个计算计划。 + """顺序批量执行多个计算计划。 Args: plans: 执行计划列表 @@ -74,10 +66,8 @@ class ComputeEngine: 包含所有因子结果的 DataFrame """ result = data - for plan in plans: result = self.execute(plan, result) - return result def execute_parallel( @@ -85,7 +75,11 @@ class ComputeEngine: plans: List[ExecutionPlan], data: pl.DataFrame, ) -> pl.DataFrame: - """并行执行多个计算计划。 + """分层并行执行计算计划(利用 Polars 原生并发优化)。 + + 抛弃 Python 的多进程/多线程池,采用计算图拓扑分层(BFS DAG)。 + 将每一层互不依赖的表达式列表打包,通过单次 with_columns 交给 Polars, + 由底层 Rust 引擎自动调度并行计算,实现零拷贝性能最大化。 Args: plans: 执行计划列表 @@ -93,63 +87,70 @@ class ComputeEngine: Returns: 包含所有因子结果的 DataFrame + + Raises: + RuntimeError: 当存在依赖环或缺少基础依赖字段时 """ - # 检查计划间依赖 - independent_plans = [] - dependent_plans = [] - available_cols = set(data.columns) + if not plans: + return data - for plan in plans: - if plan.dependencies <= available_cols: - independent_plans.append(plan) + result = data + available_cols: Set[str] = set(result.columns) + + # 复制一份计划列表用于迭代 + remaining_plans = plans.copy() + + while remaining_plans: + # 找出当前可以执行的所有独立计划(即依赖的所有列都已就绪) + current_layer: List[ExecutionPlan] = [] + next_remaining: List[ExecutionPlan] = [] + + for plan in remaining_plans: + if plan.dependencies <= available_cols: + current_layer.append(plan) + else: + next_remaining.append(plan) + + # 安全兜底:如果一轮遍历后没找到任何可执行计划,说明存在依赖环或数据缺失 + if not current_layer: + missing = remaining_plans[0].dependencies - available_cols + raise RuntimeError( + f"计算发生死锁或缺少基础依赖字段!\n" + f"因子 '{remaining_plans[0].output_name}' 缺少: {missing}" + ) + + # 核心优化:利用 Polars 内部 Rust 级多线程引擎执行当前层 + exprs = [plan.polars_expr.alias(plan.output_name) for plan in current_layer] + result = result.with_columns(exprs) + + # 更新已就绪字段集合,为计算下一层做准备 + for plan in current_layer: available_cols.add(plan.output_name) - else: - dependent_plans.append(plan) - # 并行执行独立计划 - if independent_plans: - ExecutorClass = ( - ProcessPoolExecutor if self.use_processes else ThreadPoolExecutor - ) + remaining_plans = next_remaining - with ExecutorClass(max_workers=self.max_workers) as executor: - futures = { - executor.submit(self._execute_single, plan, data): plan - for plan in independent_plans - } + return result - results = [] - for future in futures: - plan = futures[future] - try: - result_col = future.result() - results.append((plan.output_name, result_col)) - except Exception as e: - raise RuntimeError(f"计算因子 {plan.output_name} 失败: {e}") - - # 合并结果 - for name, series in results: - data = data.with_columns([series.alias(name)]) - - # 顺序执行依赖计划 - for plan in dependent_plans: - data = self.execute(plan, data) - - return data - - def _execute_single( + def compute( self, - plan: ExecutionPlan, + plans: List[ExecutionPlan], data: pl.DataFrame, - ) -> pl.Series: - """执行单个计划并返回结果列。 + parallel: bool = True, + ) -> pl.DataFrame: + """智能计算入口。 + + 根据 parallel 参数自动选择执行模式: + - True: 使用分层并行执行(推荐) + - False: 使用顺序执行 Args: - plan: 执行计划 + plans: 执行计划列表 data: 输入数据 + parallel: 是否使用并行执行 Returns: - 计算结果序列 + 包含所有因子结果的 DataFrame """ - result = self.execute(plan, data) - return result[plan.output_name] + if parallel: + return self.execute_parallel(plans, data) + return self.execute_batch(plans, data) diff --git a/src/factors/engine/factor_engine.py b/src/factors/engine/factor_engine.py index 184ac46..25c9e35 100644 --- a/src/factors/engine/factor_engine.py +++ b/src/factors/engine/factor_engine.py @@ -57,7 +57,6 @@ class FactorEngine: def __init__( self, data_source: Optional[Dict[str, pl.DataFrame]] = None, - max_workers: int = 4, registry: Optional["FunctionRegistry"] = None, metadata_path: Optional[str] = None, ) -> None: @@ -65,16 +64,15 @@ class FactorEngine: Args: data_source: 内存数据源,为 None 时使用数据库连接 - max_workers: 并行计算的最大工作线程数 registry: 函数注册表,None 时创建独立实例 - metadata_path: 因子元数据文件路径,为 None 时不启用 metadata 功能 + metadata_path: 因子元数据文件路径,为 None 时启用默认 metadata 功能 """ from src.factors.registry import FunctionRegistry from src.factors.parser import FormulaParser self.router = DataRouter(data_source) self.planner = ExecutionPlanner() - self.compute_engine = ComputeEngine(max_workers=max_workers) + self.compute_engine = ComputeEngine() self.registered_expressions: Dict[str, Node] = {} self._plans: Dict[str, ExecutionPlan] = {} diff --git a/tests/test_factor_engine.py b/tests/test_factor_engine.py index 1286b9e..73ab8e3 100644 --- a/tests/test_factor_engine.py +++ b/tests/test_factor_engine.py @@ -72,7 +72,7 @@ class TestFactorEngineEndToEnd: def engine(self, mock_data): """提供配置好的 FactorEngine fixture。""" data_source = {"pro_bar": mock_data} - return FactorEngine(data_source=data_source, max_workers=2) + return FactorEngine(data_source=data_source) def test_simple_symbol_expression(self, engine): """测试简单的符号表达式。"""