Compare commits

...

3 Commits

Author SHA1 Message Date
505279c08b fix(data): 修复财务因子计算非确定性问题
重构 financial_loader 的去重逻辑,确保截面排名计算的股票集合一致:
- 引入"高水位线"算法剔除陈旧历史财报(解决2026年发布2021年财报的问题)
- 改变去重策略:按报告期(end_date)而非更新标识(update_flag)保留最新数据
- 扩展回看期从1年到2年,防止ST/停牌公司财报缺失
- 确保相同交易日在不同查询范围下返回一致的财务数据
2026-03-08 20:58:35 +08:00
3c7795f630 feat: 新增多项技术指标和成交量因子定义 2026-03-08 14:12:03 +08:00
36e0e4b234 feat(training): 新增财务数据因子并修复多表 join 冲突
- 添加 9 个财务数据因子(利润表/资产负债表/现金流量表)
- 修复多表 asof join 时 f_ann_date_right 列名重复错误
- 将 Top5 改为可配置的 TopN 参数
- 删除已弃用的 regression.py 脚本
2026-03-08 11:46:30 +08:00
6 changed files with 597 additions and 3336 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -49,7 +49,13 @@ class FinancialLoader:
清洗后的 Polars DataFrame已排序f_ann_date 为 pl.Date 类型
"""
# 确保包含必要字段
required_cols = {"ts_code", "f_ann_date", "report_type", "update_flag"}
required_cols = {
"ts_code",
"f_ann_date",
"report_type",
"update_flag",
"end_date",
}
query_cols = list(set(columns) | required_cols)
# 从数据库加载原始数据
@@ -61,32 +67,51 @@ class FinancialLoader:
# 步骤1: 仅保留合并报表 (report_type 可能是字符串或整数)
df = df.filter(pl.col("report_type") == 1)
# 步骤2: 按 update_flag 降序排列后去重
# 步骤2: 添加辅助列用于排序和过滤
df = df.with_columns(
[pl.col("update_flag").cast(pl.Int32).alias("update_flag_int")]
[
# 将报告期 20231231 转为整数
pl.col("end_date").cast(pl.Int32).alias("end_date_int"),
# 将更新标识转为整数,容错处理 null 和空字符串
pl.col("update_flag")
.fill_null("0")
.cast(pl.Int32, strict=False)
.fill_null(0)
.alias("update_flag_int"),
]
)
# 排序ts_code, f_ann_date 升序update_flag 降
df = df.sort(
["ts_code", "f_ann_date", "update_flag_int"],
descending=[False, False, True],
# 步骤3: 绝对确定性排
# 按照: 股票代码(升序) -> 公告日(升序) -> 报告期(升序) -> 更新标识(升序)
# 这样排序后同一天发布的多份报表中end_date最大、update_flag最新的
# 必然排在这一天的最后面
df = df.sort(["ts_code", "f_ann_date", "end_date_int", "update_flag_int"])
# 步骤4: 核心算法 - 计算截至每一个公告日,市场"见过的"最新报告期(最高水位线)
df = df.with_columns(
pl.col("end_date_int").cum_max().over("ts_code").alias("max_end_date_seen")
)
# 去重:保留每个 (ts_code, f_ann_date) 的第一条update_flag 最高的
df = df.unique(subset=["ts_code", "f_ann_date"], keep="first")
# 步骤5: 剔除"历史包袱"解决2026年发2021年财报的问题
# 如果今天发布的财报,它的 end_date 小于我们之前已经见过的最大值,
# 说明它是陈旧的追溯调整,直接抛弃!
df = df.filter(pl.col("end_date_int") == pl.col("max_end_date_seen"))
# 移除临时列
df = df.drop("update_flag_int")
# 步骤6: 唯一化处理:满足 join_asof 的前置要求
# 经过上述处理后,同一个 f_ann_date 的最后一行,必定是 end_date 最大的那一份
# 我们只保留这最后一行,确保每个 f_ann_date 只有唯一的一条记录暴露给行情去 join
df = df.unique(subset=["ts_code", "f_ann_date"], keep="last")
# 步骤3: 确保 f_ann_date 是 Date 类型并排序
# 数据库返回的必须是 Date 类型,如果不是则报错
# 步骤7: 清理临时辅助列
df = df.drop(["end_date_int", "update_flag_int", "max_end_date_seen"])
# 步骤8: 确保 f_ann_date 是 Date 类型并排序join_asof 要求)
if df["f_ann_date"].dtype != pl.Date:
raise TypeError(
f"f_ann_date 必须是 Date 类型,实际类型为 {df['f_ann_date'].dtype}. "
f"请检查数据库表结构,确保日期字段为 DATE 类型。"
)
# 最终排序join_asof 要求)
df = df.sort(["ts_code", "f_ann_date"])
return df
@@ -128,6 +153,10 @@ class FinancialLoader:
strategy="backward",
)
# 删除 f_ann_date_right 列(如果有),避免多次 asof join 时列名冲突
if "f_ann_date_right" in merged.columns:
merged = merged.drop("f_ann_date_right")
return merged
def _load_from_db(
@@ -164,14 +193,14 @@ class FinancialLoader:
self,
start_date: str,
end_date: str,
lookback_years: int = 1,
lookback_years: int = 2,
) -> tuple[str, str]:
"""计算包含回看期的日期范围。
Args:
start_date: 原始开始日期YYYYMMDD
end_date: 原始结束日期YYYYMMDD
lookback_years: 回看年数(默认1年
lookback_years: 回看年数(默认2年防止ST/停牌公司财报缺失
Returns:
(扩展后的开始日期, 结束日期)

File diff suppressed because one or more lines are too long

View File

@@ -1,484 +0,0 @@
"""LightGBM 回归训练示例 - 使用因子字符串表达式
使用字符串表达式定义因子,训练 LightGBM 回归模型预测未来5日收益率。
Label: return_5 = (ts_delay(close, -5) / close) - 1 # 未来5日收益率
"""
import os
from datetime import datetime
from typing import List, Tuple
import polars as pl
from src.factors import FactorEngine
from src.training import (
DateSplitter,
LightGBMModel,
STFilter,
StandardScaler,
StockFilterConfig,
StockPoolManager,
Trainer,
Winsorizer,
NullFiller,
)
from src.training.config import TrainingConfig
# =============================================================================
# 因子定义(集中在此,方便修改)
# =============================================================================
# 特征因子定义字典:新增因子只需在此处添加一行
FACTOR_DEFINITIONS = {
# 1. 价格动量因子
"ma5": "ts_mean(close, 5)",
"ma10": "ts_mean(close, 10)",
"ma20": "ts_mean(close, 20)",
"ma_ratio": "ts_mean(close, 5) / ts_mean(close, 20) - 1",
# 2. 波动率因子
"volatility_5": "ts_std(close, 5)",
"volatility_20": "ts_std(close, 20)",
"vol_ratio": "ts_std(close, 5) / (ts_std(close, 20) + 1e-8)",
# 3. 收益率动量因子
"return_10": "(close / ts_delay(close, 10)) - 1",
"return_20": "(close / ts_delay(close, 20)) - 1",
# 4. 收益率变化因子
"return_diff": "(close / ts_delay(close, 5)) - 1 - ((close / ts_delay(close, 10)) - 1)",
# 5. 成交量因子
"vol_ma5": "ts_mean(vol, 5)",
"vol_ma20": "ts_mean(vol, 20)",
"vol_ratio": "ts_mean(vol, 5) / (ts_mean(vol, 20) + 1e-8)",
# 6. 市值因子(截面排名)
"market_cap_rank": "cs_rank(total_mv)",
# 7. 价格位置因子
"high_low_ratio": "(close - ts_min(low, 20)) / (ts_max(high, 20) - ts_min(low, 20) + 1e-8)",
"n_income": "n_income",
}
# Label 因子定义(不参与训练,用于计算目标)
LABEL_FACTOR = {
"return_5": "(ts_delay(close, -5) / close) - 1", # 未来5日收益率
}
# =============================================================================
# 训练参数配置(集中在此,方便修改)
# =============================================================================
# 日期范围配置
TRAIN_START = "20200101"
TRAIN_END = "20241231"
TEST_START = "20250101"
TEST_END = "20251231"
# 模型参数配置
MODEL_PARAMS = {
"objective": "regression",
"metric": "mae", # 改为 MAE对异常值更稳健
# 树结构控制(防过拟合核心)
"num_leaves": 20, # 从31降为20降低模型复杂度
"max_depth": 4, # 显式限制深度,防止过度拟合噪声
"min_child_samples": 50, # 叶子最小样本数,防止学习极端样本
"min_child_weight": 0.001,
# 学习参数
"learning_rate": 0.01, # 降低学习率,配合更多树
"n_estimators": 1000, # 增加树数量,配合早停
# 采样策略(关键防过拟合)
"subsample": 0.8, # 每棵树随机采样80%数据(行采样)
"subsample_freq": 5, # 每5轮迭代进行一次 subsample
"colsample_bytree": 0.8, # 每棵树随机选择80%特征(列采样)
# 正则化
"reg_alpha": 0.1, # L1正则增加稀疏性
"reg_lambda": 1.0, # L2正则平滑权重
# 数值稳定性
"verbose": -1,
"random_state": 42,
}
# 数据处理器配置
PROCESSOR_CONFIGS = [
{"name": "winsorizer", "params": {"lower": 0.01, "upper": 0.99}},
{"name": "cs_standard_scaler", "params": {}},
]
# 股票池筛选配置
STOCK_FILTER_CONFIG = {
"exclude_cyb": True, # 排除创业板
"exclude_kcb": True, # 排除科创板
"exclude_bj": True, # 排除北交所
"exclude_st": True, # 排除ST股票
}
# 输出配置(相对于本文件所在目录)
OUTPUT_DIR = "output"
SAVE_PREDICTIONS = True
PERSIST_MODEL = False
def create_factors_with_strings(engine: FactorEngine) -> List[str]:
"""使用字符串表达式定义因子
Args:
engine: FactorEngine 实例
Returns:
特征因子名称列表(不包含 label
"""
print("=" * 80)
print("使用字符串表达式定义因子")
print("=" * 80)
# 使用模块级别的因子定义
# 注册所有特征因子
print("\n注册特征因子:")
for name, expr in FACTOR_DEFINITIONS.items():
engine.add_factor(name, expr)
print(f" - {name}: {expr}")
# 注册 label 因子
print("\n注册 Label 因子:")
for name, expr in LABEL_FACTOR.items():
engine.add_factor(name, expr)
print(f" - {name}: {expr}")
# 从字典自动获取特征列
feature_cols = list(FACTOR_DEFINITIONS.keys())
print(f"\n特征因子数: {len(feature_cols)}")
print(f"Label: {list(LABEL_FACTOR.keys())[0]}")
print(f"已注册因子总数: {len(engine.list_registered())}")
return feature_cols
def prepare_data(
engine: FactorEngine,
feature_cols: List[str],
start_date: str,
end_date: str,
) -> pl.DataFrame:
"""准备数据:计算因子
Args:
engine: FactorEngine 实例
feature_cols: 特征列名列表
start_date: 开始日期
end_date: 结束日期
Returns:
因子数据 DataFrame
"""
print("\n" + "=" * 80)
print("准备数据")
print("=" * 80)
# 计算因子(全市场数据)
print(f"\n计算因子: {start_date} - {end_date}")
factor_names = feature_cols + ["return_5"] # 包含 label
data = engine.compute(
factor_names=factor_names,
start_date=start_date,
end_date=end_date,
)
print(f"数据形状: {data.shape}")
print(f"数据列: {data.columns}")
print(f"\n前5行预览:")
print(data.head())
return data
def train_regression_model():
"""训练 LightGBM 回归模型"""
print("\n" + "=" * 80)
print("LightGBM 回归模型训练")
print("=" * 80)
# 1. 创建 FactorEngine
print("\n[1] 创建 FactorEngine")
engine = FactorEngine()
# 2. 使用字符串表达式定义因子
print("\n[2] 定义因子(字符串表达式)")
feature_cols = create_factors_with_strings(engine)
target_col = "return_5"
# 3. 准备数据(使用模块级别的日期配置)
print("\n[3] 准备数据")
data = prepare_data(
engine=engine,
feature_cols=feature_cols,
start_date=TRAIN_START,
end_date=TEST_END,
)
# 4. 打印配置信息(使用模块级别的配置常量)
print(f"\n[配置] 训练期: {TRAIN_START} - {TRAIN_END}")
print(f"[配置] 测试期: {TEST_START} - {TEST_END}")
print(f"[配置] 特征数: {len(feature_cols)}")
print(f"[配置] 目标变量: {target_col}")
# 5. 创建模型(使用模块级别的模型参数)
model = LightGBMModel(params=MODEL_PARAMS)
# 6. 创建数据处理器(从 PROCESSOR_CONFIGS 解析)
processors = [
NullFiller(strategy="mean"),
Winsorizer(**PROCESSOR_CONFIGS[0]["params"]), # type: ignore[arg-type]
StandardScaler(exclude_cols=["ts_code", "trade_date", target_col]), # type: ignore[call-arg]
]
# 7. 创建数据划分器(使用模块级别的日期配置)
splitter = DateSplitter(
train_start=TRAIN_START,
train_end=TRAIN_END,
test_start=TEST_START,
test_end=TEST_END,
)
# 8. 创建股票池管理器(使用模块级别的筛选配置)
pool_manager = StockPoolManager(
filter_config=StockFilterConfig(**STOCK_FILTER_CONFIG),
selector_config=None, # 暂时不启用市值选择
data_router=engine.router, # 从 FactorEngine 获取数据路由器
)
# 8.5 创建 ST 股票过滤器(在股票池筛选之前执行)
st_filter = STFilter(
data_router=engine.router,
)
# 9. 创建训练器
trainer = Trainer(
model=model,
pool_manager=pool_manager,
processors=processors,
filters=[st_filter], # 在股票池筛选之前过滤 ST 股票
splitter=splitter,
target_col=target_col,
feature_cols=feature_cols,
persist_model=PERSIST_MODEL,
)
# 10. 手动执行训练流程(增加详细打印)
print("\n" + "=" * 80)
print("开始训练")
print("=" * 80)
# 10.1 股票池筛选
print("\n[步骤 1/6] 股票池筛选")
print("-" * 60)
if pool_manager:
print(" 执行每日独立筛选股票池...")
filtered_data = pool_manager.filter_and_select_daily(data)
print(f" 筛选前数据规模: {data.shape}")
print(f" 筛选后数据规模: {filtered_data.shape}")
print(f" 筛选前股票数: {data['ts_code'].n_unique()}")
print(f" 筛选后股票数: {filtered_data['ts_code'].n_unique()}")
print(f" 删除记录数: {len(data) - len(filtered_data)}")
else:
filtered_data = data
print(" 未配置股票池管理器,跳过筛选")
# 10.2 划分训练/测试集
print("\n[步骤 2/6] 划分训练集和测试集")
print("-" * 60)
if splitter:
train_data, test_data = splitter.split(filtered_data)
print(f" 训练集数据规模: {train_data.shape}")
print(f" 测试集数据规模: {test_data.shape}")
print(f" 训练集股票数: {train_data['ts_code'].n_unique()}")
print(f" 测试集股票数: {test_data['ts_code'].n_unique()}")
print(
f" 训练集日期范围: {train_data['trade_date'].min()} - {train_data['trade_date'].max()}"
)
print(
f" 测试集日期范围: {test_data['trade_date'].min()} - {test_data['trade_date'].max()}"
)
print("\n 训练集前5行预览:")
print(train_data.head())
print("\n 测试集前5行预览:")
print(test_data.head())
else:
train_data = filtered_data
test_data = filtered_data
print(" 未配置划分器,全部作为训练集")
# 10.3 训练集数据处理
print("\n[步骤 3/6] 训练集数据处理")
print("-" * 60)
fitted_processors = []
if processors:
for i, processor in enumerate(processors, 1):
print(
f" [{i}/{len(processors)}] 应用处理器: {processor.__class__.__name__}"
)
train_data_before = len(train_data)
train_data = processor.fit_transform(train_data)
train_data_after = len(train_data)
fitted_processors.append(processor)
print(f" 处理前记录数: {train_data_before}")
print(f" 处理后记录数: {train_data_after}")
if train_data_before != train_data_after:
print(f" 删除记录数: {train_data_before - train_data_after}")
print("\n 训练集处理后前5行预览:")
print(train_data.head())
print(f"\n 训练集特征统计:")
print(f" 特征数: {len(feature_cols)}")
print(f" 样本数: {len(train_data)}")
print(f" 缺失值统计:")
for col in feature_cols[:5]: # 只显示前5个特征的缺失值
null_count = train_data[col].null_count()
if null_count > 0:
print(
f" {col}: {null_count} ({null_count / len(train_data) * 100:.2f}%)"
)
# 10.4 训练模型
print("\n[步骤 4/6] 训练模型")
print("-" * 60)
print(f" 模型类型: LightGBM")
print(f" 训练样本数: {len(train_data)}")
print(f" 特征数: {len(feature_cols)}")
print(f" 目标变量: {target_col}")
X_train = train_data.select(feature_cols)
y_train = train_data.select(target_col).to_series()
print(f"\n 目标变量统计:")
print(f" 均值: {y_train.mean():.6f}")
print(f" 标准差: {y_train.std():.6f}")
print(f" 最小值: {y_train.min():.6f}")
print(f" 最大值: {y_train.max():.6f}")
print(f" 缺失值: {y_train.null_count()}")
print("\n 开始训练...")
model.fit(X_train, y_train)
print(" 训练完成!")
# 10.5 测试集数据处理
print("\n[步骤 5/6] 测试集数据处理")
print("-" * 60)
if processors and test_data is not train_data:
for i, processor in enumerate(fitted_processors, 1):
print(
f" [{i}/{len(fitted_processors)}] 应用处理器: {processor.__class__.__name__}"
)
test_data_before = len(test_data)
test_data = processor.transform(test_data)
test_data_after = len(test_data)
print(f" 处理前记录数: {test_data_before}")
print(f" 处理后记录数: {test_data_after}")
else:
print(" 跳过测试集处理")
# 10.6 预测
print("\n[步骤 6/6] 生成预测")
print("-" * 60)
X_test = test_data.select(feature_cols)
print(f" 测试样本数: {len(X_test)}")
print(f" 预测中...")
predictions = model.predict(X_test)
print(f" 预测完成!")
print(f"\n 预测结果统计:")
print(f" 均值: {predictions.mean():.6f}")
print(f" 标准差: {predictions.std():.6f}")
print(f" 最小值: {predictions.min():.6f}")
print(f" 最大值: {predictions.max():.6f}")
# 保存结果到 trainer
trainer.results = test_data.with_columns([pl.Series("prediction", predictions)])
# 11. 获取结果
print("\n" + "=" * 80)
print("训练结果")
print("=" * 80)
results = trainer.results
print(f"\n结果数据形状: {results.shape}")
print(f"结果列: {results.columns}")
print(f"\n结果前10行预览:")
print(results.head(10))
print(f"\n结果后5行预览:")
print(results.tail())
print(f"\n每日预测样本数统计:")
daily_counts = results.group_by("trade_date").agg(pl.count()).sort("trade_date")
print(f" 最小: {daily_counts['count'].min()}")
print(f" 最大: {daily_counts['count'].max()}")
print(f" 平均: {daily_counts['count'].mean():.2f}")
# 展示某一天的前10个预测结果
sample_date = results["trade_date"][0]
sample_data = results.filter(results["trade_date"] == sample_date).head(10)
print(f"\n示例日期 {sample_date} 的前10条预测:")
print(sample_data.select(["ts_code", "trade_date", target_col, "prediction"]))
# 12. 保存结果
print("\n" + "=" * 80)
print("保存预测结果")
print("=" * 80)
# 确保输出目录存在
os.makedirs(OUTPUT_DIR, exist_ok=True)
# 生成时间戳
start_dt = datetime.strptime(TEST_START, "%Y%m%d")
end_dt = datetime.strptime(TEST_END, "%Y%m%d")
date_str = f"{start_dt.strftime('%Y%m%d')}_{end_dt.strftime('%Y%m%d')}"
# 12.1 保存每日 Top5
print("\n[1/1] 保存每日 Top5 股票...")
top5_output_path = os.path.join(OUTPUT_DIR, f"top5_{date_str}.csv")
# 按日期分组,取每日 top5
top5_by_date = []
unique_dates = results["trade_date"].unique().sort()
for date in unique_dates:
day_data = results.filter(results["trade_date"] == date)
# 按 prediction 降序排序取前5
top5 = day_data.sort("prediction", descending=True).head(5)
top5_by_date.append(top5)
# 合并所有日期的 top5
top5_results = pl.concat(top5_by_date)
# 格式化日期并调整列顺序:日期、分数、股票
top5_to_save = top5_results.select(
[
pl.col("trade_date").str.slice(0, 4)
+ "-"
+ pl.col("trade_date").str.slice(4, 2)
+ "-"
+ pl.col("trade_date").str.slice(6, 2).alias("date"),
pl.col("prediction").alias("score"),
pl.col("ts_code"),
]
)
top5_to_save.write_csv(top5_output_path, include_header=True)
print(f" 保存路径: {top5_output_path}")
print(f" 保存行数: {len(top5_to_save)}{len(unique_dates)}个交易日 × 每日top5")
print(f"\n 预览前15行:")
print(top5_to_save.head(15))
# 13. 特征重要性
importance = model.feature_importance()
if importance is not None:
print("\n特征重要性:")
print(importance.sort_values(ascending=False))
print("\n" + "=" * 80)
print("训练完成!")
print("=" * 80)
return trainer, results
if __name__ == "__main__":
trainer, results = train_regression_model()

View File

@@ -16,7 +16,7 @@ def create_mock_price_data() -> pl.DataFrame:
"""创建模拟行情数据。"""
return pl.DataFrame(
{
"ts_code": ["000001.SZ"] * 10,
"ts_code": ["000001.SZ"] * 12,
"trade_date": [
"20240101",
"20240102",
@@ -28,8 +28,24 @@ def create_mock_price_data() -> pl.DataFrame:
"20240110",
"20240111",
"20240112",
# 添加2024-04-30之后的日期用于测试同日不同报告期场景
"20240501",
"20240502",
],
"close": [
10.0,
10.2,
10.3,
10.1,
10.5,
10.6,
10.4,
10.7,
10.8,
10.9,
11.0,
11.1,
],
"close": [10.0, 10.2, 10.3, 10.1, 10.5, 10.6, 10.4, 10.7, 10.8, 10.9],
}
)
@@ -37,31 +53,63 @@ def create_mock_price_data() -> pl.DataFrame:
def create_mock_financial_data() -> pl.DataFrame:
"""创建模拟财务数据(覆盖多种场景)。
场景说明:
1. 2024-01-02 发布 2023Q3 报告end_date=20230930
2. 2024-01-02 发布 2023Q3 更正版update_flag=1
3. 2024-04-30 同时发布 2023年报end_date=20231231和 2024Q1季报end_date=20240331
4. 2024-04-30 发布 2023年报更正版
预期结果:
- 2024-01-02 保留 2023Q3 更正版
- 2024-04-30 保留 2024Q1 季报end_date 最新)
注意f_ann_date 必须是 Date 类型(与数据库保持一致)。
"""
return pl.DataFrame(
{
"ts_code": ["000001.SZ", "000001.SZ", "000001.SZ", "000001.SZ"],
# 场景1: 2023Q3 报告,正常公告
# 场景2: 同日多版update_flag 区分)
# 场景3: 隔日修改
"ts_code": [
"000001.SZ",
"000001.SZ",
"000001.SZ",
"000001.SZ",
"000001.SZ",
],
"f_ann_date": [
date(2024, 1, 2),
date(2024, 1, 2),
date(2024, 1, 5),
date(2024, 1, 10),
date(2024, 1, 2), # 同日多版
date(2024, 4, 30),
date(2024, 4, 30),
date(2024, 4, 30), # 同日不同报告期
],
"end_date": [
"20230930",
"20230930", # 2023Q3
"20231231",
"20240331",
"20231231", # 年报和季报同一天发布
],
"report_type": [1, 1, 1, 1, 1], # 整数类型(与数据库一致)
"update_flag": [0, 1, 0, 0, 1], # 年报也有更正版
"net_profit": [
1000000.0,
1100000.0, # 2023Q3
5000000.0,
1500000.0,
5500000.0, # 年报更正后550万季报150万
],
"revenue": [
5000000.0,
5200000.0, # 2023Q3
20000000.0,
8000000.0,
22000000.0,
],
"end_date": ["20230930", "20230930", "20230930", "20231231"],
"report_type": [1, 1, 1, 1], # 整数类型(与数据库一致)
"update_flag": [0, 1, 1, 1], # 整数类型(与数据库一致)
"net_profit": [1000000.0, 1100000.0, 1100000.0, 1200000.0],
"revenue": [5000000.0, 5200000.0, 5200000.0, 6000000.0],
}
)
def test_financial_data_cleaning():
"""测试财务数据清洗逻辑。"""
"""测试财务数据清洗逻辑 - 确保同日多报告期时选 end_date 最新的"""
print("=== 测试 1: 财务数据清洗 ===")
df_finance = create_mock_financial_data()
@@ -70,36 +118,61 @@ def test_financial_data_cleaning():
loader = FinancialLoader()
# 手动执行清洗(模拟 load_financial_data 的逻辑
# 步骤1: 仅保留合并报表
# 手动执行新的清洗逻辑
df = df_finance.filter(pl.col("report_type") == 1)
# 步骤2: 按 update_flag 降序排列后去重
# 添加辅助列
df = df.with_columns(
[pl.col("update_flag").cast(pl.Int32).alias("update_flag_int")]
[
pl.col("end_date").cast(pl.Int32).alias("end_date_int"),
pl.col("update_flag")
.fill_null("0")
.cast(pl.Int32, strict=False)
.fill_null(0)
.alias("update_flag_int"),
]
)
df = df.sort(
["ts_code", "f_ann_date", "update_flag_int"], descending=[False, False, True]
# 确定性排序
df = df.sort(["ts_code", "f_ann_date", "end_date_int", "update_flag_int"])
# 累积最大报告期
df = df.with_columns(
pl.col("end_date_int").cum_max().over("ts_code").alias("max_end_date_seen")
)
df = df.unique(subset=["ts_code", "f_ann_date"], keep="first")
df = df.drop("update_flag_int")
# 过滤历史包袱
df = df.filter(pl.col("end_date_int") == pl.col("max_end_date_seen"))
# 步骤3: 排序f_ann_date 已经是 Date 类型
# 去重保留最后一条end_date 最大的
df = df.unique(subset=["ts_code", "f_ann_date"], keep="last")
# 清理辅助列
df = df.drop(["end_date_int", "update_flag_int", "max_end_date_seen"])
df = df.sort(["ts_code", "f_ann_date"])
print("\n清洗后的财务数据:")
print(df)
# 验证:应该有3条记录(第1-2行去重为1条第3行第4行
assert len(df) == 3, f"清洗后应该有3条记录,实际有 {len(df)}"
# 验证:应该有2条记录(2024-01-02 和 2024-04-30
assert len(df) == 2, f"清洗后应该有2条记录,实际有 {len(df)}"
# 验证2024-01-02 的 update_flag 应该是 1
# 验证2024-01-02 的 end_date 应该是 20230930
row_jan02 = df.filter(pl.col("f_ann_date") == date(2024, 1, 2))
assert len(row_jan02) == 1, "应该有1条 2024-01-02 的记录"
assert row_jan02["update_flag"][0] == 1, "update_flag 应该为 1"
assert row_jan02["net_profit"][0] == 1100000.0, "net_profit 应该为 1100000"
assert len(row_jan02) == 1
assert row_jan02["end_date"][0] == "20230930"
assert row_jan02["update_flag"][0] == 1
print("[验证 1] 2024-01-02 正确保留了 2023Q3 更正版")
# 验证2024-04-30 应该保留 2024Q1end_date=20240331而不是年报
row_apr30 = df.filter(pl.col("f_ann_date") == date(2024, 4, 30))
assert len(row_apr30) == 1
assert row_apr30["end_date"][0] == "20240331", (
f"2024-04-30 应该保留 end_date 最新的 20240331"
f"实际为 {row_apr30['end_date'][0]}"
)
assert row_apr30["net_profit"][0] == 1500000.0
print("[验证 2] 2024-04-30 正确保留了 2024Q1 季报end_date 最新)")
print("\n[通过] 财务数据清洗测试通过!")
return df
@@ -114,17 +187,44 @@ def test_financial_price_merge():
loader = FinancialLoader()
# 步骤1: 清洗财务数据(手动执行)
# 步骤1: 清洗财务数据(手动执行新的清洗逻辑
# 注意f_ann_date 已经是 Date 类型,不需要转换
df_finance = df_finance_raw.filter(pl.col("report_type") == 1)
# 添加辅助列
df_finance = df_finance.with_columns(
[pl.col("update_flag").cast(pl.Int32).alias("update_flag_int")]
[
pl.col("end_date").cast(pl.Int32).alias("end_date_int"),
pl.col("update_flag")
.fill_null("0")
.cast(pl.Int32, strict=False)
.fill_null(0)
.alias("update_flag_int"),
]
)
# 确定性排序
df_finance = df_finance.sort(
["ts_code", "f_ann_date", "update_flag_int"], descending=[False, False, True]
["ts_code", "f_ann_date", "end_date_int", "update_flag_int"]
)
# 累积最大报告期
df_finance = df_finance.with_columns(
pl.col("end_date_int").cum_max().over("ts_code").alias("max_end_date_seen")
)
# 过滤历史包袱
df_finance = df_finance.filter(
pl.col("end_date_int") == pl.col("max_end_date_seen")
)
# 去重保留最后一条end_date 最大的)
df_finance = df_finance.unique(subset=["ts_code", "f_ann_date"], keep="last")
# 清理辅助列
df_finance = df_finance.drop(
["end_date_int", "update_flag_int", "max_end_date_seen"]
)
df_finance = df_finance.unique(subset=["ts_code", "f_ann_date"], keep="first")
df_finance = df_finance.drop("update_flag_int")
df_finance = df_finance.sort(["ts_code", "f_ann_date"])
print("清洗后的财务数据:")
@@ -166,15 +266,22 @@ def test_financial_price_merge():
assert jan04["net_profit"][0] == 1100000.0, "2024-01-04 应延续使用 2023Q3 数据"
print("[验证 3] 2024-01-04 net_profit=1100000 - 正确(延续使用)")
# 20240110 应切换到 2023Q4 数据(公告)
# 20240110 应延续使用 2023Q3 数据(2024-04-30 还未公告)
jan10 = merged.filter(pl.col("trade_date") == "20240110")
assert jan10["net_profit"][0] == 1200000.0, "2024-01-10 应切换到 2023Q4 数据"
print("[验证 4] 2024-01-10 net_profit=1200000 - 正确(新财报公告")
assert jan10["net_profit"][0] == 1100000.0, "2024-01-10 应延续使用 2023Q3 数据"
print("[验证 4] 2024-01-10 net_profit=1100000 - 正确(延续使用 2023Q3")
# 20240112 应继续延续使用 2023Q4 数据
# 20240112 应继续延续使用 2023Q3 数据
jan12 = merged.filter(pl.col("trade_date") == "20240112")
assert jan12["net_profit"][0] == 1200000.0, "2024-01-12 应继续使用 2023Q4 数据"
print("[验证 5] 2024-01-12 net_profit=1200000 - 正确(延续使用)")
assert jan12["net_profit"][0] == 1100000.0, "2024-01-12 应继续使用 2023Q3 数据"
print("[验证 5] 2024-01-12 net_profit=1100000 - 正确(延续使用 2023Q3")
# 20240501 应切换到 2024Q1 数据2024-04-30 已公告,且选择 end_date 最新的)
may01 = merged.filter(pl.col("trade_date") == "20240501")
assert may01["net_profit"][0] == 1500000.0, "2024-05-01 应切换到 2024Q1 数据"
print(
"[验证 6] 2024-05-01 net_profit=1500000 - 正确(切换到 2024Q1end_date 最新)"
)
print("\n[通过] 所有验证通过,无未来函数!")
return merged