feat: 因子引擎字段验证改进、股票池过滤修复及实验模块增强
1. 因子引擎字段验证改进 - 新增 SchemaCache.get_all_fields() 方法,返回所有可用字段集合 - 修改 match_fields_to_tables(),对不存在的字段抛出明确错误 - 错误信息包含可用字段列表提示,帮助用户检查拼写 2. 股票池过滤修复 - 修复北交所股票排除逻辑:将识别方式从代码前缀(8/4开头)改为.BJ后缀 - 更新文档注释,明确北交所股票识别规则 3. 实验模块增强 - 新增 regression.py 实现回归实验逻辑 - 新增 output/ 目录存放实验输出结果
This commit is contained in:
1216
src/experiment/output/regression/predictions.csv
Normal file
1216
src/experiment/output/regression/predictions.csv
Normal file
File diff suppressed because it is too large
Load Diff
457
src/experiment/regression.py
Normal file
457
src/experiment/regression.py
Normal file
@@ -0,0 +1,457 @@
|
||||
"""LightGBM 回归训练示例 - 使用因子字符串表达式
|
||||
|
||||
使用字符串表达式定义因子,训练 LightGBM 回归模型预测未来5日收益率。
|
||||
Label: return_5 = (close / ts_delay(close, 5)) - 1
|
||||
"""
|
||||
|
||||
import os
|
||||
from typing import List, Tuple
|
||||
|
||||
import polars as pl
|
||||
|
||||
from src.factors import FactorEngine
|
||||
from src.training import (
|
||||
DateSplitter,
|
||||
LightGBMModel,
|
||||
StandardScaler,
|
||||
StockFilterConfig,
|
||||
StockPoolManager,
|
||||
Trainer,
|
||||
Winsorizer,
|
||||
)
|
||||
from src.training.config import TrainingConfig
|
||||
|
||||
|
||||
def create_factors_with_strings(engine: FactorEngine) -> List[str]:
|
||||
"""使用字符串表达式定义因子
|
||||
|
||||
Args:
|
||||
engine: FactorEngine 实例
|
||||
|
||||
Returns:
|
||||
特征因子名称列表(不包含 label)
|
||||
"""
|
||||
print("=" * 80)
|
||||
print("使用字符串表达式定义因子")
|
||||
print("=" * 80)
|
||||
|
||||
# 定义所有因子(使用字典,方便维护和扩展)
|
||||
# 新增因子只需在此处添加一行即可
|
||||
factor_definitions = {
|
||||
# 1. 价格动量因子
|
||||
"ma5": "ts_mean(close, 5)",
|
||||
"ma10": "ts_mean(close, 10)",
|
||||
"ma20": "ts_mean(close, 20)",
|
||||
"ma_ratio": "ts_mean(close, 5) / ts_mean(close, 20) - 1",
|
||||
# 2. 波动率因子
|
||||
"volatility_5": "ts_std(close, 5)",
|
||||
"volatility_20": "ts_std(close, 20)",
|
||||
"vol_ratio": "ts_std(close, 5) / (ts_std(close, 20) + 1e-8)",
|
||||
# 3. 收益率动量因子(return_5 是 label,需要单独注册)
|
||||
"return_10": "(close / ts_delay(close, 10)) - 1",
|
||||
"return_20": "(close / ts_delay(close, 20)) - 1",
|
||||
# 4. 收益率变化因子(使用完整表达式,不引用其他因子)
|
||||
"return_diff": "(close / ts_delay(close, 5)) - 1 - ((close / ts_delay(close, 10)) - 1)",
|
||||
# 5. 成交量因子
|
||||
"vol_ma5": "ts_mean(vol, 5)",
|
||||
"vol_ma20": "ts_mean(vol, 20)",
|
||||
"vol_ratio": "ts_mean(vol, 5) / (ts_mean(vol, 20) + 1e-8)",
|
||||
# 6. 市值因子(截面排名)
|
||||
"market_cap_rank": "cs_rank(total_mv)",
|
||||
# 7. 价格位置因子
|
||||
"high_low_ratio": "(close - ts_min(low, 20)) / (ts_max(high, 20) - ts_min(low, 20) + 1e-8)",
|
||||
}
|
||||
|
||||
# Label 因子(单独定义,不参与训练)
|
||||
label_factor = {
|
||||
"return_5": "(close / ts_delay(close, 5)) - 1",
|
||||
}
|
||||
|
||||
# 注册所有特征因子
|
||||
print("\n注册特征因子:")
|
||||
for name, expr in factor_definitions.items():
|
||||
engine.add_factor(name, expr)
|
||||
print(f" - {name}: {expr}")
|
||||
|
||||
# 注册 label 因子
|
||||
print("\n注册 Label 因子:")
|
||||
for name, expr in label_factor.items():
|
||||
engine.add_factor(name, expr)
|
||||
print(f" - {name}: {expr}")
|
||||
|
||||
# 从字典自动获取特征列(keys() 方法)
|
||||
feature_cols = list(factor_definitions.keys())
|
||||
|
||||
print(f"\n特征因子数: {len(feature_cols)}")
|
||||
print(f"Label: {list(label_factor.keys())[0]}")
|
||||
print(f"已注册因子总数: {len(engine.list_registered())}")
|
||||
|
||||
return feature_cols
|
||||
|
||||
|
||||
def prepare_data(
|
||||
engine: FactorEngine,
|
||||
feature_cols: List[str],
|
||||
start_date: str,
|
||||
end_date: str,
|
||||
) -> pl.DataFrame:
|
||||
"""准备数据:计算因子
|
||||
|
||||
Args:
|
||||
engine: FactorEngine 实例
|
||||
feature_cols: 特征列名列表
|
||||
start_date: 开始日期
|
||||
end_date: 结束日期
|
||||
|
||||
Returns:
|
||||
因子数据 DataFrame
|
||||
"""
|
||||
print("\n" + "=" * 80)
|
||||
print("准备数据")
|
||||
print("=" * 80)
|
||||
|
||||
# 计算因子(全市场数据)
|
||||
print(f"\n计算因子: {start_date} - {end_date}")
|
||||
factor_names = feature_cols + ["return_5"] # 包含 label
|
||||
|
||||
data = engine.compute(
|
||||
factor_names=factor_names,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
)
|
||||
|
||||
print(f"数据形状: {data.shape}")
|
||||
print(f"数据列: {data.columns}")
|
||||
print(f"\n前5行预览:")
|
||||
print(data.head())
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def train_regression_model():
|
||||
"""训练 LightGBM 回归模型"""
|
||||
print("\n" + "=" * 80)
|
||||
print("LightGBM 回归模型训练")
|
||||
print("=" * 80)
|
||||
|
||||
# 1. 创建 FactorEngine
|
||||
print("\n[1] 创建 FactorEngine")
|
||||
engine = FactorEngine()
|
||||
|
||||
# 2. 使用字符串表达式定义因子
|
||||
print("\n[2] 定义因子(字符串表达式)")
|
||||
feature_cols = create_factors_with_strings(engine)
|
||||
target_col = "return_5"
|
||||
|
||||
# 3. 准备数据
|
||||
print("\n[3] 准备数据")
|
||||
train_start, train_end = "20200101", "20241231"
|
||||
test_start, test_end = "20250101", "20251231"
|
||||
|
||||
data = prepare_data(
|
||||
engine=engine,
|
||||
feature_cols=feature_cols,
|
||||
start_date=train_start,
|
||||
end_date=test_end,
|
||||
)
|
||||
|
||||
# 4. 创建配置
|
||||
config = TrainingConfig(
|
||||
feature_cols=feature_cols,
|
||||
target_col=target_col,
|
||||
date_col="trade_date",
|
||||
code_col="ts_code",
|
||||
train_start=train_start,
|
||||
train_end=train_end,
|
||||
test_start=test_start,
|
||||
test_end=test_end,
|
||||
model_type="lightgbm",
|
||||
model_params={
|
||||
"objective": "regression",
|
||||
"metric": "rmse",
|
||||
"num_leaves": 31,
|
||||
"learning_rate": 0.05,
|
||||
"n_estimators": 100,
|
||||
},
|
||||
processors=[
|
||||
{"name": "winsorizer", "params": {"lower": 0.01, "upper": 0.99}},
|
||||
{"name": "cs_standard_scaler", "params": {}},
|
||||
],
|
||||
persist_model=False,
|
||||
model_save_path=None,
|
||||
output_dir="output/regression",
|
||||
save_predictions=True,
|
||||
)
|
||||
|
||||
print(f"\n[配置] 训练期: {train_start} - {train_end}")
|
||||
print(f"[配置] 测试期: {test_start} - {test_end}")
|
||||
print(f"[配置] 特征数: {len(feature_cols)}")
|
||||
print(f"[配置] 目标变量: {target_col}")
|
||||
|
||||
# 5. 创建模型
|
||||
model = LightGBMModel(
|
||||
objective="regression",
|
||||
metric="rmse",
|
||||
num_leaves=31,
|
||||
learning_rate=0.05,
|
||||
n_estimators=100,
|
||||
)
|
||||
|
||||
# 6. 创建数据处理器
|
||||
processors = [
|
||||
Winsorizer(lower=0.01, upper=0.99),
|
||||
StandardScaler(exclude_cols=["ts_code", "trade_date", target_col]),
|
||||
]
|
||||
|
||||
# 7. 创建数据划分器
|
||||
splitter = DateSplitter(
|
||||
train_start=train_start,
|
||||
train_end=train_end,
|
||||
test_start=test_start,
|
||||
test_end=test_end,
|
||||
)
|
||||
|
||||
# 8. 创建股票池管理器(可选)
|
||||
pool_manager = StockPoolManager(
|
||||
filter_config=StockFilterConfig(
|
||||
exclude_cyb=True,
|
||||
exclude_kcb=True,
|
||||
exclude_bj=True,
|
||||
exclude_st=True,
|
||||
),
|
||||
selector_config=None, # 暂时不启用市值选择
|
||||
data_router=engine.router, # 从 FactorEngine 获取数据路由器
|
||||
)
|
||||
|
||||
# 9. 创建训练器
|
||||
trainer = Trainer(
|
||||
model=model,
|
||||
pool_manager=pool_manager,
|
||||
processors=processors,
|
||||
splitter=splitter,
|
||||
target_col=target_col,
|
||||
feature_cols=feature_cols,
|
||||
persist_model=False,
|
||||
)
|
||||
|
||||
# 10. 手动执行训练流程(增加详细打印)
|
||||
print("\n" + "=" * 80)
|
||||
print("开始训练")
|
||||
print("=" * 80)
|
||||
|
||||
# 10.1 股票池筛选
|
||||
print("\n[步骤 1/6] 股票池筛选")
|
||||
print("-" * 60)
|
||||
if pool_manager:
|
||||
print(" 执行每日独立筛选股票池...")
|
||||
filtered_data = pool_manager.filter_and_select_daily(data)
|
||||
print(f" 筛选前数据规模: {data.shape}")
|
||||
print(f" 筛选后数据规模: {filtered_data.shape}")
|
||||
print(f" 筛选前股票数: {data['ts_code'].n_unique()}")
|
||||
print(f" 筛选后股票数: {filtered_data['ts_code'].n_unique()}")
|
||||
print(f" 删除记录数: {len(data) - len(filtered_data)}")
|
||||
else:
|
||||
filtered_data = data
|
||||
print(" 未配置股票池管理器,跳过筛选")
|
||||
|
||||
# 10.2 划分训练/测试集
|
||||
print("\n[步骤 2/6] 划分训练集和测试集")
|
||||
print("-" * 60)
|
||||
if splitter:
|
||||
train_data, test_data = splitter.split(filtered_data)
|
||||
print(f" 训练集数据规模: {train_data.shape}")
|
||||
print(f" 测试集数据规模: {test_data.shape}")
|
||||
print(f" 训练集股票数: {train_data['ts_code'].n_unique()}")
|
||||
print(f" 测试集股票数: {test_data['ts_code'].n_unique()}")
|
||||
print(
|
||||
f" 训练集日期范围: {train_data['trade_date'].min()} - {train_data['trade_date'].max()}"
|
||||
)
|
||||
print(
|
||||
f" 测试集日期范围: {test_data['trade_date'].min()} - {test_data['trade_date'].max()}"
|
||||
)
|
||||
|
||||
print("\n 训练集前5行预览:")
|
||||
print(train_data.head())
|
||||
print("\n 测试集前5行预览:")
|
||||
print(test_data.head())
|
||||
else:
|
||||
train_data = filtered_data
|
||||
test_data = filtered_data
|
||||
print(" 未配置划分器,全部作为训练集")
|
||||
|
||||
# 10.3 训练集数据处理
|
||||
print("\n[步骤 3/6] 训练集数据处理")
|
||||
print("-" * 60)
|
||||
fitted_processors = []
|
||||
if processors:
|
||||
for i, processor in enumerate(processors, 1):
|
||||
print(
|
||||
f" [{i}/{len(processors)}] 应用处理器: {processor.__class__.__name__}"
|
||||
)
|
||||
train_data_before = len(train_data)
|
||||
train_data = processor.fit_transform(train_data)
|
||||
train_data_after = len(train_data)
|
||||
fitted_processors.append(processor)
|
||||
print(f" 处理前记录数: {train_data_before}")
|
||||
print(f" 处理后记录数: {train_data_after}")
|
||||
if train_data_before != train_data_after:
|
||||
print(f" 删除记录数: {train_data_before - train_data_after}")
|
||||
|
||||
print("\n 训练集处理后前5行预览:")
|
||||
print(train_data.head())
|
||||
print(f"\n 训练集特征统计:")
|
||||
print(f" 特征数: {len(feature_cols)}")
|
||||
print(f" 样本数: {len(train_data)}")
|
||||
print(f" 缺失值统计:")
|
||||
for col in feature_cols[:5]: # 只显示前5个特征的缺失值
|
||||
null_count = train_data[col].null_count()
|
||||
if null_count > 0:
|
||||
print(
|
||||
f" {col}: {null_count} ({null_count / len(train_data) * 100:.2f}%)"
|
||||
)
|
||||
|
||||
# 10.4 训练模型
|
||||
print("\n[步骤 4/6] 训练模型")
|
||||
print("-" * 60)
|
||||
print(f" 模型类型: LightGBM")
|
||||
print(f" 训练样本数: {len(train_data)}")
|
||||
print(f" 特征数: {len(feature_cols)}")
|
||||
print(f" 目标变量: {target_col}")
|
||||
|
||||
X_train = train_data.select(feature_cols)
|
||||
y_train = train_data.select(target_col).to_series()
|
||||
|
||||
print(f"\n 目标变量统计:")
|
||||
print(f" 均值: {y_train.mean():.6f}")
|
||||
print(f" 标准差: {y_train.std():.6f}")
|
||||
print(f" 最小值: {y_train.min():.6f}")
|
||||
print(f" 最大值: {y_train.max():.6f}")
|
||||
print(f" 缺失值: {y_train.null_count()}")
|
||||
|
||||
print("\n 开始训练...")
|
||||
model.fit(X_train, y_train)
|
||||
print(" 训练完成!")
|
||||
|
||||
# 10.5 测试集数据处理
|
||||
print("\n[步骤 5/6] 测试集数据处理")
|
||||
print("-" * 60)
|
||||
if processors and test_data is not train_data:
|
||||
for i, processor in enumerate(fitted_processors, 1):
|
||||
print(
|
||||
f" [{i}/{len(fitted_processors)}] 应用处理器: {processor.__class__.__name__}"
|
||||
)
|
||||
test_data_before = len(test_data)
|
||||
test_data = processor.transform(test_data)
|
||||
test_data_after = len(test_data)
|
||||
print(f" 处理前记录数: {test_data_before}")
|
||||
print(f" 处理后记录数: {test_data_after}")
|
||||
else:
|
||||
print(" 跳过测试集处理")
|
||||
|
||||
# 10.6 预测
|
||||
print("\n[步骤 6/6] 生成预测")
|
||||
print("-" * 60)
|
||||
X_test = test_data.select(feature_cols)
|
||||
print(f" 测试样本数: {len(X_test)}")
|
||||
print(f" 预测中...")
|
||||
predictions = model.predict(X_test)
|
||||
print(f" 预测完成!")
|
||||
|
||||
print(f"\n 预测结果统计:")
|
||||
print(f" 均值: {predictions.mean():.6f}")
|
||||
print(f" 标准差: {predictions.std():.6f}")
|
||||
print(f" 最小值: {predictions.min():.6f}")
|
||||
print(f" 最大值: {predictions.max():.6f}")
|
||||
|
||||
# 保存结果到 trainer
|
||||
trainer.results = test_data.with_columns([pl.Series("prediction", predictions)])
|
||||
|
||||
# 11. 获取结果
|
||||
print("\n" + "=" * 80)
|
||||
print("训练结果")
|
||||
print("=" * 80)
|
||||
|
||||
results = trainer.results
|
||||
|
||||
print(f"\n结果数据形状: {results.shape}")
|
||||
print(f"结果列: {results.columns}")
|
||||
print(f"\n结果前10行预览:")
|
||||
print(results.head(10))
|
||||
print(f"\n结果后5行预览:")
|
||||
print(results.tail())
|
||||
|
||||
print(f"\n每日预测样本数统计:")
|
||||
daily_counts = results.group_by("trade_date").agg(pl.count()).sort("trade_date")
|
||||
print(f" 最小: {daily_counts['count'].min()}")
|
||||
print(f" 最大: {daily_counts['count'].max()}")
|
||||
print(f" 平均: {daily_counts['count'].mean():.2f}")
|
||||
|
||||
# 展示某一天的前10个预测结果
|
||||
sample_date = results["trade_date"][0]
|
||||
sample_data = results.filter(results["trade_date"] == sample_date).head(10)
|
||||
print(f"\n示例日期 {sample_date} 的前10条预测:")
|
||||
print(sample_data.select(["ts_code", "trade_date", target_col, "prediction"]))
|
||||
|
||||
# 12. 保存结果(每日 top5)
|
||||
output_dir = "D:\\PyProject\\ProStock\\src\\training\\output"
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# 生成文件名:top_5_{开始日期}_{结束日期}.csv
|
||||
from datetime import datetime
|
||||
|
||||
start_dt = datetime.strptime(test_start, "%Y%m%d")
|
||||
end_dt = datetime.strptime(test_end, "%Y%m%d")
|
||||
filename = (
|
||||
f"top_5_{start_dt.strftime('%Y-%m-%d')}_{end_dt.strftime('%Y-%m-%d')}.csv"
|
||||
)
|
||||
output_path = os.path.join(output_dir, filename)
|
||||
|
||||
# 按日期分组,取每日 top5
|
||||
print("\n选取每日 Top 5 股票...")
|
||||
top5_by_date = []
|
||||
unique_dates = results["trade_date"].unique().sort()
|
||||
for date in unique_dates:
|
||||
day_data = results.filter(results["trade_date"] == date)
|
||||
# 按 prediction 降序排序,取前5
|
||||
top5 = day_data.sort("prediction", descending=True).head(5)
|
||||
top5_by_date.append(top5)
|
||||
|
||||
print(f" 处理完成: 共 {len(unique_dates)} 个交易日,每交易日取 top5")
|
||||
|
||||
# 合并所有日期的 top5
|
||||
top5_results = pl.concat(top5_by_date)
|
||||
|
||||
# 格式化日期并调整列顺序:日期、分数、股票
|
||||
results_to_save = top5_results.select(
|
||||
[
|
||||
pl.col("trade_date").str.slice(0, 4)
|
||||
+ "-"
|
||||
+ pl.col("trade_date").str.slice(4, 2)
|
||||
+ "-"
|
||||
+ pl.col("trade_date").str.slice(6, 2),
|
||||
pl.col("prediction").alias("score"),
|
||||
pl.col("ts_code"),
|
||||
]
|
||||
).rename({"trade_date": "date"})
|
||||
results_to_save.write_csv(output_path, include_header=True)
|
||||
print(f"\n预测结果已保存: {output_path}")
|
||||
print(f"保存列: {results_to_save.columns}")
|
||||
print(f"总行数: {len(results_to_save)}(每日 top5)")
|
||||
print(f"\n保存数据预览:")
|
||||
print(results_to_save.head(15))
|
||||
|
||||
# 13. 特征重要性
|
||||
importance = model.feature_importance()
|
||||
if importance is not None:
|
||||
print("\n特征重要性:")
|
||||
print(importance.sort_values(ascending=False))
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("训练完成!")
|
||||
print("=" * 80)
|
||||
|
||||
return trainer, results
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
trainer, results = train_regression_model()
|
||||
@@ -179,6 +179,17 @@ class SchemaCache:
|
||||
return False
|
||||
return field_name in self._field_to_table_map
|
||||
|
||||
def get_all_fields(self) -> Set[str]:
|
||||
"""获取所有可用字段的集合。
|
||||
|
||||
Returns:
|
||||
所有可用字段名的集合
|
||||
"""
|
||||
self._ensure_scanned()
|
||||
if self._field_to_table_map is None:
|
||||
return set()
|
||||
return set(self._field_to_table_map.keys())
|
||||
|
||||
def match_fields_to_tables(self, field_names: Set[str]) -> Dict[str, List[str]]:
|
||||
"""将字段集合按表分组。
|
||||
|
||||
@@ -199,10 +210,13 @@ class SchemaCache:
|
||||
table_to_fields[table] = []
|
||||
table_to_fields[table].append(field)
|
||||
else:
|
||||
# 字段不存在于任何表,归入 "daily" 表(默认表)
|
||||
if "daily" not in table_to_fields:
|
||||
table_to_fields["daily"] = []
|
||||
table_to_fields["daily"].append(field)
|
||||
# 字段不存在于任何表,报错
|
||||
available_fields = self.get_all_fields()
|
||||
raise ValueError(
|
||||
f"字段 '{field}' 不存在于数据库的任何表中。\n"
|
||||
f"可用字段: {sorted(available_fields)[:20]}...\n"
|
||||
f"请检查字段名拼写或同步相关数据表。"
|
||||
)
|
||||
|
||||
# 对字段列表排序以保持确定性输出
|
||||
for fields in table_to_fields.values():
|
||||
|
||||
@@ -17,7 +17,7 @@ class StockFilterConfig:
|
||||
Attributes:
|
||||
exclude_cyb: 是否排除创业板(300xxx)
|
||||
exclude_kcb: 是否排除科创板(688xxx)
|
||||
exclude_bj: 是否排除北交所(8xxxxxx, 4xxxxxx)
|
||||
exclude_bj: 是否排除北交所(.BJ 后缀)
|
||||
exclude_st: 是否排除ST股票(需要外部数据支持)
|
||||
"""
|
||||
|
||||
@@ -47,8 +47,8 @@ class StockFilterConfig:
|
||||
# 排除科创板(688xxx)
|
||||
if self.exclude_kcb and code.startswith("688"):
|
||||
continue
|
||||
# 排除北交所(8xxxxxx 或 4xxxxxx)
|
||||
if self.exclude_bj and (code.startswith("8") or code.startswith("4")):
|
||||
# 排除北交所(.BJ 后缀)
|
||||
if self.exclude_bj and code.endswith(".BJ"):
|
||||
continue
|
||||
result.append(code)
|
||||
return result
|
||||
|
||||
Reference in New Issue
Block a user