Merge branch 'master' of ssh://140.143.91.66:2221/liaozhaorun/NewStock

# Conflicts:
#	.gitignore
This commit is contained in:
2025-12-15 22:24:19 +08:00
2 changed files with 370 additions and 143 deletions

5
.gitignore vendored
View File

@@ -23,4 +23,7 @@ model
**/mlruns/
**/mnt/
predications_test.csv
predications_test.csv
/qmt/config.json
/qmt/logs/*

View File

@@ -1,182 +1,406 @@
# coding:utf-8
import time, datetime, traceback, sys, json
import time, datetime, traceback, sys, json, os
import logging
import redis
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant
# ================= 配置区域 =================
QMT_PATH = r'D:\qmt\投研\迅投极速交易终端睿智融科版\userdata'
ACCOUNT_ID = '2000128'
ACCOUNT_TYPE = 'STOCK'
# 全局变量
CURRENT_LOG_DATE = None
CONFIG = {}
ORDER_CACHE = {}
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
REDIS_PASS = None
# ================= 1. 日志系统 (按日期直写) =================
def setup_logger():
"""
配置日志系统:
每天生成一个新的日志文件,文件名格式为 YYYY-MM-DD.log
"""
global CURRENT_LOG_DATE
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 策略基础名称 (不需要加 _real代码会自动加)
STRATEGY_BASE_NAME = 'default_strategy'
# ===========================================
# 获取今日日期
today_str = datetime.date.today().strftime('%Y-%m-%d')
CURRENT_LOG_DATE = today_str
# 定义监听的队列名称 (只监听实盘队列,物理屏蔽回测数据)
LISTEN_QUEUE = f"{STRATEGY_BASE_NAME}_real"
log_file = os.path.join(log_dir, f"{today_str}.log")
logger = logging.getLogger("QMT_Trader")
logger.setLevel(logging.INFO)
# 清除旧 handler
if logger.handlers:
for handler in logger.handlers[:]:
try:
handler.close()
logger.removeHandler(handler)
except: pass
formatter = logging.Formatter(
'[%(asctime)s] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 文件输出
file_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8')
file_handler.setFormatter(formatter)
# 控制台输出
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
return logger
# 初始化日志
logger = setup_logger()
# ================= 2. 配置加载 =================
def load_config(config_file='config.json'):
if getattr(sys, 'frozen', False):
base_path = os.path.dirname(sys.executable)
else:
base_path = os.path.dirname(os.path.abspath(__file__))
full_path = os.path.join(base_path, config_file)
if not os.path.exists(full_path):
if os.path.exists(config_file): full_path = config_file
else:
logger.error(f"找不到配置文件: {full_path}")
sys.exit(1)
try:
with open(full_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"配置文件错误: {e}")
sys.exit(1)
# ================= 3. 业务逻辑类 =================
class PositionManager:
def __init__(self, r_client):
self.r = r_client
def _get_key(self, strategy_name):
return f"POS:{strategy_name}"
def mark_holding(self, strategy_name, code):
"""乐观占位"""
self.r.hsetnx(self._get_key(strategy_name), code, 0)
def rollback_holding(self, strategy_name, code):
"""失败回滚"""
key = self._get_key(strategy_name)
val = self.r.hget(key, code)
if val is not None and int(val) == 0:
self.r.hdel(key, code)
logger.warning(f"[{strategy_name}] 回滚释放槽位: {code}")
def update_actual_volume(self, strategy_name, code, delta_vol):
"""成交更新"""
key = self._get_key(strategy_name)
new_vol = self.r.hincrby(key, code, int(delta_vol))
if new_vol <= 0:
self.r.hdel(key, code)
new_vol = 0
return new_vol
def get_position(self, strategy_name, code):
vol = self.r.hget(self._get_key(strategy_name), code)
return int(vol) if vol else 0
def get_holding_count(self, strategy_name):
return self.r.hlen(self._get_key(strategy_name))
def get_all_virtual_positions(self, strategy_name):
return self.r.hgetall(self._get_key(strategy_name))
def force_delete(self, strategy_name, code):
self.r.hdel(self._get_key(strategy_name), code)
class DailySettlement:
def __init__(self, xt_trader, acc, pos_mgr, strategies):
self.trader = xt_trader
self.acc = acc
self.pos_mgr = pos_mgr
self.strategies = strategies
self.has_settled = False
def run_settlement(self):
logger.info("="*40)
logger.info("执行收盘清算流程...")
# 1. 撤单
try:
orders = self.trader.query_stock_orders(self.acc, cancelable_only=True)
if orders:
for o in orders:
self.trader.cancel_order_stock(self.acc, o.order_id)
time.sleep(2)
except: pass
# 2. 获取实盘
real_positions = self.trader.query_stock_positions(self.acc)
real_pos_map = {}
if real_positions:
for p in real_positions:
if p.volume > 0: real_pos_map[p.stock_code] = p.volume
# 3. 校准Redis
for strategy in self.strategies:
virtual = self.pos_mgr.get_all_virtual_positions(strategy)
for code, v in virtual.items():
if code not in real_pos_map:
logger.warning(f" [修正] {strategy} 幽灵持仓 {code} -> 释放")
self.pos_mgr.force_delete(strategy, code)
logger.info("清算完成")
self.has_settled = True
def reset_flag(self):
self.has_settled = False
# ================= 4. 回调类 =================
class MyXtQuantTraderCallback(XtQuantTraderCallback):
def __init__(self, pos_mgr):
self.pos_mgr = pos_mgr
self.is_connected = False
def on_disconnected(self):
print("连接断开")
def on_stock_order(self, order):
print(f"委托回报: {order.order_id} {order.order_remark}")
logger.warning(">> 回调通知: 交易端连接断开")
self.is_connected = False
def on_stock_trade(self, trade):
print(f"成交: {trade.stock_code} {trade.traded_volume}")
try:
cache_info = ORDER_CACHE.get(trade.order_id)
if not cache_info: return
strategy, _, action = cache_info
logger.info(f">>> [成交] {strategy} {trade.stock_code} {trade.traded_volume}")
if action == 'BUY': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, trade.traded_volume)
elif action == 'SELL': self.pos_mgr.update_actual_volume(strategy, trade.stock_code, -trade.traded_volume)
except: traceback.print_exc()
def on_order_error(self, err):
try:
logger.error(f"下单失败: {err.error_msg}")
cache = ORDER_CACHE.get(err.order_id)
if cache and cache[2] == 'BUY':
self.pos_mgr.rollback_holding(cache[0], cache[1])
del ORDER_CACHE[err.order_id]
except: pass
def on_order_error(self, order_error):
print(f"下单失败: {order_error.error_msg}")
# ================= 5. 核心消息处理 =================
def process_strategy_queue(strategy_name, r_client, xt_trader, acc, pos_manager):
queue_key = f"{strategy_name}_real"
msg_json = r_client.lpop(queue_key)
if not msg_json: return
def init_redis():
try:
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASS, decode_responses=True)
r.ping()
return r
except Exception as e:
print(f"Redis连接失败: {e}")
return None
def is_msg_valid(data):
"""
【安全核心】校验消息时效性与合法性
"""
try:
# 1. 检查是否为回测标记 (防御性编程,虽然队列已物理隔离)
if data.get('is_backtest', False):
print(f"警报:拦截到回测数据,已丢弃!")
return False
# 2. 检查时间戳
msg_time_str = data.get('timestamp')
if not msg_time_str:
print("数据缺失时间戳,丢弃")
return False
# 解析消息时间
# 格式必须匹配策略端发送的 '%Y-%m-%d %H:%M:%S'
msg_dt = datetime.datetime.strptime(msg_time_str, '%Y-%m-%d %H:%M:%S')
msg_date = msg_dt.date()
# 获取当前服务器日期
today = datetime.date.today()
# 3. 【核心】判断是否为当天的消息
if msg_date != today:
print(f"拦截过期消息: 消息日期[{msg_date}] != 今日[{today}]")
return False
# 可选如果你想更严格可以判断时间差不能超过5分钟
# delta = datetime.datetime.now() - msg_dt
# if abs(delta.total_seconds()) > 300: ...
return True
except Exception as e:
print(f"校验逻辑异常: {e}")
return False
def process_redis_signal(r_client, xt_trader, acc):
try:
msg_json = r_client.lpop(LISTEN_QUEUE)
if not msg_json: return
print(f"收到信号: {msg_json}")
# 归档
r_client.rpush(f"{queue_key}:history", msg_json)
data = json.loads(msg_json)
if not is_msg_valid(data): return # 之前的校验逻辑
# 1. 过滤回测
if data.get('is_backtest'): return
# 2. 校验日期 (只处理当日消息)
msg_ts = data.get('timestamp')
today_str = datetime.date.today().strftime('%Y-%m-%d')
if msg_ts:
msg_date = msg_ts.split(' ')[0]
if msg_date != today_str:
logger.warning(f"[{strategy_name}] 拦截非当日消息: MsgDate={msg_date}")
return
else:
logger.warning(f"[{strategy_name}] 消息无时间戳,忽略")
return
stock_code = data['stock_code']
action = data['action']
price = float(data['price'])
total_slots = int(data.get('total_slots', 1))
# 获取切分份数
# 兼容性处理如果redis里还是旧key 'weight',也可以尝试获取
div_count = float(data.get('div_count', data.get('weight', 1)))
# =========================================================
# 买入逻辑:资金切片法
# =========================================================
if action == 'BUY':
# 1. 必须查最新的可用资金 (Available Cash)
# 槽位检查
holding = pos_manager.get_holding_count(strategy_name)
empty = total_slots - holding
if empty <= 0:
logger.warning(f"[{strategy_name}] 拦截买入: 槽位已满 (Target:{total_slots} Held:{holding})")
return
asset = xt_trader.query_stock_asset(acc)
if not asset:
print("错误:无法查询资产")
if not asset:
logger.error("无法查询资产QMT可能未就绪")
return
current_cash = asset.cash
# 2. 计算下单金额
# 逻辑Amount = Cash / div_count
if div_count <= 0: div_count = 1 # 防止除0
target_amount = current_cash / div_count
# 3. 打印调试信息 (非常重要)
print(f"【资金分配】可用现金:{current_cash:.2f} / 切分份数:{div_count} = 下单金额:{target_amount:.2f}")
# 4. 计算股数
# 金额计算
amt = asset.cash / empty
if amt < 2000:
logger.warning(f"[{strategy_name}] 拦截买入: 金额过小 ({amt:.2f})")
return
if price <= 0: price = 1.0
# 过滤小额杂单
if target_amount < 2000:
print(f"忽略:金额过小 ({target_amount:.2f})")
return
vol = int(target_amount / price / 100) * 100
vol = int(amt / price / 100) * 100
if vol >= 100:
xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price,
STRATEGY_BASE_NAME, 'PyBuy')
print(f"买入下单: {stock_code} {vol}")
oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_BUY, vol, xtconstant.FIX_PRICE, price, strategy_name, 'PyBuy')
if oid != -1:
logger.info(f"[{strategy_name}] 发出买单: {stock_code} {vol} ID:{oid}")
ORDER_CACHE[oid] = (strategy_name, stock_code, 'BUY')
pos_manager.mark_holding(strategy_name, stock_code)
else:
logger.error(f"[{strategy_name}] 下单被拒绝 (-1)")
else:
print(f"计算股数不足100股")
logger.warning(f"[{strategy_name}] 股数不足100 (Amt:{amt:.2f})")
# =========================================================
# 卖出逻辑 (清仓)
# =========================================================
elif action == 'SELL':
positions = xt_trader.query_stock_positions(acc)
target_pos = next((p for p in positions if p.stock_code == stock_code), None)
if target_pos and target_pos.can_use_volume > 0:
xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, target_pos.can_use_volume,
xtconstant.FIX_PRICE, price, STRATEGY_BASE_NAME, 'PySell')
print(f"卖出下单: {stock_code} {target_pos.can_use_volume}")
v_vol = pos_manager.get_position(strategy_name, stock_code)
if v_vol > 0:
real_pos = xt_trader.query_stock_positions(acc)
rp = next((p for p in real_pos if p.stock_code==stock_code), None)
can_use = rp.can_use_volume if rp else 0
final = min(v_vol, can_use)
if final > 0:
oid = xt_trader.order_stock(acc, stock_code, xtconstant.STOCK_SELL, final, xtconstant.FIX_PRICE, price, strategy_name, 'PySell')
if oid != -1:
logger.info(f"[{strategy_name}] 发出卖单: {stock_code} {final}")
ORDER_CACHE[oid] = (strategy_name, stock_code, 'SELL')
else:
logger.warning(f"[{strategy_name}] 卖出拦截: 虚拟{v_vol}但实盘可用{can_use}")
else:
print(f"无可用持仓: {stock_code}")
logger.info(f"[{strategy_name}] Redis无持仓忽略卖出")
except Exception:
logger.error("消息处理异常", exc_info=True)
# ================= 6. QMT初始化 =================
def init_qmt_trader(qmt_path, account_id, account_type, pos_manager):
try:
session_id = int(time.time())
trader = XtQuantTrader(qmt_path, session_id)
acc = StockAccount(account_id, account_type)
callback = MyXtQuantTraderCallback(pos_manager)
trader.register_callback(callback)
trader.start()
res = trader.connect()
if res == 0:
logger.info(f"QMT 连接成功 [Session:{session_id}]")
trader.subscribe(acc)
callback.is_connected = True
return trader, acc, callback
else:
logger.error(f"QMT 连接失败 Code:{res}")
return None, None, None
except Exception as e:
print(f"处理异常: {e}")
traceback.print_exc()
logger.error(f"初始化异常: {e}")
return None, None, None
# ================= 7. 主程序 =================
if __name__ == '__main__':
r_client = init_redis()
session_id = int(time.time())
xt_trader = XtQuantTrader(QMT_PATH, session_id)
acc = StockAccount(ACCOUNT_ID, ACCOUNT_TYPE)
callback = MyXtQuantTraderCallback()
xt_trader.register_callback(callback)
xt_trader.start()
xt_trader.connect()
xt_trader.subscribe(acc)
logger.info(">>> 系统启动 (实盘生产模式) <<<")
# 加载配置
CONFIG = load_config('config.json')
redis_cfg = CONFIG['redis']
qmt_cfg = CONFIG['qmt']
watch_list = CONFIG['strategies']
print(f"=== 启动监听: {LISTEN_QUEUE} ===")
print("只处理当日的实盘/模拟信号,自动过滤回测数据及历史遗留数据。")
# 连接Redis
try:
r = redis.Redis(**redis_cfg, decode_responses=True)
r.ping()
pos_manager = PositionManager(r)
except Exception as e:
logger.critical(f"Redis 连接失败: {e}")
sys.exit(1)
# 初次连接 QMT
xt_trader, acc, callback = init_qmt_trader(
qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager
)
settler = None
if xt_trader:
settler = DailySettlement(xt_trader, acc, pos_manager, watch_list)
logger.info(">>> 进入主循环监听 <<<")
while True:
if r_client:
process_redis_signal(r_client, xt_trader, acc)
time.sleep(60)
try:
# --- 1. 日志跨天处理 ---
today_str = datetime.date.today().strftime('%Y-%m-%d')
if today_str != CURRENT_LOG_DATE:
logger.info(f"日期变更 ({CURRENT_LOG_DATE} -> {today_str}),切换日志...")
logger = setup_logger()
# --- 2. 断线重连 (仅工作日) ---
need_reconnect = (xt_trader is None) or (callback is None) or (not callback.is_connected)
if need_reconnect:
# 检查是否为周末 (5=周六, 6=周日)
if datetime.date.today().weekday() >= 5:
logger.info("当前是周末暂停重连休眠1小时")
time.sleep(3600)
continue
logger.warning("连接丢失,尝试重连...")
if xt_trader:
try: xt_trader.stop()
except: pass
xt_trader, acc, callback = init_qmt_trader(
qmt_cfg['path'], qmt_cfg['account_id'], qmt_cfg['account_type'], pos_manager
)
if xt_trader:
settler = DailySettlement(xt_trader, acc, pos_manager, watch_list)
logger.info("重连成功")
else:
logger.error("重连失败60秒后重试")
time.sleep(60)
continue
# --- 3. 交易时段轮询逻辑 ---
now = datetime.datetime.now()
current_time_str = now.strftime('%H%M%S') # 格式化为 HHMMSS
# 默认休眠 60秒
sleep_sec = 60
is_trading_time = False
# 判断时段
# 09:15:00 - 10:00:00 (竞价 + 开盘半小时) -> 高频 1秒
if '091500' <= current_time_str <= '100000':
sleep_sec = 1
is_trading_time = True
# 10:00:01 - 11:30:00 -> 低频 60秒
elif '100000' < current_time_str <= '113000':
sleep_sec = 60
is_trading_time = True
# 13:00:00 - 14:50:00 -> 低频 60秒
elif '130000' <= current_time_str < '145000':
sleep_sec = 60
is_trading_time = True
# 14:50:00 - 15:00:00 (尾盘 + 收盘竞价) -> 高频 1秒
elif '145000' <= current_time_str <= '150000':
sleep_sec = 1
is_trading_time = True
# --- 4. 执行业务 ---
if is_trading_time:
# 每日重置清算标记
if settler and settler.has_settled:
settler.reset_flag()
# 处理信号
for s in watch_list:
process_strategy_queue(s, r, xt_trader, acc, pos_manager)
# --- 5. 收盘清算 (15:05 - 15:10) ---
elif '150500' <= current_time_str <= '151000':
if settler and not settler.has_settled:
settler.run_settlement()
sleep_sec = 60
# 执行休眠
time.sleep(sleep_sec)
except KeyboardInterrupt:
logger.info("用户停止")
break
except Exception as e:
logger.critical("主循环未捕获异常", exc_info=True)
time.sleep(10)