246 lines
11 KiB
Python
246 lines
11 KiB
Python
import numpy as np
|
||
import pandas as pd
|
||
import talib
|
||
from collections import deque
|
||
from typing import Optional, Any, List, Dict, Tuple
|
||
|
||
from src.core_data import Bar, Order
|
||
from src.indicators.base_indicators import Indicator
|
||
from src.indicators.indicators import Empty
|
||
from src.strategies.base_strategy import Strategy
|
||
|
||
|
||
# =============================================================================
# Strategy implementation (ImbalanceZFlowStrategy)
# =============================================================================
|
||
|
||
class ImbalanceZFlowStrategy(Strategy):
    """A pure momentum strategy driven by a single stationary indicator, "Z-Flow".

    Core philosophy:
        1. Address non-stationarity at the root: the raw imbalance series is
           first converted with a rolling Z-Score into the series Z_I(t), and
           all later analysis is built on that series.
        2. Core driver Z_Flow: a MACD-style difference of two EMAs computed on
           Z_I(t), measuring "statistically significant" momentum as an
           oscillator.
        3. Realistic backtest logic: entries and stop-loss levels are computed
           from the current bar's ``open_price`` to avoid look-ahead bias.
        4. Minimalism: one indicator and two symmetric thresholds drive every
           decision.
    """

    # Rolling standard deviations at or below this value are treated as zero.
    _MIN_STD = 1e-9

    def __init__(
        self,
        context: Any,
        main_symbol: str,
        enable_log: bool,
        trade_volume: int,
        # --- Hawkes-process parameters ---
        hawkes_lookback: int = 60,
        hawkes_alpha: float = 0.8,
        hawkes_beta: float = 0.2,
        # --- Z-Score normalisation parameters ---
        z_score_period: int = 200,
        # --- Z-Flow indicator parameters ---
        z_flow_fast_ema_period: int = 12,
        z_flow_slow_ema_period: int = 26,
        # --- Trading thresholds ---
        entry_threshold: float = 0.5,
        exit_threshold: float = 0.1,
        # --- Risk management ---
        atr_period: int = 20,
        stop_loss_atr_multiplier: float = 3.0,
        # --- Misc ---
        order_direction: Optional[List[str]] = None,
        indicators: Optional[List[Indicator]] = None,
    ):
        """Store the parameters and initialise the internal state.

        Args:
            context: Forwarded to the base class; its ``load_state()`` is used
                to restore the per-symbol position metadata.
            main_symbol: Forwarded to the base class; also the symbol whose
                pending orders are cancelled in ``on_init``.
            enable_log: Forwarded to the base class.
            trade_volume: Volume used for every opening order.
            hawkes_lookback: Number of most recent bars that contribute to
                the imbalance intensity.
            hawkes_alpha: Scale factor applied to every event's mark.
            hawkes_beta: Exponential decay rate applied per bar of event age.
            z_score_period: Rolling window for the mean/std of the imbalance.
            z_flow_fast_ema_period: Span of the fast EMA of the z-scores.
            z_flow_slow_ema_period: Span of the slow EMA of the z-scores.
            entry_threshold: A long is opened when Z-Flow crosses up through
                ``+entry_threshold``; a short when it crosses down through
                ``-entry_threshold``.
            exit_threshold: A long is closed when Z-Flow falls below
                ``+exit_threshold``; a short when it rises above
                ``-exit_threshold``.
            atr_period: Period passed to ``talib.ATR`` for the stop distance.
            stop_loss_atr_multiplier: Stop distance expressed in ATR units.
            order_direction: Allowed entry directions; defaults to
                ``['BUY', 'SELL']``.
            indicators: Stored on ``self.indicators``; defaults to two
                ``Empty()`` placeholders.
        """
        super().__init__(context, main_symbol, enable_log)
        if order_direction is None:
            order_direction = ['BUY', 'SELL']

        # --- Parameters ---
        self.trade_volume = trade_volume
        self.hawkes_lookback = hawkes_lookback
        self.hawkes_alpha = hawkes_alpha
        self.hawkes_beta = hawkes_beta
        self.z_score_period = z_score_period
        self.z_flow_fast_ema_period = z_flow_fast_ema_period
        self.z_flow_slow_ema_period = z_flow_slow_ema_period
        self.entry_threshold = entry_threshold
        self.exit_threshold = exit_threshold
        self.atr_period = atr_period
        self.stop_loss_atr_multiplier = stop_loss_atr_multiplier
        self.order_direction = order_direction

        # --- Internal state ---
        # Keeps one z-score window plus one slow-EMA span of imbalance values.
        self._imbalance_history: deque = deque(
            maxlen=self.z_score_period + self.z_flow_slow_ema_period
        )

        # Core indicator: current and previous value, used for cross detection.
        self.z_flow = 0.0
        self.prev_z_flow = 0.0

        self.position_meta: Dict[str, Any] = self._load_position_meta()
        self.main_symbol = main_symbol
        self.order_id_counter = 0

        if indicators is None:
            indicators = [Empty(), Empty()]
        self.indicators = indicators

        self.log("ImbalanceZFlowStrategy Initialized")

    def _load_position_meta(self) -> Dict[str, Any]:
        """Load persisted position metadata, mapping a missing state to ``{}``."""
        state = self.context.load_state()
        # Guard against a store that returns None when nothing was saved yet;
        # every consumer calls ``.get`` / ``in`` on the result.
        return state if state is not None else {}

    def on_init(self):
        """Cancel pending orders on the main symbol and reload position state."""
        super().on_init()
        self.cancel_all_pending_orders(self.main_symbol)
        self.position_meta = self._load_position_meta()

    def on_open_bar(self, open_price: float, symbol: str):
        """Update the indicator on a new bar and manage or open a position.

        Args:
            open_price: Open price of the new bar; used as entry price and as
                the reference for the stop-loss level.
            symbol: Symbol of the bar; becomes ``self.symbol`` for all orders
                sent during this call.
        """
        self.symbol = symbol
        bar_history = self.get_bar_history()
        required_bars = self._imbalance_history.maxlen + self.hawkes_lookback + 5
        if len(bar_history) < required_bars:
            return

        # Remember the previous value so entries can detect threshold crosses.
        self.prev_z_flow = self.z_flow
        self._update_indicators_and_state(bar_history)

        position_volume = self.get_current_positions().get(self.symbol, 0)
        self._sync_position_state(position_volume, symbol)

        if not self.trading:
            return

        if position_volume != 0:
            self.manage_open_position(position_volume, bar_history[-1])
        else:
            self.evaluate_entry_signal(open_price, bar_history)

    def _update_indicators_and_state(self, bar_history: List[Bar]):
        """Core pipeline: I(t) -> Z_I(t) -> Z_Flow(t).

        Appends the newest imbalance to the history and refreshes
        ``self.z_flow``. Until ``z_score_period`` imbalance values have been
        collected, ``self.z_flow`` stays at 0.0.
        """
        self._imbalance_history.append(self._compute_imbalance(bar_history))

        if len(self._imbalance_history) < self.z_score_period:
            self.z_flow = 0.0
            return

        imbalance_series = pd.Series(list(self._imbalance_history), dtype=float)
        self.z_flow = self._compute_z_flow(imbalance_series)

    def _compute_imbalance(self, bar_history: List[Bar]) -> float:
        """Return the long-minus-short intensity over the lookback bars.

        Every bar with a positive mark ``(high - low) * volume`` counts as an
        event: long when ``close > open``, short when ``close < open``. Each
        event contributes ``alpha * mark * exp(-beta * age)``, where the last
        bar of the history has age 0.
        """
        lookback_bars = bar_history[-self.hawkes_lookback:]
        newest_index = len(lookback_bars) - 1
        lambda_long = 0.0
        lambda_short = 0.0
        for i, bar in enumerate(lookback_bars):
            mark = (bar.high - bar.low) * bar.volume
            # "not mark > 0" (rather than "mark <= 0") also skips NaN marks.
            if not mark > 0:
                continue
            intensity = self.hawkes_alpha * mark * np.exp(-self.hawkes_beta * (newest_index - i))
            if bar.close > bar.open:
                lambda_long += intensity
            elif bar.close < bar.open:
                lambda_short += intensity
        return lambda_long - lambda_short

    def _compute_z_flow(self, imbalance_series: pd.Series) -> float:
        """Return fast-EMA minus slow-EMA of the rolling z-score of the series.

        Returns 0.0 when the most recent rolling std is degenerate (NaN or
        not above ``_MIN_STD``).
        """
        rolling = imbalance_series.rolling(window=self.z_score_period)
        rolling_mean = rolling.mean()
        rolling_std = rolling.std()

        # A flat (or not yet defined) latest window carries no signal.
        if not rolling_std.iloc[-1] > self._MIN_STD:
            return 0.0

        # Mask EVERY degenerate window, not only the latest one: dividing by a
        # (near-)zero std yields +/-inf or huge values that fillna() does not
        # remove and that would contaminate both EMAs.
        safe_std = rolling_std.where(rolling_std > self._MIN_STD)
        z_score_series = ((imbalance_series - rolling_mean) / safe_std).fillna(0.0)

        fast_ema = z_score_series.ewm(span=self.z_flow_fast_ema_period, adjust=False).mean()
        slow_ema = z_score_series.ewm(span=self.z_flow_slow_ema_period, adjust=False).mean()
        return float(fast_ema.iloc[-1] - slow_ema.iloc[-1])

    def manage_open_position(self, volume: int, current_bar: Bar):
        """Close the open position on a price stop-loss or a Z-Flow exit.

        Args:
            volume: Signed position volume; positive means long.
            current_bar: Bar whose low/high is tested against the stored stop
                price (``on_open_bar`` passes the last bar of the history).
        """
        meta = self.position_meta.get(self.symbol)
        if not meta:
            return

        is_long = volume > 0
        close_direction = "CLOSE_LONG" if is_long else "CLOSE_SHORT"

        # 1. Hard price stop-loss.
        stop_loss_price = meta['stop_loss_price']
        if is_long:
            stop_hit = current_bar.low <= stop_loss_price
        else:
            stop_hit = current_bar.high >= stop_loss_price
        if stop_hit:
            self.log(f"ATR Stop Loss Hit at {stop_loss_price:.4f}")
            self.close_position(close_direction, abs(volume))
            return

        # 2. Z-Flow driven exit (momentum dissipated back into the neutral zone).
        if is_long:
            exit_triggered = self.z_flow < self.exit_threshold
        else:
            exit_triggered = self.z_flow > -self.exit_threshold

        if exit_triggered:
            self.log(f"Z-Flow Dissipation Exit. Direction: {'LONG' if is_long else 'SHORT'}. "
                     f"Z-Flow ({self.z_flow:.2f}) returned to neutral zone.")
            self.close_position(close_direction, abs(volume))

    def evaluate_entry_signal(self, open_price: float, bar_history: List[Bar]):
        """Open a position at ``open_price`` when Z-Flow crosses an entry threshold.

        A BUY needs an upward cross of ``+entry_threshold``; a SELL needs a
        downward cross of ``-entry_threshold``. The direction must also be
        listed in ``order_direction``. No order is sent when the ATR is not a
        finite positive number.
        """
        direction = None
        if "BUY" in self.order_direction and self.prev_z_flow < self.entry_threshold <= self.z_flow:
            direction = "BUY"
        elif "SELL" in self.order_direction and self.prev_z_flow > -self.entry_threshold >= self.z_flow:
            direction = "SELL"

        if not direction:
            return

        self.log(
            f"Z-Flow Signal: {direction}. Z-Flow: {self.z_flow:.2f} crossed threshold. Entry on Open: {open_price}")

        # ATR is computed over the full bar history.
        highs = np.array([b.high for b in bar_history], dtype=float)
        lows = np.array([b.low for b in bar_history], dtype=float)
        closes = np.array([b.close for b in bar_history], dtype=float)
        current_atr = talib.ATR(highs, lows, closes, self.atr_period)[-1]

        # "atr <= 0" alone lets NaN through (NaN <= 0 is False), which would
        # store a NaN stop price that can never trigger.
        if not np.isfinite(current_atr) or current_atr <= 0:
            self.log(f"Entry skipped: invalid ATR ({current_atr}).", level='WARNING')
            return

        # The stop level is anchored on open_price, not on a later price.
        stop_distance = self.stop_loss_atr_multiplier * current_atr
        if direction == "BUY":
            stop_loss_price = open_price - stop_distance
        else:
            stop_loss_price = open_price + stop_distance

        # open_price is recorded as the entry price.
        meta = {'entry_price': open_price, 'stop_loss_price': stop_loss_price}

        self.send_market_order(direction, self.trade_volume, "OPEN", meta)
        self.save_state(self.position_meta)

    def _sync_position_state(self, position_volume, symbol):
        """Reconcile the actual position with the stored metadata.

        A position without metadata is force-closed. Metadata without a
        position is dropped and the cleaned state is persisted.
        """
        meta = self.position_meta.get(symbol)
        if position_volume != 0 and not meta:
            self.log(f"警告:持仓({position_volume})与策略状态不一致!将强制平仓。", level='WARNING')
            self.close_position("CLOSE_LONG" if position_volume > 0 else "CLOSE_SHORT", abs(position_volume))
            return
        if position_volume == 0 and meta:
            self.log("信息:清理过时的策略状态。", level='INFO')
            self.position_meta.pop(symbol, None)
            self.save_state(self.position_meta)

    def close_position(self, direction: str, volume: int):
        """Send a closing market order and drop this symbol's stored metadata."""
        self.send_market_order(direction, volume, offset="CLOSE")
        if self.symbol in self.position_meta:
            del self.position_meta[self.symbol]
            self.save_state(self.position_meta)

    def send_market_order(self, direction: str, volume: int, offset: str, meta: Optional[Dict] = None):
        """Build and submit a market order for ``self.symbol``.

        When ``offset`` is ``"OPEN"`` and ``meta`` is non-empty, ``meta`` is
        recorded as the position metadata before the order is sent.
        """
        if offset == "OPEN" and meta:
            self.position_meta[self.symbol] = meta
        order_id = f"{self.symbol}_{direction}_MARKET_{self.order_id_counter}"
        self.order_id_counter += 1
        order = Order(
            id=order_id,
            symbol=self.symbol,
            direction=direction,
            volume=volume,
            price_type="MARKET",
            submitted_time=self.get_current_time(),
            offset=offset,
        )
        self.send_order(order)

    def on_rollover(self, old_symbol: str, new_symbol: str):
        """Reset all strategy state when the contract rolls over."""
        super().on_rollover(old_symbol, new_symbol)
        self.position_meta = {}
        # Persist the reset like every other mutation of position_meta, so a
        # later load_state() does not resurrect metadata of the old contract.
        self.save_state(self.position_meta)
        self._imbalance_history.clear()
        self.z_flow = 0.0
        self.prev_z_flow = 0.0
        self.log("Rollover detected. All strategy states have been reset.")