Files
NewQuant/src/indicators/indicators.py

650 lines
22 KiB
Python
Raw Normal View History

2025-09-16 09:59:38 +08:00
from abc import ABC
from typing import List, Union, Tuple, Optional
2025-07-10 15:07:31 +08:00
import numpy as np
import talib
2025-09-16 09:59:38 +08:00
from numpy.lib._stride_tricks_impl import sliding_window_view
2025-07-10 15:07:31 +08:00
from src.indicators.base_indicators import Indicator
2025-09-16 09:59:38 +08:00
class Empty(Indicator, ABC):
def get_values(self, close: np.array, open: np.array, high: np.array, low: np.array, volume: np.array):
return []
def is_condition_met(self,
close: np.array,
open: np.array,
high: np.array,
low: np.array,
volume: np.array):
return True
def get_name(self):
return "Empty"
2025-07-10 15:07:31 +08:00
class RSI(Indicator):
"""
相对强弱指数 (RSI) 指标实现使用 TA-Lib 简化计算
"""
2025-07-15 22:45:51 +08:00
def __init__(
self,
window: int = 14,
down_bound: float = None,
up_bound: float = None,
shift_window: int = 0,
):
super().__init__(down_bound, up_bound)
2025-07-10 15:07:31 +08:00
self.window = window
2025-07-15 22:45:51 +08:00
self.shift_window = shift_window
2025-07-10 15:07:31 +08:00
2025-07-15 22:45:51 +08:00
def get_values(
self,
close: np.array,
open: np.array, # 不使用
high: np.array, # 不使用
low: np.array, # 不使用
volume: np.array,
) -> np.array: # 不使用
2025-07-10 15:07:31 +08:00
"""
根据收盘价列表计算RSI值使用 TA-Lib
Args:
close (np.array): 收盘价列表
其他 OHLCV 参数在此指标中不使用
Returns:
np.array: RSI值列表如果数据不足则列表开头为NaN
"""
# 使用 talib.RSI 直接计算
# 注意TA-Lib 会在数据不足时自动填充 NaN
rsi_values = talib.RSI(close, timeperiod=self.window)
# 将 numpy 数组转换为 list 并返回
return rsi_values
2025-07-15 22:45:51 +08:00
2025-07-10 15:07:31 +08:00
def get_name(self):
2025-07-15 22:45:51 +08:00
return f"rsi_{self.window}"
2025-07-10 15:07:31 +08:00
class HistoricalRange(Indicator):
"""
历史波动幅度指标计算过去 N 日的 (最高价 - 最低价) 的简单移动平均
"""
2025-07-15 22:45:51 +08:00
def __init__(
self, down_bound: float = None, up_bound: float = None, shift_window: int = 0
):
super().__init__(down_bound, up_bound)
self.shift_window = shift_window
2025-07-10 15:07:31 +08:00
2025-07-15 22:45:51 +08:00
def get_values(
self,
close: np.array, # 不使用
open: np.array, # 不使用
high: np.array,
low: np.array,
volume: np.array,
) -> np.array: # 不使用
2025-07-10 15:07:31 +08:00
"""
根据最高价和最低价列表计算过去 N 日的 (high - low) 值的简单移动平均
Args:
high (np.array): 最高价列表
low (np.array): 最低价列表
其他 OHLCV 参数在此指标中不使用
Returns:
np.array: 历史波动幅度指标值列表如果数据不足则列表开头为NaN
"""
# if not high or not low or len(high) != len(low):
# print(high, low, len(high), len(low))
# return []
# 计算每日的 (high - low) 范围
daily_ranges = high - low
# 将 numpy 数组转换为 list 并返回
2025-07-15 22:45:51 +08:00
return daily_ranges
def get_name(self):
return f"range_{self.shift_window}"
class DifferencedVolumeIndicator(Indicator):
"""
计算当前交易量与前一交易量的差值
volume[t] - volume[t-1]
用于识别交易量变化的趋势常用于平稳化交易量序列
"""
def __init__(
self, down_bound: float = None, up_bound: float = None, shift_window: int = 0
):
# 差值没有固定上下界,取决于实际交易量
super().__init__(down_bound, up_bound)
self.shift_window = shift_window
def get_values(
self,
close: np.array, # 不使用
open: np.array, # 不使用
high: np.array, # 不使用
low: np.array, # 不使用
volume: np.array,
) -> np.array:
"""
根据交易量计算其差分值
Args:
volume (np.array): 交易量列表
其他 OHLCV 参数在此指标中不使用
Returns:
np.array: 交易量差分值列表第一个值为NaN
"""
if not isinstance(volume, np.ndarray) or len(volume) < 2:
return np.full_like(
volume if isinstance(volume, np.ndarray) else [], np.nan, dtype=float
)
# 计算相邻交易量的差值
# np.diff(volume) 会比原数组少一个元素,前面补 NaN
diff_volume = np.concatenate(([np.nan], np.diff(volume)))
return diff_volume
def get_name(self) -> str:
return f"differenced_volume_{self.shift_window}"
class StochasticOscillator(Indicator):
"""
随机摆动指标 (%K)衡量收盘价在近期价格高低区间内的位置
这是一个平稳的动量摆动指标值域在 [0, 100] 之间
"""
def __init__(
self,
fastk_period: int = 14,
slowk_period: int = 3,
slowd_period: int = 3, # 在此实现中未使用 slowd但保留以符合标准
down_bound: float = None,
up_bound: float = None,
shift_window: int = 0,
):
super().__init__(down_bound, up_bound)
self.fastk_period = fastk_period
self.slowk_period = slowk_period
self.slowd_period = slowd_period
self.shift_window = shift_window
def get_values(
self,
close: np.array,
open: np.array, # 不使用
high: np.array,
low: np.array,
volume: np.array, # 不使用
) -> np.array:
"""
根据最高价最低价和收盘价计算随机摆动指标 %K 的值
Args:
high (np.array): 最高价列表
low (np.array): 最低价列表
close (np.array): 收盘价列表
Returns:
np.array: 慢速 %K 线的值列表
"""
# TA-Lib 的 STOCH 函数返回 slowk 和 slowd 两条线
# 我们通常使用 slowk 作为主要的摆动指标
slowk, _ = talib.STOCH(
high,
low,
close,
fastk_period=self.fastk_period,
slowk_period=self.slowk_period,
slowk_matype=0, # 使用 SMA
slowd_period=self.slowd_period,
slowd_matype=0, # 使用 SMA
)
return slowk
def get_name(self):
return f"stoch_k_{self.fastk_period}_{self.slowk_period}"
class RateOfChange(Indicator):
"""
价格变化率 (ROC)衡量当前价格与 N 期前价格的百分比变化
这是一个平稳的动量指标围绕 0 波动
"""
def __init__(
self,
window: int = 10,
down_bound: float = None,
up_bound: float = None,
shift_window: int = 0,
):
super().__init__(down_bound, up_bound)
self.window = window
self.shift_window = shift_window
def get_values(
self,
close: np.array,
open: np.array, # 不使用
high: np.array, # 不使用
low: np.array, # 不使用
volume: np.array, # 不使用
) -> np.array:
"""
根据收盘价计算 ROC
Args:
close (np.array): 收盘价列表
Returns:
np.array: ROC 值列表
"""
roc_values = talib.ROC(close, timeperiod=self.window)
return roc_values
def get_name(self):
return f"roc_{self.window}"
class NormalizedATR(Indicator):
"""
归一化平均真实波幅 (NATR) ATR / Close * 100
将绝对波动幅度转换为相对波动百分比使其成为一个更平稳的波动率指标
"""
def __init__(
2025-09-16 09:59:38 +08:00
self,
window: int = 14,
down_bound: float = None,
up_bound: float = None,
shift_window: int = 0,
2025-07-15 22:45:51 +08:00
):
super().__init__(down_bound, up_bound)
self.window = window
self.shift_window = shift_window
def get_values(
2025-09-16 09:59:38 +08:00
self,
close: np.array,
open: np.array, # 不使用
high: np.array,
low: np.array,
volume: np.array, # 不使用
2025-07-15 22:45:51 +08:00
) -> np.array:
"""
根据最高价最低价和收盘价计算 NATR
Args:
high (np.array): 最高价列表
low (np.array): 最低价列表
close (np.array): 收盘价列表
Returns:
np.array: NATR 值列表
"""
# 使用 TA-Lib 直接计算 NATR
natr_values = talib.NATR(high, low, close, timeperiod=self.window)
return natr_values
2025-07-10 15:07:31 +08:00
def get_name(self):
2025-07-15 22:45:51 +08:00
return f"natr_{self.window}"
2025-09-16 09:59:38 +08:00
class ADX(Indicator):
"""
平均趋向指标 (ADX)用于衡量趋势的强度而非方向
是区分趋势行情和震荡行情的核心过滤指标
"""
def __init__(
2025-09-16 09:59:38 +08:00
self,
window: int = 14,
down_bound: float = None, # 例如,设置 down_bound=25 可过滤出强趋势行情
up_bound: float = None, # 例如,设置 up_bound=20 可过滤出震荡行情
shift_window: int = 0,
):
super().__init__(down_bound, up_bound)
self.window = window
self.shift_window = shift_window
def get_values(
2025-09-16 09:59:38 +08:00
self,
close: np.array,
open: np.array, # 不使用
high: np.array,
low: np.array,
volume: np.array, # 不使用
) -> np.array:
"""
根据最高价最低价和收盘价计算ADX值
"""
adx_values = talib.ADX(high, low, close, timeperiod=self.window)
return adx_values
def get_name(self):
return f"adx_{self.window}"
class BollingerBandwidth(Indicator):
"""
布林带宽度计算公式为 (上轨 - 下轨) / 中轨
这是一个归一化的波动率指标用于识别波动性的收缩Squeeze和扩张
"""
2025-09-16 09:59:38 +08:00
def __init__(
2025-09-16 09:59:38 +08:00
self,
window: int = 20,
nbdev: float = 2.0, # 标准差倍数
down_bound: float = None,
up_bound: float = None,
shift_window: int = 0,
):
super().__init__(down_bound, up_bound)
self.window = window
self.nbdev = nbdev
self.shift_window = shift_window
def get_values(
2025-09-16 09:59:38 +08:00
self,
close: np.array,
open: np.array, # 不使用
high: np.array, # 不使用
low: np.array, # 不使用
volume: np.array, # 不使用
) -> np.array:
"""
根据收盘价计算布林带宽度
"""
upper, middle, lower = talib.BBANDS(
close,
timeperiod=self.window,
nbdevup=self.nbdev,
nbdevdn=self.nbdev,
2025-09-16 09:59:38 +08:00
matype=0 # 使用SMA
)
# 为避免除以0在 middle 为0或NaN的地方带宽也设为NaN
bandwidth = np.full_like(middle, np.nan)
mask = (middle > 0)
bandwidth[mask] = (upper[mask] - lower[mask]) / middle[mask] * 100
return bandwidth
def get_name(self):
2025-09-16 09:59:38 +08:00
return f"bbw_{self.window}_{int(self.nbdev * 10)}"
# ====================================================================
# 1. 通用版:价格范围与波动率比率 (Price Range to Volatility Ratio)
# ====================================================================
class PriceRangeToVolatilityRatio(Indicator):
"""
衡量一个n根K线窗口内的价格范围与ATR的比率
n_period: 窗口大小
atr_period: 计算ATR的周期
"""
def __init__(self, n_period: int = 3, atr_period: int = 14, down_bound: Optional[float] = None,
up_bound: Optional[float] = None):
super().__init__(down_bound, up_bound)
self.n_period = n_period
self.atr_period = atr_period
def get_values(self, close: np.array, open: np.array, high: np.array, low: np.array, volume: np.array,
**kwargs) -> np.array:
# 计算整个窗口内的价格范围(最高价 - 最低价)
high_in_window = self._rolling_max(high, self.n_period)
low_in_window = self._rolling_min(low, self.n_period)
price_range = high_in_window - low_in_window
# 计算ATR
atr_values = talib.ATR(high, low, close, timeperiod=self.atr_period)
# 计算比率
ratio = price_range / atr_values
return ratio
def _rolling_max(self,arr: np.array, window: int) -> np.array:
if len(arr) < window:
return np.full_like(arr, np.nan)
# 创建滑动窗口视图
view = sliding_window_view(arr, window_shape=window)
# 对每个窗口求最大值
rolling_max = np.max(view, axis=1)
# 填充结果数组前面用NaN填充
result = np.full_like(arr, np.nan)
result[window - 1:] = rolling_max
return result
def _rolling_min(self, arr: np.array, window: int) -> np.array:
if len(arr) < window:
return np.full_like(arr, np.nan)
view = sliding_window_view(arr, window_shape=window)
rolling_min = np.min(view, axis=1)
result = np.full_like(arr, np.nan)
result[window - 1:] = rolling_min
return result
def get_name(self) -> str:
return f"price_range_to_vol_ratio_n{self.n_period}_atr{self.atr_period}"
# ====================================================================
# 2. 通用版动力K线信念度 (Impulse Candle Conviction)
# ====================================================================
class ImpulseCandleConviction(Indicator):
"""
量化指定K线收盘价在实体中的位置
n_period: 窗口大小
impulse_index_from_end: 动力K线在窗口中的位置从末尾数0为最后一根
"""
def __init__(self, n_period: int = 3, impulse_index_from_end: int = 1, down_bound: Optional[float] = None,
up_bound: Optional[float] = None):
super().__init__(down_bound, up_bound)
self.n_period = n_period
self.impulse_index_from_end = impulse_index_from_end
if self.impulse_index_from_end >= self.n_period:
raise ValueError("impulse_index_from_end must be less than n_period")
def get_values(self, close: np.array, open: np.array, high: np.array, low: np.array, volume: np.array,
**kwargs) -> np.array:
conviction_values = np.full_like(close, np.nan)
# 使用切片获取动力K线的数据
impulse_high = np.roll(high, -self.impulse_index_from_end)
impulse_low = np.roll(low, -self.impulse_index_from_end)
impulse_close = np.roll(close, -self.impulse_index_from_end)
impulse_open = np.roll(open, -self.impulse_index_from_end)
# 检查K线是看涨还是看跌
is_bullish = impulse_close > impulse_open
# 计算K线实体范围
candle_range = impulse_high - impulse_low
# 看涨信念度
bullish_conviction = (impulse_close - impulse_low) / candle_range
# 看跌信念度
bearish_conviction = (impulse_high - impulse_close) / candle_range
# 根据看涨看跌应用不同的公式
conviction_values[is_bullish] = bullish_conviction[is_bullish]
conviction_values[~is_bullish] = bearish_conviction[~is_bullish]
# 确保分母不为0且只在有效的窗口位置返回结果
mask = (candle_range > 0)
conviction_values[~mask] = np.nan
# 由于使用了np.roll需要截取到原始数组的长度
return conviction_values
def get_name(self) -> str:
return f"conviction_n{self.n_period}_idx{self.impulse_index_from_end}"
# ====================================================================
# 3. 通用版:相对成交量 (Relative Volume)
# ====================================================================
class RelativeVolumeInWindow(Indicator):
"""
衡量指定K线的成交量与其前n根K线内的简单移动平均成交量之比
n_period: SMA的计算周期
impulse_index_from_end: 动力K线在窗口中的位置从末尾数0为最后一根
"""
def __init__(self, n_period: int = 20, impulse_index_from_end: int = 1, down_bound: Optional[float] = None,
up_bound: Optional[float] = None):
super().__init__(down_bound, up_bound)
self.n_period = n_period
self.impulse_index_from_end = impulse_index_from_end
if self.impulse_index_from_end >= self.n_period:
raise ValueError("impulse_index_from_end must be less than n_period")
def get_values(self, close: np.array, open: np.array, high: np.array, low: np.array, volume: np.array,
**kwargs) -> np.array:
# 计算成交量的SMA
volume_sma = talib.SMA(volume, timeperiod=self.n_period)
# 提取指定位置的K线成交量
impulse_volume = np.roll(volume, -self.impulse_index_from_end)
# 提取SMA值
sma_at_position = np.roll(volume_sma, -self.impulse_index_from_end)
relative_volume = np.full_like(volume, np.nan)
mask = sma_at_position > 0
relative_volume[mask] = impulse_volume[mask] / sma_at_position[mask]
return relative_volume
def get_name(self) -> str:
2025-09-20 00:04:51 +08:00
return f"relative_volume_sma{self.n_period}_idx{self.impulse_index_from_end}"
class ROC_MA(Indicator):
"""
变动率的移动平均 (ROC_MA) 指标实现
该指标首先计算ROC然后对其结果应用移动平均以获得更平滑的动量曲线
"""
def __init__(
self,
roc_window: int = 60,
ma_window: int = 20,
down_bound: float = None,
up_bound: float = None,
shift_window: int = 0,
):
"""
初始化 ROC_MA 指标
Args:
roc_window (int): 计算ROC所需的回看周期
ma_window (int): 对ROC值进行平滑的移动平均周期
down_bound (float): (可选) 用于条件判断的下轨
up_bound (float): (可选) 用于条件判断的上轨
shift_window (int): (可选) 指标值的时间偏移
"""
# 【关键】调用父类的初始化方法
super().__init__(down_bound, up_bound)
self.roc_window = roc_window
self.ma_window = ma_window
self.shift_window = shift_window
def get_values(
self,
close: np.array,
open: np.array,
high: np.array,
low: np.array,
volume: np.array,
) -> np.array:
"""
根据收盘价列表计算 ROC_MA
Args:
close (np.array): 收盘价列表
其他 OHLCV 参数在此指标中不使用
Returns:
np.array: ROC_MA 值列表如果数据不足则列表开头为NaN
"""
# 步骤 1: 使用 talib.ROC 计算原始的ROC值
# TA-Lib 会在数据不足时自动填充 NaN
roc_values = talib.ROC(close, timeperiod=self.roc_window)
# 步骤 2: 对 roc_values 计算移动平均 (SMA)
# 注意在计算MA之前ROC已经产生了一些NaNTA-Lib的MA函数会处理这些NaN
# 并产生更多的NaN这是正常的。
roc_ma_values = talib.SMA(roc_values, timeperiod=self.ma_window)
# 返回最终的 numpy 数组
return roc_ma_values
def get_name(self) -> str:
"""
返回指标的唯一名称用于标识和调试
"""
2025-11-07 16:26:00 +08:00
return f"roc_ma_{self.roc_window}_{self.ma_window}"
from numpy.lib.stride_tricks import sliding_window_view
class ZScoreATR(Indicator):
def __init__(
self,
atr_window: int = 14,
z_window: int = 100,
down_bound: float = None,
up_bound: float = None,
):
super().__init__(down_bound, up_bound)
self.atr_window = atr_window
self.z_window = z_window
def get_values(self, close, open, high, low, volume) -> np.ndarray:
n = len(close)
min_len = self.atr_window + self.z_window
if n < min_len:
return np.full(n, np.nan, dtype=np.float64)
# Step 1: 计算 ATR (NumPy array)
atr = talib.ATR(high, low, close, timeperiod=self.atr_window) # shape: (n,)
# Step 2: 只对有效区域计算 z-score
start_idx = self.atr_window - 1 # ATR 从这里开始非 NaN
valid_atr = atr[start_idx:] # shape: (n - start_idx,)
valid_n = len(valid_atr)
if valid_n < self.z_window:
return np.full(n, np.nan, dtype=np.float64)
# Step 3: 使用 sliding_window_view 构造滚动窗口(无数据复制)
# windows: shape = (valid_n - z_window + 1, z_window)
windows = sliding_window_view(valid_atr, window_shape=self.z_window)
# Step 4: 向量化计算均值和标准差(沿窗口轴)
means = np.mean(windows, axis=1) # shape: (M,)
stds = np.std(windows, axis=1, ddof=0) # shape: (M,)
# Step 5: 计算 z-score当前值是窗口最后一个元素
current_vals = valid_atr[self.z_window - 1:] # 对齐窗口末尾
zscores_valid = np.empty_like(valid_atr)
zscores_valid[:self.z_window - 1] = np.nan
# 安全除法:避免除零
with np.errstate(divide='ignore', invalid='ignore'):
z = (current_vals - means) / stds
zscores_valid[self.z_window - 1:] = np.where(stds > 1e-12, z, 0.0)
# Step 6: 拼回完整长度(前面 ATR 无效部分为 NaN
result = np.full(n, np.nan, dtype=np.float64)
result[start_idx:] = zscores_valid
return result
def get_name(self):
return f"z_atr_{self.atr_window}_{self.z_window}"