"""Launch a single trading strategy from a JSON configuration file.

Usage:
    python launcher.py --config <config_path>
"""
import importlib
import json
import os
import signal
import sys
from datetime import timedelta
from pathlib import Path

# ==================== Dynamic path configuration ====================
from core.path_utils import add_project_root_to_path

# Add the project root to sys.path so that strategy modules named in the
# configuration file can be imported by their dotted path.
PROJECT_ROOT = add_project_root_to_path()
print(f"[INFO] 项目根路径: {PROJECT_ROOT}")
print(f"[INFO] Python路径: {sys.path[:3]}")
# ====================================================================


def load_strategy_class(class_path: str) -> type:
    """Import and return the strategy class named by a dotted path.

    Args:
        class_path: Fully qualified name, e.g.
            "futures_trading_strategies.FG.TrendlineBreakoutStrategy."
            "DualModeTrendlineHawkesStrategy2.DualModeTrendlineHawkesStrategy".
            Everything before the last dot is the module to import; the
            final component is the attribute looked up on that module.

    Returns:
        The attribute found on the imported module.

    Exits:
        Prints diagnostics and terminates the process with status 1 when
        the path cannot be split, the module cannot be imported, or the
        attribute does not exist.
    """
    try:
        module_path, class_name = class_path.rsplit(".", 1)
        module = importlib.import_module(module_path)
        return getattr(module, class_name)
    except Exception as e:
        print(f"[ERROR] 加载策略类失败 {class_path}: {e}")
        print(f"[ERROR] 请检查项目根路径是否正确: {PROJECT_ROOT}")
        print(f"[ERROR] 当前sys.path: {sys.path}")
        sys.exit(1)


def run_strategy(config_path: str) -> None:
    """Run the strategy described by a JSON configuration file.

    The configuration must provide these keys:
        name: label used in log messages.
        strategy_class: dotted path passed to ``load_strategy_class``.
        strategy_params: dict of strategy arguments (copied, then
            extended with ``main_symbol``).
        engine_params: dict with ``symbol``, ``duration_seconds``,
            ``roll_over_mode``, ``history_length`` and ``close_bar_delta``
            (keyword arguments for ``datetime.timedelta``).

    Args:
        config_path: Path of the UTF-8 encoded JSON configuration file.

    Exits:
        Status 0 after SIGTERM/SIGINT, status 1 when ``engine.run()``
        raises. The API connection is closed on every exit path that
        follows its creation.
    """
    # 1. Load the configuration.
    with open(config_path, "r", encoding="utf-8") as f:
        config = json.load(f)

    name = config["name"]
    print(f"[INFO] [{name}] 正在启动...")

    # 2. Dynamically load the strategy class.
    strategy_class = load_strategy_class(config["strategy_class"])

    # 3. Create the API.
    # Imported here rather than at module top, as in the original layout.
    from tqsdk import TqApi, TqAuth, TqKq

    # NOTE(review): credentials should not live in source control. They can
    # be supplied through TQ_USERNAME / TQ_PASSWORD; the literals remain only
    # as a backward-compatible fallback and should be removed once every
    # deployment sets the environment variables.
    auth = TqAuth(
        os.environ.get("TQ_USERNAME", "emanresu"),
        os.environ.get("TQ_PASSWORD", "dfgvfgdfgg"),
    )
    api = TqApi(TqKq(), auth=auth)

    # Everything after API creation sits inside try/finally so a failure
    # while building the engine no longer leaks the open connection.
    try:
        engine_params = config["engine_params"]

        # 4. Prepare the strategy parameters.
        strategy_params = config["strategy_params"].copy()
        # main_symbol is the last dot-separated component of the symbol.
        strategy_params["main_symbol"] = engine_params["symbol"].split(".")[-1]

        # 5. Create the engine.
        from src.tqsdk_real_engine import TqsdkEngine

        engine = TqsdkEngine(
            strategy_class=strategy_class,
            strategy_params=strategy_params,
            api=api,
            symbol=engine_params["symbol"],
            duration_seconds=engine_params["duration_seconds"],
            roll_over_mode=engine_params["roll_over_mode"],
            history_length=engine_params["history_length"],
            close_bar_delta=timedelta(**engine_params["close_bar_delta"]),
        )

        # 6. Signal handling.
        def signal_handler(sig, frame):
            print(f"\n[INFO] [{name}] 收到停止信号 {sig},正在关闭...")
            # Only raise SystemExit here. The handler can fire while the
            # engine is in the middle of an API call, so the connection is
            # closed exactly once by the finally clause below, after the
            # stack has unwound.
            sys.exit(0)

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

        # 7. Run.
        try:
            print(f"[INFO] [{name}] 开始运行")
            engine.run()
        except Exception as e:
            print(f"[ERROR] [{name}] 运行出错: {e}")
            sys.exit(1)
    finally:
        api.close()
        print(f"[INFO] [{name}] 已停止")


if __name__ == "__main__":
    if len(sys.argv) != 3 or sys.argv[1] != "--config":
        print("使用方法: python launcher.py --config <config_path>")
        sys.exit(1)

    run_strategy(sys.argv[2])