Files
NewStock/main/utils/data_process.py
2025-06-05 20:21:01 +08:00

268 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pandas as pd
import numpy as np
import statsmodels.api as sm # 用于中性化回归
from tqdm import tqdm
epsilon = 1e-10 # 防止除零
def zscore_standardize(train_df: pd.DataFrame, test_df: pd.DataFrame, features: list, epsilon: float = 1e-10):
"""
对指定特征列进行截面 Z-Score 标准化 (原地修改)。
方法: Z = (value - cross_sectional_mean) / (cross_sectional_std + epsilon)
Args:
df (pd.DataFrame): 输入 DataFrame需包含 'trade_date' 和 features 列。
features (list): 需要处理的特征列名列表。
epsilon (float): 防止除以零的小常数。
WARNING: 此函数会原地修改输入的 DataFrame 'df'
"""
print("开始截面 Z-Score 标准化...")
if not all(col in train_df.columns for col in features):
missing = [col for col in features if col not in train_df.columns]
print(f"错误: DataFrame 中缺少以下特征列: {missing}。跳过标准化处理。")
return
for col in tqdm(features, desc="Standardizing"):
try:
# 使用 transform 计算截面均值和标准差
mean = train_df[col].transform('mean')
std = train_df[col].transform('std')
# 计算 Z-Score 并原地赋值
train_df[col] = (train_df[col] - mean) / (std + epsilon)
test_df[col] = (test_df[col] - mean) / (std + epsilon)
except KeyError:
print(f"警告: 列 '{col}' 可能不存在或在分组中出错,跳过此列的标准化处理。")
except Exception as e:
print(f"警告: 处理列 '{col}' 时发生错误: {e},跳过此列的标准化处理。")
print("截面 Z-Score 标准化完成。")
# --- 1. 中位数去极值 (MAD) ---
def cs_mad_filter(df: pd.DataFrame,
features: list,
k: float = 3.0,
scale_factor: float = 1.4826):
"""
对指定特征列进行截面 MAD 去极值处理 (原地修改)。
方法: 对每日截面数据,计算 median 和 MAD
将超出 [median - k * scale * MAD, median + k * scale * MAD] 范围的值
替换为边界值 (Winsorization)。
scale_factor=1.4826 使得 MAD 约等于正态分布的标准差。
Args:
df (pd.DataFrame): 输入 DataFrame需包含 'trade_date' 和 features 列。
features (list): 需要处理的特征列名列表。
k (float): MAD 的倍数,用于确定边界。默认为 3.0。
scale_factor (float): MAD 的缩放因子。默认为 1.4826。
WARNING: 此函数会原地修改输入的 DataFrame 'df'
"""
print(f"开始截面 MAD 去极值处理 (k={k})...")
if not all(col in df.columns for col in features):
missing = [col for col in features if col not in df.columns]
print(f"错误: DataFrame 中缺少以下特征列: {missing}。跳过去极值处理。")
return
grouped = df.groupby('trade_date')
for col in tqdm(features, desc="MAD Filtering"):
try:
# 计算截面中位数
median = grouped[col].transform('median')
# 计算截面 MAD (Median Absolute Deviation from Median)
mad = (df[col] - median).abs().groupby(df['trade_date']).transform('median')
# 计算上下边界
lower_bound = median - k * scale_factor * mad
upper_bound = median + k * scale_factor * mad
# 原地应用 clip
df[col] = np.clip(df[col], lower_bound, upper_bound)
except KeyError:
print(f"警告: 列 '{col}' 可能不存在或在分组中出错,跳过此列的 MAD 处理。")
except Exception as e:
print(f"警告: 处理列 '{col}' 时发生错误: {e},跳过此列的 MAD 处理。")
print("截面 MAD 去极值处理完成。")
# --- 2. 行业市值中性化 ---
def cs_neutralize_industry_cap(df: pd.DataFrame,
features: list,
industry_col: str = 'cat_l2_code',
market_cap_col: str = 'circ_mv'):
"""
对指定特征列进行截面行业和对数市值中性化 (原地修改)。
使用 OLS 回归: feature ~ 1 + log(market_cap) + C(industry)
将回归残差写回原特征列。
Args:
df (pd.DataFrame): 输入 DataFrame需包含 'trade_date', features 列,
industry_col, market_cap_col。
features (list): 需要处理的特征列名列表。
industry_col (str): 行业分类列名。
market_cap_col (str): 流通市值列名。
WARNING: 此函数会原地修改输入的 DataFrame 'df' 的 features 列。
计算量较大,可能耗时较长。
需要安装 statsmodels 库 (pip install statsmodels)。
"""
print("开始截面行业市值中性化...")
required_cols = features + ['trade_date', industry_col, market_cap_col]
if not all(col in df.columns for col in required_cols):
missing = [col for col in required_cols if col not in df.columns]
print(f"错误: DataFrame 中缺少必需列: {missing}。无法进行中性化。")
return
# 预处理:计算 log 市值,处理 industry code 可能的 NaN
log_cap_col = '_log_market_cap'
df[log_cap_col] = np.log1p(df[market_cap_col]) # log1p 处理 0 值
# df[industry_col] = df[industry_col].cat.add_categories('UnknownIndustry')
# df[industry_col] = df[industry_col].fillna('UnknownIndustry') # 填充行业 NaN
# df[industry_col] = df[industry_col].astype('category') # 转为类别ols 会自动处理
dates = df['trade_date'].unique()
all_residuals = [] # 用于收集所有日期的残差
for date in tqdm(dates, desc="Neutralizing"):
daily_data = df.loc[df['trade_date'] == date, features + [log_cap_col, industry_col]].copy() # 使用 .loc 获取副本
# 准备自变量 X (常数项 + log市值 + 行业哑变量)
X = daily_data[[log_cap_col]]
X = sm.add_constant(X, prepend=True) # 添加常数项
# 创建行业哑变量 (drop_first=True 避免共线性)
industry_dummies = pd.get_dummies(daily_data[industry_col], prefix=industry_col, drop_first=True)
industry_dummies = industry_dummies.astype(int)
X = pd.concat([X, industry_dummies], axis=1)
daily_residuals = daily_data[[col for col in features]].copy() # 创建用于存储残差的df
for col in features:
Y = daily_data[col]
# 处理 NaN 值,确保 X 和 Y 在相同位置有有效值
valid_mask = Y.notna() & X.notna().all(axis=1)
if valid_mask.sum() < (X.shape[1] + 1): # 数据点不足以估计模型
print(f"警告: 日期 {date}, 特征 {col} 有效数据不足 ({valid_mask.sum()}个),无法中性化,填充 NaN。")
daily_residuals[col] = np.nan
continue
Y_valid = Y[valid_mask]
X_valid = X[valid_mask]
# 执行 OLS 回归
try:
model = sm.OLS(Y_valid.to_numpy(), X_valid.to_numpy())
results = model.fit()
# 将残差填回对应位置
daily_residuals.loc[valid_mask, col] = results.resid
daily_residuals.loc[~valid_mask, col] = np.nan # 原本无效的位置填充 NaN
except Exception as e:
print(f"警告: 日期 {date}, 特征 {col} 回归失败: {e},填充 NaN。")
daily_residuals[col] = np.nan
break
all_residuals.append(daily_residuals)
# 合并所有日期的残差结果
if all_residuals:
residuals_df = pd.concat(all_residuals)
# 将残差结果更新回原始 df (原地修改)
# 使用 update 比 merge 更适合基于索引的原地更新
# 确保 residuals_df 的索引与 df 中对应部分一致
df.update(residuals_df)
else:
print("没有有效的残差结果可以合并。")
# 清理临时列
df.drop(columns=[log_cap_col], inplace=True)
print("截面行业市值中性化完成。")
# --- 3. Z-Score 标准化 ---
def cs_zscore_standardize(df: pd.DataFrame, features: list, epsilon: float = 1e-10):
"""
对指定特征列进行截面 Z-Score 标准化 (原地修改)。
方法: Z = (value - cross_sectional_mean) / (cross_sectional_std + epsilon)
Args:
df (pd.DataFrame): 输入 DataFrame需包含 'trade_date' 和 features 列。
features (list): 需要处理的特征列名列表。
epsilon (float): 防止除以零的小常数。
WARNING: 此函数会原地修改输入的 DataFrame 'df'
"""
print("开始截面 Z-Score 标准化...")
if not all(col in df.columns for col in features):
missing = [col for col in features if col not in df.columns]
print(f"错误: DataFrame 中缺少以下特征列: {missing}。跳过标准化处理。")
return
grouped = df.groupby('trade_date')
for col in tqdm(features, desc="Standardizing"):
try:
# 使用 transform 计算截面均值和标准差
mean = grouped[col].transform('mean')
std = grouped[col].transform('std')
# 计算 Z-Score 并原地赋值
df[col] = (df[col] - mean) / (std + epsilon)
except KeyError:
print(f"警告: 列 '{col}' 可能不存在或在分组中出错,跳过此列的标准化处理。")
except Exception as e:
print(f"警告: 处理列 '{col}' 时发生错误: {e},跳过此列的标准化处理。")
print("截面 Z-Score 标准化完成。")
def fill_nan_with_daily_median(df: pd.DataFrame, feature_columns: list[str]) -> pd.DataFrame:
"""
对指定特征列进行每日截面中位数填充缺失值 (NaN)。
参数:
df (pd.DataFrame): 包含多日数据的DataFrame需要包含 'trade_date' 和 feature_columns 中的列。
feature_columns (list[str]): 需要进行缺失值填充的特征列名称列表。
返回:
pd.DataFrame: 包含缺失值填充后特征列的DataFrame。在输入DataFrame的副本上操作。
"""
processed_df = df.copy() # 在副本上操作,保留原始数据
# 确保 trade_date 是 datetime 类型以便正确分组
processed_df['trade_date'] = pd.to_datetime(processed_df['trade_date'])
def _fill_daily_nan(group):
# group 是某一个交易日的 DataFrame
# 遍历指定的特征列
for feature_col in feature_columns:
# 检查列是否存在于当前分组中
if feature_col in group.columns:
# 计算当日该特征的中位数
median_val = group[feature_col].median()
# 使用当日中位数填充该特征列的 NaN 值
# inplace=True 会直接修改 group DataFrame
group[feature_col].fillna(median_val, inplace=True)
# else:
# print(f"Warning: Feature column '{feature_col}' not found in daily group for {group['trade_date'].iloc[0]}. Skipping.")
return group
# 按交易日期分组,并应用每日填充函数
# group_keys=False 避免将分组键添加到结果索引中
filled_df = processed_df.groupby('trade_date', group_keys=False).apply(_fill_daily_nan)
return filled_df