import numpy as np from typing import Optional, Any, List from src.core_data import Bar, Order from src.strategies.base_strategy import Strategy class SemiVarianceAsymmetryStrategy(Strategy): """ 已实现半方差不对称策略 (RSVA) 核心原理: 放弃"阈值计数",改用"波动能量占比"。 因子 = (上行波动能量 - 下行波动能量) / 总波动能量 优势: 1. 自适应:自动适应2021的高波动和2023的低波动,无需调整阈值。 2. 灵敏:能捕捉到没有大阳线但持续上涨的"蠕动趋势"。 3. 稳健:使用平方项(Variance)而非三次方(Skewness),对异常值更鲁棒。 """ def __init__( self, context: Any, main_symbol: str, enable_log: bool, trade_volume: int, # --- 窗口参数 --- season_days: int = 20, # 计算日内季节性基准的回溯天数 calc_window: int = 120, # 计算不对称因子的窗口 (约5天) cycle_length: int = 23, # 固定周期 (每天23根Bar) # --- 信号阈值 --- # RSVA 范围是 [-1, 1]。 # 0.2 表示上涨能量比下跌能量多20% (即 60% vs 40%),是一个显著的失衡信号。 entry_threshold: float = 0.2, exit_threshold: float = 0.05, order_direction: Optional[List[str]] = None, ): super().__init__(context, main_symbol, enable_log) if order_direction is None: order_direction = ['BUY', 'SELL'] self.trade_volume = trade_volume self.season_days = season_days self.calc_window = calc_window self.cycle_length = cycle_length self.entry_threshold = entry_threshold self.exit_threshold = exit_threshold self.order_direction = order_direction # 计算最小历史需求 # 我们需要: calc_window 个标准化数据 # 每个标准化数据需要回溯: season_days * cycle_length self.min_history = self.calc_window + (self.season_days * self.cycle_length) # 缓冲区设大一点,避免频繁触发边界检查 self.calc_buffer_size = self.min_history + 100 self.log(f"RSVA Strategy Init: Window={calc_window}, Thresh={entry_threshold}") def on_open_bar(self, open_price: float, symbol: str): self.cancel_all_pending_orders(symbol) # 1. 获取历史数据 (切片优化) all_history = self.get_bar_history() total_len = len(all_history) if total_len < self.min_history: return # 只取计算所需的最后一段数据,保证计算复杂度恒定 start_idx = max(0, total_len - self.calc_buffer_size) relevant_bars = all_history[start_idx:] # 转为 numpy array closes = np.array([b.close for b in relevant_bars]) # 2. 计算对数收益率 (Log Returns) # 对数收益率消除了价格水平(Price Level)的影响 log_rets = np.diff(np.log(closes)) current_idx = len(log_rets) - 1 # 3. 标准化收益率计算 (De-seasonalization) # 这一步至关重要:剔除日内季节性(早盘波动大、午盘波动小)的干扰 std_rets = [] # 循环计算过去 calc_window 个点的标准化值 for i in range(self.calc_window): target_idx = current_idx - i # 高效切片:利用 stride=cycle_length 提取同一时间槽的历史 # slot_history 包含 [t, t-23, t-46, ...] slot_history = log_rets[target_idx::-self.cycle_length] # 截取 season_days if len(slot_history) > self.season_days: slot_history = slot_history[:self.season_days] # 计算该时刻的基准波动率 if len(slot_history) < 5: # 降级处理:样本不足时用近期全局波动率 slot_vol = np.std(log_rets[-self.cycle_length:]) + 1e-9 else: slot_vol = np.std(slot_history) + 1e-9 # 标准化 (Z-Score) std_ret = log_rets[target_idx] / slot_vol std_rets.append(std_ret) # 转为数组 (注意:std_rets 是倒序的,但这不影响平方和计算) std_rets_arr = np.array(std_rets) # 4. 【核心】计算已实现半方差不对称性 (RSVA) # 分离正收益和负收益 pos_rets = std_rets_arr[std_rets_arr > 0] neg_rets = std_rets_arr[std_rets_arr < 0] # 计算上行能量 (Upside Variance) 和 下行能量 (Downside Variance) rv_pos = np.sum(pos_rets ** 2) rv_neg = np.sum(neg_rets ** 2) total_rv = rv_pos + rv_neg + 1e-9 # 防止除零 # 计算因子: [-1, 1] # > 0 说明上涨更有力(或更频繁),< 0 说明下跌主导 rsva_factor = (rv_pos - rv_neg) / total_rv # 5. 交易逻辑 current_pos = self.get_current_positions().get(symbol, 0) self.log_status(rsva_factor, rv_pos, rv_neg, current_pos) if current_pos == 0: self.evaluate_entry(rsva_factor) else: self.evaluate_exit(current_pos, rsva_factor) def evaluate_entry(self, factor: float): direction = None # 因子 > 0.2: 哪怕没有极端K线,只要累计的上涨能量显著压过下跌能量,就开仓 if factor > self.entry_threshold: if "BUY" in self.order_direction: direction = "BUY" elif factor < -self.entry_threshold: if "SELL" in self.order_direction: direction = "SELL" if direction: self.log(f"ENTRY: {direction} | RSVA={factor:.4f}") self.send_market_order(direction, self.trade_volume, "OPEN") def evaluate_exit(self, volume: int, factor: float): do_exit = False reason = "" # 当多空能量趋于平衡 (因子回到 0 附近),说明趋势动能耗尽,平仓 # 这种离场方式对震荡市非常友好:一旦陷入震荡,rv_pos 和 rv_neg 会迅速接近,因子归零 if volume > 0 and factor < self.exit_threshold: do_exit = True reason = f"Bull Energy Fade (RSVA={factor:.4f})" elif volume < 0 and factor > -self.exit_threshold: do_exit = True reason = f"Bear Energy Fade (RSVA={factor:.4f})" if do_exit: direction = "CLOSE_LONG" if volume > 0 else "CLOSE_SHORT" self.log(f"EXIT: {reason}") self.send_market_order(direction, abs(volume), "CLOSE") def send_market_order(self, direction: str, volume: int, offset: str): # 严格遵守要求:使用 get_current_time() current_time = self.get_current_time() order = Order( id=f"{self.main_symbol}_{direction}_{current_time.timestamp()}", symbol=self.symbol, direction=direction, volume=volume, price_type="MARKET", submitted_time=current_time, offset=offset ) self.send_order(order) def log_status(self, factor: float, pos_e: float, neg_e: float, current_pos: int): if self.enable_log: # 仅在有持仓或信号明显时打印 if current_pos != 0 or abs(factor) > self.entry_threshold * 0.8: self.log(f"Status: Pos={current_pos} | RSVA={factor:.4f} | Energy(+/-)={pos_e:.1f}/{neg_e:.1f}")