diff --git a/.gitignore b/.gitignore index 56b072e..4e039c6 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,6 @@ model !.env **/mlruns/ -**/mnt/ \ No newline at end of file +**/mnt/ + +/qmt/config.json \ No newline at end of file diff --git a/qmt/qmt_trader.py b/qmt/qmt_trader.py index c993cff..c229b8d 100644 --- a/qmt/qmt_trader.py +++ b/qmt/qmt_trader.py @@ -1,182 +1,314 @@ # coding:utf-8 -import time, datetime, traceback, sys, json +import time, datetime, traceback, sys, json, os import redis from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant -# ================= 配置区域 ================= -QMT_PATH = r'D:\qmt\投研\迅投极速交易终端睿智融科版\userdata' -ACCOUNT_ID = '2000128' -ACCOUNT_TYPE = 'STOCK' +# 全局变量占位 (稍后在 main 中初始化) +CONFIG = {} +ORDER_CACHE = {} -REDIS_HOST = '127.0.0.1' -REDIS_PORT = 6379 -REDIS_PASS = None +# ================= 配置加载模块 ================= +def load_config(config_file='config.json'): + """ + 读取同级目录下的配置文件 + """ + # 获取脚本所在目录 + if getattr(sys, 'frozen', False): + # 如果被打包为exe + base_path = os.path.dirname(sys.executable) + else: + # 普通脚本运行 + base_path = os.path.dirname(os.path.abspath(__file__)) + + full_path = os.path.join(base_path, config_file) + + if not os.path.exists(full_path): + # 尝试直接读取(兼容 QMT 内置 Python 的路径行为) + if os.path.exists(config_file): + full_path = config_file + else: + print(f"[错误] 找不到配置文件: {full_path}") + sys.exit(1) + + try: + with open(full_path, 'r', encoding='utf-8') as f: + config = json.load(f) + print(f"成功加载配置: {full_path}") + return config + except Exception as e: + print(f"[错误] 配置文件格式错误: {e}") + sys.exit(1) -# 策略基础名称 (不需要加 _real,代码会自动加) -STRATEGY_BASE_NAME = 'default_strategy' -# =========================================== +# ================= 业务逻辑类 (保持不变) ================= -# 定义监听的队列名称 (只监听实盘队列,物理屏蔽回测数据) -LISTEN_QUEUE = f"{STRATEGY_BASE_NAME}_real" +class PositionManager: + def __init__(self, r_client): + self.r = r_client + + def _get_key(self, strategy_name): + return f"POS:{strategy_name}" + + def mark_holding(self, strategy_name, code): + """乐观占位""" + self.r.hsetnx(self._get_key(strategy_name), code, 0) + # print(f"[{strategy_name}] 乐观占位: {code}") + + def rollback_holding(self, strategy_name, code): + """失败回滚""" + key = self._get_key(strategy_name) + val = self.r.hget(key, code) + if val is not None and int(val) == 0: + self.r.hdel(key, code) + print(f"[{strategy_name}] 回滚释放槽位: {code}") + + def update_actual_volume(self, strategy_name, code, delta_vol): + """成交更新""" + key = self._get_key(strategy_name) + new_vol = self.r.hincrby(key, code, int(delta_vol)) + if new_vol <= 0: + self.r.hdel(key, code) + new_vol = 0 + return new_vol + + def get_position(self, strategy_name, code): + vol = self.r.hget(self._get_key(strategy_name), code) + return int(vol) if vol else 0 + + def get_holding_count(self, strategy_name): + return self.r.hlen(self._get_key(strategy_name)) + + def get_all_virtual_positions(self, strategy_name): + return self.r.hgetall(self._get_key(strategy_name)) + + def force_delete(self, strategy_name, code): + self.r.hdel(self._get_key(strategy_name), code) + + +class DailySettlement: + def __init__(self, xt_trader, acc, pos_mgr, strategies): + self.trader = xt_trader + self.acc = acc + self.pos_mgr = pos_mgr + self.strategies = strategies + self.has_settled = False + + def run_settlement(self): + print("\n" + "="*40) + print(f"开始收盘清算: {datetime.datetime.now()}") + + # 1. 撤单 + orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) + if orders: + for o in orders: + self.trader.cancel_order_stock(self.acc, o.order_id) + time.sleep(2) + + # 2. 获取实盘真实持仓 + real_positions = self.trader.query_stock_positions(self.acc) + real_pos_map = {} + if real_positions: + for p in real_positions: + if p.volume > 0: + real_pos_map[p.stock_code] = p.volume + + # 3. 校准 Redis + for strategy in self.strategies: + virtual_data = self.pos_mgr.get_all_virtual_positions(strategy) + for code, v_vol_str in virtual_data.items(): + if code not in real_pos_map: + print(f" [修正] {strategy} 幽灵持仓 {code} -> 强制释放") + self.pos_mgr.force_delete(strategy, code) + + print("清算完成") + self.has_settled = True + + def reset_flag(self): + self.has_settled = False class MyXtQuantTraderCallback(XtQuantTraderCallback): - def on_disconnected(self): - print("连接断开") + def __init__(self, pos_mgr): + self.pos_mgr = pos_mgr - def on_stock_order(self, order): - print(f"委托回报: {order.order_id} {order.order_remark}") + def on_disconnected(self): + print(">> 连接断开") def on_stock_trade(self, trade): - print(f"成交: {trade.stock_code} {trade.traded_volume}") + try: + order_id = trade.order_id + stock_code = trade.stock_code + traded_vol = trade.traded_volume + + cache_info = ORDER_CACHE.get(order_id) + if not cache_info: return + + strategy_name, cached_code, action_type = cache_info + print(f">>> [成交] {strategy_name} {stock_code} 成交量:{traded_vol}") + + if action_type == 'BUY': + self.pos_mgr.update_actual_volume(strategy_name, stock_code, traded_vol) + elif action_type == 'SELL': + self.pos_mgr.update_actual_volume(strategy_name, stock_code, -traded_vol) + + except Exception: + traceback.print_exc() def on_order_error(self, order_error): - print(f"下单失败: {order_error.error_msg}") + try: + order_id = order_error.order_id + print(f">>> [下单失败] ID:{order_id} Msg:{order_error.error_msg}") + cache_info = ORDER_CACHE.get(order_id) + if cache_info: + strategy_name, stock_code, action_type = cache_info + if action_type == 'BUY': + self.pos_mgr.rollback_holding(strategy_name, stock_code) + del ORDER_CACHE[order_id] + except: pass -def init_redis(): +def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): + queue_key = f"{strategy_name}_real" + msg_json = r_client.lpop(queue_key) + if not msg_json: return + try: - r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASS, decode_responses=True) - r.ping() - return r - except Exception as e: - print(f"Redis连接失败: {e}") - return None - - -def is_msg_valid(data): - """ - 【安全核心】校验消息时效性与合法性 - """ - try: - # 1. 检查是否为回测标记 (防御性编程,虽然队列已物理隔离) - if data.get('is_backtest', False): - print(f"警报:拦截到回测数据,已丢弃!") - return False - - # 2. 检查时间戳 - msg_time_str = data.get('timestamp') - if not msg_time_str: - print("数据缺失时间戳,丢弃") - return False - - # 解析消息时间 - # 格式必须匹配策略端发送的 '%Y-%m-%d %H:%M:%S' - msg_dt = datetime.datetime.strptime(msg_time_str, '%Y-%m-%d %H:%M:%S') - msg_date = msg_dt.date() - - # 获取当前服务器日期 - today = datetime.date.today() - - # 3. 【核心】判断是否为当天的消息 - if msg_date != today: - print(f"拦截过期消息: 消息日期[{msg_date}] != 今日[{today}]") - return False - - # 可选:如果你想更严格,可以判断时间差不能超过5分钟 - # delta = datetime.datetime.now() - msg_dt - # if abs(delta.total_seconds()) > 300: ... - - return True - - except Exception as e: - print(f"校验逻辑异常: {e}") - return False - - -def process_redis_signal(r_client, xt_trader, acc): - try: - msg_json = r_client.lpop(LISTEN_QUEUE) - if not msg_json: return - - print(f"收到信号: {msg_json}") data = json.loads(msg_json) - - if not is_msg_valid(data): return # 之前的校验逻辑 - + + # 校验 + if data.get('is_backtest', False): return + msg_ts = data.get('timestamp') + if msg_ts: + msg_date = datetime.datetime.strptime(msg_ts, '%Y-%m-%d %H:%M:%S').date() + if msg_date != datetime.date.today(): + return + stock_code = data['stock_code'] action = data['action'] price = float(data['price']) + target_total_slots = int(data.get('total_slots', 1)) - # 获取切分份数 - # 兼容性处理:如果redis里还是旧key 'weight',也可以尝试获取 - div_count = float(data.get('div_count', data.get('weight', 1))) - - # ========================================================= - # 买入逻辑:资金切片法 - # ========================================================= if action == 'BUY': - # 1. 必须查最新的可用资金 (Available Cash) + current_holding_count = pos_manager.get_holding_count(strategy_name) + empty_slots = target_total_slots - current_holding_count + + if empty_slots <= 0: return + asset = xt_trader.query_stock_asset(acc) - if not asset: - print("错误:无法查询资产") - return + if not asset: return + + target_amount = asset.cash / empty_slots + if target_amount < 2000: return - current_cash = asset.cash - - # 2. 计算下单金额 - # 逻辑:Amount = Cash / div_count - if div_count <= 0: div_count = 1 # 防止除0 - - target_amount = current_cash / div_count - - # 3. 打印调试信息 (非常重要) - print(f"【资金分配】可用现金:{current_cash:.2f} / 切分份数:{div_count} = 下单金额:{target_amount:.2f}") - - # 4. 计算股数 if price <= 0: price = 1.0 - - # 过滤小额杂单 - if target_amount < 2000: - print(f"忽略:金额过小 ({target_amount:.2f})") - return - vol = int(target_amount / price / 100) * 100 - + if vol >= 100: - xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, - STRATEGY_BASE_NAME, 'PyBuy') - print(f"买入下单: {stock_code} {vol}股") - else: - print(f"计算股数不足100股") + order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy') + if order_id != -1: + print(f"[{strategy_name}] 买入 {stock_code} {vol}股 (1/{empty_slots})") + ORDER_CACHE[order_id] = (strategy_name, stock_code, 'BUY') + pos_manager.mark_holding(strategy_name, stock_code) - # ========================================================= - # 卖出逻辑 (清仓) - # ========================================================= elif action == 'SELL': - positions = xt_trader.query_stock_positions(acc) - target_pos = next((p for p in positions if p.stock_code == stock_code), None) + virtual_vol = pos_manager.get_position(strategy_name, stock_code) + if virtual_vol > 0: + real_positions = xt_trader.query_stock_positions(acc) + real_pos = next((p for p in real_positions if p.stock_code == stock_code), None) + real_can_use = real_pos.can_use_volume if real_pos else 0 + + final_vol = min(virtual_vol, real_can_use) + if final_vol > 0: + order_id = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final_vol, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') + if order_id != -1: + print(f"[{strategy_name}] 卖出 {stock_code} {final_vol}股") + ORDER_CACHE[order_id] = (strategy_name, stock_code, 'SELL') - if target_pos and target_pos.can_use_volume > 0: - xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, target_pos.can_use_volume, - xtconstant.FIX_PRICE, price, STRATEGY_BASE_NAME, 'PySell') - print(f"卖出下单: {stock_code} {target_pos.can_use_volume}股") - else: - print(f"无可用持仓: {stock_code}") - - except Exception as e: - print(f"处理异常: {e}") + except Exception: traceback.print_exc() - +# ================= 主程序入口 ================= if __name__ == '__main__': - r_client = init_redis() - session_id = int(time.time()) - xt_trader = XtQuantTrader(QMT_PATH, session_id) - acc = StockAccount(ACCOUNT_ID, ACCOUNT_TYPE) - callback = MyXtQuantTraderCallback() - xt_trader.register_callback(callback) - xt_trader.start() - xt_trader.connect() - xt_trader.subscribe(acc) + print("正在启动...") + + # 1. 加载配置 + CONFIG = load_config('config.json') + + # 从配置中提取参数 + redis_cfg = CONFIG['redis'] + qmt_cfg = CONFIG['qmt'] + watch_list = CONFIG['strategies'] + + print(f"Redis目标: {redis_cfg['host']}:{redis_cfg['port']}") + print(f"QMT路径: {qmt_cfg['path']}") + print(f"监听策略: {watch_list}") - print(f"=== 启动监听: {LISTEN_QUEUE} ===") - print("只处理当日的实盘/模拟信号,自动过滤回测数据及历史遗留数据。") + # 2. 连接 Redis + try: + r = redis.Redis( + host=redis_cfg['host'], + port=redis_cfg['port'], + password=redis_cfg['password'], + db=redis_cfg['db'], + decode_responses=True + ) + r.ping() + print("Redis 连接成功") + pos_manager = PositionManager(r) + except Exception as e: + print(f"[FATAL] Redis 连接失败: {e}") + sys.exit(1) - while True: - if r_client: - process_redis_signal(r_client, xt_trader, acc) - time.sleep(60) \ No newline at end of file + # 3. 连接 QMT + try: + session_id = int(time.time()) + xt_trader = XtQuantTrader(qmt_cfg['path'], session_id) + + acc = StockAccount(qmt_cfg['account_id'], qmt_cfg['account_type']) + + callback = MyXtQuantTraderCallback(pos_manager) + xt_trader.register_callback(callback) + + xt_trader.start() + connect_res = xt_trader.connect() + + if connect_res == 0: + print(f"QMT 连接成功: {qmt_cfg['account_id']}") + xt_trader.subscribe(acc) + else: + print(f"[FATAL] QMT 连接失败,错误码: {connect_res}") + sys.exit(1) + + except Exception as e: + print(f"[FATAL] QMT 初始化异常: {e}") + sys.exit(1) + + # 4. 初始化清算器 + settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) + + print("=== 系统就绪,开始监听 ===") + + try: + while True: + now = datetime.datetime.now() + current_time_str = now.strftime('%H%M') + + # 交易时段 + if '0900' <= current_time_str <= '1500': + if settler.has_settled: + settler.reset_flag() + + for strategy in watch_list: + process_strategy_queue(strategy, r, xt_trader, acc, pos_manager) + + # 收盘清算时段 + elif '1505' <= current_time_str <= '1510': + if not settler.has_settled: + settler.run_settlement() + + time.sleep(60) + + except KeyboardInterrupt: + print("用户终止程序") \ No newline at end of file