feat(factorminer): 升级 Helix Loop 并增强本地引擎与因子翻译器

- 将 HelixLoop 的 data_tensor 改为可选,优先使用 evaluator 路径
- 在 main.py 中新增 use_helix 开关,支持 RalphLoop / HelixLoop 切换
- 扩展 RUN_CONFIG 的日期范围并添加 Helix 高级验证器配置
- 本地引擎添加公式清洗:去除 LLM 编号前缀并将 returns 替换为 DSL
- 因子 translator 新增 cs_winsorize 截面缩尾函数
- 增强 Helix 各阶段拒绝原因的日志可读性
Commit 613223edd6 (parent 45469c8fed), authored 2026-04-11 12:19:11 +08:00
4 changed files with 219 additions and 81 deletions

View File

@@ -46,9 +46,11 @@ logger = logging.getLogger(__name__)
# Optional imports -- resolved at call time with graceful fallback
# ---------------------------------------------------------------------------
def _try_import_debate():
try:
from src.factorminer.agent.debate import DebateGenerator, DebateConfig
return DebateGenerator, DebateConfig
except ImportError:
return None, None
@@ -57,6 +59,7 @@ def _try_import_debate():
def _try_import_canonicalizer():
try:
from src.factorminer.core.canonicalizer import FormulaCanonicalizer
return FormulaCanonicalizer
except ImportError:
return None
@@ -65,6 +68,7 @@ def _try_import_canonicalizer():
def _try_import_causal():
try:
from src.factorminer.evaluation.causal import CausalValidator, CausalConfig
return CausalValidator, CausalConfig
except ImportError:
return None, None
@@ -77,6 +81,7 @@ def _try_import_regime():
RegimeAwareEvaluator,
RegimeConfig,
)
return RegimeDetector, RegimeAwareEvaluator, RegimeConfig
except ImportError:
return None, None, None
@@ -84,7 +89,11 @@ def _try_import_regime():
def _try_import_capacity():
try:
from src.factorminer.evaluation.capacity import CapacityEstimator, CapacityConfig
from src.factorminer.evaluation.capacity import (
CapacityEstimator,
CapacityConfig,
)
return CapacityEstimator, CapacityConfig
except ImportError:
return None, None
@@ -98,14 +107,24 @@ def _try_import_significance():
DeflatedSharpeCalculator,
SignificanceConfig,
)
return BootstrapICTester, FDRController, DeflatedSharpeCalculator, SignificanceConfig
return (
BootstrapICTester,
FDRController,
DeflatedSharpeCalculator,
SignificanceConfig,
)
except ImportError:
return None, None, None, None
def _try_import_kg():
try:
from src.factorminer.memory.knowledge_graph import FactorKnowledgeGraph, FactorNode
from src.factorminer.memory.knowledge_graph import (
FactorKnowledgeGraph,
FactorNode,
)
return FactorKnowledgeGraph, FactorNode
except ImportError:
return None, None
@@ -114,6 +133,7 @@ def _try_import_kg():
def _try_import_kg_retrieval():
try:
from src.factorminer.memory.kg_retrieval import retrieve_memory_enhanced
return retrieve_memory_enhanced
except ImportError:
return None
@@ -122,6 +142,7 @@ def _try_import_kg_retrieval():
def _try_import_embedder():
try:
from src.factorminer.memory.embeddings import FormulaEmbedder
return FormulaEmbedder
except ImportError:
return None
@@ -130,6 +151,7 @@ def _try_import_embedder():
def _try_import_auto_inventor():
try:
from src.factorminer.operators.auto_inventor import OperatorInventor
return OperatorInventor
except ImportError:
return None
@@ -138,6 +160,7 @@ def _try_import_auto_inventor():
def _try_import_custom_store():
try:
from src.factorminer.operators.custom import CustomOperatorStore
return CustomOperatorStore
except ImportError:
return None
@@ -147,6 +170,7 @@ def _try_import_custom_store():
# HelixLoop
# ---------------------------------------------------------------------------
class HelixLoop(RalphLoop):
"""Enhanced 5-stage Helix Loop for self-evolving factor discovery.
@@ -164,8 +188,9 @@ class HelixLoop(RalphLoop):
----------
config : Any
Mining configuration object.
data_tensor : np.ndarray
Market data tensor D in R^(M x T x F).
evaluator : any, optional
Local factor evaluator (e.g. LocalFactorEvaluator) for on-demand
signal computation. Preferred over the legacy data_tensor path.
returns : np.ndarray
Forward returns array R in R^(M x T).
llm_provider : LLMProvider, optional
@@ -204,11 +229,13 @@ class HelixLoop(RalphLoop):
def __init__(
self,
config: Any,
data_tensor: np.ndarray,
returns: np.ndarray,
llm_provider: Optional[LLMProvider] = None,
memory: Optional[ExperienceMemory] = None,
library: Optional[FactorLibrary] = None,
evaluator: Optional[Any] = None,
# legacy data tensor path (optional)
data_tensor: Optional[np.ndarray] = None,
# Phase 2 extensions
debate_config: Optional[Any] = None,
enable_knowledge_graph: bool = False,
@@ -223,14 +250,14 @@ class HelixLoop(RalphLoop):
significance_config: Optional[Any] = None,
volume: Optional[np.ndarray] = None,
) -> None:
# Initialize base RalphLoop
# Initialize base RalphLoop via the new evaluator path
super().__init__(
config=config,
data_tensor=data_tensor,
returns=returns,
llm_provider=llm_provider,
memory=memory,
library=library,
evaluator=evaluator,
)
# Store Phase 2 configuration
@@ -246,6 +273,7 @@ class HelixLoop(RalphLoop):
self._capacity_config = capacity_config
self._significance_config = significance_config
self._volume = volume
self._data_tensor = data_tensor
# Track iterations without admissions for forgetting
self._no_admission_streak: int = 0
@@ -414,9 +442,12 @@ class HelixLoop(RalphLoop):
CustomStoreCls = _try_import_custom_store()
if InventorCls is not None:
try:
_dt = getattr(self, "_data_tensor", None)
if _dt is None:
_dt = np.empty((*self.returns.shape, 1))
self._auto_inventor = InventorCls(
llm_provider=llm_provider or self.generator.llm,
data_tensor=self.data_tensor,
data_tensor=_dt,
returns=self.returns,
)
logger.info("Helix: auto operator invention enabled")
@@ -486,7 +517,9 @@ class HelixLoop(RalphLoop):
# ==================================================================
# Stage 3: SYNTHESIZE (canonicalize + dedup)
# ==================================================================
candidates, n_canon_dupes, n_semantic_dupes = self._canonicalize_and_dedup(candidates)
candidates, n_canon_dupes, n_semantic_dupes = self._canonicalize_and_dedup(
candidates
)
helix_stats["canonical_duplicates_removed"] = n_canon_dupes
helix_stats["semantic_duplicates_removed"] = n_semantic_dupes
@@ -558,7 +591,10 @@ class HelixLoop(RalphLoop):
ic_passed=stats["ic_passed"],
correlation_passed=stats["corr_passed"],
admitted=stats["admitted"],
rejected=len(candidates) + n_canon_dupes + n_semantic_dupes - stats["admitted"],
rejected=len(candidates)
+ n_canon_dupes
+ n_semantic_dupes
- stats["admitted"],
replaced=stats["replaced"],
library_size=self.library.size,
best_ic=max(ic_values) if ic_values else 0.0,
@@ -585,9 +621,7 @@ class HelixLoop(RalphLoop):
# Stage 1: Enhanced retrieval
# ------------------------------------------------------------------
def _helix_retrieve(
self, library_state: Dict[str, Any]
) -> Dict[str, Any]:
def _helix_retrieve(self, library_state: Dict[str, Any]) -> Dict[str, Any]:
"""Stage 1 RETRIEVE: KG + embeddings + flat memory hybrid retrieval.
Falls back to standard retrieve_memory if no KG/embedder is available.
@@ -648,9 +682,7 @@ class HelixLoop(RalphLoop):
"falling back to standard generator"
)
except Exception as exc:
logger.warning(
"Helix: debate generation failed, falling back: %s", exc
)
logger.warning("Helix: debate generation failed, falling back: %s", exc)
# Standard generation
return self.generator.generate_batch(
@@ -755,7 +787,11 @@ class HelixLoop(RalphLoop):
# Collect admitted results that still have signals for extended checks
to_check = [r for r in admitted_results if r.signals is not None]
if not to_check:
self._no_admission_streak = 0 if any(r.admitted for r in admitted_results) else self._no_admission_streak + 1
self._no_admission_streak = (
0
if any(r.admitted for r in admitted_results)
else self._no_admission_streak + 1
)
return 0
# -- Causal validation --
@@ -780,6 +816,13 @@ class HelixLoop(RalphLoop):
rejected,
len(admitted_results),
)
for r in admitted_results:
if not r.admitted and r.rejection_reason:
logger.info(
"Helix: rejection summary for '%s': %s",
r.factor_name,
r.rejection_reason,
)
if any(r.admitted for r in admitted_results):
self._no_admission_streak = 0
@@ -807,7 +850,8 @@ class HelixLoop(RalphLoop):
try:
validator = CausalValidatorCls(
returns=self.returns,
data_tensor=self.data_tensor,
data_tensor=getattr(self, "_data_tensor", None)
or np.empty((*self.returns.shape, 1)),
library_signals=library_signals,
config=self._causal_config,
)
@@ -816,9 +860,7 @@ class HelixLoop(RalphLoop):
return 0
rejected = 0
threshold = getattr(
self._causal_config, "robustness_threshold", 0.4
)
threshold = getattr(self._causal_config, "robustness_threshold", 0.4)
for r in to_check:
if not r.admitted or r.signals is None:
@@ -826,14 +868,15 @@ class HelixLoop(RalphLoop):
try:
result = validator.validate(r.factor_name, r.signals)
if not result.passes:
self._revoke_admission(r, all_results,
f"Causal: robustness_score={result.robustness_score:.3f} < {threshold}"
reason = (
f"Causal: robustness_score={result.robustness_score:.3f} < {threshold}, "
f"granger(p={result.granger_p_value:.3f}, pass={result.granger_passes}), "
f"intervention(ratio={result.intervention_ic_ratio:.3f}, pass={result.intervention_passes})"
)
self._revoke_admission(r, all_results, reason)
rejected += 1
logger.debug(
"Helix: causal rejection for '%s' (score=%.3f)",
r.factor_name,
result.robustness_score,
logger.info(
"Helix: causal rejection for '%s': %s", r.factor_name, reason
)
except Exception as exc:
logger.warning(
@@ -860,15 +903,14 @@ class HelixLoop(RalphLoop):
try:
result = self._regime_evaluator.evaluate(r.factor_name, r.signals)
if not result.passes:
self._revoke_admission(r, all_results,
reason = (
f"Regime: only {result.n_regimes_passing} regimes passing "
f"(need {getattr(self._regime_config, 'min_regimes_passing', 2)})"
)
self._revoke_admission(r, all_results, reason)
rejected += 1
logger.debug(
"Helix: regime rejection for '%s' (%d regimes passing)",
r.factor_name,
result.n_regimes_passing,
logger.info(
"Helix: regime rejection for '%s': %s", r.factor_name, reason
)
except Exception as exc:
logger.warning(
@@ -889,9 +931,7 @@ class HelixLoop(RalphLoop):
return 0
rejected = 0
net_icir_threshold = getattr(
self._capacity_config, "net_icir_threshold", 0.3
)
net_icir_threshold = getattr(self._capacity_config, "net_icir_threshold", 0.3)
for r in to_check:
if not r.admitted or r.signals is None:
@@ -902,14 +942,11 @@ class HelixLoop(RalphLoop):
signals=r.signals,
)
if not result.passes_net_threshold:
self._revoke_admission(r, all_results,
f"Capacity: net_icir={result.net_icir:.3f} < {net_icir_threshold}"
)
reason = f"Capacity: net_icir={result.net_icir:.3f} < {net_icir_threshold}"
self._revoke_admission(r, all_results, reason)
rejected += 1
logger.debug(
"Helix: capacity rejection for '%s' (net_icir=%.3f)",
r.factor_name,
result.net_icir,
logger.info(
"Helix: capacity rejection for '%s': %s", r.factor_name, reason
)
except Exception as exc:
logger.warning(
@@ -964,15 +1001,14 @@ class HelixLoop(RalphLoop):
r = result_map.get(name)
if r is not None and r.admitted:
adj_p = fdr_result.adjusted_p_values.get(name, 1.0)
self._revoke_admission(r, all_results,
reason = (
f"Significance: FDR-adjusted p={adj_p:.4f} > "
f"{getattr(self._significance_config, 'fdr_level', 0.05)}"
)
self._revoke_admission(r, all_results, reason)
rejected += 1
logger.debug(
"Helix: significance rejection for '%s' (adj_p=%.4f)",
name,
adj_p,
logger.info(
"Helix: significance rejection for '%s': %s", name, reason
)
return rejected
@@ -999,7 +1035,7 @@ class HelixLoop(RalphLoop):
):
self.library.remove_factor(factor.id)
self._remove_semantic_artifacts(result.factor_name)
logger.debug(
logger.info(
"Helix: revoked factor '%s' (id=%d): %s",
result.factor_name,
factor.id,
@@ -1087,9 +1123,7 @@ class HelixLoop(RalphLoop):
try:
self._kg.add_factor(node)
except Exception as exc:
logger.debug(
"Helix: failed to add factor to KG: %s", exc
)
logger.debug("Helix: failed to add factor to KG: %s", exc)
continue
# Add correlation edges with existing library factors
@@ -1191,9 +1225,7 @@ class HelixLoop(RalphLoop):
for pattern in self.memory.success_patterns:
# Decay occurrence count
if hasattr(pattern, "occurrence_count"):
pattern.occurrence_count = int(
pattern.occurrence_count * lam
)
pattern.occurrence_count = int(pattern.occurrence_count * lam)
# Demote success_rate after prolonged drought
if self._no_admission_streak >= 20:
@@ -1228,6 +1260,7 @@ class HelixLoop(RalphLoop):
# Gather existing operators
try:
from src.factorminer.core.types import OPERATOR_REGISTRY as SPEC_REG
existing_ops = dict(SPEC_REG)
except ImportError:
existing_ops = {}
@@ -1289,7 +1322,11 @@ class HelixLoop(RalphLoop):
try:
from src.factorminer.operators.custom import CustomOperator
from src.factorminer.core.types import OperatorSpec, OperatorType, SignatureType
from src.factorminer.core.types import (
OperatorSpec,
OperatorType,
SignatureType,
)
spec = OperatorSpec(
name=proposal.name,
@@ -1298,14 +1335,13 @@ class HelixLoop(RalphLoop):
signature=SignatureType.TIME_SERIES_TO_TIME_SERIES,
param_names=proposal.param_names,
param_defaults=proposal.param_defaults,
param_ranges={
k: tuple(v) for k, v in proposal.param_ranges.items()
},
param_ranges={k: tuple(v) for k, v in proposal.param_ranges.items()},
description=proposal.description,
)
# Compile the function
from src.factorminer.operators.custom import _compile_operator_code
fn = _compile_operator_code(proposal.numpy_code)
if fn is None:
logger.warning(
@@ -1449,18 +1485,14 @@ class HelixLoop(RalphLoop):
self._kg.get_edge_count(),
)
except Exception as exc:
logger.warning(
"Helix: failed to load knowledge graph: %s", exc
)
logger.warning("Helix: failed to load knowledge graph: %s", exc)
# Load custom operators
if self._custom_op_store is not None:
try:
self._custom_op_store.load()
except Exception as exc:
logger.warning(
"Helix: failed to load custom operators: %s", exc
)
logger.warning("Helix: failed to load custom operators: %s", exc)
# Load helix-specific state
helix_state_path = checkpoint_dir / "helix_state.json"
@@ -1468,17 +1500,13 @@ class HelixLoop(RalphLoop):
try:
with open(helix_state_path) as f:
helix_state = json.load(f)
self._no_admission_streak = helix_state.get(
"no_admission_streak", 0
)
self._no_admission_streak = helix_state.get("no_admission_streak", 0)
logger.info(
"Helix: restored helix state (streak=%d)",
self._no_admission_streak,
)
except Exception as exc:
logger.warning(
"Helix: failed to load helix state: %s", exc
)
logger.warning("Helix: failed to load helix state: %s", exc)
self._prime_embedder_from_library()
if self._session is not None and self._session.run_manifest:
@@ -1490,9 +1518,7 @@ class HelixLoop(RalphLoop):
with open(run_manifest_path) as f:
self._run_manifest = json.load(f)
except Exception as exc:
logger.warning(
"Helix: failed to load run manifest: %s", exc
)
logger.warning("Helix: failed to load run manifest: %s", exc)
def _loop_type(self) -> str:
"""Label the loop for provenance and manifests."""

View File

@@ -12,6 +12,7 @@ Features:
from __future__ import annotations
import re
from typing import Dict, List, Optional, Tuple
import numpy as np
@@ -70,10 +71,23 @@ class LocalFactorEvaluator:
if len(specs) > 1:
print(f"[local_engine] 开始批量计算 {len(specs)} 个因子...")
# 注册所有因子
# 注册所有因子(防御性清洗:去除 LLM 可能返回的编号前缀如 "8. ")
numbered_prefix_pattern = re.compile(r"^\s*\d+[\.\)]\s*")
for name, formula in specs:
cleaned_formula = numbered_prefix_pattern.sub("", formula)
# 将 returns / $returns 替换为本地可用的 DSL 表达式
cleaned_formula = re.sub(
r"(?<!\w)\$returns(?!\w)",
"(close / ts_delay(close, 1) - 1)",
cleaned_formula,
)
cleaned_formula = re.sub(
r"(?<!\w)returns(?!\w)",
"(close / ts_delay(close, 1) - 1)",
cleaned_formula,
)
try:
self.engine.add_factor(name, formula)
self.engine.add_factor(name, cleaned_formula)
except Exception as e:
print(f"[ERROR] 注册因子 {name} 失败: {e}")
raise

View File

@@ -16,7 +16,9 @@ from src.factorminer.agent.llm_interface import create_provider, AnthropicProvid
from src.factorminer.core.config import MiningConfig as CoreMiningConfig
from src.factorminer.core.library_io import import_from_paper, save_library
from src.factorminer.core.ralph_loop import RalphLoop
from src.factorminer.core.helix_loop import HelixLoop
from src.factorminer.evaluation.local_engine import LocalFactorEvaluator
from src.factorminer.evaluation.significance import SignificanceConfig
from src.factorminer.utils.config import load_config
RUN_CONFIG: dict = {
@@ -28,8 +30,8 @@ RUN_CONFIG: dict = {
"output_dir": "./output", # 输出目录
"resume": None, # 从已有 checkpoint 恢复(可选)
# 本地数据范围(FactorEngine 自动读取 DuckDB)
"start_date": "20200101", # 计算开始日期
"end_date": "20201231", # 计算结束日期
"start_date": "20190101", # 计算开始日期
"end_date": "20231231", # 计算结束日期
"stock_codes": None, # 可选股票列表None 表示全量
# 种子库
"seed_paper_library": True, # 是否预加载 110 Paper Factors 作为种子库
@@ -45,6 +47,22 @@ RUN_CONFIG: dict = {
"fast_screen_assets": 100,
"num_workers": 1,
},
# Helix 扩展开关
"use_helix": True, # True: 使用 Helix Loop5 阶段增强模式)
"helix": {
"enable_knowledge_graph": True,
"enable_embeddings": True,
"canonicalize": True,
"enable_auto_inventor": True,
"auto_invention_interval": 10,
"forgetting_lambda": 0.95,
# 高级验证器配置(可选,默认开启完整版)
"debate_config": {},
"causal_config": {},
"regime_config": {},
"capacity_config": {},
"significance_config": {},
},
# LLM 配置mock=False 时使用)
"llm": {
"provider": "anthropic",
@@ -121,6 +139,32 @@ def _build_core_mining_config(run_cfg: dict) -> CoreMiningConfig:
return cfg
def _build_helix_kwargs(run_cfg: dict) -> dict:
"""从 RUN_CONFIG 构建 HelixLoop 需要的 Phase 2 扩展配置。"""
helix = run_cfg.get("helix", {})
# significance_config 若为 dict,需实例化为 SignificanceConfig;空 dict 视为 None
sig_cfg = helix.get("significance_config")
if isinstance(sig_cfg, dict) and sig_cfg:
significance_config = SignificanceConfig(**sig_cfg)
else:
significance_config = None
return {
"enable_knowledge_graph": helix.get("enable_knowledge_graph", True),
"enable_embeddings": helix.get("enable_embeddings", True),
"canonicalize": helix.get("canonicalize", True),
"enable_auto_inventor": helix.get("enable_auto_inventor", False),
"auto_invention_interval": helix.get("auto_invention_interval", 10),
"forgetting_lambda": helix.get("forgetting_lambda", 0.95),
"debate_config": helix.get("debate_config"),
"causal_config": helix.get("causal_config"),
"regime_config": helix.get("regime_config"),
"capacity_config": helix.get("capacity_config"),
"significance_config": significance_config,
}
def main(config: dict | None = None) -> None:
"""运行因子挖掘主循环。
@@ -191,24 +235,35 @@ def main(config: dict | None = None) -> None:
)
# ------------------------------------------------------------------
# 6. 恢复 checkpoint(可选)
# 6. 选择 Loop 类型并恢复 checkpoint(可选)
# ------------------------------------------------------------------
use_helix = run_cfg.get("use_helix", False)
helix_kwargs = _build_helix_kwargs(run_cfg) if use_helix else {}
LoopCls = HelixLoop if use_helix else RalphLoop
if use_helix:
print("[main] 使用 Helix Loop5 阶段增强模式)")
else:
print("[main] 使用 Ralph Loop4 阶段标准模式)")
resume_path: Optional[str] = run_cfg.get("resume")
if resume_path is not None and Path(resume_path).exists():
print(f"[main] 从 checkpoint 恢复: {resume_path}")
loop = RalphLoop.resume_from(
loop = LoopCls.resume_from(
checkpoint_path=resume_path,
config=mining_cfg,
returns=returns,
llm_provider=provider,
evaluator=evaluator,
**helix_kwargs,
)
else:
loop = RalphLoop(
loop = LoopCls(
config=mining_cfg,
returns=returns,
llm_provider=provider,
evaluator=evaluator,
**helix_kwargs,
)
# ------------------------------------------------------------------

View File

@@ -90,6 +90,7 @@ class PolarsTranslator:
self.register_handler("cs_zscore", self._handle_cs_zscore)
self.register_handler("cs_neutral", self._handle_cs_neutral)
self.register_handler("cs_mean", self._handle_cs_mean)
self.register_handler("cs_winsorize", self._handle_cs_winsorize)
# 元素级数学函数 (element_wise)
self.register_handler("abs", self._handle_abs)
@@ -749,6 +750,28 @@ class PolarsTranslator:
expr = self.translate(node.args[0])
return expr.mean()
@cross_section
def _handle_cs_winsorize(self, node: FunctionNode) -> pl.Expr:
"""处理 cs_winsorize(expr, lower, upper) -> 截面缩尾处理。"""
if len(node.args) not in [1, 3]:
raise ValueError("cs_winsorize 需要 1 或 3 个参数: (expr, [lower, upper])")
expr = self.translate(node.args[0])
if len(node.args) == 3:
lower = self._extract_float(node.args[1])
upper = self._extract_float(node.args[2])
else:
lower = 0.01
upper = 0.99
lower_q = expr.quantile(lower)
upper_q = expr.quantile(upper)
return (
pl.when(expr < lower_q)
.then(lower_q)
.when(expr > upper_q)
.then(upper_q)
.otherwise(expr)
)
# ==================== 元素级数学函数 (element_wise) ====================
# 这些函数对每个元素独立计算,不添加 over
@@ -846,6 +869,26 @@ class PolarsTranslator:
return node.value
raise ValueError(f"窗口参数必须是常量整数,得到: {type(node).__name__}")
def _extract_float(self, node: Node) -> float:
"""从节点中提取浮点参数。
Args:
node: 应该是 Constant 节点
Returns:
float 值
Raises:
ValueError: 当节点不是 Constant 或值不是数值时
"""
if isinstance(node, Constant):
if not isinstance(node.value, (int, float)):
raise ValueError(
f"数值参数必须是 int 或 float得到: {type(node.value).__name__}"
)
return float(node.value)
raise ValueError(f"数值参数必须是常量,得到: {type(node).__name__}")
def translate_to_polars(node: Node) -> pl.Expr:
"""便捷函数 - 将 AST 节点翻译为 Polars 表达式。