Files
NewQuant/futures_trading_strategies/FG/AreaReversal/AreaReversalStrategy.py

196 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
import talib
from typing import Optional, Any, List
from src.core_data import Bar, Order
from src.indicators.base_indicators import Indicator
from src.indicators.indicators import Empty
from src.strategies.base_strategy import Strategy
class AreaReversalStrategy(Strategy):
"""
面积反转策略(含跟踪止损出场)
逻辑:
- 面积扩张 + 强度达标 + 局部见顶 + 面积收缩 → 等待反向突破开仓
- 出场:跟踪止损(回调出场)
"""
def __init__(
self,
context: Any,
main_symbol: str,
enable_log: bool,
trade_volume: int,
ma_period: int = 14,
area_window: int = 14,
strength_window: int = 50,
breakout_window: int = 20,
quantile_threshold: float = 0.5,
top_k: int = 3,
trailing_points: float = 100.0, # 跟踪止损点数
trailing_percent: float = None, # 或用百分比(如 0.01 = 1%
order_direction: Optional[List[str]] = None,
indicators: Optional[List[Indicator]] = None,
):
super().__init__(context, main_symbol, enable_log)
if order_direction is None:
order_direction = ["BUY", "SELL"]
if indicators is None:
indicators = [Empty(), Empty()]
self.trade_volume = trade_volume
self.ma_period = ma_period
self.area_window = area_window
self.strength_window = strength_window
self.breakout_window = breakout_window
self.quantile_threshold = quantile_threshold
self.top_k = top_k
self.trailing_points = trailing_points
self.trailing_percent = trailing_percent
self.order_direction = order_direction
self.indicators = indicators
# 跟踪止损状态
self.entry_price = None
self.highest_high = None # 多头持仓期间最高价
self.lowest_low = None # 空头持仓期间最低价
self.order_id_counter = 0
self.min_bars_needed = max(
ma_period,
area_window * 3,
strength_window,
breakout_window
) + 10
self.log("AreaReversalStrategy with Trailing Stop Initialized")
def _calculate_areas(self, closes: np.array, ma: np.array) -> np.array:
diffs = np.abs(closes - ma)
areas = talib.SUM(diffs, self.area_window)
return areas
def on_open_bar(self, open_price: float, symbol: str):
self.symbol = symbol
bar_history = self.get_bar_history()
if len(bar_history) < self.min_bars_needed or not self.trading:
return
position = self.get_current_positions().get(self.symbol, 0)
current_bar = bar_history[-1]
# === 计算指标 ===
closes = np.array([b.close for b in bar_history], dtype=float)
ma = talib.SMA(closes, self.ma_period)
areas = self._calculate_areas(closes, ma)
A1 = areas[-1]
A2 = areas[-2] if len(areas) >= 2 else 0
# 强度评估窗口
historical_areas = areas[-(self.strength_window + 1):-1]
if len(historical_areas) < self.strength_window:
return
# === 面积信号条件 ===
area_contracting = (A1 < A2) and (A2 > 0)
threshold = np.nanpercentile(historical_areas, self.quantile_threshold * 100)
strength_satisfied = (A2 >= threshold)
top_k_values = np.partition(historical_areas, -self.top_k)[-self.top_k:]
local_peak = (A2 >= np.min(top_k_values))
area_signal = area_contracting and strength_satisfied and local_peak
# === 突破判断 ===
recent_bars = bar_history[-self.breakout_window:]
highest = max(b.high for b in recent_bars)
lowest = min(b.low for b in recent_bars)
# =============== 开仓逻辑 ===============
if position == 0 and area_signal:
if "BUY" in self.order_direction and current_bar.high >= highest:
self.send_market_order("BUY", self.trade_volume, "OPEN")
self.entry_price = current_bar.close
self.highest_high = current_bar.high
self.lowest_low = None
self.log(f"🚀 Long Entry | A2={A2:.4f}")
elif "SELL" in self.order_direction and current_bar.low <= lowest:
self.send_market_order("SELL", self.trade_volume, "OPEN")
self.entry_price = current_bar.close
self.lowest_low = current_bar.low
self.highest_high = None
self.log(f"⬇️ Short Entry | A2={A2:.4f}")
# =============== 跟踪止损出场逻辑 ===============
elif position != 0 and self.entry_price is not None:
if position > 0:
# 更新最高价
if self.highest_high is None or current_bar.high > self.highest_high:
self.highest_high = current_bar.high
# 计算止损价
if self.trailing_percent is not None:
trailing_offset = self.highest_high * self.trailing_percent
else:
trailing_offset = self.trailing_points
stop_loss_price = self.highest_high - trailing_offset
if current_bar.low <= stop_loss_price:
self.close_position("CLOSE_LONG", position)
self._reset_state()
self.log(f"CloseOperation (Long Trailing Stop) @ {stop_loss_price:.5f}")
else: # position < 0
# 更新最低价
if self.lowest_low is None or current_bar.low < self.lowest_low:
self.lowest_low = current_bar.low
# 计算止损价
if self.trailing_percent is not None:
trailing_offset = self.lowest_low * self.trailing_percent
else:
trailing_offset = self.trailing_points
stop_loss_price = self.lowest_low + trailing_offset
if current_bar.high >= stop_loss_price:
self.close_position("CLOSE_SHORT", -position)
self._reset_state()
self.log(f"CloseOperation (Short Trailing Stop) @ {stop_loss_price:.5f}")
def _reset_state(self):
"""重置跟踪止损状态"""
self.entry_price = None
self.highest_high = None
self.lowest_low = None
# --- 模板方法 ---
def on_init(self):
super().on_init()
self.cancel_all_pending_orders(self.main_symbol)
self._reset_state()
def close_position(self, direction: str, volume: int):
self.send_market_order(direction, volume, offset="CLOSE")
def send_market_order(self, direction: str, volume: int, offset: str):
order_id = f"{self.symbol}_{direction}_MARKET_{self.order_id_counter}"
self.order_id_counter += 1
order = Order(
id=order_id,
symbol=self.symbol,
direction=direction,
volume=volume,
price_type="MARKET",
submitted_time=self.get_current_time(),
offset=offset,
)
self.send_order(order)
def on_rollover(self, old_symbol: str, new_symbol: str):
super().on_rollover(old_symbol, new_symbol)
self._reset_state()
self.log("Rollover: Reset trailing stop state.")