fix(data): 修复财务因子计算非确定性问题

重构 financial_loader 的去重逻辑,确保截面排名计算的股票集合一致:
- 引入"高水位线"算法剔除陈旧历史财报(解决2026年发布2021年财报的问题)
- 改变去重策略:按报告期(end_date)而非更新标识(update_flag)保留最新数据
- 扩展回看期从1年到2年,防止ST/停牌公司财报缺失
- 确保相同交易日在不同查询范围下返回一致的财务数据
This commit is contained in:
2026-03-08 20:58:35 +08:00
parent 3c7795f630
commit 505279c08b
5 changed files with 480 additions and 2853 deletions

View File

@@ -49,7 +49,13 @@ class FinancialLoader:
清洗后的 Polars DataFrame已排序f_ann_date 为 pl.Date 类型
"""
# 确保包含必要字段
required_cols = {"ts_code", "f_ann_date", "report_type", "update_flag"}
required_cols = {
"ts_code",
"f_ann_date",
"report_type",
"update_flag",
"end_date",
}
query_cols = list(set(columns) | required_cols)
# 从数据库加载原始数据
@@ -61,32 +67,51 @@ class FinancialLoader:
# 步骤1: 仅保留合并报表 (report_type 可能是字符串或整数)
df = df.filter(pl.col("report_type") == 1)
# 步骤2: 按 update_flag 降序排列后去重
# 步骤2: 添加辅助列用于排序和过滤
df = df.with_columns(
[pl.col("update_flag").cast(pl.Int32).alias("update_flag_int")]
[
# 将报告期 20231231 转为整数
pl.col("end_date").cast(pl.Int32).alias("end_date_int"),
# 将更新标识转为整数,容错处理 null 和空字符串
pl.col("update_flag")
.fill_null("0")
.cast(pl.Int32, strict=False)
.fill_null(0)
.alias("update_flag_int"),
]
)
# 排序ts_code, f_ann_date 升序update_flag 降
df = df.sort(
["ts_code", "f_ann_date", "update_flag_int"],
descending=[False, False, True],
# 步骤3: 绝对确定性排
# 按照: 股票代码(升序) -> 公告日(升序) -> 报告期(升序) -> 更新标识(升序)
# 这样排序后同一天发布的多份报表中end_date最大、update_flag最新的
# 必然排在这一天的最后面
df = df.sort(["ts_code", "f_ann_date", "end_date_int", "update_flag_int"])
# 步骤4: 核心算法 - 计算截至每一个公告日,市场"见过的"最新报告期(最高水位线)
df = df.with_columns(
pl.col("end_date_int").cum_max().over("ts_code").alias("max_end_date_seen")
)
# 去重:保留每个 (ts_code, f_ann_date) 的第一条update_flag 最高的
df = df.unique(subset=["ts_code", "f_ann_date"], keep="first")
# 步骤5: 剔除"历史包袱"解决2026年发2021年财报的问题
# 如果今天发布的财报,它的 end_date 小于我们之前已经见过的最大值,
# 说明它是陈旧的追溯调整,直接抛弃!
df = df.filter(pl.col("end_date_int") == pl.col("max_end_date_seen"))
# 移除临时列
df = df.drop("update_flag_int")
# 步骤6: 唯一化处理:满足 join_asof 的前置要求
# 经过上述处理后,同一个 f_ann_date 的最后一行,必定是 end_date 最大的那一份
# 我们只保留这最后一行,确保每个 f_ann_date 只有唯一的一条记录暴露给行情去 join
df = df.unique(subset=["ts_code", "f_ann_date"], keep="last")
# 步骤3: 确保 f_ann_date 是 Date 类型并排序
# 数据库返回的必须是 Date 类型,如果不是则报错
# 步骤7: 清理临时辅助列
df = df.drop(["end_date_int", "update_flag_int", "max_end_date_seen"])
# 步骤8: 确保 f_ann_date 是 Date 类型并排序join_asof 要求)
if df["f_ann_date"].dtype != pl.Date:
raise TypeError(
f"f_ann_date 必须是 Date 类型,实际类型为 {df['f_ann_date'].dtype}. "
f"请检查数据库表结构,确保日期字段为 DATE 类型。"
)
# 最终排序join_asof 要求)
df = df.sort(["ts_code", "f_ann_date"])
return df
@@ -168,14 +193,14 @@ class FinancialLoader:
self,
start_date: str,
end_date: str,
lookback_years: int = 1,
lookback_years: int = 2,
) -> tuple[str, str]:
"""计算包含回看期的日期范围。
Args:
start_date: 原始开始日期YYYYMMDD
end_date: 原始结束日期YYYYMMDD
lookback_years: 回看年数(默认1年
lookback_years: 回看年数(默认2年防止ST/停牌公司财报缺失
Returns:
(扩展后的开始日期, 结束日期)

File diff suppressed because one or more lines are too long