From 218ca5f533bbae0222200e336832d96abbfac626 Mon Sep 17 00:00:00 2001
From: liaozhaorun <1300336796@qq.com>
Date: Fri, 21 Nov 2025 16:08:03 +0800
Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Eweb=E7=95=8C=E9=9D=A2?=
=?UTF-8?q?=E7=AE=A1=E7=90=86=E7=AD=96=E7=95=A5?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.gitignore | 5 +-
.../SA/Spectral/SpectralTrendStrategy.ipynb | 18 +-
.../SA/Spectral/SpectralTrendStrategy.py | 9 +-
strategy_manager/core/manager.py | 218 +++++++++++----
strategy_manager/frontend/dist/index.html | 253 ++++++++++++++++++
strategy_manager/launcher.py | 100 +++++--
strategy_manager/restart_daemon.py | 49 ++--
.../strategies/SpectralTrendStrategy/SA.py | 31 +++
.../TestConnectionStrategy/FG.config | 21 --
.../strategies/TestConnectionStrategy/FG.py | 23 ++
strategy_manager/web_backend.py | 144 ++++++++++
11 files changed, 747 insertions(+), 124 deletions(-)
create mode 100644 strategy_manager/frontend/dist/index.html
create mode 100644 strategy_manager/strategies/SpectralTrendStrategy/SA.py
delete mode 100644 strategy_manager/strategies/TestConnectionStrategy/FG.config
create mode 100644 strategy_manager/strategies/TestConnectionStrategy/FG.py
create mode 100644 strategy_manager/web_backend.py
diff --git a/.gitignore b/.gitignore
index 07bdb8a..90b812f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,5 +5,6 @@ data/data/
logs/
pids/
-status/
-status.json
\ No newline at end of file
+states/
+status.json
+/strategy_manager/states/
diff --git a/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.ipynb b/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.ipynb
index 06edbf3..d3b2022 100644
--- a/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.ipynb
+++ b/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.ipynb
@@ -6,8 +6,8 @@
"metadata": {
"collapsed": true,
"ExecuteTime": {
- "end_time": "2025-11-19T07:13:51.818890Z",
- "start_time": "2025-11-19T07:13:50.827090Z"
+ "end_time": "2025-11-21T01:54:34.434731Z",
+ "start_time": "2025-11-21T01:54:31.509589Z"
}
},
"source": [
@@ -44,8 +44,8 @@
{
"metadata": {
"ExecuteTime": {
- "end_time": "2025-11-19T07:13:51.840183Z",
- "start_time": "2025-11-19T07:13:51.822905Z"
+ "end_time": "2025-11-21T01:54:34.537950Z",
+ "start_time": "2025-11-21T01:54:34.445969Z"
}
},
"cell_type": "code",
@@ -79,8 +79,8 @@
{
"metadata": {
"ExecuteTime": {
- "end_time": "2025-11-19T07:14:58.236155Z",
- "start_time": "2025-11-19T07:13:51.846812Z"
+ "end_time": "2025-11-21T01:55:38.643534Z",
+ "start_time": "2025-11-21T01:54:34.551413Z"
}
},
"cell_type": "code",
@@ -148,7 +148,7 @@
"\n",
"初始化回测引擎...\n",
"模拟器初始化:初始资金=100000.00, 滑点率=0.0, 佣金率=0.0001\n",
- "内存仓储已初始化,管理ID: 'futures_trading_strategies.SA.Spectral.SpectralTrendStrategy.SpectralTrendStrategy_228362dc13ea83accde90203f5f5434e'\n",
+ "内存仓储已初始化,管理ID: 'futures_trading_strategies.SA.Spectral.SpectralTrendStrategy.SpectralTrendStrategy_f774488b98ac53758169305e51fd8595'\n",
"\n",
"--- 回测引擎初始化完成 ---\n",
" 策略: SpectralTrendStrategy\n",
@@ -196,8 +196,8 @@
{
"metadata": {
"ExecuteTime": {
- "end_time": "2025-11-19T07:15:05.968736Z",
- "start_time": "2025-11-19T07:14:58.277441Z"
+ "end_time": "2025-11-21T01:55:46.179638Z",
+ "start_time": "2025-11-21T01:55:38.675521Z"
}
},
"cell_type": "code",
diff --git a/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.py b/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.py
index f07d033..642f31a 100644
--- a/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.py
+++ b/futures_trading_strategies/SA/Spectral/SpectralTrendStrategy.py
@@ -123,10 +123,11 @@ class SpectralTrendStrategy(Strategy):
return
# 核心逻辑:相变入场/退出
- if position_volume == 0:
- self.evaluate_entry_signal(open_price, trend_strength, dominant_freq)
- else:
- self.manage_open_position(position_volume, trend_strength, dominant_freq)
+ if self.trading:
+ if position_volume == 0:
+ self.evaluate_entry_signal(open_price, trend_strength, dominant_freq)
+ else:
+ self.manage_open_position(position_volume, trend_strength, dominant_freq)
def calculate_trend_strength(self, prices: np.array) -> (float, float):
"""
diff --git a/strategy_manager/core/manager.py b/strategy_manager/core/manager.py
index 6dfc61d..8e81de3 100644
--- a/strategy_manager/core/manager.py
+++ b/strategy_manager/core/manager.py
@@ -1,12 +1,14 @@
import os
import sys
-import json
import time
import psutil
import subprocess
+import logging
from pathlib import Path
from typing import Dict, Any, List
from datetime import datetime
+import importlib.util
+import json # 确保导入json模块
# ==================== 动态路径配置 ====================
from core.path_utils import add_project_root_to_path
@@ -29,12 +31,32 @@ class StrategyManager:
self.logs_dir.mkdir(exist_ok=True)
self.pid_dir.mkdir(exist_ok=True)
+ # 配置管理器日志
+ self._setup_logger()
+
self.strategies: Dict[str, Dict[str, Any]] = {}
+ self.logger.info("🔄 正在加载策略配置...")
self.load_strategies()
+ self.logger.info("✅ 策略加载完成,共发现 %d 个策略", len(self.strategies))
+
+ def _setup_logger(self):
+ """配置管理器日志"""
+ log_file = self.logs_dir / "manager.log"
+ logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s [%(levelname)s] %(message)s',
+ handlers=[
+ logging.FileHandler(log_file, encoding='utf-8'),
+ logging.StreamHandler(sys.stdout)
+ ],
+ force=True
+ )
+ self.logger = logging.getLogger("StrategyManager")
def _load_main_config(self, config_path: str) -> Dict[str, Any]:
path = Path(config_path)
if not path.exists():
+ self.logger.warning("⚠️ 主配置文件不存在,使用默认配置")
return {
"logs_dir": "logs",
"status_file": "status.json",
@@ -44,16 +66,23 @@ class StrategyManager:
return json.load(f)
def load_strategies(self):
- """递归扫描 strategies/ 目录,查找 .config 文件"""
+ """递归扫描 strategies/ 目录,查找 .py 配置文件"""
self.strategies = {}
if not self.strategies_dir.exists():
- print("[ERROR] 策略配置目录不存在: {}".format(self.strategies_dir))
+ self.logger.error("❌ 策略配置目录不存在: %s", self.strategies_dir)
return
- for config_file in self.strategies_dir.rglob("*.config"):
+ for config_file in self.strategies_dir.rglob("*.py"):
try:
- with open(config_file, 'r', encoding='utf-8') as f:
- config = json.load(f)
+ spec = importlib.util.spec_from_file_location(
+ f"config_{config_file.stem}", config_file
+ )
+ config_module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(config_module)
+
+ if not hasattr(config_module, 'CONFIG'):
+ raise ValueError("配置文件缺少 CONFIG 变量")
+ config = config_module.CONFIG
required = ['name', 'strategy_class', 'enabled', 'engine_params', 'strategy_params']
for field in required:
@@ -75,8 +104,9 @@ class StrategyManager:
"started_at": None,
"uptime": None
}
+ self.logger.info("📄 加载配置: %s", config_file)
except Exception as e:
- print("[ERROR] 加载配置失败 {}: {}".format(config_file, e))
+ self.logger.error("❌ 加载配置失败 %s: %s", config_file, e, exc_info=True)
def get_status(self) -> Dict[str, Any]:
"""获取完整状态"""
@@ -89,12 +119,7 @@ class StrategyManager:
}
def _refresh_status(self):
- """
- 刷新进程状态 - 双重验证
- 1. 检查PID文件是否存在
- 2. 检查进程是否存在
- 3. 验证进程名是否为python(防止PID复用)
- """
+ """刷新进程状态 - 双重验证"""
for name, info in self.strategies.items():
pid_file = self.pid_dir / "{}.pid".format(name)
@@ -103,28 +128,23 @@ class StrategyManager:
with open(pid_file, 'r') as f:
pid = int(f.read().strip())
- # 双重验证
if psutil.pid_exists(pid):
try:
proc = psutil.Process(pid)
- # 验证进程名是否包含python
if "python" in proc.name().lower():
- # 验证成功,更新为运行中
info["status"] = "running"
info["pid"] = pid
if info["started_at"]:
started = datetime.fromisoformat(info["started_at"])
uptime = datetime.now() - started
info["uptime"] = str(uptime).split('.')[0]
- continue # 跳过清理逻辑
+ continue
except (psutil.NoSuchProcess, psutil.AccessDenied):
- # 进程已死或无权访问,继续清理
pass
- # PID不存在或验证失败,清理
self._cleanup_stopped_strategy(name, pid_file)
except Exception as e:
- print("[WARNING] 刷新状态失败 {}: {}".format(name, e))
+ self.logger.warning("⚠️ 刷新状态失败 %s: %s", name, e)
self._cleanup_stopped_strategy(name, pid_file)
else:
info["status"] = "stopped"
@@ -133,13 +153,8 @@ class StrategyManager:
info["uptime"] = None
def _is_running(self, name: str) -> bool:
- """
- 检查策略是否运行中 - 实时刷新状态
- 确保与status命令结果一致
- """
- # 先刷新状态确保最新
+ """检查策略是否运行中"""
self._refresh_status()
-
info = self.strategies[name]
if not info["pid"]:
return False
@@ -152,7 +167,6 @@ class StrategyManager:
with open(pid_file, 'r') as f:
pid = int(f.read().strip())
- # 双重验证
if psutil.pid_exists(pid):
try:
proc = psutil.Process(pid)
@@ -163,15 +177,74 @@ class StrategyManager:
except:
return False
+ def start_strategy(self, name: str) -> bool:
+ """启动单个策略(守护进程模式)"""
+ if name not in self.strategies:
+ self.logger.error("❌ 策略不存在: %s", name)
+ return False
+
+ if self._is_running(name):
+ self.logger.warning("⚠️ 策略已在运行: %s", name)
+ return False
+
+ info = self.strategies[name]
+ config_file = Path(info["config_file"])
+
+ # 创建策略专属日志目录
+ strategy_log_dir = self.logs_dir / info["strategy_name"]
+ strategy_log_dir.mkdir(exist_ok=True, parents=True)
+ log_file = strategy_log_dir / "{}.log".format(info["symbol"])
+
+ self.logger.info("\n" + "=" * 80)
+ self.logger.info("🚀 启动策略: %s", name)
+ self.logger.info("📄 配置文件: %s", config_file)
+ self.logger.info("📝 日志文件: %s", log_file)
+
+ try:
+ # 启动子进程 - 关键修改:脱离终端会话
+ with open(log_file, 'a') as f:
+ # 使用 start_new_session=True 创建新会话,防止终端关闭影响
+ # stdin 重定向到 /dev/null,完全脱离终端
+ with open(os.devnull, 'r') as devnull:
+ process = subprocess.Popen(
+ [sys.executable, "launcher.py", "--config", str(config_file)],
+ stdout=f,
+ stderr=subprocess.STDOUT,
+ stdin=devnull, # 关键:从/dev/null读取输入
+ # start_new_session=True, # 关键:创建新会话,脱离终端
+ cwd=Path.cwd()
+ )
+
+ # 更新状态
+ info["pid"] = process.pid
+ info["status"] = "running"
+ info["started_at"] = datetime.now().isoformat()
+ info["uptime"] = "00:00:00"
+
+ # 保存PID文件
+ pid_file = self.pid_dir / "{}.pid".format(name)
+ with open(pid_file, 'w') as f:
+ f.write(str(process.pid))
+
+ self._save_status()
+ self.logger.info("✅ 启动成功! PID: %d", process.pid)
+ self.logger.info("ℹ️ 该进程已脱离终端会话,关闭窗口不会停止策略")
+ self.logger.info("=" * 80)
+ return True
+
+ except Exception as e:
+ self.logger.error("❌ 启动失败: %s", e, exc_info=True)
+ self._cleanup_stopped_strategy(name, self.pid_dir / "{}.pid".format(name))
+ return False
+
def stop_strategy(self, name: str, timeout: int = 30) -> bool:
"""停止单个策略"""
if name not in self.strategies:
- print("[ERROR] 策略不存在: {}".format(name))
+ self.logger.error("❌ 策略不存在: %s", name)
return False
- # 再次检查状态(确保最新)
if not self._is_running(name):
- print("[WARNING] 策略未运行: {}".format(name))
+ self.logger.warning("⚠️ 策略未运行: %s", name)
return False
info = self.strategies[name]
@@ -180,40 +253,43 @@ class StrategyManager:
pid = info["pid"]
process = psutil.Process(pid)
- print("\n[INFO] 正在停止: {} (PID: {})...".format(name, pid))
+ self.logger.info("\n" + "=" * 80)
+ self.logger.info("⏹️ 正在停止策略: %s (PID: %d)", name, pid)
# 优雅终止
process.terminate()
try:
process.wait(timeout=timeout)
- print("[SUCCESS] 已停止: {}".format(name))
+ self.logger.info("✅ 已优雅停止: %s", name)
except psutil.TimeoutExpired:
- print("[WARNING] 超时,强制终止: {}".format(name))
+ self.logger.warning("⏱️ 超时,强制终止: %s", name)
process.kill()
process.wait()
# 清理状态
self._cleanup_stopped_strategy(name, self.pid_dir / "{}.pid".format(name))
self._save_status()
+ self.logger.info("=" * 80)
return True
except Exception as e:
- print("[ERROR] 停止失败 {}: {}".format(name, e))
+ self.logger.error("❌ 停止失败 %s: %s", name, e, exc_info=True)
return False
def restart_strategy(self, name: str) -> bool:
"""重启策略"""
- print("\n[INFO] 正在重启: {}".format(name))
+ self.logger.info("\n" + "=" * 80)
+ self.logger.info("🔄 正在重启: %s", name)
self.stop_strategy(name)
time.sleep(2)
return self.start_strategy(name)
def start_all(self):
"""启动所有启用的策略"""
- print("\n" + "=" * 100)
- print("正在启动所有启用的策略...")
- print("=" * 100)
+ self.logger.info("\n" + "=" * 100)
+ self.logger.info("🚀 正在启动所有启用的策略...")
+ self.logger.info("=" * 100)
started = []
for name, info in self.strategies.items():
@@ -221,15 +297,15 @@ class StrategyManager:
if self.start_strategy(name):
started.append(name)
- print("\n[SUCCESS] 成功启动 {} 个策略".format(len(started)))
+ self.logger.info("\n✅ 成功启动 %d 个策略", len(started))
if started:
- print("策略: {}".format(", ".join(started)))
+ self.logger.info("📋 策略: %s", ", ".join(started))
def stop_all(self):
"""停止所有运行的策略"""
- print("\n" + "=" * 100)
- print("正在停止所有运行的策略...")
- print("=" * 100)
+ self.logger.info("\n" + "=" * 100)
+ self.logger.info("⏹️ 正在停止所有运行的策略...")
+ self.logger.info("=" * 100)
stopped = []
for name in self.strategies.keys():
@@ -237,9 +313,9 @@ class StrategyManager:
if self.stop_strategy(name):
stopped.append(name)
- print("\n[SUCCESS] 成功停止 {} 个策略".format(len(stopped)))
+ self.logger.info("\n✅ 成功停止 %d 个策略", len(stopped))
if stopped:
- print("策略: {}".format(", ".join(stopped)))
+ self.logger.info("📋 策略: %s", ", ".join(stopped))
def _cleanup_stopped_strategy(self, name: str, pid_file: Path):
"""清理已停止的策略状态"""
@@ -251,10 +327,25 @@ class StrategyManager:
info["uptime"] = None
def _save_status(self):
- """状态持久化"""
- status = self.get_status()
- with open(self.status_file, 'w') as f:
- json.dump(status, f, indent=2, ensure_ascii=False)
+ """状态持久化(修复:排除不可序列化的config字段)"""
+ try:
+ status = self.get_status()
+
+ # 创建JSON安全的版本(排除config字段,因为它可能包含timedelta等不可序列化对象)
+ status_for_json = status.copy()
+ status_for_json["strategies"] = {}
+
+ for name, info in status["strategies"].items():
+ # 复制所有字段除了config
+ strategy_info = {k: v for k, v in info.items() if k != "config"}
+ status_for_json["strategies"][name] = strategy_info
+
+ with open(self.status_file, 'w') as f:
+ json.dump(status_for_json, f, indent=2, ensure_ascii=False)
+
+ self.logger.debug("💾 状态已保存到 %s", self.status_file)
+ except Exception as e:
+ self.logger.error("❌ 保存状态失败: %s", e, exc_info=True)
def print_status_table(status: Dict[str, Any]):
@@ -284,4 +375,31 @@ def print_status_table(status: Dict[str, Any]):
name, info['config']['name'], status_text, pid_text, uptime_text, started_text
))
- print("=" * 130)
\ No newline at end of file
+ print("=" * 130)
+
+
+if __name__ == "__main__":
+ manager = StrategyManager()
+
+ if len(sys.argv) > 1:
+ command = sys.argv[1]
+ if command == "status":
+ print_status_table(manager.get_status())
+ elif command == "start-all":
+ manager.start_all()
+ elif command == "stop-all":
+ manager.stop_all()
+ elif command.startswith("start:"):
+ name = command.split(":", 1)[1]
+ manager.start_strategy(name)
+ elif command.startswith("stop:"):
+ name = command.split(":", 1)[1]
+ manager.stop_strategy(name)
+ elif command.startswith("restart:"):
+ name = command.split(":", 1)[1]
+ manager.restart_strategy(name)
+ else:
+ print("未知命令:", command)
+ print("用法: python manager.py [status|start-all|stop-all|start:NAME|stop:NAME|restart:NAME]")
+ else:
+ print("用法: python manager.py [status|start-all|stop-all|start:NAME|stop:NAME|restart:NAME]")
\ No newline at end of file
diff --git a/strategy_manager/frontend/dist/index.html b/strategy_manager/frontend/dist/index.html
new file mode 100644
index 0000000..dae6a84
--- /dev/null
+++ b/strategy_manager/frontend/dist/index.html
@@ -0,0 +1,253 @@
+
+
+
+
+
+ 策略控制台
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/strategy_manager/launcher.py b/strategy_manager/launcher.py
index 082861c..db8e810 100644
--- a/strategy_manager/launcher.py
+++ b/strategy_manager/launcher.py
@@ -1,9 +1,8 @@
import sys
-import json
import signal
-from datetime import timedelta
from pathlib import Path
-import importlib
+import importlib.util
+import logging # 新增:日志模块
# ==================== 动态路径配置 ====================
from core.path_utils import add_project_root_to_path
@@ -19,7 +18,6 @@ print(f"[INFO] Python路径: {sys.path[:3]}")
def load_strategy_class(class_path: str):
"""动态加载策略类"""
try:
- # class_path: "futures_trading_strategies.FG.TrendlineBreakoutStrategy.DualModeTrendlineHawkesStrategy2.DualModeTrendlineHawkesStrategy"
module_path, class_name = class_path.rsplit('.', 1)
module = importlib.import_module(module_path)
return getattr(module, class_name)
@@ -30,26 +28,96 @@ def load_strategy_class(class_path: str):
sys.exit(1)
+def setup_strategy_logger(config_file: Path, strategy_name: str, symbol: str) -> logging.Logger:
+ """配置策略专属日志"""
+ # 创建日志目录: logs/{策略名}/
+ log_dir = Path("logs") / strategy_name
+ log_dir.mkdir(exist_ok=True, parents=True)
+
+ # 日志文件: logs/{策略名}/{品种}.log
+ log_file = log_dir / f"{symbol}.log"
+
+ # 配置logger
+ logger = logging.getLogger(f"Strategy.{strategy_name}.{symbol}")
+ logger.setLevel(logging.INFO)
+
+ # 清除旧handler(防止重复)
+ for h in logger.handlers[:]:
+ logger.removeHandler(h)
+
+ # 文件handler
+ file_handler = logging.FileHandler(log_file, encoding='utf-8')
+ file_handler.setFormatter(
+ logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
+ )
+ logger.addHandler(file_handler)
+
+ return logger, log_file
+
+
def run_strategy(config_path: str):
"""通过配置文件运行策略"""
- # 1. 加载配置
- with open(config_path, 'r', encoding='utf-8') as f:
- config = json.load(f)
+ config_file = Path(config_path)
+ if not config_file.exists():
+ print(f"[ERROR] 配置文件不存在: {config_file}")
+ sys.exit(1)
+ # 动态加载配置模块
+ try:
+ spec = importlib.util.spec_from_file_location(
+ f"strategy_config_{config_file.stem}", config_file
+ )
+ config_module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(config_module)
+
+ if not hasattr(config_module, 'CONFIG'):
+ print(f"[ERROR] 配置文件缺少 CONFIG 变量: {config_file}")
+ sys.exit(1)
+
+ config = config_module.CONFIG
+ except Exception as e:
+ print(f"[ERROR] 加载配置文件失败 {config_path}: {e}")
+ sys.exit(1)
+
+ # 配置策略专属日志(关键修改点)
+ strategy_name = config_file.parent.name
+ symbol = config_file.stem
+ logger, log_file = setup_strategy_logger(config_file, strategy_name, symbol)
+
+ # 重定向print到logger(捕获策略内的print调用)
+ class PrintToLogger:
+ def __init__(self, logger, level=logging.INFO):
+ self.logger = logger
+ self.level = level
+ self.linebuf = ''
+
+ def write(self, buf):
+ for line in buf.rstrip().splitlines():
+ self.logger.log(self.level, line.rstrip())
+
+ def flush(self):
+ pass
+
+ # 重定向stdout和stderr
+ sys.stdout = PrintToLogger(logger, logging.INFO)
+ sys.stderr = PrintToLogger(logger, logging.ERROR)
+
+ # 所有后续的print都会写入日志文件
print(f"[INFO] [{config['name']}] 正在启动...")
+ print(f"[INFO] 日志文件: {log_file}")
- # 2. 动态加载策略类
+ # 动态加载策略类
strategy_class = load_strategy_class(config["strategy_class"])
- # 3. 创建API
+ # 创建API
from tqsdk import TqApi, TqAuth, TqKq
api = TqApi(TqKq(), auth=TqAuth("emanresu", "dfgvfgdfgg"))
- # 4. 准备策略参数
+ # 准备策略参数
strategy_params = config["strategy_params"].copy()
strategy_params["main_symbol"] = config["engine_params"]["symbol"].split(".")[-1]
- # 5. 创建引擎
+ # 创建引擎
from src.tqsdk_real_engine import TqsdkEngine
engine = TqsdkEngine(
@@ -60,24 +128,24 @@ def run_strategy(config_path: str):
duration_seconds=config["engine_params"]["duration_seconds"],
roll_over_mode=config["engine_params"]["roll_over_mode"],
history_length=config["engine_params"]["history_length"],
- close_bar_delta=timedelta(**config["engine_params"]["close_bar_delta"])
+ close_bar_delta=config["engine_params"]["close_bar_delta"]
)
- # 6. 信号处理
+ # 信号处理
def signal_handler(sig, frame):
print(f"\n[INFO] [{config['name']}] 收到停止信号 {sig},正在关闭...")
- api.close()
+ # api.close()
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
- # 7. 运行
+ # 运行
try:
print(f"[INFO] [{config['name']}] 开始运行")
engine.run()
except Exception as e:
- print(f"[ERROR] [{config['name']}] 运行出错: {e}")
+ print(f"[ERROR] [{config['name']}] 运行出错: {e}", exc_info=True)
sys.exit(1)
finally:
api.close()
diff --git a/strategy_manager/restart_daemon.py b/strategy_manager/restart_daemon.py
index bb0e4be..f3121b5 100644
--- a/strategy_manager/restart_daemon.py
+++ b/strategy_manager/restart_daemon.py
@@ -29,10 +29,10 @@ class RestartDaemon:
# 确保目录存在
self.pid_dir.mkdir(exist_ok=True)
+ self.log_dir.mkdir(exist_ok=True) # 确保日志目录存在
def _setup_logger(self):
"""配置日志"""
- self.log_dir.mkdir(exist_ok=True)
log_file = self.log_dir / "restart_daemon.log"
logging.basicConfig(
@@ -41,9 +41,14 @@ class RestartDaemon:
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler(sys.stdout)
- ]
+ ],
+ force=True # 防止日志配置冲突
)
- return logging.getLogger("RestartDaemon")
+ logger = logging.getLogger("RestartDaemon")
+ logger.info("=" * 80)
+ logger.info("📝 日志系统初始化完成")
+ logger.info("📂 日志文件: %s", log_file.absolute())
+ return logger
def start(self):
"""启动守护进程"""
@@ -57,8 +62,8 @@ class RestartDaemon:
self.logger.info("=" * 80)
self.logger.info("✅ 重启守护进程已启动")
- self.logger.info("⏰ 监控时间点: {}".format(", ".join(self.RESTART_TIMES)))
- self.logger.info("📂 PID目录: {}".format(self.pid_dir.absolute()))
+ self.logger.info("⏰ 监控时间点: %s", ", ".join(self.RESTART_TIMES))
+ self.logger.info("📂 PID目录: %s", self.pid_dir.absolute())
self.logger.info("=" * 80)
# 主线程阻塞(保持进程运行)
@@ -97,14 +102,14 @@ class RestartDaemon:
time.sleep(60) # 每分钟检查一次
except Exception as e:
- self.logger.error("❌ 检查循环出错: {}".format(e))
+ self.logger.error("❌ 检查循环出错: %s", e, exc_info=True)
self.logger.error("=" * 80)
time.sleep(60) # 出错后等待1分钟继续
def _perform_restart(self, time_point: str):
"""执行重启"""
self.logger.info("\n" + "=" * 80)
- self.logger.info("⏰ 到达重启时间: {}".format(time_point))
+ self.logger.info("⏰ 到达重启时间: %s", time_point)
self.logger.info("=" * 80)
# 1. 扫描所有PID文件
@@ -113,7 +118,7 @@ class RestartDaemon:
self.logger.info("⚠️ 未发现运行中的策略")
return
- self.logger.info("📋 发现 {} 个策略需要重启".format(len(pid_files)))
+ self.logger.info("📋 发现 %d 个策略需要重启", len(pid_files))
# 2. 停止所有策略
stopped_count = 0
@@ -124,21 +129,21 @@ class RestartDaemon:
if psutil.pid_exists(pid):
proc = psutil.Process(pid)
- self.logger.info("⏹️ 停止策略 PID {}: {}".format(pid, proc.name()))
+ self.logger.info("⏹️ 停止策略 PID %d: %s", pid, proc.name())
proc.terminate()
try:
proc.wait(timeout=30)
- self.logger.info("✅ 已优雅停止 PID {}".format(pid))
+ self.logger.info("✅ 已优雅停止 PID %d", pid)
stopped_count += 1
except psutil.TimeoutExpired:
proc.kill()
- self.logger.info("🔥 强制终止 PID {}".format(pid))
+ self.logger.info("🔥 强制终止 PID %d", pid)
stopped_count += 1
else:
- self.logger.warning("⚠️ PID文件存在但进程已死: {}".format(pid))
+ self.logger.warning("⚠️ PID文件存在但进程已死: %d", pid)
except Exception as e:
- self.logger.error("❌ 停止失败 {}: {}".format(pid_file, e))
+ self.logger.error("❌ 停止失败 %s: %s", pid_file, e, exc_info=True)
if stopped_count == 0:
self.logger.warning("⚠️ 未成功停止任何策略")
@@ -154,38 +159,38 @@ class RestartDaemon:
for pid_file in pid_files:
try:
# 从PID文件名推导配置路径
- # DualModeTrendlineHawkesStrategy2_FG.pid -> strategies/DualModeTrendlineHawkesStrategy2/FG.config
+ # DualModeTrendlineHawkesStrategy2_FG.pid -> strategies/DualModeTrendlineHawkesStrategy2/FG.py
name = pid_file.stem
if '_' not in name:
- self.logger.error("❌ PID文件名格式错误: {}".format(name))
+ self.logger.error("❌ PID文件名格式错误: %s", name)
continue
strategy_name, symbol = name.split('_', 1)
- config_file = Path("strategies") / strategy_name / "{}.config".format(symbol)
+ config_file = Path("strategies") / strategy_name / "{}.py".format(symbol)
if not config_file.exists():
- self.logger.error("❌ 配置文件不存在: {}".format(config_file))
+ self.logger.error("❌ 配置文件不存在: %s", config_file)
continue
# 启动新进程(不阻塞,立即返回)
process = subprocess.Popen(
[sys.executable, "launcher.py", "--config", str(config_file)],
- stdout=subprocess.DEVNULL,
+ stdout=subprocess.DEVNULL, # launcher内会自行处理日志
stderr=subprocess.DEVNULL,
cwd=Path.cwd()
)
- self.logger.info("✅ 启动新进程 PID {}: {}".format(process.pid, config_file.name))
+ self.logger.info("✅ 启动新进程 PID %d: %s", process.pid, config_file.name)
restarted_count += 1
except Exception as e:
- self.logger.error("❌ 启动失败: {}".format(e))
+ self.logger.error("❌ 启动失败: %s", e, exc_info=True)
# 5. 统计结果
self.logger.info("\n" + "=" * 80)
self.logger.info("📊 重启统计:")
- self.logger.info(" 停止成功: {}个".format(stopped_count))
- self.logger.info(" 启动成功: {}个".format(restarted_count))
+ self.logger.info(" 停止成功: %d个", stopped_count)
+ self.logger.info(" 启动成功: %d个", restarted_count)
if stopped_count == restarted_count and stopped_count > 0:
self.logger.info("✅ 所有策略重启成功")
diff --git a/strategy_manager/strategies/SpectralTrendStrategy/SA.py b/strategy_manager/strategies/SpectralTrendStrategy/SA.py
new file mode 100644
index 0000000..474924e
--- /dev/null
+++ b/strategy_manager/strategies/SpectralTrendStrategy/SA.py
@@ -0,0 +1,31 @@
+# 策略配置(Python格式)
+from src.indicators.indicators import ZScoreATR
+
+CONFIG = {
+ "name": "傅里叶趋势策略",
+ "version": "1.0",
+ "enabled": True,
+
+ "strategy_class": "futures_trading_strategies.SA.Spectral.SpectralTrendStrategy.SpectralTrendStrategy",
+
+ "engine_params": {
+ "symbol": "KQ.m@CZCE.SA",
+ "duration_seconds": 900,
+ "roll_over_mode": True,
+ "history_length": 1000,
+ # 支持Python对象
+ "close_bar_delta": __import__('datetime').timedelta(minutes=58)
+ },
+
+ "strategy_params": {
+ 'main_symbol': 'SA', # <-- 替换为你的交易品种代码,例如 'GC=F' (黄金期货), 'ZC=F' (玉米期货)
+ 'trade_volume': 1,
+ 'model_indicator': ZScoreATR(14, 100, 0.5, 3),
+ 'spectral_window_days': 8, # STFT窗口大小(天)
+ 'low_freq_days': 8, # 低频下限(天)
+ 'high_freq_days': 4, # 高频上限(天)
+ 'trend_strength_threshold': 0.7, # 相变临界值
+ 'exit_threshold': 0.3, # 退出阈值
+ 'enable_log': True
+ }
+}
\ No newline at end of file
diff --git a/strategy_manager/strategies/TestConnectionStrategy/FG.config b/strategy_manager/strategies/TestConnectionStrategy/FG.config
deleted file mode 100644
index 3e14983..0000000
--- a/strategy_manager/strategies/TestConnectionStrategy/FG.config
+++ /dev/null
@@ -1,21 +0,0 @@
-{
- "name": "test策略",
- "version": "1.0",
- "enabled": true,
-
- "strategy_class": "futures_trading_strategies.FG.TrendlineBreakoutStrategy.DualModeTrendlineHawkesStrategy2.DualModeTrendlineHawkesStrategy",
-
- "engine_params": {
- "symbol": "KQ.m@CZCE.FG",
- "duration_seconds": 900,
- "roll_over_mode": true,
- "history_length": 1000,
- "close_bar_delta": {"minutes": 58}
- },
-
- "strategy_params": {
- "main_symbol": "FG",
- "trade_volume": 2,
- "enable_log": true
- }
-}
\ No newline at end of file
diff --git a/strategy_manager/strategies/TestConnectionStrategy/FG.py b/strategy_manager/strategies/TestConnectionStrategy/FG.py
new file mode 100644
index 0000000..8de90e5
--- /dev/null
+++ b/strategy_manager/strategies/TestConnectionStrategy/FG.py
@@ -0,0 +1,23 @@
+# 策略配置(Python格式)
+CONFIG = {
+ "name": "玻璃双模式趋势线策略",
+ "version": "1.0",
+ "enabled": True,
+
+ "strategy_class": "futures_trading_strategies.FG.TrendlineBreakoutStrategy.DualModeTrendlineHawkesStrategy2.DualModeTrendlineHawkesStrategy",
+
+ "engine_params": {
+ "symbol": "KQ.m@CZCE.FG",
+ "duration_seconds": 900,
+ "roll_over_mode": True,
+ "history_length": 1000,
+ # 支持Python对象
+ "close_bar_delta": __import__('datetime').timedelta(minutes=58)
+ },
+
+ "strategy_params": {
+ "main_symbol": "FG",
+ "trade_volume": 2,
+ "enable_log": True,
+ }
+}
\ No newline at end of file
diff --git a/strategy_manager/web_backend.py b/strategy_manager/web_backend.py
new file mode 100644
index 0000000..f10a69d
--- /dev/null
+++ b/strategy_manager/web_backend.py
@@ -0,0 +1,144 @@
+from fastapi import FastAPI, HTTPException, Query
+from fastapi.staticfiles import StaticFiles
+from fastapi.responses import FileResponse
+from pathlib import Path
+import logging
+from collections import deque
+
+# 引入调度器
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.triggers.cron import CronTrigger
+
+# 复用现有manager
+from core.manager import StrategyManager
+
+# ================== 初始化 ==================
+app = FastAPI(title="策略控制台")
+manager = StrategyManager()
+
+# 初始化调度器
+scheduler = AsyncIOScheduler()
+
+# 配置日志
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+# ================== 定时任务逻辑 ==================
+
+def scheduled_restart_task():
+ """
+ 定时任务:重启所有正在运行的策略
+ """
+ logger.info("⏰ [定时任务] 触发自动重启流程...")
+
+ # 获取当前所有策略状态
+ status = manager.get_status()
+ running_strategies = [
+ name for name, info in status['strategies'].items()
+ if info['status'] == 'running'
+ ]
+
+ if not running_strategies:
+ logger.info("⏰ [定时任务] 当前无运行中的策略,无需重启")
+ return
+
+ logger.info(f"⏰ [定时任务] 即将重启以下策略: {running_strategies}")
+
+ for name in running_strategies:
+ try:
+ # 调用 manager 的重启逻辑 (包含 stop -> sleep -> start)
+ manager.restart_strategy(name)
+ logger.info(f"✅ [定时任务] {name} 重启成功")
+ except Exception as e:
+ logger.error(f"❌ [定时任务] {name} 重启失败: {e}")
+
+ logger.info("⏰ [定时任务] 自动重启流程结束")
+
+
+# ================== FastAPI 事件钩子 ==================
+
+@app.on_event("startup")
+async def start_scheduler():
+ """服务启动时,加载定时任务"""
+ # 任务 1: 每天 08:55
+ scheduler.add_job(
+ scheduled_restart_task,
+ CronTrigger(hour=8, minute=55),
+ id="restart_morning",
+ replace_existing=True
+ )
+
+ # 任务 2: 每天 20:55
+ scheduler.add_job(
+ scheduled_restart_task,
+ CronTrigger(hour=20, minute=55),
+ id="restart_evening",
+ replace_existing=True
+ )
+
+ scheduler.start()
+ logger.info("📅 定时任务调度器已启动 (计划时间: 08:55, 20:55)")
+
+
+@app.on_event("shutdown")
+async def stop_scheduler():
+ """服务关闭时停止调度器"""
+ scheduler.shutdown()
+
+
+# ================== 原有 REST API (保持不变) ==================
+
+@app.get("/api/status")
+def get_status():
+ return manager.get_status()
+
+
+@app.post("/api/strategy/{name}/start")
+def start_strategy(name: str):
+ if manager.start_strategy(name):
+ return {"success": True}
+ raise HTTPException(400, "启动失败")
+
+
+@app.post("/api/strategy/{name}/stop")
+def stop_strategy(name: str):
+ if manager.stop_strategy(name):
+ return {"success": True}
+ raise HTTPException(400, "停止失败")
+
+
+@app.post("/api/strategy/{name}/restart")
+def restart_strategy(name: str):
+ # 修复了之前提到的返回值问题
+ if manager.restart_strategy(name):
+ return {"success": True}
+ raise HTTPException(400, "重启失败,请检查日志")
+
+
+@app.get("/api/logs/{name}")
+def get_logs(name: str, lines: int = Query(50, le=500)):
+ try:
+ if '_' not in name:
+ return {"lines": []}
+ strategy_name, symbol = name.split('_', 1)
+ log_file = Path(f"logs/{strategy_name}/{symbol}.log")
+
+ if not log_file.exists():
+ return {"lines": ["日志文件尚未生成"]}
+
+ # 修复编码和读取
+ with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
+ last_lines = deque(f, maxlen=lines)
+ return {"lines": [line.rstrip() for line in last_lines]}
+ except Exception as e:
+ raise HTTPException(500, f"读取日志失败: {e}")
+
+
+# ================== 静态文件挂载 ==================
+app.mount("/static", StaticFiles(directory="frontend/dist"), name="static")
+
+
+@app.get("/")
+def serve_frontend():
+ return FileResponse("frontend/dist/index.html")
\ No newline at end of file