from collections import deque
from typing import Optional, Any, List, Dict, Tuple

import numpy as np
import pandas as pd
import talib

from src.core_data import Bar, Order
from src.indicators.base_indicators import Indicator
from src.indicators.indicators import Empty
from src.strategies.base_strategy import Strategy


# =============================================================================
# Strategy implementation (ImbalanceZFlowStrategy)
# =============================================================================
class ImbalanceZFlowStrategy(Strategy):
    """A pure momentum strategy driven by a single oscillator, "Z-Flow".

    Core philosophy:
        1. Tackle non-stationarity at the root: the raw imbalance series is
           first standardized with a rolling Z-Score into Z_I(t), and all
           later analysis is built on that standardized series.
        2. Core driver Z_Flow: the difference between a fast and a slow EMA
           (a MACD line) of Z_I(t), used as a measure of "statistically
           significant" momentum.
        3. Realistic backtest logic: entry price and stop-loss are computed
           from the current bar's ``open_price`` to avoid look-ahead bias.
        4. Minimalism: one indicator and two symmetric thresholds drive
           every decision.
    """

    def __init__(
        self,
        context: Any,
        main_symbol: str,
        enable_log: bool,
        trade_volume: int,
        # --- Hawkes-process parameters ---
        hawkes_lookback: int = 60,
        hawkes_alpha: float = 0.8,
        hawkes_beta: float = 0.2,
        # --- Z-Score standardization parameters ---
        z_score_period: int = 200,
        # --- Z-Flow indicator parameters ---
        z_flow_fast_ema_period: int = 12,
        z_flow_slow_ema_period: int = 26,
        # --- Trading thresholds ---
        entry_threshold: float = 0.5,
        exit_threshold: float = 0.1,
        # --- Risk management ---
        atr_period: int = 20,
        stop_loss_atr_multiplier: float = 3.0,
        # --- Misc ---
        order_direction: Optional[List[str]] = None,
        indicators: Optional[List[Indicator]] = None,
    ):
        """Store the parameters and initialise the internal state.

        Args:
            context: Passed to the base class; ``context.load_state()`` is
                used here to restore the per-symbol position metadata.
            main_symbol: Passed to the base class and kept as
                ``self.main_symbol``.
            enable_log: Passed to the base class.
            trade_volume: Volume used for every opening order.
            hawkes_lookback: Number of most recent bars scanned for events
                when computing the imbalance.
            hawkes_alpha: Multiplier applied to every event's mark.
            hawkes_beta: Exponential decay rate applied per bar of event age.
            z_score_period: Rolling window of the Z-Score standardization.
            z_flow_fast_ema_period: Span of the fast EMA of the Z-Score.
            z_flow_slow_ema_period: Span of the slow EMA of the Z-Score.
            entry_threshold: A long is opened when Z-Flow crosses up through
                ``+entry_threshold``; a short when it crosses down through
                ``-entry_threshold``.
            exit_threshold: A long is closed when Z-Flow is below
                ``+exit_threshold``; a short when it is above
                ``-exit_threshold``.
            atr_period: ``timeperiod`` given to ``talib.ATR``.
            stop_loss_atr_multiplier: Stop distance from the open price,
                expressed in ATR multiples.
            order_direction: Allowed entry directions; defaults to
                ``['BUY', 'SELL']``.
            indicators: Stored as ``self.indicators``; defaults to two
                ``Empty()`` instances.
        """
        super().__init__(context, main_symbol, enable_log)
        if order_direction is None:
            order_direction = ['BUY', 'SELL']

        # --- Parameter assignment ---
        self.trade_volume = trade_volume
        self.hawkes_lookback = hawkes_lookback
        self.hawkes_alpha = hawkes_alpha
        self.hawkes_beta = hawkes_beta
        self.z_score_period = z_score_period
        self.z_flow_fast_ema_period = z_flow_fast_ema_period
        self.z_flow_slow_ema_period = z_flow_slow_ema_period
        self.entry_threshold = entry_threshold
        self.exit_threshold = exit_threshold
        self.atr_period = atr_period
        self.stop_loss_atr_multiplier = stop_loss_atr_multiplier
        self.order_direction = order_direction

        # --- Internal state ---
        # One imbalance value is appended per processed bar; the buffer holds
        # one Z-Score window plus one slow-EMA span.
        self._imbalance_history: deque = deque(
            maxlen=self.z_score_period + self.z_flow_slow_ema_period
        )

        # Core indicator: current and previous-bar values.
        self.z_flow = 0.0
        self.prev_z_flow = 0.0

        # Per-symbol dict holding 'entry_price' and 'stop_loss_price'.
        self.position_meta: Dict[str, Any] = self._load_position_meta()
        self.main_symbol = main_symbol
        self.order_id_counter = 0

        if indicators is None:
            indicators = [Empty(), Empty()]
        self.indicators = indicators

        self.log("ImbalanceZFlowStrategy Initialized")

    def _load_position_meta(self) -> Dict[str, Any]:
        """Return the persisted position metadata, or an empty dict if None."""
        # NOTE(review): whether load_state() can return None is not visible
        # from this file; the guard only prevents a later `.get` on None.
        state = self.context.load_state()
        return {} if state is None else state

    def on_init(self):
        """Cancel pending orders on the main symbol and reload saved state."""
        super().on_init()
        self.cancel_all_pending_orders(self.main_symbol)
        self.position_meta = self._load_position_meta()

    def on_open_bar(self, open_price: float, symbol: str):
        """Per-bar entry point: refresh Z-Flow, then manage or open a position.

        Args:
            open_price: Open price of the current bar; used as the entry
                price and as the stop-loss reference.
            symbol: Symbol being processed; stored as ``self.symbol`` and
                used by the order helpers.
        """
        self.symbol = symbol
        bar_history = self.get_bar_history()

        # Do nothing until enough bars have been collected.
        required_bars = self._imbalance_history.maxlen + self.hawkes_lookback + 5
        if len(bar_history) < required_bars:
            return

        # Keep the previous value so threshold crossings can be detected.
        self.prev_z_flow = self.z_flow
        self._update_indicators_and_state(bar_history)

        position_volume = self.get_current_positions().get(self.symbol, 0)
        self._sync_position_state(position_volume, symbol)

        if not self.trading:
            return

        if position_volume != 0:
            self.manage_open_position(position_volume, bar_history[-1])
        else:
            self.evaluate_entry_signal(open_price, bar_history)

    def _compute_imbalance(self, bar_history: List[Bar]):
        """Return the decayed long-minus-short intensity over the lookback.

        Each of the last ``hawkes_lookback`` bars with a positive mark
        ``(high - low) * volume`` is an event: long if ``close > open``,
        short if ``close < open``. Bars with ``close == open`` are ignored.
        An event contributes ``alpha * mark * exp(-beta * age)``, where
        ``age`` is 0 for the newest bar.
        """
        window = bar_history[-self.hawkes_lookback:]
        newest = len(window) - 1
        long_terms, short_terms = [], []
        for idx, bar in enumerate(window):
            mark = (bar.high - bar.low) * bar.volume
            # `not mark > 0` (rather than `mark <= 0`) also skips NaN marks.
            if not mark > 0:
                continue
            age = newest - idx
            if bar.close > bar.open:
                long_terms.append(
                    self.hawkes_alpha * mark * np.exp(-self.hawkes_beta * age)
                )
            elif bar.close < bar.open:
                short_terms.append(
                    self.hawkes_alpha * mark * np.exp(-self.hawkes_beta * age)
                )
        return sum(long_terms) - sum(short_terms)

    def _update_indicators_and_state(self, bar_history: List[Bar]):
        """Core computation: I(t) -> Z_I(t) -> Z_Flow(t).

        Appends the newest imbalance to the history and updates
        ``self.z_flow``. ``z_flow`` stays 0.0 until ``z_score_period``
        imbalance values have been collected.
        """
        # --- 1. Instantaneous imbalance I(t) ---
        self._imbalance_history.append(self._compute_imbalance(bar_history))

        if len(self._imbalance_history) < self.z_score_period:
            self.z_flow = 0.0
            return

        imbalance_series = pd.Series(list(self._imbalance_history), dtype=float)

        # --- 2. Rolling Z-Score standardization -> Z_I(t) ---
        rolling = imbalance_series.rolling(window=self.z_score_period)
        rolling_mean = rolling.mean()
        rolling_std = rolling.std()

        if rolling_std.iloc[-1] > 1e-9:
            # Mask EVERY degenerate std, not only the latest one. A zero std
            # in an earlier row would yield +/-inf, which fillna() does not
            # remove and which the recursive EMA below would carry forward.
            safe_std = rolling_std.where(rolling_std > 1e-9)
            z_score_series = (imbalance_series - rolling_mean) / safe_std
        else:
            z_score_series = pd.Series(0.0, index=imbalance_series.index)
        # Warm-up rows and masked rows become a neutral 0.
        z_score_series = z_score_series.fillna(0.0)

        # --- 3. MACD line of Z_I(t) -> Z_Flow ---
        fast_ema = z_score_series.ewm(
            span=self.z_flow_fast_ema_period, adjust=False
        ).mean()
        slow_ema = z_score_series.ewm(
            span=self.z_flow_slow_ema_period, adjust=False
        ).mean()
        self.z_flow = fast_ema.iloc[-1] - slow_ema.iloc[-1]

    def manage_open_position(self, volume: int, current_bar: Bar):
        """Close the open position on a price stop or on a Z-Flow exit.

        Args:
            volume: Signed position volume; positive means long.
            current_bar: Bar whose low/high is tested against the stop price.
        """
        meta = self.position_meta.get(self.symbol)
        if not meta:
            return

        is_long = volume > 0
        close_direction = "CLOSE_LONG" if is_long else "CLOSE_SHORT"

        # 1. Hard price stop.
        stop_loss_price = meta['stop_loss_price']
        stop_hit = (
            current_bar.low <= stop_loss_price
            if is_long
            else current_bar.high >= stop_loss_price
        )
        if stop_hit:
            self.log(f"ATR Stop Loss Hit at {stop_loss_price:.4f}")
            self.close_position(close_direction, abs(volume))
            return

        # 2. Z-Flow driven exit (momentum dissipated).
        if is_long:
            exit_triggered = self.z_flow < self.exit_threshold
        else:
            exit_triggered = self.z_flow > -self.exit_threshold

        if exit_triggered:
            self.log(f"Z-Flow Dissipation Exit. Direction: {'LONG' if is_long else 'SHORT'}. "
                     f"Z-Flow ({self.z_flow:.2f}) returned to neutral zone.")
            self.close_position(close_direction, abs(volume))

    def evaluate_entry_signal(self, open_price: float, bar_history: List[Bar]):
        """Open a position at ``open_price`` when Z-Flow crosses the threshold.

        A BUY needs an upward cross of ``+entry_threshold``; a SELL needs a
        downward cross of ``-entry_threshold``. The direction must also be
        listed in ``order_direction``. No order is sent when the latest ATR
        is NaN/inf or not positive.
        """
        direction = None
        if "BUY" in self.order_direction and self.prev_z_flow < self.entry_threshold <= self.z_flow:
            direction = "BUY"
        elif "SELL" in self.order_direction and self.prev_z_flow > -self.entry_threshold >= self.z_flow:
            direction = "SELL"

        if not direction:
            return

        self.log(
            f"Z-Flow Signal: {direction}. Z-Flow: {self.z_flow:.2f} crossed threshold. Entry on Open: {open_price}")

        # ATR is computed over the complete bar history.
        highs = np.array([b.high for b in bar_history], dtype=float)
        lows = np.array([b.low for b in bar_history], dtype=float)
        closes = np.array([b.close for b in bar_history], dtype=float)
        current_atr = talib.ATR(highs, lows, closes, timeperiod=self.atr_period)[-1]

        # `NaN <= 0` is False, so a NaN ATR must be rejected explicitly or it
        # would produce a NaN stop-loss price.
        if not np.isfinite(current_atr) or current_atr <= 0:
            return

        # The stop is measured from open_price, never from a later price.
        stop_distance = self.stop_loss_atr_multiplier * current_atr
        if direction == "BUY":
            stop_loss_price = open_price - stop_distance
        else:
            stop_loss_price = open_price + stop_distance

        # open_price is recorded as the entry price.
        meta = {'entry_price': open_price, 'stop_loss_price': stop_loss_price}
        self.send_market_order(direction, self.trade_volume, "OPEN", meta)
        self.save_state(self.position_meta)

    # --- Helper methods ---

    def _sync_position_state(self, position_volume, symbol):
        """Reconcile the reported position with the stored metadata.

        A position without metadata is force-closed. Metadata without a
        position is dropped and the state is saved.
        """
        meta = self.position_meta.get(symbol)
        if position_volume != 0 and not meta:
            self.log(f"警告:持仓({position_volume})与策略状态不一致!将强制平仓。", level='WARNING')
            self.close_position("CLOSE_LONG" if position_volume > 0 else "CLOSE_SHORT", abs(position_volume))
            return

        if position_volume == 0 and meta:
            self.log("信息:清理过时的策略状态。", level='INFO')
            self.position_meta.pop(symbol, None)
            self.save_state(self.position_meta)

    def close_position(self, direction: str, volume: int):
        """Send a closing market order and drop the symbol's metadata."""
        self.send_market_order(direction, volume, offset="CLOSE")
        if self.symbol in self.position_meta:
            self.position_meta.pop(self.symbol, None)
            self.save_state(self.position_meta)

    def send_market_order(self, direction: str, volume: int, offset: str, meta: Optional[Dict] = None):
        """Build and submit a MARKET order for ``self.symbol``.

        For an ``"OPEN"`` order with non-empty ``meta``, the metadata is
        stored under the current symbol before the order is sent.
        """
        if offset == "OPEN" and meta:
            self.position_meta[self.symbol] = meta

        order_id = f"{self.symbol}_{direction}_MARKET_{self.order_id_counter}"
        self.order_id_counter += 1

        order = Order(
            id=order_id,
            symbol=self.symbol,
            direction=direction,
            volume=volume,
            price_type="MARKET",
            submitted_time=self.get_current_time(),
            offset=offset,
        )
        self.send_order(order)

    def on_rollover(self, old_symbol: str, new_symbol: str):
        """Reset all strategy state when the contract rolls over."""
        super().on_rollover(old_symbol, new_symbol)
        self.position_meta = {}
        # Save the cleared metadata, as every other mutation of
        # position_meta in this class does.
        self.save_state(self.position_meta)
        self._imbalance_history.clear()
        self.z_flow = 0.0
        self.prev_z_flow = 0.0
        self.log("Rollover detected. All strategy states have been reset.")