1、新增dp策略

This commit is contained in:
2025-09-16 09:59:38 +08:00
parent 5cd926884d
commit 9a58fec9ca
120 changed files with 69683 additions and 325 deletions

View File

@@ -1,9 +1,27 @@
from typing import List, Union
from abc import ABC
from typing import List, Union, Tuple, Optional
import numpy as np
import talib
from numpy.lib._stride_tricks_impl import sliding_window_view
from src.indicators.base_indicators import Indicator
class Empty(Indicator, ABC):
def get_values(self, close: np.array, open: np.array, high: np.array, low: np.array, volume: np.array):
return []
def is_condition_met(self,
close: np.array,
open: np.array,
high: np.array,
low: np.array,
volume: np.array):
return True
def get_name(self):
return "Empty"
class RSI(Indicator):
"""
@@ -236,23 +254,23 @@ class NormalizedATR(Indicator):
"""
def __init__(
self,
window: int = 14,
down_bound: float = None,
up_bound: float = None,
shift_window: int = 0,
self,
window: int = 14,
down_bound: float = None,
up_bound: float = None,
shift_window: int = 0,
):
super().__init__(down_bound, up_bound)
self.window = window
self.shift_window = shift_window
def get_values(
self,
close: np.array,
open: np.array, # 不使用
high: np.array,
low: np.array,
volume: np.array, # 不使用
self,
close: np.array,
open: np.array, # 不使用
high: np.array,
low: np.array,
volume: np.array, # 不使用
) -> np.array:
"""
根据最高价、最低价和收盘价计算 NATR 值。
@@ -270,6 +288,7 @@ class NormalizedATR(Indicator):
def get_name(self):
return f"natr_{self.window}"
class ADX(Indicator):
"""
平均趋向指标 (ADX),用于衡量趋势的强度而非方向。
@@ -277,23 +296,23 @@ class ADX(Indicator):
"""
def __init__(
self,
window: int = 14,
down_bound: float = None, # 例如,设置 down_bound=25 可过滤出强趋势行情
up_bound: float = None, # 例如,设置 up_bound=20 可过滤出震荡行情
shift_window: int = 0,
self,
window: int = 14,
down_bound: float = None, # 例如,设置 down_bound=25 可过滤出强趋势行情
up_bound: float = None, # 例如,设置 up_bound=20 可过滤出震荡行情
shift_window: int = 0,
):
super().__init__(down_bound, up_bound)
self.window = window
self.shift_window = shift_window
def get_values(
self,
close: np.array,
open: np.array, # 不使用
high: np.array,
low: np.array,
volume: np.array, # 不使用
self,
close: np.array,
open: np.array, # 不使用
high: np.array,
low: np.array,
volume: np.array, # 不使用
) -> np.array:
"""
根据最高价、最低价和收盘价计算ADX值。
@@ -303,7 +322,6 @@ class ADX(Indicator):
def get_name(self):
return f"adx_{self.window}"
class BollingerBandwidth(Indicator):
@@ -311,13 +329,14 @@ class BollingerBandwidth(Indicator):
布林带宽度,计算公式为 (上轨 - 下轨) / 中轨。
这是一个归一化的波动率指标用于识别波动性的收缩Squeeze和扩张。
"""
def __init__(
self,
window: int = 20,
nbdev: float = 2.0, # 标准差倍数
down_bound: float = None,
up_bound: float = None,
shift_window: int = 0,
self,
window: int = 20,
nbdev: float = 2.0, # 标准差倍数
down_bound: float = None,
up_bound: float = None,
shift_window: int = 0,
):
super().__init__(down_bound, up_bound)
self.window = window
@@ -325,12 +344,12 @@ class BollingerBandwidth(Indicator):
self.shift_window = shift_window
def get_values(
self,
close: np.array,
open: np.array, # 不使用
high: np.array, # 不使用
low: np.array, # 不使用
volume: np.array, # 不使用
self,
close: np.array,
open: np.array, # 不使用
high: np.array, # 不使用
low: np.array, # 不使用
volume: np.array, # 不使用
) -> np.array:
"""
根据收盘价计算布林带宽度。
@@ -340,7 +359,7 @@ class BollingerBandwidth(Indicator):
timeperiod=self.window,
nbdevup=self.nbdev,
nbdevdn=self.nbdev,
matype=0 # 使用SMA
matype=0 # 使用SMA
)
# 为避免除以0在 middle 为0或NaN的地方带宽也设为NaN
bandwidth = np.full_like(middle, np.nan)
@@ -349,5 +368,157 @@ class BollingerBandwidth(Indicator):
return bandwidth
def get_name(self):
return f"bbw_{self.window}_{int(self.nbdev*10)}"
return f"bbw_{self.window}_{int(self.nbdev * 10)}"
# ====================================================================
# 1. 通用版:价格范围与波动率比率 (Price Range to Volatility Ratio)
# ====================================================================
class PriceRangeToVolatilityRatio(Indicator):
"""
衡量一个n根K线窗口内的价格范围与ATR的比率。
n_period: 窗口大小。
atr_period: 计算ATR的周期。
"""
def __init__(self, n_period: int = 3, atr_period: int = 14, down_bound: Optional[float] = None,
up_bound: Optional[float] = None):
super().__init__(down_bound, up_bound)
self.n_period = n_period
self.atr_period = atr_period
def get_values(self, close: np.array, open: np.array, high: np.array, low: np.array, volume: np.array,
**kwargs) -> np.array:
# 计算整个窗口内的价格范围(最高价 - 最低价)
high_in_window = self._rolling_max(high, self.n_period)
low_in_window = self._rolling_min(low, self.n_period)
price_range = high_in_window - low_in_window
# 计算ATR
atr_values = talib.ATR(high, low, close, timeperiod=self.atr_period)
# 计算比率
ratio = price_range / atr_values
return ratio
def _rolling_max(self,arr: np.array, window: int) -> np.array:
if len(arr) < window:
return np.full_like(arr, np.nan)
# 创建滑动窗口视图
view = sliding_window_view(arr, window_shape=window)
# 对每个窗口求最大值
rolling_max = np.max(view, axis=1)
# 填充结果数组前面用NaN填充
result = np.full_like(arr, np.nan)
result[window - 1:] = rolling_max
return result
def _rolling_min(self, arr: np.array, window: int) -> np.array:
if len(arr) < window:
return np.full_like(arr, np.nan)
view = sliding_window_view(arr, window_shape=window)
rolling_min = np.min(view, axis=1)
result = np.full_like(arr, np.nan)
result[window - 1:] = rolling_min
return result
def get_name(self) -> str:
return f"price_range_to_vol_ratio_n{self.n_period}_atr{self.atr_period}"
# ====================================================================
# 2. 通用版动力K线信念度 (Impulse Candle Conviction)
# ====================================================================
class ImpulseCandleConviction(Indicator):
"""
量化指定K线收盘价在实体中的位置。
n_period: 窗口大小。
impulse_index_from_end: 动力K线在窗口中的位置从末尾数0为最后一根
"""
def __init__(self, n_period: int = 3, impulse_index_from_end: int = 1, down_bound: Optional[float] = None,
up_bound: Optional[float] = None):
super().__init__(down_bound, up_bound)
self.n_period = n_period
self.impulse_index_from_end = impulse_index_from_end
if self.impulse_index_from_end >= self.n_period:
raise ValueError("impulse_index_from_end must be less than n_period")
def get_values(self, close: np.array, open: np.array, high: np.array, low: np.array, volume: np.array,
**kwargs) -> np.array:
conviction_values = np.full_like(close, np.nan)
# 使用切片获取动力K线的数据
impulse_high = np.roll(high, -self.impulse_index_from_end)
impulse_low = np.roll(low, -self.impulse_index_from_end)
impulse_close = np.roll(close, -self.impulse_index_from_end)
impulse_open = np.roll(open, -self.impulse_index_from_end)
# 检查K线是看涨还是看跌
is_bullish = impulse_close > impulse_open
# 计算K线实体范围
candle_range = impulse_high - impulse_low
# 看涨信念度
bullish_conviction = (impulse_close - impulse_low) / candle_range
# 看跌信念度
bearish_conviction = (impulse_high - impulse_close) / candle_range
# 根据看涨看跌应用不同的公式
conviction_values[is_bullish] = bullish_conviction[is_bullish]
conviction_values[~is_bullish] = bearish_conviction[~is_bullish]
# 确保分母不为0且只在有效的窗口位置返回结果
mask = (candle_range > 0)
conviction_values[~mask] = np.nan
# 由于使用了np.roll需要截取到原始数组的长度
return conviction_values
def get_name(self) -> str:
return f"conviction_n{self.n_period}_idx{self.impulse_index_from_end}"
# ====================================================================
# 3. 通用版:相对成交量 (Relative Volume)
# ====================================================================
class RelativeVolumeInWindow(Indicator):
"""
衡量指定K线的成交量与其前n根K线内的简单移动平均成交量之比。
n_period: SMA的计算周期。
impulse_index_from_end: 动力K线在窗口中的位置从末尾数0为最后一根
"""
def __init__(self, n_period: int = 20, impulse_index_from_end: int = 1, down_bound: Optional[float] = None,
up_bound: Optional[float] = None):
super().__init__(down_bound, up_bound)
self.n_period = n_period
self.impulse_index_from_end = impulse_index_from_end
if self.impulse_index_from_end >= self.n_period:
raise ValueError("impulse_index_from_end must be less than n_period")
def get_values(self, close: np.array, open: np.array, high: np.array, low: np.array, volume: np.array,
**kwargs) -> np.array:
# 计算成交量的SMA
volume_sma = talib.SMA(volume, timeperiod=self.n_period)
# 提取指定位置的K线成交量
impulse_volume = np.roll(volume, -self.impulse_index_from_end)
# 提取SMA值
sma_at_position = np.roll(volume_sma, -self.impulse_index_from_end)
relative_volume = np.full_like(volume, np.nan)
mask = sma_at_position > 0
relative_volume[mask] = impulse_volume[mask] / sma_at_position[mask]
return relative_volume
def get_name(self) -> str:
return f"relative_volume_sma{self.n_period}_idx{self.impulse_index_from_end}"