""" 筹码分布因子模块 包含基于股票截面的筹码分布因子实现 """ import numpy as np import polars as pl from main.factor.operator_framework import StockWiseFactor class ChipConcentrationFactor(StockWiseFactor): """筹码集中度因子""" def __init__(self): super().__init__( name="chip_concentration", parameters={}, required_factor_ids=["cost_95pct", "cost_5pct", "close"] ) def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: # 计算筹码集中度 cost_95 = group_df["cost_95pct"] cost_5 = group_df["cost_5pct"] close = group_df["close"] chip_concentration = (cost_95 - cost_5) / (close + 1e-8) # 避免除零 return chip_concentration.alias(self.factor_id) class ChipSkewnessFactor(StockWiseFactor): """筹码分布偏度因子""" def __init__(self): super().__init__( name="chip_skewness", parameters={}, required_factor_ids=["weight_avg", "cost_50pct", "cost_50pct"] ) def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: # 计算筹码分布偏度 weight_avg = group_df["weight_avg"] cost_50 = group_df["cost_50pct"] chip_skewness = (weight_avg - cost_50) / (cost_50 + 1e-8) # 避免除零 return chip_skewness.alias(self.factor_id) class FloatingChipFactor(StockWiseFactor): """浮筹比例因子""" def __init__(self): super().__init__( name="floating_chip", parameters={}, required_factor_ids=["winner_rate", "cost_15pct", "close"] ) def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: # 计算浮筹比例 winner_rate = group_df["winner_rate"] cost_15 = group_df["cost_15pct"] close = group_df["close"] price_dist_cost15 = (close - cost_15) / (close + 1e-8) # 避免除零 floating_chip = winner_rate * pl.Series(np.maximum(0, price_dist_cost15)) return floating_chip.alias(self.factor_id) class CostSupportFactor(StockWiseFactor): """成本支撑强度因子""" def __init__(self): super().__init__( name="cost_support", parameters={}, required_factor_ids=["cost_15pct", "close"] ) def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: # 计算成本支撑强度 cost_15 = group_df["cost_15pct"] close = group_df["close"] # 成本支撑强度变化 cost_support_change = (cost_15.diff() / (cost_15 + 1e-8) * 100).alias(self.factor_id) return cost_support_change class WinnerPriceZoneFactor(StockWiseFactor): """获利盘价格区域因子""" def __init__(self): super().__init__( name="winner_price_zone", parameters={}, required_factor_ids=["close", "cost_85pct", "cost_15pct", "cost_50pct", "winner_rate"] ) def calc_factor(self, group_df: pl.DataFrame) -> pl.Series: close = group_df["close"] cost_85 = group_df["cost_85pct"] cost_15 = group_df["cost_15pct"] cost_50 = group_df["cost_50pct"] winner_rate = group_df["winner_rate"] # 使用 Polars 的 when/then/otherwise 链 winner_zone = ( pl.when((close > cost_85) & (winner_rate > 0.8)) .then(1) .when((close < cost_15) & (winner_rate < 0.2)) .then(2) .when((close > cost_50) & (winner_rate > 0.5)) .then(3) .when((close < cost_50) & (winner_rate < 0.5)) .then(4) .otherwise(0) .alias(self.factor_id) ) return winner_zone