diff --git a/qmt/qmt_trader.py b/qmt/qmt_trader.py index bf951a4..bcca240 100644 --- a/qmt/qmt_trader.py +++ b/qmt/qmt_trader.py @@ -7,16 +7,16 @@ from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant -# 全局变量记录当前日志日期,用于跨天判断 +# 全局变量 CURRENT_LOG_DATE = None +CONFIG = {} +ORDER_CACHE = {} # ================= 1. 日志系统 (按日期直写) ================= def setup_logger(): """ 配置日志系统: - 1. 确保日志目录存在 - 2. 生成当天日期的日志文件 (YYYY-MM-DD.log) - 3. 同时输出到控制台 + 每天生成一个新的日志文件,文件名格式为 YYYY-MM-DD.log """ global CURRENT_LOG_DATE @@ -26,15 +26,14 @@ def setup_logger(): # 获取今日日期 today_str = datetime.date.today().strftime('%Y-%m-%d') - CURRENT_LOG_DATE = today_str # 更新全局变量 + CURRENT_LOG_DATE = today_str - # 文件名直接就是日期: logs/2025-12-05.log log_file = os.path.join(log_dir, f"{today_str}.log") logger = logging.getLogger("QMT_Trader") logger.setLevel(logging.INFO) - # 【关键】清除旧的 handler,防止跨天后重复打印或写入旧文件 + # 清除旧 handler if logger.handlers: for handler in logger.handlers[:]: try: @@ -47,11 +46,11 @@ def setup_logger(): datefmt='%Y-%m-%d %H:%M:%S' ) - # Handler 1: 普通文件输出 (追加模式) + # 文件输出 file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') file_handler.setFormatter(formatter) - # Handler 2: 控制台 + # 控制台输出 stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(formatter) @@ -60,14 +59,10 @@ def setup_logger(): return logger -# 初次初始化 +# 初始化日志 logger = setup_logger() -# ================= 全局变量 ================= -CONFIG = {} -ORDER_CACHE = {} - -# ================= 配置加载 ================= +# ================= 2. 配置加载 ================= def load_config(config_file='config.json'): if getattr(sys, 'frozen', False): base_path = os.path.dirname(sys.executable) @@ -86,20 +81,24 @@ def load_config(config_file='config.json'): logger.error(f"配置文件错误: {e}") sys.exit(1) -# ================= 业务逻辑类 ================= +# ================= 3. 业务逻辑类 ================= class PositionManager: def __init__(self, r_client): self.r = r_client def _get_key(self, strategy_name): return f"POS:{strategy_name}" def mark_holding(self, strategy_name, code): + """乐观占位""" self.r.hsetnx(self._get_key(strategy_name), code, 0) def rollback_holding(self, strategy_name, code): + """失败回滚""" key = self._get_key(strategy_name) val = self.r.hget(key, code) if val is not None and int(val) == 0: self.r.hdel(key, code) + logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}") def update_actual_volume(self, strategy_name, code, delta_vol): + """成交更新""" key = self._get_key(strategy_name) new_vol = self.r.hincrby(key, code, int(delta_vol)) if new_vol <= 0: @@ -125,7 +124,8 @@ class DailySettlement: self.has_settled = False def run_settlement(self): logger.info("="*40) - logger.info("执行收盘清算...") + logger.info("执行收盘清算流程...") + # 1. 撤单 try: orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) if orders: @@ -133,11 +133,13 @@ class DailySettlement: self.trader.cancel_order_stock(self.acc, o.order_id) time.sleep(2) except: pass + # 2. 获取实盘 real_positions = self.trader.query_stock_positions(self.acc) real_pos_map = {} if real_positions: for p in real_positions: if p.volume > 0: real_pos_map[p.stock_code] = p.volume + # 3. 校准Redis for strategy in self.strategies: virtual = self.pos_mgr.get_all_virtual_positions(strategy) for code, v in virtual.items(): @@ -149,16 +151,14 @@ class DailySettlement: def reset_flag(self): self.has_settled = False -# ================= 回调类 ================= +# ================= 4. 回调类 ================= class MyXtQuantTraderCallback(XtQuantTraderCallback): def __init__(self, pos_mgr): self.pos_mgr = pos_mgr self.is_connected = False - def on_disconnected(self): logger.warning(">> 回调通知: 交易端连接断开") self.is_connected = False - def on_stock_trade(self, trade): try: cache_info = ORDER_CACHE.get(trade.order_id) @@ -169,7 +169,6 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback): if action == 'BUY': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, trade.traded_volume) elif action == 'SELL': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, -trade.traded_volume) except: traceback.print_exc() - def on_order_error(self, err): try: logger.error(f"下单失败: {err.error_msg}") @@ -179,19 +178,30 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback): del ORDER_CACHE[err.order_id] except: pass -# ================= 消息处理 ================= +# ================= 5. 核心消息处理 ================= def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): queue_key = f"{strategy_name}_real" msg_json = r_client.lpop(queue_key) if not msg_json: return try: + # 归档 r_client.rpush(f"{queue_key}:history", msg_json) + data = json.loads(msg_json) + # 1. 过滤回测 if data.get('is_backtest'): return - # 简单日期校验 - if data.get('timestamp', '').split(' ')[0] != datetime.date.today().strftime('%Y-%m-%d'): + # 2. 校验日期 (只处理当日消息) + msg_ts = data.get('timestamp') + today_str = datetime.date.today().strftime('%Y-%m-%d') + if msg_ts: + msg_date = msg_ts.split(' ')[0] + if msg_date != today_str: + logger.warning(f"[{strategy_name}] 拦截非当日消息: MsgDate={msg_date}") + return + else: + logger.warning(f"[{strategy_name}] 消息无时间戳,忽略") return stock_code = data['stock_code'] @@ -200,24 +210,37 @@ def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager) total_slots = int(data.get('total_slots', 1)) if action == 'BUY': + # 槽位检查 holding = pos_manager.get_holding_count(strategy_name) empty = total_slots - holding - if empty <= 0: return + if empty <= 0: + logger.warning(f"[{strategy_name}] 拦截买入: 槽位已满 (Target:{total_slots} Held:{holding})") + return asset = xt_trader.query_stock_asset(acc) - if not asset: return + if not asset: + logger.error("无法查询资产,QMT可能未就绪") + return + # 金额计算 amt = asset.cash / empty - if amt < 2000: return + if amt < 2000: + logger.warning(f"[{strategy_name}] 拦截买入: 金额过小 ({amt:.2f})") + return + + if price <= 0: price = 1.0 + vol = int(amt / price / 100) * 100 - if price<=0: price=1.0 - vol = int(amt/price/100)*100 if vol >= 100: oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') if oid != -1: - logger.info(f"[{strategy_name}] 买入 {stock_code} {vol}股") + logger.info(f"[{strategy_name}] 发出买单: {stock_code} {vol}股 ID:{oid}") ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY') pos_manager.mark_holding(strategy_name, stock_code) + else: + logger.error(f"[{strategy_name}] 下单被拒绝 (-1)") + else: + logger.warning(f"[{strategy_name}] 股数不足100 (Amt:{amt:.2f})") elif action == 'SELL': v_vol = pos_manager.get_position(strategy_name, stock_code) @@ -225,17 +248,23 @@ def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager) real_pos = xt_trader.query_stock_positions(acc) rp = next((p for p in real_pos if p.stock_code==stock_code), None) can_use = rp.can_use_volume if rp else 0 + final = min(v_vol, can_use) if final > 0: oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') if oid != -1: - logger.info(f"[{strategy_name}] 卖出 {stock_code} {final}") + logger.info(f"[{strategy_name}] 发出卖单: {stock_code} {final}") ORDER_CACHE[oid] = (strategy_name, stock_code, 'SELL') - except: + else: + logger.warning(f"[{strategy_name}] 卖出拦截: 虚拟{v_vol}但实盘可用{can_use}") + else: + logger.info(f"[{strategy_name}] Redis无持仓,忽略卖出") + + except Exception: logger.error("消息处理异常", exc_info=True) -# ================= QMT对象初始化 ================= +# ================= 6. QMT初始化 ================= def init_qmt_trader(qmt_path, account_id, account_type, pos_manager): try: session_id = int(time.time()) @@ -257,15 +286,17 @@ def init_qmt_trader(qmt_path, account_id, account_type, pos_manager): logger.error(f"初始化异常: {e}") return None, None, None -# ================= 主程序 ================= +# ================= 7. 主程序 ================= if __name__ == '__main__': - logger.info("系统启动...") + logger.info(">>> 系统启动 (实盘生产模式) <<<") + # 加载配置 CONFIG = load_config('config.json') redis_cfg = CONFIG['redis'] qmt_cfg = CONFIG['qmt'] watch_list = CONFIG['strategies'] + # 连接Redis try: r = redis.Redis(**redis_cfg, decode_responses=True) r.ping() @@ -274,6 +305,7 @@ if __name__ == '__main__': logger.critical(f"Redis 连接失败: {e}") sys.exit(1) + # 初次连接 QMT xt_trader, acc, callback = init_qmt_trader( qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager ) @@ -282,22 +314,26 @@ if __name__ == '__main__': if xt_trader: settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) - logger.info("进入主循环...") + logger.info(">>> 进入主循环监听 <<<") while True: try: - # === [新增] 日志跨天检查 === - # 如果日期变更了,重新初始化日志,这会自动创建新日期的文件 + # --- 1. 日志跨天处理 --- today_str = datetime.date.today().strftime('%Y-%m-%d') if today_str != CURRENT_LOG_DATE: - logger.info(f"检测到跨天 ({CURRENT_LOG_DATE} -> {today_str}),切换日志文件...") + logger.info(f"日期变更 ({CURRENT_LOG_DATE} -> {today_str}),切换日志...") logger = setup_logger() - logger.info(f"日志切换完成,当前写入: logs/{today_str}.log") - # === 断线重连 === + # --- 2. 断线重连 (仅工作日) --- need_reconnect = (xt_trader is None) or (callback is None) or (not callback.is_connected) if need_reconnect: - logger.warning("连接丢失,执行硬重连...") + # 检查是否为周末 (5=周六, 6=周日) + if datetime.date.today().weekday() >= 5: + logger.info("当前是周末,暂停重连,休眠1小时") + time.sleep(3600) + continue + + logger.warning("连接丢失,尝试重连...") if xt_trader: try: xt_trader.stop() except: pass @@ -314,27 +350,52 @@ if __name__ == '__main__': time.sleep(60) continue - # === 业务轮询 === + # --- 3. 交易时段轮询逻辑 --- now = datetime.datetime.now() - current_time_str = now.strftime('%H%M') + current_time_str = now.strftime('%H%M%S') # 格式化为 HHMMSS + + # 默认休眠 60秒 sleep_sec = 60 + is_trading_time = False - if '0900' <= current_time_str <= '1500': - if '0920' <= current_time_str <= '1000': - sleep_sec = 10 - else: - sleep_sec = 60 - + # 判断时段 + # 09:15:00 - 10:00:00 (竞价 + 开盘半小时) -> 高频 1秒 + if '091500' <= current_time_str <= '100000': + sleep_sec = 1 + is_trading_time = True + + # 10:00:01 - 11:30:00 -> 低频 60秒 + elif '100000' < current_time_str <= '113000': + sleep_sec = 60 + is_trading_time = True + + # 13:00:00 - 14:50:00 -> 低频 60秒 + elif '130000' <= current_time_str < '145000': + sleep_sec = 60 + is_trading_time = True + + # 14:50:00 - 15:00:00 (尾盘 + 收盘竞价) -> 高频 1秒 + elif '145000' <= current_time_str <= '150000': + sleep_sec = 1 + is_trading_time = True + + # --- 4. 执行业务 --- + if is_trading_time: + # 每日重置清算标记 if settler and settler.has_settled: settler.reset_flag() + # 处理信号 for s in watch_list: process_strategy_queue(s, r, xt_trader, acc, pos_manager) - elif '1505' <= current_time_str <= '1510': + # --- 5. 收盘清算 (15:05 - 15:10) --- + elif '150500' <= current_time_str <= '151000': if settler and not settler.has_settled: settler.run_settlement() - + sleep_sec = 60 + + # 执行休眠 time.sleep(sleep_sec) except KeyboardInterrupt: