Files
ProStock/docs/api/FINANCIAL_API_SPEC.md
liaozhaorun d4e0e2a0b6 feat(data): 添加每日筹码及胜率数据接口 (cyq_perf)
- 新增 api_cyq_perf 模块,支持筹码分布数据获取和同步
- 在 sync_registry 中注册 cyq_perf 同步器
2026-03-26 22:22:43 +08:00

39 KiB
Raw Permalink Blame History

财务数据 API 封装规范

文档版本: v1.3
适用范围: 所有财务数据 API利润表、资产负债表、现金流量表等
更新日期: 2026-03-08


目录

  1. 概述
  2. 架构设计原则
  3. 文件结构规范
  4. 类设计规范
  5. 同步策略规范
  6. 数据差异检测
  7. 表结构设计
  8. 索引设计规范
  9. 报表类型过滤
  10. 代码示例
  11. 常见问题

概述

财务数据特点

财务数据与日频数据(日线、分钟线)有本质区别:

特性 日频数据 财务数据
更新频率 每日更新 季度更新
获取方式 按股票循环获取 VIP接口一次性获取全市场
数据修正 极少发生 经常发生(财报修正)
数据量 5000+股票×250交易日 5000+股票×4季度
版本控制 多版本report_type

核心要求

  1. 必须实现 QuarterBasedSync 基类:所有财务数据同步必须继承此基类
  2. 不设置唯一约束:不创建主键和唯一索引,支持数据多次修正
  3. 先删除后插入:数据更新采用删除旧数据再插入新数据的策略
  4. 无跳过逻辑:财务数据必须每次都进行对比更新
  5. 报表类型过滤:默认只同步合并报表,支持灵活配置

架构设计原则

1. 职责分离

调度中心 (api_financial_sync.py)
    |
    v
各 API 文件 (api_income.py, api_balance.py, api_cashflow.py)
    |
    v
基类 (base_financial_sync.py - QuarterBasedSync)
    |
    v
存储层 (storage.py - ThreadSafeStorage)

规范:

  • 调度中心只负责任务协调,不包含具体同步逻辑
  • 各 API 文件实现具体的 fetch_single_quarter() 方法
  • 通用逻辑下沉到 QuarterBasedSync 基类

2. 统一继承

必须继承 QuarterBasedSync 基类:

from src.data.api_wrappers.base_financial_sync import QuarterBasedSync

class IncomeQuarterSync(QuarterBasedSync):
    """利润表季度同步实现。"""
    pass

禁止在 API 文件中重复实现同步逻辑。


文件结构规范

文件位置

财务数据 API 文件必须位于:

src/data/api_wrappers/financial_data/
├── __init__.py              # 可选:导出公共接口
├── api_income.py            # 利润表接口(已实现)
├── api_balance.py           # 资产负债表接口(已实现)
├── api_cashflow.py          # 现金流量表接口(已实现)
└── api_financial_sync.py    # 调度中心(只保留调度逻辑)

基类位置

src/data/api_wrappers/
├── base_sync.py             # StockBasedSync, DateBasedSync
└── base_financial_sync.py   # QuarterBasedSync本规范核心

快速开始

已实现的财务数据接口可以直接使用:

# 同步所有财务数据(利润表 + 资产负债表 + 现金流量表)
from src.data.api_wrappers.financial_data.api_financial_sync import sync_financial
sync_financial()

# 只同步利润表
from src.data.api_wrappers.financial_data.api_income import sync_income
sync_income()

# 只同步资产负债表
from src.data.api_wrappers.financial_data.api_balance import sync_balance
sync_balance()

# 只同步现金流量表
from src.data.api_wrappers.financial_data.api_cashflow import sync_cashflow
sync_cashflow()

# 全量同步
sync_cashflow(force_full=True)

# 预览同步
from src.data.api_wrappers.financial_data.api_cashflow import preview_cashflow_sync
preview_cashflow_sync()

文件内容结构

每个 API 文件必须包含以下部分(按顺序):

"""模块文档字符串。

包含:模块用途、使用方式、注意事项。
"""

# 1. 标准库导入
from typing import Optional
import pandas as pd

# 2. 第三方库导入
# (无特殊要求)

# 3. 本地模块导入
from src.data.client import TushareClient
from src.data.api_wrappers.base_financial_sync import (
    QuarterBasedSync, 
    sync_financial_data,
    preview_financial_sync
)

# 4. 同步类实现
class XXXQuarterSync(QuarterBasedSync):
    """具体财务数据同步实现类。"""
    pass

# 5. 便捷函数

def sync_xxx(force_full: bool = False, dry_run: bool = False) -> list:
    """同步数据便捷函数。"""
    pass

def preview_xxx_sync() -> dict:
    """预览同步信息便捷函数。"""
    pass

# 6. 原始数据接口(可选,向后兼容)
def get_xxx(period: str, fields: Optional[str] = None) -> pd.DataFrame:
    """获取原始数据接口。"""
    pass

速率限制规范(关键)

问题背景

财务数据同步使用 VIP 接口(如 income_vipbalancesheet_vip)按季度获取全市场数据。在并发场景下,如果每个线程创建独立的 TushareClient 实例,每个实例会有独立的令牌桶限流器,导致限流失效

实际案例

  • 配置 RATE_LIMIT=150,理论上每分钟最多 150 次请求
  • 如果 10 个线程各自创建独立客户端,实际并发数 = 10 × 150 = 1500 次/分钟
  • 结果:触发 Tushare API 限流,请求失败

解决方案

必须在数据获取函数中接受可选的 client 参数,并在同步类中传递共享实例:

from src.data.client import TushareClient
from typing import Optional

# 1. 数据获取函数必须支持 client 参数
def get_{data_type}(
    period: str,
    client: Optional[TushareClient] = None,  # 关键参数
) -> pd.DataFrame:
    """Fetch financial data.
    
    Args:
        period: 报告期YYYYMMDD
        client: Optional TushareClient for shared rate limiting
    """
    client = client or TushareClient()  # 如果没有提供则创建新实例
    return client.query("{api_name}", period=period)

# 2. 同步类中传递共享 client
class XXXQuarterSync(QuarterBasedSync):
    def fetch_single_quarter(self, period: str) -> pd.DataFrame:
        # 使用 self.client基类创建的共享实例
        return get_{data_type}(period=period, client=self.client)

关键规则

  1. 数据获取函数:必须接受 client: Optional[TushareClient] = None 参数
  2. 同步类实现:必须在 fetch_single_quarter() 中传递 self.client
  3. 基类保证QuarterBasedSync 基类在 __init__ 中创建 self.client = TushareClient()
  4. 使用模式:数据获取函数使用 client = client or TushareClient() 模式

注意TushareClient 内部使用类级别共享限流器_shared_limiter),确保所有实例共享同一个令牌桶,但前提是必须复用同一个客户端实例。


类设计规范

类命名规范

必须遵循以下命名模式:

# 格式: {DataType}QuarterSync

# 正确示例
IncomeQuarterSync       # 利润表
BalanceQuarterSync      # 资产负债表
CashflowQuarterSync     # 现金流量表

# 错误示例
IncomeSync              # 缺少 Quarter不统一
SyncIncome              # 动词开头,不符合类命名规范

必须覆盖的类属性

子类必须定义以下类属性:

class IncomeQuarterSync(QuarterBasedSync):
    """利润表季度同步实现。"""
    
    # 1. 表名(必须)
    table_name = "financial_income"
    
    # 2. API 接口名(必须)
    api_name = "income_vip"
    
    # 3. 目标报表类型(可选,默认 "1"
    TARGET_REPORT_TYPE = "1"
    
    # 4. 表结构定义(必须)
    TABLE_SCHEMA = {
        "ts_code": "VARCHAR(16) NOT NULL",
        "end_date": "DATE NOT NULL",
        "report_type": "INTEGER",
        # ... 其他字段
    }
    
    # 5. 索引定义(必须)
    TABLE_INDEXES = [
        ("idx_financial_income_ts_code", ["ts_code"]),
        ("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]),
    ]

必须实现的抽象方法

子类必须实现以下抽象方法:

@abstractmethod
def fetch_single_quarter(self, period: str) -> pd.DataFrame:
    """获取单季度的全部上市公司数据。
    
    Args:
        period: 报告期,季度最后一天日期(如 '20231231'
    
    Returns:
        包含该季度全部上市公司财务数据的 DataFrame
        
    注意:
        - 使用 VIP 接口(如 income_vip
        - 不要在此方法中过滤 report_type基类会统一处理
        - 返回的 DataFrame 必须包含 ts_code 和 end_date 列
    """
    params = {"period": period}
    return self.client.query(self.api_name, **params)

禁止的操作

子类禁止覆盖或修改以下方法:

# 基类核心方法,禁止覆盖
- sync_quarter()          # 单季度同步流程
- sync_range()            # 范围同步
- sync_incremental()      # 增量同步
- sync_full()             # 全量同步
- delete_stock_quarter_data()  # 删除数据
- compare_and_find_differences()  # 差异检测
- ensure_table_exists()   # 建表逻辑

同步策略规范

增量同步策略

规范: 财务数据同步必须每次都执行,不存在"跳过"的情况。

原因: 财务数据可能会被修正,即使本地已有数据,也需要重新对比更新。

流程:

def sync_incremental(self, dry_run: bool = False) -> List[Dict]:
    """增量同步流程。
    
    注意:财务数据必须每次都进行对比更新,因为数据可能被修正。
    """
    # 1. 确保表存在(首次同步时自动建表)
    self.ensure_table_exists()
    
    # 2. 获取本地最新季度
    latest_quarter = self._get_latest_quarter()
    
    # 3. 获取当前季度
    current_quarter = self.get_current_quarter()
    
    # 4. 确定同步范围(不检查是否需要同步,直接执行)
    # 注意:即使 latest_quarter >= current_quarter也要执行
    start_quarter = self.get_prev_quarter(latest_quarter)
    
    # 5. 执行同步
    return self.sync_range(start_quarter, current_quarter, dry_run)

全量同步策略

规范: 全量同步从默认起始日期2018Q1同步到当前季度。

流程:

def sync_full(self, dry_run: bool = False) -> List[Dict]:
    """全量同步流程。"""
    # 1. 创建表结构(如不存在)
    self.ensure_table_exists()
    
    # 2. 清空表(可选,根据需求决定)
    # self.storage.clear_table(self.table_name)
    
    # 3. 获取同步范围
    start_quarter = self.DEFAULT_START_DATE  # "20180331"
    end_quarter = self.get_current_quarter()
    
    # 4. 执行同步
    return self.sync_range(start_quarter, end_quarter, dry_run)

单季度同步策略

规范: 单季度同步采用"先删除后插入"策略,并优化首次同步场景。

流程:

def sync_quarter(self, period: str, dry_run: bool = False) -> Dict:
    """单季度同步流程(核心)。"""
    # 1. 获取远程数据
    remote_df = self.fetch_single_quarter(period)
    
    # 2. 根据 TARGET_REPORT_TYPE 过滤报表类型
    if self.TARGET_REPORT_TYPE and 'report_type' in remote_df.columns:
        remote_df = remote_df[remote_df['report_type'] == self.TARGET_REPORT_TYPE]
    
    remote_total = len(remote_df)
    
    # 3. 检查本地是否有该季度数据(首次同步优化)
    local_counts = self.get_local_data_count_by_stock(period)
    is_first_sync_for_period = len(local_counts) == 0
    
    if is_first_sync_for_period:
        # 首次同步:直接插入所有数据,跳过差异检测
        print(f"[{self.__class__.__name__}] First sync for quarter {period}, inserting all data directly")
        if not dry_run:
            self.storage.queue_save(self.table_name, remote_df, use_upsert=False)
            self.storage.flush()
        return {...}
    
    # 4. 非首次同步:对比找出差异股票
    diff_df, stats_df = self.compare_and_find_differences(remote_df, period)
    
    # 5. 执行同步(先删除后插入)
    if not dry_run and not diff_df.empty:
        diff_stocks = list(diff_df['ts_code'].unique())
        
        # 5.1 删除差异股票的旧数据
        self.delete_stock_quarter_data(period, diff_stocks)
        
        # 5.2 插入新数据(必须使用 use_upsert=False
        self.storage.queue_save(self.table_name, diff_df, use_upsert=False)
        self.storage.flush()
    
    return {...}

重要:

  1. 禁止使用 UPSERTINSERT OR REPLACE必须使用"先删除后插入"
  2. 首次同步优化:本地无数据时直接插入,不进行差异检测,提升性能
  3. 必须使用 use_upsert=False:调用 queue_save() 时必须显式指定,避免触发 UPSERT 错误

数据差异检测

检测逻辑

规范: 按股票级别对比本地与远程数据量,识别差异。

算法:

def compare_and_find_differences(
    self, 
    remote_df: pd.DataFrame, 
    period: str
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """对比远程数据与本地数据,找出差异。
    
    逻辑:
    1. 统计远程数据中每只股票的数据量
    2. 查询本地数据库中该季度每只股票的数据量
    3. 对比找出差异股票(新增或数据量不一致)
    4. 返回需要插入的差异数据
    
    注意:主键为 (ts_code, end_date, report_type),但差异检测按股票级别进行。
    如果某股票的记录总数不一致,则更新该股票的所有记录。
    """
    # 1. 统计远程数据中每只股票的数据量
    remote_counts = remote_df.groupby('ts_code').size().to_dict()
    
    # 2. 获取本地数据量(按股票汇总)
    local_counts = self.get_local_data_count_by_stock(period)
    
    # 3. 对比找出差异
    diff_stocks = []
    stats = []
    
    for ts_code, remote_count in remote_counts.items():
        local_count = local_counts.get(ts_code, 0)
        
        if local_count == 0:
            status = "new"           # 本地不存在
            diff_stocks.append(ts_code)
        elif local_count != remote_count:
            status = "modified"      # 数据量不一致,可能包含修正
            diff_stocks.append(ts_code)
        else:
            status = "same"          # 数据量一致
        
        stats.append({
            'ts_code': ts_code,
            'remote_count': remote_count,
            'local_count': local_count,
            'status': status
        })
    
    # 4. 提取差异数据
    diff_df = remote_df[remote_df['ts_code'].isin(diff_stocks)].copy()
    stats_df = pd.DataFrame(stats)
    
    return diff_df, stats_df

删除策略

规范: 删除指定季度和指定股票的所有数据。

def delete_stock_quarter_data(
    self,
    period: str,
    ts_codes: Optional[List[str]] = None
) -> int:
    """删除指定季度和股票的数据。
    
    Args:
        period: 季度字符串 (YYYYMMDD)
        ts_codes: 股票代码列表None 表示删除该季度所有数据
    
    Returns:
        删除的记录数
    """
    if ts_codes:
        # 删除指定股票的数据
        placeholders = ', '.join(['?' for _ in ts_codes])
        query = f'''
            DELETE FROM "{self.table_name}" 
            WHERE end_date = ? AND ts_code IN ({placeholders})
        '''
        result = storage._connection.execute(query, [period] + ts_codes)
    else:
        # 删除整个季度的数据
        query = f'DELETE FROM "{self.table_name}" WHERE end_date = ?'
        result = storage._connection.execute(query, [period])
    
    return result.rowcount

删除计数处理

注意: DuckDB 的 DELETE 操作 rowcount 属性可能返回 -1(表示未知数量),需要特殊处理。

改进方案:

def delete_stock_quarter_data(self, period: str, ts_codes: Optional[List[str]] = None) -> int:
    """删除指定季度和股票的数据。"""
    storage = Storage()
    
    try:
        # 将 YYYYMMDD 转换为 YYYY-MM-DD 格式DuckDB DATE 类型要求)
        period_formatted = f"{period[:4]}-{period[4:6]}-{period[6:]}"
        
        if ts_codes:
            # 删除指定股票的数据
            placeholders = ', '.join(['?' for _ in ts_codes])
            query = f'''
                DELETE FROM "{self.table_name}" 
                WHERE end_date = ? AND ts_code IN ({placeholders})
            '''
            storage._connection.execute(query, [period_formatted] + ts_codes)
            # DuckDB rowcount 返回 -1使用传入的股票数量作为估算
            return len(ts_codes)
        else:
            # 删除整个季度的数据
            query = f'DELETE FROM "{self.table_name}" WHERE end_date = ?'
            storage._connection.execute(query, [period_formatted])
            return -1  # 标记为未知
    except Exception as e:
        print(f"[{self.__class__.__name__}] Error deleting data: {e}")
        return 0

日志输出改进:

# 改进后的日志输出
if not dry_run and not diff_df.empty:
    deleted_stocks_count = len(diff_stocks)
    self.delete_stock_quarter_data(period, diff_stocks)
    deleted_count = len(diff_df)
    print(f"[{self.__class__.__name__}] Deleted {deleted_stocks_count} stocks' old records (approx {deleted_count} rows)")

输出示例:

[IncomeQuarterSync] Deleted 100 stocks' old records (approx 500 rows)

日期格式转换

DuckDB DATE 类型要求

DuckDB 的 DATE 类型要求格式为 YYYY-MM-DD,而 Tushare API 返回的日期格式为 YYYYMMDD(字符串)。必须在 SQL 查询前进行转换。

转换方法

def _format_period_for_sql(self, period: str) -> str:
    """将 YYYYMMDD 格式转换为 YYYY-MM-DD 格式。
    
    Args:
        period: YYYYMMDD 格式的日期字符串
        
    Returns:
        YYYY-MM-DD 格式的日期字符串
    """
    return f"{period[:4]}-{period[4:6]}-{period[6:]}"

# 使用示例
period = "20240331"
period_sql = self._format_period_for_sql(period)  # "2024-03-31"

query = f'SELECT * FROM "{self.table_name}" WHERE end_date = ?'
result = storage._connection.execute(query, [period_sql])

需要转换的位置

以下方法中涉及 SQL 查询的 period 参数时必须进行转换:

  1. get_local_data_count_by_stock() - 查询本地数据计数
  2. get_local_records_by_key() - 按主键查询本地记录
  3. delete_stock_quarter_data() - 删除季度数据

错误示例

如果不进行转换,会报以下错误:

Conversion Error: invalid date field format: "20250331", 
expected format is (YYYY-MM-DD)

存储层配置

禁用 UPSERT

财务数据表没有主键约束,必须在调用存储层方法时禁用 UPSERT。

ThreadSafeStorage 配置

class ThreadSafeStorage:
    """线程安全的 DuckDB 写入包装器。"""
    
    def queue_save(self, name: str, data: pd.DataFrame, use_upsert: bool = True):
        """将数据放入写入队列。
        
        Args:
            name: 表名
            data: DataFrame 数据
            use_upsert: 若为 True 使用 INSERT OR REPLACE若为 False 使用普通 INSERT
        """
        if not data.empty:
            self._pending_writes.append((name, data, use_upsert))

财务数据同步时的调用

# 正确:禁用 UPSERT
self.storage.queue_save(self.table_name, diff_df, use_upsert=False)

# 错误:使用默认 UPSERT会导致 Binder Error
self.storage.queue_save(self.table_name, diff_df)  # 默认 use_upsert=True

错误信息

如果错误地使用 UPSERT

Binder Error: There are no UNIQUE/PRIMARY KEY constraints that refer 
to this table, specify ON CONFLICT columns manually

表结构设计

必备字段

财务数据表必须包含以下字段:

TABLE_SCHEMA = {
    "ts_code": "VARCHAR(16) NOT NULL",     # 股票代码
    "end_date": "DATE NOT NULL",           # 报告期(季度最后一天)
    "report_type": "INTEGER",              # 报表类型
    "ann_date": "DATE",                    # 公告日期(可选)
    # ... 其他业务字段
}

字段命名规范

遵循 Tushare API 返回的字段名,保持与原 API 一致。

正确示例:

"basic_eps": "DOUBLE",        # 基本每股收益
"total_revenue": "DOUBLE",    # 营业总收入
"operate_profit": "DOUBLE",   # 营业利润

错误示例:

"basicEPS": "DOUBLE",         # 驼峰命名,不符合
"basic_eps_value": "DOUBLE",  # 添加多余后缀
"eps_basic": "DOUBLE",        # 词序颠倒

数据类型规范

Tushare 类型 DuckDB 类型
str VARCHAR(n)
float DOUBLE
int INTEGER
date DATE

示例:

TABLE_SCHEMA = {
    "ts_code": "VARCHAR(16) NOT NULL",
    "ann_date": "DATE",
    "report_type": "INTEGER",
    "basic_eps": "DOUBLE",
}

索引设计规范

禁止唯一索引

严格禁止创建主键和唯一索引:

# 禁止创建主键
PRIMARY_KEY = ("ts_code", "end_date", "report_type")  # 错误!

# 禁止创建唯一索引
("idx_unique", ["ts_code", "end_date"], True)  # 错误!
CREATE UNIQUE INDEX ...  # 错误!

原因: 财务数据可能发生多次修正,同一支股票在同一季度可能有多个版本(不同的 ann_date),设置唯一约束会导致插入失败。

推荐索引

必须创建以下索引:

TABLE_INDEXES = [
    # 1. 股票代码索引(单字段查询)
    ("idx_financial_income_ts_code", ["ts_code"]),
    
    # 2. 报告期索引(时间范围查询)
    ("idx_financial_income_end_date", ["end_date"]),
    
    # 3. 复合索引(股票+报告期+报表类型,最常用)
    ("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]),
]

索引命名规范

索引名必须遵循以下格式:

# 格式: idx_{table_name}_{fields_description}

# 正确示例
"idx_financial_income_ts_code"
"idx_financial_income_end_date"
"idx_financial_income_ts_period"

# 错误示例
"ts_code_idx"           # 缺少表名前缀
"income_ts"             # 表名缩写不明确
"index_1"               # 无意义名称

报表类型过滤

默认行为

规范: 默认只同步合并报表(report_type='1')。

class QuarterBasedSync(ABC):
    # 目标报表类型(子类可覆盖)
    # 默认只同步合并报表report_type='1'
    # 设为 None 则同步所有报表类型
    TARGET_REPORT_TYPE: Optional[str] = "1"

覆盖方式

子类可以通过覆盖类属性来修改默认行为:

class IncomeQuarterSync(QuarterBasedSync):
    """利润表同步 - 只同步合并报表。"""
    TARGET_REPORT_TYPE = "1"  # 明确指定

class BalanceQuarterSync(QuarterBasedSync):
    """资产负债表同步 - 同步所有报表类型。"""
    TARGET_REPORT_TYPE = None  # 不过滤

过滤逻辑

过滤逻辑在基类中统一处理:

def sync_quarter(self, period: str, dry_run: bool = False) -> Dict:
    # 1. 获取远程数据
    remote_df = self.fetch_single_quarter(period)
    
    # 2. 根据 TARGET_REPORT_TYPE 过滤
    if self.TARGET_REPORT_TYPE and 'report_type' in remote_df.columns:
        remote_df = remote_df[remote_df['report_type'] == self.TARGET_REPORT_TYPE]
    
    # ... 后续处理

报表类型说明

根据 Tushare 文档:

代码 类型 说明
1 合并报表 上市公司最新报表(默认)
2 单季合并 单一季度的合并报表
3 调整单季合并表 调整后的单季合并报表
4 调整合并报表 本年度公布上年同期的财务报表数据
5 调整前合并报表 数据发生变更,将原数据进行保留
6 母公司报表 该公司母公司的财务报表数据
... ... ...

代码示例

完整实现示例:利润表接口

"""利润表数据接口 (VIP 版本)

使用 Tushare VIP 接口 (income_vip) 获取利润表数据。
按季度同步,一次请求获取一个季度的全部上市公司数据。

使用方式:
    from src.data.api_wrappers.financial_data.api_income import (
        IncomeQuarterSync, 
        sync_income,
        preview_income_sync
    )
    
    # 方式1: 使用类
    syncer = IncomeQuarterSync()
    syncer.sync_incremental()  # 增量同步
    syncer.sync_full()         # 全量同步
    
    # 方式2: 使用便捷函数
    sync_income()              # 增量同步
    sync_income(force_full=True)  # 全量同步
"""

from typing import Optional
import pandas as pd

from src.data.client import TushareClient
from src.data.api_wrappers.base_financial_sync import (
    QuarterBasedSync,
    sync_financial_data,
    preview_financial_sync
)


class IncomeQuarterSync(QuarterBasedSync):
    """利润表季度同步实现。
    
    使用 income_vip 接口按季度获取全部上市公司利润表数据。
    
    表结构: financial_income
    注意: 不设置主键和唯一索引,支持财务数据多次修正
    """
    
    table_name = "financial_income"
    api_name = "income_vip"
    
    # 只同步合并报表
    TARGET_REPORT_TYPE = "1"
    
    # 表结构定义
    TABLE_SCHEMA = {
        "ts_code": "VARCHAR(16) NOT NULL",
        "ann_date": "DATE",
        "f_ann_date": "DATE",
        "end_date": "DATE NOT NULL",
        "report_type": "INTEGER",
        "comp_type": "INTEGER",
        "basic_eps": "DOUBLE",
        "diluted_eps": "DOUBLE",
        "total_revenue": "DOUBLE",
        "revenue": "DOUBLE",
        # ... 其他字段
    }
    
    # 普通索引(不要创建唯一索引)
    TABLE_INDEXES = [
        ("idx_financial_income_ts_code", ["ts_code"]),
        ("idx_financial_income_end_date", ["end_date"]),
        ("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]),
    ]
    
    def fetch_single_quarter(self, period: str) -> pd.DataFrame:
        """获取单季度的全部上市公司利润表数据。
        
        Args:
            period: 报告期,季度最后一天日期(如 '20231231'
        
        Returns:
            包含该季度全部上市公司利润表数据的 DataFrame
        """
        params = {"period": period}
        return self.client.query(self.api_name, **params)


# =============================================================================
# 便捷函数
# =============================================================================


def sync_income(
    force_full: bool = False,
    dry_run: bool = False,
) -> list:
    """同步利润表数据(便捷函数)。
    
    Args:
        force_full: 若为 True强制全量同步
        dry_run: 若为 True仅预览不写入
    
    Returns:
        同步结果列表
    """
    return sync_financial_data(IncomeQuarterSync, force_full, dry_run)


def preview_income_sync() -> dict:
    """预览利润表同步信息。
    
    Returns:
        预览信息字典
    """
    return preview_financial_sync(IncomeQuarterSync)


def get_income(period: str, fields: Optional[str] = None) -> pd.DataFrame:
    """获取利润表数据(原始接口,单季度)。
    
    用于直接获取某个季度的数据,不进行同步管理。
    
    Args:
        period: 报告期,季度最后一天日期(如 '20231231'
        fields: 指定返回字段,默认返回全部字段
    
    Returns:
        包含利润表数据的 DataFrame
    """
    client = TushareClient()
    
    if fields is None:
        fields = "ts_code,ann_date,end_date,report_type,basic_eps,..."
    
    return client.query("income_vip", period=period, fields=fields)

完整实现示例:资产负债表接口

"""资产负债表数据接口 (VIP 版本)

使用 Tushare VIP 接口 (balancesheet_vip) 获取资产负债表数据。
按季度同步,一次请求获取一个季度的全部上市公司数据。

使用方式:
    from src.data.api_wrappers.financial_data.api_balance import (
        BalanceQuarterSync,
        sync_balance,
        preview_balance_sync
    )

    # 方式1: 使用类
    syncer = BalanceQuarterSync()
    syncer.sync_incremental()  # 增量同步
    syncer.sync_full()         # 全量同步

    # 方式2: 使用便捷函数
    sync_balance()             # 增量同步
    sync_balance(force_full=True)  # 全量同步
"""

from typing import Any, override
import pandas as pd

from src.data.client import TushareClient
from src.data.api_wrappers.base_financial_sync import (
    QuarterBasedSync,
    sync_financial_data,
    preview_financial_sync,
)


class BalanceQuarterSync(QuarterBasedSync):
    """资产负债表季度同步实现。

    使用 balancesheet_vip 接口按季度获取全部上市公司资产负债表数据。

    表结构: financial_balance
    注意: 不设置主键和唯一索引,支持财务数据多次修正
    """

    table_name: str = "financial_balance"
    api_name: str = "balancesheet_vip"

    # 只同步合并报表
    TARGET_REPORT_TYPE: str | None = "1"

    # 表结构定义 - 完整的资产负债表字段
    TABLE_SCHEMA: dict[str, str] = {
        "ts_code": "VARCHAR(16) NOT NULL",
        "ann_date": "DATE",
        "f_ann_date": "DATE",
        "end_date": "DATE NOT NULL",
        "report_type": "INTEGER",
        "comp_type": "INTEGER",
        "end_type": "VARCHAR(10)",
        "total_share": "DOUBLE",
        "cap_rese": "DOUBLE",
        "undistr_porfit": "DOUBLE",
        "surplus_rese": "DOUBLE",
        "special_rese": "DOUBLE",
        "money_cap": "DOUBLE",
        "trad_asset": "DOUBLE",
        "notes_receiv": "DOUBLE",
        "accounts_receiv": "DOUBLE",
        # ... 其他150+个字段(完整字段列表见 api_balance.py
    }

    # 普通索引(不要创建唯一索引)
    TABLE_INDEXES: list[tuple[str, list[str]]] = [
        ("idx_financial_balance_ts_code", ["ts_code"]),
        ("idx_financial_balance_end_date", ["end_date"]),
        ("idx_financial_balance_ts_period", ["ts_code", "end_date", "report_type"]),
    ]

    def __init__(self):
        """初始化资产负债表同步器。"""
        super().__init__()
        self._fields: str | None = None  # 默认返回全部字段

    @override
    def fetch_single_quarter(self, period: str) -> pd.DataFrame:
        """获取单季度的全部上市公司资产负债表数据。

        Args:
            period: 报告期,季度最后一天日期(如 '20231231'

        Returns:
            包含该季度全部上市公司资产负债表数据的 DataFrame
        """
        params = {"period": period}

        if self._fields:
            params["fields"] = self._fields

        return self.client.query(self.api_name, **params)


# =============================================================================
# 便捷函数
# =============================================================================


def sync_balance(
    force_full: bool = False,
    dry_run: bool = False,
) -> list[dict[str, Any]]:
    """同步资产负债表数据(便捷函数)。

    Args:
        force_full: 若为 True强制全量同步
        dry_run: 若为 True仅预览不写入

    Returns:
        同步结果列表
    """
    return sync_financial_data(BalanceQuarterSync, force_full, dry_run)


def preview_balance_sync() -> dict[str, Any]:
    """预览资产负债表同步信息。

    Returns:
        预览信息字典
    """
    return preview_financial_sync(BalanceQuarterSync)


def get_balance(period: str, fields: str | None = None) -> pd.DataFrame:
    """获取资产负债表数据(原始接口,单季度)。

    Args:
        period: 报告期,季度最后一天日期(如 '20231231'
        fields: 指定返回字段,默认返回全部字段

    Returns:
        包含资产负债表数据的 DataFrame
    """
    client = TushareClient()

    if fields is None:
        fields = (
            "ts_code,ann_date,f_ann_date,end_date,report_type,comp_type,end_type,"
            "total_share,cap_rese,undistr_porfit,surplus_rese,special_rese,"
            "money_cap,trad_asset,notes_receiv,accounts_receiv,..."
        )

    return client.query("balancesheet_vip", period=period, fields=fields)

常见问题

Q1: 为什么不设置主键和唯一索引?

A: 财务数据可能发生多次修正。例如:

第一次发布600000.SH, 20240331, report_type='1', ann_date='20240428'
第二次修正600000.SH, 20240331, report_type='1', ann_date='20240515'

如果设置了唯一约束(如 PRIMARY KEY (ts_code, end_date, report_type)),第二次插入会失败。因此采用"先删除后插入"策略,不设置唯一约束。

Q2: 为什么增量同步不跳过已同步的季度?

A: 财务数据与日频数据不同,可能会被修正。即使本地已有某季度的数据,也需要重新获取远程数据进行对比,确保数据完整性。

Q3: 为什么要获取前一季度?

A: 财务数据修正可能发生在发布后的一段时间内。获取前一季度可以捕获上一季度可能发生的修正数据。

示例:

当前日期: 2024-04-15
当前季度: 2024Q1 (20240331)
本地最新: 2023Q4 (20231231)

同步范围: 2023Q3 (20230930) -> 2024Q1 (20240331)
          (包含前一季度以确保数据完整性)

Q4: 如何支持其他报表类型?

A: 覆盖 TARGET_REPORT_TYPE 类属性:

class IncomeAllReportSync(QuarterBasedSync):
    """同步所有报表类型。"""
    TARGET_REPORT_TYPE = None  # 不过滤

或者同步特定类型:

class IncomeParentReportSync(QuarterBasedSync):
    """只同步母公司报表。"""
    TARGET_REPORT_TYPE = "6"  # 母公司报表

Q5: 如何处理字段变更?

A: 如果 Tushare API 字段发生变更:

  1. 更新 TABLE_SCHEMA 添加新字段
  2. 使用 ALTER TABLE 添加新列(对已存在的数据)
  3. 更新 fetch_single_quarter() 中的字段列表(如果使用了 fields 参数)

注意: 不要删除已有字段,保持向后兼容。

Q6: 如何调试同步问题?

A: 使用 dry_run=True 进行预览:

from src.data.api_wrappers.financial_data.api_income import sync_income

# 预览同步,不写入数据
result = sync_income(dry_run=True)
print(result)

查看日志输出,检查:

  • 远程数据量
  • 本地数据量
  • 差异股票列表
  • 删除/插入记录数

Q7: 为什么要优化首次同步?

A: 首次同步某个季度时,本地没有数据,不需要进行差异检测和删除操作。直接插入所有数据可以提升性能。

优化逻辑

# 检查本地是否有该季度数据
local_counts = self.get_local_data_count_by_stock(period)
is_first_sync_for_period = len(local_counts) == 0

if is_first_sync_for_period:
    # 首次同步:直接插入,跳过差异检测
    print(f"First sync for quarter {period}, inserting all data directly")
    self.storage.queue_save(self.table_name, remote_df, use_upsert=False)
    self.storage.flush()
else:
    # 非首次同步:进行差异检测
    diff_df, stats_df = self.compare_and_find_differences(remote_df, period)
    # ... 删除旧数据并插入新数据

输出对比

首次同步:

[IncomeQuarterSync] Syncing quarter 20240331...
[IncomeQuarterSync] Fetched 5300 records from API
[IncomeQuarterSync] First sync for quarter 20240331, inserting all data directly
[IncomeQuarterSync] Inserted 5300 new records

非首次同步:

[IncomeQuarterSync] Syncing quarter 20240331...
[IncomeQuarterSync] Fetched 5300 records from API
[IncomeQuarterSync] Comparison result:
  - Stocks with differences: 100
  - Unchanged stocks: 5200
[IncomeQuarterSync] Deleted 100 stocks' old records (approx 500 rows)
[IncomeQuarterSync] Inserted 500 new records

Q8: 为什么会报日期格式错误?

A: DuckDB 的 DATE 类型要求格式为 YYYY-MM-DD,而系统中使用的日期格式为 YYYYMMDD(字符串)。在 SQL 查询前必须进行转换。

错误示例

# 错误:直接传入 YYYYMMDD 格式
query = 'SELECT * FROM table WHERE end_date = ?'
result = storage.execute(query, ["20240331"])
# 错误Conversion Error: invalid date field format: "20240331"

正确示例

# 正确:转换为 YYYY-MM-DD 格式
period_formatted = f"{period[:4]}-{period[4:6]}-{period[6:]}"
query = 'SELECT * FROM table WHERE end_date = ?'
result = storage.execute(query, [period_formatted])

需要转换的方法

  • get_local_data_count_by_stock()
  • get_local_records_by_key()
  • delete_stock_quarter_data()

Q9: 为什么会报 UPSERT 错误?

A: 财务数据表没有主键约束,不能使用 INSERT OR REPLACEUPSERT。必须使用普通 INSERT,并通过"先删除后插入"策略确保数据一致性。

错误信息

Binder Error: There are no UNIQUE/PRIMARY KEY constraints that refer
to this table, specify ON CONFLICT columns manually

正确做法

# 1. 调用 storage.save() 时指定 use_upsert=False
storage.save(table_name, data, use_upsert=False)

# 2. 调用 queue_save() 时指定 use_upsert=False
self.storage.queue_save(self.table_name, diff_df, use_upsert=False)

# 3. 在删除旧数据后插入新数据
self.delete_stock_quarter_data(period, diff_stocks)
self.storage.queue_save(self.table_name, diff_df, use_upsert=False)
self.storage.flush()

附录

相关文档

相关代码

  • src/data/api_wrappers/base_financial_sync.py - QuarterBasedSync 基类
  • src/data/api_wrappers/financial_data/api_income.py - 利润表示例实现
  • src/data/api_wrappers/financial_data/api_balance.py - 资产负债表实现
  • src/data/api_wrappers/financial_data/api_cashflow.py - 现金流量表实现
  • src/data/api_wrappers/financial_data/api_financial_sync.py - 调度中心

变更历史

日期 版本 变更内容
2026-03-26 v1.4 添加速率限制规范:
- 强调多线程场景下 client 参数传递
- 添加实际案例分析
- 说明 TushareClient 共享限流器机制
2026-03-08 v1.3 现金流量表接口实现:
- 完成 api_cashflow.py 封装
- 添加 95 个现金流量表完整字段
- 更新调度中心注册
- 更新文档标记现金流为已实现
2026-03-08 v1.2 资产负债表接口实现:
- 完成 api_balance.py 封装
- 添加 157 个资产负债表完整字段
- 更新调度中心注册
- 更新文档中的资产负债表示例为完整实现
2026-03-08 v1.1 完善实际编码细节:
- 添加首次同步优化说明
- 添加日期格式转换规范
- 添加存储层 UPSERT 禁用说明
- 添加删除计数处理说明
- 扩充常见问题Q7-Q9
2026-03-07 v1.0 初始版本,规范财务数据 API 封装要求

注意: 本文档为强制性规范,所有财务数据 API 封装必须遵循。如有特殊情况需要例外,需经过架构评审。