Files
ProStock/src/data/api_wrappers/api_cyq_perf.py

204 lines
6.4 KiB
Python
Raw Normal View History

"""CYQ Performance (筹码分布) interface.
Fetch A-share stock chip distribution data (cost distribution and win rate) from Tushare.
This interface retrieves daily chip average cost and win rate information.
Data starts from 2018.
"""
import pandas as pd
from typing import Optional
from src.data.client import TushareClient
from src.data.api_wrappers.base_sync import DateBasedSync
def get_cyq_perf(
    trade_date: Optional[str] = None,
    ts_code: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    client: Optional[TushareClient] = None,
) -> pd.DataFrame:
    """Query the Tushare ``cyq_perf`` endpoint (chip distribution performance).

    Only the filters that are actually supplied (truthy) are forwarded to the
    API; ``None`` and empty strings are dropped. If the response carries a
    ``date`` column it is renamed to ``trade_date``; otherwise the response is
    returned untouched.

    Args:
        trade_date: Single trade date to query, in YYYYMMDD format.
        ts_code: Optional stock code filter, e.g. ``'000001.SZ'``.
        start_date: First date of a range query, in YYYYMMDD format.
        end_date: Last date of a range query, in YYYYMMDD format.
        client: Client used to issue the query. When omitted, a fresh
            ``TushareClient`` is created for this call. Callers running
            concurrent syncs should pass one shared client so that rate
            limiting is applied across all requests.

    Returns:
        The DataFrame produced by ``client.query``. According to the Tushare
        interface this wraps, the expected columns are:

        - ts_code: Stock code
        - trade_date: Trade date (YYYYMMDD)
        - his_low: Historical lowest price
        - his_high: Historical highest price
        - cost_5pct: 5th percentile cost
        - cost_15pct: 15th percentile cost
        - cost_50pct: 50th percentile cost (median)
        - cost_85pct: 85th percentile cost
        - cost_95pct: 95th percentile cost
        - weight_avg: Weighted average cost
        - winner_rate: Win rate (percentage)

    Example:
        >>> # Chip distribution of every stock on one date
        >>> data = get_cyq_perf(trade_date='20240115')
        >>>
        >>> # One stock over a date range
        >>> data = get_cyq_perf(ts_code='000001.SZ', start_date='20240101', end_date='20240131')
        >>>
        >>> # One stock on one date
        >>> data = get_cyq_perf(ts_code='000001.SZ', trade_date='20240115')
    """
    if not client:
        client = TushareClient()

    # Candidate filters, listed in the order they are sent to the API.
    candidate_filters = {
        "trade_date": trade_date,
        "ts_code": ts_code,
        "start_date": start_date,
        "end_date": end_date,
    }
    query_args = {
        field: value for field, value in candidate_filters.items() if value
    }

    frame = client.query("cyq_perf", **query_args)

    # Normalise the date column name only when the response uses "date".
    if "date" not in frame.columns:
        return frame
    return frame.rename(columns={"date": "trade_date"})
class CyqPerfSync(DateBasedSync):
    """Batch sync manager for chip distribution data (full and incremental).

    Builds on ``DateBasedSync``; per the original notes, the base class
    fetches data concurrently, one trade date at a time. This subclass only
    supplies the table metadata and the per-date fetch routine.

    Example:
        >>> sync = CyqPerfSync()
        >>> results = sync.sync_all()  # incremental sync
        >>> results = sync.sync_all(force_full=True)  # full sync
        >>> preview = sync.preview_sync()  # preview
    """

    table_name = "cyq_perf"
    default_start_date = "20180101"

    # Table schema definition: two key columns followed by the numeric
    # measures, which all share the DOUBLE type.
    TABLE_SCHEMA = {
        "ts_code": "VARCHAR(16) NOT NULL",
        "trade_date": "DATE NOT NULL",
        **dict.fromkeys(
            (
                "his_low",
                "his_high",
                "cost_5pct",
                "cost_15pct",
                "cost_50pct",
                "cost_85pct",
                "cost_95pct",
                "weight_avg",
                "winner_rate",
            ),
            "DOUBLE",
        ),
    }

    # Index definitions: (index name, indexed columns).
    TABLE_INDEXES = [
        ("idx_cyq_perf_date_code", ["trade_date", "ts_code"]),
    ]

    # Primary key definition.
    PRIMARY_KEY = ("ts_code", "trade_date")

    def fetch_single_date(self, trade_date: str) -> pd.DataFrame:
        """Fetch chip distribution data for all stocks on one trade date.

        Args:
            trade_date: Trade date in YYYYMMDD format.

        Returns:
            DataFrame returned by ``get_cyq_perf`` for that date.
        """
        # Reuse this manager's client instead of letting get_cyq_perf
        # create a new one for every date.
        shared_client = self.client
        return get_cyq_perf(client=shared_client, trade_date=trade_date)
def sync_cyq_perf(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    force_full: bool = False,
) -> pd.DataFrame:
    """Sync chip distribution data into DuckDB, with incremental support.

    This is a convenience wrapper: it creates a ``CyqPerfSync`` manager and
    forwards all arguments to its ``sync_all`` method. The behaviour below is
    what the original documentation describes; it is implemented by the
    manager's base class, which is not part of this module.

    Logic:
        - Table missing: create the table and the composite index
          (trade_date, ts_code), then run a full sync.
        - Table present: sync incrementally starting from last_date + 1.

    Args:
        start_date: Start date in YYYYMMDD format. Defaults to 20180101 for a
            full sync, or last_date + 1 for an incremental sync.
        end_date: End date in YYYYMMDD format. Defaults to today.
        force_full: If True, force a complete reload from 20180101.

    Returns:
        pd.DataFrame containing the synced data.

    Example:
        >>> # First sync (full load from 20180101)
        >>> result = sync_cyq_perf()
        >>>
        >>> # Later syncs (incremental - new data only)
        >>> result = sync_cyq_perf()
        >>>
        >>> # Force a complete reload
        >>> result = sync_cyq_perf(force_full=True)
        >>>
        >>> # Explicit date range
        >>> result = sync_cyq_perf(start_date='20240101', end_date='20240131')
    """
    manager = CyqPerfSync()
    sync_options = {
        "start_date": start_date,
        "end_date": end_date,
        "force_full": force_full,
    }
    return manager.sync_all(**sync_options)
def preview_cyq_perf_sync(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    force_full: bool = False,
    sample_size: int = 3,
) -> dict:
    """Preview the volume and a sample of a chip distribution sync.

    No data is synced according to the original documentation; this wrapper
    creates a ``CyqPerfSync`` manager and forwards all arguments to its
    ``preview_sync`` method, which is defined outside this module.

    Args:
        start_date: Explicit start date, overriding automatic detection.
        end_date: Explicit end date. Defaults to today.
        force_full: If True, preview a full sync starting from 20180101.
        sample_size: Number of days to preview. Defaults to 3.

    Returns:
        Dictionary holding the preview information.

    Example:
        >>> # Preview what would be synced
        >>> preview = preview_cyq_perf_sync()
        >>>
        >>> # Preview a full sync
        >>> preview = preview_cyq_perf_sync(force_full=True)
    """
    manager = CyqPerfSync()
    preview_options = {
        "start_date": start_date,
        "end_date": end_date,
        "force_full": force_full,
        "sample_size": sample_size,
    }
    return manager.preview_sync(**preview_options)