# coding:utf-8 import time, datetime, traceback, sys, json, os import logging import redis from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant # 全局变量 CURRENT_LOG_DATE = None CONFIG = {} ORDER_CACHE = {} # ================= 1. 日志系统 (按日期直写) ================= def setup_logger(): """ 配置日志系统: 每天生成一个新的日志文件,文件名格式为 YYYY-MM-DD.log """ global CURRENT_LOG_DATE log_dir = "logs" if not os.path.exists(log_dir): os.makedirs(log_dir) # 获取今日日期 today_str = datetime.date.today().strftime('%Y-%m-%d') CURRENT_LOG_DATE = today_str log_file = os.path.join(log_dir, f"{today_str}.log") logger = logging.getLogger("QMT_Trader") logger.setLevel(logging.INFO) # 清除旧 handler if logger.handlers: for handler in logger.handlers[:]: try: handler.close() logger.removeHandler(handler) except: pass formatter = logging.Formatter( '[%(asctime)s] [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 文件输出 file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') file_handler.setFormatter(formatter) # 控制台输出 stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(stream_handler) return logger # 初始化日志 logger = setup_logger() # ================= 2. 配置加载 ================= def load_config(config_file='config.json'): if getattr(sys, 'frozen', False): base_path = os.path.dirname(sys.executable) else: base_path = os.path.dirname(os.path.abspath(__file__)) full_path = os.path.join(base_path, config_file) if not os.path.exists(full_path): if os.path.exists(config_file): full_path = config_file else: logger.error(f"找不到配置文件: {full_path}") sys.exit(1) try: with open(full_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.error(f"配置文件错误: {e}") sys.exit(1) # ================= 3. 业务逻辑类 ================= class PositionManager: def __init__(self, r_client): self.r = r_client def _get_key(self, strategy_name): return f"POS:{strategy_name}" def mark_holding(self, strategy_name, code): """乐观占位""" self.r.hsetnx(self._get_key(strategy_name), code, 0) def rollback_holding(self, strategy_name, code): """失败回滚""" key = self._get_key(strategy_name) val = self.r.hget(key, code) if val is not None and int(val) == 0: self.r.hdel(key, code) logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}") def update_actual_volume(self, strategy_name, code, delta_vol): """成交更新""" key = self._get_key(strategy_name) new_vol = self.r.hincrby(key, code, int(delta_vol)) if new_vol <= 0: self.r.hdel(key, code) new_vol = 0 return new_vol def get_position(self, strategy_name, code): vol = self.r.hget(self._get_key(strategy_name), code) return int(vol) if vol else 0 def get_holding_count(self, strategy_name): return self.r.hlen(self._get_key(strategy_name)) def get_all_virtual_positions(self, strategy_name): return self.r.hgetall(self._get_key(strategy_name)) def force_delete(self, strategy_name, code): self.r.hdel(self._get_key(strategy_name), code) class DailySettlement: def __init__(self, xt_trader, acc, pos_mgr, strategies): self.trader = xt_trader self.acc = acc self.pos_mgr = pos_mgr self.strategies = strategies self.has_settled = False def run_settlement(self): logger.info("="*40) logger.info("执行收盘清算流程...") # 1. 撤单 try: orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) if orders: for o in orders: self.trader.cancel_order_stock(self.acc, o.order_id) time.sleep(2) except: pass # 2. 获取实盘 real_positions = self.trader.query_stock_positions(self.acc) real_pos_map = {} if real_positions: for p in real_positions: if p.volume > 0: real_pos_map[p.stock_code] = p.volume # 3. 校准Redis for strategy in self.strategies: virtual = self.pos_mgr.get_all_virtual_positions(strategy) for code, v in virtual.items(): if code not in real_pos_map: logger.warning(f" [修正] {strategy} 幽灵持仓 {code} -> 释放") self.pos_mgr.force_delete(strategy, code) logger.info("清算完成") self.has_settled = True def reset_flag(self): self.has_settled = False # ================= 4. 回调类 ================= class MyXtQuantTraderCallback(XtQuantTraderCallback): def __init__(self, pos_mgr): self.pos_mgr = pos_mgr self.is_connected = False def on_disconnected(self): logger.warning(">> 回调通知: 交易端连接断开") self.is_connected = False def on_stock_trade(self, trade): try: cache_info = ORDER_CACHE.get(trade.order_id) if not cache_info: return strategy, _, action = cache_info logger.info(f">>> [成交] {strategy} {trade.stock_code} {trade.traded_volume}") if action == 'BUY': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, trade.traded_volume) elif action == 'SELL': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, -trade.traded_volume) except: traceback.print_exc() def on_order_error(self, err): try: logger.error(f"下单失败: {err.error_msg}") cache = ORDER_CACHE.get(err.order_id) if cache and cache[2] == 'BUY': self.pos_mgr.rollback_holding(cache[0], cache[1]) del ORDER_CACHE[err.order_id] except: pass # ================= 5. 核心消息处理 ================= def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): queue_key = f"{strategy_name}_real" msg_json = r_client.lpop(queue_key) if not msg_json: return try: # 归档 r_client.rpush(f"{queue_key}:history", msg_json) data = json.loads(msg_json) # 1. 过滤回测 if data.get('is_backtest'): return # 2. 校验日期 (只处理当日消息) msg_ts = data.get('timestamp') today_str = datetime.date.today().strftime('%Y-%m-%d') if msg_ts: msg_date = msg_ts.split(' ')[0] if msg_date != today_str: logger.warning(f"[{strategy_name}] 拦截非当日消息: MsgDate={msg_date}") return else: logger.warning(f"[{strategy_name}] 消息无时间戳,忽略") return stock_code = data['stock_code'] action = data['action'] price = float(data['price']) total_slots = int(data.get('total_slots', 1)) if action == 'BUY': # 槽位检查 holding = pos_manager.get_holding_count(strategy_name) empty = total_slots - holding if empty <= 0: logger.warning(f"[{strategy_name}] 拦截买入: 槽位已满 (Target:{total_slots} Held:{holding})") return asset = xt_trader.query_stock_asset(acc) if not asset: logger.error("无法查询资产,QMT可能未就绪") return # 金额计算 amt = asset.cash / empty if amt < 2000: logger.warning(f"[{strategy_name}] 拦截买入: 金额过小 ({amt:.2f})") return if price <= 0: price = 1.0 vol = int(amt / price / 100) * 100 if vol >= 100: oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') if oid != -1: logger.info(f"[{strategy_name}] 发出买单: {stock_code} {vol}股 ID:{oid}") ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY') pos_manager.mark_holding(strategy_name, stock_code) else: logger.error(f"[{strategy_name}] 下单被拒绝 (-1)") else: logger.warning(f"[{strategy_name}] 股数不足100 (Amt:{amt:.2f})") elif action == 'SELL': v_vol = pos_manager.get_position(strategy_name, stock_code) if v_vol > 0: real_pos = xt_trader.query_stock_positions(acc) rp = next((p for p in real_pos if p.stock_code==stock_code), None) can_use = rp.can_use_volume if rp else 0 final = min(v_vol, can_use) if final > 0: oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') if oid != -1: logger.info(f"[{strategy_name}] 发出卖单: {stock_code} {final}") ORDER_CACHE[oid] = (strategy_name, stock_code, 'SELL') else: logger.warning(f"[{strategy_name}] 卖出拦截: 虚拟{v_vol}但实盘可用{can_use}") else: logger.info(f"[{strategy_name}] Redis无持仓,忽略卖出") except Exception: logger.error("消息处理异常", exc_info=True) # ================= 6. QMT初始化 ================= def init_qmt_trader(qmt_path, account_id, account_type, pos_manager): try: session_id = int(time.time()) trader = XtQuantTrader(qmt_path, session_id) acc = StockAccount(account_id, account_type) callback = MyXtQuantTraderCallback(pos_manager) trader.register_callback(callback) trader.start() res = trader.connect() if res == 0: logger.info(f"QMT 连接成功 [Session:{session_id}]") trader.subscribe(acc) callback.is_connected = True return trader, acc, callback else: logger.error(f"QMT 连接失败 Code:{res}") return None, None, None except Exception as e: logger.error(f"初始化异常: {e}") return None, None, None # ================= 7. 主程序 ================= if __name__ == '__main__': logger.info(">>> 系统启动 (实盘生产模式) <<<") # 加载配置 CONFIG = load_config('config.json') redis_cfg = CONFIG['redis'] qmt_cfg = CONFIG['qmt'] watch_list = CONFIG['strategies'] # 连接Redis try: r = redis.Redis(**redis_cfg, decode_responses=True) r.ping() pos_manager = PositionManager(r) except Exception as e: logger.critical(f"Redis 连接失败: {e}") sys.exit(1) # 初次连接 QMT xt_trader, acc, callback = init_qmt_trader( qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager ) settler = None if xt_trader: settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) logger.info(">>> 进入主循环监听 <<<") while True: try: # --- 1. 日志跨天处理 --- today_str = datetime.date.today().strftime('%Y-%m-%d') if today_str != CURRENT_LOG_DATE: logger.info(f"日期变更 ({CURRENT_LOG_DATE} -> {today_str}),切换日志...") logger = setup_logger() # --- 2. 断线重连 (仅工作日) --- need_reconnect = (xt_trader is None) or (callback is None) or (not callback.is_connected) if need_reconnect: # 检查是否为周末 (5=周六, 6=周日) if datetime.date.today().weekday() >= 5: logger.info("当前是周末,暂停重连,休眠1小时") time.sleep(3600) continue logger.warning("连接丢失,尝试重连...") if xt_trader: try: xt_trader.stop() except: pass xt_trader, acc, callback = init_qmt_trader( qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager ) if xt_trader: settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) logger.info("重连成功") else: logger.error("重连失败,60秒后重试") time.sleep(60) continue # --- 3. 交易时段轮询逻辑 --- now = datetime.datetime.now() current_time_str = now.strftime('%H%M%S') # 格式化为 HHMMSS # 默认休眠 60秒 sleep_sec = 60 is_trading_time = False # 判断时段 # 09:15:00 - 10:00:00 (竞价 + 开盘半小时) -> 高频 1秒 if '091500' <= current_time_str <= '100000': sleep_sec = 1 is_trading_time = True # 10:00:01 - 11:30:00 -> 低频 60秒 elif '100000' < current_time_str <= '113000': sleep_sec = 60 is_trading_time = True # 13:00:00 - 14:50:00 -> 低频 60秒 elif '130000' <= current_time_str < '145000': sleep_sec = 60 is_trading_time = True # 14:50:00 - 15:00:00 (尾盘 + 收盘竞价) -> 高频 1秒 elif '145000' <= current_time_str <= '150000': sleep_sec = 1 is_trading_time = True # --- 4. 执行业务 --- if is_trading_time: # 每日重置清算标记 if settler and settler.has_settled: settler.reset_flag() # 处理信号 for s in watch_list: process_strategy_queue(s, r, xt_trader, acc, pos_manager) # --- 5. 收盘清算 (15:05 - 15:10) --- elif '150500' <= current_time_str <= '151000': if settler and not settler.has_settled: settler.run_settlement() sleep_sec = 60 # 执行休眠 time.sleep(sleep_sec) except KeyboardInterrupt: logger.info("用户停止") break except Exception as e: logger.critical("主循环未捕获异常", exc_info=True) time.sleep(10)