Files
NewStock/main/factor/factor.py
2025-05-04 22:00:05 +08:00

2550 lines
98 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
import pandas as pd
import talib
def get_rolling_factor(df):
old_columns = df.columns.tolist()[:]
# 按股票和日期排序(如果尚未排序)
df = df.sort_values(by=["ts_code", "trade_date"])
grouped = df.groupby("ts_code", group_keys=False)
epsilon = 1e-8
df["lg_elg_net_buy_vol"] = (
df["buy_lg_vol"] + df["buy_elg_vol"] - df["sell_lg_vol"] - df["sell_elg_vol"]
)
# 检查 'volume' 列是否存在且有效
df["flow_lg_elg_intensity"] = df["lg_elg_net_buy_vol"] / (df["vol"] + epsilon)
# 2. 散户与主力背离度 (Retail vs Institutional Divergence)
# 衡量小单净流入与(大单+超大单)净流入的差异或比率
df["sm_net_buy_vol"] = df["buy_sm_vol"] - df["sell_sm_vol"]
df["flow_divergence_diff"] = df["sm_net_buy_vol"] - df["lg_elg_net_buy_vol"]
# 比率形式可能更稳定
df["flow_divergence_ratio"] = df["sm_net_buy_vol"] / (
df["lg_elg_net_buy_vol"] + np.sign(df["lg_elg_net_buy_vol"]) * epsilon + epsilon
) # 复杂处理避免0/0
# 3. 资金流结构变动 (Flow Structure Change - Relative Strength of Large Flow)
# 大单+超大单买入额占总买入额的比例的变化
df["total_buy_vol"] = df["buy_sm_vol"] + df["buy_lg_vol"] + df["buy_elg_vol"]
df["lg_elg_buy_prop"] = (df["buy_lg_vol"] + df["buy_elg_vol"]) / (
df["total_buy_vol"] + epsilon
)
df["flow_struct_buy_change"] = grouped["lg_elg_buy_prop"].diff(1) # 1日变化
# 4. 资金流加速度 (Flow Acceleration)
# 净主力资金流的变化率(二阶导)
df["lg_elg_net_buy_vol_change"] = grouped["lg_elg_net_buy_vol"].diff(1)
df["flow_lg_elg_accel"] = grouped["lg_elg_net_buy_vol_change"].diff(1)
# # 5. 极端资金流事件 (Categorical: Extreme Flow Event)
# # 定义主力资金流强度是否处于其历史极端水平例如过去N天的90分位数以上或10分位数以下
# rolling_window = 20 # 可调整窗口期
# # Step 1: Calculate the rolling quantiles separately
# rolling_high = grouped['flow_lg_elg_intensity'].rolling(rolling_window, min_periods=1).quantile(0.9) # min_periods=1 保证窗口未满时也有输出
# rolling_low = grouped['flow_lg_elg_intensity'].rolling(rolling_window, min_periods=1).quantile(0.1)
# # Step 2: Assign the results to the DataFrame
# # 确保 df 和 rolling_high/low 的索引是一致的
# # 如果 df 的索引在此期间没有被修改过,这通常是安全的
# df['flow_lg_elg_intensity_rolling_high'] = rolling_high
# df['flow_lg_elg_intensity_rolling_low'] = rolling_low
# # Step 3: Continue with the logic using the new columns
# conditions_flow = [
# df['flow_lg_elg_intensity'] > df['flow_lg_elg_intensity_rolling_high'],
# df['flow_lg_elg_intensity'] < df['flow_lg_elg_intensity_rolling_low']
# ]
# choices_flow = [1, -1] # 1: 极端流入, -1: 极端流出
# df['cat_extreme_flow'] = np.select(conditions_flow, choices_flow, default=0)
# --- 筹码分布因子 ---
# 6. 筹码集中度 (Chip Concentration)
# 衡量筹码分布的紧密程度,例如 95% 与 5% 成本价的差距,相对于当前价格进行标准化
# 检查 'close' 列是否存在且有效
df["chip_concentration_range"] = (df["cost_95pct"] - df["cost_5pct"]) / (
df["close"] + epsilon
)
# 7. 筹码分布偏度 (Chip Distribution Skewness Proxy)
# 比较中位数成本 (cost_50pct) 和加权平均成本 (weight_avg)
# weight_avg > cost_50pct 暗示高成本区有较多筹码(右偏)
df["chip_skewness"] = (df["weight_avg"] - df["cost_50pct"]) / (
df["cost_50pct"] + epsilon
)
# 8. 浮筹比例 (Floating Chips Proxy)
# 衡量短期内例如15%成本线以下)的筹码比例与总获利盘比例的关系
# winner_rate 高但 cost_15pct 接近当前价,可能意味着大部分获利盘成本不高,易浮动
# 这里简化为:获利盘比例 与 (当前价-15%成本价)/当前价 的乘积
price_dist_cost15 = (df["close"] - df["cost_15pct"]) / (df["close"] + epsilon)
df["floating_chip_proxy"] = df["winner_rate"] * np.maximum(
0, price_dist_cost15
) # 只考虑价格高于15%成本线的情况
# 9. 成本支撑强度变化 (Cost Support Strength Change)
# 观察低位筹码成本(如 5% 或 15% 分位点)的变化率,看支撑位是上移还是下移
df["cost_support_15pct_change"] = (
grouped["cost_15pct"].pct_change(1) * 100
) # 百分比变化
# 10. 获利盘压力/支撑区 (Categorical: Winner Rate Zone & Price Position)
# 结合获利盘比例和当前价格相对于筹码成本的位置
# 例如: 价格在 85% 成本线之上 & 获利盘 > 0.8 -> 高位派发风险区?
# 价格在 15% 成本线之下 & 获利盘 < 0.2 -> 低位吸筹潜力区?
conditions_winner = [
(df["close"] > df["cost_85pct"]) & (df["winner_rate"] > 0.8), # 高位 & 高获利盘
(df["close"] < df["cost_15pct"]) & (df["winner_rate"] < 0.2), # 低位 & 低获利盘
(df["close"] > df["cost_50pct"])
& (df["winner_rate"] > 0.5), # 中高位 & 多数获利
(df["close"] < df["cost_50pct"])
& (df["winner_rate"] < 0.5), # 中低位 & 多数亏损
]
choices_winner = [1, 2, 3, 4] # 1:高风险区, 2:低潜力区, 3:中上获利区, 4:中下亏损区
df["cat_winner_price_zone"] = np.select(
conditions_winner, choices_winner, default=0
) # 0: 其他
# --- 结合因子 ---
# 11. 主力行为与筹码结构一致性 (Flow-Chip Consistency)
# 例如:主力净买入发生在价格接近下方筹码密集区(如 cost_15pct 到 cost_50pct
price_near_low_support = (df["close"] > df["cost_15pct"]) & (
df["close"] < df["cost_50pct"]
)
df["flow_chip_consistency"] = df[
"lg_elg_net_buy_vol"
] * price_near_low_support.astype(int)
# 可以进一步标准化或做成 categorical
# 12. 获利了结压力/承接盘强度 (Profit-Taking Pressure vs Absorption)
# 在高获利盘(winner_rate > 0.7)的情况下,观察主力资金是净流出(了结)还是净流入(高位换手/承接)
high_winner_rate_flag = (df["winner_rate"] > 0.7).astype(int)
df["profit_taking_vs_absorb"] = df["lg_elg_net_buy_vol"] * high_winner_rate_flag
# 正值表示高获利盘下主力仍在买入(承接),负值表示主力在卖出(了结)
# 清理临时列和可能产生的 NaN (可选,根据需要处理)
cols_to_drop = [
"lg_elg_net_buy_vol",
"sm_net_buy_vol",
"total_buy_vol",
"lg_elg_buy_prop",
"lg_elg_net_buy_vol_change",
"flow_lg_elg_intensity_rolling_high",
"flow_lg_elg_intensity_rolling_low",
]
# df = df.drop(columns=cols_to_drop)
window = 20
df["_is_positive"] = (df["pct_chg"] > 0).astype(int)
df["_is_negative"] = (df["pct_chg"] < 0).astype(int)
df["cat_is_positive"] = (df["pct_chg"] > 0).astype(int)
# 分离正负收益率 (用于计算各自的均值和平方均值)
# 注意:这里我们保留原始收益率用于计算,而不是 clip 到 0
df["_pos_returns"] = df["pct_chg"].where(
df["pct_chg"] > 0, 0
) # 非正设为0便于求和
df["_neg_returns"] = df["pct_chg"].where(
df["pct_chg"] < 0, 0
) # 非负设为0便于求和
# 计算收益率的平方 (用于计算 E[X^2])
df["_pos_returns_sq"] = np.square(df["_pos_returns"])
df["_neg_returns_sq"] = np.square(df["_neg_returns"]) # 平方后负数变正
# 4. 计算滚动统计量 (使用内置函数,速度较快)
# 计算正收益日的统计量
rolling_pos_count = (
grouped["_is_positive"].rolling(window, min_periods=max(1, window // 2)).sum()
)
rolling_pos_sum = (
grouped["_pos_returns"].rolling(window, min_periods=max(1, window // 2)).sum()
)
rolling_pos_sum_sq = (
grouped["_pos_returns_sq"]
.rolling(window, min_periods=max(1, window // 2))
.sum()
)
# 计算负收益日的统计量
rolling_neg_count = (
grouped["_is_negative"].rolling(window, min_periods=max(1, window // 2)).sum()
)
rolling_neg_sum = (
grouped["_neg_returns"].rolling(window, min_periods=max(1, window // 2)).sum()
)
rolling_neg_sum_sq = (
grouped["_neg_returns_sq"]
.rolling(window, min_periods=max(1, window // 2))
.sum()
)
# 5. 计算方差和标准差
pos_mean_sq = rolling_pos_sum_sq / rolling_pos_count
pos_mean = rolling_pos_sum / rolling_pos_count
pos_var = pos_mean_sq - np.square(pos_mean)
pos_var = pos_var.where(rolling_pos_count >= 2, np.nan).clip(lower=0)
upside_vol = np.sqrt(pos_var)
neg_mean_sq = rolling_neg_sum_sq / rolling_neg_count
neg_mean = rolling_neg_sum / rolling_neg_count # 注意 neg_mean 是负数
neg_var = neg_mean_sq - np.square(neg_mean)
neg_var = neg_var.where(rolling_neg_count >= 2, np.nan).clip(lower=0)
downside_vol = np.sqrt(neg_var)
# rolling 操作后结果带有 MultiIndex需要去除股票代码层级以便合并
df["upside_vol"] = upside_vol.reset_index(level=0, drop=True)
df["downside_vol"] = downside_vol.reset_index(level=0, drop=True)
df["vol_ratio"] = df["upside_vol"] / df["downside_vol"]
df["vol_ratio"] = (
df["vol_ratio"].replace([np.inf, -np.inf], np.nan).fillna(0)
) # 或 fillna(np.nan)
df["return_skew"] = (
grouped["pct_chg"].rolling(window=5).skew().reset_index(0, drop=True)
)
df["return_kurtosis"] = (
grouped["pct_chg"].rolling(window=5).kurt().reset_index(0, drop=True)
)
# 因子 1短期成交量变化率
df["volume_change_rate"] = (
grouped["vol"].rolling(window=2).mean()
/ grouped["vol"].rolling(window=10).mean()
- 1
).reset_index(
level=0, drop=True
) # 确保索引对齐
# 因子 2成交量突破信号
max_volume = (
grouped["vol"].rolling(window=5).max().reset_index(level=0, drop=True)
) # 确保索引对齐
df["cat_volume_breakout"] = df["vol"] > max_volume
# 因子 3换手率均线偏离度
mean_turnover = (
grouped["turnover_rate"]
.rolling(window=3)
.mean()
.reset_index(level=0, drop=True)
)
std_turnover = (
grouped["turnover_rate"].rolling(window=3).std().reset_index(level=0, drop=True)
)
df["turnover_deviation"] = (df["turnover_rate"] - mean_turnover) / std_turnover
# 因子 4换手率激增信号
df["cat_turnover_spike"] = df["turnover_rate"] > mean_turnover + 2 * std_turnover
# 因子 5量比均值
df["avg_volume_ratio"] = (
grouped["volume_ratio"].rolling(window=3).mean().reset_index(level=0, drop=True)
)
# 因子 6量比突破信号
max_volume_ratio = (
grouped["volume_ratio"].rolling(window=5).max().reset_index(level=0, drop=True)
)
df["cat_volume_ratio_breakout"] = df["volume_ratio"] > max_volume_ratio
df["vol_spike"] = grouped.apply(
lambda x: pd.Series(x["vol"].rolling(20).mean(), index=x.index)
)
df["vol_std_5"] = grouped["vol"].pct_change().rolling(window=5).std()
# 计算 ATR
df["atr_14"] = grouped.apply(
lambda x: pd.Series(
talib.ATR(
x["high"].values, x["low"].values, x["close"].values, timeperiod=14
),
index=x.index,
)
)
df["atr_6"] = grouped.apply(
lambda x: pd.Series(
talib.ATR(
x["high"].values, x["low"].values, x["close"].values, timeperiod=6
),
index=x.index,
)
)
# 计算 OBV 及其均线
df["obv"] = grouped.apply(
lambda x: pd.Series(
talib.OBV(x["close"].values, x["vol"].values), index=x.index
)
)
print(df.columns)
df["maobv_6"] = grouped.apply(
lambda x: pd.Series(talib.SMA(x["obv"].values, timeperiod=6), index=x.index)
)
df["rsi_3"] = grouped.apply(
lambda x: pd.Series(talib.RSI(x["close"].values, timeperiod=3), index=x.index)
)
# df['rsi_6'] = grouped.apply(
# lambda x: pd.Series(talib.RSI(x['close'].values, timeperiod=6), index=x.index)
# )
# df['rsi_9'] = grouped.apply(
# lambda x: pd.Series(talib.RSI(x['close'].values, timeperiod=9), index=x.index)
# )
# 计算 return_10 和 return_20
df["return_5"] = grouped["close"].apply(lambda x: x / x.shift(5) - 1)
# df['return_10'] = grouped['close'].apply(lambda x: x / x.shift(10) - 1)
df["return_20"] = grouped["close"].apply(lambda x: x / x.shift(20) - 1)
# df['avg_close_5'] = grouped['close'].apply(lambda x: x.rolling(window=5).mean() / x)
# 计算标准差指标
df["std_return_5"] = grouped["close"].apply(
lambda x: x.pct_change().rolling(window=5).std()
)
# df['std_return_15'] = grouped['close'].apply(lambda x: x.pct_change().rolling(window=15).std())
# df['std_return_25'] = grouped['close'].apply(lambda x: x.pct_change().rolling(window=25).std())
df["std_return_90"] = grouped["close"].apply(
lambda x: x.pct_change().rolling(window=90).std()
)
df["std_return_90_2"] = grouped["close"].apply(
lambda x: x.shift(10).pct_change().rolling(window=90).std()
)
# 计算 EMA 指标
df["_ema_5"] = grouped["close"].apply(
lambda x: pd.Series(talib.EMA(x.values, timeperiod=5), index=x.index)
)
df["_ema_13"] = grouped["close"].apply(
lambda x: pd.Series(talib.EMA(x.values, timeperiod=13), index=x.index)
)
df["_ema_20"] = grouped["close"].apply(
lambda x: pd.Series(talib.EMA(x.values, timeperiod=20), index=x.index)
)
df["_ema_60"] = grouped["close"].apply(
lambda x: pd.Series(talib.EMA(x.values, timeperiod=60), index=x.index)
)
# 计算 act_factor1, act_factor2, act_factor3, act_factor4
df["act_factor1"] = grouped["_ema_5"].apply(
lambda x: np.arctan((x / x.shift(1) - 1) * 100) * 57.3 / 50
)
df["act_factor2"] = grouped["_ema_13"].apply(
lambda x: np.arctan((x / x.shift(1) - 1) * 100) * 57.3 / 40
)
df["act_factor3"] = grouped["_ema_20"].apply(
lambda x: np.arctan((x / x.shift(1) - 1) * 100) * 57.3 / 21
)
df["act_factor4"] = grouped["_ema_60"].apply(
lambda x: np.arctan((x / x.shift(1) - 1) * 100) * 57.3 / 10
)
# 根据 trade_date 截面计算排名
df["rank_act_factor1"] = df.groupby("trade_date", group_keys=False)[
"act_factor1"
].rank(ascending=False, pct=True)
df["rank_act_factor2"] = df.groupby("trade_date", group_keys=False)[
"act_factor2"
].rank(ascending=False, pct=True)
df["rank_act_factor3"] = df.groupby("trade_date", group_keys=False)[
"act_factor3"
].rank(ascending=False, pct=True)
df["log_circ_mv"] = np.log(df["circ_mv"])
window_high_volume = 5
window_close_stddev = 20
period_delta = 5
# 计算每只股票的滚动协方差
def calculate_rolling_cov(group):
return group["high"].rolling(window_high_volume).cov(group["vol"])
df["cov"] = grouped.apply(calculate_rolling_cov)
# 计算每只股票的协方差差分
def calculate_delta_cov(group):
return group["cov"].diff(period_delta)
df["delta_cov"] = grouped.apply(calculate_delta_cov)
# 计算每只股票的滚动标准差
def calculate_stddev_close(group):
return group["close"].rolling(window_close_stddev).std()
df["_stddev_close"] = grouped.apply(calculate_stddev_close)
df["_rank_stddev"] = df.groupby("trade_date")["_stddev_close"].rank(pct=True)
df["alpha_22_improved"] = -1 * df["delta_cov"] * df["_rank_stddev"]
df["alpha_003"] = np.where(
df["high"] != df["low"],
(df["close"] - df["open"]) / (df["high"] - df["low"]),
0,
)
df["alpha_007"] = grouped.apply(lambda x: x["close"].rolling(5).corr(x["vol"]))
df["alpha_007"] = df.groupby("trade_date", group_keys=False)["alpha_007"].rank(
ascending=True, pct=True
)
df["alpha_013"] = grouped["close"].transform(
lambda x: x.rolling(5).sum() - x.rolling(20).sum()
)
df["alpha_013"] = df.groupby("trade_date", group_keys=False)["alpha_013"].rank(
ascending=True, pct=True
)
df["vol_break"] = np.where(
(df["close"] > df["cost_85pct"]) & (df["volume_ratio"] > 2), 1, 0
)
df["weight_roc5"] = grouped["weight_avg"].apply(lambda x: x.pct_change(5))
def rolling_corr(group):
roc_close = group["close"].pct_change()
roc_weight = group["weight_avg"].pct_change()
return roc_close.rolling(10).corr(roc_weight)
df["price_cost_divergence"] = grouped.apply(rolling_corr)
df["smallcap_concentration"] = (1 / df["log_circ_mv"]) * (
df["cost_85pct"] - df["cost_15pct"]
)
# 16. 筹码稳定性指数 (20日波动率)
df["weight_std20"] = grouped["weight_avg"].apply(lambda x: x.rolling(20).std())
df["cost_stability"] = df["weight_std20"] / grouped["weight_avg"].transform(
lambda x: x.rolling(20).mean()
)
# 17. 成本区间突破标记
df["high_cost_break_days"] = grouped.apply(
lambda g: g["close"].gt(g["cost_95pct"]).rolling(5).sum()
)
# 20. 筹码-流动性风险
df["liquidity_risk"] = (df["cost_95pct"] - df["cost_5pct"]) * (
1 / grouped["vol"].transform(lambda x: x.rolling(10).mean())
)
# 7. 市值波动率因子 (使用 grouped)
df["turnover_std"] = grouped["turnover_rate"].transform(
lambda x: x.rolling(window=20).std()
)
df["mv_volatility"] = grouped.apply(lambda x: x["turnover_std"] / x["log_circ_mv"])
# 8. 市值成长性因子
df["volume_growth"] = grouped["vol"].pct_change(periods=20)
df["mv_growth"] = df["volume_growth"] / df["log_circ_mv"]
df.drop(columns=["weight_std20"], inplace=True, errors="ignore")
df.drop(
columns=[
"_is_positive",
"_is_negative",
"_pos_returns",
"_neg_returns",
"_pos_returns_sq",
"_neg_returns_sq",
],
inplace=True,
errors="ignore",
)
new_columns = [col for col in df.columns.tolist()[:] if col not in old_columns]
return df, new_columns
def get_simple_factor(df):
old_columns = df.columns.tolist()[:]
df = df.sort_values(by=["ts_code", "trade_date"])
alpha = 0.5
df["momentum_factor"] = df["volume_change_rate"] + alpha * df["turnover_deviation"]
df["resonance_factor"] = df["volume_ratio"] * df["pct_chg"]
df["log_close"] = np.log(df["close"])
df["cat_vol_spike"] = df["vol"] > 2 * df["vol_spike"]
df["up"] = (df["high"] - df[["close", "open"]].max(axis=1)) / df["close"]
df["down"] = (df[["close", "open"]].min(axis=1) - df["low"]) / df["close"]
df["obv_maobv_6"] = df["obv"] - df["maobv_6"]
# 计算比值指标
df["std_return_5_over_std_return_90"] = df["std_return_5"] / df["std_return_90"]
# df['std_return_5 / std_return_25'] = df['std_return_5'] / df['std_return_25']
# 计算标准差差值
df["std_return_90_minus_std_return_90_2"] = (
df["std_return_90"] - df["std_return_90_2"]
)
# df['cat_af1'] = df['act_factor1'] > 0
df["cat_af2"] = df["act_factor2"] > df["act_factor1"]
df["cat_af3"] = df["act_factor3"] > df["act_factor2"]
df["cat_af4"] = df["act_factor4"] > df["act_factor3"]
# 计算 act_factor5 和 act_factor6
df["act_factor5"] = (
df["act_factor1"] + df["act_factor2"] + df["act_factor3"] + df["act_factor4"]
)
df["act_factor6"] = (df["act_factor1"] - df["act_factor2"]) / np.sqrt(
df["act_factor1"] ** 2 + df["act_factor2"] ** 2
)
df["active_buy_volume_large"] = df["buy_lg_vol"] / df["net_mf_vol"]
df["active_buy_volume_big"] = df["buy_elg_vol"] / df["net_mf_vol"]
df["active_buy_volume_small"] = df["buy_sm_vol"] / df["net_mf_vol"]
df["buy_lg_vol_minus_sell_lg_vol"] = (df["buy_lg_vol"] - df["sell_lg_vol"]) / df[
"net_mf_vol"
]
df["buy_elg_vol_minus_sell_elg_vol"] = (
df["buy_elg_vol"] - df["sell_elg_vol"]
) / df["net_mf_vol"]
df["log_circ_mv"] = np.log(df["circ_mv"])
df["ctrl_strength"] = (df["cost_85pct"] - df["cost_15pct"]) / (
df["his_high"] - df["his_low"]
)
df["low_cost_dev"] = (df["close"] - df["cost_5pct"]) / (
df["cost_50pct"] - df["cost_5pct"]
)
df["asymmetry"] = (df["cost_95pct"] - df["cost_50pct"]) / (
df["cost_50pct"] - df["cost_5pct"]
)
df["lock_factor"] = df["turnover_rate"] * (
1 - (df["cost_95pct"] - df["cost_5pct"]) / (df["his_high"] - df["his_low"])
)
df["cat_vol_break"] = (df["close"] > df["cost_85pct"]) & (df["volume_ratio"] > 2)
df["cost_atr_adj"] = (df["cost_95pct"] - df["cost_5pct"]) / df["atr_14"]
# 12. 小盘股筹码集中度
df["smallcap_concentration"] = (1 / df["log_circ_mv"]) * (
df["cost_85pct"] - df["cost_15pct"]
)
df["cat_golden_resonance"] = (
(df["close"] > df["weight_avg"])
& (df["volume_ratio"] > 1.5)
& (df["winner_rate"] > 0.7)
)
df["mv_turnover_ratio"] = df["turnover_rate"] / df["log_circ_mv"]
df["mv_adjusted_volume"] = df["vol"] / df["log_circ_mv"]
df["mv_weighted_turnover"] = df["turnover_rate"] * (1 / df["log_circ_mv"])
df["nonlinear_mv_volume"] = df["vol"] / df["log_circ_mv"]
df["mv_volume_ratio"] = df["volume_ratio"] / df["log_circ_mv"]
df["mv_momentum"] = df["turnover_rate"] * df["volume_ratio"] / df["log_circ_mv"]
drop_columns = [col for col in df.columns if col.startswith("_")]
df.drop(columns=drop_columns, inplace=True, errors="ignore")
new_columns = [col for col in df.columns.tolist()[:] if col not in old_columns]
return df, new_columns
import pandas as pd
import numpy as np
from scipy.stats import linregress # For factor 4 (if implementing slope directly)
# from hurst import compute_Hc # For factor 18, needs pip install hurst
# import statsmodels.api as sm # For factor 16, needs pip install statsmodels
# --- Constants ---
epsilon = 1e-10 # Prevent division by zero
# --- Helper Functions ---
def _safe_divide(a, b, default_val=0):
"""Safe division, returns default_val for division by zero or NaN/inf results."""
with np.errstate(divide="ignore", invalid="ignore"):
result = a / b
# Replace NaN, Inf, -Inf resulting from division or invalid ops
result[~np.isfinite(result)] = default_val
return result
# --- Factor Calculation Functions (In-Place Modification) ---
# Category 1: Large Player Intent & Behavior
def lg_flow_mom_corr(
df: pd.DataFrame, N: int = 20, M: int = 60, factor_name: str = None
):
"""
Calculates Factor 1: Large Flow & Price Momentum Concordance (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"lg_flow_mom_corr_{N}_{M}"
print(f"Calculating {factor_name}...")
_temp_cols = ["_net_lg_flow_val", "_rolling_net_lg_flow", "_price_mom"]
try:
df["_net_lg_flow_val"] = (
df["buy_lg_vol"]
+ df["buy_elg_vol"]
- df["sell_lg_vol"]
- df["sell_elg_vol"]
) * df["close"]
df["_rolling_net_lg_flow"] = (
df.groupby("ts_code")["_net_lg_flow_val"]
.rolling(N, min_periods=max(1, N // 2))
.sum()
.reset_index(level=0, drop=True)
)
df["_price_mom"] = df.groupby("ts_code")["close"].pct_change(N)
# Calculate correlation on the temporary Series to handle alignment
factor_series = (
df["_rolling_net_lg_flow"]
.rolling(M, min_periods=max(1, M // 2))
.corr(df["_price_mom"])
)
df[factor_name] = factor_series
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan # Assign NaN on error
finally:
# Cleanup intermediate columns
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def lg_buy_consolidation(
df: pd.DataFrame, N: int = 20, vol_quantile: float = 0.2, factor_name: str = None
):
"""
Calculates Factor 2: Large Buying during Consolidation (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"lg_buy_consolidation_{N}"
print(f"Calculating {factor_name}...")
_temp_cols = [
"_rolling_std",
"_net_lg_flow_ratio",
"_rolling_net_lg_flow_ratio_mean",
"_std_threshold",
]
try:
df["_rolling_std"] = (
df.groupby("ts_code")["close"]
.rolling(N, min_periods=max(1, N // 2))
.std()
.reset_index(level=0, drop=True)
)
df["_net_lg_flow_ratio"] = _safe_divide(
(
df["buy_lg_vol"]
+ df["buy_elg_vol"]
- df["sell_lg_vol"]
- df["sell_elg_vol"]
),
df["vol"],
)
df["_rolling_net_lg_flow_ratio_mean"] = (
df.groupby("ts_code")["_net_lg_flow_ratio"]
.rolling(N, min_periods=max(1, N // 2))
.mean()
.reset_index(level=0, drop=True)
)
df["_std_threshold"] = df.groupby("trade_date")["_rolling_std"].transform(
lambda x: x.quantile(vol_quantile)
)
df[factor_name] = df["_rolling_net_lg_flow_ratio_mean"].where(
df["_rolling_std"] < df["_std_threshold"]
)
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def lg_flow_accel(df: pd.DataFrame, factor_name: str = "lg_flow_accel"):
"""
Calculates Factor 3: Large Flow Acceleration (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_net_lg_flow_vol"]
try:
df["_net_lg_flow_vol"] = (
df["buy_lg_vol"]
+ df["buy_elg_vol"]
- df["sell_lg_vol"]
- df["sell_elg_vol"]
)
df[factor_name] = df.groupby("ts_code")["_net_lg_flow_vol"].diff(1).diff(1)
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def intraday_lg_flow_corr(df: pd.DataFrame, N: int = 20, factor_name: str = None):
"""
Calculates Factor 4: (Approx) Intraday Trend & Large Flow Correlation (In-place).
NOTE: Direct rolling correlation between two rolling series is complex/slow in pandas.
This provides a placeholder or requires significant optimization/pre-calculation.
WARNING: Modifies df in-place. Placeholder implementation returns NaN.
"""
if factor_name is None:
factor_name = f"intraday_lg_flow_corr_{N}"
print(f"Calculating {factor_name} (Placeholder - complex implementation)...")
df[factor_name] = (
np.nan
) # Placeholder, see previous thought process for detailed logic needed
print(f"Finished {factor_name} (Placeholder).")
# Category 2: Cost Basis & PnL Status
def profit_pressure(df: pd.DataFrame, factor_name: str = "profit_pressure"):
"""
Calculates Factor 5: Profit Pressure Index (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_profit_margin_85", "_profit_margin_95"]
try:
df["_profit_margin_85"] = _safe_divide(df["close"], df["cost_85pct"]) - 1
df["_profit_margin_95"] = _safe_divide(df["close"], df["cost_95pct"]) - 1
df[factor_name] = (
df["winner_rate"]
* 0.5
* (df["_profit_margin_85"] + df["_profit_margin_95"])
)
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def underwater_resistance(df: pd.DataFrame, factor_name: str = "underwater_resistance"):
"""
Calculates Factor 6: Resistance from Underwater Positions (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_underwater_ratio", "_dist_to_cost_15"]
try:
df["_underwater_ratio"] = 1.0 - df["winner_rate"]
df["_dist_to_cost_15"] = np.maximum(0, df["cost_15pct"] - df["close"]) / (
df["close"] + epsilon
)
df[factor_name] = df["_underwater_ratio"] * df["_dist_to_cost_15"]
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cost_conc_std(df: pd.DataFrame, N: int = 20, factor_name: str = None):
"""
Calculates Factor 7: Cost Concentration Change (Std Dev) (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"cost_conc_std_{N}"
print(f"Calculating {factor_name}...")
_temp_cols = ["_cost_range_norm"]
try:
df["_cost_range_norm"] = _safe_divide(
(df["cost_85pct"] - df["cost_15pct"]), (df["weight_avg"] + epsilon)
)
# Need to calculate rolling std on the temp col before dropping it
factor_series = (
df.groupby("ts_code")["_cost_range_norm"]
.rolling(N, min_periods=max(1, N // 2))
.std()
.reset_index(level=0, drop=True)
)
df[factor_name] = factor_series
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def profit_decay(df: pd.DataFrame, N: int = 20, factor_name: str = None):
"""
Calculates Factor 8: Profit Expectation Decay (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"profit_decay_{N}"
print(f"Calculating {factor_name}...")
_temp_cols = ["_ret_N", "_winner_rate_change_N"]
try:
df["_ret_N"] = (
_safe_divide(df["close"], df.groupby("ts_code")["close"].shift(N)) - 1
)
df["_winner_rate_change_N"] = df.groupby("ts_code")["winner_rate"].diff(N)
df[factor_name] = _safe_divide(df["_ret_N"], df["_winner_rate_change_N"])
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
# Category 3: Volatility Source & Market State
def vol_amp_loss(df: pd.DataFrame, N: int = 20, factor_name: str = None):
"""
Calculates Factor 9: Volatility Amplification when Underwater (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"vol_amp_loss_{N}"
print(f"Calculating {factor_name}...")
_temp_cols = ["_vol_N", "_loss_degree"]
try:
df["_vol_N"] = (
df.groupby("ts_code")["pct_chg"]
.rolling(N, min_periods=max(1, N // 2))
.std()
.reset_index(level=0, drop=True)
)
df["_loss_degree"] = np.maximum(0, df["weight_avg"] - df["close"]) / (
df["close"] + epsilon
)
df[factor_name] = df["_vol_N"] * df["_loss_degree"]
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def vol_drop_profit_cnt(
df: pd.DataFrame,
N: int = 20,
M: int = 5,
profit_thresh: float = 0.1,
drop_thresh: float = -0.03,
vol_multiple: float = 2.0,
factor_name: str = None,
):
"""
Calculates Factor 10: High Volume Drop when Profitable (Count over M days) (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"vol_drop_profit_cnt_{M}"
print(f"Calculating {factor_name}...")
_temp_cols = [
"_is_profitable",
"_is_dropping",
"_rolling_mean_vol",
"_rolling_std_vol",
"_is_high_vol",
"_event",
]
try:
df["_is_profitable"] = df["close"] > df["weight_avg"] * (1 + profit_thresh)
df["_is_dropping"] = df["pct_chg"] < drop_thresh
df["_rolling_mean_vol"] = (
df.groupby("ts_code")["vol"]
.rolling(N, min_periods=1)
.mean()
.reset_index(level=0, drop=True)
)
df["_rolling_std_vol"] = (
df.groupby("ts_code")["vol"]
.rolling(N, min_periods=2)
.std()
.reset_index(level=0, drop=True)
.fillna(0)
)
df["_is_high_vol"] = df["vol"] > (
df["_rolling_mean_vol"] + vol_multiple * df["_rolling_std_vol"]
)
df["_event"] = (
df["_is_profitable"] & df["_is_dropping"] & df["_is_high_vol"]
).astype(int)
factor_series = (
df.groupby("ts_code")["_event"]
.rolling(M, min_periods=1)
.sum()
.reset_index(level=0, drop=True)
)
df[factor_name] = factor_series
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def lg_flow_vol_interact(df: pd.DataFrame, N: int = 20, factor_name: str = None):
"""
Calculates Factor 11: Large Flow Driven Volatility (Interaction Term) (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"lg_flow_vol_interact_{N}"
print(f"Calculating {factor_name}...")
_temp_cols = [
"_vol_N",
"_net_lg_flow_val",
"_total_val",
"_abs_net_lg_flow_ratio",
"_abs_net_lg_flow_ratio_N",
]
try:
df["_vol_N"] = (
df.groupby("ts_code")["pct_chg"]
.rolling(N, min_periods=max(1, N // 2))
.std()
.reset_index(level=0, drop=True)
)
df["_net_lg_flow_val"] = (
df["buy_lg_vol"]
+ df["buy_elg_vol"]
- df["sell_lg_vol"]
- df["sell_elg_vol"]
) * df["close"]
df["_total_val"] = df["vol"] * df["close"]
df["_abs_net_lg_flow_ratio"] = abs(df["_net_lg_flow_val"]) / (
df["_total_val"] + epsilon
)
df["_abs_net_lg_flow_ratio_N"] = (
df.groupby("ts_code")["_abs_net_lg_flow_ratio"]
.rolling(N, min_periods=max(1, N // 2))
.mean()
.reset_index(level=0, drop=True)
)
df[factor_name] = df["_vol_N"] * df["_abs_net_lg_flow_ratio_N"]
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cost_break_confirm_cnt(df: pd.DataFrame, M: int = 5, factor_name: str = None):
"""
Calculates Factor 12: Cost Breakout Confirmation Count (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"cost_break_confirm_cnt_{M}"
print(f"Calculating {factor_name}...")
_temp_cols = [
"_prev_cost_85",
"_prev_cost_15",
"_break_up",
"_break_down",
"_net_lg_flow_vol",
"_confirm_up",
"_confirm_down",
"_net_confirm",
]
try:
df["_prev_cost_85"] = df.groupby("ts_code")["cost_85pct"].shift(1)
df["_prev_cost_15"] = df.groupby("ts_code")["cost_15pct"].shift(1)
df["_break_up"] = df["close"] > df["_prev_cost_85"]
df["_break_down"] = df["close"] < df["_prev_cost_15"]
df["_net_lg_flow_vol"] = (
df["buy_lg_vol"]
+ df["buy_elg_vol"]
- df["sell_lg_vol"]
- df["sell_elg_vol"]
)
df["_confirm_up"] = (df["_break_up"] & (df["_net_lg_flow_vol"] > 0)).astype(int)
df["_confirm_down"] = (df["_break_down"] & (df["_net_lg_flow_vol"] < 0)).astype(
int
)
df["_net_confirm"] = df["_confirm_up"] - df["_confirm_down"]
factor_series = (
df.groupby("ts_code")["_net_confirm"]
.rolling(M, min_periods=1)
.sum()
.reset_index(level=0, drop=True)
)
df[factor_name] = factor_series
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
# Category 4: Technical Indicators & Market Behavior
def atr_norm_channel_pos(df: pd.DataFrame, N: int = 14, factor_name: str = None):
"""
Calculates Factor 13: ATR Normalized Channel Position (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"atr_norm_channel_pos_{N}"
print(f"Calculating {factor_name}...")
_temp_cols = [
"_prev_close",
"_h_l",
"_h_pc",
"_l_pc",
"_tr",
"_atr_N",
"_roll_low_N",
]
try:
df["_prev_close"] = df.groupby("ts_code")["close"].shift(1)
df["_h_l"] = df["high"] - df["low"]
df["_h_pc"] = abs(df["high"] - df["_prev_close"])
df["_l_pc"] = abs(df["low"] - df["_prev_close"])
df["_tr"] = df[["_h_l", "_h_pc", "_l_pc"]].max(axis=1)
df["_atr_N"] = (
df.groupby("ts_code")["_tr"]
.rolling(N, min_periods=max(1, N // 2))
.mean()
.reset_index(level=0, drop=True)
)
df["_roll_low_N"] = (
df.groupby("ts_code")["low"]
.rolling(N, min_periods=max(1, N // 2))
.min()
.reset_index(level=0, drop=True)
)
df[factor_name] = _safe_divide((df["close"] - df["_roll_low_N"]), df["_atr_N"])
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def turnover_diff_skew(df: pd.DataFrame, N: int = 20, factor_name: str = None):
"""
Calculates Factor 14: Skewness of Turnover Rate Change (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"turnover_diff_skew_{N}"
print(f"Calculating {factor_name}...")
_temp_cols = ["_turnover_diff"]
try:
# Assuming turnover_rate is in percentage points, diff is fine
df["_turnover_diff"] = df.groupby("ts_code")["turnover_rate"].diff(1)
factor_series = (
df.groupby("ts_code")["_turnover_diff"]
.rolling(N, min_periods=max(3, N // 2))
.skew()
.reset_index(level=0, drop=True)
)
df[factor_name] = factor_series
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def lg_sm_flow_diverge(df: pd.DataFrame, N: int = 20, factor_name: str = None):
"""
Calculates Factor 15: Divergence between Large and Small Flow (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"lg_sm_flow_diverge_{N}"
print(f"Calculating {factor_name}...")
_temp_cols = [
"_lg_flow_ratio",
"_sm_flow_ratio",
"_lg_flow_ratio_N",
"_sm_flow_ratio_N",
]
try:
df["_lg_flow_ratio"] = _safe_divide(
(
df["buy_lg_vol"]
+ df["buy_elg_vol"]
- df["sell_lg_vol"]
- df["sell_elg_vol"]
),
df["vol"],
)
df["_sm_flow_ratio"] = _safe_divide(
(df["buy_sm_vol"] - df["sell_sm_vol"]), df["vol"]
)
df["_lg_flow_ratio_N"] = (
df.groupby("ts_code")["_lg_flow_ratio"]
.rolling(N, min_periods=max(1, N // 2))
.mean()
.reset_index(level=0, drop=True)
)
df["_sm_flow_ratio_N"] = (
df.groupby("ts_code")["_sm_flow_ratio"]
.rolling(N, min_periods=max(1, N // 2))
.mean()
.reset_index(level=0, drop=True)
)
df[factor_name] = df["_lg_flow_ratio_N"] - df["_sm_flow_ratio_N"]
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cap_neutral_cost_metric(
df: pd.DataFrame, factor_name: str = "cap_neutral_cost_metric"
):
"""
Calculates Factor 16: Market Cap Neutralized Cost Metric (Placeholder).
Requires statsmodels and complex implementation.
WARNING: Modifies df in-place. Placeholder implementation returns NaN.
"""
print(f"Calculating {factor_name} (Placeholder - requires statsmodels)...")
df[factor_name] = np.nan
print(f"Finished {factor_name} (Placeholder).")
def pullback_strong(
df: pd.DataFrame,
N: int = 20,
M: int = 20,
gain_thresh: float = 0.2,
factor_name: str = None,
):
"""
Calculates Factor 17: Pullback Depth from Recent High for Strong Stocks (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"pullback_strong_{N}_{M}"
print(f"Calculating {factor_name}...")
_temp_cols = ["_high_N", "_pullback_depth", "_recent_gain_M"]
try:
df["_high_N"] = (
df.groupby("ts_code")["high"]
.rolling(N, min_periods=max(1, N // 2))
.max()
.reset_index(level=0, drop=True)
)
df["_pullback_depth"] = _safe_divide(
(df["_high_N"] - df["close"]), df["_high_N"]
)
df["_recent_gain_M"] = (
_safe_divide(df["close"], df.groupby("ts_code")["close"].shift(M)) - 1
)
df[factor_name] = _safe_divide(df["_pullback_depth"], df["_recent_gain_M"])
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def hurst_exponent_flow(
df: pd.DataFrame, N: int = 60, flow_col: str = "net_mf_vol", factor_name: str = None
):
"""
Calculates Factor 18: Hurst Exponent of Money Flow (Placeholder).
Requires 'hurst' library and potentially slow rolling apply.
WARNING: Modifies df in-place. Placeholder implementation returns NaN.
"""
if factor_name is None:
factor_name = f"hurst_{flow_col}_{N}"
print(f"Calculating {factor_name} (Placeholder - requires hurst library)...")
try:
from hurst import compute_Hc
# Logic would go here, likely using rolling().apply() which is slow
# factor_series = df.groupby('ts_code')[flow_col]....apply(hurst_calc_func...)
df[factor_name] = np.nan # Placeholder
except ImportError:
print("Error: 'hurst' library not installed. Cannot calculate factor.")
df[factor_name] = np.nan
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
print(f"Finished {factor_name} (Placeholder).")
def vol_wgt_hist_pos(df: pd.DataFrame, N: int = 20, factor_name: str = None):
"""
Calculates Factor 19: Volume Weighting at Historical Price Level (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"vol_wgt_hist_pos_{N}"
print(f"Calculating {factor_name}...")
_temp_cols = ["_hist_pos", "_rolling_mean_vol", "_vol_rel_strength"]
try:
df["_hist_pos"] = _safe_divide(
(df["close"] - df["his_low"]), (df["his_high"] - df["his_low"])
).clip(0, 1)
df["_rolling_mean_vol"] = (
df.groupby("ts_code")["vol"]
.rolling(N, min_periods=max(1, N // 2))
.mean()
.reset_index(level=0, drop=True)
)
df["_vol_rel_strength"] = _safe_divide(df["vol"], df["_rolling_mean_vol"])
df[factor_name] = df["_hist_pos"] * df["_vol_rel_strength"]
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def vol_adj_roc(df: pd.DataFrame, N: int = 20, factor_name: str = None):
"""
Calculates Factor 20: Volatility-Adjusted ROC (In-place).
WARNING: Modifies df in-place.
"""
if factor_name is None:
factor_name = f"vol_adj_roc_{N}"
print(f"Calculating {factor_name}...")
_temp_cols = ["_roc_N", "_vol_N"]
try:
df["_roc_N"] = (
_safe_divide(df["close"], df.groupby("ts_code")["close"].shift(N)) - 1
)
df["_vol_N"] = (
df.groupby("ts_code")["pct_chg"]
.rolling(N, min_periods=max(2, N // 2))
.std()
.reset_index(level=0, drop=True)
.fillna(0)
)
df[factor_name] = _safe_divide(df["_roc_N"], df["_vol_N"])
except Exception as e:
print(f"Error calculating {factor_name}: {e}")
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def calculate_complex_factor(
df: pd.DataFrame, factor_name: str = "complex_factor_deap_1"
):
"""
表达式: sub(protected_div_torch(A, B), C)
其中 A, B, C 及内部组件依赖于多个预计算因子列。
Args:
df (pd.DataFrame): 包含所有必需基础因子列的 DataFrame。
factor_name (str): 要在 df 中创建的新因子列的名称。
WARNING: 此函数会原地修改输入的 DataFrame 'df'
如果在计算过程中缺少任何必需的列,将打印错误并填充 NaN。
"""
print(f"开始计算因子: {factor_name} (原地修改)...")
_temp_cols_list = [] # 用于记录中间计算列的名称
try:
# --- 分解计算表达式的各个部分 ---
# 计算组件 D
# D = sub(mul(pullback_strong_20_20, div(log_close, industry_return_5)), div(add(vol_adj_roc_20, vol_drop_profit_cnt_5), sub(nonlinear_mv_volume, alpha_007)))
_temp_d_term1_div = _safe_divide(df["log_close"], df["industry_return_5"])
_temp_d_term1 = df["pullback_strong_20_20"] * _temp_d_term1_div
_temp_d_term2_sub = df["nonlinear_mv_volume"] - df["alpha_007"]
_temp_d_term2_add = df["vol_adj_roc_20"] + df["vol_drop_profit_cnt_5"]
_temp_d_term2 = _safe_divide(_temp_d_term2_add, _temp_d_term2_sub)
df["_temp_D"] = _temp_d_term1 - _temp_d_term2
_temp_cols_list.extend(
[
"_temp_D",
"_temp_d_term1_div",
"_temp_d_term1",
"_temp_d_term2_sub",
"_temp_d_term2_add",
"_temp_d_term2",
]
)
# 计算组件 A
# A = add(add(mul(D, lg_buy_consolidation_20), lg_buy_consolidation_20), pullback_strong_20_20)
_temp_a_term1 = df["_temp_D"] * df["lg_buy_consolidation_20"]
_temp_a_term2 = _temp_a_term1 + df["lg_buy_consolidation_20"]
df["_temp_A"] = _temp_a_term2 + df["pullback_strong_20_20"]
_temp_cols_list.extend(["_temp_A", "_temp_a_term1", "_temp_a_term2"])
# 计算组件 F
# F = mul(add(net_mf_vol, std_return_5), sub(arbr, industry_act_factor5))
_temp_f_term1 = df["net_mf_vol"] + df["std_return_5"]
_temp_f_term2 = df["arbr"] - df["industry_act_factor5"]
df["_temp_F"] = _temp_f_term1 * _temp_f_term2
_temp_cols_list.extend(["_temp_F", "_temp_f_term1", "_temp_f_term2"])
# 计算组件 H
# H = add(add(industry_act_factor1, low_cost_dev), mul(mv_weighted_turnover, act_factor4))
_temp_h_term1 = df["industry_act_factor1"] + df["low_cost_dev"]
_temp_h_term2 = df["mv_weighted_turnover"] * df["act_factor4"]
df["_temp_H"] = _temp_h_term1 + _temp_h_term2
_temp_cols_list.extend(["_temp_H", "_temp_h_term1", "_temp_h_term2"])
# 计算组件 B
# B = div(add(add(F, vol), H), lg_elg_buy_prop)
_temp_b_term1 = df["_temp_F"] + df["vol"]
_temp_b_term2 = _temp_b_term1 + df["_temp_H"]
df["_temp_B"] = _safe_divide(_temp_b_term2, df["lg_elg_buy_prop"])
_temp_cols_list.extend(["_temp_B", "_temp_b_term1", "_temp_b_term2"])
# 计算组件 C
# C = div(div(intraday_lg_flow_corr_20, lg_elg_buy_prop), lg_elg_buy_prop)
# 注意: intraday_lg_flow_corr_20 可能本身就是 NaN 或需要特殊处理
_temp_c_term1 = _safe_divide(
df.get("intraday_lg_flow_corr_20", np.nan), df["lg_elg_buy_prop"]
) # 使用 .get 处理可能不存在的列
df["_temp_C"] = _safe_divide(_temp_c_term1, df["lg_elg_buy_prop"])
_temp_cols_list.extend(["_temp_C", "_temp_c_term1"])
# --- 计算最终表达式 ---
# final = sub(div(A, B), C)
_temp_final_term1 = _safe_divide(df["_temp_A"], df["_temp_B"])
final_factor_series = _temp_final_term1 - df["_temp_C"]
# --- 将最终结果赋值给 df 的新列 (原地修改) ---
df[factor_name] = final_factor_series
print(f"因子 {factor_name} 计算成功。")
except KeyError as e:
# 捕获因为缺少列而产生的错误
print(f"错误: 计算 {factor_name} 时缺少必需的列: {e}")
print("请确保输入的 DataFrame 包含所有表达式中引用的因子列。")
print("将为因子 {factor_name} 填充 NaN。")
df[factor_name] = np.nan # 出错时填充 NaN
except Exception as e:
# 捕获其他可能的计算错误
print(f"错误: 计算 {factor_name} 时发生意外错误: {e}")
print(f"将为因子 {factor_name} 填充 NaN。")
df[factor_name] = np.nan # 出错时填充 NaN
finally:
# --- 清理所有中间计算列 ---
cols_to_drop = [col for col in _temp_cols_list if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
# print(f"已清理 {len(cols_to_drop)} 个临时列 for {factor_name}.")
print(f"因子 {factor_name} 计算流程结束。")
# 函数不返回任何值,因为 df 是原地修改的
import pandas as pd
import numpy as np
# from scipy.stats import rankdata # rankdata is not needed if using pandas rank
# import statsmodels.api as sm # Needed for factor 19
# --- Constants ---
epsilon = 1e-10 # Prevent division by zero
# --- Helper Functions ---
def _safe_divide(numerator, denominator, default_val=0):
"""
安全的除法函数处理分母为零或接近零以及NaN/Inf的情况。
Args:
numerator (pd.Series): 分子.
denominator (pd.Series): 分母.
default_val (float): 当分母为零或结果无效时返回的默认值.
Returns:
pd.Series: 除法结果.
"""
with np.errstate(divide="ignore", invalid="ignore"):
# Convert inputs to numeric, coercing errors to NaN before division
num = pd.to_numeric(numerator, errors="coerce")
den = pd.to_numeric(denominator, errors="coerce")
# Perform division where denominator is not close to zero and inputs are valid numbers
result = np.where(np.abs(den) > epsilon, num / den, default_val)
# Ensure result is float, handle potential NaNs from coercion or division
result = pd.to_numeric(result, errors="coerce")
# Fill remaining NaNs if necessary
result = np.nan_to_num(
result, nan=default_val, posinf=default_val, neginf=default_val
)
# Ensure result index matches numerator's index if numerator is a Series
if isinstance(numerator, pd.Series):
return pd.Series(result, index=numerator.index)
else:
return pd.Series(result) # Fallback if numerator is not a Series (less likely)
# --- Cross-Sectional Factor Calculation Functions (In-Place Modification) ---
# Category 1: Cross-Sectional Flow & Behavior Strength
def cs_rank_net_lg_flow_val(
df: pd.DataFrame, factor_name: str = "cs_rank_net_lg_flow_val"
):
"""
Factor 1: 大单净额截面排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_net_lg_flow_val"]
try:
df["_net_lg_flow_val"] = (
df["buy_lg_vol"]
+ df["buy_elg_vol"]
- df["sell_lg_vol"]
- df["sell_elg_vol"]
) * df["close"]
df[factor_name] = df.groupby("trade_date")["_net_lg_flow_val"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_flow_divergence(
df: pd.DataFrame, factor_name: str = "cs_rank_flow_divergence"
):
"""
Factor 2: 大小单流向背离度截面排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_lg_ratio", "_sm_ratio", "_divergence"]
try:
df["_lg_ratio"] = _safe_divide(
(
df["buy_lg_vol"]
+ df["buy_elg_vol"]
- df["sell_lg_vol"]
- df["sell_elg_vol"]
),
df["vol"],
)
df["_sm_ratio"] = _safe_divide(
(df["buy_sm_vol"] - df["sell_sm_vol"]), df["vol"]
)
df["_divergence"] = df["_lg_ratio"] - df["_sm_ratio"]
df[factor_name] = df.groupby("trade_date")["_divergence"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_industry_adj_lg_flow(
df: pd.DataFrame, factor_name: str = "cs_rank_ind_adj_lg_flow"
):
"""
Factor 3: 行业内大单流强度排序 (In-place). Requires 'cat_l2_code'.
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_net_lg_flow_vol", "_industry_avg_flow", "_deviation"]
if "cat_l2_code" not in df.columns:
print(
f"Error calculating {factor_name}: Missing 'cat_l2_code' column. Assigning NaN."
)
df[factor_name] = np.nan
return
try:
df["_net_lg_flow_vol"] = (
df["buy_lg_vol"]
+ df["buy_elg_vol"]
- df["sell_lg_vol"]
- df["sell_elg_vol"]
) * df[
"close"
] # Or use vol
df["_industry_avg_flow"] = df.groupby(["trade_date", "cat_l2_code"])[
"_net_lg_flow_vol"
].transform("mean")
df["_deviation"] = df["_net_lg_flow_vol"] - df["_industry_avg_flow"]
df[factor_name] = df.groupby("trade_date")["_deviation"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_elg_buy_ratio(df: pd.DataFrame, factor_name: str = "cs_rank_elg_buy_ratio"):
"""
Factor 4: 超大单买入占比排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_elg_buy_ratio"]
try:
df["_elg_buy_ratio"] = _safe_divide(df["buy_elg_vol"], df["vol"])
df[factor_name] = df.groupby("trade_date")["_elg_buy_ratio"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
# Category 2: Cross-Sectional Cost Basis & PnL Status
def cs_rank_rel_profit_margin(
df: pd.DataFrame, factor_name: str = "cs_rank_rel_profit_margin"
):
"""
Factor 5: 相对盈利幅度排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_profit_margin"]
try:
df["_profit_margin"] = _safe_divide(
(df["close"] - df["weight_avg"]), df["close"]
)
df[factor_name] = df.groupby("trade_date")["_profit_margin"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_cost_breadth(df: pd.DataFrame, factor_name: str = "cs_rank_cost_breadth"):
"""
Factor 6: 成本分布宽度截面排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_cost_breadth"]
try:
df["_cost_breadth"] = _safe_divide(
(df["cost_85pct"] - df["cost_15pct"]), df["weight_avg"]
)
df[factor_name] = df.groupby("trade_date")["_cost_breadth"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_dist_to_upper_cost(
df: pd.DataFrame, factor_name: str = "cs_rank_dist_to_upper_cost"
):
"""
Factor 7: 股价相对高成本位距离排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_dist_to_95"]
try:
df["_dist_to_95"] = _safe_divide(df["close"], df["cost_95pct"])
df[factor_name] = df.groupby("trade_date")["_dist_to_95"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_winner_rate(df: pd.DataFrame, factor_name: str = "cs_rank_winner_rate"):
"""
Factor 8: 获利盘比例截面排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
try:
df[factor_name] = df.groupby("trade_date")["winner_rate"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
print(f"Finished {factor_name}.")
# Category 3: Cross-Sectional Price Action & Volatility
def cs_rank_intraday_range(
df: pd.DataFrame, factor_name: str = "cs_rank_intraday_range"
):
"""
Factor 9: 日内相对振幅排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_norm_range"]
try:
df["_norm_range"] = _safe_divide((df["high"] - df["low"]), df["close"])
df[factor_name] = df.groupby("trade_date")["_norm_range"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_close_pos_in_range(
df: pd.DataFrame, factor_name: str = "cs_rank_close_pos_in_range"
):
"""
Factor 10: 收盘价在日内位置排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_close_pos"]
try:
df["_close_pos"] = _safe_divide(
(df["close"] - df["low"]), (df["high"] - df["low"]), default_val=0.5
) # Assign 0.5 if high==low
df[factor_name] = df.groupby("trade_date")["_close_pos"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_opening_gap(df: pd.DataFrame, factor_name: str = "cs_rank_opening_gap"):
"""
Factor 11: 开盘相对跳空幅度排序 (In-place). Needs pre_close.
WARNING: Modifies df in-place. Assumes 'pre_close' exists.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_gap"]
if "pre_close" not in df.columns:
print(
f"Error calculating {factor_name}: Missing 'pre_close' column. Assigning NaN."
)
df[factor_name] = np.nan
return
try:
df["_gap"] = _safe_divide(df["open"], df["pre_close"]) - 1
df[factor_name] = df.groupby("trade_date")["_gap"].rank(pct=True)
except KeyError as e:
print(
f"Error calculating {factor_name}: Missing column {e} (likely 'open'). Assigning NaN."
)
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_pos_in_hist_range(
df: pd.DataFrame, factor_name: str = "cs_rank_pos_in_hist_range"
):
"""
Factor 12: 相对历史波动位置排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_hist_pos"]
try:
df["_hist_pos"] = _safe_divide(
(df["close"] - df["his_low"]), (df["his_high"] - df["his_low"])
).clip(
0, 1
) # Clip to 0-1 range
df[factor_name] = df.groupby("trade_date")["_hist_pos"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
# Category 4: Cross-Sectional Interaction & Composite Indicators
def cs_rank_vol_x_profit_margin(
df: pd.DataFrame, factor_name: str = "cs_rank_vol_x_profit_margin"
):
"""
Factor 13: 波动率与盈亏状态交互排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_daily_vol", "_profit_margin", "_interaction"]
try:
df["_daily_vol"] = abs(df["pct_chg"])
df["_profit_margin"] = _safe_divide(
(df["close"] - df["weight_avg"]), df["close"]
)
df["_interaction"] = df["_daily_vol"] * df["_profit_margin"]
df[factor_name] = df.groupby("trade_date")["_interaction"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_lg_flow_price_concordance(
df: pd.DataFrame, factor_name: str = "cs_rank_lg_flow_price_concordance"
):
"""
Factor 14: 大单流向与价格变动一致性排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_net_lg_flow_vol", "_concordance"]
try:
df["_net_lg_flow_vol"] = (
df["buy_lg_vol"]
+ df["buy_elg_vol"]
- df["sell_lg_vol"]
- df["sell_elg_vol"]
)
df["_concordance"] = df["_net_lg_flow_vol"] * df["pct_chg"]
df[factor_name] = df.groupby("trade_date")["_concordance"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_turnover_per_winner(
df: pd.DataFrame, factor_name: str = "cs_rank_turnover_per_winner"
):
"""
Factor 15: 高换手获利盘占比排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_turnover_per_winner"]
try:
df["_turnover_per_winner"] = _safe_divide(
df["turnover_rate"], df["winner_rate"]
)
df[factor_name] = df.groupby("trade_date")["_turnover_per_winner"].rank(
pct=True
)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_ind_cap_neutral_pe(
df: pd.DataFrame, factor_name: str = "cs_rank_ind_cap_neutral_pe"
):
"""
Factor 16: 行业市值中性化PE排序 (Placeholder).
Requires statsmodels and complex cross-sectional regression implementation.
WARNING: Modifies df in-place. Placeholder implementation returns NaN.
"""
print(f"Calculating {factor_name} (Placeholder - requires statsmodels)...")
df[factor_name] = np.nan
print(f"Finished {factor_name} (Placeholder).")
def cs_rank_volume_ratio(df: pd.DataFrame, factor_name: str = "cs_rank_volume_ratio"):
"""
Factor 17: 成交量相对强度排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
try:
# Assumes 'volume_ratio' (量比) column already exists
df[factor_name] = df.groupby("trade_date")["volume_ratio"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
print(f"Finished {factor_name}.")
def cs_rank_elg_buy_sell_sm_ratio(
df: pd.DataFrame, factor_name: str = "cs_rank_elg_buy_sell_sm_ratio"
):
"""
Factor 18: 超大单买入与小单卖出比排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_ratio"]
try:
df["_ratio"] = _safe_divide(df["buy_elg_vol"], df["sell_sm_vol"])
df[factor_name] = df.groupby("trade_date")["_ratio"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_cost_dist_vol_ratio(
df: pd.DataFrame, factor_name: str = "cs_rank_cost_dist_vol_ratio"
):
"""
Factor 19: 价格偏离成本程度与成交量放大交互排序 (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_dist", "_interaction"]
if "volume_ratio" not in df.columns:
print(
f"Error calculating {factor_name}: Missing 'volume_ratio' column. Assigning NaN."
)
df[factor_name] = np.nan
return
try:
df["_dist"] = abs(df["close"] - df["weight_avg"]) / (df["close"] + epsilon)
df["_interaction"] = df["_dist"] * df["volume_ratio"]
df[factor_name] = df.groupby("trade_date")["_interaction"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def cs_rank_size(df: pd.DataFrame, factor_name: str = "cs_rank_size"):
"""
Factor 20: 市值因子暴露度排序 (Log of circ_mv) (In-place).
WARNING: Modifies df in-place.
"""
print(f"Calculating {factor_name}...")
_temp_cols = ["_log_circ_mv"]
try:
# Use log1p for stability if circ_mv can be zero or very small
df["_log_circ_mv"] = np.log1p(df["circ_mv"])
df[factor_name] = df.groupby("trade_date")["_log_circ_mv"].rank(pct=True)
except KeyError as e:
print(f"Error calculating {factor_name}: Missing column {e}. Assigning NaN.")
df[factor_name] = np.nan
except Exception as e:
print(
f"An unexpected error occurred calculating {factor_name}: {e}. Assigning NaN."
)
df[factor_name] = np.nan
finally:
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"Finished {factor_name}.")
def add_financial_factor(
main_df: pd.DataFrame,
financial_df: pd.DataFrame,
ts_code_col: str = "ts_code",
trade_date_col: str = "trade_date",
ann_date_col: str = "ann_date", # 公告日期
f_ann_date_col: str = "f_ann_date", # 实际公告日期 (优先使用)
factor_value_col: str = "undist_profit_ps", # 财务指标值所在的列
new_factor_col_name: str = "retained_profit_per_share", # 新因子列的名称
) -> pd.DataFrame:
"""
将财务指标数据(如每股未分配利润)作为因子添加到主时间序列 DataFrame 中。
使用 merge_asof 根据股票代码和公告日期,将最新的财务指标值匹配到每个交易日。
Args:
main_df: 包含时间序列交易数据的主 DataFrame (至少包含 ts_code_col 和 trade_date_col)。
financial_df: 包含财务指标数据的 DataFrame (至少包含 ts_code_col,
ann_date_col 或 f_ann_date_col, 以及 factor_value_col)。
ts_code_col: 股票代码列在两个 DataFrame 中的名称。默认为 'ts_code'
trade_date_col: 交易日期列在 main_df 中的名称。默认为 'trade_date'
ann_date_col: 公告日期列在 financial_df 中的名称(作为 f_ann_date_col 的备选)。默认为 'ann_date'
f_ann_date_col: 实际公告日期列在 financial_df 中的名称(优先使用)。默认为 'f_ann_date'
factor_value_col: 财务指标值(即要添加的因子值)在 financial_df 中的列名。默认为 'undistr_pft_ps'
new_factor_col_name: 添加到 main_df 中的新因子列的名称。默认为 'retained_profit_per_share'
Returns:
包含新因子列的 main_df DataFrame。
"""
# --- 数据校验 ---
required_main_cols = [ts_code_col, trade_date_col]
if not all(col in main_df.columns for col in required_main_cols):
raise ValueError(f"主 DataFrame 必须包含列: {required_main_cols}")
required_financial_cols = [ts_code_col, factor_value_col]
if f_ann_date_col and f_ann_date_col in financial_df.columns:
effective_date_col = f_ann_date_col
elif ann_date_col and ann_date_col in financial_df.columns:
effective_date_col = ann_date_col
else:
raise ValueError(
f"财务指标 DataFrame 必须包含列 '{f_ann_date_col}''{ann_date_col}' 作为数据生效日期"
)
required_financial_cols.append(effective_date_col)
if not all(col in financial_df.columns for col in required_financial_cols):
raise ValueError(f"财务指标 DataFrame 必须包含列: {required_financial_cols}")
# --- 数据预处理 ---
# 复制 main_df 避免修改原始 DataFrame
main_df = main_df.copy()
# 确保日期列是 datetime 类型
main_df[trade_date_col] = pd.to_datetime(main_df[trade_date_col])
financial_df[effective_date_col] = pd.to_datetime(financial_df[effective_date_col])
# 确保股票代码是字符串类型,以便合并时类型一致
main_df[ts_code_col] = main_df[ts_code_col].astype(str)
financial_df[ts_code_col] = financial_df[ts_code_col].astype(str)
# 选取 financial_df 中需要合并的列,并为 merge_asof 准备日期列
financial_data_subset = financial_df[
[ts_code_col, effective_date_col, factor_value_col]
].copy()
# 重命名 effective_date_col 为一个统一的名称,方便 merge_asof
# merge_asof 需要 right_on 参数,使用原始列名即可,不需要重命名
# 为了使用 merge_asof两个 DataFrame 都必须按合并键 (ts_code) 和日期列排序
main_df = main_df.sort_values(by=[ts_code_col, trade_date_col])
financial_data_subset = financial_data_subset.sort_values(
by=[ts_code_col, effective_date_col]
)
# --- 使用 merge_asof 计算因子 ---
# 执行 as-of 合并
df_with_factor = pd.merge_asof(
main_df,
financial_data_subset,
left_on=trade_date_col, # main_df 中用于匹配的日期列
right_on=effective_date_col, # financial_data_subset 中用于匹配的日期列
by=ts_code_col, # 按股票代码进行分组匹配
direction="backward", # 匹配方向:向后查找(即找 <= trade_date 的最近数据)
# 如果您需要容忍日期上的微小差异,可以使用 tolerance 参数
# tolerance=pd.Timedelta('1 days')
)
# 清理:移除用于匹配的 effective_date_col以及原始 financial_df 中可能带来的其他重复列
# merge_asof 默认不会带上 right DataFrame 中用于合并的 key 列,但如果名称不同可能会带上
# 这里的清理主要针对 effective_date_col
if (
effective_date_col in df_with_factor.columns
and effective_date_col != trade_date_col
):
# 确保不是trade_date_col本身被意外重命名
df_with_factor = df_with_factor.drop(columns=[effective_date_col])
# 重命名新加入的因子列
# merge_asof 会将 factor_value_col 直接带入,名称不变
# 我们将其重命名为 new_factor_col_name
if factor_value_col != new_factor_col_name:
if factor_value_col in df_with_factor.columns:
df_with_factor = df_with_factor.rename(
columns={factor_value_col: new_factor_col_name}
)
else:
print(
f"警告: 合并后未找到列 '{factor_value_col}',无法重命名为 '{new_factor_col_name}'"
)
# --- 返回结果 ---
return df_with_factor
# --- ARBR 因子计算函数 ---
def calculate_arbr(df: pd.DataFrame, N: int = 26):
"""
计算 AR 和 BR 指标,并将结果原地添加到 DataFrame 中。
Args:
df (pd.DataFrame): 输入的 DataFrame必须包含 'ts_code', 'trade_date',
'open', 'high', 'low', 'close' 列。
建议预先按 ts_code, trade_date 排序。
N (int): 计算 AR, BR 的窗口期,默认为 26。
WARNING: 此函数会原地修改输入的 DataFrame 'df'
"""
ar_col_name = 'AR'
br_col_name = 'BR'
print(f"开始计算因子: {ar_col_name}, {br_col_name} (原地修改)...")
_temp_cols = [] # 记录中间列
try:
# 0. 确保排序 (虽然 groupby 会处理,但有序更保险)
# df.sort_values(['ts_code', 'trade_date'], inplace=True) # 如果不确定df已排序
# 1. 计算所需中间值
df["_h_minus_o"] = df["high"] - df["open"]
df["_o_minus_l"] = df["open"] - df["low"]
df["_prev_close"] = df.groupby("ts_code")["close"].shift(1)
# BR 计算需要 max(0, H-PC) 和 max(0, PC-L)
df["_h_minus_pc_pos"] = np.maximum(0, df["high"] - df["_prev_close"])
df["_pc_minus_l_pos"] = np.maximum(0, df["_prev_close"] - df["low"])
_temp_cols.extend(
[
"_h_minus_o",
"_o_minus_l",
"_prev_close",
"_h_minus_pc_pos",
"_pc_minus_l_pos",
]
)
# 2. 计算滚动和
# 使用 min_periods=N 确保有完整的窗口数据才计算,也可以用 N//2 等
min_p = N # 严格要求 N 天数据
grouped = df.groupby("ts_code")
sum_h_minus_o = (
grouped["_h_minus_o"]
.rolling(N, min_periods=min_p)
.sum()
.reset_index(level=0, drop=True)
)
sum_o_minus_l = (
grouped["_o_minus_l"]
.rolling(N, min_periods=min_p)
.sum()
.reset_index(level=0, drop=True)
)
sum_h_minus_pc_pos = (
grouped["_h_minus_pc_pos"]
.rolling(N, min_periods=min_p)
.sum()
.reset_index(level=0, drop=True)
)
sum_pc_minus_l_pos = (
grouped["_pc_minus_l_pos"]
.rolling(N, min_periods=min_p)
.sum()
.reset_index(level=0, drop=True)
)
# 3. 计算 AR 和 BR
df[ar_col_name] = (
_safe_divide(sum_h_minus_o, sum_o_minus_l, default_val=np.nan) * 100
) # AR 通常乘以 100
df[br_col_name] = (
_safe_divide(sum_h_minus_pc_pos, sum_pc_minus_l_pos, default_val=np.nan)
* 100
) # BR 通常乘以 100
df[f'{ar_col_name}_{br_col_name}'] = df[ar_col_name] - df[br_col_name]
print(f"因子 {ar_col_name}, {br_col_name} 计算成功。")
except KeyError as e:
print(f"错误: 计算 ARBR 时缺少必需的列: {e}")
print(f"将为因子 {ar_col_name}, {br_col_name} 填充 NaN。")
if ar_col_name not in df.columns:
df[ar_col_name] = np.nan
if br_col_name not in df.columns:
df[br_col_name] = np.nan
except Exception as e:
print(f"错误: 计算 ARBR 时发生意外错误: {e}")
print(f"将为因子 {ar_col_name}, {br_col_name} 填充 NaN。")
if ar_col_name not in df.columns:
df[ar_col_name] = np.nan
if br_col_name not in df.columns:
df[br_col_name] = np.nan
finally:
# 4. 清理中间列
cols_to_drop = [col for col in _temp_cols if col in df.columns]
if cols_to_drop:
df.drop(columns=cols_to_drop, inplace=True)
print(f"因子 {ar_col_name}, {br_col_name} 计算流程结束。")
def add_financial_factor(
main_df: pd.DataFrame,
financial_df: pd.DataFrame,
factor_value_col: str, # 财务指标值所在的列
ts_code_col: str = 'ts_code',
trade_date_col: str = 'trade_date',
ann_date_col: str = 'ann_date', # 公告日期
f_ann_date_col: str = 'f_ann_date', # 实际公告日期 (优先使用)
) -> pd.DataFrame:
"""
将财务指标数据(如每股未分配利润)作为因子添加到主时间序列 DataFrame 中。
使用 merge_asof 根据股票代码和公告日期,将最新的财务指标值匹配到每个交易日。
Args:
main_df: 包含时间序列交易数据的主 DataFrame (至少包含 ts_code_col 和 trade_date_col)。
financial_df: 包含财务指标数据的 DataFrame (至少包含 ts_code_col,
ann_date_col 或 f_ann_date_col, 以及 factor_value_col)。
ts_code_col: 股票代码列在两个 DataFrame 中的名称。默认为 'ts_code'
trade_date_col: 交易日期列在 main_df 中的名称。默认为 'trade_date'
ann_date_col: 公告日期列在 financial_df 中的名称(作为 f_ann_date_col 的备选)。默认为 'ann_date'
f_ann_date_col: 实际公告日期列在 financial_df 中的名称(优先使用)。默认为 'f_ann_date'
factor_value_col: 财务指标值(即要添加的因子值)在 financial_df 中的列名。默认为 'undistr_pft_ps'
new_factor_col_name: 添加到 main_df 中的新因子列的名称。默认为 'undist_profit_ps'
Returns:
包含新因子列的 main_df DataFrame。
"""
if factor_value_col in main_df.columns:
return main_df
new_factor_col_name = factor_value_col
# --- 数据校验 ---
required_main_cols = [ts_code_col, trade_date_col]
if not all(col in main_df.columns for col in required_main_cols):
raise ValueError(f"主 DataFrame 必须包含列: {required_main_cols}")
required_financial_cols = [ts_code_col, factor_value_col]
if f_ann_date_col and f_ann_date_col in financial_df.columns:
effective_date_col = f_ann_date_col
print(f"使用 '{f_ann_date_col}' 作为财务数据生效日期。")
elif ann_date_col and ann_date_col in financial_df.columns:
effective_date_col = ann_date_col
print(f"使用 '{ann_date_col}' 作为财务数据生效日期。")
else:
raise ValueError(f"财务指标 DataFrame 必须包含列 '{f_ann_date_col}''{ann_date_col}' 作为数据生效日期")
required_financial_cols.append(effective_date_col)
if not all(col in financial_df.columns for col in required_financial_cols):
raise ValueError(f"财务指标 DataFrame 必须包含列: {required_financial_cols}")
# --- 数据准备和清理 ---
# 确保日期列是 datetime 类型
# 使用 .copy() 避免 SettingWithCopyWarning
main_df = main_df.copy()
financial_df = financial_df.copy()
main_df[trade_date_col] = pd.to_datetime(main_df[trade_date_col], errors='coerce')
financial_df[effective_date_col] = pd.to_datetime(financial_df[effective_date_col], errors='coerce')
# 确保股票代码是字符串类型
main_df[ts_code_col] = main_df[ts_code_col].astype(str)
financial_df[ts_code_col] = financial_df[ts_code_col].astype(str)
# 选取 financial_df 中需要合并的列
financial_data_subset = financial_df[[ts_code_col, effective_date_col, factor_value_col]].copy()
# *** 新增:处理右表合并键中的空值 ***
initial_rows_financial = len(financial_data_subset)
financial_data_subset = financial_data_subset.dropna(subset=[ts_code_col, effective_date_col])
rows_dropped = initial_rows_financial - len(financial_data_subset)
if rows_dropped > 0:
print(f"警告: 从 financial_data_subset 中移除了 {rows_dropped} 行,因为其 '{ts_code_col}''{effective_date_col}' 列存在空值。")
if financial_data_subset.empty:
print(f"警告: 清理空值后 financial_data_subset 为空,无法添加因子 '{new_factor_col_name}'。将填充 NaN。")
main_df[new_factor_col_name] = np.nan
return main_df
# *** 修改:修正排序顺序以满足 merge_asof 要求 ***
# 先按 ts_code 排序,再按日期排序
# main_df = main_df.sort_values(by=[ts_code_col, trade_date_col])
# financial_data_subset = financial_data_subset.sort_values(by=[ts_code_col, effective_date_col])
main_df = main_df.sort_values(by=[trade_date_col, ts_code_col])
financial_data_subset = financial_data_subset.sort_values(by=[effective_date_col, ts_code_col])
# --- 使用 merge_asof 计算因子 ---
try:
df_with_factor = pd.merge_asof(
main_df,
financial_data_subset,
left_on=trade_date_col,
right_on=effective_date_col,
by=ts_code_col,
direction='backward'
)
except Exception as e:
print(f"merge_asof 执行失败: {e}")
# 根据需要决定如何处理错误,这里填充 NaN
main_df[new_factor_col_name] = np.nan
return main_df
# --- 清理与重命名 ---
# 移除右表的日期列(如果它与左表日期列名称不同)
if effective_date_col in df_with_factor.columns and effective_date_col != trade_date_col:
df_with_factor = df_with_factor.drop(columns=[effective_date_col])
# 重命名新加入的因子列
if factor_value_col != new_factor_col_name:
if factor_value_col in df_with_factor.columns:
df_with_factor = df_with_factor.rename(columns={factor_value_col: new_factor_col_name})
else:
# 这种情况理论上不应发生,因为 merge_asof 应该会把右表的非 key 列带过来
print(f"警告: 合并后未找到原始因子列 '{factor_value_col}',无法重命名。")
# 如果 factor_value_col 已是目标名称,则无需重命名
if new_factor_col_name not in df_with_factor.columns:
# 如果目标名称也不存在,则可能合并失败或列名有问题
df_with_factor[new_factor_col_name] = np.nan
# 如果 factor_value_col 就是目标名称,确保该列存在
elif new_factor_col_name not in df_with_factor.columns:
print(f"警告: 合并后未找到目标因子列 '{new_factor_col_name}'。填充 NaN。")
df_with_factor[new_factor_col_name] = np.nan
return df_with_factor
def calculate_cashflow_to_ev_factor(df: pd.DataFrame, cashflow_df: pd.DataFrame, balancesheet_df: pd.DataFrame, market_cap_col: str = 'total_mv', date_col: str = 'trade_date', ts_code_col: str = 'ts_code') -> pd.DataFrame:
"""
计算经营活动产生的现金流量净额TTM / 企业价值因子。
企业价值 = 司市值 + 负债合计 - 货币资金。
重要提示:本代码假设 add_financial_factor 能够将财务数据正确地合并到主数据框。
如果您使用 add_financial_factor 只获取单季度数据,那么
n_cashflow_act 将不是 TTM 值,这将导致最终因子计算不准确。
Args:
df (pd.DataFrame): 包含市场数据(需有市值列)和日期、股票代码的主数据框。
cashflow_df (pd.DataFrame): Tushare 现金流量表数据。
balancesheet_df (pd.DataFrame): Tushare 资产负债表数据。
market_cap_col (str): DataFrame 中代表公司总市值的列名,默认为 'total_mv'
date_col (str): DataFrame 中的日期列名,默认为 'trade_date'
ts_code_col (str): DataFrame 中的股票代码列名,默认为 'ts_code'
Returns:
pd.DataFrame: 添加了 'cashflow_to_ev_factor' 列的 DataFrame。
"""
df_factor = df.copy() # 创建副本以避免修改原始 DataFrame
# 0. 确保必要的市场市值列存在
if market_cap_col not in df_factor.columns:
print(f"错误: DataFrame 中缺少市值列 '{market_cap_col}'。无法计算因子。")
# 添加一个空的因子列并返回
df_factor['cashflow_to_ev_factor'] = np.nan
return df_factor
# 1. 获取经营活动产生的现金流量净额 (TTM - **注意这里的潜在不准确性**)
# 如果 add_financial_factor 只获取单季度,这里的 n_cashflow_act 将不是 TTM
df_factor = add_financial_factor(df_factor, cashflow_df, 'n_cashflow_act')
# 如果 add_financial_factor 能够正确处理 TTM那么上面的调用是正确的。
# 否则,您需要在 add_financial_factor 内部实现 TTM 逻辑,或者在调用 add_financial_factor
# 获取多个季度数据后,在这里手动进行 TTM 求和。
# 为了符合您的描述,我们暂时假设 add_financial_factor 已经处理了 TTM 或我们接受单季度的值
# 并命名为 ttm_n_cashflow_act 以示期望
# 重新命名获取的现金流列以便后续计算
cashflow_col_name = 'n_cashflow_act' # 获取的列名
ttm_cashflow_col = 'ttm_n_cashflow_act' # 因子计算中使用的列名
if cashflow_col_name in df_factor.columns:
df_factor = df_factor.rename(columns={cashflow_col_name: ttm_cashflow_col})
else:
# 如果 add_financial_factor 没成功添加列
print(f"错误: add_financial_factor 未能成功添加 '{cashflow_col_name}' 列。")
df_factor['cashflow_to_ev_factor'] = np.nan
return df_factor
# 2. 获取负债合计
df_factor = add_financial_factor(df_factor, balancesheet_df, 'total_liab')
liab_col_name = 'total_liab'
if liab_col_name not in df_factor.columns:
print(f"错误: add_financial_factor 未能成功添加 '{liab_col_name}' 列。")
df_factor['cashflow_to_ev_factor'] = np.nan
return df_factor
# 3. 获取货币资金
df_factor = add_financial_factor(df_factor, balancesheet_df, 'money_cap')
money_col_name = 'money_cap'
if money_col_name not in df_factor.columns:
print(f"错误: add_financial_factor 未能成功添加 '{money_col_name}' 列。")
df_factor['cashflow_to_ev_factor'] = np.nan
return df_factor
# 4. 计算企业价值 (Enterprise Value)
# 确保参与计算的列是数值类型,并处理 NaN (NaN + X = NaN, NaN - X = NaN)
enterprise_value = df_factor[market_cap_col].astype(float) * 10000 + df_factor[liab_col_name].astype(float) - df_factor[money_col_name].astype(float)
# 5. 计算最终因子经营活动产生的现金流量净额TTM / 企业价值
# 使用之前定义的安全除法
df_factor['cashflow_to_ev_factor'] = _safe_divide(df_factor[ttm_cashflow_col], enterprise_value)
# 6. 删除临时添加的财务数据列
cols_to_drop = [ttm_cashflow_col, liab_col_name, money_col_name]
df_factor = df_factor.drop(columns=[col for col in cols_to_drop if col in df_factor.columns])
return df_factor
def caculate_book_to_price_ratio(df: pd.DataFrame, fina_indicator_df: pd.DataFrame) -> pd.DataFrame:
if 'bps' not in df.columns:
df = add_financial_factor(df, fina_indicator_df, factor_value_col='bps')
df['book_to_price_ratio'] = df['bps'] / df['close']
df = df.drop(columns=['bps'])
return df
def turnover_rate_n(df: pd.DataFrame, n: int) -> pd.DataFrame:
df[f'turnover_rate_mean_{n}'] = df.groupby('ts_code', group_keys=False)['turnover_rate'].rolling(n).mean().reset_index(level=0, drop=True)
return df
def variance_n(df: pd.DataFrame, n: int) -> pd.DataFrame:
df[f'variance_{n}'] = df.groupby('ts_code', group_keys=False)['pct_chg'].rolling(n).var().reset_index(level=0, drop=True)
return df
def bbi_ratio_factor(df: pd.DataFrame) -> pd.DataFrame:
df_factor = df
# 确保数据按股票代码和日期排序,这对滚动计算非常重要
df_factor = df_factor.sort_values(by=['ts_code', 'trade_date'])
# 获取收盘价列
close_prices = df_factor['close']
# 1. 根据 ts_code 分组计算各周期的简单移动平均线 (SMA)
grouped = df_factor.groupby('ts_code', group_keys=False)
# 计算不同周期的 SMA并使用 reset_index 展平索引
sma3 = grouped['close'].rolling(3).mean().reset_index(level=0, drop=True)
sma6 = grouped['close'].rolling(6).mean().reset_index(level=0, drop=True)
sma12 = grouped['close'].rolling(12).mean().reset_index(level=0, drop=True)
sma24 = grouped['close'].rolling(24).mean().reset_index(level=0, drop=True)
# 2. 计算 BBI = (SMA3 + SMA6 + SMA12 + SMA24) / 4
print("计算 BBI...")
# 注意:如果任何一个 SMA 在某个位置是 NaN (例如,数据点不足),那么它们的和也将是 NaN
bbi = (sma3 + sma6 + sma12 + sma24) / 4
# 3. 计算最终因子 = BBI / 收盘价 (使用安全除法)
df_factor['bbi_ratio_factor'] = _safe_divide(bbi, close_prices)
return df_factor
def limit_factor(df: pd.DataFrame) -> pd.DataFrame:
grouped = df.groupby('ts_code', group_keys=False)
df["cat_up_limit"] = (
df["close"] == df["up_limit"]
) # 是否涨停1表示涨停0表示未涨停
df["cat_down_limit"] = (
df["close"] == df["down_limit"]
) # 是否跌停1表示跌停0表示未跌停
df["up_limit_count_10d"] = (
grouped["cat_up_limit"]
.rolling(window=10, min_periods=1)
.sum()
.reset_index(level=0, drop=True)
)
df["down_limit_count_10d"] = (
grouped["cat_down_limit"]
.rolling(window=10, min_periods=1)
.sum()
.reset_index(level=0, drop=True)
)
# 3. 最近连续涨跌停天数
def calculate_consecutive_limits(series):
"""
计算连续涨停/跌停天数。
"""
consecutive_up = series * (
series.groupby((series != series.shift()).cumsum()).cumcount() + 1
)
consecutive_down = series * (
series.groupby((series != series.shift()).cumsum()).cumcount() + 1
)
return consecutive_up, consecutive_down
# 连续涨停天数
df["consecutive_up_limit"] = grouped["cat_up_limit"].apply(
lambda x: calculate_consecutive_limits(x)[0]
)
return df