import numpy as np
import talib
from typing import Optional, Any, List

from src.core_data import Bar, Order
from src.indicators.base_indicators import Indicator
from src.indicators.indicators import Empty
from src.strategies.base_strategy import Strategy


class AreaReversalStrategy(Strategy):
    """Area-reversal strategy with a trailing-stop exit.

    "Area" is the rolling sum, over ``area_window`` bars, of the absolute
    distance between the close and its simple moving average.

    Entry (only while flat) requires all of:
      - the previous area is positive and the latest area is smaller
        (contraction after expansion);
      - the previous area is at or above the ``quantile_threshold`` quantile
        of the preceding ``strength_window`` areas (strength);
      - the previous area is at or above the ``top_k``-th largest of those
        areas (local peak);
      - the latest bar marks the high (BUY) or the low (SELL) of the last
        ``breakout_window`` bars.

    Exit: trailing stop measured from the best price seen since entry.
    """

    def __init__(
        self,
        context: Any,
        main_symbol: str,
        enable_log: bool,
        trade_volume: int,
        ma_period: int = 14,
        area_window: int = 14,
        strength_window: int = 50,
        breakout_window: int = 20,
        quantile_threshold: float = 0.5,
        top_k: int = 3,
        trailing_points: float = 100.0,  # trailing-stop distance in price points
        trailing_percent: Optional[float] = None,  # or a fraction (0.01 = 1%)
        order_direction: Optional[List[str]] = None,
        indicators: Optional[List[Indicator]] = None,
    ):
        """Store the configuration and initialise the trailing-stop state.

        Args:
            context: Forwarded unchanged to ``Strategy.__init__``.
            main_symbol: Forwarded unchanged to ``Strategy.__init__``.
            enable_log: Forwarded unchanged to ``Strategy.__init__``.
            trade_volume: Volume sent with every opening order.
            ma_period: Period of the simple moving average of closes.
            area_window: Rolling window of the area sum.
            strength_window: Number of past areas used for the quantile and
                top-k checks.
            breakout_window: Number of most recent bars scanned for the
                breakout high/low.
            quantile_threshold: Quantile in [0, 1] the previous area must
                reach; it is multiplied by 100 before the percentile call.
            top_k: The previous area must be among the ``top_k`` largest
                areas of the strength window.
            trailing_points: Absolute trailing distance, used when
                ``trailing_percent`` is None.
            trailing_percent: Trailing distance as a fraction of the best
                price since entry; takes precedence when not None.
            order_direction: Allowed entry sides; defaults to
                ``["BUY", "SELL"]``.
            indicators: Stored on the instance; defaults to two ``Empty``
                indicators. Not read elsewhere in this class.
        """
        super().__init__(context, main_symbol, enable_log)

        if order_direction is None:
            order_direction = ["BUY", "SELL"]
        if indicators is None:
            indicators = [Empty(), Empty()]

        self.trade_volume = trade_volume
        self.ma_period = ma_period
        self.area_window = area_window
        self.strength_window = strength_window
        self.breakout_window = breakout_window
        self.quantile_threshold = quantile_threshold
        self.top_k = top_k
        self.trailing_points = trailing_points
        self.trailing_percent = trailing_percent
        self.order_direction = order_direction
        self.indicators = indicators

        # Trailing-stop state. entry_price doubles as the "this strategy
        # opened the current position" flag checked in on_open_bar.
        self.entry_price = None
        self.highest_high = None  # highest high while long
        self.lowest_low = None  # lowest low while short

        self.order_id_counter = 0
        # Bars required before any trading decision is made.
        self.min_bars_needed = max(
            ma_period, area_window * 3, strength_window, breakout_window
        ) + 10

        self.log("AreaReversalStrategy with Trailing Stop Initialized")

    def _calculate_areas(self, closes: np.ndarray, ma: np.ndarray) -> np.ndarray:
        """Return the rolling ``area_window`` sum of ``|closes - ma|``."""
        diffs = np.abs(closes - ma)
        return talib.SUM(diffs, self.area_window)

    def on_open_bar(self, open_price: float, symbol: str):
        """Run one decision step: open when flat, otherwise trail the stop.

        Args:
            open_price: Accepted for interface compatibility; not used here.
            symbol: Symbol to trade; stored on ``self.symbol`` and used for
                the position lookup and for orders.
        """
        self.symbol = symbol
        bar_history = self.get_bar_history()

        if len(bar_history) < self.min_bars_needed or not self.trading:
            return

        position = self.get_current_positions().get(self.symbol, 0)
        current_bar = bar_history[-1]

        if position == 0:
            # The indicators only feed the entry decision, so they are
            # computed only while flat.
            closes = np.array([b.close for b in bar_history], dtype=float)
            ma = talib.SMA(closes, self.ma_period)
            areas = self._calculate_areas(closes, ma)
            if self._has_area_signal(areas):
                self._try_open_position(bar_history, current_bar, areas[-2])
        elif self.entry_price is not None:
            # NOTE(review): a position without entry_price (e.g. after
            # on_rollover reset the state) is never managed here - confirm
            # that the base class handles such positions.
            self._apply_trailing_stop(position, current_bar)

    def _has_area_signal(self, areas: np.ndarray) -> bool:
        """Return True when contraction, strength and local-peak checks pass."""
        if len(areas) < 2:
            return False
        latest_area = areas[-1]
        prev_area = areas[-2]

        # Strength window: the strength_window areas preceding the latest one.
        window = areas[-(self.strength_window + 1):-1]
        if len(window) < self.strength_window:
            # Not enough history: suppress the entry only; the caller still
            # manages any open position.
            return False

        # Contraction after a positive area (False if either value is NaN).
        if not (latest_area < prev_area and prev_area > 0):
            return False

        # Warm-up values are NaN. Filter them once so the percentile and
        # the top-k check see the same samples; np.partition sorts NaN last,
        # which would otherwise make the "largest" values NaN.
        valid = window[~np.isnan(window)]
        if valid.size == 0:
            return False

        threshold = np.percentile(valid, self.quantile_threshold * 100)
        if prev_area < threshold:
            return False

        # Clamp k to [1, valid.size] so np.partition never receives an
        # out-of-range index (top_k larger than the window, or <= 0).
        k = min(max(int(self.top_k), 1), valid.size)
        kth_largest = np.partition(valid, -k)[-k]
        return bool(prev_area >= kth_largest)

    def _try_open_position(
        self, bar_history: List[Bar], current_bar: Bar, prev_area: float
    ) -> None:
        """Open long/short if the latest bar is the breakout-window extreme."""
        recent_bars = bar_history[-self.breakout_window:]
        highest = max(b.high for b in recent_bars)
        lowest = min(b.low for b in recent_bars)

        if "BUY" in self.order_direction and current_bar.high >= highest:
            self.send_market_order("BUY", self.trade_volume, "OPEN")
            self.entry_price = current_bar.close
            self.highest_high = current_bar.high
            self.lowest_low = None
            self.log(f"🚀 Long Entry | A2={prev_area:.4f}")
        elif "SELL" in self.order_direction and current_bar.low <= lowest:
            self.send_market_order("SELL", self.trade_volume, "OPEN")
            self.entry_price = current_bar.close
            self.lowest_low = current_bar.low
            self.highest_high = None
            self.log(f"⬇️ Short Entry | A2={prev_area:.4f}")

    def _trailing_offset(self, reference_price: float) -> float:
        """Return the stop distance: percent of the reference, else points."""
        if self.trailing_percent is not None:
            return reference_price * self.trailing_percent
        return self.trailing_points

    def _apply_trailing_stop(self, position: int, current_bar: Bar) -> None:
        """Update the best price since entry and close if the stop is hit."""
        if position > 0:
            # Ratchet the highest high upwards.
            if self.highest_high is None or current_bar.high > self.highest_high:
                self.highest_high = current_bar.high

            stop_loss_price = self.highest_high - self._trailing_offset(
                self.highest_high
            )
            if current_bar.low <= stop_loss_price:
                self.close_position("CLOSE_LONG", position)
                self._reset_state()
                self.log(
                    f"CloseOperation (Long Trailing Stop) @ {stop_loss_price:.5f}"
                )
        else:  # position < 0
            # Ratchet the lowest low downwards.
            if self.lowest_low is None or current_bar.low < self.lowest_low:
                self.lowest_low = current_bar.low

            stop_loss_price = self.lowest_low + self._trailing_offset(
                self.lowest_low
            )
            if current_bar.high >= stop_loss_price:
                # position is negative while short; orders take a positive volume.
                self.close_position("CLOSE_SHORT", -position)
                self._reset_state()
                self.log(
                    f"CloseOperation (Short Trailing Stop) @ {stop_loss_price:.5f}"
                )

    def _reset_state(self):
        """Reset the trailing-stop state."""
        self.entry_price = None
        self.highest_high = None
        self.lowest_low = None

    # --- Template methods ---
    def on_init(self):
        """Run base initialisation, cancel pending orders, reset stop state."""
        super().on_init()
        self.cancel_all_pending_orders(self.main_symbol)
        self._reset_state()

    def close_position(self, direction: str, volume: int):
        """Send a market order with offset ``"CLOSE"``."""
        self.send_market_order(direction, volume, offset="CLOSE")

    def send_market_order(self, direction: str, volume: int, offset: str):
        """Build a MARKET ``Order`` for ``self.symbol`` and submit it.

        The order id combines symbol, direction and a per-instance counter.
        """
        order_id = f"{self.symbol}_{direction}_MARKET_{self.order_id_counter}"
        self.order_id_counter += 1
        order = Order(
            id=order_id,
            symbol=self.symbol,
            direction=direction,
            volume=volume,
            price_type="MARKET",
            submitted_time=self.get_current_time(),
            offset=offset,
        )
        self.send_order(order)

    def on_rollover(self, old_symbol: str, new_symbol: str):
        """Delegate to the base class, then clear the trailing-stop state."""
        super().on_rollover(old_symbol, new_symbol)
        self._reset_state()
        self.log("Rollover: Reset trailing stop state.")