1、trend + hawks 策略

This commit is contained in:
2025-09-24 23:14:14 +08:00
parent bc93a547f0
commit f43a6b2822
33 changed files with 20646 additions and 1609 deletions

44
src/algo/HawksProcess.py Normal file
View File

@@ -0,0 +1,44 @@
# 您可以创建一个新的辅助文件,例如 src/algo/hawkes_process.py
import numpy as np
import pandas as pd
# from numba import njit
# @njit
def hawkes_intensity(data: np.ndarray, kappa: float) -> np.ndarray:
"""
【Numba加速版】计算霍克斯过程强度。
"""
alpha = np.exp(-kappa)
output = np.zeros(len(data))
if len(data) == 0:
return output
# 初始化第一个点
output[0] = data[0] if not np.isnan(data[0]) else 0.0
for i in range(1, len(data)):
if np.isnan(data[i]):
output[i] = output[i - 1] * alpha # 如果数据无效,强度依然会衰减
else:
output[i] = output[i - 1] * alpha + data[i]
return output * kappa
def calculate_hawkes_bands(volume_series: pd.Series, lookback: int, kappa: float, high_percent: float,
low_percent: float):
"""
计算霍克斯强度及其动态上下轨。
"""
# 1. 计算霍克斯强度
vol_hawkes = hawkes_intensity(volume_series.values, kappa)
vol_hawkes_series = pd.Series(vol_hawkes, index=volume_series.index)
# 2. 计算滚动分位数作为动态上下轨
rolling_hawkes = vol_hawkes_series.rolling(lookback)
upper_band = rolling_hawkes.quantile(high_percent)
lower_band = rolling_hawkes.quantile(low_percent)
return vol_hawkes_series, upper_band, lower_band

79
src/algo/TrendLine.py Normal file
View File

@@ -0,0 +1,79 @@
import numpy as np
from typing import Tuple, Optional
def calculate_latest_trendline_values(prices: np.ndarray) -> Tuple[Optional[float], Optional[float]]:
"""
【独立优化方法】
根据给定的价格序列,仅计算并返回上、下趋势线在最后一个点的值。
:param prices: 价格序列 (长度为 n)
:return: (latest_upper_value, latest_lower_value)
"""
n = len(prices)
if n < 2:
return None, None
x = np.arange(n)
# --- 计算上趋势线 ---
high_point_idx = np.argmax(prices)
high_point_price = prices[high_point_idx]
best_upper_slope = None
min_upper_distance_sum = float('inf')
for i in range(n):
if i == high_point_idx:
continue
# 避免除以零
if (i - high_point_idx) == 0: continue
candidate_slope = (prices[i] - high_point_price) / (i - high_point_idx)
intercept = high_point_price - candidate_slope * high_point_idx
# 循环内部仍然需要生成整条候选线用于检查约束条件
candidate_line = candidate_slope * x + intercept
if np.all(candidate_line >= prices - 1e-9):
distance_sum = np.sum(candidate_line - prices)
if distance_sum < min_upper_distance_sum:
min_upper_distance_sum = distance_sum
best_upper_slope = candidate_slope
if best_upper_slope is None:
return None, None
# 优化点:找到最优斜率后,只计算最后一个点的值
upper_intercept = high_point_price - best_upper_slope * high_point_idx
latest_upper_value = best_upper_slope * (n - 1) + upper_intercept
# --- 计算下趋势线 ---
low_point_idx = np.argmin(prices)
low_point_price = prices[low_point_idx]
best_lower_slope = None
min_lower_distance_sum = float('inf')
for i in range(n):
if i == low_point_idx:
continue
if (i - low_point_idx) == 0: continue
candidate_slope = (prices[i] - low_point_price) / (i - low_point_idx)
intercept = low_point_price - candidate_slope * low_point_idx
candidate_line = candidate_slope * x + intercept
if np.all(candidate_line <= prices + 1e-9):
distance_sum = np.sum(prices - candidate_line)
if distance_sum < min_lower_distance_sum:
min_lower_distance_sum = distance_sum
best_lower_slope = candidate_slope
if best_lower_slope is None:
return None, None
# 优化点:只计算最后一个点的值
lower_intercept = low_point_price - best_lower_slope * low_point_idx
latest_lower_value = best_lower_slope * (n - 1) + lower_intercept
return latest_upper_value, latest_lower_value