docs: 添加财务数据同步模块重构相关文档

- 添加财务数据 API 封装规范文档 (FINANCIAL_API_SPEC.md)
  包含架构设计原则、类设计规范、同步策略、数据差异检测等

- 添加 n_income 因子生命周期分析文档
  详细追踪因子从定义到训练的全流程

- 添加财务数据同步模块重构计划文档
  明确 QuarterBasedSync 基类设计、重构任务清单

这些文档为后续财务数据同步模块重构提供完整的设计依据和实施方案
This commit is contained in:
2026-03-07 22:14:04 +08:00
parent 1520c2a51e
commit c01bf76a3d
3 changed files with 2997 additions and 0 deletions

View File

@@ -0,0 +1,907 @@
# 财务数据 API 封装规范
> **文档版本**: v1.0
> **适用范围**: 所有财务数据 API利润表、资产负债表、现金流量表等
> **更新日期**: 2026-03-07
---
## 目录
1. [概述](#概述)
2. [架构设计原则](#架构设计原则)
3. [文件结构规范](#文件结构规范)
4. [类设计规范](#类设计规范)
5. [同步策略规范](#同步策略规范)
6. [数据差异检测](#数据差异检测)
7. [表结构设计](#表结构设计)
8. [索引设计规范](#索引设计规范)
9. [报表类型过滤](#报表类型过滤)
10. [代码示例](#代码示例)
11. [常见问题](#常见问题)
---
## 概述
### 财务数据特点
财务数据与日频数据(日线、分钟线)有本质区别:
| 特性 | 日频数据 | 财务数据 |
|------|----------|----------|
| 更新频率 | 每日更新 | 季度更新 |
| 获取方式 | 按股票循环获取 | VIP接口一次性获取全市场 |
| 数据修正 | 极少发生 | 经常发生(财报修正) |
| 数据量 | 大5000+股票×250交易日 | 小5000+股票×4季度 |
| 版本控制 | 无 | 多版本report_type |
### 核心要求
1. **必须实现 `QuarterBasedSync` 基类**:所有财务数据同步必须继承此基类
2. **不设置唯一约束**:不创建主键和唯一索引,支持数据多次修正
3. **先删除后插入**:数据更新采用删除旧数据再插入新数据的策略
4. **无跳过逻辑**:财务数据必须每次都进行对比更新
5. **报表类型过滤**:默认只同步合并报表,支持灵活配置
---
## 架构设计原则
### 1. 职责分离
```
调度中心 (api_financial_sync.py)
|
v
各 API 文件 (api_income.py, api_balance.py, api_cashflow.py)
|
v
基类 (base_financial_sync.py - QuarterBasedSync)
|
v
存储层 (storage.py - ThreadSafeStorage)
```
**规范**:
- 调度中心只负责任务协调,不包含具体同步逻辑
- 各 API 文件实现具体的 `fetch_single_quarter()` 方法
- 通用逻辑下沉到 `QuarterBasedSync` 基类
### 2. 统一继承
**必须**继承 `QuarterBasedSync` 基类:
```python
from src.data.api_wrappers.base_financial_sync import QuarterBasedSync
class IncomeQuarterSync(QuarterBasedSync):
"""利润表季度同步实现。"""
pass
```
**禁止**在 API 文件中重复实现同步逻辑。
---
## 文件结构规范
### 文件位置
财务数据 API 文件必须位于:
```
src/data/api_wrappers/financial_data/
├── __init__.py # 可选:导出公共接口
├── api_income.py # 利润表接口(已实现)
├── api_balance.py # 资产负债表接口(预留)
├── api_cashflow.py # 现金流量表接口(预留)
└── api_financial_sync.py # 调度中心(只保留调度逻辑)
```
### 基类位置
```
src/data/api_wrappers/
├── base_sync.py # StockBasedSync, DateBasedSync
└── base_financial_sync.py # QuarterBasedSync本规范核心
```
### 文件内容结构
每个 API 文件必须包含以下部分(按顺序):
```python
"""模块文档字符串。
包含:模块用途、使用方式、注意事项。
"""
# 1. 标准库导入
from typing import Optional
import pandas as pd
# 2. 第三方库导入
# (无特殊要求)
# 3. 本地模块导入
from src.data.client import TushareClient
from src.data.api_wrappers.base_financial_sync import (
QuarterBasedSync,
sync_financial_data,
preview_financial_sync
)
# 4. 同步类实现
class XXXQuarterSync(QuarterBasedSync):
"""具体财务数据同步实现类。"""
pass
# 5. 便捷函数
def sync_xxx(force_full: bool = False, dry_run: bool = False) -> list:
"""同步数据便捷函数。"""
pass
def preview_xxx_sync() -> dict:
"""预览同步信息便捷函数。"""
pass
# 6. 原始数据接口(可选,向后兼容)
def get_xxx(period: str, fields: Optional[str] = None) -> pd.DataFrame:
"""获取原始数据接口。"""
pass
```
---
## 类设计规范
### 类命名规范
**必须**遵循以下命名模式:
```python
# 格式: {DataType}QuarterSync
# 正确示例
IncomeQuarterSync # 利润表
BalanceQuarterSync # 资产负债表
CashflowQuarterSync # 现金流量表
# 错误示例
IncomeSync # 缺少 Quarter不统一
SyncIncome # 动词开头,不符合类命名规范
```
### 必须覆盖的类属性
子类**必须**定义以下类属性:
```python
class IncomeQuarterSync(QuarterBasedSync):
"""利润表季度同步实现。"""
# 1. 表名(必须)
table_name = "financial_income"
# 2. API 接口名(必须)
api_name = "income_vip"
# 3. 目标报表类型(可选,默认 "1"
TARGET_REPORT_TYPE = "1"
# 4. 表结构定义(必须)
TABLE_SCHEMA = {
"ts_code": "VARCHAR(16) NOT NULL",
"end_date": "DATE NOT NULL",
"report_type": "INTEGER",
# ... 其他字段
}
# 5. 索引定义(必须)
TABLE_INDEXES = [
("idx_financial_income_ts_code", ["ts_code"]),
("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]),
]
```
### 必须实现的抽象方法
子类**必须**实现以下抽象方法:
```python
@abstractmethod
def fetch_single_quarter(self, period: str) -> pd.DataFrame:
"""获取单季度的全部上市公司数据。
Args:
period: 报告期,季度最后一天日期(如 '20231231'
Returns:
包含该季度全部上市公司财务数据的 DataFrame
注意:
- 使用 VIP 接口(如 income_vip
- 不要在此方法中过滤 report_type基类会统一处理
- 返回的 DataFrame 必须包含 ts_code 和 end_date 列
"""
params = {"period": period}
return self.client.query(self.api_name, **params)
```
### 禁止的操作
子类**禁止**覆盖或修改以下方法:
```python
# 基类核心方法,禁止覆盖
- sync_quarter() # 单季度同步流程
- sync_range() # 范围同步
- sync_incremental() # 增量同步
- sync_full() # 全量同步
- delete_stock_quarter_data() # 删除数据
- compare_and_find_differences() # 差异检测
- ensure_table_exists() # 建表逻辑
```
---
## 同步策略规范
### 增量同步策略
**规范**: 财务数据同步**必须**每次都执行,不存在"跳过"的情况。
**原因**: 财务数据可能会被修正,即使本地已有数据,也需要重新对比更新。
**流程**:
```python
def sync_incremental(self, dry_run: bool = False) -> List[Dict]:
"""增量同步流程。
注意:财务数据必须每次都进行对比更新,因为数据可能被修正。
"""
# 1. 确保表存在(首次同步时自动建表)
self.ensure_table_exists()
# 2. 获取本地最新季度
latest_quarter = self._get_latest_quarter()
# 3. 获取当前季度
current_quarter = self.get_current_quarter()
# 4. 确定同步范围(不检查是否需要同步,直接执行)
# 注意:即使 latest_quarter >= current_quarter也要执行
start_quarter = self.get_prev_quarter(latest_quarter)
# 5. 执行同步
return self.sync_range(start_quarter, current_quarter, dry_run)
```
### 全量同步策略
**规范**: 全量同步从默认起始日期2018Q1同步到当前季度。
**流程**:
```python
def sync_full(self, dry_run: bool = False) -> List[Dict]:
"""全量同步流程。"""
# 1. 创建表结构(如不存在)
self.ensure_table_exists()
# 2. 清空表(可选,根据需求决定)
# self.storage.clear_table(self.table_name)
# 3. 获取同步范围
start_quarter = self.DEFAULT_START_DATE # "20180331"
end_quarter = self.get_current_quarter()
# 4. 执行同步
return self.sync_range(start_quarter, end_quarter, dry_run)
```
### 单季度同步策略
**规范**: 单季度同步采用"先删除后插入"策略。
**流程**:
```python
def sync_quarter(self, period: str, dry_run: bool = False) -> Dict:
"""单季度同步流程(核心)。"""
# 1. 获取远程数据
remote_df = self.fetch_single_quarter(period)
# 2. 根据 TARGET_REPORT_TYPE 过滤报表类型
if self.TARGET_REPORT_TYPE and 'report_type' in remote_df.columns:
remote_df = remote_df[remote_df['report_type'] == self.TARGET_REPORT_TYPE]
# 3. 对比找出差异股票
diff_df, stats_df = self.compare_and_find_differences(remote_df, period)
# 4. 执行同步(先删除后插入)
if not dry_run and not diff_df.empty:
diff_stocks = list(diff_df['ts_code'].unique())
# 4.1 删除差异股票的旧数据
self.delete_stock_quarter_data(period, diff_stocks)
# 4.2 插入新数据
self.storage.queue_save(self.table_name, diff_df)
self.storage.flush()
return {...}
```
**重要**: 禁止使用 UPSERTINSERT OR REPLACE必须使用"先删除后插入"。
---
## 数据差异检测
### 检测逻辑
**规范**: 按股票级别对比本地与远程数据量,识别差异。
**算法**:
```python
def compare_and_find_differences(
self,
remote_df: pd.DataFrame,
period: str
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""对比远程数据与本地数据,找出差异。
逻辑:
1. 统计远程数据中每只股票的数据量
2. 查询本地数据库中该季度每只股票的数据量
3. 对比找出差异股票(新增或数据量不一致)
4. 返回需要插入的差异数据
注意:主键为 (ts_code, end_date, report_type),但差异检测按股票级别进行。
如果某股票的记录总数不一致,则更新该股票的所有记录。
"""
# 1. 统计远程数据中每只股票的数据量
remote_counts = remote_df.groupby('ts_code').size().to_dict()
# 2. 获取本地数据量(按股票汇总)
local_counts = self.get_local_data_count_by_stock(period)
# 3. 对比找出差异
diff_stocks = []
stats = []
for ts_code, remote_count in remote_counts.items():
local_count = local_counts.get(ts_code, 0)
if local_count == 0:
status = "new" # 本地不存在
diff_stocks.append(ts_code)
elif local_count != remote_count:
status = "modified" # 数据量不一致,可能包含修正
diff_stocks.append(ts_code)
else:
status = "same" # 数据量一致
stats.append({
'ts_code': ts_code,
'remote_count': remote_count,
'local_count': local_count,
'status': status
})
# 4. 提取差异数据
diff_df = remote_df[remote_df['ts_code'].isin(diff_stocks)].copy()
stats_df = pd.DataFrame(stats)
return diff_df, stats_df
```
### 删除策略
**规范**: 删除指定季度和指定股票的所有数据。
```python
def delete_stock_quarter_data(
self,
period: str,
ts_codes: Optional[List[str]] = None
) -> int:
"""删除指定季度和股票的数据。
Args:
period: 季度字符串 (YYYYMMDD)
ts_codes: 股票代码列表None 表示删除该季度所有数据
Returns:
删除的记录数
"""
if ts_codes:
# 删除指定股票的数据
placeholders = ', '.join(['?' for _ in ts_codes])
query = f'''
DELETE FROM "{self.table_name}"
WHERE end_date = ? AND ts_code IN ({placeholders})
'''
result = storage._connection.execute(query, [period] + ts_codes)
else:
# 删除整个季度的数据
query = f'DELETE FROM "{self.table_name}" WHERE end_date = ?'
result = storage._connection.execute(query, [period])
return result.rowcount
```
---
## 表结构设计
### 必备字段
财务数据表**必须**包含以下字段:
```python
TABLE_SCHEMA = {
"ts_code": "VARCHAR(16) NOT NULL", # 股票代码
"end_date": "DATE NOT NULL", # 报告期(季度最后一天)
"report_type": "INTEGER", # 报表类型
"ann_date": "DATE", # 公告日期(可选)
# ... 其他业务字段
}
```
### 字段命名规范
遵循 Tushare API 返回的字段名,保持与原 API 一致。
**正确示例**:
```python
"basic_eps": "DOUBLE", # 基本每股收益
"total_revenue": "DOUBLE", # 营业总收入
"operate_profit": "DOUBLE", # 营业利润
```
**错误示例**:
```python
"basicEPS": "DOUBLE", # 驼峰命名,不符合
"basic_eps_value": "DOUBLE", # 添加多余后缀
"eps_basic": "DOUBLE", # 词序颠倒
```
### 数据类型规范
| Tushare 类型 | DuckDB 类型 |
|--------------|-------------|
| str | VARCHAR(n) |
| float | DOUBLE |
| int | INTEGER |
| date | DATE |
**示例**:
```python
TABLE_SCHEMA = {
"ts_code": "VARCHAR(16) NOT NULL",
"ann_date": "DATE",
"report_type": "INTEGER",
"basic_eps": "DOUBLE",
}
```
---
## 索引设计规范
### 禁止唯一索引
**严格禁止**创建主键和唯一索引:
```python
# 禁止创建主键
PRIMARY_KEY = ("ts_code", "end_date", "report_type") # 错误!
# 禁止创建唯一索引
("idx_unique", ["ts_code", "end_date"], True) # 错误!
CREATE UNIQUE INDEX ... # 错误!
```
**原因**: 财务数据可能发生多次修正,同一支股票在同一季度可能有多个版本(不同的 `ann_date`),设置唯一约束会导致插入失败。
### 推荐索引
**必须**创建以下索引:
```python
TABLE_INDEXES = [
# 1. 股票代码索引(单字段查询)
("idx_financial_income_ts_code", ["ts_code"]),
# 2. 报告期索引(时间范围查询)
("idx_financial_income_end_date", ["end_date"]),
# 3. 复合索引(股票+报告期+报表类型,最常用)
("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]),
]
```
### 索引命名规范
索引名**必须**遵循以下格式:
```python
# 格式: idx_{table_name}_{fields_description}
# 正确示例
"idx_financial_income_ts_code"
"idx_financial_income_end_date"
"idx_financial_income_ts_period"
# 错误示例
"ts_code_idx" # 缺少表名前缀
"income_ts" # 表名缩写不明确
"index_1" # 无意义名称
```
---
## 报表类型过滤
### 默认行为
**规范**: 默认只同步合并报表(`report_type='1'`)。
```python
class QuarterBasedSync(ABC):
# 目标报表类型(子类可覆盖)
# 默认只同步合并报表report_type='1'
# 设为 None 则同步所有报表类型
TARGET_REPORT_TYPE: Optional[str] = "1"
```
### 覆盖方式
子类可以通过覆盖类属性来修改默认行为:
```python
class IncomeQuarterSync(QuarterBasedSync):
"""利润表同步 - 只同步合并报表。"""
TARGET_REPORT_TYPE = "1" # 明确指定
class BalanceQuarterSync(QuarterBasedSync):
"""资产负债表同步 - 同步所有报表类型。"""
TARGET_REPORT_TYPE = None # 不过滤
```
### 过滤逻辑
过滤逻辑在基类中统一处理:
```python
def sync_quarter(self, period: str, dry_run: bool = False) -> Dict:
# 1. 获取远程数据
remote_df = self.fetch_single_quarter(period)
# 2. 根据 TARGET_REPORT_TYPE 过滤
if self.TARGET_REPORT_TYPE and 'report_type' in remote_df.columns:
remote_df = remote_df[remote_df['report_type'] == self.TARGET_REPORT_TYPE]
# ... 后续处理
```
### 报表类型说明
根据 Tushare 文档:
| 代码 | 类型 | 说明 |
|------|------|------|
| 1 | 合并报表 | 上市公司最新报表(默认)|
| 2 | 单季合并 | 单一季度的合并报表 |
| 3 | 调整单季合并表 | 调整后的单季合并报表 |
| 4 | 调整合并报表 | 本年度公布上年同期的财务报表数据 |
| 5 | 调整前合并报表 | 数据发生变更,将原数据进行保留 |
| 6 | 母公司报表 | 该公司母公司的财务报表数据 |
| ... | ... | ... |
---
## 代码示例
### 完整实现示例:利润表接口
```python
"""利润表数据接口 (VIP 版本)
使用 Tushare VIP 接口 (income_vip) 获取利润表数据。
按季度同步,一次请求获取一个季度的全部上市公司数据。
使用方式:
from src.data.api_wrappers.financial_data.api_income import (
IncomeQuarterSync,
sync_income,
preview_income_sync
)
# 方式1: 使用类
syncer = IncomeQuarterSync()
syncer.sync_incremental() # 增量同步
syncer.sync_full() # 全量同步
# 方式2: 使用便捷函数
sync_income() # 增量同步
sync_income(force_full=True) # 全量同步
"""
from typing import Optional
import pandas as pd
from src.data.client import TushareClient
from src.data.api_wrappers.base_financial_sync import (
QuarterBasedSync,
sync_financial_data,
preview_financial_sync
)
class IncomeQuarterSync(QuarterBasedSync):
"""利润表季度同步实现。
使用 income_vip 接口按季度获取全部上市公司利润表数据。
表结构: financial_income
注意: 不设置主键和唯一索引,支持财务数据多次修正
"""
table_name = "financial_income"
api_name = "income_vip"
# 只同步合并报表
TARGET_REPORT_TYPE = "1"
# 表结构定义
TABLE_SCHEMA = {
"ts_code": "VARCHAR(16) NOT NULL",
"ann_date": "DATE",
"f_ann_date": "DATE",
"end_date": "DATE NOT NULL",
"report_type": "INTEGER",
"comp_type": "INTEGER",
"basic_eps": "DOUBLE",
"diluted_eps": "DOUBLE",
"total_revenue": "DOUBLE",
"revenue": "DOUBLE",
# ... 其他字段
}
# 普通索引(不要创建唯一索引)
TABLE_INDEXES = [
("idx_financial_income_ts_code", ["ts_code"]),
("idx_financial_income_end_date", ["end_date"]),
("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]),
]
def fetch_single_quarter(self, period: str) -> pd.DataFrame:
"""获取单季度的全部上市公司利润表数据。
Args:
period: 报告期,季度最后一天日期(如 '20231231'
Returns:
包含该季度全部上市公司利润表数据的 DataFrame
"""
params = {"period": period}
return self.client.query(self.api_name, **params)
# =============================================================================
# 便捷函数
# =============================================================================
def sync_income(
force_full: bool = False,
dry_run: bool = False,
) -> list:
"""同步利润表数据(便捷函数)。
Args:
force_full: 若为 True强制全量同步
dry_run: 若为 True仅预览不写入
Returns:
同步结果列表
"""
return sync_financial_data(IncomeQuarterSync, force_full, dry_run)
def preview_income_sync() -> dict:
"""预览利润表同步信息。
Returns:
预览信息字典
"""
return preview_financial_sync(IncomeQuarterSync)
def get_income(period: str, fields: Optional[str] = None) -> pd.DataFrame:
"""获取利润表数据(原始接口,单季度)。
用于直接获取某个季度的数据,不进行同步管理。
Args:
period: 报告期,季度最后一天日期(如 '20231231'
fields: 指定返回字段,默认返回全部字段
Returns:
包含利润表数据的 DataFrame
"""
client = TushareClient()
if fields is None:
fields = "ts_code,ann_date,end_date,report_type,basic_eps,..."
return client.query("income_vip", period=period, fields=fields)
```
### 预留接口示例
```python
"""资产负债表数据接口 (VIP 版本) - 预留
使用 Tushare VIP 接口 (balancesheet_vip) 获取资产负债表数据。
"""
from typing import Optional
import pandas as pd
from src.data.api_wrappers.base_financial_sync import (
QuarterBasedSync,
sync_financial_data,
preview_financial_sync
)
class BalanceQuarterSync(QuarterBasedSync):
"""资产负债表季度同步实现(预留)。"""
table_name = "financial_balance"
api_name = "balancesheet_vip"
TARGET_REPORT_TYPE = "1"
TABLE_SCHEMA = {
"ts_code": "VARCHAR(16) NOT NULL",
"ann_date": "DATE",
"end_date": "DATE NOT NULL",
"report_type": "INTEGER",
# TODO: 补充完整字段定义
}
TABLE_INDEXES = [
("idx_financial_balance_ts_code", ["ts_code"]),
("idx_financial_balance_end_date", ["end_date"]),
("idx_financial_balance_ts_period", ["ts_code", "end_date", "report_type"]),
]
def fetch_single_quarter(self, period: str) -> pd.DataFrame:
"""预留方法,尚未实现。"""
raise NotImplementedError(
"资产负债表同步尚未实现。需要 Tushare 5000 积分调用 balancesheet_vip 接口。"
)
def sync_balance(force_full: bool = False, dry_run: bool = False) -> list:
"""预留函数。"""
raise NotImplementedError("资产负债表同步尚未实现")
def preview_balance_sync() -> dict:
"""预留函数。"""
raise NotImplementedError("资产负债表同步尚未实现")
```
---
## 常见问题
### Q1: 为什么不设置主键和唯一索引?
**A**: 财务数据可能发生多次修正。例如:
```
第一次发布600000.SH, 20240331, report_type='1', ann_date='20240428'
第二次修正600000.SH, 20240331, report_type='1', ann_date='20240515'
```
如果设置了唯一约束(如 `PRIMARY KEY (ts_code, end_date, report_type)`),第二次插入会失败。因此采用"先删除后插入"策略,不设置唯一约束。
### Q2: 为什么增量同步不跳过已同步的季度?
**A**: 财务数据与日频数据不同,可能会被修正。即使本地已有某季度的数据,也需要重新获取远程数据进行对比,确保数据完整性。
### Q3: 为什么要获取前一季度?
**A**: 财务数据修正可能发生在发布后的一段时间内。获取前一季度可以捕获上一季度可能发生的修正数据。
**示例**:
```
当前日期: 2024-04-15
当前季度: 2024Q1 (20240331)
本地最新: 2023Q4 (20231231)
同步范围: 2023Q3 (20230930) -> 2024Q1 (20240331)
(包含前一季度以确保数据完整性)
```
### Q4: 如何支持其他报表类型?
**A**: 覆盖 `TARGET_REPORT_TYPE` 类属性:
```python
class IncomeAllReportSync(QuarterBasedSync):
"""同步所有报表类型。"""
TARGET_REPORT_TYPE = None # 不过滤
```
或者同步特定类型:
```python
class IncomeParentReportSync(QuarterBasedSync):
"""只同步母公司报表。"""
TARGET_REPORT_TYPE = "6" # 母公司报表
```
### Q5: 如何处理字段变更?
**A**: 如果 Tushare API 字段发生变更:
1. 更新 `TABLE_SCHEMA` 添加新字段
2. 使用 `ALTER TABLE` 添加新列(对已存在的数据)
3. 更新 `fetch_single_quarter()` 中的字段列表(如果使用了 `fields` 参数)
**注意**: 不要删除已有字段,保持向后兼容。
### Q6: 如何调试同步问题?
**A**: 使用 `dry_run=True` 进行预览:
```python
from src.data.api_wrappers.financial_data.api_income import sync_income
# 预览同步,不写入数据
result = sync_income(dry_run=True)
print(result)
```
查看日志输出,检查:
- 远程数据量
- 本地数据量
- 差异股票列表
- 删除/插入记录数
---
## 附录
### 相关文档
- [财务数据 API 说明](financial_api.md) - Tushare 财务数据接口说明
- [通用 API 接口规范](API_INTERFACE_SPEC.md) - 通用接口规范
- [数据同步重构计划](../plan/2026-03-07-financial-sync-refactor.md) - 本次重构的详细计划
### 相关代码
- `src/data/api_wrappers/base_financial_sync.py` - QuarterBasedSync 基类
- `src/data/api_wrappers/financial_data/api_income.py` - 利润表示例实现
- `src/data/api_wrappers/financial_data/api_financial_sync.py` - 调度中心
### 变更历史
| 日期 | 版本 | 变更内容 |
|------|------|----------|
| 2026-03-07 | v1.0 | 初始版本,规范财务数据 API 封装要求 |
---
**注意**: 本文档为强制性规范,所有财务数据 API 封装必须遵循。如有特殊情况需要例外,需经过架构评审。

View File

@@ -0,0 +1,600 @@
# n_income 因子生命周期分析
## 概述
本文档详细分析 `src/experiment/regression.py``n_income` 因子的完整生命周期,从字符串定义到最终参与模型训练的全过程。
## 因子定义
`src/experiment/regression.py` 第 55 行定义:
```python
FACTOR_DEFINITIONS = {
# ... 其他因子 ...
"n_income": "n_income",
}
```
`n_income` 是一个简单符号表达式,代表**净利润**Net Income财务指标。这是一个**点-in-timePIT数据**,需要从财务报表中获取。
---
## 第一阶段:因子注册
### 1.1 注册入口
**位置**: `src/experiment/regression.py:134`
```python
def create_factors_with_strings(engine: FactorEngine) -> List[str]:
for name, expr in FACTOR_DEFINITIONS.items():
engine.add_factor(name, expr) # 注册 n_income
```
### 1.2 add_factor 实现
**位置**: `src/factors/engine/factor_engine.py:62-86`
```python
def add_factor(
self,
name: str,
expr: Union[str, Node],
) -> "FactorEngine":
"""注册一个因子表达式"""
# 如果是字符串,先解析为 Node
if isinstance(expr, str):
expr = self.parser.parse(expr) # 关键步骤
# 创建执行计划
plan = self.planner.create_plan(expr, name)
# 缓存执行计划
self._factor_plans[name] = plan
return self
```
对于 `n_income = "n_income"`,流程如下:
1. 检测到是字符串,调用 `parser.parse("n_income")`
2. 解析结果为 `Symbol("n_income")` 节点
3. 通过 `planner.create_plan()` 生成执行计划
4. 缓存计划供后续计算使用
---
## 第二阶段:表达式解析
### 2.1 解析器入口
**位置**: `src/factors/parser.py:38-52`
```python
def parse(self, formula: str) -> Node:
"""解析因子表达式字符串为 Node"""
tree = ast.parse(formula, mode='eval')
return self._visit(tree.body)
```
### 2.2 符号节点处理
**位置**: `src/factors/parser.py:96-99`
```python
def _visit_Name(self, node: ast.Name) -> Symbol:
"""处理名称节点(如变量名 n_income"""
return Symbol(node.id)
```
对于 `"n_income"` 字符串:
1. `ast.parse("n_income", mode='eval')` 生成 AST
2. AST 节点类型为 `ast.Name`id 为 `"n_income"`
3. 调用 `_visit_Name()` 创建 `Symbol("n_income")`
### 2.3 Symbol 节点定义
**位置**: `src/factors/dsl.py:82-102`
```python
@dataclass
class Symbol(Node):
"""符号节点,代表命名变量"""
name: str
def dependencies(self) -> Set[str]:
return {self.name}
```
`Symbol("n_income")` 的依赖集合为 `{"n_income"}`
---
## 第三阶段:执行计划生成
### 3.1 计划器入口
**位置**: `src/factors/engine/planner.py:40-74`
```python
def create_plan(
self,
node: Node,
output_name: Optional[str] = None,
) -> ExecutionPlan:
"""创建完整执行计划"""
# 1. 提取依赖
deps = self.extractor.extract_dependencies(node)
# 2. 翻译为 Polars 表达式
polars_expr = self.translator.translate(node)
# 3. 推导数据规格
data_specs = self._infer_data_specs(deps, schema_cache=self.schema_cache)
return ExecutionPlan(
data_specs=data_specs,
polars_expr=polars_expr,
dependencies=deps,
output_name=output_name,
)
```
### 3.2 提取依赖
**位置**: `src/factors/compiler.py:19-32`
```python
def extract_dependencies(self, node: Node) -> Set[str]:
"""从 AST 提取所有依赖的原始字段名"""
deps: Set[str] = set()
self._extract(node, deps)
return deps
def _extract(self, node: Node, deps: Set[str]):
if isinstance(node, Symbol):
deps.add(node.name) # n_income 被加入依赖集
# ... 其他节点类型处理
```
对于 `Symbol("n_income")`,提取的依赖为 `{"n_income"}`
### 3.3 推导数据规格
**位置**: `src/factors/engine/planner.py:86-148`
```python
def _infer_data_specs(
self,
dependencies: Set[str],
schema_cache: SchemaCache,
) -> List[DataSpec]:
"""推导数据规格"""
table_fields: Dict[str, List[str]] = defaultdict(list)
for field in dependencies:
# 使用 SchemaCache 查找字段所属表
table = schema_cache.get_table_for_field(field)
table_fields[table].append(field)
# 为每张表创建 DataSpec
for table, fields in table_fields.items():
if schema_cache.is_pit_table(table): # PIT 表(财务数据)
spec = DataSpec(
table=table,
columns=fields,
join_type="asof_backward",
left_on="trade_date",
right_on="f_ann_date",
)
else: # 普通表(行情数据)
spec = DataSpec(
table=table,
columns=fields,
join_type="standard",
)
specs.append(spec)
```
对于 `n_income`
1. `schema_cache.get_table_for_field("n_income")` 返回 `"financial_income"`
2. `schema_cache.is_pit_table("financial_income")` 返回 `True`
3. 生成 DataSpec
- `table="financial_income"`
- `columns=["n_income"]`
- `join_type="asof_backward"`
- `left_on="trade_date"`
- `right_on="f_ann_date"`
### 3.4 SchemaCache 实现
**位置**: `src/data/catalog.py`
```python
class SchemaCache:
"""缓存数据库表结构信息,提供字段到表的映射"""
def get_table_for_field(self, field: str) -> Optional[str]:
"""根据字段名获取表名"""
return self._field_to_table.get(field)
def is_pit_table(self, table: str) -> bool:
"""判断是否为 PITPoint-in-Time"""
info = self._table_info.get(table)
if info and info.table_type == TableType.PIT:
return True
return False
```
表类型识别逻辑:
- **PIT 表**: 包含 `ann_date``f_ann_date` 字段(财务数据表)
- **DAILY 表**: 包含 `trade_date` 字段(行情数据表)
---
## 第四阶段:数据获取与拼接
### 4.1 compute() 执行入口
**位置**: `src/factors/engine/factor_engine.py:88-120`
```python
def compute(
self,
factor_names: List[str],
start_date: str,
end_date: str,
) -> pl.DataFrame:
"""计算指定因子"""
# 1. 获取所有需要的执行计划
plans = [self._factor_plans[name] for name in factor_names]
# 2. 合并数据规格(去重)
merged_specs = self._merge_data_specs(plans)
# 3. 从路由器获取核心宽表
core_wide = self.router.fetch_data(
specs=merged_specs,
start_date=start_date,
end_date=end_date,
)
# 4. 执行计算
result = self._execute_with_dependencies(factor_names, core_wide)
return result
```
### 4.2 数据路由器 fetch_data
**位置**: `src/factors/engine/data_router.py:48-100`
```python
def fetch_data(
self,
specs: List[DataSpec],
start_date: str,
end_date: str,
) -> pl.DataFrame:
"""根据 DataSpec 列表获取并组装数据"""
# 分离标准表和 asof 表
standard_specs = [s for s in specs if s.join_type == "standard"]
asof_specs = [s for s in specs if s.join_type == "asof_backward"]
# 加载标准表(行情数据)
standard_frames = []
for spec in standard_specs:
df = self._load_table_from_spec(spec, start_date, end_date)
standard_frames.append(df)
# 以第一张标准表为基础(通常是 daily
base_df = standard_frames[0] if standard_frames else None
# 使用 FinancialLoader 加载和拼接财务数据
if asof_specs and base_df is not None:
for spec in asof_specs:
financial_df = self.financial_loader.load_financial_data(
table_name=spec.table,
columns=spec.columns,
start_date=start_date,
end_date=end_date,
)
base_df = self.financial_loader.merge_financial_with_price(
price_df=base_df,
financial_df=financial_df,
date_col="trade_date",
f_ann_col="f_ann_date",
)
return base_df
```
### 4.3 财务数据加载
**位置**: `src/data/financial_loader.py:26-83`
```python
def load_financial_data(
self,
table_name: str,
columns: List[str],
start_date: str,
end_date: str,
) -> pl.DataFrame:
"""从数据库加载并清洗财务数据"""
# 计算包含回看期的日期范围默认1年
adjusted_start = self.get_date_range_with_lookback(start_date, lookback_days=365)
# 从数据库查询
query = f"""
SELECT ts_code, f_ann_date, end_date, {', '.join(columns)}
FROM {table_name}
WHERE f_ann_date >= '{adjusted_start}'
AND f_ann_date <= '{end_date}'
ORDER BY ts_code, f_ann_date
"""
df = self.conn.execute(query).fetchdf()
df = pl.DataFrame(df)
# 数据清洗:仅保留 report_type==1合并报表
if "report_type" in df.columns:
df = df.filter(pl.col("report_type") == 1)
# 去重:按 (ts_code, end_date) 取 update_flag 最大的记录
if "update_flag" in df.columns:
df = (
df.sort(["ts_code", "end_date", "update_flag"], descending=[False, False, True])
.unique(subset=["ts_code", "end_date"], keep="first")
)
return df
```
### 4.4 财务数据与行情数据拼接
**位置**: `src/data/financial_loader.py:85-136`
```python
def merge_financial_with_price(
self,
price_df: pl.DataFrame,
financial_df: pl.DataFrame,
date_col: str = "trade_date",
f_ann_col: str = "f_ann_date",
) -> pl.DataFrame:
"""使用 asof join 将财务数据拼接到行情数据"""
# 确保日期格式正确
price_df = price_df.with_columns(pl.col(date_col).cast(pl.Date))
financial_df = financial_df.with_columns(pl.col(f_ann_col).cast(pl.Date))
# 使用 join_asof 进行 PIT 对齐
# strategy='backward': 对于每个 trade_date找 f_ann_date <= trade_date 的最新财务数据
result = price_df.join_asof(
financial_df,
left_on=date_col,
right_on=f_ann_col,
by="ts_code", # 按股票代码分组
strategy="backward", # 向后查找最新公告
)
return result
```
**拼接逻辑详解**
| trade_date | ts_code | close | n_income (拼接后) | 来源 f_ann_date |
|-----------|---------|-------|------------------|-----------------|
| 2024-01-15 | 000001.SZ | 10.5 | 1,000,000 | 2024-01-10 |
| 2024-01-16 | 000001.SZ | 10.6 | 1,000,000 | 2024-01-10 |
| 2024-01-20 | 000001.SZ | 10.8 | 1,200,000 | 2024-01-18 |
上表展示了 `n_income` 的 PIT 拼接过程:
- 1月15日使用前一次公告1月10日的净利润数据
- 1月16日继续使用1月10日的数据无新公告
- 1月20日使用最新公告1月18日的净利润数据
---
## 第五阶段:因子计算
### 5.1 执行计算
**位置**: `src/factors/engine/factor_engine.py:188-227`
```python
def _execute_with_dependencies(
self,
factor_names: List[str],
core_wide: pl.DataFrame,
) -> pl.DataFrame:
"""按依赖顺序执行因子计算"""
# 拓扑排序确定计算顺序
sorted_factors = self._topological_sort(factor_names)
# 创建结果 DataFrame
result_exprs = []
for name in sorted_factors:
plan = self._factor_plans[name]
# 执行 Polars 表达式
expr = plan.polars_expr.alias(name)
result_exprs.append(expr)
# 一次性执行所有表达式
result = core_wide.with_columns(result_exprs)
return result
```
### 5.2 翻译为 Polars 表达式
**位置**: `src/factors/translator.py`
```python
def translate(self, node: Node) -> pl.Expr:
"""将 DSL 节点翻译为 Polars 表达式"""
if isinstance(node, Symbol):
# Symbol 直接转为 pl.col()
return pl.col(node.name)
# ... 其他节点类型
```
对于 `Symbol("n_income")`,翻译结果为 `pl.col("n_income")`
### 5.3 n_income 计算
由于 `n_income` 是简单符号(无运算),计算过程:
```python
# core_wide 已经包含 n_income 列(从财务表拼接而来)
result = core_wide.with_columns([
pl.col("n_income").alias("n_income") # 直接引用
])
```
实际上,`n_income` 的值在数据拼接阶段已经确定,计算阶段只是确认输出列名。
---
## 第六阶段:参与模型训练
### 6.1 数据流回到训练流程
**位置**: `src/experiment/regression.py:176-183`
```python
def prepare_data(...) -> pl.DataFrame:
factor_names = feature_cols + ["return_5"] # 包含 n_income
data = engine.compute(
factor_names=factor_names,
start_date=start_date,
end_date=end_date,
)
return data
```
### 6.2 完整数据流
```
┌─────────────────────────────────────────────────────────────────────┐
│ n_income 因子生命周期 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 1. 定义阶段 │
│ regression.py:55 "n_income": "n_income" │
│ ↓ │
│ 2. 注册阶段 (FactorEngine.add_factor) │
│ parser.parse("n_income") → Symbol("n_income") │
│ ↓ │
│ 3. 计划阶段 (ExecutionPlanner.create_plan) │
│ extract_dependencies() → {"n_income"} │
│ _infer_data_specs() → DataSpec(financial_income, asof_backward) │
│ ↓ │
│ 4. 数据获取阶段 (DataRouter.fetch_data) │
│ FinancialLoader.load_financial_data() │
│ 从 financial_income 表读取 n_income 字段 │
│ ↓ │
│ 5. 数据拼接阶段 (FinancialLoader.merge_financial_with_price) │
│ join_asof(strategy='backward') │
│ 按 trade_date 和 f_ann_date 对齐 │
│ ↓ │
│ 6. 计算阶段 (FactorEngine._execute_with_dependencies) │
│ pl.col("n_income") → 输出到结果 DataFrame │
│ ↓ │
│ 7. 训练阶段 │
│ 作为特征列传入 LightGBM 模型 │
│ │
└─────────────────────────────────────────────────────────────────────┘
```
---
## 关键技术点总结
### 1. PIT 数据处理
财务数据是**低频、公告驱动**的数据,与**高频、连续**的行情数据不同。系统使用 **asof_backward** 策略处理:
- **asof**: As-Of表示截至某个时点的有效数据
- **backward**: 向后查找,确保使用最新公告的数据
- **关键约束**: 不能使用未来数据lookahead bias
### 2. SchemaCache 动态路由
系统自动识别字段所属表,无需手动指定:
```python
# 系统自动识别
n_income financial_income (PIT)
close daily (DAILY)
```
### 3. 财务数据清洗
`FinancialLoader` 自动处理:
- **报告类型过滤**: 仅使用合并报表report_type=1
- **去重策略**: 按 (ts_code, end_date) 取最新修订版update_flag 最大)
- **日期对齐**: 使用公告日f_ann_date而非报告期end_date
### 4. 扩展性设计
添加新的财务因子只需在字典中添加一行:
```python
FACTOR_DEFINITIONS = {
"n_income": "n_income",
"revenue": "revenue", # 营业收入
"eps": "eps", # 每股收益
"roe": "roe", # 净资产收益率
}
```
系统自动处理表路由、数据获取和拼接。
---
## 相关代码文件
| 文件 | 职责 |
|------|------|
| `src/experiment/regression.py` | 训练入口,因子定义 |
| `src/factors/engine/factor_engine.py` | 因子引擎统一入口 |
| `src/factors/parser.py` | 字符串表达式解析 |
| `src/factors/compiler.py` | AST 依赖提取 |
| `src/factors/engine/planner.py` | 执行计划生成 |
| `src/factors/engine/data_router.py` | 数据路由与组装 |
| `src/data/financial_loader.py` | 财务数据加载与拼接 |
| `src/data/catalog.py` | 数据库目录与表结构 |
| `src/data/api_wrappers/financial_data/api_income.py` | 利润表数据接口 |
---
## 附录:数据表结构
### financial_income利润表
```sql
CREATE TABLE financial_income (
ts_code VARCHAR, -- 股票代码
f_ann_date DATE, -- 公告日期PIT关键字段
end_date DATE, -- 报告期
report_type INTEGER, -- 报告类型1=合并报表)
update_flag INTEGER, -- 更新标识(越大越新)
n_income BIGINT, -- 净利润(本因子使用的字段)
revenue BIGINT, -- 营业收入
... -- 其他财务字段
);
```
### daily日线行情
```sql
CREATE TABLE daily (
ts_code VARCHAR, -- 股票代码
trade_date DATE, -- 交易日期
open DOUBLE, -- 开盘价
high DOUBLE, -- 最高价
low DOUBLE, -- 最低价
close DOUBLE, -- 收盘价
vol BIGINT, -- 成交量
... -- 其他行情字段
);
```

File diff suppressed because it is too large Load Diff