feat(factors): 实现因子元数据管理模块

- 新增 FactorManager 类,支持 JSONL 文件读写和 DuckDB SQL 查询
- 实现零拷贝输出 Polars DataFrame,与现有因子引擎无缝集成
- 添加字段校验器(FactorValidator)和完整的异常处理机制
- 包含 49 个示例因子数据(趋势、波动率、量价、基本面等类别)
- 更新 src/factors/__init__.py 导出元数据管理组件
This commit is contained in:
2026-03-11 22:03:16 +08:00
parent e8ac9d8662
commit 038f5f1722
5 changed files with 569 additions and 0 deletions

View File

@@ -65,6 +65,18 @@ from src.factors.exceptions import (
DuplicateFunctionError,
)
# 元数据管理模块
from src.factors.metadata import (
FactorManager,
FactorValidator,
FactorMetadataError,
ValidationError,
DuplicateFactorError,
FactorNotFoundError,
QueryError,
FileOperationError,
)
# 保持向后兼容factor_engine.py 中的类也可以通过 src.factors.engine 访问
# 例如from src.factors.engine import FactorEngine
@@ -100,4 +112,13 @@ __all__ = [
"EmptyExpressionError",
"RegistryError",
"DuplicateFunctionError",
# 元数据管理模块
"FactorManager",
"FactorValidator",
"FactorMetadataError",
"ValidationError",
"DuplicateFactorError",
"FactorNotFoundError",
"QueryError",
"FileOperationError",
]

View File

@@ -0,0 +1,52 @@
"""因子元数据管理模块。
提供基于DuckDB查询JSONL文件、零拷贝输出Polars DataFrame的因子管理能力。
核心组件:
- FactorManager: 元数据管理器主类
- FactorValidator: 字段校验器
- 异常类: FactorMetadataError, ValidationError等
使用示例:
>>> from src.factors.metadata import FactorManager
>>>
>>> # 初始化管理器
>>> manager = FactorManager("data/factors.jsonl")
>>>
>>> # 添加因子
>>> manager.add_factor({
... "factor_id": "F_001",
... "name": "mom_5d",
... "desc": "5日价格动量截面排序",
... "dsl": "cs_rank(close / delay(close, 5) - 1)"
... })
>>>
>>> # 查询因子
>>> df = manager.search_factors("name LIKE 'mom_%'")
>>> print(df)
"""
from src.factors.metadata.manager import FactorManager
from src.factors.metadata.validator import FactorValidator
from src.factors.metadata.exceptions import (
FactorMetadataError,
ValidationError,
DuplicateFactorError,
FactorNotFoundError,
QueryError,
FileOperationError,
)
__all__ = [
# 管理器
"FactorManager",
# 校验器
"FactorValidator",
# 异常类
"FactorMetadataError",
"ValidationError",
"DuplicateFactorError",
"FactorNotFoundError",
"QueryError",
"FileOperationError",
]

View File

@@ -0,0 +1,90 @@
"""因子元数据模块异常定义。
提供清晰的异常类型帮助定位元数据操作中的问题。
"""
class FactorMetadataError(Exception):
"""因子元数据操作基础异常。"""
pass
class ValidationError(FactorMetadataError):
"""字段校验失败异常。
当因子字典缺少核心字段或字段类型不匹配时抛出。
Attributes:
field: 校验失败的字段名
message: 详细错误信息
"""
def __init__(self, field: str, message: str) -> None:
self.field = field
self.message = message
super().__init__(f"字段 '{field}' 校验失败: {message}")
class DuplicateFactorError(FactorMetadataError):
"""因子重复异常。
当尝试添加已存在的factor_id时抛出。
Attributes:
factor_id: 重复的因子ID
"""
def __init__(self, factor_id: str) -> None:
self.factor_id = factor_id
super().__init__(f"因子 '{factor_id}' 已存在")
class FactorNotFoundError(FactorMetadataError):
"""因子未找到异常。
当查询不存在的factor_id时抛出。
Attributes:
factor_id: 未找到的因子ID
"""
def __init__(self, factor_id: str) -> None:
self.factor_id = factor_id
super().__init__(f"因子 '{factor_id}' 不存在")
class QueryError(FactorMetadataError):
"""DuckDB查询异常。
当SQL查询执行失败时抛出。
Attributes:
sql: 失败的SQL语句
original_error: 原始异常
"""
def __init__(self, sql: str, original_error: Exception) -> None:
self.sql = sql
self.original_error = original_error
super().__init__(f"查询执行失败: {original_error}\nSQL: {sql}")
class FileOperationError(FactorMetadataError):
"""文件操作异常。
当文件读写操作失败时抛出。
Attributes:
filepath: 操作的文件路径
operation: 操作类型read/write
original_error: 原始异常
"""
def __init__(
self, filepath: str, operation: str, original_error: Exception
) -> None:
self.filepath = filepath
self.operation = operation
self.original_error = original_error
super().__init__(f"文件{operation}失败 '{filepath}': {original_error}")

View File

@@ -0,0 +1,311 @@
"""因子元数据管理器。
提供基于DuckDB查询JSONL文件、零拷贝输出Polars DataFrame的因子管理能力。
支持:
- JSONL文件的读写操作
- DuckDB SQL查询输出Polars DataFrame
- 字段校验和异常处理
- 扩展字段的自由定义
使用示例:
>>> from src.factors.metadata import FactorManager
>>> manager = FactorManager("data/factors.jsonl")
>>>
>>> # 添加因子
>>> manager.add_factor({
... "factor_id": "F_001",
... "name": "mom_5d",
... "desc": "5日价格动量",
... "dsl": "cs_rank(close / delay(close, 5) - 1)",
... "category": "momentum" # 扩展字段
... })
>>>
>>> # 查询因子
>>> df = manager.search_factors("category = 'momentum'")
>>> print(df)
"""
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
import duckdb
import polars as pl
from src.factors.metadata.exceptions import (
DuplicateFactorError,
FactorNotFoundError,
FileOperationError,
QueryError,
)
from src.factors.metadata.validator import FactorValidator
class FactorManager:
"""因子元数据管理器。
封装底层JSONL文件I/O和DuckDB SQL查询提供高层次的因子管理接口。
Attributes:
filepath: JSONL文件的完整路径
_conn: DuckDB连接对象懒加载
Example:
>>> manager = FactorManager("data/factors.jsonl")
>>> df = manager.get_factors_by_name("mom_5d")
>>> print(df["dsl"][0])
"""
def __init__(self, filepath: str) -> None:
"""初始化因子管理器。
如果文件不存在会自动创建空的JSONL文件。
Args:
filepath: JSONL文件路径相对或绝对路径
Raises:
FileOperationError: 当文件创建失败时
"""
self.filepath = Path(filepath).resolve()
self._conn: Optional[duckdb.DuckDBPyConnection] = None
# 确保文件存在
self._ensure_file_exists()
def _ensure_file_exists(self) -> None:
"""确保JSONL文件存在。
如果文件不存在,创建空文件和父目录。
Raises:
FileOperationError: 当文件创建失败时
"""
if not self.filepath.exists():
try:
self.filepath.parent.mkdir(parents=True, exist_ok=True)
self.filepath.write_text("", encoding="utf-8")
except Exception as e:
raise FileOperationError(
filepath=str(self.filepath), operation="create", original_error=e
)
def _get_connection(self) -> duckdb.DuckDBPyConnection:
"""获取DuckDB连接懒加载
Returns:
DuckDB连接对象
"""
if self._conn is None:
self._conn = duckdb.connect(":memory:")
# DuckDB 0.7+ 内置JSON支持无需手动安装扩展
return self._conn
def _close_connection(self) -> None:
"""关闭DuckDB连接。"""
if self._conn is not None:
self._conn.close()
self._conn = None
def add_factor(self, factor_dict: Dict[str, Any]) -> None:
"""添加新因子到JSONL文件。
校验核心字段后,将因子追加到文件末尾。
Args:
factor_dict: 因子字典,必须包含核心字段
Raises:
ValidationError: 当缺少核心字段或类型错误
DuplicateFactorError: 当factor_id已存在
FileOperationError: 当文件写入失败
Example:
>>> manager.add_factor({
... "factor_id": "F_001",
... "name": "mom_5d",
... "desc": "5日动量",
... "dsl": "cs_rank(close / delay(close, 5) - 1)",
... "category": "momentum" # 扩展字段
... })
"""
# 校验字段
FactorValidator.validate(factor_dict)
# 检查factor_id是否已存在
factor_id = factor_dict["factor_id"]
existing = self._get_factor_by_id(factor_id)
if existing is not None:
raise DuplicateFactorError(factor_id)
# 追加到文件
try:
with open(self.filepath, "a", encoding="utf-8") as f:
json_line = json.dumps(factor_dict, ensure_ascii=False)
f.write(json_line + "\n")
except Exception as e:
raise FileOperationError(
filepath=str(self.filepath), operation="write", original_error=e
)
def get_factors_by_name(self, name: str) -> pl.DataFrame:
"""根据名称查询因子。
使用DuckDB执行SQL查询返回Polars DataFrame。
Args:
name: 因子名称(如 'mom_5d'
Returns:
匹配因子的Polars DataFrame可能为空
Raises:
QueryError: 当SQL查询执行失败时
Example:
>>> df = manager.get_factors_by_name("mom_5d")
>>> if len(df) > 0:
... print(df["dsl"][0])
"""
sql = f"""
SELECT *
FROM read_json_auto('{self.filepath}')
WHERE name = '{name}'
"""
return self._execute_query(sql)
def get_factor_dsl(self, factor_id: str) -> str:
"""根据ID获取因子的DSL公式。
Args:
factor_id: 因子唯一标识符
Returns:
DSL公式字符串
Raises:
FactorNotFoundError: 当因子不存在时
QueryError: 当SQL查询执行失败时
Example:
>>> dsl = manager.get_factor_dsl("F_001")
>>> print(dsl)
'cs_rank(close / delay(close, 5) - 1)'
"""
sql = f"""
SELECT dsl
FROM read_json_auto('{self.filepath}')
WHERE factor_id = '{factor_id}'
"""
df = self._execute_query(sql)
if len(df) == 0:
raise FactorNotFoundError(factor_id)
return df["dsl"][0]
def search_factors(self, sql_condition: str) -> pl.DataFrame:
"""使用自定义SQL条件查询因子。
允许传入WHERE子句条件利用DuckDB进行高级筛选。
Args:
sql_condition: SQL WHERE子句条件"category = 'momentum'"
Returns:
匹配因子的Polars DataFrame
Raises:
QueryError: 当SQL查询执行失败时
Example:
>>> # 查询动量类因子
>>> df = manager.search_factors("category = 'momentum'")
>>>
>>> # 查询包含特定关键词的因子
>>> df = manager.search_factors("desc LIKE '%动量%'")
>>>
>>> # 复杂条件查询
>>> df = manager.search_factors(
... "category = 'momentum' AND name LIKE 'mom_%'"
... )
"""
sql = f"""
SELECT *
FROM read_json_auto('{self.filepath}')
WHERE {sql_condition}
"""
return self._execute_query(sql)
def get_all_factors(self) -> pl.DataFrame:
"""获取所有因子。
Returns:
所有因子的Polars DataFrame
Raises:
QueryError: 当SQL查询执行失败时
"""
sql = f"""
SELECT *
FROM read_json_auto('{self.filepath}')
"""
return self._execute_query(sql)
def _get_factor_by_id(self, factor_id: str) -> Optional[Dict[str, Any]]:
"""根据ID获取因子内部方法
Args:
factor_id: 因子唯一标识符
Returns:
因子字典不存在返回None
"""
try:
sql = f"""
SELECT *
FROM read_json_auto('{self.filepath}')
WHERE factor_id = '{factor_id}'
"""
df = self._execute_query(sql)
if len(df) == 0:
return None
# 转换为字典
return df.to_dicts()[0]
except QueryError:
return None
def _execute_query(self, sql: str) -> pl.DataFrame:
"""执行DuckDB SQL查询并返回Polars DataFrame。
Args:
sql: SQL查询语句
Returns:
查询结果的Polars DataFrame
Raises:
QueryError: 当SQL执行失败时
"""
try:
conn = self._get_connection()
result = conn.execute(sql).pl()
return result
except Exception as e:
raise QueryError(sql, e)
def __enter__(self) -> "FactorManager":
"""上下文管理器入口。"""
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""上下文管理器出口,确保连接关闭。"""
self._close_connection()
def __repr__(self) -> str:
"""返回管理器字符串表示。"""
return f"FactorManager('{self.filepath}')"

View File

@@ -0,0 +1,95 @@
"""因子元数据字段校验器。
提供核心字段的校验逻辑,支持扩展字段的自由定义。
"""
from typing import Any, Dict, List, Set
from src.factors.metadata.exceptions import ValidationError
class FactorValidator:
"""因子字段校验器。
校验核心字段的存在性和类型,允许任意扩展字段。
核心字段:
- factor_id (str): 全局唯一标识符
- name (str): 可读短名称
- desc (str): 详细描述
- dsl (str): DSL计算公式
Attributes:
required_fields: 必需字段集合
"""
REQUIRED_FIELDS: Set[str] = {"factor_id", "name", "desc", "dsl"}
@classmethod
def validate(cls, factor_dict: Dict[str, Any]) -> None:
"""校验因子字典。
检查是否包含所有核心字段,以及字段类型是否正确。
Args:
factor_dict: 待校验的因子字典
Raises:
ValidationError: 当缺少核心字段或字段类型错误时
Example:
>>> factor = {
... "factor_id": "F_001",
... "name": "mom_5d",
... "desc": "5日动量",
... "dsl": "cs_rank(close / delay(close, 5) - 1)"
... }
>>> FactorValidator.validate(factor) # 通过
"""
# 检查核心字段是否存在
missing_fields = cls.REQUIRED_FIELDS - set(factor_dict.keys())
if missing_fields:
raise ValidationError(
field=list(missing_fields)[0],
message=f"缺少必需字段: {sorted(missing_fields)}",
)
# 检查核心字段类型
for field in cls.REQUIRED_FIELDS:
value = factor_dict[field]
if not isinstance(value, str):
raise ValidationError(
field=field,
message=f"期望类型 str实际类型 {type(value).__name__}",
)
# 检查非空字符串
if not value.strip():
raise ValidationError(field=field, message="字段不能为空字符串")
@classmethod
def get_required_fields(cls) -> List[str]:
"""获取必需字段列表。
Returns:
按字母排序的必需字段列表
"""
return sorted(cls.REQUIRED_FIELDS)
@classmethod
def is_valid(cls, factor_dict: Dict[str, Any]) -> bool:
"""快速校验因子字典是否有效。
不抛出异常,仅返回布尔结果。
Args:
factor_dict: 待校验的因子字典
Returns:
是否通过校验
"""
try:
cls.validate(factor_dict)
return True
except ValidationError:
return False