Files
NewStock/qmt/api_server.py
liaozhaorun 7bb0a0537b feat: 添加 Redis 消息展示功能到监控面板
- 新增 /api/messages API 接口,支持从 Redis Stream 读取消息
- 支持按策略筛选消息和分页展示
- 前端新增消息列表卡片,展示时间、策略、股票代码、动作、价格和状态
- 自动判断消息处理状态(已处理/待处理)
- 消息列表每30秒自动刷新,支持手动刷新
2026-03-01 22:06:42 +08:00

467 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding:utf-8
import os
import json
import datetime
import logging
import re
from typing import Optional, List, Dict, Any
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel
# 导入新的管理器类
from qmt_engine import MultiEngineManager, TerminalStatus, AutoReconnectScheduler
# ================= Pydantic模型 =================
class TerminalStatusModel(BaseModel):
    """Connection status of a single QMT terminal, as served by ``/api/status``."""
    # Every field is copied one-to-one from an entry of manager.get_all_status().
    qmt_id: str
    alias: str
    account_id: str
    is_connected: bool
    # NOTE(review): callback_connected / physical_connected presumably separate
    # the callback-reported state from a verified physical link — confirm
    # against qmt_engine.TerminalStatus.
    callback_connected: bool
    physical_connected: bool
    last_heartbeat: str
class StatusResponse(BaseModel):
    """Global status payload returned by ``/api/status``."""
    # Taken from manager.is_running.
    running: bool
    # Taken from manager.start_time.
    start_time: str
    # One entry per terminal reported by manager.get_all_status().
    terminals: List[TerminalStatusModel]
class PositionsResponse(BaseModel):
    """Positions payload returned by ``/api/positions``."""
    # Real (broker-side) positions grouped by qmt_id; each item carries the
    # keys "code", "volume", "can_use" and "market_value".
    real_positions: Dict[str, List[Dict[str, Any]]]
    # Virtual positions grouped by strategy name.
    virtual_positions: Dict[str, Dict[str, str]]
class LogsResponse(BaseModel):
    """Log payload returned by ``/api/logs``."""
    # Log lines as returned by manager.get_logs(lines).
    logs: List[str]
class ConfigResponse(BaseModel):
    """Auto-reconnect settings returned by ``GET /api/config``."""
    # scheduler.get_config()["reconnect_time"]; presumably an "HH:MM" string,
    # which is the format the update endpoint asks for — confirm.
    reconnect_time: str
    # scheduler.get_config()["enabled"].
    auto_reconnect_enabled: bool
class MessageItem(BaseModel):
    """A single Redis Stream entry as shown in the message list."""
    # Redis stream entry id ("<milliseconds>-<sequence>").
    message_id: str
    # Decoded JSON from the entry's "data" field, or {"raw_data": ...} when
    # that field is not valid JSON.
    data: Dict[str, Any]
    # Local time derived from the millisecond part of the entry id,
    # formatted as '%Y-%m-%d %H:%M:%S'.
    timestamp: str
    # False while the entry is still listed as pending for the consumer group.
    is_processed: bool = False
class MessagesResponse(BaseModel):
    """Message list payload returned by ``/api/messages``."""
    # At most ``count`` entries, newest first.
    messages: List[MessageItem]
    # Number of entries collected across all streams before truncation.
    total: int
    # Echo of the requested ``strategy`` filter ("all" or a strategy name).
    strategy_name: str
class FileConfigResponse(BaseModel):
    """Configuration file payload returned by ``/api/file_config``."""
    # The "redis", "qmt_terminals" and "strategies" sections of the config
    # file after masking; empty containers when no file was found or read.
    redis: Dict[str, Any]
    qmt_terminals: List[Dict[str, Any]]
    strategies: Dict[str, Any]
    # JSON text of the configuration, or an error description when the file
    # could not be read.
    raw_config: str
    # Path of the file that was read ("" when none was found).
    config_path: str
class ConfigUpdateRequest(BaseModel):
    """Request body of ``POST /api/config``; a ``None`` field is left unchanged."""
    # New reconnect time; rejected with an "HH:MM" format hint when invalid.
    reconnect_time: Optional[str] = None
    # New on/off state for the auto-reconnect scheduler.
    auto_reconnect_enabled: Optional[bool] = None
class ReconnectResponse(BaseModel):
    """Response of ``POST /api/reconnect``."""
    # Whether the reconnect request was accepted.
    success: bool
    # Human-readable status text for the dashboard.
    message: str
# ================= FastAPI应用 =================
class QMTAPIServer:
"""多终端 QMT API服务器"""
def __init__(self, manager: MultiEngineManager, config_file: str = "config.json"):
    """Create the FastAPI app, register middleware and routes, start the scheduler.

    Args:
        manager: Engine manager that the route handlers query.
        config_file: Configuration path, stored on the instance and handed to
            the auto-reconnect scheduler.
    """
    self.manager = manager
    self.config_file = config_file
    self.app = FastAPI(title="QMT Multi-Terminal Monitor")
    # The scheduler is created before the routes are registered because the
    # config and reconnect handlers use self.scheduler.
    self.scheduler = AutoReconnectScheduler(manager, config_file=config_file)
    for configure in (self._setup_middleware, self._setup_routes):
        configure()
    # Start the scheduler only once the app is fully wired.
    self.scheduler.start()
def _setup_middleware(self):
    """Register a CORS middleware that allows every origin, method and header."""
    cors_options = {
        "allow_origins": ["*"],
        "allow_methods": ["*"],
        "allow_headers": ["*"],
    }
    self.app.add_middleware(CORSMiddleware, **cors_options)
def _mask_sensitive_value(self, value: str) -> str:
    """Mask a sensitive string, keeping only its first and last character.

    Empty/falsy input is returned unchanged. Values of up to four characters
    are replaced entirely by asterisks; longer values keep their first and
    last character with asterisks in between (the length is preserved).
    """
    if not value:
        return value
    size = len(value)
    if size > 4:
        return "".join((value[0], "*" * (size - 2), value[-1]))
    return "*" * size
def _mask_config(self, config: dict) -> dict:
    """Return a copy of ``config`` with sensitive values masked.

    Rules, applied per key (nested dicts and dicts inside lists are processed
    recursively; everything else is copied as-is):

    * ``password`` / ``pwd``: replaced by a fixed ``"******"``.
    * ``account_id`` (str): masked with ``_mask_sensitive_value``.
    * ``path`` (str): the third-from-last path component is replaced by
      ``"****"`` while the last two components are kept. Paths with fewer
      than three components are returned unchanged.

    Args:
        config: Parsed configuration mapping.

    Returns:
        A new dict; the input is not modified.
    """
    masked = {}
    for key, value in config.items():
        if isinstance(value, dict):
            masked[key] = self._mask_config(value)
        elif isinstance(value, list):
            masked[key] = [
                self._mask_config(item) if isinstance(item, dict) else item
                for item in value
            ]
        elif key in ("password", "pwd"):
            masked[key] = "******"
        elif key == "account_id" and isinstance(value, str):
            masked[key] = self._mask_sensitive_value(value)
        elif key == "path" and isinstance(value, str):
            parts = value.split(os.sep)
            if len(parts) >= 3:
                # Replace (not insert before) the component that precedes the
                # last two. Slicing with [:-2] here would keep every original
                # component and so hide nothing.
                masked[key] = os.sep.join(parts[:-3] + ["****"] + parts[-2:])
            else:
                masked[key] = value
        else:
            masked[key] = value
    return masked
def _find_config_file(self) -> Optional[str]:
    """Locate the configuration file to display.

    Candidates are tried in priority order and the first existing one wins:

    1. the ``config_file`` this server was constructed with,
    2. ``config.json`` in the working directory,
    3. ``qmt/config.json``,
    4. the path in the ``QMT_CONFIG_PATH`` environment variable.

    Returns:
        The first existing candidate path, or ``None`` when none exists.
    """
    candidates = [
        # The path the server was actually started with comes first, so a
        # custom location is shown instead of a stray default file.
        getattr(self, "config_file", None),
        "config.json",
        "qmt/config.json",
        os.environ.get("QMT_CONFIG_PATH", ""),
    ]
    for path in candidates:
        if path and os.path.exists(path):
            return path
    return None
def _setup_routes(self):
"""设置路由"""
@self.app.get("/", summary="仪表盘页面")
async def read_root():
    """Serve the dashboard HTML page, or an error object when it is missing."""
    # The page is looked up relative to the process working directory.
    page = "dashboard.html"
    if not os.path.exists(page):
        return {"error": "Dashboard not found"}
    return FileResponse(page)
@self.app.get("/api/status", response_model=StatusResponse, summary="获取所有终端状态")
def get_status():
    """Report the connection status of every QMT terminal."""
    # Attributes copied verbatim from each status entry onto the API model.
    copied_fields = (
        "qmt_id",
        "alias",
        "account_id",
        "is_connected",
        "callback_connected",
        "physical_connected",
        "last_heartbeat",
    )
    terminals = []
    for t in self.manager.get_all_status():
        terminals.append(
            TerminalStatusModel(**{name: getattr(t, name) for name in copied_fields})
        )
    return StatusResponse(
        running=self.manager.is_running,
        start_time=self.manager.start_time,
        terminals=terminals,
    )
@self.app.get("/api/positions", response_model=PositionsResponse, summary="获取持仓信息")
def get_positions():
    """Collect real positions per terminal and virtual positions per strategy.

    A terminal that is not connected, or whose query fails, is reported with
    an empty list. A strategy whose lookup fails, or for which no position
    manager is available, is reported with an empty dict.
    """
    logger = logging.getLogger("QMT_API")
    real_pos_data = {}
    virtual_pos_data = {}

    def _as_row(p):
        # Shape of one real position as exposed to the dashboard.
        return {
            "code": p.stock_code,
            "volume": p.volume,
            "can_use": p.can_use_volume,
            "market_value": round(p.market_value, 2)
        }

    # Debug trace of the manager state.
    logger.info(f"[POS DEBUG] manager.units 数量: {len(self.manager.units) if hasattr(self.manager, 'units') else 'N/A'}")
    logger.info(f"[POS DEBUG] manager.config strategies: {list(self.manager.config.strategies.keys()) if hasattr(self.manager, 'config') else 'N/A'}")
    logger.info(f"[POS DEBUG] pos_manager 是否存在: {hasattr(self.manager, 'pos_manager') and self.manager.pos_manager is not None}")

    # 1. Real positions: one query per connected terminal.
    for qmt_id, unit in self.manager.units.items():
        positions = []
        logger.info(f"[POS DEBUG] 处理终端: {qmt_id}, callback: {unit.callback}, callback.is_connected: {unit.callback.is_connected if unit.callback else 'N/A'}")
        if unit.callback and unit.callback.is_connected:
            try:
                xt_pos = unit.xt_trader.query_stock_positions(unit.acc_obj)
                logger.info(f"[POS DEBUG] 终端 {qmt_id} 查询到持仓数量: {len(xt_pos) if xt_pos else 0}")
                if xt_pos:
                    # Built in one expression so a failure part-way through
                    # leaves ``positions`` empty rather than half-filled.
                    positions = list(_as_row(p) for p in xt_pos if p.volume > 0)
                    logger.info(f"[POS DEBUG] 终端 {qmt_id} 有效持仓(volume>0): {len(positions)}")
            except Exception as e:
                logger.error(f"[POS DEBUG] 终端 {qmt_id} 查询持仓失败: {e}")
        real_pos_data[qmt_id] = positions

    # 2. Virtual positions: one lookup per configured strategy.
    for s_name in self.manager.config.strategies.keys():
        # Default to an empty mapping; only a successful lookup replaces it.
        virtual_pos_data[s_name] = {}
        try:
            if getattr(self.manager, 'pos_manager', None) is None:
                logger.warning(f"[POS DEBUG] pos_manager 未初始化,策略 {s_name} 无法获取虚拟持仓")
                continue
            v_data = self.manager.pos_manager.get_all_virtual_positions(s_name)
            logger.info(f"[POS DEBUG] 策略 {s_name} 虚拟持仓: {v_data}")
            virtual_pos_data[s_name] = v_data
        except Exception as e:
            logger.error(f"[POS DEBUG] 获取策略 {s_name} 虚拟持仓失败: {e}")

    logger.info(f"[POS DEBUG] 最终 real_positions keys: {list(real_pos_data.keys())}, 总持仓数: {sum(len(v) for v in real_pos_data.values())}")
    logger.info(f"[POS DEBUG] 最终 virtual_positions keys: {list(virtual_pos_data.keys())}")
    return PositionsResponse(
        real_positions=real_pos_data,
        virtual_positions=virtual_pos_data
    )
@self.app.get("/api/logs", response_model=LogsResponse, summary="获取系统日志")
def get_logs(lines: int = Query(50, ge=1, le=1000)):
    """Return the most recent log lines kept by the manager.

    Args:
        lines: Number of lines to request (1-1000, default 50).
    """
    return LogsResponse(logs=self.manager.get_logs(lines))
@self.app.get("/api/health", summary="健康检查")
def health_check():
    """Health check: healthy when the manager runs and any terminal is online.

    Returns:
        ``{"status": "healthy", "timestamp": ...}`` on success, otherwise
        ``{"status": "unhealthy", "reason": ..., "timestamp": ...}`` where
        ``reason`` names the condition that actually failed.
    """
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # Report a stopped manager as such instead of blaming the terminals.
    if not self.manager.is_running:
        return {"status": "unhealthy", "reason": "Manager not running", "timestamp": timestamp}
    if not any(t.is_connected for t in self.manager.get_all_status()):
        return {"status": "unhealthy", "reason": "No terminals connected", "timestamp": timestamp}
    return {"status": "healthy", "timestamp": timestamp}
@self.app.get("/api/config", response_model=ConfigResponse, summary="获取自动重连配置")
def get_config():
    """Return the scheduler's current auto-reconnect settings."""
    current = self.scheduler.get_config()
    reconnect_time = current["reconnect_time"]
    enabled = current["enabled"]
    return ConfigResponse(reconnect_time=reconnect_time, auto_reconnect_enabled=enabled)
@self.app.post("/api/config", summary="更新自动重连配置")
def update_config(request: ConfigUpdateRequest):
    """Update the auto-reconnect settings.

    Fields left as ``None`` are not touched. When ``reconnect_time`` is
    rejected by the scheduler the request fails as a whole: the enabled flag
    is NOT applied, so a failure response never hides a partial update.

    Returns:
        ``{"success": True, "message": ..., "config": ...}`` on success, or
        ``{"success": False, "message": ...}`` when the time is invalid.
    """
    if request.reconnect_time is not None:
        if not self.scheduler.set_reconnect_time(request.reconnect_time):
            # Stop here so nothing else is changed by a rejected request.
            return {"success": False, "message": "时间格式错误,请使用 HH:MM 格式"}
    if request.auto_reconnect_enabled is not None:
        self.scheduler.set_enabled(request.auto_reconnect_enabled)
    return {"success": True, "message": "配置更新成功", "config": self.scheduler.get_config()}
@self.app.post("/api/reconnect", response_model=ReconnectResponse, summary="手动触发重连")
def trigger_reconnect():
    """Ask the scheduler to reconnect all terminals now and acknowledge it."""
    self.scheduler.trigger_reconnect()
    acknowledgement = ReconnectResponse(success=True, message="重连任务已在后台启动")
    return acknowledgement
@self.app.get("/api/file_config", response_model=FileConfigResponse, summary="获取配置文件内容")
def get_file_config():
    """Return the configuration file content with sensitive values masked.

    Both the structured sections and ``raw_config`` are produced from the
    masked configuration, so passwords and account ids never leave the
    server through this endpoint.

    Returns:
        A ``FileConfigResponse``. All fields are empty when no configuration
        file is found. When reading or parsing fails, ``raw_config`` carries
        the error text instead.
    """
    config_path = self._find_config_file()
    if not config_path or not os.path.exists(config_path):
        return FileConfigResponse(
            redis={},
            qmt_terminals=[],
            strategies={},
            raw_config="",
            config_path=""
        )
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
        masked_config = self._mask_config(config)
        # Serialize the MASKED config: dumping the parsed original here
        # would expose the very secrets the structured fields hide.
        raw_json = json.dumps(masked_config, ensure_ascii=False, indent=4)
        return FileConfigResponse(
            redis=masked_config.get("redis", {}),
            qmt_terminals=masked_config.get("qmt_terminals", []),
            strategies=masked_config.get("strategies", {}),
            raw_config=raw_json,
            config_path=os.path.abspath(config_path)
        )
    except Exception as e:
        # Best effort: the dashboard shows the error text instead of a 500.
        logging.error(f"读取配置文件失败: {e}")
        return FileConfigResponse(
            redis={},
            qmt_terminals=[],
            strategies={},
            raw_config=f"读取配置文件失败: {str(e)}",
            config_path=config_path
        )
@self.app.get("/api/messages", response_model=MessagesResponse, summary="获取Redis消息")
def get_messages(
    strategy: str = Query("all", description="策略名称all表示所有策略"),
    count: int = Query(50, ge=1, le=200, description="获取消息数量"),
    is_backtest: bool = Query(False, description="是否为回测消息")
):
    """Return the newest messages from the strategy Redis streams.

    Args:
        strategy: Strategy name, or ``"all"`` to scan every ``qmt:*`` stream
            of the selected mode.
        count: Maximum number of entries read per stream and returned.
        is_backtest: Read the ``backtest`` streams instead of ``real``.

    Returns:
        A ``MessagesResponse`` with the entries sorted newest first by stream
        id. ``total`` counts the entries collected before truncation. Any
        failure yields an empty response rather than an HTTP error.

    An entry is flagged as unprocessed when it is in the pending list of the
    ``qmt_consumers`` group. The pending list is queried once per stream and
    restricted to the id range that was fetched.
    NOTE(review): entries that were never delivered to the group are not in
    the pending list and are therefore reported as processed — confirm
    whether that is the intended meaning of "processed".
    """
    consumer_group = "qmt_consumers"
    mode = "backtest" if is_backtest else "real"

    def _text(value):
        # Connections without decode_responses=True hand back bytes.
        return value.decode("utf-8", "replace") if isinstance(value, bytes) else value

    def _pending_ids(conn, stream_key, oldest_id, newest_id):
        # Ids in [oldest_id, newest_id] that are delivered but not yet
        # acknowledged. Best effort: a missing group or any Redis error
        # means "nothing pending", i.e. entries default to processed.
        try:
            summary = conn.xpending(stream_key, consumer_group)
            pending_total = summary.get("pending", 0) if summary else 0
            if not pending_total:
                return set()
            pending_entries = conn.xpending_range(
                stream_key,
                consumer_group,
                min=oldest_id,
                max=newest_id,
                count=pending_total
            )
            return {_text(entry["message_id"]) for entry in pending_entries or []}
        except Exception as e:
            logging.debug(f"查询stream {stream_key} 的pending列表失败: {e}")
            return set()

    r = None
    owns_connection = False
    try:
        # Reuse a Redis connection owned by the manager when there is one.
        if getattr(self.manager, 'pos_manager', None):
            r = self.manager.pos_manager.r
        elif getattr(self.manager, 'stream_processor', None):
            r = self.manager.stream_processor.r
        else:
            # Fall back to a short-lived client built from the configuration.
            import redis
            redis_config = getattr(self.manager.config, 'redis', {})
            r = redis.Redis(
                host=redis_config.get('host', 'localhost'),
                port=redis_config.get('port', 6379),
                password=redis_config.get('password'),
                db=redis_config.get('db', 0),
                decode_responses=True
            )
            owns_connection = True

        if strategy == "all":
            stream_keys = [_text(key) for key in r.scan_iter(match=f"qmt:*:{mode}")]
        else:
            stream_keys = [f"qmt:{strategy}:{mode}"]

        collected = []  # (sortable stream id, MessageItem)
        for stream_key in stream_keys:
            try:
                # XREVRANGE returns the newest entries first.
                stream_msgs = r.xrevrange(stream_key, max="+", min="-", count=count)
                if not stream_msgs:
                    continue
                entries = [(_text(raw_id), raw_fields) for raw_id, raw_fields in stream_msgs]
                # One pending lookup per stream instead of one per message.
                pending_ids = _pending_ids(r, stream_key, entries[-1][0], entries[0][0])
                for msg_id, raw_fields in entries:
                    msg_fields = {_text(k): _text(v) for k, v in raw_fields.items()}
                    data_str = msg_fields.get("data", "{}")
                    try:
                        data = json.loads(data_str)
                    except (json.JSONDecodeError, TypeError):
                        data = {"raw_data": data_str}
                    if not isinstance(data, dict):
                        # The response model requires an object payload.
                        data = {"raw_data": data_str}
                    # Stream ids have the form "<milliseconds>-<sequence>".
                    ms_part, _, seq_part = msg_id.partition("-")
                    timestamp_ms = int(ms_part)
                    timestamp = datetime.datetime.fromtimestamp(
                        timestamp_ms / 1000
                    ).strftime('%Y-%m-%d %H:%M:%S')
                    collected.append((
                        (timestamp_ms, int(seq_part or 0)),
                        MessageItem(
                            message_id=msg_id,
                            data=data,
                            timestamp=timestamp,
                            is_processed=msg_id not in pending_ids
                        )
                    ))
            except Exception as e:
                logging.error(f"读取stream {stream_key}失败: {e}")
                continue

        # Sort on the numeric id; the formatted timestamp only has second
        # resolution and would leave same-second entries in arbitrary order.
        collected.sort(key=lambda pair: pair[0], reverse=True)
        messages = [item for _, item in collected]
        return MessagesResponse(
            messages=messages[:count],
            total=len(messages),
            strategy_name=strategy
        )
    except Exception as e:
        logging.error(f"获取消息失败: {e}")
        return MessagesResponse(
            messages=[],
            total=0,
            strategy_name=strategy
        )
    finally:
        # Only close a client created here; shared connections stay open.
        if owns_connection and r is not None:
            try:
                r.close()
            except Exception as e:
                logging.debug(f"关闭临时Redis连接失败: {e}")
def get_app(self) -> FastAPI:
    """Expose the FastAPI application built by this server."""
    application = self.app
    return application
# ================= 辅助函数 =================
def create_api_server(manager: MultiEngineManager, config_file: str = "config.json") -> FastAPI:
    """Build a ``QMTAPIServer`` and return its FastAPI application.

    Args:
        manager: ``MultiEngineManager`` instance the server reads from.
        config_file: Path of the configuration file.

    Returns:
        The FastAPI app of the newly created server.
    """
    return QMTAPIServer(manager, config_file=config_file).get_app()