- 修复 qmt_engine.py 中 initialize() 的重复代码 - 新增 message_processor.py: Redis Stream 消息处理器 - 新增 logger.py: 细粒度日志模块 - 新增 qmt_sender.py: 统一信号发送端(槽位+百分比模式) - 新增 backtest_consumer.py: 回测消息消费者 - 删除旧模块: qmt_trader.py, qmt_signal_sender.py, qmt_percentage_sender.py - 更新文档: qmt_functionality.md 反映新架构
441 lines
14 KiB
Python
441 lines
14 KiB
Python
# coding: utf-8
|
||
"""
|
||
QMT 回测消息消费者
|
||
|
||
独立的回测消息消费脚本,用于:
|
||
- 消费回测消息流
|
||
- 记录完整的处理流程日志
|
||
- 模拟订单执行(不执行真实交易)
|
||
- ACK 确认消息
|
||
|
||
使用方法:
|
||
# 守护模式(持续运行,处理所有配置的策略)
|
||
python backtest_consumer.py
|
||
|
||
# 单次运行模式(处理一次后退出)
|
||
python backtest_consumer.py --once
|
||
|
||
# 指定策略运行
|
||
python backtest_consumer.py --strategy StrategyA
|
||
python backtest_consumer.py --once --strategy StrategyA,StrategyB
|
||
|
||
配置说明:
|
||
可通过环境变量或 .env.local 文件配置:
|
||
- REDIS_HOST: Redis 主机地址(默认: localhost)
|
||
- REDIS_PORT: Redis 端口(默认: 6379)
|
||
- REDIS_PASSWORD: Redis 密码(默认: None)
|
||
- REDIS_DB: Redis 数据库(默认: 0)
|
||
- BACKTEST_CONSUMER_ID: 消费者ID(默认: backtest-consumer-1)
|
||
- BACKTEST_STRATEGIES: 默认策略列表,逗号分隔
|
||
- LOG_LEVEL: 日志级别(默认: DEBUG)
|
||
- LOG_FILE: 日志文件路径(默认: logs/backtest_consumer.log)
|
||
|
||
消息格式:
|
||
消费的消息为 JSON 格式,包含以下字段:
|
||
- strategy_name: 策略名称
|
||
- stock_code: 股票代码(如 000001.SZ)
|
||
- action: 交易动作(BUY/SELL)
|
||
- price: 价格
|
||
- total_slots: 目标持仓槽位数
|
||
- timestamp: 时间戳
|
||
- is_backtest: 是否为回测模式
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import json
|
||
import signal
|
||
import argparse
|
||
from datetime import datetime
|
||
from typing import List, Optional, Dict, Any
|
||
|
||
# 添加父目录到路径
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
try:
|
||
from message_processor import StreamMessageProcessor
|
||
from logger import QMTLogger
|
||
except ImportError as e:
|
||
print(f"导入错误: {e}")
|
||
print("请确保在 qmt 目录下运行此脚本")
|
||
sys.exit(1)
|
||
|
||
|
||
def load_config() -> Dict[str, Any]:
|
||
"""加载配置文件
|
||
|
||
从 /qmt/config/.env.local 加载环境变量配置
|
||
|
||
Returns:
|
||
配置字典
|
||
"""
|
||
# 尝试加载 python-dotenv
|
||
dotenv_loaded = False
|
||
try:
|
||
from dotenv import load_dotenv
|
||
dotenv_available = True
|
||
except ImportError:
|
||
dotenv_available = False
|
||
print("[Config] 警告: 未安装 python-dotenv,使用默认配置")
|
||
|
||
# 如果 dotenv 可用,尝试加载配置文件
|
||
if dotenv_available:
|
||
# 尝试多个路径
|
||
env_paths = [
|
||
os.path.join(
|
||
os.path.dirname(os.path.abspath(__file__)), "config", ".env.local"
|
||
),
|
||
os.path.join(os.path.dirname(__file__), "config", ".env.local"),
|
||
os.path.join(os.path.dirname(__file__), "..", "config", ".env.local"),
|
||
"/qmt/config/.env.local",
|
||
"config/.env.local",
|
||
]
|
||
|
||
for env_path in env_paths:
|
||
if os.path.exists(env_path):
|
||
load_dotenv(env_path)
|
||
print(f"[Config] 加载配置文件: {env_path}")
|
||
dotenv_loaded = True
|
||
break
|
||
|
||
if not dotenv_loaded:
|
||
print("[Config] 警告: 未找到 .env.local 文件,使用默认配置")
|
||
|
||
|
||
# 从环境变量读取配置
|
||
config = {
|
||
"redis": {
|
||
"host": os.getenv("REDIS_HOST", "localhost"),
|
||
"port": int(os.getenv("REDIS_PORT", "6379")),
|
||
"password": os.getenv("REDIS_PASSWORD") or None,
|
||
"db": int(os.getenv("REDIS_DB", "0")),
|
||
},
|
||
"consumer_id": os.getenv("BACKTEST_CONSUMER_ID", "backtest-consumer-1"),
|
||
"block_timeout": int(os.getenv("BACKTEST_BLOCK_TIMEOUT", "5000")),
|
||
"strategies": os.getenv("BACKTEST_STRATEGIES", ""),
|
||
"simulate_delay": float(os.getenv("BACKTEST_SIMULATE_DELAY", "0.1")),
|
||
"log_level": os.getenv("LOG_LEVEL", "DEBUG"),
|
||
"log_file": os.getenv("LOG_FILE", "logs/backtest_consumer.log"),
|
||
}
|
||
|
||
# 解析策略列表
|
||
if config["strategies"]:
|
||
config["strategy_list"] = [
|
||
s.strip() for s in config["strategies"].split(",") if s.strip()
|
||
]
|
||
else:
|
||
config["strategy_list"] = []
|
||
|
||
return config
|
||
|
||
|
||
class BacktestConsumer:
|
||
"""回测消息消费者"""
|
||
|
||
def __init__(self, config: Dict[str, Any]):
|
||
"""初始化回测消费者
|
||
|
||
Args:
|
||
config: 配置字典
|
||
"""
|
||
self.config = config
|
||
self.logger = QMTLogger(name="BacktestConsumer", log_file=config["log_file"])
|
||
|
||
# 初始化 Stream 处理器
|
||
self.processor = StreamMessageProcessor(redis_config=config["redis"])
|
||
|
||
# 运行状态
|
||
self.running = True
|
||
|
||
# 统计信息
|
||
self.stats = {
|
||
"total_received": 0,
|
||
"total_processed": 0,
|
||
"total_failed": 0,
|
||
"start_time": datetime.now(),
|
||
}
|
||
|
||
self.logger.info(f"[BacktestConsumer] 初始化完成")
|
||
self.logger.info(f"[BacktestConsumer] 消费者ID: {config['consumer_id']}")
|
||
self.logger.info(
|
||
f"[BacktestConsumer] 策略列表: {config['strategy_list'] or '所有策略'}"
|
||
)
|
||
|
||
def signal_handler(self, signum, frame):
|
||
"""信号处理函数"""
|
||
self.logger.info(f"[BacktestConsumer] 收到信号 {signum},准备退出...")
|
||
self.running = False
|
||
|
||
def simulate_order(self, strategy_name: str, data: Dict[str, Any]) -> bool:
|
||
"""模拟订单执行
|
||
|
||
Args:
|
||
strategy_name: 策略名称
|
||
data: 消息数据
|
||
|
||
Returns:
|
||
模拟成功返回 True
|
||
"""
|
||
action = data.get("action")
|
||
stock_code = data.get("stock_code")
|
||
price = data.get("price", 0)
|
||
total_slots = data.get("total_slots", 0)
|
||
|
||
self.logger.log_order_execution(
|
||
strategy_name=strategy_name,
|
||
stock_code=stock_code,
|
||
action=action,
|
||
volume=100, # 模拟数量
|
||
price=price,
|
||
)
|
||
|
||
# 模拟延迟
|
||
if self.config["simulate_delay"] > 0:
|
||
time.sleep(self.config["simulate_delay"])
|
||
|
||
# 记录模拟结果
|
||
if action == "BUY":
|
||
self.logger.info(
|
||
f"[模拟交易] {strategy_name} 买入 {stock_code} @ {price}, "
|
||
f"目标持仓: {total_slots}只"
|
||
)
|
||
elif action == "SELL":
|
||
self.logger.info(
|
||
f"[模拟交易] {strategy_name} 卖出 {stock_code} @ {price}, 清仓释放槽位"
|
||
)
|
||
else:
|
||
self.logger.warning(f"[模拟交易] 未知动作: {action}")
|
||
return False
|
||
|
||
# 记录成功
|
||
self.logger.log_order_execution(
|
||
strategy_name=strategy_name,
|
||
stock_code=stock_code,
|
||
action=action,
|
||
volume=100,
|
||
price=price,
|
||
order_id=-1, # 模拟订单ID
|
||
)
|
||
|
||
return True
|
||
|
||
def process_messages(self, strategy_name: str) -> int:
|
||
"""处理单个策略的消息
|
||
|
||
Args:
|
||
strategy_name: 策略名称
|
||
|
||
Returns:
|
||
处理的消息数量
|
||
"""
|
||
processed = 0
|
||
|
||
try:
|
||
# 消费消息(回测模式,不使用消费者组)
|
||
messages = self.processor.consume_message(
|
||
strategy_name=strategy_name,
|
||
consumer_id=self.config["consumer_id"],
|
||
is_backtest=True,
|
||
block_ms=100, # 短阻塞
|
||
count=10, # 每次最多处理10条
|
||
)
|
||
|
||
if not messages:
|
||
return 0
|
||
|
||
for msg_id, data in messages:
|
||
self.stats["total_received"] += 1
|
||
|
||
try:
|
||
stream_key = f"qmt:{strategy_name}:backtest"
|
||
|
||
# 1. 记录消息接收
|
||
self.logger.log_message_receive(stream_key, msg_id, data)
|
||
|
||
# 2. 记录消息解析
|
||
self.logger.log_message_parse(
|
||
strategy_name=data.get("strategy_name", strategy_name),
|
||
stock_code=data.get("stock_code", ""),
|
||
action=data.get("action", ""),
|
||
price=data.get("price", 0),
|
||
extra_fields={
|
||
"total_slots": data.get("total_slots"),
|
||
"timestamp": data.get("timestamp"),
|
||
"is_backtest": data.get("is_backtest"),
|
||
},
|
||
)
|
||
|
||
# 3. 业务校验
|
||
self.logger.log_validation(
|
||
validation_type="field_check",
|
||
strategy_name=strategy_name,
|
||
details={"fields": list(data.keys())},
|
||
result=True,
|
||
)
|
||
|
||
# 4. 模拟订单执行
|
||
success = self.simulate_order(strategy_name, data)
|
||
|
||
if success:
|
||
self.stats["total_processed"] += 1
|
||
else:
|
||
self.stats["total_failed"] += 1
|
||
|
||
# 5. ACK 消息(回测模式不需要真正的 ACK,但记录日志)
|
||
self.logger.log_ack(stream_key, msg_id, True)
|
||
|
||
processed += 1
|
||
|
||
except Exception as e:
|
||
error_msg = f"消息处理异常: {str(e)}"
|
||
self.logger.error(error_msg, exc_info=True)
|
||
self.logger.log_failure(
|
||
stream_key=f"qmt:{strategy_name}:backtest",
|
||
message_id=msg_id,
|
||
error_reason=str(e),
|
||
to_failure_queue=False,
|
||
)
|
||
self.stats["total_failed"] += 1
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"消费消息异常: {str(e)}", exc_info=True)
|
||
|
||
return processed
|
||
|
||
def run_once(self, specific_strategies: Optional[List[str]] = None) -> None:
|
||
"""单次运行
|
||
|
||
Args:
|
||
specific_strategies: 指定要处理的策略列表,None 则处理所有
|
||
"""
|
||
strategies = specific_strategies or self.config["strategy_list"]
|
||
|
||
if not strategies:
|
||
self.logger.warning("[BacktestConsumer] 未指定策略,退出")
|
||
return
|
||
|
||
self.logger.info(f"[BacktestConsumer] 单次运行,策略: {strategies}")
|
||
|
||
total_processed = 0
|
||
for strategy in strategies:
|
||
count = self.process_messages(strategy)
|
||
total_processed += count
|
||
if count > 0:
|
||
self.logger.info(
|
||
f"[BacktestConsumer] 策略 {strategy} 处理 {count} 条消息"
|
||
)
|
||
|
||
self.logger.info(
|
||
f"[BacktestConsumer] 单次运行完成,共处理 {total_processed} 条消息"
|
||
)
|
||
self._log_stats()
|
||
|
||
def run_daemon(self, specific_strategies: Optional[List[str]] = None) -> None:
|
||
"""守护模式运行
|
||
|
||
Args:
|
||
specific_strategies: 指定要处理的策略列表,None 则处理所有
|
||
"""
|
||
strategies = specific_strategies or self.config["strategy_list"]
|
||
|
||
if not strategies:
|
||
self.logger.warning("[BacktestConsumer] 未指定策略,退出")
|
||
return
|
||
|
||
self.logger.info(f"[BacktestConsumer] 守护模式启动,策略: {strategies}")
|
||
self.logger.info("按 Ctrl+C 停止")
|
||
|
||
# 设置信号处理
|
||
signal.signal(signal.SIGINT, self.signal_handler)
|
||
signal.signal(signal.SIGTERM, self.signal_handler)
|
||
|
||
last_heartbeat = time.time()
|
||
|
||
while self.running:
|
||
try:
|
||
# 处理所有策略
|
||
for strategy in strategies:
|
||
if not self.running:
|
||
break
|
||
self.process_messages(strategy)
|
||
|
||
# 心跳日志(每60秒)
|
||
if time.time() - last_heartbeat > 60:
|
||
self.logger.log_heartbeat(
|
||
component="BacktestConsumer",
|
||
status="running",
|
||
extra_info=self.stats,
|
||
)
|
||
last_heartbeat = time.time()
|
||
|
||
# 短暂休眠,避免CPU占用过高
|
||
time.sleep(0.1)
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"守护循环异常: {str(e)}", exc_info=True)
|
||
time.sleep(1)
|
||
|
||
self.logger.info("[BacktestConsumer] 守护模式停止")
|
||
self._log_stats()
|
||
|
||
def _log_stats(self) -> None:
|
||
"""记录统计信息"""
|
||
runtime = (datetime.now() - self.stats["start_time"]).total_seconds()
|
||
self.logger.info("=" * 50)
|
||
self.logger.info("[统计信息]")
|
||
self.logger.info(f"运行时间: {runtime:.1f} 秒")
|
||
self.logger.info(f"接收消息: {self.stats['total_received']}")
|
||
self.logger.info(f"处理成功: {self.stats['total_processed']}")
|
||
self.logger.info(f"处理失败: {self.stats['total_failed']}")
|
||
self.logger.info("=" * 50)
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
parser = argparse.ArgumentParser(
|
||
description="QMT 回测消息消费者",
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog="""
|
||
使用示例:
|
||
python backtest_consumer.py # 守护模式,处理所有配置的策略
|
||
python backtest_consumer.py --once # 单次运行
|
||
python backtest_consumer.py --strategy A,B # 指定策略
|
||
python backtest_consumer.py --once --strategy A # 单次运行指定策略
|
||
""",
|
||
)
|
||
|
||
parser.add_argument(
|
||
"--once", action="store_true", help="单次运行模式(处理一次后退出)"
|
||
)
|
||
|
||
parser.add_argument(
|
||
"--strategy",
|
||
type=str,
|
||
default="",
|
||
help="指定策略名称,多个用逗号分隔(如: StrategyA,StrategyB)",
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
# 加载配置
|
||
config = load_config()
|
||
|
||
# 命令行参数覆盖配置
|
||
specific_strategies = None
|
||
if args.strategy:
|
||
specific_strategies = [s.strip() for s in args.strategy.split(",") if s.strip()]
|
||
|
||
# 创建消费者
|
||
consumer = BacktestConsumer(config)
|
||
|
||
# 运行
|
||
if args.once:
|
||
consumer.run_once(specific_strategies)
|
||
else:
|
||
consumer.run_daemon(specific_strategies)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|