135 lines
9.9 KiB
Markdown
135 lines
9.9 KiB
Markdown
|
|
# 因子预计算与本地 DuckDB 缓存实现计划
|
|||
|
|
|
|||
|
|
## 目标
|
|||
|
|
构建一个将因子值预先计算并持久化到独立 DuckDB 文件的流程,使训练脚本能够从本地缓存读取大部分因子,仅对 `FACTOR_DEFINITIONS` 中的因子进行实时计算。新增因子值校验机制,确保新旧数据一致性;入口脚本支持全量计算、自动校验并增量更新已有数据库。
|
|||
|
|
|
|||
|
|
## 架构
|
|||
|
|
1. **FactorStorage** (`src/data/factor_storage.py`):负责单个因子的 DuckDB 读写、日期范围查询、以及新旧数据校验。每个因子对应 `data/factor/{factor_name}.duckdb`,表名为 `factor_values`,包含 `trade_date`, `ts_code`, `{factor_name}`。
|
|||
|
|
2. **compute_factors.py** (`src/data/compute_factors.py`):入口脚本,放置在 `src/data/` 下。支持默认运行(不传入 list 时自动计算 metadata 中全部因子)。对每个因子:
|
|||
|
|
- 计算 `[start_date, end_date]` 的全量数据;
|
|||
|
|
- 若本地已存在该因子 db,读取本地重叠日期数据与新计算结果做逐行校验;
|
|||
|
|
- 校验通过则使用 `INSERT OR REPLACE` 将全量数据写入/更新到本地 db(对已有文件而言是增量更新记录);
|
|||
|
|
- 校验不通过则跳过该因子并发出警告。
|
|||
|
|
- 最终打印运行报告(成功、跳过、失败、校验差异统计)。
|
|||
|
|
3. **DataPipeline 内置缓存** (`src/training/pipeline.py`):`DataPipeline` 内部自动引入 `FactorStorage()`,无需训练脚本显式传入。`prepare_data` 中将特征列分为 `manual_cols`(`factor_definitions` 中的因子)和 `cached_cols`(本地已缓存的因子)。`manual_cols` + `label_name` 仍走 `FactorEngine.compute`;`cached_cols` 通过 `FactorStorage.load` 读取并按 `trade_date`/`ts_code` left join 合并。
|
|||
|
|
4. **训练脚本无需改动**:`regression.py` 和 `learn_to_rank.py` 保持现有 `DataPipeline` 初始化代码不变,由 Pipeline 内部自动处理缓存。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 详细任务
|
|||
|
|
|
|||
|
|
### Task 1: 实现 FactorStorage 模块(含校验)
|
|||
|
|
- **创建文件**: `src/data/factor_storage.py`
|
|||
|
|
- 实现 `FactorStorage` 类:
|
|||
|
|
- `__init__(base_dir="data/factor")`:确保目录存在;**路径应使用 `settings.factor_cache_path_resolved`**,不硬编码。
|
|||
|
|
- `_db_path(factor_name) -> Path`:返回单个因子的 db 路径。
|
|||
|
|
- `save(factor_name, df)`:SQL 建表 `factor_values (trade_date VARCHAR, ts_code VARCHAR, {factor_name} DOUBLE)` 并设置 `PRIMARY KEY (trade_date, ts_code)`;执行 UPSERT 时显式用 `SELECT trade_date, ts_code, "{factor_name}" FROM df` 避免列数不匹配。
|
|||
|
|
- `load(factor_name, start_date, end_date) -> pl.DataFrame`:连接对应 db,按日期范围过滤读取。
|
|||
|
|
- `exists(factor_name) -> bool`:检查 db 文件是否存在。
|
|||
|
|
- `get_date_range(factor_name) -> tuple[str, str] | None`:返回已有数据的 min/max date,不存在返回 None。
|
|||
|
|
- `validate(factor_name, new_df, tolerance=1e-6) -> tuple[bool, dict]`:
|
|||
|
|
- 若本地不存在,返回 `(True, {})`。
|
|||
|
|
- 读取本地重叠日期数据 `old_df`;
|
|||
|
|
- 按 `trade_date` + `ts_code` join;
|
|||
|
|
- 计算 `abs(new - old)` 的最大值、均值、非 NaN 行数;
|
|||
|
|
- 若 `max_abs_diff > tolerance`,返回 `(False, stats)`;否则返回 `(True, stats)`。
|
|||
|
|
- **创建测试**: `tests/test_factor_storage.py`
|
|||
|
|
- 测试 save/load/exists/get_date_range/validate 的正确性。
|
|||
|
|
- `uv run pytest tests/test_factor_storage.py -v`
|
|||
|
|
|
|||
|
|
### Task 2: 实现因子预计算入口脚本(支持默认运行、增量更新、校验与报告)
|
|||
|
|
- **创建文件**: `src/data/compute_factors.py`(注意:放在 `src/data/` 而非 `src/experiment/`)
|
|||
|
|
- 提供 `run(factor_names: List[str], ...)` 函数和 `DEFAULT_FACTOR_NAMES: List[str] = []`,方便用户直接在代码或 IDE 中执行。
|
|||
|
|
- CLI 参数(argparse):
|
|||
|
|
- `factors`: 可选位置参数,空格分隔的因子名列表。若不提供,默认读取 metadata 中所有因子的 `name` 字段作为计算列表。
|
|||
|
|
- `--metadata`: metadata JSONL 路径,默认 `data/factors.jsonl`。
|
|||
|
|
- `--start-date`: 默认 `20180101`。
|
|||
|
|
- `--end-date`: 默认当前日期(`datetime.now().strftime("%Y%m%d")`)。
|
|||
|
|
- `--output-dir`: 默认 `data/factor`(通过 `settings.factor_cache_path_resolved`)。
|
|||
|
|
- `--tolerance`: 校验容忍度,默认 `1e-6`。
|
|||
|
|
- `--force`: 强制覆盖,即使校验失败也写入(危险开关,默认关闭)。
|
|||
|
|
- 核心流程:
|
|||
|
|
1. 初始化 `FactorEngine()`、`FactorStorage(output_dir)`、`FactorManager(metadata_path)`。
|
|||
|
|
2. 确定待计算因子列表:若 `factor_names` 为空,执行 `manager.get_all_factors()["name"].to_list()`。
|
|||
|
|
3. 遍历待计算因子(使用 `tqdm`):
|
|||
|
|
- 通过 `manager.get_factors_by_name(name)` 获取 dsl 表达式。
|
|||
|
|
- `engine.add_factor(name, dsl)` 注册。
|
|||
|
|
- `new_df = engine.compute([name], start_date, end_date)` 计算全量数据。
|
|||
|
|
- 校验:调用 `storage.validate(name, new_df, tolerance)`。
|
|||
|
|
- 若校验失败且 `--force` 为 False:打印警告,将该因子标记为失败,继续下一个。
|
|||
|
|
- 若校验通过(或 `--force` 为 True):打印差异统计(`max_diff`, `mean_diff`,若有的话);调用 `storage.save(name, new_df)` 写入/更新数据。
|
|||
|
|
4. 汇总并打印报告:
|
|||
|
|
- 成功因子数、更新日期范围;
|
|||
|
|
- 校验失败因子数及差异详情;
|
|||
|
|
- 总计耗时。
|
|||
|
|
- **示例运行**:
|
|||
|
|
```bash
|
|||
|
|
# 计算指定因子
|
|||
|
|
uv run python src/data/compute_factors.py ma_5 ma_20 --start-date 20180101 --end-date 20241231
|
|||
|
|
|
|||
|
|
# 默认计算 metadata 中全部因子
|
|||
|
|
uv run python src/data/compute_factors.py
|
|||
|
|
|
|||
|
|
# 在 Jupyter/IDE 中直接调用
|
|||
|
|
from src.data.compute_factors import run, DEFAULT_FACTOR_NAMES
|
|||
|
|
run(factor_names=["ma_5", "ma_20"], start_date="20240101", end_date="20241231")
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### Task 3: DataPipeline 内置缓存读取逻辑
|
|||
|
|
- **修改文件**: `src/training/pipeline.py`
|
|||
|
|
- `__init__` 中新增参数(保持向后兼容):
|
|||
|
|
- `use_factor_cache: bool = True`
|
|||
|
|
- `factor_storage: Optional[Any] = None`
|
|||
|
|
- 在 `__init__` 内部:若 `use_factor_cache` 为 True 且 `factor_storage` 为 None,则自动 `from src.data.factor_storage import FactorStorage; self.factor_storage = FactorStorage()`,否则按传入值处理。
|
|||
|
|
- 修改 `prepare_data` 中的数据获取逻辑:
|
|||
|
|
1. `feature_cols = self.factor_manager.register_to_engine(engine, verbose=verbose)`。
|
|||
|
|
2. 分类:
|
|||
|
|
- `manual_cols = [c for c in feature_cols if c in self.factor_manager.factor_definitions]`(`FACTOR_DEFINITIONS` 中的因子)。
|
|||
|
|
- `cached_cols = []`,如果 `use_factor_cache` 且 `self.factor_storage` 存在:遍历 `feature_cols`,对不在 `manual_cols` 中且 `storage.exists(c)` 的列加入 `cached_cols`。
|
|||
|
|
- `fallback_cols = [c for c in feature_cols if c not in manual_cols + cached_cols]`(走 Engine 兜底)。
|
|||
|
|
3. 计算:`compute_cols = manual_cols + fallback_cols + [label_name]`;`data = engine.compute(compute_cols, all_start, all_end)`。
|
|||
|
|
4. 加载缓存:若 `cached_cols` 非空,逐个 `storage.load(col, all_start, all_end)`,与 `data` 按 `["trade_date", "ts_code"]` 做 `how="left"` join(**推荐 left join**,避免 outer join 导致垃圾行膨胀)。
|
|||
|
|
5. verbose 打印:`从本地缓存加载 X 个因子,手动计算 Y 个因子,兜底计算 Z 个因子`。
|
|||
|
|
6. 确保最终 `data` 的列包含所有 `feature_cols`(join 后列顺序可能变化,不影响后续逻辑即可)。
|
|||
|
|
- **修改文件**: `src/training/factor_manager.py`(无需修改或极小修改)
|
|||
|
|
- `factor_definitions` 已经是 public 属性,`pipeline.py` 可直接访问。
|
|||
|
|
|
|||
|
|
### Task 4: 训练脚本保持零改动
|
|||
|
|
- **`src/experiment/regression.py`**:保持不变,`DataPipeline` 初始化代码无需显式传入 `FactorStorage`。
|
|||
|
|
- **`src/experiment/learn_to_rank.py`**:保持不变。
|
|||
|
|
- 若后续需要显式关闭缓存,可通过 `DataPipeline(use_factor_cache=False)` 控制,但当前默认即可。
|
|||
|
|
|
|||
|
|
### Task 5: 端到端验证
|
|||
|
|
- **预计算验证**:
|
|||
|
|
```bash
|
|||
|
|
uv run python src/data/compute_factors.py ma_5 ma_20 return_5 --start-date 20240101 --end-date 20241231
|
|||
|
|
```
|
|||
|
|
确认 `data/factor/ma_5.duckdb` 等文件已生成。
|
|||
|
|
- **校验场景验证**:
|
|||
|
|
- 手动修改某个 db 中的几行数据值,再次运行 compute_factors。
|
|||
|
|
- 观察是否正确打印差异统计并中止更新(除非加上 `--force`)。
|
|||
|
|
- **训练流程验证**:
|
|||
|
|
- 运行相关训练测试或脚本,确认 `DataPipeline` 正确区分手动计算与缓存读取的因子,输出结果与改造前一致。
|
|||
|
|
- `uv run pytest tests/ -v` 通过。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 关键设计说明
|
|||
|
|
|
|||
|
|
- **为何放在 `src/data/`**:compute_factors 是数据层的入口工具,与 `src/data/storage.py`、`src/data/client.py` 处于同一抽象层级,`src/experiment/` 仅保留业务实验脚本。
|
|||
|
|
- **全量计算 + INSERT OR REPLACE 的"增量"语义**:每次运行都计算全量,但对已有数据库文件执行的是记录的增量更新(旧记录被替换、新记录被插入),既能保证数据最新,又能在校验失败时安全中止。
|
|||
|
|
- **DataPipeline 自动发现缓存**:训练脚本无感知接入,避免因底层架构变更导致多处实验代码需要同步修改。
|
|||
|
|
- **DuckDB UPSERT 踩坑**:建表时必须显式声明 `PRIMARY KEY (trade_date, ts_code)`,UPSERT 语句为:
|
|||
|
|
```sql
|
|||
|
|
INSERT INTO factor_values
|
|||
|
|
SELECT trade_date, ts_code, "factor_col" FROM df
|
|||
|
|
ON CONFLICT (trade_date, ts_code)
|
|||
|
|
DO UPDATE SET "factor_col" = EXCLUDED."factor_col"
|
|||
|
|
```
|
|||
|
|
同时 `SELECT` 必须显式列出三列,不能写 `SELECT * FROM df`,否则当传入 DataFrame 包含额外列时会报列数不匹配错误。
|
|||
|
|
|
|||
|
|
## 预期效果
|
|||
|
|
- 训练前可以把 `SELECTED_FACTORS` 批量预计算到 `data/factor/*.duckdb`。
|
|||
|
|
- 训练时 `regression.py` / `learn_to_rank.py` 对 `SELECTED_FACTORS` 直接读本地缓存,大幅降低 `FactorEngine.compute` 的计算量和基础数据重复加载。
|
|||
|
|
- `FACTOR_DEFINITIONS` 中的临时/实验性因子依旧实时计算,不影响灵活性。
|