Files
NewStock/qmt/qmt_trader.py
liaozhaorun c30d74d251 更新qmt:
1、修复重连bug
2、修复日志名字bug
2025-12-05 00:28:50 +08:00

345 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding:utf-8
import time, datetime, traceback, sys, json, os
import logging
import redis
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant
# Date string (YYYY-MM-DD) of the log file currently in use; set by setup_logger()
# and compared against today's date in the main loop to detect a day rollover.
CURRENT_LOG_DATE = None
# ================= 1. 日志系统 (按日期直写) =================
def setup_logger():
    """Configure the "QMT_Trader" logger for today's date and return it.

    Steps:
      1. Make sure the ``logs`` directory exists.
      2. Record today's date in the module-level ``CURRENT_LOG_DATE``.
      3. Detach and close every handler currently attached to the logger.
      4. Attach a UTF-8, append-mode file handler for ``logs/YYYY-MM-DD.log``
         and a stdout handler, both using the same format.

    Returns:
        logging.Logger: the configured "QMT_Trader" logger.
    """
    global CURRENT_LOG_DATE

    log_dir = "logs"
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(log_dir, exist_ok=True)

    today_str = datetime.date.today().strftime('%Y-%m-%d')
    CURRENT_LOG_DATE = today_str
    # The file name is just the date, e.g. logs/2025-12-05.log
    log_file = os.path.join(log_dir, f"{today_str}.log")

    log = logging.getLogger("QMT_Trader")
    log.setLevel(logging.INFO)

    # Drop old handlers so a day rollover neither duplicates output nor keeps
    # writing to yesterday's file. Detach BEFORE closing: if close() fails the
    # handler must still be removed, otherwise it stays attached forever.
    for handler in log.handlers[:]:
        log.removeHandler(handler)
        try:
            handler.close()
        except Exception:
            # Best effort: a handler that cannot be closed is already detached.
            pass

    formatter = logging.Formatter(
        '[%(asctime)s] [%(levelname)s] %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Handler 1: plain file output (append mode)
    file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8')
    file_handler.setFormatter(formatter)

    # Handler 2: console
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(formatter)

    log.addHandler(file_handler)
    log.addHandler(stream_handler)
    return log
# Initial logger setup, performed when the module is loaded
logger = setup_logger()
# ================= Global state =================
# Parsed configuration; reassigned in the __main__ section from load_config().
CONFIG = {}
# Maps order id -> (strategy name, stock code, 'BUY' or 'SELL'). Written when an
# order is placed and read by the trade / order-error callbacks.
ORDER_CACHE = {}
# ================= 配置加载 =================
def load_config(config_file='config.json'):
    """Load the JSON configuration file and return the parsed object.

    The file is looked up next to the executable when running frozen
    (``sys.frozen`` is set), otherwise next to this source file. If it is not
    found there, ``config_file`` is tried as given (relative to the current
    working directory).

    Args:
        config_file: file name or path of the JSON configuration.

    Returns:
        The object produced by ``json.load``.

    Exits the process with status 1 (after logging an error) when the file
    cannot be found or cannot be read/parsed.
    """
    is_frozen = getattr(sys, 'frozen', False)
    anchor = sys.executable if is_frozen else os.path.abspath(__file__)
    full_path = os.path.join(os.path.dirname(anchor), config_file)

    if not os.path.exists(full_path):
        if not os.path.exists(config_file):
            logger.error(f"找不到配置文件: {full_path}")
            sys.exit(1)
        # Fall back to the path exactly as the caller supplied it.
        full_path = config_file

    try:
        with open(full_path, 'r', encoding='utf-8') as fh:
            parsed = json.load(fh)
    except Exception as e:
        logger.error(f"配置文件错误: {e}")
        sys.exit(1)
    return parsed
# ================= 业务逻辑类 =================
class PositionManager:
    """Per-strategy virtual position book stored in Redis hashes.

    Each strategy owns one hash named ``POS:<strategy_name>`` whose fields are
    stock codes and whose values are volumes. A value of 0 marks a slot that
    has been reserved by a submitted buy but has no recorded fill yet.
    """

    def __init__(self, r_client):
        # Redis client exposing the hash commands used below.
        self.r = r_client

    def _get_key(self, strategy_name):
        """Return the Redis hash key for a strategy."""
        return "POS:" + f"{strategy_name}"

    def mark_holding(self, strategy_name, code):
        """Reserve a slot for ``code`` with volume 0 unless the field already exists."""
        hash_key = self._get_key(strategy_name)
        self.r.hsetnx(hash_key, code, 0)

    def rollback_holding(self, strategy_name, code):
        """Release the slot for ``code`` only while its stored volume is still 0."""
        hash_key = self._get_key(strategy_name)
        stored = self.r.hget(hash_key, code)
        if stored is None or int(stored) != 0:
            return
        self.r.hdel(hash_key, code)

    def update_actual_volume(self, strategy_name, code, delta_vol):
        """Add ``delta_vol`` to the stored volume and return the new volume.

        When the result is zero or negative the field is deleted and 0 is
        returned.
        """
        hash_key = self._get_key(strategy_name)
        updated = self.r.hincrby(hash_key, code, int(delta_vol))
        if updated > 0:
            return updated
        self.r.hdel(hash_key, code)
        return 0

    def get_position(self, strategy_name, code):
        """Return the stored volume for ``code`` as an int, or 0 when absent/empty."""
        stored = self.r.hget(self._get_key(strategy_name), code)
        if not stored:
            return 0
        return int(stored)

    def get_holding_count(self, strategy_name):
        """Return the number of fields (occupied slots) in the strategy's hash."""
        hash_key = self._get_key(strategy_name)
        return self.r.hlen(hash_key)

    def get_all_virtual_positions(self, strategy_name):
        """Return the whole hash for the strategy as given by ``hgetall``."""
        hash_key = self._get_key(strategy_name)
        return self.r.hgetall(hash_key)

    def force_delete(self, strategy_name, code):
        """Unconditionally remove ``code`` from the strategy's hash."""
        hash_key = self._get_key(strategy_name)
        self.r.hdel(hash_key, code)
class DailySettlement:
    """End-of-day reconciliation of virtual (Redis) positions against the account.

    Attributes:
        trader: trading client used for order/position queries and cancels.
        acc: account object passed to every trader call.
        pos_mgr: position manager holding the per-strategy virtual positions.
        strategies: iterable of strategy names to reconcile.
        has_settled: True once run_settlement() has completed; cleared by
            reset_flag().
    """

    def __init__(self, xt_trader, acc, pos_mgr, strategies):
        self.trader = xt_trader
        self.acc = acc
        self.pos_mgr = pos_mgr
        self.strategies = strategies
        self.has_settled = False

    def run_settlement(self):
        """Cancel open orders, then drop virtual positions the account does not hold.

        Order cancellation is best effort: a failure is logged and the position
        check still runs. ``has_settled`` is set only when the whole method
        completes; an exception from the position query or from Redis
        propagates to the caller with the flag still False.
        """
        logger.info("="*40)
        logger.info("执行收盘清算...")

        try:
            orders = self.trader.query_stock_orders(self.acc, cancelable_only=True)
            if orders:
                for o in orders:
                    self.trader.cancel_order_stock(self.acc, o.order_id)
                # Pause after submitting the cancels, before positions are queried.
                time.sleep(2)
        except Exception:
            # Previously a bare `except: pass`: it hid every failure and also
            # swallowed KeyboardInterrupt raised during the sleep.
            logger.warning("收盘撤单失败,继续执行持仓核对", exc_info=True)

        # NOTE(review): an empty/None query result is treated as "no holdings",
        # which releases every virtual position below — confirm that a failed
        # query cannot be mistaken for an empty account.
        real_positions = self.trader.query_stock_positions(self.acc)
        real_pos_map = {
            p.stock_code: p.volume
            for p in (real_positions or [])
            if p.volume > 0
        }

        for strategy in self.strategies:
            virtual = self.pos_mgr.get_all_virtual_positions(strategy)
            for code in virtual:
                if code not in real_pos_map:
                    logger.warning(f" [修正] {strategy} 幽灵持仓 {code} -> 释放")
                    self.pos_mgr.force_delete(strategy, code)

        logger.info("清算完成")
        self.has_settled = True

    def reset_flag(self):
        """Clear the settled flag so settlement can run again."""
        self.has_settled = False
# ================= 回调类 =================
class MyXtQuantTraderCallback(XtQuantTraderCallback):
    """Trader callback that keeps virtual positions and the connection flag in sync.

    Attributes:
        pos_mgr: position manager updated on fills and order errors.
        is_connected: connection flag; cleared in on_disconnected() and set
            by the code that establishes the connection.
    """

    def __init__(self, pos_mgr):
        self.pos_mgr = pos_mgr
        self.is_connected = False

    def on_disconnected(self):
        """Record that the trading connection was lost."""
        logger.warning(">> 回调通知: 交易端连接断开")
        self.is_connected = False

    def on_stock_trade(self, trade):
        """Apply a fill to the virtual position of the strategy that placed the order.

        Fills whose order id is not in ORDER_CACHE are ignored. Exceptions are
        logged and never propagate to the caller.
        """
        try:
            cache_info = ORDER_CACHE.get(trade.order_id)
            if not cache_info:
                return
            strategy, _, action = cache_info
            logger.info(f">>> [成交] {strategy} {trade.stock_code} {trade.traded_volume}")
            if action == 'BUY':
                self.pos_mgr.update_actual_volume(strategy, trade.stock_code, trade.traded_volume)
            elif action == 'SELL':
                self.pos_mgr.update_actual_volume(strategy, trade.stock_code, -trade.traded_volume)
        except Exception:
            # Route through the logger (was traceback.print_exc()) so the
            # failure also lands in the daily log file.
            logger.error("成交回报处理异常", exc_info=True)

    def on_order_error(self, err):
        """Log a rejected order, release a reserved buy slot, and forget the order.

        Exceptions are logged and never propagate to the caller.
        """
        try:
            logger.error(f"下单失败: {err.error_msg}")
            cache = ORDER_CACHE.get(err.order_id)
            if cache and cache[2] == 'BUY':
                self.pos_mgr.rollback_holding(cache[0], cache[1])
            # pop() with a default tolerates an order id that was never cached;
            # `del` raised KeyError there and the bare except hid it.
            ORDER_CACHE.pop(err.order_id, None)
        except Exception:
            logger.error("下单失败回调处理异常", exc_info=True)
# ================= 消息处理 =================
def _submit_buy(strategy_name, xt_trader, acc, pos_manager, stock_code, price, total_slots):
    """Size and place a fixed-price buy using one free slot's share of the cash."""
    holding = pos_manager.get_holding_count(strategy_name)
    free_slots = total_slots - holding
    if free_slots <= 0:
        return
    asset = xt_trader.query_stock_asset(acc)
    if not asset:
        return
    budget = asset.cash / free_slots
    if budget < 2000:
        return
    # A non-positive price is replaced by 1.0 for both sizing and the order.
    if price <= 0:
        price = 1.0
    # Round down to whole lots of 100 shares.
    vol = int(budget / price / 100) * 100
    if vol < 100:
        return
    oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy')
    if oid != -1:
        logger.info(f"[{strategy_name}] 买入 {stock_code} {vol}")
        ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY')
        pos_manager.mark_holding(strategy_name, stock_code)


def _submit_sell(strategy_name, xt_trader, acc, pos_manager, stock_code, price):
    """Place a fixed-price sell for min(virtual volume, usable account volume)."""
    v_vol = pos_manager.get_position(strategy_name, stock_code)
    if v_vol <= 0:
        return
    # The position query may yield None; treat that as "nothing sellable"
    # instead of raising TypeError while iterating.
    real_pos = xt_trader.query_stock_positions(acc) or []
    rp = next((p for p in real_pos if p.stock_code == stock_code), None)
    can_use = rp.can_use_volume if rp else 0
    final = min(v_vol, can_use)
    if final <= 0:
        return
    oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final, xtconstant.FIX_PRICE, price, strategy_name, 'PySell')
    if oid != -1:
        logger.info(f"[{strategy_name}] 卖出 {stock_code} {final}")
        ORDER_CACHE[oid] = (strategy_name, stock_code, 'SELL')


def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager):
    """Pop at most one signal from the strategy's queue and act on it.

    The raw message is taken from the Redis list ``<strategy_name>_real`` and
    appended to ``<strategy_name>_real:history`` before it is parsed. Messages
    flagged ``is_backtest`` or whose ``timestamp`` date is not today are
    dropped. ``action`` 'BUY' / 'SELL' is dispatched to the order helpers;
    any other action is ignored.

    Args:
        strategy_name: strategy whose queue is polled.
        r_client: Redis client providing ``lpop`` / ``rpush``.
        xt_trader: trading client used for queries and order placement.
        acc: account object passed to every trader call.
        pos_manager: virtual position book for the strategies.

    Returns:
        None. Exceptions raised while handling a message are logged, not raised.
    """
    queue_key = f"{strategy_name}_real"
    msg_json = r_client.lpop(queue_key)
    if not msg_json:
        return
    try:
        r_client.rpush(f"{queue_key}:history", msg_json)
        data = json.loads(msg_json)
        if data.get('is_backtest'):
            return
        # Simple date check: only act on signals stamped with today's date.
        if data.get('timestamp', '').split(' ')[0] != datetime.date.today().strftime('%Y-%m-%d'):
            return
        stock_code = data['stock_code']
        action = data['action']
        price = float(data['price'])
        total_slots = int(data.get('total_slots', 1))
        if action == 'BUY':
            _submit_buy(strategy_name, xt_trader, acc, pos_manager, stock_code, price, total_slots)
        elif action == 'SELL':
            _submit_sell(strategy_name, xt_trader, acc, pos_manager, stock_code, price)
    except Exception:
        # Narrowed from a bare except so KeyboardInterrupt/SystemExit still
        # reach the main loop.
        logger.error("消息处理异常", exc_info=True)
# ================= QMT对象初始化 =================
def init_qmt_trader(qmt_path, account_id, account_type, pos_manager):
    """Create, start and connect a trader session for one account.

    A new session id is derived from the current Unix time on every call.

    Args:
        qmt_path: path passed to ``XtQuantTrader``.
        account_id: account id passed to ``StockAccount``.
        account_type: account type passed to ``StockAccount``.
        pos_manager: position manager handed to the callback object.

    Returns:
        ``(trader, acc, callback)`` on success, with
        ``callback.is_connected`` set to True; ``(None, None, None)`` when
        the connection fails or any step raises (the failure is logged).
    """
    trader = None
    started = False
    try:
        session_id = int(time.time())
        trader = XtQuantTrader(qmt_path, session_id)
        acc = StockAccount(account_id, account_type)
        callback = MyXtQuantTraderCallback(pos_manager)
        trader.register_callback(callback)
        trader.start()
        started = True
        res = trader.connect()
        if res == 0:
            logger.info(f"QMT 连接成功 [Session:{session_id}]")
            trader.subscribe(acc)
            callback.is_connected = True
            return trader, acc, callback
        logger.error(f"QMT 连接失败 Code:{res}")
    except Exception as e:
        logger.error(f"初始化异常: {e}")

    # Failure path: the caller only receives None, so a trader that was
    # already started would never be stopped. Tear it down here (best effort).
    if started:
        try:
            trader.stop()
        except Exception:
            logger.warning("停止未连接的交易对象失败", exc_info=True)
    return None, None, None
# ================= 主程序 =================
if __name__ == '__main__':
    # Script entry point: load config, connect Redis and the trader, then poll
    # forever until interrupted by the user.
    logger.info("系统启动...")
    CONFIG = load_config('config.json')
    redis_cfg = CONFIG['redis']
    qmt_cfg = CONFIG['qmt']
    watch_list = CONFIG['strategies']

    try:
        r = redis.Redis(**redis_cfg, decode_responses=True)
        r.ping()
        pos_manager = PositionManager(r)
    except Exception as e:
        logger.critical(f"Redis 连接失败: {e}")
        sys.exit(1)

    xt_trader, acc, callback = init_qmt_trader(
        qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager
    )
    settler = None
    if xt_trader:
        settler = DailySettlement(xt_trader, acc, pos_manager, watch_list)

    logger.info("进入主循环...")
    while True:
        try:
            # === Log rollover check ===
            # When the date has changed, re-run setup_logger() so output moves
            # to a file named after the new date.
            today_str = datetime.date.today().strftime('%Y-%m-%d')
            if today_str != CURRENT_LOG_DATE:
                logger.info(f"检测到跨天 ({CURRENT_LOG_DATE} -> {today_str}),切换日志文件...")
                logger = setup_logger()
                logger.info(f"日志切换完成,当前写入: logs/{today_str}.log")

            # === Reconnect ===
            need_reconnect = (xt_trader is None) or (callback is None) or (not callback.is_connected)
            if need_reconnect:
                logger.warning("连接丢失,执行硬重连...")
                if xt_trader:
                    try:
                        xt_trader.stop()
                    except Exception:
                        # Narrowed from a bare `except: pass` so Ctrl-C is not
                        # swallowed here and teardown failures are visible.
                        logger.warning("停止旧交易连接失败", exc_info=True)
                xt_trader, acc, callback = init_qmt_trader(
                    qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager
                )
                if xt_trader:
                    settler = DailySettlement(xt_trader, acc, pos_manager, watch_list)
                    logger.info("重连成功")
                else:
                    logger.error("重连失败60秒后重试")
                    time.sleep(60)
                    continue

            # === Business polling ===
            now = datetime.datetime.now()
            current_time_str = now.strftime('%H%M')
            sleep_sec = 60
            if '0900' <= current_time_str <= '1500':
                # Poll every 10 s between 09:20 and 10:00, otherwise every 60 s.
                sleep_sec = 10 if '0920' <= current_time_str <= '1000' else 60
                # Re-arm settlement once the polling window is active again.
                if settler and settler.has_settled:
                    settler.reset_flag()
                for s in watch_list:
                    process_strategy_queue(s, r, xt_trader, acc, pos_manager)
            elif '1505' <= current_time_str <= '1510':
                # Settlement window: run once per arming of the flag.
                if settler and not settler.has_settled:
                    settler.run_settlement()
            time.sleep(sleep_sec)
        except KeyboardInterrupt:
            logger.info("用户停止")
            break
        except Exception:
            logger.critical("主循环未捕获异常", exc_info=True)
            time.sleep(10)