import numpy as np import pandas as pd from typing import Optional, Dict, Any, List # 假设这些是你项目中的模块 from src.core_data import Bar, Order from src.strategies.base_strategy import Strategy from src.algo.TrendLine import calculate_latest_trendline_values_v2 class DualModeTrendlineHawkesStrategy(Strategy): """ 趋势线与霍克斯过程双模式策略 (V5 - 趋势/回归自适应版): - 支持两套独立的参数配置,分别对应趋势跟踪和均值回归逻辑。 - 开平仓条件共享,但交易方向相反。 - 内置冲突解决机制,用于处理两种模式同时发出开仓信号的情况。 - 保持了V4版本高效的增量计算特性。 """ def __init__( self, context: Any, main_symbol: str, trade_volume: int = 1, # 【核心修改】使用字典来配置两种模式 trend_params: Dict[str, Any] = None, reversion_params: Dict[str, Any] = None, # 【新增】模式启用开关 enabled_modes: Optional[List[str]] = None, # 【新增】信号冲突解决方案: 'TREND_PRIORITY', 'REVERSION_PRIORITY', 'NONE' conflict_resolution: str = 'TREND_PRIORITY', enable_log: bool = True, ): super().__init__(context, main_symbol, enable_log) self.main_symbol = main_symbol self.trade_volume = trade_volume # --- 【核心修改】参数结构化 --- # 提供默认参数,防止用户未提供 default_params = { "order_direction": ["BUY", "SELL"], "trendline_n": 50, "hawkes_kappa": 0.1, "hawkes_lookback": 50, "hawkes_entry_percent": 0.95, "hawkes_exit_percent": 0.50, } self.trend_params = default_params.copy() if trend_params: self.trend_params.update(trend_params) self.reversion_params = default_params.copy() if reversion_params: self.reversion_params.update(reversion_params) self.enabled_modes = enabled_modes or ['TREND', 'REVERSION'] self.conflict_resolution = conflict_resolution self.pos_meta: Dict[str, Dict[str, Any]] = {} # --- 【核心修改】为每个模式维护独立的状态 --- # 趋势模式状态 self._trend_last_hawkes_unscaled: float = 0.0 self._trend_hawkes_window: np.ndarray = np.array([], dtype=np.float64) self._trend_hawkes_alpha = np.exp(-self.trend_params['hawkes_kappa']) # 回归模式状态 self._reversion_last_hawkes_unscaled: float = 0.0 self._reversion_hawkes_window: np.ndarray = np.array([], dtype=np.float64) self._reversion_hawkes_alpha = np.exp(-self.reversion_params['hawkes_kappa']) print("DualModeTrendlineHawkesStrategy initialized.") print(f"Enabled modes: {self.enabled_modes}") print(f"Conflict resolution: {self.conflict_resolution}") # --- 辅助函数,用于状态管理 (可复用) --- def _initialize_hawkes_state(self, params: Dict, initial_volumes: np.ndarray) -> (float, np.ndarray): """根据给定参数和历史成交量,初始化霍克斯状态。""" print(f"Initializing Hawkes state with lookback {params['hawkes_lookback']}...") alpha = np.exp(-params['hawkes_kappa']) kappa = params['hawkes_kappa'] temp_hawkes_history = np.zeros_like(initial_volumes, dtype=np.float64) if len(initial_volumes) > 0: temp_hawkes_history[0] = initial_volumes[0] if not np.isnan(initial_volumes[0]) else 0.0 for i in range(1, len(initial_volumes)): temp_hawkes_history[i] = temp_hawkes_history[i - 1] * alpha + ( initial_volumes[i] if not np.isnan(initial_volumes[i]) else 0.0) last_hawkes_unscaled = temp_hawkes_history[-1] if len(temp_hawkes_history) > 0 else 0.0 hawkes_window = (temp_hawkes_history * kappa)[-params['hawkes_lookback']:] return last_hawkes_unscaled, hawkes_window def _update_hawkes_state_incrementally(self, params: Dict, latest_volume: float, last_unscaled: float, window: np.ndarray) -> (float, np.ndarray): """根据给定参数,增量更新霍克斯状态。""" alpha = np.exp(-params['hawkes_kappa']) kappa = params['hawkes_kappa'] new_hawkes_unscaled = last_unscaled * alpha + (latest_volume if not np.isnan(latest_volume) else 0.0) new_hawkes_scaled = new_hawkes_unscaled * kappa new_window = np.roll(window, -1) new_window[-1] = new_hawkes_scaled return new_hawkes_unscaled, new_window def on_init(self): super().on_init() self.pos_meta.clear() # 重置所有状态 self._trend_last_hawkes_unscaled = 0.0 self._trend_hawkes_window = np.array([], dtype=np.float64) self._reversion_last_hawkes_unscaled = 0.0 self._reversion_hawkes_window = np.array([], dtype=np.float64) self.pos_meta = self.context.load_state() def on_open_bar(self, open_price: float, symbol: str): self.symbol = symbol bar_history = self.get_bar_history() # 确保有足够的数据来初始化两个模式 min_bars_required = max( self.trend_params['trendline_n'] + 2, self.trend_params['hawkes_lookback'] + 2, self.reversion_params['trendline_n'] + 2, self.reversion_params['hawkes_lookback'] + 2 ) if len(bar_history) < min_bars_required: return # --- 状态初始化与更新 --- # 首次运行时,为两个启用的模式初始化状态 if self._trend_hawkes_window.size == 0 and 'TREND' in self.enabled_modes: initial_volumes = np.array([b.volume for b in bar_history], dtype=float) self._trend_last_hawkes_unscaled, self._trend_hawkes_window = self._initialize_hawkes_state( self.trend_params, initial_volumes[:-1] ) if self._reversion_hawkes_window.size == 0 and 'REVERSION' in self.enabled_modes: initial_volumes = np.array([b.volume for b in bar_history], dtype=float) self._reversion_last_hawkes_unscaled, self._reversion_hawkes_window = self._initialize_hawkes_state( self.reversion_params, initial_volumes[:-1] ) # 增量更新两个模式的状态 latest_volume = float(bar_history[-1].volume) if 'TREND' in self.enabled_modes: self._trend_last_hawkes_unscaled, self._trend_hawkes_window = self._update_hawkes_state_incrementally( self.trend_params, latest_volume, self._trend_last_hawkes_unscaled, self._trend_hawkes_window ) if 'REVERSION' in self.enabled_modes: self._reversion_last_hawkes_unscaled, self._reversion_hawkes_window = self._update_hawkes_state_incrementally( self.reversion_params, latest_volume, self._reversion_last_hawkes_unscaled, self._reversion_hawkes_window ) self.cancel_all_pending_orders(symbol) pos = self.get_current_positions().get(symbol, 0) meta = self.pos_meta.get(symbol) # --- 【核心修改】状态同步与异常处理 --- # 场景1: 有实际持仓,但策略无记录 (例如状态恢复失败)。这是最危险的情况。 # 策略必须强制平仓以恢复到已知状态,避免“僵尸”持仓。 if pos != 0 and not meta: self.log(f"警告:检测到实际持仓({pos})与策略状态(无记录)不一致!" f"可能由状态加载失败导致。将强制平仓以同步状态。", level='WARNING') direction_to_close = "CLOSE_LONG" if pos > 0 else "CLOSE_SHORT" self.send_market_order(direction_to_close, abs(pos)) return # 场景2: 无实际持仓,但策略仍有记录 (例如外部手动平仓或止损)。 # 策略应清理过时的元数据。 if pos == 0 and meta: self.log(f"信息:检测到策略状态({meta.get('direction')})与实际持仓(0)不一致。" f"可能是外部平仓导致。正在清理过时状态。", level='INFO') new_pos_meta = {k: v for k, v in self.pos_meta.items() if k != symbol} self.pos_meta = new_pos_meta self.save_state(new_pos_meta) meta = None # 必须更新meta变量以反映当前bar的真实状态 # --- 1. 平仓逻辑 --- if pos != 0: strategy_mode = meta.get('strategy_mode') params_to_use = self.trend_params if strategy_mode == 'TREND' else self.reversion_params window_to_use = self._trend_hawkes_window if strategy_mode == 'TREND' else self._reversion_hawkes_window if window_to_use.size > 0: latest_hawkes_value = window_to_use[-1] latest_hawkes_lower = np.quantile(window_to_use, params_to_use['hawkes_exit_percent']) if latest_hawkes_value < latest_hawkes_lower: self.log(f"[{strategy_mode}模式] 霍克斯出场信号触发,平仓。") self.send_market_order("CLOSE_LONG" if meta['direction'] == "BUY" else "CLOSE_SHORT", abs(pos)) self.pos_meta = {} self.save_state(self.pos_meta) return # --- 2. 开仓逻辑 --- if pos == 0 and self.trading: trend_signal = None reversion_signal = None # 分别计算两个模式的信号 if 'TREND' in self.enabled_modes: trend_signal = self._calculate_entry_signal( 'TREND', bar_history, self.trend_params, self._trend_hawkes_window ) if 'REVERSION' in self.enabled_modes: reversion_signal = self._calculate_entry_signal( 'REVERSION', bar_history, self.reversion_params, self._reversion_hawkes_window ) final_direction = None winning_mode = None # --- 信号冲突解决 --- if trend_signal and reversion_signal: self.log(f"信号冲突:趋势模式 ({trend_signal}) vs 回归模式 ({reversion_signal})") if self.conflict_resolution == 'TREND_PRIORITY': final_direction = trend_signal winning_mode = 'TREND' elif self.conflict_resolution == 'REVERSION_PRIORITY': final_direction = reversion_signal winning_mode = 'REVERSION' else: # 'NONE' self.log("冲突解决策略为'NONE',本次不开仓。") elif trend_signal: final_direction = trend_signal winning_mode = 'TREND' elif reversion_signal: final_direction = reversion_signal winning_mode = 'REVERSION' # 执行最终决策 if final_direction and winning_mode: params_to_use = self.trend_params if winning_mode == 'TREND' else self.reversion_params if final_direction in params_to_use['order_direction']: self.log(f"[{winning_mode}模式] 开仓信号确认: {final_direction}") self.send_open_order(final_direction, open_price, self.trade_volume, winning_mode) def _calculate_entry_signal(self, mode: str, bar_history: List[Bar], params: Dict, hawkes_window: np.ndarray) -> \ Optional[str]: """计算单个模式的入场信号,返回 'BUY', 'SELL' 或 None。""" if hawkes_window.size == 0: return None # 霍克斯确认 latest_hawkes_value = hawkes_window[-1] latest_hawkes_upper = np.quantile(hawkes_window, params['hawkes_entry_percent']) hawkes_confirmation = latest_hawkes_value > latest_hawkes_upper self.log(f'latest_hawkes_value:{latest_hawkes_value}, latest_hawkes_upper:{latest_hawkes_upper}') if not hawkes_confirmation: return None # 趋势线突破事件 close_prices = np.array([b.close for b in bar_history]) prices_for_trendline = close_prices[-params['trendline_n'] - 1:-1] trend_upper, trend_lower = calculate_latest_trendline_values_v2(prices_for_trendline) self.log(f'trend_upper: {trend_upper}, trend_lower: {trend_lower}') if trend_upper is not None and trend_lower is not None: prev_close = bar_history[-2].close last_close = bar_history[-1].close upper_break_event = last_close > trend_upper and prev_close < trend_upper lower_break_event = last_close < trend_lower and prev_close > trend_lower if upper_break_event: # 趋势模式:向上突破 -> 买入 # 回归模式:向上突破 -> 卖出 (认为是假突破,价格将回归) return "BUY" if mode == 'TREND' else "SELL" elif lower_break_event: # 趋势模式:向下突破 -> 卖出 # 回归模式:向下突破 -> 买入 (认为是超卖,价格将反弹) return "SELL" if mode == 'TREND' else "BUY" return None def send_open_order(self, direction: str, entry_price: float, volume: int, strategy_mode: str): current_time = self.get_current_time() order_id = f"{self.symbol}_{direction}_{current_time.strftime('%Y%m%d%H%M%S')}" order_direction = "BUY" if direction == "BUY" else "SELL" order = Order(id=order_id, symbol=self.symbol, direction=order_direction, volume=volume, price_type="LIMIT", submitted_time=current_time, offset="OPEN", limit_price=entry_price + (1 if direction == "BUY" else -1),) self.send_order(order) # 【核心修改】记录仓位属于哪个模式 self.pos_meta[self.symbol] = { "direction": direction, "volume": volume, "entry_price": entry_price, "strategy_mode": strategy_mode } self.save_state(self.pos_meta) self.log(f"发送开仓订单 ({strategy_mode}): {direction} {volume}手 @ Market Price (执行价约 {entry_price:.2f})") def send_market_order(self, direction: str, volume: int): current_time = self.get_current_time() order_id = f"{self.symbol}_{direction}_{current_time.strftime('%Y%m%d%H%M%S')}" order = Order(id=order_id, symbol=self.symbol, direction=direction, volume=volume, price_type="MARKET", submitted_time=current_time, offset="CLOSE") self.send_order(order) self.log(f"发送平仓订单: {direction} {volume}手 @ Market Price") def on_rollover(self, old_symbol: str, new_symbol: str): super().on_rollover(old_symbol, new_symbol) self.cancel_all_pending_orders(new_symbol) self.pos_meta.clear()