import os import sys import json import time import psutil import subprocess from pathlib import Path from typing import Dict, Any, List from datetime import datetime # ==================== 动态路径配置 ==================== from core.path_utils import add_project_root_to_path # 添加项目根路径到sys.path PROJECT_ROOT = add_project_root_to_path() # ================================================== class StrategyManager: def __init__(self, config_path: str = "config/main.json"): self.config = self._load_main_config(config_path) self.strategies_dir = Path("strategies") self.logs_dir = Path(self.config["logs_dir"]) self.status_file = Path(self.config["status_file"]) self.pid_dir = Path(self.config["pid_dir"]) # 创建目录 self.logs_dir.mkdir(exist_ok=True) self.pid_dir.mkdir(exist_ok=True) self.strategies: Dict[str, Dict[str, Any]] = {} self.load_strategies() def _load_main_config(self, config_path: str) -> Dict[str, Any]: path = Path(config_path) if not path.exists(): return { "logs_dir": "logs", "status_file": "status.json", "pid_dir": "pids" } with open(path, 'r') as f: return json.load(f) def load_strategies(self): """递归扫描 strategies/ 目录,查找 .config 文件""" self.strategies = {} if not self.strategies_dir.exists(): print("[ERROR] 策略配置目录不存在: {}".format(self.strategies_dir)) return for config_file in self.strategies_dir.rglob("*.config"): try: with open(config_file, 'r', encoding='utf-8') as f: config = json.load(f) required = ['name', 'strategy_class', 'enabled', 'engine_params', 'strategy_params'] for field in required: if field not in config: raise ValueError("配置缺少必要字段: {}".format(field)) relative_path = config_file.relative_to(self.strategies_dir) strategy_name = relative_path.parent.name symbol = config_file.stem strategy_key = "{}_{}".format(strategy_name, symbol) self.strategies[strategy_key] = { "strategy_name": strategy_name, "symbol": symbol, "config_file": str(config_file), "config": config, "status": "stopped", "pid": None, "started_at": None, "uptime": None } except Exception as e: print("[ERROR] 加载配置失败 {}: {}".format(config_file, e)) def get_status(self) -> Dict[str, Any]: """获取完整状态""" self._refresh_status() return { "timestamp": datetime.now().isoformat(), "total": len(self.strategies), "running": sum(1 for s in self.strategies.values() if s["status"] == "running"), "strategies": self.strategies } def _refresh_status(self): """ 刷新进程状态 - 双重验证 1. 检查PID文件是否存在 2. 检查进程是否存在 3. 验证进程名是否为python(防止PID复用) """ for name, info in self.strategies.items(): pid_file = self.pid_dir / "{}.pid".format(name) if pid_file.exists(): try: with open(pid_file, 'r') as f: pid = int(f.read().strip()) # 双重验证 if psutil.pid_exists(pid): try: proc = psutil.Process(pid) # 验证进程名是否包含python if "python" in proc.name().lower(): # 验证成功,更新为运行中 info["status"] = "running" info["pid"] = pid if info["started_at"]: started = datetime.fromisoformat(info["started_at"]) uptime = datetime.now() - started info["uptime"] = str(uptime).split('.')[0] continue # 跳过清理逻辑 except (psutil.NoSuchProcess, psutil.AccessDenied): # 进程已死或无权访问,继续清理 pass # PID不存在或验证失败,清理 self._cleanup_stopped_strategy(name, pid_file) except Exception as e: print("[WARNING] 刷新状态失败 {}: {}".format(name, e)) self._cleanup_stopped_strategy(name, pid_file) else: info["status"] = "stopped" info["pid"] = None info["started_at"] = None info["uptime"] = None def _is_running(self, name: str) -> bool: """ 检查策略是否运行中 - 实时刷新状态 确保与status命令结果一致 """ # 先刷新状态确保最新 self._refresh_status() info = self.strategies[name] if not info["pid"]: return False pid_file = self.pid_dir / "{}.pid".format(name) if not pid_file.exists(): return False try: with open(pid_file, 'r') as f: pid = int(f.read().strip()) # 双重验证 if psutil.pid_exists(pid): try: proc = psutil.Process(pid) return "python" in proc.name().lower() except (psutil.NoSuchProcess, psutil.AccessDenied): return False return False except: return False def stop_strategy(self, name: str, timeout: int = 30) -> bool: """停止单个策略""" if name not in self.strategies: print("[ERROR] 策略不存在: {}".format(name)) return False # 再次检查状态(确保最新) if not self._is_running(name): print("[WARNING] 策略未运行: {}".format(name)) return False info = self.strategies[name] try: pid = info["pid"] process = psutil.Process(pid) print("\n[INFO] 正在停止: {} (PID: {})...".format(name, pid)) # 优雅终止 process.terminate() try: process.wait(timeout=timeout) print("[SUCCESS] 已停止: {}".format(name)) except psutil.TimeoutExpired: print("[WARNING] 超时,强制终止: {}".format(name)) process.kill() process.wait() # 清理状态 self._cleanup_stopped_strategy(name, self.pid_dir / "{}.pid".format(name)) self._save_status() return True except Exception as e: print("[ERROR] 停止失败 {}: {}".format(name, e)) return False def restart_strategy(self, name: str) -> bool: """重启策略""" print("\n[INFO] 正在重启: {}".format(name)) self.stop_strategy(name) time.sleep(2) return self.start_strategy(name) def start_all(self): """启动所有启用的策略""" print("\n" + "=" * 100) print("正在启动所有启用的策略...") print("=" * 100) started = [] for name, info in self.strategies.items(): if info["config"]["enabled"] and not self._is_running(name): if self.start_strategy(name): started.append(name) print("\n[SUCCESS] 成功启动 {} 个策略".format(len(started))) if started: print("策略: {}".format(", ".join(started))) def stop_all(self): """停止所有运行的策略""" print("\n" + "=" * 100) print("正在停止所有运行的策略...") print("=" * 100) stopped = [] for name in self.strategies.keys(): if self._is_running(name): if self.stop_strategy(name): stopped.append(name) print("\n[SUCCESS] 成功停止 {} 个策略".format(len(stopped))) if stopped: print("策略: {}".format(", ".join(stopped))) def _cleanup_stopped_strategy(self, name: str, pid_file: Path): """清理已停止的策略状态""" pid_file.unlink(missing_ok=True) info = self.strategies[name] info["status"] = "stopped" info["pid"] = None info["started_at"] = None info["uptime"] = None def _save_status(self): """状态持久化""" status = self.get_status() with open(self.status_file, 'w') as f: json.dump(status, f, indent=2, ensure_ascii=False) def print_status_table(status: Dict[str, Any]): """格式化打印状态表格""" print("\n" + "=" * 130) print("策略状态总览 (更新时间: {})".format(status['timestamp'])) print("总计: {} | 运行中: {} | 已停止: {}".format( status['total'], status['running'], status['total'] - status['running'] )) print("=" * 130) if not status["strategies"]: print("未找到任何策略") return print( "配置标识 策略名称 状态 PID 运行时长 启动时间") print("-" * 130) for name, info in status["strategies"].items(): status_text = "RUNNING" if info["status"] == "running" else "STOPPED" pid_text = str(info["pid"]) if info["pid"] else "-" uptime_text = info["uptime"] if info["uptime"] else "-" started_text = info["started_at"][:19] if info["started_at"] else "-" print("{:<35} {:<40} {:<10} {:<10} {:<15} {:<25}".format( name, info['config']['name'], status_text, pid_text, uptime_text, started_text )) print("=" * 130)