1、vp策略-v2

This commit is contained in:
2025-10-06 22:13:38 +08:00
parent 4712565749
commit 9358dba814
10 changed files with 2229 additions and 9205 deletions

File diff suppressed because one or more lines are too long

View File

@@ -276,13 +276,3 @@ class ValueMigrationStrategy(Strategy):
)
self.send_order(order)
def send_market_order(self, direction: str, volume: int, offset: str = "CLOSE"):
# ... (与之前版本相同) ...
order_id = f"{self.symbol}_{direction}_{offset}_{self.get_current_time().strftime('%Y%m%d%H%M%S')}_{self.order_id_counter}"
self.order_id_counter += 1
order = Order(
id=order_id, symbol=self.symbol, direction=direction, volume=volume,
price_type="MARKET", submitted_time=self.get_current_time(), offset=offset
)
self.send_order(order)

View File

@@ -0,0 +1,315 @@
# =====================================================================================
# 以下是新增的 ValueMigrationStrategy 策略代码
# =====================================================================================
from collections import deque
from datetime import timedelta, time
import numpy as np
import pandas as pd
from typing import List, Any, Optional, Dict
import talib
from src.core_data import Bar, Order
from src.strategies.ValueMigrationStrategy.data_class import ProfileStats, calculate_profile_from_bars
from src.strategies.base_strategy import Strategy
# = ===================================================================
# 全局辅助函数 (Global Helper Functions)
# 将这些函数放在文件顶部,以便所有策略类都能调用
# =====================================================================
def compute_price_volume_distribution(bars: List[Bar], tick_size: float) -> Optional[pd.Series]:
"""
[全局函数] 从K线数据中计算出原始的价格-成交量分布。
"""
if not bars:
return None
data = []
# 为了性能我们只处理有限数量的bars防止内存问题
# 在实际应用中,更高效的实现是必要的
for bar in bars[-500:]: # 添加一个安全限制
price_range = np.arange(bar.low, bar.high + tick_size, tick_size)
if len(price_range) == 0 or bar.volume == 0: continue
# 将成交量近似分布到K线覆盖的每个tick上
volume_per_tick = bar.volume / len(price_range)
for price in price_range:
data.append({'price': price, 'volume': volume_per_tick})
if not data:
return None
df = pd.DataFrame(data)
if df.empty:
return None
return df.groupby('price')['volume'].sum().sort_index()
# 确保在文件顶部导入
from scipy.signal import find_peaks
def find_hvns_with_distance(price_volume_dist: pd.Series, distance_in_ticks: int) -> List[float]:
"""
[全局函数] 使用峰值查找算法根据峰值间的最小距离来识别HVNs。
Args:
price_volume_dist: 价格-成交量分布序列。
distance_in_ticks: 两个HVN之间必须间隔的最小tick数量。
Returns:
一个包含所有被识别出的HVN价格的列表。
"""
if price_volume_dist.empty or len(price_volume_dist) < 3:
return []
# distance参数确保找到的峰值之间至少相隔N个点
peaks_indices, _ = find_peaks(price_volume_dist.values, distance=distance_in_ticks)
if len(peaks_indices) == 0:
return [price_volume_dist.idxmax()] # 默认返回POC
hvn_prices = price_volume_dist.index[peaks_indices].tolist()
return hvn_prices
def find_hvns_strict(price_volume_dist: pd.Series, window_radius: int) -> List[float]:
"""
[全局函数] 使用严格的“滚动窗口最大值”定义来识别HVNs。
一个点是HVN当且仅当它的成交量大于其左右各 `window_radius` 个点的成交量。
Args:
price_volume_dist: 价格-成交量分布序列。
window_radius: 定义了检查窗口的半径 (即您所说的 N)。
Returns:
一个包含所有被识别出的HVN价格的列表。
"""
if price_volume_dist.empty or window_radius == 0:
return [price_volume_dist.idxmax()] if not price_volume_dist.empty else []
# 1. 确保价格序列是连续的用0填充缺失的ticks
full_price_range = np.arange(price_volume_dist.index.min(),
price_volume_dist.index.max() + price_volume_dist.index.to_series().diff().min(),
price_volume_dist.index.to_series().diff().min())
continuous_dist = price_volume_dist.reindex(full_price_range, fill_value=0)
# 2. 计算滚动窗口最大值
window_size = 2 * window_radius + 1
rolling_max = continuous_dist.rolling(window=window_size, center=True).max()
# 3. 找到那些自身成交量就等于其窗口最大值的点
is_hvn = (continuous_dist == rolling_max) & (continuous_dist > 0)
hvn_prices = continuous_dist[is_hvn].index.tolist()
# 4. 处理平顶山如果连续多个点都是HVN只保留中间那个
if not hvn_prices:
return [price_volume_dist.idxmax()] # 如果找不到返回POC
final_hvns = []
i = 0
while i < len(hvn_prices):
# 找到一个连续HVN块
j = i
while j + 1 < len(hvn_prices) and (hvn_prices[j + 1] - hvn_prices[j]) < (
2 * price_volume_dist.index.to_series().diff().min()):
j += 1
# 取这个连续块的中间点
middle_index = i + (j - i) // 2
final_hvns.append(hvn_prices[middle_index])
i = j + 1
return final_hvns
# 确保在文件顶部导入
from scipy.signal import find_peaks
# =====================================================================================
# 以下是V2版本的、简化了状态管理的 HVNPullbackStrategy 代码
# =====================================================================================
class ValueMigrationStrategy(Strategy):
"""
一个基于动态HVN突破后回测的量化交易策略。(V2: 简化状态管理)
V2版本简化了内部状态管理移除了基于order_id的复杂元数据传递
使用更直接、更健壮的单一状态变量来处理挂单的止盈止损参数,
完美适配“单次单持仓”的策略逻辑。
"""
def __init__(
self,
context: Any,
main_symbol: str,
enable_log: bool,
trade_volume: int,
tick_size: float = 1,
profile_period: int = 100,
recalc_interval: int = 4,
hvn_distance_ticks: int = 20,
entry_offset_atr: float = 0.0,
stop_loss_atr: float = 1.0,
take_profit_atr: float = 2.0,
atr_period: int = 14,
order_direction=None,
indicators=[None, None],
):
super().__init__(context, main_symbol, enable_log)
if order_direction is None:
order_direction = ['BUY', 'SELL']
self.trade_volume = trade_volume
self.tick_size = tick_size
self.profile_period = profile_period
self.recalc_interval = recalc_interval
self.hvn_distance_ticks = hvn_distance_ticks
self.entry_offset_atr = entry_offset_atr
self.stop_loss_atr = stop_loss_atr
self.take_profit_atr = take_profit_atr
self.atr_period = atr_period
self.order_direction = order_direction
self.indicator_long = indicators[0]
self.indicator_short = indicators[1]
self.main_symbol = main_symbol
self.order_id_counter = 0
self._bar_counter = 0
self._cached_hvns: List[float] = []
# --- V2: 简化的状态管理 ---
self._pending_sl_price: Optional[float] = None
self._pending_tp_price: Optional[float] = None
def on_open_bar(self, open_price: float, symbol: str):
self.symbol = symbol
self._bar_counter += 1
bar_history = self.get_bar_history()
required_len = max(self.profile_period, self.atr_period) + 1
if len(bar_history) < required_len:
return
# --- 1. 取消所有挂单并重置挂单状态 ---
self.cancel_all_pending_orders(self.symbol)
# self._pending_sl_price = None
# self._pending_tp_price = None
# --- 2. 管理现有持仓 ---
position_volume = self.get_current_positions().get(self.symbol, 0)
if position_volume != 0:
self.manage_open_position(position_volume, open_price)
return
# --- 3. 周期性地计算HVNs ---
if self._bar_counter % self.recalc_interval == 1:
profile_bars = bar_history[-self.profile_period:]
dist = compute_price_volume_distribution(profile_bars, self.tick_size)
if dist is not None and not dist.empty:
# self._cached_hvns = find_hvns_with_distance(dist, self.hvn_distance_ticks)
self._cached_hvns = find_hvns_strict(dist, self.hvn_distance_ticks)
self.log(f"New HVNs identified at: {[f'{p:.2f}' for p in self._cached_hvns]}")
if not self._cached_hvns: return
# --- 4. 评估新机会 (挂单逻辑) ---
self.evaluate_entry_signal(bar_history)
def manage_open_position(self, volume: int, current_price: float):
"""主动管理已开仓位的止盈止损。"""
# # [V2 关键逻辑]: 检测是否为新持仓
# # 如果这是一个新持仓,并且我们有预设的止盈止损,就将其存入
# if self._pending_sl_price is not None and self._pending_tp_price is not None:
# meta = {'sl_price': self._pending_sl_price, 'tp_price': self._pending_tp_price}
# self.position_meta = meta
# self.log(f"新持仓确认。已设置TP/SL: {meta}")
# else:
# # 这种情况理论上不应发生,但作为保护
# self.log("Error: New position detected but no pending TP/SL values found.")
# self.close_position("CLOSE_LONG" if volume > 0 else "CLOSE_SHORT", abs(volume))
# return
# [常规逻辑]: 检查止盈止损
sl_price = self._pending_sl_price
tp_price = self._pending_tp_price
if volume > 0: # 多头
if current_price <= sl_price or current_price >= tp_price:
action = "止损" if current_price <= sl_price else "止盈"
self.log(f"多头{action}触发 at {current_price:.2f}")
self.close_position("CLOSE_LONG", abs(volume))
elif volume < 0: # 空头
if current_price >= sl_price or current_price <= tp_price:
action = "止损" if current_price >= sl_price else "止盈"
self.log(f"空头{action}触发 at {current_price:.2f}")
self.close_position("CLOSE_SHORT", abs(volume))
def evaluate_entry_signal(self, bar_history: List[Bar]):
prev_close = bar_history[-2].close
current_close = bar_history[-1].close
highs = np.array([b.high for b in bar_history], dtype=float)
lows = np.array([b.low for b in bar_history], dtype=float)
closes = np.array([b.close for b in bar_history], dtype=float)
current_atr = talib.ATR(highs, lows, closes, self.atr_period)[-1]
if current_atr < self.tick_size: return
for hvn in sorted(self._cached_hvns):
# (为了简洁,买卖逻辑合并)
direction = None
if "BUY" in self.order_direction and (prev_close < hvn < current_close):
direction = "BUY"
pass_filter = self.indicator_long is None or self.indicator_long.is_condition_met(
*self.get_indicator_tuple())
elif "SELL" in self.order_direction and (prev_close > hvn > current_close):
direction = "SELL"
pass_filter = self.indicator_short is None or self.indicator_short.is_condition_met(
*self.get_indicator_tuple())
else:
continue # 没有触发穿越
if direction and pass_filter:
offset = self.entry_offset_atr * current_atr
limit_price = hvn + offset if direction == "BUY" else hvn - offset
self.log(f"价格穿越HVN({hvn:.2f}). 在 {limit_price:.2f} 挂限价{direction}单。")
self.send_hvn_limit_order(direction, limit_price, current_atr)
return # 每次只挂一个单
def send_hvn_limit_order(self, direction: str, limit_price: float, entry_atr: float):
# [V2 关键逻辑]: 直接更新实例变量
self._pending_sl_price = limit_price - self.stop_loss_atr * entry_atr if direction == "BUY" else limit_price + self.stop_loss_atr * entry_atr
self._pending_tp_price = limit_price + self.take_profit_atr * entry_atr if direction == "BUY" else limit_price - self.take_profit_atr * entry_atr
order_id = f"{self.symbol}_{direction}_LIMIT_{self.order_id_counter}"
self.order_id_counter += 1
order = Order(
id=order_id, symbol=self.symbol, direction=direction, volume=self.trade_volume,
price_type="LIMIT", limit_price=limit_price, submitted_time=self.get_current_time(),
offset="OPEN"
)
self.send_order(order)
def close_position(self, direction: str, volume: int):
self.send_market_order(direction, volume)
def send_market_order(self, direction: str, volume: int, offset: str = "CLOSE"):
order_id = f"{self.symbol}_{direction}_{offset}_{self.get_current_time().strftime('%Y%m%d%H%M%S')}_{self.order_id_counter}"
self.order_id_counter += 1
order = Order(
id=order_id, symbol=self.symbol, direction=direction, volume=volume,
price_type="MARKET", submitted_time=self.get_current_time(), offset=offset
)
self.send_order(order)

View File

@@ -0,0 +1,317 @@
# =====================================================================================
# 以下是新增的 ValueMigrationStrategy 策略代码
# =====================================================================================
from collections import deque
from datetime import timedelta, time
import numpy as np
import pandas as pd
from typing import List, Any, Optional, Dict
import talib
from src.core_data import Bar, Order
from src.strategies.ValueMigrationStrategy.data_class import ProfileStats, calculate_profile_from_bars
from src.strategies.base_strategy import Strategy
# = ===================================================================
# 全局辅助函数 (Global Helper Functions)
# 将这些函数放在文件顶部,以便所有策略类都能调用
# =====================================================================
def compute_price_volume_distribution(bars: List[Bar], tick_size: float) -> Optional[pd.Series]:
"""
[全局函数] 从K线数据中计算出原始的价格-成交量分布。
"""
if not bars:
return None
data = []
# 为了性能我们只处理有限数量的bars防止内存问题
# 在实际应用中,更高效的实现是必要的
for bar in bars[-500:]: # 添加一个安全限制
price_range = np.arange(bar.low, bar.high + tick_size, tick_size)
if len(price_range) == 0 or bar.volume == 0: continue
# 将成交量近似分布到K线覆盖的每个tick上
volume_per_tick = bar.volume / len(price_range)
for price in price_range:
data.append({'price': price, 'volume': volume_per_tick})
if not data:
return None
df = pd.DataFrame(data)
if df.empty:
return None
return df.groupby('price')['volume'].sum().sort_index()
# 确保在文件顶部导入
from scipy.signal import find_peaks
def find_hvns_with_distance(price_volume_dist: pd.Series, distance_in_ticks: int) -> List[float]:
"""
[全局函数] 使用峰值查找算法根据峰值间的最小距离来识别HVNs。
Args:
price_volume_dist: 价格-成交量分布序列。
distance_in_ticks: 两个HVN之间必须间隔的最小tick数量。
Returns:
一个包含所有被识别出的HVN价格的列表。
"""
if price_volume_dist.empty or len(price_volume_dist) < 3:
return []
# distance参数确保找到的峰值之间至少相隔N个点
peaks_indices, _ = find_peaks(price_volume_dist.values, distance=distance_in_ticks)
if len(peaks_indices) == 0:
return [price_volume_dist.idxmax()] # 默认返回POC
hvn_prices = price_volume_dist.index[peaks_indices].tolist()
return hvn_prices
# 确保在文件顶部导入
from scipy.signal import find_peaks
# =====================================================================================
# 以下是V2版本的、简化了状态管理的 HVNPullbackStrategy 代码
# =====================================================================================
# 引入必要的类型,确保代码清晰
from typing import Any, Dict, Optional, List
import numpy as np
import talib
class ValueMigrationStrategy(Strategy):
"""
一个基于动态HVN突破后回测的量化交易策略。(V3: 集成上下文状态管理)
V3版本完全集成BacktestContext的状态管理功能实现了策略重启后的状态恢复。
- 状态被简化为两个核心变量_pending_sl_price 和 _pending_tp_price。
- 在策略初始化时安全地加载状态,并兼容空状态或旧版状态。
- 在下单或平仓时立即持久化状态,确保数据一致性。
- 增加了逻辑检查,处理重启后可能出现的状态与实际持仓不一致的问题。
"""
def __init__(
self,
context: Any, # 通常会是 BacktestContext
main_symbol: str,
enable_log: bool,
trade_volume: int,
tick_size: float = 1,
profile_period: int = 100,
recalc_interval: int = 4,
hvn_distance_ticks: int = 1,
entry_offset_atr: float = 0.0,
stop_loss_atr: float = 1.0,
take_profit_atr: float = 1.0,
atr_period: int = 14,
order_direction=None,
indicators=[None, None],
):
super().__init__(context, main_symbol, enable_log)
# --- 参数初始化 (保持不变) ---
if order_direction is None:
order_direction = ['BUY', 'SELL']
self.trade_volume = trade_volume
self.tick_size = tick_size
self.profile_period = profile_period
self.recalc_interval = recalc_interval
self.hvn_distance_ticks = hvn_distance_ticks
self.entry_offset_atr = entry_offset_atr
self.stop_loss_atr = stop_loss_atr
self.take_profit_atr = take_profit_atr
self.atr_period = atr_period
self.order_direction = order_direction
self.indicator_long = indicators[0]
self.indicator_short = indicators[1]
self.main_symbol = main_symbol
self.order_id_counter = 0
self._bar_counter = 0
self._cached_hvns: List[float] = []
# --- 新增: 初始化时加载状态 ---
self._pending_sl_price: Optional[float] = None
self._pending_tp_price: Optional[float] = None
self._load_state_from_context()
def _get_state_dict(self) -> Dict[str, Any]:
"""一个辅助函数,用于生成当前需要保存的状态字典。"""
return {
"_pending_sl_price": self._pending_sl_price,
"_pending_tp_price": self._pending_tp_price,
}
def _load_state_from_context(self):
"""
[新增] 从上下文中加载状态,并进行健壮性处理。
"""
loaded_state = self.context.load_state()
if not loaded_state:
self.log("未找到历史状态,进行全新初始化。")
return
# 使用 .get() 方法安全地读取即使key不存在或state为空也不会报错。
# 这完美解决了“读取的state的key不一样”的问题。
self._pending_sl_price = loaded_state.get("_pending_sl_price")
self._pending_tp_price = loaded_state.get("_pending_tp_price")
if self._pending_sl_price is not None:
self.log(f"成功从上下文加载状态: SL={self._pending_sl_price}, TP={self._pending_tp_price}")
else:
self.log("加载的状态为空或格式不兼容,视为全新初始化。")
def on_open_bar(self, open_price: float, symbol: str):
self.symbol = symbol
self._bar_counter += 1
bar_history = self.get_bar_history()
required_len = max(self.profile_period, self.atr_period) + 1
if len(bar_history) < required_len:
return
# 取消所有挂单这符合原逻辑确保每根bar都是新的开始
self.cancel_all_pending_orders(self.symbol)
position_volume = self.get_current_positions().get(self.symbol, 0)
# --- 新增: 状态一致性检查 ---
# 场景:策略重启后,加载了之前的止盈止损状态,但发现实际上并没有持仓
# (可能因为上次平仓后、清空状态前程序就关闭了)。
# 这种情况下,状态是无效的“幽灵状态”,必须清除。
if position_volume == 0 and self._pending_sl_price is not None:
self.log("检测到状态与实际持仓不符 (有状态但无持仓),重置本地状态。")
self._pending_sl_price = None
self._pending_tp_price = None
self.context.save_state(self._get_state_dict()) # 立即同步清除后的状态
# --- 1. 管理现有持仓 (如果存在) ---
if position_volume != 0:
self.manage_open_position(position_volume, open_price)
return
# 周期性地计算HVNs
if self._bar_counter % self.recalc_interval == 1:
profile_bars = bar_history[-self.profile_period:]
dist = compute_price_volume_distribution(profile_bars, self.tick_size)
if dist is not None and not dist.empty:
self._cached_hvns = find_hvns_with_distance(dist, self.hvn_distance_ticks)
self.log(f"识别到新的高价值节点: {[f'{p:.2f}' for p in self._cached_hvns]}")
if not self._cached_hvns: return
# 评估新机会 (挂单逻辑)
self.evaluate_entry_signal(bar_history)
def manage_open_position(self, volume: int, current_price: float):
"""
[修改] 主动管理已开仓位的止盈止损。
不再使用 position_meta直接依赖实例变量。
"""
# [关键安全检查]: 如果有持仓,但却没有止盈止损状态,这是一个危险的信号。
# 可能是状态文件损坏或逻辑错误。为控制风险,应立即平仓。
if self._pending_sl_price is None or self._pending_tp_price is None:
self.log("风险警告:存在持仓但无有效的止盈止损价格,立即市价平仓!")
self.close_position("CLOSE_LONG" if volume > 0 else "CLOSE_SHORT", abs(volume))
return
sl_price = self._pending_sl_price
tp_price = self._pending_tp_price
# 止盈止损逻辑 (保持不变)
if volume > 0: # 多头
if current_price <= sl_price or current_price >= tp_price:
action = "止损" if current_price <= sl_price else "止盈"
self.log(f"多头{action}触发于 {current_price:.2f} (SL: {sl_price}, TP: {tp_price})")
self.close_position("CLOSE_LONG", abs(volume))
elif volume < 0: # 空头
if current_price >= sl_price or current_price <= tp_price:
action = "止损" if current_price >= sl_price else "止盈"
self.log(f"空头{action}触发于 {current_price:.2f} (SL: {sl_price}, TP: {tp_price})")
self.close_position("CLOSE_SHORT", abs(volume))
def evaluate_entry_signal(self, bar_history: List[Bar]):
# [修改] 在挂单前先重置旧的挂单状态虽然on_open_bar开头也做了但这里更保险
self._pending_sl_price = None
self._pending_tp_price = None
# ... 原有挂单信号计算逻辑保持不变 ...
prev_close = bar_history[-2].close
current_close = bar_history[-1].close
highs = np.array([b.high for b in bar_history], dtype=float)
lows = np.array([b.low for b in bar_history], dtype=float)
closes = np.array([b.close for b in bar_history], dtype=float)
current_atr = talib.ATR(highs, lows, closes, self.atr_period)[-1]
if current_atr < self.tick_size: return
for hvn in sorted(self._cached_hvns):
if "BUY" in self.order_direction and (prev_close < hvn < current_close):
direction = "BUY"
pass_filter = self.indicator_long is None or self.indicator_long.is_condition_met(
*self.get_indicator_tuple())
elif "SELL" in self.order_direction and (prev_close > hvn > current_close):
direction = "SELL"
pass_filter = self.indicator_short is None or self.indicator_short.is_condition_met(
*self.get_indicator_tuple())
else:
continue
if direction and pass_filter:
offset = self.entry_offset_atr * current_atr
limit_price = hvn + offset if direction == "BUY" else hvn - offset
self.log(f"价格穿越HVN({hvn:.2f})。在 {limit_price:.2f} 挂限价{direction}单。")
self.send_hvn_limit_order(direction, limit_price, current_atr)
return
def send_hvn_limit_order(self, direction: str, limit_price: float, entry_atr: float):
# 1. 设置实例的止盈止损状态
self._pending_sl_price = limit_price - self.stop_loss_atr * entry_atr if direction == "BUY" else limit_price + self.stop_loss_atr * entry_atr
self._pending_tp_price = limit_price + self.take_profit_atr * entry_atr if direction == "BUY" else limit_price - self.take_profit_atr * entry_atr
# 2. [新增] 状态已更新,立即通过上下文持久化
self.context.save_state(self._get_state_dict())
self.log(f"状态已更新并保存: SL={self._pending_sl_price}, TP={self._pending_tp_price}")
# 3. 发送订单
order_id = f"{self.symbol}_{direction}_LIMIT_{self.order_id_counter}"
self.order_id_counter += 1
order = Order(
id=order_id, symbol=self.symbol, direction=direction, volume=self.trade_volume,
price_type="LIMIT", limit_price=limit_price, submitted_time=self.get_current_time(),
offset="OPEN"
)
self.send_order(order)
def close_position(self, direction: str, volume: int):
"""[修改] 平仓时,必须清空状态并立即保存。"""
# 1. 发送平仓市价单
self.send_market_order(direction, volume)
# 2. 清空本地的止盈止损状态
self._pending_sl_price = None
self._pending_tp_price = None
# 3. [新增] 状态已清空,立即通过上下文持久化这个“空状态”
self.context.save_state(self._get_state_dict())
self.log("持仓已平,相关的止盈止损状态已清空并保存。")
def send_market_order(self, direction: str, volume: int, offset: str = "CLOSE"):
# ... 此辅助函数保持不变 ...
order_id = f"{self.symbol}_{direction}_{offset}_{self.get_current_time().strftime('%Y%m%d%H%M%S')}_{self.order_id_counter}"
self.order_id_counter += 1
order = Order(
id=order_id, symbol=self.symbol, direction=direction, volume=volume,
price_type="MARKET", submitted_time=self.get_current_time(), offset=offset
)
self.send_order(order)