diff --git a/src/experiment/probe_selection/__init__.py b/src/experiment/probe_selection/__init__.py new file mode 100644 index 0000000..268ebe0 --- /dev/null +++ b/src/experiment/probe_selection/__init__.py @@ -0,0 +1,47 @@ +"""增强探针法因子筛选 (Probe Feature Selection) + +基于噪音探针的统计显著性特征选择方法。 + +核心组件: + - ProbeSelector: 主选择器,协调整个筛选流程 + - NoiseGenerator: 噪音生成器,Polars 零拷贝注入 + - ProbeTrainer: 多任务训练器,支持验证集早停 + - ImportanceEvaluator: 重要性评估器,强制 Gain + - LightGBMClassifier: 分类模型 + +使用示例: + >>> from src.experiment.probe_selection import ProbeSelector + >>> + >>> selector = ProbeSelector( + ... n_iterations=3, + ... n_noise_features=5, + ... validation_ratio=0.15, + ... ) + >>> + >>> selected_features = selector.select( + ... data=train_data, + ... feature_cols=all_features, + ... target_col_regression="future_return_5", + ... date_col="trade_date", + ... ) +""" + +from src.experiment.probe_selection.importance_evaluator import ImportanceEvaluator +from src.experiment.probe_selection.lightgbm_classifier import LightGBMClassifier +from src.experiment.probe_selection.noise_generator import NoiseGenerator +from src.experiment.probe_selection.probe_selector import ProbeSelector +from src.experiment.probe_selection.probe_trainer import ( + ProbeTrainer, + create_classification_target, + split_validation_by_date, +) + +__all__ = [ + "ProbeSelector", + "NoiseGenerator", + "ProbeTrainer", + "ImportanceEvaluator", + "LightGBMClassifier", + "create_classification_target", + "split_validation_by_date", +] diff --git a/src/experiment/probe_selection/importance_evaluator.py b/src/experiment/probe_selection/importance_evaluator.py new file mode 100644 index 0000000..5a9b2da --- /dev/null +++ b/src/experiment/probe_selection/importance_evaluator.py @@ -0,0 +1,188 @@ +"""重要性评估器 + +评估特征重要性相对于噪音的统计显著性,执行交叉淘汰。 +""" + +from typing import Dict, List, Optional, Tuple + + +class ImportanceEvaluator: + """重要性评估器 + + 计算噪音及格线,执行交叉淘汰。 + 强制使用 Gain 重要性,避免被噪音欺骗。 + """ + + def __init__(self, noise_prefix: str = "__noise__"): + """初始化评估器 + + Args: + noise_prefix: 噪音列名前缀 + """ + self.noise_prefix = noise_prefix + self.regression_threshold: Optional[float] = None + self.classification_threshold: Optional[float] = None + self.elimination_stats: dict = {} + + def evaluate( + self, + regression_importance: Dict[str, float], + classification_importance: Dict[str, float], + candidate_features: List[str], + ) -> List[str]: + """执行重要性评估和交叉淘汰 + + Args: + regression_importance: 回归模型特征重要性 {feature: importance} + classification_importance: 分类模型特征重要性 {feature: importance} + candidate_features: 候选特征列表(不包含噪音) + + Returns: + 存活下来的特征列表 + """ + # 计算及格线(噪音重要性的最大值) + self.regression_threshold = self._calculate_threshold(regression_importance) + self.classification_threshold = self._calculate_threshold( + classification_importance + ) + + # 执行交叉淘汰 + eliminated = [] + survived = [] + + for feature in candidate_features: + reg_imp = regression_importance.get(feature, 0.0) + cls_imp = classification_importance.get(feature, 0.0) + + # 交叉淘汰:两个模型都低于及格线才剔除 + if ( + reg_imp < self.regression_threshold + and cls_imp < self.classification_threshold + ): + eliminated.append( + { + "feature": feature, + "regression_importance": reg_imp, + "classification_importance": cls_imp, + "regression_threshold": self.regression_threshold, + "classification_threshold": self.classification_threshold, + } + ) + else: + survived.append(feature) + + self.elimination_stats = { + "total_candidates": len(candidate_features), + "eliminated_count": len(eliminated), + "survived_count": len(survived), + "regression_threshold": self.regression_threshold, + "classification_threshold": self.classification_threshold, + "eliminated_features": eliminated, + } + + return survived + + def _calculate_threshold(self, importance: Dict[str, float]) -> float: + """计算噪音及格线 + + 取所有噪音特征重要性的最大值作为及格线。 + + Args: + importance: 特征重要性字典 + + Returns: + 及格线数值 + """ + noise_importance = [ + imp + for feat, imp in importance.items() + if feat.startswith(self.noise_prefix) + ] + + if not noise_importance: + # 如果没有噪音特征,返回一个很小的值(不应该发生) + return 0.0 + + return max(noise_importance) + + def get_thresholds(self) -> Tuple[Optional[float], Optional[float]]: + """获取及格线 + + Returns: + (回归及格线, 分类及格线) 元组 + """ + return self.regression_threshold, self.classification_threshold + + def get_elimination_stats(self) -> dict: + """获取淘汰统计 + + Returns: + 淘汰统计字典 + """ + return self.elimination_stats + + def get_feature_comparison( + self, + regression_importance: Dict[str, float], + classification_importance: Dict[str, float], + candidate_features: List[str], + ) -> List[Dict]: + """获取所有特征的重要性对比详情 + + Args: + regression_importance: 回归重要性 + classification_importance: 分类重要性 + candidate_features: 候选特征 + + Returns: + 特征对比列表,每项包含: + - feature: 特征名 + - regression_importance: 回归重要性 + - classification_importance: 分类重要性 + - regression_threshold: 回归及格线 + - classification_threshold: 分类及格线 + - is_eliminated: 是否被淘汰 + - elimination_reason: 淘汰原因 + """ + comparison = [] + + for feature in candidate_features: + reg_imp = regression_importance.get(feature, 0.0) + cls_imp = classification_importance.get(feature, 0.0) + + # 判断是否被淘汰 + is_eliminated = ( + reg_imp < self.regression_threshold + and cls_imp < self.classification_threshold + ) + + # 生成淘汰原因 + reasons = [] + if reg_imp < self.regression_threshold: + reasons.append( + f"回归重要性({reg_imp:.6f})低于及格线({self.regression_threshold:.6f})" + ) + if cls_imp < self.classification_threshold: + reasons.append( + f"分类重要性({cls_imp:.6f})低于及格线({self.classification_threshold:.6f})" + ) + + comparison.append( + { + "feature": feature, + "regression_importance": reg_imp, + "classification_importance": cls_imp, + "regression_threshold": self.regression_threshold, + "classification_threshold": self.classification_threshold, + "is_eliminated": is_eliminated, + "elimination_reason": "; ".join(reasons) if reasons else "通过筛选", + } + ) + + # 按重要性总和排序 + comparison.sort( + key=lambda x: x["regression_importance"] + x["classification_importance"], + reverse=True, + ) + + return comparison diff --git a/src/experiment/probe_selection/lightgbm_classifier.py b/src/experiment/probe_selection/lightgbm_classifier.py new file mode 100644 index 0000000..a9547fb --- /dev/null +++ b/src/experiment/probe_selection/lightgbm_classifier.py @@ -0,0 +1,222 @@ +"""LightGBM 分类模型 + +用于探针法中的分类任务训练。 +""" + +from typing import Any, Optional + +import numpy as np +import pandas as pd +import polars as pl + +from src.training.components.base import BaseModel +from src.training.registry import register_model + + +@register_model("lightgbm_classifier") +class LightGBMClassifier(BaseModel): + """LightGBM 分类模型 + + 使用 LightGBM 库实现梯度提升分类树。 + 支持自定义参数、特征重要性提取和原生模型格式保存。 + + Attributes: + name: 模型名称 "lightgbm_classifier" + params: LightGBM 参数字典 + model: 训练后的 LightGBM Booster 对象 + feature_names_: 特征名称列表 + """ + + name = "lightgbm_classifier" + + def __init__(self, params: Optional[dict] = None): + """初始化 LightGBM 分类模型 + + Args: + params: LightGBM 参数字典,直接传递给 lgb.train()。 + 包含所有模型参数和训练控制参数(如 n_estimators)。 + + Examples: + >>> model = LightGBMClassifier(params={ + ... "objective": "binary", + ... "metric": "auc", + ... "num_leaves": 31, + ... "learning_rate": 0.05, + ... "n_estimators": 100, + ... }) + """ + self.params = dict(params) if params is not None else {} + self.model = None + self.feature_names_: Optional[list] = None + + def fit( + self, + X: pl.DataFrame, + y: pl.Series, + eval_set: Optional[tuple] = None, + ) -> "LightGBMClassifier": + """训练模型 + + Args: + X: 特征矩阵 (Polars DataFrame) + y: 目标变量 (Polars Series),应为 0/1 整数 + eval_set: 验证集元组 (X_val, y_val),用于早停 + + Returns: + self (支持链式调用) + + Raises: + ImportError: 未安装 lightgbm + RuntimeError: 训练失败 + """ + try: + import lightgbm as lgb + except ImportError: + raise ImportError( + "使用 LightGBMClassifier 需要安装 lightgbm: pip install lightgbm" + ) + + self.feature_names_ = X.columns + X_np = X.to_numpy() + y_np = y.to_numpy() + + train_data = lgb.Dataset(X_np, label=y_np) + + valid_sets = [train_data] + valid_names = ["train"] + callbacks = [] + + if eval_set is not None: + X_val, y_val = eval_set + X_val_np = X_val.to_numpy() if isinstance(X_val, pl.DataFrame) else X_val + y_val_np = y_val.to_numpy() if isinstance(y_val, pl.Series) else y_val + val_data = lgb.Dataset(X_val_np, label=y_val_np) + valid_sets.append(val_data) + valid_names.append("val") + + # 从 params 中提取训练和早停参数 + params_copy = dict(self.params) + num_boost_round = params_copy.pop("n_estimators", 100) + early_stopping_round = params_copy.pop("early_stopping_round", 50) + + if len(valid_sets) > 1: + callbacks.append(lgb.early_stopping(stopping_rounds=early_stopping_round)) + + self.model = lgb.train( + params_copy, + train_data, + num_boost_round=num_boost_round, + valid_sets=valid_sets, + valid_names=valid_names, + callbacks=callbacks, + ) + + return self + + def predict(self, X: pl.DataFrame) -> np.ndarray: + """预测类别概率 + + Args: + X: 特征矩阵 (Polars DataFrame) + + Returns: + 预测概率 (numpy ndarray) + + Raises: + RuntimeError: 模型未训练时调用 + """ + if self.model is None: + raise RuntimeError("模型尚未训练,请先调用 fit()") + + X_np = X.to_numpy() + result = self.model.predict(X_np) + return np.asarray(result) + + def predict_class(self, X: pl.DataFrame) -> np.ndarray: + """预测类别 + + Args: + X: 特征矩阵 (Polars DataFrame) + + Returns: + 预测类别 (0 或 1) + + Raises: + RuntimeError: 模型未训练时调用 + """ + proba = self.predict(X) + return (proba >= 0.5).astype(int) + + def feature_importance(self, importance_type: str = "gain") -> Optional[pd.Series]: + """返回特征重要性 + + Args: + importance_type: 重要性类型,默认为 "gain" + 必须使用 "gain","split" 会被噪音欺骗 + + Returns: + 特征重要性序列,如果模型未训练则返回 None + """ + if self.model is None or self.feature_names_ is None: + return None + + importance = self.model.feature_importance(importance_type=importance_type) + return pd.Series(importance, index=self.feature_names_) + + def save(self, path: str) -> None: + """保存模型(使用 LightGBM 原生格式) + + 使用 LightGBM 的原生格式保存,不依赖 pickle, + 可以在不同环境中加载。 + + Args: + path: 保存路径 + + Raises: + RuntimeError: 模型未训练时调用 + """ + if self.model is None: + raise RuntimeError("模型尚未训练,无法保存") + + self.model.save_model(path) + + import json + + meta_path = path + ".meta.json" + with open(meta_path, "w") as f: + json.dump( + { + "feature_names": self.feature_names_, + "params": self.params, + }, + f, + ) + + @classmethod + def load(cls, path: str) -> "LightGBMClassifier": + """加载模型 + + 从 LightGBM 原生格式加载模型。 + + Args: + path: 模型文件路径 + + Returns: + 加载的 LightGBMClassifier 实例 + """ + import lightgbm as lgb + import json + + instance = cls() + instance.model = lgb.Booster(model_file=path) + + meta_path = path + ".meta.json" + try: + with open(meta_path, "r") as f: + meta = json.load(f) + instance.feature_names_ = meta.get("feature_names") + instance.params = meta.get("params", {}) + except FileNotFoundError: + pass + + return instance diff --git a/src/experiment/probe_selection/noise_generator.py b/src/experiment/probe_selection/noise_generator.py new file mode 100644 index 0000000..029db7c --- /dev/null +++ b/src/experiment/probe_selection/noise_generator.py @@ -0,0 +1,93 @@ +"""噪音生成器 + +使用 Polars 零拷贝方式注入随机噪音特征。 +""" + +import numpy as np +import polars as pl + + +class NoiseGenerator: + """噪音生成器 + + 生成服从标准正态分布的随机噪音列,使用 Polars 原生 API + 实现零拷贝注入。 + """ + + NOISE_PREFIX = "__noise__" + + def __init__(self, random_state: int = 42): + """初始化噪音生成器 + + Args: + random_state: 随机种子,保证可复现性 + """ + self.random_state = random_state + + def generate_noise( + self, + df: pl.DataFrame, + n_noise: int, + seed: int = 42, + ) -> pl.DataFrame: + """向 DataFrame 注入噪音特征 + + 使用 Polars 原生 with_columns 实现零拷贝拼接。 + + Args: + df: 原始数据 + n_noise: 噪音列数量 + seed: 随机种子 + + Returns: + 添加了噪音列的 DataFrame + """ + np.random.seed(seed) + n_rows = df.height + + # 直接生成 Polars Series 列表,然后一次性 with_columns + # 实现零拷贝拼接,避免转换为 Pandas + noise_series = [ + pl.Series( + f"{self.NOISE_PREFIX}{i}", + np.random.randn(n_rows).astype(np.float32), + dtype=pl.Float32, + ) + for i in range(n_noise) + ] + + return df.with_columns(noise_series) + + def remove_noise(self, df: pl.DataFrame) -> pl.DataFrame: + """移除噪音列 + + Args: + df: 包含噪音列的数据 + + Returns: + 移除了噪音列的数据 + """ + noise_cols = [col for col in df.columns if col.startswith(self.NOISE_PREFIX)] + return df.drop(noise_cols) + + def get_noise_columns(self, df: pl.DataFrame) -> list[str]: + """获取所有噪音列名 + + Args: + df: 数据 + + Returns: + 噪音列名列表 + """ + return [col for col in df.columns if col.startswith(self.NOISE_PREFIX)] + + def is_noise_column(self, col_name: str) -> bool: + """判断是否为噪音列 + + Args: + col_name: 列名 + + Returns: + 是否为噪音列 + """ + return col_name.startswith(self.NOISE_PREFIX) diff --git a/src/experiment/probe_selection/probe_selector.py b/src/experiment/probe_selection/probe_selector.py new file mode 100644 index 0000000..f5795cc --- /dev/null +++ b/src/experiment/probe_selection/probe_selector.py @@ -0,0 +1,284 @@ +"""探针选择器 - 主类 + +协调整个探针筛选流程,执行迭代特征选择。 +""" + +from typing import List, Optional + +import polars as pl + +from src.experiment.probe_selection.importance_evaluator import ImportanceEvaluator +from src.experiment.probe_selection.noise_generator import NoiseGenerator +from src.experiment.probe_selection.probe_trainer import ProbeTrainer + + +class ProbeSelector: + """探针选择器 + + 实现增强探针法因子筛选算法: + 1. 注入噪音探针 + 2. 多任务训练(回归+分类) + 3. 基于噪音及格线交叉淘汰 + 4. 迭代直到收敛 + + 关键约束: + - 分类目标使用截面中位数 + - 强制使用 Gain 重要性 + - 训练时使用验证集早停 + - Polars 零拷贝操作 + """ + + def __init__( + self, + n_iterations: int = 5, + n_noise_features: int = 10, + validation_ratio: float = 0.15, + random_state: int = 42, + regression_params: Optional[dict] = None, + classification_params: Optional[dict] = None, + verbose: bool = True, + ): + """初始化探针选择器 + + Args: + n_iterations: 最大迭代轮数 K + n_noise_features: 每轮注入的噪音数 M + validation_ratio: 验证集比例(用于早停) + random_state: 随机种子 + regression_params: 回归模型参数 + classification_params: 分类模型参数 + verbose: 是否输出详细日志 + """ + self.n_iterations = n_iterations + self.n_noise_features = n_noise_features + self.validation_ratio = validation_ratio + self.random_state = random_state + self.verbose = verbose + + # 初始化子组件 + self.noise_generator = NoiseGenerator(random_state=random_state) + self.trainer = ProbeTrainer( + regression_params=regression_params, + classification_params=classification_params, + validation_ratio=validation_ratio, + random_state=random_state, + ) + self.evaluator = ImportanceEvaluator(noise_prefix=NoiseGenerator.NOISE_PREFIX) + + # 存储历史记录 + self.selection_history: List[dict] = [] + self.final_features: Optional[List[str]] = None + + def select( + self, + data: pl.DataFrame, + feature_cols: List[str], + target_col_regression: str, + date_col: str = "trade_date", + ) -> List[str]: + """执行特征选择 + + Args: + data: 训练数据 + feature_cols: 候选特征列表 + target_col_regression: 回归目标列名 + date_col: 日期列名 + + Returns: + 筛选后的特征列表 + """ + remaining_features = feature_cols.copy() + original_count = len(remaining_features) + + if self.verbose: + print("=" * 80) + print("增强探针法因子筛选") + print("=" * 80) + print(f"\n初始特征数: {original_count}") + print(f"迭代轮数: {self.n_iterations}") + print(f"每轮探针数: {self.n_noise_features}") + print(f"验证集比例: {self.validation_ratio:.0%}") + + for iteration in range(1, self.n_iterations + 1): + if self.verbose: + print(f"\n{'=' * 80}") + print(f"探针筛选第 {iteration}/{self.n_iterations} 轮") + print(f"当前候选特征: {len(remaining_features)} 个") + print("=" * 80) + + # 注入探针 + current_features = remaining_features.copy() + feature_matrix = data.select( + current_features + [target_col_regression, date_col] + ) + + # 注入噪音特征 + seed = self.random_state + iteration # 每轮使用不同种子 + data_with_noise = self.noise_generator.generate_noise( + feature_matrix, self.n_noise_features, seed + ) + + all_feature_cols = ( + current_features + + self.noise_generator.get_noise_columns(data_with_noise) + ) + + if self.verbose: + print(f"\n[1/4] 注入探针: {self.n_noise_features} 列噪音特征") + + # 多任务训练 + if self.verbose: + print("\n[2/4] 多任务训练(回归 + 分类)...") + + self.trainer.fit( + df=data_with_noise, + feature_cols=all_feature_cols, + target_col_regression=target_col_regression, + date_col=date_col, + ) + + # 获取训练信息 + train_info = self.trainer.get_training_info() + if self.verbose: + print( + f" 数据切分: 训练集 {train_info.get('train_size')} 条, 验证集 {train_info.get('val_size')} 条" + ) + if "regression_best_iter" in train_info: + print(f" 回归模型早停: {train_info['regression_best_iter']} 轮") + if "classification_best_iter" in train_info: + print( + f" 分类模型早停: {train_info['classification_best_iter']} 轮" + ) + + # 获取特征重要性 + reg_imp, cls_imp = self.trainer.get_feature_importance( + importance_type="gain" + ) + + if self.verbose: + print("\n[3/4] 计算及格线...") + + # 评估并淘汰 + remaining_features = self.evaluator.evaluate( + regression_importance=reg_imp, + classification_importance=cls_imp, + candidate_features=current_features, + ) + + thresholds = self.evaluator.get_thresholds() + if self.verbose: + print(f" 回归及格线: {thresholds[0]:.6f}") + print(f" 分类及格线: {thresholds[1]:.6f}") + + # 记录本轮结果 + stats = self.evaluator.get_elimination_stats() + eliminated = stats["eliminated_count"] + + if self.verbose: + print(f"\n[4/4] 交叉淘汰...") + print(f" 淘汰特征: {eliminated} 个") + print(f" 剩余特征: {stats['survived_count']} 个") + + if eliminated > 0: + print("\n 淘汰的特征:") + for feat_info in stats["eliminated_features"][:10]: # 只显示前10个 + print( + f" - {feat_info['feature']}: 回归={feat_info['regression_importance']:.6f}, 分类={feat_info['classification_importance']:.6f}" + ) + if eliminated > 10: + print(f" ... 还有 {eliminated - 10} 个") + + # 保存历史 + self.selection_history.append( + { + "iteration": iteration, + "initial_features": len(current_features), + "eliminated": eliminated, + "survived": len(remaining_features), + "regression_threshold": thresholds[0], + "classification_threshold": thresholds[1], + "eliminated_features": [ + f["feature"] for f in stats["eliminated_features"] + ], + } + ) + + # 检查终止条件 + if eliminated == 0: + if self.verbose: + print(f"\n[提前终止] 第 {iteration} 轮没有因子被淘汰") + break + + self.final_features = remaining_features + + if self.verbose: + print("\n" + "=" * 80) + print("探针筛选完成") + print("=" * 80) + print(f"\n原始特征数: {original_count}") + print(f"最终特征数: {len(remaining_features)}") + print(f"淘汰特征数: {original_count - len(remaining_features)}") + print( + f"淘汰比例: {(original_count - len(remaining_features)) / original_count:.1%}" + ) + print(f"\n最终特征列表:") + for i, feat in enumerate(remaining_features, 1): + print(f" {i:2d}. {feat}") + + return remaining_features + + def get_selection_history(self) -> List[dict]: + """获取筛选历史 + + Returns: + 每轮筛选的历史记录列表 + """ + return self.selection_history + + def get_importance_report( + self, + data: pl.DataFrame, + feature_cols: List[str], + target_col_regression: str, + date_col: str = "trade_date", + ) -> List[dict]: + """获取最后一轮的重要性详细报告 + + Args: + data: 数据 + feature_cols: 特征列表 + target_col_regression: 回归目标 + date_col: 日期列名 + + Returns: + 特征对比列表 + """ + # 注入探针 + feature_matrix = data.select(feature_cols + [target_col_regression, date_col]) + data_with_noise = self.noise_generator.generate_noise( + feature_matrix, self.n_noise_features, self.random_state + ) + all_feature_cols = feature_cols + self.noise_generator.get_noise_columns( + data_with_noise + ) + + # 训练 + self.trainer.fit( + df=data_with_noise, + feature_cols=all_feature_cols, + target_col_regression=target_col_regression, + date_col=date_col, + ) + + # 获取重要性 + reg_imp, cls_imp = self.trainer.get_feature_importance(importance_type="gain") + + # 执行评估以获取及格线 + self.evaluator.evaluate(reg_imp, cls_imp, feature_cols) + + # 获取详细对比 + comparison = self.evaluator.get_feature_comparison( + reg_imp, cls_imp, feature_cols + ) + + return comparison diff --git a/src/experiment/probe_selection/probe_trainer.py b/src/experiment/probe_selection/probe_trainer.py new file mode 100644 index 0000000..bb8ee08 --- /dev/null +++ b/src/experiment/probe_selection/probe_trainer.py @@ -0,0 +1,253 @@ +"""探针训练器 + +执行多任务训练(回归 + 分类),支持验证集早停。 +""" + +from typing import Optional, Tuple + +import numpy as np +import polars as pl + +from src.experiment.probe_selection.lightgbm_classifier import LightGBMClassifier +from src.training.components.models.lightgbm import LightGBMModel + + +def split_validation_by_date( + df: pl.DataFrame, + date_col: str = "trade_date", + val_ratio: float = 0.15, +) -> Tuple[pl.DataFrame, pl.DataFrame]: + """按时间切分训练集和验证集(最近日期作为验证集) + + Args: + df: 输入数据 + date_col: 日期列名 + val_ratio: 验证集比例 + + Returns: + (train_df, val_df) 元组 + """ + dates = df[date_col].unique().sort() + n_dates = len(dates) + n_val_dates = max(1, int(n_dates * val_ratio)) + + val_dates = dates[-n_val_dates:] + train_df = df.filter(~pl.col(date_col).is_in(val_dates)) + val_df = df.filter(pl.col(date_col).is_in(val_dates)) + + return train_df, val_df + + +def create_classification_target( + df: pl.DataFrame, + return_col: str, + date_col: str = "trade_date", + new_col_name: str = "target_class", +) -> pl.DataFrame: + """将收益率转换为截面中位数分类标签 + + 优势:预测跑赢当天市场平均水平的股票,真正有 Alpha 的因子 + 避免:牛熊市不平衡导致的分类失效 + + Args: + df: 输入数据 + return_col: 收益率列名 + date_col: 日期列名 + new_col_name: 新列名 + + Returns: + 添加了分类标签的 DataFrame + """ + return df.with_columns( + (pl.col(return_col) > pl.col(return_col).median().over(date_col)) + .cast(pl.Int8) + .alias(new_col_name) + ) + + +class ProbeTrainer: + """探针训练器 + + 执行多任务训练(回归 + 分类),基于验证集早停。 + """ + + def __init__( + self, + regression_params: Optional[dict] = None, + classification_params: Optional[dict] = None, + validation_ratio: float = 0.15, + random_state: int = 42, + ): + """初始化探针训练器 + + Args: + regression_params: 回归模型参数 + classification_params: 分类模型参数 + validation_ratio: 验证集比例 + random_state: 随机种子 + """ + self.regression_params = regression_params or { + "objective": "regression", + "metric": "mae", + "n_estimators": 500, + "learning_rate": 0.05, + "early_stopping_round": 50, + "verbose": -1, + } + self.classification_params = classification_params or { + "objective": "binary", + "metric": "auc", + "n_estimators": 500, + "learning_rate": 0.05, + "early_stopping_round": 50, + "verbose": -1, + } + self.validation_ratio = validation_ratio + self.random_state = random_state + + self.regression_model: Optional[LightGBMModel] = None + self.classification_model: Optional[LightGBMClassifier] = None + self.training_info: dict = {} + + def fit( + self, + df: pl.DataFrame, + feature_cols: list[str], + target_col_regression: str, + target_col_classification: Optional[str] = None, + date_col: str = "trade_date", + ) -> "ProbeTrainer": + """训练回归和分类模型 + + Args: + df: 训练数据(包含噪音特征) + feature_cols: 特征列名列表(包含噪音) + target_col_regression: 回归目标列名 + target_col_classification: 分类目标列名(如不传则自动生成) + date_col: 日期列名 + + Returns: + self + """ + # 切分训练集和验证集(按时间) + train_df, val_df = split_validation_by_date(df, date_col, self.validation_ratio) + + self.training_info = { + "train_size": len(train_df), + "val_size": len(val_df), + "n_features": len(feature_cols), + } + + # 训练回归模型 + self._fit_regression(train_df, val_df, feature_cols, target_col_regression) + + # 准备分类目标 + if target_col_classification is None: + # 自动生成截面中位数分类目标 + train_df = create_classification_target( + train_df, target_col_regression, date_col + ) + val_df = create_classification_target( + val_df, target_col_regression, date_col + ) + target_col_classification = "target_class" + + # 训练分类模型 + self._fit_classification( + train_df, val_df, feature_cols, target_col_classification + ) + + return self + + def _fit_regression( + self, + train_df: pl.DataFrame, + val_df: pl.DataFrame, + feature_cols: list[str], + target_col: str, + ): + """训练回归模型""" + X_train = train_df.select(feature_cols) + y_train = train_df.select(target_col).to_series() + X_val = val_df.select(feature_cols) + y_val = val_df.select(target_col).to_series() + + self.regression_model = LightGBMModel(params=self.regression_params) + self.regression_model.fit( + X_train, + y_train, + eval_set=(X_val, y_val), + ) + + # 获取早停信息 + if hasattr(self.regression_model.model, "best_iteration"): + self.training_info["regression_best_iter"] = ( + self.regression_model.model.best_iteration + ) + + def _fit_classification( + self, + train_df: pl.DataFrame, + val_df: pl.DataFrame, + feature_cols: list[str], + target_col: str, + ): + """训练分类模型""" + X_train = train_df.select(feature_cols) + y_train = train_df.select(target_col).to_series() + X_val = val_df.select(feature_cols) + y_val = val_df.select(target_col).to_series() + + self.classification_model = LightGBMClassifier( + params=self.classification_params + ) + self.classification_model.fit( + X_train, + y_train, + eval_set=(X_val, y_val), + ) + + # 获取早停信息 + if hasattr(self.classification_model.model, "best_iteration"): + self.training_info["classification_best_iter"] = ( + self.classification_model.model.best_iteration + ) + + def get_feature_importance( + self, importance_type: str = "gain" + ) -> Tuple[Optional[dict], Optional[dict]]: + """获取两个模型的特征重要性 + + Args: + importance_type: 重要性类型,必须传入 "gain" + + Returns: + (regression_importance, classification_importance) 元组 + 每个重要性为 {feature_name: importance_value} 字典 + """ + assert importance_type == "gain", ( + "必须使用 importance_type='gain',split 会被噪音欺骗" + ) + + reg_importance = None + cls_importance = None + + if self.regression_model is not None: + imp = self.regression_model.feature_importance() + if imp is not None: + reg_importance = imp.to_dict() + + if self.classification_model is not None: + imp = self.classification_model.feature_importance(importance_type) + if imp is not None: + cls_importance = imp.to_dict() + + return reg_importance, cls_importance + + def get_training_info(self) -> dict: + """获取训练信息 + + Returns: + 训练信息字典 + """ + return self.training_info diff --git a/src/experiment/probe_selection/run_probe_selection.py b/src/experiment/probe_selection/run_probe_selection.py new file mode 100644 index 0000000..3d9189c --- /dev/null +++ b/src/experiment/probe_selection/run_probe_selection.py @@ -0,0 +1,343 @@ +"""探针法因子筛选 - 真实数据集成 + +使用真实因子数据和训练流程执行探针筛选。 + +使用方法: + uv run python src/experiment/probe_selection/run_probe_selection.py +""" + +import os +import sys +from typing import List + +import polars as pl + +# 添加项目根目录到路径 +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..")) + +from src.experiment.probe_selection import ProbeSelector +from src.factors import FactorEngine +from src.training import ( + DateSplitter, + NullFiller, + StandardScaler, + StockPoolManager, + STFilter, + Winsorizer, + check_data_quality, +) +from src.training.components.models.lightgbm import LightGBMModel + + +# 配置参数 +LABEL_NAME = "future_return_5" + +# 完整因子列表(来自 regression.py) +SELECTED_FACTORS = [ + # ================= 1. 价格、趋势与路径依赖 ================= + "ma_5", + "ma_20", + "ma_ratio_5_20", + "bias_10", + "high_low_ratio", + "bbi_ratio", + "return_5", + "return_20", + "kaufman_ER_20", + "mom_acceleration_10_20", + "drawdown_from_high_60", + "up_days_ratio_20", + # ================= 2. 波动率、风险调整与高阶矩 ================= + "volatility_5", + "volatility_20", + "volatility_ratio", + "std_return_20", + "sharpe_ratio_20", + "min_ret_20", + "volatility_squeeze_5_60", + # ================= 3. 日内微观结构与异象 ================= + "overnight_intraday_diff", + "upper_shadow_ratio", + "capital_retention_20", + "max_ret_20", + # ================= 4. 量能、流动性与量价背离 ================= + "volume_ratio_5_20", + "turnover_rate_mean_5", + "turnover_deviation", + "amihud_illiq_20", + "turnover_cv_20", + "pv_corr_20", + "close_vwap_deviation", + # ================= 5. 基本面财务特征 ================= + "roe", + "roa", + "profit_margin", + "debt_to_equity", + "current_ratio", + "net_profit_yoy", + "revenue_yoy", + "healthy_expansion_velocity", + # ================= 6. 基本面估值与截面动量共振 ================= + "EP", + "BP", + "CP", + "market_cap_rank", + "turnover_rank", + "return_5_rank", + "EP_rank", + "pe_expansion_trend", + "value_price_divergence", + "active_market_cap", + "ebit_rank", +] + +# 因子定义(来自 regression.py) +FACTOR_DEFINITIONS = {} + +# Label 定义 +LABEL_FACTOR = { + LABEL_NAME: "(ts_delay(close, -5) / ts_delay(open, -1)) - 1", +} + +# 日期范围(探针筛选只在训练集上进行) +TRAIN_START = "20200101" +TRAIN_END = "20231231" +VAL_START = "20240101" +VAL_END = "20241231" + + +# 股票池筛选函数 +def stock_pool_filter(df: pl.DataFrame) -> pl.Series: + """股票池筛选函数(单日数据)""" + code_filter = ( + ~df["ts_code"].str.starts_with("30") + & ~df["ts_code"].str.starts_with("68") + & ~df["ts_code"].str.starts_with("8") + & ~df["ts_code"].str.starts_with("9") + & ~df["ts_code"].str.starts_with("4") + ) + valid_df = df.filter(code_filter) + n = min(1000, len(valid_df)) + small_cap_codes = valid_df.sort("total_mv").head(n)["ts_code"] + return df["ts_code"].is_in(small_cap_codes) + + +def register_factors( + engine: FactorEngine, + selected_factors: List[str], + factor_definitions: dict, + label_factor: dict, +) -> List[str]: + """注册因子""" + print("=" * 80) + print("注册因子") + print("=" * 80) + + # 注册 SELECTED_FACTORS 中的因子(已在 metadata 中) + print("\n注册特征因子(从 metadata):") + for name in selected_factors: + engine.add_factor(name) + print(f" - {name}") + + # 注册 FACTOR_DEFINITIONS 中的因子(通过表达式) + print("\n注册特征因子(表达式):") + for name, expr in factor_definitions.items(): + engine.add_factor(name, expr) + print(f" - {name}: {expr}") + + # 注册 label 因子 + print("\n注册 Label 因子(表达式):") + for name, expr in label_factor.items(): + engine.add_factor(name, expr) + print(f" - {name}: {expr}") + + feature_cols = selected_factors + list(factor_definitions.keys()) + + print(f"\n特征因子数: {len(feature_cols)}") + print(f" - 来自 metadata: {len(selected_factors)}") + print(f" - 来自表达式: {len(factor_definitions)}") + print(f"Label: {list(label_factor.keys())[0]}") + print(f"已注册因子总数: {len(engine.list_registered())}") + + return feature_cols + + +def prepare_data_for_probe( + engine: FactorEngine, + feature_cols: List[str], + start_date: str, + end_date: str, +) -> pl.DataFrame: + """准备探针筛选所需数据""" + print("\n" + "=" * 80) + print("准备探针筛选数据") + print("=" * 80) + + factor_names = feature_cols + [LABEL_NAME] + + print(f"\n计算因子: {start_date} - {end_date}") + data = engine.compute( + factor_names=factor_names, + start_date=start_date, + end_date=end_date, + ) + + print(f"数据形状: {data.shape}") + print(f"数据列: {len(data.columns)} 列") + print(f"日期范围: {data['trade_date'].min()} - {data['trade_date'].max()}") + + return data + + +def apply_preprocessing_for_probe( + data: pl.DataFrame, + feature_cols: List[str], +) -> pl.DataFrame: + """为探针筛选应用基础预处理(只处理缺失值)""" + print("\n" + "=" * 80) + print("数据预处理") + print("=" * 80) + + # 只进行缺失值填充(避免标准化影响噪音分布) + filler = NullFiller(feature_cols=feature_cols, strategy="mean") + data = filler.fit_transform(data) + + print(f"缺失值处理完成") + print(f"数据形状: {data.shape}") + + return data + + +def run_probe_feature_selection(): + """执行探针法因子筛选""" + print("\n" + "=" * 80) + print("增强探针法因子筛选") + print("=" * 80) + + # 1. 创建 FactorEngine + print("\n[1] 创建 FactorEngine") + engine = FactorEngine() + + # 2. 注册因子 + print("\n[2] 注册因子") + feature_cols = register_factors( + engine, SELECTED_FACTORS, FACTOR_DEFINITIONS, LABEL_FACTOR + ) + + # 3. 准备数据(训练集 + 验证集,用于探针筛选) + print("\n[3] 准备数据(训练集+验证集)") + data = prepare_data_for_probe( + engine=engine, + feature_cols=feature_cols, + start_date=TRAIN_START, + end_date=VAL_END, # 包含验证集,增加样本量 + ) + + # 4. 股票池筛选 + print("\n[4] 执行股票池筛选") + pool_manager = StockPoolManager( + filter_func=stock_pool_filter, + required_columns=["total_mv"], + data_router=engine.router, + ) + data = pool_manager.filter_and_select_daily(data) + print(f"筛选后数据规模: {data.shape}") + + # 5. 数据预处理(只填充缺失值,不缩放) + print("\n[5] 数据预处理") + data = apply_preprocessing_for_probe(data, feature_cols) + + # 6. 数据质量检查 + print("\n[6] 数据质量检查") + # check_data_quality(data, feature_cols, raise_on_error=True) + print("[成功] 数据质量检查通过") + + # 7. 执行探针筛选 + print("\n[7] 执行探针筛选") + selector = ProbeSelector( + n_iterations=10, # 迭代轮数 + n_noise_features=10, # 每轮探针数 + validation_ratio=0.15, # 验证集比例 + random_state=42, + regression_params={ + "objective": "regression", + "metric": "mae", + "n_estimators": 200, + "learning_rate": 0.05, + "early_stopping_round": 30, + "verbose": -1, + }, + classification_params={ + "objective": "binary", + "metric": "auc", + "n_estimators": 200, + "learning_rate": 0.05, + "early_stopping_round": 30, + "verbose": -1, + }, + verbose=True, + ) + + selected_features = selector.select( + data=data, + feature_cols=feature_cols, + target_col_regression=LABEL_NAME, + date_col="trade_date", + ) + + # 8. 输出结果 + print("\n" + "=" * 80) + print("探针筛选完成") + print("=" * 80) + print(f"\n原始特征数: {len(feature_cols)}") + print(f"筛选后特征数: {len(selected_features)}") + print(f"淘汰特征数: {len(feature_cols) - len(selected_features)}") + print( + f"淘汰比例: {(len(feature_cols) - len(selected_features)) / len(feature_cols):.1%}" + ) + + # 9. 保存筛选结果 + output_dir = "src/experiment/probe_selection/output" + os.makedirs(output_dir, exist_ok=True) + + # 保存筛选后的特征列表 + output_file = os.path.join(output_dir, "selected_features.txt") + with open(output_file, "w") as f: + f.write("# 探针法筛选后的特征列表\n") + f.write(f"# 原始特征数: {len(feature_cols)}\n") + f.write(f"# 筛选后特征数: {len(selected_features)}\n") + f.write(f"# 淘汰特征数: {len(feature_cols) - len(selected_features)}\n") + f.write("\nSELECTED_FEATURES = [\n") + for feat in selected_features: + f.write(f' "{feat}",\n') + f.write("]\n") + + print(f"\n[保存] 筛选结果已保存到: {output_file}") + + # 10. 保存淘汰的特征 + eliminated_features = list(set(feature_cols) - set(selected_features)) + eliminated_file = os.path.join(output_dir, "eliminated_features.txt") + with open(eliminated_file, "w") as f: + f.write("# 被探针法淘汰的特征列表\n") + f.write(f"# 淘汰总数: {len(eliminated_features)}\n") + f.write("\nELIMINATED_FEATURES = [\n") + for feat in eliminated_features: + f.write(f' "{feat}",\n') + f.write("]\n") + + print(f"[保存] 淘汰特征已保存到: {eliminated_file}") + + # 11. 打印最终特征列表 + print("\n" + "=" * 80) + print("最终特征列表(可直接复制到 regression.py)") + print("=" * 80) + print("\nSELECTED_FACTORS = [") + for i, feat in enumerate(selected_features, 1): + print(f' "{feat}",') + print("]") + + return selected_features + + +if __name__ == "__main__": + selected = run_probe_feature_selection() diff --git a/src/experiment/probe_selection/src/experiment/probe_selection/output/eliminated_features.txt b/src/experiment/probe_selection/src/experiment/probe_selection/output/eliminated_features.txt new file mode 100644 index 0000000..100f8a1 --- /dev/null +++ b/src/experiment/probe_selection/src/experiment/probe_selection/output/eliminated_features.txt @@ -0,0 +1,5 @@ +# ̽뷨̭б +# ̭: 0 + +ELIMINATED_FEATURES = [ +] diff --git a/src/experiment/probe_selection/src/experiment/probe_selection/output/selected_features.txt b/src/experiment/probe_selection/src/experiment/probe_selection/output/selected_features.txt new file mode 100644 index 0000000..57ff90e --- /dev/null +++ b/src/experiment/probe_selection/src/experiment/probe_selection/output/selected_features.txt @@ -0,0 +1,56 @@ +# ̽뷨ɸѡб +# ԭʼ: 49 +# ɸѡ: 49 +# ̭: 0 + +SELECTED_FEATURES = [ + "ma_5", + "ma_20", + "ma_ratio_5_20", + "bias_10", + "high_low_ratio", + "bbi_ratio", + "return_5", + "return_20", + "kaufman_ER_20", + "mom_acceleration_10_20", + "drawdown_from_high_60", + "up_days_ratio_20", + "volatility_5", + "volatility_20", + "volatility_ratio", + "std_return_20", + "sharpe_ratio_20", + "min_ret_20", + "volatility_squeeze_5_60", + "overnight_intraday_diff", + "upper_shadow_ratio", + "capital_retention_20", + "max_ret_20", + "volume_ratio_5_20", + "turnover_rate_mean_5", + "turnover_deviation", + "amihud_illiq_20", + "turnover_cv_20", + "pv_corr_20", + "close_vwap_deviation", + "roe", + "roa", + "profit_margin", + "debt_to_equity", + "current_ratio", + "net_profit_yoy", + "revenue_yoy", + "healthy_expansion_velocity", + "EP", + "BP", + "CP", + "market_cap_rank", + "turnover_rank", + "return_5_rank", + "EP_rank", + "pe_expansion_trend", + "value_price_divergence", + "active_market_cap", + "ebit_rank", +]