fix:修复实盘缺少早盘第一根k线的bug

This commit is contained in:
2026-04-12 00:59:32 +08:00
parent a6aced2308
commit fd0708ecb8

View File

@@ -5,6 +5,7 @@ from datetime import date, datetime, timedelta
from typing import Literal, Type, Dict, Any, List, Optional from typing import Literal, Type, Dict, Any, List, Optional
import pandas as pd import pandas as pd
import time import time
import math
# 导入你提供的 core_data 中的类型 # 导入你提供的 core_data 中的类型
from src.common_utils import ( from src.common_utils import (
@@ -43,30 +44,17 @@ class TqsdkEngine:
""" """
def __init__( def __init__(
self, self,
strategy_class: Type[Strategy], strategy_class: Type[Strategy],
strategy_params: Dict[str, Any], strategy_params: Dict[str, Any],
api: TqApi, api: TqApi,
roll_over_mode: bool = False, # 是否开启换月模式检测 roll_over_mode: bool = False, # 是否开启换月模式检测
symbol: str = None, symbol: str = None,
duration_seconds: int = 1, duration_seconds: int = 1,
history_length: int = 50, history_length: int = 50,
close_bar_delta: timedelta = None, close_bar_delta: timedelta = None,
): ):
""" """初始化 Tqsdk 回测引擎。"""
初始化 Tqsdk 回测引擎。
Args:
strategy_class (Type[Strategy]): 策略类。
strategy_params (Dict[str, Any]): 传递给策略的参数字典。
data_path (str): 本地 K 线数据文件路径,用于 TqSim 加载。
initial_capital (float): 初始资金。
slippage_rate (float): 交易滑点率(在 Tqsdk 中通常需要手动实现或通过费用设置)。
commission_rate (float): 交易佣金率(在 Tqsdk 中通常需要手动实现或通过费用设置)。
roll_over_mode (bool): 是否启用换月检测。
start_time (Optional[datetime]): 回测开始时间。
end_time (Optional[datetime]): 回测结束时间。
"""
self.strategy_class = strategy_class self.strategy_class = strategy_class
self.strategy_params = strategy_params self.strategy_params = strategy_params
self.roll_over_mode = roll_over_mode self.roll_over_mode = roll_over_mode
@@ -74,37 +62,23 @@ class TqsdkEngine:
self.close_bar_delta = close_bar_delta self.close_bar_delta = close_bar_delta
self.next_close_time = None self.next_close_time = None
# Tqsdk API 和模拟器
# 这里使用 file_path 参数指定本地数据文件
self._api: TqApi = api self._api: TqApi = api
# 从策略参数中获取主symbolTqsdkContext 需要知道它
# self.symbol: str = strategy_params.get("symbol")
# if not self.symbol:
# raise ValueError("strategy_params 必须包含 'symbol' 字段")
self.symbol = symbol self.symbol = symbol
self.product_id = self.symbol.split("@")[1] self.product_id = self.symbol.split("@")[1]
self.is_checked_rollover = False self.is_checked_rollover = False
# 获取 K 线数据Tqsdk 自动处理)
# 这里假设策略所需 K 线周期在 strategy_params 中否则默认60秒1分钟K线
self.bar_duration_seconds: int = strategy_params.get("bar_duration_seconds", 60) self.bar_duration_seconds: int = strategy_params.get("bar_duration_seconds", 60)
# self._main_kline_serial = self._api.get_kline_serial(
# self.symbol, self.bar_duration_seconds
# )
# 初始化上下文 # 初始化上下文
identifier = generate_strategy_identifier(strategy_class, strategy_params) identifier = generate_strategy_identifier(strategy_class, strategy_params)
self._context: TqsdkContext = TqsdkContext( self._context: TqsdkContext = TqsdkContext(
api=self._api, state_repository=JsonFileStateRepository(identifier) api=self._api, state_repository=JsonFileStateRepository(identifier)
) # 实例化策略,并将上下文传递给它 )
self._strategy: Strategy = self.strategy_class( self._strategy: Strategy = self.strategy_class(
context=self._context, **self.strategy_params context=self._context, **self.strategy_params
) )
self._context.set_engine( self._context.set_engine(self)
self
) # 将引擎自身传递给上下文,以便 Context 可以访问引擎属性
self.portfolio_snapshots: List[PortfolioSnapshot] = [] self.portfolio_snapshots: List[PortfolioSnapshot] = []
self.trade_history: List[Trade] = [] self.trade_history: List[Trade] = []
@@ -120,8 +94,8 @@ class TqsdkEngine:
self._is_rollover_bar: bool = False # 换月信号 self._is_rollover_bar: bool = False # 换月信号
self._last_underlying_symbol = self.symbol # 用于检测主力合约换月 self._last_underlying_symbol = self.symbol # 用于检测主力合约换月
# 获取行情与K线序列
self.now = None self.now = None
self.quote = None
self.quote = api.get_quote(symbol) self.quote = api.get_quote(symbol)
self.klines = api.get_kline_serial( self.klines = api.get_kline_serial(
self.quote.underlying_symbol, self.quote.underlying_symbol,
@@ -132,7 +106,6 @@ class TqsdkEngine:
self.partial_bar: Bar = None self.partial_bar: Bar = None
self.kline_row = None self.kline_row = None
self.target_pos_dict = {} self.target_pos_dict = {}
# 边界检测状态记录上一根bar结束时是否有持仓 # 边界检测状态记录上一根bar结束时是否有持仓
@@ -142,39 +115,34 @@ class TqsdkEngine:
@property @property
def is_rollover_bar(self) -> bool: def is_rollover_bar(self) -> bool:
""" """属性:判断当前 K 线是否为换月 K 线(即检测到主力合约切换)。"""
属性:判断当前 K 线是否为换月 K 线(即检测到主力合约切换)。
"""
return self._is_rollover_bar return self._is_rollover_bar
def _process_queued_requests(self): def _process_queued_requests(self):
""" """异步处理 Context 中排队的订单和取消请求。"""
异步处理 Context 中排队的订单和取消请求。
"""
# 处理订单 # 处理订单
while self._context.order_queue: while self._context.order_queue:
order_to_send: Order = self._context.order_queue.popleft() order_to_send: Order = self._context.order_queue.popleft()
print(f"Engine: 处理订单请求: {order_to_send}") print(f"Engine: 处理订单请求: {order_to_send}")
# 映射 core_data.Order 到 Tqsdk 的订单参数
tqsdk_direction = "" tqsdk_direction = ""
tqsdk_offset = "" tqsdk_offset = ""
if order_to_send.direction == "BUY": if order_to_send.direction == "BUY":
tqsdk_direction = "BUY" tqsdk_direction = "BUY"
tqsdk_offset = order_to_send.offset or "OPEN" # 默认开仓 tqsdk_offset = order_to_send.offset or "OPEN"
elif order_to_send.direction == "SELL": elif order_to_send.direction == "SELL":
tqsdk_direction = "SELL" tqsdk_direction = "SELL"
tqsdk_offset = order_to_send.offset or "OPEN" # 默认开仓 tqsdk_offset = order_to_send.offset or "OPEN"
elif order_to_send.direction == "CLOSE_LONG": elif order_to_send.direction == "CLOSE_LONG":
tqsdk_direction = "SELL" tqsdk_direction = "SELL"
tqsdk_offset = order_to_send.offset or "CLOSE" # 平多,默认平仓 tqsdk_offset = order_to_send.offset or "CLOSE"
elif order_to_send.direction == "CLOSE_SHORT": elif order_to_send.direction == "CLOSE_SHORT":
tqsdk_direction = "BUY" tqsdk_direction = "BUY"
tqsdk_offset = order_to_send.offset or "CLOSE" # 平空,默认平仓 tqsdk_offset = order_to_send.offset or "CLOSE"
else: else:
print(f"Engine: 未知订单方向: {order_to_send.direction}") print(f"Engine: 未知订单方向: {order_to_send.direction}")
continue # 跳过此订单 continue
if "SHFE" in order_to_send.symbol: if "SHFE" in order_to_send.symbol:
tqsdk_offset = "OPEN" tqsdk_offset = "OPEN"
@@ -194,10 +162,7 @@ class TqsdkEngine:
self.target_pos_dict[order_to_send.symbol] = TargetPosTask( self.target_pos_dict[order_to_send.symbol] = TargetPosTask(
self._api, order_to_send.symbol self._api, order_to_send.symbol
) )
self.target_pos_dict[order_to_send.symbol].set_target_volume(target_volume)
self.target_pos_dict[order_to_send.symbol].set_target_volume(
target_volume
)
else: else:
try: try:
tq_order = self._api.insert_order( tq_order = self._api.insert_order(
@@ -205,11 +170,9 @@ class TqsdkEngine:
direction=tqsdk_direction, direction=tqsdk_direction,
offset=tqsdk_offset, offset=tqsdk_offset,
volume=order_to_send.volume, volume=order_to_send.volume,
# Tqsdk 市价单 limit_price 设为 None限价单则传递价格
limit_price=( limit_price=(
order_to_send.limit_price order_to_send.limit_price
if order_to_send.price_type == "LIMIT" if order_to_send.price_type == "LIMIT"
# else self.quote.bid_price1 + (1 if tqsdk_direction == "BUY" else -1)
else ( else (
self.quote.bid_price1 self.quote.bid_price1
if tqsdk_direction == "SELL" if tqsdk_direction == "SELL"
@@ -217,19 +180,13 @@ class TqsdkEngine:
) )
), ),
) )
# 更新原始 Order 对象与 Tqsdk 的订单ID和状态
order_to_send.id = tq_order.order_id order_to_send.id = tq_order.order_id
# order_to_send.order_id = tq_order.order_id
# order_to_send.status = tq_order.status
order_to_send.submitted_time = pd.to_datetime( order_to_send.submitted_time = pd.to_datetime(
tq_order.insert_date_time, unit="ns", utc=True tq_order.insert_date_time, unit="ns", utc=True
) )
self._api.wait_update()
self._api.wait_update() # 等待一次更新
except Exception as e: except Exception as e:
print(f"Engine: 发送订单 {order_to_send.id} 失败: {e}") print(f"Engine: 发送订单 {order_to_send.id} 失败: {e}")
# order_to_send.status = "ERROR"
# 处理取消请求 # 处理取消请求
while self._context.cancel_queue: while self._context.cancel_queue:
@@ -239,225 +196,166 @@ class TqsdkEngine:
if tq_order_to_cancel and tq_order_to_cancel.status == "ALIVE": if tq_order_to_cancel and tq_order_to_cancel.status == "ALIVE":
try: try:
self._api.cancel_order(tq_order_to_cancel) self._api.cancel_order(tq_order_to_cancel)
self._api.wait_update() # 等待取消确认 self._api.wait_update()
print( print(f"Engine: 订单 {order_id_to_cancel} 已尝试取消。当前状态: {tq_order_to_cancel.status}")
f"Engine: 订单 {order_id_to_cancel} 已尝试取消。当前状态: {tq_order_to_cancel.status}"
)
except Exception as e: except Exception as e:
print(f"Engine: 取消订单 {order_id_to_cancel} 失败: {e}") print(f"Engine: 取消订单 {order_id_to_cancel} 失败: {e}")
else: else:
print( print(f"Engine: 订单 {order_id_to_cancel} 不存在或已非活动状态,无法取消。")
f"Engine: 订单 {order_id_to_cancel} 不存在或已非活动状态,无法取消。"
)
def _record_portfolio_snapshot(self, current_time: datetime): def _record_portfolio_snapshot(self, current_time: datetime):
""" """记录当前投资组合的快照。"""
记录当前投资组合的快照。
"""
account: TqAccount = self._api.get_account() account: TqAccount = self._api.get_account()
current_positions = self._context.get_current_positions() current_positions = self._context.get_current_positions()
# 计算当前持仓市值
total_market_value = 0.0 total_market_value = 0.0
current_prices: Dict[str, float] = {} current_prices: Dict[str, float] = {}
for symbol, qty in current_positions.items(): for symbol, qty in current_positions.items():
# 获取当前合约的最新价格
quote = self._api.get_quote(symbol) quote = self._api.get_quote(symbol)
if quote.last_price: # 确保价格是最近的 if quote.last_price:
price = quote.last_price price = quote.last_price
current_prices[symbol] = price current_prices[symbol] = price
total_market_value += ( total_market_value += price * qty * quote.volume_multiple
price * qty * quote.volume_multiple
) # volume_multiple 乘数
else: else:
# 如果没有最新价格使用最近的K线收盘价作为估算
# 在实盘或连续回测中,通常会有最新的行情
print(f"警告: 未获取到 {symbol} 最新价格,可能影响净值计算。") print(f"警告: 未获取到 {symbol} 最新价格,可能影响净值计算。")
# 可以尝试从 K 线获取最近价格
kline = self._api.get_kline_serial(symbol, self.bar_duration_seconds) kline = self._api.get_kline_serial(symbol, self.bar_duration_seconds)
if not kline.empty: if not kline.empty:
last_kline = kline.iloc[-2] last_kline = kline.iloc[-2]
price = last_kline.close price = last_kline.close
current_prices[symbol] = price current_prices[symbol] = price
total_market_value += ( total_market_value += price * qty * self._api.get_instrument(symbol).volume_multiple
price * qty * self._api.get_instrument(symbol).volume_multiple
) # 使用 instrument 的乘数
total_value = (
account.available + account.frozen_margin + total_market_value
) # Tqsdk 的 balance 已包含持仓市值和冻结资金
# Tqsdk 的 total_profit/balance 已经包含了所有盈亏和资金
snapshot = PortfolioSnapshot( snapshot = PortfolioSnapshot(
datetime=current_time, datetime=current_time,
total_value=account.balance, # Tqsdk 的 balance 包含了可用资金、冻结保证金和持仓市值 total_value=account.balance,
cash=account.available, cash=account.available,
positions=current_positions, positions=current_positions,
price_at_snapshot=current_prices, price_at_snapshot=current_prices,
) )
self.portfolio_snapshots.append(snapshot) self.portfolio_snapshots.append(snapshot)
def _close_all_positions_at_end(self):
"""
回测结束时,平掉所有剩余持仓。
"""
current_positions = self._context.get_current_positions()
if not current_positions:
print("回测结束:没有需要平仓的持仓。")
return
print("回测结束:开始平仓所有剩余持仓...")
for symbol, qty in current_positions.items():
order_direction: Literal["BUY", "SELL"]
if qty > 0: # 多头持仓,卖出平仓
order_direction = "SELL"
else: # 空头持仓,买入平仓
order_direction = "BUY"
TargetPosTask(self._api, symbol).set_target_volume(0)
# # 使用市价单快速平仓
# tq_order = self._api.insert_order(
# symbol=symbol,
# direction=order_direction,
# offset="CLOSE", # 平仓
# volume=abs(qty),
# limit_price=self
# )
# print(f"平仓订单已发送: {symbol} {order_direction} {abs(qty)} 手")
# 等待订单完成
# while tq_order.status == "ALIVE":
# self._api.wait_update()
# if tq_order.status == "FINISHED":
# print(f"订单 {tq_order.order_id} 平仓完成。")
# else:
# print(f"订单 {tq_order.order_id} 平仓失败或未完成,状态: {tq_order.status}")
def _run_async(self): def _run_async(self):
""" """
异步运行回测的主循环。 异步运行回测的主循环。
包含三个核心阶段:历史预热阶段、实盘状态对齐阶段、实盘轮询监听阶段。
""" """
print(f"TqsdkEngine: 开始加载历史数据加载k线数量{self.history_length}") print(f"TqsdkEngine: 开始加载历史数据加载k线数量{self.history_length}")
self._strategy.trading = False self._strategy.trading = False
self._strategy.real_trading = True self._strategy.real_trading = True
# ==============================================================================
# [阶段 1] 分析当前环境,执行历史数据预热
# ==============================================================================
is_trading_time = is_futures_trading_time() is_trading_time = is_futures_trading_time()
now_dt = datetime.now(pd.Timestamp.utcnow().tz_convert(BEIJING_TZ).tzinfo)
last_kline_dt = pd.to_datetime(self.klines.iloc[-1].datetime, unit="ns", utc=True).tz_convert(BEIJING_TZ)
for i in range(self.history_length + 1, 0 if not is_trading_time else 1, -1): # 核心漏洞修复:判断最新一根 K 线 (iloc[-1]) 是否属于“未闭合的当前/未来 K 线”
# 1. 处于交易时间内 -> iloc[-1] 是正在跳动的当根 K 线
# 2. 最新 K 线的时间戳在未来 (如 08:58 启动时天勤提前下发的 09:00 K线)
has_unclosed_bar = is_trading_time or (last_kline_dt > now_dt)
# 如果存在未闭合的当根K线历史回放只需到 iloc[-2] (即最后一根完整闭合K线)
# 否则,回放到 iloc[-1] (当前所有K线均已闭合如周末或夜盘收盘后)
warmup_stop_index = 2 if has_unclosed_bar else 1
for i in range(self.history_length + 1, warmup_stop_index - 1, -1):
kline_row = self.klines.iloc[-i] kline_row = self.klines.iloc[-i]
kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True) kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True).tz_convert(BEIJING_TZ)
kline_dt = kline_dt.tz_convert(BEIJING_TZ)
self._last_underlying_symbol = kline_row.symbol self._last_underlying_symbol = kline_row.symbol
# main 函数内部会自动暂存当前 bar 并吐出前一根 bar
self.main(kline_row, self.klines.iloc[-i - 1]) self.main(kline_row, self.klines.iloc[-i - 1])
print( print(
f"TqsdkEngine: 加载历史k线完成, bars数量:{len(self.all_bars)},last bar datetime:{self.all_bars[-1].datetime}" f"TqsdkEngine: 加载历史k线完成, bars数量:{len(self.all_bars)}, last bar datetime:{self.all_bars[-1].datetime}")
)
self._strategy.trading = True self._strategy.trading = True
self._last_underlying_symbol = self.quote.underlying_symbol self._last_underlying_symbol = self.quote.underlying_symbol
print( print(
f"TqsdkEngine: self._last_underlying_symbol:{self._last_underlying_symbol}, is_trading_time:{is_trading_time}" f"TqsdkEngine: self._last_underlying_symbol:{self._last_underlying_symbol}, is_trading_time:{is_trading_time}")
)
# 初始化边界检测状态:根据实际持仓设置(处理引擎重启情况) # ==============================================================================
# [阶段 2] 实盘状态初始化与游标对齐
# ==============================================================================
current_positions = self._context.get_current_positions() current_positions = self._context.get_current_positions()
self.prev_bar_had_position = ( self.prev_bar_had_position = (current_positions.get(self.quote.underlying_symbol, 0) != 0)
current_positions.get(self.quote.underlying_symbol, 0) != 0 print(f"TqsdkEngine: 边界检测状态初始化完成prev_bar_had_position={self.prev_bar_had_position}")
)
print(
f"TqsdkEngine: 边界检测状态初始化完成prev_bar_had_position={self.prev_bar_had_position}"
)
# 初始化策略 (如果策略有 on_init 方法)
if hasattr(self._strategy, "on_init"): if hasattr(self._strategy, "on_init"):
self._strategy.on_init() self._strategy.on_init()
new_bar = False new_bar = False
if is_trading_time: if is_trading_time:
# 盘中重启处理:直接处理当前未闭合的 K 线,恢复策略状态
if not self.is_checked_rollover: if not self.is_checked_rollover:
self._check_roll_over() self._check_roll_over()
self.is_checked_rollover = True self.is_checked_rollover = True
kline_row = self.klines.iloc[-1] kline_row = self.klines.iloc[-1]
kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True) kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True).tz_convert(BEIJING_TZ)
kline_dt = kline_dt.tz_convert(BEIJING_TZ)
print(f"TqsdkEngine: 当前是交易时间处理最新一根k线datetime:{kline_dt}") print(f"TqsdkEngine: 当前是交易时间处理最新一根k线datetime:{kline_dt}")
self._check_boundary_and_close() # 边界检测上一根bar新开仓且触及边界价则平仓 self._check_boundary_and_close()
self.main(self.klines.iloc[-1], self.klines.iloc[-2]) self.main(self.klines.iloc[-1], self.klines.iloc[-2])
new_bar = True new_bar = True
kline_row = self.klines.iloc[-1] # 游标对齐至当前正在跳动的 K 线,等待下一根
self.kline_row = kline_row self.kline_row = self.klines.iloc[-1]
else:
# 迭代 K 线数据 # 盘前或非交易时间启动:
# 使用 self._api.get_kline_serial 获取到的 K 线是 Pandas DataFrame # 若底层预生成了集合竞价K线游标必须指向上一个【真实闭合】的K线
# 直接迭代其行Bar更符合回测逻辑 # 只有这样,当 09:00:00 真正到达时,引擎才能判定 (上一根K线 != 09:00) 从而触发交易逻辑
if has_unclosed_bar:
self.kline_row = self.klines.iloc[-2]
else:
self.kline_row = self.klines.iloc[-1]
for bar in self.all_bars[-5:]: for bar in self.all_bars[-5:]:
print(bar) print(bar)
print( print(f"TqsdkEngine: 开始等待最新数据, all bars -1:{self.all_bars[-1].datetime}")
f"TqsdkEngine: 开始等待最新数据, all bars -1:{self.all_bars[-1].datetime}"
)
last_min_k = None last_min_k = None
while True:
# Tqsdk API 的 wait_update() 确保数据更新
self._api.wait_update()
# ==============================================================================
# [阶段 3] 实盘核心轮询循环
# ==============================================================================
while True:
self._api.wait_update()
self._last_underlying_symbol = self.quote.underlying_symbol self._last_underlying_symbol = self.quote.underlying_symbol
if not self.is_checked_rollover: if not self.is_checked_rollover:
self._check_roll_over() self._check_roll_over()
self.is_checked_rollover = True self.is_checked_rollover = True
if new_bar and ( # --- 处理即将收盘的情况 (Close Bar 逻辑) ---
last_min_k is None # 必须在当根K线确认开启(new_bar=True)的情况下,监控最新 1分钟线 变动
or last_min_k.datetime != self.klines_1min.iloc[-1].datetime if new_bar and (last_min_k is None or last_min_k.datetime != self.klines_1min.iloc[-1].datetime):
):
last_min_k = self.klines_1min.iloc[-1] last_min_k = self.klines_1min.iloc[-1]
if self.kline_row is not None: if self.kline_row is not None:
kline_dt = pd.to_datetime( kline_dt = pd.to_datetime(self.kline_row.datetime, unit="ns", utc=True).tz_convert(BEIJING_TZ)
self.kline_row.datetime, unit="ns", utc=True is_close_bar = is_bar_pre_close_period(kline_dt, int(self.kline_row.duration), pre_close_minutes=1)
)
kline_dt = kline_dt.tz_convert(BEIJING_TZ)
is_close_bar = is_bar_pre_close_period(
kline_dt, int(self.kline_row.duration), pre_close_minutes=1
)
if is_close_bar: if is_close_bar:
print( print(f"TqsdkEngine: close bar, kline_dt:{kline_dt}, now: {datetime.now()}")
f"TqsdkEngine: close bar, kline_dt:{kline_dt}, now: {datetime.now()}"
)
self.close_bar(self.kline_row) self.close_bar(self.kline_row)
new_bar = False new_bar = False
# if self._api.is_changing(self.klines.iloc[-1], "open"): # --- 检测新 K 线产生 ---
# print(f"TqsdkEngine: open change!, open:{self.klines.iloc[-1].open}, now: {datetime.now()}") # 如果本地记录的 kline_row 与天勤下发的最新 K 线时间不一致,说明发生了周期切换
if self.kline_row is None or self.kline_row.datetime != self.klines.iloc[-1].datetime:
if (
self.kline_row is None
or self.kline_row.datetime != self.klines.iloc[-1].datetime
):
# 到这里一定满足“整点-00/30 且秒>1”
kline_row = self.klines.iloc[-1] kline_row = self.klines.iloc[-1]
kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True) kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True).tz_convert(BEIJING_TZ)
kline_dt = kline_dt.tz_convert(BEIJING_TZ)
# 等待到整点-00 或 30 分且秒>1
now = datetime.now()
# 必须等待有真实成交量产生,防止被无效的初始化数据欺骗
while self.klines.iloc[-1].volume <= 0: while self.klines.iloc[-1].volume <= 0:
self._api.wait_update() self._api.wait_update()
# 原有逻辑:强制对齐系统时钟,等待整点/特定分钟 (针对策略的特定需求)
while True: while True:
now = datetime.now() now = datetime.now()
minute = now.minute minute = now.minute
@@ -465,28 +363,23 @@ class TqsdkEngine:
hour = now.hour hour = now.hour
if (minute % 5 == 0) and (second >= 0) and hour != 8 and hour != 20: if (minute % 5 == 0) and (second >= 0) and hour != 8 and hour != 20:
break break
# 小粒度休眠,防止 CPU 空转
self._api.wait_update() self._api.wait_update()
if ( # 二次确认:时间戳确实较上一次闭合的 Bar 发生了推进
kline_dt.hour != self.all_bars[-1].datetime.hour if (kline_dt.hour != self.all_bars[-1].datetime.hour or kline_dt.minute != self.all_bars[
or kline_dt.minute != self.all_bars[-1].datetime.minute -1].datetime.minute):
):
print( print(
f"TqsdkEngine: 新k线产生, k line datetime:{kline_dt}, now: {datetime.now()}, open: {self.klines.iloc[-1].open}" f"TqsdkEngine: 新k线产生, k line datetime:{kline_dt}, now: {datetime.now()}, open: {self.klines.iloc[-1].open}")
)
self.kline_row = self.klines.iloc[-1] self.kline_row = self.klines.iloc[-1]
self._check_boundary_and_close()
self._check_boundary_and_close() # 边界检测上一根bar新开仓且触及边界价则平仓
self.main(self.klines.iloc[-1], self.klines.iloc[-2]) self.main(self.klines.iloc[-1], self.klines.iloc[-2])
new_bar = True new_bar = True
def close_bar(self, kline_row): def close_bar(self, kline_row):
kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True) """处理 K 线即将闭合的收尾逻辑。"""
kline_dt = kline_dt.tz_convert(BEIJING_TZ) kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True).tz_convert(BEIJING_TZ)
if len(self.all_bars) > 0: if len(self.all_bars) > 0:
# 创建 core_data.Bar 对象
current_bar = Bar( current_bar = Bar(
datetime=kline_dt, datetime=kline_dt,
symbol=self._last_underlying_symbol, symbol=self._last_underlying_symbol,
@@ -503,53 +396,35 @@ class TqsdkEngine:
if self._strategy.trading is True: if self._strategy.trading is True:
self._strategy.on_close_bar(current_bar) self._strategy.on_close_bar(current_bar)
# 处理订单和取消请求
self._process_queued_requests() self._process_queued_requests()
def _check_roll_over(self, timeout_seconds: int = 120): def _check_roll_over(self, timeout_seconds: int = 120):
""" """
[最安全版] 检查并处理实盘持仓换月,此函数会阻塞直到换月成功或超时。 [最安全版] 检查并处理实盘持仓换月,此函数会阻塞直到换月成功或超时。
- 仅处理本引擎负责的品种 (self.product_id)。
- 完全忽略账户中其他品种的持仓。
Args:
timeout_seconds (int): 移仓换月的最大等待时间(秒)。
""" """
if not self._strategy.trading: if not self._strategy.trading:
return return
# 1. 获取当前市场最新的主力合约 (这是我们的目标合约)
current_dominant_symbol = self.quote.underlying_symbol current_dominant_symbol = self.quote.underlying_symbol
if not current_dominant_symbol: if not current_dominant_symbol:
# 在某些开盘瞬间可能获取不到,直接跳过本次检查
return return
# 2. 获取账户所有持仓
current_positions = self._context.get_current_positions() current_positions = self._context.get_current_positions()
if not current_positions: if not current_positions:
return return
# 3. 筛选出本引擎需要处理的、需要换月的旧合约持仓
# - 键: 旧合约代码 (e.g., "CZCE.FG605")
# - 值: 持仓数量 (e.g., 10 or -10)
old_contracts_to_rollover: Dict[str, int] = {} old_contracts_to_rollover: Dict[str, int] = {}
for pos_symbol, quantity in current_positions.items(): for pos_symbol, quantity in current_positions.items():
# 条件一: 是本引擎负责的品种 (e.g., "CZCE.FG605".startswith("CZCE.FG"))
# 条件二: 不是当前最新的主力合约
# 条件三: 有实际持仓
if ( if (
pos_symbol.startswith(self.product_id) pos_symbol.startswith(self.product_id)
and pos_symbol != current_dominant_symbol and pos_symbol != current_dominant_symbol
and quantity != 0 and quantity != 0
): ):
old_contracts_to_rollover[pos_symbol] = quantity old_contracts_to_rollover[pos_symbol] = quantity
# 如果没有需要处理的旧合约,直接返回
if not old_contracts_to_rollover: if not old_contracts_to_rollover:
return return
# 4. 如果检测到需要换月的持仓,则执行阻塞式移仓
total_target_quantity = sum(old_contracts_to_rollover.values()) total_target_quantity = sum(old_contracts_to_rollover.values())
print("=" * 70) print("=" * 70)
@@ -563,29 +438,20 @@ class TqsdkEngine:
start_time = time.monotonic() start_time = time.monotonic()
# 5. 发送所有移仓指令
# 5.1 平掉所有检测到的旧合约
for old_symbol in old_contracts_to_rollover.keys(): for old_symbol in old_contracts_to_rollover.keys():
if old_symbol not in self.target_pos_dict: if old_symbol not in self.target_pos_dict:
self.target_pos_dict[old_symbol] = TargetPosTask(self._api, old_symbol) self.target_pos_dict[old_symbol] = TargetPosTask(self._api, old_symbol)
self.target_pos_dict[old_symbol].set_target_volume(0) self.target_pos_dict[old_symbol].set_target_volume(0)
# 5.2 在新合约上建立合并后的总目标仓位
if current_dominant_symbol not in self.target_pos_dict: if current_dominant_symbol not in self.target_pos_dict:
self.target_pos_dict[current_dominant_symbol] = TargetPosTask( self.target_pos_dict[current_dominant_symbol] = TargetPosTask(
self._api, current_dominant_symbol self._api, current_dominant_symbol
) )
self.target_pos_dict[current_dominant_symbol].set_target_volume( self.target_pos_dict[current_dominant_symbol].set_target_volume(total_target_quantity)
total_target_quantity
)
print( print(f" - [移仓指令已发送] 正在处理 {len(old_contracts_to_rollover)} 个旧合约的平仓...")
f" - [移仓指令已发送] 正在处理 {len(old_contracts_to_rollover)} 个旧合约的平仓..."
)
# 6. 进入等待循环,直到所有换月操作完成或超时
while True: while True:
# 6.1 检查是否超时
if time.monotonic() - start_time > timeout_seconds: if time.monotonic() - start_time > timeout_seconds:
latest_positions = self._context.get_current_positions() latest_positions = self._context.get_current_positions()
error_msg = ( error_msg = (
@@ -598,93 +464,51 @@ class TqsdkEngine:
raise TimeoutError(error_msg) raise TimeoutError(error_msg)
self._api.wait_update() self._api.wait_update()
# 6.2 检查成功条件
latest_positions = self._context.get_current_positions() latest_positions = self._context.get_current_positions()
# 检查所有旧合约仓位是否已归零 all_old_cleared = all(latest_positions.get(s, 0) == 0 for s in old_contracts_to_rollover)
all_old_cleared = all( new_pos_correct = (latest_positions.get(current_dominant_symbol, 0) == total_target_quantity)
latest_positions.get(s, 0) == 0 for s in old_contracts_to_rollover
)
# 检查新合约仓位是否已达到目标
new_pos_correct = (
latest_positions.get(current_dominant_symbol, 0)
== total_target_quantity
)
if all_old_cleared and new_pos_correct: if all_old_cleared and new_pos_correct:
print("-" * 70) print("-" * 70)
print( print(f"TqsdkEngine ({self.product_id}): [换月成功] 移仓操作已确认完成。")
f"TqsdkEngine ({self.product_id}): [换月成功] 移仓操作已确认完成。"
)
print(f" - 所有旧合约持仓已清零。") print(f" - 所有旧合约持仓已清零。")
print( print(f" - 新合约 {current_dominant_symbol} 持仓: {total_target_quantity}")
f" - 新合约 {current_dominant_symbol} 持仓: {total_target_quantity}"
)
print("-" * 70) print("-" * 70)
# 6.3 通知策略层 (只需通知一次)
if hasattr(self._strategy, "on_rollover"): if hasattr(self._strategy, "on_rollover"):
# 传递第一个旧合约符号作为代表 representative_old_symbol = list(old_contracts_to_rollover.keys())[0]
representative_old_symbol = list(old_contracts_to_rollover.keys())[ self._strategy.on_rollover(representative_old_symbol, current_dominant_symbol)
0 break
]
self._strategy.on_rollover(
representative_old_symbol, current_dominant_symbol
)
break # 成功,跳出循环
def _check_boundary_and_close(self): def _check_boundary_and_close(self):
""" """
检查上一根bar是否新开仓且触及边界价如果是则平仓。 检查上一根bar是否新开仓且触及边界价如果是则平仓。
只在实盘循环中调用,避免预热阶段误平仓。 只在实盘循环中调用,避免预热阶段误平仓。
使用 self.klines.iloc[-2] 获取上一根K线数据。
""" """
import math
import time
from datetime import datetime
# 获取上一根K线数据
prev_kline = self.klines.iloc[-2] prev_kline = self.klines.iloc[-2]
current_positions = self._context.get_current_positions() current_positions = self._context.get_current_positions()
current_qty = current_positions.get(self._last_underlying_symbol, 0) current_qty = current_positions.get(self._last_underlying_symbol, 0)
# 检测是否是上一根bar新开仓上一根bar没有持仓但现在有持仓
if not self.prev_bar_had_position and current_qty != 0: if not self.prev_bar_had_position and current_qty != 0:
avg_price = self._context.get_average_position_price( avg_price = self._context.get_average_position_price(self._last_underlying_symbol)
self._last_underlying_symbol
)
if avg_price is not None: if avg_price is not None:
# 修复1使用 math.isclose 解决浮点数精度问题 is_long_boundary = (current_qty > 0) and math.isclose(avg_price, prev_kline.low, abs_tol=1e-5)
is_long_boundary = (current_qty > 0) and math.isclose( is_short_boundary = (current_qty < 0) and math.isclose(avg_price, prev_kline.high, abs_tol=1e-5)
avg_price, prev_kline.low, abs_tol=1e-5
)
is_short_boundary = (current_qty < 0) and math.isclose(
avg_price, prev_kline.high, abs_tol=1e-5
)
if is_long_boundary or is_short_boundary: if is_long_boundary or is_short_boundary:
direction = "CLOSE_LONG" if current_qty > 0 else "CLOSE_SHORT" direction = "CLOSE_LONG" if current_qty > 0 else "CLOSE_SHORT"
boundary_price = ( boundary_price = prev_kline.low if current_qty > 0 else prev_kline.high
prev_kline.low if current_qty > 0 else prev_kline.high
)
print("=" * 60) print("=" * 60)
print(f"🚨 [务实对齐] 触发边界防御机制!") print(f"🚨 [务实对齐] 触发边界防御机制!")
print( print(f" - 持仓方向: {'多仓' if current_qty > 0 else '空仓'} ({current_qty}手)")
f" - 持仓方向: {'多仓' if current_qty > 0 else '空仓'} ({current_qty}手)"
)
print(f" - 物理均价: {avg_price} == 上根K线极值: {boundary_price}") print(f" - 物理均价: {avg_price} == 上根K线极值: {boundary_price}")
print( print(f" - 说明: 极大概率在回测中不会成交,立即物理对齐以保护策略状态!")
f" - 说明: 极大概率在回测中不会成交,立即物理对齐以保护策略状态!"
)
print("=" * 60) print("=" * 60)
# 修复2补全 Order 必需参数,防止框架报错
close_order = Order( close_order = Order(
id=f"SYS_ALIGN_CLOSE_{int(time.time() * 1000)}", id=f"SYS_ALIGN_CLOSE_{int(time.time() * 1000)}",
symbol=self._last_underlying_symbol, symbol=self._last_underlying_symbol,
@@ -698,35 +522,30 @@ class TqsdkEngine:
self._context.send_order(close_order) self._context.send_order(close_order)
self._process_queued_requests() self._process_queued_requests()
# 修复3阻塞等待 TargetPosTask 执行完毕,防止策略复读 wait_timeout = time.monotonic() + 10
wait_timeout = time.monotonic() + 10 # 最大等待 10 秒
while True: while True:
self._api.wait_update() self._api.wait_update()
temp_qty = self._context.get_current_positions().get( temp_qty = self._context.get_current_positions().get(self._last_underlying_symbol, 0)
self._last_underlying_symbol, 0
)
if temp_qty == 0: if temp_qty == 0:
print("✅ [务实对齐] 平仓确认完成,账户已归零。") print("✅ [务实对齐] 平仓确认完成,账户已归零。")
current_qty = 0 # 同步本地状态 current_qty = 0
break break
if time.monotonic() > wait_timeout: if time.monotonic() > wait_timeout:
print( print("⚠️ [务实对齐] 警告: 平仓确认超时,策略可能陷入混乱状态!")
"⚠️ [务实对齐] 警告: 平仓确认超时,策略可能陷入混乱状态!"
)
break break
# 更新状态记录本根bar开始时的持仓情况供下一根bar检测使用
self.prev_bar_had_position = current_qty != 0 self.prev_bar_had_position = current_qty != 0
def main(self, kline_row, prev_kline_row): def main(self, kline_row, prev_kline_row):
kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True) """
kline_dt = kline_dt.tz_convert(BEIJING_TZ) 核心数据推入逻辑。
注意:此处使用了延迟机制,暂存当前传入的 kline_row并把上一根 K 线追加进历史序列。
"""
kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True).tz_convert(BEIJING_TZ)
if self.partial_bar is not None: if self.partial_bar is not None:
last_bar = Bar( last_bar = Bar(
datetime=pd.to_datetime( datetime=pd.to_datetime(prev_kline_row.datetime, unit="ns", utc=True).tz_convert(BEIJING_TZ),
prev_kline_row.datetime, unit="ns", utc=True
).tz_convert(BEIJING_TZ),
symbol=self.partial_bar.symbol, symbol=self.partial_bar.symbol,
open=prev_kline_row.open, open=prev_kline_row.open,
high=prev_kline_row.high, high=prev_kline_row.high,
@@ -748,26 +567,17 @@ class TqsdkEngine:
self.last_processed_bar = last_bar self.last_processed_bar = last_bar
self._strategy.on_open_bar(kline_row.open, self._last_underlying_symbol) self._strategy.on_open_bar(kline_row.open, self._last_underlying_symbol)
# 处理订单和取消请求
if self._strategy.trading is True: if self._strategy.trading is True:
self._process_queued_requests() self._process_queued_requests()
self.partial_bar = Bar( self.partial_bar = Bar(
datetime=kline_dt, datetime=kline_dt,
symbol=self.quote.underlying_symbol, symbol=self.quote.underlying_symbol,
open=0, open=0, high=0, low=0, close=0, volume=0, open_oi=0, close_oi=0,
high=0,
low=0,
close=0,
volume=0,
open_oi=0,
close_oi=0,
) )
def run(self): def run(self):
""" """同步调用异步回测主循环。"""
同步调用异步回测主循环。
"""
try: try:
self._run_async() self._run_async()
except KeyboardInterrupt: except KeyboardInterrupt:
@@ -777,26 +587,16 @@ class TqsdkEngine:
print("TqsdkEngine: API 已关闭。") print("TqsdkEngine: API 已关闭。")
def get_results(self) -> Dict[str, Any]: def get_results(self) -> Dict[str, Any]:
""" """返回回测结果数据,供结果分析模块使用。"""
返回回测结果数据,供结果分析模块使用。
"""
final_portfolio_value = 0.0 final_portfolio_value = 0.0
if self.portfolio_snapshots: if self.portfolio_snapshots:
final_portfolio_value = self.portfolio_snapshots[-1].total_value final_portfolio_value = self.portfolio_snapshots[-1].total_value
# else:
# final_portfolio_value = self.initial_capital # 如果没有快照,则净值是初始资金
# total_return_percentage = (
# (final_portfolio_value - self.initial_capital) / self.initial_capital
# ) * 100 if self.initial_capital != 0 else 0.0
return { return {
"portfolio_snapshots": self.portfolio_snapshots, "portfolio_snapshots": self.portfolio_snapshots,
"trade_history": self.trade_history, "trade_history": self.trade_history,
# "initial_capital": self.initial_capital,
"all_bars": self.all_bars, "all_bars": self.all_bars,
"final_portfolio_value": final_portfolio_value, "final_portfolio_value": final_portfolio_value,
# "total_return_percentage": total_return_percentage,
} }
def get_bar_history(self): def get_bar_history(self):
@@ -813,4 +613,4 @@ class TqsdkEngine:
return self.low_list return self.low_list
elif key == "volume": elif key == "volume":
return self.volume_list return self.volume_list
return None return None