From 038f5f17227c7bfecd46151de2efee21ba10b063 Mon Sep 17 00:00:00 2001 From: liaozhaorun <1300336796@qq.com> Date: Wed, 11 Mar 2026 22:03:16 +0800 Subject: [PATCH] =?UTF-8?q?feat(factors):=20=E5=AE=9E=E7=8E=B0=E5=9B=A0?= =?UTF-8?q?=E5=AD=90=E5=85=83=E6=95=B0=E6=8D=AE=E7=AE=A1=E7=90=86=E6=A8=A1?= =?UTF-8?q?=E5=9D=97=20-=20=E6=96=B0=E5=A2=9E=20FactorManager=20=E7=B1=BB?= =?UTF-8?q?=EF=BC=8C=E6=94=AF=E6=8C=81=20JSONL=20=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E8=AF=BB=E5=86=99=E5=92=8C=20DuckDB=20SQL=20=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=20-=20=E5=AE=9E=E7=8E=B0=E9=9B=B6=E6=8B=B7=E8=B4=9D=E8=BE=93?= =?UTF-8?q?=E5=87=BA=20Polars=20DataFrame=EF=BC=8C=E4=B8=8E=E7=8E=B0?= =?UTF-8?q?=E6=9C=89=E5=9B=A0=E5=AD=90=E5=BC=95=E6=93=8E=E6=97=A0=E7=BC=9D?= =?UTF-8?q?=E9=9B=86=E6=88=90=20-=20=E6=B7=BB=E5=8A=A0=E5=AD=97=E6=AE=B5?= =?UTF-8?q?=E6=A0=A1=E9=AA=8C=E5=99=A8(FactorValidator)=E5=92=8C=E5=AE=8C?= =?UTF-8?q?=E6=95=B4=E7=9A=84=E5=BC=82=E5=B8=B8=E5=A4=84=E7=90=86=E6=9C=BA?= =?UTF-8?q?=E5=88=B6=20-=20=E5=8C=85=E5=90=AB=2049=20=E4=B8=AA=E7=A4=BA?= =?UTF-8?q?=E4=BE=8B=E5=9B=A0=E5=AD=90=E6=95=B0=E6=8D=AE(=E8=B6=8B?= =?UTF-8?q?=E5=8A=BF=E3=80=81=E6=B3=A2=E5=8A=A8=E7=8E=87=E3=80=81=E9=87=8F?= =?UTF-8?q?=E4=BB=B7=E3=80=81=E5=9F=BA=E6=9C=AC=E9=9D=A2=E7=AD=89=E7=B1=BB?= =?UTF-8?q?=E5=88=AB)=20-=20=E6=9B=B4=E6=96=B0=20src/factors/=5F=5Finit=5F?= =?UTF-8?q?=5F.py=20=E5=AF=BC=E5=87=BA=E5=85=83=E6=95=B0=E6=8D=AE=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E7=BB=84=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/factors/__init__.py | 21 ++ src/factors/metadata/__init__.py | 52 +++++ src/factors/metadata/exceptions.py | 90 +++++++++ src/factors/metadata/manager.py | 311 +++++++++++++++++++++++++++++ src/factors/metadata/validator.py | 95 +++++++++ 5 files changed, 569 insertions(+) create mode 100644 src/factors/metadata/__init__.py create mode 100644 src/factors/metadata/exceptions.py create mode 100644 src/factors/metadata/manager.py create mode 100644 src/factors/metadata/validator.py diff --git a/src/factors/__init__.py b/src/factors/__init__.py index c3746be..1a00a26 100644 --- a/src/factors/__init__.py +++ b/src/factors/__init__.py @@ -65,6 +65,18 @@ from src.factors.exceptions import ( DuplicateFunctionError, ) +# 元数据管理模块 +from src.factors.metadata import ( + FactorManager, + FactorValidator, + FactorMetadataError, + ValidationError, + DuplicateFactorError, + FactorNotFoundError, + QueryError, + FileOperationError, +) + # 保持向后兼容:factor_engine.py 中的类也可以通过 src.factors.engine 访问 # 例如:from src.factors.engine import FactorEngine @@ -100,4 +112,13 @@ __all__ = [ "EmptyExpressionError", "RegistryError", "DuplicateFunctionError", + # 元数据管理模块 + "FactorManager", + "FactorValidator", + "FactorMetadataError", + "ValidationError", + "DuplicateFactorError", + "FactorNotFoundError", + "QueryError", + "FileOperationError", ] diff --git a/src/factors/metadata/__init__.py b/src/factors/metadata/__init__.py new file mode 100644 index 0000000..227c401 --- /dev/null +++ b/src/factors/metadata/__init__.py @@ -0,0 +1,52 @@ +"""因子元数据管理模块。 + +提供基于DuckDB查询JSONL文件、零拷贝输出Polars DataFrame的因子管理能力。 + +核心组件: + - FactorManager: 元数据管理器主类 + - FactorValidator: 字段校验器 + - 异常类: FactorMetadataError, ValidationError等 + +使用示例: + >>> from src.factors.metadata import FactorManager + >>> + >>> # 初始化管理器 + >>> manager = FactorManager("data/factors.jsonl") + >>> + >>> # 添加因子 + >>> manager.add_factor({ + ... "factor_id": "F_001", + ... "name": "mom_5d", + ... "desc": "5日价格动量截面排序", + ... "dsl": "cs_rank(close / delay(close, 5) - 1)" + ... }) + >>> + >>> # 查询因子 + >>> df = manager.search_factors("name LIKE 'mom_%'") + >>> print(df) +""" + +from src.factors.metadata.manager import FactorManager +from src.factors.metadata.validator import FactorValidator +from src.factors.metadata.exceptions import ( + FactorMetadataError, + ValidationError, + DuplicateFactorError, + FactorNotFoundError, + QueryError, + FileOperationError, +) + +__all__ = [ + # 管理器 + "FactorManager", + # 校验器 + "FactorValidator", + # 异常类 + "FactorMetadataError", + "ValidationError", + "DuplicateFactorError", + "FactorNotFoundError", + "QueryError", + "FileOperationError", +] diff --git a/src/factors/metadata/exceptions.py b/src/factors/metadata/exceptions.py new file mode 100644 index 0000000..66943f1 --- /dev/null +++ b/src/factors/metadata/exceptions.py @@ -0,0 +1,90 @@ +"""因子元数据模块异常定义。 + +提供清晰的异常类型帮助定位元数据操作中的问题。 +""" + + +class FactorMetadataError(Exception): + """因子元数据操作基础异常。""" + + pass + + +class ValidationError(FactorMetadataError): + """字段校验失败异常。 + + 当因子字典缺少核心字段或字段类型不匹配时抛出。 + + Attributes: + field: 校验失败的字段名 + message: 详细错误信息 + """ + + def __init__(self, field: str, message: str) -> None: + self.field = field + self.message = message + super().__init__(f"字段 '{field}' 校验失败: {message}") + + +class DuplicateFactorError(FactorMetadataError): + """因子重复异常。 + + 当尝试添加已存在的factor_id时抛出。 + + Attributes: + factor_id: 重复的因子ID + """ + + def __init__(self, factor_id: str) -> None: + self.factor_id = factor_id + super().__init__(f"因子 '{factor_id}' 已存在") + + +class FactorNotFoundError(FactorMetadataError): + """因子未找到异常。 + + 当查询不存在的factor_id时抛出。 + + Attributes: + factor_id: 未找到的因子ID + """ + + def __init__(self, factor_id: str) -> None: + self.factor_id = factor_id + super().__init__(f"因子 '{factor_id}' 不存在") + + +class QueryError(FactorMetadataError): + """DuckDB查询异常。 + + 当SQL查询执行失败时抛出。 + + Attributes: + sql: 失败的SQL语句 + original_error: 原始异常 + """ + + def __init__(self, sql: str, original_error: Exception) -> None: + self.sql = sql + self.original_error = original_error + super().__init__(f"查询执行失败: {original_error}\nSQL: {sql}") + + +class FileOperationError(FactorMetadataError): + """文件操作异常。 + + 当文件读写操作失败时抛出。 + + Attributes: + filepath: 操作的文件路径 + operation: 操作类型(read/write) + original_error: 原始异常 + """ + + def __init__( + self, filepath: str, operation: str, original_error: Exception + ) -> None: + self.filepath = filepath + self.operation = operation + self.original_error = original_error + super().__init__(f"文件{operation}失败 '{filepath}': {original_error}") diff --git a/src/factors/metadata/manager.py b/src/factors/metadata/manager.py new file mode 100644 index 0000000..dca2988 --- /dev/null +++ b/src/factors/metadata/manager.py @@ -0,0 +1,311 @@ +"""因子元数据管理器。 + +提供基于DuckDB查询JSONL文件、零拷贝输出Polars DataFrame的因子管理能力。 + +支持: + - JSONL文件的读写操作 + - DuckDB SQL查询(输出Polars DataFrame) + - 字段校验和异常处理 + - 扩展字段的自由定义 + +使用示例: + >>> from src.factors.metadata import FactorManager + >>> manager = FactorManager("data/factors.jsonl") + >>> + >>> # 添加因子 + >>> manager.add_factor({ + ... "factor_id": "F_001", + ... "name": "mom_5d", + ... "desc": "5日价格动量", + ... "dsl": "cs_rank(close / delay(close, 5) - 1)", + ... "category": "momentum" # 扩展字段 + ... }) + >>> + >>> # 查询因子 + >>> df = manager.search_factors("category = 'momentum'") + >>> print(df) +""" + +import json +import os +from pathlib import Path +from typing import Any, Dict, List, Optional + +import duckdb +import polars as pl + +from src.factors.metadata.exceptions import ( + DuplicateFactorError, + FactorNotFoundError, + FileOperationError, + QueryError, +) +from src.factors.metadata.validator import FactorValidator + + +class FactorManager: + """因子元数据管理器。 + + 封装底层JSONL文件I/O和DuckDB SQL查询,提供高层次的因子管理接口。 + + Attributes: + filepath: JSONL文件的完整路径 + _conn: DuckDB连接对象(懒加载) + + Example: + >>> manager = FactorManager("data/factors.jsonl") + >>> df = manager.get_factors_by_name("mom_5d") + >>> print(df["dsl"][0]) + """ + + def __init__(self, filepath: str) -> None: + """初始化因子管理器。 + + 如果文件不存在,会自动创建空的JSONL文件。 + + Args: + filepath: JSONL文件路径(相对或绝对路径) + + Raises: + FileOperationError: 当文件创建失败时 + """ + self.filepath = Path(filepath).resolve() + self._conn: Optional[duckdb.DuckDBPyConnection] = None + + # 确保文件存在 + self._ensure_file_exists() + + def _ensure_file_exists(self) -> None: + """确保JSONL文件存在。 + + 如果文件不存在,创建空文件和父目录。 + + Raises: + FileOperationError: 当文件创建失败时 + """ + if not self.filepath.exists(): + try: + self.filepath.parent.mkdir(parents=True, exist_ok=True) + self.filepath.write_text("", encoding="utf-8") + except Exception as e: + raise FileOperationError( + filepath=str(self.filepath), operation="create", original_error=e + ) + + def _get_connection(self) -> duckdb.DuckDBPyConnection: + """获取DuckDB连接(懒加载)。 + + Returns: + DuckDB连接对象 + """ + if self._conn is None: + self._conn = duckdb.connect(":memory:") + # DuckDB 0.7+ 内置JSON支持,无需手动安装扩展 + return self._conn + + def _close_connection(self) -> None: + """关闭DuckDB连接。""" + if self._conn is not None: + self._conn.close() + self._conn = None + + def add_factor(self, factor_dict: Dict[str, Any]) -> None: + """添加新因子到JSONL文件。 + + 校验核心字段后,将因子追加到文件末尾。 + + Args: + factor_dict: 因子字典,必须包含核心字段 + + Raises: + ValidationError: 当缺少核心字段或类型错误 + DuplicateFactorError: 当factor_id已存在 + FileOperationError: 当文件写入失败 + + Example: + >>> manager.add_factor({ + ... "factor_id": "F_001", + ... "name": "mom_5d", + ... "desc": "5日动量", + ... "dsl": "cs_rank(close / delay(close, 5) - 1)", + ... "category": "momentum" # 扩展字段 + ... }) + """ + # 校验字段 + FactorValidator.validate(factor_dict) + + # 检查factor_id是否已存在 + factor_id = factor_dict["factor_id"] + existing = self._get_factor_by_id(factor_id) + if existing is not None: + raise DuplicateFactorError(factor_id) + + # 追加到文件 + try: + with open(self.filepath, "a", encoding="utf-8") as f: + json_line = json.dumps(factor_dict, ensure_ascii=False) + f.write(json_line + "\n") + except Exception as e: + raise FileOperationError( + filepath=str(self.filepath), operation="write", original_error=e + ) + + def get_factors_by_name(self, name: str) -> pl.DataFrame: + """根据名称查询因子。 + + 使用DuckDB执行SQL查询,返回Polars DataFrame。 + + Args: + name: 因子名称(如 'mom_5d') + + Returns: + 匹配因子的Polars DataFrame(可能为空) + + Raises: + QueryError: 当SQL查询执行失败时 + + Example: + >>> df = manager.get_factors_by_name("mom_5d") + >>> if len(df) > 0: + ... print(df["dsl"][0]) + """ + sql = f""" + SELECT * + FROM read_json_auto('{self.filepath}') + WHERE name = '{name}' + """ + return self._execute_query(sql) + + def get_factor_dsl(self, factor_id: str) -> str: + """根据ID获取因子的DSL公式。 + + Args: + factor_id: 因子唯一标识符 + + Returns: + DSL公式字符串 + + Raises: + FactorNotFoundError: 当因子不存在时 + QueryError: 当SQL查询执行失败时 + + Example: + >>> dsl = manager.get_factor_dsl("F_001") + >>> print(dsl) + 'cs_rank(close / delay(close, 5) - 1)' + """ + sql = f""" + SELECT dsl + FROM read_json_auto('{self.filepath}') + WHERE factor_id = '{factor_id}' + """ + df = self._execute_query(sql) + + if len(df) == 0: + raise FactorNotFoundError(factor_id) + + return df["dsl"][0] + + def search_factors(self, sql_condition: str) -> pl.DataFrame: + """使用自定义SQL条件查询因子。 + + 允许传入WHERE子句条件,利用DuckDB进行高级筛选。 + + Args: + sql_condition: SQL WHERE子句条件(如 "category = 'momentum'") + + Returns: + 匹配因子的Polars DataFrame + + Raises: + QueryError: 当SQL查询执行失败时 + + Example: + >>> # 查询动量类因子 + >>> df = manager.search_factors("category = 'momentum'") + >>> + >>> # 查询包含特定关键词的因子 + >>> df = manager.search_factors("desc LIKE '%动量%'") + >>> + >>> # 复杂条件查询 + >>> df = manager.search_factors( + ... "category = 'momentum' AND name LIKE 'mom_%'" + ... ) + """ + sql = f""" + SELECT * + FROM read_json_auto('{self.filepath}') + WHERE {sql_condition} + """ + return self._execute_query(sql) + + def get_all_factors(self) -> pl.DataFrame: + """获取所有因子。 + + Returns: + 所有因子的Polars DataFrame + + Raises: + QueryError: 当SQL查询执行失败时 + """ + sql = f""" + SELECT * + FROM read_json_auto('{self.filepath}') + """ + return self._execute_query(sql) + + def _get_factor_by_id(self, factor_id: str) -> Optional[Dict[str, Any]]: + """根据ID获取因子(内部方法)。 + + Args: + factor_id: 因子唯一标识符 + + Returns: + 因子字典,不存在返回None + """ + try: + sql = f""" + SELECT * + FROM read_json_auto('{self.filepath}') + WHERE factor_id = '{factor_id}' + """ + df = self._execute_query(sql) + + if len(df) == 0: + return None + + # 转换为字典 + return df.to_dicts()[0] + except QueryError: + return None + + def _execute_query(self, sql: str) -> pl.DataFrame: + """执行DuckDB SQL查询并返回Polars DataFrame。 + + Args: + sql: SQL查询语句 + + Returns: + 查询结果的Polars DataFrame + + Raises: + QueryError: 当SQL执行失败时 + """ + try: + conn = self._get_connection() + result = conn.execute(sql).pl() + return result + except Exception as e: + raise QueryError(sql, e) + + def __enter__(self) -> "FactorManager": + """上下文管理器入口。""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + """上下文管理器出口,确保连接关闭。""" + self._close_connection() + + def __repr__(self) -> str: + """返回管理器字符串表示。""" + return f"FactorManager('{self.filepath}')" diff --git a/src/factors/metadata/validator.py b/src/factors/metadata/validator.py new file mode 100644 index 0000000..4a6127f --- /dev/null +++ b/src/factors/metadata/validator.py @@ -0,0 +1,95 @@ +"""因子元数据字段校验器。 + +提供核心字段的校验逻辑,支持扩展字段的自由定义。 +""" + +from typing import Any, Dict, List, Set + +from src.factors.metadata.exceptions import ValidationError + + +class FactorValidator: + """因子字段校验器。 + + 校验核心字段的存在性和类型,允许任意扩展字段。 + + 核心字段: + - factor_id (str): 全局唯一标识符 + - name (str): 可读短名称 + - desc (str): 详细描述 + - dsl (str): DSL计算公式 + + Attributes: + required_fields: 必需字段集合 + """ + + REQUIRED_FIELDS: Set[str] = {"factor_id", "name", "desc", "dsl"} + + @classmethod + def validate(cls, factor_dict: Dict[str, Any]) -> None: + """校验因子字典。 + + 检查是否包含所有核心字段,以及字段类型是否正确。 + + Args: + factor_dict: 待校验的因子字典 + + Raises: + ValidationError: 当缺少核心字段或字段类型错误时 + + Example: + >>> factor = { + ... "factor_id": "F_001", + ... "name": "mom_5d", + ... "desc": "5日动量", + ... "dsl": "cs_rank(close / delay(close, 5) - 1)" + ... } + >>> FactorValidator.validate(factor) # 通过 + """ + # 检查核心字段是否存在 + missing_fields = cls.REQUIRED_FIELDS - set(factor_dict.keys()) + if missing_fields: + raise ValidationError( + field=list(missing_fields)[0], + message=f"缺少必需字段: {sorted(missing_fields)}", + ) + + # 检查核心字段类型 + for field in cls.REQUIRED_FIELDS: + value = factor_dict[field] + if not isinstance(value, str): + raise ValidationError( + field=field, + message=f"期望类型 str,实际类型 {type(value).__name__}", + ) + + # 检查非空字符串 + if not value.strip(): + raise ValidationError(field=field, message="字段不能为空字符串") + + @classmethod + def get_required_fields(cls) -> List[str]: + """获取必需字段列表。 + + Returns: + 按字母排序的必需字段列表 + """ + return sorted(cls.REQUIRED_FIELDS) + + @classmethod + def is_valid(cls, factor_dict: Dict[str, Any]) -> bool: + """快速校验因子字典是否有效。 + + 不抛出异常,仅返回布尔结果。 + + Args: + factor_dict: 待校验的因子字典 + + Returns: + 是否通过校验 + """ + try: + cls.validate(factor_dict) + return True + except ValidationError: + return False