""" 动量因子模块 包含基于股票截面和日期截面的动量因子实现 """ import numpy as np import polars as pl from main.factor.operator_framework import StockWiseFactor, DateWiseFactor # -------------------- 股票截面因子:基于时间序列的动量因子 -------------------- class ReturnFactor(StockWiseFactor): """N日收益率因子""" def __init__(self, period: int = 20): super().__init__( name="return", parameters={"period": period}, required_factor_ids=["close"] ) self.period = period def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: # 计算N日收益率(时间序列操作) return group_df["close"].pct_change(self.period).alias(self.factor_id) class VolatilityFactor(StockWiseFactor): """N日波动率因子""" def __init__(self, period: int = 20): super().__init__( name="volatility", parameters={"period": period}, required_factor_ids=["pct_chg"] ) self.period = period def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: # 计算N日波动率(时间序列操作) return group_df["pct_chg"].rolling_std(self.period).alias(self.factor_id) class MomentumFactor(StockWiseFactor): """动量因子:过去N日累计收益率""" def __init__(self, period: int = 20): super().__init__( name="momentum", parameters={"period": period}, required_factor_ids=["pct_chg"] ) self.period = period def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: # 计算N日累计动量(时间序列操作) return group_df["pct_chg"].rolling_sum(self.period).alias(self.factor_id) class MomentumAcceleration(StockWiseFactor): """ 动量加速因子: (短期波动率调整后动量 - 长期波动率调整后动量) 用于捕捉趋势正在形成或加强的股票 """ def __init__(self, short_period: int = 20, long_period: int = 60): super().__init__( name="momentum_acceleration", parameters={"short_period": short_period, "long_period": long_period}, required_factor_ids=["pct_chg"] ) self.short_period = short_period self.long_period = long_period def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: epsilon = 1e-9 # 计算短期波动率调整后动量 short_momentum = group_df["pct_chg"].rolling_sum(self.short_period) short_vol = group_df["pct_chg"].rolling_std(self.short_period) short_adj_momentum = short_momentum / (short_vol + epsilon) # 计算长期波动率调整后动量 long_momentum = group_df["pct_chg"].rolling_sum(self.long_period) long_vol = group_df["pct_chg"].rolling_std(self.long_period) long_adj_momentum = long_momentum / (long_vol + epsilon) # 计算加速因子 acceleration = (short_adj_momentum - long_adj_momentum).alias(self.factor_id) return acceleration class TrendEfficiency(StockWiseFactor): """ 趋势效率因子: 过去N日价格净变化 / 过去N日每日价格变化的绝对值之和 衡量趋势的信噪比,值越接近1,趋势越清晰、噪声越小 """ def __init__(self, period: int = 20): super().__init__( name="trend_efficiency", parameters={"period": period}, # 此因子需要收盘价来计算 required_factor_ids=["close"] ) self.period = period def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: # 1. 计算N日内的净价格变动(信号) # 使用 diff(n) 计算当前价格与n天前价格的差值 net_change = group_df["close"].diff(self.period).abs() # 2. 计算N日内每日价格变动的绝对值之和(总路径/噪声) # 先计算每日变动 diff(1),取绝对值,再滚动求和 total_path = group_df["close"].diff(1).abs().rolling_sum(self.period) # 3. 计算效率比率 epsilon = 1e-9 efficiency_ratio = (net_change / (total_path + epsilon)).alias(self.factor_id) return efficiency_ratio # -------------------- 统一计算函数 -------------------- def calculate_momentum_factors(df: pl.DataFrame) -> pl.DataFrame: """ 统一计算动量因子的函数 Parameters: df (pl.DataFrame): 输入的股票数据表,必须包含以下列: ts_code, trade_date, close, pct_chg, high, low, vol Returns: pl.DataFrame: 包含所有动量因子的DataFrame """ # 初始化结果DataFrame result_df = df.clone() # 定义要计算的因子列表 # 先计算股票截面因子(时间序列因子) stock_operators = [ ReturnFactor(5), ReturnFactor(20), VolatilityFactor(10), VolatilityFactor(30), MomentumFactor(10), MomentumFactor(30), RSI_Factor(14) ] # 依次应用股票截面因子算子 for operator in stock_operators: try: result_df = operator.apply(result_df) except Exception as e: print(f"计算股票截面因子 {operator.factor_id} 时出错: {e}") # 再计算日期截面因子(横截面排序因子) date_operators = [ CrossSectionalRanking("return_5d"), CrossSectionalRanking("return_20d"), CrossSectionalRanking("volatility_10d"), CrossSectionalRanking("momentum_10d") ] # 依次应用日期截面因子算子 for operator in date_operators: try: result_df = operator.apply(result_df) except Exception as e: print(f"计算日期截面因子 {operator.factor_id} 时出错: {e}") return result_df