9.9 KiB
9.9 KiB
因子预计算与本地 DuckDB 缓存实现计划
目标
构建一个将因子值预先计算并持久化到独立 DuckDB 文件的流程,使训练脚本能够从本地缓存读取大部分因子,仅对 FACTOR_DEFINITIONS 中的因子进行实时计算。新增因子值校验机制,确保新旧数据一致性;入口脚本支持全量计算、自动校验并增量更新已有数据库。
架构
- FactorStorage (
src/data/factor_storage.py):负责单个因子的 DuckDB 读写、日期范围查询、以及新旧数据校验。每个因子对应data/factor/{factor_name}.duckdb,表名为factor_values,包含trade_date,ts_code,{factor_name}。 - compute_factors.py (
src/data/compute_factors.py):入口脚本,放置在src/data/下。支持默认运行(不传入 list 时自动计算 metadata 中全部因子)。对每个因子:- 计算
[start_date, end_date]的全量数据; - 若本地已存在该因子 db,读取本地重叠日期数据与新计算结果做逐行校验;
- 校验通过则使用
INSERT OR REPLACE将全量数据写入/更新到本地 db(对已有文件而言是增量更新记录); - 校验不通过则跳过该因子并发出警告。
- 最终打印运行报告(成功、跳过、失败、校验差异统计)。
- 计算
- DataPipeline 内置缓存 (
src/training/pipeline.py):DataPipeline内部自动引入FactorStorage(),无需训练脚本显式传入。prepare_data中将特征列分为manual_cols(factor_definitions中的因子)和cached_cols(本地已缓存的因子)。manual_cols+label_name仍走FactorEngine.compute;cached_cols通过FactorStorage.load读取并按trade_date/ts_codeleft join 合并。 - 训练脚本无需改动:
regression.py和learn_to_rank.py保持现有DataPipeline初始化代码不变,由 Pipeline 内部自动处理缓存。
详细任务
Task 1: 实现 FactorStorage 模块(含校验)
- 创建文件:
src/data/factor_storage.py - 实现
FactorStorage类:__init__(base_dir="data/factor"):确保目录存在;路径应使用settings.factor_cache_path_resolved,不硬编码。_db_path(factor_name) -> Path:返回单个因子的 db 路径。save(factor_name, df):SQL 建表factor_values (trade_date VARCHAR, ts_code VARCHAR, {factor_name} DOUBLE)并设置PRIMARY KEY (trade_date, ts_code);执行 UPSERT 时显式用SELECT trade_date, ts_code, "{factor_name}" FROM df避免列数不匹配。load(factor_name, start_date, end_date) -> pl.DataFrame:连接对应 db,按日期范围过滤读取。exists(factor_name) -> bool:检查 db 文件是否存在。get_date_range(factor_name) -> tuple[str, str] | None:返回已有数据的 min/max date,不存在返回 None。validate(factor_name, new_df, tolerance=1e-6) -> tuple[bool, dict]:- 若本地不存在,返回
(True, {})。 - 读取本地重叠日期数据
old_df; - 按
trade_date+ts_codejoin; - 计算
abs(new - old)的最大值、均值、非 NaN 行数; - 若
max_abs_diff > tolerance,返回(False, stats);否则返回(True, stats)。
- 若本地不存在,返回
- 创建测试:
tests/test_factor_storage.py- 测试 save/load/exists/get_date_range/validate 的正确性。
uv run pytest tests/test_factor_storage.py -v
Task 2: 实现因子预计算入口脚本(支持默认运行、增量更新、校验与报告)
- 创建文件:
src/data/compute_factors.py(注意:放在src/data/而非src/experiment/) - 提供
run(factor_names: List[str], ...)函数和DEFAULT_FACTOR_NAMES: List[str] = [],方便用户直接在代码或 IDE 中执行。 - CLI 参数(argparse):
factors: 可选位置参数,空格分隔的因子名列表。若不提供,默认读取 metadata 中所有因子的name字段作为计算列表。--metadata: metadata JSONL 路径,默认data/factors.jsonl。--start-date: 默认20180101。--end-date: 默认当前日期(datetime.now().strftime("%Y%m%d"))。--output-dir: 默认data/factor(通过settings.factor_cache_path_resolved)。--tolerance: 校验容忍度,默认1e-6。--force: 强制覆盖,即使校验失败也写入(危险开关,默认关闭)。
- 核心流程:
- 初始化
FactorEngine()、FactorStorage(output_dir)、FactorManager(metadata_path)。 - 确定待计算因子列表:若
factor_names为空,执行manager.get_all_factors()["name"].to_list()。 - 遍历待计算因子(使用
tqdm):- 通过
manager.get_factors_by_name(name)获取 dsl 表达式。 engine.add_factor(name, dsl)注册。new_df = engine.compute([name], start_date, end_date)计算全量数据。- 校验:调用
storage.validate(name, new_df, tolerance)。 - 若校验失败且
--force为 False:打印警告,将该因子标记为失败,继续下一个。 - 若校验通过(或
--force为 True):打印差异统计(max_diff,mean_diff,若有的话);调用storage.save(name, new_df)写入/更新数据。
- 通过
- 汇总并打印报告:
- 成功因子数、更新日期范围;
- 校验失败因子数及差异详情;
- 总计耗时。
- 初始化
- 示例运行:
# 计算指定因子 uv run python src/data/compute_factors.py ma_5 ma_20 --start-date 20180101 --end-date 20241231 # 默认计算 metadata 中全部因子 uv run python src/data/compute_factors.py # 在 Jupyter/IDE 中直接调用 from src.data.compute_factors import run, DEFAULT_FACTOR_NAMES run(factor_names=["ma_5", "ma_20"], start_date="20240101", end_date="20241231")
Task 3: DataPipeline 内置缓存读取逻辑
- 修改文件:
src/training/pipeline.py __init__中新增参数(保持向后兼容):use_factor_cache: bool = Truefactor_storage: Optional[Any] = None- 在
__init__内部:若use_factor_cache为 True 且factor_storage为 None,则自动from src.data.factor_storage import FactorStorage; self.factor_storage = FactorStorage(),否则按传入值处理。
- 修改
prepare_data中的数据获取逻辑:feature_cols = self.factor_manager.register_to_engine(engine, verbose=verbose)。- 分类:
manual_cols = [c for c in feature_cols if c in self.factor_manager.factor_definitions](FACTOR_DEFINITIONS中的因子)。cached_cols = [],如果use_factor_cache且self.factor_storage存在:遍历feature_cols,对不在manual_cols中且storage.exists(c)的列加入cached_cols。fallback_cols = [c for c in feature_cols if c not in manual_cols + cached_cols](走 Engine 兜底)。
- 计算:
compute_cols = manual_cols + fallback_cols + [label_name];data = engine.compute(compute_cols, all_start, all_end)。 - 加载缓存:若
cached_cols非空,逐个storage.load(col, all_start, all_end),与data按["trade_date", "ts_code"]做how="left"join(推荐 left join,避免 outer join 导致垃圾行膨胀)。 - verbose 打印:
从本地缓存加载 X 个因子,手动计算 Y 个因子,兜底计算 Z 个因子。 - 确保最终
data的列包含所有feature_cols(join 后列顺序可能变化,不影响后续逻辑即可)。
- 修改文件:
src/training/factor_manager.py(无需修改或极小修改)factor_definitions已经是 public 属性,pipeline.py可直接访问。
Task 4: 训练脚本保持零改动
src/experiment/regression.py:保持不变,DataPipeline初始化代码无需显式传入FactorStorage。src/experiment/learn_to_rank.py:保持不变。- 若后续需要显式关闭缓存,可通过
DataPipeline(use_factor_cache=False)控制,但当前默认即可。
Task 5: 端到端验证
- 预计算验证:
确认
uv run python src/data/compute_factors.py ma_5 ma_20 return_5 --start-date 20240101 --end-date 20241231data/factor/ma_5.duckdb等文件已生成。 - 校验场景验证:
- 手动修改某个 db 中的几行数据值,再次运行 compute_factors。
- 观察是否正确打印差异统计并中止更新(除非加上
--force)。
- 训练流程验证:
- 运行相关训练测试或脚本,确认
DataPipeline正确区分手动计算与缓存读取的因子,输出结果与改造前一致。 uv run pytest tests/ -v通过。
- 运行相关训练测试或脚本,确认
关键设计说明
- 为何放在
src/data/:compute_factors 是数据层的入口工具,与src/data/storage.py、src/data/client.py处于同一抽象层级,src/experiment/仅保留业务实验脚本。 - 全量计算 + INSERT OR REPLACE 的"增量"语义:每次运行都计算全量,但对已有数据库文件执行的是记录的增量更新(旧记录被替换、新记录被插入),既能保证数据最新,又能在校验失败时安全中止。
- DataPipeline 自动发现缓存:训练脚本无感知接入,避免因底层架构变更导致多处实验代码需要同步修改。
- DuckDB UPSERT 踩坑:建表时必须显式声明
PRIMARY KEY (trade_date, ts_code),UPSERT 语句为:同时INSERT INTO factor_values SELECT trade_date, ts_code, "factor_col" FROM df ON CONFLICT (trade_date, ts_code) DO UPDATE SET "factor_col" = EXCLUDED."factor_col"SELECT必须显式列出三列,不能写SELECT * FROM df,否则当传入 DataFrame 包含额外列时会报列数不匹配错误。
预期效果
- 训练前可以把
SELECTED_FACTORS批量预计算到data/factor/*.duckdb。 - 训练时
regression.py/learn_to_rank.py对SELECTED_FACTORS直接读本地缓存,大幅降低FactorEngine.compute的计算量和基础数据重复加载。 FACTOR_DEFINITIONS中的临时/实验性因子依旧实时计算,不影响灵活性。