Files
NewQuant/strategy_manager/web_backend.py
liaozhaorun c0d996f39b feat(strategy_manager): 添加策略自动启动的白名单管理
实现全面的白名单管理系统,支持策略自动启动:

- 添加 WhitelistManager 类用于持久化白名单配置存储
- 将白名单功能集成到 StrategyManager 核心模块
- 添加用于白名单 CRUD 操作的 REST API 端点
- 通过 start.py 创建用于白名单管理的 CLI 命令
- 更新前端 UI,添加白名单状态指示器和批量操作功能
- 实现每日 08:58 的自动启动调度
- 为白名单操作添加配置验证和日志记录

同时添加项目文档:
- 代码格式规则(缩进、行长度、导入、命名)
- 文件、变量、常量、函数、类的命名规范
- 受限制文件和目录的保护规则
2026-01-26 01:21:46 +08:00

281 lines
8.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# filename: main.py
import subprocess
import os
import logging
from collections import deque
from pathlib import Path
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Query
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
# 引入调度器
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
# 复用现有manager
from core.manager import StrategyManager
# ================== 初始化 ==================
app = FastAPI(title="策略控制台")
manager = StrategyManager()
# 初始化调度器
scheduler = AsyncIOScheduler()
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ================== 新增:获取 Git 版本信息 ==================
def get_git_commit_info():
"""
[智能版] 获取 Git 仓库的最新提交信息。
此函数会自动从当前文件位置向上查找项目根目录(.git 文件夹所在位置)。
"""
try:
# 1. 获取 main.py 所在的目录
script_dir = Path(__file__).resolve().parent
# 2. 推断出项目根目录 (即 main.py 所在目录的上一级)
project_root = script_dir.parent
# 3. 检查项目根目录下是否存在 .git 文件夹
git_dir = project_root / ".git"
if not git_dir.is_dir():
return "Git repo not found in parent directory"
# 4. 在推断出的项目根目录下执行 git 命令
# 使用 cwd (current working directory) 参数是关键
result = subprocess.run(
['git', 'log', '-1', '--pretty=format:%h - %s (%cr)'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True,
encoding='utf-8',
cwd=project_root # <--- 在这里指定命令的执行目录
)
return result.stdout.strip()
except (FileNotFoundError, subprocess.CalledProcessError) as e:
logger.warning(f"无法获取 Git 提交信息: {e}")
return "获取 Git 信息失败"
# ================== 定时任务逻辑 ==================
def scheduled_restart_task():
"""
定时任务:重启所有正在运行的策略
"""
logger.info("⏰ [定时任务] 触发自动重启流程...")
# 获取当前所有策略状态
status = manager.get_status()
running_strategies = [
name for name, info in status['strategies'].items()
if info['status'] == 'running'
]
if not running_strategies:
logger.info("⏰ [定时任务] 当前无运行中的策略,无需重启")
return
logger.info(f"⏰ [定时任务] 即将重启以下策略: {running_strategies}")
for name in running_strategies:
try:
manager.restart_strategy(name)
logger.info(f"✅ [定时任务] {name} 重启成功")
except Exception as e:
logger.error(f"❌ [定时任务] {name} 重启失败: {e}")
logger.info("⏰ [定时任务] 自动重启流程结束")
def scheduled_whitelist_auto_start():
"""
定时任务:白名单自动启动
仅在 08:58 执行
"""
logger.info("⏰ [白名单定时任务] 触发白名单自动启动...")
results = manager.auto_start_whitelist_strategies()
if not results:
logger.info("⏰ [白名单定时任务] 今天已执行过或无需启动")
return
success_count = sum(1 for v in results.values() if v)
fail_count = len(results) - success_count
logger.info(f"⏰ [白名单定时任务] 完成: 成功 {success_count}, 失败 {fail_count}")
for name, success in results.items():
if success:
logger.info(f"✅ [白名单定时任务] {name} 启动成功")
else:
logger.error(f"❌ [白名单定时任务] {name} 启动失败")
# ================== FastAPI 生命周期事件 (使用 lifespan 替代废弃的 on_event) ==================
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
FastAPI 生命周期管理器,替代废弃的 @app.on_event 装饰器
"""
# 原有重启任务 (08:58, 20:58)
scheduler.add_job(
scheduled_restart_task,
CronTrigger(hour=8, minute=58),
id="restart_morning",
replace_existing=True
)
scheduler.add_job(
scheduled_restart_task,
CronTrigger(hour=20, minute=58),
id="restart_evening",
replace_existing=True
)
# 新增:白名单自动启动任务(仅 08:58
scheduler.add_job(
scheduled_whitelist_auto_start,
CronTrigger(hour=8, minute=58),
id="whitelist_auto_start",
replace_existing=True
)
scheduler.start()
logger.info("📅 定时任务调度器已启动")
logger.info(" - 重启任务: 08:58, 20:58")
logger.info(" - 白名单自动启动: 08:58")
yield
# 应用关闭时执行
scheduler.shutdown()
logger.info("📅 定时任务调度器已关闭")
# ================== 初始化 ==================
app = FastAPI(title="策略控制台", lifespan=lifespan)
# ================== API 路由 ==================
@app.get("/api/status")
def get_status():
# [核心修改] 在返回状态的同时,注入 Git 版本信息
status_data = manager.get_status()
status_data['git_info'] = get_git_commit_info()
return status_data
@app.post("/api/strategy/{name}/start")
def start_strategy(name: str):
if manager.start_strategy(name):
return {"success": True}
raise HTTPException(400, "启动失败")
@app.post("/api/strategy/{name}/stop")
def stop_strategy(name: str):
if manager.stop_strategy(name):
return {"success": True}
raise HTTPException(400, "停止失败")
@app.post("/api/strategy/{name}/restart")
def restart_strategy(name: str):
if manager.restart_strategy(name):
return {"success": True}
raise HTTPException(400, "重启失败,请检查日志")
@app.get("/api/logs/{name}")
def get_logs(name: str, lines: int = Query(50, le=500)):
try:
if '_' not in name:
return {"lines": []}
strategy_name, symbol = name.split('_', 1)
log_file = Path(f"logs/{strategy_name}/{symbol}.log")
if not log_file.exists():
return {"lines": ["日志文件尚未生成"]}
with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
last_lines = deque(f, maxlen=lines)
return {"lines": [line.rstrip() for line in last_lines]}
except Exception as e:
raise HTTPException(500, f"读取日志失败: {e}")
# ================== 白名单管理 API ==================
@app.get("/api/whitelist")
def get_whitelist():
"""获取白名单列表"""
whitelist = manager.whitelist_manager.get_all()
auto_start_status = manager.whitelist_manager.get_auto_start_status()
return {
"whitelist": whitelist,
"auto_start_status": auto_start_status
}
@app.post("/api/whitelist/{name}/add")
def add_to_whitelist(name: str):
"""添加策略到白名单"""
if manager.add_to_whitelist(name):
return {"success": True, "message": f"已添加到白名单: {name}"}
raise HTTPException(400, f"添加失败,策略可能已存在: {name}")
@app.post("/api/whitelist/{name}/remove")
def remove_from_whitelist(name: str):
"""从白名单移除策略"""
if manager.remove_from_whitelist(name):
return {"success": True, "message": f"已从白名单移除: {name}"}
raise HTTPException(400, f"移除失败,策略可能不在白名单中: {name}")
@app.post("/api/whitelist/{name}/enable")
def enable_in_whitelist(name: str):
"""启用白名单中的策略"""
if manager.set_whitelist_enabled(name, True):
return {"success": True, "message": f"已启用: {name}"}
raise HTTPException(400, f"操作失败,策略可能不在白名单中: {name}")
@app.post("/api/whitelist/{name}/disable")
def disable_in_whitelist(name: str):
"""禁用白名单中的策略"""
if manager.set_whitelist_enabled(name, False):
return {"success": True, "message": f"已禁用: {name}"}
raise HTTPException(400, f"操作失败,策略可能不在白名单中: {name}")
@app.post("/api/whitelist/auto-start")
def trigger_auto_start():
"""手动触发白名单自动启动(用于测试)"""
results = manager.auto_start_whitelist_strategies()
return {
"success": True,
"results": results,
"count": len(results),
"success_count": sum(1 for v in results.values() if v),
"fail_count": sum(1 for v in results.values() if not v)
}
# ================== 静态文件挂载 ==================
# 服务前端构建文件
app.mount("/static", StaticFiles(directory="frontend/dist"), name="static")
@app.get("/")
def serve_frontend():
return FileResponse("frontend/dist/index.html")