diff --git a/.gitignore b/.gitignore index 07bdb8a..90b812f 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,6 @@ data/data/ logs/ pids/ -status/ -status.json \ No newline at end of file +states/ +status.json +/strategy_manager/states/ diff --git a/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.ipynb b/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.ipynb index 06edbf3..d3b2022 100644 --- a/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.ipynb +++ b/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.ipynb @@ -6,8 +6,8 @@ "metadata": { "collapsed": true, "ExecuteTime": { - "end_time": "2025-11-19T07:13:51.818890Z", - "start_time": "2025-11-19T07:13:50.827090Z" + "end_time": "2025-11-21T01:54:34.434731Z", + "start_time": "2025-11-21T01:54:31.509589Z" } }, "source": [ @@ -44,8 +44,8 @@ { "metadata": { "ExecuteTime": { - "end_time": "2025-11-19T07:13:51.840183Z", - "start_time": "2025-11-19T07:13:51.822905Z" + "end_time": "2025-11-21T01:54:34.537950Z", + "start_time": "2025-11-21T01:54:34.445969Z" } }, "cell_type": "code", @@ -79,8 +79,8 @@ { "metadata": { "ExecuteTime": { - "end_time": "2025-11-19T07:14:58.236155Z", - "start_time": "2025-11-19T07:13:51.846812Z" + "end_time": "2025-11-21T01:55:38.643534Z", + "start_time": "2025-11-21T01:54:34.551413Z" } }, "cell_type": "code", @@ -148,7 +148,7 @@ "\n", "初始化回测引擎...\n", "模拟器初始化:初始资金=100000.00, 滑点率=0.0, 佣金率=0.0001\n", - "内存仓储已初始化,管理ID: 'futures_trading_strategies.SA.Spectral.SpectralTrendStrategy.SpectralTrendStrategy_228362dc13ea83accde90203f5f5434e'\n", + "内存仓储已初始化,管理ID: 'futures_trading_strategies.SA.Spectral.SpectralTrendStrategy.SpectralTrendStrategy_f774488b98ac53758169305e51fd8595'\n", "\n", "--- 回测引擎初始化完成 ---\n", " 策略: SpectralTrendStrategy\n", @@ -196,8 +196,8 @@ { "metadata": { "ExecuteTime": { - "end_time": "2025-11-19T07:15:05.968736Z", - "start_time": "2025-11-19T07:14:58.277441Z" + "end_time": "2025-11-21T01:55:46.179638Z", + "start_time": "2025-11-21T01:55:38.675521Z" } }, "cell_type": "code", diff --git a/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.py b/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.py index f07d033..642f31a 100644 --- a/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.py +++ b/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.py @@ -123,10 +123,11 @@ class SpectralTrendStrategy(Strategy): return # 核心逻辑:相变入场/退出 - if position_volume == 0: - self.evaluate_entry_signal(open_price, trend_strength, dominant_freq) - else: - self.manage_open_position(position_volume, trend_strength, dominant_freq) + if self.trading: + if position_volume == 0: + self.evaluate_entry_signal(open_price, trend_strength, dominant_freq) + else: + self.manage_open_position(position_volume, trend_strength, dominant_freq) def calculate_trend_strength(self, prices: np.array) -> (float, float): """ diff --git a/strategy_manager/core/manager.py b/strategy_manager/core/manager.py index 6dfc61d..8e81de3 100644 --- a/strategy_manager/core/manager.py +++ b/strategy_manager/core/manager.py @@ -1,12 +1,14 @@ import os import sys -import json import time import psutil import subprocess +import logging from pathlib import Path from typing import Dict, Any, List from datetime import datetime +import importlib.util +import json # 确保导入json模块 # ==================== 动态路径配置 ==================== from core.path_utils import add_project_root_to_path @@ -29,12 +31,32 @@ class StrategyManager: self.logs_dir.mkdir(exist_ok=True) self.pid_dir.mkdir(exist_ok=True) + # 配置管理器日志 + self._setup_logger() + self.strategies: Dict[str, Dict[str, Any]] = {} + self.logger.info("🔄 正在加载策略配置...") self.load_strategies() + self.logger.info("✅ 策略加载完成,共发现 %d 个策略", len(self.strategies)) + + def _setup_logger(self): + """配置管理器日志""" + log_file = self.logs_dir / "manager.log" + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s [%(levelname)s] %(message)s', + handlers=[ + logging.FileHandler(log_file, encoding='utf-8'), + logging.StreamHandler(sys.stdout) + ], + force=True + ) + self.logger = logging.getLogger("StrategyManager") def _load_main_config(self, config_path: str) -> Dict[str, Any]: path = Path(config_path) if not path.exists(): + self.logger.warning("⚠️ 主配置文件不存在,使用默认配置") return { "logs_dir": "logs", "status_file": "status.json", @@ -44,16 +66,23 @@ class StrategyManager: return json.load(f) def load_strategies(self): - """递归扫描 strategies/ 目录,查找 .config 文件""" + """递归扫描 strategies/ 目录,查找 .py 配置文件""" self.strategies = {} if not self.strategies_dir.exists(): - print("[ERROR] 策略配置目录不存在: {}".format(self.strategies_dir)) + self.logger.error("❌ 策略配置目录不存在: %s", self.strategies_dir) return - for config_file in self.strategies_dir.rglob("*.config"): + for config_file in self.strategies_dir.rglob("*.py"): try: - with open(config_file, 'r', encoding='utf-8') as f: - config = json.load(f) + spec = importlib.util.spec_from_file_location( + f"config_{config_file.stem}", config_file + ) + config_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(config_module) + + if not hasattr(config_module, 'CONFIG'): + raise ValueError("配置文件缺少 CONFIG 变量") + config = config_module.CONFIG required = ['name', 'strategy_class', 'enabled', 'engine_params', 'strategy_params'] for field in required: @@ -75,8 +104,9 @@ class StrategyManager: "started_at": None, "uptime": None } + self.logger.info("📄 加载配置: %s", config_file) except Exception as e: - print("[ERROR] 加载配置失败 {}: {}".format(config_file, e)) + self.logger.error("❌ 加载配置失败 %s: %s", config_file, e, exc_info=True) def get_status(self) -> Dict[str, Any]: """获取完整状态""" @@ -89,12 +119,7 @@ class StrategyManager: } def _refresh_status(self): - """ - 刷新进程状态 - 双重验证 - 1. 检查PID文件是否存在 - 2. 检查进程是否存在 - 3. 验证进程名是否为python(防止PID复用) - """ + """刷新进程状态 - 双重验证""" for name, info in self.strategies.items(): pid_file = self.pid_dir / "{}.pid".format(name) @@ -103,28 +128,23 @@ class StrategyManager: with open(pid_file, 'r') as f: pid = int(f.read().strip()) - # 双重验证 if psutil.pid_exists(pid): try: proc = psutil.Process(pid) - # 验证进程名是否包含python if "python" in proc.name().lower(): - # 验证成功,更新为运行中 info["status"] = "running" info["pid"] = pid if info["started_at"]: started = datetime.fromisoformat(info["started_at"]) uptime = datetime.now() - started info["uptime"] = str(uptime).split('.')[0] - continue # 跳过清理逻辑 + continue except (psutil.NoSuchProcess, psutil.AccessDenied): - # 进程已死或无权访问,继续清理 pass - # PID不存在或验证失败,清理 self._cleanup_stopped_strategy(name, pid_file) except Exception as e: - print("[WARNING] 刷新状态失败 {}: {}".format(name, e)) + self.logger.warning("⚠️ 刷新状态失败 %s: %s", name, e) self._cleanup_stopped_strategy(name, pid_file) else: info["status"] = "stopped" @@ -133,13 +153,8 @@ class StrategyManager: info["uptime"] = None def _is_running(self, name: str) -> bool: - """ - 检查策略是否运行中 - 实时刷新状态 - 确保与status命令结果一致 - """ - # 先刷新状态确保最新 + """检查策略是否运行中""" self._refresh_status() - info = self.strategies[name] if not info["pid"]: return False @@ -152,7 +167,6 @@ class StrategyManager: with open(pid_file, 'r') as f: pid = int(f.read().strip()) - # 双重验证 if psutil.pid_exists(pid): try: proc = psutil.Process(pid) @@ -163,15 +177,74 @@ class StrategyManager: except: return False + def start_strategy(self, name: str) -> bool: + """启动单个策略(守护进程模式)""" + if name not in self.strategies: + self.logger.error("❌ 策略不存在: %s", name) + return False + + if self._is_running(name): + self.logger.warning("⚠️ 策略已在运行: %s", name) + return False + + info = self.strategies[name] + config_file = Path(info["config_file"]) + + # 创建策略专属日志目录 + strategy_log_dir = self.logs_dir / info["strategy_name"] + strategy_log_dir.mkdir(exist_ok=True, parents=True) + log_file = strategy_log_dir / "{}.log".format(info["symbol"]) + + self.logger.info("\n" + "=" * 80) + self.logger.info("🚀 启动策略: %s", name) + self.logger.info("📄 配置文件: %s", config_file) + self.logger.info("📝 日志文件: %s", log_file) + + try: + # 启动子进程 - 关键修改:脱离终端会话 + with open(log_file, 'a') as f: + # 使用 start_new_session=True 创建新会话,防止终端关闭影响 + # stdin 重定向到 /dev/null,完全脱离终端 + with open(os.devnull, 'r') as devnull: + process = subprocess.Popen( + [sys.executable, "launcher.py", "--config", str(config_file)], + stdout=f, + stderr=subprocess.STDOUT, + stdin=devnull, # 关键:从/dev/null读取输入 + # start_new_session=True, # 关键:创建新会话,脱离终端 + cwd=Path.cwd() + ) + + # 更新状态 + info["pid"] = process.pid + info["status"] = "running" + info["started_at"] = datetime.now().isoformat() + info["uptime"] = "00:00:00" + + # 保存PID文件 + pid_file = self.pid_dir / "{}.pid".format(name) + with open(pid_file, 'w') as f: + f.write(str(process.pid)) + + self._save_status() + self.logger.info("✅ 启动成功! PID: %d", process.pid) + self.logger.info("ℹ️ 该进程已脱离终端会话,关闭窗口不会停止策略") + self.logger.info("=" * 80) + return True + + except Exception as e: + self.logger.error("❌ 启动失败: %s", e, exc_info=True) + self._cleanup_stopped_strategy(name, self.pid_dir / "{}.pid".format(name)) + return False + def stop_strategy(self, name: str, timeout: int = 30) -> bool: """停止单个策略""" if name not in self.strategies: - print("[ERROR] 策略不存在: {}".format(name)) + self.logger.error("❌ 策略不存在: %s", name) return False - # 再次检查状态(确保最新) if not self._is_running(name): - print("[WARNING] 策略未运行: {}".format(name)) + self.logger.warning("⚠️ 策略未运行: %s", name) return False info = self.strategies[name] @@ -180,40 +253,43 @@ class StrategyManager: pid = info["pid"] process = psutil.Process(pid) - print("\n[INFO] 正在停止: {} (PID: {})...".format(name, pid)) + self.logger.info("\n" + "=" * 80) + self.logger.info("⏹️ 正在停止策略: %s (PID: %d)", name, pid) # 优雅终止 process.terminate() try: process.wait(timeout=timeout) - print("[SUCCESS] 已停止: {}".format(name)) + self.logger.info("✅ 已优雅停止: %s", name) except psutil.TimeoutExpired: - print("[WARNING] 超时,强制终止: {}".format(name)) + self.logger.warning("⏱️ 超时,强制终止: %s", name) process.kill() process.wait() # 清理状态 self._cleanup_stopped_strategy(name, self.pid_dir / "{}.pid".format(name)) self._save_status() + self.logger.info("=" * 80) return True except Exception as e: - print("[ERROR] 停止失败 {}: {}".format(name, e)) + self.logger.error("❌ 停止失败 %s: %s", name, e, exc_info=True) return False def restart_strategy(self, name: str) -> bool: """重启策略""" - print("\n[INFO] 正在重启: {}".format(name)) + self.logger.info("\n" + "=" * 80) + self.logger.info("🔄 正在重启: %s", name) self.stop_strategy(name) time.sleep(2) return self.start_strategy(name) def start_all(self): """启动所有启用的策略""" - print("\n" + "=" * 100) - print("正在启动所有启用的策略...") - print("=" * 100) + self.logger.info("\n" + "=" * 100) + self.logger.info("🚀 正在启动所有启用的策略...") + self.logger.info("=" * 100) started = [] for name, info in self.strategies.items(): @@ -221,15 +297,15 @@ class StrategyManager: if self.start_strategy(name): started.append(name) - print("\n[SUCCESS] 成功启动 {} 个策略".format(len(started))) + self.logger.info("\n✅ 成功启动 %d 个策略", len(started)) if started: - print("策略: {}".format(", ".join(started))) + self.logger.info("📋 策略: %s", ", ".join(started)) def stop_all(self): """停止所有运行的策略""" - print("\n" + "=" * 100) - print("正在停止所有运行的策略...") - print("=" * 100) + self.logger.info("\n" + "=" * 100) + self.logger.info("⏹️ 正在停止所有运行的策略...") + self.logger.info("=" * 100) stopped = [] for name in self.strategies.keys(): @@ -237,9 +313,9 @@ class StrategyManager: if self.stop_strategy(name): stopped.append(name) - print("\n[SUCCESS] 成功停止 {} 个策略".format(len(stopped))) + self.logger.info("\n✅ 成功停止 %d 个策略", len(stopped)) if stopped: - print("策略: {}".format(", ".join(stopped))) + self.logger.info("📋 策略: %s", ", ".join(stopped)) def _cleanup_stopped_strategy(self, name: str, pid_file: Path): """清理已停止的策略状态""" @@ -251,10 +327,25 @@ class StrategyManager: info["uptime"] = None def _save_status(self): - """状态持久化""" - status = self.get_status() - with open(self.status_file, 'w') as f: - json.dump(status, f, indent=2, ensure_ascii=False) + """状态持久化(修复:排除不可序列化的config字段)""" + try: + status = self.get_status() + + # 创建JSON安全的版本(排除config字段,因为它可能包含timedelta等不可序列化对象) + status_for_json = status.copy() + status_for_json["strategies"] = {} + + for name, info in status["strategies"].items(): + # 复制所有字段除了config + strategy_info = {k: v for k, v in info.items() if k != "config"} + status_for_json["strategies"][name] = strategy_info + + with open(self.status_file, 'w') as f: + json.dump(status_for_json, f, indent=2, ensure_ascii=False) + + self.logger.debug("💾 状态已保存到 %s", self.status_file) + except Exception as e: + self.logger.error("❌ 保存状态失败: %s", e, exc_info=True) def print_status_table(status: Dict[str, Any]): @@ -284,4 +375,31 @@ def print_status_table(status: Dict[str, Any]): name, info['config']['name'], status_text, pid_text, uptime_text, started_text )) - print("=" * 130) \ No newline at end of file + print("=" * 130) + + +if __name__ == "__main__": + manager = StrategyManager() + + if len(sys.argv) > 1: + command = sys.argv[1] + if command == "status": + print_status_table(manager.get_status()) + elif command == "start-all": + manager.start_all() + elif command == "stop-all": + manager.stop_all() + elif command.startswith("start:"): + name = command.split(":", 1)[1] + manager.start_strategy(name) + elif command.startswith("stop:"): + name = command.split(":", 1)[1] + manager.stop_strategy(name) + elif command.startswith("restart:"): + name = command.split(":", 1)[1] + manager.restart_strategy(name) + else: + print("未知命令:", command) + print("用法: python manager.py [status|start-all|stop-all|start:NAME|stop:NAME|restart:NAME]") + else: + print("用法: python manager.py [status|start-all|stop-all|start:NAME|stop:NAME|restart:NAME]") \ No newline at end of file diff --git a/strategy_manager/frontend/dist/index.html b/strategy_manager/frontend/dist/index.html new file mode 100644 index 0000000..dae6a84 --- /dev/null +++ b/strategy_manager/frontend/dist/index.html @@ -0,0 +1,253 @@ + + + + + + 策略控制台 + + + + + + + + + + + + +
+ + + + + + + +
+ + + + + \ No newline at end of file diff --git a/strategy_manager/launcher.py b/strategy_manager/launcher.py index 082861c..db8e810 100644 --- a/strategy_manager/launcher.py +++ b/strategy_manager/launcher.py @@ -1,9 +1,8 @@ import sys -import json import signal -from datetime import timedelta from pathlib import Path -import importlib +import importlib.util +import logging # 新增:日志模块 # ==================== 动态路径配置 ==================== from core.path_utils import add_project_root_to_path @@ -19,7 +18,6 @@ print(f"[INFO] Python路径: {sys.path[:3]}") def load_strategy_class(class_path: str): """动态加载策略类""" try: - # class_path: "futures_trading_strategies.FG.TrendlineBreakoutStrategy.DualModeTrendlineHawkesStrategy2.DualModeTrendlineHawkesStrategy" module_path, class_name = class_path.rsplit('.', 1) module = importlib.import_module(module_path) return getattr(module, class_name) @@ -30,26 +28,96 @@ def load_strategy_class(class_path: str): sys.exit(1) +def setup_strategy_logger(config_file: Path, strategy_name: str, symbol: str) -> logging.Logger: + """配置策略专属日志""" + # 创建日志目录: logs/{策略名}/ + log_dir = Path("logs") / strategy_name + log_dir.mkdir(exist_ok=True, parents=True) + + # 日志文件: logs/{策略名}/{品种}.log + log_file = log_dir / f"{symbol}.log" + + # 配置logger + logger = logging.getLogger(f"Strategy.{strategy_name}.{symbol}") + logger.setLevel(logging.INFO) + + # 清除旧handler(防止重复) + for h in logger.handlers[:]: + logger.removeHandler(h) + + # 文件handler + file_handler = logging.FileHandler(log_file, encoding='utf-8') + file_handler.setFormatter( + logging.Formatter('%(asctime)s [%(levelname)s] %(message)s') + ) + logger.addHandler(file_handler) + + return logger, log_file + + def run_strategy(config_path: str): """通过配置文件运行策略""" - # 1. 加载配置 - with open(config_path, 'r', encoding='utf-8') as f: - config = json.load(f) + config_file = Path(config_path) + if not config_file.exists(): + print(f"[ERROR] 配置文件不存在: {config_file}") + sys.exit(1) + # 动态加载配置模块 + try: + spec = importlib.util.spec_from_file_location( + f"strategy_config_{config_file.stem}", config_file + ) + config_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(config_module) + + if not hasattr(config_module, 'CONFIG'): + print(f"[ERROR] 配置文件缺少 CONFIG 变量: {config_file}") + sys.exit(1) + + config = config_module.CONFIG + except Exception as e: + print(f"[ERROR] 加载配置文件失败 {config_path}: {e}") + sys.exit(1) + + # 配置策略专属日志(关键修改点) + strategy_name = config_file.parent.name + symbol = config_file.stem + logger, log_file = setup_strategy_logger(config_file, strategy_name, symbol) + + # 重定向print到logger(捕获策略内的print调用) + class PrintToLogger: + def __init__(self, logger, level=logging.INFO): + self.logger = logger + self.level = level + self.linebuf = '' + + def write(self, buf): + for line in buf.rstrip().splitlines(): + self.logger.log(self.level, line.rstrip()) + + def flush(self): + pass + + # 重定向stdout和stderr + sys.stdout = PrintToLogger(logger, logging.INFO) + sys.stderr = PrintToLogger(logger, logging.ERROR) + + # 所有后续的print都会写入日志文件 print(f"[INFO] [{config['name']}] 正在启动...") + print(f"[INFO] 日志文件: {log_file}") - # 2. 动态加载策略类 + # 动态加载策略类 strategy_class = load_strategy_class(config["strategy_class"]) - # 3. 创建API + # 创建API from tqsdk import TqApi, TqAuth, TqKq api = TqApi(TqKq(), auth=TqAuth("emanresu", "dfgvfgdfgg")) - # 4. 准备策略参数 + # 准备策略参数 strategy_params = config["strategy_params"].copy() strategy_params["main_symbol"] = config["engine_params"]["symbol"].split(".")[-1] - # 5. 创建引擎 + # 创建引擎 from src.tqsdk_real_engine import TqsdkEngine engine = TqsdkEngine( @@ -60,24 +128,24 @@ def run_strategy(config_path: str): duration_seconds=config["engine_params"]["duration_seconds"], roll_over_mode=config["engine_params"]["roll_over_mode"], history_length=config["engine_params"]["history_length"], - close_bar_delta=timedelta(**config["engine_params"]["close_bar_delta"]) + close_bar_delta=config["engine_params"]["close_bar_delta"] ) - # 6. 信号处理 + # 信号处理 def signal_handler(sig, frame): print(f"\n[INFO] [{config['name']}] 收到停止信号 {sig},正在关闭...") - api.close() + # api.close() sys.exit(0) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) - # 7. 运行 + # 运行 try: print(f"[INFO] [{config['name']}] 开始运行") engine.run() except Exception as e: - print(f"[ERROR] [{config['name']}] 运行出错: {e}") + print(f"[ERROR] [{config['name']}] 运行出错: {e}", exc_info=True) sys.exit(1) finally: api.close() diff --git a/strategy_manager/restart_daemon.py b/strategy_manager/restart_daemon.py index bb0e4be..f3121b5 100644 --- a/strategy_manager/restart_daemon.py +++ b/strategy_manager/restart_daemon.py @@ -29,10 +29,10 @@ class RestartDaemon: # 确保目录存在 self.pid_dir.mkdir(exist_ok=True) + self.log_dir.mkdir(exist_ok=True) # 确保日志目录存在 def _setup_logger(self): """配置日志""" - self.log_dir.mkdir(exist_ok=True) log_file = self.log_dir / "restart_daemon.log" logging.basicConfig( @@ -41,9 +41,14 @@ class RestartDaemon: handlers=[ logging.FileHandler(log_file, encoding='utf-8'), logging.StreamHandler(sys.stdout) - ] + ], + force=True # 防止日志配置冲突 ) - return logging.getLogger("RestartDaemon") + logger = logging.getLogger("RestartDaemon") + logger.info("=" * 80) + logger.info("📝 日志系统初始化完成") + logger.info("📂 日志文件: %s", log_file.absolute()) + return logger def start(self): """启动守护进程""" @@ -57,8 +62,8 @@ class RestartDaemon: self.logger.info("=" * 80) self.logger.info("✅ 重启守护进程已启动") - self.logger.info("⏰ 监控时间点: {}".format(", ".join(self.RESTART_TIMES))) - self.logger.info("📂 PID目录: {}".format(self.pid_dir.absolute())) + self.logger.info("⏰ 监控时间点: %s", ", ".join(self.RESTART_TIMES)) + self.logger.info("📂 PID目录: %s", self.pid_dir.absolute()) self.logger.info("=" * 80) # 主线程阻塞(保持进程运行) @@ -97,14 +102,14 @@ class RestartDaemon: time.sleep(60) # 每分钟检查一次 except Exception as e: - self.logger.error("❌ 检查循环出错: {}".format(e)) + self.logger.error("❌ 检查循环出错: %s", e, exc_info=True) self.logger.error("=" * 80) time.sleep(60) # 出错后等待1分钟继续 def _perform_restart(self, time_point: str): """执行重启""" self.logger.info("\n" + "=" * 80) - self.logger.info("⏰ 到达重启时间: {}".format(time_point)) + self.logger.info("⏰ 到达重启时间: %s", time_point) self.logger.info("=" * 80) # 1. 扫描所有PID文件 @@ -113,7 +118,7 @@ class RestartDaemon: self.logger.info("⚠️ 未发现运行中的策略") return - self.logger.info("📋 发现 {} 个策略需要重启".format(len(pid_files))) + self.logger.info("📋 发现 %d 个策略需要重启", len(pid_files)) # 2. 停止所有策略 stopped_count = 0 @@ -124,21 +129,21 @@ class RestartDaemon: if psutil.pid_exists(pid): proc = psutil.Process(pid) - self.logger.info("⏹️ 停止策略 PID {}: {}".format(pid, proc.name())) + self.logger.info("⏹️ 停止策略 PID %d: %s", pid, proc.name()) proc.terminate() try: proc.wait(timeout=30) - self.logger.info("✅ 已优雅停止 PID {}".format(pid)) + self.logger.info("✅ 已优雅停止 PID %d", pid) stopped_count += 1 except psutil.TimeoutExpired: proc.kill() - self.logger.info("🔥 强制终止 PID {}".format(pid)) + self.logger.info("🔥 强制终止 PID %d", pid) stopped_count += 1 else: - self.logger.warning("⚠️ PID文件存在但进程已死: {}".format(pid)) + self.logger.warning("⚠️ PID文件存在但进程已死: %d", pid) except Exception as e: - self.logger.error("❌ 停止失败 {}: {}".format(pid_file, e)) + self.logger.error("❌ 停止失败 %s: %s", pid_file, e, exc_info=True) if stopped_count == 0: self.logger.warning("⚠️ 未成功停止任何策略") @@ -154,38 +159,38 @@ class RestartDaemon: for pid_file in pid_files: try: # 从PID文件名推导配置路径 - # DualModeTrendlineHawkesStrategy2_FG.pid -> strategies/DualModeTrendlineHawkesStrategy2/FG.config + # DualModeTrendlineHawkesStrategy2_FG.pid -> strategies/DualModeTrendlineHawkesStrategy2/FG.py name = pid_file.stem if '_' not in name: - self.logger.error("❌ PID文件名格式错误: {}".format(name)) + self.logger.error("❌ PID文件名格式错误: %s", name) continue strategy_name, symbol = name.split('_', 1) - config_file = Path("strategies") / strategy_name / "{}.config".format(symbol) + config_file = Path("strategies") / strategy_name / "{}.py".format(symbol) if not config_file.exists(): - self.logger.error("❌ 配置文件不存在: {}".format(config_file)) + self.logger.error("❌ 配置文件不存在: %s", config_file) continue # 启动新进程(不阻塞,立即返回) process = subprocess.Popen( [sys.executable, "launcher.py", "--config", str(config_file)], - stdout=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, # launcher内会自行处理日志 stderr=subprocess.DEVNULL, cwd=Path.cwd() ) - self.logger.info("✅ 启动新进程 PID {}: {}".format(process.pid, config_file.name)) + self.logger.info("✅ 启动新进程 PID %d: %s", process.pid, config_file.name) restarted_count += 1 except Exception as e: - self.logger.error("❌ 启动失败: {}".format(e)) + self.logger.error("❌ 启动失败: %s", e, exc_info=True) # 5. 统计结果 self.logger.info("\n" + "=" * 80) self.logger.info("📊 重启统计:") - self.logger.info(" 停止成功: {}个".format(stopped_count)) - self.logger.info(" 启动成功: {}个".format(restarted_count)) + self.logger.info(" 停止成功: %d个", stopped_count) + self.logger.info(" 启动成功: %d个", restarted_count) if stopped_count == restarted_count and stopped_count > 0: self.logger.info("✅ 所有策略重启成功") diff --git a/strategy_manager/strategies/SpectralTrendStrategy/SA.py b/strategy_manager/strategies/SpectralTrendStrategy/SA.py new file mode 100644 index 0000000..474924e --- /dev/null +++ b/strategy_manager/strategies/SpectralTrendStrategy/SA.py @@ -0,0 +1,31 @@ +# 策略配置(Python格式) +from src.indicators.indicators import ZScoreATR + +CONFIG = { + "name": "傅里叶趋势策略", + "version": "1.0", + "enabled": True, + + "strategy_class": "futures_trading_strategies.SA.Spectral.SpectralTrendStrategy.SpectralTrendStrategy", + + "engine_params": { + "symbol": "KQ.m@CZCE.SA", + "duration_seconds": 900, + "roll_over_mode": True, + "history_length": 1000, + # 支持Python对象 + "close_bar_delta": __import__('datetime').timedelta(minutes=58) + }, + + "strategy_params": { + 'main_symbol': 'SA', # <-- 替换为你的交易品种代码,例如 'GC=F' (黄金期货), 'ZC=F' (玉米期货) + 'trade_volume': 1, + 'model_indicator': ZScoreATR(14, 100, 0.5, 3), + 'spectral_window_days': 8, # STFT窗口大小(天) + 'low_freq_days': 8, # 低频下限(天) + 'high_freq_days': 4, # 高频上限(天) + 'trend_strength_threshold': 0.7, # 相变临界值 + 'exit_threshold': 0.3, # 退出阈值 + 'enable_log': True + } +} \ No newline at end of file diff --git a/strategy_manager/strategies/TestConnectionStrategy/FG.config b/strategy_manager/strategies/TestConnectionStrategy/FG.config deleted file mode 100644 index 3e14983..0000000 --- a/strategy_manager/strategies/TestConnectionStrategy/FG.config +++ /dev/null @@ -1,21 +0,0 @@ -{ - "name": "test策略", - "version": "1.0", - "enabled": true, - - "strategy_class": "futures_trading_strategies.FG.TrendlineBreakoutStrategy.DualModeTrendlineHawkesStrategy2.DualModeTrendlineHawkesStrategy", - - "engine_params": { - "symbol": "KQ.m@CZCE.FG", - "duration_seconds": 900, - "roll_over_mode": true, - "history_length": 1000, - "close_bar_delta": {"minutes": 58} - }, - - "strategy_params": { - "main_symbol": "FG", - "trade_volume": 2, - "enable_log": true - } -} \ No newline at end of file diff --git a/strategy_manager/strategies/TestConnectionStrategy/FG.py b/strategy_manager/strategies/TestConnectionStrategy/FG.py new file mode 100644 index 0000000..8de90e5 --- /dev/null +++ b/strategy_manager/strategies/TestConnectionStrategy/FG.py @@ -0,0 +1,23 @@ +# 策略配置(Python格式) +CONFIG = { + "name": "玻璃双模式趋势线策略", + "version": "1.0", + "enabled": True, + + "strategy_class": "futures_trading_strategies.FG.TrendlineBreakoutStrategy.DualModeTrendlineHawkesStrategy2.DualModeTrendlineHawkesStrategy", + + "engine_params": { + "symbol": "KQ.m@CZCE.FG", + "duration_seconds": 900, + "roll_over_mode": True, + "history_length": 1000, + # 支持Python对象 + "close_bar_delta": __import__('datetime').timedelta(minutes=58) + }, + + "strategy_params": { + "main_symbol": "FG", + "trade_volume": 2, + "enable_log": True, + } +} \ No newline at end of file diff --git a/strategy_manager/web_backend.py b/strategy_manager/web_backend.py new file mode 100644 index 0000000..f10a69d --- /dev/null +++ b/strategy_manager/web_backend.py @@ -0,0 +1,144 @@ +from fastapi import FastAPI, HTTPException, Query +from fastapi.staticfiles import StaticFiles +from fastapi.responses import FileResponse +from pathlib import Path +import logging +from collections import deque + +# 引入调度器 +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger + +# 复用现有manager +from core.manager import StrategyManager + +# ================== 初始化 ================== +app = FastAPI(title="策略控制台") +manager = StrategyManager() + +# 初始化调度器 +scheduler = AsyncIOScheduler() + +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# ================== 定时任务逻辑 ================== + +def scheduled_restart_task(): + """ + 定时任务:重启所有正在运行的策略 + """ + logger.info("⏰ [定时任务] 触发自动重启流程...") + + # 获取当前所有策略状态 + status = manager.get_status() + running_strategies = [ + name for name, info in status['strategies'].items() + if info['status'] == 'running' + ] + + if not running_strategies: + logger.info("⏰ [定时任务] 当前无运行中的策略,无需重启") + return + + logger.info(f"⏰ [定时任务] 即将重启以下策略: {running_strategies}") + + for name in running_strategies: + try: + # 调用 manager 的重启逻辑 (包含 stop -> sleep -> start) + manager.restart_strategy(name) + logger.info(f"✅ [定时任务] {name} 重启成功") + except Exception as e: + logger.error(f"❌ [定时任务] {name} 重启失败: {e}") + + logger.info("⏰ [定时任务] 自动重启流程结束") + + +# ================== FastAPI 事件钩子 ================== + +@app.on_event("startup") +async def start_scheduler(): + """服务启动时,加载定时任务""" + # 任务 1: 每天 08:55 + scheduler.add_job( + scheduled_restart_task, + CronTrigger(hour=8, minute=55), + id="restart_morning", + replace_existing=True + ) + + # 任务 2: 每天 20:55 + scheduler.add_job( + scheduled_restart_task, + CronTrigger(hour=20, minute=55), + id="restart_evening", + replace_existing=True + ) + + scheduler.start() + logger.info("📅 定时任务调度器已启动 (计划时间: 08:55, 20:55)") + + +@app.on_event("shutdown") +async def stop_scheduler(): + """服务关闭时停止调度器""" + scheduler.shutdown() + + +# ================== 原有 REST API (保持不变) ================== + +@app.get("/api/status") +def get_status(): + return manager.get_status() + + +@app.post("/api/strategy/{name}/start") +def start_strategy(name: str): + if manager.start_strategy(name): + return {"success": True} + raise HTTPException(400, "启动失败") + + +@app.post("/api/strategy/{name}/stop") +def stop_strategy(name: str): + if manager.stop_strategy(name): + return {"success": True} + raise HTTPException(400, "停止失败") + + +@app.post("/api/strategy/{name}/restart") +def restart_strategy(name: str): + # 修复了之前提到的返回值问题 + if manager.restart_strategy(name): + return {"success": True} + raise HTTPException(400, "重启失败,请检查日志") + + +@app.get("/api/logs/{name}") +def get_logs(name: str, lines: int = Query(50, le=500)): + try: + if '_' not in name: + return {"lines": []} + strategy_name, symbol = name.split('_', 1) + log_file = Path(f"logs/{strategy_name}/{symbol}.log") + + if not log_file.exists(): + return {"lines": ["日志文件尚未生成"]} + + # 修复编码和读取 + with open(log_file, 'r', encoding='utf-8', errors='replace') as f: + last_lines = deque(f, maxlen=lines) + return {"lines": [line.rstrip() for line in last_lines]} + except Exception as e: + raise HTTPException(500, f"读取日志失败: {e}") + + +# ================== 静态文件挂载 ================== +app.mount("/static", StaticFiles(directory="frontend/dist"), name="static") + + +@app.get("/") +def serve_frontend(): + return FileResponse("frontend/dist/index.html") \ No newline at end of file