Files
NewStock/qmt/logger.py
liaozhaorun 7b4112b70b efactor(qmt): 重构架构 - 统一信号发送和消息处理
- 修复 qmt_engine.py 中 initialize() 的重复代码
- 新增 message_processor.py: Redis Stream 消息处理器
- 新增 logger.py: 细粒度日志模块
- 新增 qmt_sender.py: 统一信号发送端(槽位+百分比模式)
- 新增 backtest_consumer.py: 回测消息消费者
- 删除旧模块: qmt_trader.py, qmt_signal_sender.py, qmt_percentage_sender.py
- 更新文档: qmt_functionality.md 反映新架构
2026-02-25 20:49:56 +08:00

302 lines
8.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding: utf-8
"""
QMT Fine-grained Logger Module
提供细粒度的日志记录功能,用于追踪消息处理全流程。
所有方法默认使用 DEBUG 级别,确保详细的追踪信息。
"""
import logging
import sys
import os
from logging.handlers import RotatingFileHandler
from typing import Dict, Any, Optional
class QMTLogger:
"""QMT 细粒度日志记录器
提供消息处理全流程的日志记录,包括:
- 消息接收
- 消息解析
- 业务校验
- 订单执行
- 消息确认
- 失败处理
"""
def __init__(
self,
name: str = "QMT_Stream",
log_file: str = "logs/qmt_stream.log",
log_level: int = logging.DEBUG,
):
"""初始化日志记录器
Args:
name: 日志记录器名称
log_file: 日志文件路径
log_level: 日志级别
"""
self.logger = logging.getLogger(name)
self.logger.setLevel(log_level)
# 避免重复添加处理器
if self.logger.handlers:
return
# 创建日志目录
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)
# 日志格式
formatter = logging.Formatter(
"[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] [%(threadName)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# 文件处理器 (轮转,最大 10MB保留 5 个备份)
file_handler = RotatingFileHandler(
log_file,
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5,
encoding="utf-8",
)
file_handler.setFormatter(formatter)
file_handler.setLevel(log_level)
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
console_handler.setLevel(log_level)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
self.logger.debug(f"[QMTLogger] 日志记录器初始化完成: {name}")
def log_message_receive(
self, stream_key: str, message_id: str, message_data: Dict[str, Any]
) -> None:
"""记录消息接收日志
Args:
stream_key: 流键名
message_id: 消息ID
message_data: 消息数据
"""
self.logger.debug(
f"[消息接收] stream={stream_key}, id={message_id}, "
f"data={str(message_data)[:500]}"
)
def log_message_parse(
self,
strategy_name: str,
stock_code: str,
action: str,
price: float,
extra_fields: Optional[Dict[str, Any]] = None,
) -> None:
"""记录消息解析日志
Args:
strategy_name: 策略名称
stock_code: 股票代码
action: 交易动作
price: 价格
extra_fields: 其他字段
"""
extra_str = f", extra={extra_fields}" if extra_fields else ""
self.logger.debug(
f"[消息解析] strategy={strategy_name}, code={stock_code}, "
f"action={action}, price={price}{extra_str}"
)
def log_validation(
self,
validation_type: str,
strategy_name: str,
details: Dict[str, Any],
result: bool,
) -> None:
"""记录业务校验日志
Args:
validation_type: 校验类型 (如: slot_check, date_check, field_check)
strategy_name: 策略名称
details: 校验详情
result: 校验结果 (True/False)
"""
result_str = "通过" if result else "拦截"
self.logger.debug(
f"[业务校验] type={validation_type}, strategy={strategy_name}, "
f"result={result_str}, details={details}"
)
def log_order_execution(
self,
strategy_name: str,
stock_code: str,
action: str,
volume: int,
price: float,
order_id: Optional[int] = None,
error: Optional[str] = None,
) -> None:
"""记录订单执行日志
Args:
strategy_name: 策略名称
stock_code: 股票代码
action: 交易动作
volume: 数量
price: 价格
order_id: 订单ID (成功时)
error: 错误信息 (失败时)
"""
if error:
self.logger.error(
f"[订单执行失败] strategy={strategy_name}, code={stock_code}, "
f"action={action}, vol={volume}, price={price}, error={error}"
)
elif order_id is not None:
self.logger.info(
f"[订单执行成功] strategy={strategy_name}, code={stock_code}, "
f"action={action}, vol={volume}, price={price}, order_id={order_id}"
)
else:
self.logger.debug(
f"[订单执行请求] strategy={strategy_name}, code={stock_code}, "
f"action={action}, vol={volume}, price={price}"
)
def log_ack(self, stream_key: str, message_id: str, success: bool) -> None:
"""记录消息确认日志
Args:
stream_key: 流键名
message_id: 消息ID
success: 确认是否成功
"""
result_str = "成功" if success else "失败"
if success:
self.logger.debug(
f"[消息确认] stream={stream_key}, id={message_id}, result={result_str}"
)
else:
self.logger.warning(
f"[消息确认] stream={stream_key}, id={message_id}, result={result_str}"
)
def log_failure(
self,
stream_key: str,
message_id: str,
error_reason: str,
to_failure_queue: bool = True,
) -> None:
"""记录失败处理日志
Args:
stream_key: 流键名
message_id: 消息ID
error_reason: 失败原因
to_failure_queue: 是否已进入失败队列
"""
queue_str = "已入失败队列" if to_failure_queue else "未入失败队列"
self.logger.warning(
f"[失败处理] stream={stream_key}, id={message_id}, "
f"reason={error_reason}, status={queue_str}"
)
def log_consumer_group(
self, stream_key: str, group_name: str, action: str, result: bool
) -> None:
"""记录消费者组操作日志
Args:
stream_key: 流键名
group_name: 消费者组名
action: 操作 (create, check)
result: 操作结果
"""
result_str = "成功" if result else "失败"
self.logger.debug(
f"[消费者组] stream={stream_key}, group={group_name}, "
f"action={action}, result={result_str}"
)
def log_pending_claim(
self, stream_key: str, consumer_id: str, message_ids: list, count: int
) -> None:
"""记录待处理消息认领日志
Args:
stream_key: 流键名
consumer_id: 消费者ID
message_ids: 认领的消息ID列表
count: 认领数量
"""
self.logger.info(
f"[消息认领] stream={stream_key}, consumer={consumer_id}, "
f"claimed_count={count}, ids={message_ids}"
)
def log_heartbeat(
self, component: str, status: str, extra_info: Optional[Dict[str, Any]] = None
) -> None:
"""记录心跳日志
Args:
component: 组件名称
status: 状态
extra_info: 额外信息
"""
extra_str = f", info={extra_info}" if extra_info else ""
self.logger.debug(f"[心跳] component={component}, status={status}{extra_str}")
def info(self, message: str) -> None:
"""记录 INFO 级别日志"""
self.logger.info(message)
def warning(self, message: str) -> None:
"""记录 WARNING 级别日志"""
self.logger.warning(message)
def error(self, message: str, exc_info: bool = False) -> None:
"""记录 ERROR 级别日志"""
self.logger.error(message, exc_info=exc_info)
def debug(self, message: str) -> None:
"""记录 DEBUG 级别日志"""
self.logger.debug(message)
# 全局日志记录器实例
_default_logger = None
def get_qmt_logger(
name: str = "QMT_Stream",
log_file: str = "logs/qmt_stream.log",
log_level: int = logging.DEBUG,
) -> QMTLogger:
"""获取 QMT 日志记录器实例
返回单例模式的日志记录器。
Args:
name: 日志记录器名称
log_file: 日志文件路径
log_level: 日志级别
Returns:
QMTLogger 实例
"""
global _default_logger
if _default_logger is None:
_default_logger = QMTLogger(name, log_file, log_level)
return _default_logger