import logging import sys import os import datetime from xtquant import xttrader from xtquant.xtdata import download_history_data, get_market_data from xtquant.xttype import StockAccount import random def setup_logger(): """配置日志系统""" log_dir = "logs" if not os.path.exists(log_dir): os.makedirs(log_dir) log_file = os.path.join(log_dir, f"{datetime.date.today().strftime('%Y-%m-%d')}_test.log") logger = logging.getLogger("QMT_Test") logger.setLevel(logging.INFO) formatter = logging.Formatter( '[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') file_handler.setFormatter(formatter) stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(stream_handler) return logger logger = setup_logger() # 设置 QMT 交易端的数据路径和会话ID min_path = r"C:\\QMT\\中金财富QMT个人版交易端\\userdata_mini" session_id = int(random.randint(100000, 999999)) logger.info(f"QMT路径: {min_path}") logger.info(f"会话ID: {session_id}") # 创建 XtQuantTrader 实例并启动 logger.info("正在创建 XtQuantTrader 实例...") xt_trader = xttrader.XtQuantTrader(min_path, session_id) logger.info("正在启动 XtQuantTrader...") xt_trader.start() logger.info("XtQuantTrader 已启动") # 连接 QMT 交易端 logger.info("正在连接 QMT 交易端...") connect_result = xt_trader.connect() if connect_result == 0: logger.info("✅ 连接成功") else: logger.error(f"❌ 连接失败,错误码: {connect_result}") xt_trader.stop() logger.info("XtQuantTrader 已停止") exit() # 设置账户信息 account_id = '8176081580' logger.info(f"账户ID: {account_id}") account = StockAccount(account_id) # 订阅账户 logger.info("正在订阅账户...") res = xt_trader.subscribe(account) if res == 0: logger.info("✅ 订阅成功") else: logger.error(f"❌ 订阅失败,错误码: {res}")