Files
NewStock/qmt/qmt_sender.py
liaozhaorun 7b4112b70b efactor(qmt): 重构架构 - 统一信号发送和消息处理
- 修复 qmt_engine.py 中 initialize() 的重复代码
- 新增 message_processor.py: Redis Stream 消息处理器
- 新增 logger.py: 细粒度日志模块
- 新增 qmt_sender.py: 统一信号发送端(槽位+百分比模式)
- 新增 backtest_consumer.py: 回测消息消费者
- 删除旧模块: qmt_trader.py, qmt_signal_sender.py, qmt_percentage_sender.py
- 更新文档: qmt_functionality.md 反映新架构
2026-02-25 20:49:56 +08:00

252 lines
8.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import redis
import json
import socket
import os
from typing import Dict, Any, Optional
# --- 模块级全局变量 ---
_BACKTEST_SEND_COUNT = 0
# --- Stream 配置常量 ---
STREAM_PREFIX = "qmt"
MAXLEN = 1000 # Stream 最大长度
def _get_stream_key(strategy_name: str, is_backtest: bool = False) -> str:
"""获取流键名
Args:
strategy_name: 策略名称
is_backtest: 是否为回测模式
Returns:
流键名,格式: qmt:{strategy_name}:real 或 qmt:{strategy_name}:backtest
"""
suffix = "backtest" if is_backtest else "real"
return f"{STREAM_PREFIX}:{strategy_name}:{suffix}"
def _create_redis_client(redis_config: Dict[str, Any]) -> redis.Redis:
"""创建 Redis 客户端"""
return redis.Redis(
host=redis_config.get("host", "localhost"),
port=redis_config.get("port", 6379),
password=redis_config.get("password"),
db=redis_config.get("db", 0),
decode_responses=True,
socket_timeout=1,
)
def _send_to_stream(
r: redis.Redis,
strategy_name: str,
message_data: Dict[str, Any],
is_backtest: bool = False,
) -> Optional[str]:
"""发送消息到 Redis Stream
Args:
r: Redis 客户端
strategy_name: 策略名称
message_data: 消息数据字典
is_backtest: 是否为回测消息
Returns:
消息ID (格式: timestamp-sequence),失败返回 None
"""
stream_key = _get_stream_key(strategy_name, is_backtest)
try:
# 确保消息数据是字符串格式
message_json = json.dumps(message_data, ensure_ascii=False)
# 使用 XADD 发送消息,设置最大长度
message_id = r.xadd(
stream_key,
{"data": message_json},
maxlen=MAXLEN,
approximate=True, # 使用近似裁剪,提高性能
)
return message_id
except Exception as e:
print(f"[Error] Stream 发送失败: {e}")
return None
def _convert_code_to_qmt(code: str) -> str:
"""股票代码格式转换: 聚宽(.XSHE/.XSHG) -> QMT(.SZ/.SH)"""
if code.endswith(".XSHE"):
return code.replace(".XSHE", ".SZ")
elif code.endswith(".XSHG"):
return code.replace(".XSHG", ".SH")
return code
def send_qmt_signal(code, target_total_slots, price, context, redis_config):
"""
发送信号到 Redis Stream (基于槽位状态判断买卖意图)
参数:
- code: 股票代码 (聚宽格式: 000001.XSHE)
- target_total_slots:
* 意向持仓时: 传入策略设定的总槽位数 (例如 5)。此时 action 判定为 BUY。
* 意向清仓时: 传入 0。此时 action 判定为 SELL。
- price: 当前最新价格 (用于实盘限价单参考)
- context: 聚宽上下文对象
- redis_config: Redis配置字典包含 host, port, password, db, strategy_name
"""
global _BACKTEST_SEND_COUNT
try:
# ---------------------------------------------------------
# 1. 环境判断与流量控制
# ---------------------------------------------------------
run_type = context.run_params.type
is_backtest = run_type in ["simple_backtest", "full_backtest"]
if is_backtest:
if _BACKTEST_SEND_COUNT >= 10:
print(f"[流量控制] 回测消息已达上限 (10条),跳过发送")
return
_BACKTEST_SEND_COUNT += 1
# ---------------------------------------------------------
# 2. 建立 Redis 连接
# ---------------------------------------------------------
r = _create_redis_client(redis_config)
# ---------------------------------------------------------
# 3. 数据转换与规范化
# ---------------------------------------------------------
qmt_code = _convert_code_to_qmt(code)
# 【核心逻辑】:根据 target_total_slots 判断动作
if target_total_slots > 0:
action = "BUY"
slots_val = int(target_total_slots)
else:
action = "SELL"
slots_val = 0
# ---------------------------------------------------------
# 4. 构建消息体
# ---------------------------------------------------------
base_strategy_name = redis_config.get("strategy_name", "default_strategy")
ts_str = context.current_dt.strftime("%Y-%m-%d %H:%M:%S")
msg = {
"strategy_name": base_strategy_name,
"stock_code": qmt_code,
"action": action,
"price": price,
"total_slots": slots_val,
"timestamp": ts_str,
"is_backtest": is_backtest,
}
# ---------------------------------------------------------
# 5. 使用 Stream 发送消息
# ---------------------------------------------------------
message_id = _send_to_stream(r, base_strategy_name, msg, is_backtest)
if message_id:
# ---------------------------------------------------------
# 6. 控制台输出
# ---------------------------------------------------------
log_prefix = "【回测】" if is_backtest else "【实盘】"
desc = f"目标总持仓:{slots_val}" if action == "BUY" else "清仓释放槽位"
print(
f"{log_prefix} 信号同步 -> {qmt_code} | 动作:{action} | {desc} | 时间:{ts_str} | msg_id:{message_id}"
)
else:
print(f"[Error] 发送QMT信号失败: Stream 返回 None")
except Exception as e:
print(f"[Error] 发送QMT信号失败: {e}")
def send_qmt_percentage_signal(
code, position_pct, action, price, is_backtest, timestamp, redis_config
):
"""
发送基于仓位百分比的信号到 Redis Stream
参数:
- code: 股票代码 (聚宽格式: 000001.XSHE)
- position_pct: 目标持仓占总资产的比例 (0.0 ~ 1.0,如 0.2 表示 20%)
- action: 交易动作,"BUY""SELL"
- price: 当前最新价格 (用于实盘限价单参考)
- is_backtest: 是否为回测模式 (True/False)
- timestamp: 时间戳字符串,格式 "YYYY-MM-DD HH:MM:SS"
- redis_config: Redis配置字典包含 host, port, password, db, strategy_name
"""
global _BACKTEST_SEND_COUNT
try:
# ---------------------------------------------------------
# 1. 环境判断与流量控制
# ---------------------------------------------------------
if is_backtest:
if _BACKTEST_SEND_COUNT >= 10:
return
_BACKTEST_SEND_COUNT += 1
# ---------------------------------------------------------
# 2. 建立 Redis 连接
# ---------------------------------------------------------
r = _create_redis_client(redis_config)
# ---------------------------------------------------------
# 3. 数据转换与规范化
# ---------------------------------------------------------
qmt_code = _convert_code_to_qmt(code)
# 校验 action 参数
if action not in ["BUY", "SELL"]:
print(f"[Error] 无效的 action 参数: {action},必须是 'BUY''SELL'")
return
# ---------------------------------------------------------
# 4. 构建消息体
# ---------------------------------------------------------
base_strategy_name = redis_config.get("strategy_name", "default_strategy")
msg = {
"strategy_name": base_strategy_name,
"stock_code": qmt_code,
"action": action,
"price": price,
"position_pct": float(position_pct),
"timestamp": timestamp,
"is_backtest": is_backtest,
}
# ---------------------------------------------------------
# 5. 使用 Stream 发送消息
# ---------------------------------------------------------
message_id = _send_to_stream(r, base_strategy_name, msg, is_backtest)
if message_id:
# ---------------------------------------------------------
# 6. 控制台输出
# ---------------------------------------------------------
log_prefix = "【回测】" if is_backtest else "【实盘】"
pct_display = f"{position_pct * 100:.1f}%"
desc = f"目标仓位:{pct_display}" if action == "BUY" else "清仓"
print(
f"{log_prefix} 百分比信号 -> {qmt_code} | 动作:{action} | {desc} | 价格:{price} | 时间:{timestamp} | msg_id:{message_id}"
)
else:
print(f"[Error] 发送QMT百分比信号失败: Stream 返回 None")
except Exception as e:
print(f"[Error] 发送QMT百分比信号失败: {e}")
# 便捷函数别名,保持向后兼容
send_signal = send_qmt_signal
send_percentage_signal = send_qmt_percentage_signal