Files
NewQuant/futures_trading_strategies/FG/TrendlineBreakoutStrategy/DualModeTrendlineHawkesStrategy.py
2025-10-05 00:09:59 +08:00

279 lines
13 KiB
Python

import numpy as np
import pandas as pd
from typing import Optional, Dict, Any, List
# 假设这些是你项目中的模块
from src.core_data import Bar, Order
from src.strategies.base_strategy import Strategy
from src.algo.TrendLine import calculate_latest_trendline_values_v2
class DualModeTrendlineHawkesStrategy(Strategy):
"""
趋势线与霍克斯过程双模式策略 (V5 - 趋势/回归自适应版):
- 支持两套独立的参数配置,分别对应趋势跟踪和均值回归逻辑。
- 开平仓条件共享,但交易方向相反。
- 内置冲突解决机制,用于处理两种模式同时发出开仓信号的情况。
- 保持了V4版本高效的增量计算特性。
"""
def __init__(
self,
context: Any,
main_symbol: str,
trade_volume: int = 1,
# 【核心修改】使用字典来配置两种模式
trend_params: Dict[str, Any] = None,
reversion_params: Dict[str, Any] = None,
# 【新增】模式启用开关
enabled_modes: Optional[List[str]] = None,
# 【新增】信号冲突解决方案: 'TREND_PRIORITY', 'REVERSION_PRIORITY', 'NONE'
conflict_resolution: str = 'TREND_PRIORITY',
enable_log: bool = True,
):
super().__init__(context, main_symbol, enable_log)
self.main_symbol = main_symbol
self.trade_volume = trade_volume
# --- 【核心修改】参数结构化 ---
# 提供默认参数,防止用户未提供
default_params = {
"order_direction": ["BUY", "SELL"],
"trendline_n": 50,
"hawkes_kappa": 0.1,
"hawkes_lookback": 50,
"hawkes_entry_percent": 0.95,
"hawkes_exit_percent": 0.50,
}
self.trend_params = default_params.copy()
if trend_params:
self.trend_params.update(trend_params)
self.reversion_params = default_params.copy()
if reversion_params:
self.reversion_params.update(reversion_params)
self.enabled_modes = enabled_modes or ['TREND', 'REVERSION']
self.conflict_resolution = conflict_resolution
self.pos_meta: Dict[str, Dict[str, Any]] = {}
# --- 【核心修改】为每个模式维护独立的状态 ---
# 趋势模式状态
self._trend_last_hawkes_unscaled: float = 0.0
self._trend_hawkes_window: np.ndarray = np.array([], dtype=np.float64)
self._trend_hawkes_alpha = np.exp(-self.trend_params['hawkes_kappa'])
# 回归模式状态
self._reversion_last_hawkes_unscaled: float = 0.0
self._reversion_hawkes_window: np.ndarray = np.array([], dtype=np.float64)
self._reversion_hawkes_alpha = np.exp(-self.reversion_params['hawkes_kappa'])
print("DualModeTrendlineHawkesStrategy initialized.")
print(f"Enabled modes: {self.enabled_modes}")
print(f"Conflict resolution: {self.conflict_resolution}")
# --- 辅助函数,用于状态管理 (可复用) ---
def _initialize_hawkes_state(self, params: Dict, initial_volumes: np.ndarray) -> (float, np.ndarray):
"""根据给定参数和历史成交量,初始化霍克斯状态。"""
print(f"Initializing Hawkes state with lookback {params['hawkes_lookback']}...")
alpha = np.exp(-params['hawkes_kappa'])
kappa = params['hawkes_kappa']
temp_hawkes_history = np.zeros_like(initial_volumes, dtype=np.float64)
if len(initial_volumes) > 0:
temp_hawkes_history[0] = initial_volumes[0] if not np.isnan(initial_volumes[0]) else 0.0
for i in range(1, len(initial_volumes)):
temp_hawkes_history[i] = temp_hawkes_history[i - 1] * alpha + (
initial_volumes[i] if not np.isnan(initial_volumes[i]) else 0.0)
last_hawkes_unscaled = temp_hawkes_history[-1] if len(temp_hawkes_history) > 0 else 0.0
hawkes_window = (temp_hawkes_history * kappa)[-params['hawkes_lookback']:]
return last_hawkes_unscaled, hawkes_window
def _update_hawkes_state_incrementally(self, params: Dict, latest_volume: float, last_unscaled: float,
window: np.ndarray) -> (float, np.ndarray):
"""根据给定参数,增量更新霍克斯状态。"""
alpha = np.exp(-params['hawkes_kappa'])
kappa = params['hawkes_kappa']
new_hawkes_unscaled = last_unscaled * alpha + (latest_volume if not np.isnan(latest_volume) else 0.0)
new_hawkes_scaled = new_hawkes_unscaled * kappa
new_window = np.roll(window, -1)
new_window[-1] = new_hawkes_scaled
return new_hawkes_unscaled, new_window
def on_init(self):
super().on_init()
self.pos_meta.clear()
# 重置所有状态
self._trend_last_hawkes_unscaled = 0.0
self._trend_hawkes_window = np.array([], dtype=np.float64)
self._reversion_last_hawkes_unscaled = 0.0
self._reversion_hawkes_window = np.array([], dtype=np.float64)
def on_open_bar(self, open_price: float, symbol: str):
bar_history = self.get_bar_history()
# 确保有足够的数据来初始化两个模式
min_bars_required = max(
self.trend_params['trendline_n'] + 2, self.trend_params['hawkes_lookback'] + 2,
self.reversion_params['trendline_n'] + 2, self.reversion_params['hawkes_lookback'] + 2
)
if len(bar_history) < min_bars_required:
return
# --- 状态初始化与更新 ---
# 首次运行时,为两个启用的模式初始化状态
if self._trend_hawkes_window.size == 0 and 'TREND' in self.enabled_modes:
initial_volumes = np.array([b.volume for b in bar_history], dtype=float)
self._trend_last_hawkes_unscaled, self._trend_hawkes_window = self._initialize_hawkes_state(
self.trend_params, initial_volumes[:-1]
)
if self._reversion_hawkes_window.size == 0 and 'REVERSION' in self.enabled_modes:
initial_volumes = np.array([b.volume for b in bar_history], dtype=float)
self._reversion_last_hawkes_unscaled, self._reversion_hawkes_window = self._initialize_hawkes_state(
self.reversion_params, initial_volumes[:-1]
)
# 增量更新两个模式的状态
latest_volume = float(bar_history[-1].volume)
if 'TREND' in self.enabled_modes:
self._trend_last_hawkes_unscaled, self._trend_hawkes_window = self._update_hawkes_state_incrementally(
self.trend_params, latest_volume, self._trend_last_hawkes_unscaled, self._trend_hawkes_window
)
if 'REVERSION' in self.enabled_modes:
self._reversion_last_hawkes_unscaled, self._reversion_hawkes_window = self._update_hawkes_state_incrementally(
self.reversion_params, latest_volume, self._reversion_last_hawkes_unscaled,
self._reversion_hawkes_window
)
self.cancel_all_pending_orders(symbol)
pos = self.get_current_positions().get(symbol, 0)
# --- 1. 平仓逻辑 ---
meta = self.pos_meta.get(symbol)
if meta and pos != 0:
strategy_mode = meta.get('strategy_mode')
params_to_use = self.trend_params if strategy_mode == 'TREND' else self.reversion_params
window_to_use = self._trend_hawkes_window if strategy_mode == 'TREND' else self._reversion_hawkes_window
if window_to_use.size > 0:
latest_hawkes_value = window_to_use[-1]
latest_hawkes_lower = np.quantile(window_to_use, params_to_use['hawkes_exit_percent'])
if latest_hawkes_value < latest_hawkes_lower:
self.log(f"[{strategy_mode}模式] 霍克斯出场信号触发,平仓。")
self.send_market_order("CLOSE_LONG" if meta['direction'] == "BUY" else "CLOSE_SHORT", abs(pos))
del self.pos_meta[symbol]
return
# --- 2. 开仓逻辑 ---
if pos == 0:
trend_signal = None
reversion_signal = None
# 分别计算两个模式的信号
if 'TREND' in self.enabled_modes:
trend_signal = self._calculate_entry_signal(
'TREND', bar_history, self.trend_params, self._trend_hawkes_window
)
if 'REVERSION' in self.enabled_modes:
reversion_signal = self._calculate_entry_signal(
'REVERSION', bar_history, self.reversion_params, self._reversion_hawkes_window
)
final_direction = None
winning_mode = None
# --- 信号冲突解决 ---
if trend_signal and reversion_signal:
self.log(f"信号冲突:趋势模式 ({trend_signal}) vs 回归模式 ({reversion_signal})")
if self.conflict_resolution == 'TREND_PRIORITY':
final_direction = trend_signal
winning_mode = 'TREND'
elif self.conflict_resolution == 'REVERSION_PRIORITY':
final_direction = reversion_signal
winning_mode = 'REVERSION'
else: # 'NONE'
self.log("冲突解决策略为'NONE',本次不开仓。")
elif trend_signal:
final_direction = trend_signal
winning_mode = 'TREND'
elif reversion_signal:
final_direction = reversion_signal
winning_mode = 'REVERSION'
# 执行最终决策
if final_direction and winning_mode:
params_to_use = self.trend_params if winning_mode == 'TREND' else self.reversion_params
if final_direction in params_to_use['order_direction']:
self.log(f"[{winning_mode}模式] 开仓信号确认: {final_direction}")
self.send_open_order(final_direction, open_price, self.trade_volume, winning_mode)
def _calculate_entry_signal(self, mode: str, bar_history: List[Bar], params: Dict, hawkes_window: np.ndarray) -> \
Optional[str]:
"""计算单个模式的入场信号,返回 'BUY', 'SELL' 或 None。"""
if hawkes_window.size == 0:
return None
# 霍克斯确认
latest_hawkes_value = hawkes_window[-1]
latest_hawkes_upper = np.quantile(hawkes_window, params['hawkes_entry_percent'])
hawkes_confirmation = latest_hawkes_value > latest_hawkes_upper
if not hawkes_confirmation:
return None
# 趋势线突破事件
close_prices = np.array([b.close for b in bar_history])
prices_for_trendline = close_prices[-params['trendline_n'] - 1:-1]
trend_upper, trend_lower = calculate_latest_trendline_values_v2(prices_for_trendline)
if trend_upper is not None and trend_lower is not None:
prev_close = bar_history[-2].close
last_close = bar_history[-1].close
upper_break_event = last_close > trend_upper and prev_close < trend_upper
lower_break_event = last_close < trend_lower and prev_close > trend_lower
if upper_break_event:
# 趋势模式:向上突破 -> 买入
# 回归模式:向上突破 -> 卖出 (认为是假突破,价格将回归)
return "BUY" if mode == 'TREND' else "SELL"
elif lower_break_event:
# 趋势模式:向下突破 -> 卖出
# 回归模式:向下突破 -> 买入 (认为是超卖,价格将反弹)
return "SELL" if mode == 'TREND' else "BUY"
return None
def send_open_order(self, direction: str, entry_price: float, volume: int, strategy_mode: str):
current_time = self.get_current_time()
order_id = f"{self.symbol}_{direction}_{current_time.strftime('%Y%m%d%H%M%S')}"
order_direction = "BUY" if direction == "BUY" else "SELL"
order = Order(id=order_id, symbol=self.symbol, direction=order_direction, volume=volume, price_type="MARKET",
submitted_time=current_time, offset="OPEN")
self.send_order(order)
# 【核心修改】记录仓位属于哪个模式
self.pos_meta[self.symbol] = {
"direction": direction,
"volume": volume,
"entry_price": entry_price,
"strategy_mode": strategy_mode
}
self.log(f"发送开仓订单 ({strategy_mode}): {direction} {volume}手 @ Market Price (执行价约 {entry_price:.2f})")
def send_market_order(self, direction: str, volume: int):
current_time = self.get_current_time()
order_id = f"{self.symbol}_{direction}_{current_time.strftime('%Y%m%d%H%M%S')}"
order = Order(id=order_id, symbol=self.symbol, direction=direction, volume=volume, price_type="MARKET",
submitted_time=current_time, offset="CLOSE")
self.send_order(order)
self.log(f"发送平仓订单: {direction} {volume}手 @ Market Price")
def on_rollover(self, old_symbol: str, new_symbol: str):
super().on_rollover(old_symbol, new_symbol)
self.cancel_all_pending_orders(new_symbol)
self.pos_meta.clear()