# Files · 62 lines · 2.2 KiB · Python
# (Listing metadata apparently copied from a code viewer; not part of the module.)

import numpy as np
import pandas as pd
from dataclasses import dataclass
from scipy.stats import kurtosis, skew
from datetime import time, timedelta
from typing import List, Tuple, Optional
from src.core_data import Bar
@dataclass
class ProfileStats:
    """Encapsulates all core statistics of a profile."""

    # Upper price bound of the value area (VAH).
    vah: float
    # Lower price bound of the value area (VAL).
    val: float
    # Price level carrying the largest aggregated volume (POC).
    poc: float
def calculate_profile_from_bars(bars: List[Bar], tick_size: float, va_percentage: int = 70) -> Optional[ProfileStats]:
    """
    [Global core function] Compute profile statistics (VAH, VAL, POC) from a list of bars.

    Each bar's volume is spread evenly over the price levels between its low
    and its high (inclusive), where levels are multiples of ``tick_size``.
    The POC is the level with the largest summed volume; the value area then
    grows from the POC, one level at a time, toward whichever neighbouring
    level holds more volume (ties go downward) until it contains at least
    ``va_percentage`` percent of the total volume.

    Args:
        bars: Objects exposing ``low``, ``high`` and ``volume`` attributes.
        tick_size: Spacing between price levels; must be finite and positive.
        va_percentage: Share of total volume the value area must reach, in percent.

    Returns:
        A ``ProfileStats`` with the value-area high/low and the POC, or
        ``None`` when there are no bars, ``tick_size`` is not a finite
        positive number, no bar contributes volume, or fewer than three
        distinct price levels result.
    """
    from decimal import Decimal

    if not bars:
        return None
    tick = float(tick_size)
    if not np.isfinite(tick) or tick <= 0:
        return None

    # Work on integer tick indices rather than float prices: a float-step
    # np.arange can overshoot bar.high and yields slightly different floats
    # for the same price in different bars, which would stop identical
    # levels from being merged. Off-grid lows/highs snap to the nearest tick.
    level_chunks = []
    volume_chunks = []
    for bar in bars:
        if bar.volume == 0:
            continue
        low_level = int(round(bar.low / tick))
        high_level = int(round(bar.high / tick))
        level_count = high_level - low_level + 1
        if level_count <= 0:
            # Inverted bar (low above high): nothing to distribute.
            continue
        level_chunks.append(np.arange(low_level, high_level + 1, dtype=np.int64))
        volume_chunks.append(np.full(level_count, bar.volume / level_count, dtype=np.float64))
    if not level_chunks:
        return None

    # `levels` is the sorted set of distinct tick indices; `inverse` maps every
    # per-bar entry to its slot in `levels`, so bincount sums volume per level.
    levels, inverse = np.unique(np.concatenate(level_chunks), return_inverse=True)
    if len(levels) < 3:
        return None
    volumes = np.bincount(inverse, weights=np.concatenate(volume_chunks))

    # argmax returns the first maximum, i.e. the lowest price among ties.
    poc_idx = int(np.argmax(volumes))
    target_volume = volumes.sum() * (va_percentage / 100.0)
    accumulated = volumes[poc_idx]
    lower = upper = poc_idx
    top = len(volumes) - 1
    while accumulated < target_volume:
        can_go_up = upper < top
        can_go_down = lower > 0
        if not (can_go_up or can_go_down):
            # Every level is already inside the value area.
            break
        if can_go_up and (not can_go_down or volumes[upper + 1] > volumes[lower - 1]):
            upper += 1
            accumulated += volumes[upper]
        else:
            lower -= 1
            accumulated += volumes[lower]

    # Round to the tick's own decimal precision so that e.g. 1003 * 0.1 is
    # reported as 100.3 rather than 100.30000000000001.
    decimals = max(0, -Decimal(str(tick)).as_tuple().exponent)

    def price_at(index: int) -> float:
        return round(int(levels[index]) * tick, decimals)

    return ProfileStats(vah=price_at(upper), val=price_at(lower), poc=price_at(poc_idx))