Files
NewQuant/src/execution_simulator.py

271 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# src/execution_simulator.py (修改部分)
from typing import Dict, List, Optional
import pandas as pd
from .core_data import Order, Trade, Bar, PortfolioSnapshot
class ExecutionSimulator:
"""
模拟交易执行和管理账户资金、持仓。
"""
def __init__(self, initial_capital: float,
slippage_rate: float = 0.0001,
commission_rate: float = 0.0002,
initial_positions: Optional[Dict[str, int]] = None):
"""
Args:
initial_capital (float): 初始资金。
slippage_rate (float): 滑点率(相对于成交价格的百分比)。
commission_rate (float): 佣金率(相对于成交金额的百分比)。
initial_positions (Optional[Dict[str, int]]): 初始持仓,格式为 {symbol: quantity}。
"""
self.initial_capital = initial_capital
self.cash = initial_capital
self.positions: Dict[str, int] = initial_positions if initial_positions is not None else {}
# 新增:跟踪持仓的平均成本 {symbol: average_cost}
self.average_costs: Dict[str, float] = {}
# 如果有初始持仓需要设置初始成本简化为0或在外部配置
if initial_positions:
for symbol, qty in initial_positions.items():
# 初始持仓成本,如果需要精确,应该从外部传入
self.average_costs[symbol] = 0.0 # 简化处理初始持仓成本为0
self.slippage_rate = slippage_rate
self.commission_rate = commission_rate
self.trade_log: List[Trade] = [] # 存储所有成交记录
self.pending_orders: Dict[str, Order] = {} # {order_id: Order_object}
print(
f"模拟器初始化:初始资金={self.initial_capital:.2f}, 滑点率={self.slippage_rate}, 佣金率={self.commission_rate}")
if self.positions:
print(f"初始持仓:{self.positions}")
def _calculate_fill_price(self, order: Order, current_bar: Bar) -> float:
"""
内部方法:根据订单类型和滑点计算实际成交价格。
简化处理市价单以当前Bar收盘价为基准考虑滑点。
"""
base_price = current_bar.close # 简化为收盘价成交
# 考虑滑点
if order.direction in ["BUY", "CLOSE_SHORT"]: # 买入或平空,价格向上偏离
fill_price = base_price * (1 + self.slippage_rate)
elif order.direction in ["SELL", "CLOSE_LONG"]: # 卖出或平多,价格向下偏离
fill_price = base_price * (1 - self.slippage_rate)
else: # 默认情况,无滑点
fill_price = base_price
# 如果是限价单且成交价格不满足条件,则可能不成交
if order.price_type == "LIMIT" and order.limit_price is not None:
# 对于BUY和CLOSE_SHORT成交价必须 <= 限价
if (order.direction == "BUY" or order.direction == "CLOSE_SHORT") and fill_price > order.limit_price:
return -1.0 # 未触及限价
# 对于SELL和CLOSE_LONG成交价必须 >= 限价
elif (order.direction == "SELL" or order.direction == "CLOSE_LONG") and fill_price < order.limit_price:
return -1.0 # 未触及限价
return fill_price
def send_order(self, order: Order, current_bar: Bar) -> Optional[Trade]:
"""
接收策略发出的订单,并模拟执行。
如果订单未立即成交,则加入待处理订单列表。
特殊处理:如果 order.direction 是 "CANCEL",则调用 cancel_order。
Args:
order (Order): 待执行的订单对象。
current_bar (Bar): 当前的Bar数据用于确定成交价格。
Returns:
Optional[Trade]: 如果订单成功执行则返回 Trade 对象,否则返回 None。
"""
# --- 处理撤单指令 ---
if order.direction == "CANCEL":
success = self.cancel_order(order.id)
if success:
# print(f"[{current_bar.datetime}] 模拟器: 收到并成功处理撤单指令 for Order ID: {order.id}")
pass
else:
# print(f"[{current_bar.datetime}] 模拟器: 收到撤单指令 for Order ID: {order.id}, 但订单已成交或不存在。")
pass
return None # 撤单操作不返回Trade
# --- 正常买卖订单处理 ---
symbol = order.symbol
volume = order.volume
# 尝试计算成交价格
fill_price = self._calculate_fill_price(order, current_bar)
executed_trade: Optional[Trade] = None
realized_pnl = 0.0 # 初始化实现盈亏
is_open_trade = False
is_close_trade = False
if fill_price <= 0: # 表示未成交或不满足限价条件
if order.price_type == "LIMIT":
self.pending_orders[order.id] = order
return None # 未成交返回None
# --- 以下是订单成功成交的逻辑 ---
trade_value = volume * fill_price
commission = trade_value * self.commission_rate
current_position = self.positions.get(symbol, 0)
current_average_cost = self.average_costs.get(symbol, 0.0)
if order.direction == "BUY":
# 开多仓或平空仓
if current_position >= 0: # 当前持有多仓或无仓位 (开多)
is_open_trade = True
# 更新平均成本 (加权平均)
new_total_cost = (current_average_cost * current_position) + (fill_price * volume)
new_total_volume = current_position + volume
self.average_costs[symbol] = new_total_cost / new_total_volume if new_total_volume > 0 else 0.0
self.positions[symbol] = new_total_volume
else: # 当前持有空仓 (平空)
is_close_trade = True
# 计算平空盈亏
# PnL = (开仓成本 - 平仓价格) * 平仓数量 (注意空头方向)
# 简化:假设平空时,直接使用当前的平均开仓成本来计算盈亏
# 更精确的FIFO/LIFO需更多逻辑
pnl_per_share = current_average_cost - fill_price # (买入平空,成本高于平仓价则盈利)
realized_pnl = pnl_per_share * volume
# 更新持仓和成本
self.positions[symbol] += volume
if self.positions[symbol] == 0:
del self.positions[symbol]
if symbol in self.average_costs: del self.average_costs[symbol] # 清理成本
elif self.positions[symbol] > 0 and current_position < 0: # 部分平空转为多头,需重新设置成本
# 这部分逻辑可以更复杂这里简化处理如果转为多头成本重置为0
# 实际应该用剩余的空头成本 + 新开多的成本
self.average_costs[symbol] = fill_price # 简单地将剩下的多头仓位成本设为当前价格
# 资金扣除
if self.cash >= trade_value + commission:
self.cash -= (trade_value + commission)
else:
# print(f"[{current_bar.datetime}] 资金不足,无法执行买入 {volume} {symbol}")
return None
elif order.direction == "SELL":
# 开空仓或平多仓
if current_position <= 0: # 当前持有空仓或无仓位 (开空)
is_open_trade = True
# 更新平均成本 (空头成本为负值)
new_total_cost = (current_average_cost * current_position) - (fill_price * volume) # 负的持仓乘以负的卖出价
new_total_volume = current_position - volume # 空头持仓量更负
self.average_costs[symbol] = new_total_cost / new_total_volume if new_total_volume < 0 else 0.0
self.positions[symbol] = new_total_volume
else: # 当前持有多仓 (平多)
is_close_trade = True
# 计算平多盈亏
# PnL = (平仓价格 - 开仓成本) * 平仓数量
pnl_per_share = fill_price - current_average_cost # (卖出平多,平仓价高于成本则盈利)
realized_pnl = pnl_per_share * volume
# 更新持仓和成本
self.positions[symbol] -= volume
if self.positions[symbol] == 0:
del self.positions[symbol]
if symbol in self.average_costs: del self.average_costs[symbol] # 清理成本
elif self.positions[symbol] < 0 and current_position > 0: # 部分平多转为空头,需重新设置成本
self.average_costs[symbol] = fill_price # 简单地将剩下的空头仓位成本设为当前价格
# 资金扣除 (佣金) 和增加 (卖出收入)
if self.cash >= commission:
self.cash -= commission
self.cash += trade_value
else:
# print(f"[{current_bar.datetime}] 资金不足,无法执行卖出 {volume} {symbol}")
return None
# 创建 Trade 对象
executed_trade = Trade(
order_id=order.id, fill_time=current_bar.datetime, symbol=symbol,
direction=order.direction, # 记录原始订单方向 (BUY/SELL)
volume=volume, price=fill_price, commission=commission,
cash_after_trade=self.cash, positions_after_trade=self.positions.copy(),
realized_pnl=realized_pnl, # 填充实现盈亏
is_open_trade=is_open_trade,
is_close_trade=is_close_trade
)
self.trade_log.append(executed_trade)
# 如果订单成交,无论它是市价单还是限价单,都从待处理订单中移除
if order.id in self.pending_orders:
del self.pending_orders[order.id]
# print(f"[{current_bar.datetime}] 成交: {executed_trade.direction} {executed_trade.volume} {executed_trade.symbol} @ {executed_trade.price:.2f}, 佣金: {executed_trade.commission:.2f}, PnL: {executed_trade.realized_pnl:.2f}")
return executed_trade
def cancel_order(self, order_id: str) -> bool:
"""
尝试取消一个待处理订单。
Args:
order_id (str): 要取消的订单ID。
Returns:
bool: 如果成功取消则返回 True否则返回 False例如订单不存在或已成交
"""
if order_id in self.pending_orders:
# print(f"订单 {order_id} 已成功取消。")
del self.pending_orders[order_id]
return True
# print(f"订单 {order_id} 不存在或已成交,无法取消。")
return False
def get_pending_orders(self) -> Dict[str, Order]:
"""
获取当前所有待处理订单的副本。
"""
return self.pending_orders.copy()
def get_portfolio_value(self, current_bar: Bar) -> float:
"""
计算当前的投资组合总价值(包括现金和持仓市值)。
Args:
current_bar (Bar): 当前的Bar数据用于计算持仓市值。
Returns:
float: 当前的投资组合总价值。
"""
total_value = self.cash
# 在单品种场景下,我们假设 self.positions 最多只包含一个品种
# 并且这个品种就是 current_bar.symbol 所代表的品种
symbol_in_position = list(self.positions.keys())[0] if self.positions else None
if symbol_in_position and symbol_in_position == current_bar.symbol:
quantity = self.positions[symbol_in_position]
# 持仓市值 = 数量 * 当前市场价格 (current_bar.close)
# 无论多头(quantity > 0)还是空头(quantity < 0),这个计算都是正确的
total_value += quantity * current_bar.close
# 您也可以选择在这里打印调试信息
# print(f" DEBUG Portfolio Value Calculation: Cash={self.cash:.2f}, "
# f"Position for {symbol_in_position}: {quantity} @ {current_bar.close:.2f}, "
# f"Position Value={quantity * current_bar.close:.2f}, Total Value={total_value:.2f}")
# 如果没有持仓或者持仓品种与当前Bar品种不符 (理论上单品种不会发生)
# 那么 total_value 依然是 self.cash
return total_value
def get_current_positions(self) -> Dict[str, int]:
"""
返回当前持仓字典的副本。
"""
return self.positions.copy()
def get_trade_history(self) -> List[Trade]:
"""
返回所有成交记录的副本。
"""
return self.trade_log.copy()