import pandas as pd import numpy as np def holder_trade_factors(all_data_df: pd.DataFrame, stk_holdertrade_df: pd.DataFrame) -> pd.DataFrame: """ 生成合并的股东增减持因子以及 change_ratio 相关因子(优化版)。 Args: stk_holdertrade_df (pd.DataFrame): 股东增减持数据,包含 'ts_code', 'ann_date', 'in_de', 'change_ratio'。 all_data_df (pd.DataFrame): 所有日期所有股票数据,包含 'ts_code', 'trade_date'。 Returns: pd.DataFrame: 包含增减持因子的 all_data_df。 """ print('正在计算股东增减持因子(优化版)...') # 1. 确保日期列为 datetime 类型 stk_holdertrade_df['ann_date'] = pd.to_datetime(stk_holdertrade_df['ann_date']) all_data_df['trade_date'] = pd.to_datetime(all_data_df['trade_date']) # 2. 对增减持数据进行预处理和排序(排序在此阶段可能不是严格必需的,但保持良好习惯) holder_data_processed = stk_holdertrade_df.copy() holder_data_processed['change_ratio_in_agg'] = holder_data_processed['change_ratio'].where(holder_data_processed['in_de'] == 'IN', 0) holder_data_processed['change_ratio_de_agg'] = holder_data_processed['change_ratio'].where(holder_data_processed['in_de'] == 'DE', 0) holder_data_processed['change_ratio_total_agg'] = holder_data_processed['change_ratio'] holder_data_processed['in_de_numeric'] = holder_data_processed['in_de'].map({'IN': 1, 'DE': -1}).fillna(0) # 用于判断类型 # 提前获取所有唯一的交易日期集合,以提高查找效率 all_trade_dates_set = set(all_data_df['trade_date'].unique()) # 3. 构建一个辅助DataFrame,记录每个公告在未来10个日历日(且是交易日)的影响 expanded_holder_events = [] for _, row in holder_data_processed.iterrows(): ts_code = row['ts_code'] ann_date = row['ann_date'] # 生成从公告日期开始的未来10个日历日的日期范围(包括公告日本身) # pd.Timedelta(days=10) 表示从公告日+10天 # pd.date_range(start=ann_date, end=ann_date + pd.Timedelta(days=10), freq='D') # 更精确地生成11个日期,涵盖公告日及其后的10个日历日 future_dates = pd.date_range(start=ann_date, periods=11, freq='D') for date_in_window in future_dates: # 只有当日期是实际交易日时才添加 if date_in_window in all_trade_dates_set: expanded_holder_events.append({ 'ts_code': ts_code, 'trade_date': date_in_window, 'in_de_numeric': row['in_de_numeric'], 'change_ratio_total_agg': row['change_ratio_total_agg'], 'change_ratio_in_agg': row['change_ratio_in_agg'], 'change_ratio_de_agg': row['change_ratio_de_agg'] }) if not expanded_holder_events: # 如果没有事件,直接返回原始 df # 确保返回的DataFrame与原始df具有相同的列和顺序 # 并填充为默认值 default_factors = pd.DataFrame({ 'holder_trade_type_10d': None, 'holder_change_ratio_sum_10d': 0.0, 'holder_in_change_ratio_sum_10d': 0.0, 'holder_de_change_ratio_sum_10d': 0.0 }, index=all_data_df.index) return pd.concat([all_data_df, default_factors], axis=1) expanded_holder_events_df = pd.DataFrame(expanded_holder_events) # 4. 聚合每个 (ts_code, trade_date) 对上的事件 # 可能会有重复的 (ts_code, trade_date) 对,因为一个交易日可能受多个公告影响 daily_aggregated_factors = expanded_holder_events_df.groupby(['ts_code', 'trade_date']).agg( holder_change_ratio_sum_10d=('change_ratio_total_agg', 'sum'), holder_in_change_ratio_sum_10d=('change_ratio_in_agg', 'sum'), holder_de_change_ratio_sum_10d=('change_ratio_de_agg', 'sum'), # 对于 holder_trade_type_10d,聚合 in_de_numeric 的唯一值集合 _in_de_types_unique=('in_de_numeric', lambda x: set(x)) # 获取该日期窗口内所有独特的增减持类型 ).reset_index() # 根据 _in_de_types_unique 确定 holder_trade_type_10d def get_trade_type(unique_types_set): if 1 in unique_types_set and -1 in unique_types_set: return 'BOTH' elif 1 in unique_types_set: return 'IN' elif -1 in unique_types_set: return 'DE' else: return None # 理论上不应该发生,除非 unique_types_set 为空或只包含0 daily_aggregated_factors['holder_trade_type_10d'] = daily_aggregated_factors['_in_de_types_unique'].apply(get_trade_type) # 移除辅助列 daily_aggregated_factors.drop(columns=['_in_de_types_unique'], inplace=True) # 5. 将计算得到的因子合并回 all_data_df # 确保 all_data_df 也按 ts_code, trade_date 排序,以便 merge 高效 all_data_df_sorted = all_data_df.sort_values(['ts_code', 'trade_date']).reset_index(drop=True) final_df = pd.merge( all_data_df_sorted, daily_aggregated_factors, on=['ts_code', 'trade_date'], how='left' ) # 6. 对于没有增减持记录的日期,因子值为 None 或 0 # 在 merge_asof 中确实需要排序,但在这种事件展开的方法中,merge 是普通的 left merge,不需要预排序。 # 考虑到最终的 merge,最好还是保持 `all_data_df` 和 `daily_aggregated_factors` 的键排序。 # 所以在 `merge` 前对 `all_data_df` 进行一次排序是好的实践。 final_df['holder_trade_type_10d'] = final_df['holder_trade_type_10d'].fillna(None) fillna_ratio_cols = ['holder_change_ratio_sum_10d', 'holder_in_change_ratio_sum_10d', 'holder_de_change_ratio_sum_10d'] final_df[fillna_ratio_cols] = final_df[fillna_ratio_cols].fillna(0.0) return final_df