diff --git a/.gitignore b/.gitignore index eb6e046..9f06053 100644 --- a/.gitignore +++ b/.gitignore @@ -23,4 +23,7 @@ model **/mlruns/ **/mnt/ -predications_test.csv \ No newline at end of file +predications_test.csv + +/qmt/config.json +/qmt/logs/* \ No newline at end of file diff --git a/qmt/qmt_trader.py b/qmt/qmt_trader.py index c993cff..bcca240 100644 --- a/qmt/qmt_trader.py +++ b/qmt/qmt_trader.py @@ -1,182 +1,406 @@ # coding:utf-8 -import time, datetime, traceback, sys, json +import time, datetime, traceback, sys, json, os +import logging import redis from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant -# ================= 配置区域 ================= -QMT_PATH = r'D:\qmt\投研\迅投极速交易终端睿智融科版\userdata' -ACCOUNT_ID = '2000128' -ACCOUNT_TYPE = 'STOCK' +# 全局变量 +CURRENT_LOG_DATE = None +CONFIG = {} +ORDER_CACHE = {} -REDIS_HOST = '127.0.0.1' -REDIS_PORT = 6379 -REDIS_PASS = None +# ================= 1. 日志系统 (按日期直写) ================= +def setup_logger(): + """ + 配置日志系统: + 每天生成一个新的日志文件,文件名格式为 YYYY-MM-DD.log + """ + global CURRENT_LOG_DATE + + log_dir = "logs" + if not os.path.exists(log_dir): + os.makedirs(log_dir) -# 策略基础名称 (不需要加 _real,代码会自动加) -STRATEGY_BASE_NAME = 'default_strategy' -# =========================================== + # 获取今日日期 + today_str = datetime.date.today().strftime('%Y-%m-%d') + CURRENT_LOG_DATE = today_str -# 定义监听的队列名称 (只监听实盘队列,物理屏蔽回测数据) -LISTEN_QUEUE = f"{STRATEGY_BASE_NAME}_real" + log_file = os.path.join(log_dir, f"{today_str}.log") + logger = logging.getLogger("QMT_Trader") + logger.setLevel(logging.INFO) + + # 清除旧 handler + if logger.handlers: + for handler in logger.handlers[:]: + try: + handler.close() + logger.removeHandler(handler) + except: pass + formatter = logging.Formatter( + '[%(asctime)s] [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # 文件输出 + file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') + file_handler.setFormatter(formatter) + + # 控制台输出 + stream_handler = logging.StreamHandler(sys.stdout) + stream_handler.setFormatter(formatter) + + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + return logger + +# 初始化日志 +logger = setup_logger() + +# ================= 2. 配置加载 ================= +def load_config(config_file='config.json'): + if getattr(sys, 'frozen', False): + base_path = os.path.dirname(sys.executable) + else: + base_path = os.path.dirname(os.path.abspath(__file__)) + full_path = os.path.join(base_path, config_file) + if not os.path.exists(full_path): + if os.path.exists(config_file): full_path = config_file + else: + logger.error(f"找不到配置文件: {full_path}") + sys.exit(1) + try: + with open(full_path, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + logger.error(f"配置文件错误: {e}") + sys.exit(1) + +# ================= 3. 业务逻辑类 ================= +class PositionManager: + def __init__(self, r_client): + self.r = r_client + def _get_key(self, strategy_name): + return f"POS:{strategy_name}" + def mark_holding(self, strategy_name, code): + """乐观占位""" + self.r.hsetnx(self._get_key(strategy_name), code, 0) + def rollback_holding(self, strategy_name, code): + """失败回滚""" + key = self._get_key(strategy_name) + val = self.r.hget(key, code) + if val is not None and int(val) == 0: + self.r.hdel(key, code) + logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}") + def update_actual_volume(self, strategy_name, code, delta_vol): + """成交更新""" + key = self._get_key(strategy_name) + new_vol = self.r.hincrby(key, code, int(delta_vol)) + if new_vol <= 0: + self.r.hdel(key, code) + new_vol = 0 + return new_vol + def get_position(self, strategy_name, code): + vol = self.r.hget(self._get_key(strategy_name), code) + return int(vol) if vol else 0 + def get_holding_count(self, strategy_name): + return self.r.hlen(self._get_key(strategy_name)) + def get_all_virtual_positions(self, strategy_name): + return self.r.hgetall(self._get_key(strategy_name)) + def force_delete(self, strategy_name, code): + self.r.hdel(self._get_key(strategy_name), code) + +class DailySettlement: + def __init__(self, xt_trader, acc, pos_mgr, strategies): + self.trader = xt_trader + self.acc = acc + self.pos_mgr = pos_mgr + self.strategies = strategies + self.has_settled = False + def run_settlement(self): + logger.info("="*40) + logger.info("执行收盘清算流程...") + # 1. 撤单 + try: + orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) + if orders: + for o in orders: + self.trader.cancel_order_stock(self.acc, o.order_id) + time.sleep(2) + except: pass + # 2. 获取实盘 + real_positions = self.trader.query_stock_positions(self.acc) + real_pos_map = {} + if real_positions: + for p in real_positions: + if p.volume > 0: real_pos_map[p.stock_code] = p.volume + # 3. 校准Redis + for strategy in self.strategies: + virtual = self.pos_mgr.get_all_virtual_positions(strategy) + for code, v in virtual.items(): + if code not in real_pos_map: + logger.warning(f" [修正] {strategy} 幽灵持仓 {code} -> 释放") + self.pos_mgr.force_delete(strategy, code) + logger.info("清算完成") + self.has_settled = True + def reset_flag(self): + self.has_settled = False + +# ================= 4. 回调类 ================= class MyXtQuantTraderCallback(XtQuantTraderCallback): + def __init__(self, pos_mgr): + self.pos_mgr = pos_mgr + self.is_connected = False def on_disconnected(self): - print("连接断开") - - def on_stock_order(self, order): - print(f"委托回报: {order.order_id} {order.order_remark}") - + logger.warning(">> 回调通知: 交易端连接断开") + self.is_connected = False def on_stock_trade(self, trade): - print(f"成交: {trade.stock_code} {trade.traded_volume}") + try: + cache_info = ORDER_CACHE.get(trade.order_id) + if not cache_info: return + strategy, _, action = cache_info + + logger.info(f">>> [成交] {strategy} {trade.stock_code} {trade.traded_volume}") + if action == 'BUY': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, trade.traded_volume) + elif action == 'SELL': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, -trade.traded_volume) + except: traceback.print_exc() + def on_order_error(self, err): + try: + logger.error(f"下单失败: {err.error_msg}") + cache = ORDER_CACHE.get(err.order_id) + if cache and cache[2] == 'BUY': + self.pos_mgr.rollback_holding(cache[0], cache[1]) + del ORDER_CACHE[err.order_id] + except: pass - def on_order_error(self, order_error): - print(f"下单失败: {order_error.error_msg}") +# ================= 5. 核心消息处理 ================= +def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): + queue_key = f"{strategy_name}_real" + msg_json = r_client.lpop(queue_key) + if not msg_json: return - -def init_redis(): try: - r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASS, decode_responses=True) - r.ping() - return r - except Exception as e: - print(f"Redis连接失败: {e}") - return None - - -def is_msg_valid(data): - """ - 【安全核心】校验消息时效性与合法性 - """ - try: - # 1. 检查是否为回测标记 (防御性编程,虽然队列已物理隔离) - if data.get('is_backtest', False): - print(f"警报:拦截到回测数据,已丢弃!") - return False - - # 2. 检查时间戳 - msg_time_str = data.get('timestamp') - if not msg_time_str: - print("数据缺失时间戳,丢弃") - return False - - # 解析消息时间 - # 格式必须匹配策略端发送的 '%Y-%m-%d %H:%M:%S' - msg_dt = datetime.datetime.strptime(msg_time_str, '%Y-%m-%d %H:%M:%S') - msg_date = msg_dt.date() - - # 获取当前服务器日期 - today = datetime.date.today() - - # 3. 【核心】判断是否为当天的消息 - if msg_date != today: - print(f"拦截过期消息: 消息日期[{msg_date}] != 今日[{today}]") - return False - - # 可选:如果你想更严格,可以判断时间差不能超过5分钟 - # delta = datetime.datetime.now() - msg_dt - # if abs(delta.total_seconds()) > 300: ... - - return True - - except Exception as e: - print(f"校验逻辑异常: {e}") - return False - - -def process_redis_signal(r_client, xt_trader, acc): - try: - msg_json = r_client.lpop(LISTEN_QUEUE) - if not msg_json: return - - print(f"收到信号: {msg_json}") + # 归档 + r_client.rpush(f"{queue_key}:history", msg_json) + data = json.loads(msg_json) - - if not is_msg_valid(data): return # 之前的校验逻辑 + # 1. 过滤回测 + if data.get('is_backtest'): return + + # 2. 校验日期 (只处理当日消息) + msg_ts = data.get('timestamp') + today_str = datetime.date.today().strftime('%Y-%m-%d') + if msg_ts: + msg_date = msg_ts.split(' ')[0] + if msg_date != today_str: + logger.warning(f"[{strategy_name}] 拦截非当日消息: MsgDate={msg_date}") + return + else: + logger.warning(f"[{strategy_name}] 消息无时间戳,忽略") + return stock_code = data['stock_code'] action = data['action'] price = float(data['price']) + total_slots = int(data.get('total_slots', 1)) - # 获取切分份数 - # 兼容性处理:如果redis里还是旧key 'weight',也可以尝试获取 - div_count = float(data.get('div_count', data.get('weight', 1))) - - # ========================================================= - # 买入逻辑:资金切片法 - # ========================================================= if action == 'BUY': - # 1. 必须查最新的可用资金 (Available Cash) + # 槽位检查 + holding = pos_manager.get_holding_count(strategy_name) + empty = total_slots - holding + if empty <= 0: + logger.warning(f"[{strategy_name}] 拦截买入: 槽位已满 (Target:{total_slots} Held:{holding})") + return + asset = xt_trader.query_stock_asset(acc) - if not asset: - print("错误:无法查询资产") + if not asset: + logger.error("无法查询资产,QMT可能未就绪") return - - current_cash = asset.cash - - # 2. 计算下单金额 - # 逻辑:Amount = Cash / div_count - if div_count <= 0: div_count = 1 # 防止除0 - - target_amount = current_cash / div_count - - # 3. 打印调试信息 (非常重要) - print(f"【资金分配】可用现金:{current_cash:.2f} / 切分份数:{div_count} = 下单金额:{target_amount:.2f}") - - # 4. 计算股数 + + # 金额计算 + amt = asset.cash / empty + if amt < 2000: + logger.warning(f"[{strategy_name}] 拦截买入: 金额过小 ({amt:.2f})") + return + if price <= 0: price = 1.0 - - # 过滤小额杂单 - if target_amount < 2000: - print(f"忽略:金额过小 ({target_amount:.2f})") - return - - vol = int(target_amount / price / 100) * 100 - + vol = int(amt / price / 100) * 100 + if vol >= 100: - xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, - STRATEGY_BASE_NAME, 'PyBuy') - print(f"买入下单: {stock_code} {vol}股") + oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') + if oid != -1: + logger.info(f"[{strategy_name}] 发出买单: {stock_code} {vol}股 ID:{oid}") + ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY') + pos_manager.mark_holding(strategy_name, stock_code) + else: + logger.error(f"[{strategy_name}] 下单被拒绝 (-1)") else: - print(f"计算股数不足100股") + logger.warning(f"[{strategy_name}] 股数不足100 (Amt:{amt:.2f})") - # ========================================================= - # 卖出逻辑 (清仓) - # ========================================================= elif action == 'SELL': - positions = xt_trader.query_stock_positions(acc) - target_pos = next((p for p in positions if p.stock_code == stock_code), None) - - if target_pos and target_pos.can_use_volume > 0: - xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, target_pos.can_use_volume, - xtconstant.FIX_PRICE, price, STRATEGY_BASE_NAME, 'PySell') - print(f"卖出下单: {stock_code} {target_pos.can_use_volume}股") + v_vol = pos_manager.get_position(strategy_name, stock_code) + if v_vol > 0: + real_pos = xt_trader.query_stock_positions(acc) + rp = next((p for p in real_pos if p.stock_code==stock_code), None) + can_use = rp.can_use_volume if rp else 0 + + final = min(v_vol, can_use) + if final > 0: + oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') + if oid != -1: + logger.info(f"[{strategy_name}] 发出卖单: {stock_code} {final}") + ORDER_CACHE[oid] = (strategy_name, stock_code, 'SELL') + else: + logger.warning(f"[{strategy_name}] 卖出拦截: 虚拟{v_vol}但实盘可用{can_use}") else: - print(f"无可用持仓: {stock_code}") + logger.info(f"[{strategy_name}] Redis无持仓,忽略卖出") + except Exception: + logger.error("消息处理异常", exc_info=True) + + +# ================= 6. QMT初始化 ================= +def init_qmt_trader(qmt_path, account_id, account_type, pos_manager): + try: + session_id = int(time.time()) + trader = XtQuantTrader(qmt_path, session_id) + acc = StockAccount(account_id, account_type) + callback = MyXtQuantTraderCallback(pos_manager) + trader.register_callback(callback) + trader.start() + res = trader.connect() + if res == 0: + logger.info(f"QMT 连接成功 [Session:{session_id}]") + trader.subscribe(acc) + callback.is_connected = True + return trader, acc, callback + else: + logger.error(f"QMT 连接失败 Code:{res}") + return None, None, None except Exception as e: - print(f"处理异常: {e}") - traceback.print_exc() - + logger.error(f"初始化异常: {e}") + return None, None, None +# ================= 7. 主程序 ================= if __name__ == '__main__': - r_client = init_redis() - session_id = int(time.time()) - xt_trader = XtQuantTrader(QMT_PATH, session_id) - acc = StockAccount(ACCOUNT_ID, ACCOUNT_TYPE) - callback = MyXtQuantTraderCallback() - xt_trader.register_callback(callback) - xt_trader.start() - xt_trader.connect() - xt_trader.subscribe(acc) + logger.info(">>> 系统启动 (实盘生产模式) <<<") + + # 加载配置 + CONFIG = load_config('config.json') + redis_cfg = CONFIG['redis'] + qmt_cfg = CONFIG['qmt'] + watch_list = CONFIG['strategies'] - print(f"=== 启动监听: {LISTEN_QUEUE} ===") - print("只处理当日的实盘/模拟信号,自动过滤回测数据及历史遗留数据。") + # 连接Redis + try: + r = redis.Redis(**redis_cfg, decode_responses=True) + r.ping() + pos_manager = PositionManager(r) + except Exception as e: + logger.critical(f"Redis 连接失败: {e}") + sys.exit(1) + + # 初次连接 QMT + xt_trader, acc, callback = init_qmt_trader( + qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager + ) + + settler = None + if xt_trader: + settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) + + logger.info(">>> 进入主循环监听 <<<") while True: - if r_client: - process_redis_signal(r_client, xt_trader, acc) - time.sleep(60) \ No newline at end of file + try: + # --- 1. 日志跨天处理 --- + today_str = datetime.date.today().strftime('%Y-%m-%d') + if today_str != CURRENT_LOG_DATE: + logger.info(f"日期变更 ({CURRENT_LOG_DATE} -> {today_str}),切换日志...") + logger = setup_logger() + + # --- 2. 断线重连 (仅工作日) --- + need_reconnect = (xt_trader is None) or (callback is None) or (not callback.is_connected) + if need_reconnect: + # 检查是否为周末 (5=周六, 6=周日) + if datetime.date.today().weekday() >= 5: + logger.info("当前是周末,暂停重连,休眠1小时") + time.sleep(3600) + continue + + logger.warning("连接丢失,尝试重连...") + if xt_trader: + try: xt_trader.stop() + except: pass + + xt_trader, acc, callback = init_qmt_trader( + qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager + ) + + if xt_trader: + settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) + logger.info("重连成功") + else: + logger.error("重连失败,60秒后重试") + time.sleep(60) + continue + + # --- 3. 交易时段轮询逻辑 --- + now = datetime.datetime.now() + current_time_str = now.strftime('%H%M%S') # 格式化为 HHMMSS + + # 默认休眠 60秒 + sleep_sec = 60 + is_trading_time = False + + # 判断时段 + # 09:15:00 - 10:00:00 (竞价 + 开盘半小时) -> 高频 1秒 + if '091500' <= current_time_str <= '100000': + sleep_sec = 1 + is_trading_time = True + + # 10:00:01 - 11:30:00 -> 低频 60秒 + elif '100000' < current_time_str <= '113000': + sleep_sec = 60 + is_trading_time = True + + # 13:00:00 - 14:50:00 -> 低频 60秒 + elif '130000' <= current_time_str < '145000': + sleep_sec = 60 + is_trading_time = True + + # 14:50:00 - 15:00:00 (尾盘 + 收盘竞价) -> 高频 1秒 + elif '145000' <= current_time_str <= '150000': + sleep_sec = 1 + is_trading_time = True + + # --- 4. 执行业务 --- + if is_trading_time: + # 每日重置清算标记 + if settler and settler.has_settled: + settler.reset_flag() + + # 处理信号 + for s in watch_list: + process_strategy_queue(s, r, xt_trader, acc, pos_manager) + + # --- 5. 收盘清算 (15:05 - 15:10) --- + elif '150500' <= current_time_str <= '151000': + if settler and not settler.has_settled: + settler.run_settlement() + sleep_sec = 60 + + # 执行休眠 + time.sleep(sleep_sec) + + except KeyboardInterrupt: + logger.info("用户停止") + break + except Exception as e: + logger.critical("主循环未捕获异常", exc_info=True) + time.sleep(10) \ No newline at end of file