# coding: utf-8 """ QMT Redis Stream Message Processor 统一封装 Redis Stream 操作,提供消息发送、消费、确认、失败处理等功能。 """ import json import logging import socket import os from typing import Dict, Any, Optional, List, Tuple from datetime import datetime import redis class StreamMessageProcessor: """Redis Stream 消息处理器 封装所有 Redis Stream 操作,包括: - 消息发送 (XADD) - 消息消费 (XREADGROUP) - 消息确认 (XACK) - 失败队列 (XADD to failure stream) - 消费者组管理 - 待处理消息认领 """ # 消费者组名称 CONSUMER_GROUP = "qmt_consumers" # 最大消息长度 MAXLEN = 1000 # 流键前缀 STREAM_PREFIX = "qmt" def __init__( self, redis_client: Optional[redis.Redis] = None, redis_config: Optional[Dict[str, Any]] = None, ): """初始化消息处理器 Args: redis_client: 已有的 Redis 连接实例 redis_config: Redis 配置字典 (当 redis_client 为 None 时使用) 格式: {"host": "localhost", "port": 6379, "password": None, "db": 0} """ self.logger = logging.getLogger("QMT_Stream") if redis_client: self.r = redis_client elif redis_config: self.r = redis.Redis( host=redis_config.get("host", "localhost"), port=redis_config.get("port", 6379), password=redis_config.get("password"), db=redis_config.get("db", 0), decode_responses=True, ) else: # 尝试从环境变量加载配置 self.r = self._load_redis_from_env() self.logger.debug(f"[StreamMessageProcessor] Redis连接初始化完成") def _load_redis_from_env(self) -> redis.Redis: """从环境变量加载 Redis 配置""" import os host = os.getenv("REDIS_HOST", "localhost") port = int(os.getenv("REDIS_PORT", "6379")) password = os.getenv("REDIS_PASSWORD") or None db = int(os.getenv("REDIS_DB", "0")) return redis.Redis( host=host, port=port, password=password, db=db, decode_responses=True ) def _get_stream_key(self, strategy_name: str, is_backtest: bool = False) -> str: """获取流键名 Args: strategy_name: 策略名称 is_backtest: 是否为回测模式 Returns: 流键名,格式: qmt:{strategy_name}:real 或 qmt:{strategy_name}:backtest """ suffix = "backtest" if is_backtest else "real" return f"{self.STREAM_PREFIX}:{strategy_name}:{suffix}" def _get_failure_stream_key(self, strategy_name: str) -> str: """获取失败流键名 Args: strategy_name: 策略名称 Returns: 失败流键名,格式: qmt:{strategy_name}:failed """ return f"{self.STREAM_PREFIX}:{strategy_name}:failed" def send_message( self, strategy_name: str, message_data: Dict[str, Any], is_backtest: bool = False, ) -> Optional[str]: """发送消息到 Redis Stream Args: strategy_name: 策略名称 message_data: 消息数据字典 is_backtest: 是否为回测消息 Returns: 消息ID (格式: timestamp-sequence),失败返回 None """ stream_key = self._get_stream_key(strategy_name, is_backtest) try: # 确保消息数据是字符串格式 if isinstance(message_data, dict): message_json = json.dumps(message_data, ensure_ascii=False) else: message_json = str(message_data) # 使用 XADD 发送消息,设置最大长度 message_id = self.r.xadd( stream_key, {"data": message_json}, maxlen=self.MAXLEN, approximate=True, # 使用近似裁剪,提高性能 ) self.logger.debug( f"[消息发送] stream={stream_key}, id={message_id}, " f"is_backtest={is_backtest}, data={message_json[:200]}" ) return message_id except redis.RedisError as e: self.logger.error(f"[消息发送失败] stream={stream_key}, error={str(e)}") return None except Exception as e: self.logger.error( f"[消息发送异常] stream={stream_key}, error={str(e)}", exc_info=True ) return None def create_consumer_group( self, strategy_name: str, is_backtest: bool = False ) -> bool: """创建消费者组 如果消费者组已存在,则忽略错误。 Args: strategy_name: 策略名称 is_backtest: 是否为回测模式 Returns: 创建成功返回 True,失败返回 False """ # 回测模式不使用消费者组 if is_backtest: return True stream_key = self._get_stream_key(strategy_name, is_backtest) try: # 创建消费者组,从最新消息开始消费 ($) self.r.xgroup_create(stream_key, self.CONSUMER_GROUP, id="$", mkstream=True) self.logger.info( f"[消费者组创建] stream={stream_key}, group={self.CONSUMER_GROUP}" ) return True except redis.ResponseError as e: if "BUSYGROUP" in str(e): # 消费者组已存在,这是正常的 self.logger.debug( f"[消费者组已存在] stream={stream_key}, group={self.CONSUMER_GROUP}" ) return True else: self.logger.error( f"[消费者组创建失败] stream={stream_key}, error={str(e)}" ) return False except Exception as e: self.logger.error( f"[消费者组创建异常] stream={stream_key}, error={str(e)}", exc_info=True ) return False def consume_message( self, strategy_name: str, consumer_id: Optional[str] = None, is_backtest: bool = False, block_ms: int = 5000, count: int = 1, ) -> Optional[List[Tuple[str, Dict[str, Any]]]]: """从 Redis Stream 消费消息 Args: strategy_name: 策略名称 consumer_id: 消费者ID,None 则自动生成 is_backtest: 是否为回测模式 block_ms: 阻塞等待时间(毫秒) count: 每次消费的消息数量 Returns: 消息列表,每个元素为 (message_id, message_data) 元组 无消息或失败返回 None """ stream_key = self._get_stream_key(strategy_name, is_backtest) # 生成消费者ID if consumer_id is None: consumer_id = self._generate_consumer_id() try: if is_backtest: # 回测模式使用 XREAD (不使用消费者组) messages = self.r.xread( {stream_key: "0"}, # 从最早未确认消息开始 count=count, block=block_ms, ) else: # 实盘模式使用 XREADGROUP # 确保消费者组存在 self.create_consumer_group(strategy_name, is_backtest) messages = self.r.xreadgroup( groupname=self.CONSUMER_GROUP, consumername=consumer_id, streams={stream_key: ">"}, # 读取新消息 count=count, block=block_ms, ) if not messages: return None # 解析消息 result = [] for stream_name, stream_messages in messages: for msg_id, msg_fields in stream_messages: # 解析消息数据 data = msg_fields.get("data", "{}") try: parsed_data = json.loads(data) except json.JSONDecodeError: parsed_data = {"raw_data": data} result.append((msg_id, parsed_data)) self.logger.debug( f"[消息接收] stream={stream_key}, id={msg_id}, " f"consumer={consumer_id}, data={str(parsed_data)[:200]}" ) return result if result else None except redis.RedisError as e: self.logger.error(f"[消息消费失败] stream={stream_key}, error={str(e)}") return None except Exception as e: self.logger.error( f"[消息消费异常] stream={stream_key}, error={str(e)}", exc_info=True ) return None def ack_message( self, strategy_name: str, message_id: str, is_backtest: bool = False ) -> bool: """确认消息已消费 Args: strategy_name: 策略名称 message_id: 消息ID is_backtest: 是否为回测模式 Returns: 确认成功返回 True,失败返回 False """ # 回测模式不需要确认 if is_backtest: return True stream_key = self._get_stream_key(strategy_name, is_backtest) try: result = self.r.xack(stream_key, self.CONSUMER_GROUP, message_id) if result == 1: self.logger.debug( f"[消息确认] stream={stream_key}, id={message_id}, success=True" ) return True else: self.logger.warning( f"[消息确认] stream={stream_key}, id={message_id}, success=False, result={result}" ) return False except redis.RedisError as e: self.logger.error( f"[消息确认失败] stream={stream_key}, id={message_id}, error={str(e)}" ) return False except Exception as e: self.logger.error( f"[消息确认异常] stream={stream_key}, id={message_id}, error={str(e)}", exc_info=True, ) return False def send_to_failure_queue( self, strategy_name: str, original_message: Dict[str, Any], error_reason: str, retry_count: int = 0, ) -> Optional[str]: """将失败消息发送到失败队列 Args: strategy_name: 策略名称 original_message: 原始消息数据 error_reason: 失败原因 retry_count: 重试次数 Returns: 失败消息ID,失败返回 None """ failure_key = self._get_failure_stream_key(strategy_name) try: failure_data = { "original_message": json.dumps(original_message, ensure_ascii=False), "error_reason": error_reason, "failed_at": datetime.now().isoformat(), "retry_count": str(retry_count), } message_id = self.r.xadd(failure_key, failure_data) self.logger.warning( f"[失败队列] strategy={strategy_name}, id={message_id}, " f"reason={error_reason}, retry_count={retry_count}" ) return message_id except redis.RedisError as e: self.logger.error( f"[失败队列发送失败] strategy={strategy_name}, error={str(e)}" ) return None except Exception as e: self.logger.error( f"[失败队列发送异常] strategy={strategy_name}, error={str(e)}", exc_info=True, ) return None def claim_pending_messages( self, strategy_name: str, consumer_id: Optional[str] = None, is_backtest: bool = False, min_idle_ms: int = 60000, ) -> Optional[List[Tuple[str, Dict[str, Any]]]]: """认领待处理消息(处理崩溃消费者的遗留消息) Args: strategy_name: 策略名称 consumer_id: 消费者ID is_backtest: 是否为回测模式 min_idle_ms: 最小空闲时间(毫秒),超过此时间的消息才会被认领 Returns: 认领的消息列表,无消息或失败返回 None """ # 回测模式不需要认领 if is_backtest: return None stream_key = self._get_stream_key(strategy_name, is_backtest) if consumer_id is None: consumer_id = self._generate_consumer_id() try: # 获取待处理消息列表 pending_info = self.r.xpending_range( stream_key, self.CONSUMER_GROUP, min=min_idle_ms, max="+", count=10 ) if not pending_info: return None # 认领消息 message_ids = [item["message_id"] for item in pending_info] claimed = self.r.xclaim( stream_key, self.CONSUMER_GROUP, consumer_id, min_idle_time=min_idle_ms, message_ids=message_ids, ) if not claimed: return None # 解析消息 result = [] for msg_id, msg_fields in claimed: data = msg_fields.get("data", "{}") try: parsed_data = json.loads(data) except json.JSONDecodeError: parsed_data = {"raw_data": data} result.append((msg_id, parsed_data)) self.logger.info( f"[消息认领] stream={stream_key}, id={msg_id}, " f"consumer={consumer_id}, idle_ms>={min_idle_ms}" ) return result if result else None except redis.RedisError as e: self.logger.error(f"[消息认领失败] stream={stream_key}, error={str(e)}") return None except Exception as e: self.logger.error( f"[消息认领异常] stream={stream_key}, error={str(e)}", exc_info=True ) return None def _generate_consumer_id(self) -> str: """生成消费者ID""" hostname = socket.gethostname() pid = os.getpid() return f"{hostname}-{pid}" def get_stream_info( self, strategy_name: str, is_backtest: bool = False ) -> Dict[str, Any]: """获取流信息 Args: strategy_name: 策略名称 is_backtest: 是否为回测模式 Returns: 流信息字典 """ stream_key = self._get_stream_key(strategy_name, is_backtest) try: length = self.r.xlen(stream_key) info = {"length": length, "stream_key": stream_key} # 获取消费者组信息(仅实盘模式) if not is_backtest: try: groups = self.r.xinfo_groups(stream_key) info["groups"] = groups except: info["groups"] = [] return info except redis.RedisError as e: self.logger.error(f"[获取流信息失败] stream={stream_key}, error={str(e)}") return {"length": 0, "stream_key": stream_key, "error": str(e)} except Exception as e: self.logger.error( f"[获取流信息异常] stream={stream_key}, error={str(e)}", exc_info=True ) return {"length": 0, "stream_key": stream_key, "error": str(e)} # 便捷函数,用于快速发送消息 def send_qmt_signal_to_stream( strategy_name: str, stock_code: str, action: str, price: float, total_slots: int, timestamp: str, is_backtest: bool = False, redis_config: Optional[Dict[str, Any]] = None, ) -> Optional[str]: """发送 QMT 交易信号到 Redis Stream 这是一个便捷函数,用于快速发送标准格式的交易信号。 Args: strategy_name: 策略名称 stock_code: 股票代码 action: 交易动作 (BUY/SELL) price: 价格 total_slots: 总槽位数 timestamp: 时间戳 is_backtest: 是否为回测 redis_config: Redis 配置 Returns: 消息ID,失败返回 None """ processor = StreamMessageProcessor(redis_config=redis_config) message = { "strategy_name": strategy_name, "stock_code": stock_code, "action": action, "price": price, "total_slots": total_slots, "timestamp": timestamp, "is_backtest": is_backtest, } return processor.send_message(strategy_name, message, is_backtest)