Files
NewQuant/futures_trading_strategies/MA/KalmanStrategy/KalmanStrategy3.py

222 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
import pandas as pd
import talib
from collections import deque
from typing import Optional, Any, List, Dict
from src.core_data import Bar, Order
from src.indicators.base_indicators import Indicator
from src.indicators.indicators import Empty
from src.strategies.base_strategy import Strategy
class DualModeKalmanStrategy(Strategy):
"""
基于卡尔曼因子对称性的双模自适应策略
因子定义: Deviation = (Current_Close - Kalman_Price) / ATR
"""
def __init__(
self,
context: Any,
main_symbol: str,
enable_log: bool,
trade_volume: int,
strategy_mode: str = 'TREND', # 'TREND' 或 'REVERSION'
kalman_process_noise: float = 0.01,
kalman_measurement_noise: float = 0.5,
atr_period: int = 23,
atr_lookback: int = 100,
atr_percentile_threshold: float = 25.0,
entry_threshold_atr: float = 2.5, # 入场偏离倍数
stop_loss_atr: float = 2.0, # 保护性硬止损倍数
trend_trailing_atr: float = 2.5, # 趋势模式下卡尔曼轨道的宽度
order_direction=None,
indicators: Optional[List[Indicator]] = None,
):
super().__init__(context, main_symbol, enable_log)
if order_direction is None:
order_direction = ['BUY', 'SELL']
self.strategy_mode = strategy_mode.upper()
self.trade_volume = trade_volume
self.atr_period = atr_period
self.atr_lookback = atr_lookback
self.atr_percentile_threshold = atr_percentile_threshold
self.entry_threshold_atr = entry_threshold_atr
self.stop_loss_atr = stop_loss_atr
self.trend_trailing_atr = entry_threshold_atr
# 卡尔曼状态
self.Q = kalman_process_noise
self.R = kalman_measurement_noise
self.P = 1.0
self.x_hat = 0.0
self.kalman_initialized = False
self._atr_history: deque = deque(maxlen=self.atr_lookback)
self.position_meta: Dict[str, Any] = self.context.load_state()
self.order_id_counter = 0
self.order_direction = order_direction
if indicators is None:
self.indicators = [Empty(), Empty()]
else:
self.indicators = indicators
self.log(f"Initialized [{self.strategy_mode}] Mode with Kalman Symmetry.")
def on_open_bar(self, open_price: float, symbol: str):
self.symbol = symbol
bar_history = self.get_bar_history()
if len(bar_history) < max(self.atr_period, self.atr_lookback) + 2: return
self.cancel_all_pending_orders(symbol)
# 1. 计算核心指标
highs = np.array([b.high for b in bar_history], dtype=float)
lows = np.array([b.low for b in bar_history], dtype=float)
closes = np.array([b.close for b in bar_history], dtype=float)
current_atr = talib.ATR(highs, lows, closes, self.atr_period)[-1]
self._atr_history.append(current_atr)
if current_atr <= 0 or len(self._atr_history) < self.atr_lookback: return
# 2. 更新卡尔曼滤波器
if not self.kalman_initialized:
self.x_hat = closes[-1]
self.kalman_initialized = True
x_hat_minus = self.x_hat
P_minus = self.P + self.Q
K = P_minus / (P_minus + self.R)
self.x_hat = x_hat_minus + K * (closes[-1] - x_hat_minus)
self.P = (1 - K) * P_minus
kalman_price = self.x_hat
# 3. 计算对称性因子:偏离度 (Deviation in ATR)
deviation_in_atr = (closes[-1] - kalman_price) / current_atr
# 4. 状态校验与持仓管理
position_volume = self.get_current_positions().get(self.symbol, 0)
if self.trading:
if position_volume != 0:
self.manage_logic(position_volume, bar_history[-1], current_atr, kalman_price, deviation_in_atr)
else:
# 波动率过滤:只在波动率处于自身中高水平时入场
# atr_threshold = np.percentile(list(self._atr_history), self.atr_percentile_threshold)
# if current_atr >= atr_threshold:
self.evaluate_entry_signal(bar_history[-1], deviation_in_atr, current_atr, open_price)
def manage_logic(self, volume: int, current_bar: Bar, current_atr: float, kalman_price: float, dev: float):
"""
基于对称因子的出场逻辑
"""
meta = self.position_meta.get(self.symbol)
if not meta: return
# A. 保护性硬止损 (防止跳空或极端行情)
initial_stop = meta.get('initial_stop_price', 0)
if (volume > 0 and current_bar.low <= initial_stop) or (volume < 0 and current_bar.high >= initial_stop):
self.log(f"Hard Stop Hit. Price: {current_bar.close}")
self.close_position("CLOSE_LONG" if volume > 0 else "CLOSE_SHORT", abs(volume))
return
# B. 对称因子出场逻辑
if self.strategy_mode == 'TREND':
if volume > 0:
trend_floor = kalman_price - self.trend_trailing_atr * current_atr
if dev < 0:
self.log(f"TREND: Structural Floor Hit at {trend_floor:.4f}")
self.close_position("CLOSE_LONG", abs(volume))
else:
trend_ceiling = kalman_price + self.trend_trailing_atr * current_atr
if dev > 0:
self.log(f"TREND: Structural Ceiling Hit at {trend_ceiling:.4f}")
self.close_position("CLOSE_SHORT", abs(volume))
elif self.strategy_mode == 'REVERSION':
# 回归模式:出场基于“均值修复成功”或“偏离失控止损”
# 1. 目标达成:回归到均值附近 (止盈)
if volume > 0:
trend_floor = kalman_price - self.trend_trailing_atr * current_atr
if dev > 0:
self.log(f"TREND: Structural Floor Hit at {trend_floor:.4f}")
self.close_position("CLOSE_LONG", abs(volume))
else:
trend_ceiling = kalman_price + self.trend_trailing_atr * current_atr
if dev < 0:
self.log(f"TREND: Structural Ceiling Hit at {trend_ceiling:.4f}")
self.close_position("CLOSE_SHORT", abs(volume))
def evaluate_entry_signal(self, current_bar: Bar, dev: float, current_atr: float, open_price: float):
"""
基于对称因子的入场逻辑
"""
direction = None
if self.strategy_mode == 'TREND':
# 趋势:顺着偏离方向入场
if dev > self.entry_threshold_atr and self.indicators[0].is_condition_met(*self.get_indicator_tuple()):
direction = "BUY"
elif dev < -self.entry_threshold_atr and self.indicators[1].is_condition_met(*self.get_indicator_tuple()):
direction = "SELL"
elif self.strategy_mode == 'REVERSION':
# 回归:逆着偏离方向入场
if dev > self.entry_threshold_atr and self.indicators[1].is_condition_met(*self.get_indicator_tuple()):
direction = "SELL" # 超买做空
elif dev < -self.entry_threshold_atr and self.indicators[0].is_condition_met(*self.get_indicator_tuple()):
direction = "BUY" # 超卖做多
if direction:
# 使用最小变动单位修正价格此处假设最小变动为0.5实盘应从context获取
tick_size = 1
entry_price = open_price
# 设置保护性硬止损
stop_offset = self.stop_loss_atr * current_atr
stop_price = entry_price - stop_offset if direction == "BUY" else entry_price + stop_offset
meta = {'entry_price': entry_price, 'initial_stop_price': stop_price}
self.log(f"Entry Signal: {self.strategy_mode} | {direction} | Dev: {dev:.2f}")
self.send_custom_order(entry_price, direction, self.trade_volume, "OPEN", meta)
# --- 辅助函数 ---
def send_custom_order(self, price: float, direction: str, volume: int, offset: str, meta: Dict):
self.position_meta[self.symbol] = meta
order_type = "LIMIT"
order_id = f"{self.symbol}_{direction}_{order_type}_{self.order_id_counter}"
self.order_id_counter += 1
order = Order(
id=order_id, symbol=self.symbol, direction=direction,
volume=volume, price_type=order_type,
submitted_time=self.get_current_time(), offset=offset, limit_price=price
)
self.send_order(order)
self.save_state(self.position_meta)
def close_position(self, direction: str, volume: int):
order_id = f"{self.symbol}_{direction}_MARKET_{self.order_id_counter}"
self.order_id_counter += 1
order = Order(
id=order_id, symbol=self.symbol, direction=direction,
volume=volume, price_type="MARKET",
submitted_time=self.get_current_time(), offset="CLOSE"
)
self.send_order(order)
self.position_meta.pop(self.symbol, None)
self.save_state(self.position_meta)
def on_rollover(self, old_symbol: str, new_symbol: str):
super().on_rollover(old_symbol, new_symbol)
self.position_meta = {}
self.kalman_initialized = False
self._atr_history.clear()
self.log("Rollover: States Reset.")