Files
NewStock/main/factor/utils.py

146 lines
6.6 KiB
Python
Raw Normal View History

2025-11-29 00:23:12 +08:00
import pandas as pd
def add_financial_factor(
main_df: pd.DataFrame,
financial_df: pd.DataFrame,
factor_value_col: str, # 财务指标值所在的列
ts_code_col: str = "ts_code",
trade_date_col: str = "trade_date",
ann_date_col: str = "ann_date", # 公告日期
f_ann_date_col: str = "f_ann_date", # 实际公告日期 (优先使用)
) -> pd.DataFrame:
"""
将财务指标数据如每股未分配利润作为因子添加到主时间序列 DataFrame
使用 merge_asof 根据股票代码和公告日期将最新的财务指标值匹配到每个交易日
Args:
main_df: 包含时间序列交易数据的主 DataFrame (至少包含 ts_code_col trade_date_col)
financial_df: 包含财务指标数据的 DataFrame (至少包含 ts_code_col,
ann_date_col f_ann_date_col, 以及 factor_value_col)
ts_code_col: 股票代码列在两个 DataFrame 中的名称默认为 'ts_code'
trade_date_col: 交易日期列在 main_df 中的名称默认为 'trade_date'
ann_date_col: 公告日期列在 financial_df 中的名称作为 f_ann_date_col 的备选默认为 'ann_date'
f_ann_date_col: 实际公告日期列在 financial_df 中的名称优先使用默认为 'f_ann_date'
factor_value_col: 财务指标值即要添加的因子值 financial_df 中的列名默认为 'undistr_pft_ps'
new_factor_col_name: 添加到 main_df 中的新因子列的名称默认为 'undist_profit_ps'
Returns:
包含新因子列的 main_df DataFrame
"""
if factor_value_col in main_df.columns:
return main_df
new_factor_col_name = factor_value_col
# --- 数据校验 ---
required_main_cols = [ts_code_col, trade_date_col]
if not all(col in main_df.columns for col in required_main_cols):
raise ValueError(f"主 DataFrame 必须包含列: {required_main_cols}")
required_financial_cols = [ts_code_col, factor_value_col]
if f_ann_date_col and f_ann_date_col in financial_df.columns:
effective_date_col = f_ann_date_col
print(f"使用 '{f_ann_date_col}' 作为财务数据生效日期。")
elif ann_date_col and ann_date_col in financial_df.columns:
effective_date_col = ann_date_col
print(f"使用 '{ann_date_col}' 作为财务数据生效日期。")
else:
raise ValueError(
f"财务指标 DataFrame 必须包含列 '{f_ann_date_col}''{ann_date_col}' 作为数据生效日期"
)
required_financial_cols.append(effective_date_col)
if not all(col in financial_df.columns for col in required_financial_cols):
raise ValueError(f"财务指标 DataFrame 必须包含列: {required_financial_cols}")
# --- 数据准备和清理 ---
# 确保日期列是 datetime 类型
# 使用 .copy() 避免 SettingWithCopyWarning
main_df = main_df.copy()
financial_df = financial_df.copy()
main_df[trade_date_col] = pd.to_datetime(main_df[trade_date_col], errors="coerce")
financial_df[effective_date_col] = pd.to_datetime(
financial_df[effective_date_col], errors="coerce"
)
# 确保股票代码是字符串类型
main_df[ts_code_col] = main_df[ts_code_col].astype(str)
financial_df[ts_code_col] = financial_df[ts_code_col].astype(str)
# 选取 financial_df 中需要合并的列
financial_data_subset = financial_df[
[ts_code_col, effective_date_col, factor_value_col]
].copy()
# *** 新增:处理右表合并键中的空值 ***
initial_rows_financial = len(financial_data_subset)
financial_data_subset = financial_data_subset.dropna(
subset=[ts_code_col, effective_date_col]
)
rows_dropped = initial_rows_financial - len(financial_data_subset)
if rows_dropped > 0:
print(
f"警告: 从 financial_data_subset 中移除了 {rows_dropped} 行,因为其 '{ts_code_col}''{effective_date_col}' 列存在空值。"
)
if financial_data_subset.empty:
print(
f"警告: 清理空值后 financial_data_subset 为空,无法添加因子 '{new_factor_col_name}'。将填充 NaN。"
)
main_df[new_factor_col_name] = np.nan
return main_df
# *** 修改:修正排序顺序以满足 merge_asof 要求 ***
# 先按 ts_code 排序,再按日期排序
# main_df = main_df.sort_values(by=[ts_code_col, trade_date_col])
# financial_data_subset = financial_data_subset.sort_values(by=[ts_code_col, effective_date_col])
main_df = main_df.sort_values(by=[trade_date_col, ts_code_col])
financial_data_subset = financial_data_subset.sort_values(
by=[effective_date_col, ts_code_col]
)
# --- 使用 merge_asof 计算因子 ---
try:
df_with_factor = pd.merge_asof(
main_df,
financial_data_subset,
left_on=trade_date_col,
right_on=effective_date_col,
by=ts_code_col,
direction="backward",
)
except Exception as e:
print(f"merge_asof 执行失败: {e}")
# 根据需要决定如何处理错误,这里填充 NaN
main_df[new_factor_col_name] = np.nan
return main_df
# --- 清理与重命名 ---
# 移除右表的日期列(如果它与左表日期列名称不同)
if (
effective_date_col in df_with_factor.columns
and effective_date_col != trade_date_col
):
df_with_factor = df_with_factor.drop(columns=[effective_date_col])
# 重命名新加入的因子列
if factor_value_col != new_factor_col_name:
if factor_value_col in df_with_factor.columns:
df_with_factor = df_with_factor.rename(
columns={factor_value_col: new_factor_col_name}
)
else:
# 这种情况理论上不应发生,因为 merge_asof 应该会把右表的非 key 列带过来
print(f"警告: 合并后未找到原始因子列 '{factor_value_col}',无法重命名。")
# 如果 factor_value_col 已是目标名称,则无需重命名
if new_factor_col_name not in df_with_factor.columns:
# 如果目标名称也不存在,则可能合并失败或列名有问题
df_with_factor[new_factor_col_name] = np.nan
# 如果 factor_value_col 就是目标名称,确保该列存在
elif new_factor_col_name not in df_with_factor.columns:
print(f"警告: 合并后未找到目标因子列 '{new_factor_col_name}'。填充 NaN。")
df_with_factor[new_factor_col_name] = np.nan
return df_with_factor