import sys import signal from pathlib import Path import importlib.util import logging # 新增:日志模块 # ==================== 动态路径配置 ==================== from core.path_utils import add_project_root_to_path # 添加项目根路径到sys.path PROJECT_ROOT = add_project_root_to_path() print(f"[INFO] 项目根路径: {PROJECT_ROOT}") print(f"[INFO] Python路径: {sys.path[:3]}") # ==================================================== def load_strategy_class(class_path: str): """动态加载策略类""" try: module_path, class_name = class_path.rsplit('.', 1) module = importlib.import_module(module_path) return getattr(module, class_name) except Exception as e: print(f"[ERROR] 加载策略类失败 {class_path}: {e}") print(f"[ERROR] 请检查项目根路径是否正确: {PROJECT_ROOT}") print(f"[ERROR] 当前sys.path: {sys.path}") sys.exit(1) def setup_strategy_logger(config_file: Path, strategy_name: str, symbol: str) -> logging.Logger: """配置策略专属日志""" # 创建日志目录: logs/{策略名}/ log_dir = Path("logs") / strategy_name log_dir.mkdir(exist_ok=True, parents=True) # 日志文件: logs/{策略名}/{品种}.log log_file = log_dir / f"{symbol}.log" # 配置logger logger = logging.getLogger(f"Strategy.{strategy_name}.{symbol}") logger.setLevel(logging.INFO) # 清除旧handler(防止重复) for h in logger.handlers[:]: logger.removeHandler(h) # 文件handler file_handler = logging.FileHandler(log_file, encoding='utf-8') file_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s] %(message)s') ) logger.addHandler(file_handler) return logger, log_file def run_strategy(config_path: str): """通过配置文件运行策略""" config_file = Path(config_path) if not config_file.exists(): print(f"[ERROR] 配置文件不存在: {config_file}") sys.exit(1) # 动态加载配置模块 try: spec = importlib.util.spec_from_file_location( f"strategy_config_{config_file.stem}", config_file ) config_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(config_module) if not hasattr(config_module, 'CONFIG'): print(f"[ERROR] 配置文件缺少 CONFIG 变量: {config_file}") sys.exit(1) config = config_module.CONFIG except Exception as e: print(f"[ERROR] 加载配置文件失败 {config_path}: {e}") sys.exit(1) # 配置策略专属日志(关键修改点) strategy_name = config_file.parent.name symbol = config_file.stem logger, log_file = setup_strategy_logger(config_file, strategy_name, symbol) # 重定向print到logger(捕获策略内的print调用) class PrintToLogger: def __init__(self, logger, level=logging.INFO): self.logger = logger self.level = level self.linebuf = '' def write(self, buf): for line in buf.rstrip().splitlines(): self.logger.log(self.level, line.rstrip()) def flush(self): pass # 重定向stdout和stderr sys.stdout = PrintToLogger(logger, logging.INFO) sys.stderr = PrintToLogger(logger, logging.ERROR) # 所有后续的print都会写入日志文件 print(f"[INFO] [{config['name']}] 正在启动...") print(f"[INFO] 日志文件: {log_file}") # 动态加载策略类 strategy_class = load_strategy_class(config["strategy_class"]) # 创建API from tqsdk import TqApi, TqAuth, TqKq, TqAccount api = TqApi(TqAccount('H宏源期货', '903308830', 'lzr520102'), auth=TqAuth("emanresu", "dfgvfgdfgg")) # 准备策略参数 strategy_params = config["strategy_params"].copy() strategy_params["main_symbol"] = config["engine_params"]["symbol"].split(".")[-1] # 创建引擎 from src.tqsdk_real_engine import TqsdkEngine engine = TqsdkEngine( strategy_class=strategy_class, strategy_params=strategy_params, api=api, symbol=config["engine_params"]["symbol"], duration_seconds=config["engine_params"]["duration_seconds"], roll_over_mode=config["engine_params"]["roll_over_mode"], history_length=config["engine_params"]["history_length"], close_bar_delta=config["engine_params"]["close_bar_delta"] ) # 信号处理 def signal_handler(sig, frame): print(f"\n[INFO] [{config['name']}] 收到停止信号 {sig},正在关闭...") # api.close() sys.exit(0) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) # 运行 try: print(f"[INFO] [{config['name']}] 开始运行") engine.run() except Exception as e: print(f"[ERROR] [{config['name']}] 运行出错: {e}", exc_info=True) sys.exit(1) finally: api.close() print(f"[INFO] [{config['name']}] 已停止") if __name__ == "__main__": if len(sys.argv) != 3 or sys.argv[1] != "--config": print("使用方法: python launcher.py --config ") sys.exit(1) run_strategy(sys.argv[2])