Files
NewStock/main/factor/polars_factors.py

238 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Polars因子主入口 - 整合所有Polars-based因子计算
提供统一的接口来应用所有类别的因子
"""
import polars as pl
from typing import Dict, List, Optional, Any
import logging
# Configure logging.
# NOTE(review): basicConfig() runs at import time, so merely importing this
# module sets up the root logger at INFO level — confirm this is intended for
# library use rather than leaving configuration to the application.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Factor category key -> human-readable (Chinese) label used in log output.
# Insertion order is the default order in which categories are applied.
FACTOR_CATEGORIES = dict(
    money_flow='资金流因子',
    chip='筹码分布因子',
    volatility='波动率因子',
    volume='成交量因子',
    technical='技术指标因子',
    sentiment='情绪因子',
    momentum='动量因子',
    complex='复杂组合因子',
)
def apply_money_flow_factors(df: pl.DataFrame, operators: List = None) -> pl.DataFrame:
    """Apply the money-flow factors to ``df``.

    The real implementation is ``apply_money_flow_factors`` from the
    ``polars_money_flow_factors`` module, imported lazily on every call.

    Args:
        df: Input frame, forwarded unchanged to the delegate.
        operators: Optional list forwarded as the delegate's second argument.

    Returns:
        The delegate's return value, or ``df`` itself when an ``ImportError``
        is raised (a warning is logged in that case).
    """
    try:
        from polars_money_flow_factors import apply_money_flow_factors as _delegate
        enriched = _delegate(df, operators)
    except ImportError as e:
        # The delegate call is inside the try too, so an ImportError raised
        # while it runs is handled exactly like a failed module import.
        logger.warning(f"无法导入资金流因子模块: {e}")
        enriched = df
    return enriched
def apply_chip_distribution_factors(df: pl.DataFrame, operators: List = None) -> pl.DataFrame:
    """Apply the chip-distribution factors to ``df``.

    The real implementation is ``apply_chip_distribution_factors`` from the
    ``polars_chip_factors`` module, imported lazily on every call.

    Args:
        df: Input frame, forwarded unchanged to the delegate.
        operators: Optional list forwarded as the delegate's second argument.

    Returns:
        The delegate's return value, or ``df`` itself when an ``ImportError``
        is raised (a warning is logged in that case).
    """
    try:
        from polars_chip_factors import apply_chip_distribution_factors as _delegate
        enriched = _delegate(df, operators)
    except ImportError as e:
        # The delegate call is inside the try too, so an ImportError raised
        # while it runs is handled exactly like a failed module import.
        logger.warning(f"无法导入筹码分布因子模块: {e}")
        enriched = df
    return enriched
def apply_volatility_factors(df: pl.DataFrame, operators: List = None) -> pl.DataFrame:
    """Apply the volatility factors to ``df``.

    The real implementation is ``apply_volatility_factors`` from the
    ``polars_volatility_factors`` module, imported lazily on every call.

    Args:
        df: Input frame, forwarded unchanged to the delegate.
        operators: Optional list forwarded as the delegate's second argument.

    Returns:
        The delegate's return value, or ``df`` itself when an ``ImportError``
        is raised (a warning is logged in that case).
    """
    try:
        from polars_volatility_factors import apply_volatility_factors as _delegate
        enriched = _delegate(df, operators)
    except ImportError as e:
        # The delegate call is inside the try too, so an ImportError raised
        # while it runs is handled exactly like a failed module import.
        logger.warning(f"无法导入波动率因子模块: {e}")
        enriched = df
    return enriched
def apply_volume_factors(df: pl.DataFrame, operators: List = None) -> pl.DataFrame:
    """Apply the volume factors to ``df``.

    The real implementation is ``apply_volume_factors`` from the
    ``polars_volume_factors`` module, imported lazily on every call.

    Args:
        df: Input frame, forwarded unchanged to the delegate.
        operators: Optional list forwarded as the delegate's second argument.

    Returns:
        The delegate's return value, or ``df`` itself when an ``ImportError``
        is raised (a warning is logged in that case).
    """
    try:
        from polars_volume_factors import apply_volume_factors as _delegate
        enriched = _delegate(df, operators)
    except ImportError as e:
        # The delegate call is inside the try too, so an ImportError raised
        # while it runs is handled exactly like a failed module import.
        logger.warning(f"无法导入成交量因子模块: {e}")
        enriched = df
    return enriched
def apply_technical_factors(df: pl.DataFrame, operators: List = None) -> pl.DataFrame:
    """Apply the technical-indicator factors to ``df``.

    The real implementation is ``apply_technical_factors`` from the
    ``polars_technical_factors`` module, imported lazily on every call.

    Args:
        df: Input frame, forwarded unchanged to the delegate.
        operators: Optional list forwarded as the delegate's second argument.

    Returns:
        The delegate's return value, or ``df`` itself when an ``ImportError``
        is raised (a warning is logged in that case).
    """
    try:
        from polars_technical_factors import apply_technical_factors as _delegate
        enriched = _delegate(df, operators)
    except ImportError as e:
        # The delegate call is inside the try too, so an ImportError raised
        # while it runs is handled exactly like a failed module import.
        logger.warning(f"无法导入技术指标因子模块: {e}")
        enriched = df
    return enriched
def apply_sentiment_factors(df: pl.DataFrame, operators: List = None) -> pl.DataFrame:
    """Apply the sentiment factors to ``df``.

    The real implementation is ``apply_sentiment_factors`` from the
    ``polars_sentiment_factors`` module, imported lazily on every call.

    Args:
        df: Input frame, forwarded unchanged to the delegate.
        operators: Optional list forwarded as the delegate's second argument.

    Returns:
        The delegate's return value, or ``df`` itself when an ``ImportError``
        is raised (a warning is logged in that case).
    """
    try:
        from polars_sentiment_factors import apply_sentiment_factors as _delegate
        enriched = _delegate(df, operators)
    except ImportError as e:
        # The delegate call is inside the try too, so an ImportError raised
        # while it runs is handled exactly like a failed module import.
        logger.warning(f"无法导入情绪因子模块: {e}")
        enriched = df
    return enriched
def apply_momentum_factors(df: pl.DataFrame, operators: List = None) -> pl.DataFrame:
    """Apply the momentum factors to ``df``.

    The real implementation is ``apply_momentum_factors`` from the
    ``polars_momentum_factors`` module, imported lazily on every call.

    Args:
        df: Input frame, forwarded unchanged to the delegate.
        operators: Optional list forwarded as the delegate's second argument.

    Returns:
        The delegate's return value, or ``df`` itself when an ``ImportError``
        is raised (a warning is logged in that case).
    """
    try:
        from polars_momentum_factors import apply_momentum_factors as _delegate
        enriched = _delegate(df, operators)
    except ImportError as e:
        # The delegate call is inside the try too, so an ImportError raised
        # while it runs is handled exactly like a failed module import.
        logger.warning(f"无法导入动量因子模块: {e}")
        enriched = df
    return enriched
def apply_complex_factors(df: pl.DataFrame, operators: List = None) -> pl.DataFrame:
    """Apply the complex (composite) factors to ``df``.

    The real implementation is ``apply_complex_factors`` from the
    ``polars_complex_factors`` module, imported lazily on every call.

    Args:
        df: Input frame, forwarded unchanged to the delegate.
        operators: Optional list forwarded as the delegate's second argument.

    Returns:
        The delegate's return value, or ``df`` itself when an ``ImportError``
        is raised (a warning is logged in that case).
    """
    try:
        from polars_complex_factors import apply_complex_factors as _delegate
        enriched = _delegate(df, operators)
    except ImportError as e:
        # The delegate call is inside the try too, so an ImportError raised
        # while it runs is handled exactly like a failed module import.
        logger.warning(f"无法导入复杂组合因子模块: {e}")
        enriched = df
    return enriched
def apply_all_factors(df: pl.DataFrame,
                      factor_categories: Optional[List[str]] = None,
                      exclude_categories: Optional[List[str]] = None) -> pl.DataFrame:
    """Apply every requested factor category to ``df`` in sequence.

    Each category function receives the frame produced by the previous
    successful category. A category that raises, or whose result has no
    usable ``columns`` attribute, is logged with its traceback and skipped;
    the frame accumulated so far is kept and later categories still run.

    Args:
        df: Input Polars DataFrame; it is expected to already contain the
            columns the selected categories need.
        factor_categories: Category keys to apply, in order. ``None`` means
            every key of ``FACTOR_CATEGORIES``.
        exclude_categories: Category keys to drop from ``factor_categories``.

    Returns:
        The frame after all successful category functions have been applied.
    """
    if factor_categories is None:
        factor_categories = list(FACTOR_CATEGORIES.keys())
    if exclude_categories:
        factor_categories = [cat for cat in factor_categories if cat not in exclude_categories]
    logger.info(f"开始应用因子类别: {factor_categories}")

    # Dispatch table: category key -> function that adds that category's factors.
    factor_functions = {
        'money_flow': apply_money_flow_factors,
        'chip': apply_chip_distribution_factors,
        'volatility': apply_volatility_factors,
        'volume': apply_volume_factors,
        'technical': apply_technical_factors,
        'sentiment': apply_sentiment_factors,
        'momentum': apply_momentum_factors,
        'complex': apply_complex_factors,
    }

    result_df = df
    total_factors = 0
    for category in factor_categories:
        apply_fn = factor_functions.get(category)
        if apply_fn is None:
            logger.warning(f"未知的因子类别: {category}")
            continue

        # Fall back to the raw key so a label missing from FACTOR_CATEGORIES
        # cannot abort the whole run with a KeyError.
        label = FACTOR_CATEGORIES.get(category, category)
        logger.info(f"应用{label}...")
        try:
            before_cols = len(result_df.columns)
            candidate = apply_fn(result_df)
            # New factors are counted as the growth in column count.
            new_factors = len(candidate.columns) - before_cols
        except Exception as e:
            # Best-effort: keep the last good frame and move on, but record
            # the traceback so the failure can be diagnosed.
            logger.exception(f"应用{label}时出错: {e}")
            continue

        # Commit only after the step fully succeeded, so a bad return value
        # never replaces the frame that later categories depend on.
        result_df = candidate
        total_factors += new_factors
        logger.info(f"{label}应用完成,新增{new_factors}个因子")

    logger.info(f"因子应用完成,总共新增{total_factors}个因子")
    return result_df
def get_factor_info() -> Dict[str, Any]:
    """Return a summary of the available factor categories.

    Returns:
        A dict with three entries:
        ``categories`` (the module-level ``FACTOR_CATEGORIES`` mapping itself,
        not a copy), ``total_categories`` (its length) and
        ``category_descriptions`` (a new list of its values).
    """
    info: Dict[str, Any] = {}
    info['categories'] = FACTOR_CATEGORIES
    info['total_categories'] = len(FACTOR_CATEGORIES)
    info['category_descriptions'] = [*FACTOR_CATEGORIES.values()]
    return info
def validate_required_columns(df: pl.DataFrame, factor_categories: Optional[List[str]] = None) -> Dict[str, List[str]]:
    """Report which required columns are absent from ``df``.

    Args:
        df: Input Polars DataFrame whose ``columns`` are inspected.
        factor_categories: Category keys to check. ``None`` means every key
            of ``FACTOR_CATEGORIES``. Keys without a known requirement list
            are ignored.

    Returns:
        A dict mapping ``'base'`` and/or category keys to the list of their
        missing column names. Groups with nothing missing are omitted, so an
        empty dict means every checked requirement is satisfied.
    """
    if factor_categories is None:
        factor_categories = list(FACTOR_CATEGORIES.keys())

    # Read the column names once; every membership test below reuses this set
    # instead of re-reading df.columns per required column.
    available = set(df.columns)

    def _missing(required: List[str]) -> List[str]:
        """Return the entries of ``required`` that are not in the frame."""
        return [col for col in required if col not in available]

    missing_columns: Dict[str, List[str]] = {}

    # Columns every category relies on.
    missing_base = _missing(['ts_code', 'trade_date'])
    if missing_base:
        missing_columns['base'] = missing_base

    # Columns required by each factor category.
    category_requirements = {
        'money_flow': ['buy_lg_vol', 'buy_elg_vol', 'sell_lg_vol', 'sell_elg_vol', 'vol'],
        'chip': ['cost_95pct', 'cost_85pct', 'cost_50pct', 'cost_15pct', 'cost_5pct',
                 'winner_rate', 'weight_avg', 'close'],
        'volatility': ['pct_chg'],
        'volume': ['vol', 'turnover_rate', 'volume_ratio', 'amount'],
        'technical': ['open', 'high', 'low', 'close', 'vol'],
        'sentiment': ['pct_chg', 'vol', 'volume_ratio'],
        'momentum': ['close', 'turnover_rate'],
        'complex': ['close', 'vol', 'pct_chg', 'turnover_rate', 'winner_rate'],
    }

    for category in factor_categories:
        required_cols = category_requirements.get(category)
        if required_cols is None:
            continue
        missing_cols = _missing(required_cols)
        if missing_cols:
            missing_columns[category] = missing_cols
    return missing_columns
# Backward-compatible alias: older callers use the name `apply_factors`.
apply_factors = apply_all_factors
if __name__ == "__main__":
    # Smoke test: announce that the module loaded and list each category key
    # with its label.
    print("Polars因子系统已加载", "可用的因子类别:", sep="\n")
    category_lines = (
        f" {key}: {description}" for key, description in FACTOR_CATEGORIES.items()
    )
    for line in category_lines:
        print(line)