Files
ProStock/src/factors/engine/planner.py

154 lines
4.1 KiB
Python
Raw Normal View History

"""执行计划生成器。
整合编译器和翻译器生成完整的执行计划
"""
from typing import Any, Dict, List, Optional, Set, Union
from src.factors.dsl import (
Node,
Symbol,
FunctionNode,
BinaryOpNode,
UnaryOpNode,
Constant,
)
from src.factors.compiler import DependencyExtractor
from src.factors.translator import PolarsTranslator
from src.factors.engine.data_spec import DataSpec, ExecutionPlan
class ExecutionPlanner:
"""执行计划生成器。
整合编译器和翻译器生成完整的执行计划
Attributes:
compiler: 依赖提取器
translator: Polars 翻译器
"""
def __init__(self) -> None:
"""初始化执行计划生成器。"""
self.compiler = DependencyExtractor()
self.translator = PolarsTranslator()
def create_plan(
self,
expression: Node,
output_name: str = "factor",
data_specs: Optional[List[DataSpec]] = None,
) -> ExecutionPlan:
"""从表达式创建执行计划。
Args:
expression: DSL 表达式节点
output_name: 输出因子名称
data_specs: 预定义的数据规格None 时自动推导
Returns:
执行计划对象
"""
# 1. 提取依赖
dependencies = self.compiler.extract_dependencies(expression)
# 2. 翻译为 Polars 表达式
polars_expr = self.translator.translate(expression)
# 3. 推导或验证数据规格
if data_specs is None:
data_specs = self._infer_data_specs(dependencies, expression)
return ExecutionPlan(
data_specs=data_specs,
polars_expr=polars_expr,
dependencies=dependencies,
output_name=output_name,
)
def _infer_data_specs(
self,
dependencies: Set[str],
expression: Node,
) -> List[DataSpec]:
"""从依赖推导数据规格。
基础行情字段open, high, low, close, vol, amount, pre_close, change, pct_chg
默认从 pro_bar 表获取
每日指标字段total_mv, circ_mv, pe, pb daily_basic 表获取
Args:
dependencies: 依赖的字段集合
expression: 表达式节点
Returns:
数据规格列表
"""
# 基础行情字段集合(这些字段从 pro_bar 表获取)
pro_bar_fields = {
"open",
"high",
"low",
"close",
"vol",
"amount",
"pre_close",
"change",
"pct_chg",
"turnover_rate",
"volume_ratio",
}
# 每日指标字段集合(这些字段从 daily_basic 表获取)
daily_basic_fields = {
"turnover_rate_f",
"pe",
"pe_ttm",
"pb",
"ps",
"ps_ttm",
"dv_ratio",
"dv_ttm",
"total_share",
"float_share",
"free_share",
"total_mv",
"circ_mv",
}
# 将依赖分为不同表的字段
pro_bar_deps = dependencies & pro_bar_fields
daily_basic_deps = dependencies & daily_basic_fields
other_deps = dependencies - pro_bar_fields - daily_basic_fields
data_specs = []
# pro_bar 表的数据规格
if pro_bar_deps:
data_specs.append(
DataSpec(
table="pro_bar",
columns=sorted(pro_bar_deps),
)
)
# daily_basic 表的数据规格
if daily_basic_deps:
data_specs.append(
DataSpec(
table="daily_basic",
columns=sorted(daily_basic_deps),
)
)
# 其他字段从 daily 表获取
if other_deps:
data_specs.append(
DataSpec(
table="daily",
columns=sorted(other_deps),
)
)
return data_specs