Files
NewStock/main/factor/momentum_factors.py
2025-11-29 00:23:12 +08:00

175 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
动量因子模块
包含基于股票截面和日期截面的动量因子实现
"""
import numpy as np
import polars as pl
from main.factor.operator_framework import StockWiseFactor, DateWiseFactor
# -------------------- 股票截面因子:基于时间序列的动量因子 --------------------
class ReturnFactor(StockWiseFactor):
"""N日收益率因子"""
def __init__(self, period: int = 20):
super().__init__(
name="return",
parameters={"period": period},
required_factor_ids=["close"]
)
self.period = period
def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
# 计算N日收益率时间序列操作
return group_df["close"].pct_change(self.period).alias(self.factor_id)
class VolatilityFactor(StockWiseFactor):
"""N日波动率因子"""
def __init__(self, period: int = 20):
super().__init__(
name="volatility",
parameters={"period": period},
required_factor_ids=["pct_chg"]
)
self.period = period
def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
# 计算N日波动率时间序列操作
return group_df["pct_chg"].rolling_std(self.period).alias(self.factor_id)
class MomentumFactor(StockWiseFactor):
"""动量因子过去N日累计收益率"""
def __init__(self, period: int = 20):
super().__init__(
name="momentum",
parameters={"period": period},
required_factor_ids=["pct_chg"]
)
self.period = period
def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
# 计算N日累计动量时间序列操作
return group_df["pct_chg"].rolling_sum(self.period).alias(self.factor_id)
class MomentumAcceleration(StockWiseFactor):
"""
动量加速因子:
(短期波动率调整后动量 - 长期波动率调整后动量)
用于捕捉趋势正在形成或加强的股票
"""
def __init__(self, short_period: int = 20, long_period: int = 60):
super().__init__(
name="momentum_acceleration",
parameters={"short_period": short_period, "long_period": long_period},
required_factor_ids=["pct_chg"]
)
self.short_period = short_period
self.long_period = long_period
def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
epsilon = 1e-9
# 计算短期波动率调整后动量
short_momentum = group_df["pct_chg"].rolling_sum(self.short_period)
short_vol = group_df["pct_chg"].rolling_std(self.short_period)
short_adj_momentum = short_momentum / (short_vol + epsilon)
# 计算长期波动率调整后动量
long_momentum = group_df["pct_chg"].rolling_sum(self.long_period)
long_vol = group_df["pct_chg"].rolling_std(self.long_period)
long_adj_momentum = long_momentum / (long_vol + epsilon)
# 计算加速因子
acceleration = (short_adj_momentum - long_adj_momentum).alias(self.factor_id)
return acceleration
class TrendEfficiency(StockWiseFactor):
"""
趋势效率因子:
过去N日价格净变化 / 过去N日每日价格变化的绝对值之和
衡量趋势的信噪比值越接近1趋势越清晰、噪声越小
"""
def __init__(self, period: int = 20):
super().__init__(
name="trend_efficiency",
parameters={"period": period},
# 此因子需要收盘价来计算
required_factor_ids=["close"]
)
self.period = period
def calc_factor(self, group_df: pl.DataFrame) -> pl.Series:
# 1. 计算N日内的净价格变动信号
# 使用 diff(n) 计算当前价格与n天前价格的差值
net_change = group_df["close"].diff(self.period).abs()
# 2. 计算N日内每日价格变动的绝对值之和总路径/噪声)
# 先计算每日变动 diff(1),取绝对值,再滚动求和
total_path = group_df["close"].diff(1).abs().rolling_sum(self.period)
# 3. 计算效率比率
epsilon = 1e-9
efficiency_ratio = (net_change / (total_path + epsilon)).alias(self.factor_id)
return efficiency_ratio
# -------------------- 统一计算函数 --------------------
def calculate_momentum_factors(df: pl.DataFrame) -> pl.DataFrame:
"""
统一计算动量因子的函数
Parameters:
df (pl.DataFrame): 输入的股票数据表,必须包含以下列:
ts_code, trade_date, close, pct_chg, high, low, vol
Returns:
pl.DataFrame: 包含所有动量因子的DataFrame
"""
# 初始化结果DataFrame
result_df = df.clone()
# 定义要计算的因子列表
# 先计算股票截面因子(时间序列因子)
stock_operators = [
ReturnFactor(5),
ReturnFactor(20),
VolatilityFactor(10),
VolatilityFactor(30),
MomentumFactor(10),
MomentumFactor(30),
RSI_Factor(14)
]
# 依次应用股票截面因子算子
for operator in stock_operators:
try:
result_df = operator.apply(result_df)
except Exception as e:
print(f"计算股票截面因子 {operator.factor_id} 时出错: {e}")
# 再计算日期截面因子(横截面排序因子)
date_operators = [
CrossSectionalRanking("return_5d"),
CrossSectionalRanking("return_20d"),
CrossSectionalRanking("volatility_10d"),
CrossSectionalRanking("momentum_10d")
]
# 依次应用日期截面因子算子
for operator in date_operators:
try:
result_df = operator.apply(result_df)
except Exception as e:
print(f"计算日期截面因子 {operator.factor_id} 时出错: {e}")
return result_df