Files
NewStock/qmt/api_server.py
2026-01-04 22:43:13 +08:00

110 lines
3.7 KiB
Python

# coding:utf-8
import os
import datetime
from typing import Optional, List, Dict, Any
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel
from qmt_engine import QMTEngine, QMTStatus
# ================= Pydantic模型 =================
class StatusResponse(BaseModel):
"""状态响应模型"""
running: bool
qmt_connected: bool
start_time: str
last_loop_update: str
account_id: str
class PositionsResponse(BaseModel):
"""持仓响应模型"""
real_positions: List[Dict[str, Any]]
virtual_positions: Dict[str, Dict[str, str]]
class LogsResponse(BaseModel):
"""日志响应模型"""
logs: List[str]
# ================= FastAPI应用 =================
class QMTAPIServer:
"""QMT API服务器"""
def __init__(self, qmt_engine: QMTEngine):
self.app = FastAPI(title="QMT Monitor")
self.qmt_engine = qmt_engine
self._setup_middleware()
self._setup_routes()
def _setup_middleware(self):
"""设置中间件"""
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
def _setup_routes(self):
"""设置路由"""
@self.app.get("/", summary="仪表盘页面")
async def read_root():
"""返回仪表盘HTML页面"""
if os.path.exists("dashboard.html"):
return FileResponse("dashboard.html")
return {"error": "Dashboard not found"}
@self.app.get("/api/status", response_model=StatusResponse, summary="获取系统状态")
def get_status():
"""获取QMT连接状态和系统信息"""
status = self.qmt_engine.get_status()
return StatusResponse(
running=status.is_running,
qmt_connected=status.is_connected,
start_time=status.start_time,
last_loop_update=status.last_heartbeat,
account_id=status.account_id
)
@self.app.get("/api/positions", response_model=PositionsResponse, summary="获取持仓信息")
def get_positions():
"""获取实盘和虚拟持仓信息"""
positions = self.qmt_engine.get_positions()
return PositionsResponse(
real_positions=positions["real_positions"],
virtual_positions=positions["virtual_positions"]
)
@self.app.get("/api/logs", response_model=LogsResponse, summary="获取日志")
def get_logs(lines: int = Query(50, ge=1, le=1000, description="返回日志行数")):
"""获取最近的交易日志"""
logs = self.qmt_engine.get_logs(lines)
return LogsResponse(logs=logs)
@self.app.get("/api/health", summary="健康检查")
def health_check():
"""健康检查接口"""
status = self.qmt_engine.get_status()
if status.is_running and status.is_connected:
return {"status": "healthy", "timestamp": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
else:
return {"status": "unhealthy", "timestamp": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
def get_app(self) -> FastAPI:
"""获取FastAPI应用实例"""
return self.app
# ================= 辅助函数 =================
def create_api_server(qmt_engine: QMTEngine) -> FastAPI:
"""创建API服务器"""
server = QMTAPIServer(qmt_engine)
return server.get_app()