This commit is contained in:
2025-06-04 20:34:17 +08:00
parent 81bbee9b32
commit f3f4912d2b
4 changed files with 3408 additions and 1478 deletions

View File

@@ -1,115 +1,112 @@
import pandas as pd import pandas as pd
import numpy as np import numpy as np
def holder_trade_factors(all_data_df: pd.DataFrame, stk_holdertrade_df: pd.DataFrame) -> pd.DataFrame: def holder_trade_factors(all_data_df: pd.DataFrame,
stk_holdertrade_df: pd.DataFrame) -> pd.DataFrame:
""" """
生成合并的股东增减持因子以及 change_ratio 相关因子(优化版) 计算基于股东增减持数据的因子
Args: Args:
stk_holdertrade_df (pd.DataFrame): 股东增减持数据,包含 'ts_code', 'ann_date', 'in_de', 'change_ratio' all_data_df (pd.DataFrame): 包含每日股票数据的 DataFrame
all_data_df (pd.DataFrame): 所有日期所有股票数据,包含 'ts_code', 'trade_date' 必须包含 'ts_code' 'trade_date'
stk_holdertrade_df (pd.DataFrame): 包含股东增减持信息的 DataFrame。
必须包含 'ts_code', 'ann_date',
'in_de' (例如, '增持', '减持'),
'change_ratio'
Returns: Returns:
pd.DataFrame: 包含增减持因子的 all_data_df。 pd.DataFrame: 添加了新的股东增减持因子的 all_data_df DataFrame
""" """
print('正在计算股东增减持因子(优化版)...') print("开始计算股东增减持因子...")
# 1. 确保日期列为 datetime 类型 # --- 1. 预处理 stk_holdertrade_df ---
stk_holdertrade_df['ann_date'] = pd.to_datetime(stk_holdertrade_df['ann_date']) # 创建副本以避免修改原始传入的DataFrame
all_data_df['trade_date'] = pd.to_datetime(all_data_df['trade_date']) stk_trade_processed_df = stk_holdertrade_df.copy()
# 2. 对增减持数据进行预处理和排序(排序在此阶段可能不是严格必需的,但保持良好习惯) # 确保 'ann_date' 是 datetime 类型
holder_data_processed = stk_holdertrade_df.copy() stk_trade_processed_df['ann_date'] = pd.to_datetime(stk_trade_processed_df['ann_date'])
holder_data_processed['change_ratio_in_agg'] = holder_data_processed['change_ratio'].where(holder_data_processed['in_de'] == 'IN', 0)
holder_data_processed['change_ratio_de_agg'] = holder_data_processed['change_ratio'].where(holder_data_processed['in_de'] == 'DE', 0)
holder_data_processed['change_ratio_total_agg'] = holder_data_processed['change_ratio']
holder_data_processed['in_de_numeric'] = holder_data_processed['in_de'].map({'IN': 1, 'DE': -1}).fillna(0) # 用于判断类型
# 提前获取所有唯一的交易日期集合,以提高查找效率 # 将 'in_de' 映射为数值: 1 代表 '增持', -1 代表 '减持'
all_trade_dates_set = set(all_data_df['trade_date'].unique()) # 请根据你数据中实际的 'in_de' 字符串调整
in_de_map = {'增持': 1, '减持': -1} # 假设是这两个值
# 如果你的值是 '1' 和 '2' (1代表增持, 2代表减持),则映射应相应调整
# 或者如果 'in_de' 已经是 1 和 -1 (或类似数值),则可以跳过映射,但要确保类型正确
stk_trade_processed_df['_direction'] = stk_trade_processed_df['in_de'].map(in_de_map)
# 如果 _direction 列在映射后可能产生NaN (因为in_de中有未覆盖的值),需要处理
if stk_trade_processed_df['_direction'].isnull().any():
print("警告: 'in_de' 列中存在未映射的值,可能导致 _direction 列出现NaN。")
# 可以选择填充NaN例如用0填充或者移除这些行
# stk_trade_processed_df['_direction'].fillna(0, inplace=True)
# 3. 构建一个辅助DataFrame记录每个公告在未来10个日历日且是交易日的影响 # 计算有效变动比例 (方向 * 变动比例)
expanded_holder_events = [] # 确保 change_ratio 是数值类型。假设 change_ratio 是一个正确的比例值 (例如 0.005 代表 0.5%)。
for _, row in holder_data_processed.iterrows(): # 如果 change_ratio 是百分点 (例如 0.5 代表 0.5%),你可能需要除以 100。
ts_code = row['ts_code'] stk_trade_processed_df['change_ratio'] = pd.to_numeric(stk_trade_processed_df['change_ratio'], errors='coerce')
ann_date = row['ann_date'] stk_trade_processed_df['_effective_change'] = stk_trade_processed_df['_direction'] * stk_trade_processed_df['change_ratio']
# 生成从公告日期开始的未来10个日历日的日期范围包括公告日本身 # 按股票代码和公告日期聚合当日的多次增减持操作
# pd.Timedelta(days=10) 表示从公告日+10天 daily_trade_agg = stk_trade_processed_df.groupby(['ts_code', 'ann_date']).agg(
# pd.date_range(start=ann_date, end=ann_date + pd.Timedelta(days=10), freq='D') net_change_ratio_daily=('_effective_change', 'sum'),
# 更精确地生成11个日期涵盖公告日及其后的10个日历日 any_increase_daily=('_direction', lambda x: (x == 1).any().astype(int)),
future_dates = pd.date_range(start=ann_date, periods=11, freq='D') any_decrease_daily=('_direction', lambda x: (x == -1).any().astype(int))
for date_in_window in future_dates:
# 只有当日期是实际交易日时才添加
if date_in_window in all_trade_dates_set:
expanded_holder_events.append({
'ts_code': ts_code,
'trade_date': date_in_window,
'in_de_numeric': row['in_de_numeric'],
'change_ratio_total_agg': row['change_ratio_total_agg'],
'change_ratio_in_agg': row['change_ratio_in_agg'],
'change_ratio_de_agg': row['change_ratio_de_agg']
})
if not expanded_holder_events: # 如果没有事件,直接返回原始 df
# 确保返回的DataFrame与原始df具有相同的列和顺序
# 并填充为默认值
default_factors = pd.DataFrame({
'holder_trade_type_10d': None,
'holder_change_ratio_sum_10d': 0.0,
'holder_in_change_ratio_sum_10d': 0.0,
'holder_de_change_ratio_sum_10d': 0.0
}, index=all_data_df.index)
return pd.concat([all_data_df, default_factors], axis=1)
expanded_holder_events_df = pd.DataFrame(expanded_holder_events)
# 4. 聚合每个 (ts_code, trade_date) 对上的事件
# 可能会有重复的 (ts_code, trade_date) 对,因为一个交易日可能受多个公告影响
daily_aggregated_factors = expanded_holder_events_df.groupby(['ts_code', 'trade_date']).agg(
holder_change_ratio_sum_10d=('change_ratio_total_agg', 'sum'),
holder_in_change_ratio_sum_10d=('change_ratio_in_agg', 'sum'),
holder_de_change_ratio_sum_10d=('change_ratio_de_agg', 'sum'),
# 对于 holder_trade_type_10d聚合 in_de_numeric 的唯一值集合
_in_de_types_unique=('in_de_numeric', lambda x: set(x)) # 获取该日期窗口内所有独特的增减持类型
).reset_index() ).reset_index()
# 根据 _in_de_types_unique 确定 holder_trade_type_10d # 将 'ann_date' 重命名为 'trade_date' 以便与 all_data_df 合并
def get_trade_type(unique_types_set): daily_trade_agg = daily_trade_agg.rename(columns={'ann_date': 'trade_date'})
if 1 in unique_types_set and -1 in unique_types_set:
return 'BOTH'
elif 1 in unique_types_set:
return 'IN'
elif -1 in unique_types_set:
return 'DE'
else:
return None # 理论上不应该发生,除非 unique_types_set 为空或只包含0
daily_aggregated_factors['holder_trade_type_10d'] = daily_aggregated_factors['_in_de_types_unique'].apply(get_trade_type) # --- 2. 与 all_data_df 合并 ---
# 创建 all_data_df 的副本进行操作
df_merged = all_data_df.copy()
df_merged['trade_date'] = pd.to_datetime(df_merged['trade_date']) # 确保日期类型一致
# 移除辅助列 # 使用左合并保留 all_data_df 的所有行,并在有增减持公告的日期添加信息
daily_aggregated_factors.drop(columns=['_in_de_types_unique'], inplace=True) df_merged = pd.merge(df_merged, daily_trade_agg, on=['ts_code', 'trade_date'], how='left')
# 5. 将计算得到的因子合并回 all_data_df # 对于没有增减持公告的日期填充NaN值为0表示当天无变动或无公告
# 确保 all_data_df 也按 ts_code, trade_date 排序,以便 merge 高效 df_merged['net_change_ratio_daily'] = df_merged['net_change_ratio_daily'].fillna(0)
all_data_df_sorted = all_data_df.sort_values(['ts_code', 'trade_date']).reset_index(drop=True) df_merged['any_increase_daily'] = df_merged['any_increase_daily'].fillna(0)
df_merged['any_decrease_daily'] = df_merged['any_decrease_daily'].fillna(0)
final_df = pd.merge( # --- 3. 计算滚动因子 ---
all_data_df_sorted, # !!! 关键步骤:确保在 groupby().rolling() 之前按分组键和时间键排序 !!!
daily_aggregated_factors, # 这一步至关重要,以保证滚动计算后的 Series 在 reset_index 后能正确对齐
on=['ts_code', 'trade_date'], df_merged = df_merged.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
how='left'
)
# 6. 对于没有增减持记录的日期,因子值为 None 或 0 grouped = df_merged.groupby('ts_code', group_keys=False) # group_keys=False 避免在结果中保留分组键作为索引层级
# 在 merge_asof 中确实需要排序但在这种事件展开的方法中merge 是普通的 left merge不需要预排序。
# 考虑到最终的 merge最好还是保持 `all_data_df` 和 `daily_aggregated_factors` 的键排序。
# 所以在 `merge` 前对 `all_data_df` 进行一次排序是好的实践。
final_df['holder_trade_type_10d'] = final_df['holder_trade_type_10d'].fillna(None) # 因子: 过去N日净变动比例之和
fillna_ratio_cols = ['holder_change_ratio_sum_10d', 'holder_in_change_ratio_sum_10d', 'holder_de_change_ratio_sum_10d'] for N in [10]:
final_df[fillna_ratio_cols] = final_df[fillna_ratio_cols].fillna(0.0) rolling_series = grouped['net_change_ratio_daily'].rolling(window=N, min_periods=1).sum()
df_merged[f'holder_net_change_sum_{N}d'] = rolling_series.reset_index(level=0, drop=True)
# 因子: 过去N日发生增持的天数
for N in [10]:
rolling_series = grouped['any_increase_daily'].rolling(window=N, min_periods=1).sum()
df_merged[f'holder_increase_days_{N}d'] = rolling_series.reset_index(level=0, drop=True)
# 因子: 过去N日发生减持的天数
for N in [10]:
rolling_series = grouped['any_decrease_daily'].rolling(window=N, min_periods=1).sum()
df_merged[f'holder_decrease_days_{N}d'] = rolling_series.reset_index(level=0, drop=True)
# 因子: 过去N日是否发生过增持 (布尔标志)
for N in [10]:
rolling_series_sum = grouped['any_increase_daily'].rolling(window=N, min_periods=1).sum()
df_merged[f'holder_any_increase_flag_{N}d'] = (rolling_series_sum > 0).astype(int).reset_index(level=0, drop=True)
# 因子: 过去N日是否发生过减持 (布尔标志)
for N in [10]:
rolling_series_sum = grouped['any_decrease_daily'].rolling(window=N, min_periods=1).sum()
df_merged[f'holder_any_decrease_flag_{N}d'] = (rolling_series_sum > 0).astype(int).reset_index(level=0, drop=True)
# 因子: 过去N日净“活动”得分 (增持天数 - 减持天数)
for N in [10]:
# 直接使用已经计算好且索引对齐的列
df_merged[f'holder_direction_score_{N}d'] = df_merged[f'holder_increase_days_{N}d'] - df_merged[f'holder_decrease_days_{N}d']
# 清理在合并过程中产生的每日临时列(如果不再需要它们)
df_merged.drop(columns=['net_change_ratio_daily', 'any_increase_daily', 'any_decrease_daily'], inplace=True, errors='ignore')
print("股东增减持因子计算完成。")
return df_merged
return final_df

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff