#!/usr/bin/env python
# coding: utf-8
# In[1]:
# %load_ext autoreload
# %autoreload 2
import pandas as pd
import numpy as np
import talib  # TA-Lib is required for the OBV computation below
import warnings
warnings.filterwarnings("ignore")
pd.set_option('display.max_columns', None)
# In[2]:
from utils.utils import read_and_merge_h5_data
print('daily data')
df = read_and_merge_h5_data('../../data/daily_data.h5', key='daily_data',
columns=['ts_code', 'trade_date', 'open', 'close', 'high', 'low', 'vol', 'pct_chg'],
df=None)
print('daily basic')
df = read_and_merge_h5_data('../../data/daily_basic.h5', key='daily_basic',
columns=['ts_code', 'trade_date', 'turnover_rate', 'pe_ttm', 'circ_mv', 'volume_ratio',
'is_st'], df=df, join='inner')
print('stk limit')
df = read_and_merge_h5_data('../../data/stk_limit.h5', key='stk_limit',
columns=['ts_code', 'trade_date', 'pre_close', 'up_limit', 'down_limit'],
df=df)
print('money flow')
df = read_and_merge_h5_data('../../data/money_flow.h5', key='money_flow',
columns=['ts_code', 'trade_date', 'buy_sm_vol', 'sell_sm_vol', 'buy_lg_vol', 'sell_lg_vol',
'buy_elg_vol', 'sell_elg_vol', 'net_mf_vol'],
df=df)
print('cyq perf')
df = read_and_merge_h5_data('../../data/cyq_perf.h5', key='cyq_perf',
columns=['ts_code', 'trade_date', 'his_low', 'his_high', 'cost_5pct', 'cost_15pct',
'cost_50pct',
'cost_85pct', 'cost_95pct', 'weight_avg', 'winner_rate'],
df=df)
df.info()
# In[3]:
print('industry')
industry_df = read_and_merge_h5_data('../../data/industry_data.h5', key='industry_data',
columns=['ts_code', 'l2_code', 'in_date'],
df=None, on=['ts_code'], join='left')
def merge_with_industry_data(df, industry_df):
    # Make sure both date fields are datetime
df['trade_date'] = pd.to_datetime(df['trade_date'])
industry_df['in_date'] = pd.to_datetime(industry_df['in_date'])
    # Sort industry_df by in_date (merge_asof needs the right side sorted on its key)
industry_df_sorted = industry_df.sort_values(['in_date', 'ts_code'])
    # Sort df by trade_date
df_sorted = df.sort_values(['trade_date', 'ts_code'])
    # Backward as-of merge: take the latest industry record with in_date <= trade_date
merged = pd.merge_asof(
df_sorted,
industry_df_sorted,
        by='ts_code',  # match within each ts_code
left_on='trade_date',
right_on='in_date',
direction='backward'
)
    # Earliest industry record per ts_code (fallback for early trade_dates)
min_in_date_per_ts = (industry_df_sorted
.groupby('ts_code')
.first()
.reset_index()[['ts_code', 'l2_code']])
    # Fill rows with no match (trade_date earlier than every in_date for that stock)
merged['l2_code'] = merged['l2_code'].fillna(
merged['ts_code'].map(min_in_date_per_ts.set_index('ts_code')['l2_code'])
)
    # Reset the index (all columns are kept)
result = merged.reset_index(drop=True)
return result
# Usage
df = merge_with_industry_data(df, industry_df)
# print(mdf[mdf['ts_code'] == '600751.SH'][['ts_code', 'trade_date', 'l2_code']])
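# Quick sanity check of the as-of merge (illustrative): after the fallback fill,
# every row should carry an industry code unless the stock never appears in industry_df.
print('rows still missing l2_code:', df['l2_code'].isna().sum())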
# In[4]:
def calculate_indicators(df):
"""
计算四个指标当日涨跌幅、5日移动平均、RSI、MACD。
"""
df = df.sort_values('trade_date')
df['daily_return'] = (df['close'] - df['pre_close']) / df['pre_close'] * 100
# df['5_day_ma'] = df['close'].rolling(window=5).mean()
delta = df['close'].diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
avg_gain = gain.rolling(window=14).mean()
avg_loss = loss.rolling(window=14).mean()
rs = avg_gain / avg_loss
df['RSI'] = 100 - (100 / (1 + rs))
    # MACD: EMA(12) - EMA(26), with a 9-period signal line
ema12 = df['close'].ewm(span=12, adjust=False).mean()
ema26 = df['close'].ewm(span=26, adjust=False).mean()
df['MACD'] = ema12 - ema26
df['Signal_line'] = df['MACD'].ewm(span=9, adjust=False).mean()
df['MACD_hist'] = df['MACD'] - df['Signal_line']
    # 4. Sentiment factor 1: share of up days (up ratio)
    df['up_ratio'] = df['daily_return'].apply(lambda x: 1 if x > 0 else 0)
    df['up_ratio_20d'] = df['up_ratio'].rolling(window=20).mean()  # fraction of up days over the past 20 days
    # 5. Sentiment factor 2: volume change rate
    df['volume_mean'] = df['vol'].rolling(window=20).mean()  # 20-day average volume
    df['volume_change_rate'] = (df['vol'] - df['volume_mean']) / df['volume_mean'] * 100  # deviation from the 20-day mean, in %
    # 6. Sentiment factor 3: volatility
    df['volatility'] = df['daily_return'].rolling(window=20).std()  # 20-day std of daily returns
    # 7. Sentiment factor 4: amount (turnover value) change rate
    df['amount_mean'] = df['amount'].rolling(window=20).mean()  # 20-day average amount
    df['amount_change_rate'] = (df['amount'] - df['amount_mean']) / df['amount_mean'] * 100  # deviation from the 20-day mean, in %
return df
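# Minimal sketch of the simple-mean RSI above on synthetic data: a strictly
# rising close series has zero losses, so rs -> inf and RSI -> 100.
_close = pd.Series(np.arange(1.0, 21.0))
_delta = _close.diff()
_gain = _delta.where(_delta > 0, 0).rolling(14).mean()
_loss = (-_delta.where(_delta < 0, 0)).rolling(14).mean()
print('synthetic RSI tail:', (100 - 100 / (1 + _gain / _loss)).iloc[-1])  # expect 100.0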
def generate_index_indicators(h5_filename):
df = pd.read_hdf(h5_filename, key='index_data')
df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d')
df = df.sort_values('trade_date')
    # Compute the indicators for each ts_code separately
df_indicators = []
for ts_code in df['ts_code'].unique():
df_index = df[df['ts_code'] == ts_code].copy()
df_index = calculate_indicators(df_index)
df_indicators.append(df_index)
    # Concatenate the results for all indices
df_all_indicators = pd.concat(df_indicators, ignore_index=True)
    # Pivot to one row per trade_date, one column set per ts_code
df_final = df_all_indicators.pivot_table(
index='trade_date',
columns='ts_code',
values=['daily_return', 'RSI', 'MACD', 'Signal_line',
'MACD_hist', 'up_ratio_20d', 'volume_change_rate', 'volatility',
'amount_change_rate', 'amount_mean'],
aggfunc='last'
)
df_final.columns = [f"{col[1]}_{col[0]}" for col in df_final.columns]
df_final = df_final.reset_index()
return df_final
# Run it
h5_filename = '../../data/index_data.h5'
index_data = generate_index_indicators(h5_filename)
index_data = index_data.dropna()
# In[6]:
# get_rolling_factor / get_simple_factor are assumed to live in utils.factor
# alongside get_act_factor (they are called below without an explicit import)
from utils.factor import get_act_factor, get_rolling_factor, get_simple_factor
def read_industry_data(h5_filename):
    # Read the Shenwan industry daily data from the H5 file (key='sw_daily')
industry_data = pd.read_hdf(h5_filename, key='sw_daily', columns=[
'ts_code', 'trade_date', 'open', 'close', 'high', 'low', 'pe', 'pb', 'vol'
    ])
industry_data = industry_data.sort_values(by=['ts_code', 'trade_date'])
    industry_data = industry_data.reset_index(drop=True)  # reindex() with no args was a no-op
industry_data['trade_date'] = pd.to_datetime(industry_data['trade_date'], format='%Y%m%d')
grouped = industry_data.groupby('ts_code', group_keys=False)
industry_data['obv'] = grouped.apply(
lambda x: pd.Series(talib.OBV(x['close'].values, x['vol'].values), index=x.index)
)
industry_data['return_5'] = grouped['close'].apply(lambda x: x / x.shift(5) - 1)
industry_data['return_20'] = grouped['close'].apply(lambda x: x / x.shift(20) - 1)
industry_data = get_act_factor(industry_data, cat=False)
industry_data = industry_data.sort_values(by=['trade_date', 'ts_code'])
    # # Deviation of each factor from the day's cross-sectional mean (dead code)
    # factor_columns = ['obv', 'return_5', 'return_20', 'act_factor1', 'act_factor2', 'act_factor3', 'act_factor4']
    #
    # for factor in factor_columns:
    #     if factor in industry_data.columns:
    #         industry_data[f'{factor}_deviation'] = industry_data.groupby('trade_date')[factor].transform(
    #             lambda x: x - x.mean())
industry_data['return_5_percentile'] = industry_data.groupby('trade_date')['return_5'].transform(
lambda x: x.rank(pct=True))
industry_data['return_20_percentile'] = industry_data.groupby('trade_date')['return_20'].transform(
lambda x: x.rank(pct=True))
industry_data = industry_data.drop(columns=['open', 'close', 'high', 'low', 'pe', 'pb', 'vol'])
industry_data = industry_data.rename(
columns={col: f'industry_{col}' for col in industry_data.columns if col not in ['ts_code', 'trade_date']})
industry_data = industry_data.rename(columns={'ts_code': 'cat_l2_code'})
return industry_data
industry_df = read_industry_data('../../data/sw_daily.h5')
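# Illustrative check: the percentile columns are cross-sectional ranks in (0, 1],
# so each day's strongest-return industry should sit near 1.0.
_p = industry_df.dropna(subset=['industry_return_20_percentile'])
print(_p.groupby('trade_date')['industry_return_20_percentile'].max().head())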
# In[7]:
origin_columns = df.columns.tolist()
origin_columns = [col for col in origin_columns if
col not in ['turnover_rate', 'pe_ttm', 'volume_ratio', 'vol', 'pct_chg', 'l2_code', 'winner_rate']]
origin_columns = [col for col in origin_columns if col not in index_data.columns]
origin_columns = [col for col in origin_columns if 'cyq' not in col]
print(origin_columns)
# In[8]:
def filter_data(df):
# df = df.groupby('trade_date').apply(lambda x: x.nlargest(1000, 'act_factor1'))
    df = df[~df['is_st']]  # drop ST stocks
    df = df[~df['ts_code'].str.endswith('BJ')]  # drop Beijing Stock Exchange
    df = df[~df['ts_code'].str.startswith('30')]  # drop ChiNext
    df = df[~df['ts_code'].str.startswith('68')]  # drop STAR Market
    df = df[~df['ts_code'].str.startswith('8')]  # drop NEEQ-style codes
    df = df[df['trade_date'] >= '20180101']
df = df.drop(columns=['in_date'])
df = df.reset_index(drop=True)
return df
df = filter_data(df)
# df = get_technical_factor(df)
# df = get_act_factor(df)
# df = get_money_flow_factor(df)
# df = get_alpha_factor(df)
# df = get_limit_factor(df)
# df = get_cyp_perf_factor(df)
# df = get_mv_factors(df)
df, _ = get_rolling_factor(df)
df, _ = get_simple_factor(df)
# df = df.merge(industry_df, on=['l2_code', 'trade_date'], how='left')
df = df.rename(columns={'l2_code': 'cat_l2_code'})
# df = df.merge(index_data, on='trade_date', how='left')
df.info()
# In[9]:
def create_deviation_within_dates(df, feature_columns):
    groupby_col = 'cat_l2_code'  # deviations are taken within each (trade_date, industry) group
new_columns = {}
ret_feature_columns = feature_columns[:]
    # Auto-select numeric features (categorical / index / industry / limit / cyq columns are dropped below)
num_features = [col for col in feature_columns if 'cat' not in col and 'index' not in col]
# num_features = ['vol', 'pct_chg', 'turnover_rate', 'volume_ratio', 'cat_vol_spike', 'obv', 'maobv_6', 'return_5', 'return_10', 'return_20', 'std_return_5', 'std_return_15', 'std_return_90', 'std_return_90_2', 'act_factor1', 'act_factor2', 'act_factor3', 'act_factor4', 'act_factor5', 'act_factor6', 'rank_act_factor1', 'rank_act_factor2', 'rank_act_factor3', 'active_buy_volume_large', 'active_buy_volume_big', 'active_buy_volume_small', 'alpha_022', 'alpha_003', 'alpha_007', 'alpha_013']
num_features = [col for col in num_features if 'cat' not in col and 'industry' not in col]
num_features = [col for col in num_features if 'limit' not in col]
num_features = [col for col in num_features if 'cyq' not in col]
    # For each numeric feature, subtract its (trade_date, industry) group mean
for feature in num_features:
        if feature == 'trade_date':  # no deviation for the date column itself
continue
# grouped_mean = df.groupby(['trade_date'])[feature].transform('mean')
# deviation_col_name = f'deviation_mean_{feature}'
# new_columns[deviation_col_name] = df[feature] - grouped_mean
# ret_feature_columns.append(deviation_col_name)
grouped_mean = df.groupby(['trade_date', groupby_col])[feature].transform('mean')
deviation_col_name = f'deviation_mean_{feature}'
new_columns[deviation_col_name] = df[feature] - grouped_mean
ret_feature_columns.append(deviation_col_name)
    # Attach the new deviation columns to the DataFrame in one go
df = pd.concat([df, pd.DataFrame(new_columns)], axis=1)
# for feature in ['obv', 'return_20', 'act_factor1', 'act_factor2', 'act_factor3', 'act_factor4']:
# df[f'deviation_industry_{feature}'] = df[feature] - df[f'industry_{feature}']
return df, ret_feature_columns
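# Toy example of the deviation computed above: each value minus its
# (trade_date, industry) group mean, i.e. the factor relative to industry peers.
_toy = pd.DataFrame({'trade_date': ['d1'] * 4, 'cat_l2_code': ['a', 'a', 'b', 'b'],
                     'f': [1.0, 3.0, 10.0, 30.0]})
print((_toy['f'] - _toy.groupby(['trade_date', 'cat_l2_code'])['f'].transform('mean')).tolist())
# expected: [-1.0, 1.0, -10.0, 10.0]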
# In[10]:
import pandas as pd
from scipy.stats import ks_2samp, wasserstein_distance
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
def remove_shifted_features(train_data, feature_columns, ks_threshold=0.05, wasserstein_threshold=0.1, size=0.8):
dropped_features = []
    all_dates = train_data['trade_date'].unique()  # all unique trade_dates
    split_date = all_dates[int(len(all_dates) * size)]  # split at the `size` fraction of the dates
    train_data_split = train_data[train_data['trade_date'] < split_date]  # earlier slice
    val_data_split = train_data[train_data['trade_date'] >= split_date]  # later slice
    # Measure distribution drift per feature
numeric_columns = train_data_split.select_dtypes(include=['float64', 'int64']).columns
numeric_columns = [col for col in numeric_columns if col in feature_columns]
for feature in numeric_columns:
ks_stat, p_value = ks_2samp(train_data_split[feature], val_data_split[feature])
wasserstein_dist = wasserstein_distance(train_data_split[feature], val_data_split[feature])
if p_value < ks_threshold or wasserstein_dist > wasserstein_threshold:
dropped_features.append(feature)
print(f"检测到 {len(dropped_features)} 个可能漂移的特征: {dropped_features}")
# **应用阈值进行最终筛选**
filtered_features = [f for f in feature_columns if f not in dropped_features]
return filtered_features, dropped_features
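# Hedged sketch of the drift test above on synthetic data: a mean-shifted
# sample is flagged by a small KS p-value and a large Wasserstein distance.
from scipy.stats import ks_2samp, wasserstein_distance
_a = np.random.normal(0, 1, 1000)
_b = np.random.normal(0.5, 1, 1000)
print('KS p-value:', ks_2samp(_a, _b).pvalue, '| Wasserstein:', wasserstein_distance(_a, _b))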
def remove_outliers_label_percentile(label: pd.Series, lower_percentile: float = 0.01, upper_percentile: float = 0.99,
log=True):
if not (0 <= lower_percentile < upper_percentile <= 1):
raise ValueError("Percentile values must satisfy 0 <= lower_percentile < upper_percentile <= 1.")
# Calculate lower and upper bounds based on percentiles
lower_bound = label.quantile(lower_percentile)
upper_bound = label.quantile(upper_percentile)
# Filter out values outside the bounds
filtered_label = label[(label >= lower_bound) & (label <= upper_bound)]
# Print the number of removed outliers
if log:
print(f"Removed {len(label) - len(filtered_label)} outliers.")
return filtered_label
def calculate_risk_adjusted_target(df, days=5):
df = df.sort_values(by=['ts_code', 'trade_date'])
df['future_close'] = df.groupby('ts_code')['close'].shift(-days)
df['future_open'] = df.groupby('ts_code')['open'].shift(-1)
df['future_return'] = (df['future_close'] - df['future_open']) / df['future_open']
df['future_volatility'] = df.groupby('ts_code')['future_return'].rolling(days, min_periods=1).std().reset_index(
level=0, drop=True)
    # Risk-adjusted return: divide by volatility (it was multiplied, which
    # contradicts the Sharpe-style intent and the inf guard below)
    sharpe_ratio = df['future_return'] / df['future_volatility']
    sharpe_ratio = sharpe_ratio.replace([np.inf, -np.inf], np.nan)
return sharpe_ratio
def calculate_score(df, days=5, lambda_param=1.0):
    def calculate_max_drawdown(prices):
        peak = prices.iloc[0]  # running peak
        max_drawdown = 0  # running max drawdown
        for price in prices:
            if price > peak:
                peak = price  # new peak
            else:
                drawdown = (peak - price) / peak  # drawdown from the current peak
                max_drawdown = max(max_drawdown, drawdown)
        return max_drawdown
def compute_stock_score(stock_df):
stock_df = stock_df.sort_values(by=['trade_date'])
future_return = stock_df['future_return']
        # Volatility from the existing pct_chg field (computed but not used in the score)
        volatility = stock_df['pct_chg'].rolling(days).std().shift(-days)
max_drawdown = stock_df['close'].rolling(days).apply(calculate_max_drawdown, raw=False).shift(-days)
score = future_return - lambda_param * max_drawdown
return score
    # # Make sure the DataFrame is sorted by ts_code and trade_date
    # df = df.sort_values(by=['ts_code', 'trade_date'])
    # Score per stock: future return penalized by future max drawdown
df['score'] = df.groupby('ts_code').apply(compute_stock_score).reset_index(level=0, drop=True)
return df['score']
def remove_highly_correlated_features(df, feature_columns, threshold=0.9):
numeric_features = df[feature_columns].select_dtypes(include=[np.number]).columns.tolist()
if not numeric_features:
raise ValueError("No numeric features found in the provided data.")
corr_matrix = df[numeric_features].corr().abs()
upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
to_drop = [column for column in upper.columns if any(upper[column] > threshold)]
remaining_features = [col for col in feature_columns if col not in to_drop
or 'act' in col or 'af' in col]
return remaining_features
def cross_sectional_standardization(df, features):
    df_sorted = df.sort_values(by='trade_date')  # sort by date
df_standardized = df_sorted.copy()
for date in df_sorted['trade_date'].unique():
        # Rows for this date
        current_data = df_standardized[df_standardized['trade_date'] == date]
        # Z-score only the requested features within this cross-section
        scaler = StandardScaler()
        standardized_values = scaler.fit_transform(current_data[features])
        # Write the standardized values back
        df_standardized.loc[df_standardized['trade_date'] == date, features] = standardized_values
return df_standardized
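# Optional vectorized equivalent (a sketch, not part of the original pipeline;
# note pandas' std uses ddof=1 while StandardScaler uses ddof=0, so values
# differ by a factor of sqrt(n/(n-1)) per cross-section).
def cross_sectional_standardization_fast(df, features):
    out = df.copy()
    grouped = out.groupby('trade_date')[features]
    out[features] = (out[features] - grouped.transform('mean')) / grouped.transform('std')
    return out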
import statsmodels.api as sm
from concurrent.futures import ProcessPoolExecutor
def neutralize_manual(df, features, industry_col, mkt_cap_col):
    """Manually regress each factor on log market cap within each industry and
    keep the residuals; a hand-rolled single-regressor OLS for speed."""
    for col in features:
        pieces = []
        for _, group in df.groupby(industry_col):
            if len(group) > 1:
                x = np.log(group[mkt_cap_col])  # log market cap
                y = group[col]  # factor values
                beta = np.cov(y, x)[0, 1] / np.var(x)  # slope
                alpha = np.mean(y) - beta * np.mean(x)  # intercept
                pieces.append(y - (alpha + beta * x))  # residuals
            else:
                pieces.append(group[col])  # too few samples: keep raw values
        # Align by index rather than assigning a plain list, which would pair
        # group-ordered residuals with original-ordered rows
        df[col] = pd.concat(pieces).reindex(df.index)
    return df
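# Synthetic sanity sketch for the neutralization above: after residualizing a
# size-loaded factor, it should be ~uncorrelated with log market cap.
_n = 500
_syn = pd.DataFrame({'ind': np.repeat(['x', 'y'], _n // 2),
                     'mv': np.random.lognormal(10, 1, _n)})
_syn['factor'] = 0.7 * np.log(_syn['mv']) + np.random.normal(0, 1, _n)
_syn = neutralize_manual(_syn, ['factor'], industry_col='ind', mkt_cap_col='mv')
print('corr after neutralization:', np.corrcoef(_syn['factor'], np.log(_syn['mv']))[0, 1])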
import gc
gc.collect()
def mad_filter(df, features, n=3):
for col in features:
median = df[col].median()
mad = np.median(np.abs(df[col] - median))
upper = median + n * mad
lower = median - n * mad
        df[col] = np.clip(df[col], lower, upper)  # clip extreme values
return df
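# Toy demonstration of mad_filter: median=3, MAD=1, so bounds are [0, 6] and
# the outlier 100 is clipped to 6 while typical values pass through.
_t = pd.DataFrame({'v': [1.0, 2.0, 3.0, 4.0, 100.0]})
print(mad_filter(_t, ['v'])['v'].tolist())  # [1.0, 2.0, 3.0, 4.0, 6.0]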
def percentile_filter(df, features, lower_percentile=0.01, upper_percentile=0.99):
for col in features:
        # Per-date lower/upper percentile bounds
lower_bound = df.groupby('trade_date')[col].transform(
lambda x: x.quantile(lower_percentile)
)
upper_bound = df.groupby('trade_date')[col].transform(
lambda x: x.quantile(upper_percentile)
)
        # Clip values outside the bounds
df[col] = np.clip(df[col], lower_bound, upper_bound)
return df
from scipy.stats import iqr
def iqr_filter(df, features):
for col in features:
df[col] = df.groupby('trade_date')[col].transform(
lambda x: (x - x.median()) / iqr(x) if iqr(x) != 0 else x
)
return df
def quantile_filter(df, features, lower_quantile=0.01, upper_quantile=0.99, window=60):
df = df.copy()
for col in features:
        # Rolling quantile bounds within each trade_date group; note the first
        # window-1 rows of each date group get NaN bounds, which np.clip propagates
rolling_lower = df.groupby('trade_date')[col].transform(
lambda x: x.rolling(window=min(len(x), window)).quantile(lower_quantile))
rolling_upper = df.groupby('trade_date')[col].transform(
lambda x: x.rolling(window=min(len(x), window)).quantile(upper_quantile))
        # Clip to the rolling bounds
df[col] = np.clip(df[col], rolling_lower, rolling_upper)
return df
# In[11]:
# print(test_data.head()[['act_factor1', 'act_factor2', 'ts_code', 'trade_date']])
# In[12]:
from sklearn.preprocessing import StandardScaler
import lightgbm as lgb
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
def train_light_model(train_data_df, params, feature_columns, callbacks, evals,
print_feature_importance=True, num_boost_round=100,
                      validation_days=180, use_pca=False, split_date=None):  # validation_days sets the time-ordered validation split
    # Keep rows in time order
train_data_df = train_data_df.sort_values(by='trade_date')
numeric_columns = train_data_df.select_dtypes(include=['float64', 'int64']).columns
numeric_columns = [col for col in numeric_columns if col in feature_columns]
# X_train.loc[:, numeric_columns] = scaler.fit_transform(X_train[numeric_columns])
# X_val.loc[:, numeric_columns] = scaler.transform(X_val[numeric_columns])
# train_data_df = cross_sectional_standardization(train_data_df, numeric_columns)
    # Drop rows without a label
train_data_df = train_data_df.dropna(subset=['label'])
    print('raw training set size: ', len(train_data_df))
    # Time-ordered train/validation split
if split_date is None:
        all_dates = train_data_df['trade_date'].unique()  # all unique trade_dates
        split_date = all_dates[-validation_days]  # split at the validation_days-th date from the end
    train_data_split = train_data_df[train_data_df['trade_date'] < split_date]  # training slice
    val_data_split = train_data_df[train_data_df['trade_date'] >= split_date]  # validation slice
    # Report the split
    print(f"train size after split: {len(train_data_split)}, validation size: {len(val_data_split)}")
    # Features and labels
X_train = train_data_split[feature_columns]
y_train = train_data_split['label']
X_val = val_data_split[feature_columns]
y_val = val_data_split['label']
    # Scaler placeholder (the actual scaling above is commented out); returned for API compatibility
    scaler = StandardScaler()
    # Per-trade_date sample counts (learning-to-rank needs query group sizes)
train_groups = train_data_split.groupby('trade_date').size().tolist()
val_groups = val_data_split.groupby('trade_date').size().tolist()
    # Categorical features (by naming convention: name contains 'cat')
categorical_feature = [col for col in feature_columns if 'cat' in col]
pca = None
if use_pca:
        pca = PCA(n_components=0.95)  # keep 95% of variance (or set a fixed n_components, e.g. 10)
numeric_features = [col for col in feature_columns if col not in categorical_feature]
numeric_pca = pca.fit_transform(X_train[numeric_features])
X_train = pd.concat([pd.DataFrame(numeric_pca, index=X_train.index), X_train[categorical_feature]], axis=1)
numeric_pca = pca.transform(X_val[numeric_features])
X_val = pd.concat([pd.DataFrame(numeric_pca, index=X_val.index), X_val[categorical_feature]], axis=1)
    # Time-based sample weights: later dates count more
# trade_date = train_data_split['trade_date'] # 交易日期
# weights = (trade_date - trade_date.min()).dt.days / (trade_date.max() - trade_date.min()).days + 1
# weights = train_data_split.groupby('trade_date')['std_return_5'].transform(
# lambda x: x / x.mean()
# )
    ud = sorted(train_data_split["trade_date"].unique().tolist())
    date_weights = {date: weight * weight for date, weight in zip(ud, np.linspace(1, 10, len(ud)))}
    # Sample weights belong on the Dataset, not in params (LightGBM ignores unknown parameter keys)
    sample_weight = train_data_split["trade_date"].map(date_weights).to_numpy()
    train_dataset = lgb.Dataset(
        X_train, label=y_train, group=train_groups, weight=sample_weight,
        categorical_feature=categorical_feature
    )
# weights = val_data_split.groupby('trade_date')['std_return_5'].transform(
# lambda x: x / x.mean()
# )
val_dataset = lgb.Dataset(
X_val, label=y_val, group=val_groups,
categorical_feature=categorical_feature
)
    # Train
model = lgb.train(
params, train_dataset, num_boost_round=num_boost_round,
valid_sets=[train_dataset, val_dataset], valid_names=['train', 'valid'],
callbacks=callbacks
)
    # Optionally plot the metric curves and feature importance
if print_feature_importance:
lgb.plot_metric(evals)
lgb.plot_importance(model, importance_type='split', max_num_features=20)
plt.show()
return model, scaler, pca
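# A hedged note on lambdarank grouping: rows must be contiguous per query
# (per trade_date here, which the sort above guarantees), and `group` lists the
# per-query row counts in that same order, summing to the number of rows.
_g = pd.DataFrame({'trade_date': ['d1', 'd1', 'd2'], 'y': [0, 1, 2]})
_sizes = _g.groupby('trade_date').size().tolist()
assert sum(_sizes) == len(_g) and _sizes == [2, 1]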
# In[13]:
days = 2
df = df.sort_values(by=['ts_code', 'trade_date'])
# df['future_return'] = df.groupby('ts_code', group_keys=False)['close'].apply(lambda x: x.shift(-days) / x - 1)
# Future return: buy at the next day's open, sell at the close `days` sessions ahead
df['future_return'] = (df.groupby('ts_code')['close'].shift(-days) - df.groupby('ts_code')['open'].shift(-1)) / \
                      df.groupby('ts_code')['open'].shift(-1)
df['future_volatility'] = (
df.groupby('ts_code')['pct_chg']
.transform(lambda x: x.rolling(days).std().shift(-days))
)
df['future_score'] = calculate_score(df, days=days, lambda_param=0.3)
df['label'] = df.groupby('trade_date', group_keys=False)['future_score'].transform(
lambda x: pd.qcut(x, q=20, labels=False, duplicates='drop')
)
# df['future_score'] = (
# 0.7 * df['future_return']
# * 0.3 * df['future_volatility']
# )
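# Illustrative check of the per-date labels: qcut should yield up to 20 integer
# ranks (0..19) within each trade_date, which label_gain indexes below.
print(df.groupby('trade_date')['label'].nunique().describe())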
# In[30]:
def select_pre_zt_stocks_dynamic(
stock_df,
):
    # Despite the name, this currently keeps the 1000 strongest stocks per day by 20-day return
    stock_df = stock_df.groupby('trade_date', group_keys=False).apply(
lambda x: x.nlargest(1000, 'return_20')
)
return stock_df
pdf = select_pre_zt_stocks_dynamic(df)
filter_index = pdf['future_return'].between(pdf['future_return'].quantile(0.01), pdf['future_return'].quantile(0.99))
# filter_index = pdf['future_volatility'].between(pdf['future_volatility'].quantile(0.01),
# pdf['future_volatility'].quantile(0.99)) | filter_index
# In[ ]:
pdf = pdf.merge(industry_df, on=['cat_l2_code', 'trade_date'], how='left')
pdf = pdf.sort_values(['trade_date'])
pdf = pdf.replace([np.inf, -np.inf], np.nan)
feature_columns = pdf.columns.tolist()
feature_columns = [col for col in feature_columns if col not in ['trade_date',
'ts_code',
'label']]
feature_columns = [col for col in feature_columns if 'future' not in col]
feature_columns = [col for col in feature_columns if 'label' not in col]
feature_columns = [col for col in feature_columns if 'score' not in col]
feature_columns = [col for col in feature_columns if 'gen' not in col]
feature_columns = [col for col in feature_columns if 'cat_l2_code' not in col]
feature_columns = [col for col in feature_columns if col not in origin_columns]
feature_columns = [col for col in feature_columns if not col.startswith('_')]
numeric_columns = pdf.select_dtypes(include=['float64', 'int64']).columns
numeric_columns = [col for col in numeric_columns if col in feature_columns]
# feature_columns, _ = remove_shifted_features(pdf, feature_columns, size=0.8)
pdf = quantile_filter(pdf, numeric_columns)
pdf = cross_sectional_standardization(pdf, numeric_columns)
# print('winsorize')
# train_data = quantile_filter(train_data, numeric_columns)  # clip extremes
# # print('neutralize')
# # train_data = neutralize_manual(train_data, numeric_columns, industry_col='cat_l2_code', mkt_cap_col='log(circ_mv)')  # industry/size neutralization
# print('winsorize')
# test_data = quantile_filter(test_data, numeric_columns)  # clip extremes
feature_columns = remove_highly_correlated_features(pdf, feature_columns)
print(len(pdf))
# In[123]:
# print('train data size: ', len(train_data))
label_gain = list(range(len(df['label'].unique())))
label_gain = [gain * gain for gain in label_gain]  # squared gains emphasize the top buckets
light_params = {
'label_gain': label_gain,
'objective': 'lambdarank',
'metric': 'ndcg',
'learning_rate': 0.03,
'num_leaves': 32,
# 'min_data_in_leaf': 128,
'max_depth': 8,
'max_bin': 32,
'feature_fraction': 0.7,
# 'bagging_fraction': 0.7,
'bagging_freq': 5,
'lambda_l1': 0.1,
'lambda_l2': 0.1,
'boosting': 'gbdt',
'verbosity': -1,
'extra_trees': True,
'max_position': 5,
'ndcg_at': 1,
'quant_train_renew_leaf': True,
'lambdarank_truncation_level': 3,
# 'lambdarank_position_bias_regularization': 1,
'seed': 7
}
evals = {}
gc.collect()
# In[128]:
gc.collect()
def rolling_train_predict(df, train_days, test_days, feature_columns_origin, days=5, use_pca=False, validation_days=60,
filter_index=None):
    # 1. Collect trading dates from 2020 onward, sorted
unique_dates = df[df['trade_date'] >= '2020-01-01']['trade_date'].unique().tolist()
unique_dates = sorted(unique_dates)
n = len(unique_dates)
    # 2. Skip leading days so the last test window ends on the final date
extra_days = (n - train_days) % test_days
    start_index = extra_days  # rolling starts here
predictions_list = []
for start in range(start_index, n - train_days - test_days + 1, test_days):
train_dates = unique_dates[start: start + train_days]
test_dates = unique_dates[start + train_days: start + train_days + test_days]
        # Slice the train/test data by date
train_data = df[filter_index & df['trade_date'].isin(train_dates)]
test_data = df[df['trade_date'].isin(test_dates)]
train_data = train_data.sort_values('trade_date')
test_data = test_data.sort_values('trade_date')
# feature_columns, _ = remove_shifted_features(train_data, feature_columns_origin, size=0.8)
train_data = train_data.dropna(subset=feature_columns)
train_data = train_data.dropna(subset=['label'])
train_data = train_data.reset_index(drop=True)
# print(test_data.tail())
test_data = test_data.dropna(subset=feature_columns)
# test_data = test_data.dropna(subset=['label'])
test_data = test_data.reset_index(drop=True)
# print(len(train_data))
print(f"最小日期: {train_data['trade_date'].min().strftime('%Y-%m-%d')}")
print(f"最大日期: {train_data['trade_date'].max().strftime('%Y-%m-%d')}")
# print(len(test_data))
print(f"最小日期: {test_data['trade_date'].min().strftime('%Y-%m-%d')}")
print(f"最大日期: {test_data['trade_date'].max().strftime('%Y-%m-%d')}")
cat_columns = [col for col in df.columns if col.startswith('cat')]
for col in cat_columns:
train_data[col] = train_data[col].astype('category')
test_data[col] = test_data[col].astype('category')
label_gain = list(range(len(train_data['label'].unique())))
label_gain = [(gain + 1) * (gain + 1) for gain in label_gain]
light_params['label_gain'] = label_gain
# ud = train_data["trade_date"].unique()
# date_weights = {date: weight for date, weight in zip(ud, np.linspace(1, 2, len(unique_dates)))}
# light_params['weight'] = train_data["trade_date"].map(date_weights).tolist()
# print(f'feature_columns: {feature_columns}')
# feature_contri = [2 if feat.startswith('act_factor') else 1 for feat in feature_columns]
# light_params['feature_contri'] = feature_contri
model, _, _ = train_light_model(train_data.dropna(subset=['label']),
light_params, feature_columns,
[lgb.log_evaluation(period=100),
lgb.callback.record_evaluation(evals),
lgb.early_stopping(100, first_metric_only=True)
], evals,
num_boost_round=3000, validation_days=validation_days,
print_feature_importance=False, use_pca=False)
score_df = test_data.copy()
score_df['score'] = model.predict(score_df[feature_columns])
        score_df = score_df.loc[score_df.groupby('trade_date')['score'].idxmax()]  # keep the top-scoring stock per day
score_df = score_df[['trade_date', 'score', 'ts_code']]
predictions_list.append(score_df)
# m = 5
# all_data = []
# for i, trade_date in enumerate(sorted(score_df['trade_date'].unique().tolist())):
    # # Rows for the current date
# current_data = score_df[score_df['trade_date'] == trade_date]
# all_data.append(current_data)
#
# numeric_columns = [col for col in feature_columns if col in current_data.select_dtypes(include=['float64', 'int64']).columns]
# current_data = cross_sectional_standardization(current_data, numeric_columns)
# current_data['score'] = model.predict(current_data[feature_columns])
# daily_top_score = current_data.loc[[current_data['score'].idxmax()]]
# predictions_list.append(daily_top_score[['trade_date', 'score', 'ts_code']])
#
# if i % m == 0:
# train_data_split = pd.concat(all_data)
# train_data_split = train_data_split.dropna(subset=['label'])
#
# X_train = train_data_split[feature_columns]
# y_train = train_data_split['label']
#
# train_groups = train_data_split.groupby('trade_date').size().tolist()
# categorical_feature = [col for col in feature_columns if 'cat' in col]
#
# train_dataset = lgb.Dataset(
# X_train, label=y_train, group=train_groups,
# categorical_feature=categorical_feature
# )
#
# model = lgb.train(
# light_params, train_dataset, num_boost_round=36,
# init_model=model
# )
# all_data = []
final_predictions = pd.concat(predictions_list, ignore_index=True)
return final_predictions
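# Illustrative walk-forward arithmetic for the function above: with 550 dates,
# train_days=500 and test_days=20, extra_days = (550-500) % 20 = 10, giving
# folds (10, 510, 530) and (30, 530, 550), so the last test window ends on the
# final date.
_n, _tr, _te = 550, 500, 20
print([(s, s + _tr, s + _tr + _te) for s in range((_n - _tr) % _te, _n - _tr - _te + 1, _te)])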
# In[129]:
gc.collect()
print(df[df['ts_code'] == '000001.SZ'].tail(1)[['act_factor1', 'act_factor2']])
print('finish')
# qdf = qdf[qdf['trade_date'] >= '2022-01-01']
final_predictions = rolling_train_predict(pdf[pdf['trade_date'] >= '2020-01-01'], 500, 20, feature_columns,
days=days, validation_days=60, filter_index=filter_index)
final_predictions.to_csv('predictions_test.tsv', index=False)
# In[29]:
train_data = pdf[filter_index & (pdf['trade_date'] == '2023-01-03')]
train_data = train_data.dropna(subset=['label'])
train_data = train_data.reset_index(drop=True)
print(len(train_data))
# In[34]:
# filter_index = pdf['future_return'].between(pdf['future_return'].quantile(0.01), pdf['future_return'].quantile(0.99))
train_data = pdf[filter_index & (pdf['trade_date'] == '2023-01-03')]
print(len(train_data))