feat(qmt): 优化定时重连机制避免与健康检查冲突
- 添加 is_scheduled_reconnecting 标志位协调重连逻辑
- 增强定时重连任务的日志前缀便于追踪
- 改进异常处理和资源清理日志
- 优化代码格式和注释
This commit is contained in:
@@ -209,39 +209,41 @@ class AutoReconnectScheduler:
|
||||
time.sleep(60)
|
||||
|
||||
def _scheduled_reconnect(self):
|
||||
"""执行定时重连任务"""
|
||||
self.logger.info(f"执行定时重连任务,时间: {self.reconnect_time}")
|
||||
"""执行定时重连任务(强制重连模式)"""
|
||||
self.logger.info(f"[AutoReconnectScheduler] 执行定时重连任务,时间: {self.reconnect_time}")
|
||||
|
||||
# 1. 检测当前连接状态
|
||||
statuses = self.manager.get_all_status()
|
||||
connected_count = sum(1 for s in statuses if s.is_connected)
|
||||
self.logger.info(f"当前连接状态: {connected_count}/{len(statuses)} 个终端在线")
|
||||
# 设置重连中标志位,通知主循环暂停健康检查重连
|
||||
self.manager.is_scheduled_reconnecting = True
|
||||
|
||||
# 2. 如果有连接,先断开
|
||||
if connected_count > 0:
|
||||
self.logger.info("正在断开所有终端连接...")
|
||||
try:
|
||||
# 强制断开所有终端连接(无论当前是否在线)
|
||||
self.logger.info("[AutoReconnectScheduler] 正在断开所有终端连接...")
|
||||
for unit in self.manager.units.values():
|
||||
try:
|
||||
if unit.xt_trader:
|
||||
unit.cleanup()
|
||||
self.logger.info(f"[AutoReconnectScheduler] 已断开终端 {unit.alias} 的连接")
|
||||
except Exception as e:
|
||||
self.logger.warning(f"断开终端 {unit.alias} 失败: {e}")
|
||||
self.logger.warning(f"[AutoReconnectScheduler] 断开终端 {unit.alias} 失败: {e}")
|
||||
|
||||
# 3. 等待几秒后重新连接
|
||||
self.logger.info("等待 3 秒后重新连接...")
|
||||
time.sleep(3)
|
||||
# 等待几秒后重新连接(固定等待时间)
|
||||
self.logger.info("[AutoReconnectScheduler] 等待 3 秒后重新连接...")
|
||||
time.sleep(3)
|
||||
|
||||
# 4. 重新连接所有终端
|
||||
self.logger.info("正在重新连接所有终端...")
|
||||
success_count = 0
|
||||
for unit in self.manager.units.values():
|
||||
if unit.connect():
|
||||
success_count += 1
|
||||
self.logger.info(f"终端 {unit.alias} 重连成功")
|
||||
else:
|
||||
self.logger.warning(f"终端 {unit.alias} 重连失败")
|
||||
# 重新连接所有终端
|
||||
self.logger.info("[AutoReconnectScheduler] 正在重新连接所有终端...")
|
||||
success_count = 0
|
||||
for unit in self.manager.units.values():
|
||||
if unit.connect():
|
||||
success_count += 1
|
||||
self.logger.info(f"[AutoReconnectScheduler] 终端 {unit.alias} 重连成功")
|
||||
else:
|
||||
self.logger.warning(f"[AutoReconnectScheduler] 终端 {unit.alias} 重连失败")
|
||||
|
||||
self.logger.info(f"定时重连完成: {success_count}/{len(self.manager.units)} 个终端重连成功")
|
||||
self.logger.info(f"[AutoReconnectScheduler] 定时重连完成: {success_count}/{len(self.manager.units)} 个终端重连成功")
|
||||
finally:
|
||||
# 确保无论成功与否都重置标志位
|
||||
self.manager.is_scheduled_reconnecting = False
|
||||
|
||||
def start(self):
|
||||
"""启动定时任务"""
|
||||
@@ -351,11 +353,11 @@ class TradingUnit:
|
||||
try:
|
||||
logging.getLogger("QMT_Engine").info(f"正在销毁终端 {self.alias} 的旧资源...")
|
||||
self.xt_trader.stop()
|
||||
self.xt_trader = None # 显式置空
|
||||
self.xt_trader = None # 显式置空
|
||||
self.callback = None
|
||||
time.sleep(1.5) # 给 C++ 引擎留出释放 down_queue 锁的时间
|
||||
except:
|
||||
pass
|
||||
time.sleep(1.5) # 给 C++ 引擎留出释放 down_queue 锁的时间
|
||||
except Exception as e:
|
||||
logging.getLogger("QMT_Engine").warning(f"销毁终端 {self.alias} 资源时出现异常: {e}")
|
||||
|
||||
def connect(self):
|
||||
"""连接 QMT 终端"""
|
||||
@@ -401,6 +403,7 @@ class MultiEngineManager:
|
||||
self.config = {}
|
||||
self.is_running = True
|
||||
self.start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
self.is_scheduled_reconnecting = False # 定时重连调度器正在执行标志
|
||||
self._initialized = True
|
||||
|
||||
def initialize(self, config_file='config.json'):
|
||||
@@ -467,8 +470,12 @@ class MultiEngineManager:
|
||||
if not is_unit_alive:
|
||||
# 避让 QMT 夜间重启高峰 (21:32 - 21:50)
|
||||
if not ('213200' <= curr_hms <= '215000'):
|
||||
self.logger.warning(f"🚫 终端 {unit.alias} 物理连接丢失,执行重连...")
|
||||
unit.connect()
|
||||
# 检查是否正在执行定时重连调度
|
||||
if self.is_scheduled_reconnecting:
|
||||
self.logger.info(f"⏳ 定时重连调度器正在执行,跳过健康检查重连...")
|
||||
else:
|
||||
self.logger.warning(f"🚫 终端 {unit.alias} 物理连接丢失,执行重连...")
|
||||
unit.connect()
|
||||
else:
|
||||
self.logger.info(f"⏳ 处于 QMT 重启时段 ({curr_hms}),跳过重连操作...")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user