# coding:utf-8 import time, datetime, traceback, sys, json, os import logging from logging.handlers import TimedRotatingFileHandler import redis from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant # ================= 1. 日志系统初始化 ================= def setup_logger(): log_dir = "logs" if not os.path.exists(log_dir): os.makedirs(log_dir) today = datetime.date.today().strftime('%Y-%m-%d') log_file = os.path.join(log_dir, f"qmt_trade_{today}.log") logger = logging.getLogger("QMT_Trader") logger.setLevel(logging.INFO) if logger.handlers: logger.handlers.clear() formatter = logging.Formatter( '[%(asctime)s] [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) file_handler = TimedRotatingFileHandler( filename=log_file, when='MIDNIGHT', interval=1, backupCount=30, encoding='utf-8' ) file_handler.setFormatter(formatter) stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(stream_handler) return logger logger = setup_logger() # ================= 全局变量 ================= CONFIG = {} ORDER_CACHE = {} # ================= 配置加载模块 ================= def load_config(config_file='config.json'): if getattr(sys, 'frozen', False): base_path = os.path.dirname(sys.executable) else: base_path = os.path.dirname(os.path.abspath(__file__)) full_path = os.path.join(base_path, config_file) if not os.path.exists(full_path): if os.path.exists(config_file): full_path = config_file else: logger.error(f"找不到配置文件: {full_path}") sys.exit(1) try: with open(full_path, 'r', encoding='utf-8') as f: config = json.load(f) logger.info(f"成功加载配置: {full_path}") return config except Exception as e: logger.error(f"配置文件格式错误: {e}") sys.exit(1) # ================= 业务逻辑类 ================= class PositionManager: def __init__(self, r_client): self.r = r_client def _get_key(self, strategy_name): return f"POS:{strategy_name}" def mark_holding(self, strategy_name, code): """乐观占位""" key = self._get_key(strategy_name) if self.r.hsetnx(key, code, 0): logger.info(f"[{strategy_name}] Redis乐观占位成功: {code}") def rollback_holding(self, strategy_name, code): """失败回滚""" key = self._get_key(strategy_name) val = self.r.hget(key, code) if val is not None and int(val) == 0: self.r.hdel(key, code) logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}") def update_actual_volume(self, strategy_name, code, delta_vol): """成交更新""" key = self._get_key(strategy_name) new_vol = self.r.hincrby(key, code, int(delta_vol)) if new_vol <= 0: self.r.hdel(key, code) new_vol = 0 return new_vol def get_position(self, strategy_name, code): vol = self.r.hget(self._get_key(strategy_name), code) return int(vol) if vol else 0 def get_holding_count(self, strategy_name): return self.r.hlen(self._get_key(strategy_name)) def get_all_virtual_positions(self, strategy_name): return self.r.hgetall(self._get_key(strategy_name)) def force_delete(self, strategy_name, code): self.r.hdel(self._get_key(strategy_name), code) class DailySettlement: def __init__(self, xt_trader, acc, pos_mgr, strategies): self.trader = xt_trader self.acc = acc self.pos_mgr = pos_mgr self.strategies = strategies self.has_settled = False def run_settlement(self): logger.info("="*40) logger.info("开始执行收盘清算流程...") try: orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) if orders: for o in orders: self.trader.cancel_order_stock(self.acc, o.order_id) logger.info(f" -> 撤销未成交挂单: {o.stock_code} {o.order_volume}") time.sleep(2) else: logger.info(" -> 无未成交订单") except Exception as e: logger.error(f"查询/撤单异常: {e}") real_positions = self.trader.query_stock_positions(self.acc) real_pos_map = {} if real_positions: for p in real_positions: if p.volume > 0: real_pos_map[p.stock_code] = p.volume for strategy in self.strategies: virtual_data = self.pos_mgr.get_all_virtual_positions(strategy) for code, v_vol_str in virtual_data.items(): if code not in real_pos_map: logger.warning(f" [修正] 策略[{strategy}] 幽灵持仓 {code} (Redis={v_vol_str}) -> 强制释放") self.pos_mgr.force_delete(strategy, code) logger.info("收盘清算完成") logger.info("="*40) self.has_settled = True def reset_flag(self): self.has_settled = False class MyXtQuantTraderCallback(XtQuantTraderCallback): def __init__(self, pos_mgr): self.pos_mgr = pos_mgr # 【新增】连接状态标记,默认False,连接成功后外部置为True self.is_connected = False def on_disconnected(self): """ 连接断开回调 """ logger.warning(">> 检测到交易端连接断开 (QMT可能正在重启)") self.is_connected = False def on_stock_trade(self, trade): try: order_id = trade.order_id stock_code = trade.stock_code traded_vol = trade.traded_volume cache_info = ORDER_CACHE.get(order_id) if not cache_info: return strategy_name, cached_code, action_type = cache_info logger.info(f">>> [成交回报] 策略:{strategy_name} 代码:{stock_code} 成交量:{traded_vol}") if action_type == 'BUY': new_pos = self.pos_mgr.update_actual_volume(strategy_name, stock_code, traded_vol) logger.info(f" [记账] {strategy_name} 持仓增加 -> {new_pos}") elif action_type == 'SELL': new_pos = self.pos_mgr.update_actual_volume(strategy_name, stock_code, -traded_vol) logger.info(f" [记账] {strategy_name} 持仓扣减 -> {new_pos}") except Exception: logger.error("成交回调处理异常", exc_info=True) def on_order_error(self, order_error): try: order_id = order_error.order_id err_msg = order_error.error_msg logger.error(f">>> [下单失败] ID:{order_id} Msg:{err_msg}") cache_info = ORDER_CACHE.get(order_id) if cache_info: strategy_name, stock_code, action_type = cache_info if action_type == 'BUY': self.pos_mgr.rollback_holding(strategy_name, stock_code) del ORDER_CACHE[order_id] except: pass def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): queue_key = f"{strategy_name}_real" msg_json = r_client.lpop(queue_key) if not msg_json: return try: history_key = f"{queue_key}:history" r_client.rpush(history_key, msg_json) data = json.loads(msg_json) if data.get('is_backtest', False): return msg_ts = data.get('timestamp') if msg_ts: msg_date = datetime.datetime.strptime(msg_ts, '%Y-%m-%d %H:%M:%S').date() if msg_date != datetime.date.today(): logger.warning(f"忽略过期消息: {msg_ts}") return stock_code = data['stock_code'] action = data['action'] price = float(data['price']) target_total_slots = int(data.get('total_slots', 1)) logger.info(f"收到信号 [{strategy_name}]: {stock_code} {action} Slot:{target_total_slots}") if action == 'BUY': current_holding_count = pos_manager.get_holding_count(strategy_name) empty_slots = target_total_slots - current_holding_count if empty_slots <= 0: logger.warning(f"[{strategy_name}] 槽位已满,忽略买入") return asset = xt_trader.query_stock_asset(acc) if not asset: logger.error("无法查询资产信息(可能连接未就绪)") return target_amount = asset.cash / empty_slots if target_amount < 2000: logger.info(f"[{strategy_name}] 下单金额过小 ({target_amount:.2f}),忽略") return if price <= 0: price = 1.0 vol = int(target_amount / price / 100) * 100 if vol >= 100: order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') if order_id != -1: logger.info(f"[{strategy_name}] 发出买单: {stock_code} {vol}股 (1/{empty_slots}) ID:{order_id}") ORDER_CACHE[order_id] = (strategy_name, stock_code, 'BUY') pos_manager.mark_holding(strategy_name, stock_code) else: logger.error(f"[{strategy_name}] 下单请求被柜台拒绝 (-1)") elif action == 'SELL': virtual_vol = pos_manager.get_position(strategy_name, stock_code) if virtual_vol > 0: real_positions = xt_trader.query_stock_positions(acc) real_pos = next((p for p in real_positions if p.stock_code == stock_code), None) real_can_use = real_pos.can_use_volume if real_pos else 0 final_vol = min(virtual_vol, real_can_use) if final_vol > 0: order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final_vol, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') if order_id != -1: logger.info(f"[{strategy_name}] 发出卖单: {stock_code} {final_vol}股 ID:{order_id}") ORDER_CACHE[order_id] = (strategy_name, stock_code, 'SELL') else: logger.warning(f"[{strategy_name}] 账本有货但实盘不足 (Redis:{virtual_vol}, Real:{real_can_use})") else: logger.info(f"[{strategy_name}] Redis无持仓,忽略卖出") except Exception: logger.error("消息处理异常", exc_info=True) # ================= 主程序入口 ================= if __name__ == '__main__': logger.info("正在启动 QMT 策略监听系统...") # 1. 加载配置 CONFIG = load_config('config.json') redis_cfg = CONFIG['redis'] qmt_cfg = CONFIG['qmt'] watch_list = CONFIG['strategies'] logger.info(f"监听策略列表: {watch_list}") # 2. 连接 Redis try: r = redis.Redis( host=redis_cfg['host'], port=redis_cfg['port'], password=redis_cfg['password'], db=redis_cfg['db'], decode_responses=True ) r.ping() logger.info("Redis 连接成功") pos_manager = PositionManager(r) except Exception as e: logger.critical(f"Redis 连接失败: {e}") sys.exit(1) # 3. 初始化 QMT 对象 (暂不连接) try: session_id = int(time.time()) xt_trader = XtQuantTrader(qmt_cfg['path'], session_id) acc = StockAccount(qmt_cfg['account_id'], qmt_cfg['account_type']) callback = MyXtQuantTraderCallback(pos_manager) xt_trader.register_callback(callback) xt_trader.start() # 初次连接 connect_res = xt_trader.connect() if connect_res == 0: logger.info(f"QMT 初次连接成功: {qmt_cfg['account_id']}") xt_trader.subscribe(acc) callback.is_connected = True # 标记为已连接 else: logger.error(f"QMT 初次连接失败 ({connect_res}),将进入重连循环") callback.is_connected = False except Exception as e: logger.critical(f"QMT 初始化异常: {e}") sys.exit(1) # 4. 初始化清算器 settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) logger.info("=== 系统就绪,开始循环监听 ===") try: while True: # ================= [新增] 断线重连逻辑 ================= if not callback.is_connected: logger.info("正在尝试重连 QMT...") try: # 尝试连接 res = xt_trader.connect() if res == 0: logger.info("QMT 重连成功!重新订阅账户...") # 重连后必须重新订阅 xt_trader.subscribe(acc) callback.is_connected = True else: logger.warning(f"QMT 重连失败 (Code: {res}),60秒后重试...") time.sleep(60) continue # 跳过本次循环,直接进入下一次检查 except Exception as e: logger.error(f"重连过程发生异常: {e}") time.sleep(60) continue # ==================================================== now = datetime.datetime.now() current_time_str = now.strftime('%H%M') # 默认休眠时间 sleep_interval = 60 # === 交易时段 === if '0900' <= current_time_str <= '1500': # 高频轮询时段 if '0920' <= current_time_str <= '1000': sleep_interval = 10 else: sleep_interval = 60 if settler.has_settled: settler.reset_flag() for strategy in watch_list: process_strategy_queue(strategy, r, xt_trader, acc, pos_manager) # === 收盘清算时段 === elif '1505' <= current_time_str <= '1510': if not settler.has_settled: settler.run_settlement() sleep_interval = 60 time.sleep(sleep_interval) except KeyboardInterrupt: logger.info("用户手动终止程序") except Exception as e: logger.critical("主循环发生未捕获异常", exc_info=True)