#!/usr/bin/env python
# coding: utf-8

# In[1]:

# %load_ext autoreload
# %autoreload 2

import pandas as pd
import warnings

warnings.filterwarnings("ignore")
pd.set_option('display.max_columns', None)

# In[2]:

from utils.utils import read_and_merge_h5_data

# Assemble the per-stock daily panel. Every call after the first passes the
# frame built so far as `df`; presumably the project helper merges the newly
# read columns onto it (verify against utils.utils).
print('daily data')
df = read_and_merge_h5_data('../../data/daily_data.h5', key='daily_data',
                            columns=['ts_code', 'trade_date', 'open', 'close', 'high', 'low', 'vol', 'pct_chg'],
                            df=None)

print('daily basic')
df = read_and_merge_h5_data('../../data/daily_basic.h5', key='daily_basic',
                            columns=['ts_code', 'trade_date', 'turnover_rate', 'pe_ttm', 'circ_mv', 'volume_ratio',
                                     'is_st'],
                            df=df, join='inner')

print('stk limit')
df = read_and_merge_h5_data('../../data/stk_limit.h5', key='stk_limit',
                            columns=['ts_code', 'trade_date', 'pre_close', 'up_limit', 'down_limit'],
                            df=df)

print('money flow')
df = read_and_merge_h5_data('../../data/money_flow.h5', key='money_flow',
                            columns=['ts_code', 'trade_date', 'buy_sm_vol', 'sell_sm_vol', 'buy_lg_vol',
                                     'sell_lg_vol', 'buy_elg_vol', 'sell_elg_vol', 'net_mf_vol'],
                            df=df)

print('cyq perf')
df = read_and_merge_h5_data('../../data/cyq_perf.h5', key='cyq_perf',
                            columns=['ts_code', 'trade_date', 'his_low', 'his_high', 'cost_5pct', 'cost_15pct',
                                     'cost_50pct', 'cost_85pct', 'cost_95pct', 'weight_avg', 'winner_rate'],
                            df=df)

# DataFrame.info() prints its report itself and returns None, so it is not wrapped in print().
df.info()

# In[3]:

print('industry')
industry_df = read_and_merge_h5_data('../../data/industry_data.h5', key='industry_data',
                                     columns=['ts_code', 'l2_code', 'in_date'],
                                     df=None, on=['ts_code'], join='left')


def merge_with_industry_data(df, industry_df):
    """Attach to each daily row the industry code that applied on its trade date.

    For every (ts_code, trade_date) row the most recent industry record whose
    ``in_date`` is on or before ``trade_date`` is used. Rows dated before the
    stock's first ``in_date`` fall back to that stock's earliest ``l2_code``.

    Args:
        df: Daily panel with at least ``ts_code`` and ``trade_date``.
        industry_df: Industry membership with ``ts_code``, ``l2_code``, ``in_date``.

    Returns:
        A new frame sorted by (trade_date, ts_code) with a fresh RangeIndex and
        the industry columns (``l2_code``, ``in_date``) appended.

    Note:
        The date columns of both inputs are converted to datetime in place.
    """
    df['trade_date'] = pd.to_datetime(df['trade_date'])
    industry_df['in_date'] = pd.to_datetime(industry_df['in_date'])

    # merge_asof requires both sides to be sorted by their "on" key.
    industry_sorted = industry_df.sort_values(['in_date', 'ts_code'])
    daily_sorted = df.sort_values(['trade_date', 'ts_code'])

    merged = pd.merge_asof(
        daily_sorted,
        industry_sorted,
        by='ts_code',
        left_on='trade_date',
        right_on='in_date',
        direction='backward',
    )

    # Earliest known industry per stock (industry_sorted is ordered by in_date),
    # used for trade dates that precede every in_date of that stock.
    earliest_l2 = industry_sorted.groupby('ts_code')['l2_code'].first()
    merged['l2_code'] = merged['l2_code'].fillna(merged['ts_code'].map(earliest_l2))

    return merged.reset_index(drop=True)


df = merge_with_industry_data(df, industry_df)


# In[4]:

def calculate_indicators(df):
    """Compute technical and sentiment indicators for a single index history.

    Adds ``daily_return``, ``RSI`` (14-row simple-average form), ``MACD``,
    ``Signal_line``, ``MACD_hist``, ``up_ratio``/``up_ratio_20d``,
    ``volume_mean``/``volume_change_rate``, ``volatility`` and
    ``amount_mean``/``amount_change_rate``.

    Args:
        df: Rows of one ``ts_code`` with ``trade_date``, ``close``,
            ``pre_close``, ``vol`` and ``amount`` columns.

    Returns:
        A copy sorted by ``trade_date`` with the indicator columns added.
    """
    df = df.sort_values('trade_date')
    df['daily_return'] = (df['close'] - df['pre_close']) / df['pre_close'] * 100

    # RSI: ratio of average gains to average losses over 14 rows.
    delta = df['close'].diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)
    avg_gain = gain.rolling(window=14).mean()
    avg_loss = loss.rolling(window=14).mean()
    rs = avg_gain / avg_loss
    df['RSI'] = 100 - (100 / (1 + rs))

    # MACD (12/26 EMA difference) with its 9-span signal line.
    ema12 = df['close'].ewm(span=12, adjust=False).mean()
    ema26 = df['close'].ewm(span=26, adjust=False).mean()
    df['MACD'] = ema12 - ema26
    df['Signal_line'] = df['MACD'].ewm(span=9, adjust=False).mean()
    df['MACD_hist'] = df['MACD'] - df['Signal_line']

    # Sentiment 1: share of up days over the last 20 rows (NaN returns count as "not up").
    df['up_ratio'] = (df['daily_return'] > 0).astype(int)
    df['up_ratio_20d'] = df['up_ratio'].rolling(window=20).mean()

    # Sentiment 2: volume relative to its 20-row mean, in percent.
    df['volume_mean'] = df['vol'].rolling(window=20).mean()
    df['volume_change_rate'] = (df['vol'] - df['volume_mean']) / df['volume_mean'] * 100

    # Sentiment 3: 20-row standard deviation of the daily return.
    df['volatility'] = df['daily_return'].rolling(window=20).std()

    # Sentiment 4: turnover amount relative to its 20-row mean, in percent.
    df['amount_mean'] = df['amount'].rolling(window=20).mean()
    df['amount_change_rate'] = (df['amount'] - df['amount_mean']) / df['amount_mean'] * 100

    return df


def generate_index_indicators(h5_filename):
    """Load index data and return one wide row of indicators per trade date.

    Args:
        h5_filename: HDF5 file holding the ``index_data`` key, with
            ``trade_date`` stored as ``%Y%m%d``.

    Returns:
        Frame with ``trade_date`` plus one ``{ts_code}_{indicator}`` column per
        index and indicator.
    """
    df = pd.read_hdf(h5_filename, key='index_data')
    df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d')
    df = df.sort_values('trade_date')

    # Indicators are computed independently for each index.
    per_index = [calculate_indicators(index_rows) for _, index_rows in df.groupby('ts_code', sort=False)]
    df_all_indicators = pd.concat(per_index, ignore_index=True)

    # One row per date, one column per (indicator, index) pair.
    df_final = df_all_indicators.pivot_table(
        index='trade_date',
        columns='ts_code',
        values=['daily_return', 'RSI', 'MACD', 'Signal_line', 'MACD_hist', 'up_ratio_20d',
                'volume_change_rate', 'volatility', 'amount_change_rate', 'amount_mean'],
        aggfunc='last'
    )
    df_final.columns = [f"{col[1]}_{col[0]}" for col in df_final.columns]
    return df_final.reset_index()


h5_filename = '../../data/index_data.h5'
index_data = generate_index_indicators(h5_filename)
index_data = index_data.dropna()

# In[5]:

import numpy as np
import talib


def get_rolling_factor(df):
    """Add per-stock rolling / time-series factors to the daily panel.

    Every windowed computation is done within a single ``ts_code`` and is
    assigned back by row label, so values never leak across stocks and never
    land on the wrong row.

    Changes relative to the previous implementation:
      * results of ``groupby(...).apply`` / ``pct_change`` are no longer passed
        through ``reset_index(level=0, drop=True)``; on a flat index that call
        replaces the row labels with 0..n-1 and misaligned the assignment;
      * ``vol_std_5``, ``cov``, ``delta_cov``, ``_rank_stddev`` and the AR/BR
        inputs are computed per stock (``_rank_stddev`` is ranked per trade
        date) instead of over the concatenated panel;
      * the breakout / spike flags compare today with the window *preceding*
        today; compared with a window containing today they were always False;
      * the input ``pre_close`` column is no longer overwritten and dropped.

    Args:
        df: Daily panel containing the price, volume, turnover, limit and chip
            distribution columns referenced below.

    Returns:
        Tuple ``(df, new_columns)``: the panel sorted by (ts_code, trade_date)
        with the factor columns added, and the list of added column names.
    """
    old_columns = df.columns.tolist()
    df = df.sort_values(by=['ts_code', 'trade_date'])

    def by_stock(columns):
        # Built on demand so that columns created further down are visible.
        return df.groupby('ts_code', group_keys=False)[columns]

    def realign(result):
        # groupby-rolling results are indexed by (ts_code, row label); dropping
        # the group level restores df's own labels for aligned assignment.
        return result.droplevel(0) if isinstance(result.index, pd.MultiIndex) else result

    def roll(column, window, **kwargs):
        return by_stock(column).rolling(window=window, **kwargs)

    def prior_window(column, window, stat):
        # Rolling `stat` over the `window` rows that precede the current row.
        return by_stock(column).transform(lambda s: getattr(s.shift(1).rolling(window), stat)())

    def talib_per_stock(columns, indicator, **kwargs):
        # Run a TA-Lib indicator on each stock's arrays, keeping df's row labels.
        return by_stock(columns).apply(
            lambda g: pd.Series(indicator(*(g[c].values for c in columns), **kwargs), index=g.index)
        )

    df['return_skew'] = realign(roll('pct_chg', 5).skew())
    df['return_kurtosis'] = realign(roll('pct_chg', 5).kurt())

    # Factor 1: short-term volume change (2-day mean vs 10-day mean).
    df['volume_change_rate'] = realign(roll('vol', 2).mean()) / realign(roll('vol', 10).mean()) - 1

    # Factor 2: volume breaks above the maximum of the previous 5 days.
    df['cat_volume_breakout'] = (df['vol'] > prior_window('vol', 5, 'max'))

    # Factor 3: z-score of turnover against its own 3-day window.
    mean_turnover = realign(roll('turnover_rate', 3).mean())
    std_turnover = realign(roll('turnover_rate', 3).std())
    df['turnover_deviation'] = (df['turnover_rate'] - mean_turnover) / std_turnover

    # Factor 4: turnover exceeds mean + 2 std of the previous 3 days.
    df['cat_turnover_spike'] = (
        df['turnover_rate']
        > prior_window('turnover_rate', 3, 'mean') + 2 * prior_window('turnover_rate', 3, 'std')
    )

    # Factor 5: 3-day mean of the volume ratio.
    df['avg_volume_ratio'] = realign(roll('volume_ratio', 3).mean())

    # Factor 6: volume ratio breaks above the maximum of the previous 5 days.
    df['cat_volume_ratio_breakout'] = (df['volume_ratio'] > prior_window('volume_ratio', 5, 'max'))

    df['vol_spike'] = realign(roll('vol', 20).mean())
    df['vol_std_5'] = by_stock('vol').transform(lambda s: s.pct_change().rolling(5).std())

    # ATR
    df['atr_14'] = talib_per_stock(['high', 'low', 'close'], talib.ATR, timeperiod=14)
    df['atr_6'] = talib_per_stock(['high', 'low', 'close'], talib.ATR, timeperiod=6)

    # OBV and its 6-day moving average
    df['obv'] = talib_per_stock(['close', 'vol'], talib.OBV)
    df['maobv_6'] = talib_per_stock(['obv'], talib.SMA, timeperiod=6)

    df['rsi_3'] = talib_per_stock(['close'], talib.RSI, timeperiod=3)

    # Trailing returns
    df['return_5'] = df['close'] / by_stock('close').shift(5) - 1
    df['return_20'] = df['close'] / by_stock('close').shift(20) - 1

    # Standard deviation of daily returns
    df['std_return_5'] = by_stock('close').transform(lambda s: s.pct_change().rolling(window=5).std())
    df['std_return_90'] = by_stock('close').transform(lambda s: s.pct_change().rolling(window=90).std())
    df['std_return_90_2'] = by_stock('close').transform(
        lambda s: s.shift(10).pct_change().rolling(window=90).std())

    # EMAs and the slope angle of each EMA (scaled arctan of its daily change).
    ema_specs = [(5, 50, 'act_factor1'), (13, 40, 'act_factor2'), (20, 21, 'act_factor3'), (60, 10, 'act_factor4')]
    for period, _, _ in ema_specs:
        df[f'_ema_{period}'] = talib_per_stock(['close'], talib.EMA, timeperiod=period)
    for period, divisor, factor_name in ema_specs:
        ema = df[f'_ema_{period}']
        df[factor_name] = np.arctan((ema / by_stock(f'_ema_{period}').shift(1) - 1) * 100) * 57.3 / divisor

    # Cross-sectional (per trade date) percentile ranks.
    for factor_name in ('act_factor1', 'act_factor2', 'act_factor3'):
        df[f'rank_{factor_name}'] = df.groupby('trade_date', group_keys=False)[factor_name].rank(
            ascending=False, pct=True)

    df['log(circ_mv)'] = np.log(df['circ_mv'])

    # alpha_22 variant: -delta(cov(high, vol, 5), 5) * rank(stddev(close, 20)).
    window_high_volume = 5
    window_close_stddev = 20
    period_delta = 5
    df['cov'] = by_stock(['high', 'vol']).apply(
        lambda g: g['high'].rolling(window_high_volume).cov(g['vol']))
    df['delta_cov'] = by_stock('cov').diff(period_delta)
    df['_rank_stddev'] = realign(roll('close', window_close_stddev).std())
    df['_rank_stddev'] = df.groupby('trade_date', group_keys=False)['_rank_stddev'].rank(pct=True)
    df['alpha_22_improved'] = -1 * df['delta_cov'] * df['_rank_stddev']

    df['alpha_003'] = np.where(df['high'] != df['low'],
                               (df['close'] - df['open']) / (df['high'] - df['low']), 0)

    df['alpha_007'] = by_stock(['close', 'vol']).apply(lambda g: g['close'].rolling(5).corr(g['vol']))
    df['alpha_007'] = df.groupby('trade_date', group_keys=False)['alpha_007'].rank(ascending=True, pct=True)

    df['alpha_013'] = realign(roll('close', 5).sum()) - realign(roll('close', 20).sum())
    df['alpha_013'] = df.groupby('trade_date', group_keys=False)['alpha_013'].rank(ascending=True, pct=True)

    # Limit-up / limit-down flags and their 10-day counts.
    df['cat_up_limit'] = (df['close'] == df['up_limit'])
    df['cat_down_limit'] = (df['close'] == df['down_limit'])
    df['up_limit_count_10d'] = realign(roll('cat_up_limit', 10, min_periods=1).sum())
    df['down_limit_count_10d'] = realign(roll('cat_down_limit', 10, min_periods=1).sum())

    def streak_length(flags):
        # Length of the current run of True values (0 where the flag is False).
        run_id = (flags != flags.shift()).cumsum()
        return flags * (flags.groupby(run_id).cumcount() + 1)

    df['consecutive_up_limit'] = by_stock('cat_up_limit').apply(streak_length)

    df['vol_break'] = np.where((df['close'] > df['cost_85pct']) & (df['volume_ratio'] > 2), 1, 0)

    df['weight_roc5'] = by_stock('weight_avg').transform(lambda s: s.pct_change(5))

    def rolling_corr(group):
        roc_close = group['close'].pct_change()
        roc_weight = group['weight_avg'].pct_change()
        return roc_close.rolling(10).corr(roc_weight)

    df['price_cost_divergence'] = by_stock(['close', 'weight_avg']).apply(rolling_corr)

    df['smallcap_concentration'] = (1 / df['log(circ_mv)']) * (df['cost_85pct'] - df['cost_15pct'])

    # Chip stability: 20-day coefficient of variation of the average cost.
    df['cost_stability'] = realign(roll('weight_avg', 20).std()) / realign(roll('weight_avg', 20).mean())

    # Number of the last 5 days on which close exceeded the 95% cost level.
    df['high_cost_break_days'] = by_stock(['close', 'cost_95pct']).apply(
        lambda g: g['close'].gt(g['cost_95pct']).rolling(5).sum())

    # Chip / liquidity risk: cost range relative to the 10-day mean volume.
    df['liquidity_risk'] = (df['cost_95pct'] - df['cost_5pct']) * (1 / realign(roll('vol', 10).mean()))

    # Size-adjusted turnover volatility.
    df['turnover_std'] = realign(roll('turnover_rate', 20).std())
    df['mv_volatility'] = df['turnover_std'] / df['log(circ_mv)']

    # Size-adjusted volume growth.
    df['volume_growth'] = by_stock('vol').pct_change(periods=20)
    df['mv_growth'] = df['volume_growth'] / df['log(circ_mv)']

    # ARBR over 3 days, computed per stock from local series so no input
    # column (notably `pre_close`) is overwritten.
    stock_key = df['ts_code']

    def sum3(values):
        return realign(values.groupby(stock_key).rolling(3).sum())

    ar = sum3(df['high'].div(df['open'])) / sum3(df['open'].div(df['low'])) * 100
    prev_close = by_stock('close').shift(1)
    br_up = (df['high'] - prev_close).clip(lower=0)
    br_down = (prev_close - df['low']).clip(lower=0)
    br = sum3(br_up) / sum3(br_down) * 100
    df['arbr'] = ar - br

    new_columns = [col for col in df.columns if col not in old_columns]
    return df, new_columns


def get_simple_factor(df):
    """Add row-wise factors derived from raw columns and rolling factors.

    Must run after ``get_rolling_factor``: it reads columns such as
    ``volume_change_rate``, ``turnover_deviation``, ``vol_spike``, ``obv``,
    ``act_factor1..4`` and ``atr_14``. Columns whose name starts with ``_`` are
    removed at the end.

    Returns:
        Tuple ``(df, new_columns)``: the panel sorted by (ts_code, trade_date)
        and the names of the columns that were not present on entry.
    """
    old_columns = df.columns.tolist()
    df = df.sort_values(by=['ts_code', 'trade_date'])

    alpha = 0.5
    df['momentum_factor'] = df['volume_change_rate'] + alpha * df['turnover_deviation']
    df['resonance_factor'] = df['volume_ratio'] * df['pct_chg']
    df['log_close'] = np.log(df['close'])

    df['cat_vol_spike'] = df['vol'] > 2 * df['vol_spike']

    # Upper / lower candle shadows relative to the close.
    body = df[['close', 'open']]
    df['up'] = (df['high'] - body.max(axis=1)) / df['close']
    df['down'] = (body.min(axis=1) - df['low']) / df['close']

    df['obv-maobv_6'] = df['obv'] - df['maobv_6']

    # Short- vs long-horizon volatility.
    df['std_return_5 / std_return_90'] = df['std_return_5'] / df['std_return_90']
    df['std_return_90 - std_return_90_2'] = df['std_return_90'] - df['std_return_90_2']

    # Ordering of the EMA slope angles.
    df['cat_af2'] = df['act_factor2'] > df['act_factor1']
    df['cat_af3'] = df['act_factor3'] > df['act_factor2']
    df['cat_af4'] = df['act_factor4'] > df['act_factor3']

    df['act_factor5'] = df['act_factor1'] + df['act_factor2'] + df['act_factor3'] + df['act_factor4']
    df['act_factor6'] = (df['act_factor1'] - df['act_factor2']) / np.sqrt(
        df['act_factor1'] ** 2 + df['act_factor2'] ** 2)

    # Money-flow shares relative to the net flow volume.
    net_flow = df['net_mf_vol']
    df['active_buy_volume_large'] = df['buy_lg_vol'] / net_flow
    df['active_buy_volume_big'] = df['buy_elg_vol'] / net_flow
    df['active_buy_volume_small'] = df['buy_sm_vol'] / net_flow
    df['buy_lg_vol_minus_sell_lg_vol'] = (df['buy_lg_vol'] - df['sell_lg_vol']) / net_flow
    df['buy_elg_vol_minus_sell_elg_vol'] = (df['buy_elg_vol'] - df['sell_elg_vol']) / net_flow

    df['log(circ_mv)'] = np.log(df['circ_mv'])

    # Chip-distribution factors.
    history_range = df['his_high'] - df['his_low']
    lower_half = df['cost_50pct'] - df['cost_5pct']
    df['ctrl_strength'] = (df['cost_85pct'] - df['cost_15pct']) / history_range
    df['low_cost_dev'] = (df['close'] - df['cost_5pct']) / lower_half
    df['asymmetry'] = (df['cost_95pct'] - df['cost_50pct']) / lower_half
    df['lock_factor'] = df['turnover_rate'] * (
        1 - (df['cost_95pct'] - df['cost_5pct']) / history_range)
    df['cat_vol_break'] = (df['close'] > df['cost_85pct']) & (df['volume_ratio'] > 2)
    df['cost_atr_adj'] = (df['cost_95pct'] - df['cost_5pct']) / df['atr_14']

    # Small-cap chip concentration.
    df['smallcap_concentration'] = (1 / df['log(circ_mv)']) * (df['cost_85pct'] - df['cost_15pct'])

    df['cat_golden_resonance'] = ((df['close'] > df['weight_avg']) &
                                  (df['volume_ratio'] > 1.5) &
                                  (df['winner_rate'] > 0.7))

    # Size-adjusted liquidity factors.
    log_mv = df['log(circ_mv)']
    df['mv_turnover_ratio'] = df['turnover_rate'] / log_mv
    df['mv_adjusted_volume'] = df['vol'] / log_mv
    df['mv_weighted_turnover'] = df['turnover_rate'] * (1 / log_mv)
    df['nonlinear_mv_volume'] = df['vol'] / log_mv
    df['mv_volume_ratio'] = df['volume_ratio'] / log_mv
    df['mv_momentum'] = df['turnover_rate'] * df['volume_ratio'] / log_mv

    # Underscore-prefixed columns are intermediates, not features.
    drop_columns = [col for col in df.columns if col.startswith('_')]
    df.drop(columns=drop_columns, inplace=True, errors='ignore')

    new_columns = [col for col in df.columns if col not in old_columns]
    return df, new_columns


# In[6]:

from utils.factor import get_act_factor


def read_industry_data(h5_filename):
    """Load industry index data and derive industry-level factors.

    Reads the ``sw_daily`` key, computes OBV, 5- and 20-day returns, the
    factors added by the project helper ``get_act_factor`` and per-date
    percentile ranks of both returns. Raw price/valuation columns are dropped,
    remaining factor columns get an ``industry_`` prefix and ``ts_code`` is
    renamed to ``cat_l2_code``.

    Args:
        h5_filename: Path of the HDF5 file holding the ``sw_daily`` key.

    Returns:
        Frame sorted by (trade_date, ts_code) keyed by ``cat_l2_code`` and
        ``trade_date``.
    """
    industry_data = pd.read_hdf(h5_filename, key='sw_daily', columns=[
        'ts_code', 'trade_date', 'open', 'close', 'high', 'low', 'pe', 'pb', 'vol'
    ])
    industry_data = industry_data.sort_values(by=['ts_code', 'trade_date'])
    industry_data['trade_date'] = pd.to_datetime(industry_data['trade_date'], format='%Y%m%d')

    by_industry = industry_data.groupby('ts_code', group_keys=False)
    industry_data['obv'] = by_industry[['close', 'vol']].apply(
        lambda g: pd.Series(talib.OBV(g['close'].values, g['vol'].values), index=g.index)
    )
    industry_data['return_5'] = industry_data['close'] / by_industry['close'].shift(5) - 1
    industry_data['return_20'] = industry_data['close'] / by_industry['close'].shift(20) - 1

    industry_data = get_act_factor(industry_data, cat=False)
    industry_data = industry_data.sort_values(by=['trade_date', 'ts_code'])

    # Percentile rank of each industry's return within its trade date.
    by_date = industry_data.groupby('trade_date')
    industry_data['return_5_percentile'] = by_date['return_5'].rank(pct=True)
    industry_data['return_20_percentile'] = by_date['return_20'].rank(pct=True)

    industry_data = industry_data.drop(columns=['open', 'close', 'high', 'low', 'pe', 'pb', 'vol'])

    key_columns = ['ts_code', 'trade_date']
    industry_data = industry_data.rename(
        columns={col: f'industry_{col}' for col in industry_data.columns if col not in key_columns})
    industry_data = industry_data.rename(columns={'ts_code': 'cat_l2_code'})
    return industry_data


industry_df = read_industry_data('../../data/sw_daily.h5')

# In[7]:

# Columns present before feature generation. They are excluded from the model
# features later, except for the few raw columns listed here.
origin_columns = df.columns.tolist()
origin_columns = [col for col in origin_columns if
                  col not in ['turnover_rate', 'pe_ttm', 'volume_ratio', 'vol', 'pct_chg', 'l2_code', 'winner_rate']]
origin_columns = [col for col in origin_columns if col not in index_data.columns]
origin_columns = [col for col in origin_columns if 'cyq' not in col]
print(origin_columns)


# In[8]:

def filter_data(df):
    """Restrict the panel to the tradable universe and the modelling period.

    Drops rows flagged ``is_st``, codes ending in ``BJ``, codes starting with
    ``30``, ``68`` or ``8``, and rows dated before 2018-01-01. The ``in_date``
    helper column is removed and the index is reset.
    """
    code = df['ts_code'].str
    excluded = (
        df['is_st']
        | code.endswith('BJ')
        | code.startswith('30')
        | code.startswith('68')
        | code.startswith('8')
    )
    in_period = df['trade_date'] >= '20180101'
    df = df[~excluded & in_period]
    df = df.drop(columns=['in_date'])
    return df.reset_index(drop=True)


df = filter_data(df)
df, _ = get_rolling_factor(df)
df, _ = get_simple_factor(df)
# df = df.merge(industry_df, on=['l2_code',
#                  'trade_date'], how='left')
df = df.rename(columns={'l2_code': 'cat_l2_code'})
# df = df.merge(index_data, on='trade_date', how='left')
# DataFrame.info() prints its report itself and returns None, so it is not wrapped in print().
df.info()

# In[9]:

from scipy.stats import ks_2samp, wasserstein_distance
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


def remove_shifted_features(train_data, test_data, feature_columns, ks_threshold=0.05, wasserstein_threshold=0.1,
                            importance_threshold=0.05):
    """Drop numeric features whose distribution differs between train and test.

    A feature is dropped when the two-sample KS test rejects equality
    (``p < ks_threshold``) and the Wasserstein distance exceeds
    ``wasserstein_threshold``. Missing and infinite values are removed before
    testing; a feature with no usable values on either side is kept.

    Args:
        train_data: Training frame.
        test_data: Test frame with the same feature columns.
        feature_columns: Candidate feature names.
        ks_threshold: p-value below which the KS test counts as a shift.
        wasserstein_threshold: Minimum distance that counts as a shift.
        importance_threshold: Unused; kept for call compatibility.

    Returns:
        Tuple ``(filtered_features, dropped_features)``.
    """
    wanted = set(feature_columns)
    numeric_columns = train_data.select_dtypes(include=['float64', 'int64']).columns
    candidates = [col for col in numeric_columns if col in wanted]

    def usable(values):
        return values.replace([np.inf, -np.inf], np.nan).dropna()

    dropped_features = []
    for feature in candidates:
        train_values = usable(train_data[feature])
        test_values = usable(test_data[feature])
        if train_values.empty or test_values.empty:
            continue  # nothing to compare

        _, p_value = ks_2samp(train_values, test_values)
        distance = wasserstein_distance(train_values, test_values)
        if p_value < ks_threshold and distance > wasserstein_threshold:
            dropped_features.append(feature)

    print(f"检测到 {len(dropped_features)} 个可能漂移的特征: {dropped_features}")

    filtered_features = [f for f in feature_columns if f not in dropped_features]
    return filtered_features, dropped_features


# In[10]:

def create_deviation_within_dates(df, feature_columns):
    """Add each feature's deviation from its (trade_date, industry) mean.

    Features whose name contains ``cat``, ``index``, ``industry``, ``limit`` or
    ``cyq`` are skipped. Each remaining feature ``f`` gets a
    ``deviation_mean_f`` column holding ``f`` minus the mean of ``f`` over rows
    sharing the same ``trade_date`` and ``cat_l2_code``.

    Returns:
        Tuple ``(df, feature_columns)``: the frame with the deviation columns
        appended and the input feature list extended with the new names.
    """
    groupby_col = 'cat_l2_code'
    excluded_tokens = ('cat', 'index', 'industry', 'limit', 'cyq')

    ret_feature_columns = feature_columns[:]
    num_features = [col for col in feature_columns
                    if col != 'trade_date' and not any(token in col for token in excluded_tokens)]

    by_date_industry = df.groupby(['trade_date', groupby_col])
    new_columns = {}
    for feature in num_features:
        deviation_col_name = f'deviation_mean_{feature}'
        new_columns[deviation_col_name] = df[feature] - by_date_industry[feature].transform('mean')
        ret_feature_columns.append(deviation_col_name)

    # A single concat avoids fragmenting the frame with repeated column inserts.
    df = pd.concat([df, pd.DataFrame(new_columns)], axis=1)
    return df, ret_feature_columns


# In[11]:

import pandas as pd


def remove_outliers_label_percentile(label: pd.Series, lower_percentile: float = 0.01,
                                     upper_percentile: float = 0.99, log=True):
    """Keep only label values inside the given percentile band (inclusive).

    Args:
        label: Label values.
        lower_percentile: Lower quantile in [0, 1).
        upper_percentile: Upper quantile in (0, 1], greater than the lower one.
        log: Whether to print how many values were removed.

    Returns:
        The retained values with their original index; NaN labels are removed.

    Raises:
        ValueError: If the percentiles are out of order or out of range.
    """
    if not (0 <= lower_percentile < upper_percentile <= 1):
        raise ValueError("Percentile values must satisfy 0 <= lower_percentile < upper_percentile <= 1.")

    lower_bound = label.quantile(lower_percentile)
    upper_bound = label.quantile(upper_percentile)
    filtered_label = label[label.between(lower_bound, upper_bound)]

    if log:
        print(f"Removed {len(label) - len(filtered_label)} outliers.")
    return filtered_label


def calculate_risk_adjusted_target(df, days=5):
    """Return a forward-return target scaled by its rolling volatility.

    ``future_return`` is the return from the next day's open to the close
    ``days`` rows ahead; ``future_volatility`` is the per-stock rolling std of
    that series over ``days`` rows.

    Returns:
        Series carrying the input row labels, with infinities replaced by NaN.
    """
    df = df.sort_values(by=['ts_code', 'trade_date'])
    by_stock = df.groupby('ts_code')
    df['future_close'] = by_stock['close'].shift(-days)
    df['future_open'] = by_stock['open'].shift(-1)
    df['future_return'] = (df['future_close'] - df['future_open']) / df['future_open']
    df['future_volatility'] = (
        df.groupby('ts_code')['future_return'].rolling(days, min_periods=1).std().reset_index(level=0, drop=True)
    )

    # NOTE(review): the variable is named "sharpe_ratio" and infinities are scrubbed
    # afterwards, which suggests a division by volatility was intended. The
    # multiplication is kept because labels built on it may already be in use - confirm.
    sharpe_ratio = df['future_return'] * df['future_volatility']
    return sharpe_ratio.replace([np.inf, -np.inf], np.nan)


def calculate_score(df, days=5, lambda_param=1.0):
    """Score each row as forward return minus a forward max-drawdown penalty.

    Args:
        df: Panel with ``ts_code``, ``trade_date``, ``close`` and a precomputed
            ``future_return`` column.
        days: Length of the forward window used for the drawdown.
        lambda_param: Weight of the drawdown penalty.

    Returns:
        Series of scores indexed by the input row labels.
    """

    def calculate_max_drawdown(prices):
        # Largest relative drop from the running peak inside one window
        # (`prices` is a NumPy array because of raw=True below; the window
        # contains no NaN since min_periods equals the window length).
        running_peak = np.maximum.accumulate(prices)
        return np.max((running_peak - prices) / running_peak)

    def compute_stock_score(stock_df):
        stock_df = stock_df.sort_values(by=['trade_date'])
        # shift(-days) turns the trailing window ending `days` rows ahead
        # into a forward-looking drawdown for the current row.
        max_drawdown = (
            stock_df['close'].rolling(days).apply(calculate_max_drawdown, raw=True).shift(-days)
        )
        return stock_df['future_return'] - lambda_param * max_drawdown

    # group_keys=False keeps the original row labels on every pandas version,
    # so no index level has to be stripped afterwards.
    return df.groupby('ts_code', group_keys=False).apply(compute_stock_score)


def remove_highly_correlated_features(df, feature_columns, threshold=0.9):
    """Drop numeric features that correlate above ``threshold`` with an earlier one.

    Features whose name contains ``act`` or ``af`` are always kept.

    Returns:
        The retained feature names in their original order.

    Raises:
        ValueError: If none of the features is numeric.
    """
    numeric_features = df[feature_columns].select_dtypes(include=[np.number]).columns.tolist()
    if not numeric_features:
        raise ValueError("No numeric features found in the provided data.")

    # NOTE(review): the substring 'act' also matches every '*_factor' name,
    # so those are protected as well - confirm this is intended.
    always_keep = [col for col in feature_columns if 'act' in col or 'af' in col]
    features_to_check = [col for col in numeric_features if col not in always_keep]

    corr_matrix = df[features_to_check].corr().abs()
    # Only the strict upper triangle, so each pair is inspected once and the
    # later feature of a correlated pair is the one dropped.
    upper_mask = np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
    upper = corr_matrix.where(upper_mask)
    to_drop = {column for column in upper.columns if (upper[column] > threshold).any()}

    return [col for col in feature_columns if col not in to_drop or col in always_keep]


import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import StandardScaler


def cross_sectional_standardization(df, features):
    """Standardize ``features`` within each trade date (zero mean, unit variance).

    Args:
        df: Panel with a ``trade_date`` column.
        features: Names of the numeric columns to standardize.

    Returns:
        A copy of ``df`` sorted by ``trade_date`` with the features replaced by
        their per-date standardized values.
    """
    # sort_values already returns a new frame, so no further copy is needed.
    df_standardized = df.sort_values(by='trade_date')
    if len(features) == 0:
        return df_standardized

    for date in df_standardized['trade_date'].unique():
        on_date = df_standardized['trade_date'] == date
        df_standardized.loc[on_date, features] = StandardScaler().fit_transform(
            df_standardized.loc[on_date, features])
    return df_standardized


import numpy as np
import pandas as pd
import statsmodels.api as sm
from concurrent.futures import ProcessPoolExecutor


def neutralize_manual(df, features, industry_col, mkt_cap_col):
    """Replace each feature by its residual from a per-industry regression on log market cap.

    Within every ``industry_col`` group the feature is regressed on
    ``log(mkt_cap_col)`` with a closed-form simple regression, and the residual
    is written back to the same row. Groups with fewer than two usable points,
    or with constant market cap, keep their original values. Rows with a
    missing industry are left unchanged.

    Residuals are written by row position; previously they were appended in
    group order and assigned to rows in frame order. Slope and variance now use
    the same normalisation.

    Args:
        df: Frame to neutralize; modified in place.
        features: Feature column names.
        industry_col: Grouping column.
        mkt_cap_col: Market capitalisation column (must be positive).

    Returns:
        The same ``df`` object.
    """
    log_cap = np.log(df[mkt_cap_col].to_numpy(dtype=float))
    # Positional row numbers of each industry group.
    group_positions = list(df.groupby(industry_col).indices.values())

    for col in features:
        values = df[col].to_numpy(dtype=float)
        residuals = values.copy()

        for positions in group_positions:
            if len(positions) <= 1:
                continue  # too few samples: keep the raw value

            x = log_cap[positions]
            y = values[positions]
            usable = np.isfinite(x) & np.isfinite(y)
            if usable.sum() < 2:
                continue

            x_mean = x[usable].mean()
            y_mean = y[usable].mean()
            x_var = np.mean((x[usable] - x_mean) ** 2)
            if x_var == 0:
                continue  # slope undefined for constant market cap

            beta = np.mean((x[usable] - x_mean) * (y[usable] - y_mean)) / x_var
            alpha = y_mean - beta * x_mean
            residuals[positions] = y - (alpha + beta * x)

        df[col] = residuals
    return df


import gc

gc.collect()


def mad_filter(df, features, n=3):
    """Clip each feature to ``median +/- n * MAD`` computed over the whole frame.

    The MAD is computed with NaN-skipping medians; a NaN MAD would otherwise
    turn both bounds into NaN and silently disable the clipping.

    Returns:
        A clipped copy of ``df``.
    """
    df = df.copy()
    for col in features:
        median = df[col].median()
        mad = (df[col] - median).abs().median()
        df[col] = df[col].clip(lower=median - n * mad, upper=median + n * mad)
    return df


def percentile_filter(df, features, lower_percentile=0.01, upper_percentile=0.99):
    """Clip each feature to its per-trade-date percentile band.

    Returns:
        A clipped copy of ``df``.
    """
    df = df.copy()
    for col in features:
        by_date = df.groupby('trade_date')[col]
        lower_bound = by_date.transform(lambda x: x.quantile(lower_percentile))
        upper_bound = by_date.transform(lambda x: x.quantile(upper_percentile))
        df[col] = df[col].clip(lower=lower_bound, upper=upper_bound)
    return df


from scipy.stats import iqr


def iqr_filter(df, features):
    """Robust-scale each feature per trade date: ``(x - median) / IQR``.

    Dates whose IQR is zero or cannot be computed keep their original values.
    The IQR ignores NaN, so one missing value no longer blanks a whole date.

    Returns:
        A scaled copy of ``df``.
    """

    def robust_scale(values):
        spread = iqr(values, nan_policy='omit')
        if not np.isfinite(spread) or spread == 0:
            return values
        return (values - values.median()) / spread

    df = df.copy()
    for col in features:
        df[col] = df.groupby('trade_date')[col].transform(robust_scale)
    return df


def quantile_filter(df, features, lower_quantile=0.01, upper_quantile=0.99, window=60, group_col='trade_date'):
    """Clip each feature to rolling quantile bounds computed within ``group_col`` groups.

    The window runs over the rows of each group in frame order and is capped at
    the group size. Rows whose window is incomplete get no bound and are left
    unclipped.

    NOTE(review): with the default ``group_col='trade_date'`` the window runs
    across the stocks of one date, so the bounds depend on row order. Pass
    ``group_col='ts_code'`` on a frame sorted by date for a per-stock rolling
    window over time.

    Args:
        df: Input panel.
        features: Columns to clip.
        lower_quantile: Lower rolling quantile.
        upper_quantile: Upper rolling quantile.
        window: Maximum rolling window length.
        group_col: Column defining the groups the window is confined to.

    Returns:
        A clipped copy of ``df``.
    """
    df = df.copy()

    def rolling_bound(col, quantile):
        return df.groupby(group_col)[col].transform(
            lambda x: x.rolling(window=min(len(x), window)).quantile(quantile))

    for col in features:
        rolling_lower = rolling_bound(col, lower_quantile)
        rolling_upper = rolling_bound(col, upper_quantile)
        df[col] = df[col].clip(lower=rolling_lower, upper=rolling_upper)
    return df


# In[12]:

# print(test_data.head()[['act_factor1', 'act_factor2', 'ts_code', 'trade_date']])

# In[13]:

from sklearn.preprocessing import StandardScaler
import lightgbm as lgb
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA


def train_light_model(train_data_df, params, feature_columns, callbacks, evals,
                      print_feature_importance=True, num_boost_round=100,
                      validation_days=180, use_pca=False, split_date=None):
    """Train a LightGBM ranking model with a time-ordered validation split.

    Rows are sorted by ``trade_date``, rows without a label are dropped and the
    data is split at ``split_date`` (by default the ``validation_days``-th
    distinct date from the end). Each trade date forms one ranking group.
    Training rows are weighted by date with a squared linear ramp from 1 to
    100, so recent dates count more.

    The weights are passed to ``lgb.Dataset(weight=...)``. They used to be
    stored in ``params['weight']``, which LightGBM reads as an alias of
    ``weight_column``, so they were never applied and the caller's dict was
    modified.

    Args:
        train_data_df: Frame with ``trade_date``, ``label`` and the features.
        params: LightGBM parameters (not modified).
        feature_columns: Feature names; names containing ``cat`` are treated as
            categorical.
        callbacks: Callbacks forwarded to ``lgb.train``.
        evals: Evaluation record plotted when ``print_feature_importance`` is set.
        print_feature_importance: Whether to plot the metric and importances.
        num_boost_round: Number of boosting rounds.
        validation_days: Number of trailing dates used for validation.
        use_pca: Replace the non-categorical features by PCA components
            explaining 95% of the variance.
        split_date: Explicit first validation date; overrides ``validation_days``.

    Returns:
        Tuple ``(model, scaler, pca)``. ``scaler`` is an unfitted
        ``StandardScaler`` kept for interface compatibility; ``pca`` is
        ``None`` unless ``use_pca`` is set.
    """
    train_data_df = train_data_df.sort_values(by='trade_date')
    train_data_df = train_data_df.dropna(subset=['label'])
    print('原始训练集大小: ', len(train_data_df))

    # Chronological split: everything from split_date onwards is validation.
    if split_date is None:
        all_dates = train_data_df['trade_date'].unique()
        split_date = all_dates[-validation_days]
    train_data_split = train_data_df[train_data_df['trade_date'] < split_date]
    val_data_split = train_data_df[train_data_df['trade_date'] >= split_date]
    print(f"划分后的训练集大小: {len(train_data_split)}, 验证集大小: {len(val_data_split)}")

    X_train = train_data_split[feature_columns]
    y_train = train_data_split['label']
    X_val = val_data_split[feature_columns]
    y_val = val_data_split['label']

    scaler = StandardScaler()

    # Learning-to-rank needs the number of rows of each query (trade date);
    # the frames are sorted by date, so the groups are contiguous.
    train_groups = train_data_split.groupby('trade_date').size().tolist()
    val_groups = val_data_split.groupby('trade_date').size().tolist()

    categorical_feature = [col for col in feature_columns if 'cat' in col]

    pca = None
    if use_pca:
        pca = PCA(n_components=0.95)
        numeric_features = [col for col in feature_columns if col not in categorical_feature]
        train_components = pca.fit_transform(X_train[numeric_features])
        X_train = pd.concat([pd.DataFrame(train_components, index=X_train.index),
                             X_train[categorical_feature]], axis=1)
        val_components = pca.transform(X_val[numeric_features])
        X_val = pd.concat([pd.DataFrame(val_components, index=X_val.index),
                           X_val[categorical_feature]], axis=1)

    # Time-based sample weights. The mapping is keyed by the Series' own values
    # so the lookup matches for any date dtype.
    train_dates = train_data_split['trade_date'].drop_duplicates().sort_values()
    date_weights = dict(zip(train_dates, np.linspace(1, 10, len(train_dates)) ** 2))
    sample_weight = train_data_split['trade_date'].map(date_weights).to_numpy()

    train_dataset = lgb.Dataset(
        X_train, label=y_train, group=train_groups, weight=sample_weight,
        categorical_feature=categorical_feature
    )
    val_dataset = lgb.Dataset(
        X_val, label=y_val, group=val_groups,
        categorical_feature=categorical_feature
    )

    model = lgb.train(
        params, train_dataset, num_boost_round=num_boost_round,
        valid_sets=[train_dataset, val_dataset], valid_names=['train', 'valid'],
        callbacks=callbacks
    )

    if print_feature_importance:
        lgb.plot_metric(evals)
        lgb.plot_importance(model, importance_type='split', max_num_features=20)
        plt.show()

    return model, scaler, pca


# In[14]:

# Forward-looking label: return from the next open to the close `days` rows
# ahead, blended with the rolling std of that return, then bucketed into up to
# 50 quantile bins per trade date.
days = 2
future_close = df.groupby('ts_code')['close'].shift(-days)
next_open = df.groupby('ts_code')['open'].shift(-1)
df['future_return'] = (future_close - next_open) / next_open
df['future_volatility'] = (
    df.groupby('ts_code')['future_return']
    .transform(lambda x: x.rolling(days).std())
)
df['future_score'] = (
    0.7 * df['future_return'] +
    0.3 * df['future_volatility']
)

return_in_range = df['future_return'].between(df['future_return'].quantile(0.01),
                                              df['future_return'].quantile(0.99))
volatility_in_range = df['future_volatility'].between(df['future_volatility'].quantile(0.01),
                                                      df['future_volatility'].quantile(0.99))
# NOTE(review): the two conditions are OR-ed, so a row is excluded only when it
# is an outlier on both measures; an outlier filter would normally use `&` - confirm.
filter_index = volatility_in_range | return_in_range

df['label'] = df.groupby('trade_date', group_keys=False)['future_score'].transform(
    lambda x: pd.qcut(x, q=50, labels=False, duplicates='drop')
)
# industry_df = industry_df.sort_values(by=['trade_date'])
# df = df.merge(industry_df, on=['cat_l2_code', 'trade_date'], how='left')
df = df.sort_values(['trade_date'])
df = df.replace([np.inf, -np.inf], np.nan)

# Model features: everything that is not a key, a target-derived column, a raw
# input column or an underscore-prefixed intermediate.
excluded_names = {'trade_date', 'ts_code', 'label'}
excluded_tokens = ('future', 'label', 'score', 'gen', 'cat_l2_code')
feature_columns = [
    col for col in df.columns
    if col not in excluded_names
    and not any(token in col for token in excluded_tokens)
    and col not in origin_columns
    and not col.startswith('_')
]

numeric_columns = df.select_dtypes(include=['float64', 'int64']).columns
numeric_columns = [col for col in numeric_columns if col in feature_columns]
qdf = quantile_filter(df, numeric_columns, window=20)

#
# In[15]:

# Initial per-label gains (squared label index). `rolling_train_predict`
# overwrites `light_params['label_gain']` for every training window.
label_gain = [gain * gain for gain in range(len(df['label'].unique()))]

light_params = {
    'label_gain': label_gain,
    'objective': 'lambdarank',
    'metric': 'lambdarank',
    'learning_rate': 0.05,
    'num_leaves': 1024,
    'min_data_in_leaf': 128,
    'max_depth': 16,
    'max_bin': 1024,
    'feature_fraction': 0.7,
    'bagging_fraction': 1,
    'bagging_freq': 5,
    'lambda_l1': 0.1,
    'lambda_l2': 0.1,
    'verbosity': -1,
    'extra_trees': True,
    'max_position': 5,
    'ndcg_at': 1,
    'quant_train_renew_leaf': True,
    'lambdarank_truncation_level': 5,
    'lambdarank_position_bias_regularization': 1,
    'seed': 7
}
# Filled by the `record_evaluation` callback registered in `rolling_train_predict`.
evals = {}

gc.collect()

# In[16]:

gc.collect()


def _select_feature_columns(columns):
    """Return the columns of *columns* that may be fed to the model.

    A column is dropped when it is an identifier (`trade_date`, `ts_code`),
    is listed in the module-level `origin_columns`, starts with an
    underscore, or contains one of the substrings `future`, `label`,
    `score`, `gen`, `cat_l2_code`. Input order is preserved.
    """
    excluded_exact = {'trade_date', 'ts_code', 'label', *origin_columns}
    excluded_substrings = ('future', 'label', 'score', 'gen', 'cat_l2_code')
    return [
        col for col in columns
        if col not in excluded_exact
        and not col.startswith('_')
        and not any(token in col for token in excluded_substrings)
    ]


def _numeric_feature_columns(frame, feature_columns):
    """Return the float64/int64 columns of *frame* that are in *feature_columns*."""
    wanted = set(feature_columns)
    numeric = frame.select_dtypes(include=['float64', 'int64']).columns
    return [col for col in numeric if col in wanted]


def rolling_train_predict(df, train_days, test_days, industry_df, index_df, days=5, use_pca=False,
                          validation_days=60, filter_index=None, start_date='2020-01-01'):
    """Train a ranker on rolling windows and pick the top-scored row per test date.

    For each window, the model is fitted on `train_days` consecutive trade
    dates and scored on the following `test_days` trade dates; the window then
    advances by `test_days`.

    Args:
        df: Frame holding at least `trade_date`, `ts_code`, `label`,
            `return_20` and `cat_l2_code` plus the feature columns.
        train_days: Number of unique trade dates in each training window.
        test_days: Number of unique trade dates in each test window (also the
            rolling step).
        industry_df: Frame left-merged onto train and test data on
            `['cat_l2_code', 'trade_date']`.
        index_df: Unused; kept for interface compatibility.
        days: Unused; kept for interface compatibility.
        use_pca: Accepted but not forwarded. Training is always called with
            `use_pca=False` because prediction below feeds the raw feature
            columns to the model.
        validation_days: The last `validation_days` unique dates of each
            training window form the validation split used for feature
            screening; the value is also forwarded to `train_light_model`.
        filter_index: Optional boolean Series selecting rows allowed in the
            training set. It is aligned to `df` by index label; labels missing
            from it count as False. Test rows are never filtered. `None`
            keeps every row.
        start_date: Only trade dates on or after this value take part in the
            rolling schedule.

    Returns:
        DataFrame with columns `trade_date`, `score`, `ts_code`: one row per
        test trade date (the highest-scoring row). Empty when no window fits.

    Side effects:
        Overwrites `label_gain` and `feature_contri` in the module-level
        `light_params`, records metrics into the module-level `evals`, and
        prints the date range of every window.
    """
    result_columns = ['trade_date', 'score', 'ts_code']

    # 1. Ordered unique trade dates. `drop_duplicates().tolist()` yields
    #    Timestamps on every pandas version, whereas `unique().tolist()` can
    #    yield raw integers when `unique()` returns a datetime64 ndarray.
    in_range = df['trade_date'] >= start_date
    unique_dates = sorted(df.loc[in_range, 'trade_date'].drop_duplicates().tolist())
    n = len(unique_dates)

    # 2. Skip the oldest dates so that the final test window ends exactly on
    #    the last available date.
    start_index = (n - train_days) % test_days

    # Per trade date keep the 1000 rows with the largest `return_20`.
    df = df.groupby('trade_date', group_keys=False).apply(
        lambda day: day.nlargest(1000, 'return_20')
    )

    numeric_columns = _numeric_feature_columns(df, _select_feature_columns(df.columns))
    df = cross_sectional_standardization(df, numeric_columns)

    # Align the training mask with the reduced frame explicitly instead of
    # relying on implicit alignment inside `&` / boolean indexing.
    if filter_index is None:
        keep_mask = pd.Series(True, index=df.index)
    else:
        keep_mask = filter_index.reindex(df.index, fill_value=False)

    # Loop-invariant work.
    industry_df = industry_df.sort_values(by=['trade_date'])
    cat_columns = [col for col in df.columns if col.startswith('cat')]
    merge_keys = ['cat_l2_code', 'trade_date']

    predictions_list = []
    for start in range(start_index, n - train_days - test_days + 1, test_days):
        gc.collect()
        train_end = start + train_days
        train_dates = unique_dates[start:train_end]
        test_dates = unique_dates[train_end:train_end + test_days]

        # Slice the window and attach industry-level columns.
        train_data = df[keep_mask & df['trade_date'].isin(train_dates)].sort_values('trade_date')
        test_data = df[df['trade_date'].isin(test_dates)].sort_values('trade_date')
        train_data = train_data.merge(industry_df, on=merge_keys, how='left')
        test_data = test_data.merge(industry_df, on=merge_keys, how='left')

        feature_columns = _select_feature_columns(train_data.columns)
        numeric_columns = _numeric_feature_columns(train_data, feature_columns)

        # Clip extremes on both sides using the training-window column list.
        train_data = quantile_filter(train_data, numeric_columns)
        test_data = quantile_filter(test_data, numeric_columns)

        # Validation split: the last `validation_days` dates of the window.
        # Sorted explicitly because `quantile_filter` is not guaranteed to
        # keep row order.
        all_dates = sorted(train_data['trade_date'].drop_duplicates().tolist())
        split_date = all_dates[-validation_days]
        train_data_split = train_data[train_data['trade_date'] < split_date]
        val_data_split = train_data[train_data['trade_date'] >= split_date]

        # Feature screening is done on the top-label rows only.
        top_train = train_data_split[train_data_split['label'] == train_data_split['label'].max()]
        top_val = val_data_split[val_data_split['label'] == val_data_split['label'].max()]
        feature_columns, _ = remove_shifted_features(top_train, top_val, feature_columns)
        feature_columns = remove_highly_correlated_features(
            train_data[train_data['label'] == train_data['label'].max()], feature_columns)

        train_data = train_data.dropna(subset=[*feature_columns, 'label']).reset_index(drop=True)
        test_data = test_data.dropna(subset=feature_columns).reset_index(drop=True)

        print(f"最小日期: {train_data['trade_date'].min().strftime('%Y-%m-%d')}")
        print(f"最大日期: {train_data['trade_date'].max().strftime('%Y-%m-%d')}")
        print(f"最小日期: {test_data['trade_date'].min().strftime('%Y-%m-%d')}")
        print(f"最大日期: {test_data['trade_date'].max().strftime('%Y-%m-%d')}")

        for col in cat_columns:
            train_data[col] = train_data[col].astype('category')
            test_data[col] = test_data[col].astype('category')

        # The gain table is indexed by label value, so it must cover
        # 0..max(label) even if some bucket is absent from this window.
        n_labels = int(train_data['label'].max()) + 1
        light_params['label_gain'] = [(gain + 1) * (gain + 1) for gain in range(n_labels)]
        light_params['feature_contri'] = [
            2 if feat.startswith('act_factor') else 1 for feat in feature_columns
        ]

        model, _, _ = train_light_model(
            train_data, light_params, feature_columns,
            [lgb.log_evaluation(period=100),
             lgb.callback.record_evaluation(evals),
             lgb.early_stopping(50, first_metric_only=True)],
            evals,
            num_boost_round=3000, validation_days=validation_days,
            print_feature_importance=False, use_pca=False)

        # NOTE(review): test rows are standardized a second time here while
        # training rows are not — confirm this asymmetry is intended.
        score_df = test_data.copy()
        numeric_columns = _numeric_feature_columns(score_df, feature_columns)
        score_df = cross_sectional_standardization(score_df, numeric_columns)
        score_df['score'] = model.predict(score_df[feature_columns])

        # Keep only the best-scored row of each test date.
        best_rows = score_df.groupby('trade_date')['score'].idxmax()
        predictions_list.append(score_df.loc[best_rows, result_columns])

    if not predictions_list:
        # No window fits into the available dates.
        return pd.DataFrame(columns=result_columns)
    return pd.concat(predictions_list, ignore_index=True)


# In[ ]:

gc.collect()

print(df[df['ts_code'] == '000001.SZ'].tail(1)[['act_factor1', 'act_factor2']])
print('finish')
final_predictions = rolling_train_predict(df[df['trade_date'] >= '2022-01-01'], 90, 5, industry_df, None,
                                          days=days, validation_days=10, filter_index=filter_index)
final_predictions.to_csv('predictions_test.tsv', index=False)

# In[ ]:

print(df[df['ts_code'] == '000001.SZ'].tail(1)[['act_factor1', 'act_factor2']])
print('finish')

# In[ ]: