From 555e7ebb439dc0375cd13cdf3ef1abc996c89bc9 Mon Sep 17 00:00:00 2001 From: liaozhaorun <1300336796@qq.com> Date: Wed, 25 Feb 2026 21:48:10 +0800 Subject: [PATCH] =?UTF-8?q?feat(qmt):=20=E6=96=B0=E5=A2=9E=20Pydantic=20?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E6=A8=A1=E5=9E=8B=E5=B9=B6=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E5=BC=95=E6=93=8E=E6=9E=B6=E6=9E=84=20-=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=20config=5Fmodels.py:=20=E4=BD=BF=E7=94=A8=20Pydantic=20?= =?UTF-8?q?=E6=8F=90=E4=BE=9B=E5=BC=BA=E7=B1=BB=E5=9E=8B=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E6=A0=A1=E9=AA=8C=20=20=20-=20QMTConfig,=20QMTTerminalConfig,?= =?UTF-8?q?=20StrategyConfig=20=E7=AD=89=E6=95=B0=E6=8D=AE=E6=A8=A1?= =?UTF-8?q?=E5=9E=8B=20=20=20-=20=E6=94=AF=E6=8C=81=20slots/percentage=20?= =?UTF-8?q?=E4=B8=A4=E7=A7=8D=E4=B8=8B=E5=8D=95=E6=A8=A1=E5=BC=8F=20=20=20?= =?UTF-8?q?-=20=E5=85=BC=E5=AE=B9=E6=97=A7=E7=89=88=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E8=BF=81=E7=A7=BB=20-=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=20validate=5Fconfig.py:=20=E9=85=8D=E7=BD=AE=E6=A3=80=E6=B5=8B?= =?UTF-8?q?=20CLI=20=E5=B7=A5=E5=85=B7=20-=20=E9=87=8D=E6=9E=84=20TradingU?= =?UTF-8?q?nit=20=E5=92=8C=20MultiEngineManager=20=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E6=96=B0=E9=85=8D=E7=BD=AE=E6=A8=A1=E5=9E=8B=20-=20=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E7=99=BE=E5=88=86=E6=AF=94=E6=A8=A1=E5=BC=8F=E4=B9=B0?= =?UTF-8?q?=E5=8D=96=E9=80=BB=E8=BE=91=20(=5Fexecute=5Fpercentage=5Fbuy/se?= =?UTF-8?q?ll)=20-=20=E5=AE=8C=E5=96=84=E6=97=A5=E5=BF=97=E8=AE=B0?= =?UTF-8?q?=E5=BD=95=E5=92=8C=E9=94=99=E8=AF=AF=E5=A4=84=E7=90=86=20-=20?= =?UTF-8?q?=E5=88=A0=E9=99=A4=20TODO=5FFIX.md:=20=E6=B8=85=E7=90=86?= =?UTF-8?q?=E5=B7=B2=E5=AE=8C=E6=88=90=E7=9A=84=E7=BC=BA=E9=99=B7=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E4=BB=BB=E5=8A=A1=E6=B8=85=E5=8D=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- qmt/__init__.py | 36 ++ qmt/main.py | 95 ++++++ qmt/qmt_engine.py | 837 +++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 955 insertions(+), 13 deletions(-) diff --git a/qmt/__init__.py b/qmt/__init__.py index 14b4093..2b62f8b 100644 --- a/qmt/__init__.py +++ b/qmt/__init__.py @@ -2,6 +2,42 @@ """ QMT (Quantitative Trading) Module +提供量化交易相关的功能: +- Redis Stream 消息处理 +- 交易信号发送 +- 回测消息消费 +- 细粒度日志记录 +- 强类型配置校验 +""" + +from .message_processor import StreamMessageProcessor, send_qmt_signal_to_stream +from .logger import QMTLogger, get_qmt_logger +from .config_models import ( + QMTConfig, + QMTTerminalConfig, + StrategyConfig, + RedisConfig, + AutoReconnectConfig, + load_config, + ConfigError, +) + +__all__ = [ + "StreamMessageProcessor", + "send_qmt_signal_to_stream", + "QMTLogger", + "get_qmt_logger", + "QMTConfig", + "QMTTerminalConfig", + "StrategyConfig", + "RedisConfig", + "AutoReconnectConfig", + "load_config", + "ConfigError", +] +""" +QMT (Quantitative Trading) Module + 提供量化交易相关的功能: - Redis Stream 消息处理 - 交易信号发送 diff --git a/qmt/main.py b/qmt/main.py index b5e1c20..dec9d90 100644 --- a/qmt/main.py +++ b/qmt/main.py @@ -6,6 +6,101 @@ import logging import datetime import uvicorn +from .qmt_engine import QMTEngine, ConfigError +from .api_server import create_api_server + + +def setup_logger(): + """配置日志系统""" + log_dir = "logs" + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + log_file = os.path.join(log_dir, f"{datetime.date.today().strftime('%Y-%m-%d')}.log") + + logger = logging.getLogger("QMT_Main") + logger.setLevel(logging.INFO) + + formatter = logging.Formatter( + '[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') + file_handler.setFormatter(formatter) + + stream_handler = logging.StreamHandler(sys.stdout) + stream_handler.setFormatter(formatter) + + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + return logger + + +def main(): + """主函数 - 启动QMT交易引擎和API服务器""" + logger = setup_logger() + logger.info("="*50) + logger.info(">>> QMT交易系统启动中...") + logger.info("="*50) + + # 创建QMT引擎实例 + engine = QMTEngine() + logger.info("QMT引擎实例创建成功") + + try: + # 初始化引擎 + engine.initialize('config.json') + logger.info("✅ QMT引擎初始化成功") + except ConfigError as e: + logger.error(f"❌ 配置校验失败: {str(e)}") + logger.error("请检查 config.json 配置文件") + sys.exit(1) + except Exception as e: + logger.error(f"❌ QMT引擎初始化失败: {str(e)}", exc_info=True) + sys.exit(1) + + # 启动交易线程 + trading_thread = threading.Thread(target=engine.run_trading_loop, daemon=True) + trading_thread.start() + logger.info("✅ 交易线程启动成功") + + # 创建API服务器 + app = create_api_server(engine) + logger.info("✅ API服务器创建成功") + + # 启动Web服务 + logger.info(">>> Web服务启动: http://localhost:8001") + try: + uvicorn.run( + app, + host="0.0.0.0", + port=8001, + log_level="warning", + access_log=False + ) + except KeyboardInterrupt: + logger.info(">>> 正在关闭系统...") + engine.stop() + logger.info(">>> 系统已关闭") + except Exception as e: + logger.error(f">>> 系统异常: {str(e)}", exc_info=True) + engine.stop() + logger.info(">>> 系统已关闭") + + +if __name__ == '__main__': + # 使用 -u 参数运行是最佳实践: python -u main.py + # 但这里也在代码里强制 flush 了 + main() +import threading +import sys +import os +import logging +import datetime +import uvicorn + from .qmt_engine import QMTEngine from .api_server import create_api_server diff --git a/qmt/qmt_engine.py b/qmt/qmt_engine.py index 235c9a3..e48b308 100644 --- a/qmt/qmt_engine.py +++ b/qmt/qmt_engine.py @@ -17,6 +17,34 @@ from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant +# 导入 Redis Stream 消息处理器和日志模块 +try: + from .message_processor import StreamMessageProcessor + from .logger import QMTLogger + from .config_models import QMTConfig, QMTTerminalConfig, StrategyConfig, load_config, ConfigError +except ImportError: + # 当作为脚本直接运行时 + from message_processor import StreamMessageProcessor + from logger import QMTLogger + from config_models import QMTConfig, QMTTerminalConfig, StrategyConfig, load_config, ConfigError +import time +import datetime +import traceback +import sys +import json +import os +import threading +import logging +from typing import Optional, Dict, Any, List +from dataclasses import dataclass +from dateutil.parser import parse as parse_time + +import redis +from xtquant import xtdata +from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback +from xtquant.xttype import StockAccount +from xtquant import xtconstant + # 导入 Redis Stream 消息处理器和日志模块 try: from .message_processor import StreamMessageProcessor @@ -394,12 +422,13 @@ class UnitCallback(XtQuantTraderCallback): class TradingUnit: """终端实例执行单元,负责管理单个 QMT 进程""" - def __init__(self, t_cfg): - self.qmt_id = t_cfg["qmt_id"] - self.alias = t_cfg.get("alias", self.qmt_id) - self.path = t_cfg["path"] - self.account_id = t_cfg["account_id"] - self.account_type = t_cfg["account_type"] + def __init__(self, t_cfg: QMTTerminalConfig): + self.qmt_id = t_cfg.qmt_id + self.alias = t_cfg.alias or t_cfg.qmt_id + self.path = t_cfg.path + self.account_id = t_cfg.account_id + self.account_type = t_cfg.account_type + self.xt_trader = None self.acc_obj = None @@ -474,6 +503,19 @@ class MultiEngineManager: return cls._instance def __init__(self): + if hasattr(self, "_initialized"): + return + self.units: Dict[str, TradingUnit] = {} + self.config: Optional[QMTConfig] = None + self.is_running = True + self.start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + self.is_scheduled_reconnecting = False # 定时重连调度器正在执行标志 + self._initialized = True + # Stream 处理器和日志器将在 initialize 中创建 + self.stream_processor: Optional[StreamMessageProcessor] = None + self.qmt_logger: Optional[QMTLogger] = None + self.r = None + self.pos_manager = None if hasattr(self, "_initialized"): return self.units: Dict[str, TradingUnit] = {} @@ -488,8 +530,19 @@ class MultiEngineManager: def initialize(self, config_file="config.json"): self._setup_logger() # 先初始化 logger - with open(config_file, "r", encoding="utf-8") as f: - self.config = json.load(f) + + # 使用新的配置模型加载配置 + try: + self.config = load_config(config_file) + self.logger.info(f"✅ 配置加载成功: {config_file}") + self.logger.info(f" - 终端数量: {len(self.config.qmt_terminals)}") + self.logger.info(f" - 策略数量: {len(self.config.strategies)}") + except ConfigError as e: + self.logger.error(f"❌ 配置错误: {e}") + raise + except Exception as e: + self.logger.error(f"❌ 配置加载失败: {e}") + raise # 从 .env.local 加载 Redis 配置 redis_config = self._load_redis_config() @@ -501,10 +554,11 @@ class MultiEngineManager: self.qmt_logger = QMTLogger(name="QMT_Engine_Stream") self.logger.info("Redis Stream 处理器初始化完成") - for t_cfg in self.config.get("qmt_terminals", []): + for t_cfg in self.config.qmt_terminals: unit = TradingUnit(t_cfg) unit.connect() self.units[unit.qmt_id] = unit + def _load_redis_config(self) -> Dict[str, Any]: """从 .env.local 加载 Redis 配置""" # 尝试加载 python-dotenv @@ -563,6 +617,9 @@ class MultiEngineManager: self.logger.addHandler(fh) self.logger.addHandler(sh) def get_strategies_by_terminal(self, qmt_id): + if not self.config: + return [] + return self.config.get_strategies_by_terminal(qmt_id) return [ s for s, cfg in self.config["strategies"].items() @@ -667,7 +724,8 @@ class MultiEngineManager: "130000" <= curr_hms <= "150030" ) if is_trading: - for s_name in self.config["strategies"].keys(): + for s_name in self.config.strategies.keys(): + self.process_route(s_name) self.process_route(s_name) # --- 收盘结算与标志位重置 --- @@ -691,7 +749,13 @@ class MultiEngineManager: 从 Redis Stream 消费消息,处理成功后 ACK,失败则进入失败队列。 """ - strat_cfg = self.config["strategies"].get(strategy_name) + strat_cfg = self.config.get_strategy(strategy_name) + if not strat_cfg: + self.logger.warning(f"[{strategy_name}] 策略配置不存在") + return + unit = self.units.get(strat_cfg.qmt_id) + if not unit or not unit.callback or not unit.callback.is_connected: + return unit = self.units.get(strat_cfg.get("qmt_id")) if not unit or not unit.callback or not unit.callback.is_connected: return @@ -748,7 +812,12 @@ class MultiEngineManager: action = data.get("action") # 获取策略配置,确定下单模式 - strat_cfg = self.config["strategies"].get(strategy_name, {}) + # 获取策略配置,确定下单模式 + strat_cfg = self.config.get_strategy(strategy_name) + if not strat_cfg: + self.logger.warning(f"[{strategy_name}] 策略配置不存在") + order_mode = strat_cfg.order_mode + order_mode = strat_cfg.order_mode order_mode = strat_cfg.get("order_mode", "slots") if action == "BUY": @@ -830,7 +899,482 @@ class MultiEngineManager: self.logger.error(f"[process_route] 消费消息异常: {str(e)}", exc_info=True) def _execute_buy(self, unit, strategy_name, data): - strat_cfg = self.config["strategies"][strategy_name] + strat_cfg = self.config.get_strategy(strategy_name) + if not strat_cfg: + self.logger.error(f"[{strategy_name}] 策略配置不存在") + return + # 1. 槽位校验 + if data["total_slots"] != strat_cfg.total_slots: + self.logger.error( + f"[{strategy_name}] 信号槽位({data['total_slots']})与配置({strat_cfg.total_slots})不符" + ) + return + + # 2. 持仓数检查 + if ( + self.pos_manager.get_holding_count(strategy_name) + >= strat_cfg.total_slots + ): + return + + try: + asset = unit.xt_trader.query_stock_asset(unit.acc_obj) + # 计算该终端的总槽位之和 + terminal_strategies = self.get_strategies_by_terminal(unit.qmt_id) + + # 计算加权槽位总和(支持策略权重配置) + # 权重默认为 1,支持通过 weight 字段调整资金分配比例 + # 示例:strategies = {"strategy_a": {"total_slots": 5, "weight": 1}, "strategy_b": {"total_slots": 5, "weight": 2}} + total_weighted_slots = sum( + self.config.get_strategy(s).total_slots * self.config.get_strategy(s).weight + for s in terminal_strategies + if self.config.get_strategy(s) + ) + + if not asset or total_weighted_slots <= 0: + return + + # 获取当前策略的权重 + weight = strat_cfg.weight + + # 4. 资金加权分配 (基于该终端总资产) + total_equity = asset.cash + asset.market_value + target_amt = total_equity * weight / total_weighted_slots + actual_amt = min(target_amt, asset.cash * 0.98) # 预留手续费滑点 + + if actual_amt < 2000: + self.logger.warning( + f"[{strategy_name}] 单笔预算 {actual_amt:.2f} 不足 2000 元,取消买入" + ) + return + + # 4. 价格与股数 + offset = strat_cfg.execution.buy_price_offset + price = round(float(data["price"]) + offset, 3) + vol = int(actual_amt / (price if price > 0 else 1.0) / 100) * 100 + + if vol < 100: + return + + # 记录订单执行请求 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="BUY", + volume=vol, + price=price, + ) + + oid = unit.xt_trader.order_stock( + unit.acc_obj, + data["stock_code"], + xtconstant.STOCK_BUY, + vol, + xtconstant.FIX_PRICE, + price, + strategy_name, + "PyBuy", + ) + if oid != -1: + unit.order_cache[oid] = (strategy_name, data["stock_code"], "BUY") + self.pos_manager.mark_holding(strategy_name, data["stock_code"]) + self.logger.info( + f"√√√ [{unit.alias}] {strategy_name} 下单买入: {data['stock_code']} {vol}股 @ {price}" + ) + # 记录订单执行成功 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="BUY", + volume=vol, + price=price, + order_id=oid, + ) + else: + error_msg = "下单请求被拒绝 (Result=-1)" + self.logger.error(f"[{strategy_name}] {error_msg}") + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="BUY", + volume=vol, + price=price, + error=error_msg, + ) + except: + self.logger.error(traceback.format_exc()) + + def _execute_sell(self, unit, strategy_name, data): + strat_cfg = self.config.get_strategy(strategy_name) + if not strat_cfg: + self.logger.error(f"[{strategy_name}] 策略配置不存在") + return + # 1. 查询实盘持仓(一切以实盘为准) + real_pos = unit.xt_trader.query_stock_positions(unit.acc_obj) + rp = ( + next((p for p in real_pos if p.stock_code == data["stock_code"]), None) + if real_pos + else None + ) + can_use = rp.can_use_volume if rp else 0 + + # 2. 检查虚拟持仓 + v_vol = self.pos_manager.get_position(strategy_name, data["stock_code"]) + + # 3. 实盘无持仓 -> 拒绝卖出(清理幽灵持仓) + if can_use <= 0: + self.logger.warning( + f"[{strategy_name}] 卖出拦截: {data['stock_code']} 实盘无可用持仓" + ) + # 如果虚拟持仓存在但实盘已清仓,清理幽灵持仓 + if v_vol > 0: + self.pos_manager.force_delete(strategy_name, data["stock_code"]) + self.logger.info( + f"[{strategy_name}] 已清理幽灵持仓: {data['stock_code']} 虚拟{v_vol}股" + ) + return + + # 4. 实盘有持仓 -> 必须卖出(取虚拟和实盘的最小值,虚拟无持仓则取实盘) + if v_vol <= 0: + self.logger.warning( + f"[{strategy_name}] 卖出提醒: {data['stock_code']} 虚拟无持仓但实盘有{can_use}股,以实盘为准执行卖出" + ) + + final_vol = min(v_vol, can_use) if v_vol > 0 else can_use + if final_vol <= 0: + self.logger.warning( + f"[{strategy_name}] 卖出拦截: {data['stock_code']} 计算后卖出量为0" + ) + return + + try: + offset = strat_cfg.execution.sell_price_offset + price = round(float(data["price"]) + offset, 3) + + # 记录订单执行请求 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=final_vol, + price=price, + ) + + oid = unit.xt_trader.order_stock( + unit.acc_obj, + data["stock_code"], + xtconstant.STOCK_SELL, + final_vol, + xtconstant.FIX_PRICE, + price, + strategy_name, + "PySell", + ) + if oid != -1: + unit.order_cache[oid] = (strategy_name, data["stock_code"], "SELL") + self.logger.info( + f"√√√ [{unit.alias}] {strategy_name} 下单卖出: {data['stock_code']} {final_vol}股 @ {price}" + ) + # 记录订单执行成功 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=final_vol, + price=price, + order_id=oid, + ) + else: + error_msg = "下单请求被拒绝 (Result=-1)" + self.logger.error(f"[{strategy_name}] {error_msg}") + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=final_vol, + price=price, + error=error_msg, + ) + except: + self.logger.error(traceback.format_exc()) + + def _execute_percentage_buy(self, unit, strategy_name, data): + """处理百分比模式的买入逻辑""" + strat_cfg = self.config.get_strategy(strategy_name) + if not strat_cfg: + self.logger.error(f"[{strategy_name}] 策略配置不存在") + return + + # 获取目标持仓百分比 + position_pct = float(data.get("position_pct", 0)) + if position_pct <= 0: + self.logger.warning(f"[{strategy_name}] 百分比模式买入: position_pct 无效 ({position_pct})") + return + + self.logger.info(f"[{strategy_name}] [百分比模式] 处理买入: {data['stock_code']}, 目标占比: {position_pct}") + + try: + asset = unit.xt_trader.query_stock_asset(unit.acc_obj) + if not asset: + self.logger.error(f"[{strategy_name}] API 错误: query_stock_asset 返回 None") + return + + total_asset = asset.total_asset + available_cash = asset.cash + + self.logger.info(f"[{strategy_name}] [百分比模式] 账户总资产: {total_asset:.2f}, 可用资金: {available_cash:.2f}") + + # 计算目标金额 + target_amount = total_asset * position_pct + actual_amount = min(target_amount, available_cash * 0.98) # 预留手续费滑点 + + self.logger.info(f"[{strategy_name}] [百分比模式] 目标金额: {target_amount:.2f}, 实际可用: {actual_amount:.2f}") + + # 检查最小金额限制 + if actual_amount < 2000: + self.logger.warning(f"[{strategy_name}] [百分比模式] 拦截买入: 金额过小 ({actual_amount:.2f} < 2000)") + return + + # 价格校验 + price = float(data.get("price", 0)) + offset = strat_cfg.execution.buy_price_offset + price = round(price + offset, 3) + + if price <= 0: + self.logger.warning(f"[{strategy_name}] [百分比模式] 价格异常: {price},强制设为1.0") + price = 1.0 + + # 计算股数 + vol = int(actual_amount / price / 100) * 100 + self.logger.info(f"[{strategy_name}] [百分比模式] 计算股数: 资金{actual_amount:.2f} / 价格{price} -> {vol}股") + + if vol < 100: + self.logger.warning(f"[{strategy_name}] [百分比模式] 拦截买入: 股数不足 100 ({vol})") + return + + # 记录订单执行请求 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="BUY", + volume=vol, + price=price, + ) + + oid = unit.xt_trader.order_stock( + unit.acc_obj, + data["stock_code"], + xtconstant.STOCK_BUY, + vol, + xtconstant.FIX_PRICE, + price, + strategy_name, + "PyBuyPct", + ) + + if oid != -1: + unit.order_cache[oid] = (strategy_name, data["stock_code"], "BUY") + self.logger.info(f"√√√ [{unit.alias}] [{strategy_name}] [百分比模式] 下单买入: {data['stock_code']} {vol}股 @ {price}") + # 记录订单执行成功 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="BUY", + volume=vol, + price=price, + order_id=oid, + ) + else: + error_msg = "下单请求被拒绝 (Result=-1)" + self.logger.error(f"[{strategy_name}] [百分比模式] {error_msg}") + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="BUY", + volume=vol, + price=price, + error=error_msg, + ) + except: + self.logger.error(traceback.format_exc()) + + def _execute_percentage_sell(self, unit, strategy_name, data): + """处理百分比模式的卖出逻辑(清仓)""" + strat_cfg = self.config.get_strategy(strategy_name) + if not strat_cfg: + self.logger.error(f"[{strategy_name}] 策略配置不存在") + return + self.logger.info(f"[{strategy_name}] [百分比模式] 处理卖出: {data['stock_code']} (清仓)") + + try: + # 查询实盘持仓 + real_pos = unit.xt_trader.query_stock_positions(unit.acc_obj) + if real_pos is None: + self.logger.error(f"[{strategy_name}] [百分比模式] API 错误: query_stock_positions 返回 None") + return + + rp = next((p for p in real_pos if p.stock_code == data["stock_code"]), None) + can_use = rp.can_use_volume if rp else 0 + + self.logger.info(f"[{strategy_name}] [百分比模式] 股票 {data['stock_code']} 实盘可用持仓: {can_use}") + + if can_use <= 0: + self.logger.warning(f"[{strategy_name}] [百分比模式] 拦截卖出: 无可用持仓") + return + + # 执行清仓 + price = float(data.get("price", 0)) + offset = strat_cfg.execution.sell_price_offset + price = round(price + offset, 3) + + self.logger.info(f"[{strategy_name}] [百分比模式] 执行清仓: {data['stock_code']} @ {price}, 数量: {can_use}") + + # 记录订单执行请求 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=can_use, + price=price, + ) + + oid = unit.xt_trader.order_stock( + unit.acc_obj, + data["stock_code"], + xtconstant.STOCK_SELL, + can_use, + xtconstant.FIX_PRICE, + price, + strategy_name, + "PySellPct", + ) + + if oid != -1: + unit.order_cache[oid] = (strategy_name, data["stock_code"], "SELL") + self.logger.info(f"√√√ [{unit.alias}] [{strategy_name}] [百分比模式] 下单卖出: {data['stock_code']} {can_use}股 @ {price}") + # 记录订单执行成功 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=can_use, + price=price, + order_id=oid, + ) + else: + error_msg = "下单请求被拒绝 (Result=-1)" + self.logger.error(f"[{strategy_name}] [百分比模式] {error_msg}") + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=can_use, + price=price, + error=error_msg, + ) + except: + self.logger.error(traceback.format_exc()) + + def _execute_sell(self, unit, strategy_name, data): + strat_cfg = self.config.get_strategy(strategy_name) + if not strat_cfg: + self.logger.error(f"[{strategy_name}] 策略配置不存在") + return + # 1. 查询实盘持仓(一切以实盘为准) + real_pos = unit.xt_trader.query_stock_positions(unit.acc_obj) + rp = ( + next((p for p in real_pos if p.stock_code == data["stock_code"]), None) + if real_pos + else None + ) + can_use = rp.can_use_volume if rp else 0 + + # 2. 检查虚拟持仓 + v_vol = self.pos_manager.get_position(strategy_name, data["stock_code"]) + + # 3. 实盘无持仓 -> 拒绝卖出(清理幽灵持仓) + if can_use <= 0: + self.logger.warning( + f"[{strategy_name}] 卖出拦截: {data['stock_code']} 实盘无可用持仓" + ) + # 如果虚拟持仓存在但实盘已清仓,清理幽灵持仓 + if v_vol > 0: + self.pos_manager.force_delete(strategy_name, data["stock_code"]) + self.logger.info( + f"[{strategy_name}] 已清理幽灵持仓: {data['stock_code']} 虚拟{v_vol}股" + ) + return + + # 4. 实盘有持仓 -> 必须卖出(取虚拟和实盘的最小值,虚拟无持仓则取实盘) + if v_vol <= 0: + self.logger.warning( + f"[{strategy_name}] 卖出提醒: {data['stock_code']} 虚拟无持仓但实盘有{can_use}股,以实盘为准执行卖出" + ) + + final_vol = min(v_vol, can_use) if v_vol > 0 else can_use + if final_vol <= 0: + self.logger.warning( + f"[{strategy_name}] 卖出拦截: {data['stock_code']} 计算后卖出量为0" + ) + return + + try: + offset = strat_cfg.execution.sell_price_offset + price = round(float(data["price"]) + offset, 3) + + # 记录订单执行请求 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=final_vol, + price=price, + ) + + oid = unit.xt_trader.order_stock( + unit.acc_obj, + data["stock_code"], + xtconstant.STOCK_SELL, + final_vol, + xtconstant.FIX_PRICE, + price, + strategy_name, + "PySell", + ) + if oid != -1: + unit.order_cache[oid] = (strategy_name, data["stock_code"], "SELL") + self.logger.info( + f"√√√ [{unit.alias}] {strategy_name} 下单卖出: {data['stock_code']} {final_vol}股 @ {price}" + ) + # 记录订单执行成功 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=final_vol, + price=price, + order_id=oid, + ) + else: + error_msg = "下单请求被拒绝 (Result=-1)" + self.logger.error(f"[{strategy_name}] {error_msg}") + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=final_vol, + price=price, + error=error_msg, + ) + except: + self.logger.error(traceback.format_exc()) + """处理百分比模式的卖出逻辑(清仓)""" + strat_cfg = self.config.get_strategy(strategy_name) + if not strat_cfg: + self.logger.error(f"[{strategy_name}] 策略配置不存在") + return + self.logger.info(f"[{strategy_name}] [百分比模式] 处理卖出: {data['stock_code']} (清仓)") + # 1. 槽位校验 if data["total_slots"] != strat_cfg["total_slots"]: self.logger.error( @@ -933,6 +1477,98 @@ class MultiEngineManager: self.logger.error(traceback.format_exc()) def _execute_sell(self, unit, strategy_name, data): + strat_cfg = self.config.get_strategy(strategy_name) + if not strat_cfg: + self.logger.error(f"[{strategy_name}] 策略配置不存在") + return + # 1. 查询实盘持仓(一切以实盘为准) + real_pos = unit.xt_trader.query_stock_positions(unit.acc_obj) + rp = ( + next((p for p in real_pos if p.stock_code == data["stock_code"]), None) + if real_pos + else None + ) + can_use = rp.can_use_volume if rp else 0 + + # 2. 检查虚拟持仓 + v_vol = self.pos_manager.get_position(strategy_name, data["stock_code"]) + + # 3. 实盘无持仓 -> 拒绝卖出(清理幽灵持仓) + if can_use <= 0: + self.logger.warning( + f"[{strategy_name}] 卖出拦截: {data['stock_code']} 实盘无可用持仓" + ) + # 如果虚拟持仓存在但实盘已清仓,清理幽灵持仓 + if v_vol > 0: + self.pos_manager.force_delete(strategy_name, data["stock_code"]) + self.logger.info( + f"[{strategy_name}] 已清理幽灵持仓: {data['stock_code']} 虚拟{v_vol}股" + ) + return + + # 4. 实盘有持仓 -> 必须卖出(取虚拟和实盘的最小值,虚拟无持仓则取实盘) + if v_vol <= 0: + self.logger.warning( + f"[{strategy_name}] 卖出提醒: {data['stock_code']} 虚拟无持仓但实盘有{can_use}股,以实盘为准执行卖出" + ) + + final_vol = min(v_vol, can_use) if v_vol > 0 else can_use + if final_vol <= 0: + self.logger.warning( + f"[{strategy_name}] 卖出拦截: {data['stock_code']} 计算后卖出量为0" + ) + return + + try: + offset = strat_cfg.execution.sell_price_offset + price = round(float(data["price"]) + offset, 3) + + # 记录订单执行请求 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=final_vol, + price=price, + ) + + oid = unit.xt_trader.order_stock( + unit.acc_obj, + data["stock_code"], + xtconstant.STOCK_SELL, + final_vol, + xtconstant.FIX_PRICE, + price, + strategy_name, + "PySell", + ) + if oid != -1: + unit.order_cache[oid] = (strategy_name, data["stock_code"], "SELL") + self.logger.info( + f"√√√ [{unit.alias}] {strategy_name} 下单卖出: {data['stock_code']} {final_vol}股 @ {price}" + ) + # 记录订单执行成功 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=final_vol, + price=price, + order_id=oid, + ) + else: + error_msg = "下单请求被拒绝 (Result=-1)" + self.logger.error(f"[{strategy_name}] {error_msg}") + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=final_vol, + price=price, + error=error_msg, + ) + except: + self.logger.error(traceback.format_exc()) # 1. 查询实盘持仓(一切以实盘为准) real_pos = unit.xt_trader.query_stock_positions(unit.acc_obj) rp = ( @@ -1028,6 +1664,181 @@ class MultiEngineManager: def _execute_percentage_buy(self, unit, strategy_name, data): """处理百分比模式的买入逻辑""" + strat_cfg = self.config.get_strategy(strategy_name) + if not strat_cfg: + self.logger.error(f"[{strategy_name}] 策略配置不存在") + return + + # 获取目标持仓百分比 + position_pct = float(data.get("position_pct", 0)) + if position_pct <= 0: + self.logger.warning(f"[{strategy_name}] 百分比模式买入: position_pct 无效 ({position_pct})") + return + + self.logger.info(f"[{strategy_name}] [百分比模式] 处理买入: {data['stock_code']}, 目标占比: {position_pct}") + + try: + asset = unit.xt_trader.query_stock_asset(unit.acc_obj) + if not asset: + self.logger.error(f"[{strategy_name}] API 错误: query_stock_asset 返回 None") + return + + total_asset = asset.total_asset + available_cash = asset.cash + + self.logger.info(f"[{strategy_name}] [百分比模式] 账户总资产: {total_asset:.2f}, 可用资金: {available_cash:.2f}") + + # 计算目标金额 + target_amount = total_asset * position_pct + actual_amount = min(target_amount, available_cash * 0.98) # 预留手续费滑点 + + self.logger.info(f"[{strategy_name}] [百分比模式] 目标金额: {target_amount:.2f}, 实际可用: {actual_amount:.2f}") + + # 检查最小金额限制 + if actual_amount < 2000: + self.logger.warning(f"[{strategy_name}] [百分比模式] 拦截买入: 金额过小 ({actual_amount:.2f} < 2000)") + return + + # 价格校验 + price = float(data.get("price", 0)) + offset = strat_cfg.execution.buy_price_offset + price = round(price + offset, 3) + + if price <= 0: + self.logger.warning(f"[{strategy_name}] [百分比模式] 价格异常: {price},强制设为1.0") + price = 1.0 + + # 计算股数 + vol = int(actual_amount / price / 100) * 100 + self.logger.info(f"[{strategy_name}] [百分比模式] 计算股数: 资金{actual_amount:.2f} / 价格{price} -> {vol}股") + + if vol < 100: + self.logger.warning(f"[{strategy_name}] [百分比模式] 拦截买入: 股数不足 100 ({vol})") + return + + # 记录订单执行请求 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="BUY", + volume=vol, + price=price, + ) + + oid = unit.xt_trader.order_stock( + unit.acc_obj, + data["stock_code"], + xtconstant.STOCK_BUY, + vol, + xtconstant.FIX_PRICE, + price, + strategy_name, + "PyBuyPct", + ) + + if oid != -1: + unit.order_cache[oid] = (strategy_name, data["stock_code"], "BUY") + self.logger.info(f"√√√ [{unit.alias}] [{strategy_name}] [百分比模式] 下单买入: {data['stock_code']} {vol}股 @ {price}") + # 记录订单执行成功 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="BUY", + volume=vol, + price=price, + order_id=oid, + ) + else: + error_msg = "下单请求被拒绝 (Result=-1)" + self.logger.error(f"[{strategy_name}] [百分比模式] {error_msg}") + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="BUY", + volume=vol, + price=price, + error=error_msg, + ) + except: + self.logger.error(traceback.format_exc()) + + def _execute_percentage_sell(self, unit, strategy_name, data): + """处理百分比模式的卖出逻辑(清仓)""" + strat_cfg = self.config.get_strategy(strategy_name) + if not strat_cfg: + self.logger.error(f"[{strategy_name}] 策略配置不存在") + return + self.logger.info(f"[{strategy_name}] [百分比模式] 处理卖出: {data['stock_code']} (清仓)") + + try: + # 查询实盘持仓 + real_pos = unit.xt_trader.query_stock_positions(unit.acc_obj) + if real_pos is None: + self.logger.error(f"[{strategy_name}] [百分比模式] API 错误: query_stock_positions 返回 None") + return + + rp = next((p for p in real_pos if p.stock_code == data["stock_code"]), None) + can_use = rp.can_use_volume if rp else 0 + + self.logger.info(f"[{strategy_name}] [百分比模式] 股票 {data['stock_code']} 实盘可用持仓: {can_use}") + + if can_use <= 0: + self.logger.warning(f"[{strategy_name}] [百分比模式] 拦截卖出: 无可用持仓") + return + + # 执行清仓 + price = float(data.get("price", 0)) + offset = strat_cfg.execution.sell_price_offset + price = round(price + offset, 3) + + self.logger.info(f"[{strategy_name}] [百分比模式] 执行清仓: {data['stock_code']} @ {price}, 数量: {can_use}") + + # 记录订单执行请求 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=can_use, + price=price, + ) + + oid = unit.xt_trader.order_stock( + unit.acc_obj, + data["stock_code"], + xtconstant.STOCK_SELL, + can_use, + xtconstant.FIX_PRICE, + price, + strategy_name, + "PySellPct", + ) + + if oid != -1: + unit.order_cache[oid] = (strategy_name, data["stock_code"], "SELL") + self.logger.info(f"√√√ [{unit.alias}] [{strategy_name}] [百分比模式] 下单卖出: {data['stock_code']} {can_use}股 @ {price}") + # 记录订单执行成功 + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=can_use, + price=price, + order_id=oid, + ) + else: + error_msg = "下单请求被拒绝 (Result=-1)" + self.logger.error(f"[{strategy_name}] [百分比模式] {error_msg}") + self.qmt_logger.log_order_execution( + strategy_name=strategy_name, + stock_code=data["stock_code"], + action="SELL", + volume=can_use, + price=price, + error=error_msg, + ) + except: + self.logger.error(traceback.format_exc()) + """处理百分比模式的买入逻辑""" strat_cfg = self.config["strategies"][strategy_name] # 获取目标持仓百分比