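- Add the financial data API wrapper specification (FINANCIAL_API_SPEC.md), covering architecture design principles, class design conventions, sync strategy, and data difference detection.
- Add the n_income factor lifecycle analysis document, which traces the factor in detail from definition through training.
- Add the financial data sync module refactoring plan, which defines the QuarterBasedSync base class design and the refactoring task list.

Together, these documents provide the design basis and implementation plan for the upcoming refactor of the financial data sync module.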
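Financial Data Sync Module Refactoring Plan
Goal: Refactor the financial data sync module by moving the sync logic out of the scheduling hub and into the corresponding API files, introducing a unified quarter-based sync base class, and implementing data difference detection.
Architecture: A new QuarterBasedSync base class handles financial data that is synced by quarter. Each financial endpoint (income/balance/cashflow) implements a concrete sync subclass, and the scheduling hub only coordinates the execution order of the sync tasks.
Scope of changes:
- New: src/data/api_wrappers/base_financial_sync.py (QuarterBasedSync base class)
- Refactored: src/data/api_wrappers/financial_data/api_income.py (implements IncomeQuarterSync)
- Simplified: src/data/api_wrappers/financial_data/api_financial_sync.py (scheduling only)
- New: src/data/api_wrappers/financial_data/api_balance.py (reserved for the balance sheet endpoint)
- New: src/data/api_wrappers/financial_data/api_cashflow.py (reserved for the cash flow statement endpoint)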
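Background
Current problems
- Mixed responsibilities: api_financial_sync.py contains the complete sync logic (696 lines), which violates the design principle that the scheduling hub should contain only scheduling.
- No shared base class: financial data sync does not inherit from any common base class, so its code style is inconsistent with modules such as daily/pro_bar.
- No difference detection: incremental sync does not compare local data with remote data, so restated (corrected) financial data can be missed.
- Poor extensibility: adding the balance sheet and the cash flow statement would require rewriting similar sync logic.
Refactoring goals
- Introduce the QuarterBasedSync base class: dedicated to financial data synced by quarter, supporting both incremental and full sync.
- Implement data difference detection: compare local and remote record counts per stock and quarter, identify differences, and fill them in.
- Use a delete-then-insert strategy: when data is inconsistent, delete the old rows first and then insert the new ones, so the stored data stays consistent.
- Remove the "skip sync" logic: financial data must be compared and updated on every run; there is no "nothing to sync" case.
- Support report-type filtering: the TARGET_REPORT_TYPE class attribute configures which report type is synced (consolidated statements by default).
- No unique constraints: do not create a primary key or a unique index, because financial data can be restated several times (several versions for the same stock and quarter).
- Separate scheduling from implementation: the scheduling hub only coordinates tasks; the concrete sync logic moves down into each API file.
- Consistent code style: keep the structure and naming conventions consistent with StockBasedSync/DateBasedSync.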
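The delete-then-insert goal is easiest to reason about when both steps run as one unit. The sketch below uses Python's built-in sqlite3 as a stand-in for the project's DuckDB storage (an assumption; the plan's own code goes through its storage layer) and a hypothetical four-column table. It runs both steps in a single transaction, and because the table has no unique constraint it accepts several versions of the same (ts_code, end_date).

```python
import sqlite3
from typing import List, Sequence, Tuple

# Hypothetical row layout for this sketch: (ts_code, end_date, ann_date, n_income)
Row = Tuple[str, str, str, float]


def replace_stock_quarter(conn: sqlite3.Connection, table: str, period: str,
                          rows: Sequence[Row]) -> Tuple[int, int]:
    """Delete all existing rows of the affected stocks for `period`, then insert `rows`.

    Both statements run in one transaction, so a failed insert also rolls back the delete.
    All rows are expected to belong to `period`.
    """
    ts_codes: List[str] = sorted({row[0] for row in rows})
    if not ts_codes:
        return 0, 0
    placeholders = ", ".join("?" for _ in ts_codes)
    with conn:  # commits on success, rolls back on any exception
        deleted = conn.execute(
            f'DELETE FROM "{table}" WHERE end_date = ? AND ts_code IN ({placeholders})',
            [period, *ts_codes],
        ).rowcount
        conn.executemany(f'INSERT INTO "{table}" VALUES (?, ?, ?, ?)', rows)
    return deleted, len(rows)


# Demo: no unique constraint, so two versions of 000001.SZ for 20231231 can coexist
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE financial_income (ts_code TEXT, end_date TEXT, ann_date TEXT, n_income REAL)")
conn.executemany(
    "INSERT INTO financial_income VALUES (?, ?, ?, ?)",
    [("000001.SZ", "20231231", "20240315", 1.0), ("000002.SZ", "20231231", "20240320", 2.0)],
)
conn.commit()
deleted, inserted = replace_stock_quarter(
    conn, "financial_income", "20231231",
    [("000001.SZ", "20231231", "20240315", 1.0), ("000001.SZ", "20231231", "20240420", 1.1)],
)
```

As written in Task 1, sync_quarter() deletes through one storage object and inserts through another, so the two steps do not appear to share a transaction. Wrapping them together, as above, is one way to avoid a half-applied update.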
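Task list
- Task 1: Create base_financial_sync.py, the QuarterBasedSync base class (including automatic table creation)
- Task 2: Refactor api_income.py to implement the IncomeQuarterSync class
- Task 3: Refactor api_financial_sync.py so it keeps only the scheduling logic
- Task 4: Verification tests, to confirm that sync works correctly after the refactor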
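Task 1: Create the QuarterBasedSync base class
File: src/data/api_wrappers/base_financial_sync.py (new)
Goal: Create an abstract base class for quarterly financial data sync. It provides shared quarter calculations, data difference detection, incremental and full sync, and automatic table creation on the first sync.
Core features:
- Quarter calculation helpers (current, previous, and next quarter)
- Data difference detection (local vs. remote comparison)
- Incremental sync strategy (fetch the current quarter plus the previous quarter)
- Table schema and index management
- Automatic table creation on the first sync: when the table does not exist, its schema and indexes are created automatically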
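The quarter helpers reduce to arithmetic on a running quarter index (year * 4 + quarter number). The standalone sketch below is an alternative formulation, not the plan's code. `current_quarter` takes today's date as an argument instead of calling `get_today_date()`. `quarters_in_range` is a hypothetical stand-in for `get_quarters_in_range` from `src.data.utils`, whose implementation this plan does not show, and it assumes both bounds are quarter-end dates.

```python
from typing import List

QUARTER_ENDS = ("0331", "0630", "0930", "1231")


def quarter_index(quarter: str) -> int:
    """Map a quarter-end date 'YYYYMMDD' to year * 4 + (0..3)."""
    month_day = quarter[4:]
    if month_day not in QUARTER_ENDS:
        raise ValueError(f"not a quarter-end date: {quarter}")
    return int(quarter[:4]) * 4 + QUARTER_ENDS.index(month_day)


def quarter_from_index(index: int) -> str:
    """Inverse of quarter_index."""
    year, q = divmod(index, 4)
    return f"{year}{QUARTER_ENDS[q]}"


def prev_quarter(quarter: str) -> str:
    return quarter_from_index(quarter_index(quarter) - 1)


def next_quarter(quarter: str) -> str:
    return quarter_from_index(quarter_index(quarter) + 1)


def current_quarter(today: str) -> str:
    """Latest quarter whose end date is on or before `today` (same rule as get_current_quarter)."""
    candidate = f"{today[:4]}{QUARTER_ENDS[(int(today[4:6]) - 1) // 3]}"
    return candidate if today >= candidate else prev_quarter(candidate)


def quarters_in_range(start: str, end: str) -> List[str]:
    """All quarter-end dates from start to end inclusive; empty if start > end."""
    return [quarter_from_index(i) for i in range(quarter_index(start), quarter_index(end) + 1)]
```

Passing `today` explicitly also makes the boundary behaviour easy to test, for example the last day of a quarter compared with the day before it.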
Implementation:
```python
"""Base abstractions for financial data sync.

Provides QuarterBasedSync, a base class for syncing financial data by quarter.

Characteristics of financial data:
- Published by quarter (period: 20231231, 20230930, 20230630, 20230331)
- A VIP endpoint returns one quarter's data for all listed companies in a single call
- Data may be restated, so incremental sync fetches the current quarter plus the previous quarter
- No primary key: rows are identified logically by (ts_code, end_date, report_type),
  and several versions (different ann_date) may coexist

Usage:
    class IncomeQuarterSync(QuarterBasedSync):
        table_name = "financial_income"
        api_name = "income_vip"

        def fetch_single_quarter(self, period: str) -> pd.DataFrame:
            # Fetch the data for a single quarter
            ...
"""

from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple

import pandas as pd
from tqdm import tqdm

from src.data.client import TushareClient
from src.data.storage import Storage, ThreadSafeStorage
from src.data.utils import get_quarters_in_range, get_today_date


class QuarterBasedSync(ABC):
    """Abstract base class for quarterly financial data sync.

    Handles financial data that is synced by quarter (income statement,
    balance sheet, cash flow statement).

    Sync characteristics:
    1. Fetched by quarter: a VIP endpoint returns one quarter's data for all
       listed companies in a single call.
    2. Restatable: a quarter's data may be updated later, so incremental sync
       fetches the current quarter plus the previous quarter.
    3. Difference detection: local and remote record counts are compared to
       find missing or changed records.
    4. No primary key or unique index: rows are identified logically by
       (ts_code, end_date, report_type), and several versions may coexist.

    Subclasses must define:
    - table_name: class attribute, target table name
    - api_name: class attribute, Tushare API name
    - TABLE_SCHEMA: class attribute, table schema definition
    - fetch_single_quarter(period) -> pd.DataFrame: fetch one quarter's data

    Attributes:
        table_name: Target table name (must be overridden by subclasses)
        api_name: Tushare API name (must be overridden by subclasses)
        DEFAULT_START_DATE: Default start quarter (2018 Q1)
        TARGET_REPORT_TYPE: Report type to sync; None syncs all report types
        TABLE_SCHEMA: Table schema {column name: SQL type}
        TABLE_INDEXES: Index definitions [(index name, [column names]), ...]
            Note: do not create unique indexes, because financial data may be
            restated several times.
    """

    table_name: str = ""  # must be overridden by subclasses
    api_name: str = ""  # must be overridden by subclasses
    DEFAULT_START_DATE = "20180331"  # 2018 Q1

    # Target report type (subclasses may override).
    # By default only consolidated statements (report_type='1') are synced.
    # Set to None to sync all report types.
    TARGET_REPORT_TYPE: Optional[str] = "1"

    # Table schema (must be overridden by subclasses)
    TABLE_SCHEMA: Dict[str, str] = {}

    # Index definitions (subclasses may override)
    # Format: [("index_name", ["col1", "col2"]), ...]
    # Note: do not create unique indexes, because financial data may be restated several times
    TABLE_INDEXES: List[Tuple[str, List[str]]] = []

    def __init__(self):
        """Initialize the quarterly sync manager."""
        self.storage = ThreadSafeStorage()
        self.client = TushareClient()
        self._cached_data: Optional[pd.DataFrame] = None

    # ======================================================================
    # Abstract methods (must be implemented by subclasses)
    # ======================================================================

    @abstractmethod
    def fetch_single_quarter(self, period: str) -> pd.DataFrame:
        """Fetch one quarter's data for all listed companies.

        Args:
            period: Report period, the last day of the quarter (e.g. '20231231')

        Returns:
            DataFrame with that quarter's financial data for all listed companies
        """
        pass

    # ======================================================================
    # Quarter helpers
    # ======================================================================

    def get_current_quarter(self) -> str:
        """Return the current quarter, taking into account whether it has ended.

        If today is before the last day of the quarter, the previous quarter is
        returned. This avoids requesting a future quarter that has no data yet.

        Returns:
            Current quarter string (YYYYMMDD), e.g. '20231231'
        """
        today = get_today_date()
        year = int(today[:4])
        month = int(today[4:6])

        # Determine the quarter that contains today
        if month <= 3:
            current_q = f"{year}0331"
        elif month <= 6:
            current_q = f"{year}0630"
        elif month <= 9:
            current_q = f"{year}0930"
        else:
            current_q = f"{year}1231"

        # If the quarter has not ended yet, return the previous quarter
        if today < current_q:
            return self.get_prev_quarter(current_q)
        return current_q

    def get_prev_quarter(self, quarter: str) -> str:
        """Return the previous quarter.

        Args:
            quarter: Quarter string (YYYYMMDD), e.g. '20231231'

        Returns:
            Previous quarter string (YYYYMMDD)
        """
        year = int(quarter[:4])
        month_day = quarter[4:]
        if month_day == "0331":
            return f"{year - 1}1231"
        elif month_day == "0630":
            return f"{year}0331"
        elif month_day == "0930":
            return f"{year}0630"
        else:  # "1231"
            return f"{year}0930"

    def get_next_quarter(self, quarter: str) -> str:
        """Return the next quarter.

        Args:
            quarter: Quarter string (YYYYMMDD)

        Returns:
            Next quarter string (YYYYMMDD)
        """
        year = int(quarter[:4])
        month_day = quarter[4:]
        if month_day == "0331":
            return f"{year}0630"
        elif month_day == "0630":
            return f"{year}0930"
        elif month_day == "0930":
            return f"{year}1231"
        else:  # "1231"
            return f"{year + 1}0331"

    # ======================================================================
    # Table schema management
    # ======================================================================

    def ensure_table_exists(self) -> None:
        """Make sure the table exists; create the table and its indexes if not.

        Note: no primary key or unique index is set, because financial data may
        be restated several times, and the same stock may have several versions
        for the same quarter (different ann_date). DuckDB keeps an internal
        rowid pseudo-column, so no explicit key is needed.
        """
        storage = Storage()
        if storage.exists(self.table_name):
            return

        if not self.TABLE_SCHEMA:
            print(f"[{self.__class__.__name__}] TABLE_SCHEMA not defined, skipping table creation")
            return

        # Build the column definitions (no primary key)
        columns_sql = ", ".join(
            f'"{col_name}" {col_type}' for col_name, col_type in self.TABLE_SCHEMA.items()
        )
        create_sql = f'CREATE TABLE IF NOT EXISTS "{self.table_name}" ({columns_sql})'

        try:
            storage._connection.execute(create_sql)
            print(f"[{self.__class__.__name__}] Created table '{self.table_name}'")
        except Exception as e:
            print(f"[{self.__class__.__name__}] Error creating table: {e}")
            raise

        # Create plain (non-unique) indexes
        for idx_name, idx_cols in self.TABLE_INDEXES:
            try:
                idx_cols_sql = ", ".join(f'"{col}"' for col in idx_cols)
                storage._connection.execute(
                    f'CREATE INDEX IF NOT EXISTS "{idx_name}" ON "{self.table_name}"({idx_cols_sql})'
                )
                print(f"[{self.__class__.__name__}] Created index '{idx_name}' on {idx_cols}")
            except Exception as e:
                print(f"[{self.__class__.__name__}] Error creating index {idx_name}: {e}")

    # ======================================================================
    # Data difference detection (core logic)
    # ======================================================================

    def get_local_data_count_by_stock(self, period: str) -> Dict[str, int]:
        """Return the local record count per stock for a quarter.

        Args:
            period: Quarter string (YYYYMMDD)

        Returns:
            Dict {ts_code: record count}
        """
        storage = Storage()
        try:
            query = f'''
                SELECT ts_code, COUNT(*) AS cnt
                FROM "{self.table_name}"
                WHERE end_date = ?
                GROUP BY ts_code
            '''
            result = storage._connection.execute(query, [period]).fetchdf()
            if result.empty:
                return {}
            return dict(zip(result['ts_code'], result['cnt']))
        except Exception as e:
            print(f"[{self.__class__.__name__}] Error querying local data count: {e}")
            return {}

    def get_local_records_by_key(self, period: str) -> Dict[tuple, int]:
        """Return the local records of a quarter, counted per logical key.

        Used for finer-grained difference detection, grouped by
        (ts_code, end_date, report_type).

        Args:
            period: Quarter string (YYYYMMDD)

        Returns:
            Dict {(ts_code, end_date, report_type): record count}
        """
        storage = Storage()
        try:
            query = f'''
                SELECT ts_code, end_date, report_type, COUNT(*) AS cnt
                FROM "{self.table_name}"
                WHERE end_date = ?
                GROUP BY ts_code, end_date, report_type
            '''
            result = storage._connection.execute(query, [period]).fetchdf()
            if result.empty:
                return {}
            return {
                (row['ts_code'], row['end_date'], row['report_type']): row['cnt']
                for _, row in result.iterrows()
            }
        except Exception as e:
            print(f"[{self.__class__.__name__}] Error querying local records: {e}")
            return {}

    def compare_and_find_differences(
        self, remote_df: pd.DataFrame, period: str
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Compare remote data with local data and find the differences.

        Logic:
        1. Count the records per stock in the remote data
        2. Query the local record count per stock for the quarter
        3. Compare them to find:
           - stocks missing locally (new)
           - stocks whose counts differ (updated, possibly including restatements)
        4. Return the differing rows to insert

        Note: rows are identified logically by (ts_code, end_date, report_type),
        so the same stock may have records for several report types in one
        quarter. Detection works at the stock level: if a stock's total record
        count differs, all of its records are refreshed.

        Args:
            remote_df: Remote data fetched from the API
            period: Quarter string

        Returns:
            (DataFrame of differing rows, DataFrame of statistics)
            The statistics contain: ts_code, remote_count, local_count, status
        """
        if remote_df.empty:
            return pd.DataFrame(), pd.DataFrame()

        # 1. Count the remote records per stock
        remote_counts = remote_df.groupby('ts_code').size().to_dict()

        # 2. Get the local counts (per stock)
        local_counts = self.get_local_data_count_by_stock(period)

        # 3. Compare and collect the differences
        diff_stocks = []  # stocks that need to be refreshed
        stats = []

        for ts_code, remote_count in remote_counts.items():
            local_count = local_counts.get(ts_code, 0)

            if local_count == 0:
                status = "new"  # missing locally: insert everything
                diff_stocks.append(ts_code)
            elif local_count != remote_count:
                status = "modified"  # counts differ: possibly a restatement
                diff_stocks.append(ts_code)
            else:
                status = "same"  # counts match: skip this stock

            stats.append({
                'ts_code': ts_code,
                'remote_count': remote_count,
                'local_count': local_count,
                'status': status,
            })

        # 4. Extract the differing rows
        if diff_stocks:
            diff_df = remote_df[remote_df['ts_code'].isin(diff_stocks)].copy()
        else:
            diff_df = pd.DataFrame()

        stats_df = pd.DataFrame(stats)
        return diff_df, stats_df

    # ======================================================================
    # Core sync logic
    # ======================================================================

    def delete_stock_quarter_data(
        self, period: str, ts_codes: Optional[List[str]] = None
    ) -> int:
        """Delete the data of the given quarter and stocks.

        Called before inserting new data (delete-then-insert strategy).

        Args:
            period: Quarter string (YYYYMMDD)
            ts_codes: Stock codes to delete; None deletes the whole quarter

        Returns:
            Number of deleted records

        Raises:
            Exception: re-raised if the delete fails, so that new rows are not
            inserted on top of stale ones.
        """
        storage = Storage()
        if ts_codes:
            # Delete only the given stocks
            placeholders = ", ".join("?" for _ in ts_codes)
            where_sql = f"end_date = ? AND ts_code IN ({placeholders})"
            params = [period] + list(ts_codes)
        else:
            # Delete the whole quarter
            where_sql = "end_date = ?"
            params = [period]

        try:
            # Count the matching rows first: the driver's rowcount may not
            # report the number of deleted rows
            count_row = storage._connection.execute(
                f'SELECT COUNT(*) FROM "{self.table_name}" WHERE {where_sql}', params
            ).fetchone()
            storage._connection.execute(
                f'DELETE FROM "{self.table_name}" WHERE {where_sql}', params
            )
            return int(count_row[0]) if count_row else 0
        except Exception as e:
            print(f"[{self.__class__.__name__}] Error deleting data: {e}")
            raise

    def sync_quarter(self, period: str, dry_run: bool = False) -> Dict:
        """Sync the data of a single quarter.

        Flow:
        1. Fetch the remote data
        2. Filter by report type according to TARGET_REPORT_TYPE
        3. Compare with local data to find stocks with differences
        4. Delete the old rows of those stocks
        5. Insert the new rows (delete-then-insert)

        Note: financial data sync always fetches and compares the current
        quarter and the previous quarter; there is no "nothing to sync" case.

        Args:
            period: Quarter string (YYYYMMDD)
            dry_run: Whether to run in preview mode (no writes)

        Returns:
            Sync result dict {
                'period': quarter,
                'remote_total': total remote records,
                'diff_count': number of differing records,
                'deleted_count': number of deleted records,
                'inserted_count': number of inserted records,
                'dry_run': whether preview mode was used
            }
        """
        print(f"[{self.__class__.__name__}] Syncing quarter {period}...")

        # 1. Fetch the remote data
        remote_df = self.fetch_single_quarter(period)

        if remote_df.empty:
            print(f"[{self.__class__.__name__}] No data for quarter {period}")
            return {
                'period': period,
                'remote_total': 0,
                'diff_count': 0,
                'deleted_count': 0,
                'inserted_count': 0,
                'dry_run': dry_run,
            }

        # 2. Filter by report type according to TARGET_REPORT_TYPE
        if self.TARGET_REPORT_TYPE and 'report_type' in remote_df.columns:
            remote_df = remote_df[remote_df['report_type'] == self.TARGET_REPORT_TYPE]

        remote_total = len(remote_df)
        print(f"[{self.__class__.__name__}] Fetched {remote_total} records from API")

        # 3. Compare and find the stocks with differences
        diff_df, stats_df = self.compare_and_find_differences(remote_df, period)
        diff_stocks = list(diff_df['ts_code'].unique()) if not diff_df.empty else []
        unchanged_count = len(stats_df[stats_df['status'] == 'same']) if not stats_df.empty else 0

        print(f"[{self.__class__.__name__}] Comparison result:")
        print(f"  - Stocks with differences: {len(diff_stocks)}")
        print(f"  - Unchanged stocks: {unchanged_count}")

        # 4. Apply the sync (delete, then insert)
        deleted_count = 0
        inserted_count = 0

        if not dry_run and not diff_df.empty:
            # 4.1 Delete the old rows of the differing stocks
            deleted_count = self.delete_stock_quarter_data(period, diff_stocks)
            print(f"[{self.__class__.__name__}] Deleted {deleted_count} old records")

            # 4.2 Insert the new rows
            self.storage.queue_save(self.table_name, diff_df)
            self.storage.flush()
            inserted_count = len(diff_df)
            print(f"[{self.__class__.__name__}] Inserted {inserted_count} new records")

        return {
            'period': period,
            'remote_total': remote_total,
            'diff_count': len(diff_df),
            'deleted_count': deleted_count,
            'inserted_count': inserted_count,
            'dry_run': dry_run,
        }

    def sync_range(
        self, start_quarter: str, end_quarter: str, dry_run: bool = False
    ) -> List[Dict]:
        """Sync all quarters in the given range.

        Note: this method syncs exactly the given range; adding the previous
        quarter for integrity is handled by sync_incremental().

        Args:
            start_quarter: Start quarter (YYYYMMDD)
            end_quarter: End quarter (YYYYMMDD)
            dry_run: Whether to run in preview mode

        Returns:
            List of per-quarter sync results
        """
        quarters = get_quarters_in_range(start_quarter, end_quarter)

        if not quarters:
            print(f"[{self.__class__.__name__}] No quarters to sync")
            return []

        print(f"[{self.__class__.__name__}] Syncing {len(quarters)} quarters: {quarters}")

        results = []
        for period in tqdm(quarters, desc=f"Syncing {self.table_name}"):
            try:
                result = self.sync_quarter(period, dry_run=dry_run)
                results.append(result)
            except Exception as e:
                print(f"[{self.__class__.__name__}] Error syncing {period}: {e}")
                results.append({'period': period, 'error': str(e)})

        return results

    def _get_latest_local_quarter(self) -> Optional[str]:
        """Return the latest end_date in the local table as YYYYMMDD, or None."""
        storage = Storage()
        result = storage._connection.execute(
            f'SELECT MAX(end_date) FROM "{self.table_name}"'
        ).fetchone()
        latest_quarter = result[0] if result and result[0] else None
        # A DATE column comes back as a date object; normalize it to YYYYMMDD
        if hasattr(latest_quarter, 'strftime'):
            latest_quarter = latest_quarter.strftime('%Y%m%d')
        return latest_quarter

    def sync_incremental(self, dry_run: bool = False) -> List[Dict]:
        """Run an incremental sync.

        Strategy:
        1. Make sure the table exists (created automatically on the first sync)
        2. Get the latest quarter in the table
        3. Compute the current quarter (taking into account whether it has ended)
        4. Determine the sync range: from the latest quarter to the current quarter
        5. **Important**: also include the previous quarter to ensure data integrity

        Note: unlike daily bar data, financial data must be fetched and
        compared on every run; there is no "nothing to sync" case, because
        financial data may be restated.

        Args:
            dry_run: Whether to run in preview mode

        Returns:
            List of per-quarter sync results
        """
        print(f"\n{'=' * 60}")
        print(f"[{self.__class__.__name__}] Incremental Sync")
        print(f"{'=' * 60}")

        # 0. Make sure the table exists (created automatically on the first sync)
        self.ensure_table_exists()

        # 1. Get the latest local quarter
        try:
            latest_quarter = self._get_latest_local_quarter()
        except Exception as e:
            print(f"[{self.__class__.__name__}] Error getting latest quarter: {e}")
            latest_quarter = None

        # 2. Get the current quarter
        current_quarter = self.get_current_quarter()

        if latest_quarter is None:
            # No local data: run a full sync
            print(f"[{self.__class__.__name__}] No local data, performing full sync")
            return self.sync_range(self.DEFAULT_START_DATE, current_quarter, dry_run)

        print(f"[{self.__class__.__name__}] Latest local quarter: {latest_quarter}")
        print(f"[{self.__class__.__name__}] Current quarter: {current_quarter}")

        # 3. Determine the sync range.
        # Financial data is always compared and updated; nothing is skipped.
        if latest_quarter > current_quarter:
            # Local data is newer than the current quarter: still resync the
            # current quarter, since it may contain restatements
            start_quarter = current_quarter
        else:
            # Normal case: include the previous quarter
            start_quarter = self.get_prev_quarter(latest_quarter)
            if start_quarter < self.DEFAULT_START_DATE:
                start_quarter = self.DEFAULT_START_DATE

        print(f"[{self.__class__.__name__}] Sync range: {start_quarter} -> {current_quarter}")
        print("  (includes previous quarter for data integrity)")

        return self.sync_range(start_quarter, current_quarter, dry_run)

    def sync_full(self, dry_run: bool = False) -> List[Dict]:
        """Run a full sync.

        Args:
            dry_run: Whether to run in preview mode

        Returns:
            List of per-quarter sync results
        """
        print(f"\n{'=' * 60}")
        print(f"[{self.__class__.__name__}] Full Sync")
        print(f"{'=' * 60}")

        # Make sure the table exists
        self.ensure_table_exists()

        current_quarter = self.get_current_quarter()
        return self.sync_range(self.DEFAULT_START_DATE, current_quarter, dry_run)

    # ======================================================================
    # Preview mode
    # ======================================================================

    def preview_sync(self) -> Dict:
        """Show what a sync would do, without syncing.

        Note: financial data sync always runs, because data may be restated.
        The preview shows the range of quarters that would be synced.

        Returns:
            Preview info dict
        """
        print(f"\n{'=' * 60}")
        print(f"[{self.__class__.__name__}] Preview Mode")
        print(f"{'=' * 60}")

        # Get the latest local quarter
        try:
            latest_quarter = self._get_latest_local_quarter()
        except Exception:
            latest_quarter = None

        current_quarter = self.get_current_quarter()

        if latest_quarter is None:
            # No local data: a full sync is required
            start_quarter = self.DEFAULT_START_DATE
            message = "No local data, full sync required"
        else:
            # Always compare and update, starting from the previous quarter
            start_quarter = self.get_prev_quarter(latest_quarter)
            if start_quarter < self.DEFAULT_START_DATE:
                start_quarter = self.DEFAULT_START_DATE
            message = f"Incremental sync from {start_quarter} to {current_quarter}"

        preview = {
            'table_name': self.table_name,
            'api_name': self.api_name,
            'latest_quarter': latest_quarter,
            'current_quarter': current_quarter,
            'start_quarter': start_quarter,
            'end_quarter': current_quarter,
            'message': message,
        }

        print(f"Table: {self.table_name}")
        print(f"API: {self.api_name}")
        print(f"Latest local: {latest_quarter}")
        print(f"Current quarter: {current_quarter}")
        print(f"Sync range: {start_quarter} -> {current_quarter}")
        print(f"Message: {message}")
        print(f"{'=' * 60}")

        return preview


# ======================================================================
# Convenience functions
# ======================================================================

def sync_financial_data(
    syncer_class: type, force_full: bool = False, dry_run: bool = False
) -> List[Dict]:
    """Generic convenience function for syncing financial data.

    Args:
        syncer_class: A subclass of QuarterBasedSync
        force_full: Whether to force a full sync
        dry_run: Whether to run in preview mode

    Returns:
        List of sync results
    """
    syncer = syncer_class()
    if force_full:
        return syncer.sync_full(dry_run)
    return syncer.sync_incremental(dry_run)


def preview_financial_sync(syncer_class: type) -> Dict:
    """Preview financial data sync info.

    Args:
        syncer_class: A subclass of QuarterBasedSync

    Returns:
        Preview info dict
    """
    syncer = syncer_class()
    return syncer.preview_sync()
```
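The per-stock comparison in compare_and_find_differences() can be exercised without a database or pandas. The following stdlib-only sketch mirrors its classification rules (new / modified / same). The function name `classify_stocks` and the plain-dict inputs are assumptions made for illustration.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def classify_stocks(remote_codes: Iterable[str],
                    local_counts: Dict[str, int]) -> Tuple[List[str], List[dict]]:
    """Return (stocks to refresh, per-stock stats) using record counts only."""
    remote_counts = Counter(remote_codes)  # one entry per remote row
    diff_stocks: List[str] = []
    stats: List[dict] = []
    for ts_code, remote_count in sorted(remote_counts.items()):
        local_count = local_counts.get(ts_code, 0)
        if local_count == 0:
            status = "new"  # missing locally
        elif local_count != remote_count:
            status = "modified"  # count changed, possibly a restatement
        else:
            status = "same"  # counts match, stock is skipped
        if status != "same":
            diff_stocks.append(ts_code)
        stats.append({"ts_code": ts_code, "remote_count": remote_count,
                      "local_count": local_count, "status": status})
    return diff_stocks, stats
```

For example, remote codes ["A", "A", "B", "C"] against local counts {"A": 1, "B": 1} mark A as modified, B as same, and C as new. Because the check compares counts only, a restatement that changes values without changing a stock's row count is not detected. Stocks present locally but absent from the remote response are left untouched.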
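Verification steps:
- Check that the file is created correctly
- Confirm that all abstract methods are defined
- Verify that type hints are complete
- Confirm that the style is consistent with the existing base_sync.py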
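The range selection in sync_incremental() depends only on the latest local quarter and the current quarter, so it can be unit-tested in isolation. The sketch below assumes both values are already 'YYYYMMDD' strings (the plan converts DuckDB DATE values with strftime first). `incremental_range` is a hypothetical name.

```python
from typing import Optional, Tuple

DEFAULT_START_DATE = "20180331"  # 2018 Q1, as in QuarterBasedSync


def prev_quarter(quarter: str) -> str:
    year, month_day = int(quarter[:4]), quarter[4:]
    return {
        "0331": f"{year - 1}1231",
        "0630": f"{year}0331",
        "0930": f"{year}0630",
        "1231": f"{year}0930",
    }[month_day]


def incremental_range(latest_local: Optional[str], current: str,
                      default_start: str = DEFAULT_START_DATE) -> Tuple[str, str]:
    """Return (start, end) quarters following the rules in sync_incremental()."""
    if latest_local is None:
        return default_start, current  # no local data: full range
    if latest_local > current:
        return current, current  # local data newer than "current": resync current only
    # Normal case: re-check the previous quarter too, but never before default_start
    # (YYYYMMDD strings compare correctly as text)
    return max(prev_quarter(latest_local), default_start), current
```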
Task 2: Refactor api_income.py
File: src/data/api_wrappers/financial_data/api_income.py (rewritten)
Goal: Rewrite the module on top of QuarterBasedSync and remove the old IncomeSync class.
Implementation:
```python
"""Income statement data interface (VIP version).

Uses the Tushare VIP endpoint (income_vip) to fetch income statement data.
Synced by quarter: one request returns one quarter's data for all listed companies.

Endpoint notes:
- income_vip: fetches one quarter's income statement data for all listed companies
- Requires 5000 Tushare points to call
- The period parameter is the report period (last day of the quarter, e.g. 20231231)

Usage:
    # Sync income statement data
    from src.data.api_wrappers.financial_data.api_income import IncomeQuarterSync, sync_income

    # Option 1: use the class
    syncer = IncomeQuarterSync()
    syncer.sync_incremental()  # incremental sync
    syncer.sync_full()  # full sync

    # Option 2: use the convenience function
    sync_income()  # incremental sync
    sync_income(force_full=True)  # full sync
"""

from typing import Optional

import pandas as pd

from src.data.client import TushareClient
from src.data.api_wrappers.base_financial_sync import (
    QuarterBasedSync,
    preview_financial_sync,
    sync_financial_data,
)


class IncomeQuarterSync(QuarterBasedSync):
    """Quarterly sync implementation for the income statement.

    Uses the income_vip endpoint to fetch income statement data for all
    listed companies, one quarter at a time.

    Table: financial_income
    Logical key: (ts_code, end_date, report_type); no primary key or unique index
    """

    table_name = "financial_income"
    api_name = "income_vip"

    # Target report type: only consolidated statements by default
    TARGET_REPORT_TYPE = "1"

    # Table schema
    TABLE_SCHEMA = {
        "ts_code": "VARCHAR(16) NOT NULL",
        "ann_date": "DATE",
        "f_ann_date": "DATE",
        "end_date": "DATE NOT NULL",
        "report_type": "INTEGER",
        "comp_type": "INTEGER",
        "end_type": "VARCHAR(10)",
        "basic_eps": "DOUBLE",
        "diluted_eps": "DOUBLE",
        "total_revenue": "DOUBLE",
        "revenue": "DOUBLE",
        "int_income": "DOUBLE",
        "prem_earned": "DOUBLE",
        "comm_income": "DOUBLE",
        "n_commis_income": "DOUBLE",
        "n_oth_income": "DOUBLE",
        "n_oth_b_income": "DOUBLE",
        "prem_income": "DOUBLE",
        "out_prem": "DOUBLE",
        "une_prem_reser": "DOUBLE",
        "reins_income": "DOUBLE",
        "n_sec_tb_income": "DOUBLE",
        "n_sec_uw_income": "DOUBLE",
        "n_asset_mg_income": "DOUBLE",
        "oth_b_income": "DOUBLE",
        "fv_value_chg_gain": "DOUBLE",
        "invest_income": "DOUBLE",
        "ass_invest_income": "DOUBLE",
        "forex_gain": "DOUBLE",
        "total_cogs": "DOUBLE",
        "oper_cost": "DOUBLE",
        "int_exp": "DOUBLE",
        "comm_exp": "DOUBLE",
        "biz_tax_surchg": "DOUBLE",
        "sell_exp": "DOUBLE",
        "admin_exp": "DOUBLE",
        "fin_exp": "DOUBLE",
        "assets_impair_loss": "DOUBLE",
        "prem_refund": "DOUBLE",
        "compens_payout": "DOUBLE",
        "reser_insur_liab": "DOUBLE",
        "div_payt": "DOUBLE",
        "reins_exp": "DOUBLE",
        "oper_exp": "DOUBLE",
        "compens_payout_refu": "DOUBLE",
        "insur_reser_refu": "DOUBLE",
        "reins_cost_refund": "DOUBLE",
        "other_bus_cost": "DOUBLE",
        "operate_profit": "DOUBLE",
        "non_oper_income": "DOUBLE",
        "non_oper_exp": "DOUBLE",
        "nca_disploss": "DOUBLE",
        "total_profit": "DOUBLE",
        "income_tax": "DOUBLE",
        "n_income": "DOUBLE",
        "n_income_attr_p": "DOUBLE",
        "minority_gain": "DOUBLE",
        "oth_compr_income": "DOUBLE",
        "t_compr_income": "DOUBLE",
        "compr_inc_attr_p": "DOUBLE",
        "compr_inc_attr_m_s": "DOUBLE",
        "ebit": "DOUBLE",
        "ebitda": "DOUBLE",
        "insurance_exp": "DOUBLE",
        "undist_profit": "DOUBLE",
        "distable_profit": "DOUBLE",
        "rd_exp": "DOUBLE",
        "fin_exp_int_exp": "DOUBLE",
        "fin_exp_int_inc": "DOUBLE",
        "transfer_surplus_rese": "DOUBLE",
        "transfer_housing_imprest": "DOUBLE",
        "transfer_oth": "DOUBLE",
        "adj_lossgain": "DOUBLE",
        "withdra_legal_surplus": "DOUBLE",
        "withdra_legal_pubfund": "DOUBLE",
        "withdra_biz_devfund": "DOUBLE",
        "withdra_rese_fund": "DOUBLE",
        "withdra_oth_ersu": "DOUBLE",
        "workers_welfare": "DOUBLE",
        "distr_profit_shrhder": "DOUBLE",
        "prfshare_payable_dvd": "DOUBLE",
        "comshare_payable_dvd": "DOUBLE",
        "capit_comstock_div": "DOUBLE",
        "net_after_nr_lp_correct": "DOUBLE",
        "credit_impa_loss": "DOUBLE",
        "net_expo_hedging_benefits": "DOUBLE",
        "oth_impair_loss_assets": "DOUBLE",
        "total_opcost": "DOUBLE",
        "amodcost_fin_assets": "DOUBLE",
        "oth_income": "DOUBLE",
        "asset_disp_income": "DOUBLE",
        "continued_net_profit": "DOUBLE",
        "end_net_profit": "DOUBLE",
        "update_flag": "VARCHAR(1)",
    }

    # Index definitions (plain indexes only).
    # Financial data may be restated several times, so there is no primary key or unique index.
    TABLE_INDEXES = [
        ("idx_financial_income_ts_code", ["ts_code"]),
        ("idx_financial_income_end_date", ["end_date"]),
        ("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]),
    ]

    def __init__(self):
        """Initialize the income statement syncer."""
        super().__init__()
        # None: no fields parameter is sent, so the API's default field set is returned
        self._fields = None

    def fetch_single_quarter(self, period: str) -> pd.DataFrame:
        """Fetch one quarter's income statement data for all listed companies.

        Args:
            period: Report period, the last day of the quarter (e.g. '20231231')

        Returns:
            DataFrame with that quarter's income statement data for all listed companies
        """
        params = {"period": period}
        if self._fields:
            params["fields"] = self._fields
        return self.client.query(self.api_name, **params)


# =============================================================================
# Convenience functions
# =============================================================================

def sync_income(force_full: bool = False, dry_run: bool = False) -> list:
    """Sync income statement data (convenience function).

    Args:
        force_full: If True, force a full sync
        dry_run: If True, preview only and do not write

    Returns:
        List of sync results

    Example:
        >>> # Incremental sync
        >>> sync_income()
        >>>
        >>> # Full sync
        >>> sync_income(force_full=True)
        >>>
        >>> # Preview
        >>> sync_income(dry_run=True)
    """
    return sync_financial_data(IncomeQuarterSync, force_full, dry_run)


def preview_income_sync() -> dict:
    """Preview income statement sync info.

    Returns:
        Preview info dict
    """
    return preview_financial_sync(IncomeQuarterSync)


def get_income(period: str, fields: Optional[str] = None) -> pd.DataFrame:
    """Fetch income statement data (raw interface, single quarter).

    Fetches one quarter's data directly, without sync management.

    Args:
        period: Report period, the last day of the quarter (e.g. '20231231')
        fields: Comma-separated fields to return; defaults to all fields

    Returns:
        DataFrame with the income statement data
    """
    client = TushareClient()
    if fields is None:
        # Same columns, in the same order, as IncomeQuarterSync.TABLE_SCHEMA
        fields = ",".join(IncomeQuarterSync.TABLE_SCHEMA)
    return client.query("income_vip", period=period, fields=fields)
```
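Change notes:
- Add the IncomeQuarterSync class, which inherits from QuarterBasedSync
- Define TARGET_REPORT_TYPE = "1" so that only consolidated statements are synced
- Define the table schema TABLE_SCHEMA and the plain indexes TABLE_INDEXES to support automatic table creation
- Keep the get_income() function as the raw data access interface
- Remove the old IncomeSync class
- Add the convenience functions sync_income() and preview_income_sync()
Important notes:
- No primary key or unique index: financial data can be restated several times, so the same stock may have several versions for the same quarter (different ann_date). The delete-then-insert strategy avoids primary-key conflicts.
- Data updates use the delete-then-insert strategy to keep the data consistent.
- TARGET_REPORT_TYPE can be overridden to sync other report types.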
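The report-type filter in sync_quarter() follows one rule. A truthy TARGET_REPORT_TYPE keeps only matching rows, while None (or a response without a report_type column) keeps everything. The sketch below reproduces that rule on plain dicts instead of a DataFrame. As the default value "1" implies, it assumes the API returns report_type as a string, so comparing against the integer 1 would match nothing.

```python
from typing import Dict, List, Optional


def filter_report_type(rows: List[Dict[str, str]],
                       target: Optional[str] = "1") -> List[Dict[str, str]]:
    """Keep rows whose report_type equals target; None or no report_type field keeps all rows."""
    if not target or not any("report_type" in row for row in rows):
        return list(rows)
    return [row for row in rows if row.get("report_type") == target]


rows = [
    {"ts_code": "000001.SZ", "report_type": "1"},  # consolidated statement
    {"ts_code": "000001.SZ", "report_type": "2"},  # another report type
]
```

In the class itself, syncing every report type would just be a subclass override, for example a hypothetical `class IncomeAllTypesSync(IncomeQuarterSync): TARGET_REPORT_TYPE = None`.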
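Verification steps:
- Confirm that the file can be imported
- Check that type hints are complete
- Verify that the table schema matches the old version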
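For the check that the new table schema matches the old one, a small comparison of two {column: SQL type} mappings is enough. In this sketch the old mapping is assumed to be collected from the existing table or from the old IncomeSync code, neither of which this plan shows. Length specifiers and NOT NULL are ignored, so VARCHAR(16) NOT NULL and VARCHAR compare as equal.

```python
import re
from typing import Dict, List


def _base_type(sql_type: str) -> str:
    """Normalize e.g. 'VARCHAR(16) NOT NULL' to 'VARCHAR'."""
    without_length = re.sub(r"\(.*?\)", "", sql_type.upper())
    return without_length.replace("NOT NULL", "").strip()


def diff_schemas(old: Dict[str, str], new: Dict[str, str]) -> Dict[str, List[str]]:
    """Columns added, removed, or with a different base type between two schemas."""
    shared = set(old) & set(new)
    return {
        "added": sorted(set(new) - set(old)),
        "removed": sorted(set(old) - set(new)),
        "type_changed": sorted(c for c in shared if _base_type(old[c]) != _base_type(new[c])),
    }
```

Empty lists for all three keys mean the schemas agree up to column lengths. Nullability changes are deliberately ignored here, so they need a separate check if they matter.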
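Task 3: Refactor api_financial_sync.py
File: src/data/api_wrappers/financial_data/api_financial_sync.py (rewritten)
Goal: Simplify the scheduling hub so it keeps only the scheduling logic, and remove the concrete sync implementation.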
Implementation:
```python
"""Unified scheduling hub for financial data sync.

This module is the scheduling hub for financial data sync; it only coordinates
and dispatches tasks. The concrete sync logic lives in the individual API files.

Supported financial data types:
- income: income statement (implemented)
- balance: balance sheet (reserved)
- cashflow: cash flow statement (reserved)

Usage:
    # Sync all financial data (incremental)
    from src.data.api_wrappers.financial_data.api_financial_sync import sync_financial
    sync_financial()

    # Full sync
    sync_financial(force_full=True)

    # Sync only the income statement
    sync_financial(data_types=["income"])

    # Preview the sync
    from src.data.api_wrappers.financial_data.api_financial_sync import preview_sync
    preview = preview_sync()
"""

from typing import Any, Dict, List, Optional

from src.data.api_wrappers.financial_data.api_income import (
    IncomeQuarterSync,
    preview_income_sync,
    sync_income,
)

# Registry of supported financial data types
FINANCIAL_SYNCERS = {
    "income": {
        "syncer_class": IncomeQuarterSync,
        "sync_func": sync_income,
        "preview_func": preview_income_sync,
        "display_name": "Income statement",
    },
    # Reserved: balance sheet
    # "balance": {
    #     "syncer_class": BalanceQuarterSync,
    #     "sync_func": sync_balance,
    #     "preview_func": preview_balance_sync,
    #     "display_name": "Balance sheet",
    # },
    # Reserved: cash flow statement
    # "cashflow": {
    #     "syncer_class": CashflowQuarterSync,
    #     "sync_func": sync_cashflow,
    #     "preview_func": preview_cashflow_sync,
    #     "display_name": "Cash flow statement",
    # },
}


def sync_financial(
    data_types: Optional[List[str]] = None,
    force_full: bool = False,
    dry_run: bool = False,
) -> Dict[str, Any]:
    """Sync financial data (dispatch function).

    Dispatches the matching syncer for each requested data type.

    Args:
        data_types: List of data types, e.g. ["income", "balance"];
            None syncs all types
        force_full: If True, force a full sync
        dry_run: If True, preview only and do not write

    Returns:
        Dict {data type: list of sync results}, or {data type: {"error": ...}}
        for a type that failed

    Example:
        >>> # Sync all financial data
        >>> sync_financial()
        >>>
        >>> # Sync only the income statement
        >>> sync_financial(data_types=["income"])
        >>>
        >>> # Full sync
        >>> sync_financial(force_full=True)
    """
    if data_types is None:
        data_types = list(FINANCIAL_SYNCERS.keys())

    results: Dict[str, Any] = {}

    print("\n" + "=" * 60)
    print("[Financial Sync] Financial data sync scheduling hub")
    print("=" * 60)
    print(f"Data types: {', '.join(data_types)}")
    print(f"Sync mode: {'full' if force_full else 'incremental'}")
    print(f"Write mode: {'preview (dry run)' if dry_run else 'write'}")
    print("=" * 60)

    for data_type in data_types:
        if data_type not in FINANCIAL_SYNCERS:
            print(f"[WARN] Unknown data type: {data_type}")
            results[data_type] = {"error": f"Unknown data type: {data_type}"}
            continue

        config = FINANCIAL_SYNCERS[data_type]
        sync_func = config["sync_func"]
        display_name = config["display_name"]

        print(f"\n[{display_name}] Starting sync...")

        try:
            result = sync_func(force_full=force_full, dry_run=dry_run)
            results[data_type] = result
            print(f"[{display_name}] Sync finished")
        except Exception as e:
            print(f"[ERROR] [{display_name}] Sync failed: {e}")
            results[data_type] = {"error": str(e)}

    # Print the summary
    print("\n" + "=" * 60)
    print("[Financial Sync] Summary")
    print("=" * 60)

    for data_type, result in results.items():
        if isinstance(result, dict) and "error" in result:
            status = f"failed: {result['error']}"
        elif isinstance(result, list):
            total_records = sum(r.get('diff_count', 0) for r in result if isinstance(r, dict))
            status = f"succeeded ({len(result)} quarters, {total_records} differing records)"
        else:
            status = "done"

        display_name = FINANCIAL_SYNCERS.get(data_type, {}).get("display_name", data_type)
        print(f"  {display_name}: {status}")

    print("=" * 60)

    return results


def preview_sync(data_types: Optional[List[str]] = None) -> Dict[str, Dict]:
    """Preview financial data sync info.

    Args:
        data_types: List of data types; None means all types

    Returns:
        Dict of preview info per data type

    Example:
        >>> preview = preview_sync()
        >>> print(preview)
    """
    if data_types is None:
        data_types = list(FINANCIAL_SYNCERS.keys())

    previews = {}

    print("\n" + "=" * 60)
    print("[Financial Sync] Sync preview")
    print("=" * 60)

    for data_type in data_types:
        if data_type not in FINANCIAL_SYNCERS:
            continue

        preview_func = FINANCIAL_SYNCERS[data_type]["preview_func"]
        previews[data_type] = preview_func()

    return previews


def list_financial_types() -> List[Dict]:
    """List all supported financial data types.

    Returns:
        List of data type info dicts
    """
    return [
        {"name": name, "display_name": config["display_name"]}
        for name, config in FINANCIAL_SYNCERS.items()
    ]


# Alias kept for backward compatibility
sync_all_financial = sync_financial


if __name__ == "__main__":
    import sys

    print("=" * 60)
    print("Financial data sync scheduling hub")
    print("=" * 60)
    print("\nSupported financial data types:")
    print("-" * 60)
    for info in list_financial_types():
        print(f"  - {info['name']}: {info['display_name']}")
    print("-" * 60)
    print("\nUsage:")
    print("  # Sync all financial data")
    print("  sync_financial()")
    print("")
    print("  # Sync specific types")
    print('  sync_financial(data_types=["income"])')
    print("")
    print("  # Full sync")
    print("  sync_financial(force_full=True)")
    print("")
    print("  # Preview")
    print("  preview_sync()")
    print("=" * 60)

    # Preview by default; pass --sync (optionally with --full) to sync
    if len(sys.argv) > 1 and sys.argv[1] == "--sync":
        print("\nRunning sync...")
        force_full = "--full" in sys.argv
        sync_financial(force_full=force_full)
    else:
        print("\nRunning preview...")
        preview_sync()
```
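Change notes:
- Completely remove the FinancialSync class and its 600+ lines of code
- Introduce the FINANCIAL_SYNCERS registry pattern
- The dispatch functions sync_financial() and preview_sync() only distribute tasks
- Keep backward compatibility (the sync_all_financial alias)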
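The registry pattern keeps the dispatcher independent of any concrete syncer: it looks up each requested type and isolates failures per type. Below is a stripped-down sketch of that loop. The stand-in sync functions are hypothetical and take the place of sync_income() and the not-yet-implemented balance sheet syncer.

```python
from typing import Any, Callable, Dict, List, Optional


def dispatch(registry: Dict[str, Dict[str, Any]], data_types: Optional[List[str]] = None,
             **kwargs: Any) -> Dict[str, Any]:
    """Run registry[name]["sync_func"](**kwargs) for each type, recording errors per type."""
    names = list(registry) if data_types is None else data_types
    results: Dict[str, Any] = {}
    for name in names:
        config = registry.get(name)
        if config is None:
            results[name] = {"error": f"Unknown data type: {name}"}
            continue
        try:
            results[name] = config["sync_func"](**kwargs)
        except Exception as exc:  # one failing type must not stop the others
            results[name] = {"error": str(exc)}
    return results


def fake_income_sync(force_full: bool = False, dry_run: bool = False) -> list:
    return [{"period": "20231231", "diff_count": 3, "dry_run": dry_run}]


def fake_balance_sync(force_full: bool = False, dry_run: bool = False) -> list:
    raise RuntimeError("balance sync not implemented")


REGISTRY: Dict[str, Dict[str, Callable[..., Any]]] = {
    "income": {"sync_func": fake_income_sync},
    "balance": {"sync_func": fake_balance_sync},
}
```

A successful result is a list while a failure is a dict, so summary code should check `isinstance(result, dict)` before looking for an "error" key.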
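Verification steps:
- Confirm that the old interface sync_financial() still works
- Check that the preview function works
- Verify that import paths remain compatible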
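Import-path compatibility can be checked mechanically: import each module and list any expected names that are missing. Below is a small helper for that. `missing_exports` is a hypothetical name, and the example runs it on the standard-library json module so it works anywhere.

```python
import importlib
from typing import Iterable, List


def missing_exports(module_name: str, names: Iterable[str]) -> List[str]:
    """Import module_name and return the names it does not define."""
    module = importlib.import_module(module_name)
    return [name for name in names if not hasattr(module, name)]


# Stdlib example; in the project the module would be
# "src.data.api_wrappers.financial_data.api_financial_sync"
json_missing = missing_exports("json", ["loads", "dumps", "sync_financial"])
```

In the project, the expected names for api_financial_sync would be sync_financial, preview_sync, list_financial_types, and the sync_all_financial alias. All of these appear in the Task 3 code.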
Task 4: Verification tests
Goal: Confirm that the refactored financial data sync works correctly.
Test steps
Step 1: Import test
```python
# Verify that all modules import correctly
from src.data.api_wrappers.base_financial_sync import QuarterBasedSync
from src.data.api_wrappers.financial_data.api_income import (
    IncomeQuarterSync,
    sync_income,
    preview_income_sync,
    get_income,
)
from src.data.api_wrappers.financial_data.api_financial_sync import (
    sync_financial,
    preview_sync,
    list_financial_types,
)

print("All modules imported successfully")
```
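Step 2: Basic functionality test
```python
# Test the QuarterBasedSync base class methods
from src.data.api_wrappers.financial_data.api_income import IncomeQuarterSync

syncer = IncomeQuarterSync()

# Test the quarter calculations
assert syncer.get_prev_quarter("20231231") == "20230930"
assert syncer.get_next_quarter("20231231") == "20240331"
assert syncer.get_prev_quarter("20240331") == "20231231"
print("Quarter calculation tests passed")
```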
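Step 3: Preview test
```python
# Test the preview function
from src.data.api_wrappers.financial_data.api_income import preview_income_sync

preview = preview_income_sync()
print(f"Preview result: {preview}")
assert "table_name" in preview
assert "start_quarter" in preview
assert "end_quarter" in preview
assert preview["table_name"] == "financial_income"
print("Preview tests passed")
```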
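Step 4: Scheduling hub test
```python
# Test the scheduling hub
from src.data.api_wrappers.financial_data.api_financial_sync import (
    list_financial_types,
    preview_sync,
)

result = preview_sync()
print(f"All previews: {result}")
types = list_financial_types()
print(f"Available types: {types}")
print("Scheduling hub tests passed")
```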
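Step 5: Backward compatibility test
```python
# Verify that the old interface still works
from src.data.api_wrappers.financial_data.api_financial_sync import sync_financial

# Test dry-run mode only
try:
    result = sync_financial(data_types=["income"], dry_run=True)
    print("Backward compatibility test passed")
except Exception as e:
    print(f"Backward compatibility test failed: {e}")
```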
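Verification checklist
- All modules import correctly
- The quarter calculation helpers work correctly
- The preview returns the expected format (including start_quarter and end_quarter)
- Sync methods always run (nothing is skipped)
- Data updates use the delete-then-insert strategy
- TARGET_REPORT_TYPE filtering works
- No primary key or unique index is set (supports multiple restatements of financial data)
- The scheduling hub can list all types
- The old interfaces remain backward compatible
- Code style is consistent with the existing modules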
Commit suggestions
Commit in the following order, one complete feature per commit.
Commit 1: Task 1 - create base_financial_sync.py
```bash
git add src/data/api_wrappers/base_financial_sync.py
git commit -m "feat: add QuarterBasedSync base class for financial data

- Add abstract base class for quarter-based financial data sync
- Implement data diff detection (local vs remote comparison)
- Support incremental sync with previous quarter
- Add automatic table creation on first sync
- Add table schema and index management"
```
Commit 2: Task 2 - refactor api_income.py
```bash
git add src/data/api_wrappers/financial_data/api_income.py
git commit -m "refactor: rewrite api_income.py with QuarterBasedSync

- Replace old IncomeSync with IncomeQuarterSync
- Inherit from QuarterBasedSync for consistent patterns
- Add TABLE_SCHEMA and TABLE_INDEXES for auto table creation
- No PRIMARY_KEY to support financial data corrections
- Add convenient sync_income() and preview_income_sync() functions
- Keep get_income() for raw data access"
```
Commit 3: Task 3 - simplify api_financial_sync.py
```bash
git add src/data/api_wrappers/financial_data/api_financial_sync.py
git commit -m "refactor: simplify api_financial_sync.py to scheduler only

- Remove FinancialSync class (600+ lines)
- Add FINANCIAL_SYNCERS registry pattern
- Keep sync_financial() and preview_sync() as dispatchers
- Maintain backward compatibility"
```
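Summary
This refactor delivers the following improvements:
- Clear architecture: the scheduling hub only schedules; the sync logic moves down into the specific API files.
- Unified base class: the new QuarterBasedSync follows the same style as StockBasedSync/DateBasedSync.
- Delete-then-insert: updates delete the old data and then insert the new data, keeping the data consistent.
- No skip logic: financial data sync always runs; there is no "nothing to sync" case.
- Report-type filtering: TARGET_REPORT_TYPE configures which report type is synced (consolidated statements by default).
- Key design: there is no physical primary key. Rows are identified logically by (ts_code, end_date, report_type), so several restated versions returned by the API can be stored side by side.
- Easy to extend: adding a financial data type only requires subclassing QuarterBasedSync and registering it with the scheduling hub.
Changed files:
- New: src/data/api_wrappers/base_financial_sync.py (~680 lines)
- Refactored: src/data/api_wrappers/financial_data/api_income.py (~160 lines)
- Simplified: src/data/api_wrappers/financial_data/api_financial_sync.py (~150 lines)
Key changes:
- Sync strategy: unlike daily bar data, financial data must be compared and updated on every run.
- Data updates: the delete-then-insert strategy deletes old data before inserting new data, keeping the data consistent.
- No unique constraints: there is no primary key or unique index, because financial data may be restated several times (several versions for the same stock and quarter).
- Report filtering: only report_type='1' (consolidated statements) is synced by default; override TARGET_REPORT_TYPE to change this.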