# coding: utf-8 """ QMT 回测消息消费者 独立的回测消息消费脚本,用于: - 消费回测消息流 - 记录完整的处理流程日志 - 模拟订单执行(不执行真实交易) - ACK 确认消息 使用方法: # 守护模式(持续运行,处理所有配置的策略) python backtest_consumer.py # 单次运行模式(处理一次后退出) python backtest_consumer.py --once # 指定策略运行 python backtest_consumer.py --strategy StrategyA python backtest_consumer.py --once --strategy StrategyA,StrategyB 配置说明: 可通过环境变量或 .env.local 文件配置: - REDIS_HOST: Redis 主机地址(默认: localhost) - REDIS_PORT: Redis 端口(默认: 6379) - REDIS_PASSWORD: Redis 密码(默认: None) - REDIS_DB: Redis 数据库(默认: 0) - BACKTEST_CONSUMER_ID: 消费者ID(默认: backtest-consumer-1) - BACKTEST_STRATEGIES: 默认策略列表,逗号分隔 - LOG_LEVEL: 日志级别(默认: DEBUG) - LOG_FILE: 日志文件路径(默认: logs/backtest_consumer.log) 消息格式: 消费的消息为 JSON 格式,包含以下字段: - strategy_name: 策略名称 - stock_code: 股票代码(如 000001.SZ) - action: 交易动作(BUY/SELL) - price: 价格 - total_slots: 目标持仓槽位数 - timestamp: 时间戳 - is_backtest: 是否为回测模式 """ import os import sys import time import json import signal import argparse from datetime import datetime from typing import List, Optional, Dict, Any # 添加父目录到路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) try: from message_processor import StreamMessageProcessor from logger import QMTLogger except ImportError as e: print(f"导入错误: {e}") print("请确保在 qmt 目录下运行此脚本") sys.exit(1) def load_config() -> Dict[str, Any]: """加载配置文件 从 /qmt/config/.env.local 加载环境变量配置 Returns: 配置字典 """ # 尝试加载 python-dotenv dotenv_loaded = False try: from dotenv import load_dotenv dotenv_available = True except ImportError: dotenv_available = False print("[Config] 警告: 未安装 python-dotenv,使用默认配置") # 如果 dotenv 可用,尝试加载配置文件 if dotenv_available: # 尝试多个路径 env_paths = [ os.path.join( os.path.dirname(os.path.abspath(__file__)), "config", ".env.local" ), os.path.join(os.path.dirname(__file__), "config", ".env.local"), os.path.join(os.path.dirname(__file__), "..", "config", ".env.local"), "/qmt/config/.env.local", "config/.env.local", ] for env_path in env_paths: if os.path.exists(env_path): load_dotenv(env_path) print(f"[Config] 加载配置文件: {env_path}") dotenv_loaded = True break if not dotenv_loaded: print("[Config] 警告: 未找到 .env.local 文件,使用默认配置") # 从环境变量读取配置 config = { "redis": { "host": os.getenv("REDIS_HOST", "localhost"), "port": int(os.getenv("REDIS_PORT", "6379")), "password": os.getenv("REDIS_PASSWORD") or None, "db": int(os.getenv("REDIS_DB", "0")), }, "consumer_id": os.getenv("BACKTEST_CONSUMER_ID", "backtest-consumer-1"), "block_timeout": int(os.getenv("BACKTEST_BLOCK_TIMEOUT", "5000")), "strategies": os.getenv("BACKTEST_STRATEGIES", ""), "simulate_delay": float(os.getenv("BACKTEST_SIMULATE_DELAY", "0.1")), "log_level": os.getenv("LOG_LEVEL", "DEBUG"), "log_file": os.getenv("LOG_FILE", "logs/backtest_consumer.log"), } # 解析策略列表 if config["strategies"]: config["strategy_list"] = [ s.strip() for s in config["strategies"].split(",") if s.strip() ] else: config["strategy_list"] = [] return config class BacktestConsumer: """回测消息消费者""" def __init__(self, config: Dict[str, Any]): """初始化回测消费者 Args: config: 配置字典 """ self.config = config self.logger = QMTLogger(name="BacktestConsumer", log_file=config["log_file"]) # 初始化 Stream 处理器 self.processor = StreamMessageProcessor(redis_config=config["redis"]) # 运行状态 self.running = True # 统计信息 self.stats = { "total_received": 0, "total_processed": 0, "total_failed": 0, "start_time": datetime.now(), } self.logger.info(f"[BacktestConsumer] 初始化完成") self.logger.info(f"[BacktestConsumer] 消费者ID: {config['consumer_id']}") self.logger.info( f"[BacktestConsumer] 策略列表: {config['strategy_list'] or '所有策略'}" ) def signal_handler(self, signum, frame): """信号处理函数""" self.logger.info(f"[BacktestConsumer] 收到信号 {signum},准备退出...") self.running = False def simulate_order(self, strategy_name: str, data: Dict[str, Any]) -> bool: """模拟订单执行 Args: strategy_name: 策略名称 data: 消息数据 Returns: 模拟成功返回 True """ action = data.get("action") stock_code = data.get("stock_code") price = data.get("price", 0) total_slots = data.get("total_slots", 0) self.logger.log_order_execution( strategy_name=strategy_name, stock_code=stock_code, action=action, volume=100, # 模拟数量 price=price, ) # 模拟延迟 if self.config["simulate_delay"] > 0: time.sleep(self.config["simulate_delay"]) # 记录模拟结果 if action == "BUY": self.logger.info( f"[模拟交易] {strategy_name} 买入 {stock_code} @ {price}, " f"目标持仓: {total_slots}只" ) elif action == "SELL": self.logger.info( f"[模拟交易] {strategy_name} 卖出 {stock_code} @ {price}, 清仓释放槽位" ) else: self.logger.warning(f"[模拟交易] 未知动作: {action}") return False # 记录成功 self.logger.log_order_execution( strategy_name=strategy_name, stock_code=stock_code, action=action, volume=100, price=price, order_id=-1, # 模拟订单ID ) return True def process_messages(self, strategy_name: str) -> int: """处理单个策略的消息 Args: strategy_name: 策略名称 Returns: 处理的消息数量 """ processed = 0 try: # 消费消息(回测模式,不使用消费者组) messages = self.processor.consume_message( strategy_name=strategy_name, consumer_id=self.config["consumer_id"], is_backtest=True, block_ms=100, # 短阻塞 count=10, # 每次最多处理10条 ) if not messages: return 0 for msg_id, data in messages: self.stats["total_received"] += 1 try: stream_key = f"qmt:{strategy_name}:backtest" # 1. 记录消息接收 self.logger.log_message_receive(stream_key, msg_id, data) # 2. 记录消息解析 self.logger.log_message_parse( strategy_name=data.get("strategy_name", strategy_name), stock_code=data.get("stock_code", ""), action=data.get("action", ""), price=data.get("price", 0), extra_fields={ "total_slots": data.get("total_slots"), "timestamp": data.get("timestamp"), "is_backtest": data.get("is_backtest"), }, ) # 3. 业务校验 self.logger.log_validation( validation_type="field_check", strategy_name=strategy_name, details={"fields": list(data.keys())}, result=True, ) # 4. 模拟订单执行 success = self.simulate_order(strategy_name, data) if success: self.stats["total_processed"] += 1 else: self.stats["total_failed"] += 1 # 5. ACK 消息(回测模式不需要真正的 ACK,但记录日志) self.logger.log_ack(stream_key, msg_id, True) processed += 1 except Exception as e: error_msg = f"消息处理异常: {str(e)}" self.logger.error(error_msg, exc_info=True) self.logger.log_failure( stream_key=f"qmt:{strategy_name}:backtest", message_id=msg_id, error_reason=str(e), to_failure_queue=False, ) self.stats["total_failed"] += 1 except Exception as e: self.logger.error(f"消费消息异常: {str(e)}", exc_info=True) return processed def run_once(self, specific_strategies: Optional[List[str]] = None) -> None: """单次运行 Args: specific_strategies: 指定要处理的策略列表,None 则处理所有 """ strategies = specific_strategies or self.config["strategy_list"] if not strategies: self.logger.warning("[BacktestConsumer] 未指定策略,退出") return self.logger.info(f"[BacktestConsumer] 单次运行,策略: {strategies}") total_processed = 0 for strategy in strategies: count = self.process_messages(strategy) total_processed += count if count > 0: self.logger.info( f"[BacktestConsumer] 策略 {strategy} 处理 {count} 条消息" ) self.logger.info( f"[BacktestConsumer] 单次运行完成,共处理 {total_processed} 条消息" ) self._log_stats() def run_daemon(self, specific_strategies: Optional[List[str]] = None) -> None: """守护模式运行 Args: specific_strategies: 指定要处理的策略列表,None 则处理所有 """ strategies = specific_strategies or self.config["strategy_list"] if not strategies: self.logger.warning("[BacktestConsumer] 未指定策略,退出") return self.logger.info(f"[BacktestConsumer] 守护模式启动,策略: {strategies}") self.logger.info("按 Ctrl+C 停止") # 设置信号处理 signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) last_heartbeat = time.time() while self.running: try: # 处理所有策略 for strategy in strategies: if not self.running: break self.process_messages(strategy) # 心跳日志(每60秒) if time.time() - last_heartbeat > 60: self.logger.log_heartbeat( component="BacktestConsumer", status="running", extra_info=self.stats, ) last_heartbeat = time.time() # 短暂休眠,避免CPU占用过高 time.sleep(0.1) except Exception as e: self.logger.error(f"守护循环异常: {str(e)}", exc_info=True) time.sleep(1) self.logger.info("[BacktestConsumer] 守护模式停止") self._log_stats() def _log_stats(self) -> None: """记录统计信息""" runtime = (datetime.now() - self.stats["start_time"]).total_seconds() self.logger.info("=" * 50) self.logger.info("[统计信息]") self.logger.info(f"运行时间: {runtime:.1f} 秒") self.logger.info(f"接收消息: {self.stats['total_received']}") self.logger.info(f"处理成功: {self.stats['total_processed']}") self.logger.info(f"处理失败: {self.stats['total_failed']}") self.logger.info("=" * 50) def main(): """主函数""" parser = argparse.ArgumentParser( description="QMT 回测消息消费者", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: python backtest_consumer.py # 守护模式,处理所有配置的策略 python backtest_consumer.py --once # 单次运行 python backtest_consumer.py --strategy A,B # 指定策略 python backtest_consumer.py --once --strategy A # 单次运行指定策略 """, ) parser.add_argument( "--once", action="store_true", help="单次运行模式(处理一次后退出)" ) parser.add_argument( "--strategy", type=str, default="", help="指定策略名称,多个用逗号分隔(如: StrategyA,StrategyB)", ) args = parser.parse_args() # 加载配置 config = load_config() # 命令行参数覆盖配置 specific_strategies = None if args.strategy: specific_strategies = [s.strip() for s in args.strategy.split(",") if s.strip()] # 创建消费者 consumer = BacktestConsumer(config) # 运行 if args.once: consumer.run_once(specific_strategies) else: consumer.run_daemon(specific_strategies) if __name__ == "__main__": main()