Files
NewQuant/futures_trading_strategies/c/KalmanStrategy/HawkesVolatilityStrategy.py
liaozhaorun 711b86d33f 1、新增傅里叶策略
2、新增策略管理、策略重启功能
2025-11-20 16:15:45 +08:00

215 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
import pandas as pd
import talib
from collections import deque
from typing import Optional, Any, List, Dict
from src.core_data import Bar, Order
from src.indicators.base_indicators import Indicator
from src.indicators.indicators import Empty
from src.strategies.base_strategy import Strategy
# =============================================================================
# 策略实现 (V9 - 盘整突破版)
# =============================================================================
class ConsolidationBreakoutStrategy(Strategy):
"""
一个基于“盘整即突破之母”原则的终极波段策略。(V9)
本策略旨在通过量化的方法,识别市场能量的“压缩-释放”周期,
以捕捉趋势的主干行情,并严格规避无序的震荡。
1. 【战略层】: 使用长期均线定义宏观牛熊环境。
2. 【战术层】: 使用布林带宽度(BBW)的极度收缩,来识别“盘整准备”状态。
3. 【执行层】: 在准备状态下,等待价格“突破”凯尔特纳通道作为“大幅移动”的确认信号,
立即以市价入场并使用ATR动态追踪止盈来截取波段利润。
"""
def __init__(
self,
context: Any,
main_symbol: str,
enable_log: bool,
trade_volume: int,
# --- 【战略层】环境定义 ---
regime_ma_period: int = 200, # 定义牛熊市的长期均线
# --- 【战术层】盘整收缩识别 (Squeeze) ---
bollinger_period: int = 20, # 布林带周期
bollinger_std: float = 2.0, # 布林带标准差
bbw_lookback: int = 252, # BBW历史分位数回看窗口
bbw_squeeze_percentile: float = 15.0, # 定义“极度收缩”的百分位
# --- 【执行层】突破与风控 ---
keltner_period: int = 20, # 凯尔特纳通道周期
keltner_atr_multiplier: float = 1.5, # 凯尔特纳通道ATR乘数
atr_period: int = 20,
trailing_stop_atr_multiplier: float = 3.0,
initial_stop_atr_multiplier: float = 2.5,
order_direction: Optional[List[str]] = None,
indicators: Optional[List[Indicator]] = None,
):
super().__init__(context, main_symbol, enable_log)
if order_direction is None: order_direction = ['BUY', 'SELL']
self.trade_volume = trade_volume
self.regime_ma_period = regime_ma_period
self.bollinger_period = bollinger_period
self.bollinger_std = bollinger_std
self.bbw_lookback = bbw_lookback
self.bbw_squeeze_percentile = bbw_squeeze_percentile
self.keltner_period = keltner_period
self.keltner_atr_multiplier = keltner_atr_multiplier
self.atr_period = atr_period
self.trailing_stop_atr_multiplier = trailing_stop_atr_multiplier
self.initial_stop_atr_multiplier = initial_stop_atr_multiplier
self.order_direction = order_direction
# 状态变量
self._last_order_id: Optional[str] = None
self.position_meta: Dict[str, Any] = {}
self._bbw_history: deque = deque(maxlen=self.bbw_lookback)
self._in_squeeze_ready_state: bool = False
self.main_symbol = main_symbol
self.order_id_counter = 0
if indicators is None:
indicators = [Empty(), Empty()]
self.indicators = indicators
self.log("ConsolidationBreakoutStrategy (V9) Initialized.")
def on_init(self):
super().on_init()
self.cancel_all_pending_orders(self.main_symbol)
def on_open_bar(self, open_price: float, symbol: str):
self.symbol = symbol
bar_history = self.get_bar_history()
required_bars = max(self.regime_ma_period, self.bbw_lookback) + 1
if len(bar_history) < required_bars: return
# --- 数据预处理与指标计算 ---
highs = np.array([b.high for b in bar_history], dtype=float)
lows = np.array([b.low for b in bar_history], dtype=float)
closes = np.array([b.close for b in bar_history], dtype=float)
# 战略指标
regime_ma = talib.MA(closes, timeperiod=self.regime_ma_period)[-1]
# 战术指标 (Squeeze)
bb_upper, bb_mid, bb_lower = talib.BBANDS(closes, timeperiod=self.bollinger_period, nbdevup=self.bollinger_std,
nbdevdn=self.bollinger_std)
bb_width = (bb_upper[-1] - bb_lower[-1]) / bb_mid[-1] if bb_mid[-1] > 0 else 0
self._bbw_history.append(bb_width)
# 执行指标 (Breakout)
current_atr = talib.ATR(highs, lows, closes, self.atr_period)[-1]
keltner_ma = talib.MA(closes, timeperiod=self.keltner_period)
keltner_upper = keltner_ma[-1] + self.keltner_atr_multiplier * current_atr
keltner_lower = keltner_ma[-1] - self.keltner_atr_multiplier * current_atr
# --- 管理现有持仓 ---
position_volume = self.get_current_positions().get(self.symbol, 0)
if position_volume != 0:
self.manage_open_position(position_volume, bar_history[-1], current_atr)
return
# --- 评估新机会 ---
self.evaluate_entry_signal(bar_history[-1], regime_ma, bb_width, keltner_upper, keltner_lower)
def manage_open_position(self, volume: int, current_bar: Bar, current_atr: float):
"""使用ATR追踪止盈。"""
# (此部分与V6/V7版本代码逻辑相同成熟且有效)
meta = self.position_meta.get(self.symbol);
if not meta: return
initial_stop_price = meta['initial_stop_price']
if (volume > 0 and current_bar.low <= initial_stop_price) or (
volume < 0 and current_bar.high >= initial_stop_price):
self.log(f"Initial Stop Loss hit at {initial_stop_price:.2f}");
self.close_position("CLOSE_LONG" if volume > 0 else "CLOSE_SHORT", abs(volume));
return
trailing_stop_level = meta.get('trailing_stop_level', None)
if volume > 0:
meta['high_water_mark'] = max(meta.get('high_water_mark', meta['entry_price']), current_bar.high)
new_trailing_stop = meta['high_water_mark'] - self.trailing_stop_atr_multiplier * current_atr
if trailing_stop_level is None or new_trailing_stop > trailing_stop_level: trailing_stop_level = new_trailing_stop
trailing_stop_level = max(trailing_stop_level, initial_stop_price)
if current_bar.low <= trailing_stop_level: self.log(
f"ATR Trailing Stop hit for LONG at {trailing_stop_level:.2f}"); self.close_position("CLOSE_LONG",
abs(volume))
elif volume < 0:
meta['low_water_mark'] = min(meta.get('low_water_mark', meta['entry_price']), current_bar.low)
new_trailing_stop = meta['low_water_mark'] + self.trailing_stop_atr_multiplier * current_atr
if trailing_stop_level is None or new_trailing_stop < trailing_stop_level: trailing_stop_level = new_trailing_stop
trailing_stop_level = min(trailing_stop_level, initial_stop_price)
if current_bar.high >= trailing_stop_level: self.log(
f"ATR Trailing Stop hit for SHORT at {trailing_stop_level:.2f}"); self.close_position("CLOSE_SHORT",
abs(volume))
meta['trailing_stop_level'] = trailing_stop_level
def evaluate_entry_signal(self, current_bar: Bar, regime_ma: float, bb_width: float, keltner_upper: float,
keltner_lower: float):
"""执行“盘整-突破”的入场逻辑。"""
if len(self._bbw_history) < self.bbw_lookback: return
# 【战术层】检查是否进入“盘整准备”状态
bbw_threshold = np.percentile(list(self._bbw_history), self.bbw_squeeze_percentile)
if bb_width < bbw_threshold:
self._in_squeeze_ready_state = True
self.log(f"Squeeze Detected! BBW {bb_width:.4f} < Threshold {bbw_threshold:.4f}. Ready for breakout.")
return # 进入准备状态,等待突破
# 【执行层】在准备状态下,检查突破信号
if self._in_squeeze_ready_state:
direction = None
# 战略过滤 + 突破确认
if "BUY" in self.order_direction and current_bar.close > regime_ma and current_bar.close > keltner_upper and \
self.indicators[0].is_condition_met(*self.get_indicator_tuple()):
direction = "BUY"
elif "SELL" in self.order_direction and current_bar.close < regime_ma and current_bar.close < keltner_lower and \
self.indicators[1].is_condition_met(*self.get_indicator_tuple()):
direction = "SELL"
if direction:
self.log(f"Squeeze Fired! Direction: {direction}. Price broke Keltner Channel. Entering at market.")
# 市价单入场
entry_price = current_bar.close
current_atr = talib.ATR(
np.array([b.high for b in self.get_bar_history()[-self.atr_period:]], dtype=float),
np.array([b.low for b in self.get_bar_history()[-self.atr_period:]], dtype=float),
np.array([b.close for b in self.get_bar_history()[-self.atr_period:]], dtype=float),
self.atr_period
)[-1]
stop_loss_price = entry_price - self.initial_stop_atr_multiplier * current_atr if direction == "BUY" else entry_price + self.initial_stop_atr_multiplier * current_atr
meta = {'entry_price': entry_price, 'initial_stop_price': stop_loss_price}
self.send_market_order(direction, self.trade_volume, "OPEN", meta)
# 重置状态机
self._in_squeeze_ready_state = False
# --- 订单发送与仓位管理辅助函数 ---
def close_position(self, direction: str, volume: int):
self.send_market_order(direction, volume, offset="CLOSE");
if self.symbol in self.position_meta: del self.position_meta[self.symbol]
def send_market_order(self, direction: str, volume: int, offset: str, meta: Optional[Dict] = None):
if offset == "OPEN" and meta: self.position_meta[self.symbol] = meta
order_id = f"{self.symbol}_{direction}_MARKET_{self.order_id_counter}";
self.order_id_counter += 1;
order = Order(id=order_id, symbol=self.symbol, direction=direction, volume=volume, price_type="MARKET",
submitted_time=self.get_current_time(), offset=offset);
self.send_order(order)
def on_rollover(self, old_symbol: str, new_symbol: str):
super().on_rollover(old_symbol, new_symbol);
self._last_order_id = None;
self.position_meta = {};
self._bbw_history.clear();
self._in_squeeze_ready_state = False;
self.log("Rollover detected. All strategy states have been reset.")