From ec2f3a29cdc7351c0e056bb234d248c6aa6a3ded Mon Sep 17 00:00:00 2001 From: liaozhaorun <1300336796@qq.com> Date: Mon, 15 Dec 2025 23:52:30 +0800 Subject: [PATCH] =?UTF-8?q?tqsdk=5Freal=5Fengine=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=EF=BC=8C=E4=BF=AE=E5=A4=8D=E6=B2=A1=E6=9C=89=E5=8C=BA=E5=88=86?= =?UTF-8?q?=E5=93=81=E7=A7=8D=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/tqsdk_real_engine.py | 158 +++++++++++++++++++++++---------------- 1 file changed, 95 insertions(+), 63 deletions(-) diff --git a/src/tqsdk_real_engine.py b/src/tqsdk_real_engine.py index ec3fee2..f95574d 100644 --- a/src/tqsdk_real_engine.py +++ b/src/tqsdk_real_engine.py @@ -79,6 +79,8 @@ class TqsdkEngine: # if not self.symbol: # raise ValueError("strategy_params 必须包含 'symbol' 字段") self.symbol = symbol + self.product_id = self.symbol.split("@")[1] + self.is_checked_rollover = False # 获取 K 线数据(Tqsdk 自动处理) # 这里假设策略所需 K 线周期在 strategy_params 中,否则默认60秒(1分钟K线) @@ -351,7 +353,9 @@ class TqsdkEngine: new_bar = False if is_trading_time: - self._check_roll_over() + if not self.is_checked_rollover: + self._check_roll_over() + self.is_checked_rollover = True kline_row = self.klines.iloc[-1] kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True) kline_dt = kline_dt.tz_convert(BEIJING_TZ) @@ -379,7 +383,9 @@ class TqsdkEngine: self._last_underlying_symbol = self.quote.underlying_symbol - self._check_roll_over() + if not self.is_checked_rollover: + self._check_roll_over() + self.is_checked_rollover = True if new_bar and (last_min_k is None or last_min_k.datetime != self.klines_1min.iloc[-1].datetime): last_min_k = self.klines_1min.iloc[-1] @@ -462,87 +468,113 @@ class TqsdkEngine: self._process_queued_requests() - def _check_roll_over(self, timeout_seconds: int = 300): + def _check_roll_over(self, timeout_seconds: int = 120): """ - [增强版] 检查并处理实盘持仓换月,此函数会阻塞直到换月成功或超时。 + [最安全版] 检查并处理实盘持仓换月,此函数会阻塞直到换月成功或超时。 + - 仅处理本引擎负责的品种 (self.product_id)。 + - 完全忽略账户中其他品种的持仓。 - Args: - timeout_seconds (int): 移仓换月的最大等待时间(秒)。 - 如果超过此时长仍未成功,将抛出异常。 - """ + Args: + timeout_seconds (int): 移仓换月的最大等待时间(秒)。 + """ if not self._strategy.trading: return + # 1. 获取当前市场最新的主力合约 (这是我们的目标合约) + current_dominant_symbol = self.quote.underlying_symbol + if not current_dominant_symbol: + # 在某些开盘瞬间可能获取不到,直接跳过本次检查 + return + + # 2. 获取账户所有持仓 current_positions = self._context.get_current_positions() if not current_positions: return - current_dominant_symbol = self.quote.underlying_symbol + # 3. 筛选出本引擎需要处理的、需要换月的旧合约持仓 + # - 键: 旧合约代码 (e.g., "CZCE.FG605") + # - 值: 持仓数量 (e.g., 10 or -10) + old_contracts_to_rollover: Dict[str, int] = {} + for pos_symbol, quantity in current_positions.items(): + # 条件一: 是本引擎负责的品种 (e.g., "CZCE.FG605".startswith("CZCE.FG")) + # 条件二: 不是当前最新的主力合约 + # 条件三: 有实际持仓 + if (pos_symbol.startswith(self.product_id) and + pos_symbol != current_dominant_symbol and + quantity != 0): + old_contracts_to_rollover[pos_symbol] = quantity - # 使用 list() 避免在迭代过程中修改字典 - for position_symbol, quantity in list(current_positions.items()): - # 如果存在持仓 (quantity != 0) 且持仓合约不是当前的主力合约 - if quantity != 0 and position_symbol != current_dominant_symbol: + # 如果没有需要处理的旧合约,直接返回 + if not old_contracts_to_rollover: + return - print("=" * 60) - print(f"TqsdkEngine: [换月开始] 检测到持仓需要换月!") - print(f" - 旧合约: {position_symbol} ({quantity} 手)") - print(f" - 新主力合约: {current_dominant_symbol}") - print(f" - 开始执行阻塞式移仓,超时时间: {timeout_seconds} 秒...") - print("=" * 60) + # 4. 如果检测到需要换月的持仓,则执行阻塞式移仓 + total_target_quantity = sum(old_contracts_to_rollover.values()) - start_time = time.monotonic() + print("=" * 70) + print(f"TqsdkEngine ({self.product_id}): [换月开始] 检测到需要移仓的旧合约!") + for old_symbol, qty in old_contracts_to_rollover.items(): + print(f" - 待平仓旧合约: {old_symbol} ({qty} 手)") + print(f" - 目标新合约: {current_dominant_symbol}") + print(f" - 新合约目标总持仓: {total_target_quantity} 手") + print(f" - 开始执行阻塞式移仓,超时时间: {timeout_seconds} 秒...") + print("=" * 70) - # 步骤 1: 确保 TargetPosTask 已为新旧合约初始化 - if position_symbol not in self.target_pos_dict: - self.target_pos_dict[position_symbol] = TargetPosTask(self._api, position_symbol) - if current_dominant_symbol not in self.target_pos_dict: - self.target_pos_dict[current_dominant_symbol] = TargetPosTask(self._api, current_dominant_symbol) + start_time = time.monotonic() - # 步骤 2: 发送移仓指令 - # TargetPosTask 是幂等的,重复调用 set_target_volume 不会产生副作用 - self.target_pos_dict[position_symbol].set_target_volume(0) - self.target_pos_dict[current_dominant_symbol].set_target_volume(quantity) - print(" - [移仓指令已发送] 平仓旧合约,开仓新合约。") + # 5. 发送所有移仓指令 + # 5.1 平掉所有检测到的旧合约 + for old_symbol in old_contracts_to_rollover.keys(): + if old_symbol not in self.target_pos_dict: + self.target_pos_dict[old_symbol] = TargetPosTask(self._api, old_symbol) + self.target_pos_dict[old_symbol].set_target_volume(0) - # 步骤 3: 进入等待循环,直到换月成功或超时 - while True: - # 3.1 检查是否超时 - if time.monotonic() - start_time > timeout_seconds: - # 如果超时,记录当前状态并抛出致命异常 - latest_positions = self._context.get_current_positions() - error_msg = ( - f"TqsdkEngine: [换月失败] 移仓操作超时 ({timeout_seconds}秒)!\n" - f" - 目标: 从 {position_symbol} 移仓到 {current_dominant_symbol}\n" - f" - 当前旧合约持仓: {latest_positions.get(position_symbol, 'N/A')}\n" - f" - 当前新合约持仓: {latest_positions.get(current_dominant_symbol, 'N/A')}\n" - " - 策略将停止,请手动检查账户状态!" - ) - print(error_msg) - raise TimeoutError(error_msg) + # 5.2 在新合约上建立合并后的总目标仓位 + if current_dominant_symbol not in self.target_pos_dict: + self.target_pos_dict[current_dominant_symbol] = TargetPosTask(self._api, current_dominant_symbol) + self.target_pos_dict[current_dominant_symbol].set_target_volume(total_target_quantity) - # 3.2 等待 API 更新 - self._api.wait_update() + print(f" - [移仓指令已发送] 正在处理 {len(old_contracts_to_rollover)} 个旧合约的平仓...") - # 3.3 获取最新的持仓状态 - latest_positions = self._context.get_current_positions() - old_pos_volume = latest_positions.get(position_symbol, 0) - new_pos_volume = latest_positions.get(current_dominant_symbol, 0) + # 6. 进入等待循环,直到所有换月操作完成或超时 + while True: + # 6.1 检查是否超时 + if time.monotonic() - start_time > timeout_seconds: + latest_positions = self._context.get_current_positions() + error_msg = ( + f"TqsdkEngine ({self.product_id}): [换月失败] 移仓操作超时 ({timeout_seconds}秒)!\n" + f" - 目标: 将 {list(old_contracts_to_rollover.keys())} 移仓到 {current_dominant_symbol}\n" + f" - 当前新合约持仓: {latest_positions.get(current_dominant_symbol, 'N/A')}\n" + " - 策略将停止,请立即手动检查账户状态!" + ) + print(error_msg) + raise TimeoutError(error_msg) - # 3.4 检查成功条件 - if old_pos_volume == 0 and new_pos_volume == quantity: - print("-" * 60) - print(f"TqsdkEngine: [换月成功] 移仓操作已确认完成。") - print(f" - 旧合约 {position_symbol} 持仓: {old_pos_volume}") - print(f" - 新合约 {current_dominant_symbol} 持仓: {new_pos_volume}") - print("-" * 60) + self._api.wait_update() - # 步骤 4: 通知策略层,换月已完成 - if hasattr(self._strategy, "on_rollover"): - self._strategy.on_rollover(position_symbol, current_dominant_symbol) + # 6.2 检查成功条件 + latest_positions = self._context.get_current_positions() - # 成功后跳出内层循环 - break + # 检查所有旧合约仓位是否已归零 + all_old_cleared = all(latest_positions.get(s, 0) == 0 for s in old_contracts_to_rollover) + + # 检查新合约仓位是否已达到目标 + new_pos_correct = latest_positions.get(current_dominant_symbol, 0) == total_target_quantity + + if all_old_cleared and new_pos_correct: + print("-" * 70) + print(f"TqsdkEngine ({self.product_id}): [换月成功] 移仓操作已确认完成。") + print(f" - 所有旧合约持仓已清零。") + print(f" - 新合约 {current_dominant_symbol} 持仓: {total_target_quantity}") + print("-" * 70) + + # 6.3 通知策略层 (只需通知一次) + if hasattr(self._strategy, "on_rollover"): + # 传递第一个旧合约符号作为代表 + representative_old_symbol = list(old_contracts_to_rollover.keys())[0] + self._strategy.on_rollover(representative_old_symbol, current_dominant_symbol) + + break # 成功,跳出循环 def main(self, kline_row, prev_kline_row):