# coding:utf-8
import time
import datetime
import traceback
import sys
import json
import os
import threading
import logging
from typing import Optional

import redis
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant

# FastAPI related
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
import uvicorn

# ================= 0. Windows console freeze workaround =================
try:
    import ctypes

    kernel32 = ctypes.windll.kernel32
    # Rewrite the console input mode of the handle returned by
    # GetStdHandle(-10) to 128 (0x0080). The intent is to disable quick-edit
    # mode (0x0040), whose text selection can stall a console program.
    kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), 128)
except Exception:
    # Best effort only: the patch cannot be applied off Windows or without a
    # console. Catch Exception (not a bare except) so that KeyboardInterrupt
    # and SystemExit are never swallowed here.
    pass


# ================= 1. Global state management =================
class SystemState:
    """Mutable process-wide state shared by the trading thread and the web API."""

    def __init__(self):
        # Live trading objects; they stay None until a QMT connection is made.
        self.xt_trader = None
        self.acc = None
        self.pos_manager = None
        self.callback = None
        # Loop condition of the trading thread's main loop.
        self.is_running = True
        self.start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Wall-clock time of the latest main-loop iteration.
        self.last_heartbeat = "Initializing..."
        self.config = {}
        # Reconnect control
        self.reconnect_attempts: int = 0  # failed reconnects since last reset
        self.max_reconnect_attempts: int = 3  # attempts allowed before backing off
        self.last_reconnect_fail_time: Optional[float] = None  # time.time() of last failure


GLOBAL_STATE = SystemState()
CURRENT_LOG_DATE = None
ORDER_CACHE = {}  # in-memory cache: OrderID -> (Strategy, Code, Action)
# ================= 2. Enhanced logging system =================
def setup_logger():
    """Build (or rebuild) the "QMT_Trader" logger for today's date.

    Side effects: creates the ``logs`` directory if needed, records today's
    date in the module-level ``CURRENT_LOG_DATE``, and replaces every handler
    already attached to the logger with a fresh file handler
    (``logs/<YYYY-MM-DD>.log``, append mode, UTF-8) and a stdout handler.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    global CURRENT_LOG_DATE
    log_dir = "logs"
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(log_dir, exist_ok=True)

    today_str = datetime.date.today().strftime("%Y-%m-%d")
    CURRENT_LOG_DATE = today_str
    log_file = os.path.join(log_dir, f"{today_str}.log")

    logger = logging.getLogger("QMT_Trader")
    logger.setLevel(logging.INFO)

    # Drop old handlers. Detach first so that a failing close() can never
    # leave a stale handler attached (which would duplicate every log line).
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
        try:
            handler.close()
        except Exception:
            pass

    # The thread name is part of the format so API-thread and trade-thread
    # records can be told apart.
    formatter = logging.Formatter(
        "[%(asctime)s] [%(levelname)s] [%(threadName)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # File output
    file_handler = logging.FileHandler(log_file, mode="a", encoding="utf-8")
    file_handler.setFormatter(formatter)

    # Console output; flush is bound straight to sys.stdout.flush so records
    # are pushed to the console immediately.
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(formatter)
    stream_handler.flush = sys.stdout.flush

    logger.addHandler(file_handler)
    logger.addHandler(stream_handler)
    return logger


logger = setup_logger()


# ================= 3. Configuration loading =================
def load_config(config_file="config.json"):
    """Load the JSON configuration file and return the parsed object.

    The file is looked up next to the executable (frozen build) or next to
    this source file; if it is not there, ``config_file`` is tried as given
    (relative to the current working directory).

    Args:
        config_file: File name or path of the JSON configuration.

    Returns:
        The object produced by ``json.load``.

    Raises:
        SystemExit: with code 1 when the file cannot be found, read or parsed
            (the reason is logged first).
    """
    if getattr(sys, "frozen", False):
        base_path = os.path.dirname(sys.executable)
    else:
        base_path = os.path.dirname(os.path.abspath(__file__))

    full_path = os.path.join(base_path, config_file)
    if not os.path.exists(full_path):
        if os.path.exists(config_file):
            full_path = config_file
        else:
            logger.error(f"找不到配置文件: {full_path}")
            sys.exit(1)

    try:
        with open(full_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        logger.error(f"配置文件错误: {e}")
        sys.exit(1)
# ================= 4. Business logic classes =================
class PositionManager:
    """Per-strategy virtual position book kept in Redis hashes.

    Each strategy owns one hash ``POS:<strategy_name>`` mapping a stock code
    to a volume. A volume of 0 is a placeholder written by ``mark_holding``.
    """

    def __init__(self, r_client):
        self.r = r_client

    def _get_key(self, strategy_name):
        """Return the Redis hash key for ``strategy_name``."""
        return f"POS:{strategy_name}"

    def mark_holding(self, strategy_name, code):
        """Reserve a slot for ``code`` with volume 0 unless an entry exists."""
        self.r.hsetnx(self._get_key(strategy_name), code, 0)

    def rollback_holding(self, strategy_name, code):
        """Release the slot of ``code`` if it is still a 0-volume placeholder."""
        key = self._get_key(strategy_name)
        val = self.r.hget(key, code)
        if val is not None and int(val) == 0:
            self.r.hdel(key, code)
            logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}")

    def update_actual_volume(self, strategy_name, code, delta_vol):
        """Add ``delta_vol`` to the stored volume and return the new volume.

        When the result is zero or negative the entry is deleted and 0 is
        returned.
        """
        key = self._get_key(strategy_name)
        new_vol = self.r.hincrby(key, code, int(delta_vol))
        if new_vol <= 0:
            self.r.hdel(key, code)
            new_vol = 0
        return new_vol

    def get_position(self, strategy_name, code):
        """Return the stored volume of ``code`` as int (0 when absent)."""
        vol = self.r.hget(self._get_key(strategy_name), code)
        return int(vol) if vol else 0

    def get_holding_count(self, strategy_name):
        """Return the number of entries (placeholders included) of the strategy."""
        return self.r.hlen(self._get_key(strategy_name))

    def get_all_virtual_positions(self, strategy_name):
        """Return the whole hash of the strategy as returned by ``HGETALL``."""
        return self.r.hgetall(self._get_key(strategy_name))

    def force_delete(self, strategy_name, code):
        """Delete the entry of ``code`` unconditionally."""
        self.r.hdel(self._get_key(strategy_name), code)

    def clean_stale_placeholders(self, strategy_name, xt_trader, acc):
        """Remove 0-volume placeholders that nothing backs any more.

        A placeholder is deleted when its code is neither in a real position
        with volume > 0 nor in a cancelable order. Any exception is logged
        and swallowed.
        """
        try:
            key = self._get_key(strategy_name)
            all_pos = self.r.hgetall(key)
            if not all_pos:
                return

            # Sets give O(1) membership tests in the loop below.
            active_orders = xt_trader.query_stock_orders(acc, cancelable_only=True)
            active_codes = (
                {o.stock_code for o in active_orders} if active_orders else set()
            )
            real_positions = xt_trader.query_stock_positions(acc)
            real_holdings = (
                {p.stock_code for p in real_positions if p.volume > 0}
                if real_positions
                else set()
            )

            for code, vol_str in all_pos.items():
                if int(vol_str) != 0:
                    continue
                if code not in real_holdings and code not in active_codes:
                    self.r.hdel(key, code)
                    logger.warning(f"[{strategy_name}] 自动清理僵尸占位: {code}")
        except Exception as e:
            logger.error(f"清理僵尸占位异常: {e}")


class DailySettlement:
    """After-close reconciliation of Redis virtual positions with the account."""

    def __init__(self, xt_trader, acc, pos_mgr, strategies):
        self.trader = xt_trader
        self.acc = acc
        self.pos_mgr = pos_mgr
        self.strategies = strategies
        # Set by run_settlement(); cleared again through reset_flag().
        self.has_settled = False

    def run_settlement(self):
        """Cancel open orders, then reconcile every strategy's virtual positions.

        Steps:
          1. Cancel all cancelable orders (failures are logged, not raised).
          2. Query the real positions of the account.
          3. For each strategy: delete virtual entries whose code is not in a
             real position; turn 0-volume placeholders of held codes into the
             real volume.
        Sets ``has_settled`` to True at the end.
        """
        logger.info("=" * 40)
        logger.info("执行收盘清算流程...")

        try:
            orders = self.trader.query_stock_orders(self.acc, cancelable_only=True)
            logger.info(
                f"收盘清算 - 查询可撤单订单: 获取到 {len(orders) if orders else 0} 个订单"
            )
            if orders:
                for o in orders:
                    logger.info(
                        f"收盘清算 - 撤单: OrderID={o.order_id}, Stock={o.stock_code}"
                    )
                    self.trader.cancel_order_stock(self.acc, o.order_id)
                # Pause once after submitting the cancellations, before
                # positions are read below.
                time.sleep(2)
                logger.info(f"收盘清算 - 完成撤单操作,共处理 {len(orders)} 个订单")
            else:
                logger.info("收盘清算 - 无待撤单订单")
        except Exception as e:
            logger.error(f"收盘清算 - 查询/撤单失败: {e}", exc_info=True)

        # NOTE(review): a None/empty result is treated as "account holds
        # nothing", so every virtual position below is released. Confirm
        # whether the API can also return None for a failed query.
        real_positions = self.trader.query_stock_positions(self.acc)
        real_pos_map = (
            {p.stock_code: p.volume for p in real_positions if p.volume > 0}
            if real_positions
            else {}
        )

        for strategy in self.strategies:
            virtual = self.pos_mgr.get_all_virtual_positions(strategy)
            for code, v_str in virtual.items():
                v = int(v_str)
                if code not in real_pos_map:
                    # Recorded in Redis but not held by the account.
                    logger.warning(
                        f" [修正] {strategy} 幽灵持仓 {code} (Redis={v}) -> 强制释放"
                    )
                    self.pos_mgr.force_delete(strategy, code)
                elif v == 0:
                    # Placeholder of a code the account really holds.
                    real_vol = real_pos_map[code]
                    self.pos_mgr.update_actual_volume(strategy, code, real_vol)
                    logger.info(
                        f" [修正] {strategy} 修正占位符 {code} 0 -> {real_vol}"
                    )

        logger.info("清算完成")
        self.has_settled = True

    def reset_flag(self):
        """Allow the next settlement run."""
        self.has_settled = False


class MyXtQuantTraderCallback(XtQuantTraderCallback):
    """Trader callback that mirrors fills and order errors into Redis."""

    def __init__(self, pos_mgr):
        self.pos_mgr = pos_mgr
        self.is_connected = False

    def on_disconnected(self):
        """Record that the trading connection was reported as lost."""
        logger.warning(">> 回调通知: 交易端连接断开")
        self.is_connected = False

    def on_stock_trade(self, trade):
        """Apply a fill to the virtual position of the strategy that placed it.

        Fills of orders that are not in ``ORDER_CACHE`` are ignored.
        """
        try:
            cache_info = ORDER_CACHE.get(trade.order_id)
            if not cache_info:
                return
            strategy, _, action = cache_info
            logger.info(
                f">>> [成交] {strategy} {trade.stock_code} {trade.traded_volume}"
            )
            if action == "BUY":
                self.pos_mgr.update_actual_volume(
                    strategy, trade.stock_code, trade.traded_volume
                )
            elif action == "SELL":
                self.pos_mgr.update_actual_volume(
                    strategy, trade.stock_code, -trade.traded_volume
                )
        except Exception as e:
            logger.error(f"on_stock_trade 成交回调处理失败: {e}", exc_info=True)

    def on_order_error(self, err):
        """Roll back the slot of a failed BUY and forget the failed order."""
        try:
            logger.error(
                f"下单失败回调: OrderID={err.order_id}, 错误信息={err.error_msg}"
            )
            cache = ORDER_CACHE.get(err.order_id)
            if cache and cache[2] == "BUY":
                logger.info(f"回滚持仓: Strategy={cache[0]}, Stock={cache[1]}")
                self.pos_mgr.rollback_holding(cache[0], cache[1])
            # pop() with a default drops any failed order (BUY or SELL) from
            # the cache and cannot raise KeyError for an unknown order id.
            ORDER_CACHE.pop(err.order_id, None)
        except Exception as e:
            logger.error(f"on_order_error 错误回调处理失败: {e}", exc_info=True)


# ================= 5. Core message handling (no silent failures) =================
def _handle_buy(
    strategy_name, stock_code, price, total_slots, xt_trader, acc, pos_manager
):
    """Size and place a fixed-price BUY order; log the reason when blocked."""
    holding = pos_manager.get_holding_count(strategy_name)
    empty = total_slots - holding
    logger.info(
        f"检查持仓: 当前占用 {holding} / 总槽位 {total_slots} -> 剩余 {empty}"
    )
    if empty <= 0:
        logger.warning("拦截买入: 槽位已满,不执行下单")
        return

    # Query available cash
    asset = xt_trader.query_stock_asset(acc)
    if not asset:
        logger.error(
            "API 错误: query_stock_asset 返回 None,可能是 QMT 断连或未同步"
        )
        return
    logger.info(f"当前可用资金: {asset.cash:.2f}")

    # Split the available cash evenly over the free slots.
    amt = asset.cash / empty
    if amt < 2000:
        logger.warning(f"拦截买入: 单笔金额过小 ({amt:.2f} < 2000)")
        return

    if price <= 0:
        # NOTE(review): marked "test only" by the original author, yet the
        # order below is still sent with this substituted price of 1.0.
        # Confirm this is acceptable on a live account.
        logger.warning(f"价格异常: {price},强制设为1.0以计算股数(仅测试用)")
        price = 1.0

    # Round down to a multiple of 100 shares.
    vol = int(amt / price / 100) * 100
    logger.info(f"计算股数: 资金{amt:.2f} / 价格{price} -> {vol}股")
    if vol < 100:
        logger.warning(f"拦截买入: 股数不足 100 ({vol})")
        return

    # Place the order
    oid = xt_trader.order_stock(
        acc,
        stock_code,
        xtconstant.STOCK_BUY,
        vol,
        xtconstant.FIX_PRICE,
        price,
        strategy_name,
        "PyBuy",
    )
    if oid != -1:
        logger.info(f"√√√ 下单成功: ID={oid} {stock_code} 买入 {vol}")
        ORDER_CACHE[oid] = (strategy_name, stock_code, "BUY")
        pos_manager.mark_holding(strategy_name, stock_code)
    else:
        logger.error("XXX 下单请求被拒绝 (Result=-1),请检查 QMT 终端报错")


def _handle_sell(strategy_name, stock_code, price, xt_trader, acc, pos_manager):
    """Place a fixed-price SELL for min(virtual volume, usable real volume)."""
    v_vol = pos_manager.get_position(strategy_name, stock_code)
    logger.info(f"卖出 - Redis 记录虚拟持仓: {v_vol}")
    if v_vol <= 0:
        logger.warning("拦截卖出: Redis 中无此持仓记录,忽略")
        return

    logger.info(f"卖出 - 正在查询实盘持仓: {stock_code}")
    real_pos = xt_trader.query_stock_positions(acc)
    logger.info(
        f"卖出 - 实盘持仓查询完成,获取到 {len(real_pos) if real_pos else 0} 条记录"
    )
    if real_pos is None:
        logger.error("API 错误: query_stock_positions 返回 None")
        return

    rp = next((p for p in real_pos if p.stock_code == stock_code), None)
    can_use = rp.can_use_volume if rp else 0
    logger.info(f"卖出 - 股票 {stock_code} 实盘可用持仓: {can_use}")

    # Never sell more than this strategy owns or than the account can sell.
    final = min(v_vol, can_use)
    logger.info(f"卖出 - 计算卖出量: min({v_vol}, {can_use}) = {final}")
    if final <= 0:
        logger.warning(
            f"拦截卖出: 最终计算卖出量为 0 (虚拟:{v_vol}, 实盘:{can_use})"
        )
        return

    logger.info(f"卖出 - 执行卖出订单: {stock_code} @ {price}, 数量: {final}")
    oid = xt_trader.order_stock(
        acc,
        stock_code,
        xtconstant.STOCK_SELL,
        final,
        xtconstant.FIX_PRICE,
        price,
        strategy_name,
        "PySell",
    )
    if oid != -1:
        logger.info(f"√√√ 下单成功: ID={oid} {stock_code} 卖出 {final}")
        ORDER_CACHE[oid] = (strategy_name, stock_code, "SELL")
    else:
        logger.error("XXX 下单请求被拒绝 (Result=-1)")


def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager):
    """Pop at most one signal from ``<strategy_name>_real`` and act on it.

    The raw message is appended to ``<strategy_name>_real:history`` and then
    validated step by step; every rejection is logged. Accepted ``BUY`` and
    ``SELL`` signals are turned into fixed-price orders. Exceptions raised
    after the message was popped are logged and swallowed.

    Args:
        strategy_name: Strategy whose queue is polled.
        r_client: Redis client used for the queue and its history list.
        xt_trader: Trader object used to query and to place orders.
        acc: Account object passed to every trader call.
        pos_manager: Object providing the virtual position bookkeeping.
    """
    queue_key = f"{strategy_name}_real"

    # 1. Fetch one message
    msg_json = r_client.lpop(queue_key)
    if not msg_json:
        return

    # 2. Archive and parse (the raw message is logged so that what was
    #    received is always known)
    logger.info(f"-------- 处理消息 [{strategy_name}] --------")
    logger.info(f"收到原始消息: {msg_json}")
    try:
        r_client.rpush(f"{queue_key}:history", msg_json)
        try:
            data = json.loads(msg_json)
        except json.JSONDecodeError:
            logger.error("JSON 解析失败,跳过消息")
            return

        # 3. Basic validation (every failed step must be logged)
        if data.get("is_backtest"):
            logger.warning("检测到回测标记 is_backtest=True,忽略此消息")
            return

        msg_ts = data.get("timestamp")
        if not msg_ts:
            logger.warning("消息缺失时间戳 timestamp,忽略")
            return

        # Only signals stamped with today's date are accepted.
        today_str = datetime.date.today().strftime("%Y-%m-%d")
        msg_date = msg_ts.split(" ")[0]
        if msg_date != today_str:
            logger.warning(f"消息日期过期: {msg_date} != 今日 {today_str},忽略")
            return

        # 4. Extract the key fields
        stock_code = data.get("stock_code")
        action = data.get("action")
        price = float(data.get("price", 0))
        total_slots = int(data.get("total_slots", 1))
        if not stock_code or not action:
            logger.error(f"缺少关键字段: Code={stock_code}, Action={action}")
            return
        logger.info(
            f"解析成功: {action} {stock_code} @ {price}, 目标槽位: {total_slots}"
        )

        # 5. QMT liveness check
        if xt_trader is None or acc is None:
            logger.error("严重错误: QMT 对象未初始化 (xt_trader is None)")
            return

        # 6./7. Dispatch on the action
        if action == "BUY":
            _handle_buy(
                strategy_name,
                stock_code,
                price,
                total_slots,
                xt_trader,
                acc,
                pos_manager,
            )
        elif action == "SELL":
            _handle_sell(
                strategy_name, stock_code, price, xt_trader, acc, pos_manager
            )
        else:
            logger.error(f"未知的 Action: {action}")
    except Exception as e:
        logger.error(f"消息处理发生未捕获异常: {e}", exc_info=True)
# ================= 6. QMT initialization =================
def init_qmt_trader(qmt_path, account_id, account_type, pos_manager):
    """Create, start and connect a trader session and subscribe the account.

    Args:
        qmt_path: Path handed to ``XtQuantTrader``.
        account_id: Account id handed to ``StockAccount``.
        account_type: Account type handed to ``StockAccount``.
        pos_manager: Position manager given to the trader callback.

    Returns:
        ``(trader, acc, callback)`` on success, ``(None, None, None)`` when
        the connection is refused or any step raises.
    """
    trader = None
    started = False
    try:
        # The current epoch second serves as session id.
        session_id = int(time.time())
        logger.info(f"正在连接 QMT (Path: {qmt_path})...")
        trader = XtQuantTrader(qmt_path, session_id)
        acc = StockAccount(account_id, account_type)
        callback = MyXtQuantTraderCallback(pos_manager)
        trader.register_callback(callback)
        trader.start()
        started = True

        res = trader.connect()
        if res == 0:
            logger.info(f"QMT 连接成功 [Session:{session_id}]")
            trader.subscribe(acc)
            callback.is_connected = True
            return trader, acc, callback
        logger.error(f"QMT 连接失败 Code:{res} (请检查 QMT 是否登录且路径正确)")
    except Exception as e:
        logger.error(f"初始化异常: {e}", exc_info=True)

    # Failure path: the caller only receives None, so a trader that was
    # already started could never be stopped by anyone else. Stop it here so
    # repeated reconnect attempts do not pile up started instances.
    if started:
        try:
            trader.stop()
        except Exception as e:
            logger.warning(f"释放未连接的交易实例失败: {e}")
    return None, None, None


# ================= 7. Trading main loop =================
def trading_loop():
    """Body of the trading thread: connect, then poll until told to stop.

    Loads ``config.json``, connects to Redis (returns early if that fails)
    and to QMT, then loops while ``GLOBAL_STATE.is_running``:

      1. refresh the heartbeat timestamp;
      2. every 15 seconds probe the connection with an asset query and
         reconnect when the probe fails;
      3. rotate the logger on date change and write ``heartbeat.txt``;
      4. during trading hours process each strategy queue; between 15:05:00
         and 15:10:00 run the daily settlement once.

    Exceptions inside one iteration are logged and followed by a 10 s pause.
    """
    global logger
    threading.current_thread().name = "TradeThread"
    logger.info(">>> 交易逻辑子线程启动 <<<")

    GLOBAL_STATE.config = load_config("config.json")
    CONFIG = GLOBAL_STATE.config
    redis_cfg = CONFIG["redis"]
    qmt_cfg = CONFIG["qmt"]
    watch_list = CONFIG["strategies"]

    try:
        r = redis.Redis(**redis_cfg, decode_responses=True)
        r.ping()
        pos_manager = PositionManager(r)
        GLOBAL_STATE.pos_manager = pos_manager
        logger.info("Redis 连接成功")
    except Exception as e:
        logger.critical(f"Redis 连接失败: {e}")
        return

    # Initial connection
    xt_trader, acc, callback = init_qmt_trader(
        qmt_cfg["path"], qmt_cfg["account_id"], qmt_cfg["account_type"], pos_manager
    )
    GLOBAL_STATE.xt_trader = xt_trader
    GLOBAL_STATE.acc = acc
    GLOBAL_STATE.callback = callback

    settler = None
    if xt_trader:
        settler = DailySettlement(xt_trader, acc, pos_manager, watch_list)
        for s in watch_list:
            pos_manager.clean_stale_placeholders(s, xt_trader, acc)

    logger.info(">>> 进入主轮询循环 <<<")
    last_health_check = 0  # time of the last deep probe

    while GLOBAL_STATE.is_running:
        try:
            # 1. Basic heartbeat update
            GLOBAL_STATE.last_heartbeat = datetime.datetime.now().strftime("%H:%M:%S")

            # 2. Diagnosis and self-healing: run a "deep probe" every 15
            #    seconds instead of trusting the callback flag on each round.
            if time.time() - last_health_check > 15:
                last_health_check = time.time()
                is_alive_physically = False

                # Verify the connection by actually querying the asset.
                if GLOBAL_STATE.xt_trader and GLOBAL_STATE.acc:
                    try:
                        asset = GLOBAL_STATE.xt_trader.query_stock_asset(
                            GLOBAL_STATE.acc
                        )
                        if asset:
                            is_alive_physically = True
                            # A successful probe overrides a callback flag
                            # that still says "disconnected".
                            if (
                                GLOBAL_STATE.callback
                                and not GLOBAL_STATE.callback.is_connected
                            ):
                                GLOBAL_STATE.callback.is_connected = True
                                logger.info(
                                    "✅ [自愈] 检测到资产查询正常,修正伪造的断开状态 (False -> True)"
                                )
                    except Exception as e:
                        logger.warning(f"[健康检查] 资产查询失败: {e}")

                current_status = (
                    GLOBAL_STATE.callback.is_connected
                    if GLOBAL_STATE.callback
                    else False
                )
                # Keep the log quiet: warn only when both the callback flag
                # and the probe report a broken connection.
                if not current_status and not is_alive_physically:
                    logger.warning(
                        f"⚠️ 线程存活检查 | 逻辑状态:{current_status} | 物理探测:失败"
                    )

                # 3. Reconnect logic: runs only when the probe itself failed.
                if not is_alive_physically:
                    # Stay out of the 23:20 - 23:35 window (nightly QMT
                    # restart) to avoid a storm of reconnects and log lines.
                    now_hm = datetime.datetime.now().strftime("%H%M")
                    if "2320" <= now_hm <= "2335":
                        logger.info("⏳ QMT维护时段,暂停重连,休眠60秒...")
                        time.sleep(60)
                        continue

                    if datetime.date.today().weekday() >= 5:  # weekend
                        time.sleep(3600)
                        continue

                    # Back off once the attempt limit is reached.
                    if (
                        GLOBAL_STATE.reconnect_attempts
                        >= GLOBAL_STATE.max_reconnect_attempts
                    ):
                        logger.warning(
                            f"⚠️ 重连失败次数已达上限 ({GLOBAL_STATE.reconnect_attempts}/{GLOBAL_STATE.max_reconnect_attempts}),停止自动重连"
                        )
                        # Reset the counter when the last failure is more
                        # than 5 minutes old.
                        if GLOBAL_STATE.last_reconnect_fail_time:
                            elapsed = (
                                time.time() - GLOBAL_STATE.last_reconnect_fail_time
                            )
                            if elapsed > 300:  # 5 minutes
                                GLOBAL_STATE.reconnect_attempts = 0
                                logger.info(
                                    f"⏰ 重连计数器已重置 (距离上次失败 {elapsed / 60:.1f} 分钟)"
                                )
                            else:
                                logger.info(f"⏳ 需要等待 {300 - elapsed:.0f} 秒后重试")
                        # Over the limit: still wait a while before looping.
                        time.sleep(60)
                        continue

                    logger.warning(
                        f"🚫 确认连接丢失,执行重连 ({GLOBAL_STATE.reconnect_attempts + 1}/{GLOBAL_STATE.max_reconnect_attempts})..."
                    )

                    if GLOBAL_STATE.xt_trader:
                        try:
                            GLOBAL_STATE.xt_trader.stop()
                            logger.info("已停止旧交易实例")
                        except Exception as e:
                            logger.error(f"停止旧交易实例失败: {e}", exc_info=True)
                        finally:
                            # Never leave a stopped instance published: the
                            # API endpoints and later probes read this field.
                            GLOBAL_STATE.xt_trader = None

                    new_trader, new_acc, new_cb = init_qmt_trader(
                        qmt_cfg["path"],
                        qmt_cfg["account_id"],
                        qmt_cfg["account_type"],
                        pos_manager,
                    )
                    if new_trader:
                        GLOBAL_STATE.xt_trader = new_trader
                        GLOBAL_STATE.acc = new_acc
                        GLOBAL_STATE.callback = new_cb
                        GLOBAL_STATE.reconnect_attempts = 0  # reset on success
                        GLOBAL_STATE.last_reconnect_fail_time = None
                        settler = DailySettlement(
                            new_trader, new_acc, pos_manager, watch_list
                        )
                        logger.info("✅ 重连成功")
                    else:
                        GLOBAL_STATE.reconnect_attempts += 1
                        GLOBAL_STATE.last_reconnect_fail_time = time.time()
                        logger.error(
                            f"❌ 重连失败,已尝试 {GLOBAL_STATE.reconnect_attempts}/{GLOBAL_STATE.max_reconnect_attempts} 次,60秒后重试"
                        )
                        time.sleep(60)
                        continue

            # 4. Log rotation and heartbeat file
            today_str = datetime.date.today().strftime("%Y-%m-%d")
            if today_str != CURRENT_LOG_DATE:
                logger = setup_logger()
            try:
                with open("heartbeat.txt", "w") as f:
                    f.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
            except Exception as e:
                logger.warning(f"[心跳] 写入心跳文件失败: {e}")

            # 5. Trading logic
            current_time_str = datetime.datetime.now().strftime("%H%M%S")
            is_trading_time = ("091500" <= current_time_str <= "113000") or (
                "130000" <= current_time_str <= "150000"
            )

            # The trader is used whenever one is published, whatever the
            # callback flag says; the probe above is the source of truth.
            if is_trading_time and GLOBAL_STATE.xt_trader:
                if settler and settler.has_settled:
                    settler.reset_flag()
                for s in watch_list:
                    process_strategy_queue(
                        s, r, GLOBAL_STATE.xt_trader, GLOBAL_STATE.acc, pos_manager
                    )
            elif "150500" <= current_time_str <= "151000":
                if settler and not settler.has_settled:
                    settler.run_settlement()

            time.sleep(1 if is_trading_time else 5)
        except Exception:
            logger.critical("交易循环异常", exc_info=True)
            time.sleep(10)
# ================= 8. FastAPI endpoints =================
app = FastAPI(title="QMT Monitor")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
async def read_root():
    """Serve ``dashboard.html`` from the working directory, or an error object."""
    if os.path.exists("dashboard.html"):
        return FileResponse("dashboard.html")
    return {"error": "Dashboard not found"}


@app.get("/api/status")
def get_status():
    """Return connection flag, start time, last loop heartbeat and account id."""
    connected = False
    if GLOBAL_STATE.callback:
        connected = GLOBAL_STATE.callback.is_connected
    return {
        "running": True,
        "qmt_connected": connected,
        "start_time": GLOBAL_STATE.start_time,
        "last_loop_update": GLOBAL_STATE.last_heartbeat,
        "account_id": GLOBAL_STATE.acc.account_id if GLOBAL_STATE.acc else "Unknown",
    }


@app.get("/api/positions")
def get_positions():
    """Return real account positions and the per-strategy virtual positions.

    Real positions are queried only while a trader, an account and a
    connected callback are all present; a failing query is logged and leaves
    the list with whatever was collected so far.
    """
    real_pos_list = []
    virtual_pos_map = {}

    if (
        GLOBAL_STATE.xt_trader
        and GLOBAL_STATE.acc
        and GLOBAL_STATE.callback
        and GLOBAL_STATE.callback.is_connected
    ):
        try:
            positions = GLOBAL_STATE.xt_trader.query_stock_positions(GLOBAL_STATE.acc)
            if positions:
                for p in positions:
                    if p.volume > 0:
                        real_pos_list.append(
                            {
                                "code": p.stock_code,
                                "volume": p.volume,
                                "can_use": p.can_use_volume,
                                "market_value": p.market_value,
                            }
                        )
        except Exception as e:
            logger.warning(f"[API] 查询持仓失败: {e}")

    if GLOBAL_STATE.config and GLOBAL_STATE.pos_manager:
        for s in GLOBAL_STATE.config.get("strategies", []):
            virtual_pos_map[s] = GLOBAL_STATE.pos_manager.get_all_virtual_positions(s)

    return {"real_positions": real_pos_list, "virtual_positions": virtual_pos_map}


@app.get("/api/logs")
def get_logs(lines: int = 50):
    """Return the last ``lines`` lines of today's log file.

    Args:
        lines: Number of trailing lines wanted. Zero or a negative number
            yields an empty list.

    Returns:
        ``{"logs": [...]}`` with whitespace-stripped lines; a single
        placeholder entry when today's file does not exist, or a single
        error entry when reading fails.
    """
    from collections import deque

    today_str = datetime.date.today().strftime("%Y-%m-%d")
    log_path = os.path.join("logs", f"{today_str}.log")
    if not os.path.exists(log_path):
        return {"logs": ["暂无今日日志"]}

    # Slicing with [-lines:] would return the WHOLE file for lines == 0 and a
    # meaningless slice for negative values, so answer those cases directly.
    if lines <= 0:
        return {"logs": []}

    try:
        with open(log_path, "r", encoding="utf-8") as f:
            # A bounded deque keeps only the last `lines` lines in memory
            # while the file is streamed.
            tail = deque(f, maxlen=lines)
        return {"logs": [line.strip() for line in tail]}
    except Exception as e:
        return {"logs": [f"读取失败: {e}"]}
# ================= 9. Entry point =================
if __name__ == "__main__":
    # Running unbuffered is the recommended practice: python -u main.py
    # (the logger additionally flushes stdout on its own).
    print(">>> 系统正在启动...")

    # The trading loop lives in a daemon thread; the web server below keeps
    # the main thread busy.
    trade_thread = threading.Thread(target=trading_loop, daemon=True)
    trade_thread.start()

    print("Web服务启动: http://localhost:8001")
    server_options = {"host": "0.0.0.0", "port": 8001, "log_level": "warning"}
    uvicorn.run(app, **server_options)