Files
NewStock/main/factor/polars_chip_factors.py

313 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
筹码分布因子 - 使用Polars实现
包含筹码集中度、分布偏度、浮筹比例等相关因子计算
"""
import polars as pl
import numpy as np
from typing import Dict, List, Optional, Any
from operator_framework import StockWiseOperator, OperatorConfig
class ChipConcentrationOperator(StockWiseOperator):
    """Operator producing a price-normalized width of the chip cost band.

    The output column ``chip_concentration_range`` is the gap between the
    ``cost_95pct`` and ``cost_5pct`` columns divided by ``close``.
    """

    def __init__(self):
        super().__init__(
            OperatorConfig(
                name="chip_concentration",
                description="筹码集中度",
                required_columns=['cost_95pct', 'cost_5pct', 'close'],
                output_columns=['chip_concentration_range'],
                parameters={},
            )
        )

    def apply_stock(self, stock_df: pl.DataFrame, **kwargs) -> pl.DataFrame:
        """Append ``chip_concentration_range`` to ``stock_df``.

        Args:
            stock_df: Frame containing ``cost_95pct``, ``cost_5pct`` and
                ``close``.
            **kwargs: Accepted but not used by this operator.

        Returns:
            ``stock_df`` with the ``chip_concentration_range`` column added.
        """
        eps = 1e-8  # small offset so a zero close does not divide by zero
        band_width = pl.col('cost_95pct') - pl.col('cost_5pct')
        denominator = pl.col('close') + eps
        return stock_df.with_columns(
            (band_width / denominator).alias('chip_concentration_range')
        )
class ChipSkewnessOperator(StockWiseOperator):
    """Operator producing a skewness proxy for the chip cost distribution.

    The output column ``chip_skewness`` is
    ``(weight_avg - cost_50pct) / cost_50pct``, i.e. how far the weighted
    average cost sits from the median cost, relative to the median.
    """

    def __init__(self):
        super().__init__(
            OperatorConfig(
                name="chip_skewness",
                description="筹码分布偏度",
                required_columns=['weight_avg', 'cost_50pct'],
                output_columns=['chip_skewness'],
                parameters={},
            )
        )

    def apply_stock(self, stock_df: pl.DataFrame, **kwargs) -> pl.DataFrame:
        """Append ``chip_skewness`` to ``stock_df``.

        Args:
            stock_df: Frame containing ``weight_avg`` and ``cost_50pct``.
            **kwargs: Accepted but not used by this operator.

        Returns:
            ``stock_df`` with the ``chip_skewness`` column added.
        """
        eps = 1e-8  # small offset so a zero median cost does not divide by zero
        median_cost = pl.col('cost_50pct')
        mean_minus_median = pl.col('weight_avg') - median_cost
        ratio = mean_minus_median / (median_cost + eps)
        return stock_df.with_columns(ratio.alias('chip_skewness'))
class FloatingChipProxyOperator(StockWiseOperator):
    """Operator producing a proxy for the share of floating chips.

    The output column ``floating_chip_proxy`` is ``winner_rate`` multiplied
    by the non-negative part of ``(close - cost_15pct) / close``.
    """

    def __init__(self):
        config = OperatorConfig(
            name="floating_chip_proxy",
            description="浮筹比例代理",
            required_columns=['close', 'cost_15pct', 'winner_rate'],
            output_columns=['floating_chip_proxy'],
            parameters={}
        )
        super().__init__(config)

    def apply_stock(self, stock_df: pl.DataFrame, **kwargs) -> pl.DataFrame:
        """Append ``floating_chip_proxy`` to ``stock_df``.

        Args:
            stock_df: Frame containing ``close``, ``cost_15pct`` and
                ``winner_rate``.
            **kwargs: Accepted but not used by this operator.

        Returns:
            ``stock_df`` with the ``floating_chip_proxy`` column added.
        """
        # Same zero-denominator guard the other ratio factors in this module
        # use, so a zero close cannot yield inf/NaN here either.
        epsilon = 1e-8
        # Relative distance of the close above the 15% cost line.
        price_dist_cost15 = (
            (pl.col('close') - pl.col('cost_15pct')) / (pl.col('close') + epsilon)
        )
        # Proxy: winner rate * max(0, price distance).
        floating_proxy = pl.col('winner_rate') * pl.max_horizontal(
            pl.lit(0), price_dist_cost15
        )
        return stock_df.with_columns(floating_proxy.alias('floating_chip_proxy'))
class CostSupportChangeOperator(StockWiseOperator):
    """Operator producing the n-period percentage change of ``cost_15pct``.

    The output column is named ``cost_support_15pct_change_{n}``.
    """

    def __init__(self, n: int = 1):
        """Configure the operator.

        Args:
            n: Number of rows to look back when computing the change.
        """
        super().__init__(
            OperatorConfig(
                name=f"cost_support_change_{n}",
                description=f"{n}日成本支撑强度变化",
                required_columns=['cost_15pct'],
                output_columns=[f'cost_support_15pct_change_{n}'],
                parameters={'n': n},
            )
        )
        self.n = n

    def apply_stock(self, stock_df: pl.DataFrame, **kwargs) -> pl.DataFrame:
        """Append the percentage-change column to ``stock_df``.

        Args:
            stock_df: Frame containing ``cost_15pct``.
            **kwargs: Accepted but not used by this operator.

        Returns:
            ``stock_df`` with ``cost_support_15pct_change_{n}`` added; values
            are the fractional change scaled by 100.
        """
        out_name = f'cost_support_15pct_change_{self.n}'
        change_in_percent = pl.col('cost_15pct').pct_change(n=self.n) * 100
        return stock_df.with_columns(change_in_percent.alias(out_name))
class WinnerPriceZoneOperator(StockWiseOperator):
    """Operator that labels each row with a winner-rate / price zone code.

    The output column ``cat_winner_price_zone`` holds an integer code:

    * ``1`` - high-risk zone: close above ``cost_85pct`` and
      ``winner_rate`` > 0.8
    * ``2`` - low-potential zone: close below ``cost_15pct`` and
      ``winner_rate`` < 0.2
    * ``3`` - upper-profit zone: close above ``cost_50pct`` and
      ``winner_rate`` > 0.5
    * ``4`` - lower-loss zone: close below ``cost_50pct`` and
      ``winner_rate`` < 0.5
    * ``0`` - none of the above

    The rules are tested in the order listed and the first match wins.
    """

    def __init__(self):
        config = OperatorConfig(
            name="winner_price_zone",
            description="获利盘压力/支撑区分类",
            required_columns=['close', 'cost_85pct', 'cost_15pct', 'cost_50pct', 'winner_rate'],
            output_columns=['cat_winner_price_zone'],
            parameters={}
        )
        super().__init__(config)

    def apply_stock(self, stock_df: pl.DataFrame, **kwargs) -> pl.DataFrame:
        """Append ``cat_winner_price_zone`` to ``stock_df``.

        Args:
            stock_df: Frame containing ``close``, ``cost_85pct``,
                ``cost_15pct``, ``cost_50pct`` and ``winner_rate``.
            **kwargs: Accepted but not used by this operator.

        Returns:
            ``stock_df`` with the ``cat_winner_price_zone`` column added.
        """
        close = pl.col('close')
        winner_rate = pl.col('winner_rate')
        # NOTE(review): the thresholds below assume winner_rate is a fraction
        # in [0, 1] rather than a percentage - confirm against the data source.

        # 1: high-risk zone (high price & high winner rate)
        high_risk = (close > pl.col('cost_85pct')) & (winner_rate > 0.8)
        # 2: low-potential zone (low price & low winner rate)
        low_potential = (close < pl.col('cost_15pct')) & (winner_rate < 0.2)
        # 3: upper-profit zone (above median cost & majority in profit)
        upper_profit = (close > pl.col('cost_50pct')) & (winner_rate > 0.5)
        # 4: lower-loss zone (below median cost & majority at a loss)
        lower_loss = (close < pl.col('cost_50pct')) & (winner_rate < 0.5)

        # Polars has no numpy-style select(conditions, choices, default);
        # pl.select() evaluates expressions and returns a DataFrame. A chained
        # when/then/otherwise gives the intended first-match-wins
        # classification as an expression.
        zone_classification = (
            pl.when(high_risk).then(pl.lit(1))
            .when(low_potential).then(pl.lit(2))
            .when(upper_profit).then(pl.lit(3))
            .when(lower_loss).then(pl.lit(4))
            .otherwise(pl.lit(0))  # 0: everything else
        )
        return stock_df.with_columns(zone_classification.alias('cat_winner_price_zone'))
class FlowChipConsistencyOperator(StockWiseOperator):
    """Operator relating large-order net buying to the price's chip position.

    The output column ``flow_chip_consistency`` equals the large plus
    extra-large net buy volume on rows where ``close`` lies strictly between
    ``cost_15pct`` and ``cost_50pct``, and that volume times zero elsewhere.
    """

    def __init__(self):
        super().__init__(
            OperatorConfig(
                name="flow_chip_consistency",
                description="主力行为与筹码结构一致性",
                required_columns=['buy_lg_vol', 'buy_elg_vol', 'sell_lg_vol', 'sell_elg_vol',
                                  'close', 'cost_15pct', 'cost_50pct'],
                output_columns=['flow_chip_consistency'],
                parameters={},
            )
        )

    def apply_stock(self, stock_df: pl.DataFrame, **kwargs) -> pl.DataFrame:
        """Append ``flow_chip_consistency`` to ``stock_df``.

        Args:
            stock_df: Frame containing the four large/extra-large buy and
                sell volume columns plus ``close``, ``cost_15pct`` and
                ``cost_50pct``.
            **kwargs: Accepted but not used by this operator.

        Returns:
            ``stock_df`` with the ``flow_chip_consistency`` column added.
        """
        # Net buy volume of large + extra-large orders.
        gross_buy = pl.col('buy_lg_vol') + pl.col('buy_elg_vol')
        net_buy = gross_buy - pl.col('sell_lg_vol') - pl.col('sell_elg_vol')

        # Price sits strictly inside the (cost_15pct, cost_50pct) band.
        close = pl.col('close')
        above_lower = close.gt(pl.col('cost_15pct'))
        below_median = close.lt(pl.col('cost_50pct'))
        in_support_band = above_lower & below_median

        # Boolean flag turned into a 0/1 multiplier for the net flow.
        indicator = in_support_band.cast(int)
        return stock_df.with_columns(
            (net_buy * indicator).alias('flow_chip_consistency')
        )
class ProfitTakingVsAbsorptionOperator(StockWiseOperator):
    """Operator contrasting profit-taking pressure with absorption strength.

    The output column ``profit_taking_vs_absorb`` is the large plus
    extra-large net buy volume multiplied by a 0/1 flag that is set when
    ``winner_rate`` exceeds 0.7.
    """

    def __init__(self):
        super().__init__(
            OperatorConfig(
                name="profit_taking_vs_absorb",
                description="获利了结压力vs承接盘强度",
                required_columns=['buy_lg_vol', 'buy_elg_vol', 'sell_lg_vol', 'sell_elg_vol',
                                  'winner_rate'],
                output_columns=['profit_taking_vs_absorb'],
                parameters={},
            )
        )

    def apply_stock(self, stock_df: pl.DataFrame, **kwargs) -> pl.DataFrame:
        """Append ``profit_taking_vs_absorb`` to ``stock_df``.

        Args:
            stock_df: Frame containing the four large/extra-large buy and
                sell volume columns plus ``winner_rate``.
            **kwargs: Accepted but not used by this operator.

        Returns:
            ``stock_df`` with the ``profit_taking_vs_absorb`` column added.
        """
        # Net buy volume of large + extra-large orders.
        gross_buy = pl.col('buy_lg_vol') + pl.col('buy_elg_vol')
        net_buy = gross_buy - pl.col('sell_lg_vol') - pl.col('sell_elg_vol')

        # 0/1 flag marking rows with a high winner rate.
        many_winners = pl.col('winner_rate').gt(0.7).cast(int)

        # Positive: large orders still buying while most holders are in
        # profit (absorption). Negative: large orders selling (profit taking).
        return stock_df.with_columns(
            (net_buy * many_winners).alias('profit_taking_vs_absorb')
        )
class ChipConcentrationChangeOperator(StockWiseOperator):
    """Operator producing the rolling volatility of chip concentration.

    The output column ``chip_conc_std_{n}`` is the rolling standard deviation
    over ``n`` rows of ``(cost_85pct - cost_15pct) / weight_avg``.
    """

    def __init__(self, n: int = 20):
        """Configure the operator.

        Args:
            n: Rolling window length, in rows.
        """
        config = OperatorConfig(
            name=f"chip_conc_std_{n}",
            description=f"{n}日筹码集中度变化",
            required_columns=['cost_85pct', 'cost_15pct', 'weight_avg'],
            output_columns=[f'chip_conc_std_{n}'],
            parameters={'n': n}
        )
        super().__init__(config)
        self.n = n

    def apply_stock(self, stock_df: pl.DataFrame, **kwargs) -> pl.DataFrame:
        """Append ``chip_conc_std_{n}`` to ``stock_df``.

        Args:
            stock_df: Frame containing ``cost_85pct``, ``cost_15pct`` and
                ``weight_avg``.
            **kwargs: Accepted but not used by this operator.

        Returns:
            ``stock_df`` with the ``chip_conc_std_{n}`` column added.
        """
        epsilon = 1e-8  # small offset so a zero weight_avg does not divide by zero
        # Cost band width normalized by the weighted average cost.
        cost_range_norm = (
            (pl.col('cost_85pct') - pl.col('cost_15pct')) / (pl.col('weight_avg') + epsilon)
        )
        # Polars names the window argument `window_size`; the previous
        # `window=` keyword is not accepted by Expr.rolling_std.
        conc_std = cost_range_norm.rolling_std(window_size=self.n)
        return stock_df.with_columns(conc_std.alias(f'chip_conc_std_{self.n}'))
class CostBreakoutConfirmationOperator(StockWiseOperator):
    """Operator counting flow-confirmed cost-band breakouts over a window.

    For each row a signal of ``+1`` is produced when ``close`` is above the
    previous row's ``cost_85pct`` and large-order net flow is positive, and
    ``-1`` when ``close`` is below the previous row's ``cost_15pct`` and
    large-order net flow is negative. The output column
    ``cost_break_confirm_cnt_{m}`` is the rolling sum of that signal over
    ``m`` rows.
    """

    def __init__(self, m: int = 5):
        """Configure the operator.

        Args:
            m: Rolling window length, in rows.
        """
        config = OperatorConfig(
            name=f"cost_break_confirm_cnt_{m}",
            description=f"{m}日成本突破确认",
            required_columns=['close', 'cost_85pct', 'cost_15pct',
                              'buy_lg_vol', 'buy_elg_vol', 'sell_lg_vol', 'sell_elg_vol'],
            output_columns=[f'cost_break_confirm_cnt_{m}'],
            parameters={'m': m}
        )
        super().__init__(config)
        self.m = m

    def apply_stock(self, stock_df: pl.DataFrame, **kwargs) -> pl.DataFrame:
        """Append ``cost_break_confirm_cnt_{m}`` to ``stock_df``.

        Args:
            stock_df: Frame containing ``close``, ``cost_85pct``,
                ``cost_15pct`` and the four large/extra-large buy and sell
                volume columns. Rows are presumably ordered by date for one
                stock - confirm against the caller.
            **kwargs: Accepted but not used by this operator.

        Returns:
            ``stock_df`` with the ``cost_break_confirm_cnt_{m}`` column added.
        """
        # Cost levels from the previous row.
        prev_cost_85 = pl.col('cost_85pct').shift(1)
        prev_cost_15 = pl.col('cost_15pct').shift(1)

        # Breakout above / below the previous cost band.
        break_up = pl.col('close') > prev_cost_85
        break_down = pl.col('close') < prev_cost_15

        # Net flow of large + extra-large orders.
        net_lg_flow_vol = (
            pl.col('buy_lg_vol') + pl.col('buy_elg_vol') -
            pl.col('sell_lg_vol') - pl.col('sell_elg_vol')
        )

        # A breakout only counts when the net flow points the same way.
        confirm_up = break_up & (net_lg_flow_vol > 0)
        confirm_down = break_down & (net_lg_flow_vol < 0)

        # Signed per-row signal: +1 confirmed up, -1 confirmed down, else 0.
        net_confirm = confirm_up.cast(int) - confirm_down.cast(int)

        # Polars names the window argument `window_size`; the previous
        # `window=` keyword is not accepted by Expr.rolling_sum.
        confirm_cnt = net_confirm.rolling_sum(window_size=self.m)
        return stock_df.with_columns(confirm_cnt.alias(f'cost_break_confirm_cnt_{self.m}'))
# Default collection of chip-distribution factor operators, each built with
# its constructor defaults and listed in the order they are applied.
CHIP_DISTRIBUTION_OPERATORS = [
    operator_cls()
    for operator_cls in (
        ChipConcentrationOperator,
        ChipSkewnessOperator,
        FloatingChipProxyOperator,
        CostSupportChangeOperator,
        WinnerPriceZoneOperator,
        FlowChipConsistencyOperator,
        ProfitTakingVsAbsorptionOperator,
        ChipConcentrationChangeOperator,
        CostBreakoutConfirmationOperator,
    )
]
def apply_chip_distribution_factors(df: pl.DataFrame, operators: List = None) -> pl.DataFrame:
    """Run a sequence of chip-distribution operators over ``df``.

    Args:
        df: Input Polars DataFrame.
        operators: Callables applied in order, each receiving the result of
            the previous one. ``None`` selects ``CHIP_DISTRIBUTION_OPERATORS``.

    Returns:
        The value returned by the last operator, or ``df`` itself when the
        sequence is empty.
    """
    pipeline = CHIP_DISTRIBUTION_OPERATORS if operators is None else operators
    current = df
    for step in pipeline:
        current = step(current)
    return current