feat(data): 新增财务指标和涨跌停数据接口

- 财务指标: fina_indicator_vip 封装,166 字段,季度同步
- 涨跌停价格: stk_limit 封装,日频数据同步
- 配套单元测试和调度中心集成
This commit is contained in:
2026-03-08 23:14:18 +08:00
parent 505279c08b
commit a464ef70c0
9 changed files with 1499 additions and 465 deletions

View File

@@ -618,4 +618,70 @@ df = pro.stock_st(trade_date='20250813')
171 603721.SH *ST天择 20250813 ST 风险警示板
172 600289.SH ST信通 20250813 ST 风险警示板
173 000929.SZ *ST兰黄 20250813 ST 风险警示板
174 000638.SZ *ST万方 20250813 ST 风险警示板
174 000638.SZ *ST万方 20250813 ST 风险警示板
每日涨跌停价格
接口stk_limit
描述获取全市场包含A/B股和基金每日涨跌停价格包括涨停价格跌停价格等每个交易日8点40左右更新当日股票涨跌停价格。
限量单次最多提取5800条记录可循环调取总量不限制
积分用户积2000积分可调取单位分钟有流控积分越高流量越大请自行提高积分具体请参阅积分获取办法
输入参数
名称 类型 必选 描述
ts_code str N 股票代码
trade_date str N 交易日期
start_date str N 开始日期
end_date str N 结束日期
输出参数
名称 类型 默认显示 描述
trade_date str Y 交易日期
ts_code str Y TS股票代码
pre_close float N 昨日收盘价
up_limit float Y 涨停价
down_limit float Y 跌停价
接口示例
pro = ts.pro_api()
#获取单日全部股票数据涨跌停价格
df = pro.stk_limit(trade_date='20190625')
#获取单个股票数据
df = pro.stk_limit(ts_code='002149.SZ', start_date='20190115', end_date='20190615')
数据示例
trade_date ts_code up_limit down_limit
0 20190625 000001.SZ 15.06 12.32
1 20190625 000002.SZ 30.94 25.32
2 20190625 000004.SZ 25.15 20.57
3 20190625 000005.SZ 3.49 2.85
4 20190625 000006.SZ 6.14 5.02
5 20190625 000007.SZ 7.74 6.34
6 20190625 000008.SZ 4.28 3.50
7 20190625 000009.SZ 6.36 5.20
8 20190625 000010.SZ 3.51 3.17
9 20190625 000011.SZ 10.58 8.66
10 20190625 000012.SZ 5.16 4.22
11 20190625 000014.SZ 10.98 8.98
12 20190625 000016.SZ 4.81 3.93
13 20190625 000017.SZ 5.15 4.21
14 20190625 000018.SZ 1.44 1.30
15 20190625 000019.SZ 8.09 6.62
16 20190625 000020.SZ 12.21 9.99
17 20190625 000021.SZ 9.30 7.61
18 20190625 000023.SZ 14.61 11.95
19 20190625 000025.SZ 23.08 18.88
20 20190625 000026.SZ 8.66 7.08

View File

@@ -508,4 +508,213 @@ df2 = pro.cashflow_vip(period='20181231',fields='')
9 | 母公司调整表 | 该公司母公司的本年度公布上年同期的财务报表数据
10 | 母公司调整前报表 | 母公司调整之前的原始财务报表数据
11 | 目公司调整前合并报表 | 母公司调整之前合并报表原数据
12 | 母公司调整前报表 | 母公司报表发生变更前保留的原数据
12 | 母公司调整前报表 | 母公司报表发生变更前保留的原数据
财务指标数据
接口fina_indicator可以通过数据工具调试和查看数据。
描述获取上市公司财务指标数据为避免服务器压力现阶段每次请求最多返回100条记录可通过设置日期多次请求获取更多数据。
权限用户需要至少2000积分才可以调取具体请参阅积分获取办法
提示当前接口只能按单只股票获取其历史数据如果需要获取某一季度全部上市公司数据请使用fina_indicator_vip接口参数一致需积攒5000积分。
输入参数
名称 类型 必选 描述
ts_code str Y TS股票代码,e.g. 600001.SH/000001.SZ
ann_date str N 公告日期
start_date str N 报告期开始日期
end_date str N 报告期结束日期
period str N 报告期(每个季度最后一天的日期,比如20171231表示年报)
输出参数
名称 类型 默认显示 描述
ts_code str Y TS代码
ann_date str Y 公告日期
end_date str Y 报告期
eps float Y 基本每股收益
dt_eps float Y 稀释每股收益
total_revenue_ps float Y 每股营业总收入
revenue_ps float Y 每股营业收入
capital_rese_ps float Y 每股资本公积
surplus_rese_ps float Y 每股盈余公积
undist_profit_ps float Y 每股未分配利润
extra_item float Y 非经常性损益
profit_dedt float Y 扣除非经常性损益后的净利润(扣非净利润)
gross_margin float Y 毛利
current_ratio float Y 流动比率
quick_ratio float Y 速动比率
cash_ratio float Y 保守速动比率
invturn_days float N 存货周转天数
arturn_days float N 应收账款周转天数
inv_turn float N 存货周转率
ar_turn float Y 应收账款周转率
ca_turn float Y 流动资产周转率
fa_turn float Y 固定资产周转率
assets_turn float Y 总资产周转率
op_income float Y 经营活动净收益
valuechange_income float N 价值变动净收益
interst_income float N 利息费用
daa float N 折旧与摊销
ebit float Y 息税前利润
ebitda float Y 息税折旧摊销前利润
fcff float Y 企业自由现金流量
fcfe float Y 股权自由现金流量
current_exint float Y 无息流动负债
noncurrent_exint float Y 无息非流动负债
interestdebt float Y 带息债务
netdebt float Y 净债务
tangible_asset float Y 有形资产
working_capital float Y 营运资金
networking_capital float Y 营运流动资本
invest_capital float Y 全部投入资本
retained_earnings float Y 留存收益
diluted2_eps float Y 期末摊薄每股收益
bps float Y 每股净资产
ocfps float Y 每股经营活动产生的现金流量净额
retainedps float Y 每股留存收益
cfps float Y 每股现金流量净额
ebit_ps float Y 每股息税前利润
fcff_ps float Y 每股企业自由现金流量
fcfe_ps float Y 每股股东自由现金流量
netprofit_margin float Y 销售净利率
grossprofit_margin float Y 销售毛利率
cogs_of_sales float Y 销售成本率
expense_of_sales float Y 销售期间费用率
profit_to_gr float Y 净利润/营业总收入
saleexp_to_gr float Y 销售费用/营业总收入
adminexp_of_gr float Y 管理费用/营业总收入
finaexp_of_gr float Y 财务费用/营业总收入
impai_ttm float Y 资产减值损失/营业总收入
gc_of_gr float Y 营业总成本/营业总收入
op_of_gr float Y 营业利润/营业总收入
ebit_of_gr float Y 息税前利润/营业总收入
roe float Y 净资产收益率
roe_waa float Y 加权平均净资产收益率
roe_dt float Y 净资产收益率(扣除非经常损益)
roa float Y 总资产报酬率
npta float Y 总资产净利润
roic float Y 投入资本回报率
roe_yearly float Y 年化净资产收益率
roa2_yearly float Y 年化总资产报酬率
roe_avg float N 平均净资产收益率(增发条件)
opincome_of_ebt float N 经营活动净收益/利润总额
investincome_of_ebt float N 价值变动净收益/利润总额
n_op_profit_of_ebt float N 营业外收支净额/利润总额
tax_to_ebt float N 所得税/利润总额
dtprofit_to_profit float N 扣除非经常损益后的净利润/净利润
salescash_to_or float N 销售商品提供劳务收到的现金/营业收入
ocf_to_or float N 经营活动产生的现金流量净额/营业收入
ocf_to_opincome float N 经营活动产生的现金流量净额/经营活动净收益
capitalized_to_da float N 资本支出/折旧和摊销
debt_to_assets float Y 资产负债率
assets_to_eqt float Y 权益乘数
dp_assets_to_eqt float Y 权益乘数(杜邦分析)
ca_to_assets float Y 流动资产/总资产
nca_to_assets float Y 非流动资产/总资产
tbassets_to_totalassets float Y 有形资产/总资产
int_to_talcap float Y 带息债务/全部投入资本
eqt_to_talcapital float Y 归属于母公司的股东权益/全部投入资本
currentdebt_to_debt float Y 流动负债/负债合计
longdeb_to_debt float Y 非流动负债/负债合计
ocf_to_shortdebt float Y 经营活动产生的现金流量净额/流动负债
debt_to_eqt float Y 产权比率
eqt_to_debt float Y 归属于母公司的股东权益/负债合计
eqt_to_interestdebt float Y 归属于母公司的股东权益/带息债务
tangibleasset_to_debt float Y 有形资产/负债合计
tangasset_to_intdebt float Y 有形资产/带息债务
tangibleasset_to_netdebt float Y 有形资产/净债务
ocf_to_debt float Y 经营活动产生的现金流量净额/负债合计
ocf_to_interestdebt float N 经营活动产生的现金流量净额/带息债务
ocf_to_netdebt float N 经营活动产生的现金流量净额/净债务
ebit_to_interest float N 已获利息倍数(EBIT/利息费用)
longdebt_to_workingcapital float N 长期债务与营运资金比率
ebitda_to_debt float N 息税折旧摊销前利润/负债合计
turn_days float Y 营业周期
roa_yearly float Y 年化总资产净利率
roa_dp float Y 总资产净利率(杜邦分析)
fixed_assets float Y 固定资产合计
profit_prefin_exp float N 扣除财务费用前营业利润
non_op_profit float N 非营业利润
op_to_ebt float N 营业利润/利润总额
nop_to_ebt float N 非营业利润/利润总额
ocf_to_profit float N 经营活动产生的现金流量净额/营业利润
cash_to_liqdebt float N 货币资金/流动负债
cash_to_liqdebt_withinterest float N 货币资金/带息流动负债
op_to_liqdebt float N 营业利润/流动负债
op_to_debt float N 营业利润/负债合计
roic_yearly float N 年化投入资本回报率
total_fa_trun float N 固定资产合计周转率
profit_to_op float Y 利润总额/营业收入
q_opincome float N 经营活动单季度净收益
q_investincome float N 价值变动单季度净收益
q_dtprofit float N 扣除非经常损益后的单季度净利润
q_eps float N 每股收益(单季度)
q_netprofit_margin float N 销售净利率(单季度)
q_gsprofit_margin float N 销售毛利率(单季度)
q_exp_to_sales float N 销售期间费用率(单季度)
q_profit_to_gr float N 净利润/营业总收入(单季度)
q_saleexp_to_gr float Y 销售费用/营业总收入 (单季度)
q_adminexp_to_gr float N 管理费用/营业总收入 (单季度)
q_finaexp_to_gr float N 财务费用/营业总收入 (单季度)
q_impair_to_gr_ttm float N 资产减值损失/营业总收入(单季度)
q_gc_to_gr float Y 营业总成本/营业总收入 (单季度)
q_op_to_gr float N 营业利润/营业总收入(单季度)
q_roe float Y 净资产收益率(单季度)
q_dt_roe float Y 净资产单季度收益率(扣除非经常损益)
q_npta float Y 总资产净利润(单季度)
q_opincome_to_ebt float N 经营活动净收益/利润总额(单季度)
q_investincome_to_ebt float N 价值变动净收益/利润总额(单季度)
q_dtprofit_to_profit float N 扣除非经常损益后的净利润/净利润(单季度)
q_salescash_to_or float N 销售商品提供劳务收到的现金/营业收入(单季度)
q_ocf_to_sales float Y 经营活动产生的现金流量净额/营业收入(单季度)
q_ocf_to_or float N 经营活动产生的现金流量净额/经营活动净收益(单季度)
basic_eps_yoy float Y 基本每股收益同比增长率(%)
dt_eps_yoy float Y 稀释每股收益同比增长率(%)
cfps_yoy float Y 每股经营活动产生的现金流量净额同比增长率(%)
op_yoy float Y 营业利润同比增长率(%)
ebt_yoy float Y 利润总额同比增长率(%)
netprofit_yoy float Y 归属母公司股东的净利润同比增长率(%)
dt_netprofit_yoy float Y 归属母公司股东的净利润-扣除非经常损益同比增长率(%)
ocf_yoy float Y 经营活动产生的现金流量净额同比增长率(%)
roe_yoy float Y 净资产收益率(摊薄)同比增长率(%)
bps_yoy float Y 每股净资产相对年初增长率(%)
assets_yoy float Y 资产总计相对年初增长率(%)
eqt_yoy float Y 归属母公司的股东权益相对年初增长率(%)
tr_yoy float Y 营业总收入同比增长率(%)
or_yoy float Y 营业收入同比增长率(%)
q_gr_yoy float N 营业总收入同比增长率(%)(单季度)
q_gr_qoq float N 营业总收入环比增长率(%)(单季度)
q_sales_yoy float Y 营业收入同比增长率(%)(单季度)
q_sales_qoq float N 营业收入环比增长率(%)(单季度)
q_op_yoy float N 营业利润同比增长率(%)(单季度)
q_op_qoq float Y 营业利润环比增长率(%)(单季度)
q_profit_yoy float N 净利润同比增长率(%)(单季度)
q_profit_qoq float N 净利润环比增长率(%)(单季度)
q_netprofit_yoy float N 归属母公司股东的净利润同比增长率(%)(单季度)
q_netprofit_qoq float N 归属母公司股东的净利润环比增长率(%)(单季度)
equity_yoy float Y 净资产同比增长率
rd_exp float N 研发费用
update_flag str N 更新标识
接口用法
pro = ts.pro_api()
df = pro.fina_indicator(ts_code='600000.SH')
或者
df = pro.query('fina_indicator', ts_code='600000.SH', start_date='20170101', end_date='20180801')
数据样例
ts_code ann_date end_date eps dt_eps total_revenue_ps revenue_ps \
0 600000.SH 20180830 20180630 0.95 0.95 2.8024 2.8024
1 600000.SH 20180428 20180331 0.46 0.46 1.3501 1.3501
2 600000.SH 20180428 20171231 1.84 1.84 5.7447 5.7447
3 600000.SH 20180428 20171231 1.84 1.84 5.7447 5.7447
4 600000.SH 20171028 20170930 1.45 1.45 4.2507 4.2507
5 600000.SH 20171028 20170930 1.45 1.45 4.2507 4.2507
6 600000.SH 20170830 20170630 0.97 0.97 2.9659 2.9659
7 600000.SH 20170427 20170331 0.63 0.63 1.9595 1.9595
8 600000.SH 20170427 20170331 0.63 0.63 1.9595 1.9595

View File

@@ -12,11 +12,13 @@ Available APIs:
- api_namechange: Stock name change history (股票曾用名)
- api_bak_basic: Stock historical list (股票历史列表)
- api_stock_st: ST stock list (ST股票列表)
- api_stk_limit: Stock limit price (每日涨跌停价格)
Example:
>>> from src.data.api_wrappers import get_daily, get_stock_basic, get_trade_cal, get_bak_basic
>>> from src.data.api_wrappers import get_pro_bar, sync_pro_bar, get_daily_basic, sync_daily_basic
>>> from src.data.api_wrappers import get_stock_st, sync_stock_st
>>> from src.data.api_wrappers import get_stk_limit, sync_stk_limit
>>> data = get_daily('000001.SZ', start_date='20240101', end_date='20240131')
>>> pro_data = get_pro_bar('000001.SZ', start_date='20240101', end_date='20240131')
>>> daily_basic = get_daily_basic(trade_date='20240101')
@@ -24,6 +26,7 @@ Example:
>>> calendar = get_trade_cal('20240101', '20240131')
>>> bak_basic = get_bak_basic(trade_date='20240101')
>>> stock_st = get_stock_st(trade_date='20240101')
>>> stk_limit = get_stk_limit(trade_date='20240101')
"""
from src.data.api_wrappers.api_daily import (
@@ -58,6 +61,12 @@ from src.data.api_wrappers.api_stock_st import (
sync_stock_st,
StockSTSync,
)
from src.data.api_wrappers.api_stk_limit import (
get_stk_limit,
sync_stk_limit,
preview_stk_limit_sync,
StkLimitSync,
)
from src.data.api_wrappers.api_trade_cal import (
get_trade_cal,
get_trading_days,
@@ -107,6 +116,11 @@ __all__ = [
"get_stock_st",
"sync_stock_st",
"StockSTSync",
# Stock limit price
"get_stk_limit",
"sync_stk_limit",
"preview_stk_limit_sync",
"StkLimitSync",
]
# =============================================================================
@@ -179,6 +193,17 @@ try:
order=40,
)
# 7. Stock Limit Price - 每日涨跌停价格
from src.data.api_wrappers.api_stk_limit import StkLimitSync
sync_registry.register_class(
name="stk_limit",
sync_class=StkLimitSync,
display_name="每日涨跌停价格",
description="股票每日涨跌停价格(涨停价、跌停价)",
order=50,
)
except ImportError:
# sync_registry 可能不存在(首次导入),忽略
pass

View File

@@ -0,0 +1,224 @@
"""Stock Limit Price (涨跌停价格) interface.
Fetch daily limit up/down prices for all stocks from Tushare.
This interface retrieves the upper and lower limit prices for stocks,
which are typically available around 8:40 AM each trading day.
"""
from typing import override
import pandas as pd
from src.data.client import TushareClient
from src.data.api_wrappers.base_sync import DateBasedSync
def get_stk_limit(
trade_date: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
ts_code: str | None = None,
client: TushareClient | None = None,
) -> pd.DataFrame:
"""Fetch stock limit prices from Tushare.
This interface retrieves daily limit up/down prices for stocks.
Each trading day, limit prices are available around 8:40 AM.
Supports fetching all stocks for a single date (preferred for efficiency)
or date range data for specific stocks.
Args:
trade_date: Specific trade date (YYYYMMDD format).
If provided, fetches all stocks for this date (most efficient).
start_date: Start date (YYYYMMDD format).
Used with end_date for date range queries.
end_date: End date (YYYYMMDD format).
Used with start_date for date range queries.
ts_code: Stock code filter (optional).
e.g., '000001.SZ', '600000.SH'
client: Optional TushareClient instance for shared rate limiting.
If None, creates a new client. For concurrent sync operations,
pass a shared client to ensure proper rate limiting.
Returns:
pd.DataFrame with columns:
- trade_date: Trade date (YYYYMMDD)
- ts_code: Stock code
- pre_close: Previous closing price
- up_limit: Upper limit price (涨停价)
- down_limit: Lower limit price (跌停价)
Example:
>>> # Get all stocks limit prices for a single date (most efficient)
>>> data = get_stk_limit(trade_date='20240625')
>>>
>>> # Get date range data
>>> data = get_stk_limit(start_date='20240101', end_date='20240131')
>>>
>>> # Get specific stock data
>>> data = get_stk_limit(ts_code='000001.SZ', start_date='20240101', end_date='20240131')
"""
client = client or TushareClient()
# Build parameters
params = {}
if trade_date:
params["trade_date"] = trade_date
if start_date:
params["start_date"] = start_date
if end_date:
params["end_date"] = end_date
if ts_code:
params["ts_code"] = ts_code
# Fetch data
data = client.query("stk_limit", **params) # type: ignore
# Rename date column if needed
if "date" in data.columns:
data = data.rename(columns={"date": "trade_date"})
return data
class StkLimitSync(DateBasedSync):
"""Stock Limit Price data batch sync manager.
Inherits from DateBasedSync, fetches data by date for all stocks.
Each API call retrieves limit prices for all stocks on a specific date.
Example:
>>> sync = StkLimitSync()
>>> results = sync.sync_all() # Incremental sync
>>> results = sync.sync_all(force_full=True) # Full reload
>>> preview = sync.preview_sync() # Preview
"""
table_name: str = "stk_limit"
default_start_date: str = "20180101"
# Table schema definition
TABLE_SCHEMA: dict[str, str] = {
"ts_code": "VARCHAR(16) NOT NULL",
"trade_date": "DATE NOT NULL",
"pre_close": "DOUBLE",
"up_limit": "DOUBLE",
"down_limit": "DOUBLE",
}
# Index definitions
TABLE_INDEXES: list[tuple[str, list[str]]] = [
("idx_stk_limit_date_code", ["trade_date", "ts_code"]),
]
# Primary key definition
PRIMARY_KEY: tuple[str, str] = ("ts_code", "trade_date")
@override
def fetch_single_date(self, trade_date: str) -> pd.DataFrame:
"""Fetch limit prices for all stocks on a specific date.
Args:
trade_date: Trading date (YYYYMMDD)
Returns:
DataFrame with limit prices for all stocks on the date
"""
# Use get_stk_limit to fetch all stocks for a single date
data = get_stk_limit(
trade_date=trade_date,
client=self.client, # Pass shared client for rate limiting
)
return data
def sync_stk_limit(
force_full: bool = False,
start_date: str | None = None,
end_date: str | None = None,
dry_run: bool = False,
) -> pd.DataFrame:
"""Sync stock limit prices to local DuckDB storage.
This is the main entry point for stock limit price data synchronization.
Args:
force_full: If True, force full reload from default_start_date
start_date: Manual start date override (YYYYMMDD)
end_date: Manual end date override (defaults to today)
dry_run: If True, only preview what would be synced without writing
Returns:
DataFrame with synced data
Example:
>>> # First sync (full load from default_start_date)
>>> result = sync_stk_limit()
>>>
>>> # Subsequent syncs (incremental - only new data)
>>> result = sync_stk_limit()
>>>
>>> # Force full reload
>>> result = sync_stk_limit(force_full=True)
>>>
>>> # Manual date range
>>> result = sync_stk_limit(start_date='20240101', end_date='20240131')
>>>
>>> # Dry run (preview only)
>>> result = sync_stk_limit(dry_run=True)
"""
sync_manager = StkLimitSync()
return sync_manager.sync_all(
force_full=force_full,
start_date=start_date,
end_date=end_date,
dry_run=dry_run,
)
def preview_stk_limit_sync(
force_full: bool = False,
start_date: str | None = None,
end_date: str | None = None,
sample_size: int = 3,
) -> dict[str, object]:
"""Preview stock limit price sync data volume and samples.
This is the recommended way to check what would be synced before
actually performing the synchronization.
Args:
force_full: If True, preview full sync from default_start_date
start_date: Manual start date override
end_date: Manual end date override (defaults to today)
sample_size: Number of sample days to fetch for preview (default: 3)
Returns:
Dictionary with preview information:
{
'sync_needed': bool,
'date_count': int,
'start_date': str,
'end_date': str,
'estimated_records': int,
'sample_data': pd.DataFrame,
'mode': str, # 'full', 'incremental', or 'none'
}
Example:
>>> # Preview what would be synced
>>> preview = preview_stk_limit_sync()
>>>
>>> # Preview full sync
>>> preview = preview_stk_limit_sync(force_full=True)
>>>
>>> # Preview with more samples
>>> preview = preview_stk_limit_sync(sample_size=5)
"""
sync_manager = StkLimitSync()
return sync_manager.preview_sync(
force_full=force_full,
start_date=start_date,
end_date=end_date,
sample_size=sample_size,
)

View File

@@ -1324,52 +1324,6 @@ class DateBasedSync(BaseDataSync):
probe_desc = f"date={probe_date}, all stocks"
self._probe_table_and_cleanup(probe_data, probe_desc)
# 执行同步
combined = self._run_date_range_sync(sync_start, sync_end, dry_run)
if self._should_probe_table():
print(f"[{class_name}] Table '{self.table_name}' is empty, probing...")
# 使用最近一个交易日的完整数据进行探测
probe_date = get_last_trading_day(sync_start, sync_end)
if probe_date:
probe_data = self.fetch_single_date(probe_date)
probe_desc = f"date={probe_date}, all stocks"
self._probe_table_and_cleanup(probe_data, probe_desc)
# 执行同步
if self._should_probe_table():
print(f"[{class_name}] Table '{self.table_name}' is empty, probing...")
# 使用最近一个交易日的完整数据进行探测
probe_date = get_last_trading_day(sync_start, sync_end)
if probe_date:
probe_data = self.fetch_single_date(probe_date)
probe_desc = f"date={probe_date}, all stocks"
self._probe_table_and_cleanup(probe_data, probe_desc)
if self._should_probe_table():
print(f"[{class_name}] Table '{self.table_name}' is empty, probing...")
# 使用最近一个交易日的完整数据进行探测
probe_date = get_last_trading_day(sync_start, sync_end)
if probe_date:
probe_data = self.fetch_single_date(probe_date)
probe_desc = f"date={probe_date}, all stocks"
self._probe_table_and_cleanup(probe_data, probe_desc)
# 执行同步
storage = Storage()
if not storage.exists(self.table_name):
print(
f"[{class_name}] Table '{self.table_name}' doesn't exist, creating..."
)
# 获取样本数据以推断 schema
sample = self.fetch_single_date(sync_end)
if sample.empty:
# 尝试另一个日期
sample = self.fetch_single_date("20240102")
if not sample.empty:
self._ensure_table_schema(sample)
else:
print(f"[{class_name}] Cannot create table: no sample data available")
return pd.DataFrame()
# 执行同步
combined = self._run_date_range_sync(sync_start, sync_end, dry_run)

View File

@@ -0,0 +1,394 @@
"""财务指标数据接口 (VIP 版本)
使用 Tushare VIP 接口 (fina_indicator_vip) 获取财务指标数据。
按季度同步,一次请求获取一个季度的全部上市公司数据。
接口说明:
- fina_indicator_vip: 获取某一季度全部上市公司财务指标数据
- 需要 5000 积分才能调用
- period 参数为报告期(季度最后一天,如 20231231
- 每次请求最多返回 100 条记录(需多次请求获取更多数据)
使用方式:
# 同步财务指标数据
from src.data.api_wrappers.financial_data.api_fina_indicator import (
FinaIndicatorQuarterSync,
sync_fina_indicator
)
# 方式1: 使用类
syncer = FinaIndicatorQuarterSync()
syncer.sync_incremental() # 增量同步
syncer.sync_full() # 全量同步
# 方式2: 使用便捷函数
sync_fina_indicator() # 增量同步
sync_fina_indicator(force_full=True) # 全量同步
"""
from typing import Optional
import pandas as pd
from src.data.client import TushareClient
from src.data.api_wrappers.base_financial_sync import (
QuarterBasedSync,
sync_financial_data,
preview_financial_sync,
)
class FinaIndicatorQuarterSync(QuarterBasedSync):
"""财务指标季度同步实现。
使用 fina_indicator_vip 接口按季度获取全部上市公司财务指标数据。
表结构: financial_fina_indicator
主键: (ts_code, end_date)
"""
table_name = "financial_fina_indicator"
api_name = "fina_indicator_vip"
# 目标报表类型:默认只同步合并报表(财务指标接口无需过滤 report_type
TARGET_REPORT_TYPE = None
# 表结构定义 - 完整的财务指标字段
TABLE_SCHEMA = {
# 基础字段
"ts_code": "VARCHAR(16) NOT NULL",
"ann_date": "DATE",
"end_date": "DATE NOT NULL",
# 每股收益指标
"eps": "DOUBLE",
"dt_eps": "DOUBLE",
"total_revenue_ps": "DOUBLE",
"revenue_ps": "DOUBLE",
"capital_rese_ps": "DOUBLE",
"surplus_rese_ps": "DOUBLE",
"undist_profit_ps": "DOUBLE",
"extra_item": "DOUBLE",
"profit_dedt": "DOUBLE",
"gross_margin": "DOUBLE",
# 偿债能力指标
"current_ratio": "DOUBLE",
"quick_ratio": "DOUBLE",
"cash_ratio": "DOUBLE",
# 营运能力指标
"invturn_days": "DOUBLE",
"arturn_days": "DOUBLE",
"inv_turn": "DOUBLE",
"ar_turn": "DOUBLE",
"ca_turn": "DOUBLE",
"fa_turn": "DOUBLE",
"assets_turn": "DOUBLE",
# 盈利能力指标
"op_income": "DOUBLE",
"valuechange_income": "DOUBLE",
"interst_income": "DOUBLE",
"daa": "DOUBLE",
"ebit": "DOUBLE",
"ebitda": "DOUBLE",
"fcff": "DOUBLE",
"fcfe": "DOUBLE",
# 资本结构指标
"current_exint": "DOUBLE",
"noncurrent_exint": "DOUBLE",
"interestdebt": "DOUBLE",
"netdebt": "DOUBLE",
"tangible_asset": "DOUBLE",
"working_capital": "DOUBLE",
"networking_capital": "DOUBLE",
"invest_capital": "DOUBLE",
"retained_earnings": "DOUBLE",
# 每股指标
"diluted2_eps": "DOUBLE",
"bps": "DOUBLE",
"ocfps": "DOUBLE",
"retainedps": "DOUBLE",
"cfps": "DOUBLE",
"ebit_ps": "DOUBLE",
"fcff_ps": "DOUBLE",
"fcfe_ps": "DOUBLE",
# 销售能力指标
"netprofit_margin": "DOUBLE",
"grossprofit_margin": "DOUBLE",
"cogs_of_sales": "DOUBLE",
"expense_of_sales": "DOUBLE",
"profit_to_gr": "DOUBLE",
"saleexp_to_gr": "DOUBLE",
"adminexp_of_gr": "DOUBLE",
"finaexp_of_gr": "DOUBLE",
"impai_ttm": "DOUBLE",
"gc_of_gr": "DOUBLE",
"op_of_gr": "DOUBLE",
"ebit_of_gr": "DOUBLE",
# 投资回报率指标
"roe": "DOUBLE",
"roe_waa": "DOUBLE",
"roe_dt": "DOUBLE",
"roa": "DOUBLE",
"npta": "DOUBLE",
"roic": "DOUBLE",
"roe_yearly": "DOUBLE",
"roa2_yearly": "DOUBLE",
"roe_avg": "DOUBLE",
# 利润结构指标
"opincome_of_ebt": "DOUBLE",
"investincome_of_ebt": "DOUBLE",
"n_op_profit_of_ebt": "DOUBLE",
"tax_to_ebt": "DOUBLE",
"dtprofit_to_profit": "DOUBLE",
# 现金流量指标
"salescash_to_or": "DOUBLE",
"ocf_to_or": "DOUBLE",
"ocf_to_opincome": "DOUBLE",
# 资本支出指标
"capitalized_to_da": "DOUBLE",
# 杠杆与偿债能力指标
"debt_to_assets": "DOUBLE",
"assets_to_eqt": "DOUBLE",
"dp_assets_to_eqt": "DOUBLE",
"ca_to_assets": "DOUBLE",
"nca_to_assets": "DOUBLE",
"tbassets_to_totalassets": "DOUBLE",
"int_to_talcap": "DOUBLE",
"eqt_to_talcapital": "DOUBLE",
"currentdebt_to_debt": "DOUBLE",
"longdeb_to_debt": "DOUBLE",
"ocf_to_shortdebt": "DOUBLE",
"debt_to_eqt": "DOUBLE",
"eqt_to_debt": "DOUBLE",
"eqt_to_interestdebt": "DOUBLE",
"tangibleasset_to_debt": "DOUBLE",
"tangasset_to_intdebt": "DOUBLE",
"tangibleasset_to_netdebt": "DOUBLE",
"ocf_to_debt": "DOUBLE",
"ocf_to_interestdebt": "DOUBLE",
"ocf_to_netdebt": "DOUBLE",
"ebit_to_interest": "DOUBLE",
"longdebt_to_workingcapital": "DOUBLE",
"ebitda_to_debt": "DOUBLE",
# 营运周期指标
"turn_days": "DOUBLE",
"roa_yearly": "DOUBLE",
"roa_dp": "DOUBLE",
"fixed_assets": "DOUBLE",
# 利润质量指标
"profit_prefin_exp": "DOUBLE",
"non_op_profit": "DOUBLE",
"op_to_ebt": "DOUBLE",
"nop_to_ebt": "DOUBLE",
"ocf_to_profit": "DOUBLE",
# 流动性指标
"cash_to_liqdebt": "DOUBLE",
"cash_to_liqdebt_withinterest": "DOUBLE",
"op_to_liqdebt": "DOUBLE",
"op_to_debt": "DOUBLE",
"roic_yearly": "DOUBLE",
"total_fa_trun": "DOUBLE",
"profit_to_op": "DOUBLE",
# 单季度指标 (q_*)
"q_opincome": "DOUBLE",
"q_investincome": "DOUBLE",
"q_dtprofit": "DOUBLE",
"q_eps": "DOUBLE",
"q_netprofit_margin": "DOUBLE",
"q_gsprofit_margin": "DOUBLE",
"q_exp_to_sales": "DOUBLE",
"q_profit_to_gr": "DOUBLE",
"q_saleexp_to_gr": "DOUBLE",
"q_adminexp_to_gr": "DOUBLE",
"q_finaexp_to_gr": "DOUBLE",
"q_impair_to_gr_ttm": "DOUBLE",
"q_gc_to_gr": "DOUBLE",
"q_op_to_gr": "DOUBLE",
"q_roe": "DOUBLE",
"q_dt_roe": "DOUBLE",
"q_npta": "DOUBLE",
"q_opincome_to_ebt": "DOUBLE",
"q_investincome_to_ebt": "DOUBLE",
"q_dtprofit_to_profit": "DOUBLE",
"q_salescash_to_or": "DOUBLE",
"q_ocf_to_sales": "DOUBLE",
"q_ocf_to_or": "DOUBLE",
# 同比增长率指标 (*_yoy)
"basic_eps_yoy": "DOUBLE",
"dt_eps_yoy": "DOUBLE",
"cfps_yoy": "DOUBLE",
"op_yoy": "DOUBLE",
"ebt_yoy": "DOUBLE",
"netprofit_yoy": "DOUBLE",
"dt_netprofit_yoy": "DOUBLE",
"ocf_yoy": "DOUBLE",
"roe_yoy": "DOUBLE",
"bps_yoy": "DOUBLE",
"assets_yoy": "DOUBLE",
"eqt_yoy": "DOUBLE",
"tr_yoy": "DOUBLE",
"or_yoy": "DOUBLE",
# 单季度增长指标 (q_*_yoy, q_*_qoq)
"q_gr_yoy": "DOUBLE",
"q_gr_qoq": "DOUBLE",
"q_sales_yoy": "DOUBLE",
"q_sales_qoq": "DOUBLE",
"q_op_yoy": "DOUBLE",
"q_op_qoq": "DOUBLE",
"q_profit_yoy": "DOUBLE",
"q_profit_qoq": "DOUBLE",
"q_netprofit_yoy": "DOUBLE",
"q_netprofit_qoq": "DOUBLE",
# 其他指标
"equity_yoy": "DOUBLE",
"rd_exp": "DOUBLE",
"update_flag": "VARCHAR(1)",
}
# 索引定义(不要创建唯一索引)
# 注意:财务数据可能发生多次修正,不设置主键和唯一索引
TABLE_INDEXES = [
("idx_financial_fina_indicator_ts_code", ["ts_code"]),
("idx_financial_fina_indicator_end_date", ["end_date"]),
("idx_financial_fina_indicator_ts_period", ["ts_code", "end_date"]),
]
def __init__(self):
"""初始化财务指标同步器。"""
super().__init__()
self._fields = None # 默认返回全部字段
def fetch_single_quarter(self, period: str) -> pd.DataFrame:
"""获取单季度的全部上市公司财务指标数据。
注意fina_indicator_vip 接口每次请求最多返回 100 条记录,
需要通过 offset 参数循环获取该季度的全部数据。
Args:
period: 报告期,季度最后一天日期(如 '20231231'
Returns:
包含该季度全部上市公司财务指标数据的 DataFrame
"""
all_data = []
offset = 0
limit = 100 # API 限制每次最多返回 100 条
while True:
params = {
"period": period,
"limit": limit,
"offset": offset,
}
if self._fields:
params["fields"] = self._fields
df = self.client.query(self.api_name, **params)
if df.empty:
break
all_data.append(df)
# 如果返回的数据少于 limit说明已经取完
if len(df) < limit:
break
offset += limit
if not all_data:
return pd.DataFrame()
return pd.concat(all_data, ignore_index=True)
# =============================================================================
# 便捷函数
# =============================================================================
def sync_fina_indicator(
force_full: bool = False,
dry_run: bool = False,
) -> list:
"""同步财务指标数据(便捷函数)。
Args:
force_full: 若为 True强制全量同步
dry_run: 若为 True仅预览不写入
Returns:
同步结果列表
Example:
>>> # 增量同步
>>> sync_fina_indicator()
>>>
>>> # 全量同步
>>> sync_fina_indicator(force_full=True)
>>>
>>> # 预览
>>> sync_fina_indicator(dry_run=True)
"""
return sync_financial_data(FinaIndicatorQuarterSync, force_full, dry_run)
def preview_fina_indicator_sync() -> dict:
"""预览财务指标同步信息。
Returns:
预览信息字典
"""
return preview_financial_sync(FinaIndicatorQuarterSync)
def get_fina_indicator(period: str, fields: Optional[str] = None) -> pd.DataFrame:
"""获取财务指标数据(原始接口,单季度)。
用于直接获取某个季度的数据,不进行同步管理。
注意:该接口每次最多返回 100 条记录,如需获取全部数据请使用同步功能。
Args:
period: 报告期,季度最后一天日期(如 '20231231'
fields: 指定返回字段,默认返回全部字段
Returns:
包含财务指标数据的 DataFrame
"""
client = TushareClient()
if fields is None:
fields = (
"ts_code,ann_date,end_date,eps,dt_eps,total_revenue_ps,revenue_ps,"
"capital_rese_ps,surplus_rese_ps,undist_profit_ps,extra_item,profit_dedt,"
"gross_margin,current_ratio,quick_ratio,cash_ratio,invturn_days,arturn_days,"
"inv_turn,ar_turn,ca_turn,fa_turn,assets_turn,op_income,valuechange_income,"
"interst_income,daa,ebit,ebitda,fcff,fcfe,current_exint,noncurrent_exint,"
"interestdebt,netdebt,tangible_asset,working_capital,networking_capital,"
"invest_capital,retained_earnings,diluted2_eps,bps,ocfps,retainedps,cfps,"
"ebit_ps,fcff_ps,fcfe_ps,netprofit_margin,grossprofit_margin,cogs_of_sales,"
"expense_of_sales,profit_to_gr,saleexp_to_gr,adminexp_of_gr,finaexp_of_gr,"
"impai_ttm,gc_of_gr,op_of_gr,ebit_of_gr,roe,roe_waa,roe_dt,roa,npta,roic,"
"roe_yearly,roa2_yearly,roe_avg,opincome_of_ebt,investincome_of_ebt,"
"n_op_profit_of_ebt,tax_to_ebt,dtprofit_to_profit,salescash_to_or,"
"ocf_to_or,ocf_to_opincome,capitalized_to_da,debt_to_assets,assets_to_eqt,"
"dp_assets_to_eqt,ca_to_assets,nca_to_assets,tbassets_to_totalassets,"
"int_to_talcap,eqt_to_talcapital,currentdebt_to_debt,longdeb_to_debt,"
"ocf_to_shortdebt,debt_to_eqt,eqt_to_debt,eqt_to_interestdebt,"
"tangibleasset_to_debt,tangasset_to_intdebt,tangibleasset_to_netdebt,"
"ocf_to_debt,ocf_to_interestdebt,ocf_to_netdebt,ebit_to_interest,"
"longdebt_to_workingcapital,ebitda_to_debt,turn_days,roa_yearly,roa_dp,"
"fixed_assets,profit_prefin_exp,non_op_profit,op_to_ebt,nop_to_ebt,"
"ocf_to_profit,cash_to_liqdebt,cash_to_liqdebt_withinterest,op_to_liqdebt,"
"op_to_debt,roic_yearly,total_fa_trun,profit_to_op,q_opincome,"
"q_investincome,q_dtprofit,q_eps,q_netprofit_margin,q_gsprofit_margin,"
"q_exp_to_sales,q_profit_to_gr,q_saleexp_to_gr,q_adminexp_to_gr,"
"q_finaexp_to_gr,q_impair_to_gr_ttm,q_gc_to_gr,q_op_to_gr,q_roe,q_dt_roe,"
"q_npta,q_opincome_to_ebt,q_investincome_to_ebt,q_dtprofit_to_profit,"
"q_salescash_to_or,q_ocf_to_sales,q_ocf_to_or,basic_eps_yoy,dt_eps_yoy,"
"cfps_yoy,op_yoy,ebt_yoy,netprofit_yoy,dt_netprofit_yoy,ocf_yoy,roe_yoy,"
"bps_yoy,assets_yoy,eqt_yoy,tr_yoy,or_yoy,q_gr_yoy,q_gr_qoq,q_sales_yoy,"
"q_sales_qoq,q_op_yoy,q_op_qoq,q_profit_yoy,q_profit_qoq,q_netprofit_yoy,"
"q_netprofit_qoq,equity_yoy,rd_exp,update_flag"
)
return client.query("fina_indicator_vip", period=period, fields=fields)

View File

@@ -7,6 +7,7 @@
- income: 利润表 (已实现)
- balance: 资产负债表 (已实现)
- cashflow: 现金流量表 (已实现)
- fina_indicator: 财务指标 (已实现)
使用方式:
# 同步所有财务数据(增量)
@@ -25,12 +26,15 @@
# 只同步现金流量表
sync_financial(data_types=["cashflow"])
# 只同步财务指标
sync_financial(data_types=["fina_indicator"])
# 预览同步
from src.data.api_wrappers.financial_data.api_financial_sync import preview_sync
preview = preview_sync()
"""
from typing import List, Dict, Optional
from typing import List, Optional
from src.data.api_wrappers.financial_data.api_income import (
IncomeQuarterSync,
@@ -47,6 +51,11 @@ from src.data.api_wrappers.financial_data.api_cashflow import (
sync_cashflow,
preview_cashflow_sync,
)
from src.data.api_wrappers.financial_data.api_fina_indicator import (
FinaIndicatorQuarterSync,
sync_fina_indicator,
preview_fina_indicator_sync,
)
# 支持的财务数据类型映射
@@ -69,6 +78,12 @@ FINANCIAL_SYNCERS = {
"preview_func": preview_cashflow_sync,
"display_name": "现金流量表",
},
"fina_indicator": {
"syncer_class": FinaIndicatorQuarterSync,
"sync_func": sync_fina_indicator,
"preview_func": preview_fina_indicator_sync,
"display_name": "财务指标",
},
}
@@ -76,7 +91,7 @@ def sync_financial(
data_types: Optional[List[str]] = None,
force_full: bool = False,
dry_run: bool = False,
) -> Dict[str, List]:
) -> dict[str, list]:
"""同步财务数据(调度函数)。
根据指定的数据类型,调度对应的同步器执行同步。
@@ -157,7 +172,7 @@ def sync_financial(
return results
def preview_sync(data_types: Optional[List[str]] = None) -> Dict[str, Dict]:
def preview_sync(data_types: Optional[List[str]] = None) -> dict[str, dict]:
"""预览财务数据同步信息。
Args:
@@ -189,7 +204,7 @@ def preview_sync(data_types: Optional[List[str]] = None) -> Dict[str, Dict]:
return previews
def list_financial_types() -> List[Dict]:
def list_financial_types() -> list[dict]:
"""列出所有支持的财务数据类型。
Returns:

File diff suppressed because one or more lines are too long

246
tests/test_stk_limit.py Normal file
View File

@@ -0,0 +1,246 @@
"""Tests for stock limit price API wrapper."""
import pytest
import pandas as pd
from unittest.mock import patch, MagicMock
from src.data.api_wrappers.api_stk_limit import (
get_stk_limit,
sync_stk_limit,
preview_stk_limit_sync,
StkLimitSync,
)
class TestStkLimit:
"""Test suite for stk_limit API wrapper."""
@patch("src.data.api_wrappers.api_stk_limit.TushareClient")
def test_get_by_date(self, mock_client_class):
"""Test fetching data by trade_date."""
# Setup mock
mock_client = MagicMock()
mock_client_class.return_value = mock_client
mock_client.query.return_value = pd.DataFrame(
{
"ts_code": ["000001.SZ", "000002.SZ"],
"trade_date": ["20240625", "20240625"],
"pre_close": [10.0, 20.0],
"up_limit": [11.0, 22.0],
"down_limit": [9.0, 18.0],
}
)
# Test
result = get_stk_limit(trade_date="20240625")
# Assert
assert not result.empty
assert len(result) == 2
assert "ts_code" in result.columns
assert "trade_date" in result.columns
assert "up_limit" in result.columns
assert "down_limit" in result.columns
mock_client.query.assert_called_once_with("stk_limit", trade_date="20240625")
@patch("src.data.api_wrappers.api_stk_limit.TushareClient")
def test_get_by_date_range(self, mock_client_class):
"""Test fetching data by date range."""
# Setup mock
mock_client = MagicMock()
mock_client_class.return_value = mock_client
mock_client.query.return_value = pd.DataFrame(
{
"ts_code": ["000001.SZ", "000001.SZ"],
"trade_date": ["20240624", "20240625"],
"pre_close": [10.0, 10.5],
"up_limit": [11.0, 11.55],
"down_limit": [9.0, 9.45],
}
)
# Test
result = get_stk_limit(start_date="20240624", end_date="20240625")
# Assert
assert not result.empty
assert len(result) == 2
mock_client.query.assert_called_once_with(
"stk_limit", start_date="20240624", end_date="20240625"
)
@patch("src.data.api_wrappers.api_stk_limit.TushareClient")
def test_get_by_stock_code(self, mock_client_class):
"""Test fetching data by stock code."""
# Setup mock
mock_client = MagicMock()
mock_client_class.return_value = mock_client
mock_client.query.return_value = pd.DataFrame(
{
"ts_code": ["000001.SZ"],
"trade_date": ["20240625"],
"pre_close": [10.0],
"up_limit": [11.0],
"down_limit": [9.0],
}
)
# Test
result = get_stk_limit(ts_code="000001.SZ", trade_date="20240625")
# Assert
assert not result.empty
assert len(result) == 1
assert result.iloc[0]["ts_code"] == "000001.SZ"
mock_client.query.assert_called_once_with(
"stk_limit", trade_date="20240625", ts_code="000001.SZ"
)
@patch("src.data.api_wrappers.api_stk_limit.TushareClient")
def test_empty_response(self, mock_client_class):
"""Test handling empty response."""
# Setup mock
mock_client = MagicMock()
mock_client_class.return_value = mock_client
mock_client.query.return_value = pd.DataFrame()
# Test
result = get_stk_limit(trade_date="20240625")
# Assert
assert result.empty
@patch("src.data.api_wrappers.api_stk_limit.TushareClient")
def test_shared_client(self, mock_client_class):
"""Test passing shared client for rate limiting."""
# Setup mock
shared_client = MagicMock()
shared_client.query.return_value = pd.DataFrame(
{
"ts_code": ["000001.SZ"],
"trade_date": ["20240625"],
"pre_close": [10.0],
"up_limit": [11.0],
"down_limit": [9.0],
}
)
# Test
result = get_stk_limit(trade_date="20240625", client=shared_client)
# Assert
assert not result.empty
shared_client.query.assert_called_once()
# Verify new client was not created
mock_client_class.assert_not_called()
class TestStkLimitSync:
"""Test suite for StkLimitSync class."""
@patch("src.data.api_wrappers.api_stk_limit.TushareClient")
@patch("src.data.api_wrappers.base_sync.Storage")
@patch("src.data.api_wrappers.base_sync.sync_trade_cal_cache")
def test_fetch_single_date(
self, mock_sync_cal, mock_storage_class, mock_client_class
):
"""Test fetch_single_date method."""
# Setup mock
mock_client = MagicMock()
mock_client_class.return_value = mock_client
mock_client.query.return_value = pd.DataFrame(
{
"ts_code": ["000001.SZ", "000002.SZ"],
"trade_date": ["20240625", "20240625"],
"pre_close": [10.0, 20.0],
"up_limit": [11.0, 22.0],
"down_limit": [9.0, 18.0],
}
)
mock_storage = MagicMock()
mock_storage_class.return_value = mock_storage
mock_storage.exists.return_value = True
mock_storage.load.return_value = pd.DataFrame()
# Test
sync = StkLimitSync()
result = sync.fetch_single_date("20240625")
# Assert
assert not result.empty
assert len(result) == 2
mock_client.query.assert_called_once_with("stk_limit", trade_date="20240625")
def test_table_schema(self):
"""Test table schema definition."""
sync = StkLimitSync()
# Assert table configuration
assert sync.table_name == "stk_limit"
assert "ts_code" in sync.TABLE_SCHEMA
assert "trade_date" in sync.TABLE_SCHEMA
assert "pre_close" in sync.TABLE_SCHEMA
assert "up_limit" in sync.TABLE_SCHEMA
assert "down_limit" in sync.TABLE_SCHEMA
assert sync.PRIMARY_KEY == ("ts_code", "trade_date")
class TestSyncFunctions:
"""Test suite for sync convenience functions."""
@patch.object(StkLimitSync, "sync_all")
def test_sync_stk_limit(self, mock_sync_all):
"""Test sync_stk_limit convenience function."""
# Setup mock
mock_sync_all.return_value = pd.DataFrame(
{
"ts_code": ["000001.SZ"],
"trade_date": ["20240625"],
"up_limit": [11.0],
"down_limit": [9.0],
}
)
# Test
result = sync_stk_limit(force_full=True)
# Assert
assert not result.empty
mock_sync_all.assert_called_once_with(
force_full=True,
start_date=None,
end_date=None,
dry_run=False,
)
@patch.object(StkLimitSync, "preview_sync")
def test_preview_stk_limit_sync(self, mock_preview):
"""Test preview_stk_limit_sync convenience function."""
# Setup mock
mock_preview.return_value = {
"sync_needed": True,
"date_count": 10,
"start_date": "20240601",
"end_date": "20240610",
"estimated_records": 5000,
"sample_data": pd.DataFrame(),
"mode": "incremental",
}
# Test
result = preview_stk_limit_sync()
# Assert
assert result["sync_needed"] is True
assert result["mode"] == "incremental"
mock_preview.assert_called_once_with(
force_full=False,
start_date=None,
end_date=None,
sample_size=3,
)
if __name__ == "__main__":
pytest.main([__file__, "-v"])