Files
ProStock/docs/factor-cache-plan.md

135 lines
9.9 KiB
Markdown
Raw Normal View History

# 因子预计算与本地 DuckDB 缓存实现计划
## 目标
构建一个将因子值预先计算并持久化到独立 DuckDB 文件的流程,使训练脚本能够从本地缓存读取大部分因子,仅对 `FACTOR_DEFINITIONS` 中的因子进行实时计算。新增因子值校验机制,确保新旧数据一致性;入口脚本支持全量计算、自动校验并增量更新已有数据库。
## 架构
1. **FactorStorage** (`src/data/factor_storage.py`):负责单个因子的 DuckDB 读写、日期范围查询、以及新旧数据校验。每个因子对应 `data/factor/{factor_name}.duckdb`,表名为 `factor_values`,包含 `trade_date`, `ts_code`, `{factor_name}`
2. **compute_factors.py** (`src/data/compute_factors.py`):入口脚本,放置在 `src/data/` 下。支持默认运行(不传入 list 时自动计算 metadata 中全部因子)。对每个因子:
- 计算 `[start_date, end_date]` 的全量数据;
- 若本地已存在该因子 db读取本地重叠日期数据与新计算结果做逐行校验
- 校验通过则使用 `INSERT OR REPLACE` 将全量数据写入/更新到本地 db对已有文件而言是增量更新记录
- 校验不通过则跳过该因子并发出警告。
- 最终打印运行报告(成功、跳过、失败、校验差异统计)。
3. **DataPipeline 内置缓存** (`src/training/pipeline.py`)`DataPipeline` 内部自动引入 `FactorStorage()`,无需训练脚本显式传入。`prepare_data` 中将特征列分为 `manual_cols``factor_definitions` 中的因子)和 `cached_cols`(本地已缓存的因子)。`manual_cols` + `label_name` 仍走 `FactorEngine.compute``cached_cols` 通过 `FactorStorage.load` 读取并按 `trade_date`/`ts_code` left join 合并。
4. **训练脚本无需改动**`regression.py``learn_to_rank.py` 保持现有 `DataPipeline` 初始化代码不变,由 Pipeline 内部自动处理缓存。
---
## 详细任务
### Task 1: 实现 FactorStorage 模块(含校验)
- **创建文件**: `src/data/factor_storage.py`
- 实现 `FactorStorage` 类:
- `__init__(base_dir="data/factor")`:确保目录存在;**路径应使用 `settings.factor_cache_path_resolved`**,不硬编码。
- `_db_path(factor_name) -> Path`:返回单个因子的 db 路径。
- `save(factor_name, df)`SQL 建表 `factor_values (trade_date VARCHAR, ts_code VARCHAR, {factor_name} DOUBLE)` 并设置 `PRIMARY KEY (trade_date, ts_code)`;执行 UPSERT 时显式用 `SELECT trade_date, ts_code, "{factor_name}" FROM df` 避免列数不匹配。
- `load(factor_name, start_date, end_date) -> pl.DataFrame`:连接对应 db按日期范围过滤读取。
- `exists(factor_name) -> bool`:检查 db 文件是否存在。
- `get_date_range(factor_name) -> tuple[str, str] | None`:返回已有数据的 min/max date不存在返回 None。
- `validate(factor_name, new_df, tolerance=1e-6) -> tuple[bool, dict]`
- 若本地不存在,返回 `(True, {})`
- 读取本地重叠日期数据 `old_df`
-`trade_date` + `ts_code` join
- 计算 `abs(new - old)` 的最大值、均值、非 NaN 行数;
-`max_abs_diff > tolerance`,返回 `(False, stats)`;否则返回 `(True, stats)`
- **创建测试**: `tests/test_factor_storage.py`
- 测试 save/load/exists/get_date_range/validate 的正确性。
- `uv run pytest tests/test_factor_storage.py -v`
### Task 2: 实现因子预计算入口脚本(支持默认运行、增量更新、校验与报告)
- **创建文件**: `src/data/compute_factors.py`(注意:放在 `src/data/` 而非 `src/experiment/`
- 提供 `run(factor_names: List[str], ...)` 函数和 `DEFAULT_FACTOR_NAMES: List[str] = []`,方便用户直接在代码或 IDE 中执行。
- CLI 参数argparse
- `factors`: 可选位置参数,空格分隔的因子名列表。若不提供,默认读取 metadata 中所有因子的 `name` 字段作为计算列表。
- `--metadata`: metadata JSONL 路径,默认 `data/factors.jsonl`
- `--start-date`: 默认 `20180101`
- `--end-date`: 默认当前日期(`datetime.now().strftime("%Y%m%d")`)。
- `--output-dir`: 默认 `data/factor`(通过 `settings.factor_cache_path_resolved`)。
- `--tolerance`: 校验容忍度,默认 `1e-6`
- `--force`: 强制覆盖,即使校验失败也写入(危险开关,默认关闭)。
- 核心流程:
1. 初始化 `FactorEngine()``FactorStorage(output_dir)``FactorManager(metadata_path)`
2. 确定待计算因子列表:若 `factor_names` 为空,执行 `manager.get_all_factors()["name"].to_list()`
3. 遍历待计算因子(使用 `tqdm`
- 通过 `manager.get_factors_by_name(name)` 获取 dsl 表达式。
- `engine.add_factor(name, dsl)` 注册。
- `new_df = engine.compute([name], start_date, end_date)` 计算全量数据。
- 校验:调用 `storage.validate(name, new_df, tolerance)`
- 若校验失败且 `--force` 为 False打印警告将该因子标记为失败继续下一个。
- 若校验通过(或 `--force` 为 True打印差异统计`max_diff`, `mean_diff`,若有的话);调用 `storage.save(name, new_df)` 写入/更新数据。
4. 汇总并打印报告:
- 成功因子数、更新日期范围;
- 校验失败因子数及差异详情;
- 总计耗时。
- **示例运行**:
```bash
# 计算指定因子
uv run python src/data/compute_factors.py ma_5 ma_20 --start-date 20180101 --end-date 20241231
# 默认计算 metadata 中全部因子
uv run python src/data/compute_factors.py
# 在 Jupyter/IDE 中直接调用
from src.data.compute_factors import run, DEFAULT_FACTOR_NAMES
run(factor_names=["ma_5", "ma_20"], start_date="20240101", end_date="20241231")
```
### Task 3: DataPipeline 内置缓存读取逻辑
- **修改文件**: `src/training/pipeline.py`
- `__init__` 中新增参数(保持向后兼容):
- `use_factor_cache: bool = True`
- `factor_storage: Optional[Any] = None`
-`__init__` 内部:若 `use_factor_cache` 为 True 且 `factor_storage` 为 None则自动 `from src.data.factor_storage import FactorStorage; self.factor_storage = FactorStorage()`,否则按传入值处理。
- 修改 `prepare_data` 中的数据获取逻辑:
1. `feature_cols = self.factor_manager.register_to_engine(engine, verbose=verbose)`
2. 分类:
- `manual_cols = [c for c in feature_cols if c in self.factor_manager.factor_definitions]``FACTOR_DEFINITIONS` 中的因子)。
- `cached_cols = []`,如果 `use_factor_cache``self.factor_storage` 存在:遍历 `feature_cols`,对不在 `manual_cols` 中且 `storage.exists(c)` 的列加入 `cached_cols`
- `fallback_cols = [c for c in feature_cols if c not in manual_cols + cached_cols]`(走 Engine 兜底)。
3. 计算:`compute_cols = manual_cols + fallback_cols + [label_name]``data = engine.compute(compute_cols, all_start, all_end)`
4. 加载缓存:若 `cached_cols` 非空,逐个 `storage.load(col, all_start, all_end)`,与 `data``["trade_date", "ts_code"]``how="left"` join**推荐 left join**,避免 outer join 导致垃圾行膨胀)。
5. verbose 打印:`从本地缓存加载 X 个因子,手动计算 Y 个因子,兜底计算 Z 个因子`
6. 确保最终 `data` 的列包含所有 `feature_cols`join 后列顺序可能变化,不影响后续逻辑即可)。
- **修改文件**: `src/training/factor_manager.py`(无需修改或极小修改)
- `factor_definitions` 已经是 public 属性,`pipeline.py` 可直接访问。
### Task 4: 训练脚本保持零改动
- **`src/experiment/regression.py`**:保持不变,`DataPipeline` 初始化代码无需显式传入 `FactorStorage`
- **`src/experiment/learn_to_rank.py`**:保持不变。
- 若后续需要显式关闭缓存,可通过 `DataPipeline(use_factor_cache=False)` 控制,但当前默认即可。
### Task 5: 端到端验证
- **预计算验证**
```bash
uv run python src/data/compute_factors.py ma_5 ma_20 return_5 --start-date 20240101 --end-date 20241231
```
确认 `data/factor/ma_5.duckdb` 等文件已生成。
- **校验场景验证**
- 手动修改某个 db 中的几行数据值,再次运行 compute_factors。
- 观察是否正确打印差异统计并中止更新(除非加上 `--force`)。
- **训练流程验证**
- 运行相关训练测试或脚本,确认 `DataPipeline` 正确区分手动计算与缓存读取的因子,输出结果与改造前一致。
- `uv run pytest tests/ -v` 通过。
---
## 关键设计说明
- **为何放在 `src/data/`**compute_factors 是数据层的入口工具,与 `src/data/storage.py``src/data/client.py` 处于同一抽象层级,`src/experiment/` 仅保留业务实验脚本。
- **全量计算 + INSERT OR REPLACE 的"增量"语义**:每次运行都计算全量,但对已有数据库文件执行的是记录的增量更新(旧记录被替换、新记录被插入),既能保证数据最新,又能在校验失败时安全中止。
- **DataPipeline 自动发现缓存**:训练脚本无感知接入,避免因底层架构变更导致多处实验代码需要同步修改。
- **DuckDB UPSERT 踩坑**:建表时必须显式声明 `PRIMARY KEY (trade_date, ts_code)`UPSERT 语句为:
```sql
INSERT INTO factor_values
SELECT trade_date, ts_code, "factor_col" FROM df
ON CONFLICT (trade_date, ts_code)
DO UPDATE SET "factor_col" = EXCLUDED."factor_col"
```
同时 `SELECT` 必须显式列出三列,不能写 `SELECT * FROM df`,否则当传入 DataFrame 包含额外列时会报列数不匹配错误。
## 预期效果
- 训练前可以把 `SELECTED_FACTORS` 批量预计算到 `data/factor/*.duckdb`
- 训练时 `regression.py` / `learn_to_rank.py``SELECTED_FACTORS` 直接读本地缓存,大幅降低 `FactorEngine.compute` 的计算量和基础数据重复加载。
- `FACTOR_DEFINITIONS` 中的临时/实验性因子依旧实时计算,不影响灵活性。