Files
NewStock/qmt/qmt_engine.py

394 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding:utf-8
import time
import datetime
import traceback
import sys
import json
import os
import threading
import logging
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import redis
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant
# ================= 0. Windows 补丁 =================
# Best-effort Windows console tweak, executed once at import time.
# GetStdHandle(-10) is the standard *input* handle and 128 (0x0080) is
# ENABLE_EXTENDED_FLAGS; writing only that flag clears the other input-mode
# bits -- presumably done to switch off QuickEdit so that a mouse click in
# the console window cannot pause the process (TODO confirm intent).
try:
    import ctypes

    kernel32 = ctypes.windll.kernel32
    kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), 128)
except Exception:
    # Not on Windows (no ``ctypes.windll``) or no console attached: the tweak
    # is optional, so carry on. ``Exception`` rather than a bare ``except``
    # so Ctrl+C / SystemExit during start-up are not swallowed.
    pass
@dataclass
class TerminalStatus:
    """Snapshot of one terminal instance's status."""

    # Identifier of the QMT terminal.
    qmt_id: str
    # Display name of the terminal.
    alias: str
    # Trading account bound to the terminal.
    account_id: str
    # Connection flag as reported by the terminal's callback object.
    is_connected: bool
    # Last heartbeat text recorded for the terminal.
    last_heartbeat: str
# ================= 1. 业务逻辑辅助类 =================
class PositionManager:
    """Virtual per-strategy position book stored in Redis hashes.

    Each strategy owns one hash named ``POS:<strategy_name>`` whose fields are
    stock codes and whose values are share counts. A value of ``0`` is a
    placeholder written when a buy order has been submitted but no fill has
    been recorded yet. (Original note: intended to be a global singleton.)
    """

    def __init__(self, r_client):
        """Keep a reference to the Redis client used for every operation."""
        self.r = r_client

    def _get_key(self, strategy_name):
        """Return the name of the Redis hash that belongs to the strategy."""
        return f"POS:{strategy_name}"

    def mark_holding(self, strategy_name, code):
        """Write a ``0`` placeholder for ``code`` unless the field already exists."""
        hash_key = self._get_key(strategy_name)
        self.r.hsetnx(hash_key, code, 0)

    def rollback_holding(self, strategy_name, code):
        """Drop the placeholder for ``code``; entries with a non-zero volume stay."""
        hash_key = self._get_key(strategy_name)
        stored = self.r.hget(hash_key, code)
        if stored is None:
            return
        if int(stored) != 0:
            return
        self.r.hdel(hash_key, code)

    def update_actual_volume(self, strategy_name, code, delta_vol):
        """Add ``delta_vol`` shares to ``code`` and return the resulting volume.

        When the result is zero or negative the field is removed and ``0`` is
        returned.
        """
        hash_key = self._get_key(strategy_name)
        total = self.r.hincrby(hash_key, code, int(delta_vol))
        if total > 0:
            return total
        self.r.hdel(hash_key, code)
        return 0

    def get_position(self, strategy_name, code):
        """Return the stored volume for ``code`` as an int (``0`` when absent)."""
        stored = self.r.hget(self._get_key(strategy_name), code)
        if not stored:
            return 0
        return int(stored)

    def get_holding_count(self, strategy_name):
        """Return how many codes the strategy's hash currently contains."""
        hash_key = self._get_key(strategy_name)
        return self.r.hlen(hash_key)

    def get_all_virtual_positions(self, strategy_name):
        """Return the strategy's whole hash as returned by ``HGETALL``."""
        hash_key = self._get_key(strategy_name)
        return self.r.hgetall(hash_key)

    def force_delete(self, strategy_name, code):
        """Remove ``code`` from the strategy's hash regardless of its volume."""
        hash_key = self._get_key(strategy_name)
        self.r.hdel(hash_key, code)
class DailySettlement:
    """End-of-day reconciliation for a single terminal."""

    def __init__(self, unit):
        """Bind to the owning trading unit; nothing has been settled yet."""
        self.unit = unit
        self.has_settled = False

    def run_settlement(self):
        """Reconcile the virtual positions of this terminal with the account.

        For every strategy routed to this terminal:
        * a virtual entry whose code is not held in the account is deleted;
        * a virtual entry still at the ``0`` placeholder whose code is held
          is raised by the account's volume for that code.

        Does nothing (and leaves ``has_settled`` untouched) when the unit has
        no trader object.
        """
        trader = self.unit.xt_trader
        account = self.unit.acc_obj
        if not trader:
            return

        # Map of code -> volume for every position the account really holds.
        held = {}
        broker_positions = trader.query_stock_positions(account)
        if broker_positions:
            for pos in broker_positions:
                if pos.volume > 0:
                    held[pos.stock_code] = pos.volume

        manager = MultiEngineManager()
        for strategy in manager.get_strategies_by_terminal(self.unit.qmt_id):
            snapshot = manager.pos_manager.get_all_virtual_positions(strategy)
            for code, raw_volume in snapshot.items():
                if code not in held:
                    manager.pos_manager.force_delete(strategy, code)
                    continue
                if int(raw_volume) == 0:
                    manager.pos_manager.update_actual_volume(strategy, code, held[code])

        self.has_settled = True

    def reset_flag(self):
        """Clear the settled flag so that settlement can run again."""
        self.has_settled = False
# ================= 2. 执行单元 (TradingUnit) =================
class UnitCallback(XtQuantTraderCallback):
    """Trader callback bound to one ``TradingUnit``.

    Tracks the logical connection flag of the unit and mirrors fills and
    order errors into the Redis virtual position book.
    NOTE(review): these hooks are presumably invoked by xtquant from its own
    callback thread, so no exception is allowed to escape them -- confirm.
    """

    def __init__(self, unit):
        """Remember the owning unit; the flag is raised by ``TradingUnit.connect``."""
        self.unit = unit
        self.is_connected = False

    def on_disconnected(self):
        """Log the disconnect and lower the logical connection flag."""
        logging.getLogger("QMT_Engine").warning(f"终端 {self.unit.alias}({self.unit.qmt_id}) 物理连接断开")
        self.is_connected = False

    def on_stock_trade(self, trade):
        """Apply a fill to the virtual position of the strategy that sent the order.

        Fills whose ``order_id`` is not in the unit's order cache are ignored.
        """
        try:
            cache_info = self.unit.order_cache.get(trade.order_id)
            if not cache_info:
                return
            s_name, _, action = cache_info
            manager = MultiEngineManager()
            if action == 'BUY':
                manager.pos_manager.update_actual_volume(s_name, trade.stock_code, trade.traded_volume)
            elif action == 'SELL':
                manager.pos_manager.update_actual_volume(s_name, trade.stock_code, -trade.traded_volume)
        except Exception:
            # ``Exception`` instead of a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            logging.getLogger("QMT_Engine").error(traceback.format_exc())

    def on_order_error(self, err):
        """Undo the buy placeholder of a failed order and forget the order."""
        try:
            cache = self.unit.order_cache.get(err.order_id)
            if cache and cache[2] == 'BUY':
                MultiEngineManager().pos_manager.rollback_holding(cache[0], cache[1])
        except Exception:
            # A Redis failure must not escape into the caller of this hook.
            logging.getLogger("QMT_Engine").error(traceback.format_exc())
        finally:
            # Always drop the cache entry, even when the rollback failed.
            self.unit.order_cache.pop(err.order_id, None)
class TradingUnit:
    """Execution unit for one terminal instance; manages a single QMT process."""

    def __init__(self, t_cfg):
        """Read the terminal settings; no connection is opened here.

        Args:
            t_cfg: mapping with the required keys ``qmt_id``, ``path``,
                ``account_id`` and ``account_type`` and the optional key
                ``alias`` (defaults to ``qmt_id``).

        Raises:
            KeyError: if a required key is missing.
        """
        self.qmt_id = t_cfg['qmt_id']
        self.alias = t_cfg.get('alias', self.qmt_id)
        self.path = t_cfg['path']
        self.account_id = t_cfg['account_id']
        self.account_type = t_cfg['account_type']
        self.xt_trader = None
        self.acc_obj = None
        self.callback = None
        self.settler = None
        # order_id -> (strategy_name, stock_code, 'BUY' | 'SELL')
        self.order_cache = {}
        self.last_heartbeat = "N/A"

    def cleanup(self):
        """Tear down the current trader session, if any (best effort).

        The trader and callback references are always cleared, even when
        ``stop()`` raises, so a later ``connect()`` never starts on top of a
        half-dead session. A failure of ``stop()`` is logged, not raised.
        """
        trader = self.xt_trader
        if not trader:
            return
        log = logging.getLogger("QMT_Engine")
        stopped = False
        try:
            log.info(f"正在销毁终端 {self.alias} 的旧资源...")
            trader.stop()
            stopped = True
        except Exception:
            log.error(traceback.format_exc())
        finally:
            # Drop the references explicitly.
            self.xt_trader = None
            self.callback = None
        if stopped:
            # Per the original author: give the C++ engine time to release
            # its down_queue lock before a new session is created.
            time.sleep(1.5)

    def connect(self):
        """Connect to the QMT terminal.

        Any previous session is cleaned up first.

        Returns:
            True when ``connect()`` on the trader returned 0 and the account
            was subscribed; False on a non-zero result or on any exception.
        """
        self.cleanup()  # always start from a clean state
        try:
            # Dynamic session id to avoid clashing with another session.
            session_id = int(time.time()) + hash(self.qmt_id) % 1000
            self.xt_trader = XtQuantTrader(self.path, session_id)
            self.acc_obj = StockAccount(self.account_id, self.account_type)
            self.callback = UnitCallback(self)
            self.xt_trader.register_callback(self.callback)
            self.xt_trader.start()
            res = self.xt_trader.connect()
            if res == 0:
                self.xt_trader.subscribe(self.acc_obj)
                self.callback.is_connected = True
                self.settler = DailySettlement(self)
                logging.getLogger("QMT_Engine").info(f"终端 {self.alias} 连接成功 (SID: {session_id})")
                return True
            return False
        except Exception as e:
            logging.getLogger("QMT_Engine").error(f"终端 {self.alias} 连接异常: {repr(e)}")
            return False
# ================= 3. 总控中心 (MultiEngineManager) =================
class MultiEngineManager:
    """Process-wide controller (singleton) for all QMT terminals.

    Loads the configuration, owns the Redis client and the virtual position
    book, keeps one ``TradingUnit`` per configured terminal and runs the main
    loop that health-checks terminals, routes strategy signals to orders and
    triggers the end-of-day settlement.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialise the shared state once; later constructions are no-ops."""
        if hasattr(self, '_initialized'):
            return
        self.units: Dict[str, TradingUnit] = {}
        self.config = {}
        # Set by initialize(); declared here so the attributes always exist.
        self.r = None
        self.pos_manager = None
        # Bound here (not only in run_trading_loop) so that process_route and
        # the _execute_* helpers can log when called outside the main loop.
        self.logger = logging.getLogger("QMT_Engine")
        self.is_running = True
        self.start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self._initialized = True

    def initialize(self, config_file='config.json'):
        """Load the JSON config, connect to Redis and connect every terminal.

        Args:
            config_file: path of a JSON file providing ``redis`` (keyword
                arguments for ``redis.Redis``), ``qmt_terminals`` (list of
                terminal configs) and ``strategies``.
        """
        self._setup_logger()
        with open(config_file, 'r', encoding='utf-8') as f:
            self.config = json.load(f)
        # Copy before forcing decode_responses so that a config that already
        # contains the key does not cause a duplicate-keyword TypeError.
        redis_kwargs = dict(self.config['redis'])
        redis_kwargs['decode_responses'] = True
        self.r = redis.Redis(**redis_kwargs)
        self.pos_manager = PositionManager(self.r)
        for t_cfg in self.config.get('qmt_terminals', []):
            unit = TradingUnit(t_cfg)
            unit.connect()
            self.units[unit.qmt_id] = unit

    def _setup_logger(self):
        """Attach the file and stdout handlers to the engine logger (once)."""
        logger = logging.getLogger("QMT_Engine")
        logger.setLevel(logging.INFO)
        if getattr(self, '_logger_ready', False):
            # Handlers are already attached; adding them again would
            # duplicate every log line.
            return
        log_dir = "logs"
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, f"{datetime.date.today().strftime('%Y-%m-%d')}.log")
        fmt = logging.Formatter('[%(asctime)s] [%(threadName)s] %(message)s', '%H:%M:%S')
        # The log file is written as UTF-8.
        fh = logging.FileHandler(log_file, mode='a', encoding='utf-8')
        fh.setFormatter(fmt)
        sh = logging.StreamHandler(sys.stdout)
        sh.setFormatter(fmt)
        logger.addHandler(fh)
        logger.addHandler(sh)
        self._logger_ready = True

    def get_strategies_by_terminal(self, qmt_id):
        """Return the names of the strategies whose config targets ``qmt_id``."""
        strategies = self.config.get('strategies', {})
        return [s for s, cfg in strategies.items() if cfg.get('qmt_id') == qmt_id]

    def run_trading_loop(self):
        """Run the main loop until ``is_running`` becomes False.

        Each pass: health-check the terminals (at most every 25 s), then
        either route signals (trading hours), run the settlement
        (15:05-15:15) or reset the settlement flags (15:30-16:00).
        """
        self.logger = logging.getLogger("QMT_Engine")
        self.logger.info(">>> 多终端交易主循环线程已启动 <<<")
        last_check = 0
        while self.is_running:
            try:
                now_t = time.time()
                curr_hms = datetime.datetime.now().strftime('%H%M%S')
                # --- health check and automatic repair ---
                if now_t - last_check > 25:
                    last_check = now_t
                    self._check_terminals(curr_hms)
                # --- signal routing ---
                is_trading = ('091500' <= curr_hms <= '113030') or ('130000' <= curr_hms <= '150030')
                if is_trading:
                    for s_name in self.config['strategies']:
                        self.process_route(s_name)
                # --- settlement after the close, then flag reset ---
                elif '150500' <= curr_hms <= '151500':
                    for unit in self.units.values():
                        if unit.settler and not unit.settler.has_settled:
                            unit.settler.run_settlement()
                elif '153000' <= curr_hms <= '160000':
                    for unit in self.units.values():
                        if unit.settler:
                            unit.settler.reset_flag()
                time.sleep(1 if is_trading else 5)
            except Exception:
                # Keep the loop alive on errors; KeyboardInterrupt and
                # SystemExit are deliberately not caught.
                self.logger.error("主循环异常:")
                self.logger.error(traceback.format_exc())
                time.sleep(10)

    def _check_terminals(self, curr_hms):
        """Probe every terminal and reconnect the ones that do not answer."""
        for unit in self.units.values():
            if self._probe_unit(unit):
                continue
            # Per the original author: stay out of QMT's nightly restart
            # window (21:32 - 21:50) instead of reconnecting.
            if '213200' <= curr_hms <= '215000':
                self.logger.info(f"⏳ 处于 QMT 重启时段 ({curr_hms}),跳过重连操作...")
            else:
                self.logger.warning(f"🚫 终端 {unit.alias} 物理连接丢失,执行重连...")
                unit.connect()

    def _probe_unit(self, unit):
        """Return True when an asset query on the unit yields a result.

        On success the heartbeat is refreshed and a stale ``is_connected``
        flag on the callback is set back to True.
        """
        if not (unit.xt_trader and unit.acc_obj):
            return False
        try:
            asset = unit.xt_trader.query_stock_asset(unit.acc_obj)
        except Exception:
            return False
        if not asset:
            return False
        unit.last_heartbeat = datetime.datetime.now().strftime('%H:%M:%S')
        if unit.callback and not unit.callback.is_connected:
            unit.callback.is_connected = True
            self.logger.info(f"✅ 修正终端 {unit.alias} 状态为在线")
        return True

    def process_route(self, strategy_name):
        """Pop one signal from ``<strategy_name>_real`` and execute it.

        Nothing is popped when the strategy is unknown or its terminal is
        missing or not connected. Signals whose ``timestamp`` date is not
        today are dropped. Failures are logged instead of silently discarded.
        """
        strat_cfg = self.config['strategies'].get(strategy_name)
        if not strat_cfg:
            return
        unit = self.units.get(strat_cfg.get('qmt_id'))
        if not unit or not unit.callback or not unit.callback.is_connected:
            return
        msg_json = self.r.lpop(f"{strategy_name}_real")
        if not msg_json:
            return
        try:
            data = json.loads(msg_json)
            # Strict check of the message date.
            today = datetime.date.today().strftime('%Y-%m-%d')
            if data.get('timestamp', '').split(' ')[0] != today:
                return
            if data['action'] == 'BUY':
                self._execute_buy(unit, strategy_name, data)
            elif data['action'] == 'SELL':
                self._execute_sell(unit, strategy_name, data)
        except Exception:
            # The message has already been popped, so leave a trace of why
            # it was not executed.
            self.logger.error(traceback.format_exc())

    def _execute_buy(self, unit, strategy_name, data):
        """Size and submit a fixed-price buy order for one signal.

        The budget is the terminal's total equity divided by the sum of the
        slots of all strategies on that terminal, capped at 98 % of cash.
        On a successful submission the order is cached and a ``0``
        placeholder is written to the virtual position book.
        """
        try:
            strat_cfg = self.config['strategies'][strategy_name]
            # 1. The slot count in the signal must match the configuration.
            if data['total_slots'] != strat_cfg['total_slots']:
                self.logger.error(f"[{strategy_name}] 信号槽位({data['total_slots']})与配置({strat_cfg['total_slots']})不符")
                return
            # 2. All slots of the strategy are already taken.
            if self.pos_manager.get_holding_count(strategy_name) >= strat_cfg['total_slots']:
                return
            asset = unit.xt_trader.query_stock_asset(unit.acc_obj)
            # Sum of the slots of every strategy routed to this terminal.
            terminal_strategies = self.get_strategies_by_terminal(unit.qmt_id)
            total_slots = sum(self.config['strategies'][s]['total_slots'] for s in terminal_strategies)
            if not asset or total_slots <= 0:
                return
            # 3. Equal-weight budget based on the terminal's total equity.
            total_equity = asset.cash + asset.market_value
            target_amt = total_equity / total_slots
            actual_amt = min(target_amt, asset.cash * 0.98)  # headroom for fees / slippage
            if actual_amt < 2000:
                self.logger.warning(f"[{strategy_name}] 单笔预算 {actual_amt:.2f} 不足 2000 元,取消买入")
                return
            # 4. Price and share count (whole lots of 100).
            offset = strat_cfg.get('execution', {}).get('buy_price_offset', 0.0)
            price = round(float(data['price']) + offset, 3)
            if price <= 0:
                # A non-positive price cannot size an order; reject it
                # instead of submitting a meaningless volume.
                self.logger.error(f"[{strategy_name}] 委托价格 {price} 无效,取消买入")
                return
            vol = int(actual_amt / price / 100) * 100
            if vol < 100:
                return
            oid = unit.xt_trader.order_stock(unit.acc_obj, data['stock_code'], xtconstant.STOCK_BUY,
                                             vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy')
            if oid != -1:
                unit.order_cache[oid] = (strategy_name, data['stock_code'], 'BUY')
                self.pos_manager.mark_holding(strategy_name, data['stock_code'])
                self.logger.info(f"√√√ [{unit.alias}] {strategy_name} 下单买入: {data['stock_code']} {vol}股 @ {price}")
        except Exception:
            self.logger.error(traceback.format_exc())

    def _execute_sell(self, unit, strategy_name, data):
        """Submit a fixed-price sell order for one signal.

        The volume is the smaller of the strategy's virtual position and the
        account's usable volume for the code; nothing is sent when that is
        not positive.
        """
        try:
            v_vol = self.pos_manager.get_position(strategy_name, data['stock_code'])
            if v_vol <= 0:
                return
            real_pos = unit.xt_trader.query_stock_positions(unit.acc_obj)
            rp = next((p for p in real_pos if p.stock_code == data['stock_code']), None) if real_pos else None
            can_use = rp.can_use_volume if rp else 0
            # Smaller of the virtual position and the usable real position.
            final_vol = min(v_vol, can_use)
            if final_vol <= 0:
                self.logger.warning(f"[{strategy_name}] 卖出拦截: {data['stock_code']} 实盘无可用持仓")
                return
            offset = self.config['strategies'][strategy_name].get('execution', {}).get('sell_price_offset', 0.0)
            price = round(float(data['price']) + offset, 3)
            oid = unit.xt_trader.order_stock(unit.acc_obj, data['stock_code'], xtconstant.STOCK_SELL,
                                             final_vol, xtconstant.FIX_PRICE, price, strategy_name, 'PySell')
            if oid != -1:
                unit.order_cache[oid] = (strategy_name, data['stock_code'], 'SELL')
                self.logger.info(f"√√√ [{unit.alias}] {strategy_name} 下单卖出: {data['stock_code']} {final_vol}股 @ {price}")
        except Exception:
            self.logger.error(traceback.format_exc())

    def get_all_status(self) -> "List[TerminalStatus]":
        """Return one ``TerminalStatus`` snapshot per managed terminal."""
        return [
            TerminalStatus(
                u.qmt_id,
                u.alias,
                u.account_id,
                (u.callback.is_connected if u.callback else False),
                u.last_heartbeat,
            )
            for u in self.units.values()
        ]

    def stop(self):
        """Ask the main loop to exit and release every terminal."""
        self.is_running = False
        for u in self.units.values():
            u.cleanup()