160 lines
5.5 KiB
Python
160 lines
5.5 KiB
Python
"""
|
||
动量因子模块
|
||
包含基于股票截面和日期截面的动量因子实现
|
||
"""
|
||
|
||
import numpy as np
|
||
import polars as pl
|
||
from main.factor.operator_framework import StockWiseFactor, DateWiseFactor
|
||
|
||
|
||
# -------------------- 股票截面因子:基于时间序列的动量因子 --------------------
|
||
class ReturnFactor(StockWiseFactor):
|
||
"""N日收益率因子"""
|
||
|
||
def __init__(self, period: int = 20):
|
||
super().__init__(
|
||
name="return",
|
||
parameters={"period": period},
|
||
required_factor_ids=["close"]
|
||
)
|
||
self.period = period
|
||
|
||
def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
|
||
# 计算N日收益率(时间序列操作)
|
||
return group_df["close"].pct_change(self.period).alias(self.factor_id)
|
||
|
||
|
||
class VolatilityFactor(StockWiseFactor):
|
||
"""N日波动率因子"""
|
||
|
||
def __init__(self, period: int = 20):
|
||
super().__init__(
|
||
name="volatility",
|
||
parameters={"period": period},
|
||
required_factor_ids=["pct_chg"]
|
||
)
|
||
self.period = period
|
||
|
||
def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
|
||
# 计算N日波动率(时间序列操作)
|
||
return group_df["pct_chg"].rolling_std(self.period).alias(self.factor_id)
|
||
|
||
|
||
class MomentumFactor(StockWiseFactor):
|
||
"""动量因子:过去N日累计收益率"""
|
||
|
||
def __init__(self, period: int = 20):
|
||
super().__init__(
|
||
name="momentum",
|
||
parameters={"period": period},
|
||
required_factor_ids=["pct_chg"]
|
||
)
|
||
self.period = period
|
||
|
||
def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
|
||
# 计算N日累计动量(时间序列操作)
|
||
return group_df["pct_chg"].rolling_sum(self.period).alias(self.factor_id)
|
||
|
||
class MomentumAcceleration(StockWiseFactor):
|
||
"""
|
||
动量加速因子:
|
||
(短期波动率调整后动量 - 长期波动率调整后动量)
|
||
用于捕捉趋势正在形成或加强的股票
|
||
"""
|
||
|
||
def __init__(self, short_period: int = 20, long_period: int = 60):
|
||
super().__init__(
|
||
name="momentum_acceleration",
|
||
parameters={"short_period": short_period, "long_period": long_period},
|
||
required_factor_ids=["pct_chg"]
|
||
)
|
||
self.short_period = short_period
|
||
self.long_period = long_period
|
||
|
||
def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
|
||
epsilon = 1e-9
|
||
|
||
# 计算短期波动率调整后动量
|
||
short_momentum = group_df["pct_chg"].rolling_sum(self.short_period)
|
||
short_vol = group_df["pct_chg"].rolling_std(self.short_period)
|
||
short_adj_momentum = short_momentum / (short_vol + epsilon)
|
||
|
||
# 计算长期波动率调整后动量
|
||
long_momentum = group_df["pct_chg"].rolling_sum(self.long_period)
|
||
long_vol = group_df["pct_chg"].rolling_std(self.long_period)
|
||
long_adj_momentum = long_momentum / (long_vol + epsilon)
|
||
|
||
# 计算加速因子
|
||
acceleration = (short_adj_momentum - long_adj_momentum).alias(self.factor_id)
|
||
|
||
return acceleration
|
||
|
||
|
||
class TrendEfficiency(StockWiseFactor):
|
||
"""
|
||
趋势效率因子:
|
||
过去N日价格净变化 / 过去N日每日价格变化的绝对值之和
|
||
衡量趋势的信噪比,值越接近1,趋势越清晰、噪声越小
|
||
"""
|
||
|
||
def __init__(self, period: int = 20):
|
||
super().__init__(
|
||
name="trend_efficiency",
|
||
parameters={"period": period},
|
||
# 此因子需要收盘价来计算
|
||
required_factor_ids=["close"]
|
||
)
|
||
self.period = period
|
||
|
||
def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
|
||
# 1. 计算N日内的净价格变动(信号)
|
||
# 使用 diff(n) 计算当前价格与n天前价格的差值
|
||
net_change = group_df["close"].diff(self.period).abs()
|
||
|
||
# 2. 计算N日内每日价格变动的绝对值之和(总路径/噪声)
|
||
# 先计算每日变动 diff(1),取绝对值,再滚动求和
|
||
total_path = group_df["close"].diff(1).abs().rolling_sum(self.period)
|
||
|
||
# 3. 计算效率比率
|
||
epsilon = 1e-9
|
||
efficiency_ratio = (net_change / (total_path + epsilon)).alias(self.factor_id)
|
||
|
||
return efficiency_ratio
|
||
|
||
class SimpleVolatilityFactor(StockWiseFactor):
|
||
factor_id = "simple_volatility"
|
||
required_factor_ids = ["high", "low", "vol"]
|
||
|
||
def __init__(self):
|
||
super(SimpleVolatilityFactor, self).__init__(
|
||
name=self.factor_id,
|
||
parameters={},
|
||
required_factor_ids=self.required_factor_ids
|
||
)
|
||
|
||
def calc_factor(self, g: pl.DataFrame) -> pl.Series:
|
||
high = g["high"]
|
||
low = g["low"]
|
||
vol = g["vol"]
|
||
|
||
# Step 1: 计算 EM_i,t
|
||
# 注意:shift(1) 得到 t-1 的值
|
||
em = ((high + low) - (high.shift(1) + low.shift(1))) / 2.0
|
||
em = em.fill_null(0.0) # 第一天无前值,设为0
|
||
|
||
# Step 2: 计算 BR_i,t
|
||
# 避免除零:若 High == Low,设 BR = 0
|
||
range_ = high - low
|
||
br = vol / range_
|
||
br = br.fill_null(0.0).replace({float('inf'): 0.0, float('-inf'): 0.0})
|
||
|
||
# Step 3: 计算 MM_i,t = EM / BR
|
||
mm = em / br
|
||
mm = mm.fill_null(0.0).replace({float('inf'): 0.0, float('-inf'): 0.0})
|
||
|
||
# Step 4: 计算 239 日简单移动平均
|
||
emv = mm.rolling_mean(window_size=239, min_periods=1)
|
||
|
||
return emv.alias(self.factor_id)
|