This commit is contained in:
liaozhaorun
2025-05-29 20:41:18 +08:00
parent cecbef02f6
commit eb52c3673c
11 changed files with 5236 additions and 4520 deletions

View File

@@ -3150,6 +3150,7 @@ def ts_volatility_slope_20_5(df: pd.DataFrame) -> pd.DataFrame:
Returns:
pd.DataFrame: 包含新增 'ts_volatility_slope_20_5' 列的 DataFrame。
"""
print(f"计算因子 ts_volatility_slope_20_5")
df["trade_date"] = pd.to_datetime(df["trade_date"])
df.sort_values(["ts_code", "trade_date"], inplace=True)
@@ -3186,6 +3187,8 @@ def ts_turnover_rate_acceleration_5_20(df: pd.DataFrame) -> pd.DataFrame:
Returns:
pd.DataFrame: 包含新增 'ts_turnover_rate_acceleration_5_20' 列的 DataFrame。
"""
print(f"计算因子 ts_turnover_rate_acceleration_5_20")
df['trade_date'] = pd.to_datetime(df['trade_date'])
df.sort_values(['ts_code', 'trade_date'], inplace=True)
df['short_avg_turnover'] = df.groupby('ts_code')['turnover_rate'].rolling(window=5, min_periods=1).mean().reset_index(level=0, drop=True)
@@ -3204,6 +3207,8 @@ def ts_vol_sustain_10_30(df: pd.DataFrame) -> pd.DataFrame:
Returns:
pd.DataFrame: 包含新增 'ts_vol_sustain_10_30' 列的 DataFrame。
"""
print(f"计算因子 ts_vol_sustain_10_30")
df['trade_date'] = pd.to_datetime(df['trade_date'])
df.sort_values(['ts_code', 'trade_date'], inplace=True)
df['long_avg_vol'] = df.groupby('ts_code')['vol'].rolling(window=30, min_periods=1).mean().reset_index(level=0, drop=True)
@@ -3227,6 +3232,8 @@ def cs_turnover_rate_relative_strength_20(df: pd.DataFrame) -> pd.DataFrame:
Returns:
pd.DataFrame: 包含新增 'cs_turnover_rate_relative_strength_20' 列的 DataFrame。
"""
print(f"计算因子 cs_turnover_rate_relative_strength_20")
df['trade_date'] = pd.to_datetime(df['trade_date'])
df.sort_values(['ts_code', 'trade_date'], inplace=True)
@@ -3254,6 +3261,8 @@ def cs_amount_outlier_10(df: pd.DataFrame) -> pd.DataFrame:
Returns:
pd.DataFrame: 包含新增 'cs_amount_outlier_10' 列的 DataFrame。
"""
print(f"计算因子 cs_amount_outlier_10")
df['trade_date'] = pd.to_datetime(df['trade_date'])
df.sort_values(['ts_code', 'trade_date'], inplace=True)
@@ -3281,12 +3290,14 @@ def ts_ff_to_total_turnover_ratio(df: pd.DataFrame) -> pd.DataFrame:
计算自由流通股换手率与总换手率之比。
Args:
df (pd.DataFrame): 包含 'ts_code', 'trade_date', 'turnover_rate''turnover_rate_ff' 列的 DataFrame。
df (pd.DataFrame): 包含 'ts_code', 'trade_date', 'turnover_rate''turnover_rate' 列的 DataFrame。
Returns:
pd.DataFrame: 包含新增 'ts_ff_to_total_turnover_ratio' 列的 DataFrame。
"""
df['ts_ff_to_total_turnover_ratio'] = df['turnover_rate_ff'] / (df['turnover_rate'] + 1e-8) # 避免除零
print(f"计算因子 ts_ff_to_total_turnover_ratio")
df['ts_ff_to_total_turnover_ratio'] = df['turnover_rate'] / (df['turnover_rate'] + 1e-8) # 避免除零
return df
@@ -3300,6 +3311,8 @@ def ts_price_volume_trend_coherence_5_20(df: pd.DataFrame) -> pd.DataFrame:
Returns:
pd.DataFrame: 包含新增 'ts_price_volume_trend_coherence_5_20' 列的 DataFrame。
"""
print(f"计算因子 ts_price_volume_trend_coherence_5_20")
df['trade_date'] = pd.to_datetime(df['trade_date'])
df.sort_values(['ts_code', 'trade_date'], inplace=True)
@@ -3323,6 +3336,8 @@ def ts_turnover_rate_trend_strength_5(df: pd.DataFrame) -> pd.DataFrame:
Returns:
pd.DataFrame: 包含新增 'ts_turnover_rate_trend_strength_5' 列的 DataFrame。
"""
print(f"计算因子 ts_turnover_rate_trend_strength_5")
df['trade_date'] = pd.to_datetime(df['trade_date'])
df.sort_values(['ts_code', 'trade_date'], inplace=True)
@@ -3342,14 +3357,219 @@ def ts_ff_turnover_rate_surge_10(df: pd.DataFrame) -> pd.DataFrame:
计算当日自由流通股换手率与过去 10 日均值比值。
Args:
df (pd.DataFrame): 包含 'ts_code', 'trade_date''turnover_rate_ff' 列的 DataFrame。
df (pd.DataFrame): 包含 'ts_code', 'trade_date''turnover_rate' 列的 DataFrame。
Returns:
pd.DataFrame: 包含新增 'ts_ff_turnover_rate_surge_10' 列的 DataFrame。
"""
print(f"计算因子 ts_ff_turnover_rate_surge_10")
df['trade_date'] = pd.to_datetime(df['trade_date'])
df.sort_values(['ts_code', 'trade_date'], inplace=True)
df['avg_ff_turnover_10'] = df.groupby('ts_code')['turnover_rate_ff'].rolling(window=10, min_periods=1).mean().reset_index(level=0, drop=True)
df['ts_ff_turnover_rate_surge_10'] = df['turnover_rate_ff'] / (df['avg_ff_turnover_10'] + 1e-8) # 避免除零
df['avg_ff_turnover_10'] = df.groupby('ts_code')['turnover_rate'].rolling(window=10, min_periods=1).mean().reset_index(level=0, drop=True)
df['ts_ff_turnover_rate_surge_10'] = df['turnover_rate'] / (df['avg_ff_turnover_10'] + 1e-8) # 避免除零
df.drop(columns=['avg_ff_turnover_10'], inplace=True)
return df
# --- Factor 1: 近期积极动量与成交量激增 (简化版催化剂代理) ---
def cat_senti_mom_vol_spike(df_input: pd.DataFrame,
return_period: int = 3,
return_threshold: float = 0.05,
volume_ratio_threshold: float = 1.5,
current_pct_chg_min: float = -0.01,
current_pct_chg_max: float = 0.03,
factor_name: str = 'cat_senti_mom_vol_spike') -> pd.DataFrame:
"""
计算近期积极动量与成交量激增因子。
理念: 近期有显著正收益 + 近期成交量显著放大 + 当日小幅上涨或横盘。
"""
df = df_input
print(f"Calculating {factor_name}...")
_temp_cols = []
try:
# 1. 计算N日收益率 (如果不存在)
return_col = f'_return_{return_period}d'
if return_col not in df.columns:
df[return_col] = df.groupby('ts_code')['close'].pct_change(periods=return_period)
_temp_cols.append(return_col)
# 2. 检查 volume_ratio 是否存在 (通常由基础数据提供或 factor.txt 计算)
# 如果没有,我们可以尝试计算一个简单的 N 日均量比当日量
if 'volume_ratio' not in df.columns:
print(f"Warning: 'volume_ratio' column not found. Calculating a proxy for {factor_name}.")
df['_avg_vol_5d'] = df.groupby('ts_code')['vol'].rolling(window=5, min_periods=1).mean().reset_index(level=0, drop=True)
df['_volume_ratio_proxy'] = df['vol'] / (df['_avg_vol_5d'] + epsilon)
volume_metric_col = '_volume_ratio_proxy'
_temp_cols.extend(['_avg_vol_5d', '_volume_ratio_proxy'])
else:
volume_metric_col = 'volume_ratio'
# 条件判断
cond_momentum = df[return_col] > return_threshold
cond_volume = df[volume_metric_col] > volume_ratio_threshold
cond_current_price = (df['pct_chg'] > current_pct_chg_min) & (df['pct_chg'] < current_pct_chg_max)
df[factor_name] = (cond_momentum.astype(int).astype(str) + cond_volume.astype(int).astype(str) + cond_current_price.astype(int).astype(str))
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Factor column will be all zeros or NaN.")
df[factor_name] = 0
except Exception as e:
print(f"An unexpected error occurred in {factor_name}: {e}. Factor column will be all zeros or NaN.")
df[factor_name] = 0
finally:
# 清理中间列
df.drop(columns=[col for col in _temp_cols if col in df.columns], inplace=True, errors='ignore')
print(f"Finished {factor_name}.")
return df
# --- Factor 2: 强主力资金流入信号(未实现) ---
def calculate_strong_inflow_signal(df_input: pd.DataFrame,
intensity_avg_N: int = 3,
intensity_threshold: float = 0.01, # 假设 flow_lg_elg_intensity 的合理阈值
consecutive_buy_N: int = 2,
accel_positive_M: int = 1,
factor_name: str = 'senti_strong_inflow') -> pd.DataFrame:
"""
计算强主力资金流入信号因子。
理念: 大单资金持续、显著净流入,且有加速迹象。
依赖: df 中已包含 'flow_lg_elg_intensity''flow_lg_elg_accel' (来自 factor.txt)
"""
df = df_input
print(f"Calculating {factor_name}...")
_temp_cols = []
required_flow_cols = ['flow_lg_elg_intensity', 'flow_lg_elg_accel']
if not all(col in df.columns for col in required_flow_cols):
missing = [col for col in required_flow_cols if col not in df.columns]
print(f"Error: DataFrame 缺少必需的资金流因子列: {missing} for {factor_name}. Factor column will be all zeros or NaN.")
df[factor_name] = 0
return df
try:
# 1. 近N日主力资金强度均值
avg_intensity_col = f'_avg_flow_intensity_{intensity_avg_N}d'
df[avg_intensity_col] = df.groupby('ts_code')['flow_lg_elg_intensity'].rolling(window=intensity_avg_N, min_periods=1).mean().reset_index(level=0, drop=True)
_temp_cols.append(avg_intensity_col)
cond_avg_intensity = df[avg_intensity_col] > intensity_threshold
# 2. 近N日连续主力净买入天数 (近似flow_lg_elg_intensity > 0)
# 或者使用 lg_elg_net_buy_vol > 0 (如果该列存在)
df['_lg_elg_is_net_buy'] = (df['flow_lg_elg_intensity'] > 0).astype(int) # 或者用绝对量判断
_temp_cols.append('_lg_elg_is_net_buy')
# 计算连续天数
def count_consecutive_positive(series):
return series.rolling(window=consecutive_buy_N, min_periods=consecutive_buy_N).apply(lambda x: x.sum() == consecutive_buy_N, raw=True)
df['_consecutive_buy_days_flag'] = df.groupby('ts_code')['_lg_elg_is_net_buy'].apply(count_consecutive_positive).reset_index(level=0, drop=True).fillna(0)
_temp_cols.append('_consecutive_buy_days_flag')
cond_consecutive_buy = df['_consecutive_buy_days_flag'] == 1
# 3. 近M日主力资金流加速度为正
df['_accel_is_positive'] = (df['flow_lg_elg_accel'] > 0).astype(int)
_temp_cols.append('_accel_is_positive')
def check_all_positive_recent_M(series):
return series.rolling(window=accel_positive_M, min_periods=accel_positive_M).apply(lambda x: x.sum() == accel_positive_M, raw=True)
df['_accel_positive_M_flag'] = df.groupby('ts_code')['_accel_is_positive'].apply(check_all_positive_recent_M).reset_index(level=0, drop=True).fillna(0)
_temp_cols.append('_accel_positive_M_flag')
cond_accel_positive = df['_accel_positive_M_flag'] == 1
df[factor_name] = (cond_avg_intensity & cond_consecutive_buy & cond_accel_positive).astype(int)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Factor column will be all zeros or NaN.")
df[factor_name] = 0
except Exception as e:
print(f"An unexpected error occurred in {factor_name}: {e}. Factor column will be all zeros or NaN.")
df[factor_name] = 0
finally:
df.drop(columns=[col for col in _temp_cols if col in df.columns], inplace=True, errors='ignore')
print(f"Finished {factor_name}.")
return df
# --- Factor 3: 突破前盘整模式 ---
def cat_senti_pre_breakout(df_input: pd.DataFrame,
atr_short_N: int = 10,
atr_long_M: int = 40,
vol_atrophy_N: int = 10, # 用于计算短期均量
vol_atrophy_M: int = 40, # 用于计算长期均量
price_stab_N: int = 5,
price_stab_threshold: float = 0.05,
current_pct_chg_min_signal: float = 0.005, # 当日上涨至少0.5%
current_pct_chg_max_signal: float = 0.07, # 当日上涨不超过7% (避免追已大涨的)
volume_ratio_signal_threshold: float = 1.2,
factor_name: str = 'cat_senti_pre_breakout') -> pd.DataFrame:
"""
计算突破前盘整模式因子。
理念: 波动率收缩、成交量萎缩、近期价格稳定,当日出现温和放量上涨。
"""
df = df_input
print(f"Calculating {factor_name}...")
_temp_cols = []
try:
# 1. 波动率收缩 (使用 ATR)
atr_short_col = f'atr_{atr_short_N}'
atr_long_col = f'atr_{atr_long_M}'
for N, col_name in [(atr_short_N, atr_short_col), (atr_long_M, atr_long_col)]:
if col_name not in df.columns:
print(f"Calculating {col_name} as it's missing...")
# TA-Lib需要numpy array作为输入并且不能有NaN在中间 (首行NaN可以)
# 分组计算ATR比较麻烦这里假设如果df不是很大可以先整列计算再groupby获取
# 一个更稳健的方法是groupby().apply(lambda x: talib.ATR(x['high'], x['low'], x['close'], N))
# 但为了避免 apply 的性能问题,这里用一种近似,如果数据量大,最好预计算
temp_atr = df.groupby('ts_code', group_keys=False).apply(
lambda x: pd.Series(talib.ATR(x['high'].values, x['low'].values, x['close'].values, timeperiod=N), index=x.index)
)
df[col_name] = temp_atr
_temp_cols.append(col_name)
cond_vol_contraction = df[atr_short_col] < (0.7 * df[atr_long_col]) # 短期ATR显著小于长期ATR
# 2. 成交量萎缩
avg_vol_short_col = f'_avg_vol_{vol_atrophy_N}'
avg_vol_long_col = f'_avg_vol_{vol_atrophy_M}'
df[avg_vol_short_col] = df.groupby('ts_code')['vol'].rolling(window=vol_atrophy_N, min_periods=1).mean().reset_index(level=0,drop=True)
df[avg_vol_long_col] = df.groupby('ts_code')['vol'].rolling(window=vol_atrophy_M, min_periods=1).mean().reset_index(level=0,drop=True)
_temp_cols.extend([avg_vol_short_col, avg_vol_long_col])
cond_vol_atrophy = df[avg_vol_short_col] < (0.7 * df[avg_vol_long_col]) # 短期均量显著小于长期均量
# 3. 近期价格稳定
rolling_max_h_col = f'_rolling_max_h_{price_stab_N}'
rolling_min_l_col = f'_rolling_min_l_{price_stab_N}'
df[rolling_max_h_col] = df.groupby('ts_code')['high'].rolling(window=price_stab_N, min_periods=1).max().reset_index(level=0,drop=True)
df[rolling_min_l_col] = df.groupby('ts_code')['low'].rolling(window=price_stab_N, min_periods=1).min().reset_index(level=0,drop=True)
_temp_cols.extend([rolling_max_h_col, rolling_min_l_col])
cond_price_stability = ( (df[rolling_max_h_col] - df[rolling_min_l_col]) / (df['close'] + epsilon) ) < price_stab_threshold
# 4. 当日温和放量上涨信号
if 'volume_ratio' not in df.columns:
print(f"Warning: 'volume_ratio' column not found for {factor_name}. Using a proxy.")
# 如果没有量比,就用当日成交量 > 1.2 * 近5日均量作为代理
if avg_vol_short_col not in df.columns: # 确保这个短期均量列已计算
df[avg_vol_short_col] = df.groupby('ts_code')['vol'].rolling(window=vol_atrophy_N, min_periods=1).mean().reset_index(level=0,drop=True)
cond_vol_signal = df['vol'] > (1.2 * df[avg_vol_short_col])
else:
cond_vol_signal = df['volume_ratio'] > volume_ratio_signal_threshold
cond_price_signal = (df['pct_chg'] > current_pct_chg_min_signal) & (df['pct_chg'] < current_pct_chg_max_signal)
cond_current_day_signal = cond_price_signal & cond_vol_signal
df[factor_name] = (cond_vol_contraction.astype(int).astype(str) + cond_vol_atrophy.astype(int).astype(str) + cond_price_stability.astype(int).astype(str) + cond_current_day_signal.astype(int).astype(str))
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Factor column will be all zeros or NaN.")
df[factor_name] = 0
except Exception as e:
print(f"An unexpected error occurred in {factor_name}: {e}. Factor column will be all zeros or NaN.")
df[factor_name] = 0
finally:
df.drop(columns=[col for col in _temp_cols if col in df.columns], inplace=True, errors='ignore')
print(f"Finished {factor_name}.")
return df