diff --git a/qmt/qmt_trader.py b/qmt/qmt_trader.py index 60b310d..bf951a4 100644 --- a/qmt/qmt_trader.py +++ b/qmt/qmt_trader.py @@ -1,42 +1,57 @@ # coding:utf-8 import time, datetime, traceback, sys, json, os import logging -from logging.handlers import TimedRotatingFileHandler import redis from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant -# ================= 1. 日志系统初始化 ================= +# 全局变量记录当前日志日期,用于跨天判断 +CURRENT_LOG_DATE = None + +# ================= 1. 日志系统 (按日期直写) ================= def setup_logger(): + """ + 配置日志系统: + 1. 确保日志目录存在 + 2. 生成当天日期的日志文件 (YYYY-MM-DD.log) + 3. 同时输出到控制台 + """ + global CURRENT_LOG_DATE + log_dir = "logs" if not os.path.exists(log_dir): os.makedirs(log_dir) - today = datetime.date.today().strftime('%Y-%m-%d') - log_file = os.path.join(log_dir, f"qmt_trade_{today}.log") + # 获取今日日期 + today_str = datetime.date.today().strftime('%Y-%m-%d') + CURRENT_LOG_DATE = today_str # 更新全局变量 + + # 文件名直接就是日期: logs/2025-12-05.log + log_file = os.path.join(log_dir, f"{today_str}.log") logger = logging.getLogger("QMT_Trader") logger.setLevel(logging.INFO) + # 【关键】清除旧的 handler,防止跨天后重复打印或写入旧文件 if logger.handlers: - logger.handlers.clear() + for handler in logger.handlers[:]: + try: + handler.close() + logger.removeHandler(handler) + except: pass formatter = logging.Formatter( '[%(asctime)s] [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) - file_handler = TimedRotatingFileHandler( - filename=log_file, - when='MIDNIGHT', - interval=1, - backupCount=30, - encoding='utf-8' - ) + # Handler 1: 普通文件输出 (追加模式) + file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') file_handler.setFormatter(formatter) + # Handler 2: 控制台 stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(formatter) @@ -45,83 +60,62 @@ def setup_logger(): return logger +# 初次初始化 logger = setup_logger() # ================= 全局变量 ================= CONFIG = {} ORDER_CACHE = {} -# ================= 配置加载模块 ================= +# ================= 配置加载 ================= def load_config(config_file='config.json'): if getattr(sys, 'frozen', False): base_path = os.path.dirname(sys.executable) else: base_path = os.path.dirname(os.path.abspath(__file__)) - full_path = os.path.join(base_path, config_file) - if not os.path.exists(full_path): - if os.path.exists(config_file): - full_path = config_file + if os.path.exists(config_file): full_path = config_file else: logger.error(f"找不到配置文件: {full_path}") sys.exit(1) - try: with open(full_path, 'r', encoding='utf-8') as f: - config = json.load(f) - logger.info(f"成功加载配置: {full_path}") - return config + return json.load(f) except Exception as e: - logger.error(f"配置文件格式错误: {e}") + logger.error(f"配置文件错误: {e}") sys.exit(1) # ================= 业务逻辑类 ================= - class PositionManager: def __init__(self, r_client): self.r = r_client - def _get_key(self, strategy_name): return f"POS:{strategy_name}" - def mark_holding(self, strategy_name, code): - """乐观占位""" - key = self._get_key(strategy_name) - if self.r.hsetnx(key, code, 0): - logger.info(f"[{strategy_name}] Redis乐观占位成功: {code}") - + self.r.hsetnx(self._get_key(strategy_name), code, 0) def rollback_holding(self, strategy_name, code): - """失败回滚""" key = self._get_key(strategy_name) val = self.r.hget(key, code) if val is not None and int(val) == 0: self.r.hdel(key, code) - logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}") - def update_actual_volume(self, strategy_name, code, delta_vol): - """成交更新""" key = self._get_key(strategy_name) new_vol = self.r.hincrby(key, code, int(delta_vol)) if new_vol <= 0: self.r.hdel(key, code) new_vol = 0 return new_vol - def get_position(self, strategy_name, code): vol = self.r.hget(self._get_key(strategy_name), code) return int(vol) if vol else 0 - def get_holding_count(self, strategy_name): return self.r.hlen(self._get_key(strategy_name)) - def get_all_virtual_positions(self, strategy_name): return self.r.hgetall(self._get_key(strategy_name)) - def force_delete(self, strategy_name, code): self.r.hdel(self._get_key(strategy_name), code) - class DailySettlement: def __init__(self, xt_trader, acc, pos_mgr, strategies): self.trader = xt_trader @@ -129,282 +123,223 @@ class DailySettlement: self.pos_mgr = pos_mgr self.strategies = strategies self.has_settled = False - def run_settlement(self): logger.info("="*40) - logger.info("开始执行收盘清算流程...") - + logger.info("执行收盘清算...") try: orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) if orders: for o in orders: self.trader.cancel_order_stock(self.acc, o.order_id) - logger.info(f" -> 撤销未成交挂单: {o.stock_code} {o.order_volume}") time.sleep(2) - else: - logger.info(" -> 无未成交订单") - except Exception as e: - logger.error(f"查询/撤单异常: {e}") - + except: pass real_positions = self.trader.query_stock_positions(self.acc) real_pos_map = {} if real_positions: for p in real_positions: - if p.volume > 0: - real_pos_map[p.stock_code] = p.volume - + if p.volume > 0: real_pos_map[p.stock_code] = p.volume for strategy in self.strategies: - virtual_data = self.pos_mgr.get_all_virtual_positions(strategy) - for code, v_vol_str in virtual_data.items(): + virtual = self.pos_mgr.get_all_virtual_positions(strategy) + for code, v in virtual.items(): if code not in real_pos_map: - logger.warning(f" [修正] 策略[{strategy}] 幽灵持仓 {code} (Redis={v_vol_str}) -> 强制释放") + logger.warning(f" [修正] {strategy} 幽灵持仓 {code} -> 释放") self.pos_mgr.force_delete(strategy, code) - - logger.info("收盘清算完成") - logger.info("="*40) + logger.info("清算完成") self.has_settled = True - def reset_flag(self): self.has_settled = False - +# ================= 回调类 ================= class MyXtQuantTraderCallback(XtQuantTraderCallback): def __init__(self, pos_mgr): self.pos_mgr = pos_mgr - # 【新增】连接状态标记,默认False,连接成功后外部置为True - self.is_connected = False + self.is_connected = False def on_disconnected(self): - """ - 连接断开回调 - """ - logger.warning(">> 检测到交易端连接断开 (QMT可能正在重启)") + logger.warning(">> 回调通知: 交易端连接断开") self.is_connected = False def on_stock_trade(self, trade): try: - order_id = trade.order_id - stock_code = trade.stock_code - traded_vol = trade.traded_volume + cache_info = ORDER_CACHE.get(trade.order_id) + if not cache_info: return + strategy, _, action = cache_info - cache_info = ORDER_CACHE.get(order_id) - if not cache_info: return + logger.info(f">>> [成交] {strategy} {trade.stock_code} {trade.traded_volume}") + if action == 'BUY': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, trade.traded_volume) + elif action == 'SELL': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, -trade.traded_volume) + except: traceback.print_exc() - strategy_name, cached_code, action_type = cache_info - logger.info(f">>> [成交回报] 策略:{strategy_name} 代码:{stock_code} 成交量:{traded_vol}") - - if action_type == 'BUY': - new_pos = self.pos_mgr.update_actual_volume(strategy_name, stock_code, traded_vol) - logger.info(f" [记账] {strategy_name} 持仓增加 -> {new_pos}") - elif action_type == 'SELL': - new_pos = self.pos_mgr.update_actual_volume(strategy_name, stock_code, -traded_vol) - logger.info(f" [记账] {strategy_name} 持仓扣减 -> {new_pos}") - - except Exception: - logger.error("成交回调处理异常", exc_info=True) - - def on_order_error(self, order_error): + def on_order_error(self, err): try: - order_id = order_error.order_id - err_msg = order_error.error_msg - logger.error(f">>> [下单失败] ID:{order_id} Msg:{err_msg}") - - cache_info = ORDER_CACHE.get(order_id) - if cache_info: - strategy_name, stock_code, action_type = cache_info - if action_type == 'BUY': - self.pos_mgr.rollback_holding(strategy_name, stock_code) - del ORDER_CACHE[order_id] - except: - pass - + logger.error(f"下单失败: {err.error_msg}") + cache = ORDER_CACHE.get(err.order_id) + if cache and cache[2] == 'BUY': + self.pos_mgr.rollback_holding(cache[0], cache[1]) + del ORDER_CACHE[err.order_id] + except: pass +# ================= 消息处理 ================= def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): queue_key = f"{strategy_name}_real" msg_json = r_client.lpop(queue_key) if not msg_json: return try: - history_key = f"{queue_key}:history" - r_client.rpush(history_key, msg_json) - + r_client.rpush(f"{queue_key}:history", msg_json) data = json.loads(msg_json) + if data.get('is_backtest'): return - if data.get('is_backtest', False): return - msg_ts = data.get('timestamp') - if msg_ts: - msg_date = datetime.datetime.strptime(msg_ts, '%Y-%m-%d %H:%M:%S').date() - if msg_date != datetime.date.today(): - logger.warning(f"忽略过期消息: {msg_ts}") - return - + # 简单日期校验 + if data.get('timestamp', '').split(' ')[0] != datetime.date.today().strftime('%Y-%m-%d'): + return + stock_code = data['stock_code'] action = data['action'] price = float(data['price']) - target_total_slots = int(data.get('total_slots', 1)) - - logger.info(f"收到信号 [{strategy_name}]: {stock_code} {action} Slot:{target_total_slots}") + total_slots = int(data.get('total_slots', 1)) if action == 'BUY': - current_holding_count = pos_manager.get_holding_count(strategy_name) - empty_slots = target_total_slots - current_holding_count - - if empty_slots <= 0: - logger.warning(f"[{strategy_name}] 槽位已满,忽略买入") - return + holding = pos_manager.get_holding_count(strategy_name) + empty = total_slots - holding + if empty <= 0: return asset = xt_trader.query_stock_asset(acc) - if not asset: - logger.error("无法查询资产信息(可能连接未就绪)") - return + if not asset: return - target_amount = asset.cash / empty_slots - if target_amount < 2000: - logger.info(f"[{strategy_name}] 下单金额过小 ({target_amount:.2f}),忽略") - return - - if price <= 0: price = 1.0 - vol = int(target_amount / price / 100) * 100 + amt = asset.cash / empty + if amt < 2000: return + if price<=0: price=1.0 + vol = int(amt/price/100)*100 if vol >= 100: - order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') - if order_id != -1: - logger.info(f"[{strategy_name}] 发出买单: {stock_code} {vol}股 (1/{empty_slots}) ID:{order_id}") - ORDER_CACHE[order_id] = (strategy_name, stock_code, 'BUY') + oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') + if oid != -1: + logger.info(f"[{strategy_name}] 买入 {stock_code} {vol}股") + ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY') pos_manager.mark_holding(strategy_name, stock_code) - else: - logger.error(f"[{strategy_name}] 下单请求被柜台拒绝 (-1)") elif action == 'SELL': - virtual_vol = pos_manager.get_position(strategy_name, stock_code) - if virtual_vol > 0: - real_positions = xt_trader.query_stock_positions(acc) - real_pos = next((p for p in real_positions if p.stock_code == stock_code), None) - real_can_use = real_pos.can_use_volume if real_pos else 0 - - final_vol = min(virtual_vol, real_can_use) - if final_vol > 0: - order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final_vol, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') - if order_id != -1: - logger.info(f"[{strategy_name}] 发出卖单: {stock_code} {final_vol}股 ID:{order_id}") - ORDER_CACHE[order_id] = (strategy_name, stock_code, 'SELL') - else: - logger.warning(f"[{strategy_name}] 账本有货但实盘不足 (Redis:{virtual_vol}, Real:{real_can_use})") - else: - logger.info(f"[{strategy_name}] Redis无持仓,忽略卖出") - - except Exception: + v_vol = pos_manager.get_position(strategy_name, stock_code) + if v_vol > 0: + real_pos = xt_trader.query_stock_positions(acc) + rp = next((p for p in real_pos if p.stock_code==stock_code), None) + can_use = rp.can_use_volume if rp else 0 + final = min(v_vol, can_use) + if final > 0: + oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') + if oid != -1: + logger.info(f"[{strategy_name}] 卖出 {stock_code} {final}") + ORDER_CACHE[oid] = (strategy_name, stock_code, 'SELL') + except: logger.error("消息处理异常", exc_info=True) -# ================= 主程序入口 ================= + +# ================= QMT对象初始化 ================= +def init_qmt_trader(qmt_path, account_id, account_type, pos_manager): + try: + session_id = int(time.time()) + trader = XtQuantTrader(qmt_path, session_id) + acc = StockAccount(account_id, account_type) + callback = MyXtQuantTraderCallback(pos_manager) + trader.register_callback(callback) + trader.start() + res = trader.connect() + if res == 0: + logger.info(f"QMT 连接成功 [Session:{session_id}]") + trader.subscribe(acc) + callback.is_connected = True + return trader, acc, callback + else: + logger.error(f"QMT 连接失败 Code:{res}") + return None, None, None + except Exception as e: + logger.error(f"初始化异常: {e}") + return None, None, None + +# ================= 主程序 ================= if __name__ == '__main__': - logger.info("正在启动 QMT 策略监听系统...") + logger.info("系统启动...") - # 1. 加载配置 CONFIG = load_config('config.json') redis_cfg = CONFIG['redis'] qmt_cfg = CONFIG['qmt'] watch_list = CONFIG['strategies'] - - logger.info(f"监听策略列表: {watch_list}") - # 2. 连接 Redis try: - r = redis.Redis( - host=redis_cfg['host'], - port=redis_cfg['port'], - password=redis_cfg['password'], - db=redis_cfg['db'], - decode_responses=True - ) + r = redis.Redis(**redis_cfg, decode_responses=True) r.ping() - logger.info("Redis 连接成功") pos_manager = PositionManager(r) except Exception as e: logger.critical(f"Redis 连接失败: {e}") sys.exit(1) - # 3. 初始化 QMT 对象 (暂不连接) - try: - session_id = int(time.time()) - xt_trader = XtQuantTrader(qmt_cfg['path'], session_id) - acc = StockAccount(qmt_cfg['account_id'], qmt_cfg['account_type']) - - callback = MyXtQuantTraderCallback(pos_manager) - xt_trader.register_callback(callback) - xt_trader.start() - - # 初次连接 - connect_res = xt_trader.connect() - if connect_res == 0: - logger.info(f"QMT 初次连接成功: {qmt_cfg['account_id']}") - xt_trader.subscribe(acc) - callback.is_connected = True # 标记为已连接 - else: - logger.error(f"QMT 初次连接失败 ({connect_res}),将进入重连循环") - callback.is_connected = False + xt_trader, acc, callback = init_qmt_trader( + qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager + ) + + settler = None + if xt_trader: + settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) + + logger.info("进入主循环...") + + while True: + try: + # === [新增] 日志跨天检查 === + # 如果日期变更了,重新初始化日志,这会自动创建新日期的文件 + today_str = datetime.date.today().strftime('%Y-%m-%d') + if today_str != CURRENT_LOG_DATE: + logger.info(f"检测到跨天 ({CURRENT_LOG_DATE} -> {today_str}),切换日志文件...") + logger = setup_logger() + logger.info(f"日志切换完成,当前写入: logs/{today_str}.log") - except Exception as e: - logger.critical(f"QMT 初始化异常: {e}") - sys.exit(1) - - # 4. 初始化清算器 - settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) - - logger.info("=== 系统就绪,开始循环监听 ===") - - try: - while True: - # ================= [新增] 断线重连逻辑 ================= - if not callback.is_connected: - logger.info("正在尝试重连 QMT...") - try: - # 尝试连接 - res = xt_trader.connect() - if res == 0: - logger.info("QMT 重连成功!重新订阅账户...") - # 重连后必须重新订阅 - xt_trader.subscribe(acc) - callback.is_connected = True - else: - logger.warning(f"QMT 重连失败 (Code: {res}),60秒后重试...") - time.sleep(60) - continue # 跳过本次循环,直接进入下一次检查 - except Exception as e: - logger.error(f"重连过程发生异常: {e}") + # === 断线重连 === + need_reconnect = (xt_trader is None) or (callback is None) or (not callback.is_connected) + if need_reconnect: + logger.warning("连接丢失,执行硬重连...") + if xt_trader: + try: xt_trader.stop() + except: pass + + xt_trader, acc, callback = init_qmt_trader( + qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager + ) + + if xt_trader: + settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) + logger.info("重连成功") + else: + logger.error("重连失败,60秒后重试") time.sleep(60) continue - # ==================================================== + # === 业务轮询 === now = datetime.datetime.now() current_time_str = now.strftime('%H%M') - - # 默认休眠时间 - sleep_interval = 60 + sleep_sec = 60 - # === 交易时段 === if '0900' <= current_time_str <= '1500': - # 高频轮询时段 if '0920' <= current_time_str <= '1000': - sleep_interval = 10 + sleep_sec = 10 else: - sleep_interval = 60 + sleep_sec = 60 - if settler.has_settled: + if settler and settler.has_settled: settler.reset_flag() - for strategy in watch_list: - process_strategy_queue(strategy, r, xt_trader, acc, pos_manager) + for s in watch_list: + process_strategy_queue(s, r, xt_trader, acc, pos_manager) - # === 收盘清算时段 === elif '1505' <= current_time_str <= '1510': - if not settler.has_settled: + if settler and not settler.has_settled: settler.run_settlement() - sleep_interval = 60 - time.sleep(sleep_interval) + time.sleep(sleep_sec) - except KeyboardInterrupt: - logger.info("用户手动终止程序") - except Exception as e: - logger.critical("主循环发生未捕获异常", exc_info=True) \ No newline at end of file + except KeyboardInterrupt: + logger.info("用户停止") + break + except Exception as e: + logger.critical("主循环未捕获异常", exc_info=True) + time.sleep(10) \ No newline at end of file