# src/execution_simulator.py from datetime import datetime from typing import Dict, List, Optional import pandas as pd from .core_data import Order, Trade, Bar, PortfolioSnapshot class ExecutionSimulator: """ 模拟交易执行和管理账户资金、持仓。 """ def __init__(self, initial_capital: float, slippage_rate: float = 0.0001, commission_rate: float = 0.0002, initial_positions: Optional[Dict[str, int]] = None): self.initial_capital = initial_capital self.cash = initial_capital self.positions: Dict[str, int] = initial_positions if initial_positions is not None else {} self.average_costs: Dict[str, float] = {} if initial_positions: for symbol, qty in initial_positions.items(): self.average_costs[symbol] = 0.0 self.slippage_rate = slippage_rate self.commission_rate = commission_rate self.trade_log: List[Trade] = [] self.pending_orders: Dict[str, Order] = {} self._current_time: Optional[datetime] = None print( f"模拟器初始化:初始资金={self.initial_capital:.2f}, 滑点率={self.slippage_rate}, 佣金率={self.commission_rate}") if self.positions: print(f"初始持仓:{self.positions}") def update_time(self, current_time: datetime): self._current_time = current_time def get_current_time(self) -> datetime: if self._current_time is None: # 改进:如果时间未设置,可以抛出错误,防止策略在 on_init 阶段意外调用 # raise RuntimeError("Simulator time has not been set. Ensure update_time is called.") return None return self._current_time def _calculate_fill_price(self, order: Order, current_bar: Bar) -> float: """ 内部方法:根据订单类型和滑点计算实际成交价格。 撮合逻辑:所有订单(市价/限价)都以当前K线的 **开盘价 (open)** 为基准进行撮合。 """ fill_price = -1.0 # 默认未成交 base_price = current_bar.open # 所有成交都以当前K线的开盘价为基准 if order.price_type == "MARKET": # 市价单:直接以开盘价成交,考虑滑点 if order.direction == "BUY" or order.direction == "CLOSE_SHORT": # 买入/平空:向上偏离(多付) fill_price = base_price * (1 + self.slippage_rate) elif order.direction == "SELL" or order.direction == "CLOSE_LONG": # 卖出/平多:向下偏离(少收) fill_price = base_price * (1 - self.slippage_rate) else: fill_price = base_price # 理论上不发生 elif order.price_type == "LIMIT" and order.limit_price is not None: limit_price = order.limit_price # 限价单:判断开盘价是否满足限价条件,如果满足,则以开盘价成交(考虑滑点) if order.direction == "BUY" or order.direction == "CLOSE_SHORT": # 限价买入/平空 # 买单只有当开盘价低于或等于限价时才可能成交 # 即:我愿意出 limit_price 买,开盘价 open_price 更低或一样,当然买 if base_price <= limit_price: fill_price = base_price * (1 + self.slippage_rate) # else: 未满足限价条件,不成交 elif order.direction == "SELL" or order.direction == "CLOSE_LONG": # 限价卖出/平多 # 卖单只有当开盘价高于或等于限价时才可能成交 # 即:我愿意出 limit_price 卖,开盘价 open_price 更高或一样,当然卖 if base_price >= limit_price: fill_price = base_price * (1 - self.slippage_rate) # else: 未满足限价条件,不成交 # 最终检查成交价是否有效且合理(大于0) if fill_price <= 0: return -1.0 # 未成交或价格无效 return fill_price def send_order_to_pending(self, order: Order) -> Optional[Order]: """ 将订单添加到待处理队列。由 BacktestEngine 或 Strategy 调用。 此方法不进行撮合,撮合由 process_pending_orders 统一处理。 """ if order.id in self.pending_orders: # print(f"订单 {order.id} 已经存在于待处理队列。") return None self.pending_orders[order.id] = order # print(f"订单 {order.id} 加入待处理队列。") return order def process_pending_orders(self, current_bar: Bar): """ 处理所有待撮合的订单。在每个K线数据到来时调用。 """ # 复制一份待处理订单的键,防止在迭代时修改字典 order_ids_to_process = list(self.pending_orders.keys()) for order_id in order_ids_to_process: if order_id not in self.pending_orders: # 订单可能已被取消 continue order = self.pending_orders[order_id] # 只有当订单的symbol与当前bar的symbol一致时才尝试撮合 # 这样确保了在换月后,旧合约的挂单不会被尝试撮合 (尽管换月时会强制取消) if order.symbol != current_bar.symbol: # 这种情况理论上应该被换月逻辑清理掉的旧合约挂单, # 如果因为某种原因漏掉了,这里直接跳过,避免异常。 continue # 尝试成交订单 self._execute_single_order(order, current_bar) def _execute_single_order(self, order: Order, current_bar: Bar) -> Optional[Trade]: """ 内部方法:尝试执行单个订单,并处理资金和持仓变化。 由 send_order 或 process_pending_orders 调用。 """ # --- 处理撤单指令 --- if order.direction == "CANCEL": # 策略主动发起撤单 success = self.cancel_order(order.id) if success: # print(f"[{current_bar.datetime}] 模拟器: 收到并成功处理撤单指令 for Order ID: {order.id}") pass return None # 撤单操作不返回Trade symbol = order.symbol volume = order.volume # 尝试计算成交价格 fill_price = self._calculate_fill_price(order, current_bar) if fill_price <= 0: # 未成交或不满足限价条件 return None # --- 以下是订单成功成交前的预检查逻辑 --- trade_value = volume * fill_price commission = trade_value * self.commission_rate current_position = self.positions.get(symbol, 0) current_average_cost = self.average_costs.get(symbol, 0.0) realized_pnl = 0.0 # 预先计算的实现盈亏 # ----------------------------------------------------------- # 精确判断 is_open_trade 和 is_close_trade # ----------------------------------------------------------- is_trade_a_close_operation = False is_trade_an_open_operation = False # 1. 判断是否为平仓操作 # 显式平仓指令 if order.direction in ["CLOSE_LONG", "CLOSE_SELL", "CLOSE_SHORT"]: is_trade_a_close_operation = True # 隐式平仓 (例如,持有空头时买入,或持有多头时卖出) elif order.direction == "BUY" and current_position < 0: # 买入平空 is_trade_a_close_operation = True elif order.direction == "SELL" and current_position > 0: # 卖出平多 is_trade_a_close_operation = True # 2. 判断是否为开仓操作 if order.direction == "BUY": # 买入开多: 如果当前持有多头或无仓位,或者从空头转为多头 if current_position >= 0 or (current_position < 0 and (current_position + volume) > 0): is_trade_an_open_operation = True elif order.direction == "SELL": # 卖出开空: 如果当前持有空头或无仓位,或者从多头转为空头 if current_position <= 0 or (current_position > 0 and (current_position - volume) < 0): is_trade_an_open_operation = True # ----------------------------------------------------------- # 区分实际的买卖方向 (用于资金和持仓计算) actual_execution_direction = "" if order.direction == "BUY" or order.direction == "CLOSE_SHORT": actual_execution_direction = "BUY" elif order.direction == "SELL" or order.direction == "CLOSE_LONG" or order.direction == "CLOSE_SELL": actual_execution_direction = "SELL" else: print( f"[{current_bar.datetime}] 模拟器: 收到未知订单方向 {order.direction} for Order ID: {order.id}. 订单未处理。") if order.id in self.pending_orders: del self.pending_orders[order.id] return None # --- 临时变量,用于预计算新的资金和持仓状态 --- temp_cash = self.cash temp_positions = self.positions.copy() temp_average_costs = self.average_costs.copy() # 根据实际执行方向进行预计算和资金检查 if actual_execution_direction == "BUY": # 处理实际的买入 (开多 / 平空) if current_position >= 0: # 当前持有多仓或无仓位 (开多) required_cash = trade_value + commission if temp_cash < required_cash: print( f"[{current_bar.datetime}] 模拟器: 资金不足 (开多), 无法执行买入 {volume} {symbol} @ {fill_price:.2f}. 需要: {required_cash:.2f}, 当前: {temp_cash:.2f}") if order.id in self.pending_orders: del self.pending_orders[order.id] return None temp_cash -= required_cash new_total_cost = (temp_average_costs.get(symbol, 0.0) * temp_positions.get(symbol, 0)) + ( fill_price * volume) new_total_volume = temp_positions.get(symbol, 0) + volume temp_average_costs[symbol] = new_total_cost / new_total_volume if new_total_volume > 0 else 0.0 temp_positions[symbol] = new_total_volume else: # 当前持有空仓 (平空) - 平仓交易,佣金从交易价值中扣除,不单独检查现金余额 pnl_per_share = current_average_cost - fill_price # 空头平仓盈亏 realized_pnl = pnl_per_share * volume temp_cash -= commission # 扣除佣金 temp_cash += trade_value # 回收平仓价值 temp_cash += realized_pnl # 计入实现盈亏 temp_positions[symbol] += volume if temp_positions[symbol] == 0: del temp_positions[symbol] if symbol in temp_average_costs: del temp_average_costs[symbol] elif current_position < 0 and temp_positions[symbol] > 0: # 发生空转多 temp_average_costs[symbol] = fill_price # 新多头仓位成本以成交价为准 elif actual_execution_direction == "SELL": # 处理实际的卖出 (开空 / 平多) if current_position <= 0: # 当前持有空仓或无仓位 (开空) # 开空主要检查佣金是否足够 if temp_cash < commission: print( f"[{current_bar.datetime}] 模拟器: 资金不足 (开空佣金), 无法执行卖出 {volume} {symbol} @ {fill_price:.2f}. 佣金: {commission:.2f}, 当前: {temp_cash:.2f}") if order.id in self.pending_orders: del self.pending_orders[order.id] return None temp_cash -= commission new_total_value = (temp_average_costs.get(symbol, 0.0) * abs(temp_positions.get(symbol, 0))) + ( fill_price * volume) new_total_volume = abs(temp_positions.get(symbol, 0)) + volume temp_average_costs[symbol] = new_total_value / new_total_volume if new_total_volume > 0 else 0.0 # 平均成本 temp_positions[symbol] -= volume else: # 当前持有多仓 (平多) - 平仓交易,佣金从交易价值中扣除,不单独检查现金余额 pnl_per_share = fill_price - current_average_cost # 多头平仓盈亏 realized_pnl = pnl_per_share * volume temp_cash -= commission # 扣除佣金 temp_cash += trade_value # 回收平仓价值 temp_cash += realized_pnl # 计入实现盈亏 temp_positions[symbol] -= volume if temp_positions[symbol] == 0: del temp_positions[symbol] if symbol in temp_average_costs: del temp_average_costs[symbol] elif current_position > 0 and temp_positions[symbol] < 0: # 发生多转空 temp_average_costs[symbol] = fill_price # 新空头仓位成本以成交价为准 # --- 所有检查通过后,才正式更新模拟器状态 --- self.cash = temp_cash self.positions = temp_positions self.average_costs = temp_average_costs # 创建 Trade 对象时,direction 使用原始订单的 direction executed_trade = Trade( order_id=order.id, fill_time=current_bar.datetime, symbol=symbol, direction=order.direction, # 使用原始订单的 direction volume=volume, price=fill_price, commission=commission, cash_after_trade=self.cash, positions_after_trade=self.positions.copy(), realized_pnl=realized_pnl, is_open_trade=is_trade_an_open_operation, # 使用更精确的判断 is_close_trade=is_trade_a_close_operation # 使用更精确的判断 ) self.trade_log.append(executed_trade) # 订单成交,从待处理订单中移除 if order.id in self.pending_orders: del self.pending_orders[order.id] return executed_trade def cancel_order(self, order_id: str) -> bool: """ 尝试取消一个待处理订单。 """ if order_id in self.pending_orders: del self.pending_orders[order_id] return True return False # --- 新增:强制平仓指定合约的所有持仓 --- def force_close_all_positions_for_symbol(self, symbol_to_close: str, closing_bar: Bar) -> List[Trade]: """ 强制平仓指定合约的所有持仓。 Args: symbol_to_close (str): 需要平仓的合约代码。 closing_bar (Bar): 用于获取平仓价格的当前K线数据(通常是旧合约的最后一根K线)。 Returns: List[Trade]: 因强制平仓而产生的交易记录。 """ closed_trades: List[Trade] = [] # 仅处理指定symbol的持仓 if symbol_to_close in self.positions and self.positions[symbol_to_close] != 0: volume_to_close = self.positions[symbol_to_close] # 根据持仓方向决定平仓订单的方向 direction = "CLOSE_LONG" if volume_to_close > 0 else "CLOSE_SELL" # 多头平仓是卖出,空头平仓是买入 # 构造一个市价平仓订单 rollover_order = Order( id=f"FORCE_CLOSE_{symbol_to_close}_{closing_bar.datetime.strftime('%Y%m%d%H%M%S%f')}", symbol=symbol_to_close, direction=direction, volume=abs(volume_to_close), price_type="MARKET", limit_price=None, submitted_time=closing_bar.datetime, ) # 使用内部的执行逻辑进行撮合 trade = self._execute_single_order(rollover_order, closing_bar) if trade: closed_trades.append(trade) else: print(f"[{closing_bar.datetime}] 警告: 强制平仓 {symbol_to_close} 失败!") return closed_trades # --- 新增:取消指定合约的所有挂单 --- def cancel_all_pending_orders_for_symbol(self, symbol_to_cancel: str) -> int: """ 取消指定合约的所有待处理订单。 """ cancelled_count = 0 order_ids_to_cancel = [ order_id for order_id, order in self.pending_orders.items() if order.symbol == symbol_to_cancel ] for order_id in order_ids_to_cancel: if self.cancel_order(order_id): # 调用现有的 cancel_order 方法 cancelled_count += 1 return cancelled_count def get_pending_orders(self) -> Dict[str, Order]: return self.pending_orders.copy() def get_portfolio_value(self, current_bar: Bar) -> float: """ 计算当前的投资组合总价值(包括现金和持仓市值)。 此方法需要兼容多合约持仓的场景。 Args: current_bar (Bar): 当前的Bar数据,用于计算**当前活跃合约**的持仓市值。 注意:如果 simulator 中持有多个合约,这里需要更复杂的逻辑。 目前假设主力合约回测时,simulator.positions 主要只包含当前主力合约。 Returns: float: 当前的投资组合总价值。 """ total_value = self.cash # 遍历所有持仓,计算市值。 # 注意:这里假设 current_bar 提供了当前活跃主力合约的价格。 # 如果 self.positions 中包含其他非 current_bar.symbol 的旧合约, # 它们的市值将无法用 current_bar.open 来准确计算。 # 在换月模式下,旧合约会被强制平仓,因此 simulator.positions 通常只包含一个合约。 for symbol, quantity in self.positions.items(): # 这里简单处理:如果持仓合约与 current_bar.symbol 相同,则使用 current_bar.open 计算。 # 如果是其他合约,则需要外部提供其最新价格,但这超出了本函数当前的能力范围。 # 考虑到换月模式,旧合约会被平仓,所以大部分时候这不会是问题。 if symbol == current_bar.symbol: total_value += quantity * current_bar.open else: # 警告:如果这里出现,说明有未平仓的旧合约持仓,且没有其最新价格来计算市值。 # 在严谨的主力连续回测中,这不应该发生,因为换月会强制平仓。 print(f"[{current_bar.datetime}] 警告:持仓中存在非当前K线合约 {symbol},无法准确计算其市值。") # 可以选择将这部分持仓价值计为0,或者使用上一个已知价格(需要额外数据结构) # 这里我们假设它不影响总价值计算,因为换月时会处理掉 pass return total_value def get_current_positions(self) -> Dict[str, int]: return self.positions.copy() def get_trade_history(self) -> List[Trade]: return self.trade_log.copy() def reset(self, new_initial_capital: float = None, new_initial_positions: Dict[str, int] = None) -> None: """ 重置模拟器状态到新的初始条件。 此方法不用于换月时的平仓,它用于整个回测开始前的初始化。 """ print("ExecutionSimulator: 重置状态。") self.cash = new_initial_capital if new_initial_capital is not None else self.initial_capital self.positions = new_initial_positions.copy() if new_initial_positions is not None else {} self.average_costs = {} for symbol, qty in self.positions.items(): # 重置平均成本 self.average_costs[symbol] = 0.0 self.trade_log = [] self.pending_orders = {} # 清空挂单 self._current_time = None # Removed clear_trade_history as trade_log is cleared in reset def get_average_position_price(self, symbol: str) -> Optional[float]: if symbol in self.positions and self.positions[symbol] != 0: return self.average_costs.get(symbol) return None