From 122ffbefba08a0bf309deb729cce1b9610cc3682 Mon Sep 17 00:00:00 2001 From: liaozhaorun <1300336796@qq.com> Date: Sun, 30 Nov 2025 23:41:35 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E6=9B=B4=E6=96=B0qmt=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 4 +- qmt/qmt_trader.py | 416 ++++++++++++++++++++++++++++++---------------- 2 files changed, 277 insertions(+), 143 deletions(-) diff --git a/.gitignore b/.gitignore index 56b072e..4e039c6 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,6 @@ model !.env **/mlruns/ -**/mnt/ \ No newline at end of file +**/mnt/ + +/qmt/config.json \ No newline at end of file diff --git a/qmt/qmt_trader.py b/qmt/qmt_trader.py index c993cff..c229b8d 100644 --- a/qmt/qmt_trader.py +++ b/qmt/qmt_trader.py @@ -1,182 +1,314 @@ # coding:utf-8 -import time, datetime, traceback, sys, json +import time, datetime, traceback, sys, json, os import redis from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant -# ================= 配置区域 ================= -QMT_PATH = r'D:\qmt\投研\迅投极速交易终端睿智融科版\userdata' -ACCOUNT_ID = '2000128' -ACCOUNT_TYPE = 'STOCK' +# 全局变量占位 (稍后在 main 中初始化) +CONFIG = {} +ORDER_CACHE = {} -REDIS_HOST = '127.0.0.1' -REDIS_PORT = 6379 -REDIS_PASS = None +# ================= 配置加载模块 ================= +def load_config(config_file='config.json'): + """ + 读取同级目录下的配置文件 + """ + # 获取脚本所在目录 + if getattr(sys, 'frozen', False): + # 如果被打包为exe + base_path = os.path.dirname(sys.executable) + else: + # 普通脚本运行 + base_path = os.path.dirname(os.path.abspath(__file__)) + + full_path = os.path.join(base_path, config_file) + + if not os.path.exists(full_path): + # 尝试直接读取(兼容 QMT 内置 Python 的路径行为) + if os.path.exists(config_file): + full_path = config_file + else: + print(f"[错误] 找不到配置文件: {full_path}") + sys.exit(1) + + try: + with open(full_path, 'r', encoding='utf-8') as f: + config = json.load(f) + print(f"成功加载配置: {full_path}") + return config + except Exception as e: + print(f"[错误] 配置文件格式错误: {e}") + sys.exit(1) -# 策略基础名称 (不需要加 _real,代码会自动加) -STRATEGY_BASE_NAME = 'default_strategy' -# =========================================== +# ================= 业务逻辑类 (保持不变) ================= -# 定义监听的队列名称 (只监听实盘队列,物理屏蔽回测数据) -LISTEN_QUEUE = f"{STRATEGY_BASE_NAME}_real" +class PositionManager: + def __init__(self, r_client): + self.r = r_client + + def _get_key(self, strategy_name): + return f"POS:{strategy_name}" + + def mark_holding(self, strategy_name, code): + """乐观占位""" + self.r.hsetnx(self._get_key(strategy_name), code, 0) + # print(f"[{strategy_name}] 乐观占位: {code}") + + def rollback_holding(self, strategy_name, code): + """失败回滚""" + key = self._get_key(strategy_name) + val = self.r.hget(key, code) + if val is not None and int(val) == 0: + self.r.hdel(key, code) + print(f"[{strategy_name}] 回滚释放槽位: {code}") + + def update_actual_volume(self, strategy_name, code, delta_vol): + """成交更新""" + key = self._get_key(strategy_name) + new_vol = self.r.hincrby(key, code, int(delta_vol)) + if new_vol <= 0: + self.r.hdel(key, code) + new_vol = 0 + return new_vol + + def get_position(self, strategy_name, code): + vol = self.r.hget(self._get_key(strategy_name), code) + return int(vol) if vol else 0 + + def get_holding_count(self, strategy_name): + return self.r.hlen(self._get_key(strategy_name)) + + def get_all_virtual_positions(self, strategy_name): + return self.r.hgetall(self._get_key(strategy_name)) + + def force_delete(self, strategy_name, code): + self.r.hdel(self._get_key(strategy_name), code) + + +class DailySettlement: + def __init__(self, xt_trader, acc, pos_mgr, strategies): + self.trader = xt_trader + self.acc = acc + self.pos_mgr = pos_mgr + self.strategies = strategies + self.has_settled = False + + def run_settlement(self): + print("\n" + "="*40) + print(f"开始收盘清算: {datetime.datetime.now()}") + + # 1. 撤单 + orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) + if orders: + for o in orders: + self.trader.cancel_order_stock(self.acc, o.order_id) + time.sleep(2) + + # 2. 获取实盘真实持仓 + real_positions = self.trader.query_stock_positions(self.acc) + real_pos_map = {} + if real_positions: + for p in real_positions: + if p.volume > 0: + real_pos_map[p.stock_code] = p.volume + + # 3. 校准 Redis + for strategy in self.strategies: + virtual_data = self.pos_mgr.get_all_virtual_positions(strategy) + for code, v_vol_str in virtual_data.items(): + if code not in real_pos_map: + print(f" [修正] {strategy} 幽灵持仓 {code} -> 强制释放") + self.pos_mgr.force_delete(strategy, code) + + print("清算完成") + self.has_settled = True + + def reset_flag(self): + self.has_settled = False class MyXtQuantTraderCallback(XtQuantTraderCallback): - def on_disconnected(self): - print("连接断开") + def __init__(self, pos_mgr): + self.pos_mgr = pos_mgr - def on_stock_order(self, order): - print(f"委托回报: {order.order_id} {order.order_remark}") + def on_disconnected(self): + print(">> 连接断开") def on_stock_trade(self, trade): - print(f"成交: {trade.stock_code} {trade.traded_volume}") + try: + order_id = trade.order_id + stock_code = trade.stock_code + traded_vol = trade.traded_volume + + cache_info = ORDER_CACHE.get(order_id) + if not cache_info: return + + strategy_name, cached_code, action_type = cache_info + print(f">>> [成交] {strategy_name} {stock_code} 成交量:{traded_vol}") + + if action_type == 'BUY': + self.pos_mgr.update_actual_volume(strategy_name, stock_code, traded_vol) + elif action_type == 'SELL': + self.pos_mgr.update_actual_volume(strategy_name, stock_code, -traded_vol) + + except Exception: + traceback.print_exc() def on_order_error(self, order_error): - print(f"下单失败: {order_error.error_msg}") + try: + order_id = order_error.order_id + print(f">>> [下单失败] ID:{order_id} Msg:{order_error.error_msg}") + cache_info = ORDER_CACHE.get(order_id) + if cache_info: + strategy_name, stock_code, action_type = cache_info + if action_type == 'BUY': + self.pos_mgr.rollback_holding(strategy_name, stock_code) + del ORDER_CACHE[order_id] + except: pass -def init_redis(): +def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): + queue_key = f"{strategy_name}_real" + msg_json = r_client.lpop(queue_key) + if not msg_json: return + try: - r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASS, decode_responses=True) - r.ping() - return r - except Exception as e: - print(f"Redis连接失败: {e}") - return None - - -def is_msg_valid(data): - """ - 【安全核心】校验消息时效性与合法性 - """ - try: - # 1. 检查是否为回测标记 (防御性编程,虽然队列已物理隔离) - if data.get('is_backtest', False): - print(f"警报:拦截到回测数据,已丢弃!") - return False - - # 2. 检查时间戳 - msg_time_str = data.get('timestamp') - if not msg_time_str: - print("数据缺失时间戳,丢弃") - return False - - # 解析消息时间 - # 格式必须匹配策略端发送的 '%Y-%m-%d %H:%M:%S' - msg_dt = datetime.datetime.strptime(msg_time_str, '%Y-%m-%d %H:%M:%S') - msg_date = msg_dt.date() - - # 获取当前服务器日期 - today = datetime.date.today() - - # 3. 【核心】判断是否为当天的消息 - if msg_date != today: - print(f"拦截过期消息: 消息日期[{msg_date}] != 今日[{today}]") - return False - - # 可选:如果你想更严格,可以判断时间差不能超过5分钟 - # delta = datetime.datetime.now() - msg_dt - # if abs(delta.total_seconds()) > 300: ... - - return True - - except Exception as e: - print(f"校验逻辑异常: {e}") - return False - - -def process_redis_signal(r_client, xt_trader, acc): - try: - msg_json = r_client.lpop(LISTEN_QUEUE) - if not msg_json: return - - print(f"收到信号: {msg_json}") data = json.loads(msg_json) - - if not is_msg_valid(data): return # 之前的校验逻辑 - + + # 校验 + if data.get('is_backtest', False): return + msg_ts = data.get('timestamp') + if msg_ts: + msg_date = datetime.datetime.strptime(msg_ts, '%Y-%m-%d %H:%M:%S').date() + if msg_date != datetime.date.today(): + return + stock_code = data['stock_code'] action = data['action'] price = float(data['price']) + target_total_slots = int(data.get('total_slots', 1)) - # 获取切分份数 - # 兼容性处理:如果redis里还是旧key 'weight',也可以尝试获取 - div_count = float(data.get('div_count', data.get('weight', 1))) - - # ========================================================= - # 买入逻辑:资金切片法 - # ========================================================= if action == 'BUY': - # 1. 必须查最新的可用资金 (Available Cash) + current_holding_count = pos_manager.get_holding_count(strategy_name) + empty_slots = target_total_slots - current_holding_count + + if empty_slots <= 0: return + asset = xt_trader.query_stock_asset(acc) - if not asset: - print("错误:无法查询资产") - return + if not asset: return + + target_amount = asset.cash / empty_slots + if target_amount < 2000: return - current_cash = asset.cash - - # 2. 计算下单金额 - # 逻辑:Amount = Cash / div_count - if div_count <= 0: div_count = 1 # 防止除0 - - target_amount = current_cash / div_count - - # 3. 打印调试信息 (非常重要) - print(f"【资金分配】可用现金:{current_cash:.2f} / 切分份数:{div_count} = 下单金额:{target_amount:.2f}") - - # 4. 计算股数 if price <= 0: price = 1.0 - - # 过滤小额杂单 - if target_amount < 2000: - print(f"忽略:金额过小 ({target_amount:.2f})") - return - vol = int(target_amount / price / 100) * 100 - + if vol >= 100: - xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, - STRATEGY_BASE_NAME, 'PyBuy') - print(f"买入下单: {stock_code} {vol}股") - else: - print(f"计算股数不足100股") + order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') + if order_id != -1: + print(f"[{strategy_name}] 买入 {stock_code} {vol}股 (1/{empty_slots})") + ORDER_CACHE[order_id] = (strategy_name, stock_code, 'BUY') + pos_manager.mark_holding(strategy_name, stock_code) - # ========================================================= - # 卖出逻辑 (清仓) - # ========================================================= elif action == 'SELL': - positions = xt_trader.query_stock_positions(acc) - target_pos = next((p for p in positions if p.stock_code == stock_code), None) + virtual_vol = pos_manager.get_position(strategy_name, stock_code) + if virtual_vol > 0: + real_positions = xt_trader.query_stock_positions(acc) + real_pos = next((p for p in real_positions if p.stock_code == stock_code), None) + real_can_use = real_pos.can_use_volume if real_pos else 0 + + final_vol = min(virtual_vol, real_can_use) + if final_vol > 0: + order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final_vol, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') + if order_id != -1: + print(f"[{strategy_name}] 卖出 {stock_code} {final_vol}股") + ORDER_CACHE[order_id] = (strategy_name, stock_code, 'SELL') - if target_pos and target_pos.can_use_volume > 0: - xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, target_pos.can_use_volume, - xtconstant.FIX_PRICE, price, STRATEGY_BASE_NAME, 'PySell') - print(f"卖出下单: {stock_code} {target_pos.can_use_volume}股") - else: - print(f"无可用持仓: {stock_code}") - - except Exception as e: - print(f"处理异常: {e}") + except Exception: traceback.print_exc() - +# ================= 主程序入口 ================= if __name__ == '__main__': - r_client = init_redis() - session_id = int(time.time()) - xt_trader = XtQuantTrader(QMT_PATH, session_id) - acc = StockAccount(ACCOUNT_ID, ACCOUNT_TYPE) - callback = MyXtQuantTraderCallback() - xt_trader.register_callback(callback) - xt_trader.start() - xt_trader.connect() - xt_trader.subscribe(acc) + print("正在启动...") + + # 1. 加载配置 + CONFIG = load_config('config.json') + + # 从配置中提取参数 + redis_cfg = CONFIG['redis'] + qmt_cfg = CONFIG['qmt'] + watch_list = CONFIG['strategies'] + + print(f"Redis目标: {redis_cfg['host']}:{redis_cfg['port']}") + print(f"QMT路径: {qmt_cfg['path']}") + print(f"监听策略: {watch_list}") - print(f"=== 启动监听: {LISTEN_QUEUE} ===") - print("只处理当日的实盘/模拟信号,自动过滤回测数据及历史遗留数据。") + # 2. 连接 Redis + try: + r = redis.Redis( + host=redis_cfg['host'], + port=redis_cfg['port'], + password=redis_cfg['password'], + db=redis_cfg['db'], + decode_responses=True + ) + r.ping() + print("Redis 连接成功") + pos_manager = PositionManager(r) + except Exception as e: + print(f"[FATAL] Redis 连接失败: {e}") + sys.exit(1) - while True: - if r_client: - process_redis_signal(r_client, xt_trader, acc) - time.sleep(60) \ No newline at end of file + # 3. 连接 QMT + try: + session_id = int(time.time()) + xt_trader = XtQuantTrader(qmt_cfg['path'], session_id) + + acc = StockAccount(qmt_cfg['account_id'], qmt_cfg['account_type']) + + callback = MyXtQuantTraderCallback(pos_manager) + xt_trader.register_callback(callback) + + xt_trader.start() + connect_res = xt_trader.connect() + + if connect_res == 0: + print(f"QMT 连接成功: {qmt_cfg['account_id']}") + xt_trader.subscribe(acc) + else: + print(f"[FATAL] QMT 连接失败,错误码: {connect_res}") + sys.exit(1) + + except Exception as e: + print(f"[FATAL] QMT 初始化异常: {e}") + sys.exit(1) + + # 4. 初始化清算器 + settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) + + print("=== 系统就绪,开始监听 ===") + + try: + while True: + now = datetime.datetime.now() + current_time_str = now.strftime('%H%M') + + # 交易时段 + if '0900' <= current_time_str <= '1500': + if settler.has_settled: + settler.reset_flag() + + for strategy in watch_list: + process_strategy_queue(strategy, r, xt_trader, acc, pos_manager) + + # 收盘清算时段 + elif '1505' <= current_time_str <= '1510': + if not settler.has_settled: + settler.run_settlement() + + time.sleep(60) + + except KeyboardInterrupt: + print("用户终止程序") \ No newline at end of file From bbf1d2248cc63dd913dc4ca737c6c7a7b194bbc3 Mon Sep 17 00:00:00 2001 From: liaozhaorun <1300336796@qq.com> Date: Tue, 2 Dec 2025 23:38:49 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E6=9B=B4=E6=96=B0qmt=E4=BB=A3=E7=A0=81=201?= =?UTF-8?q?=E3=80=81=E4=BC=98=E5=8C=96=E6=97=A5=E5=BF=97=E6=89=93=E5=8D=B0?= =?UTF-8?q?=202=E3=80=81=E6=96=AD=E8=BF=9E=E9=87=8D=E8=BF=9E=203=E3=80=81r?= =?UTF-8?q?edis=E6=8C=81=E4=B9=85=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 +- qmt/qmt_trader.py | 222 +++++++++++++++++++++++++++++++++------------- 2 files changed, 161 insertions(+), 64 deletions(-) diff --git a/.gitignore b/.gitignore index 4e039c6..8d33143 100644 --- a/.gitignore +++ b/.gitignore @@ -23,4 +23,5 @@ model **/mlruns/ **/mnt/ -/qmt/config.json \ No newline at end of file +/qmt/config.json +/qmt/logs/* \ No newline at end of file diff --git a/qmt/qmt_trader.py b/qmt/qmt_trader.py index c229b8d..60b310d 100644 --- a/qmt/qmt_trader.py +++ b/qmt/qmt_trader.py @@ -1,48 +1,82 @@ # coding:utf-8 import time, datetime, traceback, sys, json, os +import logging +from logging.handlers import TimedRotatingFileHandler import redis from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant -# 全局变量占位 (稍后在 main 中初始化) +# ================= 1. 日志系统初始化 ================= +def setup_logger(): + log_dir = "logs" + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + today = datetime.date.today().strftime('%Y-%m-%d') + log_file = os.path.join(log_dir, f"qmt_trade_{today}.log") + + logger = logging.getLogger("QMT_Trader") + logger.setLevel(logging.INFO) + + if logger.handlers: + logger.handlers.clear() + + formatter = logging.Formatter( + '[%(asctime)s] [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + file_handler = TimedRotatingFileHandler( + filename=log_file, + when='MIDNIGHT', + interval=1, + backupCount=30, + encoding='utf-8' + ) + file_handler.setFormatter(formatter) + + stream_handler = logging.StreamHandler(sys.stdout) + stream_handler.setFormatter(formatter) + + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + return logger + +logger = setup_logger() + +# ================= 全局变量 ================= CONFIG = {} ORDER_CACHE = {} # ================= 配置加载模块 ================= def load_config(config_file='config.json'): - """ - 读取同级目录下的配置文件 - """ - # 获取脚本所在目录 if getattr(sys, 'frozen', False): - # 如果被打包为exe base_path = os.path.dirname(sys.executable) else: - # 普通脚本运行 base_path = os.path.dirname(os.path.abspath(__file__)) full_path = os.path.join(base_path, config_file) if not os.path.exists(full_path): - # 尝试直接读取(兼容 QMT 内置 Python 的路径行为) if os.path.exists(config_file): full_path = config_file else: - print(f"[错误] 找不到配置文件: {full_path}") + logger.error(f"找不到配置文件: {full_path}") sys.exit(1) try: with open(full_path, 'r', encoding='utf-8') as f: config = json.load(f) - print(f"成功加载配置: {full_path}") + logger.info(f"成功加载配置: {full_path}") return config except Exception as e: - print(f"[错误] 配置文件格式错误: {e}") + logger.error(f"配置文件格式错误: {e}") sys.exit(1) -# ================= 业务逻辑类 (保持不变) ================= +# ================= 业务逻辑类 ================= class PositionManager: def __init__(self, r_client): @@ -53,8 +87,9 @@ class PositionManager: def mark_holding(self, strategy_name, code): """乐观占位""" - self.r.hsetnx(self._get_key(strategy_name), code, 0) - # print(f"[{strategy_name}] 乐观占位: {code}") + key = self._get_key(strategy_name) + if self.r.hsetnx(key, code, 0): + logger.info(f"[{strategy_name}] Redis乐观占位成功: {code}") def rollback_holding(self, strategy_name, code): """失败回滚""" @@ -62,7 +97,7 @@ class PositionManager: val = self.r.hget(key, code) if val is not None and int(val) == 0: self.r.hdel(key, code) - print(f"[{strategy_name}] 回滚释放槽位: {code}") + logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}") def update_actual_volume(self, strategy_name, code, delta_vol): """成交更新""" @@ -96,17 +131,21 @@ class DailySettlement: self.has_settled = False def run_settlement(self): - print("\n" + "="*40) - print(f"开始收盘清算: {datetime.datetime.now()}") + logger.info("="*40) + logger.info("开始执行收盘清算流程...") - # 1. 撤单 - orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) - if orders: - for o in orders: - self.trader.cancel_order_stock(self.acc, o.order_id) - time.sleep(2) + try: + orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) + if orders: + for o in orders: + self.trader.cancel_order_stock(self.acc, o.order_id) + logger.info(f" -> 撤销未成交挂单: {o.stock_code} {o.order_volume}") + time.sleep(2) + else: + logger.info(" -> 无未成交订单") + except Exception as e: + logger.error(f"查询/撤单异常: {e}") - # 2. 获取实盘真实持仓 real_positions = self.trader.query_stock_positions(self.acc) real_pos_map = {} if real_positions: @@ -114,15 +153,15 @@ class DailySettlement: if p.volume > 0: real_pos_map[p.stock_code] = p.volume - # 3. 校准 Redis for strategy in self.strategies: virtual_data = self.pos_mgr.get_all_virtual_positions(strategy) for code, v_vol_str in virtual_data.items(): if code not in real_pos_map: - print(f" [修正] {strategy} 幽灵持仓 {code} -> 强制释放") + logger.warning(f" [修正] 策略[{strategy}] 幽灵持仓 {code} (Redis={v_vol_str}) -> 强制释放") self.pos_mgr.force_delete(strategy, code) - print("清算完成") + logger.info("收盘清算完成") + logger.info("="*40) self.has_settled = True def reset_flag(self): @@ -132,9 +171,15 @@ class DailySettlement: class MyXtQuantTraderCallback(XtQuantTraderCallback): def __init__(self, pos_mgr): self.pos_mgr = pos_mgr + # 【新增】连接状态标记,默认False,连接成功后外部置为True + self.is_connected = False def on_disconnected(self): - print(">> 连接断开") + """ + 连接断开回调 + """ + logger.warning(">> 检测到交易端连接断开 (QMT可能正在重启)") + self.is_connected = False def on_stock_trade(self, trade): try: @@ -146,27 +191,32 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback): if not cache_info: return strategy_name, cached_code, action_type = cache_info - print(f">>> [成交] {strategy_name} {stock_code} 成交量:{traded_vol}") + logger.info(f">>> [成交回报] 策略:{strategy_name} 代码:{stock_code} 成交量:{traded_vol}") if action_type == 'BUY': - self.pos_mgr.update_actual_volume(strategy_name, stock_code, traded_vol) + new_pos = self.pos_mgr.update_actual_volume(strategy_name, stock_code, traded_vol) + logger.info(f" [记账] {strategy_name} 持仓增加 -> {new_pos}") elif action_type == 'SELL': - self.pos_mgr.update_actual_volume(strategy_name, stock_code, -traded_vol) + new_pos = self.pos_mgr.update_actual_volume(strategy_name, stock_code, -traded_vol) + logger.info(f" [记账] {strategy_name} 持仓扣减 -> {new_pos}") except Exception: - traceback.print_exc() + logger.error("成交回调处理异常", exc_info=True) def on_order_error(self, order_error): try: order_id = order_error.order_id - print(f">>> [下单失败] ID:{order_id} Msg:{order_error.error_msg}") + err_msg = order_error.error_msg + logger.error(f">>> [下单失败] ID:{order_id} Msg:{err_msg}") + cache_info = ORDER_CACHE.get(order_id) if cache_info: strategy_name, stock_code, action_type = cache_info if action_type == 'BUY': self.pos_mgr.rollback_holding(strategy_name, stock_code) del ORDER_CACHE[order_id] - except: pass + except: + pass def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): @@ -175,14 +225,17 @@ def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager) if not msg_json: return try: + history_key = f"{queue_key}:history" + r_client.rpush(history_key, msg_json) + data = json.loads(msg_json) - # 校验 if data.get('is_backtest', False): return msg_ts = data.get('timestamp') if msg_ts: msg_date = datetime.datetime.strptime(msg_ts, '%Y-%m-%d %H:%M:%S').date() if msg_date != datetime.date.today(): + logger.warning(f"忽略过期消息: {msg_ts}") return stock_code = data['stock_code'] @@ -190,17 +243,25 @@ def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager) price = float(data['price']) target_total_slots = int(data.get('total_slots', 1)) + logger.info(f"收到信号 [{strategy_name}]: {stock_code} {action} Slot:{target_total_slots}") + if action == 'BUY': current_holding_count = pos_manager.get_holding_count(strategy_name) empty_slots = target_total_slots - current_holding_count - if empty_slots <= 0: return + if empty_slots <= 0: + logger.warning(f"[{strategy_name}] 槽位已满,忽略买入") + return asset = xt_trader.query_stock_asset(acc) - if not asset: return + if not asset: + logger.error("无法查询资产信息(可能连接未就绪)") + return target_amount = asset.cash / empty_slots - if target_amount < 2000: return + if target_amount < 2000: + logger.info(f"[{strategy_name}] 下单金额过小 ({target_amount:.2f}),忽略") + return if price <= 0: price = 1.0 vol = int(target_amount / price / 100) * 100 @@ -208,9 +269,11 @@ def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager) if vol >= 100: order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') if order_id != -1: - print(f"[{strategy_name}] 买入 {stock_code} {vol}股 (1/{empty_slots})") + logger.info(f"[{strategy_name}] 发出买单: {stock_code} {vol}股 (1/{empty_slots}) ID:{order_id}") ORDER_CACHE[order_id] = (strategy_name, stock_code, 'BUY') pos_manager.mark_holding(strategy_name, stock_code) + else: + logger.error(f"[{strategy_name}] 下单请求被柜台拒绝 (-1)") elif action == 'SELL': virtual_vol = pos_manager.get_position(strategy_name, stock_code) @@ -223,27 +286,27 @@ def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager) if final_vol > 0: order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final_vol, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') if order_id != -1: - print(f"[{strategy_name}] 卖出 {stock_code} {final_vol}股") + logger.info(f"[{strategy_name}] 发出卖单: {stock_code} {final_vol}股 ID:{order_id}") ORDER_CACHE[order_id] = (strategy_name, stock_code, 'SELL') + else: + logger.warning(f"[{strategy_name}] 账本有货但实盘不足 (Redis:{virtual_vol}, Real:{real_can_use})") + else: + logger.info(f"[{strategy_name}] Redis无持仓,忽略卖出") except Exception: - traceback.print_exc() + logger.error("消息处理异常", exc_info=True) # ================= 主程序入口 ================= if __name__ == '__main__': - print("正在启动...") + logger.info("正在启动 QMT 策略监听系统...") # 1. 加载配置 CONFIG = load_config('config.json') - - # 从配置中提取参数 redis_cfg = CONFIG['redis'] qmt_cfg = CONFIG['qmt'] watch_list = CONFIG['strategies'] - print(f"Redis目标: {redis_cfg['host']}:{redis_cfg['port']}") - print(f"QMT路径: {qmt_cfg['path']}") - print(f"监听策略: {watch_list}") + logger.info(f"监听策略列表: {watch_list}") # 2. 连接 Redis try: @@ -255,60 +318,93 @@ if __name__ == '__main__': decode_responses=True ) r.ping() - print("Redis 连接成功") + logger.info("Redis 连接成功") pos_manager = PositionManager(r) except Exception as e: - print(f"[FATAL] Redis 连接失败: {e}") + logger.critical(f"Redis 连接失败: {e}") sys.exit(1) - # 3. 连接 QMT + # 3. 初始化 QMT 对象 (暂不连接) try: session_id = int(time.time()) xt_trader = XtQuantTrader(qmt_cfg['path'], session_id) - acc = StockAccount(qmt_cfg['account_id'], qmt_cfg['account_type']) callback = MyXtQuantTraderCallback(pos_manager) xt_trader.register_callback(callback) - xt_trader.start() - connect_res = xt_trader.connect() + # 初次连接 + connect_res = xt_trader.connect() if connect_res == 0: - print(f"QMT 连接成功: {qmt_cfg['account_id']}") + logger.info(f"QMT 初次连接成功: {qmt_cfg['account_id']}") xt_trader.subscribe(acc) + callback.is_connected = True # 标记为已连接 else: - print(f"[FATAL] QMT 连接失败,错误码: {connect_res}") - sys.exit(1) + logger.error(f"QMT 初次连接失败 ({connect_res}),将进入重连循环") + callback.is_connected = False except Exception as e: - print(f"[FATAL] QMT 初始化异常: {e}") + logger.critical(f"QMT 初始化异常: {e}") sys.exit(1) # 4. 初始化清算器 settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) - print("=== 系统就绪,开始监听 ===") + logger.info("=== 系统就绪,开始循环监听 ===") try: while True: + # ================= [新增] 断线重连逻辑 ================= + if not callback.is_connected: + logger.info("正在尝试重连 QMT...") + try: + # 尝试连接 + res = xt_trader.connect() + if res == 0: + logger.info("QMT 重连成功!重新订阅账户...") + # 重连后必须重新订阅 + xt_trader.subscribe(acc) + callback.is_connected = True + else: + logger.warning(f"QMT 重连失败 (Code: {res}),60秒后重试...") + time.sleep(60) + continue # 跳过本次循环,直接进入下一次检查 + except Exception as e: + logger.error(f"重连过程发生异常: {e}") + time.sleep(60) + continue + # ==================================================== + now = datetime.datetime.now() current_time_str = now.strftime('%H%M') + + # 默认休眠时间 + sleep_interval = 60 - # 交易时段 + # === 交易时段 === if '0900' <= current_time_str <= '1500': + # 高频轮询时段 + if '0920' <= current_time_str <= '1000': + sleep_interval = 10 + else: + sleep_interval = 60 + if settler.has_settled: settler.reset_flag() for strategy in watch_list: process_strategy_queue(strategy, r, xt_trader, acc, pos_manager) - # 收盘清算时段 + # === 收盘清算时段 === elif '1505' <= current_time_str <= '1510': if not settler.has_settled: settler.run_settlement() - - time.sleep(60) + sleep_interval = 60 + + time.sleep(sleep_interval) except KeyboardInterrupt: - print("用户终止程序") \ No newline at end of file + logger.info("用户手动终止程序") + except Exception as e: + logger.critical("主循环发生未捕获异常", exc_info=True) \ No newline at end of file From c30d74d25105ee3fce99a96b765515b34d84347b Mon Sep 17 00:00:00 2001 From: liaozhaorun <1300336796@qq.com> Date: Fri, 5 Dec 2025 00:28:50 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E6=9B=B4=E6=96=B0qmt=EF=BC=9A=201=E3=80=81?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=87=8D=E8=BF=9Ebug=202=E3=80=81=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E6=97=A5=E5=BF=97=E5=90=8D=E5=AD=97bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- qmt/qmt_trader.py | 397 +++++++++++++++++++--------------------------- 1 file changed, 166 insertions(+), 231 deletions(-) diff --git a/qmt/qmt_trader.py b/qmt/qmt_trader.py index 60b310d..bf951a4 100644 --- a/qmt/qmt_trader.py +++ b/qmt/qmt_trader.py @@ -1,42 +1,57 @@ # coding:utf-8 import time, datetime, traceback, sys, json, os import logging -from logging.handlers import TimedRotatingFileHandler import redis from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant -# ================= 1. 日志系统初始化 ================= +# 全局变量记录当前日志日期,用于跨天判断 +CURRENT_LOG_DATE = None + +# ================= 1. 日志系统 (按日期直写) ================= def setup_logger(): + """ + 配置日志系统: + 1. 确保日志目录存在 + 2. 生成当天日期的日志文件 (YYYY-MM-DD.log) + 3. 同时输出到控制台 + """ + global CURRENT_LOG_DATE + log_dir = "logs" if not os.path.exists(log_dir): os.makedirs(log_dir) - today = datetime.date.today().strftime('%Y-%m-%d') - log_file = os.path.join(log_dir, f"qmt_trade_{today}.log") + # 获取今日日期 + today_str = datetime.date.today().strftime('%Y-%m-%d') + CURRENT_LOG_DATE = today_str # 更新全局变量 + + # 文件名直接就是日期: logs/2025-12-05.log + log_file = os.path.join(log_dir, f"{today_str}.log") logger = logging.getLogger("QMT_Trader") logger.setLevel(logging.INFO) + # 【关键】清除旧的 handler,防止跨天后重复打印或写入旧文件 if logger.handlers: - logger.handlers.clear() + for handler in logger.handlers[:]: + try: + handler.close() + logger.removeHandler(handler) + except: pass formatter = logging.Formatter( '[%(asctime)s] [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) - file_handler = TimedRotatingFileHandler( - filename=log_file, - when='MIDNIGHT', - interval=1, - backupCount=30, - encoding='utf-8' - ) + # Handler 1: 普通文件输出 (追加模式) + file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') file_handler.setFormatter(formatter) + # Handler 2: 控制台 stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(formatter) @@ -45,83 +60,62 @@ def setup_logger(): return logger +# 初次初始化 logger = setup_logger() # ================= 全局变量 ================= CONFIG = {} ORDER_CACHE = {} -# ================= 配置加载模块 ================= +# ================= 配置加载 ================= def load_config(config_file='config.json'): if getattr(sys, 'frozen', False): base_path = os.path.dirname(sys.executable) else: base_path = os.path.dirname(os.path.abspath(__file__)) - full_path = os.path.join(base_path, config_file) - if not os.path.exists(full_path): - if os.path.exists(config_file): - full_path = config_file + if os.path.exists(config_file): full_path = config_file else: logger.error(f"找不到配置文件: {full_path}") sys.exit(1) - try: with open(full_path, 'r', encoding='utf-8') as f: - config = json.load(f) - logger.info(f"成功加载配置: {full_path}") - return config + return json.load(f) except Exception as e: - logger.error(f"配置文件格式错误: {e}") + logger.error(f"配置文件错误: {e}") sys.exit(1) # ================= 业务逻辑类 ================= - class PositionManager: def __init__(self, r_client): self.r = r_client - def _get_key(self, strategy_name): return f"POS:{strategy_name}" - def mark_holding(self, strategy_name, code): - """乐观占位""" - key = self._get_key(strategy_name) - if self.r.hsetnx(key, code, 0): - logger.info(f"[{strategy_name}] Redis乐观占位成功: {code}") - + self.r.hsetnx(self._get_key(strategy_name), code, 0) def rollback_holding(self, strategy_name, code): - """失败回滚""" key = self._get_key(strategy_name) val = self.r.hget(key, code) if val is not None and int(val) == 0: self.r.hdel(key, code) - logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}") - def update_actual_volume(self, strategy_name, code, delta_vol): - """成交更新""" key = self._get_key(strategy_name) new_vol = self.r.hincrby(key, code, int(delta_vol)) if new_vol <= 0: self.r.hdel(key, code) new_vol = 0 return new_vol - def get_position(self, strategy_name, code): vol = self.r.hget(self._get_key(strategy_name), code) return int(vol) if vol else 0 - def get_holding_count(self, strategy_name): return self.r.hlen(self._get_key(strategy_name)) - def get_all_virtual_positions(self, strategy_name): return self.r.hgetall(self._get_key(strategy_name)) - def force_delete(self, strategy_name, code): self.r.hdel(self._get_key(strategy_name), code) - class DailySettlement: def __init__(self, xt_trader, acc, pos_mgr, strategies): self.trader = xt_trader @@ -129,282 +123,223 @@ class DailySettlement: self.pos_mgr = pos_mgr self.strategies = strategies self.has_settled = False - def run_settlement(self): logger.info("="*40) - logger.info("开始执行收盘清算流程...") - + logger.info("执行收盘清算...") try: orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) if orders: for o in orders: self.trader.cancel_order_stock(self.acc, o.order_id) - logger.info(f" -> 撤销未成交挂单: {o.stock_code} {o.order_volume}") time.sleep(2) - else: - logger.info(" -> 无未成交订单") - except Exception as e: - logger.error(f"查询/撤单异常: {e}") - + except: pass real_positions = self.trader.query_stock_positions(self.acc) real_pos_map = {} if real_positions: for p in real_positions: - if p.volume > 0: - real_pos_map[p.stock_code] = p.volume - + if p.volume > 0: real_pos_map[p.stock_code] = p.volume for strategy in self.strategies: - virtual_data = self.pos_mgr.get_all_virtual_positions(strategy) - for code, v_vol_str in virtual_data.items(): + virtual = self.pos_mgr.get_all_virtual_positions(strategy) + for code, v in virtual.items(): if code not in real_pos_map: - logger.warning(f" [修正] 策略[{strategy}] 幽灵持仓 {code} (Redis={v_vol_str}) -> 强制释放") + logger.warning(f" [修正] {strategy} 幽灵持仓 {code} -> 释放") self.pos_mgr.force_delete(strategy, code) - - logger.info("收盘清算完成") - logger.info("="*40) + logger.info("清算完成") self.has_settled = True - def reset_flag(self): self.has_settled = False - +# ================= 回调类 ================= class MyXtQuantTraderCallback(XtQuantTraderCallback): def __init__(self, pos_mgr): self.pos_mgr = pos_mgr - # 【新增】连接状态标记,默认False,连接成功后外部置为True - self.is_connected = False + self.is_connected = False def on_disconnected(self): - """ - 连接断开回调 - """ - logger.warning(">> 检测到交易端连接断开 (QMT可能正在重启)") + logger.warning(">> 回调通知: 交易端连接断开") self.is_connected = False def on_stock_trade(self, trade): try: - order_id = trade.order_id - stock_code = trade.stock_code - traded_vol = trade.traded_volume + cache_info = ORDER_CACHE.get(trade.order_id) + if not cache_info: return + strategy, _, action = cache_info - cache_info = ORDER_CACHE.get(order_id) - if not cache_info: return + logger.info(f">>> [成交] {strategy} {trade.stock_code} {trade.traded_volume}") + if action == 'BUY': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, trade.traded_volume) + elif action == 'SELL': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, -trade.traded_volume) + except: traceback.print_exc() - strategy_name, cached_code, action_type = cache_info - logger.info(f">>> [成交回报] 策略:{strategy_name} 代码:{stock_code} 成交量:{traded_vol}") - - if action_type == 'BUY': - new_pos = self.pos_mgr.update_actual_volume(strategy_name, stock_code, traded_vol) - logger.info(f" [记账] {strategy_name} 持仓增加 -> {new_pos}") - elif action_type == 'SELL': - new_pos = self.pos_mgr.update_actual_volume(strategy_name, stock_code, -traded_vol) - logger.info(f" [记账] {strategy_name} 持仓扣减 -> {new_pos}") - - except Exception: - logger.error("成交回调处理异常", exc_info=True) - - def on_order_error(self, order_error): + def on_order_error(self, err): try: - order_id = order_error.order_id - err_msg = order_error.error_msg - logger.error(f">>> [下单失败] ID:{order_id} Msg:{err_msg}") - - cache_info = ORDER_CACHE.get(order_id) - if cache_info: - strategy_name, stock_code, action_type = cache_info - if action_type == 'BUY': - self.pos_mgr.rollback_holding(strategy_name, stock_code) - del ORDER_CACHE[order_id] - except: - pass - + logger.error(f"下单失败: {err.error_msg}") + cache = ORDER_CACHE.get(err.order_id) + if cache and cache[2] == 'BUY': + self.pos_mgr.rollback_holding(cache[0], cache[1]) + del ORDER_CACHE[err.order_id] + except: pass +# ================= 消息处理 ================= def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): queue_key = f"{strategy_name}_real" msg_json = r_client.lpop(queue_key) if not msg_json: return try: - history_key = f"{queue_key}:history" - r_client.rpush(history_key, msg_json) - + r_client.rpush(f"{queue_key}:history", msg_json) data = json.loads(msg_json) + if data.get('is_backtest'): return - if data.get('is_backtest', False): return - msg_ts = data.get('timestamp') - if msg_ts: - msg_date = datetime.datetime.strptime(msg_ts, '%Y-%m-%d %H:%M:%S').date() - if msg_date != datetime.date.today(): - logger.warning(f"忽略过期消息: {msg_ts}") - return - + # 简单日期校验 + if data.get('timestamp', '').split(' ')[0] != datetime.date.today().strftime('%Y-%m-%d'): + return + stock_code = data['stock_code'] action = data['action'] price = float(data['price']) - target_total_slots = int(data.get('total_slots', 1)) - - logger.info(f"收到信号 [{strategy_name}]: {stock_code} {action} Slot:{target_total_slots}") + total_slots = int(data.get('total_slots', 1)) if action == 'BUY': - current_holding_count = pos_manager.get_holding_count(strategy_name) - empty_slots = target_total_slots - current_holding_count - - if empty_slots <= 0: - logger.warning(f"[{strategy_name}] 槽位已满,忽略买入") - return + holding = pos_manager.get_holding_count(strategy_name) + empty = total_slots - holding + if empty <= 0: return asset = xt_trader.query_stock_asset(acc) - if not asset: - logger.error("无法查询资产信息(可能连接未就绪)") - return + if not asset: return - target_amount = asset.cash / empty_slots - if target_amount < 2000: - logger.info(f"[{strategy_name}] 下单金额过小 ({target_amount:.2f}),忽略") - return - - if price <= 0: price = 1.0 - vol = int(target_amount / price / 100) * 100 + amt = asset.cash / empty + if amt < 2000: return + if price<=0: price=1.0 + vol = int(amt/price/100)*100 if vol >= 100: - order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') - if order_id != -1: - logger.info(f"[{strategy_name}] 发出买单: {stock_code} {vol}股 (1/{empty_slots}) ID:{order_id}") - ORDER_CACHE[order_id] = (strategy_name, stock_code, 'BUY') + oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') + if oid != -1: + logger.info(f"[{strategy_name}] 买入 {stock_code} {vol}股") + ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY') pos_manager.mark_holding(strategy_name, stock_code) - else: - logger.error(f"[{strategy_name}] 下单请求被柜台拒绝 (-1)") elif action == 'SELL': - virtual_vol = pos_manager.get_position(strategy_name, stock_code) - if virtual_vol > 0: - real_positions = xt_trader.query_stock_positions(acc) - real_pos = next((p for p in real_positions if p.stock_code == stock_code), None) - real_can_use = real_pos.can_use_volume if real_pos else 0 - - final_vol = min(virtual_vol, real_can_use) - if final_vol > 0: - order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final_vol, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') - if order_id != -1: - logger.info(f"[{strategy_name}] 发出卖单: {stock_code} {final_vol}股 ID:{order_id}") - ORDER_CACHE[order_id] = (strategy_name, stock_code, 'SELL') - else: - logger.warning(f"[{strategy_name}] 账本有货但实盘不足 (Redis:{virtual_vol}, Real:{real_can_use})") - else: - logger.info(f"[{strategy_name}] Redis无持仓,忽略卖出") - - except Exception: + v_vol = pos_manager.get_position(strategy_name, stock_code) + if v_vol > 0: + real_pos = xt_trader.query_stock_positions(acc) + rp = next((p for p in real_pos if p.stock_code==stock_code), None) + can_use = rp.can_use_volume if rp else 0 + final = min(v_vol, can_use) + if final > 0: + oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') + if oid != -1: + logger.info(f"[{strategy_name}] 卖出 {stock_code} {final}") + ORDER_CACHE[oid] = (strategy_name, stock_code, 'SELL') + except: logger.error("消息处理异常", exc_info=True) -# ================= 主程序入口 ================= + +# ================= QMT对象初始化 ================= +def init_qmt_trader(qmt_path, account_id, account_type, pos_manager): + try: + session_id = int(time.time()) + trader = XtQuantTrader(qmt_path, session_id) + acc = StockAccount(account_id, account_type) + callback = MyXtQuantTraderCallback(pos_manager) + trader.register_callback(callback) + trader.start() + res = trader.connect() + if res == 0: + logger.info(f"QMT 连接成功 [Session:{session_id}]") + trader.subscribe(acc) + callback.is_connected = True + return trader, acc, callback + else: + logger.error(f"QMT 连接失败 Code:{res}") + return None, None, None + except Exception as e: + logger.error(f"初始化异常: {e}") + return None, None, None + +# ================= 主程序 ================= if __name__ == '__main__': - logger.info("正在启动 QMT 策略监听系统...") + logger.info("系统启动...") - # 1. 加载配置 CONFIG = load_config('config.json') redis_cfg = CONFIG['redis'] qmt_cfg = CONFIG['qmt'] watch_list = CONFIG['strategies'] - - logger.info(f"监听策略列表: {watch_list}") - # 2. 连接 Redis try: - r = redis.Redis( - host=redis_cfg['host'], - port=redis_cfg['port'], - password=redis_cfg['password'], - db=redis_cfg['db'], - decode_responses=True - ) + r = redis.Redis(**redis_cfg, decode_responses=True) r.ping() - logger.info("Redis 连接成功") pos_manager = PositionManager(r) except Exception as e: logger.critical(f"Redis 连接失败: {e}") sys.exit(1) - # 3. 初始化 QMT 对象 (暂不连接) - try: - session_id = int(time.time()) - xt_trader = XtQuantTrader(qmt_cfg['path'], session_id) - acc = StockAccount(qmt_cfg['account_id'], qmt_cfg['account_type']) - - callback = MyXtQuantTraderCallback(pos_manager) - xt_trader.register_callback(callback) - xt_trader.start() - - # 初次连接 - connect_res = xt_trader.connect() - if connect_res == 0: - logger.info(f"QMT 初次连接成功: {qmt_cfg['account_id']}") - xt_trader.subscribe(acc) - callback.is_connected = True # 标记为已连接 - else: - logger.error(f"QMT 初次连接失败 ({connect_res}),将进入重连循环") - callback.is_connected = False + xt_trader, acc, callback = init_qmt_trader( + qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager + ) + + settler = None + if xt_trader: + settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) + + logger.info("进入主循环...") + + while True: + try: + # === [新增] 日志跨天检查 === + # 如果日期变更了,重新初始化日志,这会自动创建新日期的文件 + today_str = datetime.date.today().strftime('%Y-%m-%d') + if today_str != CURRENT_LOG_DATE: + logger.info(f"检测到跨天 ({CURRENT_LOG_DATE} -> {today_str}),切换日志文件...") + logger = setup_logger() + logger.info(f"日志切换完成,当前写入: logs/{today_str}.log") - except Exception as e: - logger.critical(f"QMT 初始化异常: {e}") - sys.exit(1) - - # 4. 初始化清算器 - settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) - - logger.info("=== 系统就绪,开始循环监听 ===") - - try: - while True: - # ================= [新增] 断线重连逻辑 ================= - if not callback.is_connected: - logger.info("正在尝试重连 QMT...") - try: - # 尝试连接 - res = xt_trader.connect() - if res == 0: - logger.info("QMT 重连成功!重新订阅账户...") - # 重连后必须重新订阅 - xt_trader.subscribe(acc) - callback.is_connected = True - else: - logger.warning(f"QMT 重连失败 (Code: {res}),60秒后重试...") - time.sleep(60) - continue # 跳过本次循环,直接进入下一次检查 - except Exception as e: - logger.error(f"重连过程发生异常: {e}") + # === 断线重连 === + need_reconnect = (xt_trader is None) or (callback is None) or (not callback.is_connected) + if need_reconnect: + logger.warning("连接丢失,执行硬重连...") + if xt_trader: + try: xt_trader.stop() + except: pass + + xt_trader, acc, callback = init_qmt_trader( + qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager + ) + + if xt_trader: + settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) + logger.info("重连成功") + else: + logger.error("重连失败,60秒后重试") time.sleep(60) continue - # ==================================================== + # === 业务轮询 === now = datetime.datetime.now() current_time_str = now.strftime('%H%M') - - # 默认休眠时间 - sleep_interval = 60 + sleep_sec = 60 - # === 交易时段 === if '0900' <= current_time_str <= '1500': - # 高频轮询时段 if '0920' <= current_time_str <= '1000': - sleep_interval = 10 + sleep_sec = 10 else: - sleep_interval = 60 + sleep_sec = 60 - if settler.has_settled: + if settler and settler.has_settled: settler.reset_flag() - for strategy in watch_list: - process_strategy_queue(strategy, r, xt_trader, acc, pos_manager) + for s in watch_list: + process_strategy_queue(s, r, xt_trader, acc, pos_manager) - # === 收盘清算时段 === elif '1505' <= current_time_str <= '1510': - if not settler.has_settled: + if settler and not settler.has_settled: settler.run_settlement() - sleep_interval = 60 - time.sleep(sleep_interval) + time.sleep(sleep_sec) - except KeyboardInterrupt: - logger.info("用户手动终止程序") - except Exception as e: - logger.critical("主循环发生未捕获异常", exc_info=True) \ No newline at end of file + except KeyboardInterrupt: + logger.info("用户停止") + break + except Exception as e: + logger.critical("主循环未捕获异常", exc_info=True) + time.sleep(10) \ No newline at end of file From 6876d1b43be5c4039cb4dd342548714430a0a765 Mon Sep 17 00:00:00 2001 From: liaozhaorun <1300336796@qq.com> Date: Sun, 14 Dec 2025 21:56:01 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E6=9B=B4=E6=96=B0qmt=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=EF=BC=8C=E9=81=BF=E5=85=8D=E5=91=A8=E6=9C=AB=E9=87=8D=E8=BF=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- qmt/qmt_trader.py | 165 +++++++++++++++++++++++++++++++--------------- 1 file changed, 113 insertions(+), 52 deletions(-) diff --git a/qmt/qmt_trader.py b/qmt/qmt_trader.py index bf951a4..bcca240 100644 --- a/qmt/qmt_trader.py +++ b/qmt/qmt_trader.py @@ -7,16 +7,16 @@ from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant -# 全局变量记录当前日志日期,用于跨天判断 +# 全局变量 CURRENT_LOG_DATE = None +CONFIG = {} +ORDER_CACHE = {} # ================= 1. 日志系统 (按日期直写) ================= def setup_logger(): """ 配置日志系统: - 1. 确保日志目录存在 - 2. 生成当天日期的日志文件 (YYYY-MM-DD.log) - 3. 同时输出到控制台 + 每天生成一个新的日志文件,文件名格式为 YYYY-MM-DD.log """ global CURRENT_LOG_DATE @@ -26,15 +26,14 @@ def setup_logger(): # 获取今日日期 today_str = datetime.date.today().strftime('%Y-%m-%d') - CURRENT_LOG_DATE = today_str # 更新全局变量 + CURRENT_LOG_DATE = today_str - # 文件名直接就是日期: logs/2025-12-05.log log_file = os.path.join(log_dir, f"{today_str}.log") logger = logging.getLogger("QMT_Trader") logger.setLevel(logging.INFO) - # 【关键】清除旧的 handler,防止跨天后重复打印或写入旧文件 + # 清除旧 handler if logger.handlers: for handler in logger.handlers[:]: try: @@ -47,11 +46,11 @@ def setup_logger(): datefmt='%Y-%m-%d %H:%M:%S' ) - # Handler 1: 普通文件输出 (追加模式) + # 文件输出 file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') file_handler.setFormatter(formatter) - # Handler 2: 控制台 + # 控制台输出 stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(formatter) @@ -60,14 +59,10 @@ def setup_logger(): return logger -# 初次初始化 +# 初始化日志 logger = setup_logger() -# ================= 全局变量 ================= -CONFIG = {} -ORDER_CACHE = {} - -# ================= 配置加载 ================= +# ================= 2. 配置加载 ================= def load_config(config_file='config.json'): if getattr(sys, 'frozen', False): base_path = os.path.dirname(sys.executable) @@ -86,20 +81,24 @@ def load_config(config_file='config.json'): logger.error(f"配置文件错误: {e}") sys.exit(1) -# ================= 业务逻辑类 ================= +# ================= 3. 业务逻辑类 ================= class PositionManager: def __init__(self, r_client): self.r = r_client def _get_key(self, strategy_name): return f"POS:{strategy_name}" def mark_holding(self, strategy_name, code): + """乐观占位""" self.r.hsetnx(self._get_key(strategy_name), code, 0) def rollback_holding(self, strategy_name, code): + """失败回滚""" key = self._get_key(strategy_name) val = self.r.hget(key, code) if val is not None and int(val) == 0: self.r.hdel(key, code) + logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}") def update_actual_volume(self, strategy_name, code, delta_vol): + """成交更新""" key = self._get_key(strategy_name) new_vol = self.r.hincrby(key, code, int(delta_vol)) if new_vol <= 0: @@ -125,7 +124,8 @@ class DailySettlement: self.has_settled = False def run_settlement(self): logger.info("="*40) - logger.info("执行收盘清算...") + logger.info("执行收盘清算流程...") + # 1. 撤单 try: orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) if orders: @@ -133,11 +133,13 @@ class DailySettlement: self.trader.cancel_order_stock(self.acc, o.order_id) time.sleep(2) except: pass + # 2. 获取实盘 real_positions = self.trader.query_stock_positions(self.acc) real_pos_map = {} if real_positions: for p in real_positions: if p.volume > 0: real_pos_map[p.stock_code] = p.volume + # 3. 校准Redis for strategy in self.strategies: virtual = self.pos_mgr.get_all_virtual_positions(strategy) for code, v in virtual.items(): @@ -149,16 +151,14 @@ class DailySettlement: def reset_flag(self): self.has_settled = False -# ================= 回调类 ================= +# ================= 4. 回调类 ================= class MyXtQuantTraderCallback(XtQuantTraderCallback): def __init__(self, pos_mgr): self.pos_mgr = pos_mgr self.is_connected = False - def on_disconnected(self): logger.warning(">> 回调通知: 交易端连接断开") self.is_connected = False - def on_stock_trade(self, trade): try: cache_info = ORDER_CACHE.get(trade.order_id) @@ -169,7 +169,6 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback): if action == 'BUY': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, trade.traded_volume) elif action == 'SELL': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, -trade.traded_volume) except: traceback.print_exc() - def on_order_error(self, err): try: logger.error(f"下单失败: {err.error_msg}") @@ -179,19 +178,30 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback): del ORDER_CACHE[err.order_id] except: pass -# ================= 消息处理 ================= +# ================= 5. 核心消息处理 ================= def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): queue_key = f"{strategy_name}_real" msg_json = r_client.lpop(queue_key) if not msg_json: return try: + # 归档 r_client.rpush(f"{queue_key}:history", msg_json) + data = json.loads(msg_json) + # 1. 过滤回测 if data.get('is_backtest'): return - # 简单日期校验 - if data.get('timestamp', '').split(' ')[0] != datetime.date.today().strftime('%Y-%m-%d'): + # 2. 校验日期 (只处理当日消息) + msg_ts = data.get('timestamp') + today_str = datetime.date.today().strftime('%Y-%m-%d') + if msg_ts: + msg_date = msg_ts.split(' ')[0] + if msg_date != today_str: + logger.warning(f"[{strategy_name}] 拦截非当日消息: MsgDate={msg_date}") + return + else: + logger.warning(f"[{strategy_name}] 消息无时间戳,忽略") return stock_code = data['stock_code'] @@ -200,24 +210,37 @@ def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager) total_slots = int(data.get('total_slots', 1)) if action == 'BUY': + # 槽位检查 holding = pos_manager.get_holding_count(strategy_name) empty = total_slots - holding - if empty <= 0: return + if empty <= 0: + logger.warning(f"[{strategy_name}] 拦截买入: 槽位已满 (Target:{total_slots} Held:{holding})") + return asset = xt_trader.query_stock_asset(acc) - if not asset: return + if not asset: + logger.error("无法查询资产,QMT可能未就绪") + return + # 金额计算 amt = asset.cash / empty - if amt < 2000: return + if amt < 2000: + logger.warning(f"[{strategy_name}] 拦截买入: 金额过小 ({amt:.2f})") + return + + if price <= 0: price = 1.0 + vol = int(amt / price / 100) * 100 - if price<=0: price=1.0 - vol = int(amt/price/100)*100 if vol >= 100: oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') if oid != -1: - logger.info(f"[{strategy_name}] 买入 {stock_code} {vol}股") + logger.info(f"[{strategy_name}] 发出买单: {stock_code} {vol}股 ID:{oid}") ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY') pos_manager.mark_holding(strategy_name, stock_code) + else: + logger.error(f"[{strategy_name}] 下单被拒绝 (-1)") + else: + logger.warning(f"[{strategy_name}] 股数不足100 (Amt:{amt:.2f})") elif action == 'SELL': v_vol = pos_manager.get_position(strategy_name, stock_code) @@ -225,17 +248,23 @@ def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager) real_pos = xt_trader.query_stock_positions(acc) rp = next((p for p in real_pos if p.stock_code==stock_code), None) can_use = rp.can_use_volume if rp else 0 + final = min(v_vol, can_use) if final > 0: oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') if oid != -1: - logger.info(f"[{strategy_name}] 卖出 {stock_code} {final}") + logger.info(f"[{strategy_name}] 发出卖单: {stock_code} {final}") ORDER_CACHE[oid] = (strategy_name, stock_code, 'SELL') - except: + else: + logger.warning(f"[{strategy_name}] 卖出拦截: 虚拟{v_vol}但实盘可用{can_use}") + else: + logger.info(f"[{strategy_name}] Redis无持仓,忽略卖出") + + except Exception: logger.error("消息处理异常", exc_info=True) -# ================= QMT对象初始化 ================= +# ================= 6. QMT初始化 ================= def init_qmt_trader(qmt_path, account_id, account_type, pos_manager): try: session_id = int(time.time()) @@ -257,15 +286,17 @@ def init_qmt_trader(qmt_path, account_id, account_type, pos_manager): logger.error(f"初始化异常: {e}") return None, None, None -# ================= 主程序 ================= +# ================= 7. 主程序 ================= if __name__ == '__main__': - logger.info("系统启动...") + logger.info(">>> 系统启动 (实盘生产模式) <<<") + # 加载配置 CONFIG = load_config('config.json') redis_cfg = CONFIG['redis'] qmt_cfg = CONFIG['qmt'] watch_list = CONFIG['strategies'] + # 连接Redis try: r = redis.Redis(**redis_cfg, decode_responses=True) r.ping() @@ -274,6 +305,7 @@ if __name__ == '__main__': logger.critical(f"Redis 连接失败: {e}") sys.exit(1) + # 初次连接 QMT xt_trader, acc, callback = init_qmt_trader( qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager ) @@ -282,22 +314,26 @@ if __name__ == '__main__': if xt_trader: settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) - logger.info("进入主循环...") + logger.info(">>> 进入主循环监听 <<<") while True: try: - # === [新增] 日志跨天检查 === - # 如果日期变更了,重新初始化日志,这会自动创建新日期的文件 + # --- 1. 日志跨天处理 --- today_str = datetime.date.today().strftime('%Y-%m-%d') if today_str != CURRENT_LOG_DATE: - logger.info(f"检测到跨天 ({CURRENT_LOG_DATE} -> {today_str}),切换日志文件...") + logger.info(f"日期变更 ({CURRENT_LOG_DATE} -> {today_str}),切换日志...") logger = setup_logger() - logger.info(f"日志切换完成,当前写入: logs/{today_str}.log") - # === 断线重连 === + # --- 2. 断线重连 (仅工作日) --- need_reconnect = (xt_trader is None) or (callback is None) or (not callback.is_connected) if need_reconnect: - logger.warning("连接丢失,执行硬重连...") + # 检查是否为周末 (5=周六, 6=周日) + if datetime.date.today().weekday() >= 5: + logger.info("当前是周末,暂停重连,休眠1小时") + time.sleep(3600) + continue + + logger.warning("连接丢失,尝试重连...") if xt_trader: try: xt_trader.stop() except: pass @@ -314,27 +350,52 @@ if __name__ == '__main__': time.sleep(60) continue - # === 业务轮询 === + # --- 3. 交易时段轮询逻辑 --- now = datetime.datetime.now() - current_time_str = now.strftime('%H%M') + current_time_str = now.strftime('%H%M%S') # 格式化为 HHMMSS + + # 默认休眠 60秒 sleep_sec = 60 + is_trading_time = False - if '0900' <= current_time_str <= '1500': - if '0920' <= current_time_str <= '1000': - sleep_sec = 10 - else: - sleep_sec = 60 - + # 判断时段 + # 09:15:00 - 10:00:00 (竞价 + 开盘半小时) -> 高频 1秒 + if '091500' <= current_time_str <= '100000': + sleep_sec = 1 + is_trading_time = True + + # 10:00:01 - 11:30:00 -> 低频 60秒 + elif '100000' < current_time_str <= '113000': + sleep_sec = 60 + is_trading_time = True + + # 13:00:00 - 14:50:00 -> 低频 60秒 + elif '130000' <= current_time_str < '145000': + sleep_sec = 60 + is_trading_time = True + + # 14:50:00 - 15:00:00 (尾盘 + 收盘竞价) -> 高频 1秒 + elif '145000' <= current_time_str <= '150000': + sleep_sec = 1 + is_trading_time = True + + # --- 4. 执行业务 --- + if is_trading_time: + # 每日重置清算标记 if settler and settler.has_settled: settler.reset_flag() + # 处理信号 for s in watch_list: process_strategy_queue(s, r, xt_trader, acc, pos_manager) - elif '1505' <= current_time_str <= '1510': + # --- 5. 收盘清算 (15:05 - 15:10) --- + elif '150500' <= current_time_str <= '151000': if settler and not settler.has_settled: settler.run_settlement() - + sleep_sec = 60 + + # 执行休眠 time.sleep(sleep_sec) except KeyboardInterrupt: