diff --git a/qmt/main.py b/qmt/main.py index fabe1bb..b5e1c20 100644 --- a/qmt/main.py +++ b/qmt/main.py @@ -1,38 +1,73 @@ # coding:utf-8 import threading import sys +import os +import logging +import datetime import uvicorn from .qmt_engine import QMTEngine from .api_server import create_api_server +def setup_logger(): + """配置日志系统""" + log_dir = "logs" + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + log_file = os.path.join(log_dir, f"{datetime.date.today().strftime('%Y-%m-%d')}.log") + + logger = logging.getLogger("QMT_Main") + logger.setLevel(logging.INFO) + + formatter = logging.Formatter( + '[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') + file_handler.setFormatter(formatter) + + stream_handler = logging.StreamHandler(sys.stdout) + stream_handler.setFormatter(formatter) + + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + return logger + + def main(): """主函数 - 启动QMT交易引擎和API服务器""" - print(">>> 系统正在启动...") + logger = setup_logger() + logger.info("="*50) + logger.info(">>> QMT交易系统启动中...") + logger.info("="*50) # 创建QMT引擎实例 engine = QMTEngine() + logger.info("QMT引擎实例创建成功") try: # 初始化引擎 engine.initialize('config.json') - print("✅ QMT引擎初始化成功") + logger.info("✅ QMT引擎初始化成功") except Exception as e: - print(f"❌ QMT引擎初始化失败: {e}") + logger.error(f"❌ QMT引擎初始化失败: {str(e)}", exc_info=True) sys.exit(1) # 启动交易线程 trading_thread = threading.Thread(target=engine.run_trading_loop, daemon=True) trading_thread.start() - print("✅ 交易线程启动成功") + logger.info("✅ 交易线程启动成功") # 创建API服务器 app = create_api_server(engine) - print("✅ API服务器创建成功") + logger.info("✅ API服务器创建成功") # 启动Web服务 - print(">>> Web服务启动: http://localhost:8001") + logger.info(">>> Web服务启动: http://localhost:8001") try: uvicorn.run( app, @@ -42,9 +77,13 @@ def main(): access_log=False ) except KeyboardInterrupt: - print("\n>>> 正在关闭系统...") + logger.info(">>> 正在关闭系统...") engine.stop() - print(">>> 系统已关闭") + logger.info(">>> 系统已关闭") + except Exception as e: + logger.error(f">>> 系统异常: {str(e)}", exc_info=True) + engine.stop() + logger.info(">>> 系统已关闭") if __name__ == '__main__': diff --git a/qmt/qmt_engine.py b/qmt/qmt_engine.py index e42e4a6..fd9cb4e 100644 --- a/qmt/qmt_engine.py +++ b/qmt/qmt_engine.py @@ -459,7 +459,8 @@ class MultiEngineManager: if unit.callback and not unit.callback.is_connected: unit.callback.is_connected = True self.logger.info(f"✅ 修正终端 {unit.alias} 状态为在线") - except: + except Exception as e: + self.logger.error(f"健康检查失败 - 终端 {unit.alias}: {str(e)}", exc_info=True) is_unit_alive = False # 断线重连策略 diff --git a/qmt/qmt_test.py b/qmt/qmt_test.py index 6d9db23..cf8d4f3 100644 --- a/qmt/qmt_test.py +++ b/qmt/qmt_test.py @@ -1,32 +1,77 @@ +import logging +import sys +import os +import datetime from xtquant import xttrader from xtquant.xtdata import download_history_data, get_market_data from xtquant.xttype import StockAccount import random -##订阅账户 + +def setup_logger(): + """配置日志系统""" + log_dir = "logs" + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + log_file = os.path.join(log_dir, f"{datetime.date.today().strftime('%Y-%m-%d')}_test.log") + + logger = logging.getLogger("QMT_Test") + logger.setLevel(logging.INFO) + + formatter = logging.Formatter( + '[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') + file_handler.setFormatter(formatter) + + stream_handler = logging.StreamHandler(sys.stdout) + stream_handler.setFormatter(formatter) + + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + return logger + + +logger = setup_logger() + # 设置 QMT 交易端的数据路径和会话ID min_path = r"C:\\QMT\\中金财富QMT个人版交易端\\userdata_mini" session_id = int(random.randint(100000, 999999)) +logger.info(f"QMT路径: {min_path}") +logger.info(f"会话ID: {session_id}") + # 创建 XtQuantTrader 实例并启动 +logger.info("正在创建 XtQuantTrader 实例...") xt_trader = xttrader.XtQuantTrader(min_path, session_id) +logger.info("正在启动 XtQuantTrader...") xt_trader.start() +logger.info("XtQuantTrader 已启动") # 连接 QMT 交易端 +logger.info("正在连接 QMT 交易端...") connect_result = xt_trader.connect() if connect_result == 0: - print('连接成功') + logger.info("✅ 连接成功") else: - print('连接失败') + logger.error(f"❌ 连接失败,错误码: {connect_result}") xt_trader.stop() + logger.info("XtQuantTrader 已停止") exit() # 设置账户信息 -account = StockAccount('8176081580') +account_id = '8176081580' +logger.info(f"账户ID: {account_id}") +account = StockAccount(account_id) # 订阅账户 +logger.info("正在订阅账户...") res = xt_trader.subscribe(account) if res == 0: - print('订阅成功') + logger.info("✅ 订阅成功") else: - print('订阅失败') + logger.error(f"❌ 订阅失败,错误码: {res}") diff --git a/qmt/qmt_trader.py b/qmt/qmt_trader.py index f388bdc..f7aa314 100644 --- a/qmt/qmt_trader.py +++ b/qmt/qmt_trader.py @@ -172,11 +172,17 @@ class DailySettlement: logger.info("执行收盘清算流程...") try: orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) + logger.info(f"收盘清算 - 查询可撤单订单: 获取到 {len(orders) if orders else 0} 个订单") if orders: for o in orders: + logger.info(f"收盘清算 - 撤单: OrderID={o.order_id}, Stock={o.stock_code}") self.trader.cancel_order_stock(self.acc, o.order_id) time.sleep(2) - except: pass + logger.info(f"收盘清算 - 完成撤单操作,共处理 {len(orders)} 个订单") + else: + logger.info("收盘清算 - 无待撤单订单") + except Exception as e: + logger.error(f"收盘清算 - 查询/撤单失败: {str(e)}", exc_info=True) real_positions = self.trader.query_stock_positions(self.acc) real_pos_map = {p.stock_code: p.volume for p in real_positions if p.volume > 0} if real_positions else {} @@ -214,15 +220,19 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback): logger.info(f">>> [成交] {strategy} {trade.stock_code} {trade.traded_volume}") if action == 'BUY': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, trade.traded_volume) elif action == 'SELL': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, -trade.traded_volume) - except: traceback.print_exc() + except Exception as e: + logger.error(f"on_stock_trade 成交回调处理失败: {str(e)}", exc_info=True) + def on_order_error(self, err): try: - logger.error(f"下单失败回调: {err.error_msg} OrderID:{err.order_id}") + logger.error(f"下单失败回调: OrderID={err.order_id}, 错误信息={err.error_msg}") cache = ORDER_CACHE.get(err.order_id) if cache and cache[2] == 'BUY': + logger.info(f"回滚持仓: Strategy={cache[0]}, Stock={cache[1]}") self.pos_mgr.rollback_holding(cache[0], cache[1]) del ORDER_CACHE[err.order_id] - except: pass + except Exception as e: + logger.error(f"on_order_error 错误回调处理失败: {str(e)}", exc_info=True) # ================= 5. 核心消息处理 (重写版:拒绝静默失败) ================= def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): @@ -327,20 +337,26 @@ def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager) # 7. 卖出逻辑 elif action == 'SELL': v_vol = pos_manager.get_position(strategy_name, stock_code) - logger.info(f"Redis 记录持仓: {v_vol}") + logger.info(f"卖出 - Redis 记录虚拟持仓: {v_vol}") if v_vol > 0: + logger.info(f"卖出 - 正在查询实盘持仓: {stock_code}") real_pos = xt_trader.query_stock_positions(acc) + logger.info(f"卖出 - 实盘持仓查询完成,获取到 {len(real_pos) if real_pos else 0} 条记录") + if real_pos is None: logger.error("API 错误: query_stock_positions 返回 None") return rp = next((p for p in real_pos if p.stock_code==stock_code), None) can_use = rp.can_use_volume if rp else 0 - logger.info(f"实盘可用持仓: {can_use}") + logger.info(f"卖出 - 股票 {stock_code} 实盘可用持仓: {can_use}") final = min(v_vol, can_use) + logger.info(f"卖出 - 计算卖出量: min({v_vol}, {can_use}) = {final}") + if final > 0: + logger.info(f"卖出 - 执行卖出订单: {stock_code} @ {price}, 数量: {final}") oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') if oid != -1: logger.info(f"√√√ 下单成功: ID={oid} {stock_code} 卖出 {final}") @@ -470,8 +486,11 @@ def trading_loop(): logger.warning("🚫 确认连接丢失,执行重连...") if GLOBAL_STATE.xt_trader: - try: GLOBAL_STATE.xt_trader.stop() - except: pass + try: + GLOBAL_STATE.xt_trader.stop() + logger.info("已停止旧交易实例") + except Exception as e: + logger.error(f"停止旧交易实例失败: {str(e)}", exc_info=True) new_trader, new_acc, new_cb = init_qmt_trader( qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager diff --git a/qmt/run.py b/qmt/run.py index 72e99fd..5da3a76 100644 --- a/qmt/run.py +++ b/qmt/run.py @@ -6,6 +6,8 @@ QMT多终端交易系统启动器 import sys import os import threading +import logging +import datetime import uvicorn # 将当前目录添加到Python路径,确保模块导入正常 @@ -17,40 +19,73 @@ if current_dir not in sys.path: from qmt_engine import MultiEngineManager from api_server import create_api_server + +def setup_logger(): + """配置日志系统""" + log_dir = "logs" + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + log_file = os.path.join(log_dir, f"{datetime.date.today().strftime('%Y-%m-%d')}.log") + + logger = logging.getLogger("QMT_Run") + logger.setLevel(logging.INFO) + + formatter = logging.Formatter( + '[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') + file_handler.setFormatter(formatter) + + stream_handler = logging.StreamHandler(sys.stdout) + stream_handler.setFormatter(formatter) + + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + return logger + + def main(): """主函数 - 启动多终端QMT交易引擎管理中心和API服务器""" + logger = setup_logger() + # 强制设置环境变量,确保Python在Windows控制台输出不因编码崩溃 os.environ["PYTHONUTF8"] = "1" - print("==================================================") - print(" QMT Multi-Terminal System Starting... ") - print("==================================================") + logger.info("="*50) + logger.info(">>> QMT Multi-Terminal System Starting...") + logger.info("="*50) # 1. 获取多终端管理器单例 manager = MultiEngineManager() + logger.info("多终端管理器实例获取成功") try: # 2. 初始化引擎(加载配置、连接Redis、初始化各终端执行单元) + logger.info("正在加载配置文件: config.json") manager.initialize('config.json') - print("Done: Multi-Manager initialized successfully.") + logger.info("Done: Multi-Manager initialized successfully.") except Exception as e: - print(f"Error: System initialization failed: {repr(e)}") - import traceback - traceback.print_exc() + logger.error(f"Error: System initialization failed: {repr(e)}", exc_info=True) sys.exit(1) # 3. 启动全局监控与交易路由主循环线程 # 该线程负责:终端健康检查、断线重连、消息路由、收盘结算 + logger.info("正在启动交易主循环线程...") trading_thread = threading.Thread(target=manager.run_trading_loop, name="MainTradeLoop", daemon=True) trading_thread.start() - print("Done: Global trading loop thread started.") + logger.info("Done: Global trading loop thread started.") # 4. 创建适配多终端的API服务器 + logger.info("正在创建API服务器...") app = create_api_server(manager) - print("Done: API server created with multi-terminal support.") + logger.info("Done: API server created with multi-terminal support.") # 5. 启动Web服务 - print(">>> Web Dashboard: http://localhost:8001") + logger.info(">>> Web Dashboard: http://localhost:8001") try: # 建议关闭 access_log 以减少控制台刷屏 uvicorn.run( @@ -61,20 +96,22 @@ def main(): access_log=False ) except KeyboardInterrupt: - print("\n>>> Shutdown signal received. Closing terminals...") + logger.info(">>> Shutdown signal received. Closing terminals...") manager.stop() - print(">>> System safely closed.") + logger.info(">>> System safely closed.") _write_exit_code(0) sys.exit(0) def _write_exit_code(code): """将退出码写入临时文件,供 start.bat 读取""" + logger = logging.getLogger("QMT_Run") try: exit_code_file = os.path.join(os.environ.get('TEMP', ''), 'exit_code.txt') with open(exit_code_file, 'w') as f: f.write(str(code)) - except Exception: - pass + logger.info(f">>> 退出码已写入: {code}") + except Exception as e: + logger.error(f">>> 退出码写入失败: {str(e)}") if __name__ == '__main__': # 最佳实践:使用 python -u run.py 运行以获得实时日志输出