diff --git a/qmt/dashboard.html b/qmt/dashboard.html new file mode 100644 index 0000000..49b14b0 --- /dev/null +++ b/qmt/dashboard.html @@ -0,0 +1,258 @@ + + + + + + QMT 实盘监控看板 + + + + + + + +
+ + + + + + {{ status.account_id || '---' }} + {{ status.start_time || '---' }} + + + {{ status.last_loop_update || '---' }} + + + + 手动刷新 + +
+ + + {{ tradingStatusText }} + +
+
+
+
+
+ + + + + + + + + + + + + + + + + + +
+ 暂无策略数据 / Redis未连接 +
+
+
{{ strategyName }}
+ + + + + + +
+
+
+
+ + + + + +
+
{{ line }}
+
+
+
+
+
+
+
+ + + + \ No newline at end of file diff --git a/qmt/heartbeat.txt b/qmt/heartbeat.txt new file mode 100644 index 0000000..0d646fb --- /dev/null +++ b/qmt/heartbeat.txt @@ -0,0 +1 @@ +2025-12-19 14:11:10 \ No newline at end of file diff --git a/qmt/qmt_trader.py b/qmt/qmt_trader.py index bcca240..f388bdc 100644 --- a/qmt/qmt_trader.py +++ b/qmt/qmt_trader.py @@ -1,5 +1,5 @@ # coding:utf-8 -import time, datetime, traceback, sys, json, os +import time, datetime, traceback, sys, json, os, threading import logging import redis from xtquant import xtdata @@ -7,27 +7,46 @@ from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant -# 全局变量 -CURRENT_LOG_DATE = None -CONFIG = {} -ORDER_CACHE = {} +# FastAPI 相关 +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import FileResponse +import uvicorn -# ================= 1. 日志系统 (按日期直写) ================= +# ================= 0. Windows 防卡死补丁 ================= +try: + import ctypes + kernel32 = ctypes.windll.kernel32 + # 禁用快速编辑模式 (0x0040) + kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), 128) +except: + pass + +# ================= 1. 全局状态管理 ================= +class SystemState: + def __init__(self): + self.xt_trader = None + self.acc = None + self.pos_manager = None + self.callback = None + self.is_running = True + self.start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + self.last_heartbeat = "Initializing..." + self.config = {} + +GLOBAL_STATE = SystemState() +CURRENT_LOG_DATE = None +ORDER_CACHE = {} # 内存缓存: OrderID -> (Strategy, Code, Action) + +# ================= 2. 增强型日志系统 ================= def setup_logger(): - """ - 配置日志系统: - 每天生成一个新的日志文件,文件名格式为 YYYY-MM-DD.log - """ global CURRENT_LOG_DATE - log_dir = "logs" if not os.path.exists(log_dir): os.makedirs(log_dir) - # 获取今日日期 today_str = datetime.date.today().strftime('%Y-%m-%d') CURRENT_LOG_DATE = today_str - log_file = os.path.join(log_dir, f"{today_str}.log") logger = logging.getLogger("QMT_Trader") @@ -41,8 +60,9 @@ def setup_logger(): logger.removeHandler(handler) except: pass + # 格式中增加 线程名,方便排查是 API 线程还是 交易线程 formatter = logging.Formatter( - '[%(asctime)s] [%(levelname)s] %(message)s', + '[%(asctime)s] [%(levelname)s] [%(threadName)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) @@ -50,19 +70,18 @@ def setup_logger(): file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') file_handler.setFormatter(formatter) - # 控制台输出 + # 控制台输出 (强制刷新流,防止命令行卡住不显示) stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(formatter) + stream_handler.flush = sys.stdout.flush logger.addHandler(file_handler) logger.addHandler(stream_handler) - return logger -# 初始化日志 logger = setup_logger() -# ================= 2. 配置加载 ================= +# ================= 3. 配置加载 ================= def load_config(config_file='config.json'): if getattr(sys, 'frozen', False): base_path = os.path.dirname(sys.executable) @@ -81,40 +100,65 @@ def load_config(config_file='config.json'): logger.error(f"配置文件错误: {e}") sys.exit(1) -# ================= 3. 业务逻辑类 ================= +# ================= 4. 业务逻辑类 ================= class PositionManager: def __init__(self, r_client): self.r = r_client + def _get_key(self, strategy_name): return f"POS:{strategy_name}" + def mark_holding(self, strategy_name, code): - """乐观占位""" self.r.hsetnx(self._get_key(strategy_name), code, 0) + def rollback_holding(self, strategy_name, code): - """失败回滚""" key = self._get_key(strategy_name) val = self.r.hget(key, code) if val is not None and int(val) == 0: self.r.hdel(key, code) logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}") + def update_actual_volume(self, strategy_name, code, delta_vol): - """成交更新""" key = self._get_key(strategy_name) new_vol = self.r.hincrby(key, code, int(delta_vol)) if new_vol <= 0: self.r.hdel(key, code) new_vol = 0 return new_vol + def get_position(self, strategy_name, code): vol = self.r.hget(self._get_key(strategy_name), code) return int(vol) if vol else 0 + def get_holding_count(self, strategy_name): return self.r.hlen(self._get_key(strategy_name)) + def get_all_virtual_positions(self, strategy_name): return self.r.hgetall(self._get_key(strategy_name)) + def force_delete(self, strategy_name, code): self.r.hdel(self._get_key(strategy_name), code) + def clean_stale_placeholders(self, strategy_name, xt_trader, acc): + try: + key = self._get_key(strategy_name) + all_pos = self.r.hgetall(key) + if not all_pos: return + + active_orders = xt_trader.query_stock_orders(acc, cancelable_only=True) + active_codes = [o.stock_code for o in active_orders] if active_orders else [] + + real_positions = xt_trader.query_stock_positions(acc) + real_holdings = [p.stock_code for p in real_positions if p.volume > 0] if real_positions else [] + + for code, vol_str in all_pos.items(): + if int(vol_str) == 0: + if (code not in real_holdings) and (code not in active_codes): + self.r.hdel(key, code) + logger.warning(f"[{strategy_name}] 自动清理僵尸占位: {code}") + except Exception as e: + logger.error(f"清理僵尸占位异常: {e}") + class DailySettlement: def __init__(self, xt_trader, acc, pos_mgr, strategies): self.trader = xt_trader @@ -122,10 +166,10 @@ class DailySettlement: self.pos_mgr = pos_mgr self.strategies = strategies self.has_settled = False + def run_settlement(self): logger.info("="*40) logger.info("执行收盘清算流程...") - # 1. 撤单 try: orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) if orders: @@ -133,25 +177,28 @@ class DailySettlement: self.trader.cancel_order_stock(self.acc, o.order_id) time.sleep(2) except: pass - # 2. 获取实盘 + real_positions = self.trader.query_stock_positions(self.acc) - real_pos_map = {} - if real_positions: - for p in real_positions: - if p.volume > 0: real_pos_map[p.stock_code] = p.volume - # 3. 校准Redis + real_pos_map = {p.stock_code: p.volume for p in real_positions if p.volume > 0} if real_positions else {} + for strategy in self.strategies: virtual = self.pos_mgr.get_all_virtual_positions(strategy) - for code, v in virtual.items(): + for code, v_str in virtual.items(): + v = int(v_str) if code not in real_pos_map: - logger.warning(f" [修正] {strategy} 幽灵持仓 {code} -> 释放") + logger.warning(f" [修正] {strategy} 幽灵持仓 {code} (Redis={v}) -> 强制释放") self.pos_mgr.force_delete(strategy, code) + elif v == 0 and code in real_pos_map: + real_vol = real_pos_map[code] + self.pos_mgr.update_actual_volume(strategy, code, real_vol) + logger.info(f" [修正] {strategy} 修正占位符 {code} 0 -> {real_vol}") + logger.info("清算完成") self.has_settled = True + def reset_flag(self): self.has_settled = False -# ================= 4. 回调类 ================= class MyXtQuantTraderCallback(XtQuantTraderCallback): def __init__(self, pos_mgr): self.pos_mgr = pos_mgr @@ -164,110 +211,158 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback): cache_info = ORDER_CACHE.get(trade.order_id) if not cache_info: return strategy, _, action = cache_info - logger.info(f">>> [成交] {strategy} {trade.stock_code} {trade.traded_volume}") if action == 'BUY': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, trade.traded_volume) elif action == 'SELL': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, -trade.traded_volume) except: traceback.print_exc() def on_order_error(self, err): try: - logger.error(f"下单失败: {err.error_msg}") + logger.error(f"下单失败回调: {err.error_msg} OrderID:{err.order_id}") cache = ORDER_CACHE.get(err.order_id) if cache and cache[2] == 'BUY': self.pos_mgr.rollback_holding(cache[0], cache[1]) del ORDER_CACHE[err.order_id] except: pass -# ================= 5. 核心消息处理 ================= +# ================= 5. 核心消息处理 (重写版:拒绝静默失败) ================= def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): queue_key = f"{strategy_name}_real" + + # 1. 获取消息 msg_json = r_client.lpop(queue_key) - if not msg_json: return + if not msg_json: + return + # 2. 存入历史并解析 (打印原始消息,确保知道收到了什么) + logger.info(f"-------- 处理消息 [{strategy_name}] --------") + logger.info(f"收到原始消息: {msg_json}") + try: - # 归档 r_client.rpush(f"{queue_key}:history", msg_json) - data = json.loads(msg_json) - # 1. 过滤回测 - if data.get('is_backtest'): return - - # 2. 校验日期 (只处理当日消息) - msg_ts = data.get('timestamp') - today_str = datetime.date.today().strftime('%Y-%m-%d') - if msg_ts: - msg_date = msg_ts.split(' ')[0] - if msg_date != today_str: - logger.warning(f"[{strategy_name}] 拦截非当日消息: MsgDate={msg_date}") - return - else: - logger.warning(f"[{strategy_name}] 消息无时间戳,忽略") + try: + data = json.loads(msg_json) + except json.JSONDecodeError: + logger.error("JSON 解析失败,跳过消息") return - stock_code = data['stock_code'] - action = data['action'] - price = float(data['price']) + # 3. 基础校验 (每一步失败都必须打印 Log) + if data.get('is_backtest'): + logger.warning(f"检测到回测标记 is_backtest=True,忽略此消息") + return + + msg_ts = data.get('timestamp') + if not msg_ts: + logger.warning(f"消息缺失时间戳 timestamp,忽略") + return + + today_str = datetime.date.today().strftime('%Y-%m-%d') + msg_date = msg_ts.split(' ')[0] + if msg_date != today_str: + logger.warning(f"消息日期过期: {msg_date} != 今日 {today_str},忽略") + return + + # 4. 提取关键字段 + stock_code = data.get('stock_code') + action = data.get('action') + price = float(data.get('price', 0)) total_slots = int(data.get('total_slots', 1)) + if not stock_code or not action: + logger.error(f"缺少关键字段: Code={stock_code}, Action={action}") + return + + logger.info(f"解析成功: {action} {stock_code} @ {price}, 目标槽位: {total_slots}") + + # 5. QMT 存活检查 + if xt_trader is None or acc is None: + logger.error("严重错误: QMT 对象未初始化 (xt_trader is None)") + return + + # 6. 买入逻辑 if action == 'BUY': - # 槽位检查 holding = pos_manager.get_holding_count(strategy_name) empty = total_slots - holding + + logger.info(f"检查持仓: 当前占用 {holding} / 总槽位 {total_slots} -> 剩余 {empty}") + if empty <= 0: - logger.warning(f"[{strategy_name}] 拦截买入: 槽位已满 (Target:{total_slots} Held:{holding})") + logger.warning(f"拦截买入: 槽位已满,不执行下单") return + # 查询资金 asset = xt_trader.query_stock_asset(acc) if not asset: - logger.error("无法查询资产,QMT可能未就绪") + logger.error("API 错误: query_stock_asset 返回 None,可能是 QMT 断连或未同步") return - # 金额计算 + logger.info(f"当前可用资金: {asset.cash:.2f}") + amt = asset.cash / empty if amt < 2000: - logger.warning(f"[{strategy_name}] 拦截买入: 金额过小 ({amt:.2f})") + logger.warning(f"拦截买入: 单笔金额过小 ({amt:.2f} < 2000)") return - if price <= 0: price = 1.0 - vol = int(amt / price / 100) * 100 - - if vol >= 100: - oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') - if oid != -1: - logger.info(f"[{strategy_name}] 发出买单: {stock_code} {vol}股 ID:{oid}") - ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY') - pos_manager.mark_holding(strategy_name, stock_code) - else: - logger.error(f"[{strategy_name}] 下单被拒绝 (-1)") - else: - logger.warning(f"[{strategy_name}] 股数不足100 (Amt:{amt:.2f})") + if price <= 0: + logger.warning(f"价格异常: {price},强制设为1.0以计算股数(仅测试用)") + price = 1.0 + vol = int(amt / price / 100) * 100 + logger.info(f"计算股数: 资金{amt:.2f} / 价格{price} -> {vol}股") + + if vol < 100: + logger.warning(f"拦截买入: 股数不足 100 ({vol})") + return + + # 执行下单 + oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') + + if oid != -1: + logger.info(f"√√√ 下单成功: ID={oid} {stock_code} 买入 {vol}") + ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY') + pos_manager.mark_holding(strategy_name, stock_code) + else: + logger.error(f"XXX 下单请求被拒绝 (Result=-1),请检查 QMT 终端报错") + + # 7. 卖出逻辑 elif action == 'SELL': v_vol = pos_manager.get_position(strategy_name, stock_code) + logger.info(f"Redis 记录持仓: {v_vol}") + if v_vol > 0: real_pos = xt_trader.query_stock_positions(acc) + if real_pos is None: + logger.error("API 错误: query_stock_positions 返回 None") + return + rp = next((p for p in real_pos if p.stock_code==stock_code), None) can_use = rp.can_use_volume if rp else 0 - + logger.info(f"实盘可用持仓: {can_use}") + final = min(v_vol, can_use) if final > 0: oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') if oid != -1: - logger.info(f"[{strategy_name}] 发出卖单: {stock_code} {final}") + logger.info(f"√√√ 下单成功: ID={oid} {stock_code} 卖出 {final}") ORDER_CACHE[oid] = (strategy_name, stock_code, 'SELL') + else: + logger.error(f"XXX 下单请求被拒绝 (Result=-1)") else: - logger.warning(f"[{strategy_name}] 卖出拦截: 虚拟{v_vol}但实盘可用{can_use}") + logger.warning(f"拦截卖出: 最终计算卖出量为 0 (虚拟:{v_vol}, 实盘:{can_use})") else: - logger.info(f"[{strategy_name}] Redis无持仓,忽略卖出") - - except Exception: - logger.error("消息处理异常", exc_info=True) + logger.warning(f"拦截卖出: Redis 中无此持仓记录,忽略") + + else: + logger.error(f"未知的 Action: {action}") + except Exception as e: + logger.error(f"消息处理发生未捕获异常: {str(e)}", exc_info=True) # ================= 6. QMT初始化 ================= def init_qmt_trader(qmt_path, account_id, account_type, pos_manager): try: session_id = int(time.time()) + logger.info(f"正在连接 QMT (Path: {qmt_path})...") trader = XtQuantTrader(qmt_path, session_id) acc = StockAccount(account_id, account_type) callback = MyXtQuantTraderCallback(pos_manager) @@ -280,127 +375,229 @@ def init_qmt_trader(qmt_path, account_id, account_type, pos_manager): callback.is_connected = True return trader, acc, callback else: - logger.error(f"QMT 连接失败 Code:{res}") + logger.error(f"QMT 连接失败 Code:{res} (请检查 QMT 是否登录且路径正确)") return None, None, None except Exception as e: - logger.error(f"初始化异常: {e}") + logger.error(f"初始化异常: {e}", exc_info=True) return None, None, None -# ================= 7. 主程序 ================= -if __name__ == '__main__': - logger.info(">>> 系统启动 (实盘生产模式) <<<") +# ================= 7. 交易逻辑主循环 ================= +def trading_loop(): + global logger + threading.current_thread().name = "TradeThread" + logger.info(">>> 交易逻辑子线程启动 <<<") - # 加载配置 - CONFIG = load_config('config.json') + GLOBAL_STATE.config = load_config('config.json') + CONFIG = GLOBAL_STATE.config redis_cfg = CONFIG['redis'] qmt_cfg = CONFIG['qmt'] watch_list = CONFIG['strategies'] - # 连接Redis try: r = redis.Redis(**redis_cfg, decode_responses=True) r.ping() pos_manager = PositionManager(r) + GLOBAL_STATE.pos_manager = pos_manager + logger.info("Redis 连接成功") except Exception as e: logger.critical(f"Redis 连接失败: {e}") - sys.exit(1) + return - # 初次连接 QMT + # 初始化 xt_trader, acc, callback = init_qmt_trader( qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager ) + GLOBAL_STATE.xt_trader = xt_trader + GLOBAL_STATE.acc = acc + GLOBAL_STATE.callback = callback settler = None if xt_trader: settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) + for s in watch_list: + pos_manager.clean_stale_placeholders(s, xt_trader, acc) - logger.info(">>> 进入主循环监听 <<<") + logger.info(">>> 进入主轮询循环 <<<") + + last_health_check = 0 # 上次深度检查时间 - while True: + while GLOBAL_STATE.is_running: try: - # --- 1. 日志跨天处理 --- + # 1. 基础心跳更新 + GLOBAL_STATE.last_heartbeat = datetime.datetime.now().strftime('%H:%M:%S') + + # 2. 状态诊断与自动修复 (关键修改!!!) + # 每 15 秒执行一次“深度探测”,而不是每一轮都看 callback + if time.time() - last_health_check > 15: + last_health_check = time.time() + + is_alive_physically = False + + # 尝试通过“查资产”来验证连接是否真的活着 + if GLOBAL_STATE.xt_trader and GLOBAL_STATE.acc: + try: + asset = GLOBAL_STATE.xt_trader.query_stock_asset(GLOBAL_STATE.acc) + if asset: + is_alive_physically = True + # 【核心修复】:如果物理探测成功,强行修正 callback 状态 + if GLOBAL_STATE.callback and not GLOBAL_STATE.callback.is_connected: + GLOBAL_STATE.callback.is_connected = True + logger.info("✅ [自愈] 检测到资产查询正常,修正伪造的断开状态 (False -> True)") + except: + pass + + # 只有当 逻辑断开(callback) AND 物理断开(无法查资产) 时,才判定为断线 + current_status = GLOBAL_STATE.callback.is_connected if GLOBAL_STATE.callback else False + + # 减少日志刷屏:只有状态真的异常时才打印 + if not current_status and not is_alive_physically: + logger.warning(f"⚠️ 线程存活检查 | 逻辑状态:{current_status} | 物理探测:失败") + + # 3. 断线重连逻辑 + # 只有“物理探测”彻底失败了,才执行重连 + if not is_alive_physically: + # 避让 QMT 夜间重启高峰期 (23:20 - 23:35) + # 避免在这段时间疯狂重连打印日志 + now_hm = datetime.datetime.now().strftime('%H%M') + if '2320' <= now_hm <= '2335': + logger.info("⏳ QMT维护时段,暂停重连,休眠60秒...") + time.sleep(60) + continue + + if datetime.date.today().weekday() >= 5: # 周末 + time.sleep(3600) + continue + + logger.warning("🚫 确认连接丢失,执行重连...") + if GLOBAL_STATE.xt_trader: + try: GLOBAL_STATE.xt_trader.stop() + except: pass + + new_trader, new_acc, new_cb = init_qmt_trader( + qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager + ) + + if new_trader: + GLOBAL_STATE.xt_trader = new_trader + GLOBAL_STATE.acc = new_acc + GLOBAL_STATE.callback = new_cb + settler = DailySettlement(new_trader, new_acc, pos_manager, watch_list) + logger.info("✅ 重连成功") + else: + logger.error("❌ 重连失败,60秒后重试") + time.sleep(60) + continue + + # 4. 日志轮转与心跳文件 today_str = datetime.date.today().strftime('%Y-%m-%d') if today_str != CURRENT_LOG_DATE: - logger.info(f"日期变更 ({CURRENT_LOG_DATE} -> {today_str}),切换日志...") logger = setup_logger() - # --- 2. 断线重连 (仅工作日) --- - need_reconnect = (xt_trader is None) or (callback is None) or (not callback.is_connected) - if need_reconnect: - # 检查是否为周末 (5=周六, 6=周日) - if datetime.date.today().weekday() >= 5: - logger.info("当前是周末,暂停重连,休眠1小时") - time.sleep(3600) - continue + try: + with open("heartbeat.txt", "w") as f: + f.write(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) + except: pass - logger.warning("连接丢失,尝试重连...") - if xt_trader: - try: xt_trader.stop() - except: pass - - xt_trader, acc, callback = init_qmt_trader( - qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager - ) - - if xt_trader: - settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) - logger.info("重连成功") - else: - logger.error("重连失败,60秒后重试") - time.sleep(60) - continue - - # --- 3. 交易时段轮询逻辑 --- - now = datetime.datetime.now() - current_time_str = now.strftime('%H%M%S') # 格式化为 HHMMSS + # 5. 交易逻辑处理 + current_time_str = datetime.datetime.now().strftime('%H%M%S') + is_trading_time = ('091500' <= current_time_str <= '113000') or ('130000' <= current_time_str <= '150000') - # 默认休眠 60秒 - sleep_sec = 60 - is_trading_time = False - - # 判断时段 - # 09:15:00 - 10:00:00 (竞价 + 开盘半小时) -> 高频 1秒 - if '091500' <= current_time_str <= '100000': - sleep_sec = 1 - is_trading_time = True - - # 10:00:01 - 11:30:00 -> 低频 60秒 - elif '100000' < current_time_str <= '113000': - sleep_sec = 60 - is_trading_time = True - - # 13:00:00 - 14:50:00 -> 低频 60秒 - elif '130000' <= current_time_str < '145000': - sleep_sec = 60 - is_trading_time = True - - # 14:50:00 - 15:00:00 (尾盘 + 收盘竞价) -> 高频 1秒 - elif '145000' <= current_time_str <= '150000': - sleep_sec = 1 - is_trading_time = True - - # --- 4. 执行业务 --- - if is_trading_time: - # 每日重置清算标记 + # 如果连接正常(无论 callback 怎么说,只要上面探测过了,xt_trader 就是可用的) + if is_trading_time and GLOBAL_STATE.xt_trader: if settler and settler.has_settled: settler.reset_flag() - - # 处理信号 for s in watch_list: - process_strategy_queue(s, r, xt_trader, acc, pos_manager) - - # --- 5. 收盘清算 (15:05 - 15:10) --- + process_strategy_queue(s, r, GLOBAL_STATE.xt_trader, GLOBAL_STATE.acc, pos_manager) + elif '150500' <= current_time_str <= '151000': if settler and not settler.has_settled: settler.run_settlement() - sleep_sec = 60 - # 执行休眠 - time.sleep(sleep_sec) + time.sleep(1 if is_trading_time else 5) - except KeyboardInterrupt: - logger.info("用户停止") - break except Exception as e: - logger.critical("主循环未捕获异常", exc_info=True) - time.sleep(10) \ No newline at end of file + logger.critical("交易循环异常", exc_info=True) + time.sleep(10) + +# ================= 8. FastAPI 接口 ================= +app = FastAPI(title="QMT Monitor") + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) + +@app.get("/") +async def read_root(): + if os.path.exists("dashboard.html"): + return FileResponse("dashboard.html") + return {"error": "Dashboard not found"} + +@app.get("/api/status") +def get_status(): + connected = False + if GLOBAL_STATE.callback: + connected = GLOBAL_STATE.callback.is_connected + return { + "running": True, + "qmt_connected": connected, + "start_time": GLOBAL_STATE.start_time, + "last_loop_update": GLOBAL_STATE.last_heartbeat, + "account_id": GLOBAL_STATE.acc.account_id if GLOBAL_STATE.acc else "Unknown" + } + +@app.get("/api/positions") +def get_positions(): + real_pos_list = [] + virtual_pos_map = {} + + if GLOBAL_STATE.xt_trader and GLOBAL_STATE.acc and GLOBAL_STATE.callback and GLOBAL_STATE.callback.is_connected: + try: + positions = GLOBAL_STATE.xt_trader.query_stock_positions(GLOBAL_STATE.acc) + if positions: + for p in positions: + if p.volume > 0: + real_pos_list.append({ + "code": p.stock_code, + "volume": p.volume, + "can_use": p.can_use_volume, + "market_value": p.market_value + }) + except: pass + + if GLOBAL_STATE.config and GLOBAL_STATE.pos_manager: + for s in GLOBAL_STATE.config.get('strategies', []): + v_data = GLOBAL_STATE.pos_manager.get_all_virtual_positions(s) + virtual_pos_map[s] = v_data + + return { + "real_positions": real_pos_list, + "virtual_positions": virtual_pos_map + } + +@app.get("/api/logs") +def get_logs(lines: int = 50): + today_str = datetime.date.today().strftime('%Y-%m-%d') + log_path = os.path.join("logs", f"{today_str}.log") + if not os.path.exists(log_path): + return {"logs": ["暂无今日日志"]} + try: + with open(log_path, 'r', encoding='utf-8') as f: + all_lines = f.readlines() + return {"logs": [line.strip() for line in all_lines[-lines:]]} + except Exception as e: + return {"logs": [f"读取失败: {str(e)}"]} + +# ================= 9. 启动入口 ================= +if __name__ == '__main__': + # 使用 -u 参数运行是最佳实践: python -u main.py + # 但这里也在代码里强制 flush 了 + print(">>> 系统正在启动...") + + t = threading.Thread(target=trading_loop, daemon=True) + t.start() + + print("Web服务启动: http://localhost:8001") + uvicorn.run(app, host="0.0.0.0", port=8001, log_level="warning") \ No newline at end of file diff --git a/qmt/start.bat b/qmt/start.bat new file mode 100644 index 0000000..1d019df --- /dev/null +++ b/qmt/start.bat @@ -0,0 +1,75 @@ +@echo off +setlocal enabledelayedexpansion + +:: ================= ================= +:: Ŀ¼ (ȷ·ȷ) +set "WORK_DIR=C:\Data\Project\NewStock\qmt" +:: Pythonű +set "SCRIPT_NAME=qmt_trader.py" +:: Դ +set MAX_RETRIES=5 +:: Լ +set RETRY_COUNT=0 +:: Եȴʱ() +set RETRY_WAIT=10 +:: ־Ŀ¼ +set "LOG_DIR=%WORK_DIR%\logs\launcher" +:: =========================================== + +:: 1. лĿ¼ +cd /d "%WORK_DIR%" +title QMT ʵػϵͳ [Port:8001] + +:: 2. ־Ŀ¼ +if not exist "%LOG_DIR%" mkdir "%LOG_DIR%" + +:: ȡΪ־ļ (򵥵ڴ䳣Windowsʽ) +set "TODAY=%date:~0,4%-%date:~5,2%-%date:~8,2%" +set "LOG_FILE=%LOG_DIR%\%TODAY%.log" + +echo ================================================== +echo QMT ʵ̽ϵͳ +echo ʱ: %time% +echo ־: %LOG_FILE% +echo : http://localhost:8001 +echo ================================================== + +:LOOP +echo. +echo [%time%] ӽ... +echo [%time%] ӽ... >> "%LOG_FILE%" + +:: 3. Python ű +:: ʹ uv run 2>&1 Ҳд־ +uv run %SCRIPT_NAME% >> "%LOG_FILE%" 2>&1 + +:: 4. ˳ +set EXIT_CODE=%errorlevel% +echo [%time%] 쳣˳: %EXIT_CODE% >> "%LOG_FILE%" +echo : ˳ (Code: %EXIT_CODE%) + +:: 5. ߼ +if %RETRY_COUNT% GEQ %MAX_RETRIES% ( + echo [%time%] ﵽԴϵͳֹͣ >> "%LOG_FILE%" + :: ʾ + msg * "QMT ϵͳѱ޷Զָ־" + goto FAIL +) + +set /a RETRY_COUNT+=1 +echo [%time%] ȴ %RETRY_WAIT% е %RETRY_COUNT% ... >> "%LOG_FILE%" +echo ڵȴ (%RETRY_COUNT%/%MAX_RETRIES%)... +timeout /t %RETRY_WAIT% >nul + +goto LOOP + +:FAIL +title QMT ʵػϵͳ [ѱ] +color 4F +echo. +echo ========================================== +echo ϵͳֹͣ +echo ־ļ: %LOG_FILE% +echo ========================================== +pause +exit /b 1 \ No newline at end of file