主力合约回测

This commit is contained in:
2025-06-23 22:21:59 +08:00
parent a81a32ce73
commit afed83f96f
12 changed files with 739 additions and 100713 deletions

View File

@@ -10,5 +10,9 @@ print(ls)
# Query quotes with ins_class="CONT" for product "au" and print the result.
ls = api.query_quotes(ins_class="CONT", product_id="au")
print(ls)
# Fetch the quote object for the rebar main continuous contract.
quote = api.get_quote("KQ.m@SHFE.rb")
# Print the contract that the rebar main continuous contract currently tracks.
print(quote.underlying_symbol)
# Close the api and release its resources.
api.close()

View File

@@ -2,7 +2,13 @@ import traceback
import numpy as np
import pandas as pd
from tqsdk import TqApi, TqAuth, TqBacktest, TqSim # 确保导入所有需要的回测/模拟类
from tqsdk import (
TqApi,
TqAuth,
TqBacktest,
TqSim,
BacktestFinished,
) # 确保导入所有需要的回测/模拟类
import os
import datetime
from datetime import date # 导入 datetime.date
@@ -13,18 +19,18 @@ from datetime import date # 导入 datetime.date
TQ_USER_NAME = "emanresu" # 例如: "123456"
TQ_PASSWORD = "dfgvfgdfgg" # 例如: "your_password"
BEIJING_TZ = 'Asia/Shanghai'
BEIJING_TZ = "Asia/Shanghai"
def collect_and_save_tqsdk_data_stream(
symbol: str,
freq: str,
start_date_str: str,
end_date_str: str,
mode: str = "backtest", # 默认为回测模式,因为获取历史数据通常用于回测
output_dir: str = "../data",
tq_user: str = TQ_USER_NAME,
tq_pwd: str = TQ_PASSWORD
symbol: str,
freq: str,
start_date_str: str,
end_date_str: str,
mode: str = "backtest", # 默认为回测模式,因为获取历史数据通常用于回测
output_dir: str = "../data",
tq_user: str = TQ_USER_NAME,
tq_pwd: str = TQ_PASSWORD,
) -> pd.DataFrame:
"""
通过 TqSdk 在指定模式下回测或模拟运行监听并收集指定品种、频率、日期范围的K线数据流
@@ -52,16 +58,20 @@ def collect_and_save_tqsdk_data_stream(
collected_data = [] # 用于收集每一根完整K线的数据
try:
start_dt_data_obj = datetime.datetime.strptime(start_date_str, '%Y-%m-%d')
end_dt_data_obj = datetime.datetime.strptime(end_date_str, '%Y-%m-%d')
start_dt_data_obj = datetime.datetime.strptime(start_date_str, "%Y-%m-%d")
end_dt_data_obj = datetime.datetime.strptime(end_date_str, "%Y-%m-%d")
if mode == "backtest":
backtest_start_date = start_dt_data_obj.date()
backtest_end_date = end_dt_data_obj.date()
print(f"初始化天勤回测API回测日期范围{backtest_start_date}{backtest_end_date}")
print(
f"初始化天勤回测API回测日期范围{backtest_start_date}{backtest_end_date}"
)
api = TqApi(
backtest=TqBacktest(start_dt=backtest_start_date, end_dt=backtest_end_date),
auth=TqAuth(tq_user, tq_pwd)
backtest=TqBacktest(
start_dt=backtest_start_date, end_dt=backtest_end_date
),
auth=TqAuth(tq_user, tq_pwd),
)
elif mode == "sim":
print("初始化天勤模拟/实盘API")
@@ -83,21 +93,31 @@ def collect_and_save_tqsdk_data_stream(
elif freq == "month":
duration_seconds = 30 * 24 * 60 * 60 # 大约一个月
else:
print(f"错误: 不支持的数据频率 '{freq}'。目前支持 '1min', '5min', 'day', 'week', 'month'")
print(
f"错误: 不支持的数据频率 '{freq}'。目前支持 '1min', '5min', 'day', 'week', 'month'"
)
print("注意Tick数据量巨大不建议用此方法直接收集因为它会耗尽内存。")
return None
# 获取K线序列这里获取的是指定频率的K线天勤会根据模式从历史或实时流中推送
klines = api.get_kline_serial(symbol, duration_seconds)
quote = api.get_quote(symbol=symbol)
underlying_symbol = quote.underlying_symbol
print(f"开始在 '{mode}' 模式下收集 {symbol}{start_date_str}{end_date_str}{freq} 数据...")
print(
f"开始在 '{mode}' 模式下收集 {symbol}{start_date_str}{end_date_str}{freq} 数据..."
)
last_kline_datetime = None # 用于跟踪上一根已完成K线的时间
while api.wait_update():
if underlying_symbol is None:
underlying_symbol = quote.underlying_symbol
# 检查是否有新的完整K线生成或者当前K线是最后一次更新 (在回测结束时)
# TqSdk会在K线完成时发送最后一次更新或者在回测结束时强制更新
if api.is_changing(quote, "underlying_symbol"):
underlying_symbol = quote.underlying_symbol
if api.is_changing(klines):
# 只有当K线序列发生变化时才处理
# Focus on the newest K-line (i.e. klines.iloc[-1])
@@ -107,32 +127,44 @@ def collect_and_save_tqsdk_data_stream(
# 判断当前K线是否已经结束 (is_last=True) 并且与上一次保存的K线不同
# 或者在回测模式下回测结束时最后一根K线也会被视为“完成”
# 判断条件K线时间戳不是 None 且 大于上一次记录的 K线时间
if not pd.isna(current_kline['datetime']) and (last_kline_datetime is None or (
last_kline_datetime is not None and current_kline['datetime'] > last_kline_datetime)):
if not pd.isna(current_kline["datetime"]) and (
last_kline_datetime is None
or (
last_kline_datetime is not None
and current_kline["datetime"] > last_kline_datetime
)
):
# Convert datetime (nanoseconds since the epoch, matching unit="ns" in the conversion below) to a readable format
# 检查K线的时间戳是否在我们要获取的日期范围内
# 注意get_kline_serial 会获取指定范围前后的一小段数据,我们需要过滤
kline_dt = pd.to_datetime(current_kline['datetime'], unit='ns', utc=True)
kline_dt = kline_dt.tz_convert(BEIJING_TZ).strftime('%Y-%m-%d %H:%M:%S')
kline_dt = pd.to_datetime(
current_kline["datetime"], unit="ns", utc=True
)
kline_dt = kline_dt.tz_convert(BEIJING_TZ).strftime(
"%Y-%m-%d %H:%M:%S"
)
kline_data_to_save = {
'datetime': kline_dt,
'open': current_kline['open'],
'high': current_kline['high'],
'low': current_kline['low'],
'close': current_kline['close'],
'volume': current_kline['volume'],
'open_oi': current_kline['open_oi'],
'close_oi': current_kline['close_oi']
"datetime": kline_dt,
"open": current_kline["open"],
"high": current_kline["high"],
"low": current_kline["low"],
"close": current_kline["close"],
"volume": current_kline["volume"],
"open_oi": current_kline["open_oi"],
"close_oi": current_kline["close_oi"],
"underlying_symbol": underlying_symbol,
}
collected_data.append(kline_data_to_save)
last_kline_datetime = current_kline['datetime']
last_kline_datetime = current_kline["datetime"]
# print(f"收集到 K线: {kline_dt}, close: {current_kline['close']}") # 用于调试
# 在回测模式下当回测结束时api.wait_update() 会抛出异常,此时我们可以退出循环
if api.is_changing(api.get_account()) or api.is_changing(api.get_position()):
if api.is_changing(api.get_account()) or api.is_changing(
api.get_position()
):
break
except Exception as e:
@@ -146,16 +178,19 @@ def collect_and_save_tqsdk_data_stream(
# 无论如何,都尝试处理剩余数据并保存
finally:
if collected_data:
df = pd.DataFrame(collected_data).set_index('datetime')
df = pd.DataFrame(collected_data).set_index("datetime")
df = df.sort_index() # 确保数据按时间排序
# 构造保存路径
freq_folder = freq.replace("min", "m") if "min" in freq else freq
if freq == "day": freq_folder = "daily"
if freq == "week": freq_folder = "weekly"
if freq == "month": freq_folder = "monthly"
if freq == "day":
freq_folder = "daily"
if freq == "week":
freq_folder = "weekly"
if freq == "month":
freq_folder = "monthly"
safe_symbol = symbol.replace('.', '_')
safe_symbol = symbol.replace(".", "_")
save_folder = os.path.join(output_dir, safe_symbol)
os.makedirs(save_folder, exist_ok=True)
@@ -163,6 +198,7 @@ def collect_and_save_tqsdk_data_stream(
file_name = f"{safe_symbol}_{freq}.csv"
file_path = os.path.join(save_folder, file_name)
print(df.head())
df.to_csv(file_path, index=True)
print(f"数据已成功保存到: {file_path}, 共 {len(df)} 条记录。")
@@ -175,6 +211,7 @@ def collect_and_save_tqsdk_data_stream(
api.close()
return None
# --- 示例用法 ---
if __name__ == "__main__":
import os
@@ -187,16 +224,17 @@ if __name__ == "__main__":
TQ_USER_NAME = "emanresu" # 例如: "123456"
TQ_PASSWORD = "dfgvfgdfgg" # 例如: "your_password"
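# Example 1: in backtest mode, fetch K-line data for the rebar main continuous contract KQ.m@SHFE.rb at freq="min60" (for historical backtesting)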
# 这种方式适合获取相对较短或中等长度的历史K线数据。
df_if_backtest_daily = collect_and_save_tqsdk_data_stream(
symbol="KQ.i@SHFE.rb",
freq="day",
symbol="KQ.m@SHFE.rb",
# symbol='SHFE.rb2510',
# symbol='KQ.i@SHFE.bu',
freq="min60",
start_date_str="2023-01-01",
end_date_str="2025-05-01",
end_date_str="2025-06-22",
mode="backtest", # 指定为回测模式
tq_user=TQ_USER_NAME,
tq_pwd=TQ_PASSWORD
tq_pwd=TQ_PASSWORD,
)
if df_if_backtest_daily is not None:
print(df_if_backtest_daily.tail())