This commit is contained in:
2025-06-05 20:21:01 +08:00
parent f3f4912d2b
commit c5ad56a9e8
7 changed files with 3602 additions and 731 deletions

268
main/utils/data_process.py Normal file
View File

@@ -0,0 +1,268 @@
import pandas as pd
import numpy as np
import statsmodels.api as sm # 用于中性化回归
from tqdm import tqdm
epsilon = 1e-10 # 防止除零
def zscore_standardize(train_df: pd.DataFrame, test_df: pd.DataFrame, features: list, epsilon: float = 1e-10):
"""
对指定特征列进行截面 Z-Score 标准化 (原地修改)。
方法: Z = (value - cross_sectional_mean) / (cross_sectional_std + epsilon)
Args:
df (pd.DataFrame): 输入 DataFrame需包含 'trade_date' 和 features 列。
features (list): 需要处理的特征列名列表。
epsilon (float): 防止除以零的小常数。
WARNING: 此函数会原地修改输入的 DataFrame 'df'
"""
print("开始截面 Z-Score 标准化...")
if not all(col in train_df.columns for col in features):
missing = [col for col in features if col not in train_df.columns]
print(f"错误: DataFrame 中缺少以下特征列: {missing}。跳过标准化处理。")
return
for col in tqdm(features, desc="Standardizing"):
try:
# 使用 transform 计算截面均值和标准差
mean = train_df[col].transform('mean')
std = train_df[col].transform('std')
# 计算 Z-Score 并原地赋值
train_df[col] = (train_df[col] - mean) / (std + epsilon)
test_df[col] = (test_df[col] - mean) / (std + epsilon)
except KeyError:
print(f"警告: 列 '{col}' 可能不存在或在分组中出错,跳过此列的标准化处理。")
except Exception as e:
print(f"警告: 处理列 '{col}' 时发生错误: {e},跳过此列的标准化处理。")
print("截面 Z-Score 标准化完成。")
# --- 1. 中位数去极值 (MAD) ---
def cs_mad_filter(df: pd.DataFrame,
features: list,
k: float = 3.0,
scale_factor: float = 1.4826):
"""
对指定特征列进行截面 MAD 去极值处理 (原地修改)。
方法: 对每日截面数据,计算 median 和 MAD
将超出 [median - k * scale * MAD, median + k * scale * MAD] 范围的值
替换为边界值 (Winsorization)。
scale_factor=1.4826 使得 MAD 约等于正态分布的标准差。
Args:
df (pd.DataFrame): 输入 DataFrame需包含 'trade_date' 和 features 列。
features (list): 需要处理的特征列名列表。
k (float): MAD 的倍数,用于确定边界。默认为 3.0。
scale_factor (float): MAD 的缩放因子。默认为 1.4826。
WARNING: 此函数会原地修改输入的 DataFrame 'df'
"""
print(f"开始截面 MAD 去极值处理 (k={k})...")
if not all(col in df.columns for col in features):
missing = [col for col in features if col not in df.columns]
print(f"错误: DataFrame 中缺少以下特征列: {missing}。跳过去极值处理。")
return
grouped = df.groupby('trade_date')
for col in tqdm(features, desc="MAD Filtering"):
try:
# 计算截面中位数
median = grouped[col].transform('median')
# 计算截面 MAD (Median Absolute Deviation from Median)
mad = (df[col] - median).abs().groupby(df['trade_date']).transform('median')
# 计算上下边界
lower_bound = median - k * scale_factor * mad
upper_bound = median + k * scale_factor * mad
# 原地应用 clip
df[col] = np.clip(df[col], lower_bound, upper_bound)
except KeyError:
print(f"警告: 列 '{col}' 可能不存在或在分组中出错,跳过此列的 MAD 处理。")
except Exception as e:
print(f"警告: 处理列 '{col}' 时发生错误: {e},跳过此列的 MAD 处理。")
print("截面 MAD 去极值处理完成。")
# --- 2. 行业市值中性化 ---
def cs_neutralize_industry_cap(df: pd.DataFrame,
features: list,
industry_col: str = 'cat_l2_code',
market_cap_col: str = 'circ_mv'):
"""
对指定特征列进行截面行业和对数市值中性化 (原地修改)。
使用 OLS 回归: feature ~ 1 + log(market_cap) + C(industry)
将回归残差写回原特征列。
Args:
df (pd.DataFrame): 输入 DataFrame需包含 'trade_date', features 列,
industry_col, market_cap_col。
features (list): 需要处理的特征列名列表。
industry_col (str): 行业分类列名。
market_cap_col (str): 流通市值列名。
WARNING: 此函数会原地修改输入的 DataFrame 'df' 的 features 列。
计算量较大,可能耗时较长。
需要安装 statsmodels 库 (pip install statsmodels)。
"""
print("开始截面行业市值中性化...")
required_cols = features + ['trade_date', industry_col, market_cap_col]
if not all(col in df.columns for col in required_cols):
missing = [col for col in required_cols if col not in df.columns]
print(f"错误: DataFrame 中缺少必需列: {missing}。无法进行中性化。")
return
# 预处理:计算 log 市值,处理 industry code 可能的 NaN
log_cap_col = '_log_market_cap'
df[log_cap_col] = np.log1p(df[market_cap_col]) # log1p 处理 0 值
# df[industry_col] = df[industry_col].cat.add_categories('UnknownIndustry')
# df[industry_col] = df[industry_col].fillna('UnknownIndustry') # 填充行业 NaN
# df[industry_col] = df[industry_col].astype('category') # 转为类别ols 会自动处理
dates = df['trade_date'].unique()
all_residuals = [] # 用于收集所有日期的残差
for date in tqdm(dates, desc="Neutralizing"):
daily_data = df.loc[df['trade_date'] == date, features + [log_cap_col, industry_col]].copy() # 使用 .loc 获取副本
# 准备自变量 X (常数项 + log市值 + 行业哑变量)
X = daily_data[[log_cap_col]]
X = sm.add_constant(X, prepend=True) # 添加常数项
# 创建行业哑变量 (drop_first=True 避免共线性)
industry_dummies = pd.get_dummies(daily_data[industry_col], prefix=industry_col, drop_first=True)
industry_dummies = industry_dummies.astype(int)
X = pd.concat([X, industry_dummies], axis=1)
daily_residuals = daily_data[[col for col in features]].copy() # 创建用于存储残差的df
for col in features:
Y = daily_data[col]
# 处理 NaN 值,确保 X 和 Y 在相同位置有有效值
valid_mask = Y.notna() & X.notna().all(axis=1)
if valid_mask.sum() < (X.shape[1] + 1): # 数据点不足以估计模型
print(f"警告: 日期 {date}, 特征 {col} 有效数据不足 ({valid_mask.sum()}个),无法中性化,填充 NaN。")
daily_residuals[col] = np.nan
continue
Y_valid = Y[valid_mask]
X_valid = X[valid_mask]
# 执行 OLS 回归
try:
model = sm.OLS(Y_valid.to_numpy(), X_valid.to_numpy())
results = model.fit()
# 将残差填回对应位置
daily_residuals.loc[valid_mask, col] = results.resid
daily_residuals.loc[~valid_mask, col] = np.nan # 原本无效的位置填充 NaN
except Exception as e:
print(f"警告: 日期 {date}, 特征 {col} 回归失败: {e},填充 NaN。")
daily_residuals[col] = np.nan
break
all_residuals.append(daily_residuals)
# 合并所有日期的残差结果
if all_residuals:
residuals_df = pd.concat(all_residuals)
# 将残差结果更新回原始 df (原地修改)
# 使用 update 比 merge 更适合基于索引的原地更新
# 确保 residuals_df 的索引与 df 中对应部分一致
df.update(residuals_df)
else:
print("没有有效的残差结果可以合并。")
# 清理临时列
df.drop(columns=[log_cap_col], inplace=True)
print("截面行业市值中性化完成。")
# --- 3. Z-Score 标准化 ---
def cs_zscore_standardize(df: pd.DataFrame, features: list, epsilon: float = 1e-10):
"""
对指定特征列进行截面 Z-Score 标准化 (原地修改)。
方法: Z = (value - cross_sectional_mean) / (cross_sectional_std + epsilon)
Args:
df (pd.DataFrame): 输入 DataFrame需包含 'trade_date' 和 features 列。
features (list): 需要处理的特征列名列表。
epsilon (float): 防止除以零的小常数。
WARNING: 此函数会原地修改输入的 DataFrame 'df'
"""
print("开始截面 Z-Score 标准化...")
if not all(col in df.columns for col in features):
missing = [col for col in features if col not in df.columns]
print(f"错误: DataFrame 中缺少以下特征列: {missing}。跳过标准化处理。")
return
grouped = df.groupby('trade_date')
for col in tqdm(features, desc="Standardizing"):
try:
# 使用 transform 计算截面均值和标准差
mean = grouped[col].transform('mean')
std = grouped[col].transform('std')
# 计算 Z-Score 并原地赋值
df[col] = (df[col] - mean) / (std + epsilon)
except KeyError:
print(f"警告: 列 '{col}' 可能不存在或在分组中出错,跳过此列的标准化处理。")
except Exception as e:
print(f"警告: 处理列 '{col}' 时发生错误: {e},跳过此列的标准化处理。")
print("截面 Z-Score 标准化完成。")
def fill_nan_with_daily_median(df: pd.DataFrame, feature_columns: list[str]) -> pd.DataFrame:
"""
对指定特征列进行每日截面中位数填充缺失值 (NaN)。
参数:
df (pd.DataFrame): 包含多日数据的DataFrame需要包含 'trade_date' 和 feature_columns 中的列。
feature_columns (list[str]): 需要进行缺失值填充的特征列名称列表。
返回:
pd.DataFrame: 包含缺失值填充后特征列的DataFrame。在输入DataFrame的副本上操作。
"""
processed_df = df.copy() # 在副本上操作,保留原始数据
# 确保 trade_date 是 datetime 类型以便正确分组
processed_df['trade_date'] = pd.to_datetime(processed_df['trade_date'])
def _fill_daily_nan(group):
# group 是某一个交易日的 DataFrame
# 遍历指定的特征列
for feature_col in feature_columns:
# 检查列是否存在于当前分组中
if feature_col in group.columns:
# 计算当日该特征的中位数
median_val = group[feature_col].median()
# 使用当日中位数填充该特征列的 NaN 值
# inplace=True 会直接修改 group DataFrame
group[feature_col].fillna(median_val, inplace=True)
# else:
# print(f"Warning: Feature column '{feature_col}' not found in daily group for {group['trade_date'].iloc[0]}. Skipping.")
return group
# 按交易日期分组,并应用每日填充函数
# group_keys=False 避免将分组键添加到结果索引中
filled_df = processed_df.groupby('trade_date', group_keys=False).apply(_fill_daily_nan)
return filled_df