refactor(training): 简化 LightGBM 模型参数处理

- 重构 LightGBM 和 LambdaRank 模型,移除参数提取逻辑
- 模型类只保留 params 属性,符合 LightGBM 设计规范
This commit is contained in:
2026-03-14 02:41:24 +08:00
parent ecb22b826c
commit bdf937086f
4 changed files with 105 additions and 864 deletions

File diff suppressed because one or more lines are too long

View File

@@ -293,6 +293,7 @@ SELECTED_FACTORS = [
"net_profit_yoy",
"revenue_yoy",
"healthy_expansion_velocity",
"ebit_rank",
# ================= 6. 基本面估值与截面动量共振 =================
"EP",
"BP",
@@ -304,12 +305,11 @@ SELECTED_FACTORS = [
"pe_expansion_trend",
"value_price_divergence",
"active_market_cap",
"ebit_rank",
]
# 因子定义字典(完整因子库)
FACTOR_DEFINITIONS = {
"turnover_rate_volatility": "ts_std(log(turnover_rate), 20)"
# "turnover_rate_volatility": "ts_std(log(turnover_rate), 20)"
}
# Label 因子定义(不参与训练,用于计算目标)
@@ -341,7 +341,7 @@ MODEL_PARAMS = {
"max_depth": 4,
"min_data_in_leaf": 20,
"n_estimators": 2000,
"early_stopping_round": 300,
"early_stopping_round": 100,
"subsample": 0.8,
"colsample_bytree": 0.8,
"reg_alpha": 0.1,
@@ -372,7 +372,7 @@ def stock_pool_filter(df: pl.DataFrame) -> pl.Series:
)
valid_df = df.filter(code_filter)
n = min(1000, len(valid_df))
n = min(500, len(valid_df))
small_cap_codes = valid_df.sort("total_mv").head(n)["ts_code"]
return df["ts_code"].is_in(small_cap_codes)

View File

@@ -29,33 +29,14 @@ class LightGBMModel(BaseModel):
name = "lightgbm"
def __init__(
self,
params: Optional[dict] = None,
objective: str = "regression",
metric: str = "rmse",
num_leaves: int = 31,
learning_rate: float = 0.05,
n_estimators: int = 100,
**kwargs,
):
def __init__(self, params: Optional[dict] = None):
"""初始化 LightGBM 模型
支持两种方式传入参数:
1. 通过 params 字典传入所有参数(推荐方式)
2. 通过独立参数传入(向后兼容)
Args:
params: LightGBM 参数字典,如果提供则直接使用此字典
objective: 目标函数,默认 "regression"
metric: 评估指标,默认 "rmse"
num_leaves: 叶子节点数,默认 31
learning_rate: 学习率,默认 0.05
n_estimators: 迭代次数,默认 100
**kwargs: 其他 LightGBM 参数
params: LightGBM 参数字典,直接传递给 lgb.train()。
包含所有模型参数和训练控制参数(如 n_estimators
Examples:
>>> # 方式1通过 params 字典(推荐)
>>> model = LightGBMModel(params={
... "objective": "regression",
... "metric": "rmse",
@@ -63,32 +44,8 @@ class LightGBMModel(BaseModel):
... "learning_rate": 0.05,
... "n_estimators": 100,
... })
>>>
>>> # 方式2通过独立参数向后兼容
>>> model = LightGBMModel(
... objective="regression",
... num_leaves=31,
... learning_rate=0.05,
... )
"""
if params is not None:
# 方式1直接使用 params 字典
self.params = dict(params) # 复制一份,避免修改原始字典
self.params.setdefault("verbose", -1) # 默认抑制训练输出
# n_estimators 可能存在于 params 中
self.n_estimators = self.params.pop("n_estimators", n_estimators)
else:
# 方式2通过独立参数构建 params
self.params = {
"objective": objective,
"metric": metric,
"num_leaves": num_leaves,
"learning_rate": learning_rate,
"verbose": -1, # 抑制训练输出
**kwargs,
}
self.n_estimators = n_estimators
self.params = dict(params) if params is not None else {}
self.model = None
self.feature_names_: Optional[list] = None
@@ -113,21 +70,19 @@ class LightGBMModel(BaseModel):
"使用 LightGBMModel 需要安装 lightgbm: pip install lightgbm"
)
# 保存特征名称
self.feature_names_ = X.columns
# 转换为 numpy
X_np = X.to_numpy()
y_np = y.to_numpy()
# 创建数据集
train_data = lgb.Dataset(X_np, label=y_np)
# 训练
# 从 params 中提取 num_boost_round默认 100
num_boost_round = self.params.pop("n_estimators", 100)
self.model = lgb.train(
self.params,
train_data,
num_boost_round=self.n_estimators,
num_boost_round=num_boost_round,
)
return self
@@ -148,7 +103,8 @@ class LightGBMModel(BaseModel):
raise RuntimeError("模型尚未训练,请先调用 fit()")
X_np = X.to_numpy()
return self.model.predict(X_np)
result = self.model.predict(X_np)
return np.asarray(result)
def feature_importance(self) -> Optional[pd.Series]:
"""返回特征重要性
@@ -179,7 +135,6 @@ class LightGBMModel(BaseModel):
self.model.save_model(path)
# 同时保存特征名称LightGBM 原生格式不保存这个)
import json
meta_path = path + ".meta.json"
@@ -188,7 +143,6 @@ class LightGBMModel(BaseModel):
{
"feature_names": self.feature_names_,
"params": self.params,
"n_estimators": self.n_estimators,
},
f,
)
@@ -211,16 +165,13 @@ class LightGBMModel(BaseModel):
instance = cls()
instance.model = lgb.Booster(model_file=path)
# 加载元数据
meta_path = path + ".meta.json"
try:
with open(meta_path, "r") as f:
meta = json.load(f)
instance.feature_names_ = meta.get("feature_names")
instance.params = meta.get("params", instance.params)
instance.n_estimators = meta.get("n_estimators", instance.n_estimators)
instance.params = meta.get("params", {})
except FileNotFoundError:
# 如果没有元数据文件继续运行feature_names_ 为 None
pass
return instance

View File

@@ -30,35 +30,14 @@ class LightGBMLambdaRankModel(BaseModel):
name = "lightgbm_lambdarank"
def __init__(
self,
params: Optional[dict] = None,
learning_rate: float = 0.05,
num_leaves: int = 31,
n_estimators: int = 100,
min_data_in_leaf: int = 20,
ndcg_at: Optional[List[int]] = None,
early_stopping_rounds: int = 50,
**kwargs,
):
def __init__(self, params: Optional[dict] = None):
"""初始化 LambdaRank 模型
支持两种方式传入参数:
1. 通过 params 字典传入所有参数(推荐方式)
2. 通过独立参数传入(向后兼容)
Args:
params: LightGBM 参数字典,如果提供则直接使用此字典
learning_rate: 学习率,默认 0.05
num_leaves: 叶子节点数,默认 31
n_estimators: 迭代次数,默认 100
min_data_in_leaf: 叶子最小样本数,默认 20
ndcg_at: NDCG 评估的 k 值列表,默认 [1, 5, 10, 20]
early_stopping_rounds: 早停轮数,默认 50
**kwargs: 其他 LightGBM 参数
params: LightGBM 参数字典,直接传递给 lgb.train()。
包含所有模型参数和训练控制参数。
Examples:
>>> # 方式1通过 params 字典(推荐)
>>> model = LightGBMLambdaRankModel(params={
... "objective": "lambdarank",
... "metric": "ndcg",
@@ -66,39 +45,13 @@ class LightGBMLambdaRankModel(BaseModel):
... "num_leaves": 31,
... "learning_rate": 0.05,
... "n_estimators": 1000,
... "early_stopping_round": 50,
... })
"""
if ndcg_at is None:
ndcg_at = [1, 5, 10, 20]
if params is not None:
# 方式1直接使用 params 字典
self.params = dict(params) # 复制一份,避免修改原始字典
self.params.setdefault("objective", "lambdarank")
self.params.setdefault("metric", "ndcg")
self.params.setdefault("verbose", -1)
self.n_estimators = self.params.pop("n_estimators", n_estimators)
self.early_stopping_rounds = self.params.pop(
"early_stopping_rounds", early_stopping_rounds
)
else:
# 方式2通过独立参数构建 params
self.params = {
"objective": "lambdarank",
"metric": "ndcg",
"ndcg_at": ndcg_at,
"num_leaves": num_leaves,
"learning_rate": learning_rate,
"min_data_in_leaf": min_data_in_leaf,
"verbose": -1,
**kwargs,
}
self.n_estimators = n_estimators
self.early_stopping_rounds = early_stopping_rounds
self.params = dict(params) if params is not None else {}
self.model = None
self.feature_names_: Optional[list] = None
self.evals_result_: Optional[dict] = None # 存储训练评估结果
self.evals_result_: Optional[dict] = None
def fit(
self,
@@ -112,10 +65,7 @@ class LightGBMLambdaRankModel(BaseModel):
Args:
X: 特征矩阵 (Polars DataFrame)
y: 目标变量 (Polars Series),应为分位数标签 (0, 1, 2, ...)
group: 分组数组,表示每个 query 的样本数
例如 [10, 15, 20] 表示第一个 query 有 10 个样本,
第二个 query 有 15 个样本,第三个 query 有 20 个样本。
如果为 None则假设所有样本属于同一个 query。
group: 分组数组,表示每个 query 的样本数
eval_set: 验证集元组 (X_val, y_val, group_val),用于早停
Returns:
@@ -133,19 +83,13 @@ class LightGBMLambdaRankModel(BaseModel):
"使用 LightGBMLambdaRankModel 需要安装 lightgbm: pip install lightgbm"
)
# 保存特征名称
self.feature_names_ = X.columns
# 转换为 numpy
X_np = X.to_numpy()
y_np = y.to_numpy()
# 处理 group 参数
if group is None:
# 如果未提供 group假设所有样本属于同一个 query
group = np.array([len(y_np)])
# 验证 group 参数
if not isinstance(group, np.ndarray):
group = np.array(group)
if group.sum() != len(y_np):
@@ -153,10 +97,8 @@ class LightGBMLambdaRankModel(BaseModel):
f"group 数组的和 ({group.sum()}) 必须等于样本数 ({len(y_np)})"
)
# 创建训练数据集
train_data = lgb.Dataset(X_np, label=y_np, group=group)
# 准备验证集和验证集名称
valid_sets = [train_data]
valid_names = ["train"]
if eval_set is not None:
@@ -173,19 +115,22 @@ class LightGBMLambdaRankModel(BaseModel):
valid_sets.append(val_data)
valid_names.append("val")
# 初始化评估结果存储
self.evals_result_ = {}
# 训练
# 从 params 提取训练控制参数
params_copy = dict(self.params)
num_boost_round = params_copy.pop("n_estimators", 100)
early_stopping_round = params_copy.pop("early_stopping_round", 50)
callbacks = [
lgb.early_stopping(stopping_rounds=self.early_stopping_rounds),
lgb.early_stopping(stopping_rounds=early_stopping_round),
lgb.record_evaluation(self.evals_result_),
]
self.model = lgb.train(
self.params,
params_copy,
train_data,
num_boost_round=self.n_estimators,
num_boost_round=num_boost_round,
valid_sets=valid_sets,
valid_names=valid_names,
callbacks=callbacks,
@@ -200,7 +145,7 @@ class LightGBMLambdaRankModel(BaseModel):
X: 特征矩阵 (Polars DataFrame)
Returns:
预测分数 (numpy ndarray),分数越高表示排序越靠前
预测分数 (numpy ndarray)
Raises:
RuntimeError: 模型未训练时调用
@@ -209,18 +154,37 @@ class LightGBMLambdaRankModel(BaseModel):
raise RuntimeError("模型尚未训练,请先调用 fit()")
X_np = X.to_numpy()
return self.model.predict(X_np)
result = self.model.predict(X_np)
return np.asarray(result)
def get_evals_result(self) -> Optional[dict]:
"""获取训练评估结果
Returns:
评估结果字典,包含训练集和验证集的指标历史
格式: {'train': {'metric_name': [...]}, 'val': {'metric_name': [...]}}
如果模型尚未训练,返回 None
评估结果字典,如果模型尚未训练返回 None
"""
return self.evals_result_
def get_best_iteration(self) -> Optional[int]:
"""获取最佳迭代轮数(考虑早停)
Returns:
最佳迭代轮数,如果模型未训练返回 None
"""
if self.model is None:
return None
return self.model.best_iteration
def get_best_score(self) -> Optional[dict]:
"""获取最佳评分
Returns:
最佳评分字典,如果模型未训练返回 None
"""
if self.model is None:
return None
return self.model.best_score
def plot_metric(
self,
metric: Optional[str] = None,
@@ -230,25 +194,14 @@ class LightGBMLambdaRankModel(BaseModel):
):
"""绘制训练指标曲线
使用 LightGBM 原生的 plot_metric 接口绘制训练曲线。
Args:
metric: 要绘制的指标名称,如 'ndcg@5''ndcg@10' 等。
如果为 None则自动选择第一个可用的 NDCG 指标。
metric: 要绘制的指标名称,如 'ndcg@5'
figsize: 图大小,默认 (10, 6)
title: 图表标题,如果为 None 则自动生成
ax: matplotlib Axes 对象,如果为 None 则创建新图
title: 图表标题
ax: matplotlib Axes 对象
Returns:
matplotlib Axes 对象
Raises:
RuntimeError: 模型尚未训练
ValueError: 指定的指标不存在
Examples:
>>> model.plot_metric('ndcg@20') # 绘制 ndcg@20 曲线
>>> model.plot_metric() # 自动选择指标
"""
if self.model is None:
raise RuntimeError("模型尚未训练,请先调用 fit()")
@@ -259,7 +212,6 @@ class LightGBMLambdaRankModel(BaseModel):
import lightgbm as lgb
import matplotlib.pyplot as plt
# 如果没有指定指标,自动选择第一个 NDCG 指标
if metric is None:
available_metrics = list(self.evals_result_.get("train", {}).keys())
ndcg_metrics = [m for m in available_metrics if "ndcg" in m.lower()]
@@ -270,20 +222,17 @@ class LightGBMLambdaRankModel(BaseModel):
else:
raise ValueError("没有可用的评估指标")
# 检查指标是否存在
if metric not in self.evals_result_.get("train", {}):
available = list(self.evals_result_.get("train", {}).keys())
raise ValueError(f"指标 '{metric}' 不存在。可用的指标: {available}")
# 创建图表
if ax is None:
fig, ax = plt.subplots(figsize=figsize)
_, ax = plt.subplots(figsize=figsize)
# 使用 LightGBM 原生接口绘制
lgb.plot_metric(self.evals_result_, metric=metric, ax=ax)
# 设置标题
if title is None:
assert metric is not None
title = f"Training Metric ({metric.upper()}) over Iterations"
ax.set_title(title, fontsize=12, fontweight="bold")
@@ -297,18 +246,13 @@ class LightGBMLambdaRankModel(BaseModel):
):
"""绘制所有训练指标曲线
在一个图表中绘制多个指标的训练曲线。
Args:
metrics: 要绘制的指标列表,如果为 None 则绘制所有 NDCG 指标
metrics: 要绘制的指标列表
figsize: 图大小,默认 (14, 10)
max_cols: 每行最多显示的子图数,默认 2
Returns:
matplotlib Figure 对象
Raises:
RuntimeError: 模型尚未训练
"""
if self.model is None:
raise RuntimeError("模型尚未训练,请先调用 fit()")
@@ -321,7 +265,6 @@ class LightGBMLambdaRankModel(BaseModel):
available_metrics = list(self.evals_result_.get("train", {}).keys())
# 如果没有指定指标,使用所有 NDCG 指标(最多 4 个)
if metrics is None:
ndcg_metrics = [m for m in available_metrics if "ndcg" in m.lower()]
metrics = ndcg_metrics[:4] if ndcg_metrics else available_metrics[:4]
@@ -329,7 +272,6 @@ class LightGBMLambdaRankModel(BaseModel):
if not metrics:
raise ValueError("没有可用的评估指标")
# 计算子图布局
n_metrics = len(metrics)
n_cols = min(max_cols, n_metrics)
n_rows = (n_metrics + n_cols - 1) // n_cols
@@ -362,34 +304,12 @@ class LightGBMLambdaRankModel(BaseModel):
transform=ax.transAxes,
)
# 隐藏多余的子图
for idx in range(n_metrics, len(axes)):
axes[idx].axis("off")
plt.tight_layout()
return fig
def get_best_iteration(self) -> Optional[int]:
"""获取最佳迭代轮数(考虑早停)
Returns:
最佳迭代轮数,如果模型未训练返回 None
"""
if self.model is None:
return None
return self.model.best_iteration
def get_best_score(self) -> Optional[dict]:
"""获取最佳评分
Returns:
最佳评分字典,格式: {'valid_0': {'metric': value}, 'valid_1': {...}}
如果模型未训练返回 None
"""
if self.model is None:
return None
return self.model.best_score
def feature_importance(self) -> Optional[pd.Series]:
"""返回特征重要性
@@ -405,9 +325,6 @@ class LightGBMLambdaRankModel(BaseModel):
def save(self, path: str) -> None:
"""保存模型(使用 LightGBM 原生格式)
使用 LightGBM 的原生格式保存,不依赖 pickle
可以在不同环境中加载。
Args:
path: 保存路径
@@ -419,7 +336,6 @@ class LightGBMLambdaRankModel(BaseModel):
self.model.save_model(path)
# 同时保存特征名称和其他元数据
import json
meta_path = path + ".meta.json"
@@ -428,8 +344,6 @@ class LightGBMLambdaRankModel(BaseModel):
{
"feature_names": self.feature_names_,
"params": self.params,
"n_estimators": self.n_estimators,
"early_stopping_rounds": self.early_stopping_rounds,
},
f,
)
@@ -438,8 +352,6 @@ class LightGBMLambdaRankModel(BaseModel):
def load(cls, path: str) -> "LightGBMLambdaRankModel":
"""加载模型
从 LightGBM 原生格式加载模型。
Args:
path: 模型文件路径
@@ -452,19 +364,13 @@ class LightGBMLambdaRankModel(BaseModel):
instance = cls()
instance.model = lgb.Booster(model_file=path)
# 加载元数据
meta_path = path + ".meta.json"
try:
with open(meta_path, "r") as f:
meta = json.load(f)
instance.feature_names_ = meta.get("feature_names")
instance.params = meta.get("params", instance.params)
instance.n_estimators = meta.get("n_estimators", instance.n_estimators)
instance.early_stopping_rounds = meta.get(
"early_stopping_rounds", instance.early_stopping_rounds
)
instance.params = meta.get("params", {})
except FileNotFoundError:
# 如果没有元数据文件继续运行feature_names_ 为 None
pass
return instance
@@ -476,24 +382,13 @@ class LightGBMLambdaRankModel(BaseModel):
) -> np.ndarray:
"""从日期列生成 group 数组
将数据按日期分组,每个日期作为一个 query。
Args:
df: 包含日期列的 DataFrame
date_col: 日期列名,默认 "trade_date"
Returns:
group 数组,表示每个日期的样本数
Example:
>>> df = pl.DataFrame({
... "trade_date": ["20240101", "20240101", "20240102", "20240102", "20240102"],
... "feature": [1, 2, 3, 4, 5]
... })
>>> group = LightGBMLambdaRankModel.prepare_group_from_dates(df)
>>> print(group) # array([2, 3])
group 数组
"""
# 按日期统计样本数
group_counts = df.group_by(date_col, maintain_order=True).agg(
pl.count().alias("count")
)
@@ -509,35 +404,19 @@ class LightGBMLambdaRankModel(BaseModel):
) -> pl.DataFrame:
"""将连续标签转换为分位数标签
对每个日期的数据分别进行分位数划分,生成 0, 1, 2, ..., n_quantiles-1 的标签。
值越大表示原始值越大(排序越靠前)。
Args:
df: 输入 DataFrame
label_col: 原始标签列名(如 "future_return_5"
label_col: 原始标签列名
date_col: 日期列名,默认 "trade_date"
n_quantiles: 分位数数量,默认 20
new_col_name: 新列名,默认在原始列名后加 "_rank"
new_col_name: 新列名
Returns:
添加了分位数标签列的 DataFrame
Example:
>>> df = pl.DataFrame({
... "trade_date": ["20240101"] * 5 + ["20240102"] * 5,
... "future_return_5": [0.01, 0.02, 0.03, 0.04, 0.05,
... 0.02, 0.03, 0.04, 0.05, 0.06]
... })
>>> df = LightGBMLambdaRankModel.convert_to_quantile_labels(
... df, "future_return_5", n_quantiles=5
... )
>>> print(df["future_return_5_rank"]) # [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
"""
if new_col_name is None:
new_col_name = f"{label_col}_rank"
# 使用 qcut 按日期分组进行分位数划分
# qcut 返回的是 Categorical使用 to_physical() 转换为整数0, 1, 2, ...
return df.with_columns(
pl.col(label_col)
.qcut(n_quantiles)
@@ -560,17 +439,15 @@ class LightGBMLambdaRankModel(BaseModel):
X: 特征矩阵
y: 真实标签
group: 分组数组
k: NDCG@k 的 k 值None 表示使用所有位置
k: NDCG@k 的 k 值
Returns:
NDCG 分数
"""
from sklearn.metrics import ndcg_score
# 获取预测分数
y_pred = self.predict(X)
# 将数据按 group 拆分
y_true_list = []
y_score_list = []
@@ -581,15 +458,13 @@ class LightGBMLambdaRankModel(BaseModel):
y_score_list.append(y_pred[start_idx:end_idx])
start_idx = end_idx
# 计算平均 NDCG
ndcg_scores = []
for y_true, y_score in zip(y_true_list, y_score_list):
if len(y_true) > 1: # 至少要有 2 个样本才能计算 NDCG
if len(y_true) > 1:
try:
score = ndcg_score([y_true], [y_score], k=k)
ndcg_scores.append(score)
except ValueError:
# 如果标签都相同,跳过
pass
return np.mean(ndcg_scores) if ndcg_scores else 0.0
return float(np.mean(ndcg_scores)) if ndcg_scores else 0.0