62 lines
2.2 KiB
Python
62 lines
2.2 KiB
Python
import numpy as np
|
|
import pandas as pd
|
|
from dataclasses import dataclass
|
|
from scipy.stats import kurtosis, skew
|
|
from datetime import time, timedelta
|
|
|
|
from typing import List, Tuple, Optional
|
|
|
|
from src.core_data import Bar
|
|
|
|
|
|
@dataclass
|
|
class ProfileStats:
|
|
"""封装剖面图的所有核心统计量。"""
|
|
vah: float
|
|
val: float
|
|
poc: float
|
|
|
|
|
|
def calculate_profile_from_bars(bars: List[Bar], tick_size: float, va_percentage: int = 70) -> Optional[ProfileStats]:
|
|
"""
|
|
[全局核心函数] 从给定的K线列表计算剖面图统计量 (VAH, VAL, POC)。
|
|
"""
|
|
if not bars:
|
|
return None
|
|
|
|
data = []
|
|
for bar in bars:
|
|
price_range = np.arange(bar.low, bar.high + tick_size, tick_size)
|
|
if len(price_range) == 0 or bar.volume == 0: continue
|
|
volume_per_tick = bar.volume / len(price_range)
|
|
for price in price_range:
|
|
data.append({'price': price, 'volume': volume_per_tick})
|
|
if not data: return None
|
|
df = pd.DataFrame(data)
|
|
if df.empty: return None
|
|
price_volume_dist = df.groupby('price')['volume'].sum().sort_index()
|
|
|
|
if price_volume_dist.empty or len(price_volume_dist) < 3: return None
|
|
|
|
poc = price_volume_dist.idxmax()
|
|
total_volume = price_volume_dist.sum()
|
|
value_area_volume_target = total_volume * (va_percentage / 100.0)
|
|
current_va_volume = price_volume_dist.loc[poc]
|
|
vah, val = poc, poc
|
|
prices_above = price_volume_dist.index[price_volume_dist.index > poc]
|
|
prices_below = price_volume_dist.index[price_volume_dist.index < poc].sort_values(ascending=False)
|
|
idx_above, idx_below = 0, 0
|
|
while current_va_volume < value_area_volume_target:
|
|
vol_above = price_volume_dist.loc[prices_above[idx_above]] if idx_above < len(prices_above) else 0
|
|
vol_below = price_volume_dist.loc[prices_below[idx_below]] if idx_below < len(prices_below) else 0
|
|
if vol_above == 0 and vol_below == 0: break
|
|
if vol_above > vol_below:
|
|
current_va_volume += vol_above
|
|
vah = prices_above[idx_above]
|
|
idx_above += 1
|
|
else:
|
|
current_va_volume += vol_below
|
|
val = prices_below[idx_below]
|
|
idx_below += 1
|
|
|
|
return ProfileStats(vah=vah, val=val, poc=poc) |