#%% md # # Learn-to-Rank 排序学习训练流程 # # # 本 Notebook 实现基于 LightGBM LambdaRank 的排序学习训练,用于股票排序任务。 # # # ## 核心特点 # # # 1. **Label 转换**: 将 `future_return_5` 按每日进行 20 分位数划分(qcut) # 2. **排序学习**: 使用 LambdaRank 目标函数,学习每日股票排序 # 3. **NDCG 评估**: 使用 NDCG@1/5/10/20 评估排序质量 # 4. **策略回测**: 基于排序分数构建 Top-k 选股策略 #%% md # ## 1. 导入依赖 #%% import os from datetime import datetime from typing import List, Tuple, Optional import numpy as np import polars as pl import pandas as pd import matplotlib.pyplot as plt from sklearn.metrics import ndcg_score from src.factors import FactorEngine from src.training import ( DateSplitter, STFilter, StockPoolManager, Trainer, Winsorizer, NullFiller, StandardScaler, check_data_quality, ) from src.training.components.models import LightGBMLambdaRankModel from src.training.config import TrainingConfig #%% md # ## 2. 辅助函数 #%% def register_factors( engine: FactorEngine, selected_factors: List[str], factor_definitions: dict, label_factor: dict, ) -> List[str]: """注册因子(selected_factors 从 metadata 查询,factor_definitions 用 DSL 表达式注册)""" print("=" * 80) print("注册因子") print("=" * 80) # 注册 SELECTED_FACTORS 中的因子(已在 metadata 中) print("\n注册特征因子(从 metadata):") for name in selected_factors: engine.add_factor(name) print(f" - {name}") # 注册 FACTOR_DEFINITIONS 中的因子(通过表达式,尚未在 metadata 中) print("\n注册特征因子(表达式):") for name, expr in factor_definitions.items(): engine.add_factor(name, expr) print(f" - {name}: {expr}") # 注册 label 因子(通过表达式) print("\n注册 Label 因子(表达式):") for name, expr in label_factor.items(): engine.add_factor(name, expr) print(f" - {name}: {expr}") # 特征列 = SELECTED_FACTORS + FACTOR_DEFINITIONS 的 keys feature_cols = selected_factors + list(factor_definitions.keys()) print(f"\n特征因子数: {len(feature_cols)}") print(f" - 来自 metadata: {len(selected_factors)}") print(f" - 来自表达式: {len(factor_definitions)}") print(f"Label: {list(label_factor.keys())[0]}") print(f"已注册因子总数: {len(engine.list_registered())}") return feature_cols def prepare_data( engine: FactorEngine, feature_cols: List[str], start_date: str, end_date: str, ) -> pl.DataFrame: """准备数据""" print("\n" + "=" * 80) print("准备数据") print("=" * 80) # 计算因子(全市场数据) print(f"\n计算因子: {start_date} - {end_date}") factor_names = feature_cols + [LABEL_NAME] # 包含 label data = engine.compute( factor_names=factor_names, start_date=start_date, end_date=end_date, ) print(f"数据形状: {data.shape}") print(f"数据列: {data.columns}") print(f"\n前5行预览:") print(data.head()) return data def prepare_ranking_data( df: pl.DataFrame, label_col: str = "future_return_5", date_col: str = "trade_date", n_quantiles: int = 20, ) -> Tuple[pl.DataFrame, str]: """准备排序学习数据 将连续 label 转换为分位数标签,用于排序学习任务。 Args: df: 原始数据 label_col: 原始标签列名 date_col: 日期列名 n_quantiles: 分位数数量 Returns: (处理后的 DataFrame, 新的标签列名) """ print("\n" + "=" * 80) print(f"准备排序学习数据(将 {label_col} 转换为 {n_quantiles} 分位数标签)") print("=" * 80) # 新的标签列名 rank_col = f"{label_col}_rank" # 按日期分组进行分位数划分 # 使用 rank 生成 0, 1, 2, ..., n_quantiles-1 的标签 # 方法: 计算每天内的排名,然后映射到 n_quantiles 个分位数组 df_ranked = ( df.with_columns( # 计算每天内的排名 (1-based) pl.col(label_col).rank(method="min").over(date_col).alias("_rank") ) .with_columns( # 将排名转换为分位数标签 (0 to n_quantiles-1) ((pl.col("_rank") - 1) / pl.len().over(date_col) * n_quantiles) .floor() .cast(pl.Int64) .clip(0, n_quantiles - 1) .alias(rank_col) ) .drop("_rank") ) # 检查转换结果 print(f"\n原始 {label_col} 统计:") print(df_ranked[label_col].describe()) print(f"\n转换后 {rank_col} 统计:") print(df_ranked[rank_col].describe()) # 检查每日样本分布 print(f"\n每日样本数统计:") daily_counts = df_ranked.group_by(date_col).agg(pl.count().alias("count")) print(daily_counts["count"].describe()) # 检查分位数分布(应该是均匀的) print(f"\n分位数标签分布:") rank_dist = df_ranked[rank_col].value_counts().sort(rank_col) print(rank_dist) return df_ranked, rank_col def compute_group_array(df: pl.DataFrame, date_col: str = "trade_date") -> np.ndarray: """计算 group 数组用于 LambdaRank 每个日期作为一个 query,group 数组表示每个 query 的样本数。 Args: df: 数据框 date_col: 日期列名 Returns: group 数组 """ group_counts = df.group_by(date_col, maintain_order=True).agg( pl.count().alias("count") ) return group_counts["count"].to_numpy() def evaluate_ndcg_at_k( y_true: np.ndarray, y_pred: np.ndarray, group: np.ndarray, k_list: List[int] = [1, 5, 10, 20], ) -> dict: """计算 NDCG@k 指标 Args: y_true: 真实标签 y_pred: 预测分数 group: 分组数组 k_list: 要计算的 k 值列表 Returns: NDCG 指标字典 """ results = {} # 按 group 拆分数据 start_idx = 0 y_true_groups = [] y_pred_groups = [] for group_size in group: end_idx = start_idx + group_size y_true_groups.append(y_true[start_idx:end_idx]) y_pred_groups.append(y_pred[start_idx:end_idx]) start_idx = end_idx # 计算每个 k 值的平均 NDCG for k in k_list: ndcg_scores = [] for yt, yp in zip(y_true_groups, y_pred_groups): if len(yt) > 1: try: score = ndcg_score([yt], [yp], k=k) ndcg_scores.append(score) except ValueError: # 标签都相同,无法计算 pass results[f"ndcg@{k}"] = np.mean(ndcg_scores) if ndcg_scores else 0.0 return results #%% md # ## 3. 配置参数 # # # ### 3.1 因子定义 #%% # 特征因子定义字典(复用 regression.ipynb 的因子定义) LABEL_NAME = "future_return_5_rank" # 当前选择的因子列表(从 FACTOR_DEFINITIONS 中选择要使用的因子) SELECTED_FACTORS = [ # ================= 1. 价格、趋势与路径依赖 ================= "ma_5", "ma_20", "ma_ratio_5_20", "bias_10", "high_low_ratio", "bbi_ratio", "return_5", "return_20", "kaufman_ER_20", "mom_acceleration_10_20", "drawdown_from_high_60", "up_days_ratio_20", # ================= 2. 波动率、风险调整与高阶矩 ================= "volatility_5", "volatility_20", "volatility_ratio", "std_return_20", "sharpe_ratio_20", "min_ret_20", "volatility_squeeze_5_60", # ================= 3. 日内微观结构与异象 ================= "overnight_intraday_diff", "upper_shadow_ratio", "capital_retention_20", "max_ret_20", # ================= 4. 量能、流动性与量价背离 ================= "volume_ratio_5_20", "turnover_rate_mean_5", "turnover_deviation", "amihud_illiq_20", "turnover_cv_20", "pv_corr_20", "close_vwap_deviation", # ================= 5. 基本面财务特征 ================= "roe", "roa", "profit_margin", "debt_to_equity", "current_ratio", "net_profit_yoy", "revenue_yoy", "healthy_expansion_velocity", # ================= 6. 基本面估值与截面动量共振 ================= "EP", "BP", "CP", "market_cap_rank", "turnover_rank", "return_5_rank", "EP_rank", "pe_expansion_trend", # "value_price_divergence", "active_market_cap", # "ebit_rank", ] # 因子定义字典(完整因子库) FACTOR_DEFINITIONS = { # "turnover_rate_volatility": "ts_std(log(turnover_rate), 20)" } # Label 因子定义(不参与训练,用于计算目标) LABEL_FACTOR = { LABEL_NAME: "(ts_delay(close, -5) / ts_delay(open, -1)) - 1", } #%% md # ### 3.2 训练参数配置 #%% # 日期范围配置(正确的 train/val/test 三分法) TRAIN_START = "20200101" TRAIN_END = "20231231" VAL_START = "20240101" VAL_END = "20241231" TEST_START = "20250101" TEST_END = "20251231" # 分位数配置 N_QUANTILES = 20 # 将 label 分为 20 组 # LambdaRank 模型参数配置 MODEL_PARAMS = { "objective": "lambdarank", "metric": "ndcg", "ndcg_at": 10, # 评估 NDCG@k "learning_rate": 0.01, "num_leaves": 31, "max_depth": 4, "min_data_in_leaf": 20, "n_estimators": 2000, "early_stopping_round": 300, "subsample": 0.8, "colsample_bytree": 0.8, "reg_alpha": 0.1, "reg_lambda": 1.0, "verbose": -1, "random_state": 42, "lambdarank_truncation_level": 10, "label_gain": [i for i in range(1, N_QUANTILES + 1)], } # 股票池筛选函数 def stock_pool_filter(df: pl.DataFrame) -> pl.Series: """股票池筛选函数(单日数据) 筛选条件: 1. 排除创业板(代码以 300 开头) 2. 排除科创板(代码以 688 开头) 3. 排除北交所(代码以 8、9 或 4 开头) 4. 选取当日市值最小的500只股票 """ code_filter = ( ~df["ts_code"].str.starts_with("30") & ~df["ts_code"].str.starts_with("68") & ~df["ts_code"].str.starts_with("8") & ~df["ts_code"].str.starts_with("9") & ~df["ts_code"].str.starts_with("4") ) valid_df = df.filter(code_filter) n = min(1000, len(valid_df)) small_cap_codes = valid_df.sort("total_mv").head(n)["ts_code"] return df["ts_code"].is_in(small_cap_codes) STOCK_FILTER_REQUIRED_COLUMNS = ["total_mv"] # 输出配置 OUTPUT_DIR = "output" SAVE_PREDICTIONS = True PERSIST_MODEL = False # Top N 配置:每日推荐股票数量 TOP_N = 5 # 可调整为 10, 20 等 #%% md # ## 4. 训练流程 #%% print("\n" + "=" * 80) print("LightGBM LambdaRank 排序学习训练") print("=" * 80) # 1. 创建 FactorEngine(启用 metadata 功能) print("\n[1] 创建 FactorEngine") engine = FactorEngine() # 2. 使用 metadata 定义因子 print("\n[2] 定义因子(从 metadata 注册)") feature_cols = register_factors( engine, SELECTED_FACTORS, FACTOR_DEFINITIONS, LABEL_FACTOR ) # 3. 准备数据 print("\n[3] 准备数据") data = prepare_data( engine=engine, feature_cols=feature_cols, start_date=TRAIN_START, end_date=TEST_END, ) # 4. 转换为排序学习格式(分位数标签) print("\n[4] 转换为排序学习格式") data, target_col = prepare_ranking_data( df=data, label_col=LABEL_NAME, n_quantiles=N_QUANTILES, ) # 5. 打印配置信息 print(f"\n[配置] 训练期: {TRAIN_START} - {TRAIN_END}") print(f"[配置] 验证期: {VAL_START} - {VAL_END}") print(f"[配置] 测试期: {TEST_START} - {TEST_END}") print(f"[配置] 特征数: {len(feature_cols)}") print(f"[配置] 目标变量: {target_col}({N_QUANTILES}分位数)") # 6. 创建排序学习模型 model = LightGBMLambdaRankModel(params=MODEL_PARAMS) # 7. 创建数据处理器(使用函数返回的完整特征列表) processors = [ NullFiller(feature_cols=feature_cols, strategy="mean"), Winsorizer(feature_cols=feature_cols, lower=0.01, upper=0.99), StandardScaler(feature_cols=feature_cols), ] # 8. 创建数据划分器 splitter = DateSplitter( train_start=TRAIN_START, train_end=TRAIN_END, val_start=VAL_START, val_end=VAL_END, test_start=TEST_START, test_end=TEST_END, ) # 9. 创建股票池管理器 pool_manager = StockPoolManager( filter_func=stock_pool_filter, required_columns=STOCK_FILTER_REQUIRED_COLUMNS, data_router=engine.router, ) # 10. 创建 ST 过滤器 st_filter = STFilter(data_router=engine.router) # 11. 创建训练器 trainer = Trainer( model=model, pool_manager=pool_manager, processors=processors, filters=[st_filter], splitter=splitter, target_col=target_col, feature_cols=feature_cols, persist_model=PERSIST_MODEL, ) #%% md # ### 4.1 股票池筛选 #%% print("\n" + "=" * 80) print("股票池筛选") print("=" * 80) # 先执行 ST 过滤(在股票池筛选之前,与 Trainer.train() 保持一致) if st_filter: print("\n[过滤] 应用 ST 过滤器...") data = st_filter.filter(data) print(f" ST 过滤后数据规模: {data.shape}") if pool_manager: print("\n执行每日独立筛选股票池...") filtered_data = pool_manager.filter_and_select_daily(data) print(f" 筛选前数据规模: {data.shape}") print(f" 筛选后数据规模: {filtered_data.shape}") print(f" 筛选前股票数: {data['ts_code'].n_unique()}") print(f" 筛选后股票数: {filtered_data['ts_code'].n_unique()}") print(f" 删除记录数: {len(data) - len(filtered_data)}") else: filtered_data = data print(" 未配置股票池管理器,跳过筛选") #%% md # ### 4.2 数据划分 #%% print("\n" + "=" * 80) print("数据划分") print("=" * 80) if splitter: train_data, val_data, test_data = splitter.split(filtered_data) print(f"\n训练集数据规模: {train_data.shape}") print(f"验证集数据规模: {val_data.shape}") print(f"测试集数据规模: {test_data.shape}") # 计算各集的 group 数组 train_group = compute_group_array(train_data) val_group = compute_group_array(val_data) test_group = compute_group_array(test_data) print(f"\n训练集 group 数量: {len(train_group)}") print(f"验证集 group 数量: {len(val_group)}") print(f"测试集 group 数量: {len(test_group)}") print(f"训练集日均样本数: {np.mean(train_group):.1f}") print(f"验证集日均样本数: {np.mean(val_group):.1f}") print(f"测试集日均样本数: {np.mean(test_group):.1f}") else: raise ValueError("必须配置数据划分器") #%% md # ### 4.3 数据质量检查 #%% print("\n" + "=" * 80) print("数据质量检查(必须在预处理之前)") print("=" * 80) print("\n检查训练集...") check_data_quality(train_data, feature_cols, raise_on_error=False) print("\n检查验证集...") check_data_quality(val_data, feature_cols, raise_on_error=True) print("\n检查测试集...") check_data_quality(test_data, feature_cols, raise_on_error=True) print("[成功] 数据质量检查通过,未发现异常") #%% md # ### 4.4 数据预处理 #%% print("\n" + "=" * 80) print("数据预处理") print("=" * 80) fitted_processors = [] if processors: print("\n训练集处理...") for i, processor in enumerate(processors, 1): print(f" [{i}/{len(processors)}] {processor.__class__.__name__}") train_data = processor.fit_transform(train_data) fitted_processors.append(processor) print("\n验证集处理...") for processor in fitted_processors: val_data = processor.transform(val_data) print("\n测试集处理...") for processor in fitted_processors: test_data = processor.transform(test_data) print(f"\n处理后训练集形状: {train_data.shape}") print(f"处理后验证集形状: {val_data.shape}") print(f"处理后测试集形状: {test_data.shape}") #%% md # ### 4.4 训练 LambdaRank 模型 #%% print("\n" + "=" * 80) print("训练 LambdaRank 模型") print("=" * 80) # 准备数据 X_train = train_data.select(feature_cols) y_train = train_data.select(target_col).to_series() X_val = val_data.select(feature_cols) y_val = val_data.select(target_col).to_series() print(f"\n训练样本数: {len(X_train)}") print(f"验证样本数: {len(X_val)}") print(f"特征数: {len(feature_cols)}") print(f"目标变量: {target_col}") print("\n目标变量统计(训练集):") print(y_train.describe()) print("\n开始训练...") model.fit( X=X_train, y=y_train, group=train_group, eval_set=(X_val, y_val, val_group), ) print("训练完成!") #%% md # ### 4.5 训练指标曲线 #%% print("\n" + "=" * 80) print("训练指标曲线") print("=" * 80) # 从模型获取训练评估结果 evals_result = model.get_evals_result() if evals_result is None or not evals_result: print("[警告] 没有可用的训练指标,请确保训练时使用了 eval_set 参数") else: print("[成功] 已从模型获取训练评估结果") # 获取评估的 NDCG 指标 ndcg_metrics = [k for k in evals_result["train"].keys() if "ndcg" in k] print(f"\n评估的 NDCG 指标: {ndcg_metrics}") # 显示早停信息 actual_rounds = len(list(evals_result["train"].values())[0]) expected_rounds = MODEL_PARAMS.get("n_estimators", 1000) print(f"\n[早停信息]") print(f" 配置的最大轮数: {expected_rounds}") print(f" 实际训练轮数: {actual_rounds}") best_iter = model.get_best_iteration() if best_iter is not None and best_iter < actual_rounds: print(f" 早停状态: 已触发(最佳迭代: {best_iter})") else: print(f" 早停状态: 未触发(达到最大轮数)") # 显示各 NDCG 指标的最终值 print(f"\n最终 NDCG 指标:") for metric in ndcg_metrics: train_ndcg = evals_result["train"][metric][-1] val_ndcg = evals_result["val"][metric][-1] print(f" {metric}: 训练集={train_ndcg:.4f}, 验证集={val_ndcg:.4f}") # 使用封装好的方法绘制所有指标 print("\n[绘图] 使用 LightGBM 原生接口绘制训练曲线...") fig = model.plot_all_metrics(metrics=ndcg_metrics[:4], figsize=(14, 10)) plt.show() print(f"\n[指标分析]") print(f" 各NDCG指标在验证集上的最佳值:") for metric in ndcg_metrics: val_metric_list = evals_result["val"][metric] best_iter_metric = val_metric_list.index(max(val_metric_list)) best_val = max(val_metric_list) print(f" {metric}: {best_val:.4f} (迭代 {best_iter_metric + 1})") print(f"\n[重要提醒] 验证集仅用于早停/调参,测试集完全独立于训练过程!") #%% md # ### 4.6 模型评估 #%% print("\n" + "=" * 80) print("模型评估") print("=" * 80) # 准备测试集 X_test = test_data.select(feature_cols) y_test = test_data.select(target_col).to_series() # 预测 print("\n生成预测...") predictions = model.predict(X_test) # 添加预测列 test_data = test_data.with_columns([pl.Series("prediction", predictions)]) # 计算 NDCG 指标 print("\n计算 NDCG 指标...") ndcg_results = evaluate_ndcg_at_k( y_true=y_test.to_numpy(), y_pred=predictions, group=test_group, k_list=[1, 5, 10, 20], ) print("\nNDCG 评估结果:") print("-" * 40) for metric, value in ndcg_results.items(): print(f" {metric}: {value:.4f}") # 特征重要性 print("\n特征重要性(Top 20):") print("-" * 40) importance = model.feature_importance() if importance is not None: top_features = importance.sort_values(ascending=False).head(20) for i, (feature, score) in enumerate(top_features.items(), 1): print(f" {i:2d}. {feature:30s} {score:10.2f}") #%% # 确保输出目录存在 os.makedirs(OUTPUT_DIR, exist_ok=True) # 生成时间戳 start_dt = datetime.strptime(TEST_START, "%Y%m%d") end_dt = datetime.strptime(TEST_END, "%Y%m%d") date_str = f"{start_dt.strftime('%Y%m%d')}_{end_dt.strftime('%Y%m%d')}" # 保存每日 Top N print(f"\n[1/1] 保存每日 Top {TOP_N} 股票...") topn_output_path = os.path.join(OUTPUT_DIR, "rank_output.csv") # 按日期分组,取每日 top N topn_by_date = [] unique_dates = test_data["trade_date"].unique().sort() for date in unique_dates: day_data = test_data.filter(test_data["trade_date"] == date) # 按 prediction 降序排序,取前 N topn = day_data.sort("prediction", descending=True).head(TOP_N) topn_by_date.append(topn) # 合并所有日期的 top N topn_results = pl.concat(topn_by_date) # 格式化日期并调整列顺序:日期、分数、股票 topn_to_save = topn_results.select( [ pl.col("trade_date").str.slice(0, 4) + "-" + pl.col("trade_date").str.slice(4, 2) + "-" + pl.col("trade_date").str.slice(6, 2).alias("date"), pl.col("prediction").alias("score"), pl.col("ts_code"), ] ) topn_to_save.write_csv(topn_output_path, include_header=True) print(f" 保存路径: {topn_output_path}") print( f" 保存行数: {len(topn_to_save)}({len(unique_dates)}个交易日 x 每日top{TOP_N})" ) print(f"\n 预览(前15行):") print(topn_to_save.head(15)) print("\n训练流程完成!") #%% md # ## 5. 总结 # # # 本 Notebook 实现了完整的 Learn-to-Rank 训练流程: # # # ### 核心步骤 # # # 1. **数据准备**: 计算 49 个特征因子,将 `future_return_5` 转换为 20 分位数标签 # 2. **模型训练**: 使用 LightGBM LambdaRank 学习每日股票排序 # 3. **模型评估**: 使用 NDCG@1/5/10/20 评估排序质量 # 4. **策略分析**: 基于排序分数构建 Top-k 选股策略 # # # ### 关键参数 # # # - **Objective**: lambdarank # - **Metric**: ndcg # - **Learning Rate**: 0.05 # - **Num Leaves**: 31 # - **N Quantiles**: 20 # # # ### 输出结果 # # # - rank_output.csv: 每日Top-N推荐股票(格式:date, score, ts_code) # - 特征重要性排名 # - Top-k 策略统计和图表 # - NDCG训练指标曲线 # # # ### 后续优化方向 # # # 1. **特征工程**: 尝试更多因子组合 # 2. **超参数调优**: 使用网格搜索优化 LambdaRank 参数 # 3. **模型集成**: 结合多个排序模型的预测 # 4. **更复杂的分组**: 考虑按行业分组排序 #