""" 动量因子模块 包含基于股票截面和日期截面的动量因子实现 """ import numpy as np import polars as pl from main.factor.operator_framework import StockWiseFactor, DateWiseFactor # -------------------- 股票截面因子:基于时间序列的动量因子 -------------------- class ReturnFactor(StockWiseFactor): """N日收益率因子""" def __init__(self, period: int = 20): super().__init__( name="return", parameters={"period": period}, required_factor_ids=["close"] ) self.period = period def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: # 计算N日收益率(时间序列操作) return group_df["close"].pct_change(self.period).alias(self.factor_id) class VolatilityFactor(StockWiseFactor): """N日波动率因子""" def __init__(self, period: int = 20): super().__init__( name="volatility", parameters={"period": period}, required_factor_ids=["pct_chg"] ) self.period = period def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: # 计算N日波动率(时间序列操作) return group_df["pct_chg"].rolling_std(self.period).alias(self.factor_id) class MomentumFactor(StockWiseFactor): """动量因子:过去N日累计收益率""" def __init__(self, period: int = 20): super().__init__( name="momentum", parameters={"period": period}, required_factor_ids=["pct_chg"] ) self.period = period def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: # 计算N日累计动量(时间序列操作) return group_df["pct_chg"].rolling_sum(self.period).alias(self.factor_id) class MomentumAcceleration(StockWiseFactor): """ 动量加速因子: (短期波动率调整后动量 - 长期波动率调整后动量) 用于捕捉趋势正在形成或加强的股票 """ def __init__(self, short_period: int = 20, long_period: int = 60): super().__init__( name="momentum_acceleration", parameters={"short_period": short_period, "long_period": long_period}, required_factor_ids=["pct_chg"] ) self.short_period = short_period self.long_period = long_period def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: epsilon = 1e-9 # 计算短期波动率调整后动量 short_momentum = group_df["pct_chg"].rolling_sum(self.short_period) short_vol = group_df["pct_chg"].rolling_std(self.short_period) short_adj_momentum = short_momentum / (short_vol + epsilon) # 计算长期波动率调整后动量 long_momentum = group_df["pct_chg"].rolling_sum(self.long_period) long_vol = group_df["pct_chg"].rolling_std(self.long_period) long_adj_momentum = long_momentum / (long_vol + epsilon) # 计算加速因子 acceleration = (short_adj_momentum - long_adj_momentum).alias(self.factor_id) return acceleration class TrendEfficiency(StockWiseFactor): """ 趋势效率因子: 过去N日价格净变化 / 过去N日每日价格变化的绝对值之和 衡量趋势的信噪比,值越接近1,趋势越清晰、噪声越小 """ def __init__(self, period: int = 20): super().__init__( name="trend_efficiency", parameters={"period": period}, # 此因子需要收盘价来计算 required_factor_ids=["close"] ) self.period = period def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: # 1. 计算N日内的净价格变动(信号) # 使用 diff(n) 计算当前价格与n天前价格的差值 net_change = group_df["close"].diff(self.period).abs() # 2. 计算N日内每日价格变动的绝对值之和(总路径/噪声) # 先计算每日变动 diff(1),取绝对值,再滚动求和 total_path = group_df["close"].diff(1).abs().rolling_sum(self.period) # 3. 计算效率比率 epsilon = 1e-9 efficiency_ratio = (net_change / (total_path + epsilon)).alias(self.factor_id) return efficiency_ratio class SimpleVolatilityFactor(StockWiseFactor): factor_id = "simple_volatility" required_factor_ids = ["high", "low", "vol"] def __init__(self): super(SimpleVolatilityFactor, self).__init__( name=self.factor_id, parameters={}, required_factor_ids=self.required_factor_ids ) def calc_factor(self, g: pl.DataFrame) -> pl.Series: high = g["high"] low = g["low"] vol = g["vol"] # Step 1: 计算 EM_i,t # 注意:shift(1) 得到 t-1 的值 em = ((high + low) - (high.shift(1) + low.shift(1))) / 2.0 em = em.fill_null(0.0) # 第一天无前值,设为0 # Step 2: 计算 BR_i,t # 避免除零:若 High == Low,设 BR = 0 range_ = high - low br = vol / range_ br = br.fill_null(0.0).replace({float('inf'): 0.0, float('-inf'): 0.0}) # Step 3: 计算 MM_i,t = EM / BR mm = em / br mm = mm.fill_null(0.0).replace({float('inf'): 0.0, float('-inf'): 0.0}) # Step 4: 计算 239 日简单移动平均 emv = mm.rolling_mean(window_size=239, min_periods=1) return emv.alias(self.factor_id)