Files
ProStock/tests/diagnose_nan.py
liaozhaorun 36a3ccbcc8 feat(training): 新增 TabM 模型支持及数据质量优化
- 添加 TabMModel、TabPFNModel 深度学习模型实现
- 新增 DataQualityAnalyzer 进行训练前数据质量诊断
- 改进数据处理器 NaN/null 双重处理,增强数据鲁棒性
- 支持 train_skip_days 参数跳过训练初期数据不足期
- Pipeline 自动清理标签为 NaN 的样本
2026-03-31 23:11:21 +08:00

209 lines
6.6 KiB
Python

"""诊断 NaN 来源"""
import numpy as np
import polars as pl
from src.factors import FactorEngine
from src.training import (
FactorManager,
DataPipeline,
NullFiller,
Winsorizer,
StandardScaler,
)
from src.training.components.filters import STFilter
from src.experiment.common import (
SELECTED_FACTORS,
FACTOR_DEFINITIONS,
LABEL_NAME,
LABEL_FACTOR,
stock_pool_filter,
STOCK_FILTER_REQUIRED_COLUMNS,
)
# Exclude GTJA_alpha001..GTJA_alpha015 so the test runs on a small factor set.
EXCLUDED_FACTORS = [f"GTJA_alpha{i:03d}" for i in range(1, 16)]

# Narrow date windows keep the diagnosis fast.
TEST_DATE_RANGE = {
    "train": ("20200101", "20200331"),
    "val": ("20200401", "20200430"),
    "test": ("20200501", "20200531"),
}
def _nan_total(df, cols):
    """Return the summed null count over *cols* of (polars) DataFrame *df*."""
    return sum(df[col].null_count() for col in cols)


def main():
    """Step-by-step NaN diagnosis of the factor pipeline.

    Computes raw factors, then applies ST filtering, stock-pool
    selection, the train-period split, and each processor in turn
    (NullFiller -> Winsorizer -> StandardScaler), printing null
    statistics after every stage so the origin of NaN values in the
    final feature matrix can be located.
    """
    print("=" * 80)
    print("NaN 来源诊断")
    print("=" * 80)

    engine = FactorEngine()
    factor_manager = FactorManager(
        selected_factors=SELECTED_FACTORS,
        factor_definitions=FACTOR_DEFINITIONS,
        label_factor=LABEL_FACTOR,
        excluded_factors=EXCLUDED_FACTORS,
    )

    # Step 1: register factors and compute raw data over the full range.
    print("\n[Step 1] 注册因子并计算原始数据...")
    feature_cols = factor_manager.register_to_engine(engine, verbose=False)
    print(f" 特征数: {len(feature_cols)}")
    # One compute over min(start)..max(end) covers train/val/test at once.
    all_start = min(r[0] for r in TEST_DATE_RANGE.values())
    all_end = max(r[1] for r in TEST_DATE_RANGE.values())
    raw_data = engine.compute(
        factor_names=feature_cols + [LABEL_NAME],
        start_date=all_start,
        end_date=all_end,
    )
    print(f" 原始数据形状: {raw_data.shape}")

    # Only the first 20 features are inspected, to keep the report short.
    top20 = feature_cols[:20]

    # Step 2: null statistics on the raw data.
    print("\n[Step 2] 原始数据 NaN 统计...")
    nan_counts = {}
    for col in top20:
        nan_count = raw_data[col].null_count()
        if nan_count > 0:
            nan_counts[col] = nan_count
    print(f" 含 NaN 的特征数 (前20个): {len(nan_counts)}")
    for col, count in list(nan_counts.items())[:10]:
        pct = count / len(raw_data) * 100
        print(f" {col}: {count} ({pct:.1f}%)")

    # Step 3: ST filter.
    print("\n[Step 3] 应用过滤器...")
    st_filter = STFilter(data_router=engine.router)
    filtered_data = st_filter.filter(raw_data)
    print(f" 过滤后数据形状: {filtered_data.shape}")
    print(f" 前20个特征总 NaN 数: {_nan_total(filtered_data, top20)}")

    # Step 4: stock-pool selection (import kept local: diagnostic-only dep).
    print("\n[Step 4] 应用股票池筛选...")
    from src.training.core.stock_pool_manager import StockPoolManager

    pool_manager = StockPoolManager(
        filter_func=stock_pool_filter,
        required_columns=STOCK_FILTER_REQUIRED_COLUMNS,
        data_router=engine.router,
    )
    pool_data = pool_manager.filter_and_select_daily(filtered_data)
    print(f" 筛选后数据形状: {pool_data.shape}")
    print(f" 前20个特征总 NaN 数: {_nan_total(pool_data, top20)}")

    # Step 5: restrict to the training date window.
    print("\n[Step 5] 划分训练集...")
    train_start, train_end = TEST_DATE_RANGE["train"]
    train_mask = (pool_data["trade_date"] >= train_start) & (
        pool_data["trade_date"] <= train_end
    )
    train_df = pool_data.filter(train_mask)
    print(f" 训练集形状: {train_df.shape}")
    print(f" 前20个特征总 NaN 数: {_nan_total(train_df, top20)}")

    # Step 6: apply each processor in order, re-checking nulls after each.
    print("\n[Step 6] 依次应用 processors...")

    # 6.1 NullFiller — per-date mean imputation.
    print("\n [6.1] NullFiller (by_date=True, strategy=mean)...")
    null_filler = NullFiller(feature_cols=feature_cols, strategy="mean", by_date=True)
    after_null = null_filler.fit_transform(train_df)
    nan_after_null = _nan_total(after_null, top20)
    print(f" 处理后前20个特征总 NaN 数: {nan_after_null}")
    if nan_after_null > 0:
        # A date whose values are all null for a column cannot be mean-filled.
        print(" 仍有 NaN 的列:")
        for col in top20:
            count = after_null[col].null_count()
            if count > 0:
                print(f" {col}: {count}")

    # 6.2 Winsorizer — global 1%/99% clipping.
    print("\n [6.2] Winsorizer (by_date=False)...")
    winsorizer = Winsorizer(
        feature_cols=feature_cols, lower=0.01, upper=0.99, by_date=False
    )
    after_winsor = winsorizer.fit_transform(after_null)
    print(f" 处理后前20个特征总 NaN 数: {_nan_total(after_winsor, top20)}")

    # 6.3 StandardScaler — z-score with the fitted statistics.
    print("\n [6.3] StandardScaler...")
    scaler = StandardScaler(feature_cols=feature_cols)
    after_scaler = scaler.fit_transform(after_winsor)
    nan_after_scaler = _nan_total(after_scaler, top20)
    print(f" 处理后前20个特征总 NaN 数: {nan_after_scaler}")
    if nan_after_scaler > 0:
        print(" 仍有 NaN 的列:")
        for col in top20:
            count = after_scaler[col].null_count()
            if count > 0:
                # Report the fitted statistics: a missing or degenerate
                # std would explain NaN produced by the scaling step.
                mean_val = scaler.mean_.get(col, "N/A")
                std_val = scaler.std_.get(col, "N/A")
                print(f" {col}: {count}, mean={mean_val}, std={std_val}")

    # Step 7: materialize the feature matrix and do a final NaN census.
    print("\n[Step 7] 提取特征矩阵 X...")
    X_np = after_scaler.select(feature_cols).to_numpy()
    print(f" X 形状: {X_np.shape}")
    print(f" X 中 NaN 总数: {np.isnan(X_np).sum()}")
    nan_by_col = []
    for i, col in enumerate(feature_cols):
        col_nan = np.isnan(X_np[:, i]).sum()
        if col_nan > 0:
            nan_by_col.append((col, col_nan))
    print(f" 含 NaN 的特征列数: {len(nan_by_col)}")
    for col, count in nan_by_col[:10]:
        print(f" {col}: {count}")

    print("\n" + "=" * 80)
    print("诊断完成")
    print("=" * 80)
# Entry point: run the NaN diagnosis when executed as a standalone script.
if __name__ == "__main__":
    main()