tqsdk_real_engine更新,修复实盘无法换月的bug

This commit is contained in:
2025-12-15 22:08:21 +08:00
parent fda62556eb
commit d10df610e0

View File

@@ -330,6 +330,7 @@ class TqsdkEngine:
kline_row = self.klines.iloc[-i]
kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True)
kline_dt = kline_dt.tz_convert(BEIJING_TZ)
self._last_underlying_symbol = kline_row.symbol
self.main(kline_row, self.klines.iloc[-i - 1])
print(
@@ -350,6 +351,7 @@ class TqsdkEngine:
new_bar = False
if is_trading_time:
self._check_roll_over()
kline_row = self.klines.iloc[-1]
kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True)
kline_dt = kline_dt.tz_convert(BEIJING_TZ)
@@ -375,11 +377,9 @@ class TqsdkEngine:
# Tqsdk API 的 wait_update() 确保数据更新
self._api.wait_update()
if self.roll_over_mode and (
self._api.is_changing(self.quote, "underlying_symbol")
or self._last_underlying_symbol != self.quote.underlying_symbol
):
self._last_underlying_symbol = self.quote.underlying_symbol
self._last_underlying_symbol = self.quote.underlying_symbol
self._check_roll_over()
if new_bar and (last_min_k is None or last_min_k.datetime != self.klines_1min.iloc[-1].datetime):
last_min_k = self.klines_1min.iloc[-1]
@@ -412,14 +412,6 @@ class TqsdkEngine:
# 等待到整点-00 或 30 分且秒>1
now = datetime.now()
# if now.second <= 58:
# while True:
# now = datetime.now()
# if now.second >= 58:
# break
# self._api.wait_update()
# print(f'TqEngine:self.klines.iloc[-2].volume: {self.klines.iloc[-2].volume}')
# print(f'TqEngine:self.klines.iloc[-1].volume: {self.klines.iloc[-1].volume}')
while self.klines.iloc[-1].volume <= 0:
self._api.wait_update()
@@ -469,6 +461,90 @@ class TqsdkEngine:
# 处理订单和取消请求
self._process_queued_requests()
def _check_roll_over(self, timeout_seconds: int = 300):
"""
[增强版] 检查并处理实盘持仓换月,此函数会阻塞直到换月成功或超时。
Args:
timeout_seconds (int): 移仓换月的最大等待时间(秒)。
如果超过此时长仍未成功,将抛出异常。
"""
if not self._strategy.trading:
return
current_positions = self._context.get_current_positions()
if not current_positions:
return
current_dominant_symbol = self.quote.underlying_symbol
# 使用 list() 避免在迭代过程中修改字典
for position_symbol, quantity in list(current_positions.items()):
# 如果存在持仓 (quantity != 0) 且持仓合约不是当前的主力合约
if quantity != 0 and position_symbol != current_dominant_symbol:
print("=" * 60)
print(f"TqsdkEngine: [换月开始] 检测到持仓需要换月!")
print(f" - 旧合约: {position_symbol} ({quantity} 手)")
print(f" - 新主力合约: {current_dominant_symbol}")
print(f" - 开始执行阻塞式移仓,超时时间: {timeout_seconds} 秒...")
print("=" * 60)
start_time = time.monotonic()
# 步骤 1: 确保 TargetPosTask 已为新旧合约初始化
if position_symbol not in self.target_pos_dict:
self.target_pos_dict[position_symbol] = TargetPosTask(self._api, position_symbol)
if current_dominant_symbol not in self.target_pos_dict:
self.target_pos_dict[current_dominant_symbol] = TargetPosTask(self._api, current_dominant_symbol)
# 步骤 2: 发送移仓指令
# TargetPosTask 是幂等的,重复调用 set_target_volume 不会产生副作用
self.target_pos_dict[position_symbol].set_target_volume(0)
self.target_pos_dict[current_dominant_symbol].set_target_volume(quantity)
print(" - [移仓指令已发送] 平仓旧合约,开仓新合约。")
# 步骤 3: 进入等待循环,直到换月成功或超时
while True:
# 3.1 检查是否超时
if time.monotonic() - start_time > timeout_seconds:
# 如果超时,记录当前状态并抛出致命异常
latest_positions = self._context.get_current_positions()
error_msg = (
f"TqsdkEngine: [换月失败] 移仓操作超时 ({timeout_seconds}秒)\n"
f" - 目标: 从 {position_symbol} 移仓到 {current_dominant_symbol}\n"
f" - 当前旧合约持仓: {latest_positions.get(position_symbol, 'N/A')}\n"
f" - 当前新合约持仓: {latest_positions.get(current_dominant_symbol, 'N/A')}\n"
" - 策略将停止,请手动检查账户状态!"
)
print(error_msg)
raise TimeoutError(error_msg)
# 3.2 等待 API 更新
self._api.wait_update()
# 3.3 获取最新的持仓状态
latest_positions = self._context.get_current_positions()
old_pos_volume = latest_positions.get(position_symbol, 0)
new_pos_volume = latest_positions.get(current_dominant_symbol, 0)
# 3.4 检查成功条件
if old_pos_volume == 0 and new_pos_volume == quantity:
print("-" * 60)
print(f"TqsdkEngine: [换月成功] 移仓操作已确认完成。")
print(f" - 旧合约 {position_symbol} 持仓: {old_pos_volume}")
print(f" - 新合约 {current_dominant_symbol} 持仓: {new_pos_volume}")
print("-" * 60)
# 步骤 4: 通知策略层,换月已完成
if hasattr(self._strategy, "on_rollover"):
self._strategy.on_rollover(position_symbol, current_dominant_symbol)
# 成功后跳出内层循环
break
def main(self, kline_row, prev_kline_row):
kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True)
kline_dt = kline_dt.tz_convert(BEIJING_TZ)
@@ -496,28 +572,11 @@ class TqsdkEngine:
self.last_processed_bar = last_bar
if (
self.roll_over_mode
and self.last_processed_bar is not None
and self._last_underlying_symbol != self.last_processed_bar.symbol
and self._strategy.trading
):
self._is_rollover_bar = True
print(
f"TqsdkEngine: 检测到换月信号!从 {self._last_underlying_symbol} 切换到 {self.quote.underlying_symbol}"
)
self._close_all_positions_at_end()
self._strategy.cancel_all_pending_orders()
self._strategy.on_rollover(
self.last_processed_bar.symbol, self._last_underlying_symbol
)
else:
self._strategy.on_open_bar(kline_row.open, self._last_underlying_symbol)
# 处理订单和取消请求
if self._strategy.trading is True:
self._process_queued_requests()
self._strategy.on_open_bar(kline_row.open, self._last_underlying_symbol)
# 处理订单和取消请求
if self._strategy.trading is True:
self._process_queued_requests()
self.partial_bar = Bar(
datetime=kline_dt,