refactor(factor): 完全重构因子计算框架 - 引入DSL表达式系统

- 删除旧因子框架:移除 base.py、composite.py、data_loader.py、data_spec.py
  及所有子模块(momentum、financial、quality、sentiment等)
- 新增DSL表达式系统:实现 factor DSL 编译器和翻译器
  - dsl.py: 领域特定语言定义
  - compiler.py: AST编译与优化
  - translator.py: Polars表达式翻译
  - api.py: 统一API接口
- 新增数据路由层:data_router.py 实现字段到表的动态路由
- 新增API封装:api_pro_bar.py 提供pro_bar数据接口
- 更新执行引擎:engine.py 适配新的DSL架构
- 重构测试体系:删除旧测试,新增 test_dsl_promotion.py、
  test_factor_integration.py、test_pro_bar.py
- 清理文档:删除8个过时文档(factor_design、db_sync_guide等)
This commit is contained in:
2026-02-27 22:22:23 +08:00
parent c3c20ed7ea
commit a56433e440
51 changed files with 667 additions and 11287 deletions

View File

@@ -1,467 +0,0 @@
# Classify2_load_model.ipynb 代码流程详解文档
## 概述
本文档详细描述了 `Classify2_load_model.ipynb` 文件中完整的代码流程该notebook实现了一个股票分类模型的特征工程与数据预处理流程。整个流程涵盖了从原始数据加载、多源数据合并、因子计算、数据清洗、特征标准化等多个环节最终输出可供机器学习模型使用的特征矩阵。
---
## 第一部分:环境配置与初始化
### 1.1 基础环境设置
notebook开头的第一个代码单元负责基础的开发环境配置。首先启用Jupyter的自动重载功能通过`%load_ext autoreload``%autoreload 2`指令,使得在修改导入的模块后能够自动重新加载,这对于开发调试阶段非常实用。随后进行垃圾回收机制的相关设置,导入`gc`模块用于手动管理内存因为后续处理的数据量较大数GB级别及时释放不需要的对象可以避免内存溢出。
系统路径的配置通过`sys.path.append`完成将项目根目录添加到Python的模块搜索路径中这样可以直接导入项目内的自定义模块。打印工作目录用于确认当前运行环境。数据处理的核心库`pandas`被导入用于表格数据处理,同时导入项目内部的因子模块和工具函数,包括`get_rolling_factor``get_simple_factor`用于计算技术因子,`read_industry_data`用于读取行业分类数据,`calculate_score`用于计算目标变量。最后通过`warnings.filterwarnings("ignore")`忽略所有警告信息,保持输出界面的整洁。
### 1.2 并行计算配置
第二个代码单元配置了Modin框架的并行计算参数。通过设置环境变量`os.environ["MODIN_CPUS"] = "4"`指定使用4个CPU核心进行并行计算。Modin是一个pandas的并行替代库能够在多核CPU上并行执行pandas操作显著加速大规模数据的处理速度。这一配置对于后续处理数千万条记录的数据框至关重要。
---
## 第二部分:核心数据加载与合并
### 2.1 日线数据加载
第三个代码单元是整个流程中最关键的数据加载步骤依次从HDF5文件中读取多个数据源并合并。HDF5是一种高效的分块压缩存储格式非常适合存储大规模数值型数据。
**第一步:加载日线行情数据**
使用`read_and_merge_h5_data`函数从`daily_data.h5`文件中读取日线行情数据。该函数是项目自定义的工具函数,其核心逻辑在`main/utils/utils.py`中实现。读取的字段包括股票代码`ts_code`、交易日期`trade_date`、开盘价`open`、收盘价`close`、最高价`high`、最低价`low`、成交量`vol`、成交额`amount`以及涨跌幅`pct_chg`。这些字段构成分析的基础价格信息。首次调用该函数时`df`参数为空,因此直接返回读取的数据作为初始数据框。
**第二步:加载日线基础数据**
继续调用`read_and_merge_h5_data`读取`daily_basic.h5`文件获取每日股票的基础财务指标。本次读取使用内连接inner join方式与已有数据进行合并这意味着只保留两个数据集中都存在的股票记录。读取的字段包括`turnover_rate`(换手率)、`pe_ttm`(滚动市盈率)、`circ_mv`(流通市值)、`total_mv`(总市值)以及`volume_ratio`(量比)。这些指标对于评估股票的流动性和估值水平非常重要。
**第三步:加载涨跌停限制数据**
`stk_limit.h5`文件中读取股票的涨跌停价格信息。读取的字段包括`pre_close`(前一日收盘价)、`up_limit`(涨停价)和`down_limit`(跌停价)。这些信息用于后续识别股票的涨停状态,是构建目标变量的重要依据。
**第四步:加载资金流数据**
`money_flow.h5`文件中读取资金流信息,这是计算资金流因子的基础数据。读取的字段包括各类成交量的分解:`buy_sm_vol``sell_sm_vol`代表小单买卖成交量、`buy_lg_vol``sell_lg_vol`代表大单买卖成交量、`buy_elg_vol``sell_elg_vol`代表超大单买卖成交量,以及`net_mf_vol`(净资金流成交量)。通过分析这些不同档位的资金流向,可以判断机构投资者和散户的交易行为。
**第五步:加载筹码分布数据**
最后从`cyq_perf.h5`文件中读取筹码分布相关指标。这些数据反映的是股票持仓成本分布情况,包括历史最低价`his_low`、历史最高价`his_high`、不同成本分位线的价格(`cost_5pct``cost_15pct``cost_50pct``cost_85pct``cost_95pct`)、加权平均成本`weight_avg`以及获利盘比例`winner_rate`。这些指标是计算筹码分布因子的核心数据。
经过上述五个步骤的依次合并最终生成的数据框包含9436343条记录、33个字段占用内存约2.3GB。
### 2.2 行业数据加载与合并
第四个代码单元处理行业分类数据的加载与合并。首先使用`read_and_merge_h5_data`函数从`industry_data.h5`文件中读取行业分类数据,提取股票代码、二级行业代码`l2_code`以及行业纳入日期`in_date`
随后定义了`merge_with_industry_data`函数用于将行业数据与主数据框进行时间匹配合并。该函数的核心逻辑分为以下几个步骤首先确保日期字段转换为datetime类型然后分别对行业数据和主数据按股票代码和日期排序接着使用`pd.merge_asof`函数进行向后合并direction='backward'),这意味着对于每一股票的每一个交易日,会匹配该日之前(包括当日)最近的行业变更记录;对于交易日期早于所有行业变更日期的记录,使用每个股票的最早行业代码进行填充。
这一合并策略的设计目的是确保在任意交易日期都能获取到该股票当前所属的行业分类,这对于后续进行行业中性化处理至关重要。
---
## 第三部分:指数数据处理
### 3.1 指数数据读取
第五个代码单元实现指数数据的读取与指标计算。定义了两个核心函数:`calculate_indicators`用于计算单个指数的技术指标,`generate_index_indicators`用于批量处理多个指数。
`calculate_indicators`函数中,首先对数据按交易日期排序,然后计算以下指标:
**当日涨跌幅**:通过`(close - pre_close) / pre_close * 100`计算,这是最基础的收益率指标。
**RSI指标**相对强弱指数计算过程包括首先计算价格变动delta然后分离上涨和下跌部分分别计算14日滚动平均最后通过公式`100 - (100 / (1 + rs))`得到RSI值。RSI是衡量股票近期涨跌动能的经典技术指标。
**MACD指标**移动平均收敛发散指标计算过程包括12日EMA减去26日EMA得到MACD线9日EMA得到信号线MACD与信号线的差值得到MACD柱。MACD是判断趋势方向和动量变化的重要工具。
**情绪因子**
- **上涨比例up_ratio_20d**过去20天上涨天数占比反映市场整体情绪
- **成交量变化率volume_change_rate**当日成交量与20日均量的比率反映成交量异常变化
- **波动率volatility**过去20天日收益率的标准差反映市场波动程度
- **成交额变化率amount_change_rate**当日成交额与20日均额的比率
`generate_index_indicators`函数则对所有指数分别计算上述指标,然后将结果透视为宽表格式,使每个指数的指标成为独立的列。最终输出包含多个指数技术指标的数据框,这些指标可以作为市场情绪的代理变量加入模型。
### 3.2 行业指数数据处理
第六个代码单元使用talib库和numpy库进行更专业的技术指标计算。talib是专业的技术分析库包含数百种经典技术指标的实现。
`sw_daily.h5`文件读取行业日线数据后,依次计算:
**OBV能量潮指标**:通过`talib.OBV`函数计算,将成交量与价格变动方向结合,衡量资金流入流出的强度。
**短期收益率return_5**5日收益率计算公式为`x / x.shift(5) - 1`
**中期收益率return_20**20日收益率用于捕捉中短期动量。
**动量因子act_factor**:通过`get_act_factor`函数计算该函数对不同周期的EMA指数移动平均进行arctan变换得到平滑的动量因子。这是项目自定义的核心因子之一。
**收益率分位数排名**:将收益率在截面内转换为百分位排名,消除不同行业间的基数差异,便于跨行业比较。
最终对所有新增字段添加`industry_`前缀,并重命名`ts_code``cat_l2_code`,形成行业级别的因子数据,可用于后续的行业对标分析。
---
## 第四部分:财务数据加载
### 4.1 财务指标数据
第九至十一个代码单元负责加载各类财务数据,为模型增加基本面因子。
**财务指标数据fina_indicator**:从`fina_indicator.h5`文件读取,包含`undist_profit_ps`(每股未分配利润)、`ocfps`(每股经营现金流)、`bps`(每股净资产)、`roa`(资产收益率)和`roe`(净资产收益率)。这些指标反映企业的盈利能力和财务健康状况。
**现金流数据cashflow**:从`cashflow.h5`文件读取,包含`n_cashflow_act`(经营活动净现金流),用于评估企业现金流的真实状况。
**资产负债表数据balancesheet**:从`balancesheet.h5`文件读取,包含`money_cap`(货币资金)和`total_liab`(总负债),用于计算企业的偿债能力。
这些财务数据通过`ann_date`(公告日期)与交易数据进行时间匹配,确保使用最新披露的财务信息。需要注意的是,财务数据存在滞后性,通常季报在季度结束后一段时间才披露,因此在匹配时使用向后合并策略。
---
## 第五部分:数据过滤与清洗
### 5.1 数据过滤规则
第十一个代码单元定义了`filter_data`函数,应用一系列过滤规则清理数据:
**排除ST股票**`df[~df['is_st']]`过滤掉所有ST特别处理股票这类股票存在较大风险。
**排除北京股票**`df[~df['ts_code'].str.endswith('BJ')]`排除北京交易所的股票。
**排除创业板和科创板**`df[~df['ts_code'].str.startswith('30')]`排除创业板股票,`df[~df['ts_code'].str.startswith('68')]`排除科创板股票。这一设计可能是因为这两个板块的股票特性与主板有较大差异。
**排除ST类代码**`df[~df['ts_code'].str.startswith('8')]`排除了以8开头的股票代码通常这类代码也属于风险警示类。
**时间范围过滤**`df[df['trade_date'] >= '2019-01-01']]`只保留2019年及以后的数据确保数据质量和一致性。
**删除冗余列**:如果存在`in_date`列则删除,避免与交易日期混淆。
过滤后的数据量从9436343条减少到5087384条约减少46%。
---
## 第六部分:因子计算
### 6.1 财务因子计算
第十二个代码单元是因子计算的核心部分,首先计算一系列财务相关因子:
**现金流因子cashflow_to_ev_factor**:将现金流数据与企业价值进行对比,计算企业现金流的相对估值水平。
**市净率因子book_to_price_ratio**:通过`caculate_book_to_price_ratio`函数计算,每股净资产与股价的比值,是价值投资的经典指标。
### 6.2 基础技术因子
继续计算各类技术因子:
**换手率均值turnover_rate_mean_5**5日平均换手率反映股票的交易活跃程度。
**收益率方差variance_20**20日收益率方差衡量短期波动性。
**BBI比率因子bbi_ratio_factor**BBI是多空指数的简称通过不同周期均线的组合判断多空趋势。
**日偏离度daily_deviation**:当日涨跌幅与市场平均涨跌幅的差异,反映个股的相对强弱。
**日行业偏离度daily_industry_deviation**:相对于所在行业平均涨跌幅的偏离,反映行业内相对表现。
### 6.3 滚动因子与简单因子
调用`get_rolling_factor``get_simple_factor`函数批量计算大量因子。这两个函数定义在`main/factor/factor.py`中,是项目最核心的因子计算模块。
**get_rolling_factor函数**主要计算的因子包括:
**资金流因子组**
- `lg_elg_net_buy_vol`:超大单加大单的净买入量
- `flow_lg_elg_intensity`:主力资金流强度,等于净买入量除以成交量
- `sm_net_buy_vol`:散户净买入量
- `flow_divergence_diff`:散户与主力背离度,主力净买入减去散户净买入
- `flow_divergence_ratio`:背离比率形式
- `lg_elg_buy_prop`:主力买入占比
- `flow_struct_buy_change`资金流结构变动1日变化率
- `flow_lg_elg_accel`:资金流加速度,二阶导数
**筹码分布因子组**
- `chip_concentration_range`95%成本价 - 5%成本价)/ 当前价格,反映筹码集中程度
- `chip_skewness`:(加权平均成本 - 50%成本价)/ 50%成本价,反映成本分布偏斜方向
- `floating_chip_proxy`:浮筹比例,结合获利盘和价格位置
- `cost_support_15pct_change`15%成本线变化率,反映支撑位变动
- `cat_winner_price_zone`:获利盘价格区域分类
**资金与筹码结合因子**
- `flow_chip_consistency`:资金流与筹码结构一致性
- `profit_taking_vs_absorb`:获利了结压力与承接盘强度
**收益率分布因子**
- `upside_vol``downside_vol`:上涨和下跌方向的标准差
- `vol_ratio`:上下波动率比值
- `return_skew``return_kurtosis`:收益率的偏度和峰度
**成交量因子**
- `volume_change_rate`:成交量变化率
- `cat_volume_breakout`:成交量突破信号
- `turnover_deviation`:换手率偏离度
- `cat_turnover_spike`:换手率激增信号
- `avg_volume_ratio``cat_volume_ratio_breakout`:量比相关因子
**talib技术指标**
- `atr_14``atr_6`14日和6日平均真实波幅
- `obv``maobv_6`能量潮及其6日均线
- `rsi_3`3日RSI
**收益率因子**
- `return_5``return_20`5日和20日收益率
- `std_return_5``std_return_90``std_return_90_2`:不同周期的收益率标准差
**EMA与动量因子**
- 各周期EMA5、13、20、60日
- `act_factor1``act_factor4`基于EMA的动量因子
**协方差因子**
- `cov`:高价与成交量的滚动协方差
- `delta_cov`:协方差差分
- `alpha_22_improved`改进的alpha因子
**其他因子**
- `alpha_003`:收盘价与开盘价的相对位置
- `alpha_007`收盘价与成交量的5日滚动相关性
- `alpha_013`5日与20日累计收益差值
- `vol_break`价格突破85%成本线且量比大于2的信号
- `weight_roc5`加权成本5日变化率
- `price_cost_divergence`:价格与成本相关性
- `smallcap_concentration`:小盘股筹码集中度
- `cost_stability`:筹码稳定性指数
- `liquidity_risk`:筹码流动性风险
**get_simple_factor函数**在滚动因子的基础上进一步计算衍生因子:
- `momentum_factor`动量因子等于成交量变化率加0.5倍换手率偏离度
- `resonance_factor`:共振因子,量比乘以涨跌幅
- `log_close`:收盘价对数
- `cat_vol_spike`:成交量激增分类变量
- `up``down`:上下影线比例
- `obv_maobv_6`OBV与6日均线差值
- `std_return_5_over_std_return_90`:短期与长期波动率比值
- `act_factor5``act_factor6`:综合动量因子
- `active_buy_volume_*`:各档位主动买入占比
- `ctrl_strength`:控制强度,反映成本区间与历史区间的比值
- `low_cost_dev``asymmetry``lock_factor`:各类筹码特征因子
- `cat_golden_resonance`:黄金共振信号
### 6.4 资金流因子
第十三个代码单元继续计算`money_factor.py`中定义的各类资金流相关因子。这些因子主要分析大单资金的流动特征和动量属性。
**lg_flow_mom_corr**大单资金流与20至60日滚动窗口的动量相关性衡量资金流趋势的持续性。
**lg_flow_accel**:大单资金流加速度,捕捉资金流入流出的加速变化。
**profit_pressure**:盈利压力因子,结合价格位置和资金流向。
**underwater_resistance**:水下阻力因子,分析亏损区域的支持强度。
**cost_conc_std_20**20日成本集中度标准差反映筹码分布的稳定程度。
**profit_decay_20**20日盈利衰减因子衡量获利盘随时间的减少程度。
**vol_amp_loss_20**20日波动幅度损失因子。
**vol_drop_profit_cnt_5**5日内量价齐升计数。
**lg_flow_vol_interact_20**20日大单资金流与成交量交互因子。
**cost_break_confirm_cnt_5**5日内成本突破确认计数。
**atr_norm_channel_pos_14**14日ATR标准化通道位置。
**turnover_diff_skew_20**20日换手率差值偏度。
**lg_sm_flow_diverge_20**20日大小单资金流背离。
**pullback_strong_20_20**20日回撤强度因子。
**vol_wgt_hist_pos_20**20日成交量加权历史位置。
**vol_adj_roc_20**20日成交量调整变动率。
### 6.5 截面排名因子
第十三个代码单元的后半部分定义了一系列`cs_rank_*`开头的截面排名因子,这些因子都是通过截面排名方式计算得出,可以消除不同股票间的基数差异。
**cs_rank_net_lg_flow_val**:大单净流入值截面排名
**cs_rank_flow_divergence**:资金流背离截面排名
**cs_rank_industry_adj_lg_flow**:行业调整后大单资金流截面排名
**cs_rank_elg_buy_ratio**:超大单买入占比截面排名
**cs_rank_rel_profit_margin**:相对盈利-margin截面排名
**cs_rank_cost_breadth**:成本宽度截面排名
**cs_rank_dist_to_upper_cost**:到上方成本距离截面排名
**cs_rank_winner_rate**:获利盘比例截面排名
**cs_rank_intraday_range**:日内振幅截面排名
**cs_rank_close_pos_in_range**:收盘价在成本区间位置截面排名
**cs_rank_opening_gap**:跳空缺口截面排名(需要前收盘价)
**cs_rank_pos_in_hist_range**:历史区间位置截面排名
**cs_rank_vol_x_profit_margin**成交量与盈利margin交互截面排名
**cs_rank_lg_flow_price_concordance**:大单资金流与价格一致性截面排名
**cs_rank_turnover_per_winner**:单位获利盘换手率截面排名
**cs_rank_ind_cap_neutral_pe**:行业市值中性市盈率截面排名
**cs_rank_volume_ratio**:量比截面排名
**cs_rank_elg_buy_sell_sm_ratio**:超大单买卖与小单比值截面排名
**cs_rank_cost_dist_vol_ratio**:成本分布成交量比值截面排名
**cs_rank_size**:市值规模截面排名
经过上述所有因子计算数据框最终包含181个字段内存占用约6.5GB。
---
## 第七部分:数据预处理函数定义
### 7.1 特征漂移检测
第十四个代码单元定义了`remove_shifted_features`函数,用于检测训练集和测试集之间是否存在特征分布漂移。漂移检测使用两种统计方法:
**KS检验Kolmogorov-Smirnov检验**比较两个分布是否来自同一分布通过p值判断p值小于阈值默认0.05)则认为存在显著差异。
**Wasserstein距离**衡量两个分布之间的Earth Mover距离距离大于阈值默认0.1)则认为漂移严重。
如果特征同时满足两个条件,则认为该特征存在漂移并将其移除。这一步骤对于确保模型在测试集上的泛化能力非常重要。
### 7.2 标准化处理函数
第十五个代码单元定义了三个核心的标准化处理函数:
**cs_mad_filter截面MAD去极值函数**
MADMedian Absolute Deviation中位绝对偏差是一种稳健的极值检测方法。具体步骤包括按日期分组计算每列的中位数然后计算每个值与中位数的绝对偏差再次取中位数得到MAD最后将超出`[median - k * MAD, median + k * MAD]`范围的值截断到边界。默认k=3scale_factor=1.4826使得MAD约等于正态分布的标准差。
**cs_neutralize_market_cap_numpy市值中性化函数**
对每个交易日的每只股票使用OLS回归将因子值对市值取对数进行回归提取残差作为中性化后的因子值。这一步骤消除因子中与市值相关的系统性偏差使因子更能反映股票的真实特性而非市值效应。
**cs_zscore_standardize截面Z-Score标准化函数**
对每个截面每日计算各因子的均值和标准差然后进行Z-Score变换`Z = (value - mean) / (std + epsilon)`。这使得不同量纲的因子具有可比性且均值为0、标准差为1。
**fill_nan_with_daily_median截面中位数填充函数**
对每个交易日分别计算各因子的中位数,用该中位数填充该日内的缺失值。这一方法比全局中位数填充更能反映数据的时间特性。
### 7.3 其他预处理函数
第十五个代码单元还定义了其他辅助预处理函数:
**remove_outliers_label_percentile**对标签进行百分位去极值默认保留1%到99%分位之间的数据。
**calculate_risk_adjusted_target**:计算风险调整后的目标变量,结合未来收益率和未来波动性。
**calculate_score**:计算综合评分目标,考虑未来收益减去风险惩罚(最大回撤)。
**remove_highly_correlated_features**:移除高度相关的特征,避免多重共线性问题。
**cross_sectional_standardization**截面标准化使用StandardScaler进行标准化。
**neutralize_manual_revised**:手动实现的行业市值中性化函数,对每个行业分别进行回归取残差。
**mad_filter**全局MAD去极值函数简化版
**percentile_filter**:百分位去极值函数。
**iqr_filter**:四分位距标准化函数。
**quantile_filter**:滚动分位数去极值函数。
**select_top_features_by_rankic**基于RankIC选择最优特征。
**create_deviation_within_dates**:创建截面偏差特征,计算行业内各因子与均值的偏差。
---
## 第八部分:目标变量构建
### 8.1 未来收益率计算
第十六个代码单元负责构建预测目标变量。
**未来收益率**:通过`df.groupby('ts_code')['close'].shift(-days) / df['close'] - 1`计算days默认为5即预测未来5日的收益率。shift(-days)表示向后移动获取未来价格。
**涨停标签**构建分类目标变量首先识别当日涨幅超过5%的股票(`cat_up_limit`然后计算过去5日内是否存在涨停通过rolling窗口的max函数再向后shift(5)避免未来信息泄露最后fillna(0)并转换为整数类型。
**过滤异常收益率**:使用`between`方法保留1%到99%分位之间的收益率,去除极端值。
---
## 第九部分:特征列筛选
### 9.1 特征列筛选逻辑
第十七个代码单元进行特征列的最终筛选。通过一系列排除规则,从所有可用列中筛选出建模所需的特征列:
**排除非特征列**:排除`trade_date``ts_code``label`等标识列;排除包含`future``label``score``gen``is_st``pe_ttm``circ_mv``code`等关键词的列;排除原始基础数据列`origin_columns`;排除下划线开头的临时列。
**排除特定特征**:手动排除存在问题的特征如`intraday_lg_flow_corr_20``cap_neutral_cost_metric``hurst_net_mf_vol_60`等;排除财务因子`roa``roe`
最终筛选出191个特征列用于建模。
---
## 第十部分:缺失值处理与标准化流程
### 10.1 缺失值填充
第十八个代码单元将所有特征的缺失值填充为0。这一简化处理的假设是缺失值可能代表数据不可用或极小概率事件用0填充对模型影响相对中性。
### 10.2 完整的预处理流水线
第十九个代码单元展示了完整的预处理流程执行:
**第一步:特征列确定**——合并行业数据、指数数据后确定最终特征列表。
**第二步MAD去极值**——执行第一轮截面MAD去极值处理全量特征。
**第三步:市值中性化**——对第一轮去极值后的特征进行截面市值中性化。
**第四步第二轮MAD去极值**——对中性化后的特征再次进行去极值处理。
**第五步Z-Score标准化**——最后对所有特征进行截面Z-Score标准化。
整个流水线确保了最终进入模型的特征具有:稳定的分布(去极值)、去除市值偏差(中性化)、可比性(标准化)。
---
## 总结
`Classify2_load_model.ipynb`实现了一个完整、规范的量化因子工程流程。整个流程涵盖了数据加载、多源数据合并、因子计算、特征筛选、预处理标准化等关键步骤。
**数据层面**整合了日线行情、资金流、筹码分布、财务指标、行业分类、指数数据等多维度数据源形成了超过180个特征的基础特征池。
**因子层面**:因子体系覆盖了资金流因子、筹码分布因子、技术指标因子、动量因子、波动率因子、截面排名因子等多个类别,形成了多角度、全方位的特征体系。
**预处理层面**建立了完整的预处理流水线包括MAD去极值、市值中性化、Z-Score标准化等标准化步骤确保特征的质量和稳定性。
整个notebook的设计体现了量化投资特征工程的最佳实践为后续的机器学习模型训练提供了高质量的数据基础。

View File

@@ -1,227 +0,0 @@
# 代码审查报告 - Factor 框架
**审查日期**: 2026-02-22
**审查范围**: `src/factors/` 模块及测试代码
---
## 变更概述
| 类型 | 文件 |
|------|------|
| **已暂存** | `.kilocode/rules/project_rules.md` - Git 提交规范文档 |
| | `.kilocode/rules/python-development-guidelines.md` - Python 开发规范扩展 |
| **未跟踪** | `src/factors/` - 新增因子计算框架 |
| | `tests/factors/` - 对应测试文件 |
| | `docs/` - 文档目录 |
---
## 文档变更(已暂存)
✅ 无问题。Git 提交规范添加符合项目风格,格式清晰。
---
## 新增代码审查factors 模块)
### 1. 严重问题
#### 1.1 `engine.py:306-323` - 交易日偏移实现假设错误
```python
def _get_trading_date_offset(self, date: str, offset: int) -> str:
from datetime import datetime, timedelta
dt = datetime.strptime(date, "%Y%m%d")
new_dt = dt + timedelta(days=offset)
return new_dt.strftime("%Y%m%d")
```
**问题**简单使用日历日偏移假设每天都是交易日。A股市场有周末和节假日这会导致
- 偏移计算不准确
- `lookback_days` 实际不等于交易天数
- 可能加载过多或过少的历史数据
**建议**:使用真实的交易日历,或至少跳过周末。
---
#### 1.2 `base.py:137-147` - 乘法运算符类型检查不完整
```python
def __mul__(self, other):
if isinstance(other, (int, float)):
from src.factors.composite import ScalarFactor
return ScalarFactor(self, float(other), "*")
elif isinstance(other, BaseFactor):
from src.factors.composite import CompositeFactor
return CompositeFactor(self, other, "*")
return NotImplemented
```
**问题**`float` 类型的负数会匹配 `int` 分支,但 `bool``int` 的子类,会被错误匹配:
```python
factor * True # 返回 ScalarFactor(factor, 1.0, "*") - 可能不是预期行为
factor * False # 返回 ScalarFactor(factor, 0.0, "*") - 可能不是预期行为
```
**建议**:显式排除 `bool` 类型:
```python
if isinstance(other, (int, float)) and not isinstance(other, bool):
```
---
#### 1.3 `engine.py:60-72` - compute 方法缺少必需参数验证 ✅ 已修复
**修复内容**
- 为截面因子添加了 `start_date``end_date` 必填参数验证
- 为时序因子添加了 `stock_codes``start_date``end_date` 必填参数验证
- 参数缺失时抛出明确的 `ValueError`,指出缺少哪些参数
**修复代码**:
```python
if factor.factor_type == "cross_sectional":
if "start_date" not in kwargs or "end_date" not in kwargs:
raise ValueError(
"cross_sectional factor requires 'start_date' and 'end_date' parameters"
)
elif factor.factor_type == "time_series":
missing = []
if "stock_codes" not in kwargs:
missing.append("stock_codes")
if "start_date" not in kwargs:
missing.append("start_date")
if "end_date" not in kwargs:
missing.append("end_date")
if missing:
raise ValueError(
f"time_series factor requires parameters: {', '.join(missing)}"
)
```
---
### 2. 中等问题
#### 2.1 `base.py:92-101` - 参数验证时机问题 ✅ 已修复
**修复内容**
- 移除了 `__init__` 中自动调用的 `self._validate_params()`
- 更新了 `_validate_params()` 的文档,明确说明子类如需自定义验证,需自行在子类 `__init__` 中调用
- 添加了关于 `data_specs` 必须在类级别定义的说明
**修复代码**:
```python
def __init__(self, **params):
"""初始化因子参数
注意data_specs 必须在类级别定义(类属性),
而非在 __init__ 中设置。data_specs 的验证在
__init_subclass__ 中完成(类创建时)。
"""
self.params = params
def _validate_params(self):
"""验证参数有效性
子类可覆盖此方法进行自定义验证(需自行在子类 __init__ 中调用)。
基类实现为空,表示不执行任何验证。
注意:由于 data_specs 在类创建时通过 __init_subclass__ 验证,
不应在实例级别修改。如需动态 data_specs请使用参数化模式
...
"""
pass
```
---
#### 2.2 `engine.py:161-169` - 静默修复长度不匹配
```python
if len(factor_values) != len(today_stocks):
# 尝试从 factor_data 重新提取
cs_data = factor_data.get_cross_section()
if len(cs_data) > 0:
today_stocks = cs_data["ts_code"]
if len(factor_values) != len(today_stocks):
factor_values = pl.Series([None] * len(today_stocks)) # 静默填充 null
```
**问题**:静默返回 null 值可能掩盖因子计算中的逻辑错误。开发者难以发现因子实现问题。
**建议**
- 至少记录警告日志
- 或在开发环境抛出异常
---
#### 2.3 `data_spec.py:14` - 使用 `frozen=True` 但通过 `object.__setattr__` 绕过 ✅ 已修复
**修复内容**
- 更新了 `__post_init__` 的注释,准确说明 `frozen=True` 的含义
- 说明本类仅做验证,无需修改字段,因此直接 raise ValueError 即可
- 补充说明如需在 `__post_init__` 中修改字段,可使用 `object.__setattr__`
**修复代码**:
```python
def __post_init__(self):
"""验证约束条件
验证项:
1. lookback_days >= 1至少包含当日
2. columns 必须包含 ts_code 和 trade_date
3. source 不能为空字符串
注意:由于 frozen=True实例创建后不可修改。
若需要在 __post_init__ 中修改字段(如有),可使用 object.__setattr__。
本类仅做验证,无需修改字段,因此直接 raise ValueError 即可。
"""
if self.lookback_days < 1:
raise ValueError(f"lookback_days must be >= 1, got {self.lookback_days}")
```
---
### 3. 轻微问题 / 优化建议
#### 3.1 `engine.py` - 缺少类型注解
`compute()` 方法返回类型注解为 `pl.DataFrame`,但 `_compute_cross_sectional``_compute_time_series` 返回类型未标注。
#### 3.2 `data_spec.py:42` - 默认值 `lookback_days=1` 语义
注释说 "包含当日",但 `lookback_days=1` 实际只包含 `[T]`,这与注释中 `lookback_days=5` 表示 `[T-4, T]` 一致。
---
## 测试覆盖
测试覆盖良好,共 97 个测试用例,覆盖:
- 因子基类验证
- 组合因子运算
- 数据加载器
- 数据规格定义
- 引擎执行逻辑
---
## 总结
| 状态 | 严重 | 中等 | 轻微 |
|------|------|------|------|
| 修复前 | 3 | 3 | 2 |
| 修复后 | 2 | 1 | 2 |
**已修复问题**
- ✅ 1.3 compute 方法参数验证
- ✅ 2.1 参数验证时机问题
- ✅ 2.3 frozen dataclass 注释
**待修复问题**
- ⚠️ 1.1 交易日偏移实现(严重)
- ⚠️ 1.2 乘法运算符 bool 边界(严重)
- ⚠️ 2.2 静默修复长度不匹配(中等)
Factor 框架整体设计良好,测试覆盖全面。建议修复剩余 2 个严重问题后再合并代码。

View File

@@ -1,267 +0,0 @@
# DuckDB 数据同步指南
ProStock 现已从 HDF5 迁移至 DuckDB 存储。本文档介绍新的同步机制。
## 新功能概览
- **自动表创建**: 根据 DataFrame 自动推断表结构
- **复合索引**: 自动为 `(trade_date, ts_code)` 创建复合索引
- **增量同步**: 智能判断同步策略(按日期或按股票)
- **类型映射**: 预定义常见字段的数据类型
## 核心模块
### 1. TableManager - 表管理
```python
from src.data.db_manager import TableManager
# 创建表管理器
manager = TableManager()
# 从 DataFrame 创建表(自动创建复合索引)
import pandas as pd
data = pd.DataFrame({
"ts_code": ["000001.SZ"],
"trade_date": ["20240101"],
"close": [10.5],
})
manager.create_table_from_dataframe("daily", data)
# 确保表存在(不存在则自动创建)
manager.ensure_table_exists("daily", sample_data=data)
```
### 2. IncrementalSync - 增量同步
```python
from src.data.db_manager import IncrementalSync
sync = IncrementalSync()
# 获取同步策略
strategy, start, end, stocks = sync.get_sync_strategy(
table_name="daily",
start_date="20240101",
end_date="20240131",
stock_codes=None # None = 所有股票
)
# 返回值:
# - strategy: "by_date" | "by_stock" | "none"
# - start: 同步开始日期
# - end: 同步结束日期
# - stocks: 需要同步的股票列表None = 全部)
# 执行数据同步
result = sync.sync_data("daily", data, strategy="by_date")
```
### 3. SyncManager - 高级同步
```python
from src.data.db_manager import SyncManager
from src.data.api_wrappers import get_daily
# 创建同步管理器
manager = SyncManager()
# 一键同步(自动处理表创建、策略选择、数据获取)
result = manager.sync(
table_name="daily",
fetch_func=get_daily, # 数据获取函数
start_date="20240101",
end_date="20240131",
stock_codes=["000001.SZ", "600000.SH"] # 可选:指定股票
)
print(result)
# {
# "status": "success",
# "table": "daily",
# "strategy": "by_date",
# "rows": 1000,
# "date_range": "20240101 to 20240131"
# }
```
## 便捷函数
### 快速同步数据
```python
from src.data.db_manager import sync_table
from src.data.api_wrappers import get_daily
# 同步日线数据
result = sync_table(
table_name="daily",
fetch_func=get_daily,
start_date="20240101",
end_date="20240131"
)
```
### 获取表信息
```python
from src.data.db_manager import get_table_info
# 查看表统计信息
info = get_table_info("daily")
print(info)
# {
# "exists": True,
# "row_count": 100000,
# "min_date": "20240101",
# "max_date": "20240131",
# "unique_stocks": 5000
# }
```
### 确保表存在
```python
from src.data.db_manager import ensure_table
# 如果表不存在,使用 sample_data 创建
ensure_table("daily", sample_data=df)
```
## 同步策略详解
### 1. 按日期同步 (by_date)
**适用场景**: 全市场数据同步、每日增量更新
**逻辑**:
- 表不存在 → 全量同步
- 表存在但空 → 全量同步
- 表存在且有数据 → 从 `last_date + 1` 开始增量同步
```python
# 示例: 表已有数据到 20240115
strategy, start, end, stocks = sync.get_sync_strategy(
"daily", "20240101", "20240131"
)
# 返回: ("by_date", "20240116", "20240131", None)
# 只需同步 16-31 号的新数据
```
### 2. 按股票同步 (by_stock)
**适用场景**: 补充特定股票的历史数据
**逻辑**:
- 检查哪些请求的股票不存在于表中
- 仅同步缺失的股票
```python
# 示例: 表中已有 000001.SZ请求两只股票
strategy, start, end, stocks = sync.get_sync_strategy(
"daily", "20240101", "20240131",
stock_codes=["000001.SZ", "600000.SH"]
)
# 返回: ("by_stock", "20240101", "20240131", ["600000.SH"])
# 只同步缺失的 600000.SH
```
### 3. 无需同步 (none)
**适用场景**: 数据已是最新
**触发条件**:
- 表存在且日期已覆盖请求范围
- 所有请求的股票都已存在
## 完整示例
```python
from src.data.db_manager import SyncManager, get_table_info
from src.data.api_wrappers import get_daily
# 1. 查看当前表状态
info = get_table_info("daily")
print(f"当前数据: {info['row_count']} 行, 最新日期: {info['max_date']}")
# 2. 创建同步管理器
manager = SyncManager()
# 3. 执行同步
result = manager.sync(
table_name="daily",
fetch_func=get_daily,
start_date="20240101",
end_date="20240222"
)
# 4. 检查结果
if result["status"] == "success":
print(f"成功同步 {result['rows']} 行数据")
print(f"使用策略: {result['strategy']}")
elif result["status"] == "skipped":
print("数据已是最新,无需同步")
else:
print(f"同步失败: {result.get('error')}")
```
## 类型映射
默认字段类型映射:
```python
DEFAULT_TYPE_MAPPING = {
"ts_code": "VARCHAR(16)",
"trade_date": "DATE",
"open": "DOUBLE",
"high": "DOUBLE",
"low": "DOUBLE",
"close": "DOUBLE",
"pre_close": "DOUBLE",
"change": "DOUBLE",
"pct_chg": "DOUBLE",
"vol": "DOUBLE",
"amount": "DOUBLE",
"turnover_rate": "DOUBLE",
"volume_ratio": "DOUBLE",
"adj_factor": "DOUBLE",
"suspend_flag": "INTEGER",
}
```
未定义字段会根据 pandas dtype 自动推断:
- `int``INTEGER`
- `float``DOUBLE`
- `bool``BOOLEAN`
- `datetime``TIMESTAMP`
- 其他 → `VARCHAR`
## 索引策略
自动创建的索引:
1. **主键**: `(ts_code, trade_date)` - 确保数据唯一性
2. **复合索引**: `(trade_date, ts_code)` - 优化按日期查询性能
## 与旧代码的兼容性
原有 `Storage``ThreadSafeStorage` API 保持不变:
```python
from src.data.storage import Storage, ThreadSafeStorage
# 旧代码继续可用
storage = Storage()
storage.save("daily", data)
df = storage.load("daily", start_date="20240101")
```
新增的功能通过 `db_manager` 模块提供。
## 性能建议
1. **批量写入**: 使用 `SyncManager` 自动处理批量写入
2. **避免重复查询**: 使用 `get_table_info()` 检查现有数据
3. **合理选择策略**: 全市场更新用 `by_date`,补充数据用 `by_stock`
4. **利用索引**: 查询时优先使用 `trade_date``ts_code` 过滤

View File

@@ -1,707 +0,0 @@
# 🚀 量化因子计算框架抽象设计与实施蓝图
## 一、 系统架构设计(四层解耦模型)
本系统采用严格的分层架构,每一层只需关注自己的输入与输出,层与层之间通过标准化的数据结构(如抽象语法树、需求清单、物理执行图)进行通信。
### 1. 领域特定语言层DSL Layer / 用户层)
* **职责**:提供对量化研究员极度友好的因子表达式编写接口,屏蔽所有底层计算引擎和数据库的痕迹。
* **输入**:研究员编写的数学与逻辑表达式。
* **输出**:纯粹的、无状态的**抽象语法树AST**。
* **边界约束**:本层绝对不允许依赖任何外部数据处理库。它只负责描述“计算逻辑是什么”,不涉及“怎么算”和“数据在哪”。
### 2. 编译与分析层Compiler Layer / 解析层)
* **职责**:接收 DSL 层生成的 AST进行语法树分析与优化。
* **核心动作 1依赖提取**。遍历语法树,找出所有的“叶子节点”(即基础数据字段),生成全局数据需求清单。
* **核心动作 2图优化可选**。识别重复的子表达式结构,进行合并计算标记。
* **输出**结构化的数据依赖清单Set/List和经过校验的 AST。
### 3. 动态数据路由层Data Router Layer / IO 层)
* **职责**:充当量化系统与底层多表数据库之间的桥梁。
* **核心逻辑**:基于元数据字典(记录字段所属的数据库表及数据频度),将分析层传递的“数据需求清单”转化为对数据库的最优查询指令。
* **输出**在内存中组装好的、经过严格时间对齐与防未来函数处理的、极简的数据上下文Data Context
### 4. 物理执行引擎层Execution Engine / 计算层)
* **职责**:将抽象的计算逻辑映射到具体的硬件或高性能计算库(如 Polars/向量化引擎)上并执行。
* **核心逻辑**:遍历 AST将其翻译为物理引擎的执行算子。在这个翻译过程中**系统隐式地强制注入量化计算的安全规则**(如截面分组、时序分组)。
* **输出**:最终的因子计算结果(面板数据表)。
---
## 二、 核心机制的具体实现逻辑(非代码描述)
为了让 AI 准确理解你的意图,你需要向 AI 阐明以下四个核心逻辑的运作机制:
### 1. 表达式树的生成机制 (符号化运算)
* **逻辑说明**:定义基础的变量节点(代表底层字段)和操作节点(代表加减乘除或函数)。通过重载面向对象语言的原生运算符(如算术运算符、比较运算符),使得变量节点参与运算时,不会抛出错误或执行计算,而是生成一个新的、包含左右子节点和操作符的父节点。
* **结果**:一个复杂的数学公式最终在内存里会变成一棵树状的数据结构。
### 2. 动态 SQL 生成与按需加载机制
* **逻辑说明**:系统初始化时,加载一次数据库元数据(表名、列名、更新频率),形成路由字典。当收到需求清单时,系统不使用 `SELECT *`,而是通过路由字典找到字段对应的表,动态拼接 `SELECT [必要关联键], [需求字段] FROM [表名] WHERE[时间与股票池过滤]`
* **结果**:极大降低数据库的 I/O 压力和网络传输负载。
### 3. 数据对齐与防未来函数机制(极其重要)
数据在内存中合并时,必须根据表的“频度属性”采取不同的关联策略:
* **同行频表(如日频基础与日频行情)**:以基准时间轴为左表,严格按照 `[资产标识, 交易日]` 进行精确匹配连接。
* **低频事件表(如财务报表)**:绝不能按自然日期或报告期关联。必须以“财报实际披露日”作为右表时间键,采用**“就近向后寻找匹配Asof Join / Point-in-Time Join”**策略。即某一天的财务数据,只能使用该日期之前(含当天)最新发布的那份财报。
* **防错铁律**:拼表完成后,必须强制按照 `[资产标识, 交易日]` 的优先级进行升序排序,为后续的滑动窗口计算提供物理连续性保障。
### 4. 算子翻译与引擎方言注入机制
物理层在将 AST 翻译为引擎执行图时,必须自动附加以下安全约束,这是研究员无需关心但系统必须保证的:
* **时序算子(如移动平均、动量)**:翻译时,必须向引擎下达强制指令——“本计算窗口必须被严格限制在单一资产的边界内”。
* **截面算子(如截面排名、行业中性化)**:翻译时,必须向引擎下达强制指令——“本计算必须在同一个交易日切片内横向展开”。
---
## 三、 Vibe Coding 实施与 Prompt 投喂计划
在利用 AI 编写代码时,建议按照以下阶段逐步进行(可作为每个阶段发给 AI 的指令纲要):
### 里程碑 1构建抽象语法树引擎 (DSL & AST)
* **任务指派**:要求 AI 设计一套纯粹的表达式树数据结构。包含基础节点类、变量节点类、二元/一元操作节点类、以及函数调用节点类。
* **验收标准**:通过重载运算符,可以随意组合变量(如 A, B, C并且编写一个简单的打印函数能够以可视化的方式或 JSON 结构)输出这棵树的层次关系。绝不包含任何第三方数据处理库。
### 里程碑 2实现依赖解析器 (Compiler)
* **任务指派**:要求 AI 编写一个树遍历器(如使用 Visitor 模式)。该遍历器接收里程碑 1 产生的树根节点,递归访问所有分支,收集所有叶子节点(变量节点)的名称。
* **验收标准**:输入一个深层嵌套的复杂公式树,解析器能够准确、去重地返回该公式依赖的所有底层基础字段名称列表。
### 里程碑 3构建元数据路由与动态组装器 (Data Router)
* **任务指派**:要求 AI 设计一个数据上下文管理器。
1. 实现注册机制,能接收不同表的数据字典和频度属性(日频或 PIT 低频)。
2. 根据里程碑 2 提取的依赖列表,自动分配表归属,并生成最小化拉取数据的伪代码或抽象 SQL 查询计划。
3. 阐明并在代码结构中实现不同频度数据的合并对齐逻辑(精确连接与就近前向连接),以及最后的全局强制排序逻辑。
* **验收标准**:输入几个测试字段,管理器能正确输出不同表的查询指令清单,并展现合并逻辑的抽象流程。
### 里程碑 4构建物理引擎翻译器 (Translator)
* **任务指派**:指定一个高性能计算库(如 Polars。要求 AI 编写一个翻译层,接收里程碑 1 的树节点,递归转化为该计算库的原生表达式对象。
* **验收约束**:在这个环节,要求 AI 必须在翻译时序函数时自动附加资产分组属性,在翻译截面函数时自动附加日期分组属性。
* **验收标准**:输入的抽象树被成功转化为计算引擎可以识别的执行计划对象,且分组属性被正确挂载。
### 里程碑 5系统顶层编排与端到端测试 (Orchestrator)
* **任务指派**:要求 AI 编写一个对外的 `FactorEngine` 类,作为系统的统一入口。
* **执行流编排**:接收研究员的表达式 -> 调用编译器解析依赖 -> 调用路由器连接数据库拉取并组装核心宽表 -> 调用翻译器生成物理执行计划 -> 将计划提交给计算引擎执行并行运算。
* **验收标准**:模拟少量的内存数据作为假数据库,完整跑通一条“从表达式注册,到自动按需取数,最终输出包含因子结果数据表”的全流程链路。
---
## 四、 详细设计规范(新增)
### 4.1 五层架构总览
```
┌─────────────────────────────────────────────────────────────────┐
│ Layer 5: 编排层 (Orchestrator) │
│ - FactorEngine: 统一入口 │
│ - 协调各层工作流 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Layer 4: 物理执行引擎层 (Execution Engine) │
│ - PolarsTranslator: AST → Polars表达式 │
│ - 自动注入分组约束(截面/时序) │
│ - 执行计算并返回结果 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Layer 3: 动态数据路由层 (Data Router) │
│ - MetadataRegistry: 字段→表映射 │
│ - QueryPlanner: 生成最优查询计划 │
│ - DataAligner: PIT对齐与防未来函数处理 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Layer 2: 编译与分析层 (Compiler) │
│ - DependencyExtractor: 提取数据依赖 │
│ - GraphOptimizer: 子表达式合并(预留接口) │
│ - 输出: 数据需求清单 + 优化后的AST │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Layer 1: DSL层 (领域特定语言) │
│ - AST节点: Field, BinaryOp, UnaryOp, FunctionCall, Constant │
│ - 算子库: ts_* (时序), cs_* (截面), math_* (数学) │
│ - 运算符重载: +, -, *, /, >, <, == 等 │
└─────────────────────────────────────────────────────────────────┘
```
---
### 4.2 Layer 1: DSL层详细设计
#### 核心设计原则
- **算子与数据解耦**:算子只描述计算逻辑,不绑定具体数据
- **纯表达式树**输出无状态的AST不涉及任何外部库
- **延迟执行**:表达式构建时不执行计算,只生成树结构
#### AST节点类型体系
```python
# 节点基类
class ASTNode(ABC):
"""AST节点基类"""
@abstractmethod
def accept(self, visitor: "NodeVisitor") -> Any:
"""接受访问者"""
pass
@abstractmethod
def get_children(self) -> List["ASTNode"]:
"""获取子节点列表"""
pass
# 1. 字段节点(叶子节点)
class Field(ASTNode):
"""
字段节点 - 代表底层数据字段
示例: close, volume, pe, pb
"""
name: str # 字段名
dtype: Optional[str] = None # 数据类型提示
# 2. 常量节点(叶子节点)
class Constant(ASTNode):
"""
常量节点 - 代表常量值
示例: 5, 10.5, "20240101"
"""
value: Union[int, float, str]
dtype: str
# 3. 二元操作节点
class BinaryOp(ASTNode):
"""
二元操作节点
支持的运算符: +, -, *, /, //, %, **, >, >=, <, <=, ==, !=, &, |
"""
op: str # '+', '-', '*', '/', '>', etc.
left: ASTNode
right: ASTNode
# 4. 一元操作节点
class UnaryOp(ASTNode):
"""
一元操作节点
支持的运算符: -, +, ~, abs
"""
op: str # '-', '+', '~', 'abs'
operand: ASTNode
# 5. 函数调用节点
class FunctionCall(ASTNode):
"""
函数调用节点 - 代表算子调用
示例: ts_mean(close, 20), cs_rank(pe)
"""
name: str # 函数名
args: List[ASTNode]
kwargs: Dict[str, Any]
func_type: str # "timeseries" | "cross_sectional" | "math"
```
#### 运算符重载规则
在 ASTNode 基类中实现运算符重载:
```python
class ASTNode:
# 算术运算符
def __add__(self, other) -> BinaryOp:
return BinaryOp("+", self, _ensure_node(other))
def __sub__(self, other) -> BinaryOp:
return BinaryOp("-", self, _ensure_node(other))
def __mul__(self, other) -> BinaryOp:
return BinaryOp("*", self, _ensure_node(other))
def __truediv__(self, other) -> BinaryOp:
return BinaryOp("/", self, _ensure_node(other))
# 反向运算符(支持 5 * field
def __radd__(self, other) -> BinaryOp:
return BinaryOp("+", _ensure_node(other), self)
def __rmul__(self, other) -> BinaryOp:
return BinaryOp("*", _ensure_node(other), self)
# 比较运算符
def __gt__(self, other) -> BinaryOp:
return BinaryOp(">", self, _ensure_node(other))
def __lt__(self, other) -> BinaryOp:
return BinaryOp("<", self, _ensure_node(other))
# 一元运算符
def __neg__(self) -> UnaryOp:
return UnaryOp("-", self)
```
#### 算子库规范
算子按功能分为三类:
| 前缀 | 类别 | 说明 | 示例 |
|------|------|------|------|
| `ts_` | 时序算子 | 在时间序列上计算,需按股票分组 | `ts_mean`, `ts_std`, `ts_sum` |
| `cs_` | 截面算子 | 在截面上计算,需按日期分组 | `cs_rank`, `cs_zscore`, `cs_percentile` |
| `math_` | 数学算子 | 逐元素计算,无需分组 | `math_log`, `math_exp`, `math_sqrt` |
**时序算子列表ts_*)**
```python
ts_mean(field, window: int) # 移动平均
ts_std(field, window: int) # 移动标准差
ts_sum(field, window: int) # 移动求和
ts_max(field, window: int) # 移动最大值
ts_min(field, window: int) # 移动最小值
ts_delta(field, period: int = 1) # 差分
ts_pct_change(field, period: int = 1) # 百分比变化
ts_corr(f1, f2, window: int) # 滚动相关系数
```
**截面算子列表cs_*)**
```python
cs_rank(field) # 截面排名0-1
cs_percentile(field) # 截面分位数
cs_zscore(field) # Z-Score标准化
cs_mean(field) # 截面均值
cs_std(field) # 截面标准差
```
**数学算子列表math_*)**
```python
math_log(field) # 自然对数
math_exp(field) # 指数
math_sqrt(field) # 平方根
math_abs(field) # 绝对值
```
#### 表达式构建示例
```python
from src.factors.dsl import Field, ts_mean, cs_rank
# ========== 示例 1: 简单移动平均线因子 ==========
close = Field("close")
ma20 = ts_mean(close, 20)
factor1 = ma20
# ========== 示例 2: 双均线差值因子 ==========
close = Field("close")
ma20 = ts_mean(close, 20)
ma5 = ts_mean(close, 5)
factor2 = (ma20 - ma5) / close
# ========== 示例 3: 复杂多因子组合 ==========
close = Field("close")
volume = Field("volume")
pe = Field("pe")
price_momentum = ts_pct_change(close, 20)
vol_ma = ts_mean(volume, 20)
vol_ratio = volume / vol_ma
pe_rank = cs_rank(pe)
factor3 = price_momentum * 0.4 + vol_ratio * 0.3 + pe_rank * 0.3
```
---
### 4.3 Layer 2: 编译层详细设计
#### 依赖提取器
```python
class DependencyExtractor(NodeVisitor):
"""
依赖提取器 - 遍历AST收集数据依赖
输出: DataRequirement
- fields: Set[str] 需要的字段列表
- min_lookback: Dict[str, int] 每个字段的最小回看天数
"""
def __init__(self):
self.fields: Set[str] = set()
self.field_lookback: Dict[str, int] = defaultdict(int)
def visit_field(self, node: Field) -> None:
"""记录字段依赖"""
self.fields.add(node.name)
self.field_lookback[node.name] = max(
self.field_lookback[node.name], 1
)
def visit_function_call(self, node: FunctionCall) -> None:
"""处理函数调用,提取窗口参数"""
for arg in node.args:
arg.accept(self)
if node.func_type == "timeseries":
window = self._extract_window(node)
self._update_lookback(node.args[0], window)
def extract(self, root: ASTNode) -> DataRequirement:
"""执行提取"""
root.accept(self)
return DataRequirement(
fields=self.fields,
lookback=dict(self.field_lookback)
)
```
#### 数据需求规格
```python
@dataclass
class DataRequirement:
"""
数据需求规格
属性:
fields: 需要的字段集合
lookback: 每个字段需要回看的天数
date_range: 计算日期范围 (start, end)
"""
fields: Set[str]
lookback: Dict[str, int]
date_range: Optional[Tuple[str, str]] = None
def get_max_lookback(self) -> int:
"""获取最大回看天数"""
return max(self.lookback.values()) if self.lookback else 1
```
---
### 4.4 Layer 3: 数据路由层详细设计
#### 元数据注册表
```python
@dataclass
class FieldMetadata:
"""
字段元数据
属性:
name: 字段名
table: 所属表名
dtype: 数据类型
freq: 数据频度 ("daily", "quarterly", "pit")
announce_date_field: 公告日字段名PIT数据使用
"""
name: str
table: str
dtype: str
freq: str
announce_date_field: Optional[str] = None
class MetadataRegistry:
"""
元数据注册表 - 管理字段到表的映射
单例模式,系统启动时加载配置
"""
def register(self, metadata: FieldMetadata) -> None:
"""注册字段元数据"""
pass
def get_table(self, field: str) -> str:
"""获取字段所属表"""
pass
def group_by_table(self, fields: Set[str]) -> Dict[str, Set[str]]:
"""按表分组字段"""
pass
```
#### PIT对齐策略
```python
class DataAligner:
"""
数据对齐器 - 处理多表数据合并与PIT对齐
"""
def align(
self,
dataframes: Dict[str, pl.DataFrame],
plans: List[QueryPlan]
) -> pl.DataFrame:
"""
对齐并合并多个数据表
步骤:
1. 分离日频表和PIT表
2. 日频表直接join
3. PIT表使用asof join
4. 最终排序
"""
pass
def _asof_join(
self,
left: pl.DataFrame,
right: pl.DataFrame,
announce_date_field: str
) -> pl.DataFrame:
"""
执行PIT asof join
策略: 对于每个交易日,使用最新公告的数据
"""
return left.join_asof(
right,
left_on="trade_date",
right_on=announce_date_field,
by="ts_code",
strategy="backward"
)
```
---
### 4.5 Layer 4: 执行引擎层详细设计
#### Polars翻译器
```python
class PolarsTranslator(NodeVisitor):
"""
Polars翻译器 - 将AST翻译为Polars表达式
"""
def __init__(self, df: pl.LazyFrame):
self.df = df
def translate(self, root: ASTNode) -> pl.Expr:
"""翻译AST为Polars表达式"""
return root.accept(self)
def visit_field(self, node: Field) -> pl.Expr:
"""字段 → pl.col()"""
return pl.col(node.name)
def visit_binary_op(self, node: BinaryOp) -> pl.Expr:
"""二元操作 → Polars运算符"""
left = node.left.accept(self)
right = node.right.accept(self)
ops = {
"+": lambda a, b: a + b,
"-": lambda a, b: a - b,
"*": lambda a, b: a * b,
"/": lambda a, b: a / b,
}
return ops[node.op](left, right)
def visit_function_call(self, node: FunctionCall) -> pl.Expr:
"""
函数调用 → Polars窗口函数
关键根据func_type注入分组约束
"""
args = [arg.accept(self) for arg in node.args]
impl = self._get_impl(node.name)
if node.func_type == "timeseries":
return impl(*args).over("ts_code")
elif node.func_type == "cross_sectional":
return impl(*args).over("trade_date")
else:
return impl(*args)
```
#### 分组约束注入规则
```python
# 时序算子:按股票分组,确保滚动窗口不跨股票
def inject_timeseries_constraint(expr: pl.Expr) -> pl.Expr:
return expr.over("ts_code")
# 截面算子:按日期分组,确保排名在每天内部进行
def inject_cross_sectional_constraint(expr: pl.Expr) -> pl.Expr:
return expr.over("trade_date")
```
---
### 4.6 Layer 5: 编排层详细设计
#### FactorEngine
```python
class FactorEngine:
"""
因子执行引擎 - 系统统一入口
"""
def __init__(
self,
data_source: DataSource,
registry: MetadataRegistry
):
self.data_source = data_source
self.registry = registry
self.compiler = Compiler()
self.planner = QueryPlanner(registry)
self.aligner = DataAligner()
def compute(
self,
expression: ASTNode,
start_date: str,
end_date: str,
stock_codes: Optional[List[str]] = None
) -> pl.DataFrame:
"""
计算因子表达式
执行流程:
1. 编译:提取数据依赖
2. 规划:生成查询计划
3. 加载:从数据源获取数据
4. 对齐PIT对齐与合并
5. 翻译AST → Polars表达式
6. 执行:计算并返回结果
"""
# Step 1: 编译
requirement = self.compiler.extract_dependency(expression)
requirement.date_range = (start_date, end_date)
# Step 2: 规划
plans = self.planner.plan(requirement)
# Step 3: 加载
raw_data = {}
for plan in plans:
df = self.data_source.load(...)
raw_data[plan.table] = df
# Step 4: 对齐
aligned_data = self.aligner.align(raw_data, plans)
# Step 5: 翻译
translator = PolarsTranslator(aligned_data.lazy())
polars_expr = translator.translate(expression)
# Step 6: 执行
result = aligned_data.with_columns(
polars_expr.alias("factor_value")
)
return result
```
---
## 五、 实施路线图(详细版)
### 阶段1: 基础架构Layer 1 + Layer 2
**目标**: 实现DSL表达式树和依赖提取
**任务清单**:
- [ ] 实现AST节点类Field, Constant, BinaryOp, UnaryOp, FunctionCall
- [ ] 实现运算符重载
- [ ] 实现基础算子库ts_mean, ts_std, cs_rank等
- [ ] 实现DependencyExtractor
- [ ] 编写单元测试
**验收标准**:
```python
close = Field("close")
factor = ts_mean(close, 20) / close
deps = extract_dependencies(factor)
assert deps.fields == {"close"}
assert deps.lookback == {"close": 20}
```
### 阶段2: 数据层Layer 3
**目标**: 实现元数据管理和PIT对齐
**任务清单**:
- [ ] 实现MetadataRegistry
- [ ] 实现QueryPlanner
- [ ] 实现DataAligner含asof join
- [ ] 集成DuckDB数据源
### 阶段3: 执行层Layer 4
**目标**: 实现Polars翻译和执行
**任务清单**:
- [ ] 实现PolarsTranslator
- [ ] 实现算子到Polars的映射
- [ ] 实现分组约束注入
### 阶段4: 编排层Layer 5
**目标**: 实现FactorEngine统一入口
**任务清单**:
- [ ] 实现FactorEngine
- [ ] 整合各层组件
- [ ] 编写端到端测试
---
## 六、 关键设计决策
### 6.1 为什么使用Visitor模式
- **扩展性**: 新增节点类型只需添加visit方法
- **分离关注点**: 遍历逻辑与处理逻辑分离
- **类型安全**: 每个节点类型有明确的处理函数
### 6.2 为什么算子需要分类ts_/cs_/math_
- **显式分组**: 用户明确知道计算维度
- **约束注入**: 系统根据前缀自动注入正确的分组
- **错误预防**: 避免截面/时序算子混用导致的逻辑错误
### 6.3 向后兼容性
**决策**: 完全重构不保留旧API
**理由**:
- 新旧架构差异过大绑定vs解耦
- 保持旧API会增加维护负担
- 量化策略代码通常是一次性编写,迁移成本可控
---
## 七、 附录
### A. 完整算子列表
**时序算子 (ts_*)**: ts_mean, ts_std, ts_var, ts_sum, ts_max, ts_min, ts_product, ts_median, ts_argmax, ts_argmin, ts_skew, ts_kurt, ts_delta, ts_pct_change, ts_corr, ts_cov, ts_rank
**截面算子 (cs_*)**: cs_rank, cs_percentile, cs_zscore, cs_mean, cs_std, cs_median, cs_max, cs_min
**数学算子 (math_*)**: math_log, math_log1p, math_exp, math_sqrt, math_abs, math_sign, math_power
### B. 元数据配置示例
```python
METADATA = [
{"name": "close", "table": "daily", "dtype": "float64", "freq": "daily"},
{"name": "volume", "table": "daily", "dtype": "float64", "freq": "daily"},
{"name": "pe", "table": "daily", "dtype": "float64", "freq": "daily"},
{"name": "eps", "table": "financial_income", "dtype": "float64",
"freq": "pit", "announce_date_field": "ann_date"},
]
```
### C. 与现有代码对比
| 维度 | 现有实现 | 新设计 |
|------|---------|--------|
| 因子定义 | 类继承 | 表达式 |
| 数据绑定 | data_specs硬编码 | 元数据注册表 |
| 组合方式 | CompositeFactor包装 | AST节点自然组合 |
| 执行时机 | 立即执行 | 延迟执行 |
| 防泄露 | 手动控制 | 自动注入分组约束 |
| 可优化性 | 低 | 高 |
---
**文档版本**: 2.0 | **更新日期**: 2026-02-26

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,211 +0,0 @@
# ProStock HDF5 到 DuckDB 迁移测试报告
**报告生成时间**: 2026-02-22
**完成时间**: 2026-02-22
**状态**: ✅ 已完成
**迁移文档**: [hdf5_to_duckdb_migration.md](./hdf5_to_duckdb_migration.md)
**测试数据范围**: 2024年1月-3月3个月
---
## 1. 迁移实施摘要
### 已完成的核心任务 ✅
| 任务 | 文件 | 状态 |
|------|------|------|
| Storage 类重写 | `src/data/storage.py` | ✅ 完成 |
| ThreadSafeStorage 实现 | `src/data/storage.py` | ✅ 完成 |
| Sync 模块适配 | `src/data/sync.py` | ✅ 完成 |
| DataLoader 适配 | `src/factors/data_loader.py` | ✅ 完成 |
| 测试文件更新 | `tests/` | ✅ 完成 |
### 架构变更
```
HDF5 格式 (.h5 文件) → DuckDB (prostock.db)
├── pandas.read_hdf() → duckdb.execute().fetchdf()
├── 全表加载到内存 → SQL 查询下推,按需加载
├── 文件锁并发 → ThreadSafeStorage 队列写入
└── Polars 通过 Pandas 中转 → DuckDB → PyArrow → Polars (零拷贝)
```
---
## 2. 测试执行情况
### 2.1 测试文件清单
| 测试文件 | 测试类型 | 数据范围 |
|---------|---------|---------|
| `test_daily_storage.py` | DuckDB Storage 集成测试 | 3个月2024/01-03 |
| `test_data_loader.py` | DataLoader 功能测试 | 3个月2024/01-03 |
| `test_sync.py` | Sync 模块单元测试 | Mock 数据 |
### 2.2 关键测试用例
#### DuckDB Storage 测试 (`test_daily_storage.py`)
```python
class TestDailyStorageValidation:
TEST_START_DATE = "20240101"
TEST_END_DATE = "20240331" # 3个月数据
def test_duckdb_connection() # ✅ 连接测试
def test_load_3months_data() # ⚠️ 需要先有数据
def test_polars_export() # ✅ PyArrow 零拷贝导出
def test_all_stocks_saved() # ⚠️ 需要先有数据
```
#### DataLoader 测试 (`test_data_loader.py`)
```python
class TestDataLoaderBasic:
def test_load_single_source() # 从 DuckDB 加载
def test_load_with_date_range() # 3个月日期范围
def test_column_selection() # 列选择
def test_cache_used() # 缓存性能
```
---
## 3. 性能对比预期
| 测试项 | HDF5 (旧) | DuckDB (新) | 预期提升 |
|--------|----------|------------|---------|
| 单股票查询 | 5-10s | 0.1-0.5s | **10-100x** |
| 日期范围查询 | 5-10s | 0.2-1s | **5-50x** |
| 内存占用 | 1GB+ | 100-500MB | **50-90%** |
---
## 4. 使用前准备
### 4.1 数据同步(必须)
当前数据库中没有 2024年1-3月的测试数据需要先进行数据同步
```bash
# 方式1: 同步特定股票代码的3个月数据推荐用于测试
uv run python -c "
from src.data.sync import DataSync
from src.data.api_wrappers import get_daily
import pandas as pd
# 获取测试股票数据
data = get_daily('000001.SZ', start_date='20240101', end_date='20240331')
# 保存到 DuckDB
from src.data.storage import Storage
storage = Storage()
storage.save('daily', data)
print(f'已保存 {len(data)} 行数据')
"
# 方式2: 全量同步所有股票(耗时较长)
uv run python -c "from src.data.sync import sync_all; sync_all(force_full=True)"
# 方式3: 增量同步(从上次同步日期继续)
uv run python -c "from src.data.sync import sync_all; sync_all()"
```
### 4.2 验证安装
```bash
# 检查 DuckDB 和 PyArrow 是否安装
uv run python -c "import duckdb; import pyarrow; print('✅ 依赖检查通过')"
# 验证 Storage 类
uv run python -c "from src.data.storage import Storage, ThreadSafeStorage; print('✅ Storage 类导入成功')"
```
---
## 5. 运行测试
### 5.1 运行所有测试
```bash
# 运行 DuckDB 相关测试
uv run pytest tests/test_daily_storage.py tests/factors/test_data_loader.py -v
# 运行 Sync 模块测试
uv run pytest tests/test_sync.py -v
# 运行全部测试
uv run pytest tests/ -v
```
### 5.2 预期输出
```
tests/test_daily_storage.py::TestDailyStorageValidation::test_duckdb_connection PASSED
tests/test_daily_storage.py::TestDailyStorageValidation::test_polars_export PASSED
tests/factors/test_data_loader.py::TestDataLoaderBasic::test_load_single_source PASSED
tests/factors/test_data_loader.py::TestDataLoaderBasic::test_load_with_date_range PASSED
...
```
---
## 6. 常见问题 (FAQ)
### Q: 测试提示 "No data found for period"
**A**: 需要先执行数据同步,将 2024年1-3月的数据写入 DuckDB。
### Q: ModuleNotFoundError: No module named 'pyarrow'
**A**: 需要安装 pyarrow
```bash
uv pip install pyarrow
```
### Q: 如何查看数据库中的数据?
**A**:
```python
from src.data.storage import Storage
storage = Storage()
# 检查表是否存在
print(storage.exists("daily")) # True/False
# 查询最新日期
print(storage.get_last_date("daily")) # "20240331"
```
### Q: 如何备份 DuckDB 数据库?
**A**:
```bash
# 备份
cp data/prostock.db data/prostock_backup.db
# 恢复
cp data/prostock_backup.db data/prostock.db
```
---
## 7. 迁移验证清单
- [x] Storage 类实现 DuckDB 存储
- [x] ThreadSafeStorage 实现并发安全
- [x] DataLoader 适配 DuckDB
- [x] Sync 模块使用 ThreadSafeStorage
- [x] 测试文件更新为 3 个月数据范围
- [x] PyArrow 零拷贝导出支持
- [ ] 执行数据同步(需手动运行)
- [ ] 运行全部测试通过(需先有数据)
- [ ] 性能基准测试对比
---
## 8. 下一步行动
1. **数据同步**: 运行上述 4.1 节的数据同步命令
2. **测试验证**: 运行 `uv run pytest tests/ -v` 确认所有测试通过
3. **性能测试**: 使用 `scripts/benchmark_storage.py` 对比 HDF5 vs DuckDB 性能
4. **生产部署**: 备份 HDF5 文件,删除旧数据,完全切换到 DuckDB
---
**报告生成**: ProStock Migration Tool
**状态**: 核心代码完成,等待数据同步后运行测试