1、新增SMCPureH1LongStrategy策略

2、修复实盘bug
This commit is contained in:
2025-07-28 14:36:58 +08:00
parent 2fa952a3da
commit 52c5ec18d8
12 changed files with 982 additions and 852 deletions

File diff suppressed because one or more lines are too long

View File

@@ -236,12 +236,12 @@ if __name__ == "__main__":
# 这种方式适合获取相对较短或中等长度的历史K线数据。
df_if_backtest_daily = collect_and_save_tqsdk_data_stream(
symbol="KQ.m@DCE.jm",
symbol="KQ.m@SHFE.rb",
# symbol='SHFE.rb2510',
# symbol='KQ.i@SHFE.bu',
freq="min60",
freq="min15",
start_date_str="2021-01-01",
end_date_str="2025-07-11",
end_date_str="2025-07-22",
mode="backtest", # 指定为回测模式
tq_user=TQ_USER_NAME,
tq_pwd=TQ_PASSWORD,

View File

@@ -6,8 +6,8 @@
"id": "initial_id",
"metadata": {
"ExecuteTime": {
"end_time": "2025-07-12T14:39:30.843472Z",
"start_time": "2025-07-12T14:39:30.823755Z"
"end_time": "2025-07-22T07:44:51.375234Z",
"start_time": "2025-07-22T07:44:51.352161Z"
},
"collapsed": true
},
@@ -20,54 +20,44 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 2,
"id": "a559dfcf",
"metadata": {
"ExecuteTime": {
"end_time": "2025-07-12T14:39:35.610573Z",
"start_time": "2025-07-12T14:39:30.855663Z"
"end_time": "2025-07-22T07:44:56.927700Z",
"start_time": "2025-07-22T07:44:51.391111Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"初始化数据管理器...\n",
"数据加载成功: /mnt/d/PyProject/NewQuant/data/data/KQ_m@CZCE_MA/KQ_m@CZCE_MA_min60.csv\n",
"数据范围从 2021-12-31 14:00:00 到 2025-07-10 09:00:00\n",
"总计 5904 条记录。\n",
"\n",
"初始化回测引擎...\n",
"模拟器初始化:初始资金=100000.00, 滑点率=0.0, 佣金率=0.0001\n",
"\n",
"--- 回测引擎初始化完成 ---\n",
" 策略: SimpleLimitBuyStrategyShort\n",
" 初始资金: 100000.00\n",
" 换月模式: 启用\n",
"\n",
"开始运行回测...\n",
"\n",
"--- 回测开始 ---\n",
"SimpleLimitBuyStrategyShort 策略初始化回调被调用。\n",
"开始将 DataFrame 转换为 Bar 对象流...\n"
"ename": "ModuleNotFoundError",
"evalue": "No module named 'src'",
"output_type": "error",
"traceback": [
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
"\u001b[31mModuleNotFoundError\u001b[39m Traceback (most recent call last)",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[2]\u001b[39m\u001b[32m, line 2\u001b[39m\n\u001b[32m 1\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01mturtle\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m down\n\u001b[32m----> \u001b[39m\u001b[32m2\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01msrc\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01manalysis\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01mresult_analyzer\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m ResultAnalyzer\n\u001b[32m 3\u001b[39m \u001b[38;5;66;03m# 导入所有必要的模块\u001b[39;00m\n\u001b[32m 4\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01msrc\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01mdata_manager\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m DataManager\n",
"\u001b[31mModuleNotFoundError\u001b[39m: No module named 'src'"
]
}
],
"source": [
"\n",
"from turtle import down\n",
"from src.analysis.result_analyzer import ResultAnalyzer\n",
"# 导入所有必要的模块\n",
"from src.data_manager import DataManager\n",
"from src.backtest_engine import BacktestEngine\n",
"from src.indicators.indicator_list import INDICATOR_LIST\n",
"from src.indicators.indicators import RSI, HistoricalRange, StochasticOscillator\n",
"from src.indicators.indicators import RSI, BollingerBandwidth, HistoricalRange, NormalizedATR, RateOfChange, StochasticOscillator\n",
"from src.strategies.OpenTwoFactorStrategy import SimpleLimitBuyStrategyLong, SimpleLimitBuyStrategyShort, SimpleLimitBuyStrategy\n",
"\n",
"\n",
"# --- 配置参数 ---\n",
"# 获取当前脚本所在目录,假设数据文件在项目根目录下的 data 文件夹内\n",
"# data_file_path = '/mnt/d/PyProject/NewQuant/data/data/SHFE_rb2510/SHFE_rb2510_min60.csv'\n",
"# data_file_path = \"/mnt/d/PyProject/NewQuant/data/data/KQ_m@CZCE_MA/KQ_m@CZCE_MA_min60.csv\"\n",
"# data_file_path = \"/mnt/d/PyProject/NewQuant/data/data/KQ_m@SHFE_rb/KQ_m@SHFE_rb_min60.csv\"\n",
"data_file_path = \"/mnt/d/PyProject/NewQuant/data/data/KQ_m@CZCE_MA/KQ_m@CZCE_MA_min60.csv\"\n",
"\n",
"initial_capital = 100000.0\n",
@@ -78,31 +68,33 @@
" 'symbol': 'KQ_m@CZCE_MA',\n",
"}\n",
"\n",
"# Short 可用\n",
"strategy_parameters = {\n",
" # 'symbol': \"SHFE_rb2501\", # 根据您的数据文件中的品种名称调整\n",
" 'main_symbol': \"MA\", # 根据您的数据文件中的品种名称调整\n",
" 'trade_volume': 1,\n",
" 'lag': 7,\n",
" # 'range_factor': 1.8, # 示例值,需要通过网格搜索优化\n",
" # 'profit_factor': 2.3, # 示例值\n",
" 'range_factor': 1.6, # 示例值,需要通过网格搜索优化\n",
" 'profit_factor': 2.1, # 示例值\n",
" # 'range_factor_l': 1.8, # 示例值,需要通过网格搜索优化\n",
" # 'profit_factor_l': 2.8, # 示例值\n",
" # 'range_factor_s': 1.6, # 示例值,需要通过网格搜索优化\n",
" # 'profit_factor_s': 2.1, # 示例值\n",
" # 'profit_factor': 2.8, # 示例值\n",
" # 'range_factor': 1.6, # 示例值,需要通过网格搜索优化\n",
" # 'profit_factor': 2.1, # 示例值\n",
" 'range_factor_l': 1.8, # 示例值,需要通过网格搜索优化\n",
" 'profit_factor_l': 2.8, # 示例值\n",
" 'range_factor_s': 1.6, # 示例值,需要通过网格搜索优化\n",
" 'profit_factor_s': 2.1, # 示例值\n",
" 'max_position': 10,\n",
" 'enable_log': False,\n",
" 'stop_loss_points': 15,\n",
" 'enable_log': True,\n",
" 'stop_loss_points': 20,\n",
" 'use_indicator': True,\n",
" # 'indicator': HistoricalRange(shift_window=20, down_bound=10, up_bound=24),\n",
" 'indicator': RSI(5, 15, 60),\n",
" # 'indicator_l': HistoricalRange(21, 10, 25),\n",
" # 'indicator_s': RSI(5, 20, 60),\n",
" # 'indicator': HistoricalRange(11, 25, 20),\n",
" # 'indicator': BollingerBandwidth(window=20, nbdev=2.0, down_bound=1.9, up_bound=3.25),\n",
" 'indicator_l': HistoricalRange(11, 25, 20),\n",
" 'indicator_s': BollingerBandwidth(window=20, nbdev=2.0, down_bound=1.9, up_bound=3.25),\n",
"}\n",
"start_time = datetime(2022, 1, 1)\n",
"start_time = datetime(2021, 1, 1)\n",
"end_time = datetime(2024, 6, 1)\n",
"\n",
"# start_time = datetime(2024, 6, 1)\n",
"# end_time = datetime(2025, 6, 1)\n",
"start_time = datetime(2024, 6, 1)\n",
"end_time = datetime(2025, 8, 1)\n",
"\n",
"\n",
"# --- 1. 初始化数据管理器 ---\n",
@@ -115,7 +107,7 @@
"print(\"\\n初始化回测引擎...\")\n",
"engine = BacktestEngine(\n",
" data_manager=data_manager,\n",
" strategy_class=SimpleLimitBuyStrategyShort,\n",
" strategy_class=SimpleLimitBuyStrategy,\n",
" # current_segment_symbol=strategy_parameters['symbol'],\n",
" strategy_params=strategy_parameters,\n",
" initial_capital=initial_capital,\n",

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -8,15 +8,11 @@ from src.tqsdk_real_engine import TqsdkEngine
# 导入你的策略类
from src.strategies.OpenTwoFactorStrategy import SimpleLimitBuyStrategyLong, SimpleLimitBuyStrategyShort, SimpleLimitBuyStrategy
from tqsdk import TqApi, TqBacktest, TqAuth, TqKq
from tqsdk import TqApi, TqBacktest, TqAuth, TqKq, TqAccount
# --- 配置参数 ---
# Tqsdk 的本地数据文件路径,注意 Tqsdk 要求文件名为 KQ_m@交易所_品种名_周期.csv
# 例如: KQ_m@SHFE_rb_min60.csv
initial_capital = 100000.0
slippage_rate = 0.000 # 在 Tqsdk 模拟中,滑点通常由 TqSim 处理或在策略中手动模拟
commission_rate = 0.0001 # 同上
# 主力合约的 symbol
main_symbol = "KQ.m@DCE.jm"
strategy_parameters = {
@@ -41,7 +37,9 @@ strategy_parameters = {
'indicator_s': RSI(5, 0, 100),
}
api = TqApi(TqKq(), auth=TqAuth("emanresu", "dfgvfgdfgg"))
# api = TqApi(TqKq(), auth=TqAuth("emanresu", "dfgvfgdfgg"))
api = TqApi(TqAccount('H宏源期货', '903308830', 'lzr520102'), auth=TqAuth("emanresu", "dfgvfgdfgg"))
# --- 1. 初始化回测引擎并运行 ---
print("\n初始化 Tqsdk 回测引擎...")
engine = TqsdkEngine(

View File

@@ -56,7 +56,7 @@ class BacktestEngine:
# self.current_segment_symbol = current_segment_symbol # 此行移除或作为内部变量动态管理
# 实例化策略。初始 symbol 会在 run_backtest 中根据第一根 Bar 动态设置。
self.strategy = strategy_class(self.context, symbol="INITIAL_PLACEHOLDER_SYMBOL", **strategy_params)
self.strategy = strategy_class(self.context, **strategy_params)
self.indicators = indicators

View File

@@ -1,5 +1,6 @@
from src.indicators.indicators import RSI, HistoricalRange, DifferencedVolumeIndicator, StochasticOscillator, \
RateOfChange, NormalizedATR
from curses import window
from src.indicators.indicators import *
INDICATOR_LIST = [
RSI(5),
@@ -29,5 +30,11 @@ INDICATOR_LIST = [
RateOfChange(window=20),
NormalizedATR(window=5),
NormalizedATR(window=14),
NormalizedATR(window=21)
NormalizedATR(window=21),
ADX(window=7),
ADX(window=14),
ADX(window=30),
BollingerBandwidth(window=10, nbdev=1.5),
BollingerBandwidth(window=20, nbdev=2.0),
BollingerBandwidth(window=50, nbdev=2.5),
]

View File

@@ -269,3 +269,85 @@ class NormalizedATR(Indicator):
def get_name(self):
return f"natr_{self.window}"
class ADX(Indicator):
"""
平均趋向指标 (ADX),用于衡量趋势的强度而非方向。
是区分趋势行情和震荡行情的核心过滤指标。
"""
def __init__(
self,
window: int = 14,
down_bound: float = None, # 例如,设置 down_bound=25 可过滤出强趋势行情
up_bound: float = None, # 例如,设置 up_bound=20 可过滤出震荡行情
shift_window: int = 0,
):
super().__init__(down_bound, up_bound)
self.window = window
self.shift_window = shift_window
def get_values(
self,
close: np.array,
open: np.array, # 不使用
high: np.array,
low: np.array,
volume: np.array, # 不使用
) -> np.array:
"""
根据最高价、最低价和收盘价计算ADX值。
"""
adx_values = talib.ADX(high, low, close, timeperiod=self.window)
return adx_values
def get_name(self):
return f"adx_{self.window}"
class BollingerBandwidth(Indicator):
"""
布林带宽度,计算公式为 (上轨 - 下轨) / 中轨。
这是一个归一化的波动率指标用于识别波动性的收缩Squeeze和扩张。
"""
def __init__(
self,
window: int = 20,
nbdev: float = 2.0, # 标准差倍数
down_bound: float = None,
up_bound: float = None,
shift_window: int = 0,
):
super().__init__(down_bound, up_bound)
self.window = window
self.nbdev = nbdev
self.shift_window = shift_window
def get_values(
self,
close: np.array,
open: np.array, # 不使用
high: np.array, # 不使用
low: np.array, # 不使用
volume: np.array, # 不使用
) -> np.array:
"""
根据收盘价计算布林带宽度。
"""
upper, middle, lower = talib.BBANDS(
close,
timeperiod=self.window,
nbdevup=self.nbdev,
nbdevdn=self.nbdev,
matype=0 # 使用SMA
)
# 为避免除以0在 middle 为0或NaN的地方带宽也设为NaN
bandwidth = np.full_like(middle, np.nan)
mask = (middle > 0)
bandwidth[mask] = (upper[mask] - lower[mask]) / middle[mask] * 100
return bandwidth
def get_name(self):
return f"bbw_{self.window}_{int(self.nbdev*10)}"

View File

@@ -22,7 +22,7 @@ class SimpleLimitBuyStrategyLong(Strategy):
def __init__(
self,
context: Any,
symbol: str,
main_symbol: str,
enable_log: bool,
trade_volume: int,
range_factor: float,
@@ -46,7 +46,7 @@ class SimpleLimitBuyStrategyLong(Strategy):
stop_loss_points (float): 止损点数(例如,亏损达到此点数则止损)。
take_profit_points (float): 止盈点数(例如,盈利达到此点数则止盈)。
"""
super().__init__(context, symbol, enable_log)
super().__init__(context, main_symbol, enable_log)
self.trade_volume = trade_volume
self.range_factor = range_factor
self.profit_factor = profit_factor
@@ -57,6 +57,8 @@ class SimpleLimitBuyStrategyLong(Strategy):
self.indicator = indicator
self.lag = lag
self.main_symbol = main_symbol
self.order_id_counter = 0
self._last_order_id: Optional[str] = None # 用于跟踪上一根K线发出的订单ID
@@ -71,7 +73,7 @@ class SimpleLimitBuyStrategyLong(Strategy):
def on_init(self):
super().on_init()
count = self.cancel_all_pending_orders()
count = self.cancel_all_pending_orders(self.main_symbol)
self.log(f"取消{count}笔订单")
def on_open_bar(self, open: float, symbol: str):
@@ -138,12 +140,15 @@ class SimpleLimitBuyStrategyLong(Strategy):
if avg_entry_price is not None:
pnl_per_unit = open - avg_entry_price # 当前浮动盈亏(以收盘价计算)
stop_position_points = range_1_ago * self.profit_factor
stop_position_points = 20
self.log(
f"[{current_datetime}] PnL per unit: {pnl_per_unit:.2f}, 目标: {range_1_ago * self.profit_factor:.2f}"
f"[{current_datetime}] PnL per unit: {pnl_per_unit:.2f}, 目标: {stop_position_points:.2f}"
)
# 止盈条件
if pnl_per_unit >= range_1_ago * self.profit_factor:
if pnl_per_unit >= stop_position_points:
order_id = f"{self.symbol}_BUY_{current_datetime.strftime('%Y%m%d%H%M%S')}_{self.order_id_counter}"
self.order_id_counter += 1
@@ -246,7 +251,7 @@ class SimpleLimitBuyStrategyLong(Strategy):
# self.log(f"[{current_datetime}] 不满足开仓条件:持仓={current_pos_volume}, 待处理订单={len(pending_orders_after_cancel)}, K线历史长度={len(bar_history)}")
def on_close_bar(self, bar: Bar, next_bar_open: Optional[float] = None):
self.cancel_all_pending_orders()
self.cancel_all_pending_orders(self.main_symbol)
def on_rollover(self, old_symbol: str, new_symbol: str):
"""
@@ -270,7 +275,7 @@ class SimpleLimitBuyStrategyShort(Strategy):
def __init__(
self,
context: Any,
symbol: str,
main_symbol: str,
enable_log: bool,
trade_volume: int,
range_factor: float,
@@ -294,7 +299,7 @@ class SimpleLimitBuyStrategyShort(Strategy):
stop_loss_points (float): 止损点数(例如,亏损达到此点数则止损)。
take_profit_points (float): 止盈点数(例如,盈利达到此点数则止盈)。
"""
super().__init__(context, symbol, enable_log)
super().__init__(context, main_symbol, enable_log)
self.trade_volume = trade_volume
self.range_factor = range_factor
self.profit_factor = profit_factor
@@ -305,6 +310,8 @@ class SimpleLimitBuyStrategyShort(Strategy):
self.indicator = indicator
self.lag = lag
self.main_symbol = main_symbol
self.order_id_counter = 0
self._last_order_id: Optional[str] = None # 用于跟踪上一根K线发出的订单ID
@@ -319,7 +326,7 @@ class SimpleLimitBuyStrategyShort(Strategy):
def on_init(self):
super().on_init()
count = self.cancel_all_pending_orders()
count = self.cancel_all_pending_orders(self.main_symbol)
self.log(f"取消{count}笔订单")
def on_open_bar(self, open: float, symbol: str):
@@ -489,7 +496,7 @@ class SimpleLimitBuyStrategyShort(Strategy):
# self.log(f"[{current_datetime}] 不满足开仓条件:持仓={current_pos_volume}, 待处理订单={len(pending_orders_after_cancel)}, K线历史长度={len(bar_history)}")
def on_close_bar(self, bar: Bar, next_bar_open: Optional[float] = None):
self.cancel_all_pending_orders()
self.cancel_all_pending_orders(self.main_symbol)
def on_rollover(self, old_symbol: str, new_symbol: str):
"""
@@ -513,7 +520,7 @@ class SimpleLimitBuyStrategy(Strategy):
def __init__(
self,
context: Any,
symbol: str,
main_symbol: str,
enable_log: bool,
trade_volume: int,
range_factor_l: float,
@@ -527,6 +534,8 @@ class SimpleLimitBuyStrategy(Strategy):
indicator_l: Indicator = None,
indicator_s: Indicator = None,
lag: int = 7,
lag_l: int = None,
lag_s: int = None,
): # 新增:止盈点数
"""
初始化策略。
@@ -540,7 +549,7 @@ class SimpleLimitBuyStrategy(Strategy):
stop_loss_points (float): 止损点数(例如,亏损达到此点数则止损)。
take_profit_points (float): 止盈点数(例如,盈利达到此点数则止盈)。
"""
super().__init__(context, symbol, enable_log)
super().__init__(context, main_symbol, enable_log)
self.trade_volume = trade_volume
self.range_factor_l = range_factor_l
self.profit_factor_l = profit_factor_l
@@ -552,7 +561,16 @@ class SimpleLimitBuyStrategy(Strategy):
self.use_indicator = use_indicator
self.indicator_l = indicator_l
self.indicator_s = indicator_s
self.lag = lag
self.main_symbol = main_symbol
self.lag_l = lag_l
self.lag_s = lag_s
if self.lag_l is None:
self.lag_l = lag
if self.lag_s is None:
self.lag_s = lag
self.order_id_counter = 0
@@ -570,7 +588,7 @@ class SimpleLimitBuyStrategy(Strategy):
def on_init(self):
super().on_init()
count = self.cancel_all_pending_orders()
count = self.cancel_all_pending_orders(self.main_symbol)
self.log(f"取消{count}笔订单")
def on_open_bar(self, open, symbol):
@@ -583,24 +601,30 @@ class SimpleLimitBuyStrategy(Strategy):
self.symbol = symbol
current_datetime = self.get_current_time()
self.log(
f"[{current_datetime}] - 当前Open={open:.2f}, "
)
# --- 1. 撤销上一根K线未成交的订单 ---
count = self.cancel_all_pending_orders(self.main_symbol)
self.log(f"取消{count}笔订单")
# 检查是否记录了上一笔订单ID并且该订单仍然在待处理列表中
if self._last_order_id:
pending_orders = self.get_pending_orders()
if self._last_order_id in pending_orders:
success = self.cancel_order(
self._last_order_id
) # 直接调用基类的取消方法
if success:
self.log(
f"[{current_datetime}] 策略: 成功撤销上一根K线未成交订单 {self._last_order_id}"
)
else:
self.log(
f"[{current_datetime}] 策略: 尝试撤销订单 {self._last_order_id} 失败(可能已成交或不存在)"
)
# 无论撤销成功与否,既然我们尝试了撤销,就清除记录
self._last_order_id = None
# if self._last_order_id:
# pending_orders = self.get_pending_orders()
# if self._last_order_id in pending_orders:
# success = self.cancel_order(
# self._last_order_id
# ) # 直接调用基类的取消方法
# if success:
# self.log(
# f"[{current_datetime}] 策略: 成功撤销上一根K线未成交订单 {self._last_order_id}"
# )
# else:
# self.log(
# f"[{current_datetime}] 策略: 尝试撤销订单 {self._last_order_id} 失败(可能已成交或不存在)"
# )
# # 无论撤销成功与否,既然我们尝试了撤销,就清除记录
# self._last_order_id = None
# else:
# self.log(f"[{current_datetime}] 策略: 无上一根K线未成交订单需要撤销。")
@@ -610,6 +634,7 @@ class SimpleLimitBuyStrategy(Strategy):
# 获取当前持仓和未决订单(在取消之后获取,确保是最新的状态)
current_positions = self.get_current_positions()
current_pos_volume = current_positions.get(self.symbol, 0)
self.log(f'current_pos_volume: {current_pos_volume}')
pending_orders_after_cancel = (
self.get_pending_orders()
) # 再次获取,此时应已取消旧订单
@@ -617,79 +642,21 @@ class SimpleLimitBuyStrategy(Strategy):
range_1_ago = None
bar_history = self.get_bar_history()
if len(bar_history) > 16:
# 获取前1根K线 (倒数第二根) 和前7根K线 (队列中最老的一根)
bar_1_ago = bar_history[-self.lag]
# 计算历史 K 线的 Range
# 持有多仓
if (
current_pos_volume > 0 and len(bar_history) > 16
): # 假设只做多,所以持仓量 > 0
bar_1_ago = bar_history[-self.lag_l]
range_1_ago = bar_1_ago.high - bar_1_ago.low
# --- 3. 平仓逻辑 (止损/止盈) ---
# 只有当有持仓时才考虑平仓
if (
current_pos_volume < 0 and range_1_ago is not None
): # 假设只做多,所以持仓量 > 0
avg_entry_price = self.get_average_position_price(self.symbol)
if avg_entry_price is not None:
pnl_per_unit = avg_entry_price - open # 当前浮动盈亏(以收盘价计算)
self.log(
f"[{current_datetime}] 止盈信号 - PnL per unit: {pnl_per_unit:.2f}, 目标: {self.take_profit_points:.2f}"
)
# 止盈条件
if pnl_per_unit >= range_1_ago * self.profit_factor_s:
order_id = f"{self.symbol}_BUY_{current_datetime.strftime('%Y%m%d%H%M%S')}_{self.order_id_counter}"
self.order_id_counter += 1
# 创建一个限价多单
order = Order(
id=order_id,
symbol=self.symbol,
direction="CLOSE_SHORT",
volume=trade_volume,
price_type="MARKET",
# limit_price=limit_price,
submitted_time=current_datetime,
offset="CLOSE",
)
trade = self.send_order(order)
return # 平仓后本K线不再进行开仓判断
# 止损条件
elif pnl_per_unit <= -self.stop_loss_points:
self.log(
f"[{current_datetime}] 止损信号 - PnL per unit: {pnl_per_unit:.2f}, 目标: {-self.stop_loss_points:.2f}"
)
# 发送市价卖出订单平仓,确保立即成交
order_id = f"{self.symbol}_BUY_{current_datetime.strftime('%Y%m%d%H%M%S')}_{self.order_id_counter}"
self.order_id_counter += 1
# 创建一个限价多单
order = Order(
id=order_id,
symbol=self.symbol,
direction="CLOSE_SHORT",
volume=trade_volume,
price_type="MARKET",
# limit_price=limit_price,
submitted_time=current_datetime,
offset="CLOSE",
)
trade = self.send_order(order)
return # 平仓后本K线不再进行开仓判断
if (
current_pos_volume > 0 and range_1_ago is not None
): # 假设只做多,所以持仓量 > 0
avg_entry_price = self.get_average_position_price(self.symbol)
if avg_entry_price is not None:
pnl_per_unit = open - avg_entry_price # 当前浮动盈亏(以收盘价计算)
self.log(
f"[{current_datetime}] 止盈信号 - PnL per unit: {pnl_per_unit:.2f}, 目标: {self.take_profit_points:.2f}"
f"[{current_datetime}] 止盈信号 - PnL per unit: {pnl_per_unit:.2f}, 目标: {self.take_profit_points:.2f}, 止损: {-self.stop_loss_points:.2f}"
)
# 止盈条件
@@ -703,7 +670,7 @@ class SimpleLimitBuyStrategy(Strategy):
id=order_id,
symbol=self.symbol,
direction="CLOSE_LONG",
volume=trade_volume,
volume=abs(current_pos_volume),
price_type="MARKET",
# limit_price=limit_price,
submitted_time=current_datetime,
@@ -714,9 +681,6 @@ class SimpleLimitBuyStrategy(Strategy):
# 止损条件
elif pnl_per_unit <= -self.stop_loss_points:
self.log(
f"[{current_datetime}] 止损信号 - PnL per unit: {pnl_per_unit:.2f}, 目标: {-self.stop_loss_points:.2f}"
)
# 发送市价卖出订单平仓,确保立即成交
order_id = f"{self.symbol}_BUY_{current_datetime.strftime('%Y%m%d%H%M%S')}_{self.order_id_counter}"
self.order_id_counter += 1
@@ -726,7 +690,63 @@ class SimpleLimitBuyStrategy(Strategy):
id=order_id,
symbol=self.symbol,
direction="CLOSE_LONG",
volume=trade_volume,
volume=abs(current_pos_volume),
price_type="MARKET",
# limit_price=limit_price,
submitted_time=current_datetime,
offset="CLOSE",
)
trade = self.send_order(order)
return # 平仓后本K线不再进行开仓判断
# 持有空仓
if (
current_pos_volume < 0 and len(bar_history) > 16
): # 假设只做多,所以持仓量 > 0
bar_1_ago = bar_history[-self.lag_s]
range_1_ago = bar_1_ago.high - bar_1_ago.low
avg_entry_price = self.get_average_position_price(self.symbol)
if avg_entry_price is not None:
pnl_per_unit = avg_entry_price - open # 当前浮动盈亏(以收盘价计算)
self.log(
f"[{current_datetime}] 止盈信号 - PnL per unit: {pnl_per_unit:.2f}, 目标: {self.take_profit_points:.2f}, 止损: {-self.stop_loss_points:.2f}"
)
# 止盈条件
if pnl_per_unit >= range_1_ago * self.profit_factor_s:
order_id = f"{self.symbol}_BUY_{current_datetime.strftime('%Y%m%d%H%M%S')}_{self.order_id_counter}"
self.order_id_counter += 1
# 创建一个限价多单
order = Order(
id=order_id,
symbol=self.symbol,
direction="CLOSE_SHORT",
volume=abs(current_pos_volume),
price_type="MARKET",
# limit_price=limit_price,
submitted_time=current_datetime,
offset="CLOSE",
)
trade = self.send_order(order)
return # 平仓后本K线不再进行开仓判断
# 止损条件
elif pnl_per_unit <= -self.stop_loss_points:
# 发送市价卖出订单平仓,确保立即成交
order_id = f"{self.symbol}_BUY_{current_datetime.strftime('%Y%m%d%H%M%S')}_{self.order_id_counter}"
self.order_id_counter += 1
# 创建一个限价多单
order = Order(
id=order_id,
symbol=self.symbol,
direction="CLOSE_SHORT",
volume=abs(current_pos_volume),
price_type="MARKET",
# limit_price=limit_price,
submitted_time=current_datetime,
@@ -739,7 +759,7 @@ class SimpleLimitBuyStrategy(Strategy):
# 只有在没有持仓 (current_pos_volume == 0) 且没有待处理订单 (not pending_orders_after_cancel)
# 且K线历史足够长时才考虑开仓
if current_pos_volume == 0 and range_1_ago is not None:
if current_pos_volume == 0 and len(bar_history) > 16:
if not self.use_indicator or self.indicator_l.is_condition_met(
np.array(self.get_price_history("close")),
@@ -750,6 +770,9 @@ class SimpleLimitBuyStrategy(Strategy):
):
# 根据策略逻辑计算目标买入价格
# 目标买入价 = 当前K线Open - (前1根Range * 因子1 + 前7根Range * 因子2)
bar_1_ago = bar_history[-self.lag_l]
range_1_ago = bar_1_ago.high - bar_1_ago.low
self.log(open, range_1_ago * self.range_factor_l)
target_buy_price = open - (range_1_ago * self.range_factor_l)
@@ -757,9 +780,9 @@ class SimpleLimitBuyStrategy(Strategy):
target_buy_price = max(0.01, target_buy_price)
self.log(
f"[{current_datetime}] 开多仓信号 - 当前Open={open:.2f}, "
f"[{current_datetime}] LONG信号 - 当前Open={open:.2f}, "
f"前1Range={range_1_ago:.2f}, "
f"计算目标买入价={target_buy_price:.2f}"
f"计算目标LONG价={target_buy_price:.2f}"
)
order_id = f"{self.symbol}_BUY_{current_datetime.strftime('%Y%m%d%H%M%S')}_{self.order_id_counter}"
@@ -780,14 +803,11 @@ class SimpleLimitBuyStrategy(Strategy):
if new_order:
self._last_order_id = new_order.id
self.log(
f"[{current_datetime}] 策略: 发送限价买入订单 {self._last_order_id} @ {target_buy_price:.2f}"
f"[{current_datetime}] 策略: 发送限价LONG订单 {self._last_order_id} @ {target_buy_price:.2f}"
)
else:
self.log(f"[{current_datetime}] 策略: 发送订单失败。")
indicator_value = RSI(5).get_latest_value(
np.array(self.get_price_history("close")), None, None, None, None
)
if not self.use_indicator or self.indicator_s.is_condition_met(
np.array(self.get_price_history("close")),
np.array(self.get_price_history("open")),
@@ -797,6 +817,9 @@ class SimpleLimitBuyStrategy(Strategy):
):
# 根据策略逻辑计算目标买入价格
# 目标买入价 = 当前K线Open - (前1根Range * 因子1 + 前7根Range * 因子2)
bar_1_ago = bar_history[-self.lag_s]
range_1_ago = bar_1_ago.high - bar_1_ago.low
self.log(open, range_1_ago * self.range_factor_s)
target_buy_price = open + (range_1_ago * self.range_factor_s)
@@ -804,9 +827,9 @@ class SimpleLimitBuyStrategy(Strategy):
target_buy_price = max(0.01, target_buy_price)
self.log(
f"[{current_datetime}] 开多仓信号 - 当前Open={open:.2f}, "
f"[{current_datetime}] SHORT信号 - 当前Open={open:.2f}, "
f"前1Range={range_1_ago:.2f}, "
f"计算目标买入价={target_buy_price:.2f}"
f"计算目标SHORT价={target_buy_price:.2f}"
)
order_id = f"{self.symbol}_BUY_{current_datetime.strftime('%Y%m%d%H%M%S')}_{self.order_id_counter}"
@@ -827,15 +850,13 @@ class SimpleLimitBuyStrategy(Strategy):
if new_order:
self._last_order_id = new_order.id
self.log(
f"[{current_datetime}] 策略: 发送限价买入订单 {self._last_order_id} @ {target_buy_price:.2f}"
f"[{current_datetime}] 策略: 发送限价SHORT订单 {self._last_order_id} @ {target_buy_price:.2f}"
)
else:
self.log(f"[{current_datetime}] 策略: 发送订单失败。")
# else:
# self.log(f"[{current_datetime}] 不满足开仓条件:持仓={current_pos_volume}, 待处理订单={len(pending_orders_after_cancel)}, K线历史长度={len(bar_history)}")
def on_close_bar(self, bar: Bar, next_bar_open: Optional[float] = None):
self.cancel_all_pending_orders()
self.cancel_all_pending_orders(self.main_symbol)
def on_rollover(self, old_symbol: str, new_symbol: str):
"""

View File

@@ -112,7 +112,8 @@ class Strategy(ABC):
return self.context.cancel_order(order_id)
def cancel_all_pending_orders(self) -> int:
def cancel_all_pending_orders(self, main_symbol = None) -> int:
"""取消当前策略的未决订单仅限于当前策略关注的Symbol。"""
# 注意:在换月模式下,引擎会自动取消旧合约的挂单,这里是策略主动取消
if not self.trading:
@@ -123,9 +124,14 @@ class Strategy(ABC):
# orders_to_cancel = [
# order.id for order in pending_orders.values() if order.symbol == self.symbol
# ]
orders_to_cancel = [
order.id for order in pending_orders.values()
]
if main_symbol is not None:
orders_to_cancel = [
order.id for order in pending_orders.values() if main_symbol in order.symbol
]
else:
orders_to_cancel = [
order.id for order in pending_orders.values()
]
for order_id in orders_to_cancel:
if self.cancel_order(order_id):
cancelled_count += 1

View File

@@ -4,7 +4,7 @@ import asyncio
from datetime import date, datetime, timedelta
from typing import Literal, Type, Dict, Any, List, Optional
import pandas as pd
import uuid
import time
# 导入你提供的 core_data 中的类型
from src.common_utils import is_bar_pre_close_period, is_futures_trading_time
@@ -74,9 +74,10 @@ class TqsdkEngine:
self._api: TqApi = api
# 从策略参数中获取主symbolTqsdkContext 需要知道它
self.symbol: str = strategy_params.get("symbol")
if not self.symbol:
raise ValueError("strategy_params 必须包含 'symbol' 字段")
# self.symbol: str = strategy_params.get("symbol")
# if not self.symbol:
# raise ValueError("strategy_params 必须包含 'symbol' 字段")
self.symbol = symbol
# 获取 K 线数据Tqsdk 自动处理)
# 这里假设策略所需 K 线周期在 strategy_params 中否则默认60秒1分钟K线
@@ -119,6 +120,9 @@ class TqsdkEngine:
self.quote = api.get_quote(symbol)
self.partial_bar: Bar = None
self.kline_row = None
self.target_pos_dict = {}
print("TqsdkEngine: 初始化完成。")
@@ -161,68 +165,53 @@ class TqsdkEngine:
if "SHFE" in order_to_send.symbol:
tqsdk_offset = "OPEN"
try:
tq_order = self._api.insert_order(
symbol=order_to_send.symbol,
direction=tqsdk_direction,
offset=tqsdk_offset,
volume=order_to_send.volume,
# Tqsdk 市价单 limit_price 设为 None限价单则传递价格
limit_price=(
order_to_send.limit_price
if order_to_send.price_type == "LIMIT"
# else self.quote.bid_price1 + (1 if tqsdk_direction == "BUY" else -1)
else (
self.quote.bid_price1
if tqsdk_direction == "SELL"
else self.quote.ask_price1
)
),
)
# 更新原始 Order 对象与 Tqsdk 的订单ID和状态
order_to_send.id = tq_order.order_id
# order_to_send.order_id = tq_order.order_id
# order_to_send.status = tq_order.status
order_to_send.submitted_time = pd.to_datetime(
tq_order.insert_date_time, unit="ns", utc=True
)
if "CLOSE" in order_to_send.direction:
current_positions = self._context.get_current_positions()
current_pos_volume = current_positions.get(order_to_send.symbol, 0)
# 等待订单状态更新(成交/撤销/报错)
# 在 Tqsdk 中,订单和成交是独立的,通常在 wait_update() 循环中通过 api.is_changing() 检查
# 这里为了模拟同步处理,直接等待订单状态最终确定
# 注意:实际回测中,不应在这里长时间阻塞,而应在主循环中持续 wait_update
# 为了简化适配,这里模拟即时处理,但可能与真实异步行为有差异。
# 更健壮的方式是在主循环中通过订单状态回调更新
# 这里我们假设订单会很快更新状态,或者在下一个 wait_update() 周期中被检测到
self._api.wait_update() # 等待一次更新
target_volume = None
if order_to_send.direction == 'CLOSE_LONG':
target_volume = current_pos_volume - order_to_send.volume
elif order_to_send.direction == 'CLOSE_SHORT':
target_volume = current_pos_volume + order_to_send.volume
# # 检查最终订单状态和成交
# if tq_order.status == "FINISHED":
# # 查找对应的成交记录
# for trade_id, tq_trade in self._api.get_trade().items():
# if tq_trade.order_id == tq_order.order_id and tq_trade.volume > 0: # 确保是实际成交
# # 创建 core_data.Trade 对象
# trade = Trade(
# order_id=tq_trade.order_id,
# fill_time=tafunc.get_datetime_from_timestamp(tq_trade.trade_date_time) if tq_trade.trade_date_time else datetime.now(),
# symbol=order_to_send.symbol, # 使用 Context 中的 symbol
# direction=tq_trade.direction, # 实际成交方向
# volume=tq_trade.volume,
# price=tq_trade.price,
# commission=tq_trade.commission,
# cash_after_trade=self._api.get_account().available,
# positions_after_trade=self._context.get_current_positions(),
# realized_pnl=tq_trade.realized_pnl, # Tqsdk TqTrade 对象有 realized_pnl
# is_open_trade=tq_trade.offset == "OPEN",
# is_close_trade=tq_trade.offset in ["CLOSE", "CLOSETODAY", "CLOSEYESTERDAY"]
# )
# self.trade_history.append(trade)
# print(f"Engine: 成交记录: {trade}")
# break # 找到成交就跳出
# order_to_send.status = tq_order.status # 更新最终状态
except Exception as e:
print(f"Engine: 发送订单 {order_to_send.id} 失败: {e}")
# order_to_send.status = "ERROR"
if target_volume is not None:
if order_to_send.symbol not in self.target_pos_dict:
self.target_pos_dict[order_to_send.symbol] = TargetPosTask(self._api, order_to_send.symbol)
self.target_pos_dict[order_to_send.symbol].set_target_volume(target_volume)
else:
try:
tq_order = self._api.insert_order(
symbol=order_to_send.symbol,
direction=tqsdk_direction,
offset=tqsdk_offset,
volume=order_to_send.volume,
# Tqsdk 市价单 limit_price 设为 None限价单则传递价格
limit_price=(
order_to_send.limit_price
if order_to_send.price_type == "LIMIT"
# else self.quote.bid_price1 + (1 if tqsdk_direction == "BUY" else -1)
else (
self.quote.bid_price1
if tqsdk_direction == "SELL"
else self.quote.ask_price1
)
),
)
# 更新原始 Order 对象与 Tqsdk 的订单ID和状态
order_to_send.id = tq_order.order_id
# order_to_send.order_id = tq_order.order_id
# order_to_send.status = tq_order.status
order_to_send.submitted_time = pd.to_datetime(
tq_order.insert_date_time, unit="ns", utc=True
)
self._api.wait_update() # 等待一次更新
except Exception as e:
print(f"Engine: 发送订单 {order_to_send.id} 失败: {e}")
# order_to_send.status = "ERROR"
# 处理取消请求
while self._context.cancel_queue:
@@ -406,15 +395,28 @@ class TqsdkEngine:
new_bar = False
if self._api.is_changing(self.klines.iloc[-1], "open"):
print(f"TqsdkEngine: open change!, open:{self.klines.iloc[-1].open}, now: {datetime.now()}")
if self._api.is_changing(self.klines.iloc[-1], "datetime"):
# 等待到整点-00 或 30 分且秒>1
while True:
now = datetime.now()
minute = now.minute
second = now.second
if (minute % 30 == 0) and (second > 1):
break
# 小粒度休眠,防止 CPU 空转
time.sleep(0.1)
# 到这里一定满足“整点-00/30 且秒>1”
kline_row = self.klines.iloc[-1]
kline_dt = pd.to_datetime(kline_row.datetime, unit="ns", utc=True)
kline_dt = kline_dt.tz_convert(BEIJING_TZ)
self.kline_row = kline_row
print(
f"TqsdkEngine: 新k线产生,k line datetime:{kline_dt}, now: {datetime.now()}"
)
print(f"TqsdkEngine: 新k线产生, k line datetime:{kline_dt}, now: {datetime.now()}")
self.main(kline_row, self.klines.iloc[-2])
new_bar = True