Files

279 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =====================================================================================
# 以下是新增的 ValueMigrationStrategy 策略代码
# =====================================================================================
from collections import deque
from datetime import timedelta, time
import numpy as np
import pandas as pd
from typing import List, Any, Optional, Dict
import talib
from src.core_data import Bar, Order
from src.strategies.ValueMigrationStrategy.data_class import ProfileStats, calculate_profile_from_bars
from src.strategies.base_strategy import Strategy
# = ===================================================================
# 全局辅助函数 (Global Helper Functions)
# 将这些函数放在文件顶部,以便所有策略类都能调用
# =====================================================================
def compute_price_volume_distribution(bars: List[Bar], tick_size: float) -> Optional[pd.Series]:
"""
[全局函数] 从K线数据中计算出原始的价格-成交量分布。
"""
if not bars:
return None
data = []
# 为了性能我们只处理有限数量的bars防止内存问题
# 在实际应用中,更高效的实现是必要的
for bar in bars[-500:]: # 添加一个安全限制
price_range = np.arange(bar.low, bar.high + tick_size, tick_size)
if len(price_range) == 0 or bar.volume == 0: continue
# 将成交量近似分布到K线覆盖的每个tick上
volume_per_tick = bar.volume / len(price_range)
for price in price_range:
data.append({'price': price, 'volume': volume_per_tick})
if not data:
return None
df = pd.DataFrame(data)
if df.empty:
return None
return df.groupby('price')['volume'].sum().sort_index()
# 确保在文件顶部导入
from scipy.signal import find_peaks
def find_hvns_with_distance(price_volume_dist: pd.Series, distance_in_ticks: int) -> List[float]:
"""
[全局函数] 使用峰值查找算法根据峰值间的最小距离来识别HVNs。
Args:
price_volume_dist: 价格-成交量分布序列。
distance_in_ticks: 两个HVN之间必须间隔的最小tick数量。
Returns:
一个包含所有被识别出的HVN价格的列表。
"""
if price_volume_dist.empty or len(price_volume_dist) < 3:
return []
# distance参数确保找到的峰值之间至少相隔N个点
peaks_indices, _ = find_peaks(price_volume_dist.values, distance=distance_in_ticks)
if len(peaks_indices) == 0:
return [price_volume_dist.idxmax()] # 默认返回POC
hvn_prices = price_volume_dist.index[peaks_indices].tolist()
return hvn_prices
class ValueMigrationStrategy(Strategy):
# 确保在文件顶部导入
from scipy.signal import find_peaks
# =====================================================================================
# 以下是全新的、基于HVN回测逻辑的 HVNPullbackStrategy 策略代码
# =====================================================================================
"""
一个基于动态HVN突破后回测的量化交易策略。(适配无回调函数的框架)
该策略首先动态识别出市场中重要的成交量密集区(HVNs)。当价格
明确穿越一个HVN后它并不立即追逐而是预期价格会有一个短暂的
回测行为并在HVN附近的一个偏移位置挂限价单以更高概率顺势入场。
"""
def __init__(
self,
context: Any,
main_symbol: str,
enable_log: bool,
trade_volume: int,
tick_size: float = 1,
profile_period: int = 100,
recalc_interval: int = 4,
hvn_distance_ticks: int = 1,
entry_offset_atr: float = 0.2,
stop_loss_atr: float = 1.0,
take_profit_atr: float = 1.0,
atr_period: int = 14,
order_direction=None,
indicators=[None, None],
):
super().__init__(context, main_symbol, enable_log)
if order_direction is None:
order_direction = ['BUY', 'SELL']
self.trade_volume = trade_volume
self.tick_size = tick_size
self.profile_period = profile_period
self.recalc_interval = recalc_interval
self.hvn_distance_ticks = hvn_distance_ticks
self.entry_offset_atr = entry_offset_atr
self.stop_loss_atr = stop_loss_atr
self.take_profit_atr = take_profit_atr
self.atr_period = atr_period
self.order_direction = order_direction
self.indicator_long = indicators[0]
self.indicator_short = indicators[1]
self.main_symbol = main_symbol
self.order_id_counter = 0
self._bar_counter = 0
self._cached_hvns: List[float] = []
self._last_order_id: Optional[str] = None
# 元数据存储:
self.position_meta: Dict[str, Any] = {} # 存储已成交持仓的止盈止损
self._pending_order_meta: Dict[str, Any] = {} # 存储未成交挂单的预设参数
def on_open_bar(self, open_price: float, symbol: str):
self.symbol = symbol
self._bar_counter += 1
bar_history = self.get_bar_history()
required_len = max(self.profile_period, self.atr_period) + 1
if len(bar_history) < required_len:
return
# # --- 1. 取消上一根K线未成交的限价单 ---
# if self._last_order_id and self._last_order_id in self.get_pending_orders():
# self.cancel_order(self._last_order_id)
# self.log(f"已取消上一根K线的挂单: {self._last_order_id}")
# # 如果挂单被取消,清除对应的预设元数据
# if self._last_order_id in self._pending_order_meta:
# del self._pending_order_meta[self._last_order_id]
# self._last_order_id = None
self.cancel_all_pending_orders(self.symbol)
# --- 2. 管理现有持仓 (逻辑核心调整) ---
position_volume = self.get_current_positions().get(self.symbol, 0)
if position_volume != 0:
self.manage_open_position(position_volume, open_price)
return # 有持仓则不进行新的开仓评估
# --- 3. 周期性地计算并缓存所有的HVNs ---
if self._bar_counter % self.recalc_interval == 1:
profile_bars = bar_history[-self.profile_period:]
dist = compute_price_volume_distribution(profile_bars, self.tick_size)
if dist is not None and not dist.empty:
self._cached_hvns = find_hvns_with_distance(dist, self.hvn_distance_ticks)
self.log(f"New HVNs identified at: {[f'{p:.2f}' for p in self._cached_hvns]}")
if not self._cached_hvns: return
# --- 4. 评估新机会 (穿越后挂单逻辑) ---
self.evaluate_entry_signal(bar_history)
def manage_open_position(self, volume: int, current_price: float):
"""在on_open_bar中主动管理已开仓位的止盈止损。"""
# [关键逻辑]: 检测是否为新成交的持仓
if self.symbol not in self.position_meta:
# 这是一个新持仓。我们必须从挂单的元数据中恢复止盈止损参数。
# 这里假设只有一个挂单能成交。如果有多个,需要更复杂的匹配逻辑。
if not self._pending_order_meta:
self.log("Error: New position detected but no pending order meta found.")
# 紧急情况:立即平仓或设置默认止损
return
# 从挂单元数据中获取参数,并“过户”到持仓元数据
# 由于我们每次只挂一个单,取第一个即可
order_id = next(iter(self._pending_order_meta))
meta = self._pending_order_meta.pop(order_id) # 取出并从pending中删除
self.position_meta[self.symbol] = meta
self.log(f"新持仓确认。已设置TP/SL: {meta}")
# [常规逻辑]: 检查止盈止损
meta = self.position_meta[self.symbol]
sl_price = meta['sl_price']
tp_price = meta['tp_price']
if volume > 0: # 多头
if current_price <= sl_price:
self.log(f"多头止损触发 at {current_price:.2f}")
self.close_position("CLOSE_LONG", abs(volume))
elif current_price >= tp_price:
self.log(f"多头止盈触发 at {current_price:.2f}")
self.close_position("CLOSE_LONG", abs(volume))
elif volume < 0: # 空头
if current_price >= sl_price:
self.log(f"空头止损触发 at {current_price:.2f}")
self.close_position("CLOSE_SHORT", abs(volume))
elif current_price <= tp_price:
self.log(f"空头止盈触发 at {current_price:.2f}")
self.close_position("CLOSE_SHORT", abs(volume))
def evaluate_entry_signal(self, bar_history: List[Bar]):
prev_close = bar_history[-2].close
current_close = bar_history[-1].close
highs = np.array([b.high for b in bar_history], dtype=float)
lows = np.array([b.low for b in bar_history], dtype=float)
closes = np.array([b.close for b in bar_history], dtype=float)
current_atr = talib.ATR(highs, lows, closes, self.atr_period)[-1]
if current_atr < self.tick_size: return
for hvn in sorted(self._cached_hvns):
if "BUY" in self.order_direction and (prev_close < hvn < current_close):
if self.indicator_long is None or self.indicator_long.is_condition_met(*self.get_indicator_tuple()):
limit_price = hvn + self.entry_offset_atr * current_atr
self.log(f"价格向上穿越HVN({hvn:.2f}). 在 {limit_price:.2f} 挂限价买单。")
self.send_hvn_limit_order("BUY", limit_price, current_atr)
return
if "SELL" in self.order_direction and (prev_close > hvn > current_close):
if self.indicator_short is None or self.indicator_short.is_condition_met(
*self.get_indicator_tuple()):
limit_price = hvn - self.entry_offset_atr * current_atr
self.log(f"价格向下穿越HVN({hvn:.2f}). 在 {limit_price:.2f} 挂限价卖单。")
self.send_hvn_limit_order("SELL", limit_price, current_atr)
return
def send_hvn_limit_order(self, direction: str, limit_price: float, entry_atr: float):
# 预先计算止盈止损价格
sl_price = limit_price - self.stop_loss_atr * entry_atr if direction == "BUY" else limit_price + self.stop_loss_atr * entry_atr
tp_price = limit_price + self.take_profit_atr * entry_atr if direction == "BUY" else limit_price - self.take_profit_atr * entry_atr
order_id = f"{self.symbol}_{direction}_LIMIT_{self.order_id_counter}"
self.order_id_counter += 1
# 将这些参数存储到 pending_order_meta 中
self._pending_order_meta[order_id] = {'sl_price': sl_price, 'tp_price': tp_price}
order = Order(
id=order_id, symbol=self.symbol, direction=direction, volume=self.trade_volume,
price_type="LIMIT", limit_price=limit_price, submitted_time=self.get_current_time(),
offset="OPEN"
)
sent_order = self.send_order(order)
if sent_order:
self._last_order_id = sent_order.id
def close_position(self, direction: str, volume: int):
self.send_market_order(direction, volume)
if self.symbol in self.position_meta:
del self.position_meta[self.symbol] # 平仓后清理持仓元数据
def send_market_order(self, direction: str, volume: int, offset: str = "CLOSE"):
order_id = f"{self.symbol}_{direction}_{offset}_{self.get_current_time().strftime('%Y%m%d%H%M%S')}_{self.order_id_counter}"
self.order_id_counter += 1
order = Order(
id=order_id, symbol=self.symbol, direction=direction, volume=volume,
price_type="MARKET", submitted_time=self.get_current_time(), offset=offset
)
self.send_order(order)