# src/strategies/simple_limit_buy_strategy.py from .base_strategy import Strategy from ..core_data import Bar, Order from typing import Optional, Dict, Any from collections import deque class SimpleLimitBuyStrategyLong(Strategy): """ 一个基于当前K线Open、前1根和前7根K线Range计算优势价格进行限价买入的策略。 具备以下特点: - 每根K线开始时取消上一根K线未成交的订单。 - 最多只能有一个开仓挂单和一个持仓。 - 包含简单的止损和止盈逻辑。 """ def __init__( self, context: Any, symbol: str, enable_log: bool, trade_volume: int, ): # 新增:止盈点数 """ 初始化策略。 Args: context: 模拟器实例。 symbol (str): 交易合约代码。 trade_volume (int): 单笔交易量。 open_range_factor_1_ago (float): 前1根K线Range的权重因子,用于从Open价向下偏移。 open_range_factor_7_ago (float): 前7根K线Range的权重因子,用于从Open价向下偏移。 max_position (int): 最大持仓量(此处为1,因为只允许一个持仓)。 stop_loss_points (float): 止损点数(例如,亏损达到此点数则止损)。 take_profit_points (float): 止盈点数(例如,盈利达到此点数则止盈)。 """ super().__init__(context, symbol, enable_log) self.trade_volume = trade_volume self.order_id_counter = 0 self._last_order_id: Optional[str] = None # 用于跟踪上一根K线发出的订单ID def on_open_bar(self, open_price: float, symbol: str): """ 每当新的K线数据到来时调用。 Args: bar (Bar): 当前的K线数据对象。 next_bar_open (Optional[float]): 下一根K线的开盘价,此处策略未使用。 """ self.symbol = symbol current_positions = self.get_current_positions() current_pos_volume = current_positions.get(self.symbol, 0) if current_pos_volume == 0: self.send_order( Order( id=0, symbol=self.symbol, direction='BUY', volume=1, price_type="MARKET", submitted_time=self.get_current_time(), offset='OPEN', ) )