Files
ProStock/docs/n_income_factor_lifecycle.md
liaozhaorun a22bc2d282 refactor(data): 移除 api_daily 模块并更新文档
- 删除 src/data/api_wrappers/api_daily.py (240行)
- 更新 6 个文档文件,将 daily 表引用替换为 pro_bar
- 同步 README.md 中的因子框架和训练模块示例

BREAKING CHANGE: api_daily 模块已移除,请使用 api_pro_bar 替代
2026-03-14 01:48:56 +08:00

18 KiB
Raw Permalink Blame History

n_income 因子生命周期分析

概述

本文档详细分析 src/experiment/regression.pyn_income 因子的完整生命周期,从字符串定义到最终参与模型训练的全过程。

因子定义

src/experiment/regression.py 第 55 行定义:

FACTOR_DEFINITIONS = {
    # ... 其他因子 ...
    "n_income": "n_income",
}

n_income 是一个简单符号表达式,代表净利润Net Income财务指标。这是一个点-in-timePIT数据,需要从财务报表中获取。


第一阶段:因子注册

1.1 注册入口

位置: src/experiment/regression.py:134

def create_factors_with_strings(engine: FactorEngine) -> List[str]:
    for name, expr in FACTOR_DEFINITIONS.items():
        engine.add_factor(name, expr)  # 注册 n_income

1.2 add_factor 实现

位置: src/factors/engine/factor_engine.py:62-86

def add_factor(
    self,
    name: str,
    expr: Union[str, Node],
) -> "FactorEngine":
    """注册一个因子表达式"""
    # 如果是字符串,先解析为 Node
    if isinstance(expr, str):
        expr = self.parser.parse(expr)  # 关键步骤
    
    # 创建执行计划
    plan = self.planner.create_plan(expr, name)
    
    # 缓存执行计划
    self._factor_plans[name] = plan
    return self

对于 n_income = "n_income",流程如下:

  1. 检测到是字符串,调用 parser.parse("n_income")
  2. 解析结果为 Symbol("n_income") 节点
  3. 通过 planner.create_plan() 生成执行计划
  4. 缓存计划供后续计算使用

第二阶段:表达式解析

2.1 解析器入口

位置: src/factors/parser.py:38-52

def parse(self, formula: str) -> Node:
    """解析因子表达式字符串为 Node"""
    tree = ast.parse(formula, mode='eval')
    return self._visit(tree.body)

2.2 符号节点处理

位置: src/factors/parser.py:96-99

def _visit_Name(self, node: ast.Name) -> Symbol:
    """处理名称节点(如变量名 n_income"""
    return Symbol(node.id)

对于 "n_income" 字符串:

  1. ast.parse("n_income", mode='eval') 生成 AST
  2. AST 节点类型为 ast.Nameid 为 "n_income"
  3. 调用 _visit_Name() 创建 Symbol("n_income")

2.3 Symbol 节点定义

位置: src/factors/dsl.py:82-102

@dataclass
class Symbol(Node):
    """符号节点,代表命名变量"""
    name: str
    
    def dependencies(self) -> Set[str]:
        return {self.name}

Symbol("n_income") 的依赖集合为 {"n_income"}


第三阶段:执行计划生成

3.1 计划器入口

位置: src/factors/engine/planner.py:40-74

def create_plan(
    self,
    node: Node,
    output_name: Optional[str] = None,
) -> ExecutionPlan:
    """创建完整执行计划"""
    # 1. 提取依赖
    deps = self.extractor.extract_dependencies(node)
    
    # 2. 翻译为 Polars 表达式
    polars_expr = self.translator.translate(node)
    
    # 3. 推导数据规格
    data_specs = self._infer_data_specs(deps, schema_cache=self.schema_cache)
    
    return ExecutionPlan(
        data_specs=data_specs,
        polars_expr=polars_expr,
        dependencies=deps,
        output_name=output_name,
    )

3.2 提取依赖

位置: src/factors/compiler.py:19-32

def extract_dependencies(self, node: Node) -> Set[str]:
    """从 AST 提取所有依赖的原始字段名"""
    deps: Set[str] = set()
    self._extract(node, deps)
    return deps

def _extract(self, node: Node, deps: Set[str]):
    if isinstance(node, Symbol):
        deps.add(node.name)  # n_income 被加入依赖集
    # ... 其他节点类型处理

对于 Symbol("n_income"),提取的依赖为 {"n_income"}

3.3 推导数据规格

位置: src/factors/engine/planner.py:86-148

def _infer_data_specs(
    self,
    dependencies: Set[str],
    schema_cache: SchemaCache,
) -> List[DataSpec]:
    """推导数据规格"""
    table_fields: Dict[str, List[str]] = defaultdict(list)
    
    for field in dependencies:
        # 使用 SchemaCache 查找字段所属表
        table = schema_cache.get_table_for_field(field)
        table_fields[table].append(field)
    
    # 为每张表创建 DataSpec
    for table, fields in table_fields.items():
        if schema_cache.is_pit_table(table):  # PIT 表(财务数据)
            spec = DataSpec(
                table=table,
                columns=fields,
                join_type="asof_backward",
                left_on="trade_date",
                right_on="f_ann_date",
            )
        else:  # 普通表(行情数据)
            spec = DataSpec(
                table=table,
                columns=fields,
                join_type="standard",
            )
        specs.append(spec)

对于 n_income

  1. schema_cache.get_table_for_field("n_income") 返回 "financial_income"
  2. schema_cache.is_pit_table("financial_income") 返回 True
  3. 生成 DataSpec
    • table="financial_income"
    • columns=["n_income"]
    • join_type="asof_backward"
    • left_on="trade_date"
    • right_on="f_ann_date"

3.4 SchemaCache 实现

位置: src/data/catalog.py

class SchemaCache:
    """缓存数据库表结构信息,提供字段到表的映射"""
    
    def get_table_for_field(self, field: str) -> Optional[str]:
        """根据字段名获取表名"""
        return self._field_to_table.get(field)
    
    def is_pit_table(self, table: str) -> bool:
        """判断是否为 PITPoint-in-Time表"""
        info = self._table_info.get(table)
        if info and info.table_type == TableType.PIT:
            return True
        return False

表类型识别逻辑:

  • PIT 表: 包含 ann_datef_ann_date 字段(财务数据表)
  • DAILY 表: 包含 trade_date 字段(行情数据表)

第四阶段:数据获取与拼接

4.1 compute() 执行入口

位置: src/factors/engine/factor_engine.py:88-120

def compute(
    self,
    factor_names: List[str],
    start_date: str,
    end_date: str,
) -> pl.DataFrame:
    """计算指定因子"""
    # 1. 获取所有需要的执行计划
    plans = [self._factor_plans[name] for name in factor_names]
    
    # 2. 合并数据规格(去重)
    merged_specs = self._merge_data_specs(plans)
    
    # 3. 从路由器获取核心宽表
    core_wide = self.router.fetch_data(
        specs=merged_specs,
        start_date=start_date,
        end_date=end_date,
    )
    
    # 4. 执行计算
    result = self._execute_with_dependencies(factor_names, core_wide)
    
    return result

4.2 数据路由器 fetch_data

位置: src/factors/engine/data_router.py:48-100

def fetch_data(
    self,
    specs: List[DataSpec],
    start_date: str,
    end_date: str,
) -> pl.DataFrame:
    """根据 DataSpec 列表获取并组装数据"""
    # 分离标准表和 asof 表
    standard_specs = [s for s in specs if s.join_type == "standard"]
    asof_specs = [s for s in specs if s.join_type == "asof_backward"]
    
    # 加载标准表(行情数据)
    standard_frames = []
    for spec in standard_specs:
        df = self._load_table_from_spec(spec, start_date, end_date)
        standard_frames.append(df)
    
    # 以第一张标准表为基础(通常是 daily
    base_df = standard_frames[0] if standard_frames else None
    
    # 使用 FinancialLoader 加载和拼接财务数据
    if asof_specs and base_df is not None:
        for spec in asof_specs:
            financial_df = self.financial_loader.load_financial_data(
                table_name=spec.table,
                columns=spec.columns,
                start_date=start_date,
                end_date=end_date,
            )
            base_df = self.financial_loader.merge_financial_with_price(
                price_df=base_df,
                financial_df=financial_df,
                date_col="trade_date",
                f_ann_col="f_ann_date",
            )
    
    return base_df

4.3 财务数据加载

位置: src/data/financial_loader.py:26-83

def load_financial_data(
    self,
    table_name: str,
    columns: List[str],
    start_date: str,
    end_date: str,
) -> pl.DataFrame:
    """从数据库加载并清洗财务数据"""
    # 计算包含回看期的日期范围默认1年
    adjusted_start = self.get_date_range_with_lookback(start_date, lookback_days=365)
    
    # 从数据库查询
    query = f"""
        SELECT ts_code, f_ann_date, end_date, {', '.join(columns)}
        FROM {table_name}
        WHERE f_ann_date >= '{adjusted_start}' 
          AND f_ann_date <= '{end_date}'
        ORDER BY ts_code, f_ann_date
    """
    df = self.conn.execute(query).fetchdf()
    df = pl.DataFrame(df)
    
    # 数据清洗:仅保留 report_type==1合并报表
    if "report_type" in df.columns:
        df = df.filter(pl.col("report_type") == 1)
    
    # 去重:按 (ts_code, end_date) 取 update_flag 最大的记录
    if "update_flag" in df.columns:
        df = (
            df.sort(["ts_code", "end_date", "update_flag"], descending=[False, False, True])
            .unique(subset=["ts_code", "end_date"], keep="first")
        )
    
    return df

4.4 财务数据与行情数据拼接

位置: src/data/financial_loader.py:85-136

def merge_financial_with_price(
    self,
    price_df: pl.DataFrame,
    financial_df: pl.DataFrame,
    date_col: str = "trade_date",
    f_ann_col: str = "f_ann_date",
) -> pl.DataFrame:
    """使用 asof join 将财务数据拼接到行情数据"""
    # 确保日期格式正确
    price_df = price_df.with_columns(pl.col(date_col).cast(pl.Date))
    financial_df = financial_df.with_columns(pl.col(f_ann_col).cast(pl.Date))
    
    # 使用 join_asof 进行 PIT 对齐
    # strategy='backward': 对于每个 trade_date找 f_ann_date <= trade_date 的最新财务数据
    result = price_df.join_asof(
        financial_df,
        left_on=date_col,
        right_on=f_ann_col,
        by="ts_code",  # 按股票代码分组
        strategy="backward",  # 向后查找最新公告
    )
    
    return result

拼接逻辑详解

trade_date ts_code close n_income (拼接后) 来源 f_ann_date
2024-01-15 000001.SZ 10.5 1,000,000 2024-01-10
2024-01-16 000001.SZ 10.6 1,000,000 2024-01-10
2024-01-20 000001.SZ 10.8 1,200,000 2024-01-18

上表展示了 n_income 的 PIT 拼接过程:

  • 1月15日使用前一次公告1月10日的净利润数据
  • 1月16日继续使用1月10日的数据无新公告
  • 1月20日使用最新公告1月18日的净利润数据

第五阶段:因子计算

5.1 执行计算

位置: src/factors/engine/factor_engine.py:188-227

def _execute_with_dependencies(
    self,
    factor_names: List[str],
    core_wide: pl.DataFrame,
) -> pl.DataFrame:
    """按依赖顺序执行因子计算"""
    # 拓扑排序确定计算顺序
    sorted_factors = self._topological_sort(factor_names)
    
    # 创建结果 DataFrame
    result_exprs = []
    for name in sorted_factors:
        plan = self._factor_plans[name]
        # 执行 Polars 表达式
        expr = plan.polars_expr.alias(name)
        result_exprs.append(expr)
    
    # 一次性执行所有表达式
    result = core_wide.with_columns(result_exprs)
    
    return result

5.2 翻译为 Polars 表达式

位置: src/factors/translator.py

def translate(self, node: Node) -> pl.Expr:
    """将 DSL 节点翻译为 Polars 表达式"""
    if isinstance(node, Symbol):
        # Symbol 直接转为 pl.col()
        return pl.col(node.name)
    # ... 其他节点类型

对于 Symbol("n_income"),翻译结果为 pl.col("n_income")

5.3 n_income 计算

由于 n_income 是简单符号(无运算),计算过程:

# core_wide 已经包含 n_income 列(从财务表拼接而来)
result = core_wide.with_columns([
    pl.col("n_income").alias("n_income")  # 直接引用
])

实际上,n_income 的值在数据拼接阶段已经确定,计算阶段只是确认输出列名。


第六阶段:参与模型训练

6.1 数据流回到训练流程

位置: src/experiment/regression.py:176-183

def prepare_data(...) -> pl.DataFrame:
    factor_names = feature_cols + ["return_5"]  # 包含 n_income
    
    data = engine.compute(
        factor_names=factor_names,
        start_date=start_date,
        end_date=end_date,
    )
    return data

6.2 完整数据流

┌─────────────────────────────────────────────────────────────────────┐
│                         n_income 因子生命周期                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  1. 定义阶段                                                          │
│     regression.py:55  "n_income": "n_income"                         │
│                         ↓                                            │
│  2. 注册阶段 (FactorEngine.add_factor)                                │
│     parser.parse("n_income") → Symbol("n_income")                    │
│                         ↓                                            │
│  3. 计划阶段 (ExecutionPlanner.create_plan)                           │
│     extract_dependencies() → {"n_income"}                            │
│     _infer_data_specs()  → DataSpec(financial_income, asof_backward) │
│                         ↓                                            │
│  4. 数据获取阶段 (DataRouter.fetch_data)                              │
│     FinancialLoader.load_financial_data()                            │
│     从 financial_income 表读取 n_income 字段                         │
│                         ↓                                            │
│  5. 数据拼接阶段 (FinancialLoader.merge_financial_with_price)         │
│     join_asof(strategy='backward')                                   │
│     按 trade_date 和 f_ann_date 对齐                                │
│                         ↓                                            │
│  6. 计算阶段 (FactorEngine._execute_with_dependencies)                │
│     pl.col("n_income") → 输出到结果 DataFrame                        │
│                         ↓                                            │
│  7. 训练阶段                                                          │
│     作为特征列传入 LightGBM 模型                                      │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

关键技术点总结

1. PIT 数据处理

财务数据是低频、公告驱动的数据,与高频、连续的行情数据不同。系统使用 asof_backward 策略处理:

  • asof: As-Of表示截至某个时点的有效数据
  • backward: 向后查找,确保使用最新公告的数据
  • 关键约束: 不能使用未来数据lookahead bias

2. SchemaCache 动态路由

系统自动识别字段所属表,无需手动指定:

# 系统自动识别
n_income  financial_income  (PIT)
close     pro_bar  (主力行情表)

3. 财务数据清洗

FinancialLoader 自动处理:

  • 报告类型过滤: 仅使用合并报表report_type=1
  • 去重策略: 按 (ts_code, end_date) 取最新修订版update_flag 最大)
  • 日期对齐: 使用公告日f_ann_date而非报告期end_date

4. 扩展性设计

添加新的财务因子只需在字典中添加一行:

FACTOR_DEFINITIONS = {
    "n_income": "n_income",
    "revenue": "revenue",      # 营业收入
    "eps": "eps",              # 每股收益
    "roe": "roe",              # 净资产收益率
}

系统自动处理表路由、数据获取和拼接。


相关代码文件

文件 职责
src/experiment/regression.py 训练入口,因子定义
src/factors/engine/factor_engine.py 因子引擎统一入口
src/factors/parser.py 字符串表达式解析
src/factors/compiler.py AST 依赖提取
src/factors/engine/planner.py 执行计划生成
src/factors/engine/data_router.py 数据路由与组装
src/data/financial_loader.py 财务数据加载与拼接
src/data/catalog.py 数据库目录与表结构
src/data/api_wrappers/financial_data/api_income.py 利润表数据接口

附录:数据表结构

financial_income利润表

CREATE TABLE financial_income (
    ts_code VARCHAR,        -- 股票代码
    f_ann_date DATE,        -- 公告日期PIT关键字段
    end_date DATE,          -- 报告期
    report_type INTEGER,    -- 报告类型1=合并报表)
    update_flag INTEGER,    -- 更新标识(越大越新)
    n_income BIGINT,        -- 净利润(本因子使用的字段)
    revenue BIGINT,         -- 营业收入
    ...                     -- 其他财务字段
);

pro_bar主力行情表

CREATE TABLE pro_bar (
    ts_code VARCHAR,        -- 股票代码
    trade_date DATE,        -- 交易日期
    open DOUBLE,            -- 开盘价
    high DOUBLE,            -- 最高价
    low DOUBLE,             -- 最低价
    close DOUBLE,           -- 收盘价
    vol BIGINT,             -- 成交量
    turnover_rate DOUBLE,   -- 换手率
    volume_ratio DOUBLE,    -- 量比
    ...                     -- 其他行情字段
);

说明: pro_bar 表通过 Tushare Pro Bar 接口获取,包含后复权数据和换手率、量比等指标,是主力行情数据表。