import numpy as np import pandas as pd from dataclasses import dataclass from scipy.stats import kurtosis, skew from datetime import time, timedelta from typing import List, Tuple, Optional from src.core_data import Bar @dataclass class ProfileStats: """封装剖面图的所有核心统计量。""" vah: float val: float poc: float def calculate_profile_from_bars(bars: List[Bar], tick_size: float, va_percentage: int = 70) -> Optional[ProfileStats]: """ [全局核心函数] 从给定的K线列表计算剖面图统计量 (VAH, VAL, POC)。 """ if not bars: return None data = [] for bar in bars: price_range = np.arange(bar.low, bar.high + tick_size, tick_size) if len(price_range) == 0 or bar.volume == 0: continue volume_per_tick = bar.volume / len(price_range) for price in price_range: data.append({'price': price, 'volume': volume_per_tick}) if not data: return None df = pd.DataFrame(data) if df.empty: return None price_volume_dist = df.groupby('price')['volume'].sum().sort_index() if price_volume_dist.empty or len(price_volume_dist) < 3: return None poc = price_volume_dist.idxmax() total_volume = price_volume_dist.sum() value_area_volume_target = total_volume * (va_percentage / 100.0) current_va_volume = price_volume_dist.loc[poc] vah, val = poc, poc prices_above = price_volume_dist.index[price_volume_dist.index > poc] prices_below = price_volume_dist.index[price_volume_dist.index < poc].sort_values(ascending=False) idx_above, idx_below = 0, 0 while current_va_volume < value_area_volume_target: vol_above = price_volume_dist.loc[prices_above[idx_above]] if idx_above < len(prices_above) else 0 vol_below = price_volume_dist.loc[prices_below[idx_below]] if idx_below < len(prices_below) else 0 if vol_above == 0 and vol_below == 0: break if vol_above > vol_below: current_va_volume += vol_above vah = prices_above[idx_above] idx_above += 1 else: current_va_volume += vol_below val = prices_below[idx_below] idx_below += 1 return ProfileStats(vah=vah, val=val, poc=poc)