from typing import List, Union import numpy as np import talib from src.indicators.base_indicators import Indicator class RSI(Indicator): """ 相对强弱指数 (RSI) 指标实现,使用 TA-Lib 简化计算。 """ def __init__( self, window: int = 14, down_bound: float = None, up_bound: float = None, shift_window: int = 0, ): super().__init__(down_bound, up_bound) self.window = window self.shift_window = shift_window def get_values( self, close: np.array, open: np.array, # 不使用 high: np.array, # 不使用 low: np.array, # 不使用 volume: np.array, ) -> np.array: # 不使用 """ 根据收盘价列表计算RSI值,使用 TA-Lib。 Args: close (np.array): 收盘价列表。 其他 OHLCV 参数在此指标中不使用。 Returns: np.array: RSI值列表。如果数据不足,则列表开头为NaN。 """ # 使用 talib.RSI 直接计算 # 注意:TA-Lib 会在数据不足时自动填充 NaN rsi_values = talib.RSI(close, timeperiod=self.window) # 将 numpy 数组转换为 list 并返回 return rsi_values def get_name(self): return f"rsi_{self.window}" class HistoricalRange(Indicator): """ 历史波动幅度指标:计算过去 N 日的 (最高价 - 最低价) 的简单移动平均。 """ def __init__( self, down_bound: float = None, up_bound: float = None, shift_window: int = 0 ): super().__init__(down_bound, up_bound) self.shift_window = shift_window def get_values( self, close: np.array, # 不使用 open: np.array, # 不使用 high: np.array, low: np.array, volume: np.array, ) -> np.array: # 不使用 """ 根据最高价和最低价列表计算过去 N 日的 (high - low) 值的简单移动平均。 Args: high (np.array): 最高价列表。 low (np.array): 最低价列表。 其他 OHLCV 参数在此指标中不使用。 Returns: np.array: 历史波动幅度指标值列表。如果数据不足,则列表开头为NaN。 """ # if not high or not low or len(high) != len(low): # print(high, low, len(high), len(low)) # return [] # 计算每日的 (high - low) 范围 daily_ranges = high - low # 将 numpy 数组转换为 list 并返回 return daily_ranges def get_name(self): return f"range_{self.shift_window}" class DifferencedVolumeIndicator(Indicator): """ 计算当前交易量与前一交易量的差值。 volume[t] - volume[t-1]。 用于识别交易量变化的趋势,常用于平稳化交易量序列。 """ def __init__( self, down_bound: float = None, up_bound: float = None, shift_window: int = 0 ): # 差值没有固定上下界,取决于实际交易量 super().__init__(down_bound, up_bound) self.shift_window = shift_window def get_values( self, close: np.array, # 不使用 open: np.array, # 不使用 high: np.array, # 不使用 low: np.array, # 不使用 volume: np.array, ) -> np.array: """ 根据交易量计算其差分值。 Args: volume (np.array): 交易量列表。 其他 OHLCV 参数在此指标中不使用。 Returns: np.array: 交易量差分值列表。第一个值为NaN。 """ if not isinstance(volume, np.ndarray) or len(volume) < 2: return np.full_like( volume if isinstance(volume, np.ndarray) else [], np.nan, dtype=float ) # 计算相邻交易量的差值 # np.diff(volume) 会比原数组少一个元素,前面补 NaN diff_volume = np.concatenate(([np.nan], np.diff(volume))) return diff_volume def get_name(self) -> str: return f"differenced_volume_{self.shift_window}" class StochasticOscillator(Indicator): """ 随机摆动指标 (%K),衡量收盘价在近期价格高低区间内的位置。 这是一个平稳的动量摆动指标,值域在 [0, 100] 之间。 """ def __init__( self, fastk_period: int = 14, slowk_period: int = 3, slowd_period: int = 3, # 在此实现中未使用 slowd,但保留以符合标准 down_bound: float = None, up_bound: float = None, shift_window: int = 0, ): super().__init__(down_bound, up_bound) self.fastk_period = fastk_period self.slowk_period = slowk_period self.slowd_period = slowd_period self.shift_window = shift_window def get_values( self, close: np.array, open: np.array, # 不使用 high: np.array, low: np.array, volume: np.array, # 不使用 ) -> np.array: """ 根据最高价、最低价和收盘价计算随机摆动指标 %K 的值。 Args: high (np.array): 最高价列表。 low (np.array): 最低价列表。 close (np.array): 收盘价列表。 Returns: np.array: 慢速 %K 线的值列表。 """ # TA-Lib 的 STOCH 函数返回 slowk 和 slowd 两条线 # 我们通常使用 slowk 作为主要的摆动指标 slowk, _ = talib.STOCH( high, low, close, fastk_period=self.fastk_period, slowk_period=self.slowk_period, slowk_matype=0, # 使用 SMA slowd_period=self.slowd_period, slowd_matype=0, # 使用 SMA ) return slowk def get_name(self): return f"stoch_k_{self.fastk_period}_{self.slowk_period}" class RateOfChange(Indicator): """ 价格变化率 (ROC),衡量当前价格与 N 期前价格的百分比变化。 这是一个平稳的动量指标,围绕 0 波动。 """ def __init__( self, window: int = 10, down_bound: float = None, up_bound: float = None, shift_window: int = 0, ): super().__init__(down_bound, up_bound) self.window = window self.shift_window = shift_window def get_values( self, close: np.array, open: np.array, # 不使用 high: np.array, # 不使用 low: np.array, # 不使用 volume: np.array, # 不使用 ) -> np.array: """ 根据收盘价计算 ROC 值。 Args: close (np.array): 收盘价列表。 Returns: np.array: ROC 值列表。 """ roc_values = talib.ROC(close, timeperiod=self.window) return roc_values def get_name(self): return f"roc_{self.window}" class NormalizedATR(Indicator): """ 归一化平均真实波幅 (NATR),即 ATR / Close * 100。 将绝对波动幅度转换为相对波动百分比,使其成为一个更平稳的波动率指标。 """ def __init__( self, window: int = 14, down_bound: float = None, up_bound: float = None, shift_window: int = 0, ): super().__init__(down_bound, up_bound) self.window = window self.shift_window = shift_window def get_values( self, close: np.array, open: np.array, # 不使用 high: np.array, low: np.array, volume: np.array, # 不使用 ) -> np.array: """ 根据最高价、最低价和收盘价计算 NATR 值。 Args: high (np.array): 最高价列表。 low (np.array): 最低价列表。 close (np.array): 收盘价列表。 Returns: np.array: NATR 值列表。 """ # 使用 TA-Lib 直接计算 NATR natr_values = talib.NATR(high, low, close, timeperiod=self.window) return natr_values def get_name(self): return f"natr_{self.window}" class ADX(Indicator): """ 平均趋向指标 (ADX),用于衡量趋势的强度而非方向。 是区分趋势行情和震荡行情的核心过滤指标。 """ def __init__( self, window: int = 14, down_bound: float = None, # 例如,设置 down_bound=25 可过滤出强趋势行情 up_bound: float = None, # 例如,设置 up_bound=20 可过滤出震荡行情 shift_window: int = 0, ): super().__init__(down_bound, up_bound) self.window = window self.shift_window = shift_window def get_values( self, close: np.array, open: np.array, # 不使用 high: np.array, low: np.array, volume: np.array, # 不使用 ) -> np.array: """ 根据最高价、最低价和收盘价计算ADX值。 """ adx_values = talib.ADX(high, low, close, timeperiod=self.window) return adx_values def get_name(self): return f"adx_{self.window}" class BollingerBandwidth(Indicator): """ 布林带宽度,计算公式为 (上轨 - 下轨) / 中轨。 这是一个归一化的波动率指标,用于识别波动性的收缩(Squeeze)和扩张。 """ def __init__( self, window: int = 20, nbdev: float = 2.0, # 标准差倍数 down_bound: float = None, up_bound: float = None, shift_window: int = 0, ): super().__init__(down_bound, up_bound) self.window = window self.nbdev = nbdev self.shift_window = shift_window def get_values( self, close: np.array, open: np.array, # 不使用 high: np.array, # 不使用 low: np.array, # 不使用 volume: np.array, # 不使用 ) -> np.array: """ 根据收盘价计算布林带宽度。 """ upper, middle, lower = talib.BBANDS( close, timeperiod=self.window, nbdevup=self.nbdev, nbdevdn=self.nbdev, matype=0 # 使用SMA ) # 为避免除以0,在 middle 为0或NaN的地方,带宽也设为NaN bandwidth = np.full_like(middle, np.nan) mask = (middle > 0) bandwidth[mask] = (upper[mask] - lower[mask]) / middle[mask] * 100 return bandwidth def get_name(self): return f"bbw_{self.window}_{int(self.nbdev*10)}"