更新qmt代码

This commit is contained in:
2025-12-19 14:11:32 +08:00
parent 1bb0a56857
commit 27ea270353
4 changed files with 693 additions and 162 deletions

258
qmt/dashboard.html Normal file
View File

@@ -0,0 +1,258 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>QMT 实盘监控看板</title>
<script src="https://unpkg.com/vue@3/dist/vue.global.js"></script>
<link rel="stylesheet" href="https://unpkg.com/element-plus/dist/index.css" />
<script src="https://unpkg.com/element-plus"></script>
<script src="https://unpkg.com/@element-plus/icons-vue"></script>
<style>
body { background-color: #f0f2f5; margin: 0; padding: 20px; font-family: 'Helvetica Neue', Helvetica, 'PingFang SC', sans-serif; }
.card-header { display: flex; justify-content: space-between; align-items: center; }
.box-card { margin-bottom: 20px; }
.log-box {
background: #1e1e1e;
color: #d4d4d4;
padding: 10px;
border-radius: 4px;
height: 400px;
overflow-y: scroll;
font-family: 'Consolas', 'Monaco', monospace;
font-size: 13px;
line-height: 1.5;
}
.log-line { margin: 0; border-bottom: 1px solid #333; white-space: pre-wrap; word-break: break-all; }
.log-line:hover { background-color: #2a2a2a; }
.virtual-item { margin-bottom: 20px; border-left: 4px solid #409EFF; padding-left: 10px; }
.virtual-title { font-weight: bold; font-size: 14px; margin-bottom: 8px; color: #606266; }
.status-badge { display: inline-block; width: 10px; height: 10px; border-radius: 50%; margin-right: 5px; }
.bg-green { background-color: #67C23A; }
.bg-gray { background-color: #909399; }
</style>
</head>
<body>
<div id="app">
<el-container>
<el-header height="auto" style="padding: 0;">
<el-card class="box-card">
<template #header>
<div class="card-header">
<div style="display:flex; align-items:center;">
<el-icon size="24" style="margin-right: 10px;"><Monitor /></el-icon>
<span style="font-weight: bold; font-size: 20px;">QMT 实盘守护系统</span>
</div>
<div>
<el-tag :type="status.running ? 'success' : 'info'" effect="dark" style="margin-right: 10px;">
API: {{ status.running ? 'Running' : 'Offline' }}
</el-tag>
<el-tag :type="status.qmt_connected ? 'success' : 'danger'" effect="dark">
QMT: {{ status.qmt_connected ? 'Connected' : 'Disconnected' }}
</el-tag>
</div>
</div>
</template>
<el-descriptions border :column="4" size="large">
<el-descriptions-item label="资金账号">{{ status.account_id || '---' }}</el-descriptions-item>
<el-descriptions-item label="启动时间">{{ status.start_time || '---' }}</el-descriptions-item>
<el-descriptions-item label="心跳时间">
<span :style="{color: isHeartbeatStalled ? 'red' : 'green', fontWeight: 'bold'}">
{{ status.last_loop_update || '---' }}
</span>
</el-descriptions-item>
<el-descriptions-item label="控制">
<el-button type="primary" :icon="Refresh" @click="manualRefresh" :loading="loading">手动刷新</el-button>
<div style="margin-left: 15px; display: inline-flex; align-items: center;">
<el-checkbox v-model="autoRefresh" label="自动刷新(1min)" border></el-checkbox>
<span style="font-size: 12px; margin-left: 8px; color: #909399;">
{{ tradingStatusText }}
</span>
</div>
</el-descriptions-item>
</el-descriptions>
</el-card>
</el-header>
<el-main style="padding: 0;">
<el-row :gutter="20">
<el-col :span="12">
<el-card class="box-card" shadow="hover">
<template #header>
<div class="card-header">
<span><span class="status-badge bg-green"></span>实盘真实持仓 (QMT)</span>
</div>
</template>
<el-table :data="positions.real_positions" style="width: 100%" border stripe size="small" empty-text="当前空仓">
<el-table-column prop="code" label="代码" width="100" sortable></el-table-column>
<el-table-column prop="volume" label="总持仓" width="100"></el-table-column>
<el-table-column prop="can_use" label="可用" width="100"></el-table-column>
<el-table-column prop="market_value" label="市值"></el-table-column>
</el-table>
</el-card>
</el-col>
<el-col :span="12">
<el-card class="box-card" shadow="hover">
<template #header>
<div class="card-header">
<span><span class="status-badge bg-gray"></span>Redis 虚拟账本 (策略隔离)</span>
</div>
</template>
<div v-if="Object.keys(positions.virtual_positions).length === 0" style="color:#909399; text-align:center; padding: 20px;">
暂无策略数据 / Redis未连接
</div>
<div v-for="(posObj, strategyName) in positions.virtual_positions" :key="strategyName" class="virtual-item">
<div class="virtual-title">{{ strategyName }}</div>
<el-table :data="formatVirtual(posObj)" style="width: 100%;" border size="small" empty-text="该策略当前空仓">
<el-table-column prop="code" label="代码"></el-table-column>
<el-table-column prop="vol" label="记账数量">
<template #default="scope"><span style="font-weight: bold;">{{ scope.row.vol }}</span></template>
</el-table-column>
</el-table>
</div>
</el-card>
</el-col>
</el-row>
<el-row>
<el-col :span="24">
<el-card class="box-card" shadow="never">
<template #header>
<div class="card-header"><span>系统实时日志 (Last 50 lines)</span></div>
</template>
<div class="log-box" ref="logBox">
<div v-for="(line, index) in logs" :key="index" class="log-line">{{ line }}</div>
</div>
</el-card>
</el-col>
</el-row>
</el-main>
</el-container>
</div>
<script>
const { createApp, ref, onMounted, onUnmounted, computed } = Vue;
const { Monitor, Refresh } = ElementPlusIconsVue;
const app = createApp({
setup() {
const status = ref({});
const positions = ref({ real_positions: [], virtual_positions: {} });
const logs = ref([]);
const autoRefresh = ref(true); // 默认开启自动刷新
const loading = ref(false);
const logBox = ref(null);
let timer = null;
const API_BASE = "";
// === 核心逻辑修改:判断是否为交易时间 ===
const isTradingTime = () => {
const now = new Date();
const day = now.getDay();
const hour = now.getHours();
const minute = now.getMinutes();
const currentTimeVal = hour * 100 + minute;
// 1. 判断是否为周末 (0是周日, 6是周六)
if (day === 0 || day === 6) return false;
// 2. 判断时间段 (09:00 - 15:10)
// 包含集合竞价和收盘清算时间
if (currentTimeVal >= 900 && currentTimeVal <= 1510) {
return true;
}
return false;
};
// 界面显示的提示文本
const tradingStatusText = computed(() => {
if (!autoRefresh.value) return "已关闭";
return isTradingTime() ? "监控中..." : "休市暂停";
});
const isHeartbeatStalled = computed(() => {
if (!status.value.last_loop_update) return true;
return false;
});
const fetchData = async () => {
loading.value = true;
try {
const resStatus = await fetch(`${API_BASE}/api/status`);
if(resStatus.ok) status.value = await resStatus.json();
else status.value = { running: false };
const resPos = await fetch(`${API_BASE}/api/positions`);
if(resPos.ok) positions.value = await resPos.json();
const resLogs = await fetch(`${API_BASE}/api/logs`);
if(resLogs.ok) {
const logData = await resLogs.json();
const needScroll = (logs.value.length !== logData.logs.length);
logs.value = logData.logs;
// 只有在自动刷新且有新日志时才自动滚动
if (needScroll && autoRefresh.value) {
setTimeout(() => {
if(logBox.value) logBox.value.scrollTop = logBox.value.scrollHeight;
}, 100);
}
}
} catch (e) {
console.error("API Error:", e);
status.value.running = false;
} finally {
loading.value = false;
}
};
// 手动刷新按钮:不受时间限制
const manualRefresh = () => {
fetchData();
};
const formatVirtual = (obj) => {
if (!obj) return [];
return Object.keys(obj).map(key => ({ code: key, vol: obj[key] }));
};
onMounted(() => {
// 页面加载时先拉取一次
fetchData();
// === 修改定时器:每 60 秒触发一次 ===
timer = setInterval(() => {
// 只有在 "开关开启" 且 "处于交易时间" 时才请求
if (autoRefresh.value && isTradingTime()) {
fetchData();
}
}, 60000); // 60000ms = 1分钟
});
onUnmounted(() => {
if (timer) clearInterval(timer);
});
return {
status, positions, logs, autoRefresh, loading, logBox,
manualRefresh, // 绑定到按钮
fetchData,
formatVirtual,
isHeartbeatStalled,
tradingStatusText, // 绑定到提示文本
Monitor, Refresh
};
}
});
for (const [key, component] of Object.entries(ElementPlusIconsVue)) {
app.component(key, component)
}
app.use(ElementPlus);
app.mount('#app');
</script>
</body>
</html>

1
qmt/heartbeat.txt Normal file
View File

@@ -0,0 +1 @@
2025-12-19 14:11:10

View File

@@ -1,5 +1,5 @@
# coding:utf-8 # coding:utf-8
import time, datetime, traceback, sys, json, os import time, datetime, traceback, sys, json, os, threading
import logging import logging
import redis import redis
from xtquant import xtdata from xtquant import xtdata
@@ -7,27 +7,46 @@ from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount from xtquant.xttype import StockAccount
from xtquant import xtconstant from xtquant import xtconstant
# 全局变量 # FastAPI 相关
CURRENT_LOG_DATE = None from fastapi import FastAPI
CONFIG = {} from fastapi.middleware.cors import CORSMiddleware
ORDER_CACHE = {} from fastapi.responses import FileResponse
import uvicorn
# ================= 1. 日志系统 (按日期直写) ================= # ================= 0. Windows 防卡死补丁 =================
try:
import ctypes
kernel32 = ctypes.windll.kernel32
# 禁用快速编辑模式 (0x0040)
kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), 128)
except:
pass
# ================= 1. 全局状态管理 =================
class SystemState:
def __init__(self):
self.xt_trader = None
self.acc = None
self.pos_manager = None
self.callback = None
self.is_running = True
self.start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.last_heartbeat = "Initializing..."
self.config = {}
GLOBAL_STATE = SystemState()
CURRENT_LOG_DATE = None
ORDER_CACHE = {} # 内存缓存: OrderID -> (Strategy, Code, Action)
# ================= 2. 增强型日志系统 =================
def setup_logger(): def setup_logger():
"""
配置日志系统:
每天生成一个新的日志文件,文件名格式为 YYYY-MM-DD.log
"""
global CURRENT_LOG_DATE global CURRENT_LOG_DATE
log_dir = "logs" log_dir = "logs"
if not os.path.exists(log_dir): if not os.path.exists(log_dir):
os.makedirs(log_dir) os.makedirs(log_dir)
# 获取今日日期
today_str = datetime.date.today().strftime('%Y-%m-%d') today_str = datetime.date.today().strftime('%Y-%m-%d')
CURRENT_LOG_DATE = today_str CURRENT_LOG_DATE = today_str
log_file = os.path.join(log_dir, f"{today_str}.log") log_file = os.path.join(log_dir, f"{today_str}.log")
logger = logging.getLogger("QMT_Trader") logger = logging.getLogger("QMT_Trader")
@@ -41,8 +60,9 @@ def setup_logger():
logger.removeHandler(handler) logger.removeHandler(handler)
except: pass except: pass
# 格式中增加 线程名,方便排查是 API 线程还是 交易线程
formatter = logging.Formatter( formatter = logging.Formatter(
'[%(asctime)s] [%(levelname)s] %(message)s', '[%(asctime)s] [%(levelname)s] [%(threadName)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S' datefmt='%Y-%m-%d %H:%M:%S'
) )
@@ -50,19 +70,18 @@ def setup_logger():
file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8') file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8')
file_handler.setFormatter(formatter) file_handler.setFormatter(formatter)
# 控制台输出 # 控制台输出 (强制刷新流,防止命令行卡住不显示)
stream_handler = logging.StreamHandler(sys.stdout) stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter) stream_handler.setFormatter(formatter)
stream_handler.flush = sys.stdout.flush
logger.addHandler(file_handler) logger.addHandler(file_handler)
logger.addHandler(stream_handler) logger.addHandler(stream_handler)
return logger return logger
# 初始化日志
logger = setup_logger() logger = setup_logger()
# ================= 2. 配置加载 ================= # ================= 3. 配置加载 =================
def load_config(config_file='config.json'): def load_config(config_file='config.json'):
if getattr(sys, 'frozen', False): if getattr(sys, 'frozen', False):
base_path = os.path.dirname(sys.executable) base_path = os.path.dirname(sys.executable)
@@ -81,40 +100,65 @@ def load_config(config_file='config.json'):
logger.error(f"配置文件错误: {e}") logger.error(f"配置文件错误: {e}")
sys.exit(1) sys.exit(1)
# ================= 3. 业务逻辑类 ================= # ================= 4. 业务逻辑类 =================
class PositionManager: class PositionManager:
def __init__(self, r_client): def __init__(self, r_client):
self.r = r_client self.r = r_client
def _get_key(self, strategy_name): def _get_key(self, strategy_name):
return f"POS:{strategy_name}" return f"POS:{strategy_name}"
def mark_holding(self, strategy_name, code): def mark_holding(self, strategy_name, code):
"""乐观占位"""
self.r.hsetnx(self._get_key(strategy_name), code, 0) self.r.hsetnx(self._get_key(strategy_name), code, 0)
def rollback_holding(self, strategy_name, code): def rollback_holding(self, strategy_name, code):
"""失败回滚"""
key = self._get_key(strategy_name) key = self._get_key(strategy_name)
val = self.r.hget(key, code) val = self.r.hget(key, code)
if val is not None and int(val) == 0: if val is not None and int(val) == 0:
self.r.hdel(key, code) self.r.hdel(key, code)
logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}") logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}")
def update_actual_volume(self, strategy_name, code, delta_vol): def update_actual_volume(self, strategy_name, code, delta_vol):
"""成交更新"""
key = self._get_key(strategy_name) key = self._get_key(strategy_name)
new_vol = self.r.hincrby(key, code, int(delta_vol)) new_vol = self.r.hincrby(key, code, int(delta_vol))
if new_vol <= 0: if new_vol <= 0:
self.r.hdel(key, code) self.r.hdel(key, code)
new_vol = 0 new_vol = 0
return new_vol return new_vol
def get_position(self, strategy_name, code): def get_position(self, strategy_name, code):
vol = self.r.hget(self._get_key(strategy_name), code) vol = self.r.hget(self._get_key(strategy_name), code)
return int(vol) if vol else 0 return int(vol) if vol else 0
def get_holding_count(self, strategy_name): def get_holding_count(self, strategy_name):
return self.r.hlen(self._get_key(strategy_name)) return self.r.hlen(self._get_key(strategy_name))
def get_all_virtual_positions(self, strategy_name): def get_all_virtual_positions(self, strategy_name):
return self.r.hgetall(self._get_key(strategy_name)) return self.r.hgetall(self._get_key(strategy_name))
def force_delete(self, strategy_name, code): def force_delete(self, strategy_name, code):
self.r.hdel(self._get_key(strategy_name), code) self.r.hdel(self._get_key(strategy_name), code)
def clean_stale_placeholders(self, strategy_name, xt_trader, acc):
try:
key = self._get_key(strategy_name)
all_pos = self.r.hgetall(key)
if not all_pos: return
active_orders = xt_trader.query_stock_orders(acc, cancelable_only=True)
active_codes = [o.stock_code for o in active_orders] if active_orders else []
real_positions = xt_trader.query_stock_positions(acc)
real_holdings = [p.stock_code for p in real_positions if p.volume > 0] if real_positions else []
for code, vol_str in all_pos.items():
if int(vol_str) == 0:
if (code not in real_holdings) and (code not in active_codes):
self.r.hdel(key, code)
logger.warning(f"[{strategy_name}] 自动清理僵尸占位: {code}")
except Exception as e:
logger.error(f"清理僵尸占位异常: {e}")
class DailySettlement: class DailySettlement:
def __init__(self, xt_trader, acc, pos_mgr, strategies): def __init__(self, xt_trader, acc, pos_mgr, strategies):
self.trader = xt_trader self.trader = xt_trader
@@ -122,10 +166,10 @@ class DailySettlement:
self.pos_mgr = pos_mgr self.pos_mgr = pos_mgr
self.strategies = strategies self.strategies = strategies
self.has_settled = False self.has_settled = False
def run_settlement(self): def run_settlement(self):
logger.info("="*40) logger.info("="*40)
logger.info("执行收盘清算流程...") logger.info("执行收盘清算流程...")
# 1. 撤单
try: try:
orders = self.trader.query_stock_orders(self.acc, cancelable_only=True) orders = self.trader.query_stock_orders(self.acc, cancelable_only=True)
if orders: if orders:
@@ -133,25 +177,28 @@ class DailySettlement:
self.trader.cancel_order_stock(self.acc, o.order_id) self.trader.cancel_order_stock(self.acc, o.order_id)
time.sleep(2) time.sleep(2)
except: pass except: pass
# 2. 获取实盘
real_positions = self.trader.query_stock_positions(self.acc) real_positions = self.trader.query_stock_positions(self.acc)
real_pos_map = {} real_pos_map = {p.stock_code: p.volume for p in real_positions if p.volume > 0} if real_positions else {}
if real_positions:
for p in real_positions:
if p.volume > 0: real_pos_map[p.stock_code] = p.volume
# 3. 校准Redis
for strategy in self.strategies: for strategy in self.strategies:
virtual = self.pos_mgr.get_all_virtual_positions(strategy) virtual = self.pos_mgr.get_all_virtual_positions(strategy)
for code, v in virtual.items(): for code, v_str in virtual.items():
v = int(v_str)
if code not in real_pos_map: if code not in real_pos_map:
logger.warning(f" [修正] {strategy} 幽灵持仓 {code} -> 释放") logger.warning(f" [修正] {strategy} 幽灵持仓 {code} (Redis={v}) -> 强制释放")
self.pos_mgr.force_delete(strategy, code) self.pos_mgr.force_delete(strategy, code)
elif v == 0 and code in real_pos_map:
real_vol = real_pos_map[code]
self.pos_mgr.update_actual_volume(strategy, code, real_vol)
logger.info(f" [修正] {strategy} 修正占位符 {code} 0 -> {real_vol}")
logger.info("清算完成") logger.info("清算完成")
self.has_settled = True self.has_settled = True
def reset_flag(self): def reset_flag(self):
self.has_settled = False self.has_settled = False
# ================= 4. 回调类 =================
class MyXtQuantTraderCallback(XtQuantTraderCallback): class MyXtQuantTraderCallback(XtQuantTraderCallback):
def __init__(self, pos_mgr): def __init__(self, pos_mgr):
self.pos_mgr = pos_mgr self.pos_mgr = pos_mgr
@@ -164,110 +211,158 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback):
cache_info = ORDER_CACHE.get(trade.order_id) cache_info = ORDER_CACHE.get(trade.order_id)
if not cache_info: return if not cache_info: return
strategy, _, action = cache_info strategy, _, action = cache_info
logger.info(f">>> [成交] {strategy} {trade.stock_code} {trade.traded_volume}") logger.info(f">>> [成交] {strategy} {trade.stock_code} {trade.traded_volume}")
if action == 'BUY': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, trade.traded_volume) if action == 'BUY': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, trade.traded_volume)
elif action == 'SELL': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, -trade.traded_volume) elif action == 'SELL': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, -trade.traded_volume)
except: traceback.print_exc() except: traceback.print_exc()
def on_order_error(self, err): def on_order_error(self, err):
try: try:
logger.error(f"下单失败: {err.error_msg}") logger.error(f"下单失败回调: {err.error_msg} OrderID:{err.order_id}")
cache = ORDER_CACHE.get(err.order_id) cache = ORDER_CACHE.get(err.order_id)
if cache and cache[2] == 'BUY': if cache and cache[2] == 'BUY':
self.pos_mgr.rollback_holding(cache[0], cache[1]) self.pos_mgr.rollback_holding(cache[0], cache[1])
del ORDER_CACHE[err.order_id] del ORDER_CACHE[err.order_id]
except: pass except: pass
# ================= 5. 核心消息处理 ================= # ================= 5. 核心消息处理 (重写版:拒绝静默失败) =================
def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager): def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager):
queue_key = f"{strategy_name}_real" queue_key = f"{strategy_name}_real"
# 1. 获取消息
msg_json = r_client.lpop(queue_key) msg_json = r_client.lpop(queue_key)
if not msg_json: return if not msg_json:
return
# 2. 存入历史并解析 (打印原始消息,确保知道收到了什么)
logger.info(f"-------- 处理消息 [{strategy_name}] --------")
logger.info(f"收到原始消息: {msg_json}")
try: try:
# 归档
r_client.rpush(f"{queue_key}:history", msg_json) r_client.rpush(f"{queue_key}:history", msg_json)
data = json.loads(msg_json) try:
# 1. 过滤回测 data = json.loads(msg_json)
if data.get('is_backtest'): return except json.JSONDecodeError:
logger.error("JSON 解析失败,跳过消息")
# 2. 校验日期 (只处理当日消息)
msg_ts = data.get('timestamp')
today_str = datetime.date.today().strftime('%Y-%m-%d')
if msg_ts:
msg_date = msg_ts.split(' ')[0]
if msg_date != today_str:
logger.warning(f"[{strategy_name}] 拦截非当日消息: MsgDate={msg_date}")
return
else:
logger.warning(f"[{strategy_name}] 消息无时间戳,忽略")
return return
stock_code = data['stock_code'] # 3. 基础校验 (每一步失败都必须打印 Log)
action = data['action'] if data.get('is_backtest'):
price = float(data['price']) logger.warning(f"检测到回测标记 is_backtest=True忽略此消息")
return
msg_ts = data.get('timestamp')
if not msg_ts:
logger.warning(f"消息缺失时间戳 timestamp忽略")
return
today_str = datetime.date.today().strftime('%Y-%m-%d')
msg_date = msg_ts.split(' ')[0]
if msg_date != today_str:
logger.warning(f"消息日期过期: {msg_date} != 今日 {today_str},忽略")
return
# 4. 提取关键字段
stock_code = data.get('stock_code')
action = data.get('action')
price = float(data.get('price', 0))
total_slots = int(data.get('total_slots', 1)) total_slots = int(data.get('total_slots', 1))
if not stock_code or not action:
logger.error(f"缺少关键字段: Code={stock_code}, Action={action}")
return
logger.info(f"解析成功: {action} {stock_code} @ {price}, 目标槽位: {total_slots}")
# 5. QMT 存活检查
if xt_trader is None or acc is None:
logger.error("严重错误: QMT 对象未初始化 (xt_trader is None)")
return
# 6. 买入逻辑
if action == 'BUY': if action == 'BUY':
# 槽位检查
holding = pos_manager.get_holding_count(strategy_name) holding = pos_manager.get_holding_count(strategy_name)
empty = total_slots - holding empty = total_slots - holding
logger.info(f"检查持仓: 当前占用 {holding} / 总槽位 {total_slots} -> 剩余 {empty}")
if empty <= 0: if empty <= 0:
logger.warning(f"[{strategy_name}] 拦截买入: 槽位已满 (Target:{total_slots} Held:{holding})") logger.warning(f"拦截买入: 槽位已满,不执行下单")
return return
# 查询资金
asset = xt_trader.query_stock_asset(acc) asset = xt_trader.query_stock_asset(acc)
if not asset: if not asset:
logger.error("无法查询资产QMT可能未就绪") logger.error("API 错误: query_stock_asset 返回 None可能是 QMT 断连或未同步")
return return
# 金额计算 logger.info(f"当前可用资金: {asset.cash:.2f}")
amt = asset.cash / empty amt = asset.cash / empty
if amt < 2000: if amt < 2000:
logger.warning(f"[{strategy_name}] 拦截买入: 金额过小 ({amt:.2f})") logger.warning(f"拦截买入: 单笔金额过小 ({amt:.2f} < 2000)")
return return
if price <= 0: price = 1.0 if price <= 0:
vol = int(amt / price / 100) * 100 logger.warning(f"价格异常: {price}强制设为1.0以计算股数(仅测试用)")
price = 1.0
if vol >= 100:
oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy')
if oid != -1:
logger.info(f"[{strategy_name}] 发出买单: {stock_code} {vol}股 ID:{oid}")
ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY')
pos_manager.mark_holding(strategy_name, stock_code)
else:
logger.error(f"[{strategy_name}] 下单被拒绝 (-1)")
else:
logger.warning(f"[{strategy_name}] 股数不足100 (Amt:{amt:.2f})")
vol = int(amt / price / 100) * 100
logger.info(f"计算股数: 资金{amt:.2f} / 价格{price} -> {vol}")
if vol < 100:
logger.warning(f"拦截买入: 股数不足 100 ({vol})")
return
# 执行下单
oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy')
if oid != -1:
logger.info(f"√√√ 下单成功: ID={oid} {stock_code} 买入 {vol}")
ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY')
pos_manager.mark_holding(strategy_name, stock_code)
else:
logger.error(f"XXX 下单请求被拒绝 (Result=-1),请检查 QMT 终端报错")
# 7. 卖出逻辑
elif action == 'SELL': elif action == 'SELL':
v_vol = pos_manager.get_position(strategy_name, stock_code) v_vol = pos_manager.get_position(strategy_name, stock_code)
logger.info(f"Redis 记录持仓: {v_vol}")
if v_vol > 0: if v_vol > 0:
real_pos = xt_trader.query_stock_positions(acc) real_pos = xt_trader.query_stock_positions(acc)
if real_pos is None:
logger.error("API 错误: query_stock_positions 返回 None")
return
rp = next((p for p in real_pos if p.stock_code==stock_code), None) rp = next((p for p in real_pos if p.stock_code==stock_code), None)
can_use = rp.can_use_volume if rp else 0 can_use = rp.can_use_volume if rp else 0
logger.info(f"实盘可用持仓: {can_use}")
final = min(v_vol, can_use) final = min(v_vol, can_use)
if final > 0: if final > 0:
oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final, xtconstant.FIX_PRICE, price, strategy_name, 'PySell') oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final, xtconstant.FIX_PRICE, price, strategy_name, 'PySell')
if oid != -1: if oid != -1:
logger.info(f"[{strategy_name}] 发出卖单: {stock_code} {final}") logger.info(f"√√√ 下单成功: ID={oid} {stock_code} 卖出 {final}")
ORDER_CACHE[oid] = (strategy_name, stock_code, 'SELL') ORDER_CACHE[oid] = (strategy_name, stock_code, 'SELL')
else:
logger.error(f"XXX 下单请求被拒绝 (Result=-1)")
else: else:
logger.warning(f"[{strategy_name}] 卖出拦截: 虚拟{v_vol}但实盘可用{can_use}") logger.warning(f"拦截卖出: 最终计算卖出量为 0 (虚拟:{v_vol}, 实盘:{can_use})")
else: else:
logger.info(f"[{strategy_name}] Redis无持仓,忽略卖出") logger.warning(f"拦截卖出: Redis 中无此持仓记录,忽略")
except Exception: else:
logger.error("消息处理异常", exc_info=True) logger.error(f"未知的 Action: {action}")
except Exception as e:
logger.error(f"消息处理发生未捕获异常: {str(e)}", exc_info=True)
# ================= 6. QMT初始化 ================= # ================= 6. QMT初始化 =================
def init_qmt_trader(qmt_path, account_id, account_type, pos_manager): def init_qmt_trader(qmt_path, account_id, account_type, pos_manager):
try: try:
session_id = int(time.time()) session_id = int(time.time())
logger.info(f"正在连接 QMT (Path: {qmt_path})...")
trader = XtQuantTrader(qmt_path, session_id) trader = XtQuantTrader(qmt_path, session_id)
acc = StockAccount(account_id, account_type) acc = StockAccount(account_id, account_type)
callback = MyXtQuantTraderCallback(pos_manager) callback = MyXtQuantTraderCallback(pos_manager)
@@ -280,127 +375,229 @@ def init_qmt_trader(qmt_path, account_id, account_type, pos_manager):
callback.is_connected = True callback.is_connected = True
return trader, acc, callback return trader, acc, callback
else: else:
logger.error(f"QMT 连接失败 Code:{res}") logger.error(f"QMT 连接失败 Code:{res} (请检查 QMT 是否登录且路径正确)")
return None, None, None return None, None, None
except Exception as e: except Exception as e:
logger.error(f"初始化异常: {e}") logger.error(f"初始化异常: {e}", exc_info=True)
return None, None, None return None, None, None
# ================= 7. 主程序 ================= # ================= 7. 交易逻辑主循环 =================
if __name__ == '__main__': def trading_loop():
logger.info(">>> 系统启动 (实盘生产模式) <<<") global logger
threading.current_thread().name = "TradeThread"
logger.info(">>> 交易逻辑子线程启动 <<<")
# 加载配置 GLOBAL_STATE.config = load_config('config.json')
CONFIG = load_config('config.json') CONFIG = GLOBAL_STATE.config
redis_cfg = CONFIG['redis'] redis_cfg = CONFIG['redis']
qmt_cfg = CONFIG['qmt'] qmt_cfg = CONFIG['qmt']
watch_list = CONFIG['strategies'] watch_list = CONFIG['strategies']
# 连接Redis
try: try:
r = redis.Redis(**redis_cfg, decode_responses=True) r = redis.Redis(**redis_cfg, decode_responses=True)
r.ping() r.ping()
pos_manager = PositionManager(r) pos_manager = PositionManager(r)
GLOBAL_STATE.pos_manager = pos_manager
logger.info("Redis 连接成功")
except Exception as e: except Exception as e:
logger.critical(f"Redis 连接失败: {e}") logger.critical(f"Redis 连接失败: {e}")
sys.exit(1) return
# 初次连接 QMT # 初始化
xt_trader, acc, callback = init_qmt_trader( xt_trader, acc, callback = init_qmt_trader(
qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager
) )
GLOBAL_STATE.xt_trader = xt_trader
GLOBAL_STATE.acc = acc
GLOBAL_STATE.callback = callback
settler = None settler = None
if xt_trader: if xt_trader:
settler = DailySettlement(xt_trader, acc, pos_manager, watch_list) settler = DailySettlement(xt_trader, acc, pos_manager, watch_list)
for s in watch_list:
pos_manager.clean_stale_placeholders(s, xt_trader, acc)
logger.info(">>> 进入主循环监听 <<<") logger.info(">>> 进入主轮询循环 <<<")
last_health_check = 0 # 上次深度检查时间
while True: while GLOBAL_STATE.is_running:
try: try:
# --- 1. 日志跨天处理 --- # 1. 基础心跳更新
GLOBAL_STATE.last_heartbeat = datetime.datetime.now().strftime('%H:%M:%S')
# 2. 状态诊断与自动修复 (关键修改!!!)
# 每 15 秒执行一次“深度探测”,而不是每一轮都看 callback
if time.time() - last_health_check > 15:
last_health_check = time.time()
is_alive_physically = False
# 尝试通过“查资产”来验证连接是否真的活着
if GLOBAL_STATE.xt_trader and GLOBAL_STATE.acc:
try:
asset = GLOBAL_STATE.xt_trader.query_stock_asset(GLOBAL_STATE.acc)
if asset:
is_alive_physically = True
# 【核心修复】:如果物理探测成功,强行修正 callback 状态
if GLOBAL_STATE.callback and not GLOBAL_STATE.callback.is_connected:
GLOBAL_STATE.callback.is_connected = True
logger.info("✅ [自愈] 检测到资产查询正常,修正伪造的断开状态 (False -> True)")
except:
pass
# 只有当 逻辑断开(callback) AND 物理断开(无法查资产) 时,才判定为断线
current_status = GLOBAL_STATE.callback.is_connected if GLOBAL_STATE.callback else False
# 减少日志刷屏:只有状态真的异常时才打印
if not current_status and not is_alive_physically:
logger.warning(f"⚠️ 线程存活检查 | 逻辑状态:{current_status} | 物理探测:失败")
# 3. 断线重连逻辑
# 只有“物理探测”彻底失败了,才执行重连
if not is_alive_physically:
# 避让 QMT 夜间重启高峰期 (23:20 - 23:35)
# 避免在这段时间疯狂重连打印日志
now_hm = datetime.datetime.now().strftime('%H%M')
if '2320' <= now_hm <= '2335':
logger.info("⏳ QMT维护时段暂停重连休眠60秒...")
time.sleep(60)
continue
if datetime.date.today().weekday() >= 5: # 周末
time.sleep(3600)
continue
logger.warning("🚫 确认连接丢失,执行重连...")
if GLOBAL_STATE.xt_trader:
try: GLOBAL_STATE.xt_trader.stop()
except: pass
new_trader, new_acc, new_cb = init_qmt_trader(
qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager
)
if new_trader:
GLOBAL_STATE.xt_trader = new_trader
GLOBAL_STATE.acc = new_acc
GLOBAL_STATE.callback = new_cb
settler = DailySettlement(new_trader, new_acc, pos_manager, watch_list)
logger.info("✅ 重连成功")
else:
logger.error("❌ 重连失败60秒后重试")
time.sleep(60)
continue
# 4. 日志轮转与心跳文件
today_str = datetime.date.today().strftime('%Y-%m-%d') today_str = datetime.date.today().strftime('%Y-%m-%d')
if today_str != CURRENT_LOG_DATE: if today_str != CURRENT_LOG_DATE:
logger.info(f"日期变更 ({CURRENT_LOG_DATE} -> {today_str}),切换日志...")
logger = setup_logger() logger = setup_logger()
# --- 2. 断线重连 (仅工作日) --- try:
need_reconnect = (xt_trader is None) or (callback is None) or (not callback.is_connected) with open("heartbeat.txt", "w") as f:
if need_reconnect: f.write(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# 检查是否为周末 (5=周六, 6=周日) except: pass
if datetime.date.today().weekday() >= 5:
logger.info("当前是周末暂停重连休眠1小时")
time.sleep(3600)
continue
logger.warning("连接丢失,尝试重连...") # 5. 交易逻辑处理
if xt_trader: current_time_str = datetime.datetime.now().strftime('%H%M%S')
try: xt_trader.stop() is_trading_time = ('091500' <= current_time_str <= '113000') or ('130000' <= current_time_str <= '150000')
except: pass
xt_trader, acc, callback = init_qmt_trader(
qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager
)
if xt_trader:
settler = DailySettlement(xt_trader, acc, pos_manager, watch_list)
logger.info("重连成功")
else:
logger.error("重连失败60秒后重试")
time.sleep(60)
continue
# --- 3. 交易时段轮询逻辑 ---
now = datetime.datetime.now()
current_time_str = now.strftime('%H%M%S') # 格式化为 HHMMSS
# 默认休眠 60秒 # 如果连接正常(无论 callback 怎么说只要上面探测过了xt_trader 就是可用的)
sleep_sec = 60 if is_trading_time and GLOBAL_STATE.xt_trader:
is_trading_time = False
# 判断时段
# 09:15:00 - 10:00:00 (竞价 + 开盘半小时) -> 高频 1秒
if '091500' <= current_time_str <= '100000':
sleep_sec = 1
is_trading_time = True
# 10:00:01 - 11:30:00 -> 低频 60秒
elif '100000' < current_time_str <= '113000':
sleep_sec = 60
is_trading_time = True
# 13:00:00 - 14:50:00 -> 低频 60秒
elif '130000' <= current_time_str < '145000':
sleep_sec = 60
is_trading_time = True
# 14:50:00 - 15:00:00 (尾盘 + 收盘竞价) -> 高频 1秒
elif '145000' <= current_time_str <= '150000':
sleep_sec = 1
is_trading_time = True
# --- 4. 执行业务 ---
if is_trading_time:
# 每日重置清算标记
if settler and settler.has_settled: if settler and settler.has_settled:
settler.reset_flag() settler.reset_flag()
# 处理信号
for s in watch_list: for s in watch_list:
process_strategy_queue(s, r, xt_trader, acc, pos_manager) process_strategy_queue(s, r, GLOBAL_STATE.xt_trader, GLOBAL_STATE.acc, pos_manager)
# --- 5. 收盘清算 (15:05 - 15:10) ---
elif '150500' <= current_time_str <= '151000': elif '150500' <= current_time_str <= '151000':
if settler and not settler.has_settled: if settler and not settler.has_settled:
settler.run_settlement() settler.run_settlement()
sleep_sec = 60
# 执行休眠 time.sleep(1 if is_trading_time else 5)
time.sleep(sleep_sec)
except KeyboardInterrupt:
logger.info("用户停止")
break
except Exception as e: except Exception as e:
logger.critical("主循环未捕获异常", exc_info=True) logger.critical("交易循环异常", exc_info=True)
time.sleep(10) time.sleep(10)
# ================= 8. FastAPI 接口 =================
app = FastAPI(title="QMT Monitor")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def read_root():
if os.path.exists("dashboard.html"):
return FileResponse("dashboard.html")
return {"error": "Dashboard not found"}
@app.get("/api/status")
def get_status():
connected = False
if GLOBAL_STATE.callback:
connected = GLOBAL_STATE.callback.is_connected
return {
"running": True,
"qmt_connected": connected,
"start_time": GLOBAL_STATE.start_time,
"last_loop_update": GLOBAL_STATE.last_heartbeat,
"account_id": GLOBAL_STATE.acc.account_id if GLOBAL_STATE.acc else "Unknown"
}
@app.get("/api/positions")
def get_positions():
real_pos_list = []
virtual_pos_map = {}
if GLOBAL_STATE.xt_trader and GLOBAL_STATE.acc and GLOBAL_STATE.callback and GLOBAL_STATE.callback.is_connected:
try:
positions = GLOBAL_STATE.xt_trader.query_stock_positions(GLOBAL_STATE.acc)
if positions:
for p in positions:
if p.volume > 0:
real_pos_list.append({
"code": p.stock_code,
"volume": p.volume,
"can_use": p.can_use_volume,
"market_value": p.market_value
})
except: pass
if GLOBAL_STATE.config and GLOBAL_STATE.pos_manager:
for s in GLOBAL_STATE.config.get('strategies', []):
v_data = GLOBAL_STATE.pos_manager.get_all_virtual_positions(s)
virtual_pos_map[s] = v_data
return {
"real_positions": real_pos_list,
"virtual_positions": virtual_pos_map
}
@app.get("/api/logs")
def get_logs(lines: int = 50):
today_str = datetime.date.today().strftime('%Y-%m-%d')
log_path = os.path.join("logs", f"{today_str}.log")
if not os.path.exists(log_path):
return {"logs": ["暂无今日日志"]}
try:
with open(log_path, 'r', encoding='utf-8') as f:
all_lines = f.readlines()
return {"logs": [line.strip() for line in all_lines[-lines:]]}
except Exception as e:
return {"logs": [f"读取失败: {str(e)}"]}
# ================= 9. 启动入口 =================
if __name__ == '__main__':
# 使用 -u 参数运行是最佳实践: python -u main.py
# 但这里也在代码里强制 flush 了
print(">>> 系统正在启动...")
t = threading.Thread(target=trading_loop, daemon=True)
t.start()
print("Web服务启动: http://localhost:8001")
uvicorn.run(app, host="0.0.0.0", port=8001, log_level="warning")

75
qmt/start.bat Normal file
View File

@@ -0,0 +1,75 @@
@echo off
setlocal enabledelayedexpansion
:: ================= <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> =================
:: <20><><EFBFBD><EFBFBD>Ŀ¼ (<28><>ȷ<EFBFBD><C8B7>·<EFBFBD><C2B7><EFBFBD><EFBFBD>ȷ)
set "WORK_DIR=C:\Data\Project\NewStock\qmt"
:: Python<6F>ű<EFBFBD><C5B1><EFBFBD><EFBFBD><EFBFBD>
set "SCRIPT_NAME=qmt_trader.py"
:: <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Դ<EFBFBD><D4B4><EFBFBD>
set MAX_RETRIES=5
:: <20><><EFBFBD>Լ<EFBFBD><D4BC><EFBFBD><EFBFBD><EFBFBD>
set RETRY_COUNT=0
:: <20><><EFBFBD>Եȴ<D4B5>ʱ<EFBFBD><CAB1>(<28><>)
set RETRY_WAIT=10
:: <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>־Ŀ¼
set "LOG_DIR=%WORK_DIR%\logs\launcher"
:: ===========================================
:: 1. <20>л<EFBFBD><D0BB><EFBFBD><EFBFBD><EFBFBD>Ŀ¼
cd /d "%WORK_DIR%"
title QMT ʵ<><CAB5><EFBFBD>ػ<EFBFBD>ϵͳ [Port:8001]
:: 2. <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>־Ŀ¼
if not exist "%LOG_DIR%" mkdir "%LOG_DIR%"
:: <20><>ȡ<EFBFBD><C8A1><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϊ<EFBFBD><CEAA>־<EFBFBD>ļ<EFBFBD><C4BC><EFBFBD> (<28>򵥵<EFBFBD><F2B5A5B5><EFBFBD><EFBFBD>ڴ<EFBFBD><DAB4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><E4B3A3>Windows<77><73>ʽ)
set "TODAY=%date:~0,4%-%date:~5,2%-%date:~8,2%"
set "LOG_FILE=%LOG_DIR%\%TODAY%.log"
echo ==================================================
echo QMT ʵ<>̽<EFBFBD><CCBD><EFBFBD>ϵͳ<CFB5><CDB3><EFBFBD><EFBFBD>
echo ʱ<><CAB1>: %time%
echo <20><>־: %LOG_FILE%
echo <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>: http://localhost:8001
echo ==================================================
:LOOP
echo.
echo [%time%] <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ӽ<EFBFBD><D3BD><EFBFBD>...
echo [%time%] <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ӽ<EFBFBD><D3BD><EFBFBD>... >> "%LOG_FILE%"
:: 3. <20><><EFBFBD><EFBFBD> Python <20>ű<EFBFBD>
:: ʹ<><CAB9> uv run <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>2>&1 <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ҳд<D2B2><D0B4><EFBFBD><EFBFBD>־
uv run %SCRIPT_NAME% >> "%LOG_FILE%" 2>&1
:: 4. <20><><EFBFBD><EFBFBD><EFBFBD>˳<EFBFBD>
set EXIT_CODE=%errorlevel%
echo [%time%] <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>˳<EFBFBD><CBB3><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>: %EXIT_CODE% >> "%LOG_FILE%"
echo <20><><EFBFBD><EFBFBD>: <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>˳<EFBFBD> (Code: %EXIT_CODE%)
:: 5. <20><><EFBFBD><EFBFBD><EFBFBD>߼<EFBFBD>
if %RETRY_COUNT% GEQ %MAX_RETRIES% (
echo [%time%] <20><EFBFBD><EFB5BD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Դ<EFBFBD><D4B4><EFBFBD><EFBFBD><EFBFBD>ϵͳֹͣ<CDA3><D6B9> >> "%LOG_FILE%"
:: <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʾ
msg * "QMT <20><><EFBFBD><EFBFBD>ϵͳ<CFB5>ѱ<EFBFBD><D1B1><EFBFBD><EFBFBD><EFBFBD><EFBFBD>޷<EFBFBD><DEB7>Զ<EFBFBD><D4B6>ָ<EFBFBD><D6B8><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>־<EFBFBD><D6BE>"
goto FAIL
)
set /a RETRY_COUNT+=1
echo [%time%] <20>ȴ<EFBFBD> %RETRY_WAIT% <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>е<EFBFBD> %RETRY_COUNT% <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>... >> "%LOG_FILE%"
echo <20><><EFBFBD>ڵȴ<DAB5><C8B4><EFBFBD><EFBFBD><EFBFBD> (%RETRY_COUNT%/%MAX_RETRIES%)...
timeout /t %RETRY_WAIT% >nul
goto LOOP
:FAIL
title QMT ʵ<><CAB5><EFBFBD>ػ<EFBFBD>ϵͳ [<5B>ѱ<EFBFBD><D1B1><EFBFBD>]
color 4F
echo.
echo ==========================================
echo ϵͳ<CFB5><CDB3>ֹͣ<CDA3><D6B9><EFBFBD><EFBFBD>
echo <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>־<EFBFBD>ļ<EFBFBD>: %LOG_FILE%
echo ==========================================
pause
exit /b 1