From d10df610e0a7a134bba4afaf52d048e780df337a Mon Sep 17 00:00:00 2001 From: liaozhaorun <1300336796@qq.com> Date: Mon, 15 Dec 2025 22:08:21 +0800 Subject: [PATCH] =?UTF-8?q?tqsdk=5Freal=5Fengine=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=EF=BC=8C=E4=BF=AE=E5=A4=8D=E5=AE=9E=E7=9B=98=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E6=8D=A2=E6=9C=88=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/tqsdk_real_engine.py | 127 ++++++++++++++++++++++++++++----------- 1 file changed, 93 insertions(+), 34 deletions(-) diff --git a/src/tqsdk_real_engine.py b/src/tqsdk_real_engine.py index d67bd38..ec3fee2 100644 --- a/src/tqsdk_real_engine.py +++ b/src/tqsdk_real_engine.py @@ -330,6 +330,7 @@ class TqsdkEngine: kline_row = self.klines.iloc[-i] kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True) kline_dt = kline_dt.tz_convert(BEIJING_TZ) + self._last_underlying_symbol = kline_row.symbol self.main(kline_row, self.klines.iloc[-i - 1]) print( @@ -350,6 +351,7 @@ class TqsdkEngine: new_bar = False if is_trading_time: + self._check_roll_over() kline_row = self.klines.iloc[-1] kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True) kline_dt = kline_dt.tz_convert(BEIJING_TZ) @@ -375,11 +377,9 @@ class TqsdkEngine: # Tqsdk API 的 wait_update() 确保数据更新 self._api.wait_update() - if self.roll_over_mode and ( - self._api.is_changing(self.quote, "underlying_symbol") - or self._last_underlying_symbol != self.quote.underlying_symbol - ): - self._last_underlying_symbol = self.quote.underlying_symbol + self._last_underlying_symbol = self.quote.underlying_symbol + + self._check_roll_over() if new_bar and (last_min_k is None or last_min_k.datetime != self.klines_1min.iloc[-1].datetime): last_min_k = self.klines_1min.iloc[-1] @@ -412,14 +412,6 @@ class TqsdkEngine: # 等待到整点-00 或 30 分且秒>1 now = datetime.now() - # if now.second <= 58: - # while True: - # now = datetime.now() - # if now.second >= 58: - # break - # self._api.wait_update() - # print(f'TqEngine:self.klines.iloc[-2].volume: {self.klines.iloc[-2].volume}') - # print(f'TqEngine:self.klines.iloc[-1].volume: {self.klines.iloc[-1].volume}') while self.klines.iloc[-1].volume <= 0: self._api.wait_update() @@ -469,6 +461,90 @@ class TqsdkEngine: # 处理订单和取消请求 self._process_queued_requests() + + def _check_roll_over(self, timeout_seconds: int = 300): + """ + [增强版] 检查并处理实盘持仓换月,此函数会阻塞直到换月成功或超时。 + + Args: + timeout_seconds (int): 移仓换月的最大等待时间(秒)。 + 如果超过此时长仍未成功,将抛出异常。 + """ + if not self._strategy.trading: + return + + current_positions = self._context.get_current_positions() + if not current_positions: + return + + current_dominant_symbol = self.quote.underlying_symbol + + # 使用 list() 避免在迭代过程中修改字典 + for position_symbol, quantity in list(current_positions.items()): + # 如果存在持仓 (quantity != 0) 且持仓合约不是当前的主力合约 + if quantity != 0 and position_symbol != current_dominant_symbol: + + print("=" * 60) + print(f"TqsdkEngine: [换月开始] 检测到持仓需要换月!") + print(f" - 旧合约: {position_symbol} ({quantity} 手)") + print(f" - 新主力合约: {current_dominant_symbol}") + print(f" - 开始执行阻塞式移仓,超时时间: {timeout_seconds} 秒...") + print("=" * 60) + + start_time = time.monotonic() + + # 步骤 1: 确保 TargetPosTask 已为新旧合约初始化 + if position_symbol not in self.target_pos_dict: + self.target_pos_dict[position_symbol] = TargetPosTask(self._api, position_symbol) + if current_dominant_symbol not in self.target_pos_dict: + self.target_pos_dict[current_dominant_symbol] = TargetPosTask(self._api, current_dominant_symbol) + + # 步骤 2: 发送移仓指令 + # TargetPosTask 是幂等的,重复调用 set_target_volume 不会产生副作用 + self.target_pos_dict[position_symbol].set_target_volume(0) + self.target_pos_dict[current_dominant_symbol].set_target_volume(quantity) + print(" - [移仓指令已发送] 平仓旧合约,开仓新合约。") + + # 步骤 3: 进入等待循环,直到换月成功或超时 + while True: + # 3.1 检查是否超时 + if time.monotonic() - start_time > timeout_seconds: + # 如果超时,记录当前状态并抛出致命异常 + latest_positions = self._context.get_current_positions() + error_msg = ( + f"TqsdkEngine: [换月失败] 移仓操作超时 ({timeout_seconds}秒)!\n" + f" - 目标: 从 {position_symbol} 移仓到 {current_dominant_symbol}\n" + f" - 当前旧合约持仓: {latest_positions.get(position_symbol, 'N/A')}\n" + f" - 当前新合约持仓: {latest_positions.get(current_dominant_symbol, 'N/A')}\n" + " - 策略将停止,请手动检查账户状态!" + ) + print(error_msg) + raise TimeoutError(error_msg) + + # 3.2 等待 API 更新 + self._api.wait_update() + + # 3.3 获取最新的持仓状态 + latest_positions = self._context.get_current_positions() + old_pos_volume = latest_positions.get(position_symbol, 0) + new_pos_volume = latest_positions.get(current_dominant_symbol, 0) + + # 3.4 检查成功条件 + if old_pos_volume == 0 and new_pos_volume == quantity: + print("-" * 60) + print(f"TqsdkEngine: [换月成功] 移仓操作已确认完成。") + print(f" - 旧合约 {position_symbol} 持仓: {old_pos_volume}") + print(f" - 新合约 {current_dominant_symbol} 持仓: {new_pos_volume}") + print("-" * 60) + + # 步骤 4: 通知策略层,换月已完成 + if hasattr(self._strategy, "on_rollover"): + self._strategy.on_rollover(position_symbol, current_dominant_symbol) + + # 成功后跳出内层循环 + break + + def main(self, kline_row, prev_kline_row): kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True) kline_dt = kline_dt.tz_convert(BEIJING_TZ) @@ -496,28 +572,11 @@ class TqsdkEngine: self.last_processed_bar = last_bar - if ( - self.roll_over_mode - and self.last_processed_bar is not None - and self._last_underlying_symbol != self.last_processed_bar.symbol - and self._strategy.trading - ): - self._is_rollover_bar = True - print( - f"TqsdkEngine: 检测到换月信号!从 {self._last_underlying_symbol} 切换到 {self.quote.underlying_symbol}" - ) - self._close_all_positions_at_end() - self._strategy.cancel_all_pending_orders() - - self._strategy.on_rollover( - self.last_processed_bar.symbol, self._last_underlying_symbol - ) - else: - self._strategy.on_open_bar(kline_row.open, self._last_underlying_symbol) - # 处理订单和取消请求 - if self._strategy.trading is True: - self._process_queued_requests() + self._strategy.on_open_bar(kline_row.open, self._last_underlying_symbol) + # 处理订单和取消请求 + if self._strategy.trading is True: + self._process_queued_requests() self.partial_bar = Bar( datetime=kline_dt,