From bbf1d2248cc63dd913dc4ca737c6c7a7b194bbc3 Mon Sep 17 00:00:00 2001 From: liaozhaorun <1300336796@qq.com> Date: Tue, 2 Dec 2025 23:38:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0qmt=E4=BB=A3=E7=A0=81=201?= =?UTF-8?q?=E3=80=81=E4=BC=98=E5=8C=96=E6=97=A5=E5=BF=97=E6=89=93=E5=8D=B0?= =?UTF-8?q?=202=E3=80=81=E6=96=AD=E8=BF=9E=E9=87=8D=E8=BF=9E=203=E3=80=81r?= =?UTF-8?q?edis=E6=8C=81=E4=B9=85=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 +- qmt/qmt_trader.py | 222 +++++++++++++++++++++++++++++++++------------- 2 files changed, 161 insertions(+), 64 deletions(-) diff --git a/.gitignore b/.gitignore index 4e039c6..8d33143 100644 --- a/.gitignore +++ b/.gitignore @@ -23,4 +23,5 @@ model **/mlruns/ **/mnt/ -/qmt/config.json \ No newline at end of file +/qmt/config.json +/qmt/logs/* \ No newline at end of file diff --git a/qmt/qmt_trader.py b/qmt/qmt_trader.py index c229b8d..60b310d 100644 --- a/qmt/qmt_trader.py +++ b/qmt/qmt_trader.py @@ -1,48 +1,82 @@ # coding:utf-8 import time, datetime, traceback, sys, json, os +import logging +from logging.handlers import TimedRotatingFileHandler import redis from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant -# 全局变量占位 (稍后在 main 中初始化) +# ================= 1. 日志系统初始化 ================= +def setup_logger(): + log_dir = "logs" + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + today = datetime.date.today().strftime('%Y-%m-%d') + log_file = os.path.join(log_dir, f"qmt_trade_{today}.log") + + logger = logging.getLogger("QMT_Trader") + logger.setLevel(logging.INFO) + + if logger.handlers: + logger.handlers.clear() + + formatter = logging.Formatter( + '[%(asctime)s] [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + file_handler = TimedRotatingFileHandler( + filename=log_file, + when='MIDNIGHT', + interval=1, + backupCount=30, + encoding='utf-8' + ) + file_handler.setFormatter(formatter) + + stream_handler = logging.StreamHandler(sys.stdout) + stream_handler.setFormatter(formatter) + + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + return logger + +logger = setup_logger() + +# ================= 全局变量 ================= CONFIG = {} ORDER_CACHE = {} # ================= 配置加载模块 ================= def load_config(config_file='config.json'): - """ - 读取同级目录下的配置文件 - """ - # 获取脚本所在目录 if getattr(sys, 'frozen', False): - # 如果被打包为exe base_path = os.path.dirname(sys.executable) else: - # 普通脚本运行 base_path = os.path.dirname(os.path.abspath(__file__)) full_path = os.path.join(base_path, config_file) if not os.path.exists(full_path): - # 尝试直接读取(兼容 QMT 内置 Python 的路径行为) if os.path.exists(config_file): full_path = config_file else: - print(f"[错误] 找不到配置文件: {full_path}") + logger.error(f"找不到配置文件: {full_path}") sys.exit(1) try: with open(full_path, 'r', encoding='utf-8') as f: config = json.load(f) - print(f"成功加载配置: {full_path}") + logger.info(f"成功加载配置: {full_path}") return config except Exception as e: - print(f"[错误] 配置文件格式错误: {e}") + logger.error(f"配置文件格式错误: {e}") sys.exit(1) -# ================= 业务逻辑类 (保持不变) ================= +# ================= 业务逻辑类 ================= class PositionManager: def __init__(self, r_client): @@ -53,8 +87,9 @@ class PositionManager: def mark_holding(self, strategy_name, code): """乐观占位""" - self.r.hsetnx(self._get_key(strategy_name), code, 0) - # print(f"[{strategy_name}] 乐观占位: {code}") + key = self._get_key(strategy_name) + if self.r.hsetnx(key, code, 0): + logger.info(f"[{strategy_name}] Redis乐观占位成功: {code}") def rollback_holding(self, strategy_name, code): """失败回滚""" @@ -62,7 +97,7 @@ class PositionManager: val = self.r.hget(key, code) if val is not None and int(val) == 0: self.r.hdel(key, code) - print(f"[{strategy_name}] 回滚释放槽位: {code}") + logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}") def update_actual_volume(self, strategy_name, code, delta_vol): """成交更新""" @@ -96,17 +131,21 @@ class DailySettlement: self.has_settled = False def run_settlement(self): - print("\n" + "="*40) - print(f"开始收盘清算: {datetime.datetime.now()}") + logger.info("="*40) + logger.info("开始执行收盘清算流程...") - # 1. 撤单 - orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) - if orders: - for o in orders: - self.trader.cancel_order_stock(self.acc, o.order_id) - time.sleep(2) + try: + orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) + if orders: + for o in orders: + self.trader.cancel_order_stock(self.acc, o.order_id) + logger.info(f" -> 撤销未成交挂单: {o.stock_code} {o.order_volume}") + time.sleep(2) + else: + logger.info(" -> 无未成交订单") + except Exception as e: + logger.error(f"查询/撤单异常: {e}") - # 2. 获取实盘真实持仓 real_positions = self.trader.query_stock_positions(self.acc) real_pos_map = {} if real_positions: @@ -114,15 +153,15 @@ class DailySettlement: if p.volume > 0: real_pos_map[p.stock_code] = p.volume - # 3. 校准 Redis for strategy in self.strategies: virtual_data = self.pos_mgr.get_all_virtual_positions(strategy) for code, v_vol_str in virtual_data.items(): if code not in real_pos_map: - print(f" [修正] {strategy} 幽灵持仓 {code} -> 强制释放") + logger.warning(f" [修正] 策略[{strategy}] 幽灵持仓 {code} (Redis={v_vol_str}) -> 强制释放") self.pos_mgr.force_delete(strategy, code) - print("清算完成") + logger.info("收盘清算完成") + logger.info("="*40) self.has_settled = True def reset_flag(self): @@ -132,9 +171,15 @@ class DailySettlement: class MyXtQuantTraderCallback(XtQuantTraderCallback): def __init__(self, pos_mgr): self.pos_mgr = pos_mgr + # 【新增】连接状态标记,默认False,连接成功后外部置为True + self.is_connected = False def on_disconnected(self): - print(">> 连接断开") + """ + 连接断开回调 + """ + logger.warning(">> 检测到交易端连接断开 (QMT可能正在重启)") + self.is_connected = False def on_stock_trade(self, trade): try: @@ -146,27 +191,32 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback): if not cache_info: return strategy_name, cached_code, action_type = cache_info - print(f">>> [成交] {strategy_name} {stock_code} 成交量:{traded_vol}") + logger.info(f">>> [成交回报] 策略:{strategy_name} 代码:{stock_code} 成交量:{traded_vol}") if action_type == 'BUY': - self.pos_mgr.update_actual_volume(strategy_name, stock_code, traded_vol) + new_pos = self.pos_mgr.update_actual_volume(strategy_name, stock_code, traded_vol) + logger.info(f" [记账] {strategy_name} 持仓增加 -> {new_pos}") elif action_type == 'SELL': - self.pos_mgr.update_actual_volume(strategy_name, stock_code, -traded_vol) + new_pos = self.pos_mgr.update_actual_volume(strategy_name, stock_code, -traded_vol) + logger.info(f" [记账] {strategy_name} 持仓扣减 -> {new_pos}") except Exception: - traceback.print_exc() + logger.error("成交回调处理异常", exc_info=True) def on_order_error(self, order_error): try: order_id = order_error.order_id - print(f">>> [下单失败] ID:{order_id} Msg:{order_error.error_msg}") + err_msg = order_error.error_msg + logger.error(f">>> [下单失败] ID:{order_id} Msg:{err_msg}") + cache_info = ORDER_CACHE.get(order_id) if cache_info: strategy_name, stock_code, action_type = cache_info if action_type == 'BUY': self.pos_mgr.rollback_holding(strategy_name, stock_code) del ORDER_CACHE[order_id] - except: pass + except: + pass def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): @@ -175,14 +225,17 @@ def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager) if not msg_json: return try: + history_key = f"{queue_key}:history" + r_client.rpush(history_key, msg_json) + data = json.loads(msg_json) - # 校验 if data.get('is_backtest', False): return msg_ts = data.get('timestamp') if msg_ts: msg_date = datetime.datetime.strptime(msg_ts, '%Y-%m-%d %H:%M:%S').date() if msg_date != datetime.date.today(): + logger.warning(f"忽略过期消息: {msg_ts}") return stock_code = data['stock_code'] @@ -190,17 +243,25 @@ def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager) price = float(data['price']) target_total_slots = int(data.get('total_slots', 1)) + logger.info(f"收到信号 [{strategy_name}]: {stock_code} {action} Slot:{target_total_slots}") + if action == 'BUY': current_holding_count = pos_manager.get_holding_count(strategy_name) empty_slots = target_total_slots - current_holding_count - if empty_slots <= 0: return + if empty_slots <= 0: + logger.warning(f"[{strategy_name}] 槽位已满,忽略买入") + return asset = xt_trader.query_stock_asset(acc) - if not asset: return + if not asset: + logger.error("无法查询资产信息(可能连接未就绪)") + return target_amount = asset.cash / empty_slots - if target_amount < 2000: return + if target_amount < 2000: + logger.info(f"[{strategy_name}] 下单金额过小 ({target_amount:.2f}),忽略") + return if price <= 0: price = 1.0 vol = int(target_amount / price / 100) * 100 @@ -208,9 +269,11 @@ def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager) if vol >= 100: order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') if order_id != -1: - print(f"[{strategy_name}] 买入 {stock_code} {vol}股 (1/{empty_slots})") + logger.info(f"[{strategy_name}] 发出买单: {stock_code} {vol}股 (1/{empty_slots}) ID:{order_id}") ORDER_CACHE[order_id] = (strategy_name, stock_code, 'BUY') pos_manager.mark_holding(strategy_name, stock_code) + else: + logger.error(f"[{strategy_name}] 下单请求被柜台拒绝 (-1)") elif action == 'SELL': virtual_vol = pos_manager.get_position(strategy_name, stock_code) @@ -223,27 +286,27 @@ def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager) if final_vol > 0: order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final_vol, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') if order_id != -1: - print(f"[{strategy_name}] 卖出 {stock_code} {final_vol}股") + logger.info(f"[{strategy_name}] 发出卖单: {stock_code} {final_vol}股 ID:{order_id}") ORDER_CACHE[order_id] = (strategy_name, stock_code, 'SELL') + else: + logger.warning(f"[{strategy_name}] 账本有货但实盘不足 (Redis:{virtual_vol}, Real:{real_can_use})") + else: + logger.info(f"[{strategy_name}] Redis无持仓,忽略卖出") except Exception: - traceback.print_exc() + logger.error("消息处理异常", exc_info=True) # ================= 主程序入口 ================= if __name__ == '__main__': - print("正在启动...") + logger.info("正在启动 QMT 策略监听系统...") # 1. 加载配置 CONFIG = load_config('config.json') - - # 从配置中提取参数 redis_cfg = CONFIG['redis'] qmt_cfg = CONFIG['qmt'] watch_list = CONFIG['strategies'] - print(f"Redis目标: {redis_cfg['host']}:{redis_cfg['port']}") - print(f"QMT路径: {qmt_cfg['path']}") - print(f"监听策略: {watch_list}") + logger.info(f"监听策略列表: {watch_list}") # 2. 连接 Redis try: @@ -255,60 +318,93 @@ if __name__ == '__main__': decode_responses=True ) r.ping() - print("Redis 连接成功") + logger.info("Redis 连接成功") pos_manager = PositionManager(r) except Exception as e: - print(f"[FATAL] Redis 连接失败: {e}") + logger.critical(f"Redis 连接失败: {e}") sys.exit(1) - # 3. 连接 QMT + # 3. 初始化 QMT 对象 (暂不连接) try: session_id = int(time.time()) xt_trader = XtQuantTrader(qmt_cfg['path'], session_id) - acc = StockAccount(qmt_cfg['account_id'], qmt_cfg['account_type']) callback = MyXtQuantTraderCallback(pos_manager) xt_trader.register_callback(callback) - xt_trader.start() - connect_res = xt_trader.connect() + # 初次连接 + connect_res = xt_trader.connect() if connect_res == 0: - print(f"QMT 连接成功: {qmt_cfg['account_id']}") + logger.info(f"QMT 初次连接成功: {qmt_cfg['account_id']}") xt_trader.subscribe(acc) + callback.is_connected = True # 标记为已连接 else: - print(f"[FATAL] QMT 连接失败,错误码: {connect_res}") - sys.exit(1) + logger.error(f"QMT 初次连接失败 ({connect_res}),将进入重连循环") + callback.is_connected = False except Exception as e: - print(f"[FATAL] QMT 初始化异常: {e}") + logger.critical(f"QMT 初始化异常: {e}") sys.exit(1) # 4. 初始化清算器 settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) - print("=== 系统就绪,开始监听 ===") + logger.info("=== 系统就绪,开始循环监听 ===") try: while True: + # ================= [新增] 断线重连逻辑 ================= + if not callback.is_connected: + logger.info("正在尝试重连 QMT...") + try: + # 尝试连接 + res = xt_trader.connect() + if res == 0: + logger.info("QMT 重连成功!重新订阅账户...") + # 重连后必须重新订阅 + xt_trader.subscribe(acc) + callback.is_connected = True + else: + logger.warning(f"QMT 重连失败 (Code: {res}),60秒后重试...") + time.sleep(60) + continue # 跳过本次循环,直接进入下一次检查 + except Exception as e: + logger.error(f"重连过程发生异常: {e}") + time.sleep(60) + continue + # ==================================================== + now = datetime.datetime.now() current_time_str = now.strftime('%H%M') + + # 默认休眠时间 + sleep_interval = 60 - # 交易时段 + # === 交易时段 === if '0900' <= current_time_str <= '1500': + # 高频轮询时段 + if '0920' <= current_time_str <= '1000': + sleep_interval = 10 + else: + sleep_interval = 60 + if settler.has_settled: settler.reset_flag() for strategy in watch_list: process_strategy_queue(strategy, r, xt_trader, acc, pos_manager) - # 收盘清算时段 + # === 收盘清算时段 === elif '1505' <= current_time_str <= '1510': if not settler.has_settled: settler.run_settlement() - - time.sleep(60) + sleep_interval = 60 + + time.sleep(sleep_interval) except KeyboardInterrupt: - print("用户终止程序") \ No newline at end of file + logger.info("用户手动终止程序") + except Exception as e: + logger.critical("主循环发生未捕获异常", exc_info=True) \ No newline at end of file