diff --git a/src/factorminer/core/helix_loop.py b/src/factorminer/core/helix_loop.py index f160715..f46023c 100644 --- a/src/factorminer/core/helix_loop.py +++ b/src/factorminer/core/helix_loop.py @@ -46,9 +46,11 @@ logger = logging.getLogger(__name__) # Optional imports -- resolved at call time with graceful fallback # --------------------------------------------------------------------------- + def _try_import_debate(): try: from src.factorminer.agent.debate import DebateGenerator, DebateConfig + return DebateGenerator, DebateConfig except ImportError: return None, None @@ -57,6 +59,7 @@ def _try_import_debate(): def _try_import_canonicalizer(): try: from src.factorminer.core.canonicalizer import FormulaCanonicalizer + return FormulaCanonicalizer except ImportError: return None @@ -65,6 +68,7 @@ def _try_import_canonicalizer(): def _try_import_causal(): try: from src.factorminer.evaluation.causal import CausalValidator, CausalConfig + return CausalValidator, CausalConfig except ImportError: return None, None @@ -77,6 +81,7 @@ def _try_import_regime(): RegimeAwareEvaluator, RegimeConfig, ) + return RegimeDetector, RegimeAwareEvaluator, RegimeConfig except ImportError: return None, None, None @@ -84,7 +89,11 @@ def _try_import_regime(): def _try_import_capacity(): try: - from src.factorminer.evaluation.capacity import CapacityEstimator, CapacityConfig + from src.factorminer.evaluation.capacity import ( + CapacityEstimator, + CapacityConfig, + ) + return CapacityEstimator, CapacityConfig except ImportError: return None, None @@ -98,14 +107,24 @@ def _try_import_significance(): DeflatedSharpeCalculator, SignificanceConfig, ) - return BootstrapICTester, FDRController, DeflatedSharpeCalculator, SignificanceConfig + + return ( + BootstrapICTester, + FDRController, + DeflatedSharpeCalculator, + SignificanceConfig, + ) except ImportError: return None, None, None, None def _try_import_kg(): try: - from src.factorminer.memory.knowledge_graph import FactorKnowledgeGraph, FactorNode + from src.factorminer.memory.knowledge_graph import ( + FactorKnowledgeGraph, + FactorNode, + ) + return FactorKnowledgeGraph, FactorNode except ImportError: return None, None @@ -114,6 +133,7 @@ def _try_import_kg(): def _try_import_kg_retrieval(): try: from src.factorminer.memory.kg_retrieval import retrieve_memory_enhanced + return retrieve_memory_enhanced except ImportError: return None @@ -122,6 +142,7 @@ def _try_import_kg_retrieval(): def _try_import_embedder(): try: from src.factorminer.memory.embeddings import FormulaEmbedder + return FormulaEmbedder except ImportError: return None @@ -130,6 +151,7 @@ def _try_import_embedder(): def _try_import_auto_inventor(): try: from src.factorminer.operators.auto_inventor import OperatorInventor + return OperatorInventor except ImportError: return None @@ -138,6 +160,7 @@ def _try_import_auto_inventor(): def _try_import_custom_store(): try: from src.factorminer.operators.custom import CustomOperatorStore + return CustomOperatorStore except ImportError: return None @@ -147,6 +170,7 @@ def _try_import_custom_store(): # HelixLoop # --------------------------------------------------------------------------- + class HelixLoop(RalphLoop): """Enhanced 5-stage Helix Loop for self-evolving factor discovery. @@ -164,8 +188,9 @@ class HelixLoop(RalphLoop): ---------- config : Any Mining configuration object. - data_tensor : np.ndarray - Market data tensor D in R^(M x T x F). + evaluator : any, optional + Local factor evaluator (e.g. LocalFactorEvaluator) for on-demand + signal computation. Preferred over the legacy data_tensor path. returns : np.ndarray Forward returns array R in R^(M x T). llm_provider : LLMProvider, optional @@ -204,11 +229,13 @@ class HelixLoop(RalphLoop): def __init__( self, config: Any, - data_tensor: np.ndarray, returns: np.ndarray, llm_provider: Optional[LLMProvider] = None, memory: Optional[ExperienceMemory] = None, library: Optional[FactorLibrary] = None, + evaluator: Optional[Any] = None, + # legacy data tensor path (optional) + data_tensor: Optional[np.ndarray] = None, # Phase 2 extensions debate_config: Optional[Any] = None, enable_knowledge_graph: bool = False, @@ -223,14 +250,14 @@ class HelixLoop(RalphLoop): significance_config: Optional[Any] = None, volume: Optional[np.ndarray] = None, ) -> None: - # Initialize base RalphLoop + # Initialize base RalphLoop via the new evaluator path super().__init__( config=config, - data_tensor=data_tensor, returns=returns, llm_provider=llm_provider, memory=memory, library=library, + evaluator=evaluator, ) # Store Phase 2 configuration @@ -246,6 +273,7 @@ class HelixLoop(RalphLoop): self._capacity_config = capacity_config self._significance_config = significance_config self._volume = volume + self._data_tensor = data_tensor # Track iterations without admissions for forgetting self._no_admission_streak: int = 0 @@ -414,9 +442,12 @@ class HelixLoop(RalphLoop): CustomStoreCls = _try_import_custom_store() if InventorCls is not None: try: + _dt = getattr(self, "_data_tensor", None) + if _dt is None: + _dt = np.empty((*self.returns.shape, 1)) self._auto_inventor = InventorCls( llm_provider=llm_provider or self.generator.llm, - data_tensor=self.data_tensor, + data_tensor=_dt, returns=self.returns, ) logger.info("Helix: auto operator invention enabled") @@ -486,7 +517,9 @@ class HelixLoop(RalphLoop): # ================================================================== # Stage 3: SYNTHESIZE (canonicalize + dedup) # ================================================================== - candidates, n_canon_dupes, n_semantic_dupes = self._canonicalize_and_dedup(candidates) + candidates, n_canon_dupes, n_semantic_dupes = self._canonicalize_and_dedup( + candidates + ) helix_stats["canonical_duplicates_removed"] = n_canon_dupes helix_stats["semantic_duplicates_removed"] = n_semantic_dupes @@ -558,7 +591,10 @@ class HelixLoop(RalphLoop): ic_passed=stats["ic_passed"], correlation_passed=stats["corr_passed"], admitted=stats["admitted"], - rejected=len(candidates) + n_canon_dupes + n_semantic_dupes - stats["admitted"], + rejected=len(candidates) + + n_canon_dupes + + n_semantic_dupes + - stats["admitted"], replaced=stats["replaced"], library_size=self.library.size, best_ic=max(ic_values) if ic_values else 0.0, @@ -585,9 +621,7 @@ class HelixLoop(RalphLoop): # Stage 1: Enhanced retrieval # ------------------------------------------------------------------ - def _helix_retrieve( - self, library_state: Dict[str, Any] - ) -> Dict[str, Any]: + def _helix_retrieve(self, library_state: Dict[str, Any]) -> Dict[str, Any]: """Stage 1 RETRIEVE: KG + embeddings + flat memory hybrid retrieval. Falls back to standard retrieve_memory if no KG/embedder is available. @@ -648,9 +682,7 @@ class HelixLoop(RalphLoop): "falling back to standard generator" ) except Exception as exc: - logger.warning( - "Helix: debate generation failed, falling back: %s", exc - ) + logger.warning("Helix: debate generation failed, falling back: %s", exc) # Standard generation return self.generator.generate_batch( @@ -755,7 +787,11 @@ class HelixLoop(RalphLoop): # Collect admitted results that still have signals for extended checks to_check = [r for r in admitted_results if r.signals is not None] if not to_check: - self._no_admission_streak = 0 if any(r.admitted for r in admitted_results) else self._no_admission_streak + 1 + self._no_admission_streak = ( + 0 + if any(r.admitted for r in admitted_results) + else self._no_admission_streak + 1 + ) return 0 # -- Causal validation -- @@ -780,6 +816,13 @@ class HelixLoop(RalphLoop): rejected, len(admitted_results), ) + for r in admitted_results: + if not r.admitted and r.rejection_reason: + logger.info( + "Helix: rejection summary for '%s': %s", + r.factor_name, + r.rejection_reason, + ) if any(r.admitted for r in admitted_results): self._no_admission_streak = 0 @@ -807,7 +850,8 @@ class HelixLoop(RalphLoop): try: validator = CausalValidatorCls( returns=self.returns, - data_tensor=self.data_tensor, + data_tensor=getattr(self, "_data_tensor", None) + or np.empty((*self.returns.shape, 1)), library_signals=library_signals, config=self._causal_config, ) @@ -816,9 +860,7 @@ class HelixLoop(RalphLoop): return 0 rejected = 0 - threshold = getattr( - self._causal_config, "robustness_threshold", 0.4 - ) + threshold = getattr(self._causal_config, "robustness_threshold", 0.4) for r in to_check: if not r.admitted or r.signals is None: @@ -826,14 +868,15 @@ class HelixLoop(RalphLoop): try: result = validator.validate(r.factor_name, r.signals) if not result.passes: - self._revoke_admission(r, all_results, - f"Causal: robustness_score={result.robustness_score:.3f} < {threshold}" + reason = ( + f"Causal: robustness_score={result.robustness_score:.3f} < {threshold}, " + f"granger(p={result.granger_p_value:.3f}, pass={result.granger_passes}), " + f"intervention(ratio={result.intervention_ic_ratio:.3f}, pass={result.intervention_passes})" ) + self._revoke_admission(r, all_results, reason) rejected += 1 - logger.debug( - "Helix: causal rejection for '%s' (score=%.3f)", - r.factor_name, - result.robustness_score, + logger.info( + "Helix: causal rejection for '%s': %s", r.factor_name, reason ) except Exception as exc: logger.warning( @@ -860,15 +903,14 @@ class HelixLoop(RalphLoop): try: result = self._regime_evaluator.evaluate(r.factor_name, r.signals) if not result.passes: - self._revoke_admission(r, all_results, + reason = ( f"Regime: only {result.n_regimes_passing} regimes passing " f"(need {getattr(self._regime_config, 'min_regimes_passing', 2)})" ) + self._revoke_admission(r, all_results, reason) rejected += 1 - logger.debug( - "Helix: regime rejection for '%s' (%d regimes passing)", - r.factor_name, - result.n_regimes_passing, + logger.info( + "Helix: regime rejection for '%s': %s", r.factor_name, reason ) except Exception as exc: logger.warning( @@ -889,9 +931,7 @@ class HelixLoop(RalphLoop): return 0 rejected = 0 - net_icir_threshold = getattr( - self._capacity_config, "net_icir_threshold", 0.3 - ) + net_icir_threshold = getattr(self._capacity_config, "net_icir_threshold", 0.3) for r in to_check: if not r.admitted or r.signals is None: @@ -902,14 +942,11 @@ class HelixLoop(RalphLoop): signals=r.signals, ) if not result.passes_net_threshold: - self._revoke_admission(r, all_results, - f"Capacity: net_icir={result.net_icir:.3f} < {net_icir_threshold}" - ) + reason = f"Capacity: net_icir={result.net_icir:.3f} < {net_icir_threshold}" + self._revoke_admission(r, all_results, reason) rejected += 1 - logger.debug( - "Helix: capacity rejection for '%s' (net_icir=%.3f)", - r.factor_name, - result.net_icir, + logger.info( + "Helix: capacity rejection for '%s': %s", r.factor_name, reason ) except Exception as exc: logger.warning( @@ -964,15 +1001,14 @@ class HelixLoop(RalphLoop): r = result_map.get(name) if r is not None and r.admitted: adj_p = fdr_result.adjusted_p_values.get(name, 1.0) - self._revoke_admission(r, all_results, + reason = ( f"Significance: FDR-adjusted p={adj_p:.4f} > " f"{getattr(self._significance_config, 'fdr_level', 0.05)}" ) + self._revoke_admission(r, all_results, reason) rejected += 1 - logger.debug( - "Helix: significance rejection for '%s' (adj_p=%.4f)", - name, - adj_p, + logger.info( + "Helix: significance rejection for '%s': %s", name, reason ) return rejected @@ -999,7 +1035,7 @@ class HelixLoop(RalphLoop): ): self.library.remove_factor(factor.id) self._remove_semantic_artifacts(result.factor_name) - logger.debug( + logger.info( "Helix: revoked factor '%s' (id=%d): %s", result.factor_name, factor.id, @@ -1087,9 +1123,7 @@ class HelixLoop(RalphLoop): try: self._kg.add_factor(node) except Exception as exc: - logger.debug( - "Helix: failed to add factor to KG: %s", exc - ) + logger.debug("Helix: failed to add factor to KG: %s", exc) continue # Add correlation edges with existing library factors @@ -1191,9 +1225,7 @@ class HelixLoop(RalphLoop): for pattern in self.memory.success_patterns: # Decay occurrence count if hasattr(pattern, "occurrence_count"): - pattern.occurrence_count = int( - pattern.occurrence_count * lam - ) + pattern.occurrence_count = int(pattern.occurrence_count * lam) # Demote success_rate after prolonged drought if self._no_admission_streak >= 20: @@ -1228,6 +1260,7 @@ class HelixLoop(RalphLoop): # Gather existing operators try: from src.factorminer.core.types import OPERATOR_REGISTRY as SPEC_REG + existing_ops = dict(SPEC_REG) except ImportError: existing_ops = {} @@ -1289,7 +1322,11 @@ class HelixLoop(RalphLoop): try: from src.factorminer.operators.custom import CustomOperator - from src.factorminer.core.types import OperatorSpec, OperatorType, SignatureType + from src.factorminer.core.types import ( + OperatorSpec, + OperatorType, + SignatureType, + ) spec = OperatorSpec( name=proposal.name, @@ -1298,14 +1335,13 @@ class HelixLoop(RalphLoop): signature=SignatureType.TIME_SERIES_TO_TIME_SERIES, param_names=proposal.param_names, param_defaults=proposal.param_defaults, - param_ranges={ - k: tuple(v) for k, v in proposal.param_ranges.items() - }, + param_ranges={k: tuple(v) for k, v in proposal.param_ranges.items()}, description=proposal.description, ) # Compile the function from src.factorminer.operators.custom import _compile_operator_code + fn = _compile_operator_code(proposal.numpy_code) if fn is None: logger.warning( @@ -1449,18 +1485,14 @@ class HelixLoop(RalphLoop): self._kg.get_edge_count(), ) except Exception as exc: - logger.warning( - "Helix: failed to load knowledge graph: %s", exc - ) + logger.warning("Helix: failed to load knowledge graph: %s", exc) # Load custom operators if self._custom_op_store is not None: try: self._custom_op_store.load() except Exception as exc: - logger.warning( - "Helix: failed to load custom operators: %s", exc - ) + logger.warning("Helix: failed to load custom operators: %s", exc) # Load helix-specific state helix_state_path = checkpoint_dir / "helix_state.json" @@ -1468,17 +1500,13 @@ class HelixLoop(RalphLoop): try: with open(helix_state_path) as f: helix_state = json.load(f) - self._no_admission_streak = helix_state.get( - "no_admission_streak", 0 - ) + self._no_admission_streak = helix_state.get("no_admission_streak", 0) logger.info( "Helix: restored helix state (streak=%d)", self._no_admission_streak, ) except Exception as exc: - logger.warning( - "Helix: failed to load helix state: %s", exc - ) + logger.warning("Helix: failed to load helix state: %s", exc) self._prime_embedder_from_library() if self._session is not None and self._session.run_manifest: @@ -1490,9 +1518,7 @@ class HelixLoop(RalphLoop): with open(run_manifest_path) as f: self._run_manifest = json.load(f) except Exception as exc: - logger.warning( - "Helix: failed to load run manifest: %s", exc - ) + logger.warning("Helix: failed to load run manifest: %s", exc) def _loop_type(self) -> str: """Label the loop for provenance and manifests.""" diff --git a/src/factorminer/evaluation/local_engine.py b/src/factorminer/evaluation/local_engine.py index 35080d6..c1af262 100644 --- a/src/factorminer/evaluation/local_engine.py +++ b/src/factorminer/evaluation/local_engine.py @@ -12,6 +12,7 @@ Features: from __future__ import annotations +import re from typing import Dict, List, Optional, Tuple import numpy as np @@ -70,10 +71,23 @@ class LocalFactorEvaluator: if len(specs) > 1: print(f"[local_engine] 开始批量计算 {len(specs)} 个因子...") - # 注册所有因子 + # 注册所有因子(防御性清洗:去除 LLM 可能返回的编号前缀如 "8. ") + numbered_prefix_pattern = re.compile(r"^\s*\d+[\.\)]\s*") for name, formula in specs: + cleaned_formula = numbered_prefix_pattern.sub("", formula) + # 将 returns / $returns 替换为本地可用的 DSL 表达式 + cleaned_formula = re.sub( + r"(? CoreMiningConfig: return cfg +def _build_helix_kwargs(run_cfg: dict) -> dict: + """从 RUN_CONFIG 构建 HelixLoop 需要的 Phase 2 扩展配置。""" + helix = run_cfg.get("helix", {}) + + # significance_config 若为 dict,需实例化为 SignificanceConfig;空 dict 视为 None + sig_cfg = helix.get("significance_config") + if isinstance(sig_cfg, dict) and sig_cfg: + significance_config = SignificanceConfig(**sig_cfg) + else: + significance_config = None + + return { + "enable_knowledge_graph": helix.get("enable_knowledge_graph", True), + "enable_embeddings": helix.get("enable_embeddings", True), + "canonicalize": helix.get("canonicalize", True), + "enable_auto_inventor": helix.get("enable_auto_inventor", False), + "auto_invention_interval": helix.get("auto_invention_interval", 10), + "forgetting_lambda": helix.get("forgetting_lambda", 0.95), + "debate_config": helix.get("debate_config"), + "causal_config": helix.get("causal_config"), + "regime_config": helix.get("regime_config"), + "capacity_config": helix.get("capacity_config"), + "significance_config": significance_config, + } + + def main(config: dict | None = None) -> None: """运行因子挖掘主循环。 @@ -191,24 +235,35 @@ def main(config: dict | None = None) -> None: ) # ------------------------------------------------------------------ - # 6. 恢复 checkpoint(可选) + # 6. 选择 Loop 类型并恢复 checkpoint(可选) # ------------------------------------------------------------------ + use_helix = run_cfg.get("use_helix", False) + helix_kwargs = _build_helix_kwargs(run_cfg) if use_helix else {} + LoopCls = HelixLoop if use_helix else RalphLoop + + if use_helix: + print("[main] 使用 Helix Loop(5 阶段增强模式)") + else: + print("[main] 使用 Ralph Loop(4 阶段标准模式)") + resume_path: Optional[str] = run_cfg.get("resume") if resume_path is not None and Path(resume_path).exists(): print(f"[main] 从 checkpoint 恢复: {resume_path}") - loop = RalphLoop.resume_from( + loop = LoopCls.resume_from( checkpoint_path=resume_path, config=mining_cfg, returns=returns, llm_provider=provider, evaluator=evaluator, + **helix_kwargs, ) else: - loop = RalphLoop( + loop = LoopCls( config=mining_cfg, returns=returns, llm_provider=provider, evaluator=evaluator, + **helix_kwargs, ) # ------------------------------------------------------------------ diff --git a/src/factors/translator.py b/src/factors/translator.py index ef637b1..24d078e 100644 --- a/src/factors/translator.py +++ b/src/factors/translator.py @@ -90,6 +90,7 @@ class PolarsTranslator: self.register_handler("cs_zscore", self._handle_cs_zscore) self.register_handler("cs_neutral", self._handle_cs_neutral) self.register_handler("cs_mean", self._handle_cs_mean) + self.register_handler("cs_winsorize", self._handle_cs_winsorize) # 元素级数学函数 (element_wise) self.register_handler("abs", self._handle_abs) @@ -749,6 +750,28 @@ class PolarsTranslator: expr = self.translate(node.args[0]) return expr.mean() + @cross_section + def _handle_cs_winsorize(self, node: FunctionNode) -> pl.Expr: + """处理 cs_winsorize(expr, lower, upper) -> 截面缩尾处理。""" + if len(node.args) not in [1, 3]: + raise ValueError("cs_winsorize 需要 1 或 3 个参数: (expr, [lower, upper])") + expr = self.translate(node.args[0]) + if len(node.args) == 3: + lower = self._extract_float(node.args[1]) + upper = self._extract_float(node.args[2]) + else: + lower = 0.01 + upper = 0.99 + lower_q = expr.quantile(lower) + upper_q = expr.quantile(upper) + return ( + pl.when(expr < lower_q) + .then(lower_q) + .when(expr > upper_q) + .then(upper_q) + .otherwise(expr) + ) + # ==================== 元素级数学函数 (element_wise) ==================== # 这些函数对每个元素独立计算,不添加 over @@ -846,6 +869,26 @@ class PolarsTranslator: return node.value raise ValueError(f"窗口参数必须是常量整数,得到: {type(node).__name__}") + def _extract_float(self, node: Node) -> float: + """从节点中提取浮点参数。 + + Args: + node: 应该是 Constant 节点 + + Returns: + float 值 + + Raises: + ValueError: 当节点不是 Constant 或值不是数值时 + """ + if isinstance(node, Constant): + if not isinstance(node.value, (int, float)): + raise ValueError( + f"数值参数必须是 int 或 float,得到: {type(node.value).__name__}" + ) + return float(node.value) + raise ValueError(f"数值参数必须是常量,得到: {type(node).__name__}") + def translate_to_polars(node: Node) -> pl.Expr: """便捷函数 - 将 AST 节点翻译为 Polars 表达式。