feat(training): 新增 TabM 模型支持及数据质量优化

- 添加 TabMModel、TabPFNModel 深度学习模型实现
- 新增 DataQualityAnalyzer 进行训练前数据质量诊断
- 改进数据处理器 NaN/null 双重处理,增强数据鲁棒性
- 支持 train_skip_days 参数跳过训练初期数据不足期
- Pipeline 自动清理标签为 NaN 的样本
This commit is contained in:
2026-03-31 23:11:21 +08:00
parent 9e0114c745
commit 36a3ccbcc8
22 changed files with 4421 additions and 204 deletions

View File

@@ -5,5 +5,7 @@
from src.training.components.models.lightgbm import LightGBMModel
from src.training.components.models.lightgbm_lambdarank import LightGBMLambdaRankModel
from src.training.components.models.tabpfn_model import TabPFNModel
from src.training.components.models.tabm_model import TabMModel

# Public API of the models subpackage. The rendered diff retained both the
# pre- and post-change `__all__` assignments; only the updated one (which
# exports the two new deep-learning models) is kept.
__all__ = ["LightGBMModel", "LightGBMLambdaRankModel", "TabPFNModel", "TabMModel"]

View File

@@ -0,0 +1,368 @@
"""TabM模型实现
TabM (Tabular Multilayer Perceptron with Ensembles)
基于 rtdl_revisiting_models 的 TabM 模型,支持内置集成。
"""
from typing import Dict, Any, List, Optional
from pathlib import Path
import pickle
import numpy as np
import polars as pl
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from tabm import TabM
from src.training.components.base import BaseModel
from src.training.registry import register_model
@register_model("tabm")
class TabMModel(BaseModel):
    """TabM regression model.

    Characteristics:
        - Uses an MLP architecture.
        - Built-in ensembling (``ensemble_size``); memory cost is far below
          training the same number of independent models.
        - During training every ensemble member is optimized independently,
          preserving diversity.
        - At prediction time member outputs are averaged for a stable result.

    Attributes:
        name: Model name identifier.
        params: Model parameter dict.
        model: TabM model instance (None until fit()).
        device: Compute device (cuda/cpu).
        training_history_: Training-history record (per-epoch losses).
        feature_names_: List of feature names seen during fit().
    """

    name = "tabm"

    def __init__(self, params: Dict[str, Any]):
        """Initialize the TabM model.

        Args:
            params: Parameter dict containing:
                - n_blocks: number of MLP blocks (default: 3)
                - d_block: neurons per block (default: 256)
                - dropout: dropout rate (default: 0.1)
                - ensemble_size: ensemble size (default: 32)
                - batch_size: batch size (default: 1024)
                - learning_rate: learning rate (default: 1e-3)
                - weight_decay: weight decay (default: 1e-5)
                - epochs: number of training epochs (default: 50)
                - early_stopping_patience: early-stopping patience (default: 10)
        """
        self.params = params
        self.model = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.training_history_: Dict[str, List[float]] = {
            "train_loss": [],
            "val_loss": [],
        }
        self.feature_names_: Optional[List[str]] = None
        # Loss function: plain MSE (applied per ensemble member in fit()).
        self.criterion = nn.MSELoss()

    def _make_loader(
        self, X: np.ndarray, y: Optional[np.ndarray] = None, shuffle: bool = False
    ) -> DataLoader:
        """Create a DataLoader.

        Args:
            X: Feature array [N, n_features].
            y: Label array [N], or None for prediction-only loaders.
            shuffle: Whether to shuffle the data.

        Returns:
            A DataLoader instance.
        """
        if y is not None:
            dataset = TensorDataset(torch.from_numpy(X), torch.from_numpy(y))
        else:
            dataset = TensorDataset(torch.from_numpy(X))
        batch_size = self.params.get("batch_size", 1024)
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    def _validate(self, val_loader: DataLoader) -> float:
        """Evaluate the model on a validation loader.

        Args:
            val_loader: Validation data loader.

        Returns:
            Mean validation loss (0.0 for an empty/label-less loader,
            guarded by the max(n_batches, 1) denominator).
        """
        self.model.eval()
        total_loss = 0.0
        n_batches = 0
        with torch.no_grad():
            for batch in val_loader:
                if len(batch) == 2:
                    bx, by = batch
                    bx, by = bx.to(self.device), by.to(self.device)
                else:
                    bx = batch[0].to(self.device)
                    by = None
                # Average over ensemble members at evaluation time.
                outputs = self.model(bx)  # [B, E, 1]
                preds = outputs.mean(dim=1).squeeze(-1)  # [B]
                if by is not None:
                    loss = self.criterion(preds, by).item()
                    total_loss += loss
                    n_batches += 1
        return total_loss / max(n_batches, 1)

    def fit(
        self, X: pl.DataFrame, y: pl.Series, eval_set: Optional[tuple] = None
    ) -> "TabMModel":
        """Train the TabM model.

        Training strategy:
            1. Compute the loss against every ensemble member independently
               to preserve diversity.
            2. Average ensemble members for validation and prediction.

        Args:
            X: Training feature DataFrame.
            y: Training label Series.
            eval_set: Optional validation tuple (X_val, y_val).

        Returns:
            self
        """
        # Remember feature names (used to rebuild the network in load()).
        self.feature_names_ = X.columns
        # IMPORTANT: force dtype to float32.
        # PyTorch handles float64 poorly; avoid Polars/Numpy default dtypes.
        X_np = X.to_numpy().astype(np.float32)
        y_np = y.to_numpy().astype(np.float32)
        # Build DataLoaders.
        train_loader = self._make_loader(X_np, y_np, shuffle=True)
        val_loader = None
        if eval_set is not None:
            X_val, y_val = eval_set
            X_val_np = X_val.to_numpy().astype(np.float32)
            y_val_np = y_val.to_numpy().astype(np.float32)
            val_loader = self._make_loader(X_val_np, y_val_np, shuffle=False)
        n_features = X_np.shape[1]
        ensemble_size = self.params.get("ensemble_size", 32)
        # Initialize TabM via TabM.make(), which fills in default arguments.
        self.model = TabM.make(
            n_num_features=n_features,
            cat_cardinalities=[],
            d_out=1,  # regression: one output dimension
            n_blocks=self.params.get("n_blocks", 3),
            d_block=self.params.get("d_block", 256),
            dropout=self.params.get("dropout", 0.1),
            k=ensemble_size,  # ensemble size
        ).to(self.device)
        # Optimizer.
        optimizer = optim.AdamW(
            self.model.parameters(),
            lr=self.params.get("learning_rate", 1e-3),
            weight_decay=self.params.get("weight_decay", 1e-5),
        )
        # Training hyper-parameters.
        epochs = self.params.get("epochs", 50)
        early_stopping_patience = self.params.get("early_stopping_patience", 10)
        # Training loop state.
        best_val_loss = float("inf")
        patience_counter = 0
        print(f"[TabM] 开始训练... 设备: {self.device}, 集成大小: {ensemble_size}")
        for epoch in range(epochs):
            # Training phase.
            self.model.train()
            train_loss = 0.0
            n_train_batches = 0
            for bx, by in train_loader:
                bx, by = bx.to(self.device), by.to(self.device)
                optimizer.zero_grad()
                # Forward pass; outputs shape: [Batch, Ensemble, 1].
                outputs = self.model(bx)
                outputs_squeezed = outputs.squeeze(-1)  # [B, E]
                # IMPORTANT: compute the loss against every ensemble member.
                # Do not average first — each member converges independently,
                # which keeps the ensemble diverse.
                by_expanded = by.unsqueeze(-1).expand(-1, ensemble_size)  # [B, E]
                loss = self.criterion(outputs_squeezed, by_expanded)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
                n_train_batches += 1
            avg_train_loss = train_loss / max(n_train_batches, 1)
            self.training_history_["train_loss"].append(avg_train_loss)
            # Validation phase.
            if val_loader is not None:
                val_loss = self._validate(val_loader)
                self.training_history_["val_loss"].append(val_loss)
                # Early-stopping bookkeeping.
                # NOTE(review): best weights are NOT restored on early stop —
                # the last epoch's parameters are kept; confirm this is the
                # intended behavior.
                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    patience_counter = 0
                else:
                    patience_counter += 1
                if (epoch + 1) % 5 == 0 or epoch == 0:
                    print(
                        f"[TabM] Epoch {epoch + 1}/{epochs} | "
                        f"Train Loss: {avg_train_loss:.6f} | "
                        f"Val Loss: {val_loss:.6f}"
                    )
                if patience_counter >= early_stopping_patience:
                    print(f"[TabM] 早停触发,epoch {epoch + 1}")
                    break
            else:
                if (epoch + 1) % 5 == 0 or epoch == 0:
                    print(
                        f"[TabM] Epoch {epoch + 1}/{epochs} | "
                        f"Train Loss: {avg_train_loss:.6f}"
                    )
        print(f"[TabM] 训练完成")
        return self

    def predict(self, X: pl.DataFrame) -> np.ndarray:
        """Generate predictions.

        Averages the ensemble_size member outputs for a stable result.

        Args:
            X: Feature DataFrame.

        Returns:
            Prediction array [N].

        Raises:
            RuntimeError: If the model has not been trained.
        """
        if self.model is None:
            raise RuntimeError("模型未训练,请先调用fit()")
        # Cast to float32 (same reason as in fit()).
        X_np = X.to_numpy().astype(np.float32)
        loader = self._make_loader(X_np, shuffle=False)
        self.model.eval()
        all_preds = []
        with torch.no_grad():
            for batch in loader:
                bx = batch[0].to(self.device)
                # Average over ensemble members.
                outputs = self.model(bx)  # [B, E, 1]
                preds = outputs.mean(dim=1).squeeze(-1)  # [B]
                all_preds.append(preds.cpu().numpy())
        return np.concatenate(all_preds)

    def feature_importance(self) -> Optional[pl.Series]:
        """Return feature importances.

        TabM has no built-in feature-importance computation, so this
        always returns None.

        Returns:
            None
        """
        return None

    def save(self, path: str | Path) -> None:
        """Save the model.

        Persists the model state_dict plus metadata (params, feature
        names, training history).

        Args:
            path: Target path; its suffix is replaced with ".pt" for the
                weights and ".meta" for the pickled metadata.

        Raises:
            RuntimeError: If the model has not been trained.
        """
        if self.model is None:
            raise RuntimeError("模型未训练,无法保存")
        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        # Save model weights.
        model_path = path.with_suffix(".pt")
        torch.save(self.model.state_dict(), model_path)
        # Save metadata.
        meta_path = path.with_suffix(".meta")
        meta = {
            "params": self.params,
            "feature_names": self.feature_names_,
            "training_history": self.training_history_,
            "device": str(self.device),
        }
        with open(meta_path, "wb") as f:
            pickle.dump(meta, f)
        print(f"[TabM] 模型保存到: {path}")

    @classmethod
    def load(cls, path: str) -> "TabMModel":
        """Load a saved model.

        Args:
            path: Model path (without extension).

        Returns:
            The loaded TabMModel instance.
        """
        path = Path(path)
        # Load metadata.
        meta_path = path.with_suffix(".meta")
        with open(meta_path, "rb") as f:
            meta = pickle.load(f)
        # Re-create the instance from the saved params.
        instance = cls(meta["params"])
        instance.feature_names_ = meta["feature_names"]
        instance.training_history_ = meta["training_history"]
        # Rebuild the network structure (requires the saved feature names).
        if instance.feature_names_ is not None:
            n_features = len(instance.feature_names_)
            ensemble_size = instance.params.get("ensemble_size", 32)
            instance.model = TabM.make(
                n_num_features=n_features,
                cat_cardinalities=[],
                d_out=1,  # regression: one output dimension
                n_blocks=instance.params.get("n_blocks", 3),
                d_block=instance.params.get("d_block", 256),
                dropout=instance.params.get("dropout", 0.1),
                k=ensemble_size,  # ensemble size
            ).to(instance.device)
            # Load weights onto the rebuilt network.
            model_path = path.with_suffix(".pt")
            instance.model.load_state_dict(
                torch.load(model_path, map_location=instance.device)
            )
        print(f"[TabM] 模型从 {path} 加载完成")
        return instance

View File

@@ -0,0 +1,296 @@
"""TabPFN 模型实现
基于 TabPFN (Prior-Data Fitted Network) 的回归模型实现。
TabPFN 利用预训练的 Transformer 网络,通过上下文学习(in-context learning)
进行快速的小样本/中样本回归预测,无需传统训练过程。
"""
import json
import os
from pathlib import Path
from typing import Any, Optional
import numpy as np
import pandas as pd
import polars as pl
from scipy.stats import spearmanr
from src.training.components.base import BaseModel
from src.training.registry import register_model
os.environ["HF_TOKEN"] = "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"
@register_model("tabpfn")
class TabPFNModel(BaseModel):
    """TabPFN regression model.

    Wraps TabPFN (Prior-Data Fitted Network): a pretrained Transformer that
    predicts via in-context learning, so "fitting" loads the training data
    into the model context instead of running gradient descent. Supports
    GPU acceleration and automatic context truncation.

    Attributes:
        name: Model name, "tabpfn".
        params: TabPFN parameter dict.
        model: TabPFNRegressor instance (None until fit()).
        feature_names_: List of feature names.
        evals_result_: Evaluation results recorded during fit().
        best_score_: Best evaluation metrics.
    """

    name = "tabpfn"

    # Official TabPFN pretraining limit on context samples; can be exceeded
    # by passing ignore_pretraining_limits=True to TabPFNRegressor.
    MAX_CONTEXT_SIZE = 10000

    def __init__(self, params: Optional[dict] = None):
        """Initialize the TabPFN model.

        Args:
            params: Parameter dict supporting:
                - device: compute device, 'cuda' or 'cpu' (default 'cuda')
                - model_path: local model-weights path (optional)
                - N_ensemble: ensemble count, forwarded to TabPFN as
                  n_estimators to reduce prediction variance (default 1)
                - max_context_size: max context samples
                  (default MAX_CONTEXT_SIZE = 10000)
                - ignore_pretraining_limits: allow more than 10,000
                  context samples (default True)

        Examples:
            >>> model = TabPFNModel(params={
            ...     "device": "cuda",
            ...     "N_ensemble": 5,
            ... })
        """
        self.params = dict(params) if params is not None else {}
        self.model = None
        self.feature_names_: Optional[list] = None
        self.evals_result_: Optional[dict] = None
        self.best_score_: Optional[dict] = None

    def fit(
        self,
        X: pl.DataFrame,
        y: pl.Series,
        eval_set: Optional[tuple] = None,
    ) -> "TabPFNModel":
        """Fit the TabPFN model (load training data into its context).

        TabPFN uses in-context learning: fit() loads the training data into
        the model context. If the data exceeds the context limit, the most
        recent rows are kept.

        Args:
            X: Feature matrix (Polars DataFrame).
            y: Target variable (Polars Series).
            eval_set: Optional (X_val, y_val) tuple for evaluation.

        Returns:
            self (chainable).

        Raises:
            ImportError: tabpfn is not installed.
            RuntimeError: Model initialization or loading failed.
        """
        from tabpfn import TabPFNRegressor

        self.feature_names_ = X.columns
        # Convert to numpy arrays.
        X_np = X.to_numpy()
        y_np = y.to_numpy()
        # Enforce the context-size limit; keep the most recent rows
        # (assumes the data is ordered by time — TODO confirm upstream).
        max_context = self.params.get("max_context_size", self.MAX_CONTEXT_SIZE)
        if len(X_np) > max_context:
            print(
                f"[TabPFN] 训练数据 {len(X_np)} 超过上下文限制 {max_context},截取最近数据"
            )
            X_np = X_np[-max_context:]
            y_np = y_np[-max_context:]
        # Initialize the regressor.
        # ignore_pretraining_limits=True is required to exceed 10,000 samples.
        device = self.params.get("device", "cuda")
        ignore_limits = self.params.get("ignore_pretraining_limits", True)
        # FIX: honour the documented N_ensemble parameter; n_estimators was
        # previously hardcoded to 1 so N_ensemble was silently ignored.
        n_estimators = int(self.params.get("N_ensemble", 1))
        self.model = TabPFNRegressor(
            device=device,
            ignore_pretraining_limits=ignore_limits,
            n_estimators=n_estimators,
        )
        # TabPFN's "fit" loads the context.
        print("[TabPFN] 加载训练数据到上下文...")
        self.model.fit(X_np, y_np)
        # Optional validation-set evaluation.
        if eval_set is not None:
            X_val, y_val = eval_set
            val_preds = self.predict(X_val)
            y_val_np = y_val.to_numpy()
            # MSE and Spearman rank IC.
            mse = np.mean((y_val_np - val_preds) ** 2)
            rank_ic, p_value = spearmanr(val_preds, y_val_np)
            self.evals_result_ = {
                "valid_0": {
                    "mse": [mse],
                    "rank_ic": [rank_ic],
                }
            }
            self.best_score_ = {
                "valid_0": {
                    "mse": mse,
                    "rank_ic": rank_ic,
                    "rank_ic_pvalue": p_value,
                }
            }
            print(f"[TabPFN] 验证集 MSE: {mse:.6f}, Rank IC: {rank_ic:.4f}")
        return self

    def predict(self, X: pl.DataFrame) -> np.ndarray:
        """Predict.

        Args:
            X: Feature matrix (Polars DataFrame).

        Returns:
            Predictions (numpy ndarray).

        Raises:
            RuntimeError: Called before fit().
        """
        if self.model is None:
            raise RuntimeError("模型尚未初始化,请先调用 fit()")
        X_np = X.to_numpy()
        result = self.model.predict(X_np)
        return np.asarray(result)

    def predict_with_uncertainty(
        self, X: pl.DataFrame
    ) -> tuple[np.ndarray, np.ndarray]:
        """Predict with an uncertainty estimate.

        Intended to use the standard deviation across the N_ensemble
        predictions as the uncertainty; the current implementation returns
        zeros because TabPFNRegressor's public API does not expose
        per-member predictions.

        Args:
            X: Feature matrix (Polars DataFrame).

        Returns:
            (predictions, uncertainties) tuple of numpy ndarrays.

        Raises:
            RuntimeError: Called before fit().
        """
        if self.model is None:
            raise RuntimeError("模型尚未初始化,请先调用 fit()")
        X_np = X.to_numpy()
        predictions = self.model.predict(X_np)
        # Placeholder: zero uncertainty until per-member predictions are
        # accessible through the TabPFN API.
        uncertainties = np.zeros_like(predictions)
        return np.asarray(predictions), np.asarray(uncertainties)

    def get_evals_result(self) -> Optional[dict]:
        """Return evaluation results recorded during fit(), or None."""
        return self.evals_result_

    def get_best_score(self) -> Optional[dict]:
        """Return the best evaluation metrics, or None."""
        return self.best_score_

    def evaluate(self, X: pl.DataFrame, y: pl.Series) -> dict[str, float]:
        """Evaluate model performance (MSE and Spearman rank IC).

        Args:
            X: Feature matrix.
            y: True target values.

        Returns:
            Dict with "mse", "rank_ic" and "rank_ic_pvalue".
        """
        preds = self.predict(X)
        y_np = y.to_numpy()
        mse = float(np.mean((y_np - preds) ** 2))
        rank_ic_result = spearmanr(preds, y_np)
        rank_ic = float(rank_ic_result.correlation)
        p_value = float(rank_ic_result.pvalue)
        return {
            "mse": mse,
            "rank_ic": rank_ic,
            "rank_ic_pvalue": p_value,
        }

    def feature_importance(self) -> None:
        """TabPFN exposes no classical feature importances.

        TabPFN is a Transformer-based in-context learner and provides no
        tree-style importance metric.

        Returns:
            None
        """
        return None

    def save(self, path: str) -> None:
        """Save model metadata and configuration.

        The TabPFN model itself is not serializable, so only the parameter
        config, feature names and evaluation results are written as JSON.
        Call fit() again after load() to restore the context.

        Args:
            path: Output file path.
        """
        save_data = {
            "model_type": self.name,
            "params": self.params,
            "feature_names": self.feature_names_,
            "evals_result": self.evals_result_,
            "best_score": self.best_score_,
        }
        # Write JSON (ensure_ascii=False keeps any non-ASCII names readable).
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(save_data, f, indent=2, ensure_ascii=False)

    @classmethod
    def load(cls, path: str) -> "TabPFNModel":
        """Load a saved configuration.

        NOTE: the returned model is NOT fitted; call fit() to reload the
        context before predicting.

        Args:
            path: Config file path.

        Returns:
            A TabPFNModel with restored configuration (not fitted).
        """
        with open(path, "r", encoding="utf-8") as f:
            save_data = json.load(f)
        instance = cls(params=save_data.get("params", {}))
        instance.feature_names_ = save_data.get("feature_names")
        instance.evals_result_ = save_data.get("evals_result")
        instance.best_score_ = save_data.get("best_score")
        print(f"[TabPFN] 已加载模型配置,需要调用 fit() 重新加载上下文")
        return instance

View File

@@ -3,6 +3,7 @@
包含标准化、缩尾、缺失值填充等数据处理器。
"""
import math
from typing import List, Literal, Optional, Union
import polars as pl
@@ -88,7 +89,9 @@ class NullFiller(BaseProcessor):
"""
if not self.by_date and self.strategy in ("mean", "median"):
for col in self.feature_cols:
if col in X.columns and X[col].dtype.is_numeric():
if col in X.columns and (
X[col].dtype.is_numeric() or X[col].dtype == pl.Boolean
):
if self.strategy == "mean":
self.stats_[col] = X[col].mean() or 0.0
else: # median
@@ -119,11 +122,14 @@ class NullFiller(BaseProcessor):
raise ValueError(f"未知的填充策略: {self.strategy}")
def _fill_with_zero(self, X: pl.DataFrame) -> pl.DataFrame:
    """Fill missing values with 0 (handles both NaN and null).

    The rendered diff kept both the pre- and post-change lines of this
    method; this is the post-change version only.

    Args:
        X: Input DataFrame.

    Returns:
        DataFrame with NaN/null in feature columns replaced by 0; other
        columns are passed through unchanged.
    """
    expressions = []
    for col in X.columns:
        dtype = X[col].dtype
        if col in self.feature_cols and (dtype.is_numeric() or dtype == pl.Boolean):
            # fill_nan first, then fill_null, so both kinds of missing
            # values are covered.
            # NOTE(review): fill_nan on a Boolean column may raise in polars
            # (booleans have no NaN concept, per the by-date variant's own
            # comment) — confirm whether Boolean columns ever reach here.
            expr = pl.col(col).fill_nan(0).fill_null(0).alias(col)
            expressions.append(expr)
        else:
            expressions.append(pl.col(col))
    return X.select(expressions)
def _fill_with_value(self, X: pl.DataFrame) -> pl.DataFrame:
    """Fill missing values with a fixed value (handles both NaN and null).

    The rendered diff kept both the pre- and post-change lines of this
    method; this is the post-change version only.

    Args:
        X: Input DataFrame.

    Returns:
        DataFrame with NaN/null in feature columns replaced by
        ``self.fill_value``; other columns are passed through unchanged.
    """
    expressions = []
    for col in X.columns:
        dtype = X[col].dtype
        if col in self.feature_cols and (dtype.is_numeric() or dtype == pl.Boolean):
            # fill_nan first, then fill_null, so both kinds of missing
            # values are covered.
            # NOTE(review): fill_nan on a Boolean column may raise in polars
            # — confirm whether Boolean columns ever reach here.
            expr = (
                pl.col(col)
                .fill_nan(self.fill_value)
                .fill_null(self.fill_value)
                .alias(col)
            )
            expressions.append(expr)
        else:
            expressions.append(pl.col(col))
    return X.select(expressions)
def _fill_global(self, X: pl.DataFrame) -> pl.DataFrame:
    """Fill with global statistics learned on the training set.

    Handles both NaN and null. The rendered diff kept both the pre- and
    post-change lines of this method; this is the post-change version only.

    Args:
        X: Input DataFrame.

    Returns:
        DataFrame where every column with a learned statistic in
        ``self.stats_`` has its NaN/null replaced by that statistic;
        other columns are passed through unchanged.
    """
    expressions = []
    for col in X.columns:
        if col in self.stats_:
            fill_val = self.stats_[col]
            # fill_nan first, then fill_null.
            expr = pl.col(col).fill_nan(fill_val).fill_null(fill_val).alias(col)
            expressions.append(expr)
        else:
            expressions.append(pl.col(col))
    return X.select(expressions)
def _fill_by_date(self, X: pl.DataFrame) -> pl.DataFrame:
"""使用每天截面统计量填充"""
# 确定需要处理的数值
"""使用每天截面统计量填充(同时处理 NaN 和 null"""
# 确定需要处理的列(仅 numeric 类型,排除 boolean
# 注意boolean 类型没有 NaN 概念fill_nan 会报错
target_cols = [
col
for col in self.feature_cols
@@ -180,10 +196,20 @@ class NullFiller(BaseProcessor):
result = X.with_columns(stat_exprs)
# 使用统计量填充缺失值
# 注意如果某天某列全为null统计量也会为null所以需要链式填充
# 同时处理 NaN 和 null
fill_exprs = []
for col in X.columns:
if col in target_cols:
expr = pl.col(col).fill_null(pl.col(f"{col}_stat")).alias(col)
# 先用当天统计量填充 NaN 和 null如果统计量也是null则用0填充
expr = (
pl.col(col)
.fill_nan(pl.col(f"{col}_stat"))
.fill_null(pl.col(f"{col}_stat"))
.fill_nan(0) # 如果统计量是 NaN再用 0 填充
.fill_null(0) # 如果统计量是 null再用 0 填充
.alias(col)
)
fill_exprs.append(expr)
else:
fill_exprs.append(pl.col(col))
@@ -230,17 +256,40 @@ class StandardScaler(BaseProcessor):
self
"""
for col in self.feature_cols:
# 仅处理数值类型,排除布尔类型(标准化布尔类型语义不明确)
if col in X.columns and X[col].dtype.is_numeric():
col_mean = X[col].mean()
col_std = X[col].std()
if col_mean is not None and col_std is not None:
# 关键修复:检查是否为 None 且不是 NaN
# 注意:使用 try-except 处理类型转换,避免 LSP 类型检查错误
try:
mean_is_valid = (
col_mean is not None
and isinstance(col_mean, (int, float))
and not math.isnan(col_mean)
)
std_is_valid = (
col_std is not None
and isinstance(col_std, (int, float))
and not math.isnan(col_std)
)
except (TypeError, ValueError):
mean_is_valid = False
std_is_valid = False
if mean_is_valid and std_is_valid:
self.mean_[col] = col_mean
self.std_[col] = col_std
else:
# 如果统计量无效使用默认值mean=0, std=1
# 防止 transform 时产生更多 NaN
self.mean_[col] = 0.0
self.std_[col] = 1.0
return self
def transform(self, X: pl.DataFrame) -> pl.DataFrame:
"""标准化(使用训练集学到的参数)
"""标准化(使用训练集学到的参数,增加 NaN 保护
Args:
X: 待转换数据
@@ -253,7 +302,18 @@ class StandardScaler(BaseProcessor):
if col in self.mean_ and col in self.std_:
# 避免除以0
std_val = self.std_[col] if self.std_[col] != 0 else 1.0
expr = ((pl.col(col) - self.mean_[col]) / std_val).alias(col)
# 关键修复:添加 fill_nan(0) 保险,防止计算产生 NaN
expr = (
((pl.col(col) - self.mean_[col]) / std_val)
.fill_nan(0)
.fill_null(0)
.alias(col)
)
expressions.append(expr)
elif col in self.feature_cols:
# 对于应该被处理但未学习到统计量的列
# 统一转换为float并同时处理 NaN 和 null
expr = pl.col(col).cast(pl.Float64).fill_nan(0).fill_null(0).alias(col)
expressions.append(expr)
else:
expressions.append(pl.col(col))
@@ -308,13 +368,24 @@ class CrossSectionalStandardScaler(BaseProcessor):
# 构建表达式列表
expressions = []
for col in X.columns:
# 仅处理数值类型,排除布尔类型(标准化布尔类型语义不明确)
if col in self.feature_cols and X[col].dtype.is_numeric():
# 截面标准化:每天独立计算均值和标准差
# 避免除以0当std为0时设为1
# 关键修复:先 fill_nan 再 fill_null防止计算产生的 NaN
expr = (
(pl.col(col) - pl.col(col).mean().over(self.date_col))
/ (pl.col(col).std().over(self.date_col) + 1e-10)
).alias(col)
(
(pl.col(col) - pl.col(col).mean().over(self.date_col))
/ (pl.col(col).std().over(self.date_col) + 1e-10)
)
.fill_nan(0)
.fill_null(0)
.alias(col)
)
expressions.append(expr)
elif col in self.feature_cols:
# 对于应该被处理但类型不匹配的列转换为float并同时处理 NaN 和 null
expr = pl.col(col).cast(pl.Float64).fill_nan(0).fill_null(0).alias(col)
expressions.append(expr)
else:
expressions.append(pl.col(col))
@@ -384,6 +455,7 @@ class Winsorizer(BaseProcessor):
"""
if not self.by_date:
for col in self.feature_cols:
# 仅处理数值类型排除布尔类型quantile 不支持布尔类型)
if col in X.columns and X[col].dtype.is_numeric():
self.bounds_[col] = {
"lower": X[col].quantile(self.lower),
@@ -414,13 +486,19 @@ class Winsorizer(BaseProcessor):
upper = self.bounds_[col]["upper"]
expr = pl.col(col).clip(lower, upper).alias(col)
expressions.append(expr)
elif col in self.feature_cols:
# 对于应该被处理但未学习到边界的列如全为NaN、布尔列等
# 统一转换为float并填充0
expr = pl.col(col).cast(pl.Float64).fill_null(0).alias(col)
expressions.append(expr)
else:
expressions.append(pl.col(col))
return X.select(expressions)
def _transform_by_date(self, X: pl.DataFrame) -> pl.DataFrame:
"""每日独立缩尾"""
# 确定需要处理的数值
# 确定需要处理的列(仅 numeric 类型,排除 boolean
# 注意quantile 操作不支持布尔类型
target_cols = [
col
for col in self.feature_cols
@@ -444,9 +522,11 @@ class Winsorizer(BaseProcessor):
clip_exprs = []
for col in X.columns:
if col in target_cols:
# 先用当天分位数缩尾如果分位数是null该日全为NaN则填充0
clipped = (
pl.col(col)
.clip(pl.col(f"{col}_lower"), pl.col(f"{col}_upper"))
.fill_null(0)
.alias(col)
)
clip_exprs.append(clipped)