Classify2

This commit is contained in:
liaozhaorun
2025-05-13 15:30:06 +08:00
parent 791c84aba6
commit a4b05bb62f
20 changed files with 10737 additions and 7456 deletions

View File

@@ -2547,3 +2547,305 @@ def limit_factor(df: pd.DataFrame) -> pd.DataFrame:
lambda x: calculate_consecutive_limits(x)[0]
)
return df
import pandas as pd
import numpy as np
# 假设 df 已经加载并包含 'ts_code', 'trade_date', 'pct_chg' 列
# 并且已经按照 'ts_code' 和 'trade_date' 进行了排序
def daily_momentum_benchmark(df):
"""
计算日级别动量基准 (Positive and Negative),使用现有的 'pct_chg' 列。
这个函数将原分钟级动量基准的概念应用于日线数据。
计算每日全市场上涨股票 ('pct_chg' > 0) 的平均涨跌幅
和下跌股票 ('pct_chg' < 0) 的平均涨跌幅。
参数:
df (pd.DataFrame): 包含日级别股票数据的DataFrame。
必须包含 'ts_code', 'trade_date', 'pct_chg' 列,
并已按 'ts_code''trade_date' 排序。
返回:
pd.DataFrame: 增加了 'daily_positive_benchmark', 'daily_negative_benchmark' 列的DataFrame。
原始的 'pct_chg' 列会被直接使用。
"""
print("--- 计算日级别动量基准 (使用 pct_chg) ---")
# 确保 pct_chg 列存在
if 'pct_chg' not in df.columns:
print("错误: DataFrame中没有'pct_chg'列,无法计算日级别动量基准。")
return df
# 计算每日的全市场动量基准
# 对于每一个交易日,计算所有股票中 pct_chg > 0 和 < 0 的平均值
# 使用 trade_date 进行分组
daily_benchmarks = df.groupby('trade_date')['pct_chg'].agg(
daily_positive_benchmark = lambda x: x[x > 0].mean(), # 日级别上涨股票的平均涨跌幅
daily_negative_benchmark = lambda x: x[x < 0].mean() # 日级别下跌股票的平均涨跌幅
).reset_index()
# 将日级别动量基准合并回原始日线数据DataFrame
df = pd.merge(
df,
daily_benchmarks,
on='trade_date',
how='left'
)
# 对可能出现的NaN基准进行填充这里用0填充表示没有对应的同向基准
df['daily_positive_benchmark'].fillna(0, inplace=True)
df['daily_negative_benchmark'].fillna(0, inplace=True)
print("日级别动量基准计算完成 (使用 pct_chg)。")
return df
def daily_deviation(df):
"""
计算日级别偏离度,使用现有的 'pct_chg' 列和计算出的日级别动量基准。
计算每只股票的日涨跌幅 ('pct_chg') 相对于日级别动量基准的偏离。
参数:
df (pd.DataFrame): 包含日级别股票数据的DataFrame。
必须包含 'ts_code', 'trade_date', 'pct_chg',
'daily_positive_benchmark', 'daily_negative_benchmark' 列。
这些基准列通常通过运行 daily_momentum_benchmark(df) 获得。
返回:
pd.DataFrame: 增加了 'daily_deviation' 列的DataFrame。
"""
print("--- 计算日级别偏离度 (使用 pct_chg) ---")
# 确保所需的列存在
df = daily_momentum_benchmark(df)
required_cols = ['pct_chg', 'daily_positive_benchmark', 'daily_negative_benchmark']
if not all(col in df.columns for col in required_cols):
print(f"错误: 计算日级别偏离度需要以下列: {required_cols}。请先运行 daily_momentum_benchmark(df)。")
return df
conditions = [
(df['pct_chg'] > 0) & (df['daily_positive_benchmark'] > 0),
(df['pct_chg'] < 0) & (df['daily_negative_benchmark'] < 0)
]
choices = [
df['pct_chg'] - df['daily_positive_benchmark'],
df['pct_chg'] - df['daily_negative_benchmark']
]
df['daily_deviation'] = np.select(conditions, choices, default=0)
df = df.drop(columns=['daily_positive_benchmark', 'daily_negative_benchmark'])
print("日级别偏离度计算完成 (使用 pct_chg)。")
return df
def daily_industry_momentum_benchmark(df):
"""
计算日级别行业动量基准 (Positive and Negative),使用现有的 'pct_chg' 列和 'cat_l2_code' 列。
计算每日每个行业内部上涨股票 ('pct_chg' > 0) 的平均涨跌幅
和下跌股票 ('pct_chg' < 0) 的平均涨跌幅。
参数:
df (pd.DataFrame): 包含日级别股票数据的DataFrame。
必须包含 'ts_code', 'trade_date', 'pct_chg', 'cat_l2_code' 列,
并已按 'ts_code''trade_date' 排序。
返回:
pd.DataFrame: 增加了 'daily_industry_positive_benchmark', 'daily_industry_negative_benchmark' 列的DataFrame。
原始的 'pct_chg''cat_l2_code' 列会被直接使用。
"""
print("--- 计算日级别行业动量基准 (使用 pct_chg 和 cat_l2_code) ---")
# 确保必需列存在
required_cols = ['pct_chg', 'cat_l2_code', 'trade_date', 'ts_code']
if not all(col in df.columns for col in required_cols):
print(f"错误: 计算日级别行业动量基准需要以下列: {required_cols}")
return df
# 计算每日每个行业内部的动量基准
# 使用 trade_date 和 cat_l2_code 进行分组
industry_daily_benchmarks = df.groupby(['trade_date', 'cat_l2_code'])['pct_chg'].agg(
daily_industry_positive_benchmark = lambda x: x[x > 0].mean(), # 日级别行业内上涨股票的平均涨跌幅
daily_industry_negative_benchmark = lambda x: x[x < 0].mean() # 日级别行业内下跌股票的平均涨跌幅
).reset_index()
# 将日级别行业动量基准合并回原始日线数据DataFrame
# 使用 trade_date 和 cat_l2_code 进行 merge
df = pd.merge(
df,
industry_daily_benchmarks,
on=['trade_date', 'cat_l2_code'],
how='left'
)
# 对可能出现的NaN基准进行填充例如某个行业某一天没有上涨或下跌的股票
# 这里用0填充表示该行业该天没有对应的同向基准
df['daily_industry_positive_benchmark'].fillna(0, inplace=True)
df['daily_industry_negative_benchmark'].fillna(0, inplace=True)
print("日级别行业动量基准计算完成 (使用 pct_chg 和 cat_l2_code)。")
return df
def daily_industry_deviation(df):
"""
计算日级别行业偏离度,使用现有的 'pct_chg' 列和计算出的日级别行业动量基准。
计算每只股票的日涨跌幅 ('pct_chg') 相对于其所属行业日级别动量基准的偏离。
参数:
df (pd.DataFrame): 包含日级别股票数据的DataFrame。
必须包含 'ts_code', 'trade_date', 'pct_chg', 'cat_l2_code',
'daily_industry_positive_benchmark', 'daily_industry_negative_benchmark' 列。
这些基准列通常通过运行 daily_industry_momentum_benchmark(df) 获得。
返回:
pd.DataFrame: 增加了 'daily_industry_deviation' 列的DataFrame。
"""
print("--- 计算日级别行业偏离度 (使用 pct_chg 和行业基准) ---")
# 确保所需的列存在
df = daily_industry_momentum_benchmark(df)
required_cols = ['pct_chg', 'daily_industry_positive_benchmark', 'daily_industry_negative_benchmark']
if not all(col in df.columns for col in required_cols):
print(f"错误: 计算日级别行业偏离度需要以下列: {required_cols}。请先运行 daily_industry_momentum_benchmark(df)。")
return df
# 根据规则计算日级别行业偏离度:
# 如果 pct_chg > 0 且 daily_industry_positive_benchmark > 0deviation = pct_chg - daily_industry_positive_benchmark
# 如果 pct_chg < 0 且 daily_industry_negative_benchmark < 0deviation = pct_chg - daily_industry_negative_benchmark
# 否则 deviation = 0
conditions = [
(df['pct_chg'] > 0) & (df['daily_industry_positive_benchmark'] > 0),
(df['pct_chg'] < 0) & (df['daily_industry_negative_benchmark'] < 0)
]
choices = [
df['pct_chg'] - df['daily_industry_positive_benchmark'],
df['pct_chg'] - df['daily_industry_negative_benchmark']
]
df['daily_industry_deviation'] = np.select(conditions, choices, default=0)
df = df.drop(columns=['daily_industry_positive_benchmark', 'daily_industry_negative_benchmark'])
print("日级别行业偏离度计算完成 (使用 pct_chg 和行业基准)。")
return df
def sentiment_panic_greed_index(df: pd.DataFrame, window_atr: int = 14, window_smooth: int = 5, factor_name: str = 'senti_panic_greed'):
"""
计算市场恐慌/贪婪指数 (原地修改)。
结合日内振幅、影线、跳空及与近期ATR的比较。
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ['_prev_close', '_atr', '_true_range', '_upper_shadow', '_lower_shadow', '_body', '_gap', '_volatility_surprise']
if not all(col in df.columns for col in ['open', 'high', 'low', 'close', 'vol']):
print(f"Error: DataFrame 缺少必需的 OHLCV 列。将为 {factor_name} 填充 NaN。")
df[factor_name] = np.nan
return
try:
df['_prev_close'] = df['close'].shift(1)
# 计算真实波幅 (TR) 和 ATR
df['_true_range'] = talib.TRANGE(df['high'], df['low'], df['_prev_close'])
df['_atr'] = talib.ATR(df['high'], df['low'], df['_prev_close'], timeperiod=window_atr)
# 计算影线和实体
df['_upper_shadow'] = df['high'] - np.maximum(df['open'], df['close'])
df['_lower_shadow'] = np.minimum(df['open'], df['close']) - df['low']
df['_body'] = np.abs(df['close'] - df['open'])
# 计算跳空
df['_gap'] = (df['open'] / df['_prev_close'] - 1).fillna(0)
# 波动性意外: 当日真实波幅相对于近期ATR的倍数乘以涨跌方向
# 如果真实波幅显著放大,根据涨跌幅赋予正负号,表明情绪的强度和方向
df['_volatility_surprise'] = (df['_true_range'] / (df['_atr'] + epsilon) -1) * np.sign(df['pct_chg'].fillna(0))
# 简化版情绪指标:(下影线 - 上影线) / ATR + 跳空幅度 + 当日涨跌幅, 然后平滑
# 更强的信号:波动性意外,结合跳空
# 考虑当日振幅相对于ATR的超额部分并结合实体方向
# ( (真实波幅/ATR) * 涨跌方向 ) + 跳空幅度
raw_senti = (df['_true_range'] / (df['_atr'] + epsilon)) * np.sign(df['pct_chg'].fillna(0)) + df['_gap'] * 2 # 放大跳空影响
df[factor_name] = raw_senti.rolling(window_smooth, min_periods=1).mean()
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
return df
def sentiment_market_breadth_proxy(df: pd.DataFrame, window_vol: int = 20, window_smooth: int = 3, factor_name: str = 'senti_breadth_proxy'):
"""
计算市场宽度情绪代理指标 (基于指数的价量配合度) (原地修改).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ['_rolling_avg_vol']
if not all(col in df.columns for col in ['pct_chg', 'vol']):
print(f"Error: DataFrame 缺少 'pct_chg''vol' 列。将为 {factor_name} 填充 NaN。")
df[factor_name] = np.nan
return
try:
df['_rolling_avg_vol'] = df['vol'].rolling(window_vol, min_periods=max(1, window_vol//2)).mean()
# 价量配合度:涨跌幅乘以相对成交量强度
raw_breadth = df['pct_chg'] * (df['vol'] / (df['_rolling_avg_vol'] + epsilon))
df[factor_name] = raw_breadth.rolling(window_smooth, min_periods=1).mean() # 平滑处理
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
return df
def sentiment_reversal_indicator(df: pd.DataFrame, window_ret: int = 5, window_vol: int = 5, factor_name: str = 'senti_reversal'):
"""
计算短期情绪反转因子 (原地修改).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ['_return_M', '_volatility_M']
if 'pct_chg' not in df.columns:
print(f"Error: DataFrame 缺少 'pct_chg' 列。将为 {factor_name} 填充 NaN。")
df[factor_name] = np.nan
return
try:
# 计算 M 日累计收益率 (这里用连乘近似,或者 sum of log returns)
# (close / close.shift(M)) -1
df['_return_M'] = (df['close'] / df['close'].shift(window_ret)) - 1
# df['_return_M'] = df['pct_chg'].rolling(window_ret, min_periods=1).sum() # 另一种近似
# 计算 M 日已实现波动率
df['_volatility_M'] = df['pct_chg'].rolling(window_vol, min_periods=max(1, window_vol//2)).std()
# 因子计算
df[factor_name] = -df['_return_M'] * df['_volatility_M']
# 对因子本身可以再做一次平滑
# df[factor_name] = df[factor_name].rolling(3, min_periods=1).mean()
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
return df