tqsdk_real_engine更新,修复没有区分品种的bug

This commit is contained in:
2025-12-15 23:52:30 +08:00
parent d10df610e0
commit ec2f3a29cd

View File

@@ -79,6 +79,8 @@ class TqsdkEngine:
# if not self.symbol: # if not self.symbol:
# raise ValueError("strategy_params 必须包含 'symbol' 字段") # raise ValueError("strategy_params 必须包含 'symbol' 字段")
self.symbol = symbol self.symbol = symbol
self.product_id = self.symbol.split("@")[1]
self.is_checked_rollover = False
# 获取 K 线数据Tqsdk 自动处理) # 获取 K 线数据Tqsdk 自动处理)
# 这里假设策略所需 K 线周期在 strategy_params 中否则默认60秒1分钟K线 # 这里假设策略所需 K 线周期在 strategy_params 中否则默认60秒1分钟K线
@@ -351,7 +353,9 @@ class TqsdkEngine:
new_bar = False new_bar = False
if is_trading_time: if is_trading_time:
self._check_roll_over() if not self.is_checked_rollover:
self._check_roll_over()
self.is_checked_rollover = True
kline_row = self.klines.iloc[-1] kline_row = self.klines.iloc[-1]
kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True) kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True)
kline_dt = kline_dt.tz_convert(BEIJING_TZ) kline_dt = kline_dt.tz_convert(BEIJING_TZ)
@@ -379,7 +383,9 @@ class TqsdkEngine:
self._last_underlying_symbol = self.quote.underlying_symbol self._last_underlying_symbol = self.quote.underlying_symbol
self._check_roll_over() if not self.is_checked_rollover:
self._check_roll_over()
self.is_checked_rollover = True
if new_bar and (last_min_k is None or last_min_k.datetime != self.klines_1min.iloc[-1].datetime): if new_bar and (last_min_k is None or last_min_k.datetime != self.klines_1min.iloc[-1].datetime):
last_min_k = self.klines_1min.iloc[-1] last_min_k = self.klines_1min.iloc[-1]
@@ -462,87 +468,113 @@ class TqsdkEngine:
self._process_queued_requests() self._process_queued_requests()
def _check_roll_over(self, timeout_seconds: int = 300): def _check_roll_over(self, timeout_seconds: int = 120):
""" """
[增强版] 检查并处理实盘持仓换月,此函数会阻塞直到换月成功或超时。 [最安全版] 检查并处理实盘持仓换月,此函数会阻塞直到换月成功或超时。
- 仅处理本引擎负责的品种 (self.product_id)。
- 完全忽略账户中其他品种的持仓。
Args: Args:
timeout_seconds (int): 移仓换月的最大等待时间(秒)。 timeout_seconds (int): 移仓换月的最大等待时间(秒)。
如果超过此时长仍未成功,将抛出异常。 """
"""
if not self._strategy.trading: if not self._strategy.trading:
return return
# 1. 获取当前市场最新的主力合约 (这是我们的目标合约)
current_dominant_symbol = self.quote.underlying_symbol
if not current_dominant_symbol:
# 在某些开盘瞬间可能获取不到,直接跳过本次检查
return
# 2. 获取账户所有持仓
current_positions = self._context.get_current_positions() current_positions = self._context.get_current_positions()
if not current_positions: if not current_positions:
return return
current_dominant_symbol = self.quote.underlying_symbol # 3. 筛选出本引擎需要处理的、需要换月的旧合约持仓
# - 键: 旧合约代码 (e.g., "CZCE.FG605")
# - 值: 持仓数量 (e.g., 10 or -10)
old_contracts_to_rollover: Dict[str, int] = {}
for pos_symbol, quantity in current_positions.items():
# 条件一: 是本引擎负责的品种 (e.g., "CZCE.FG605".startswith("CZCE.FG"))
# 条件二: 不是当前最新的主力合约
# 条件三: 有实际持仓
if (pos_symbol.startswith(self.product_id) and
pos_symbol != current_dominant_symbol and
quantity != 0):
old_contracts_to_rollover[pos_symbol] = quantity
# 使用 list() 避免在迭代过程中修改字典 # 如果没有需要处理的旧合约,直接返回
for position_symbol, quantity in list(current_positions.items()): if not old_contracts_to_rollover:
# 如果存在持仓 (quantity != 0) 且持仓合约不是当前的主力合约 return
if quantity != 0 and position_symbol != current_dominant_symbol:
print("=" * 60) # 4. 如果检测到需要换月的持仓,则执行阻塞式移仓
print(f"TqsdkEngine: [换月开始] 检测到持仓需要换月!") total_target_quantity = sum(old_contracts_to_rollover.values())
print(f" - 旧合约: {position_symbol} ({quantity} 手)")
print(f" - 新主力合约: {current_dominant_symbol}")
print(f" - 开始执行阻塞式移仓,超时时间: {timeout_seconds} 秒...")
print("=" * 60)
start_time = time.monotonic() print("=" * 70)
print(f"TqsdkEngine ({self.product_id}): [换月开始] 检测到需要移仓的旧合约!")
for old_symbol, qty in old_contracts_to_rollover.items():
print(f" - 待平仓旧合约: {old_symbol} ({qty} 手)")
print(f" - 目标新合约: {current_dominant_symbol}")
print(f" - 新合约目标总持仓: {total_target_quantity}")
print(f" - 开始执行阻塞式移仓,超时时间: {timeout_seconds} 秒...")
print("=" * 70)
# 步骤 1: 确保 TargetPosTask 已为新旧合约初始化 start_time = time.monotonic()
if position_symbol not in self.target_pos_dict:
self.target_pos_dict[position_symbol] = TargetPosTask(self._api, position_symbol)
if current_dominant_symbol not in self.target_pos_dict:
self.target_pos_dict[current_dominant_symbol] = TargetPosTask(self._api, current_dominant_symbol)
# 步骤 2: 发送移仓指令 # 5. 发送所有移仓指令
# TargetPosTask 是幂等的,重复调用 set_target_volume 不会产生副作用 # 5.1 平掉所有检测到的旧合约
self.target_pos_dict[position_symbol].set_target_volume(0) for old_symbol in old_contracts_to_rollover.keys():
self.target_pos_dict[current_dominant_symbol].set_target_volume(quantity) if old_symbol not in self.target_pos_dict:
print(" - [移仓指令已发送] 平仓旧合约,开仓新合约。") self.target_pos_dict[old_symbol] = TargetPosTask(self._api, old_symbol)
self.target_pos_dict[old_symbol].set_target_volume(0)
# 步骤 3: 进入等待循环,直到换月成功或超时 # 5.2 在新合约上建立合并后的总目标仓位
while True: if current_dominant_symbol not in self.target_pos_dict:
# 3.1 检查是否超时 self.target_pos_dict[current_dominant_symbol] = TargetPosTask(self._api, current_dominant_symbol)
if time.monotonic() - start_time > timeout_seconds: self.target_pos_dict[current_dominant_symbol].set_target_volume(total_target_quantity)
# 如果超时,记录当前状态并抛出致命异常
latest_positions = self._context.get_current_positions()
error_msg = (
f"TqsdkEngine: [换月失败] 移仓操作超时 ({timeout_seconds}秒)\n"
f" - 目标: 从 {position_symbol} 移仓到 {current_dominant_symbol}\n"
f" - 当前旧合约持仓: {latest_positions.get(position_symbol, 'N/A')}\n"
f" - 当前新合约持仓: {latest_positions.get(current_dominant_symbol, 'N/A')}\n"
" - 策略将停止,请手动检查账户状态!"
)
print(error_msg)
raise TimeoutError(error_msg)
# 3.2 等待 API 更新 print(f" - [移仓指令已发送] 正在处理 {len(old_contracts_to_rollover)} 个旧合约的平仓...")
self._api.wait_update()
# 3.3 获取最新的持仓状态 # 6. 进入等待循环,直到所有换月操作完成或超时
latest_positions = self._context.get_current_positions() while True:
old_pos_volume = latest_positions.get(position_symbol, 0) # 6.1 检查是否超时
new_pos_volume = latest_positions.get(current_dominant_symbol, 0) if time.monotonic() - start_time > timeout_seconds:
latest_positions = self._context.get_current_positions()
error_msg = (
f"TqsdkEngine ({self.product_id}): [换月失败] 移仓操作超时 ({timeout_seconds}秒)\n"
f" - 目标: 将 {list(old_contracts_to_rollover.keys())} 移仓到 {current_dominant_symbol}\n"
f" - 当前新合约持仓: {latest_positions.get(current_dominant_symbol, 'N/A')}\n"
" - 策略将停止,请立即手动检查账户状态!"
)
print(error_msg)
raise TimeoutError(error_msg)
# 3.4 检查成功条件 self._api.wait_update()
if old_pos_volume == 0 and new_pos_volume == quantity:
print("-" * 60)
print(f"TqsdkEngine: [换月成功] 移仓操作已确认完成。")
print(f" - 旧合约 {position_symbol} 持仓: {old_pos_volume}")
print(f" - 新合约 {current_dominant_symbol} 持仓: {new_pos_volume}")
print("-" * 60)
# 步骤 4: 通知策略层,换月已完成 # 6.2 检查成功条件
if hasattr(self._strategy, "on_rollover"): latest_positions = self._context.get_current_positions()
self._strategy.on_rollover(position_symbol, current_dominant_symbol)
# 成功后跳出内层循环 # 检查所有旧合约仓位是否已归零
break all_old_cleared = all(latest_positions.get(s, 0) == 0 for s in old_contracts_to_rollover)
# 检查新合约仓位是否已达到目标
new_pos_correct = latest_positions.get(current_dominant_symbol, 0) == total_target_quantity
if all_old_cleared and new_pos_correct:
print("-" * 70)
print(f"TqsdkEngine ({self.product_id}): [换月成功] 移仓操作已确认完成。")
print(f" - 所有旧合约持仓已清零。")
print(f" - 新合约 {current_dominant_symbol} 持仓: {total_target_quantity}")
print("-" * 70)
# 6.3 通知策略层 (只需通知一次)
if hasattr(self._strategy, "on_rollover"):
# 传递第一个旧合约符号作为代表
representative_old_symbol = list(old_contracts_to_rollover.keys())[0]
self._strategy.on_rollover(representative_old_symbol, current_dominant_symbol)
break # 成功,跳出循环
def main(self, kline_row, prev_kline_row): def main(self, kline_row, prev_kline_row):