Files
NewQuant/strategy_manager/launcher.py

160 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sys
import signal
from pathlib import Path
import importlib.util
import logging # 新增:日志模块
# ==================== Dynamic path configuration ====================
# NOTE(review): this import requires the `core` package to already be
# importable before the project root is added below — confirm how the
# launcher is invoked (working directory / PYTHONPATH).
from core.path_utils import add_project_root_to_path
# Add the project root to sys.path; the helper's return value is kept as
# PROJECT_ROOT and is echoed in later error messages.
# (Presumably the returned value is the project root path — verify in
# core.path_utils.)
PROJECT_ROOT = add_project_root_to_path()
# Startup diagnostics: show the resolved root and the first three
# sys.path entries.
print(f"[INFO] 项目根路径: {PROJECT_ROOT}")
print(f"[INFO] Python路径: {sys.path[:3]}")
# =====================================================================
def load_strategy_class(class_path: str):
    """Resolve a dotted ``package.module.ClassName`` path to the named attribute.

    The text after the last dot is the attribute name; everything before it
    is imported as a module.

    Args:
        class_path: Dotted path such as ``"pkg.module.MyStrategy"``.

    Returns:
        The attribute (normally a strategy class) found on the imported module.

    Raises:
        SystemExit: With exit code 1 when the path cannot be split, the module
            cannot be imported, or the attribute is missing. Diagnostics are
            printed before exiting.
    """
    try:
        dotted_module, attr_name = class_path.rsplit('.', maxsplit=1)
        return getattr(importlib.import_module(dotted_module), attr_name)
    except Exception as exc:
        # Any failure here is fatal for the launcher: report and exit.
        diagnostics = (
            f"[ERROR] 加载策略类失败 {class_path}: {exc}",
            f"[ERROR] 请检查项目根路径是否正确: {PROJECT_ROOT}",
            f"[ERROR] 当前sys.path: {sys.path}",
        )
        for message in diagnostics:
            print(message)
        sys.exit(1)
def setup_strategy_logger(
    config_file: Path, strategy_name: str, symbol: str
) -> "tuple[logging.Logger, Path]":
    """Configure the dedicated file logger for one strategy/symbol pair.

    Log records are written to ``logs/<strategy_name>/<symbol>.log`` relative
    to the current working directory; the directory is created if needed.

    Args:
        config_file: Path of the strategy config file. Currently unused; kept
            for interface compatibility.
        strategy_name: Name used for the log sub-directory and logger name.
        symbol: Name used for the log file stem and logger name.

    Returns:
        A ``(logger, log_file)`` tuple: the configured logger and the path of
        the file it writes to.
    """
    # Log directory: logs/<strategy_name>/
    log_dir = Path("logs") / strategy_name
    log_dir.mkdir(exist_ok=True, parents=True)

    # Log file: logs/<strategy_name>/<symbol>.log
    log_file = log_dir / f"{symbol}.log"

    logger = logging.getLogger(f"Strategy.{strategy_name}.{symbol}")
    logger.setLevel(logging.INFO)

    # Drop handlers left over from a previous call so records are not
    # duplicated, and close them so their file descriptors are released
    # (removeHandler alone does not close the underlying stream).
    for stale_handler in logger.handlers[:]:
        logger.removeHandler(stale_handler)
        stale_handler.close()

    file_handler = logging.FileHandler(log_file, encoding='utf-8')
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
    )
    logger.addHandler(file_handler)
    return logger, log_file
class _PrintToLogger:
    """Minimal file-like object that forwards written text to a logger.

    Installed in place of ``sys.stdout`` / ``sys.stderr`` so that ``print``
    calls made after the redirect end up in the strategy's log file.
    """

    def __init__(self, logger, level=logging.INFO):
        self.logger = logger
        self.level = level

    def write(self, buf):
        """Log every line of *buf* at the configured level.

        Trailing whitespace is stripped first, so the lone newline that
        ``print`` writes separately produces no log record.
        """
        for line in buf.rstrip().splitlines():
            self.logger.log(self.level, line.rstrip())
        # File-like contract: report how many characters were consumed.
        return len(buf)

    def flush(self):
        """No-op: nothing is buffered in this object."""

    def isatty(self):
        """Report that this stream is not an interactive terminal."""
        return False


def run_strategy(config_path: str):
    """Run a strategy described by a Python config file.

    The config file is executed as a module and must define a ``CONFIG``
    mapping. The keys read here are ``name``, ``strategy_class``,
    ``strategy_params`` and ``engine_params`` (with ``symbol``,
    ``duration_seconds``, ``roll_over_mode``, ``history_length`` and
    ``close_bar_delta``).

    The strategy name is taken from the config file's parent directory name
    and the symbol from the file stem; both select the log file. After the
    logger is set up, ``sys.stdout`` and ``sys.stderr`` are replaced so that
    all later ``print`` output goes to that log file.

    Args:
        config_path: Path to the strategy config ``.py`` file.

    Raises:
        SystemExit: Code 1 if the config file is missing, cannot be loaded,
            lacks ``CONFIG``, or the engine fails; code 0 when stopped by
            SIGTERM/SIGINT.
    """
    config_file = Path(config_path)
    if not config_file.exists():
        print(f"[ERROR] 配置文件不存在: {config_file}")
        sys.exit(1)

    # Load the config file as a module.
    try:
        spec = importlib.util.spec_from_file_location(
            f"strategy_config_{config_file.stem}", config_file
        )
        config_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(config_module)
        if not hasattr(config_module, 'CONFIG'):
            print(f"[ERROR] 配置文件缺少 CONFIG 变量: {config_file}")
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept it.
            sys.exit(1)
        config = config_module.CONFIG
    except Exception as e:
        print(f"[ERROR] 加载配置文件失败 {config_path}: {e}")
        sys.exit(1)

    # Strategy-specific log file: logs/<parent dir name>/<file stem>.log
    strategy_name = config_file.parent.name
    symbol = config_file.stem
    logger, log_file = setup_strategy_logger(config_file, strategy_name, symbol)

    # From here on, every print (including those inside strategy code) is
    # written to the log file: stdout at INFO, stderr at ERROR.
    sys.stdout = _PrintToLogger(logger, logging.INFO)
    sys.stderr = _PrintToLogger(logger, logging.ERROR)

    print(f"[INFO] [{config['name']}] 正在启动...")
    print(f"[INFO] 日志文件: {log_file}")

    strategy_class = load_strategy_class(config["strategy_class"])

    # Create the API connection.
    from tqsdk import TqApi, TqAuth, TqKq
    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from configuration or the environment instead.
    api = TqApi(TqKq(), auth=TqAuth("emanresu", "dfgvfgdfgg"))

    def signal_handler(sig, frame):
        print(f"\n[INFO] [{config['name']}] 收到停止信号 {sig},正在关闭...")
        # SystemExit unwinds through the try/finally below, which closes
        # the API connection.
        sys.exit(0)

    # Everything after the API is created runs under try/finally so the
    # connection is closed even if engine construction fails.
    try:
        engine_params = config["engine_params"]

        # The strategy also receives the last dot-separated part of the
        # engine symbol as "main_symbol".
        strategy_params = config["strategy_params"].copy()
        strategy_params["main_symbol"] = engine_params["symbol"].split(".")[-1]

        from src.tqsdk_real_engine import TqsdkEngine
        engine = TqsdkEngine(
            strategy_class=strategy_class,
            strategy_params=strategy_params,
            api=api,
            symbol=engine_params["symbol"],
            duration_seconds=engine_params["duration_seconds"],
            roll_over_mode=engine_params["roll_over_mode"],
            history_length=engine_params["history_length"],
            close_bar_delta=engine_params["close_bar_delta"],
        )

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

        print(f"[INFO] [{config['name']}] 开始运行")
        engine.run()
    except Exception as e:
        # print() has no exc_info keyword; use the logger so the traceback
        # is recorded instead of raising TypeError inside the handler.
        logger.exception("[ERROR] [%s] 运行出错: %s", config['name'], e)
        sys.exit(1)
    finally:
        api.close()
        print(f"[INFO] [{config['name']}] 已停止")
if __name__ == "__main__":
    # Expected invocation: python launcher.py --config <config_file>
    cli_args = sys.argv[1:]
    well_formed = len(cli_args) == 2 and cli_args[0] == "--config"
    if not well_formed:
        print("使用方法: python launcher.py --config <config_file>")
        sys.exit(1)
    run_strategy(cli_args[1])