"""Market sentiment factor module.

Contains market-sentiment factor implementations based on stock
cross-sections.
"""
import numpy as np
import polars as pl
import talib

from main.factor.operator_framework import DateWiseFactor, StockWiseFactor


def _to_float64(series: pl.Series) -> np.ndarray:
    """Return *series* as a float64 NumPy array.

    TA-Lib's Python wrapper only accepts double-precision input, so integer
    or float32 columns (e.g. an integer ``vol``) are converted up front.
    """
    return np.asarray(series.to_numpy(), dtype=np.float64)


class SentimentPanicGreedFactor(StockWiseFactor):
    """Panic / greed index factor.

    Measures how far the current true range deviates from its ATR, signed
    and scaled by the bar's ``pct_chg``, then smoothed with an SMA.
    """

    def __init__(self, window_atr: int = 14, window_smooth: int = 5):
        """Create the factor.

        Args:
            window_atr: Look-back period passed to ``talib.ATR``.
            window_smooth: Look-back period of the final SMA smoothing.
        """
        super().__init__(
            name="sentiment_panic_greed",
            parameters={"window_atr": window_atr, "window_smooth": window_smooth},
            required_factor_ids=["open", "high", "low", "close", "vol", "pct_chg"],
        )

    def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
        """Compute the smoothed panic/greed value for every row of *group_df*.

        Args:
            group_df: Frame holding at least ``high``, ``low``, ``close`` and
                ``pct_chg``. NOTE(review): presumably the rows of a single
                stock in chronological order -- confirm against the operator
                framework.

        Returns:
            A series named ``self.factor_id`` with one value per input row;
            leading rows inside the ATR / SMA warm-up are NaN.
        """
        window_atr = self.parameters["window_atr"]
        window_smooth = self.parameters["window_smooth"]

        close = _to_float64(group_df["close"])
        high = _to_float64(group_df["high"])
        low = _to_float64(group_df["low"])
        # Row 0 has no previous close; the shift leaves it null -> NaN.
        prev_close = _to_float64(group_df["close"].shift(1))
        pct_chg = _to_float64(group_df["pct_chg"])

        atr = talib.ATR(high, low, close, timeperiod=window_atr)

        # True range is the maximum of THREE candidates. ``np.maximum`` is a
        # binary ufunc whose third positional argument is ``out``, so the
        # element-wise maximum over three arrays must go through ``reduce``.
        true_range = np.maximum.reduce(
            [high - low, np.abs(high - prev_close), np.abs(low - prev_close)]
        )

        # Volatility surprise: relative excess of the true range over ATR,
        # signed/scaled by the bar's return. The epsilon avoids division by 0.
        volatility_surprise = (true_range / (atr + 1e-8) - 1) * pct_chg

        # Sentiment index; the factor 2 amplifies the impact of gaps.
        sentiment = volatility_surprise * 2

        smoothed = talib.SMA(sentiment, timeperiod=window_smooth)
        return pl.Series(smoothed).alias(self.factor_id)


class SentimentBreadthFactor(StockWiseFactor):
    """Market-breadth sentiment proxy factor (price/volume agreement)."""

    def __init__(self, window_vol: int = 20, window_smooth: int = 3):
        """Create the factor.

        Args:
            window_vol: Look-back period of the rolling average volume.
            window_smooth: Look-back period of the final SMA smoothing.
        """
        super().__init__(
            name="sentiment_breadth",
            parameters={"window_vol": window_vol, "window_smooth": window_smooth},
            required_factor_ids=["pct_chg", "vol"],
        )

    def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
        """Compute ``SMA(pct_chg * vol / SMA(vol))`` for *group_df*.

        Args:
            group_df: Frame holding at least ``pct_chg`` and ``vol``.

        Returns:
            A series named ``self.factor_id`` with one value per input row.
        """
        window_vol = self.parameters["window_vol"]
        window_smooth = self.parameters["window_smooth"]

        volume = _to_float64(group_df["vol"])
        pct_chg = _to_float64(group_df["pct_chg"])

        # Rolling average volume used as the "normal" activity level.
        avg_volume = talib.SMA(volume, timeperiod=window_vol)

        # Price/volume agreement: return weighted by relative volume.
        breadth = pct_chg * (volume / (avg_volume + 1e-8))

        smoothed = talib.SMA(breadth, timeperiod=window_smooth)
        return pl.Series(smoothed).alias(self.factor_id)


class SentimentReversalFactor(StockWiseFactor):
    """Sentiment reversal factor: negative cumulative return times volatility."""

    def __init__(self, window_ret: int = 5, window_vol: int = 5):
        """Create the factor.

        Args:
            window_ret: Number of rows compounded into the cumulative return.
            window_vol: Look-back period passed to ``talib.STDDEV``.
        """
        super().__init__(
            name="sentiment_reversal",
            parameters={"window_ret": window_ret, "window_vol": window_vol},
            required_factor_ids=["pct_chg"],
        )

    def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
        """Compute ``-cumulative_return * rolling_std`` for *group_df*.

        Args:
            group_df: Frame holding at least ``pct_chg``.

        Returns:
            A series named ``self.factor_id`` with one value per input row.
            Rows before the first complete return window are NaN; a group
            shorter than ``window_ret`` yields an all-NaN return leg instead
            of a shape-mismatch error.

        Raises:
            ValueError: If ``window_ret`` is smaller than 1.
        """
        window_ret = self.parameters["window_ret"]
        window_vol = self.parameters["window_vol"]
        if window_ret < 1:
            raise ValueError(f"window_ret must be >= 1, got {window_ret}")

        # NOTE(review): ``1 + pct_chg`` compounds correctly only if pct_chg is
        # a fractional return (0.01 == 1%) -- confirm the unit upstream.
        pct_chg = _to_float64(group_df["pct_chg"])
        n_rows = pct_chg.shape[0]

        # Trailing compounded return, aligned to the LAST row of each window.
        cum_return = np.full(n_rows, np.nan)
        if window_ret <= n_rows:
            windows = np.lib.stride_tricks.sliding_window_view(pct_chg, window_ret)
            cum_return[window_ret - 1:] = np.prod(1.0 + windows, axis=1) - 1.0

        volatility = talib.STDDEV(pct_chg, timeperiod=window_vol)

        # Negative sign: large recent gains map to a low (reversal) score.
        reversal = -cum_return * volatility
        return pl.Series(reversal).alias(self.factor_id)


class PriceDeductionFactor(StockWiseFactor):
    """Price deduction factor: close minus the close ``n - 1`` rows earlier."""

    def __init__(self, n: int = 10):
        """Create the factor.

        Args:
            n: Window length; the deduction price is taken ``n - 1`` rows back.
        """
        super().__init__(
            name="price_deduction",
            parameters={"n": n},
            required_factor_ids=["close"],
        )

    def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
        """Return ``close - close.shift(n - 1)`` named ``self.factor_id``."""
        lag = self.parameters["n"] - 1
        close = group_df["close"]
        # Deduction price: the close that is n-1 periods old.
        deduction_price = close.shift(lag)
        return (close - deduction_price).alias(self.factor_id)


class PriceDeductionRatioFactor(StockWiseFactor):
    """Price deduction ratio factor: the deduction spread scaled by the SMA."""

    def __init__(self, n: int = 10):
        """Create the factor.

        Args:
            n: Window length of the SMA; the deduction price is taken
                ``n - 1`` rows back.
        """
        super().__init__(
            name="price_deduction_ratio",
            parameters={"n": n},
            required_factor_ids=["close"],
        )

    def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
        """Return ``(close - close.shift(n - 1)) / (SMA_n(close) + 1e-8)``."""
        n = self.parameters["n"]
        close = group_df["close"]

        moving_average = close.rolling_mean(n)
        deduction_price = close.shift(n - 1)
        spread = close - deduction_price

        # The epsilon guards against division by zero.
        ratio = spread / (moving_average + 1e-8)
        return ratio.alias(self.factor_id)


class IndustryMomentumLeadership(StockWiseFactor):
    """Industry momentum leadership: blended momentum rank times ROE quality."""

    factor_id = "industry_momentum_leadership"
    required_factor_ids = [
        "industry_return_5_percentile",
        "industry_return_20_percentile",
        "roe",
    ]

    def __init__(self):
        """Register the factor with its fixed id and required inputs."""
        super().__init__(
            name=self.factor_id,
            parameters={},
            required_factor_ids=self.required_factor_ids,
        )

    def calc_factor(self, g: pl.DataFrame) -> pl.Series:
        """Compute the leadership score for every row of *g*.

        Args:
            g: Frame holding the columns listed in ``required_factor_ids``.

        Returns:
            A materialised series named ``self.factor_id``.
        """
        roe = pl.col("roe")

        # Blended momentum: the 5-period rank is weighted higher
        # (short-term leaders).
        momentum = (
            0.7 * pl.col("industry_return_5_percentile")
            + 0.3 * pl.col("industry_return_20_percentile")
        )

        # Fundamental quality: higher ROE is better; log1p damps extremes and
        # non-positive (or null) ROE contributes 0.
        quality = pl.when(roe > 0).then(roe.log1p()).otherwise(0.0)

        # Leadership score = momentum x fundamentals.
        leadership = momentum * (quality + 1.0)

        # ``when/then/otherwise`` builds a lazy expression; evaluate it on the
        # frame so a real Series is returned, as the signature promises.
        return g.select(leadership.alias(self.factor_id)).to_series()


class LeadershipPersistenceScore(StockWiseFactor):
    """Leadership persistence: blended momentum rank times a quality sum."""

    factor_id = "leadership_persistence_score"
    required_factor_ids = [
        "industry_return_5_percentile",
        "industry_return_20_percentile",
        "undist_profit_ps",
        "roe",
        "bps",
    ]

    def __init__(self):
        """Register the factor with its fixed id and required inputs."""
        super().__init__(
            name=self.factor_id,
            parameters={},
            required_factor_ids=self.required_factor_ids,
        )

    @staticmethod
    def _positive_log1p(column: str) -> pl.Expr:
        """Return ``log1p(column)`` where the column is > 0, otherwise 0."""
        col = pl.col(column)
        return pl.when(col > 0).then(col.log1p()).otherwise(0.0)

    def calc_factor(self, g: pl.DataFrame) -> pl.Series:
        """Compute the persistence score for every row of *g*.

        Args:
            g: Frame holding the columns listed in ``required_factor_ids``.

        Returns:
            A materialised series named ``self.factor_id``.
        """
        momentum = (
            0.6 * pl.col("industry_return_5_percentile")
            + 0.4 * pl.col("industry_return_20_percentile")
        )

        # Fundamental quality: every component goes through log1p.
        quality = (
            self._positive_log1p("undist_profit_ps")
            + self._positive_log1p("roe")
            + self._positive_log1p("bps")
        )

        score = momentum * (quality + 1.0)

        # Evaluate the lazy expression so a real Series is returned.
        return g.select(score.alias(self.factor_id)).to_series()


class DynamicIndustryLeadership(DateWiseFactor):
    """Dynamic industry leadership: sum of within-industry z-scores."""

    factor_id = "dynamic_industry_leadership"
    required_factor_ids = ["l2_code", "return_5", "lg_flow", "turnover_rate"]

    def __init__(self):
        """Register the factor with its fixed id and required inputs."""
        super().__init__(
            name=self.factor_id,
            parameters={},
            required_factor_ids=self.required_factor_ids,
        )

    @staticmethod
    def _industry_zscore(value: pl.Expr) -> pl.Expr:
        """Return the z-score of *value* within each ``l2_code`` group.

        Groups whose standard deviation is not above 1e-8 (or is null) get 0
        instead of a division by (near) zero.
        """
        mean = value.mean().over("l2_code")
        std = value.std().over("l2_code")
        return pl.when(std > 1e-8).then((value - mean) / std).otherwise(0.0)

    def calc_factor(self, g: pl.DataFrame) -> pl.Series:
        """Compute the summed z-scores for every row of *g*.

        Args:
            g: Frame holding the columns listed in ``required_factor_ids``.

        Returns:
            A series named ``self.factor_id``: z(return_5) + z(lg_flow) +
            z(log1p(turnover_rate)), each standardised within ``l2_code``.
        """
        # Window functions: statistics are computed per industry (l2_code).
        momentum_z = self._industry_zscore(pl.col("return_5"))
        flow_z = self._industry_zscore(pl.col("lg_flow"))
        turnover_z = self._industry_zscore(pl.col("turnover_rate").log1p())

        # Composite factor.
        leadership = momentum_z + flow_z + turnover_z

        # Execute the expression and return the resulting Series.
        return g.select(leadership.alias(self.factor_id)).to_series()