Compare commits

...

3 Commits

Author SHA1 Message Date
0e9ea5d533 refactor(experiment): 提取共用配置到 common 模块
- 将因子定义、日期配置、股票池筛选等提取到 common.py
- 重构 learn_to_rank 和 regression 脚本,统一使用公共配置
- 简化代码结构,消除重复定义
2026-03-15 05:46:19 +08:00
6927d20de1 feat(training): LightGBM支持验证集早停
- 为fit方法添加eval_set参数,支持验证集评估和早停

- 因子引擎简化初始化,移除metadata_path参数

- 回归实验精简因子定义,移除冗余因子库
2026-03-14 22:51:24 +08:00
5541373ded feat(probe-selection): 添加探针法因子筛选模块 2026-03-14 22:50:32 +08:00
16 changed files with 2637 additions and 1052 deletions

278
src/experiment/common.py Normal file
View File

@@ -0,0 +1,278 @@
"""实验脚本的共用配置和辅助函数。
此模块包含 regression.py 和 learn_to_rank.py 共用的代码,
避免重复维护两份相同的配置和函数。
"""
from datetime import datetime
from typing import List
import polars as pl
from src.factors import FactorEngine
# =============================================================================
# 日期范围配置(正确的 train/val/test 三分法)
# =============================================================================
# Train / validation / test split boundaries (YYYYMMDD strings, inclusive).
# A proper three-way split: the validation year is used for early stopping /
# tuning only; the test window stays untouched by training.
TRAIN_START = "20200101"
TRAIN_END = "20231231"
VAL_START = "20240101"
VAL_END = "20241231"
TEST_START = "20250101"
TEST_END = "20261231"
# =============================================================================
# 因子配置
# =============================================================================
# Currently selected factor list (the subset of the factor library actually
# used by the experiment scripts; these names must exist in factor metadata).
SELECTED_FACTORS = [
    # ================= 1. Price, trend and path dependence =================
    "ma_5",
    "ma_20",
    "ma_ratio_5_20",
    "bias_10",
    "high_low_ratio",
    "bbi_ratio",
    "return_5",
    "return_20",
    "kaufman_ER_20",
    "mom_acceleration_10_20",
    "drawdown_from_high_60",
    "up_days_ratio_20",
    # ================= 2. Volatility, risk adjustment and higher moments =================
    "volatility_5",
    "volatility_20",
    "volatility_ratio",
    "std_return_20",
    "sharpe_ratio_20",
    "min_ret_20",
    "volatility_squeeze_5_60",
    # ================= 3. Intraday microstructure and anomalies =================
    "overnight_intraday_diff",
    "upper_shadow_ratio",
    "capital_retention_20",
    "max_ret_20",
    # ================= 4. Volume, liquidity and price-volume divergence =================
    "volume_ratio_5_20",
    "turnover_rate_mean_5",
    "turnover_deviation",
    "amihud_illiq_20",
    "turnover_cv_20",
    "pv_corr_20",
    "close_vwap_deviation",
    # ================= 5. Fundamental financial characteristics =================
    "roe",
    "roa",
    "profit_margin",
    "debt_to_equity",
    "current_ratio",
    "net_profit_yoy",
    "revenue_yoy",
    "healthy_expansion_velocity",
    # ================= 6. Fundamental valuation and cross-sectional momentum =================
    "EP",
    "BP",
    "CP",
    "market_cap_rank",
    "turnover_rank",
    "return_5_rank",
    "EP_rank",
    "pe_expansion_trend",
    "value_price_divergence",
    "active_market_cap",
    "ebit_rank",
]
# Expression-defined factor dictionary: holds factors not yet registered in
# metadata, mapped as {factor_name: DSL expression}. Currently empty.
FACTOR_DEFINITIONS = {}
def get_label_factor(label_name: str) -> dict:
    """Build the label-factor definition mapping.

    The label is the 5-day forward return measured from the next day's open
    to the close five days ahead.

    Args:
        label_name: Name under which the label factor is registered.

    Returns:
        A single-entry dict mapping ``label_name`` to the label DSL expression.
    """
    label_expression = "(ts_delay(close, -5) / ts_delay(open, -1)) - 1"
    return {label_name: label_expression}
# =============================================================================
# 辅助函数
# =============================================================================
def register_factors(
    engine: FactorEngine,
    selected_factors: List[str],
    factor_definitions: dict,
    label_factor: dict,
) -> List[str]:
    """Register feature and label factors on the engine.

    Names in ``selected_factors`` are resolved from factor metadata, while
    entries of ``factor_definitions`` and ``label_factor`` are registered via
    their DSL expressions.

    Args:
        engine: FactorEngine instance.
        selected_factors: Factor names already present in metadata.
        factor_definitions: Mapping of factor name -> DSL expression.
        label_factor: Single-entry mapping of label name -> DSL expression.

    Returns:
        Feature column names (selected + expression-defined; label excluded).
    """
    banner = "=" * 80
    print(banner)
    print("注册因子")
    print(banner)
    # Metadata-backed factors: registered by name only.
    print("\n注册特征因子(从 metadata:")
    for factor_name in selected_factors:
        engine.add_factor(factor_name)
        print(f" - {factor_name}")
    # Expression-backed factors: not yet present in metadata.
    print("\n注册特征因子(表达式):")
    for factor_name, expression in factor_definitions.items():
        engine.add_factor(factor_name, expression)
        print(f" - {factor_name}: {expression}")
    # The label factor is always expression-backed.
    print("\n注册 Label 因子(表达式):")
    for factor_name, expression in label_factor.items():
        engine.add_factor(factor_name, expression)
        print(f" - {factor_name}: {expression}")
    # Feature columns = metadata factors followed by expression factors.
    feature_cols = [*selected_factors, *factor_definitions]
    print(f"\n特征因子数: {len(feature_cols)}")
    print(f" - 来自 metadata: {len(selected_factors)}")
    print(f" - 来自表达式: {len(factor_definitions)}")
    print(f"Label: {list(label_factor.keys())[0]}")
    print(f"已注册因子总数: {len(engine.list_registered())}")
    return feature_cols
def prepare_data(
    engine: FactorEngine,
    feature_cols: List[str],
    start_date: str,
    end_date: str,
    label_name: str,
) -> pl.DataFrame:
    """Compute factors over a date range and return features plus the label.

    Args:
        engine: FactorEngine instance.
        feature_cols: Feature column names to compute.
        start_date: Start date (YYYYMMDD).
        end_date: End date (YYYYMMDD).
        label_name: Name of the label column to include.

    Returns:
        DataFrame holding the computed factor values for the whole market.
    """
    banner = "=" * 80
    print("\n" + banner)
    print("准备数据")
    print(banner)
    print(f"\n计算因子: {start_date} - {end_date}")
    # The label column is computed alongside the features in one pass.
    requested_factors = [*feature_cols, label_name]
    result = engine.compute(
        factor_names=requested_factors,
        start_date=start_date,
        end_date=end_date,
    )
    print(f"数据形状: {result.shape}")
    print(f"数据列: {result.columns}")
    print(f"\n前5行预览:")
    print(result.head())
    return result
# =============================================================================
# 股票池筛选配置
# =============================================================================
def stock_pool_filter(df: pl.DataFrame) -> pl.Series:
    """Select the stock universe for a single trading day.

    Rules:
      1. Exclude ChiNext (codes starting with 30).
      2. Exclude STAR Market (codes starting with 68).
      3. Exclude Beijing Stock Exchange (codes starting with 8, 9 or 4).
      4. Keep the 500 smallest remaining stocks by total market cap.

    Args:
        df: Single-day frame; must contain ``ts_code`` and ``total_mv``.

    Returns:
        Boolean Series marking the selected rows of ``df``.
    """
    codes = df["ts_code"]
    # Board exclusion: a row survives only if it matches none of the prefixes.
    keep_mask = None
    for prefix in ("30", "68", "8", "9", "4"):
        condition = ~codes.str.starts_with(prefix)
        keep_mask = condition if keep_mask is None else (keep_mask & condition)
    eligible = df.filter(keep_mask)
    # Small-cap cut: at most 500 names, fewer if the day has fewer stocks.
    pool_size = min(500, len(eligible))
    chosen_codes = eligible.sort("total_mv").head(pool_size)["ts_code"]
    # Map the chosen codes back onto the full frame as a boolean mask.
    return codes.is_in(chosen_codes)
# Extra base columns the stock-pool filter needs beyond the feature columns.
STOCK_FILTER_REQUIRED_COLUMNS = ["total_mv"]
# =============================================================================
# Output configuration
# =============================================================================
OUTPUT_DIR = "output"
SAVE_PREDICTIONS = True
PERSIST_MODEL = False
# Top-N configuration: number of stocks recommended per day.
TOP_N = 5  # adjustable, e.g. 10 or 20
def get_output_path(model_type: str, test_start: str, test_end: str) -> str:
    """Build the output CSV path for a prediction run.

    Bug fix: ``date_str`` was previously computed but never used, so every
    run produced the same ``{model_type}_output.csv`` and overwrote earlier
    results. The test window is now embedded in the filename.

    Args:
        model_type: Model type ("regression" or "rank").
        test_start: Test window start date (YYYYMMDD).
        test_end: Test window end date (YYYYMMDD).

    Returns:
        Path under OUTPUT_DIR, e.g. ``output/rank_20250101_20251231_output.csv``.

    Raises:
        ValueError: If either date is not a valid YYYYMMDD string.
    """
    import os

    # Ensure the output directory exists.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    # Round-trip through strptime/strftime validates and normalizes the dates.
    start_dt = datetime.strptime(test_start, "%Y%m%d")
    end_dt = datetime.strptime(test_end, "%Y%m%d")
    date_str = f"{start_dt.strftime('%Y%m%d')}_{end_dt.strftime('%Y%m%d')}"
    filename = f"{model_type}_{date_str}_output.csv"
    return os.path.join(OUTPUT_DIR, filename)

File diff suppressed because one or more lines are too long

View File

@@ -1,4 +1,4 @@
#%% md
# %% md
# # Learn-to-Rank 排序学习训练流程
# #
# 本 Notebook 实现基于 LightGBM LambdaRank 的排序学习训练,用于股票排序任务。
@@ -9,9 +9,9 @@
# 2. **排序学习**: 使用 LambdaRank 目标函数,学习每日股票排序
# 3. **NDCG 评估**: 使用 NDCG@1/5/10/20 评估排序质量
# 4. **策略回测**: 基于排序分数构建 Top-k 选股策略
#%% md
# %% md
# ## 1. 导入依赖
#%%
# %%
import os
from datetime import datetime
from typing import List, Tuple, Optional
@@ -36,78 +36,32 @@ from src.training import (
from src.training.components.models import LightGBMLambdaRankModel
from src.training.config import TrainingConfig
#%% md
# ## 2. 辅助函数
#%%
def register_factors(
engine: FactorEngine,
selected_factors: List[str],
factor_definitions: dict,
label_factor: dict,
) -> List[str]:
"""注册因子selected_factors 从 metadata 查询factor_definitions 用 DSL 表达式注册)"""
print("=" * 80)
print("注册因子")
print("=" * 80)
# 注册 SELECTED_FACTORS 中的因子(已在 metadata 中)
print("\n注册特征因子(从 metadata:")
for name in selected_factors:
engine.add_factor(name)
print(f" - {name}")
# 注册 FACTOR_DEFINITIONS 中的因子(通过表达式,尚未在 metadata 中)
print("\n注册特征因子(表达式):")
for name, expr in factor_definitions.items():
engine.add_factor(name, expr)
print(f" - {name}: {expr}")
# 注册 label 因子(通过表达式)
print("\n注册 Label 因子(表达式):")
for name, expr in label_factor.items():
engine.add_factor(name, expr)
print(f" - {name}: {expr}")
# 特征列 = SELECTED_FACTORS + FACTOR_DEFINITIONS 的 keys
feature_cols = selected_factors + list(factor_definitions.keys())
print(f"\n特征因子数: {len(feature_cols)}")
print(f" - 来自 metadata: {len(selected_factors)}")
print(f" - 来自表达式: {len(factor_definitions)}")
print(f"Label: {list(label_factor.keys())[0]}")
print(f"已注册因子总数: {len(engine.list_registered())}")
return feature_cols
# 从 common 模块导入共用配置和函数
from src.experiment.common import (
SELECTED_FACTORS,
FACTOR_DEFINITIONS,
get_label_factor,
register_factors,
prepare_data,
TRAIN_START,
TRAIN_END,
VAL_START,
VAL_END,
TEST_START,
TEST_END,
stock_pool_filter,
STOCK_FILTER_REQUIRED_COLUMNS,
OUTPUT_DIR,
SAVE_PREDICTIONS,
PERSIST_MODEL,
TOP_N,
)
def prepare_data(
engine: FactorEngine,
feature_cols: List[str],
start_date: str,
end_date: str,
) -> pl.DataFrame:
"""准备数据"""
print("\n" + "=" * 80)
print("准备数据")
print("=" * 80)
# 计算因子(全市场数据)
print(f"\n计算因子: {start_date} - {end_date}")
factor_names = feature_cols + [LABEL_NAME] # 包含 label
data = engine.compute(
factor_names=factor_names,
start_date=start_date,
end_date=end_date,
)
print(f"数据形状: {data.shape}")
print(f"数据列: {data.columns}")
print(f"\n前5行预览:")
print(data.head())
return data
# %% md
# ## 2. 本地辅助函数
# %%
# 注意register_factors 和 prepare_data 已从 common 模块导入
def prepare_ranking_data(
@@ -240,92 +194,22 @@ def evaluate_ndcg_at_k(
return results
#%% md
# %% md
# ## 3. 配置参数
# #
# ### 3.1 因子定义
#%%
# 特征因子定义字典(复用 regression.ipynb 的因子定义)
LABEL_NAME = "future_return_5_rank"
# ### 3.1 因子与日期配置
# %%
# 注意SELECTED_FACTORS, FACTOR_DEFINITIONS, 日期配置等已从 common 模块导入
# 本脚本特有的配置:
# 当前选择的因子列表(从 FACTOR_DEFINITIONS 中选择要使用的因子
SELECTED_FACTORS = [
# ================= 1. 价格、趋势与路径依赖 =================
"ma_5",
"ma_20",
"ma_ratio_5_20",
"bias_10",
"high_low_ratio",
"bbi_ratio",
"return_5",
"return_20",
"kaufman_ER_20",
"mom_acceleration_10_20",
"drawdown_from_high_60",
"up_days_ratio_20",
# ================= 2. 波动率、风险调整与高阶矩 =================
"volatility_5",
"volatility_20",
"volatility_ratio",
"std_return_20",
"sharpe_ratio_20",
"min_ret_20",
"volatility_squeeze_5_60",
# ================= 3. 日内微观结构与异象 =================
"overnight_intraday_diff",
"upper_shadow_ratio",
"capital_retention_20",
"max_ret_20",
# ================= 4. 量能、流动性与量价背离 =================
"volume_ratio_5_20",
"turnover_rate_mean_5",
"turnover_deviation",
"amihud_illiq_20",
"turnover_cv_20",
"pv_corr_20",
"close_vwap_deviation",
# ================= 5. 基本面财务特征 =================
"roe",
"roa",
"profit_margin",
"debt_to_equity",
"current_ratio",
"net_profit_yoy",
"revenue_yoy",
"healthy_expansion_velocity",
"ebit_rank",
# ================= 6. 基本面估值与截面动量共振 =================
"EP",
"BP",
"CP",
"market_cap_rank",
"turnover_rank",
"return_5_rank",
"EP_rank",
"pe_expansion_trend",
"value_price_divergence",
"active_market_cap",
]
# Label 名称(排序学习使用原始收益率,会后续转换为分位数标签
LABEL_NAME = "future_return_5"
# 因子定义字典(完整因子库)
FACTOR_DEFINITIONS = {
# "turnover_rate_volatility": "ts_std(log(turnover_rate), 20)"
}
# 获取 Label 因子定义
LABEL_FACTOR = get_label_factor(LABEL_NAME)
# Label 因子定义(不参与训练,用于计算目标)
LABEL_FACTOR = {
LABEL_NAME: "(ts_delay(close, -5) / ts_delay(open, -1)) - 1",
}
#%% md
# ### 3.2 训练参数配置
#%%
# 日期范围配置(正确的 train/val/test 三分法)
TRAIN_START = "20200101"
TRAIN_END = "20231231"
VAL_START = "20240101"
VAL_END = "20241231"
TEST_START = "20250101"
TEST_END = "20251231"
# 分位数配置
N_QUANTILES = 20 # 将 label 分为 20 组
# 分位数配置
@@ -352,44 +236,11 @@ MODEL_PARAMS = {
"label_gain": [i for i in range(1, N_QUANTILES + 1)],
}
# 股票池筛选函数
def stock_pool_filter(df: pl.DataFrame) -> pl.Series:
"""股票池筛选函数(单日数据)
筛选条件:
1. 排除创业板(代码以 300 开头)
2. 排除科创板(代码以 688 开头)
3. 排除北交所(代码以 8、9 或 4 开头)
4. 选取当日市值最小的500只股票
"""
code_filter = (
~df["ts_code"].str.starts_with("30")
& ~df["ts_code"].str.starts_with("68")
& ~df["ts_code"].str.starts_with("8")
& ~df["ts_code"].str.starts_with("9")
& ~df["ts_code"].str.starts_with("4")
)
valid_df = df.filter(code_filter)
n = min(500, len(valid_df))
small_cap_codes = valid_df.sort("total_mv").head(n)["ts_code"]
return df["ts_code"].is_in(small_cap_codes)
STOCK_FILTER_REQUIRED_COLUMNS = ["total_mv"]
# 输出配置
OUTPUT_DIR = "output"
SAVE_PREDICTIONS = True
PERSIST_MODEL = False
# Top N 配置:每日推荐股票数量
TOP_N = 5 # 可调整为 10, 20 等
#%% md
# 注意stock_pool_filter, STOCK_FILTER_REQUIRED_COLUMNS, OUTPUT_DIR 等配置
# 已从 common 模块导入
# %% md
# ## 4. 训练流程
#%%
# %%
print("\n" + "=" * 80)
print("LightGBM LambdaRank 排序学习训练")
print("=" * 80)
@@ -411,6 +262,7 @@ data = prepare_data(
feature_cols=feature_cols,
start_date=TRAIN_START,
end_date=TEST_END,
label_name=LABEL_NAME,
)
# 4. 转换为排序学习格式(分位数标签)
@@ -469,9 +321,9 @@ trainer = Trainer(
feature_cols=feature_cols,
persist_model=PERSIST_MODEL,
)
#%% md
# %% md
# ### 4.1 股票池筛选
#%%
# %%
print("\n" + "=" * 80)
print("股票池筛选")
print("=" * 80)
@@ -493,9 +345,9 @@ if pool_manager:
else:
filtered_data = data
print(" 未配置股票池管理器,跳过筛选")
#%% md
# %% md
# ### 4.2 数据划分
#%%
# %%
print("\n" + "=" * 80)
print("数据划分")
print("=" * 80)
@@ -519,9 +371,9 @@ if splitter:
print(f"测试集日均样本数: {np.mean(test_group):.1f}")
else:
raise ValueError("必须配置数据划分器")
#%% md
# %% md
# ### 4.3 数据质量检查
#%%
# %%
print("\n" + "=" * 80)
print("数据质量检查(必须在预处理之前)")
print("=" * 80)
@@ -537,9 +389,9 @@ check_data_quality(test_data, feature_cols, raise_on_error=True)
print("[成功] 数据质量检查通过,未发现异常")
#%% md
# %% md
# ### 4.4 数据预处理
#%%
# %%
print("\n" + "=" * 80)
print("数据预处理")
print("=" * 80)
@@ -563,9 +415,9 @@ if processors:
print(f"\n处理后训练集形状: {train_data.shape}")
print(f"处理后验证集形状: {val_data.shape}")
print(f"处理后测试集形状: {test_data.shape}")
#%% md
# %% md
# ### 4.4 训练 LambdaRank 模型
#%%
# %%
print("\n" + "=" * 80)
print("训练 LambdaRank 模型")
print("=" * 80)
@@ -593,9 +445,9 @@ model.fit(
eval_set=(X_val, y_val, val_group),
)
print("训练完成!")
#%% md
# %% md
# ### 4.5 训练指标曲线
#%%
# %%
print("\n" + "=" * 80)
print("训练指标曲线")
print("=" * 80)
@@ -645,9 +497,9 @@ else:
best_val = max(val_metric_list)
print(f" {metric}: {best_val:.4f} (迭代 {best_iter_metric + 1})")
print(f"\n[重要提醒] 验证集仅用于早停/调参,测试集完全独立于训练过程!")
#%% md
# %% md
# ### 4.6 模型评估
#%%
# %%
print("\n" + "=" * 80)
print("模型评估")
print("=" * 80)
@@ -685,7 +537,7 @@ if importance is not None:
top_features = importance.sort_values(ascending=False).head(20)
for i, (feature, score) in enumerate(top_features.items(), 1):
print(f" {i:2d}. {feature:30s} {score:10.2f}")
#%%
# %%
# 确保输出目录存在
os.makedirs(OUTPUT_DIR, exist_ok=True)
@@ -731,7 +583,7 @@ print(f"\n 预览前15行:")
print(topn_to_save.head(15))
print("\n训练流程完成!")
#%% md
# %% md
# ## 5. 总结
# #
# 本 Notebook 实现了完整的 Learn-to-Rank 训练流程:

View File

@@ -0,0 +1,47 @@
"""增强探针法因子筛选 (Probe Feature Selection)
基于噪音探针的统计显著性特征选择方法。
核心组件:
- ProbeSelector: 主选择器,协调整个筛选流程
- NoiseGenerator: 噪音生成器Polars 零拷贝注入
- ProbeTrainer: 多任务训练器,支持验证集早停
- ImportanceEvaluator: 重要性评估器,强制 Gain
- LightGBMClassifier: 分类模型
使用示例:
>>> from src.experiment.probe_selection import ProbeSelector
>>>
>>> selector = ProbeSelector(
... n_iterations=3,
... n_noise_features=5,
... validation_ratio=0.15,
... )
>>>
>>> selected_features = selector.select(
... data=train_data,
... feature_cols=all_features,
... target_col_regression="future_return_5",
... date_col="trade_date",
... )
"""
from src.experiment.probe_selection.importance_evaluator import ImportanceEvaluator
from src.experiment.probe_selection.lightgbm_classifier import LightGBMClassifier
from src.experiment.probe_selection.noise_generator import NoiseGenerator
from src.experiment.probe_selection.probe_selector import ProbeSelector
from src.experiment.probe_selection.probe_trainer import (
ProbeTrainer,
create_classification_target,
split_validation_by_date,
)
__all__ = [
"ProbeSelector",
"NoiseGenerator",
"ProbeTrainer",
"ImportanceEvaluator",
"LightGBMClassifier",
"create_classification_target",
"split_validation_by_date",
]

View File

@@ -0,0 +1,188 @@
"""重要性评估器
评估特征重要性相对于噪音的统计显著性,执行交叉淘汰。
"""
from typing import Dict, List, Optional, Tuple
class ImportanceEvaluator:
    """Importance evaluator.

    Computes a per-model "pass line" (the maximum importance among injected
    noise probes) and performs cross elimination: a candidate feature is
    dropped only when it falls below the pass line in BOTH the regression
    and the classification model. Gain importance must be used upstream,
    since split-count importance can be fooled by noise.
    """

    def __init__(self, noise_prefix: str = "__noise__"):
        """Initialize the evaluator.

        Args:
            noise_prefix: Column-name prefix identifying noise probes.
        """
        self.noise_prefix = noise_prefix
        # Pass lines; populated by evaluate(), None before the first call.
        self.regression_threshold: Optional[float] = None
        self.classification_threshold: Optional[float] = None
        # Summary of the most recent evaluate() call.
        self.elimination_stats: dict = {}

    def evaluate(
        self,
        regression_importance: Dict[str, float],
        classification_importance: Dict[str, float],
        candidate_features: List[str],
    ) -> List[str]:
        """Run importance evaluation and cross elimination.

        Args:
            regression_importance: Regression importances {feature: importance}.
            classification_importance: Classification importances
                {feature: importance}.
            candidate_features: Candidate feature names (noise excluded).

        Returns:
            The list of surviving feature names.
        """
        # Pass line = max importance among noise probes, per model.
        self.regression_threshold = self._calculate_threshold(regression_importance)
        self.classification_threshold = self._calculate_threshold(
            classification_importance
        )
        # Cross elimination: drop only if below BOTH pass lines.
        eliminated = []
        survived = []
        for feature in candidate_features:
            reg_imp = regression_importance.get(feature, 0.0)
            cls_imp = classification_importance.get(feature, 0.0)
            if (
                reg_imp < self.regression_threshold
                and cls_imp < self.classification_threshold
            ):
                eliminated.append(
                    {
                        "feature": feature,
                        "regression_importance": reg_imp,
                        "classification_importance": cls_imp,
                        "regression_threshold": self.regression_threshold,
                        "classification_threshold": self.classification_threshold,
                    }
                )
            else:
                survived.append(feature)
        self.elimination_stats = {
            "total_candidates": len(candidate_features),
            "eliminated_count": len(eliminated),
            "survived_count": len(survived),
            "regression_threshold": self.regression_threshold,
            "classification_threshold": self.classification_threshold,
            "eliminated_features": eliminated,
        }
        return survived

    def _calculate_threshold(self, importance: Dict[str, float]) -> float:
        """Compute the noise pass line for one model.

        The pass line is the maximum importance over all noise-probe columns.

        Args:
            importance: Feature importance mapping (may include noise columns).

        Returns:
            The pass-line value; 0.0 when no noise columns are present
            (should not happen in normal operation).
        """
        noise_importance = [
            imp
            for feat, imp in importance.items()
            if feat.startswith(self.noise_prefix)
        ]
        if not noise_importance:
            return 0.0
        return max(noise_importance)

    def get_thresholds(self) -> Tuple[Optional[float], Optional[float]]:
        """Return the (regression, classification) pass lines.

        Both are None until evaluate() has been called.
        """
        return self.regression_threshold, self.classification_threshold

    def get_elimination_stats(self) -> dict:
        """Return the elimination statistics of the last evaluate() call."""
        return self.elimination_stats

    def get_feature_comparison(
        self,
        regression_importance: Dict[str, float],
        classification_importance: Dict[str, float],
        candidate_features: List[str],
    ) -> List[Dict]:
        """Return a detailed importance comparison for every candidate.

        Must be called after evaluate() so the pass lines are available.

        Args:
            regression_importance: Regression importances.
            classification_importance: Classification importances.
            candidate_features: Candidate feature names.

        Returns:
            A list of dicts sorted by total importance (descending), each
            containing the feature name, both importances, both pass lines,
            the elimination verdict and a human-readable reason.

        Raises:
            RuntimeError: If evaluate() has not been called yet.
        """
        # Bug fix: previously this crashed with a TypeError (comparison
        # against None) when called before evaluate(). Fail explicitly.
        if self.regression_threshold is None or self.classification_threshold is None:
            raise RuntimeError(
                "get_feature_comparison() requires evaluate() to be called first"
            )
        comparison = []
        for feature in candidate_features:
            reg_imp = regression_importance.get(feature, 0.0)
            cls_imp = classification_importance.get(feature, 0.0)
            # Same cross-elimination rule as evaluate().
            is_eliminated = (
                reg_imp < self.regression_threshold
                and cls_imp < self.classification_threshold
            )
            # A reason is recorded for every failed pass line, even when the
            # feature survives via the other model.
            reasons = []
            if reg_imp < self.regression_threshold:
                reasons.append(
                    f"回归重要性({reg_imp:.6f})低于及格线({self.regression_threshold:.6f})"
                )
            if cls_imp < self.classification_threshold:
                reasons.append(
                    f"分类重要性({cls_imp:.6f})低于及格线({self.classification_threshold:.6f})"
                )
            comparison.append(
                {
                    "feature": feature,
                    "regression_importance": reg_imp,
                    "classification_importance": cls_imp,
                    "regression_threshold": self.regression_threshold,
                    "classification_threshold": self.classification_threshold,
                    "is_eliminated": is_eliminated,
                    "elimination_reason": "; ".join(reasons) if reasons else "通过筛选",
                }
            )
        # Sort by combined importance, most important first.
        comparison.sort(
            key=lambda x: x["regression_importance"] + x["classification_importance"],
            reverse=True,
        )
        return comparison

View File

@@ -0,0 +1,222 @@
"""LightGBM 分类模型
用于探针法中的分类任务训练。
"""
from typing import Any, Optional
import numpy as np
import pandas as pd
import polars as pl
from src.training.components.base import BaseModel
from src.training.registry import register_model
@register_model("lightgbm_classifier")
class LightGBMClassifier(BaseModel):
    """Gradient-boosted classification trees backed by LightGBM.

    Supports custom parameters, feature-importance extraction and saving in
    LightGBM's native model format.

    Attributes:
        name: Model identifier, "lightgbm_classifier".
        params: Parameter dict passed through to lgb.train().
        model: Trained LightGBM Booster (None before fit()).
        feature_names_: Feature names captured at fit() time.
    """

    name = "lightgbm_classifier"

    def __init__(self, params: Optional[dict] = None):
        """Create the model.

        Args:
            params: LightGBM parameters forwarded to lgb.train(); may also
                carry training-control keys such as "n_estimators" and
                "early_stopping_round".

        Examples:
            >>> model = LightGBMClassifier(params={
            ...     "objective": "binary",
            ...     "metric": "auc",
            ...     "num_leaves": 31,
            ...     "learning_rate": 0.05,
            ...     "n_estimators": 100,
            ... })
        """
        self.params = {} if params is None else dict(params)
        self.model = None
        self.feature_names_: Optional[list] = None

    def fit(
        self,
        X: pl.DataFrame,
        y: pl.Series,
        eval_set: Optional[tuple] = None,
    ) -> "LightGBMClassifier":
        """Train the classifier.

        Args:
            X: Feature matrix (Polars DataFrame).
            y: Target variable (Polars Series), expected to be 0/1 integers.
            eval_set: Optional (X_val, y_val) pair enabling early stopping.

        Returns:
            self (supports chaining).

        Raises:
            ImportError: If lightgbm is not installed.
        """
        try:
            import lightgbm as lgb
        except ImportError:
            raise ImportError(
                "使用 LightGBMClassifier 需要安装 lightgbm: pip install lightgbm"
            )
        self.feature_names_ = X.columns
        dtrain = lgb.Dataset(X.to_numpy(), label=y.to_numpy())
        valid_sets = [dtrain]
        valid_names = ["train"]
        if eval_set is not None:
            X_val, y_val = eval_set
            # Accept either Polars containers or raw arrays for the eval set.
            val_features = X_val.to_numpy() if isinstance(X_val, pl.DataFrame) else X_val
            val_labels = y_val.to_numpy() if isinstance(y_val, pl.Series) else y_val
            valid_sets.append(lgb.Dataset(val_features, label=val_labels))
            valid_names.append("val")
        # Split training-control keys out of the LightGBM parameter dict.
        train_params = dict(self.params)
        num_boost_round = train_params.pop("n_estimators", 100)
        patience = train_params.pop("early_stopping_round", 50)
        # Early stopping is only meaningful when a validation set exists.
        callbacks = []
        if len(valid_sets) > 1:
            callbacks.append(lgb.early_stopping(stopping_rounds=patience))
        self.model = lgb.train(
            train_params,
            dtrain,
            num_boost_round=num_boost_round,
            valid_sets=valid_sets,
            valid_names=valid_names,
            callbacks=callbacks,
        )
        return self

    def predict(self, X: pl.DataFrame) -> np.ndarray:
        """Predict class probabilities.

        Args:
            X: Feature matrix (Polars DataFrame).

        Returns:
            Predicted probabilities as a numpy array.

        Raises:
            RuntimeError: If the model has not been trained yet.
        """
        if self.model is None:
            raise RuntimeError("模型尚未训练,请先调用 fit()")
        return np.asarray(self.model.predict(X.to_numpy()))

    def predict_class(self, X: pl.DataFrame) -> np.ndarray:
        """Predict hard class labels (0 or 1) using a 0.5 threshold.

        Args:
            X: Feature matrix (Polars DataFrame).

        Returns:
            Predicted class labels.

        Raises:
            RuntimeError: If the model has not been trained yet.
        """
        probabilities = self.predict(X)
        return (probabilities >= 0.5).astype(int)

    def feature_importance(self, importance_type: str = "gain") -> Optional[pd.Series]:
        """Return feature importances indexed by feature name.

        Args:
            importance_type: Importance flavor; keep the default "gain" in
                the probe workflow, since "split" counts can be fooled by
                noise probes.

        Returns:
            Importance Series, or None when the model is untrained.
        """
        if self.model is None or self.feature_names_ is None:
            return None
        scores = self.model.feature_importance(importance_type=importance_type)
        return pd.Series(scores, index=self.feature_names_)

    def save(self, path: str) -> None:
        """Persist the model in LightGBM's native format plus a JSON sidecar.

        The native format avoids pickle, so the model can be loaded across
        environments; feature names and params go to ``<path>.meta.json``.

        Args:
            path: Destination file path.

        Raises:
            RuntimeError: If the model has not been trained yet.
        """
        if self.model is None:
            raise RuntimeError("模型尚未训练,无法保存")
        self.model.save_model(path)
        import json
        sidecar = {
            "feature_names": self.feature_names_,
            "params": self.params,
        }
        with open(path + ".meta.json", "w") as f:
            json.dump(sidecar, f)

    @classmethod
    def load(cls, path: str) -> "LightGBMClassifier":
        """Load a model previously written by save().

        Args:
            path: Path to the native-format model file.

        Returns:
            A restored LightGBMClassifier instance.
        """
        import json
        import lightgbm as lgb
        instance = cls()
        instance.model = lgb.Booster(model_file=path)
        try:
            with open(path + ".meta.json", "r") as f:
                meta = json.load(f)
        except FileNotFoundError:
            # Sidecar missing: keep defaults (no feature names / params).
            pass
        else:
            instance.feature_names_ = meta.get("feature_names")
            instance.params = meta.get("params", {})
        return instance

View File

@@ -0,0 +1,93 @@
"""噪音生成器
使用 Polars 零拷贝方式注入随机噪音特征。
"""
import numpy as np
import polars as pl
class NoiseGenerator:
    """Noise probe generator.

    Produces standard-normal random noise columns and appends them to a
    Polars DataFrame using the native with_columns API (no Pandas
    round-trip).
    """

    # Prefix marking injected noise columns.
    NOISE_PREFIX = "__noise__"

    def __init__(self, random_state: int = 42):
        """Initialize the noise generator.

        Args:
            random_state: Default seed used when generate_noise() is called
                without an explicit seed, for reproducibility.
        """
        self.random_state = random_state

    def generate_noise(
        self,
        df: pl.DataFrame,
        n_noise: int,
        seed=None,
    ) -> pl.DataFrame:
        """Append noise-probe columns to *df*.

        Fixes two defects of the original version: the constructor's
        ``random_state`` is now honored (previously ``seed`` hard-defaulted
        to 42 and ``random_state`` was never read), and a local RandomState
        is used instead of reseeding NumPy's global RNG — the generated
        value stream is identical, but there is no global side effect.

        Args:
            df: Source data.
            n_noise: Number of noise columns to inject.
            seed: Random seed; None (default) falls back to
                ``self.random_state``.

        Returns:
            DataFrame with the noise columns appended.
        """
        rng = np.random.RandomState(self.random_state if seed is None else seed)
        n_rows = df.height
        # Build Polars Series directly and append them with a single
        # with_columns call, avoiding any conversion through Pandas.
        noise_series = [
            pl.Series(
                f"{self.NOISE_PREFIX}{i}",
                rng.randn(n_rows).astype(np.float32),
                dtype=pl.Float32,
            )
            for i in range(n_noise)
        ]
        return df.with_columns(noise_series)

    def remove_noise(self, df: pl.DataFrame) -> pl.DataFrame:
        """Return *df* with all noise columns dropped.

        Args:
            df: Data possibly containing noise columns.

        Returns:
            The data without noise columns.
        """
        return df.drop(self.get_noise_columns(df))

    def get_noise_columns(self, df: pl.DataFrame) -> list[str]:
        """Return the names of all noise columns present in *df*.

        Args:
            df: Data to inspect.

        Returns:
            List of noise column names.
        """
        return [col for col in df.columns if col.startswith(self.NOISE_PREFIX)]

    def is_noise_column(self, col_name: str) -> bool:
        """Return True if *col_name* is a noise column.

        Args:
            col_name: Column name to test.

        Returns:
            Whether the name carries the noise prefix.
        """
        return col_name.startswith(self.NOISE_PREFIX)

View File

@@ -0,0 +1,284 @@
"""探针选择器 - 主类
协调整个探针筛选流程,执行迭代特征选择。
"""
from typing import List, Optional
import polars as pl
from src.experiment.probe_selection.importance_evaluator import ImportanceEvaluator
from src.experiment.probe_selection.noise_generator import NoiseGenerator
from src.experiment.probe_selection.probe_trainer import ProbeTrainer
class ProbeSelector:
    """Probe-based feature selector.

    Implements the enhanced noise-probe selection algorithm:
    1. Inject noise probes.
    2. Multi-task training (regression + classification).
    3. Cross elimination against the noise pass lines.
    4. Iterate until convergence.

    Key constraints:
    - The classification target uses the cross-sectional median.
    - Gain importance is enforced.
    - Training uses a validation set for early stopping.
    - Polars zero-copy operations throughout.
    """
    def __init__(
        self,
        n_iterations: int = 5,
        n_noise_features: int = 10,
        validation_ratio: float = 0.15,
        random_state: int = 42,
        regression_params: Optional[dict] = None,
        classification_params: Optional[dict] = None,
        verbose: bool = True,
    ):
        """Initialize the probe selector.

        Args:
            n_iterations: Maximum number of iterations K.
            n_noise_features: Number of noise probes M injected per round.
            validation_ratio: Validation-set fraction (for early stopping).
            random_state: Random seed.
            regression_params: Regression model parameters.
            classification_params: Classification model parameters.
            verbose: Whether to print detailed progress logs.
        """
        self.n_iterations = n_iterations
        self.n_noise_features = n_noise_features
        self.validation_ratio = validation_ratio
        self.random_state = random_state
        self.verbose = verbose
        # Sub-components: noise injection, multi-task training, evaluation.
        self.noise_generator = NoiseGenerator(random_state=random_state)
        self.trainer = ProbeTrainer(
            regression_params=regression_params,
            classification_params=classification_params,
            validation_ratio=validation_ratio,
            random_state=random_state,
        )
        self.evaluator = ImportanceEvaluator(noise_prefix=NoiseGenerator.NOISE_PREFIX)
        # Per-round history and the final surviving feature list.
        self.selection_history: List[dict] = []
        self.final_features: Optional[List[str]] = None
    def select(
        self,
        data: pl.DataFrame,
        feature_cols: List[str],
        target_col_regression: str,
        date_col: str = "trade_date",
    ) -> List[str]:
        """Run the iterative feature selection.

        Args:
            data: Training data.
            feature_cols: Candidate feature names.
            target_col_regression: Regression target column name.
            date_col: Date column name.

        Returns:
            The list of features that survived all rounds.
        """
        remaining_features = feature_cols.copy()
        original_count = len(remaining_features)
        if self.verbose:
            print("=" * 80)
            print("增强探针法因子筛选")
            print("=" * 80)
            print(f"\n初始特征数: {original_count}")
            print(f"迭代轮数: {self.n_iterations}")
            print(f"每轮探针数: {self.n_noise_features}")
            print(f"验证集比例: {self.validation_ratio:.0%}")
        for iteration in range(1, self.n_iterations + 1):
            if self.verbose:
                print(f"\n{'=' * 80}")
                print(f"探针筛选第 {iteration}/{self.n_iterations} 轮")
                print(f"当前候选特征: {len(remaining_features)} 个")
                print("=" * 80)
            # Restrict the frame to the current candidates + target + date.
            current_features = remaining_features.copy()
            feature_matrix = data.select(
                current_features + [target_col_regression, date_col]
            )
            # Inject noise probes; a different seed each round.
            seed = self.random_state + iteration
            data_with_noise = self.noise_generator.generate_noise(
                feature_matrix, self.n_noise_features, seed
            )
            all_feature_cols = (
                current_features
                + self.noise_generator.get_noise_columns(data_with_noise)
            )
            if self.verbose:
                print(f"\n[1/4] 注入探针: {self.n_noise_features} 列噪音特征")
            # Multi-task training (regression + classification).
            if self.verbose:
                print("\n[2/4] 多任务训练(回归 + 分类)...")
            self.trainer.fit(
                df=data_with_noise,
                feature_cols=all_feature_cols,
                target_col_regression=target_col_regression,
                date_col=date_col,
            )
            # Report split sizes and early-stopping iterations, if available.
            train_info = self.trainer.get_training_info()
            if self.verbose:
                print(
                    f" 数据切分: 训练集 {train_info.get('train_size')} 条, 验证集 {train_info.get('val_size')} 条"
                )
                if "regression_best_iter" in train_info:
                    print(f" 回归模型早停: 第 {train_info['regression_best_iter']} 轮")
                if "classification_best_iter" in train_info:
                    print(
                        f" 分类模型早停: 第 {train_info['classification_best_iter']} 轮"
                    )
            # Gain importance is enforced to avoid being fooled by noise.
            reg_imp, cls_imp = self.trainer.get_feature_importance(
                importance_type="gain"
            )
            if self.verbose:
                print("\n[3/4] 计算及格线...")
            # Evaluate against the noise pass lines and eliminate.
            remaining_features = self.evaluator.evaluate(
                regression_importance=reg_imp,
                classification_importance=cls_imp,
                candidate_features=current_features,
            )
            thresholds = self.evaluator.get_thresholds()
            if self.verbose:
                print(f" 回归及格线: {thresholds[0]:.6f}")
                print(f" 分类及格线: {thresholds[1]:.6f}")
            # Record this round's outcome.
            stats = self.evaluator.get_elimination_stats()
            eliminated = stats["eliminated_count"]
            if self.verbose:
                print(f"\n[4/4] 交叉淘汰...")
                print(f" 淘汰特征: {eliminated} 个")
                print(f" 剩余特征: {stats['survived_count']} 个")
                if eliminated > 0:
                    print("\n 淘汰的特征:")
                    # Only the first 10 eliminated features are listed.
                    for feat_info in stats["eliminated_features"][:10]:
                        print(
                            f" - {feat_info['feature']}: 回归={feat_info['regression_importance']:.6f}, 分类={feat_info['classification_importance']:.6f}"
                        )
                    if eliminated > 10:
                        print(f" ... 还有 {eliminated - 10} 个")
            # Append the per-round record to the history.
            self.selection_history.append(
                {
                    "iteration": iteration,
                    "initial_features": len(current_features),
                    "eliminated": eliminated,
                    "survived": len(remaining_features),
                    "regression_threshold": thresholds[0],
                    "classification_threshold": thresholds[1],
                    "eliminated_features": [
                        f["feature"] for f in stats["eliminated_features"]
                    ],
                }
            )
            # Convergence: stop early once a round eliminates nothing.
            if eliminated == 0:
                if self.verbose:
                    print(f"\n[提前终止] 第 {iteration} 轮没有因子被淘汰")
                break
        self.final_features = remaining_features
        if self.verbose:
            print("\n" + "=" * 80)
            print("探针筛选完成")
            print("=" * 80)
            print(f"\n原始特征数: {original_count}")
            print(f"最终特征数: {len(remaining_features)}")
            print(f"淘汰特征数: {original_count - len(remaining_features)}")
            print(
                f"淘汰比例: {(original_count - len(remaining_features)) / original_count:.1%}"
            )
            print(f"\n最终特征列表:")
            for i, feat in enumerate(remaining_features, 1):
                print(f" {i:2d}. {feat}")
        return remaining_features
    def get_selection_history(self) -> List[dict]:
        """Return the per-round selection history.

        Returns:
            List of per-round record dicts.
        """
        return self.selection_history
    def get_importance_report(
        self,
        data: pl.DataFrame,
        feature_cols: List[str],
        target_col_regression: str,
        date_col: str = "trade_date",
    ) -> List[dict]:
        """Produce a detailed importance report for a single probe round.

        NOTE(review): this retrains both models on *data*; it does not reuse
        the models from select().

        Args:
            data: Data.
            feature_cols: Feature names.
            target_col_regression: Regression target.
            date_col: Date column name.

        Returns:
            Feature comparison list (see ImportanceEvaluator.get_feature_comparison).
        """
        # Inject probes.
        feature_matrix = data.select(feature_cols + [target_col_regression, date_col])
        data_with_noise = self.noise_generator.generate_noise(
            feature_matrix, self.n_noise_features, self.random_state
        )
        all_feature_cols = feature_cols + self.noise_generator.get_noise_columns(
            data_with_noise
        )
        # Train both tasks.
        self.trainer.fit(
            df=data_with_noise,
            feature_cols=all_feature_cols,
            target_col_regression=target_col_regression,
            date_col=date_col,
        )
        # Extract gain importances.
        reg_imp, cls_imp = self.trainer.get_feature_importance(importance_type="gain")
        # Run evaluate() so the pass lines are populated.
        self.evaluator.evaluate(reg_imp, cls_imp, feature_cols)
        # Build the detailed comparison.
        comparison = self.evaluator.get_feature_comparison(
            reg_imp, cls_imp, feature_cols
        )
        return comparison

View File

@@ -0,0 +1,253 @@
"""探针训练器
执行多任务训练(回归 + 分类),支持验证集早停。
"""
from typing import Optional, Tuple
import numpy as np
import polars as pl
from src.experiment.probe_selection.lightgbm_classifier import LightGBMClassifier
from src.training.components.models.lightgbm import LightGBMModel
def split_validation_by_date(
    df: pl.DataFrame,
    date_col: str = "trade_date",
    val_ratio: float = 0.15,
) -> Tuple[pl.DataFrame, pl.DataFrame]:
    """Split into train/validation sets by time (latest dates = validation).

    Args:
        df: Input data.
        date_col: Date column name.
        val_ratio: Fraction of unique dates assigned to validation
            (at least one date).

    Returns:
        (train_df, val_df) tuple.
    """
    unique_dates = df[date_col].unique().sort()
    n_val = max(1, int(len(unique_dates) * val_ratio))
    # The most recent n_val dates form the validation window.
    val_window = unique_dates[-n_val:]
    in_validation = pl.col(date_col).is_in(val_window)
    return df.filter(~in_validation), df.filter(in_validation)
def create_classification_target(
    df: pl.DataFrame,
    return_col: str,
    date_col: str = "trade_date",
    new_col_name: str = "target_class",
) -> pl.DataFrame:
    """Add a binary label: 1 when the return beats that day's cross-sectional median.

    Labeling against the daily median targets stocks that outperform the
    market average on their own day, which keeps the classes balanced even
    when the whole market rises or falls.

    Args:
        df: Input frame.
        return_col: Return column name.
        date_col: Date column name.
        new_col_name: Name of the label column to add.

    Returns:
        Frame with the Int8 classification label appended.
    """
    daily_median = pl.col(return_col).median().over(date_col)
    label_expr = (pl.col(return_col) > daily_median).cast(pl.Int8).alias(new_col_name)
    return df.with_columns(label_expr)
class ProbeTrainer:
    """Probe trainer.

    Runs multi-task training (regression + classification) on the same
    feature matrix, with early stopping driven by a time-based validation
    split (most recent dates held out).
    """

    def __init__(
        self,
        regression_params: Optional[dict] = None,
        classification_params: Optional[dict] = None,
        validation_ratio: float = 0.15,
        random_state: int = 42,
    ):
        """Initialize the probe trainer.

        Args:
            regression_params: LightGBM params for the regression model
                (defaults to MAE objective with early stopping).
            classification_params: LightGBM params for the classification
                model (defaults to binary/AUC with early stopping).
            validation_ratio: Fraction of distinct dates held out for validation.
            random_state: Random seed.
        """
        self.regression_params = regression_params or {
            "objective": "regression",
            "metric": "mae",
            "n_estimators": 500,
            "learning_rate": 0.05,
            "early_stopping_round": 50,
            "verbose": -1,
        }
        self.classification_params = classification_params or {
            "objective": "binary",
            "metric": "auc",
            "n_estimators": 500,
            "learning_rate": 0.05,
            "early_stopping_round": 50,
            "verbose": -1,
        }
        self.validation_ratio = validation_ratio
        self.random_state = random_state
        self.regression_model: Optional[LightGBMModel] = None
        self.classification_model: Optional[LightGBMClassifier] = None
        # Populated by fit(): sizes of the splits and best iterations.
        self.training_info: dict = {}

    def fit(
        self,
        df: pl.DataFrame,
        feature_cols: list[str],
        target_col_regression: str,
        target_col_classification: Optional[str] = None,
        date_col: str = "trade_date",
    ) -> "ProbeTrainer":
        """Train the regression and classification models.

        Args:
            df: Training data (already containing the noise features).
            feature_cols: Feature column names (noise columns included).
            target_col_regression: Regression target column name.
            target_col_classification: Classification target column name;
                when None, a cross-sectional-median label is generated
                automatically from the regression target.
            date_col: Date column name.

        Returns:
            self
        """
        # Time-based split: the most recent dates form the validation set.
        train_df, val_df = split_validation_by_date(df, date_col, self.validation_ratio)
        self.training_info = {
            "train_size": len(train_df),
            "val_size": len(val_df),
            "n_features": len(feature_cols),
        }
        # Regression task first (uses the raw continuous target).
        self._fit_regression(train_df, val_df, feature_cols, target_col_regression)
        # Derive a classification target when the caller did not supply one.
        if target_col_classification is None:
            # Auto-generate a label: beats the daily cross-sectional median.
            train_df = create_classification_target(
                train_df, target_col_regression, date_col
            )
            val_df = create_classification_target(
                val_df, target_col_regression, date_col
            )
            target_col_classification = "target_class"
        self._fit_classification(
            train_df, val_df, feature_cols, target_col_classification
        )
        return self

    def _fit_regression(
        self,
        train_df: pl.DataFrame,
        val_df: pl.DataFrame,
        feature_cols: list[str],
        target_col: str,
    ):
        """Train the regression model with validation-based early stopping."""
        X_train = train_df.select(feature_cols)
        y_train = train_df.select(target_col).to_series()
        X_val = val_df.select(feature_cols)
        y_val = val_df.select(target_col).to_series()
        self.regression_model = LightGBMModel(params=self.regression_params)
        self.regression_model.fit(
            X_train,
            y_train,
            eval_set=(X_val, y_val),
        )
        # Record the early-stopped iteration count when the booster exposes it.
        if hasattr(self.regression_model.model, "best_iteration"):
            self.training_info["regression_best_iter"] = (
                self.regression_model.model.best_iteration
            )

    def _fit_classification(
        self,
        train_df: pl.DataFrame,
        val_df: pl.DataFrame,
        feature_cols: list[str],
        target_col: str,
    ):
        """Train the classification model with validation-based early stopping."""
        X_train = train_df.select(feature_cols)
        y_train = train_df.select(target_col).to_series()
        X_val = val_df.select(feature_cols)
        y_val = val_df.select(target_col).to_series()
        self.classification_model = LightGBMClassifier(
            params=self.classification_params
        )
        self.classification_model.fit(
            X_train,
            y_train,
            eval_set=(X_val, y_val),
        )
        # Record the early-stopped iteration count when the booster exposes it.
        if hasattr(self.classification_model.model, "best_iteration"):
            self.training_info["classification_best_iter"] = (
                self.classification_model.model.best_iteration
            )

    def get_feature_importance(
        self, importance_type: str = "gain"
    ) -> Tuple[Optional[dict], Optional[dict]]:
        """Return the feature importances of both models.

        Args:
            importance_type: Importance type; must be "gain" — "split"
                importance is inflated by the injected noise probes.

        Returns:
            ``(regression_importance, classification_importance)`` tuple;
            each is a ``{feature_name: importance_value}`` dict, or None
            for a model that has not been trained.

        Raises:
            ValueError: If ``importance_type`` is not "gain". (Was an
                ``assert``, which is stripped under ``python -O``; a real
                exception makes the contract enforceable.)
        """
        if importance_type != "gain":
            raise ValueError(
                "必须使用 importance_type='gain'split 会被噪音欺骗"
            )
        reg_importance = None
        cls_importance = None
        if self.regression_model is not None:
            # NOTE(review): feature_importance() is called without
            # importance_type here — confirm LightGBMModel defaults to "gain".
            imp = self.regression_model.feature_importance()
            if imp is not None:
                reg_importance = imp.to_dict()
        if self.classification_model is not None:
            imp = self.classification_model.feature_importance(importance_type)
            if imp is not None:
                cls_importance = imp.to_dict()
        return reg_importance, cls_importance

    def get_training_info(self) -> dict:
        """Return the recorded training info (split sizes, best iterations)."""
        return self.training_info

View File

@@ -0,0 +1,343 @@
"""探针法因子筛选 - 真实数据集成
使用真实因子数据和训练流程执行探针筛选。
使用方法:
uv run python src/experiment/probe_selection/run_probe_selection.py
"""
import os
import sys
from typing import List
import polars as pl
# 添加项目根目录到路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
from src.experiment.probe_selection import ProbeSelector
from src.factors import FactorEngine
from src.training import (
DateSplitter,
NullFiller,
StandardScaler,
StockPoolManager,
STFilter,
Winsorizer,
check_data_quality,
)
from src.training.components.models.lightgbm import LightGBMModel
# ----------------------------------------------------------------------------
# Script configuration
# ----------------------------------------------------------------------------
# Name of the label column used as the regression target.
LABEL_NAME = "future_return_5"
# Full factor list (copied from regression.py); these names are resolved
# from the factor metadata by register_factors().
SELECTED_FACTORS = [
    # ================= 1. Price, trend & path dependency =================
    "ma_5",
    "ma_20",
    "ma_ratio_5_20",
    "bias_10",
    "high_low_ratio",
    "bbi_ratio",
    "return_5",
    "return_20",
    "kaufman_ER_20",
    "mom_acceleration_10_20",
    "drawdown_from_high_60",
    "up_days_ratio_20",
    # ================= 2. Volatility, risk adjustment & higher moments =================
    "volatility_5",
    "volatility_20",
    "volatility_ratio",
    "std_return_20",
    "sharpe_ratio_20",
    "min_ret_20",
    "volatility_squeeze_5_60",
    # ================= 3. Intraday microstructure & anomalies =================
    "overnight_intraday_diff",
    "upper_shadow_ratio",
    "capital_retention_20",
    "max_ret_20",
    # ================= 4. Volume, liquidity & price-volume divergence =================
    "volume_ratio_5_20",
    "turnover_rate_mean_5",
    "turnover_deviation",
    "amihud_illiq_20",
    "turnover_cv_20",
    "pv_corr_20",
    "close_vwap_deviation",
    # ================= 5. Fundamental financial features =================
    "roe",
    "roa",
    "profit_margin",
    "debt_to_equity",
    "current_ratio",
    "net_profit_yoy",
    "revenue_yoy",
    "healthy_expansion_velocity",
    # ================= 6. Valuation & cross-sectional momentum =================
    "EP",
    "BP",
    "CP",
    "market_cap_rank",
    "turnover_rank",
    "return_5_rank",
    "EP_rank",
    "pe_expansion_trend",
    "value_price_divergence",
    "active_market_cap",
    "ebit_rank",
]
# Expression-defined factors (copied from regression.py); currently empty —
# all factors above come from metadata.
FACTOR_DEFINITIONS = {}
# Label definition: 5-day forward return measured from the next day's open
# (close delayed -5 over open delayed -1, minus 1).
LABEL_FACTOR = {
    LABEL_NAME: "(ts_delay(close, -5) / ts_delay(open, -1)) - 1",
}
# Date ranges (probe selection runs on the training window only; the
# validation window is appended later in run_probe_feature_selection()).
TRAIN_START = "20200101"
TRAIN_END = "20231231"
VAL_START = "20240101"
VAL_END = "20241231"
def stock_pool_filter(df: pl.DataFrame) -> pl.Series:
    """Single-day stock-pool filter.

    Excludes codes starting with "30", "68", "8", "9" or "4", then keeps
    the (up to) 1000 smallest remaining stocks by ``total_mv``.

    Args:
        df: One trading day's frame with ``ts_code`` and ``total_mv`` columns.

    Returns:
        Boolean series marking rows whose code made the pool.
    """
    codes = df["ts_code"]
    # De Morgan form of the original conjunction of negated prefixes.
    excluded = (
        codes.str.starts_with("30")
        | codes.str.starts_with("68")
        | codes.str.starts_with("8")
        | codes.str.starts_with("9")
        | codes.str.starts_with("4")
    )
    eligible = df.filter(~excluded)
    pool_size = min(1000, len(eligible))
    chosen_codes = eligible.sort("total_mv").head(pool_size)["ts_code"]
    return codes.is_in(chosen_codes)
def register_factors(
    engine: FactorEngine,
    selected_factors: List[str],
    factor_definitions: dict,
    label_factor: dict,
) -> List[str]:
    """Register all factors on the engine and return the feature column list.

    ``selected_factors`` are looked up from metadata; ``factor_definitions``
    and ``label_factor`` are registered by DSL expression. The label is
    registered but excluded from the returned feature list.
    """
    print("=" * 80)
    print("注册因子")
    print("=" * 80)
    # Metadata-backed factors: registered by name only.
    print("\n注册特征因子(从 metadata:")
    for name in selected_factors:
        engine.add_factor(name)
        print(f" - {name}")
    # Expression-defined factors: registered as (name, expression).
    print("\n注册特征因子(表达式):")
    for name, expr in factor_definitions.items():
        engine.add_factor(name, expr)
        print(f" - {name}: {expr}")
    # Label factor: registered by expression, not part of the features.
    print("\n注册 Label 因子(表达式):")
    for name, expr in label_factor.items():
        engine.add_factor(name, expr)
        print(f" - {name}: {expr}")
    feature_cols = selected_factors + list(factor_definitions.keys())
    print(f"\n特征因子数: {len(feature_cols)}")
    print(f" - 来自 metadata: {len(selected_factors)}")
    print(f" - 来自表达式: {len(factor_definitions)}")
    print(f"Label: {list(label_factor.keys())[0]}")
    print(f"已注册因子总数: {len(engine.list_registered())}")
    return feature_cols
def prepare_data_for_probe(
    engine: FactorEngine,
    feature_cols: List[str],
    start_date: str,
    end_date: str,
) -> pl.DataFrame:
    """Compute all feature factors plus the label over the given date range.

    Args:
        engine: Factor engine with all factors already registered.
        feature_cols: Feature factor names to compute.
        start_date: Range start (YYYYMMDD).
        end_date: Range end (YYYYMMDD).

    Returns:
        Frame of computed factors including the label column.
    """
    print("\n" + "=" * 80)
    print("准备探针筛选数据")
    print("=" * 80)
    print(f"\n计算因子: {start_date} - {end_date}")
    # The label is computed alongside the features in a single pass.
    requested = feature_cols + [LABEL_NAME]
    frame = engine.compute(
        factor_names=requested,
        start_date=start_date,
        end_date=end_date,
    )
    print(f"数据形状: {frame.shape}")
    print(f"数据列: {len(frame.columns)}")
    print(f"日期范围: {frame['trade_date'].min()} - {frame['trade_date'].max()}")
    return frame
def apply_preprocessing_for_probe(
    data: pl.DataFrame,
    feature_cols: List[str],
) -> pl.DataFrame:
    """Apply minimal preprocessing for probe selection (missing-value fill only).

    Deliberately performs no winsorizing or standardization so the injected
    noise probes keep their original distribution.

    Args:
        data: Input frame containing the feature columns.
        feature_cols: Names of the feature columns to fill.

    Returns:
        The frame with feature nulls mean-filled.
    """
    print("\n" + "=" * 80)
    print("数据预处理")
    print("=" * 80)
    # Mean-fill nulls only (scaling would distort the noise-probe distribution).
    filler = NullFiller(feature_cols=feature_cols, strategy="mean")
    data = filler.fit_transform(data)
    # Plain string (was an f-string with no placeholders, ruff F541) — same output.
    print("缺失值处理完成")
    print(f"数据形状: {data.shape}")
    return data
def run_probe_feature_selection():
    """Run the probe-based factor selection end to end.

    Pipeline: build a FactorEngine, register factors, compute train+val
    data, filter the stock pool, mean-fill nulls, run the iterative
    ProbeSelector, then write the kept and eliminated feature lists to
    text files and print a copy-pasteable SELECTED_FACTORS block.

    Returns:
        List of feature names that survived the probe selection.
    """
    print("\n" + "=" * 80)
    print("增强探针法因子筛选")
    print("=" * 80)
    # 1. Create the factor engine.
    print("\n[1] 创建 FactorEngine")
    engine = FactorEngine()
    # 2. Register metadata factors, expression factors and the label.
    print("\n[2] 注册因子")
    feature_cols = register_factors(
        engine, SELECTED_FACTORS, FACTOR_DEFINITIONS, LABEL_FACTOR
    )
    # 3. Compute data over train + validation windows (probe selection
    # uses both to increase the sample size).
    print("\n[3] 准备数据(训练集+验证集)")
    data = prepare_data_for_probe(
        engine=engine,
        feature_cols=feature_cols,
        start_date=TRAIN_START,
        end_date=VAL_END,  # include the validation window for more samples
    )
    # 4. Restrict to the small-cap stock pool (per stock_pool_filter).
    print("\n[4] 执行股票池筛选")
    pool_manager = StockPoolManager(
        filter_func=stock_pool_filter,
        required_columns=["total_mv"],
        data_router=engine.router,
    )
    data = pool_manager.filter_and_select_daily(data)
    print(f"筛选后数据规模: {data.shape}")
    # 5. Minimal preprocessing: mean-fill nulls only (no scaling).
    print("\n[5] 数据预处理")
    data = apply_preprocessing_for_probe(data, feature_cols)
    # 6. Data quality check.
    # NOTE(review): the actual check below is commented out, yet the
    # success message is still printed — confirm this is intentional.
    print("\n[6] 数据质量检查")
    # check_data_quality(data, feature_cols, raise_on_error=True)
    print("[成功] 数据质量检查通过")
    # 7. Iterative probe selection: inject noise probes, train, and drop
    # features that score below the noise threshold each round.
    print("\n[7] 执行探针筛选")
    selector = ProbeSelector(
        n_iterations=10,  # number of elimination rounds
        n_noise_features=10,  # noise probes injected per round
        validation_ratio=0.15,  # time-based validation split for early stopping
        random_state=42,
        regression_params={
            "objective": "regression",
            "metric": "mae",
            "n_estimators": 200,
            "learning_rate": 0.05,
            "early_stopping_round": 30,
            "verbose": -1,
        },
        classification_params={
            "objective": "binary",
            "metric": "auc",
            "n_estimators": 200,
            "learning_rate": 0.05,
            "early_stopping_round": 30,
            "verbose": -1,
        },
        verbose=True,
    )
    selected_features = selector.select(
        data=data,
        feature_cols=feature_cols,
        target_col_regression=LABEL_NAME,
        date_col="trade_date",
    )
    # 8. Print summary statistics.
    print("\n" + "=" * 80)
    print("探针筛选完成")
    print("=" * 80)
    print(f"\n原始特征数: {len(feature_cols)}")
    print(f"筛选后特征数: {len(selected_features)}")
    print(f"淘汰特征数: {len(feature_cols) - len(selected_features)}")
    print(
        f"淘汰比例: {(len(feature_cols) - len(selected_features)) / len(feature_cols):.1%}"
    )
    # 9. Persist the kept-feature list as a copy-pasteable Python snippet.
    output_dir = "src/experiment/probe_selection/output"
    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, "selected_features.txt")
    with open(output_file, "w") as f:
        f.write("# 探针法筛选后的特征列表\n")
        f.write(f"# 原始特征数: {len(feature_cols)}\n")
        f.write(f"# 筛选后特征数: {len(selected_features)}\n")
        f.write(f"# 淘汰特征数: {len(feature_cols) - len(selected_features)}\n")
        f.write("\nSELECTED_FEATURES = [\n")
        for feat in selected_features:
            f.write(f'    "{feat}",\n')
        f.write("]\n")
    print(f"\n[保存] 筛选结果已保存到: {output_file}")
    # 10. Persist the eliminated features (set difference, order unspecified).
    eliminated_features = list(set(feature_cols) - set(selected_features))
    eliminated_file = os.path.join(output_dir, "eliminated_features.txt")
    with open(eliminated_file, "w") as f:
        f.write("# 被探针法淘汰的特征列表\n")
        f.write(f"# 淘汰总数: {len(eliminated_features)}\n")
        f.write("\nELIMINATED_FEATURES = [\n")
        for feat in eliminated_features:
            f.write(f'    "{feat}",\n')
        f.write("]\n")
    print(f"[保存] 淘汰特征已保存到: {eliminated_file}")
    # 11. Echo the final list for direct copy into regression.py.
    print("\n" + "=" * 80)
    print("最终特征列表(可直接复制到 regression.py")
    print("=" * 80)
    print("\nSELECTED_FACTORS = [")
    # index `i` from enumerate is unused here; kept for byte-identical code
    for i, feat in enumerate(selected_features, 1):
        print(f'    "{feat}",')
    print("]")
    return selected_features
# Script entry point: run the probe-based feature selection end to end.
if __name__ == "__main__":
    selected = run_probe_feature_selection()

View File

@@ -0,0 +1,5 @@
# 被探针法淘汰的特征列表
# 淘汰总数: 0
ELIMINATED_FEATURES = [
]

View File

@@ -0,0 +1,56 @@
# 探针法筛选后的特征列表
# 原始特征数: 49
# 筛选后特征数: 49
# 淘汰特征数: 0
SELECTED_FEATURES = [
"ma_5",
"ma_20",
"ma_ratio_5_20",
"bias_10",
"high_low_ratio",
"bbi_ratio",
"return_5",
"return_20",
"kaufman_ER_20",
"mom_acceleration_10_20",
"drawdown_from_high_60",
"up_days_ratio_20",
"volatility_5",
"volatility_20",
"volatility_ratio",
"std_return_20",
"sharpe_ratio_20",
"min_ret_20",
"volatility_squeeze_5_60",
"overnight_intraday_diff",
"upper_shadow_ratio",
"capital_retention_20",
"max_ret_20",
"volume_ratio_5_20",
"turnover_rate_mean_5",
"turnover_deviation",
"amihud_illiq_20",
"turnover_cv_20",
"pv_corr_20",
"close_vwap_deviation",
"roe",
"roa",
"profit_margin",
"debt_to_equity",
"current_ratio",
"net_profit_yoy",
"revenue_yoy",
"healthy_expansion_velocity",
"EP",
"BP",
"CP",
"market_cap_rank",
"turnover_rank",
"return_5_rank",
"EP_rank",
"pe_expansion_trend",
"value_price_divergence",
"active_market_cap",
"ebit_rank",
]

View File

@@ -15,7 +15,6 @@
"source": [
"import os\n",
"from datetime import datetime\n",
"from typing import List\n",
"\n",
"import polars as pl\n",
"\n",
@@ -25,7 +24,6 @@
" LightGBMModel,\n",
" STFilter,\n",
" StandardScaler,\n",
" # StockFilterConfig, # 已删除,使用 StockPoolManager + filter_func 替代\n",
" StockPoolManager,\n",
" Trainer,\n",
" Winsorizer,\n",
@@ -33,87 +31,27 @@
" check_data_quality,\n",
")\n",
"from src.training.config import TrainingConfig\n",
"\n"
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## 2. 定义辅助函数"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"def register_factors(\n",
" engine: FactorEngine,\n",
" selected_factors: List[str],\n",
" factor_definitions: dict,\n",
" label_factor: dict,\n",
") -> List[str]:\n",
" \"\"\"注册因子selected_factors 从 metadata 查询factor_definitions 用 DSL 表达式注册)\"\"\"\n",
" print(\"=\" * 80)\n",
" print(\"注册因子\")\n",
" print(\"=\" * 80)\n",
"\n",
" # 注册 SELECTED_FACTORS 中的因子(已在 metadata 中)\n",
" print(\"\\n注册特征因子从 metadata:\")\n",
" for name in selected_factors:\n",
" engine.add_factor(name)\n",
" print(f\" - {name}\")\n",
"\n",
" # 注册 FACTOR_DEFINITIONS 中的因子(通过表达式,尚未在 metadata 中)\n",
" print(\"\\n注册特征因子表达式:\")\n",
" for name, expr in factor_definitions.items():\n",
" engine.add_factor(name, expr)\n",
" print(f\" - {name}: {expr}\")\n",
"\n",
" # 注册 label 因子(通过表达式)\n",
" print(\"\\n注册 Label 因子(表达式):\")\n",
" for name, expr in label_factor.items():\n",
" engine.add_factor(name, expr)\n",
" print(f\" - {name}: {expr}\")\n",
"\n",
" # 特征列 = SELECTED_FACTORS + FACTOR_DEFINITIONS 的 keys\n",
" feature_cols = selected_factors + list(factor_definitions.keys())\n",
"\n",
" print(f\"\\n特征因子数: {len(feature_cols)}\")\n",
" print(f\" - 来自 metadata: {len(selected_factors)}\")\n",
" print(f\" - 来自表达式: {len(factor_definitions)}\")\n",
" print(f\"Label: {list(label_factor.keys())[0]}\")\n",
" print(f\"已注册因子总数: {len(engine.list_registered())}\")\n",
"\n",
" return feature_cols\n",
"\n",
"\n",
"def prepare_data(\n",
" engine: FactorEngine,\n",
" feature_cols: List[str],\n",
" start_date: str,\n",
" end_date: str,\n",
") -> pl.DataFrame:\n",
" print(\"\\n\" + \"=\" * 80)\n",
" print(\"准备数据\")\n",
" print(\"=\" * 80)\n",
"\n",
" # 计算因子(全市场数据)\n",
" print(f\"\\n计算因子: {start_date} - {end_date}\")\n",
" factor_names = feature_cols + [LABEL_NAME] # 包含 label\n",
"\n",
" data = engine.compute(\n",
" factor_names=factor_names,\n",
" start_date=start_date,\n",
" end_date=end_date,\n",
" )\n",
"\n",
" print(f\"数据形状: {data.shape}\")\n",
" print(f\"数据列: {data.columns}\")\n",
" print(f\"\\n前5行预览:\")\n",
" print(data.head())\n",
"\n",
" return data\n",
"# 从 common 模块导入共用配置和函数\n",
"from src.experiment.common import (\n",
" SELECTED_FACTORS,\n",
" FACTOR_DEFINITIONS,\n",
" get_label_factor,\n",
" register_factors,\n",
" prepare_data,\n",
" TRAIN_START,\n",
" TRAIN_END,\n",
" VAL_START,\n",
" VAL_END,\n",
" TEST_START,\n",
" TEST_END,\n",
" stock_pool_filter,\n",
" STOCK_FILTER_REQUIRED_COLUMNS,\n",
" OUTPUT_DIR,\n",
" SAVE_PREDICTIONS,\n",
" PERSIST_MODEL,\n",
" TOP_N,\n",
")\n",
"\n"
]
},
@@ -121,9 +59,9 @@
"metadata": {},
"cell_type": "markdown",
"source": [
"## 3. 配置参数\n",
"## 2. 配置参数\n",
"#\n",
"### 3.1 因子定义"
"### 2.1 标签定义"
]
},
{
@@ -132,177 +70,11 @@
"outputs": [],
"execution_count": null,
"source": [
"# 特征因子定义字典:新增因子只需在此处添加一行\n",
"# Label 名称(回归任务使用连续收益率)\n",
"LABEL_NAME = \"future_return_5\"\n",
"\n",
"# 当前选择的因子列表(从 FACTOR_DEFINITIONS 中选择要使用的因子)\n",
"SELECTED_FACTORS = [\n",
" # ================= 1. 价格、趋势与路径依赖 =================\n",
" \"ma_5\",\n",
" \"ma_20\",\n",
" \"ma_ratio_5_20\",\n",
" \"bias_10\",\n",
" \"high_low_ratio\",\n",
" \"bbi_ratio\",\n",
" \"return_5\",\n",
" \"return_20\",\n",
" \"kaufman_ER_20\",\n",
" \"mom_acceleration_10_20\",\n",
" \"drawdown_from_high_60\",\n",
" \"up_days_ratio_20\",\n",
" # ================= 2. 波动率、风险调整与高阶矩 =================\n",
" \"volatility_5\",\n",
" \"volatility_20\",\n",
" \"volatility_ratio\",\n",
" \"std_return_20\",\n",
" \"sharpe_ratio_20\",\n",
" \"min_ret_20\",\n",
" \"volatility_squeeze_5_60\",\n",
" # ================= 3. 日内微观结构与异象 =================\n",
" \"overnight_intraday_diff\",\n",
" \"upper_shadow_ratio\",\n",
" \"capital_retention_20\",\n",
" \"max_ret_20\",\n",
" # ================= 4. 量能、流动性与量价背离 =================\n",
" \"volume_ratio_5_20\",\n",
" \"turnover_rate_mean_5\",\n",
" \"turnover_deviation\",\n",
" \"amihud_illiq_20\",\n",
" \"turnover_cv_20\",\n",
" \"pv_corr_20\",\n",
" \"close_vwap_deviation\",\n",
" # ================= 5. 基本面财务特征 =================\n",
" \"roe\",\n",
" \"roa\",\n",
" \"profit_margin\",\n",
" \"debt_to_equity\",\n",
" \"current_ratio\",\n",
" \"net_profit_yoy\",\n",
" \"revenue_yoy\",\n",
" \"healthy_expansion_velocity\",\n",
" # ================= 6. 基本面估值与截面动量共振 =================\n",
" \"EP\",\n",
" \"BP\",\n",
" \"CP\",\n",
" \"market_cap_rank\",\n",
" \"turnover_rank\",\n",
" \"return_5_rank\",\n",
" \"EP_rank\",\n",
" \"pe_expansion_trend\",\n",
" \"value_price_divergence\",\n",
" \"active_market_cap\",\n",
" \"ebit_rank\",\n",
"]\n",
"\n",
"# 因子定义字典(完整因子库)\n",
"FACTOR_DEFINITIONS = {\n",
" # ================= 1. 价格、趋势与路径依赖 (Trend, Momentum & Path Dependency) =================\n",
" \"ma_5\": \"ts_mean(close, 5)\",\n",
" \"ma_20\": \"ts_mean(close, 20)\",\n",
" \"ma_ratio_5_20\": \"ts_mean(close, 5) / (ts_mean(close, 20) + 1e-8) - 1\", # 均线发散度\n",
" \"bias_10\": \"close / (ts_mean(close, 10) + 1e-8) - 1\", # 10日乖离率\n",
" \"high_low_ratio\": \"(close - ts_min(low, 20)) / (ts_max(high, 20) - ts_min(low, 20) + 1e-8)\", # 威廉指标变形\n",
" \"bbi_ratio\": \"(ts_mean(close, 3) + ts_mean(close, 6) + ts_mean(close, 12) + ts_mean(close, 24)) / (4 * close + 1e-8)\", # 多空指标比率\n",
" \"return_5\": \"(close / (ts_delay(close, 5) + 1e-8)) - 1\", # 5日动量\n",
" \"return_20\": \"(close / (ts_delay(close, 20) + 1e-8)) - 1\", # 20日动量\n",
" # [高阶] Kaufman 趋势效率 (极高价值) - 衡量趋势流畅度,剔除无序震荡\n",
" \"kaufman_ER_20\": \"abs(close - ts_delay(close, 20)) / (ts_sum(abs(close - ts_delay(close, 1)), 20) + 1e-8)\",\n",
" # [高阶] 动量加速度 - 寻找二阶导数大于0正在加速爆发的股票\n",
" \"mom_acceleration_10_20\": \"(close / (ts_delay(close, 10) + 1e-8) - 1) - (ts_delay(close, 10) / (ts_delay(close, 20) + 1e-8) - 1)\",\n",
" # [高阶] 高点距离衰减 - 衡量套牢盘压力\n",
" \"drawdown_from_high_60\": \"close / (ts_max(high, 60) + 1e-8) - 1\",\n",
" # [高阶] 趋势一致性 - 过去20天内收红的天数比例\n",
" \"up_days_ratio_20\": \"ts_sum(close > ts_delay(close, 1), 20) / 20\",\n",
" # ================= 2. 波动率、风险调整与高阶矩 (Volatility & Risk-Adjusted Returns) =================\n",
" \"volatility_5\": \"ts_std(close, 5)\",\n",
" \"volatility_20\": \"ts_std(close, 20)\",\n",
" \"volatility_ratio\": \"ts_std(close, 5) / (ts_std(close, 20) + 1e-8)\", # 波动率期限结构\n",
" \"std_return_20\": \"ts_std((close / (ts_delay(close, 1) + 1e-8)) - 1, 20)\", # 真实收益率波动率\n",
" # [高阶] 夏普趋势比率 - 惩罚暴涨暴跌,奖励稳健爬坡\n",
" \"sharpe_ratio_20\": \"ts_mean(close / (ts_delay(close, 1) + 1e-8) - 1, 20) / (ts_std(close / (ts_delay(close, 1) + 1e-8) - 1, 20) + 1e-8)\",\n",
" # [高阶] 尾部崩盘风险 - 过去一个月最大单日跌幅\n",
" \"min_ret_20\": \"ts_min(close / (ts_delay(close, 1) + 1e-8) - 1, 20)\",\n",
" # [高阶] 波动率挤压比 - 寻找盘整到极致面临变盘的股票 (布林带收口)\n",
" \"volatility_squeeze_5_60\": \"ts_std(close, 5) / (ts_std(close, 60) + 1e-8)\",\n",
" # ================= 3. 日内微观结构与异象 (Intraday Microstructure & Anomalies) =================\n",
" # [高阶] 隔夜与日内背离 - 差值越小说明主力越喜欢在盘中吸筹\n",
" \"overnight_intraday_diff\": \"(open / (ts_delay(close, 1) + 1e-8) - 1) - (close / (open + 1e-8) - 1)\",\n",
" # [高阶] 上影线抛压极值 - 冲高回落被套牢的概率\n",
" \"upper_shadow_ratio\": \"(high - ((open + close + abs(open - close)) / 2)) / (high - low + 1e-8)\",\n",
" # [高阶] 资金沉淀率 - 衡量主力日内高抛低吸洗盘的剧烈程度\n",
" \"capital_retention_20\": \"ts_sum(abs(close - open), 20) / (ts_sum(high - low, 20) + 1e-8)\",\n",
" # [高阶] MAX 彩票效应 - 反转因子,剔除近期有过妖股连板特征的标的\n",
" \"max_ret_20\": \"ts_max(close / (ts_delay(close, 1) + 1e-8) - 1, 20)\",\n",
" # ================= 4. 量能、流动性与量价背离 (Volume, Liquidity & Divergence) =================\n",
" \"volume_ratio_5_20\": \"ts_mean(vol, 5) / (ts_mean(vol, 20) + 1e-8)\", # 相对放量比\n",
" \"turnover_rate_mean_5\": \"ts_mean(turnover_rate, 5)\", # 活跃度\n",
" \"turnover_deviation\": \"(turnover_rate - ts_mean(turnover_rate, 10)) / (ts_std(turnover_rate, 10) + 1e-8)\", # 换手率偏离度\n",
" # [高阶] Amihud 非流动性异象 (绝对核心) - 衡量砸盘/拉升的摩擦成本\n",
" \"amihud_illiq_20\": \"ts_mean(abs(close / (ts_delay(close, 1) + 1e-8) - 1) / (amount + 1e-8), 20)\",\n",
" # [高阶] 换手率惩罚因子 - 换手率忽高忽低说明游资接力,行情极不稳定\n",
" \"turnover_cv_20\": \"ts_std(turnover_rate, 20) / (ts_mean(turnover_rate, 20) + 1e-8)\",\n",
" # [高阶] 纯粹量价相关性 - 检验是否是\"放量上涨,缩量下跌\"的良性多头\n",
" \"pv_corr_20\": \"ts_corr(close / (ts_delay(close, 1) + 1e-8) - 1, vol, 20)\",\n",
" # [高阶] 收盘价与均价背离 - 专门抓尾盘突袭拉升骗线的股票\n",
" \"close_vwap_deviation\": \"close / (amount / (vol * 100 + 1e-8) + 1e-8) - 1\",\n",
" # ================= 5. 基本面财务特征 (Fundamental Quality & Structure) =================\n",
" \"roe\": \"n_income / (total_hldr_eqy_exc_min_int + 1e-8)\", # 净资产收益率\n",
" \"roa\": \"n_income / (total_assets + 1e-8)\", # 总资产收益率\n",
" \"profit_margin\": \"n_income / (revenue + 1e-8)\", # 销售净利率\n",
" \"debt_to_equity\": \"total_liab / (total_hldr_eqy_exc_min_int + 1e-8)\", # 杠杆率\n",
" \"current_ratio\": \"total_cur_assets / (total_cur_liab + 1e-8)\", # 短期偿债安全垫\n",
" # [高阶] 利润同比增速 (日频延后252天等于去年同期)\n",
" \"net_profit_yoy\": \"(n_income / (ts_delay(n_income, 252) + 1e-8)) - 1\",\n",
" # [高阶] 营收同比增速\n",
" \"revenue_yoy\": \"(revenue / (ts_delay(revenue, 252) + 1e-8)) - 1\",\n",
" # [高阶] 资产负债表扩张斜率 - 剔除单纯靠举债扩张的公司\n",
" \"healthy_expansion_velocity\": \"(total_assets / (ts_delay(total_assets, 252) + 1e-8) - 1) - (total_liab / (ts_delay(total_liab, 252) + 1e-8) - 1)\",\n",
" # ================= 6. 基本面估值与截面动量共振 (Valuation & Cross-Sectional Ranking) =================\n",
" # 估值水平绝对值 (Tushare 市值单位需要 * 10000 转换为元)\n",
" \"EP\": \"n_income / (total_mv * 10000 + 1e-8)\", # 盈利收益率 (1/PE)\n",
" \"BP\": \"total_hldr_eqy_exc_min_int / (total_mv * 10000 + 1e-8)\", # 账面市值比 (1/PB)\n",
" \"CP\": \"n_cashflow_act / (total_mv * 10000 + 1e-8)\", # 经营现金流收益率 (1/PCF)\n",
" # 全市场截面排名因子\n",
" \"market_cap_rank\": \"cs_rank(total_mv)\", # 规模因子 (Size)\n",
" \"turnover_rank\": \"cs_rank(turnover_rate)\",\n",
" \"return_5_rank\": \"cs_rank((close / (ts_delay(close, 5) + 1e-8)) - 1)\",\n",
" \"EP_rank\": \"cs_rank(n_income / (total_mv + 1e-8))\", # 谁最便宜\n",
" # [高阶] 戴维斯双击动量 - 估值相对上一年是否在扩张\n",
" \"pe_expansion_trend\": \"(total_mv / (n_income + 1e-8)) / (ts_delay(total_mv, 60) / (ts_delay(n_income, 60) + 1e-8) + 1e-8) - 1\",\n",
" # [高阶] 业绩与价格背离度 - 截面做差利润排名全市场第一但近20日价格排名倒数第一捕捉被错杀的潜伏股\n",
" \"value_price_divergence\": \"cs_rank((n_income - ts_delay(n_income, 252)) / (abs(ts_delay(n_income, 252)) + 1e-8)) - cs_rank(close / (ts_delay(close, 20) + 1e-8))\",\n",
" # [高阶] 流动性溢价调整后市值 - 识别僵尸大盘股和极度活跃的小微盘\n",
" \"active_market_cap\": \"total_mv * ts_mean(turnover_rate, 20)\",\n",
" \"ebit_rank\": \"cs_rank(ebit)\",\n",
"}\n",
"\n",
"# Label 因子定义(不参与训练,用于计算目标)\n",
"LABEL_FACTOR = {\n",
" LABEL_NAME: \"(ts_delay(close, -5) / ts_delay(open, -1)) - 1\", # 未来5日收益率\n",
"}"
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "### 3.2 训练参数配置"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"# 日期范围配置(正确的 train/val/test 三分法)\n",
"# Train: 用于训练模型参数\n",
"# Val: 用于验证/早停/调参(位于 train 之后test 之前)\n",
"# Test: 仅用于最终评估,完全独立于训练过程\n",
"TRAIN_START = \"20200101\"\n",
"TRAIN_END = \"20231231\"\n",
"VAL_START = \"20240101\"\n",
"VAL_END = \"20241231\"\n",
"TEST_START = \"20250101\"\n",
"TEST_END = \"20261231\"\n",
"# 获取 Label 因子定义\n",
"LABEL_FACTOR = get_label_factor(LABEL_NAME)\n",
"\n",
"# 模型参数配置\n",
"MODEL_PARAMS = {\n",
@@ -326,60 +98,7 @@
" # 数值稳定性\n",
" \"verbose\": -1,\n",
" \"random_state\": 42,\n",
"}\n",
"\n",
"\n",
"# 股票池筛选函数\n",
"# 使用新的 StockPoolManager API传入自定义筛选函数和所需列/因子\n",
"# 筛选函数接收单日 DataFrame返回布尔 Series\n",
"#\n",
"# 筛选逻辑(针对单日数据):\n",
"# 1. 先排除创业板、科创板、北交所ST过滤由STFilter组件处理\n",
"# 2. 然后选取市值最小的500只股票\n",
"def stock_pool_filter(df: pl.DataFrame) -> pl.Series:\n",
" \"\"\"股票池筛选函数(单日数据)\n",
"\n",
" 筛选条件:\n",
" 1. 排除创业板(代码以 300 开头)\n",
" 2. 排除科创板(代码以 688 开头)\n",
" 3. 排除北交所(代码以 8、9 或 4 开头)\n",
" 4. 选取当日市值最小的500只股票\n",
" \"\"\"\n",
" # 代码筛选(排除创业板、科创板、北交所)\n",
" code_filter = (\n",
" ~df[\"ts_code\"].str.starts_with(\"30\") # 排除创业板\n",
" & ~df[\"ts_code\"].str.starts_with(\"68\") # 排除科创板\n",
" & ~df[\"ts_code\"].str.starts_with(\"8\") # 排除北交所\n",
" & ~df[\"ts_code\"].str.starts_with(\"9\") # 排除北交所\n",
" & ~df[\"ts_code\"].str.starts_with(\"4\") # 排除北交所\n",
" )\n",
"\n",
" # 在已筛选的股票中选取市值最小的500只\n",
" # 按市值升序排序取前500\n",
" valid_df = df.filter(code_filter)\n",
" n = min(1000, len(valid_df))\n",
" small_cap_codes = valid_df.sort(\"total_mv\").head(n)[\"ts_code\"]\n",
"\n",
" # 返回布尔 Series是否在被选中的股票中\n",
" return df[\"ts_code\"].is_in(small_cap_codes)\n",
"\n",
"\n",
"# 定义筛选所需的基础列\n",
"STOCK_FILTER_REQUIRED_COLUMNS = [\"total_mv\"] # ST过滤由STFilter组件处理\n",
"\n",
"# 可选:定义筛选所需的因子(如果需要用因子进行筛选)\n",
"# STOCK_FILTER_REQUIRED_FACTORS = {\n",
"# \"market_cap_rank\": \"cs_rank(total_mv)\",\n",
"# }\n",
"\n",
"\n",
"# 输出配置(相对于本文件所在目录)\n",
"OUTPUT_DIR = \"output\"\n",
"SAVE_PREDICTIONS = True\n",
"PERSIST_MODEL = False\n",
"\n",
"# Top N 配置:每日推荐股票数量\n",
"TOP_N = 5 # 可调整为 10, 20 等"
"}"
]
},
{
@@ -420,6 +139,7 @@
" feature_cols=feature_cols,\n",
" start_date=TRAIN_START,\n",
" end_date=TEST_END,\n",
" label_name=LABEL_NAME,\n",
")\n",
"\n",
"# 4. 打印配置信息\n",
@@ -515,8 +235,6 @@
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"# 步骤 2: 划分训练/验证/测试集(正确的三分法)\n",
"print(\"\\n[步骤 2/6] 划分训练集、验证集和测试集\")\n",
@@ -550,7 +268,9 @@
" train_data = filtered_data\n",
" test_data = filtered_data\n",
" print(\" 未配置划分器,全部作为训练集\")"
]
],
"outputs": [],
"execution_count": null
},
{
"metadata": {},
@@ -579,8 +299,6 @@
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"# 步骤 4: 训练集数据处理\n",
"print(\"\\n[步骤 4/7] 训练集数据处理\")\n",
@@ -608,7 +326,9 @@
" null_count = train_data[col].null_count()\n",
" if null_count > 0:\n",
" print(f\" {col}: {null_count} ({null_count / len(train_data) * 100:.2f}%)\")"
]
],
"outputs": [],
"execution_count": null
},
{
"metadata": {},
@@ -828,8 +548,6 @@
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"print(\"\\n\" + \"=\" * 80)\n",
"print(\"训练结果\")\n",
@@ -855,7 +573,9 @@
"sample_data = results.filter(results[\"trade_date\"] == sample_date).head(10)\n",
"print(f\"\\n示例日期 {sample_date} 的前10条预测:\")\n",
"print(sample_data.select([\"ts_code\", \"trade_date\", target_col, \"prediction\"]))"
]
],
"outputs": [],
"execution_count": null
},
{
"metadata": {},
@@ -978,6 +698,61 @@
"- 可以帮助理解哪些因子最有效"
]
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"print(\"绘制特征重要性...\")\n",
"\n",
"fig, ax = plt.subplots(figsize=(10, 8))\n",
"lgb.plot_importance(\n",
" booster,\n",
" max_num_features=20,\n",
" importance_type=\"gain\",\n",
" title=\"Feature Importance (Gain)\",\n",
" ax=ax,\n",
")\n",
"ax.set_xlabel(\"Importance (Gain)\")\n",
"plt.tight_layout()\n",
"plt.show()\n",
"\n",
"# 打印重要性排名\n",
"importance_gain = pd.Series(\n",
" booster.feature_importance(importance_type=\"gain\"), index=feature_cols\n",
").sort_values(ascending=False)\n",
"\n",
"print(\"\\n[特征重要性排名 - Gain]\")\n",
"print(importance_gain)\n",
"\n",
"# 识别低重要性特征\n",
"zero_importance = importance_gain[importance_gain == 0].index.tolist()\n",
"if zero_importance:\n",
" print(f\"\\n[低重要性特征] 以下{len(zero_importance)}个特征重要性为0可考虑删除:\")\n",
" for feat in zero_importance:\n",
" print(f\" - {feat}\")\n",
"else:\n",
" print(\"\\n所有特征都有一定重要性\")\n"
]
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"# 导入可视化库\n",
"import matplotlib.pyplot as plt\n",
"import lightgbm as lgb\n",
"import pandas as pd\n",
"\n",
"# 从封装的model中取出底层Booster\n",
"booster = model.model\n",
"print(f\"模型类型: {type(booster)}\")\n",
"print(f\"特征数量: {len(feature_cols)}\")"
]
},
{
"metadata": {},
"cell_type": "code",

View File

@@ -3,7 +3,6 @@
# %%
import os
from datetime import datetime
from typing import List
import polars as pl
@@ -13,7 +12,6 @@ from src.training import (
LightGBMModel,
STFilter,
StandardScaler,
# StockFilterConfig, # 已删除,使用 StockPoolManager + filter_func 替代
StockPoolManager,
Trainer,
Winsorizer,
@@ -22,245 +20,38 @@ from src.training import (
)
from src.training.config import TrainingConfig
# %% md
# ## 2. 定义辅助函数
# %%
def register_factors(
engine: FactorEngine,
selected_factors: List[str],
factor_definitions: dict,
label_factor: dict,
) -> List[str]:
"""注册因子selected_factors 从 metadata 查询factor_definitions 用 DSL 表达式注册)"""
print("=" * 80)
print("注册因子")
print("=" * 80)
# 注册 SELECTED_FACTORS 中的因子(已在 metadata 中)
print("\n注册特征因子(从 metadata:")
for name in selected_factors:
engine.add_factor(name)
print(f" - {name}")
# 注册 FACTOR_DEFINITIONS 中的因子(通过表达式,尚未在 metadata 中)
print("\n注册特征因子(表达式):")
for name, expr in factor_definitions.items():
engine.add_factor(name, expr)
print(f" - {name}: {expr}")
# 注册 label 因子(通过表达式)
print("\n注册 Label 因子(表达式):")
for name, expr in label_factor.items():
engine.add_factor(name, expr)
print(f" - {name}: {expr}")
# 特征列 = SELECTED_FACTORS + FACTOR_DEFINITIONS 的 keys
feature_cols = selected_factors + list(factor_definitions.keys())
print(f"\n特征因子数: {len(feature_cols)}")
print(f" - 来自 metadata: {len(selected_factors)}")
print(f" - 来自表达式: {len(factor_definitions)}")
print(f"Label: {list(label_factor.keys())[0]}")
print(f"已注册因子总数: {len(engine.list_registered())}")
return feature_cols
def prepare_data(
engine: FactorEngine,
feature_cols: List[str],
start_date: str,
end_date: str,
) -> pl.DataFrame:
print("\n" + "=" * 80)
print("准备数据")
print("=" * 80)
# 计算因子(全市场数据)
print(f"\n计算因子: {start_date} - {end_date}")
factor_names = feature_cols + [LABEL_NAME] # 包含 label
data = engine.compute(
factor_names=factor_names,
start_date=start_date,
end_date=end_date,
)
print(f"数据形状: {data.shape}")
print(f"数据列: {data.columns}")
print(f"\n前5行预览:")
print(data.head())
return data
# 从 common 模块导入共用配置和函数
from src.experiment.common import (
SELECTED_FACTORS,
FACTOR_DEFINITIONS,
get_label_factor,
register_factors,
prepare_data,
TRAIN_START,
TRAIN_END,
VAL_START,
VAL_END,
TEST_START,
TEST_END,
stock_pool_filter,
STOCK_FILTER_REQUIRED_COLUMNS,
OUTPUT_DIR,
SAVE_PREDICTIONS,
PERSIST_MODEL,
TOP_N,
)
# %% md
# ## 3. 配置参数
# ## 2. 配置参数
#
# ### 3.1 因子定义
# ### 2.1 标签定义
# %%
# 特征因子定义字典:新增因子只需在此处添加一行
# Label name (the regression task predicts a continuous forward return).
LABEL_NAME = "future_return_5"
# Currently selected factor list (the subset of FACTOR_DEFINITIONS in use).
SELECTED_FACTORS = [
    # ================= 1. Price, trend & path dependency =================
    "ma_5",
    "ma_20",
    "ma_ratio_5_20",
    "bias_10",
    "high_low_ratio",
    "bbi_ratio",
    "return_5",
    "return_20",
    "kaufman_ER_20",
    "mom_acceleration_10_20",
    "drawdown_from_high_60",
    "up_days_ratio_20",
    # ================= 2. Volatility, risk adjustment & higher moments =================
    "volatility_5",
    "volatility_20",
    "volatility_ratio",
    "std_return_20",
    "sharpe_ratio_20",
    "min_ret_20",
    "volatility_squeeze_5_60",
    # ================= 3. Intraday microstructure & anomalies =================
    "overnight_intraday_diff",
    "upper_shadow_ratio",
    "capital_retention_20",
    "max_ret_20",
    # ================= 4. Volume, liquidity & price-volume divergence =================
    "volume_ratio_5_20",
    "turnover_rate_mean_5",
    "turnover_deviation",
    "amihud_illiq_20",
    "turnover_cv_20",
    "pv_corr_20",
    "close_vwap_deviation",
    # ================= 5. Fundamental financial features =================
    "roe",
    "roa",
    "profit_margin",
    "debt_to_equity",
    "current_ratio",
    "net_profit_yoy",
    "revenue_yoy",
    "healthy_expansion_velocity",
    # ================= 6. Fundamental valuation & cross-sectional momentum =================
    "EP",
    "BP",
    "CP",
    "market_cap_rank",
    "turnover_rank",
    "return_5_rank",
    "EP_rank",
    "pe_expansion_trend",
    "value_price_divergence",
    "active_market_cap",
    "ebit_rank",
]
# Factor definition dictionary (the full factor library: name -> DSL expression).
FACTOR_DEFINITIONS = {
    # ================= 1. Price, trend & path dependency (Trend, Momentum & Path Dependency) =================
    "ma_5": "ts_mean(close, 5)",
    "ma_20": "ts_mean(close, 20)",
    "ma_ratio_5_20": "ts_mean(close, 5) / (ts_mean(close, 20) + 1e-8) - 1",  # MA divergence
    "bias_10": "close / (ts_mean(close, 10) + 1e-8) - 1",  # 10-day bias (deviation from MA10)
    "high_low_ratio": "(close - ts_min(low, 20)) / (ts_max(high, 20) - ts_min(low, 20) + 1e-8)",  # Williams %R variant
    "bbi_ratio": "(ts_mean(close, 3) + ts_mean(close, 6) + ts_mean(close, 12) + ts_mean(close, 24)) / (4 * close + 1e-8)",  # BBI (bull/bear index) ratio
    "return_5": "(close / (ts_delay(close, 5) + 1e-8)) - 1",  # 5-day momentum
    "return_20": "(close / (ts_delay(close, 20) + 1e-8)) - 1",  # 20-day momentum
    # [Advanced] Kaufman efficiency ratio - trend smoothness, filters out choppy noise
    "kaufman_ER_20": "abs(close - ts_delay(close, 20)) / (ts_sum(abs(close - ts_delay(close, 1)), 20) + 1e-8)",
    # [Advanced] Momentum acceleration - second derivative > 0 flags accelerating names
    "mom_acceleration_10_20": "(close / (ts_delay(close, 10) + 1e-8) - 1) - (ts_delay(close, 10) / (ts_delay(close, 20) + 1e-8) - 1)",
    # [Advanced] Distance from 60-day high - proxies overhead supply pressure
    "drawdown_from_high_60": "close / (ts_max(high, 60) + 1e-8) - 1",
    # [Advanced] Trend consistency - fraction of up days over the past 20 days
    "up_days_ratio_20": "ts_sum(close > ts_delay(close, 1), 20) / 20",
    # ================= 2. Volatility & risk-adjusted returns (Volatility & Risk-Adjusted Returns) =================
    "volatility_5": "ts_std(close, 5)",
    "volatility_20": "ts_std(close, 20)",
    "volatility_ratio": "ts_std(close, 5) / (ts_std(close, 20) + 1e-8)",  # volatility term structure
    "std_return_20": "ts_std((close / (ts_delay(close, 1) + 1e-8)) - 1, 20)",  # realized return volatility
    # [Advanced] Sharpe-style trend ratio - penalizes spikes, rewards steady climbs
    "sharpe_ratio_20": "ts_mean(close / (ts_delay(close, 1) + 1e-8) - 1, 20) / (ts_std(close / (ts_delay(close, 1) + 1e-8) - 1, 20) + 1e-8)",
    # [Advanced] Tail crash risk - worst single-day return over the past month
    "min_ret_20": "ts_min(close / (ts_delay(close, 1) + 1e-8) - 1, 20)",
    # [Advanced] Volatility squeeze ratio - flags extreme consolidation before a breakout (Bollinger squeeze)
    "volatility_squeeze_5_60": "ts_std(close, 5) / (ts_std(close, 60) + 1e-8)",
    # ================= 3. Intraday microstructure & anomalies (Intraday Microstructure & Anomalies) =================
    # [Advanced] Overnight vs intraday divergence - smaller gap suggests intraday accumulation
    "overnight_intraday_diff": "(open / (ts_delay(close, 1) + 1e-8) - 1) - (close / (open + 1e-8) - 1)",
    # [Advanced] Upper-shadow selling pressure - probability of a failed rally trapping buyers
    "upper_shadow_ratio": "(high - ((open + close + abs(open - close)) / 2)) / (high - low + 1e-8)",
    # [Advanced] Capital retention - intensity of intraday shakeouts (body vs range)
    "capital_retention_20": "ts_sum(abs(close - open), 20) / (ts_sum(high - low, 20) + 1e-8)",
    # [Advanced] MAX lottery effect - reversal factor; screens out recent limit-up runners
    "max_ret_20": "ts_max(close / (ts_delay(close, 1) + 1e-8) - 1, 20)",
    # ================= 4. Volume, liquidity & price-volume divergence (Volume, Liquidity & Divergence) =================
    "volume_ratio_5_20": "ts_mean(vol, 5) / (ts_mean(vol, 20) + 1e-8)",  # relative volume surge
    "turnover_rate_mean_5": "ts_mean(turnover_rate, 5)",  # trading activity
    "turnover_deviation": "(turnover_rate - ts_mean(turnover_rate, 10)) / (ts_std(turnover_rate, 10) + 1e-8)",  # turnover deviation (z-score)
    # [Advanced] Amihud illiquidity (core anomaly) - price impact per unit of traded amount
    "amihud_illiq_20": "ts_mean(abs(close / (ts_delay(close, 1) + 1e-8) - 1) / (amount + 1e-8), 20)",
    # [Advanced] Turnover instability penalty - erratic turnover implies unstable speculative flows
    "turnover_cv_20": "ts_std(turnover_rate, 20) / (ts_mean(turnover_rate, 20) + 1e-8)",
    # [Advanced] Pure price-volume correlation - tests for healthy "up on volume, down on thin volume"
    "pv_corr_20": "ts_corr(close / (ts_delay(close, 1) + 1e-8) - 1, vol, 20)",
    # [Advanced] Close vs VWAP divergence - targets late-session markup/paint-the-tape moves
    "close_vwap_deviation": "close / (amount / (vol * 100 + 1e-8) + 1e-8) - 1",
    # ================= 5. Fundamental quality & structure (Fundamental Quality & Structure) =================
    "roe": "n_income / (total_hldr_eqy_exc_min_int + 1e-8)",  # return on equity
    "roa": "n_income / (total_assets + 1e-8)",  # return on assets
    "profit_margin": "n_income / (revenue + 1e-8)",  # net profit margin
    "debt_to_equity": "total_liab / (total_hldr_eqy_exc_min_int + 1e-8)",  # leverage
    "current_ratio": "total_cur_assets / (total_cur_liab + 1e-8)",  # short-term solvency cushion
    # [Advanced] Net profit YoY growth (daily frequency; a 252-day delay approximates the prior year)
    "net_profit_yoy": "(n_income / (ts_delay(n_income, 252) + 1e-8)) - 1",
    # [Advanced] Revenue YoY growth
    "revenue_yoy": "(revenue / (ts_delay(revenue, 252) + 1e-8)) - 1",
    # [Advanced] Balance-sheet expansion slope - screens out debt-fueled expansion
    "healthy_expansion_velocity": "(total_assets / (ts_delay(total_assets, 252) + 1e-8) - 1) - (total_liab / (ts_delay(total_liab, 252) + 1e-8) - 1)",
    # ================= 6. Valuation & cross-sectional ranking (Valuation & Cross-Sectional Ranking) =================
    # Absolute valuation levels (Tushare market cap must be multiplied by 10000 to get CNY)
    "EP": "n_income / (total_mv * 10000 + 1e-8)",  # earnings yield (1/PE)
    "BP": "total_hldr_eqy_exc_min_int / (total_mv * 10000 + 1e-8)",  # book-to-market (1/PB)
    "CP": "n_cashflow_act / (total_mv * 10000 + 1e-8)",  # operating cash flow yield (1/PCF)
    # Market-wide cross-sectional rank factors
    "market_cap_rank": "cs_rank(total_mv)",  # size factor
    "turnover_rank": "cs_rank(turnover_rate)",
    "return_5_rank": "cs_rank((close / (ts_delay(close, 5) + 1e-8)) - 1)",
    "EP_rank": "cs_rank(n_income / (total_mv + 1e-8))",  # cheapest names first
    # [Advanced] Davis double-play momentum - whether valuation is expanding vs the recent past
    "pe_expansion_trend": "(total_mv / (n_income + 1e-8)) / (ts_delay(total_mv, 60) / (ts_delay(n_income, 60) + 1e-8) + 1e-8) - 1",
    # [Advanced] Earnings/price divergence - cross-sectional spread: top profit-growth rank vs bottom 20-day price rank, to catch unfairly punished names
    "value_price_divergence": "cs_rank((n_income - ts_delay(n_income, 252)) / (abs(ts_delay(n_income, 252)) + 1e-8)) - cs_rank(close / (ts_delay(close, 20) + 1e-8))",
    # [Advanced] Liquidity-adjusted market cap - separates zombie large caps from hyperactive micro caps
    "active_market_cap": "total_mv * ts_mean(turnover_rate, 20)",
    "ebit_rank": "cs_rank(ebit)",
}
# Label factor definition (not used for training; computes the target).
LABEL_FACTOR = {
    LABEL_NAME: "(ts_delay(close, -5) / ts_delay(open, -1)) - 1",  # forward 5-day return
}
# %% md
# ### 3.2 训练参数配置
# %%
# 日期范围配置(正确的 train/val/test 三分法)
# Train: 用于训练模型参数
# Val: 用于验证/早停/调参(位于 train 之后test 之前)
# Test: 仅用于最终评估,完全独立于训练过程
TRAIN_START = "20200101"
TRAIN_END = "20231231"
VAL_START = "20240101"
VAL_END = "20241231"
TEST_START = "20250101"
TEST_END = "20261231"
# 获取 Label 因子定义
LABEL_FACTOR = get_label_factor(LABEL_NAME)
# 模型参数配置
MODEL_PARAMS = {
@@ -285,59 +76,6 @@ MODEL_PARAMS = {
"verbose": -1,
"random_state": 42,
}
# 股票池筛选函数
# 使用新的 StockPoolManager API传入自定义筛选函数和所需列/因子
# 筛选函数接收单日 DataFrame返回布尔 Series
#
# 筛选逻辑(针对单日数据):
# 1. 先排除创业板、科创板、北交所ST过滤由STFilter组件处理
# 2. 然后选取市值最小的 1000 只股票NOTE(review): 注释原写 500但代码实际为 min(1000, ...),请确认目标股票池大小)
def stock_pool_filter(df: pl.DataFrame, pool_size: int = 1000) -> pl.Series:
    """Stock-pool filter applied to a single day's cross-section.

    Filtering rules:
      1. Exclude ChiNext (codes starting with "30").
      2. Exclude STAR Market (codes starting with "68").
      3. Exclude Beijing Stock Exchange (codes starting with "8", "9" or "4").
      4. Keep the ``pool_size`` smallest stocks by total market cap.

    NOTE(review): the original docstring/comments said 500 stocks while the
    code used min(1000, ...). The default preserves the code's actual
    behavior (1000); ``pool_size`` now makes the limit explicit — confirm
    the intended pool size.

    Args:
        df: Single-day DataFrame; must contain "ts_code" and "total_mv".
        pool_size: Number of smallest-cap stocks to keep (default 1000,
            matching the previously hard-coded value).

    Returns:
        Boolean Series aligned with ``df`` rows; True where the stock is kept.
    """
    # Board exclusion by code prefix (ST filtering is handled by the STFilter component).
    code_filter = (
        ~df["ts_code"].str.starts_with("30")  # exclude ChiNext
        & ~df["ts_code"].str.starts_with("68")  # exclude STAR Market
        & ~df["ts_code"].str.starts_with("8")  # exclude BSE
        & ~df["ts_code"].str.starts_with("9")  # exclude BSE
        & ~df["ts_code"].str.starts_with("4")  # exclude BSE
    )
    # Among the surviving stocks, take the pool_size smallest by market cap
    # (ascending sort, then head).
    valid_df = df.filter(code_filter)
    n = min(pool_size, len(valid_df))
    small_cap_codes = valid_df.sort("total_mv").head(n)["ts_code"]
    # Boolean mask: whether each row's code made it into the selected pool.
    return df["ts_code"].is_in(small_cap_codes)
# Base columns required by the stock-pool filter function.
STOCK_FILTER_REQUIRED_COLUMNS = ["total_mv"]  # ST filtering is handled by the STFilter component
# Optional: factors required by the filter (if factor-based screening is needed).
# STOCK_FILTER_REQUIRED_FACTORS = {
#     "market_cap_rank": "cs_rank(total_mv)",
# }
# Output configuration (paths are relative to this file's directory).
OUTPUT_DIR = "output"
SAVE_PREDICTIONS = True
PERSIST_MODEL = False
# Top-N configuration: number of stocks recommended per day.
TOP_N = 5  # adjustable, e.g. 10 or 20
# %% md
# ## 4. 训练流程
#
@@ -366,6 +104,7 @@ data = prepare_data(
feature_cols=feature_cols,
start_date=TRAIN_START,
end_date=TEST_END,
label_name=LABEL_NAME,
)
# 4. 打印配置信息

View File

@@ -58,14 +58,12 @@ class FactorEngine:
self,
data_source: Optional[Dict[str, pl.DataFrame]] = None,
registry: Optional["FunctionRegistry"] = None,
metadata_path: Optional[str] = None,
) -> None:
"""初始化因子引擎。
Args:
data_source: 内存数据源,为 None 时使用数据库连接
registry: 函数注册表None 时创建独立实例
metadata_path: 因子元数据文件路径,为 None 时启用默认 metadata 功能
"""
from src.factors.registry import FunctionRegistry
from src.factors.parser import FormulaParser
@@ -80,13 +78,7 @@ class FactorEngine:
self._registry = registry if registry is not None else FunctionRegistry()
self._parser = FormulaParser(self._registry)
# 初始化 metadata 管理器(可选,默认启用
if metadata_path is not None:
from src.factors.metadata import FactorManager
self._metadata = FactorManager(metadata_path)
else:
# 使用 FactorManager 的默认路径
# 初始化 metadata 管理器(使用默认路径
from src.factors.metadata import FactorManager
self._metadata = FactorManager()

View File

@@ -49,12 +49,18 @@ class LightGBMModel(BaseModel):
self.model = None
self.feature_names_: Optional[list] = None
def fit(self, X: pl.DataFrame, y: pl.Series) -> "LightGBMModel":
def fit(
self,
X: pl.DataFrame,
y: pl.Series,
eval_set: Optional[tuple] = None,
) -> "LightGBMModel":
"""训练模型
Args:
X: 特征矩阵 (Polars DataFrame)
y: 目标变量 (Polars Series)
eval_set: 验证集元组 (X_val, y_val),用于早停
Returns:
self (支持链式调用)
@@ -76,6 +82,14 @@ class LightGBMModel(BaseModel):
train_data = lgb.Dataset(X_np, label=y_np)
# 准备验证集
valid_sets = None
if eval_set is not None:
X_val, y_val = eval_set
X_val_np = X_val.to_numpy()
y_val_np = y_val.to_numpy()
valid_sets = lgb.Dataset(X_val_np, label=y_val_np, reference=train_data)
# 从 params 中提取 num_boost_round默认 100
num_boost_round = self.params.pop("n_estimators", 100)
@@ -83,6 +97,7 @@ class LightGBMModel(BaseModel):
self.params,
train_data,
num_boost_round=num_boost_round,
valid_sets=[valid_sets] if valid_sets else None,
)
return self