From 27ea270353cd806badf716d1bda271790034b34b Mon Sep 17 00:00:00 2001
From: liaozhaorun <1300336796@qq.com>
Date: Fri, 19 Dec 2025 14:11:32 +0800
Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0qmt=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
qmt/dashboard.html | 258 ++++++++++++++++++++++
qmt/heartbeat.txt | 1 +
qmt/qmt_trader.py | 521 +++++++++++++++++++++++++++++++--------------
qmt/start.bat | 75 +++++++
4 files changed, 693 insertions(+), 162 deletions(-)
create mode 100644 qmt/dashboard.html
create mode 100644 qmt/heartbeat.txt
create mode 100644 qmt/start.bat
diff --git a/qmt/dashboard.html b/qmt/dashboard.html
new file mode 100644
index 0000000..49b14b0
--- /dev/null
+++ b/qmt/dashboard.html
@@ -0,0 +1,258 @@
+
+
+
+
+
+ QMT 实盘监控看板
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {{ status.account_id || '---' }}
+ {{ status.start_time || '---' }}
+
+
+ {{ status.last_loop_update || '---' }}
+
+
+
+ 手动刷新
+
+
+
+
+ {{ tradingStatusText }}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 暂无策略数据 / Redis未连接
+
+
+
{{ strategyName }}
+
+
+
+ {{ scope.row.vol }}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/qmt/heartbeat.txt b/qmt/heartbeat.txt
new file mode 100644
index 0000000..0d646fb
--- /dev/null
+++ b/qmt/heartbeat.txt
@@ -0,0 +1 @@
+2025-12-19 14:11:10
\ No newline at end of file
diff --git a/qmt/qmt_trader.py b/qmt/qmt_trader.py
index bcca240..f388bdc 100644
--- a/qmt/qmt_trader.py
+++ b/qmt/qmt_trader.py
@@ -1,5 +1,5 @@
# coding:utf-8
-import time, datetime, traceback, sys, json, os
+import time, datetime, traceback, sys, json, os, threading
import logging
import redis
from xtquant import xtdata
@@ -7,27 +7,46 @@ from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant
-# 全局变量
-CURRENT_LOG_DATE = None
-CONFIG = {}
-ORDER_CACHE = {}
+# FastAPI 相关
+from fastapi import FastAPI
+from fastapi.middleware.cors import CORSMiddleware
+from fastapi.responses import FileResponse
+import uvicorn
-# ================= 1. 日志系统 (按日期直写) =================
+# ================= 0. Windows 防卡死补丁 =================
+try:
+ import ctypes
+ kernel32 = ctypes.windll.kernel32
+ # 禁用快速编辑模式 (0x0040)
+ kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), 128)
+except:
+ pass
+
+# ================= 1. 全局状态管理 =================
+class SystemState:
+ def __init__(self):
+ self.xt_trader = None
+ self.acc = None
+ self.pos_manager = None
+ self.callback = None
+ self.is_running = True
+ self.start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+ self.last_heartbeat = "Initializing..."
+ self.config = {}
+
+GLOBAL_STATE = SystemState()
+CURRENT_LOG_DATE = None
+ORDER_CACHE = {} # 内存缓存: OrderID -> (Strategy, Code, Action)
+
+# ================= 2. 增强型日志系统 =================
def setup_logger():
- """
- 配置日志系统:
- 每天生成一个新的日志文件,文件名格式为 YYYY-MM-DD.log
- """
global CURRENT_LOG_DATE
-
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
- # 获取今日日期
today_str = datetime.date.today().strftime('%Y-%m-%d')
CURRENT_LOG_DATE = today_str
-
log_file = os.path.join(log_dir, f"{today_str}.log")
logger = logging.getLogger("QMT_Trader")
@@ -41,8 +60,9 @@ def setup_logger():
logger.removeHandler(handler)
except: pass
+ # 格式中增加 线程名,方便排查是 API 线程还是 交易线程
formatter = logging.Formatter(
- '[%(asctime)s] [%(levelname)s] %(message)s',
+ '[%(asctime)s] [%(levelname)s] [%(threadName)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
@@ -50,19 +70,18 @@ def setup_logger():
file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8')
file_handler.setFormatter(formatter)
- # 控制台输出
+ # 控制台输出 (强制刷新流,防止命令行卡住不显示)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
+ stream_handler.flush = sys.stdout.flush
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
-
return logger
-# 初始化日志
logger = setup_logger()
-# ================= 2. 配置加载 =================
+# ================= 3. 配置加载 =================
def load_config(config_file='config.json'):
if getattr(sys, 'frozen', False):
base_path = os.path.dirname(sys.executable)
@@ -81,40 +100,65 @@ def load_config(config_file='config.json'):
logger.error(f"配置文件错误: {e}")
sys.exit(1)
-# ================= 3. 业务逻辑类 =================
+# ================= 4. 业务逻辑类 =================
class PositionManager:
def __init__(self, r_client):
self.r = r_client
+
def _get_key(self, strategy_name):
return f"POS:{strategy_name}"
+
def mark_holding(self, strategy_name, code):
- """乐观占位"""
self.r.hsetnx(self._get_key(strategy_name), code, 0)
+
def rollback_holding(self, strategy_name, code):
- """失败回滚"""
key = self._get_key(strategy_name)
val = self.r.hget(key, code)
if val is not None and int(val) == 0:
self.r.hdel(key, code)
logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}")
+
def update_actual_volume(self, strategy_name, code, delta_vol):
- """成交更新"""
key = self._get_key(strategy_name)
new_vol = self.r.hincrby(key, code, int(delta_vol))
if new_vol <= 0:
self.r.hdel(key, code)
new_vol = 0
return new_vol
+
def get_position(self, strategy_name, code):
vol = self.r.hget(self._get_key(strategy_name), code)
return int(vol) if vol else 0
+
def get_holding_count(self, strategy_name):
return self.r.hlen(self._get_key(strategy_name))
+
def get_all_virtual_positions(self, strategy_name):
return self.r.hgetall(self._get_key(strategy_name))
+
def force_delete(self, strategy_name, code):
self.r.hdel(self._get_key(strategy_name), code)
+ def clean_stale_placeholders(self, strategy_name, xt_trader, acc):
+ try:
+ key = self._get_key(strategy_name)
+ all_pos = self.r.hgetall(key)
+ if not all_pos: return
+
+ active_orders = xt_trader.query_stock_orders(acc, cancelable_only=True)
+ active_codes = [o.stock_code for o in active_orders] if active_orders else []
+
+ real_positions = xt_trader.query_stock_positions(acc)
+ real_holdings = [p.stock_code for p in real_positions if p.volume > 0] if real_positions else []
+
+ for code, vol_str in all_pos.items():
+ if int(vol_str) == 0:
+ if (code not in real_holdings) and (code not in active_codes):
+ self.r.hdel(key, code)
+ logger.warning(f"[{strategy_name}] 自动清理僵尸占位: {code}")
+ except Exception as e:
+ logger.error(f"清理僵尸占位异常: {e}")
+
class DailySettlement:
def __init__(self, xt_trader, acc, pos_mgr, strategies):
self.trader = xt_trader
@@ -122,10 +166,10 @@ class DailySettlement:
self.pos_mgr = pos_mgr
self.strategies = strategies
self.has_settled = False
+
def run_settlement(self):
logger.info("="*40)
logger.info("执行收盘清算流程...")
- # 1. 撤单
try:
orders = self.trader.query_stock_orders(self.acc, cancelable_only=True)
if orders:
@@ -133,25 +177,28 @@ class DailySettlement:
self.trader.cancel_order_stock(self.acc, o.order_id)
time.sleep(2)
except: pass
- # 2. 获取实盘
+
real_positions = self.trader.query_stock_positions(self.acc)
- real_pos_map = {}
- if real_positions:
- for p in real_positions:
- if p.volume > 0: real_pos_map[p.stock_code] = p.volume
- # 3. 校准Redis
+ real_pos_map = {p.stock_code: p.volume for p in real_positions if p.volume > 0} if real_positions else {}
+
for strategy in self.strategies:
virtual = self.pos_mgr.get_all_virtual_positions(strategy)
- for code, v in virtual.items():
+ for code, v_str in virtual.items():
+ v = int(v_str)
if code not in real_pos_map:
- logger.warning(f" [修正] {strategy} 幽灵持仓 {code} -> 释放")
+ logger.warning(f" [修正] {strategy} 幽灵持仓 {code} (Redis={v}) -> 强制释放")
self.pos_mgr.force_delete(strategy, code)
+ elif v == 0 and code in real_pos_map:
+ real_vol = real_pos_map[code]
+ self.pos_mgr.update_actual_volume(strategy, code, real_vol)
+ logger.info(f" [修正] {strategy} 修正占位符 {code} 0 -> {real_vol}")
+
logger.info("清算完成")
self.has_settled = True
+
def reset_flag(self):
self.has_settled = False
-# ================= 4. 回调类 =================
class MyXtQuantTraderCallback(XtQuantTraderCallback):
def __init__(self, pos_mgr):
self.pos_mgr = pos_mgr
@@ -164,110 +211,158 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback):
cache_info = ORDER_CACHE.get(trade.order_id)
if not cache_info: return
strategy, _, action = cache_info
-
logger.info(f">>> [成交] {strategy} {trade.stock_code} {trade.traded_volume}")
if action == 'BUY': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, trade.traded_volume)
elif action == 'SELL': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, -trade.traded_volume)
except: traceback.print_exc()
def on_order_error(self, err):
try:
- logger.error(f"下单失败: {err.error_msg}")
+ logger.error(f"下单失败回调: {err.error_msg} OrderID:{err.order_id}")
cache = ORDER_CACHE.get(err.order_id)
if cache and cache[2] == 'BUY':
self.pos_mgr.rollback_holding(cache[0], cache[1])
del ORDER_CACHE[err.order_id]
except: pass
-# ================= 5. 核心消息处理 =================
+# ================= 5. 核心消息处理 (重写版:拒绝静默失败) =================
def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager):
queue_key = f"{strategy_name}_real"
+
+ # 1. 获取消息
msg_json = r_client.lpop(queue_key)
- if not msg_json: return
+ if not msg_json:
+ return
+ # 2. 存入历史并解析 (打印原始消息,确保知道收到了什么)
+ logger.info(f"-------- 处理消息 [{strategy_name}] --------")
+ logger.info(f"收到原始消息: {msg_json}")
+
try:
- # 归档
r_client.rpush(f"{queue_key}:history", msg_json)
- data = json.loads(msg_json)
- # 1. 过滤回测
- if data.get('is_backtest'): return
-
- # 2. 校验日期 (只处理当日消息)
- msg_ts = data.get('timestamp')
- today_str = datetime.date.today().strftime('%Y-%m-%d')
- if msg_ts:
- msg_date = msg_ts.split(' ')[0]
- if msg_date != today_str:
- logger.warning(f"[{strategy_name}] 拦截非当日消息: MsgDate={msg_date}")
- return
- else:
- logger.warning(f"[{strategy_name}] 消息无时间戳,忽略")
+ try:
+ data = json.loads(msg_json)
+ except json.JSONDecodeError:
+ logger.error("JSON 解析失败,跳过消息")
return
- stock_code = data['stock_code']
- action = data['action']
- price = float(data['price'])
+ # 3. 基础校验 (每一步失败都必须打印 Log)
+ if data.get('is_backtest'):
+ logger.warning(f"检测到回测标记 is_backtest=True,忽略此消息")
+ return
+
+ msg_ts = data.get('timestamp')
+ if not msg_ts:
+ logger.warning(f"消息缺失时间戳 timestamp,忽略")
+ return
+
+ today_str = datetime.date.today().strftime('%Y-%m-%d')
+ msg_date = msg_ts.split(' ')[0]
+ if msg_date != today_str:
+ logger.warning(f"消息日期过期: {msg_date} != 今日 {today_str},忽略")
+ return
+
+ # 4. 提取关键字段
+ stock_code = data.get('stock_code')
+ action = data.get('action')
+ price = float(data.get('price', 0))
total_slots = int(data.get('total_slots', 1))
+ if not stock_code or not action:
+ logger.error(f"缺少关键字段: Code={stock_code}, Action={action}")
+ return
+
+ logger.info(f"解析成功: {action} {stock_code} @ {price}, 目标槽位: {total_slots}")
+
+ # 5. QMT 存活检查
+ if xt_trader is None or acc is None:
+ logger.error("严重错误: QMT 对象未初始化 (xt_trader is None)")
+ return
+
+ # 6. 买入逻辑
if action == 'BUY':
- # 槽位检查
holding = pos_manager.get_holding_count(strategy_name)
empty = total_slots - holding
+
+ logger.info(f"检查持仓: 当前占用 {holding} / 总槽位 {total_slots} -> 剩余 {empty}")
+
if empty <= 0:
- logger.warning(f"[{strategy_name}] 拦截买入: 槽位已满 (Target:{total_slots} Held:{holding})")
+ logger.warning(f"拦截买入: 槽位已满,不执行下单")
return
+ # 查询资金
asset = xt_trader.query_stock_asset(acc)
if not asset:
- logger.error("无法查询资产,QMT可能未就绪")
+ logger.error("API 错误: query_stock_asset 返回 None,可能是 QMT 断连或未同步")
return
- # 金额计算
+ logger.info(f"当前可用资金: {asset.cash:.2f}")
+
amt = asset.cash / empty
if amt < 2000:
- logger.warning(f"[{strategy_name}] 拦截买入: 金额过小 ({amt:.2f})")
+ logger.warning(f"拦截买入: 单笔金额过小 ({amt:.2f} < 2000)")
return
- if price <= 0: price = 1.0
- vol = int(amt / price / 100) * 100
-
- if vol >= 100:
- oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy')
- if oid != -1:
- logger.info(f"[{strategy_name}] 发出买单: {stock_code} {vol}股 ID:{oid}")
- ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY')
- pos_manager.mark_holding(strategy_name, stock_code)
- else:
- logger.error(f"[{strategy_name}] 下单被拒绝 (-1)")
- else:
- logger.warning(f"[{strategy_name}] 股数不足100 (Amt:{amt:.2f})")
+ if price <= 0:
+ logger.warning(f"价格异常: {price},强制设为1.0以计算股数(仅测试用)")
+ price = 1.0
+ vol = int(amt / price / 100) * 100
+ logger.info(f"计算股数: 资金{amt:.2f} / 价格{price} -> {vol}股")
+
+ if vol < 100:
+ logger.warning(f"拦截买入: 股数不足 100 ({vol})")
+ return
+
+ # 执行下单
+ oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy')
+
+ if oid != -1:
+ logger.info(f"√√√ 下单成功: ID={oid} {stock_code} 买入 {vol}")
+ ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY')
+ pos_manager.mark_holding(strategy_name, stock_code)
+ else:
+ logger.error(f"XXX 下单请求被拒绝 (Result=-1),请检查 QMT 终端报错")
+
+ # 7. 卖出逻辑
elif action == 'SELL':
v_vol = pos_manager.get_position(strategy_name, stock_code)
+ logger.info(f"Redis 记录持仓: {v_vol}")
+
if v_vol > 0:
real_pos = xt_trader.query_stock_positions(acc)
+ if real_pos is None:
+ logger.error("API 错误: query_stock_positions 返回 None")
+ return
+
rp = next((p for p in real_pos if p.stock_code==stock_code), None)
can_use = rp.can_use_volume if rp else 0
-
+ logger.info(f"实盘可用持仓: {can_use}")
+
final = min(v_vol, can_use)
if final > 0:
oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final, xtconstant.FIX_PRICE, price, strategy_name, 'PySell')
if oid != -1:
- logger.info(f"[{strategy_name}] 发出卖单: {stock_code} {final}")
+ logger.info(f"√√√ 下单成功: ID={oid} {stock_code} 卖出 {final}")
ORDER_CACHE[oid] = (strategy_name, stock_code, 'SELL')
+ else:
+ logger.error(f"XXX 下单请求被拒绝 (Result=-1)")
else:
- logger.warning(f"[{strategy_name}] 卖出拦截: 虚拟{v_vol}但实盘可用{can_use}")
+ logger.warning(f"拦截卖出: 最终计算卖出量为 0 (虚拟:{v_vol}, 实盘:{can_use})")
else:
- logger.info(f"[{strategy_name}] Redis无持仓,忽略卖出")
-
- except Exception:
- logger.error("消息处理异常", exc_info=True)
+ logger.warning(f"拦截卖出: Redis 中无此持仓记录,忽略")
+
+ else:
+ logger.error(f"未知的 Action: {action}")
+ except Exception as e:
+ logger.error(f"消息处理发生未捕获异常: {str(e)}", exc_info=True)
# ================= 6. QMT初始化 =================
def init_qmt_trader(qmt_path, account_id, account_type, pos_manager):
try:
session_id = int(time.time())
+ logger.info(f"正在连接 QMT (Path: {qmt_path})...")
trader = XtQuantTrader(qmt_path, session_id)
acc = StockAccount(account_id, account_type)
callback = MyXtQuantTraderCallback(pos_manager)
@@ -280,127 +375,229 @@ def init_qmt_trader(qmt_path, account_id, account_type, pos_manager):
callback.is_connected = True
return trader, acc, callback
else:
- logger.error(f"QMT 连接失败 Code:{res}")
+ logger.error(f"QMT 连接失败 Code:{res} (请检查 QMT 是否登录且路径正确)")
return None, None, None
except Exception as e:
- logger.error(f"初始化异常: {e}")
+ logger.error(f"初始化异常: {e}", exc_info=True)
return None, None, None
-# ================= 7. 主程序 =================
-if __name__ == '__main__':
- logger.info(">>> 系统启动 (实盘生产模式) <<<")
+# ================= 7. 交易逻辑主循环 =================
+def trading_loop():
+ global logger
+ threading.current_thread().name = "TradeThread"
+ logger.info(">>> 交易逻辑子线程启动 <<<")
- # 加载配置
- CONFIG = load_config('config.json')
+ GLOBAL_STATE.config = load_config('config.json')
+ CONFIG = GLOBAL_STATE.config
redis_cfg = CONFIG['redis']
qmt_cfg = CONFIG['qmt']
watch_list = CONFIG['strategies']
- # 连接Redis
try:
r = redis.Redis(**redis_cfg, decode_responses=True)
r.ping()
pos_manager = PositionManager(r)
+ GLOBAL_STATE.pos_manager = pos_manager
+ logger.info("Redis 连接成功")
except Exception as e:
logger.critical(f"Redis 连接失败: {e}")
- sys.exit(1)
+ return
- # 初次连接 QMT
+ # 初始化
xt_trader, acc, callback = init_qmt_trader(
qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager
)
+ GLOBAL_STATE.xt_trader = xt_trader
+ GLOBAL_STATE.acc = acc
+ GLOBAL_STATE.callback = callback
settler = None
if xt_trader:
settler = DailySettlement(xt_trader, acc, pos_manager, watch_list)
+ for s in watch_list:
+ pos_manager.clean_stale_placeholders(s, xt_trader, acc)
- logger.info(">>> 进入主循环监听 <<<")
+ logger.info(">>> 进入主轮询循环 <<<")
+
+ last_health_check = 0 # 上次深度检查时间
- while True:
+ while GLOBAL_STATE.is_running:
try:
- # --- 1. 日志跨天处理 ---
+ # 1. 基础心跳更新
+ GLOBAL_STATE.last_heartbeat = datetime.datetime.now().strftime('%H:%M:%S')
+
+ # 2. 状态诊断与自动修复 (关键修改!!!)
+ # 每 15 秒执行一次“深度探测”,而不是每一轮都看 callback
+ if time.time() - last_health_check > 15:
+ last_health_check = time.time()
+
+ is_alive_physically = False
+
+ # 尝试通过“查资产”来验证连接是否真的活着
+ if GLOBAL_STATE.xt_trader and GLOBAL_STATE.acc:
+ try:
+ asset = GLOBAL_STATE.xt_trader.query_stock_asset(GLOBAL_STATE.acc)
+ if asset:
+ is_alive_physically = True
+ # 【核心修复】:如果物理探测成功,强行修正 callback 状态
+ if GLOBAL_STATE.callback and not GLOBAL_STATE.callback.is_connected:
+ GLOBAL_STATE.callback.is_connected = True
+ logger.info("✅ [自愈] 检测到资产查询正常,修正伪造的断开状态 (False -> True)")
+ except:
+ pass
+
+ # 只有当 逻辑断开(callback) AND 物理断开(无法查资产) 时,才判定为断线
+ current_status = GLOBAL_STATE.callback.is_connected if GLOBAL_STATE.callback else False
+
+ # 减少日志刷屏:只有状态真的异常时才打印
+ if not current_status and not is_alive_physically:
+ logger.warning(f"⚠️ 线程存活检查 | 逻辑状态:{current_status} | 物理探测:失败")
+
+ # 3. 断线重连逻辑
+ # 只有“物理探测”彻底失败了,才执行重连
+ if not is_alive_physically:
+ # 避让 QMT 夜间重启高峰期 (23:20 - 23:35)
+ # 避免在这段时间疯狂重连打印日志
+ now_hm = datetime.datetime.now().strftime('%H%M')
+ if '2320' <= now_hm <= '2335':
+ logger.info("⏳ QMT维护时段,暂停重连,休眠60秒...")
+ time.sleep(60)
+ continue
+
+ if datetime.date.today().weekday() >= 5: # 周末
+ time.sleep(3600)
+ continue
+
+ logger.warning("🚫 确认连接丢失,执行重连...")
+ if GLOBAL_STATE.xt_trader:
+ try: GLOBAL_STATE.xt_trader.stop()
+ except: pass
+
+ new_trader, new_acc, new_cb = init_qmt_trader(
+ qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager
+ )
+
+ if new_trader:
+ GLOBAL_STATE.xt_trader = new_trader
+ GLOBAL_STATE.acc = new_acc
+ GLOBAL_STATE.callback = new_cb
+ settler = DailySettlement(new_trader, new_acc, pos_manager, watch_list)
+ logger.info("✅ 重连成功")
+ else:
+ logger.error("❌ 重连失败,60秒后重试")
+ time.sleep(60)
+ continue
+
+ # 4. 日志轮转与心跳文件
today_str = datetime.date.today().strftime('%Y-%m-%d')
if today_str != CURRENT_LOG_DATE:
- logger.info(f"日期变更 ({CURRENT_LOG_DATE} -> {today_str}),切换日志...")
logger = setup_logger()
- # --- 2. 断线重连 (仅工作日) ---
- need_reconnect = (xt_trader is None) or (callback is None) or (not callback.is_connected)
- if need_reconnect:
- # 检查是否为周末 (5=周六, 6=周日)
- if datetime.date.today().weekday() >= 5:
- logger.info("当前是周末,暂停重连,休眠1小时")
- time.sleep(3600)
- continue
+ try:
+ with open("heartbeat.txt", "w") as f:
+ f.write(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
+ except: pass
- logger.warning("连接丢失,尝试重连...")
- if xt_trader:
- try: xt_trader.stop()
- except: pass
-
- xt_trader, acc, callback = init_qmt_trader(
- qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager
- )
-
- if xt_trader:
- settler = DailySettlement(xt_trader, acc, pos_manager, watch_list)
- logger.info("重连成功")
- else:
- logger.error("重连失败,60秒后重试")
- time.sleep(60)
- continue
-
- # --- 3. 交易时段轮询逻辑 ---
- now = datetime.datetime.now()
- current_time_str = now.strftime('%H%M%S') # 格式化为 HHMMSS
+ # 5. 交易逻辑处理
+ current_time_str = datetime.datetime.now().strftime('%H%M%S')
+ is_trading_time = ('091500' <= current_time_str <= '113000') or ('130000' <= current_time_str <= '150000')
- # 默认休眠 60秒
- sleep_sec = 60
- is_trading_time = False
-
- # 判断时段
- # 09:15:00 - 10:00:00 (竞价 + 开盘半小时) -> 高频 1秒
- if '091500' <= current_time_str <= '100000':
- sleep_sec = 1
- is_trading_time = True
-
- # 10:00:01 - 11:30:00 -> 低频 60秒
- elif '100000' < current_time_str <= '113000':
- sleep_sec = 60
- is_trading_time = True
-
- # 13:00:00 - 14:50:00 -> 低频 60秒
- elif '130000' <= current_time_str < '145000':
- sleep_sec = 60
- is_trading_time = True
-
- # 14:50:00 - 15:00:00 (尾盘 + 收盘竞价) -> 高频 1秒
- elif '145000' <= current_time_str <= '150000':
- sleep_sec = 1
- is_trading_time = True
-
- # --- 4. 执行业务 ---
- if is_trading_time:
- # 每日重置清算标记
+ # 如果连接正常(无论 callback 怎么说,只要上面探测过了,xt_trader 就是可用的)
+ if is_trading_time and GLOBAL_STATE.xt_trader:
if settler and settler.has_settled:
settler.reset_flag()
-
- # 处理信号
for s in watch_list:
- process_strategy_queue(s, r, xt_trader, acc, pos_manager)
-
- # --- 5. 收盘清算 (15:05 - 15:10) ---
+ process_strategy_queue(s, r, GLOBAL_STATE.xt_trader, GLOBAL_STATE.acc, pos_manager)
+
elif '150500' <= current_time_str <= '151000':
if settler and not settler.has_settled:
settler.run_settlement()
- sleep_sec = 60
- # 执行休眠
- time.sleep(sleep_sec)
+ time.sleep(1 if is_trading_time else 5)
- except KeyboardInterrupt:
- logger.info("用户停止")
- break
except Exception as e:
- logger.critical("主循环未捕获异常", exc_info=True)
- time.sleep(10)
\ No newline at end of file
+ logger.critical("交易循环异常", exc_info=True)
+ time.sleep(10)
+
+# ================= 8. FastAPI 接口 =================
+app = FastAPI(title="QMT Monitor")
+
+app.add_middleware(
+ CORSMiddleware,
+ allow_origins=["*"],
+ allow_methods=["*"],
+ allow_headers=["*"],
+)
+
+@app.get("/")
+async def read_root():
+ if os.path.exists("dashboard.html"):
+ return FileResponse("dashboard.html")
+ return {"error": "Dashboard not found"}
+
+@app.get("/api/status")
+def get_status():
+ connected = False
+ if GLOBAL_STATE.callback:
+ connected = GLOBAL_STATE.callback.is_connected
+ return {
+ "running": True,
+ "qmt_connected": connected,
+ "start_time": GLOBAL_STATE.start_time,
+ "last_loop_update": GLOBAL_STATE.last_heartbeat,
+ "account_id": GLOBAL_STATE.acc.account_id if GLOBAL_STATE.acc else "Unknown"
+ }
+
+@app.get("/api/positions")
+def get_positions():
+ real_pos_list = []
+ virtual_pos_map = {}
+
+ if GLOBAL_STATE.xt_trader and GLOBAL_STATE.acc and GLOBAL_STATE.callback and GLOBAL_STATE.callback.is_connected:
+ try:
+ positions = GLOBAL_STATE.xt_trader.query_stock_positions(GLOBAL_STATE.acc)
+ if positions:
+ for p in positions:
+ if p.volume > 0:
+ real_pos_list.append({
+ "code": p.stock_code,
+ "volume": p.volume,
+ "can_use": p.can_use_volume,
+ "market_value": p.market_value
+ })
+ except: pass
+
+ if GLOBAL_STATE.config and GLOBAL_STATE.pos_manager:
+ for s in GLOBAL_STATE.config.get('strategies', []):
+ v_data = GLOBAL_STATE.pos_manager.get_all_virtual_positions(s)
+ virtual_pos_map[s] = v_data
+
+ return {
+ "real_positions": real_pos_list,
+ "virtual_positions": virtual_pos_map
+ }
+
+@app.get("/api/logs")
+def get_logs(lines: int = 50):
+ today_str = datetime.date.today().strftime('%Y-%m-%d')
+ log_path = os.path.join("logs", f"{today_str}.log")
+ if not os.path.exists(log_path):
+ return {"logs": ["暂无今日日志"]}
+ try:
+ with open(log_path, 'r', encoding='utf-8') as f:
+ all_lines = f.readlines()
+ return {"logs": [line.strip() for line in all_lines[-lines:]]}
+ except Exception as e:
+ return {"logs": [f"读取失败: {str(e)}"]}
+
+# ================= 9. 启动入口 =================
+if __name__ == '__main__':
+ # 使用 -u 参数运行是最佳实践: python -u main.py
+ # 但这里也在代码里强制 flush 了
+ print(">>> 系统正在启动...")
+
+ t = threading.Thread(target=trading_loop, daemon=True)
+ t.start()
+
+ print("Web服务启动: http://localhost:8001")
+ uvicorn.run(app, host="0.0.0.0", port=8001, log_level="warning")
\ No newline at end of file
diff --git a/qmt/start.bat b/qmt/start.bat
new file mode 100644
index 0000000..1d019df
--- /dev/null
+++ b/qmt/start.bat
@@ -0,0 +1,75 @@
+@echo off
+setlocal enabledelayedexpansion
+
+:: ================= =================
+:: Ŀ¼ (ȷ·ȷ)
+set "WORK_DIR=C:\Data\Project\NewStock\qmt"
+:: Pythonű
+set "SCRIPT_NAME=qmt_trader.py"
+:: Դ
+set MAX_RETRIES=5
+:: Լ
+set RETRY_COUNT=0
+:: Եȴʱ()
+set RETRY_WAIT=10
+:: ־Ŀ¼
+set "LOG_DIR=%WORK_DIR%\logs\launcher"
+:: ===========================================
+
+:: 1. лĿ¼
+cd /d "%WORK_DIR%"
+title QMT ʵػϵͳ [Port:8001]
+
+:: 2. ־Ŀ¼
+if not exist "%LOG_DIR%" mkdir "%LOG_DIR%"
+
+:: ȡΪ־ļ (ڴ䳣Windowsʽ)
+set "TODAY=%date:~0,4%-%date:~5,2%-%date:~8,2%"
+set "LOG_FILE=%LOG_DIR%\%TODAY%.log"
+
+echo ==================================================
+echo QMT ʵ̽ϵͳ
+echo ʱ: %time%
+echo ־: %LOG_FILE%
+echo : http://localhost:8001
+echo ==================================================
+
+:LOOP
+echo.
+echo [%time%] ӽ...
+echo [%time%] ӽ... >> "%LOG_FILE%"
+
+:: 3. Python ű
+:: ʹ uv run 2>&1 Ҳд־
+uv run %SCRIPT_NAME% >> "%LOG_FILE%" 2>&1
+
+:: 4. ˳
+set EXIT_CODE=%errorlevel%
+echo [%time%] 쳣˳: %EXIT_CODE% >> "%LOG_FILE%"
+echo : ˳ (Code: %EXIT_CODE%)
+
+:: 5.
+if %RETRY_COUNT% GEQ %MAX_RETRIES% (
+ echo [%time%] ﵽԴϵͳֹͣ >> "%LOG_FILE%"
+ :: ʾ
+ msg * "QMT ϵͳѱԶָ־"
+ goto FAIL
+)
+
+set /a RETRY_COUNT+=1
+echo [%time%] ȴ %RETRY_WAIT% е %RETRY_COUNT% ... >> "%LOG_FILE%"
+echo ڵȴ (%RETRY_COUNT%/%MAX_RETRIES%)...
+timeout /t %RETRY_WAIT% >nul
+
+goto LOOP
+
+:FAIL
+title QMT ʵػϵͳ [ѱ]
+color 4F
+echo.
+echo ==========================================
+echo ϵͳֹͣ
+echo ־ļ: %LOG_FILE%
+echo ==========================================
+pause
+exit /b 1
\ No newline at end of file