Files
ProStock/docs/factor_framework_design.md
liaozhaorun 0a16129548 feat(factors): 添加因子计算框架
- 新增因子基类 (BaseFactor, CrossSectionalFactor, TimeSeriesFactor)
- 新增数据规格和上下文类 (DataSpec, FactorContext, FactorData)
- 新增数据加载器 (DataLoader) 和执行引擎 (FactorEngine)
- 新增组合因子支持 (CompositeFactor, ScalarFactor)
- 添加因子模块完整测试用例
- 添加 Git 提交规范文档
2026-02-22 14:41:32 +08:00

1304 lines
49 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ProStock 因子计算框架设计文档
## 1. 设计目标
- **安全性**:在框架层面彻底防止数据泄露(使用未来数据)
- **易用性**:因子开发者只需关注计算逻辑,无需担心数据安全
- **可扩展性**:支持日期截面、股票截面、交叉因子三种计算模式
- **组合性**:支持因子组合和嵌套
- **性能**:合理利用 Polars 的高效计算能力
## 1.1 核心原则
### 原则 1因子类型单一性
每个因子**只能是一种类型**(日期截面 或 股票截面),不允许同时支持两种模式。这确保:
- 因子语义清晰明确
- 数据访问模式可预测
- 便于框架进行正确的数据裁剪
### 原则 2Point-in-Time 严格性
对于任意计算点 `T`(特定日期或特定股票-日期组合):
- 因子**只能访问 `T` 及之前的数据**
- **绝对禁止访问 `T` 之后的任何数据**
- 每个计算点都是独立、隔离的计算上下文
这类似于数据库的 "as-of" 查询语义:"在当时那个时刻,我能看到什么数据?"
---
## 2. 架构概述
```
┌─────────────────────────────────────────────────────────────┐
│ Factor Engine (执行引擎) │
│ ┌──────────────┐ ┌──────────────┐ ┌────────────────────┐ │
│ │ DAG Builder │ │ Lookback │ │ Parallel Executor │ │
│ │ (依赖图构建) │ │ Validator │ │ (并行计算) │ │
│ └──────────────┘ └──────────────┘ └────────────────────┘ │
└──────────────────────────┬──────────────────────────────────┘
┌──────────────────────────▼──────────────────────────────────┐
│ DataLoader (数据加载层) │
│ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │
│ │ Multi-File │ │ Column │ │ Lookback │ │
│ │ Aggregation │ │ Selector │ │ Window Control │ │
│ └────────────────┘ └────────────────┘ └────────────────┘ │
└──────────────────────────┬──────────────────────────────────┘
┌──────▼──────┐
│ HDF5 Files │
└─────────────┘
```
---
## 3. 核心组件设计
### 3.1 数据类型定义
```python
from dataclasses import dataclass
from typing import Dict, List, Optional
import polars as pl
@dataclass(frozen=True)
class DataSpec:
"""数据需求规格说明"""
source: str # H5 文件名(不含扩展名)
columns: List[str] # 需要的列名
lookback_days: int = 0 # 回看窗口(用于时序计算)
@dataclass
class FactorContext:
"""
因子计算上下文(由框架自动注入)
根据因子类型的不同,包含不同的上下文信息:
- CrossSectionalFactorcurrent_date 表示当前计算的日期
- TimeSeriesFactorcurrent_stock 表示当前计算的股票
"""
current_date: Optional[str] = None # 当前计算日期 YYYYMMDD截面因子
current_stock: Optional[str] = None # 当前计算股票代码(时序因子)
trade_dates: List[str] = None # 所有交易日期列表(用于对齐)
class FactorData:
"""
提供给因子的数据容器
根据因子类型的不同,包含不同的数据:
- CrossSectionalFactor当前日期及历史 lookback 的截面数据(所有股票)
- TimeSeriesFactor单只股票的完整时间序列数据
"""
def __init__(self, df: pl.DataFrame, context: FactorContext):
self._df = df
self._context = context
def get_column(self, col: str) -> pl.Series:
"""
获取指定列的数据
适用于两种因子类型:
- 截面因子:获取当天所有股票的该列值
- 时序因子:获取该股票时间序列的该列值
"""
return self._df[col]
def filter_by_date(self, date: str) -> "FactorData":
"""
按日期过滤数据(主要用于截面因子)
截面因子可以使用此方法获取特定日期的数据
但注意:无法获取未来日期的数据(引擎已经裁剪掉)
"""
filtered = self._df.filter(pl.col("trade_date") == date)
return FactorData(filtered, self._context)
def to_polars(self) -> pl.DataFrame:
"""获取底层的 Polars DataFrame高级用法"""
return self._df
@property
def context(self) -> FactorContext:
"""获取计算上下文"""
return self._context
```
### 3.2 因子基类(按类型严格分离)
```python
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Literal
import polars as pl
FactorType = Literal["cross_sectional", "time_series"]
class BaseFactor(ABC):
"""
因子基类 - 定义通用接口
设计原则:
1. 类型单一性:每个因子只能是 cross_sectional 或 time_series 之一
2. 声明式依赖:通过类属性声明所需数据和回看窗口
3. 防泄露保障:根据因子类型,在框架层面防止不同的泄露
4. 参数化支持:通过 __init__ 参数实现因子变体
"""
# ========== 必须声明的类属性 ==========
name: str = "" # 因子名称(唯一标识)
factor_type: FactorType # 因子类型(强制指定)
data_specs: List[DataSpec] = [] # 数据需求规格
# ========== 可选声明的类属性 ==========
category: str = "default" # 因子分类(用于组织管理)
description: str = "" # 因子描述
def __init_subclass__(cls, **kwargs):
"""子类创建时验证必须属性"""
super().__init_subclass__(**kwargs)
if not cls.name:
raise ValueError(f"Factor {cls.__name__} must define 'name'")
if not cls.factor_type:
raise ValueError(f"Factor {cls.__name__} must define 'factor_type'")
if not cls.data_specs:
raise ValueError(f"Factor {cls.__name__} must define 'data_specs'")
def __init__(self, **params):
"""初始化因子参数"""
self.params = params
self._validate_params()
def _validate_params(self):
"""验证参数有效性(子类可覆盖)"""
pass
@abstractmethod
def compute(self, data: FactorData) -> pl.Series:
"""核心计算逻辑(由子类实现)"""
pass
# ========== 因子组合运算符 ==========
def __add__(self, other: 'BaseFactor') -> 'CompositeFactor':
"""因子相加f1 + f2要求同类型"""
return CompositeFactor(self, other, '+')
def __sub__(self, other: 'BaseFactor') -> 'CompositeFactor':
"""因子相减f1 - f2要求同类型"""
return CompositeFactor(self, other, '-')
def __mul__(self, other: 'BaseFactor') -> 'CompositeFactor':
"""因子相乘f1 * f2要求同类型"""
return CompositeFactor(self, other, '*')
def __truediv__(self, other: 'BaseFactor') -> 'CompositeFactor':
"""因子相除f1 / f2要求同类型"""
return CompositeFactor(self, other, '/')
def __rmul__(self, scalar: float) -> 'ScalarFactor':
"""标量乘法0.5 * f1"""
return ScalarFactor(self, scalar, '*')
class CrossSectionalFactor(BaseFactor):
"""
日期截面因子基类
计算逻辑:在每个交易日,对所有股票进行横向计算
防泄露边界:
- ❌ 禁止访问未来日期的数据(日期泄露)
- ✅ 允许访问当前日期的所有股票数据(股票间比较是正常的)
数据传入:
- compute() 接收的是单日的截面数据(所有股票在该日期的数据)
- 包含 lookback_days 的历史截面数据(用于时序计算后再截面比较)
性能优化:
- 按日期遍历,每天计算一次
- 不需要重复计算,每天独立计算
"""
factor_type: FactorType = "cross_sectional"
@abstractmethod
def compute(self, data: FactorData) -> pl.Series:
"""
计算截面因子值
Args:
data: FactorData包含当前日期及之前 lookback_days 的截面数据
格式DataFrame[ts_code, trade_date, col1, col2, ...]
Returns:
pl.Series: 当前日期所有股票的因子值(长度 = 该日股票数量)
示例:
def compute(self, data):
# 获取当前日期的截面(已经过滤到当前日期,无未来数据)
cs = data.get_cross_section()
# 计算市值排名(在同一天的股票间比较)
return cs['market_cap'].rank()
"""
pass
class TimeSeriesFactor(BaseFactor):
"""
股票截面因子基类(时间序列因子)
计算逻辑:对每只股票,在其时间序列上进行纵向计算
防泄露边界:
- ❌ 禁止访问其他股票的数据(股票泄露)
- ✅ 允许访问该股票的完整历史数据(时序计算需要历史数据)
数据传入:
- compute() 接收的是单只股票的完整时间序列数据
- 包含该股票在 [start_date, end_date] 范围内的所有数据
性能优化:
- 按股票遍历,每只股票一次性计算整个时间序列
- 使用 Polars 的向量化计算(如 rolling_mean高效批量计算
- 无重复计算问题
"""
factor_type: FactorType = "time_series"
@abstractmethod
def compute(self, data: FactorData) -> pl.Series:
"""
计算时间序列因子值
Args:
data: FactorData包含单只股票的完整时间序列
格式DataFrame[ts_code, trade_date, col1, col2, ...]
该股票的所有历史数据都已加载
Returns:
pl.Series: 该股票在各日期的因子值(长度 = 日期数量)
示例:
def compute(self, data):
# 获取该股票的价格序列(该股票的完整历史)
series = data.get_series()
# 一次性计算整个序列的移动平均(高效)
return series.rolling_mean(window_size=self.params['period'])
"""
pass
class CompositeFactor(BaseFactor):
"""组合因子 - 用于实现因子间的数学运算(要求同类型)"""
def __init__(self, left: BaseFactor, right: BaseFactor, op: str):
# 验证类型一致性
if left.factor_type != right.factor_type:
raise ValueError(
f"Cannot combine factors of different types: "
f"{left.factor_type} vs {right.factor_type}"
)
self.left = left
self.right = right
self.op = op
self.factor_type = left.factor_type
self.name = f"({left.name}_{op}_{right.name})"
# 合并数据需求
self.data_specs = self._merge_data_specs()
def _merge_data_specs(self) -> List[DataSpec]:
"""合并左右因子的数据需求(取最大 lookback"""
# ... 合并逻辑
pass
def compute(self, data: FactorData) -> pl.Series:
"""执行组合运算"""
left_values = self.left.compute(data)
right_values = self.right.compute(data)
ops = {
'+': lambda a, b: a + b,
'-': lambda a, b: a - b,
'*': lambda a, b: a * b,
'/': lambda a, b: a / b,
}
return ops[self.op](left_values, right_values)
class ScalarFactor(BaseFactor):
"""标量运算因子"""
def __init__(self, factor: BaseFactor, scalar: float, op: str):
self.factor = factor
self.scalar = scalar
self.op = op
self.factor_type = factor.factor_type
self.name = f"({scalar}_{op}_{factor.name})"
self.data_specs = factor.data_specs
def compute(self, data: FactorData) -> pl.Series:
values = self.factor.compute(data)
if self.op == '*':
return values * self.scalar
elif self.op == '+':
return values + self.scalar
# ... 其他运算
```
---
## 4. 两种计算模式(严格分离的防泄露边界)
### 4.1 核心设计原则
**防泄露边界与因子类型的对应关系:**
| 因子类型 | 防止泄露 | 允许访问 | 计算方式 |
|---------|---------|---------|---------|
| **CrossSectionalFactor** | **日期泄露**(不能用未来日期) | 当天所有股票的数据 | 按日期遍历,每天计算 |
| **TimeSeriesFactor** | **股票泄露**(不能用其他股票) | 该股票的完整历史 | 按股票遍历,每只股票一次性计算 |
**为什么这样设计?**
1. **时序因子(如 MA5**
- 本质上需要历史时序数据来计算前5天收盘价
- 如果防止时序泄露每次只传1天数据会导致 O(N×L) 的重复计算
- 更好的做法:传入整只股票序列,一次性计算整个时间序列的滚动平均
- 需要防止的是**股票泄露**(不能用其他股票的数据来预测这只股票)
2. **截面因子(如 PE 排名)**
- 本质上是当天所有股票之间的相对比较
- 如果传入多天的数据,容易误用未来日期的信息(如用明天的 PE 算今天的排名)
- 更好的做法:每天只传入当天的数据
- 需要防止的是**日期泄露**(不能用未来日期的数据)
### 4.2 日期截面因子Cross-Sectional Factor
**计算方式**:在每个交易日,对所有股票进行横向计算
**典型因子**
- 当日收益率排名
- PE 行业分位数
- 市值对数
- 换手率排序
**防泄露边界 - 防止日期泄露:**
```
对于日期 D 的计算D 遍历 start_date 到 end_date
┌────────────────────────────────────────────────────────────┐
│ 传入数据: │
│ - trade_date = D 的所有股票数据 │
│ - 以及 [D-lookback, D] 的历史数据(用于时序计算后截面) │
│ │
│ 禁止传入: │
│ - trade_date > D 的任何数据 ❌ │
│ - 即未来日期的数据绝对不可见 │
│ │
│ 允许访问: │
│ - D 当天的所有股票数据 ✅ │
│ - 股票间比较是正常的(如排名、分位数) │
└────────────────────────────────────────────────────────────┘
```
**引擎计算流程:**
```python
# 引擎层伪代码
for current_date in date_range:
# 1. 加载当前日期及历史 lookback 的数据(不含未来)
day_data = load_data(
start_date=current_date - lookback,
end_date=current_date # 注意:不包含未来日期
)
# 2. 传入因子计算
factor_values = factor.compute(day_data)
# factor 只能看到 current_date 及之前的数据
# 3. 保存结果
results[current_date] = factor_values
```
**示例**
```python
class ReturnRankFactor(CrossSectionalFactor):
"""当日收益率排名因子"""
name = "return_rank"
factor_type = "cross_sectional"
data_specs = [
DataSpec(
source="daily",
columns=["ts_code", "trade_date", "close"],
lookback_days=1 # 需要前一天的收盘价计算收益率
)
]
category = "momentum"
def compute(self, data: FactorData) -> pl.Series:
# data 包含当天及前1天的数据无未来数据
# 获取当前日期的截面
today_data = data.get_cross_section() # 只返回当前日期的数据
# 计算当天收益率(需要当天和前一天的收盘价)
# 注意:因为 lookback_days=1data 包含两天的数据
returns = today_data["close"].pct_change()
# 返回排名(在当天所有股票间排名)
return returns.rank()
```
### 4.3 时间序列因子Time-Series Factor
**计算方式**:对每只股票,传入完整时间序列,一次性计算所有日期的因子值
**典型因子**
- 20 日移动平均
- 历史波动率
- RSI 技术指标
- MACD
**防泄露边界 - 防止股票泄露:**
```
对于股票 S 的计算S 遍历所有股票):
┌────────────────────────────────────────────────────────────┐
│ 传入数据: │
│ - ts_code = S 的完整时间序列 │
│ - trade_date 在 [start_date, end_date] 范围内的所有数据 │
│ │
│ 禁止传入: │
│ - 其他股票的数据 ❌ │
│ - 即绝对不能混入其他股票的信息 │
│ │
│ 允许访问: │
│ - 该股票的完整历史数据 ✅ │
│ - 包括 start_date 之前 lookback_days 的数据 │
└────────────────────────────────────────────────────────────┘
```
**引擎计算流程:**
```python
# 引擎层伪代码
for stock_code in stock_codes:
# 1. 加载该股票的完整时间序列(所有日期)
stock_data = load_stock_data(
ts_code=stock_code,
start_date=start_date - lookback, # 需要额外的历史数据计算初期值
end_date=end_date
)
# 2. 传入因子计算(一次性计算整个序列)
factor_values = factor.compute(stock_data)
# factor 看到的是该股票的完整历史
# 使用 Polars 的 rolling_mean 等向量化操作,高效计算
# 3. 保存结果
results[stock_code] = factor_values
```
**性能优势(以 MA5 为例):**
```python
# ❌ 低效方式Point-in-Time 逐个计算)
for date in dates:
data = load_data(date-5, date) # 加载6天数据
ma = data["close"].mean() # 计算平均值
# 时间复杂度: O(N × L)N=日期数, L=窗口长度
# ✅ 高效方式(向量化批量计算)
series = load_all_data() # 加载全部数据
ma = series.rolling_mean(window_size=5) # 一次性计算整个序列
# 时间复杂度: O(N)Polars 底层 Rust 优化
```
**示例**
```python
class MovingAverageFactor(TimeSeriesFactor):
"""移动平均线因子"""
name = "ma"
factor_type = "time_series"
data_specs = [
DataSpec(
source="daily",
columns=["ts_code", "trade_date", "close"],
lookback_days=20 # 需要20天历史数据计算初期的 MA
)
]
category = "technical"
def __init__(self, period: int = 20):
super().__init__(period=period)
# 动态调整 lookback
self.data_specs[0].lookback_days = period
def compute(self, data: FactorData) -> pl.Series:
# data 是该股票的完整时间序列(高效传入)
series = data.get_series(column="close")
# 使用 Polars 的向量化 rolling_mean一次性计算整个序列
return series.rolling_mean(window_size=self.params["period"])
```
### 4.4 交叉因子Cross Factor【预留设计】
**适用场景**:同时涉及时间序列和截面计算
**典型因子**
- 某股票过去 20 天在行业内的涨幅排名变化
- 个股波动率与市场波动率的比值
**设计挑战**
这类因子需要同时访问:
1. **个股的时间序列数据**(时序因子特性)
2. **市场的截面数据**(截面因子特性)
**解决方案:将交叉因子拆分为基础因子的组合**
```python
# 推荐方案:组合基础因子而非创建新类型
# 1. 先计算个股动量(时序因子)
stock_momentum = TimeSeriesMomentumFactor(period=20)
# 2. 再计算市场平均动量(截面因子,每天计算市场均值)
market_momentum = CrossSectionalMeanFactor(
base_factor=stock_momentum, # 基于时序因子的结果
aggregation="mean"
)
# 3. 计算相对动量(时序因子间的比较)
# 注意:这里需要特殊处理,因为两个因子类型不同
relative_momentum = RelativeStrengthFactor(
stock_factor=stock_momentum,
market_factor=market_momentum
)
```
**替代方案:专门的 CrossFactor 类型**
```python
class CrossFactor(BaseFactor):
"""
交叉因子基类(预留)
同时需要时序和截面数据,但保持防泄露边界:
- 时序部分:只能看到该股票的历史(防止股票泄露)
- 截面部分:只能看到当前日期的数据(防止日期泄露)
"""
factor_type = "cross"
@abstractmethod
def compute_cross(
self,
stock_series: pl.Series, # 当前股票的时间序列(无其他股票)
market_section: pl.DataFrame # 当前日期的全市场截面(无未来日期)
) -> float:
"""交叉计算逻辑"""
pass
```
---
## 5. 防数据泄露机制(按因子类型区分)
### 5.1 核心原则:不同类型的不同防泄露边界
**关键洞察**:不同类型的因子需要防止的泄露不同,应该采用不同的数据传入策略:
| 因子类型 | 需要防止 | 数据传入策略 | 计算效率 |
|---------|---------|-------------|---------|
| **CrossSectionalFactor** | **日期泄露**(不能用未来日期的数据) | 每天传入当天的数据(含 lookback 历史) | 按天遍历,每天计算一次 |
| **TimeSeriesFactor** | **股票泄露**(不能用其他股票的数据) | 每只股票传入完整序列 | 按股票遍历,向量化计算,高效 |
**为什么不需要同时防止两种泄露?**
1. **时序因子的特点**
- 计算 MA5 需要前5天的收盘价这是**正常的计算需求**,不是泄露
- 如果防止时序上的"未来数据"每次只能传1天数据会导致重复计算
- 真正需要防止的是用**其他股票**的数据来预测这只股票
2. **截面因子的特点**
- 计算 PE 排名需要当天所有股票的 PE这是**正常的计算需求**,不是泄露
- 如果传入多天的数据,容易误用未来日期的 PE如用明天的 PE 算今天的排名)
- 真正需要防止的是用**未来日期**的数据
### 5.2 实现策略
```
┌────────────────────────────────────────────────────────────┐
│ 因子定义阶段 │
│ 1. 因子声明 required_columns 和 lookback_days │
│ 2. 框架静态分析依赖关系 │
└──────────────────────────┬─────────────────────────────────┘
┌────────────────────────────────────────────────────────────┐
│ 数据加载阶段 │
│ 1. 根据声明加载所需数据(多文件聚合 + 列选择) │
│ 2. 根据 lookback_days 计算每个日期所需的历史数据窗口 │
└──────────────────────────┬─────────────────────────────────┘
┌────────────────────────────────────────────────────────────┐
│ 计算阶段(防泄露核心) │
│ │
│ 对于每个 (date, stock) 组合: │
│ │
│ 日期截面模式: │
│ - 截取 date - lookback_days 到 date 的数据 │
│ - 所有股票,仅该时间窗口 │
│ │
│ 股票截面模式: │
│ - 截取该股票 date - lookback_days 到 date 的数据 │
│ - 仅该股票,仅该时间窗口 │
│ │
│ 关键:因子代码只能看到已截断的数据,无法访问未来数据 │
└────────────────────────────────────────────────────────────┘
```
### 5.2 实现细节
```python
class DataLoader:
"""数据加载器 - 负责安全的数据访问"""
def __init__(self, data_dir: str):
self.data_dir = Path(data_dir)
self._cache: Dict[str, pl.DataFrame] = {}
def load(
self,
specs: List[DataSpec],
date_range: Optional[Tuple[str, str]] = None
) -> pl.DataFrame:
"""
加载并聚合多个 H5 文件的数据
Args:
specs: 数据需求规格列表
date_range: 日期范围限制 (start_date, end_date)
Returns:
合并后的 Polars DataFrame
"""
pass
def get_safe_data(
self,
data: pl.DataFrame,
current_date: str,
lookback_days: int,
mode: str, # 'cross_sectional' | 'time_series'
stock_code: Optional[str] = None
) -> FactorData:
"""
获取安全的数据视图(防泄露核心)
对于日期 D 和回看窗口 L
- 只返回 [D-L, D] 范围内的数据
- 根据 mode 决定是返回截面还是单只股票数据
"""
# 计算截断日期
cutoff_start = self._get_trading_date_offset(current_date, -lookback_days)
cutoff_end = current_date
# 截断数据
safe_df = data.filter(
(pl.col("trade_date") >= cutoff_start) &
(pl.col("trade_date") <= cutoff_end)
)
if mode == "time_series":
# 只保留指定股票的数据
safe_df = safe_df.filter(pl.col("ts_code") == stock_code)
# 创建上下文
context = FactorContext(
current_date=current_date,
current_stock=stock_code,
trade_dates=self._get_all_trade_dates()
)
return FactorData(safe_df, context)
class FactorEngine:
"""
因子执行引擎 - 根据因子类型采用不同的计算和防泄露策略
核心职责:
1. CrossSectionalFactor防止日期泄露每天传入当天的数据
2. TimeSeriesFactor防止股票泄露每只股票传入完整序列
3. 管理计算流程和结果组装
"""
def __init__(self, data_loader: DataLoader):
self.data_loader = data_loader
def compute(self, factor: BaseFactor, **kwargs) -> pl.DataFrame:
"""
统一的计算入口,根据因子类型分发到具体方法
"""
if factor.factor_type == "cross_sectional":
return self._compute_cross_sectional(factor, **kwargs)
elif factor.factor_type == "time_series":
return self._compute_time_series(factor, **kwargs)
else:
raise ValueError(f"Unknown factor type: {factor.factor_type}")
def _compute_cross_sectional(
self,
factor: CrossSectionalFactor,
start_date: str,
end_date: str
) -> pl.DataFrame:
"""
执行日期截面计算
防泄露策略:
- 防止日期泄露:每天只传入当天的数据(含 lookback 历史,但不含未来)
- 允许股票间比较:传入当天所有股票的数据
计算方式:
- 按日期遍历
- 每天计算一次,返回当天所有股票的因子值
返回 DataFrame 格式:
┌────────────┬──────────┬──────────────┐
│ trade_date │ ts_code │ factor_name │
├────────────┼──────────┼──────────────┤
│ 20240101 │ 000001.SZ│ 0.5 │
│ 20240101 │ 000002.SZ│ 0.3 │
└────────────┴──────────┴──────────────┘
"""
# 计算实际需要加载的日期范围(考虑 lookback
max_lookback = max(spec.lookback_days for spec in factor.data_specs)
data_start = self._get_trading_date_offset(start_date, -max_lookback)
# 一次性加载所有数据(后续按天裁剪)
raw_data = self.data_loader.load(factor.data_specs, (data_start, end_date))
results = []
# 按日期遍历:每天计算一次
for current_date in self._get_date_range(start_date, end_date):
# 裁剪数据:只保留 current_date 及之前的数据(防止日期泄露)
# 但保留所有股票的数据(允许股票间比较)
day_data = raw_data.filter(
pl.col("trade_date") <= current_date
)
# 如果 lookback > 0进一步裁剪到 lookback 窗口
if max_lookback > 0:
lookback_start = self._get_trading_date_offset(current_date, -max_lookback)
day_data = day_data.filter(pl.col("trade_date") >= lookback_start)
# 创建 FactorData包含当天及历史数据无未来数据
context = FactorContext(
current_date=current_date,
trade_dates=self._get_all_trade_dates()
)
factor_data = FactorData(day_data, context)
# 计算因子值
factor_values = factor.compute(factor_data)
# 获取当前日期的股票列表
today_stocks = day_data.filter(
pl.col("trade_date") == current_date
)["ts_code"]
results.append(pl.DataFrame({
"trade_date": [current_date] * len(today_stocks),
"ts_code": today_stocks,
factor.name: factor_values
}))
return pl.concat(results)
def _compute_time_series(
self,
factor: TimeSeriesFactor,
stock_codes: List[str],
start_date: str,
end_date: str
) -> pl.DataFrame:
"""
执行时间序列计算(股票截面)
防泄露策略:
- 防止股票泄露:每只股票单独计算,传入该股票的完整序列
- 允许访问历史数据:时序计算需要历史数据,这是正常的
计算方式:
- 按股票遍历
- 每只股票一次性计算整个时间序列(向量化,高效)
性能优势:
- 使用 Polars 的 rolling_mean 等向量化操作
- 无重复计算问题
"""
max_lookback = max(spec.lookback_days for spec in factor.data_specs)
data_start = self._get_trading_date_offset(start_date, -max_lookback)
# 加载所有数据
all_data = self.data_loader.load(factor.data_specs, (data_start, end_date))
results = []
# 按股票遍历:每只股票一次性计算
for stock_code in stock_codes:
# 过滤出该股票的数据(防止股票泄露)
stock_data = all_data.filter(pl.col("ts_code") == stock_code)
if stock_data.is_empty():
continue
# 创建 FactorData该股票的完整序列
context = FactorContext(
current_stock=stock_code,
trade_dates=self._get_all_trade_dates()
)
factor_data = FactorData(stock_data, context)
# 一次性计算整个时间序列(向量化,高效)
factor_values = factor.compute(factor_data)
# 获取该股票的日期列表
stock_dates = stock_data["trade_date"]
results.append(pl.DataFrame({
"trade_date": stock_dates,
"ts_code": [stock_code] * len(stock_dates),
factor.name: factor_values
}))
return pl.concat(results)
```
### 5.3 防泄露验证示例
**日期截面因子 - 防止日期泄露:**
```python
# ❌ 错误的截面因子(试图访问未来日期)
class BadCrossSectionalFactor(CrossSectionalFactor):
name = "bad_cs"
factor_type = "cross_sectional"
data_specs = [DataSpec(source="daily", columns=["close"], lookback_days=5)]
def compute(self, data: FactorData) -> pl.Series:
# 引擎传入的是当前日期的数据(假设今天是 20240110
# data 包含 20240106-20240110根据 lookback_days=5
# 但绝对不包含 20240115 的数据
# 试图访问未来日期 - 会报错或返回空
future_data = data.filter(pl.col("trade_date") == "20240115")
# 因为引擎已经裁剪掉未来数据future_data 为空
return future_data["close"] # 错误!
# ✅ 正确的截面因子
class GoodCrossSectionalFactor(CrossSectionalFactor):
name = "pe_rank"
factor_type = "cross_sectional"
data_specs = [DataSpec(source="daily", columns=["pe"], lookback_days=0)]
def compute(self, data: FactorData) -> pl.Series:
# data 只包含当前日期的数据(无未来日期)
# 可以安全地访问当天所有股票的 PE
pe_values = data.get_column("pe")
# 计算排名(在当天股票间比较是正常的)
return pe_values.rank()
```
**时间序列因子 - 防止股票泄露:**
```python
# ❌ 错误的时序因子(试图访问其他股票数据)
class BadTimeSeriesFactor(TimeSeriesFactor):
name = "bad_ts"
factor_type = "time_series"
data_specs = [DataSpec(source="daily", columns=["close"], lookback_days=20)]
def compute(self, data: FactorData) -> pl.Series:
# 引擎传入的是单只股票的数据
# 但因子试图加载其他股票的数据(泄露!)
# 试图访问全局数据 - 这是框架要阻止的
all_stocks_data = load_all_stocks() # 不应该允许!
market_mean = all_stocks_data["close"].mean()
return data.get_series() / market_mean # 用了其他股票的数据!
# ✅ 正确的时序因子
class GoodTimeSeriesFactor(TimeSeriesFactor):
name = "ma20"
factor_type = "time_series"
data_specs = [DataSpec(source="daily", columns=["close"], lookback_days=20)]
def compute(self, data: FactorData) -> pl.Series:
# data 是该股票的完整时间序列(无其他股票)
# 可以安全地计算时序指标(需要历史数据是正常的)
prices = data.get_column("close")
# 计算 20 日移动平均(使用历史数据是正常的)
return prices.rolling_mean(window_size=20)
```
---
## 6. 目录结构
```
src/
├── factors/
│ ├── __init__.py
│ ├── base.py # BaseFactor、CrossSectionalFactor、TimeSeriesFactor 基类
│ ├── data_spec.py # DataSpec、FactorContext、FactorData
│ ├── data_loader.py # DataLoader 多文件聚合、列选择
│ ├── engine.py # FactorEngine 执行引擎
│ ├── composite.py # CompositeFactor、ScalarFactor 组合因子
│ ├── cross_sectional.py # CrossSectionalEngine 日期截面计算
│ ├── time_series.py # TimeSeriesEngine 股票截面计算
│ └── builtin/ # 内置因子库
│ ├── __init__.py
│ ├── momentum.py # 动量类因子CrossSectional
│ ├── technical.py # 技术指标类TimeSeries
│ ├── value.py # 价值类因子CrossSectional
│ └── volatility.py # 波动率类因子TimeSeries
└── data/
└── storage.py # 已有模块(复用)
```
---
## 7. 使用示例
### 7.1 基础用法
```python
from src.factors import FactorEngine, DataLoader
from src.factors.builtin import MovingAverageFactor, ReturnRankFactor
# 1. 初始化数据加载器和执行引擎
data_loader = DataLoader(data_dir="data")
engine = FactorEngine(data_loader)
# 2. 定义因子
ma20 = MovingAverageFactor(period=20)
rank = ReturnRankFactor()
# 3. 计算日期截面因子(自动识别因子类型)
result1 = engine.compute(
factor=rank, # CrossSectionalFactor 类型
start_date="20240101",
end_date="20240131"
)
# 4. 计算股票截面因子(自动识别因子类型)
result2 = engine.compute(
factor=ma20, # TimeSeriesFactor 类型
stock_codes=["000001.SZ", "600000.SH"],
start_date="20240101",
end_date="20240131"
)
```
### 7.2 因子组合
```python
from src.factors.builtin import PEFactor, PBFactor
# 价值复合因子0.5 * PE + 0.3 * PB
pe = PEFactor() # CrossSectionalFactor
pb = PBFactor() # CrossSectionalFactor
# 使用运算符重载组合(类型必须一致)
value_factor = 0.5 * pe + 0.3 * pb
# 计算复合因子
result = engine.compute(
factor=value_factor, # 仍然是 CrossSectionalFactor
start_date="20240101",
end_date="20240131"
)
# ❌ 错误示例:不能组合不同类型的因子
ma = MovingAverageFactor() # TimeSeriesFactor
# bad = pe + ma # ValueError: Cannot combine factors of different types
```
### 7.3 自定义因子
```python
from src.factors import TimeSeriesFactor, DataSpec, FactorData
import polars as pl
class ReturnStdFactor(TimeSeriesFactor):
"""自定义因子示例20日收益率标准差"""
name = "return_std_20"
factor_type = "time_series" # 明确指定类型
data_specs = [
DataSpec(
source="daily",
columns=["ts_code", "trade_date", "close"],
lookback_days=21 # 20天收益率需要21天收盘价
)
]
category = "volatility"
description = "20日收益率标准差"
def compute(self, data: FactorData) -> pl.Series:
# 获取当前股票的时间序列收盘价(传入的是完整序列)
prices = data.get_column("close")
# 计算收益率
returns = prices.pct_change()
# 计算20日滚动标准差向量化计算高效
std = returns.rolling_std(window_size=20)
return std
# 使用自定义因子
my_factor = ReturnStdFactor()
result = engine.compute(
factor=my_factor,
stock_codes=["000001.SZ"],
start_date="20240101",
end_date="20240131"
)
```
---
## 8. 关键设计决策总结
| 决策点 | 选择 | 理由 |
|--------|------|------|
| **防泄露边界** | 按因子类型区分 | 时序防股票泄露,截面防日期泄露,各取所需 |
| **时序因子数据策略** | 传入完整序列,一次性计算 | 避免 O(N×L) 重复计算,利用 Polars 向量化性能 |
| **截面因子数据策略** | 每天传入当天数据 | 防止日期泄露,确保不会误用未来信息 |
| **因子类型** | 严格分离CrossSectional vs TimeSeries | 语义清晰,不同防泄露策略,便于框架优化 |
| **因子接口** | 类继承 + 抽象方法 | 强制规范,支持参数化,易于 IDE 提示 |
| **编程范式** | OOP对比函数式 | 更好的类型检查、组合性和可维护性 |
| **数据返回** | Polars DataFrame/Series | 高性能,现代化 API |
| **组合机制** | 运算符重载(同类型可组合) | 直观易用,类型安全 |
| **缓存策略** | 暂不支持 | 先保证正确性,后续按需添加 |
### 8.1 设计修正的关键洞察
**原设计的问题**
- 试图对所有因子使用统一的 Point-in-Time 策略
- 导致时序因子(如 MA5需要按天裁剪数据产生大量重复计算
- 实际上时序因子需要历史数据是正常的,不应该被限制
**修正后的设计**
- **时序因子**:防止股票泄露(不能用其他股票数据),允许访问完整历史
- **截面因子**:防止日期泄露(不能用未来日期),每天只传入当天数据
- 这样既保证了数据安全,又获得了计算性能
**类比理解**
- 时序因子像"技术分析":只看这只股票自己的历史走势
- 截面因子像"相对估值":只看当天所有股票的相对比较
- 两者关注的泄露风险不同,应该采用不同的防护策略 |
---
## 9. 后续扩展计划
### 阶段 2近期
- [ ] 因子结果缓存机制
- [ ] 并行计算优化
- [ ] 更多内置因子
### 阶段 3中期
- [ ] 交叉因子完整实现
- [ ] 自定义数据源支持
- [ ] 因子可视化工具
### 阶段 4远期
- [ ] 实时数据接入
- [ ] 分布式计算支持
- [ ] 机器学习因子自动生成
---
## 10. 命名约定
- **因子类名**`{描述}Factor`,如 `MovingAverageFactor`
- **因子名称**name 属性):`snake_case`,如 `"ma_20"`
- **数据源名**:与 H5 文件名一致,如 `"daily"`, `"fundamental"`
- **列名**:与数据源中的列名完全一致
- **日期格式**`YYYYMMDD` 字符串
---
*文档版本: v1.1*
*最后更新: 2026-02-21*
**v1.1 更新说明**
- 修正防泄露边界:时序因子防止股票泄露,截面因子防止日期泄露
- 优化计算策略:时序因子传入完整序列一次性计算,避免重复计算
- 明确不同类型因子的数据传入策略和防泄露重点
---
## 附录 A函数式编程方案讨论
作为对比,以下是使用**装饰器 + 函数式编程**的替代设计方案。
### A.1 函数式方案设计
```python
from typing import Callable, List
import polars as pl
from dataclasses import dataclass
@dataclass
class FactorDef:
"""因子定义(由装饰器创建)"""
name: str
factor_type: str # 'cross_sectional' | 'time_series'
data_specs: List[DataSpec]
compute_func: Callable[[FactorData], pl.Series]
params: dict = None
def factor(
name: str,
factor_type: str,
data_specs: List[DataSpec],
category: str = "default",
description: str = ""
):
"""
因子装饰器 - 将函数注册为因子
使用示例:
@factor(
name="ma_20",
factor_type="time_series",
data_specs=[DataSpec(source="daily", columns=["close"], lookback_days=20)]
)
def moving_average(data: FactorData, period: int = 20) -> pl.Series:
return data.get_series().rolling_mean(window_size=period)
"""
def decorator(func: Callable) -> FactorDef:
return FactorDef(
name=name,
factor_type=factor_type,
data_specs=data_specs,
compute_func=func,
params={}
)
return decorator
# 参数化因子的函数式实现
def parameterized_factor(base_factor: FactorDef):
"""
创建参数化因子的工厂函数
使用示例:
# 基础定义
ma_base = factor(...)(moving_average)
# 创建 MA5 和 MA20
ma5 = parameterized_factor(ma_base)(period=5)
ma20 = parameterized_factor(ma_base)(period=20)
"""
def create_instance(**params) -> FactorDef:
# 创建新的 FactorDef更新参数
new_def = FactorDef(
name=f"{base_factor.name}_{'_'.join(f'{k}{v}' for k, v in params.items())}",
factor_type=base_factor.factor_type,
data_specs=base_factor.data_specs, # 可能需要根据 params 调整 lookback
compute_func=lambda data: base_factor.compute_func(data, **params),
params=params
)
return new_def
return create_instance
```
### A.2 两种方案对比
| 维度 | 类继承方案(当前) | 函数式 + 装饰器方案 |
|------|-------------------|-------------------|
| **代码量** | 需要定义类,代码较多 | 函数 + 装饰器,代码较少 |
| **参数化** | `__init__` 直观自然 | 需要工厂函数或偏函数 |
| **IDE 支持** | 好,有类型提示和补全 | 一般,装饰器会丢失类型信息 |
| **组合性** | 运算符重载 (`+`, `-`, `*`) | 需要显式组合函数 |
| **可扩展性** | 继承机制成熟 | 函数组合灵活但复杂 |
| **学习成本** | 需要理解 OOP | 需要理解函数式编程 |
| **调试难度** | 类层次清晰,易调试 | 装饰器嵌套,调试较困难 |
| **元数据管理** | 类属性自然 | 需要额外的元数据结构 |
### A.3 函数式方案示例
```python
# ========== 使用函数式方案 ==========
# 1. 定义基础因子(使用装饰器)
@factor(
name="ma",
factor_type="time_series",
data_specs=[DataSpec(source="daily", columns=["close"], lookback_days=20)]
)
def ma_factor(data: FactorData, period: int) -> pl.Series:
return data.get_series().rolling_mean(window_size=period)
# 2. 创建参数化实例
ma5 = parameterized_factor(ma_factor)(period=5)
ma20 = parameterized_factor(ma_factor)(period=20)
# 3. 组合因子(函数式方式)
def combine_factors(f1: FactorDef, f2: FactorDef, op: str) -> FactorDef:
"""组合两个因子"""
def combined_compute(data: FactorData) -> pl.Series:
v1 = f1.compute_func(data)
v2 = f2.compute_func(data)
if op == '+':
return v1 + v2
elif op == '*':
return v1 * v2
# ...
return FactorDef(
name=f"{f1.name}_{op}_{f2.name}",
factor_type=f1.factor_type, # 假设类型相同
data_specs=merge_specs(f1.data_specs, f2.data_specs),
compute_func=combined_compute
)
# 使用
value_factor = combine_factors(
combine_factors(pe_factor, pb_factor, '+'),
ps_factor,
'+'
)
```
### A.4 方案选择建议
**推荐使用类继承方案的情况**
- 团队熟悉面向对象编程
- 需要丰富的 IDE 支持和类型检查
- 因子逻辑复杂,需要分层抽象
- 需要频繁的组合和参数化操作
**考虑使用函数式方案的情况**
- 团队偏好函数式编程风格
- 因子逻辑简单,主要是数学运算
- 希望减少样板代码
- 需要高度灵活的组合方式
**当前项目选择类继承方案的理由**
1. **Python 生态更熟悉 OOP**:大多数量化开发者更习惯类的方式
2. **IDE 支持更好**VSCode/PyCharm 对类属性和方法的提示更完善
3. **参数化更自然**`MA(period=20)``create_ma(period=20)` 更直观
4. **运算符重载**`f1 + f2``combine_factors(f1, f2, '+')` 更易读
5. **可维护性**:类层次结构在长期维护中更清晰