# Source file: NewStock/main/factor/index_factor.py
# Snapshot: 2025-05-26 21:34:36 +08:00 (193 lines, 9.3 KiB, Python).
#
# Note from the repository viewer: this file contains ambiguous Unicode
# characters, i.e. characters that might be confused with other characters.
# If that is intentional, the warning can safely be ignored.
import pandas as pd
import numpy as np
from scipy.stats import spearmanr # 用于因子3的原始思路但实际简化了
# Small positive constant. It is not referenced by any function visible in
# this section of the file; presumably intended as a guard against division
# by zero -- TODO confirm usage before relying on or removing it.
epsilon = 1e-10
def _safe_divide(numerator, denominator, default_val=0.0):
    """Divide ``numerator`` by ``denominator``, replacing non-finite results.

    NumPy division warnings are suppressed, and every quotient that is not
    finite (NaN, +inf or -inf, including NaN that originates from NaN
    inputs) is replaced by ``default_val``.

    Args:
        numerator: Scalar, NumPy array or pandas object to be divided.
        denominator: Scalar, NumPy array or pandas object to divide by.
        default_val: Value substituted for every non-finite quotient.

    Returns:
        The quotient with non-finite entries replaced. Two scalar inputs
        yield a scalar; otherwise the result has whatever type
        ``numerator / denominator`` produces (ndarray, Series, ...).
    """
    with np.errstate(divide='ignore', invalid='ignore'):
        if np.ndim(numerator) == 0 and np.ndim(denominator) == 0:
            # Plain Python numbers raise ZeroDivisionError and NumPy scalars
            # do not support item assignment, so scalars need their own path.
            result = np.float64(numerator) / np.float64(denominator)
            return result if np.isfinite(result) else default_val
        result = numerator / denominator
        # The division created a fresh object, so patching it in place
        # never touches the caller's data.
        result[~np.isfinite(result)] = default_val
    return result
# --- Revised factor calculation functions ---
def calculate_size_style_strength_factor(df: pd.DataFrame, N: int = 5, factor_name_suffix: str = '') -> pd.DataFrame:
    """Compute the large-cap vs. small-cap relative style strength factor.

    For each trade date the N-period return of '399300.SZ' (large) minus
    that of '000852.SH' (small) is multiplied by
    ``1 + sign(mid_ret) * |mid_ret - mean(large_ret, small_ret)|``, where
    ``mid_ret`` is the N-period return of '000905.SH' (mid). A missing mid
    return leaves the multiplier at 1.

    Args:
        df: Long-format index quotes with at least the columns 'ts_code',
            'trade_date' and 'close'. Rows may be in any order and the
            frame is not modified.
        N: Number of rows (per index) over which the return is measured.
        factor_name_suffix: Text appended to the output column name.

    Returns:
        A one-column DataFrame named ``size_style_strength_{N}{suffix}``
        indexed by 'trade_date'. When any of the three required index codes
        is missing from ``df``, an all-NaN frame over the unique trade dates
        is returned instead. Dates on which no index has a return yet are
        absent from the result, because ``pivot_table`` discards all-NaN
        entries by default.
    """
    factor_name = f'size_style_strength_{N}{factor_name_suffix}'
    print(f"Calculating {factor_name}...")

    required_indices = ['399300.SZ', '000905.SH', '000852.SH']
    # Compute the set of available codes once instead of once per lookup.
    available_codes = set(df['ts_code'].unique())
    if not set(required_indices) <= available_codes:
        print(f"Error: DataFrame 中缺少部分必需的指数代码 ({required_indices})。返回空因子 Series。")
        return pd.DataFrame(index=df['trade_date'].unique(), columns=[factor_name]).rename_axis('trade_date')

    # pct_change works on row order, so put every index in chronological
    # order first. sort_values/reset_index return a new frame, which also
    # keeps the caller's DataFrame untouched.
    ordered = df.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)

    # 1. N-period return of each index.
    ordered['_ret_N'] = ordered.groupby('ts_code')['close'].pct_change(periods=N)

    # 2. One row per date, one column per index.
    pivot_ret_N = ordered.pivot_table(index='trade_date', columns='ts_code', values='_ret_N')

    # pivot_table drops columns that are entirely NaN, hence the fallback.
    nan_fallback = pd.Series(np.nan, index=pivot_ret_N.index)
    large_ret = pivot_ret_N.get('399300.SZ', nan_fallback)
    mid_ret = pivot_ret_N.get('000905.SH', nan_fallback)
    small_ret = pivot_ret_N.get('000852.SH', nan_fallback)

    # 3. Combine into one value per date.
    large_small_diff = large_ret - small_ret
    avg_large_small_ret = (large_ret + small_ret) / 2
    mid_deviation_raw = mid_ret - avg_large_small_ret
    # fillna(0) makes the adjustment neutral (multiplier 1) when the mid-cap
    # return or its deviation is unavailable.
    mid_deviation_factor = 1 + np.sign(mid_ret.fillna(0)) * np.abs(mid_deviation_raw.fillna(0))

    daily_factor_values = large_small_diff * mid_deviation_factor
    daily_factor_values.name = factor_name
    print(f"Finished {factor_name}.")
    return daily_factor_values.to_frame()
def calculate_volatility_structure_factor(df: pd.DataFrame, N: int = 10, factor_name_suffix: str = '') -> pd.DataFrame:
    """Compute the market volatility structure factor.

    For each index the rolling standard deviation of 'pct_chg' over ``N``
    rows is taken (at least ``max(1, N // 2)`` observations required). The
    factor for a date is ``(small_vol - mid_vol) / large_vol`` with
    '000852.SH' as small, '000905.SH' as mid and '399300.SZ' as large,
    evaluated through ``_safe_divide``.

    Args:
        df: Long-format index quotes with at least the columns 'ts_code',
            'trade_date' and 'pct_chg'. Rows may be in any order, the index
            may contain duplicate labels, and the frame is not modified.
        N: Rolling window length in rows per index.
        factor_name_suffix: Text appended to the output column name.

    Returns:
        A one-column DataFrame named ``vol_structure_idx_{N}{suffix}``
        indexed by 'trade_date'. When a required index code or the
        'pct_chg' column is missing, an all-NaN frame over the unique trade
        dates is returned instead.
    """
    factor_name = f'vol_structure_idx_{N}{factor_name_suffix}'
    print(f"Calculating {factor_name}...")

    required_indices = ['399300.SZ', '000905.SH', '000852.SH']
    # Compute the set of available codes once instead of once per lookup.
    available_codes = set(df['ts_code'].unique())
    if not set(required_indices) <= available_codes:
        print(f"Error: DataFrame 中缺少部分必需的指数代码 ({required_indices})。返回空因子 Series。")
        return pd.DataFrame(index=df['trade_date'].unique(), columns=[factor_name]).rename_axis('trade_date')
    if 'pct_chg' not in df.columns:
        print(f"Error: DataFrame 缺少 'pct_chg' 列。将为 {factor_name} 填充 NaN。")
        return pd.DataFrame(index=df['trade_date'].unique(), columns=[factor_name]).rename_axis('trade_date')

    # The rolling window follows row order, and the result is aligned back
    # by index label; sorting chronologically and rebuilding a unique
    # RangeIndex makes both steps independent of how the caller's frame is
    # ordered or indexed. It also yields a new frame, leaving `df` untouched.
    ordered = df.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)

    # 1. Rolling volatility of each index.
    rolling_std = (
        ordered.groupby('ts_code')['pct_chg']
        .rolling(N, min_periods=max(1, N // 2))
        .std()
    )
    # Drop the group level so the values align with `ordered` by row label.
    ordered['_vol_N'] = rolling_std.reset_index(level=0, drop=True)

    # 2. One row per date, one column per index.
    pivot_vol_N = ordered.pivot_table(index='trade_date', columns='ts_code', values='_vol_N')

    # pivot_table drops columns that are entirely NaN, hence the fallback.
    nan_fallback = pd.Series(np.nan, index=pivot_vol_N.index)
    large_vol = pivot_vol_N.get('399300.SZ', nan_fallback)
    mid_vol = pivot_vol_N.get('000905.SH', nan_fallback)
    small_vol = pivot_vol_N.get('000852.SH', nan_fallback)

    # 3. Ratio of the small/mid volatility spread to large-cap volatility.
    # NOTE(review): _safe_divide maps every non-finite quotient (including
    # NaN from missing volatility) to its default of 0.0 -- confirm that a
    # 0.0 factor is the intended stand-in for missing data.
    daily_factor_values = _safe_divide((small_vol - mid_vol), large_vol)
    daily_factor_values.name = factor_name
    print(f"Finished {factor_name}.")
    return daily_factor_values.to_frame()
def calculate_market_divergence_factor(df: pd.DataFrame, factor_name_suffix: str = '') -> pd.DataFrame:
    """Score, per trade date, how much the three indices disagree in direction.

    The score is derived from the signs of the daily 'pct_chg' values of
    '399300.SZ', '000905.SH' and '000852.SH' that are available on a date:

    * fewer than two available values -> NaN
    * all signs identical (including all zero) -> 0.0
    * two distinct signs, one of them zero -> 0.25
    * two distinct signs, positive and negative -> 0.75
    * three distinct signs (+, -, 0) -> 1.0

    Args:
        df: Long-format index quotes with at least the columns 'ts_code',
            'trade_date' and 'pct_chg'.
        factor_name_suffix: Text appended to the output column name.

    Returns:
        A one-column DataFrame named ``market_divergence_score{suffix}``
        indexed by 'trade_date'. When a required index code or the
        'pct_chg' column is missing, an all-NaN frame over the unique trade
        dates is returned instead.
    """
    factor_name = f'market_divergence_score{factor_name_suffix}'
    print(f"Calculating {factor_name}...")

    def _all_nan_frame():
        # Placeholder result used by both validation failures below.
        return pd.DataFrame(index=df['trade_date'].unique(), columns=[factor_name]).rename_axis('trade_date')

    required_indices = ['399300.SZ', '000905.SH', '000852.SH']
    present_codes = df['ts_code'].unique()
    if any(code not in present_codes for code in required_indices):
        print(f"Error: DataFrame 中缺少部分必需的指数代码 ({required_indices})。返回空因子 Series。")
        return _all_nan_frame()
    if 'pct_chg' not in df.columns:
        print(f"Error: DataFrame 缺少 'pct_chg' 列。将为 {factor_name} 填充 NaN。")
        return _all_nan_frame()

    large_code, mid_code, small_code = '399300.SZ', '000905.SH', '000852.SH'

    # One row per date, restricted to (and guaranteed to contain) the three
    # index columns; a column missing after the pivot is filled with NaN.
    wide_pct_chg = df.pivot_table(index='trade_date', columns='ts_code', values='pct_chg')
    wide_pct_chg = wide_pct_chg.reindex(columns=[large_code, mid_code, small_code])

    def _score_row(day_returns):
        observed = day_returns.dropna()
        if len(observed) < 2:
            # Divergence cannot be judged from fewer than two observations.
            return np.nan
        distinct_signs = np.sign(observed).unique()
        n_distinct = len(distinct_signs)
        if n_distinct == 2:
            # "One direction plus flat" counts as milder than "up vs. down".
            return 0.25 if 0 in distinct_signs else 0.75
        # 1 distinct sign: full agreement; 3 distinct signs: maximal divergence.
        return {1: 0.0, 3: 1.0}.get(n_distinct, np.nan)

    daily_factor_values = wide_pct_chg.apply(_score_row, axis=1)
    daily_factor_values.name = factor_name
    print(f"Finished {factor_name}.")
    return daily_factor_values.to_frame()
# --- Combine all factor calculations into a single main function ---
def generate_daily_index_relation_factors(df_input: pd.DataFrame) -> pd.DataFrame:
    """Compute every daily factor derived from the large/mid/small index relationship.

    Args:
        df_input: Long-format index quotes containing 'ts_code',
            'trade_date', 'close' and 'pct_chg'.

    Returns:
        A DataFrame indexed by 'trade_date' with one column per factor that
        produced at least one non-NaN value, sorted by date. If no factor
        qualifies, a column-less DataFrame over the unique trade dates is
        returned.
    """
    # sort_values/reset_index build a new frame, so the caller's input is
    # never modified.
    df = df_input.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)

    # Each call yields a DataFrame indexed by trade_date; further daily
    # factors can be appended to this list.
    candidate_frames = [
        calculate_size_style_strength_factor(df, N=5),
        calculate_volatility_structure_factor(df, N=10),
        calculate_market_divergence_factor(df),
    ]

    # Discard results that are empty or whose first column is entirely NaN
    # (the shape the factor functions return on validation errors).
    usable_frames = [
        frame for frame in candidate_frames
        if not (frame.empty or frame.iloc[:, 0].isna().all())
    ]

    if not usable_frames:
        print("警告: 未能成功计算任何因子。返回空 DataFrame。")
        return pd.DataFrame(index=df['trade_date'].unique()).rename_axis('trade_date')

    # Outer-join the frames one after another on trade_date so that every
    # date present in any factor is kept.
    merged = usable_frames[0]
    for frame in usable_frames[1:]:
        merged = pd.merge(merged, frame, on='trade_date', how='outer')

    return merged.sort_index()
# --- Usage example ---
# Assume all_indices_df is your long-format quote data containing the three indices '399300.SZ', '000905.SH' and '000852.SH'.
# Make sure it has the columns 'ts_code', 'trade_date', 'open', 'high', 'low', 'close', 'vol', 'pct_chg'.
# all_indices_df['trade_date'] = pd.to_datetime(all_indices_df['trade_date'])
# all_indices_df = all_indices_df.sort_values(['ts_code', 'trade_date'])
# daily_market_factors = generate_daily_index_relation_factors(all_indices_df)
# print("\n每日市场风格/情绪因子:")
# print(daily_market_factors.tail())
# Afterwards you can merge daily_market_factors into your per-stock data `pdf` on 'trade_date':
# pdf_with_market_factors = pd.merge(pdf, daily_market_factors, on='trade_date', how='left')