From c01bf76a3d830ac2caddbade7c3f63334046a88b Mon Sep 17 00:00:00 2001 From: liaozhaorun <1300336796@qq.com> Date: Sat, 7 Mar 2026 22:14:04 +0800 Subject: [PATCH] =?UTF-8?q?docs:=20=E6=B7=BB=E5=8A=A0=E8=B4=A2=E5=8A=A1?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5=E6=A8=A1=E5=9D=97=E9=87=8D?= =?UTF-8?q?=E6=9E=84=E7=9B=B8=E5=85=B3=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加财务数据 API 封装规范文档 (FINANCIAL_API_SPEC.md) 包含架构设计原则、类设计规范、同步策略、数据差异检测等 - 添加 n_income 因子生命周期分析文档 详细追踪因子从定义到训练的全流程 - 添加财务数据同步模块重构计划文档 明确 QuarterBasedSync 基类设计、重构任务清单 这些文档为后续财务数据同步模块重构提供完整的设计依据和实施方案 --- docs/api/FINANCIAL_API_SPEC.md | 907 ++++++++++ docs/n_income_factor_lifecycle.md | 600 +++++++ .../2026-03-07-financial-sync-refactor.md | 1490 +++++++++++++++++ 3 files changed, 2997 insertions(+) create mode 100644 docs/api/FINANCIAL_API_SPEC.md create mode 100644 docs/n_income_factor_lifecycle.md create mode 100644 docs/plan/2026-03-07-financial-sync-refactor.md diff --git a/docs/api/FINANCIAL_API_SPEC.md b/docs/api/FINANCIAL_API_SPEC.md new file mode 100644 index 0000000..df2463a --- /dev/null +++ b/docs/api/FINANCIAL_API_SPEC.md @@ -0,0 +1,907 @@ +# 财务数据 API 封装规范 + +> **文档版本**: v1.0 +> **适用范围**: 所有财务数据 API(利润表、资产负债表、现金流量表等) +> **更新日期**: 2026-03-07 + +--- + +## 目录 + +1. [概述](#概述) +2. [架构设计原则](#架构设计原则) +3. [文件结构规范](#文件结构规范) +4. [类设计规范](#类设计规范) +5. [同步策略规范](#同步策略规范) +6. [数据差异检测](#数据差异检测) +7. [表结构设计](#表结构设计) +8. [索引设计规范](#索引设计规范) +9. [报表类型过滤](#报表类型过滤) +10. [代码示例](#代码示例) +11. [常见问题](#常见问题) + +--- + +## 概述 + +### 财务数据特点 + +财务数据与日频数据(日线、分钟线)有本质区别: + +| 特性 | 日频数据 | 财务数据 | +|------|----------|----------| +| 更新频率 | 每日更新 | 季度更新 | +| 获取方式 | 按股票循环获取 | VIP接口一次性获取全市场 | +| 数据修正 | 极少发生 | 经常发生(财报修正) | +| 数据量 | 大(5000+股票×250交易日) | 小(5000+股票×4季度) | +| 版本控制 | 无 | 多版本(report_type) | + +### 核心要求 + +1. **必须实现 `QuarterBasedSync` 基类**:所有财务数据同步必须继承此基类 +2. **不设置唯一约束**:不创建主键和唯一索引,支持数据多次修正 +3. **先删除后插入**:数据更新采用删除旧数据再插入新数据的策略 +4. **无跳过逻辑**:财务数据必须每次都进行对比更新 +5. **报表类型过滤**:默认只同步合并报表,支持灵活配置 + +--- + +## 架构设计原则 + +### 1. 职责分离 + +``` +调度中心 (api_financial_sync.py) + | + v +各 API 文件 (api_income.py, api_balance.py, api_cashflow.py) + | + v +基类 (base_financial_sync.py - QuarterBasedSync) + | + v +存储层 (storage.py - ThreadSafeStorage) +``` + +**规范**: +- 调度中心只负责任务协调,不包含具体同步逻辑 +- 各 API 文件实现具体的 `fetch_single_quarter()` 方法 +- 通用逻辑下沉到 `QuarterBasedSync` 基类 + +### 2. 统一继承 + +**必须**继承 `QuarterBasedSync` 基类: + +```python +from src.data.api_wrappers.base_financial_sync import QuarterBasedSync + +class IncomeQuarterSync(QuarterBasedSync): + """利润表季度同步实现。""" + pass +``` + +**禁止**在 API 文件中重复实现同步逻辑。 + +--- + +## 文件结构规范 + +### 文件位置 + +财务数据 API 文件必须位于: + +``` +src/data/api_wrappers/financial_data/ +├── __init__.py # 可选:导出公共接口 +├── api_income.py # 利润表接口(已实现) +├── api_balance.py # 资产负债表接口(预留) +├── api_cashflow.py # 现金流量表接口(预留) +└── api_financial_sync.py # 调度中心(只保留调度逻辑) +``` + +### 基类位置 + +``` +src/data/api_wrappers/ +├── base_sync.py # StockBasedSync, DateBasedSync +└── base_financial_sync.py # QuarterBasedSync(本规范核心) +``` + +### 文件内容结构 + +每个 API 文件必须包含以下部分(按顺序): + +```python +"""模块文档字符串。 + +包含:模块用途、使用方式、注意事项。 +""" + +# 1. 标准库导入 +from typing import Optional +import pandas as pd + +# 2. 第三方库导入 +# (无特殊要求) + +# 3. 本地模块导入 +from src.data.client import TushareClient +from src.data.api_wrappers.base_financial_sync import ( + QuarterBasedSync, + sync_financial_data, + preview_financial_sync +) + +# 4. 同步类实现 +class XXXQuarterSync(QuarterBasedSync): + """具体财务数据同步实现类。""" + pass + +# 5. 便捷函数 + +def sync_xxx(force_full: bool = False, dry_run: bool = False) -> list: + """同步数据便捷函数。""" + pass + +def preview_xxx_sync() -> dict: + """预览同步信息便捷函数。""" + pass + +# 6. 原始数据接口(可选,向后兼容) +def get_xxx(period: str, fields: Optional[str] = None) -> pd.DataFrame: + """获取原始数据接口。""" + pass +``` + +--- + +## 类设计规范 + +### 类命名规范 + +**必须**遵循以下命名模式: + +```python +# 格式: {DataType}QuarterSync + +# 正确示例 +IncomeQuarterSync # 利润表 +BalanceQuarterSync # 资产负债表 +CashflowQuarterSync # 现金流量表 + +# 错误示例 +IncomeSync # 缺少 Quarter,不统一 +SyncIncome # 动词开头,不符合类命名规范 +``` + +### 必须覆盖的类属性 + +子类**必须**定义以下类属性: + +```python +class IncomeQuarterSync(QuarterBasedSync): + """利润表季度同步实现。""" + + # 1. 表名(必须) + table_name = "financial_income" + + # 2. API 接口名(必须) + api_name = "income_vip" + + # 3. 目标报表类型(可选,默认 "1") + TARGET_REPORT_TYPE = "1" + + # 4. 表结构定义(必须) + TABLE_SCHEMA = { + "ts_code": "VARCHAR(16) NOT NULL", + "end_date": "DATE NOT NULL", + "report_type": "INTEGER", + # ... 其他字段 + } + + # 5. 索引定义(必须) + TABLE_INDEXES = [ + ("idx_financial_income_ts_code", ["ts_code"]), + ("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]), + ] +``` + +### 必须实现的抽象方法 + +子类**必须**实现以下抽象方法: + +```python +@abstractmethod +def fetch_single_quarter(self, period: str) -> pd.DataFrame: + """获取单季度的全部上市公司数据。 + + Args: + period: 报告期,季度最后一天日期(如 '20231231') + + Returns: + 包含该季度全部上市公司财务数据的 DataFrame + + 注意: + - 使用 VIP 接口(如 income_vip) + - 不要在此方法中过滤 report_type,基类会统一处理 + - 返回的 DataFrame 必须包含 ts_code 和 end_date 列 + """ + params = {"period": period} + return self.client.query(self.api_name, **params) +``` + +### 禁止的操作 + +子类**禁止**覆盖或修改以下方法: + +```python +# 基类核心方法,禁止覆盖 +- sync_quarter() # 单季度同步流程 +- sync_range() # 范围同步 +- sync_incremental() # 增量同步 +- sync_full() # 全量同步 +- delete_stock_quarter_data() # 删除数据 +- compare_and_find_differences() # 差异检测 +- ensure_table_exists() # 建表逻辑 +``` + +--- + +## 同步策略规范 + +### 增量同步策略 + +**规范**: 财务数据同步**必须**每次都执行,不存在"跳过"的情况。 + +**原因**: 财务数据可能会被修正,即使本地已有数据,也需要重新对比更新。 + +**流程**: + +```python +def sync_incremental(self, dry_run: bool = False) -> List[Dict]: + """增量同步流程。 + + 注意:财务数据必须每次都进行对比更新,因为数据可能被修正。 + """ + # 1. 确保表存在(首次同步时自动建表) + self.ensure_table_exists() + + # 2. 获取本地最新季度 + latest_quarter = self._get_latest_quarter() + + # 3. 获取当前季度 + current_quarter = self.get_current_quarter() + + # 4. 确定同步范围(不检查是否需要同步,直接执行) + # 注意:即使 latest_quarter >= current_quarter,也要执行 + start_quarter = self.get_prev_quarter(latest_quarter) + + # 5. 执行同步 + return self.sync_range(start_quarter, current_quarter, dry_run) +``` + +### 全量同步策略 + +**规范**: 全量同步从默认起始日期(2018Q1)同步到当前季度。 + +**流程**: + +```python +def sync_full(self, dry_run: bool = False) -> List[Dict]: + """全量同步流程。""" + # 1. 创建表结构(如不存在) + self.ensure_table_exists() + + # 2. 清空表(可选,根据需求决定) + # self.storage.clear_table(self.table_name) + + # 3. 获取同步范围 + start_quarter = self.DEFAULT_START_DATE # "20180331" + end_quarter = self.get_current_quarter() + + # 4. 执行同步 + return self.sync_range(start_quarter, end_quarter, dry_run) +``` + +### 单季度同步策略 + +**规范**: 单季度同步采用"先删除后插入"策略。 + +**流程**: + +```python +def sync_quarter(self, period: str, dry_run: bool = False) -> Dict: + """单季度同步流程(核心)。""" + # 1. 获取远程数据 + remote_df = self.fetch_single_quarter(period) + + # 2. 根据 TARGET_REPORT_TYPE 过滤报表类型 + if self.TARGET_REPORT_TYPE and 'report_type' in remote_df.columns: + remote_df = remote_df[remote_df['report_type'] == self.TARGET_REPORT_TYPE] + + # 3. 对比找出差异股票 + diff_df, stats_df = self.compare_and_find_differences(remote_df, period) + + # 4. 执行同步(先删除后插入) + if not dry_run and not diff_df.empty: + diff_stocks = list(diff_df['ts_code'].unique()) + + # 4.1 删除差异股票的旧数据 + self.delete_stock_quarter_data(period, diff_stocks) + + # 4.2 插入新数据 + self.storage.queue_save(self.table_name, diff_df) + self.storage.flush() + + return {...} +``` + +**重要**: 禁止使用 UPSERT(INSERT OR REPLACE),必须使用"先删除后插入"。 + +--- + +## 数据差异检测 + +### 检测逻辑 + +**规范**: 按股票级别对比本地与远程数据量,识别差异。 + +**算法**: + +```python +def compare_and_find_differences( + self, + remote_df: pd.DataFrame, + period: str +) -> Tuple[pd.DataFrame, pd.DataFrame]: + """对比远程数据与本地数据,找出差异。 + + 逻辑: + 1. 统计远程数据中每只股票的数据量 + 2. 查询本地数据库中该季度每只股票的数据量 + 3. 对比找出差异股票(新增或数据量不一致) + 4. 返回需要插入的差异数据 + + 注意:主键为 (ts_code, end_date, report_type),但差异检测按股票级别进行。 + 如果某股票的记录总数不一致,则更新该股票的所有记录。 + """ + # 1. 统计远程数据中每只股票的数据量 + remote_counts = remote_df.groupby('ts_code').size().to_dict() + + # 2. 获取本地数据量(按股票汇总) + local_counts = self.get_local_data_count_by_stock(period) + + # 3. 对比找出差异 + diff_stocks = [] + stats = [] + + for ts_code, remote_count in remote_counts.items(): + local_count = local_counts.get(ts_code, 0) + + if local_count == 0: + status = "new" # 本地不存在 + diff_stocks.append(ts_code) + elif local_count != remote_count: + status = "modified" # 数据量不一致,可能包含修正 + diff_stocks.append(ts_code) + else: + status = "same" # 数据量一致 + + stats.append({ + 'ts_code': ts_code, + 'remote_count': remote_count, + 'local_count': local_count, + 'status': status + }) + + # 4. 提取差异数据 + diff_df = remote_df[remote_df['ts_code'].isin(diff_stocks)].copy() + stats_df = pd.DataFrame(stats) + + return diff_df, stats_df +``` + +### 删除策略 + +**规范**: 删除指定季度和指定股票的所有数据。 + +```python +def delete_stock_quarter_data( + self, + period: str, + ts_codes: Optional[List[str]] = None +) -> int: + """删除指定季度和股票的数据。 + + Args: + period: 季度字符串 (YYYYMMDD) + ts_codes: 股票代码列表,None 表示删除该季度所有数据 + + Returns: + 删除的记录数 + """ + if ts_codes: + # 删除指定股票的数据 + placeholders = ', '.join(['?' for _ in ts_codes]) + query = f''' + DELETE FROM "{self.table_name}" + WHERE end_date = ? AND ts_code IN ({placeholders}) + ''' + result = storage._connection.execute(query, [period] + ts_codes) + else: + # 删除整个季度的数据 + query = f'DELETE FROM "{self.table_name}" WHERE end_date = ?' + result = storage._connection.execute(query, [period]) + + return result.rowcount +``` + +--- + +## 表结构设计 + +### 必备字段 + +财务数据表**必须**包含以下字段: + +```python +TABLE_SCHEMA = { + "ts_code": "VARCHAR(16) NOT NULL", # 股票代码 + "end_date": "DATE NOT NULL", # 报告期(季度最后一天) + "report_type": "INTEGER", # 报表类型 + "ann_date": "DATE", # 公告日期(可选) + # ... 其他业务字段 +} +``` + +### 字段命名规范 + +遵循 Tushare API 返回的字段名,保持与原 API 一致。 + +**正确示例**: +```python +"basic_eps": "DOUBLE", # 基本每股收益 +"total_revenue": "DOUBLE", # 营业总收入 +"operate_profit": "DOUBLE", # 营业利润 +``` + +**错误示例**: +```python +"basicEPS": "DOUBLE", # 驼峰命名,不符合 +"basic_eps_value": "DOUBLE", # 添加多余后缀 +"eps_basic": "DOUBLE", # 词序颠倒 +``` + +### 数据类型规范 + +| Tushare 类型 | DuckDB 类型 | +|--------------|-------------| +| str | VARCHAR(n) | +| float | DOUBLE | +| int | INTEGER | +| date | DATE | + +**示例**: +```python +TABLE_SCHEMA = { + "ts_code": "VARCHAR(16) NOT NULL", + "ann_date": "DATE", + "report_type": "INTEGER", + "basic_eps": "DOUBLE", +} +``` + +--- + +## 索引设计规范 + +### 禁止唯一索引 + +**严格禁止**创建主键和唯一索引: + +```python +# 禁止创建主键 +PRIMARY_KEY = ("ts_code", "end_date", "report_type") # 错误! + +# 禁止创建唯一索引 +("idx_unique", ["ts_code", "end_date"], True) # 错误! +CREATE UNIQUE INDEX ... # 错误! +``` + +**原因**: 财务数据可能发生多次修正,同一支股票在同一季度可能有多个版本(不同的 `ann_date`),设置唯一约束会导致插入失败。 + +### 推荐索引 + +**必须**创建以下索引: + +```python +TABLE_INDEXES = [ + # 1. 股票代码索引(单字段查询) + ("idx_financial_income_ts_code", ["ts_code"]), + + # 2. 报告期索引(时间范围查询) + ("idx_financial_income_end_date", ["end_date"]), + + # 3. 复合索引(股票+报告期+报表类型,最常用) + ("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]), +] +``` + +### 索引命名规范 + +索引名**必须**遵循以下格式: + +```python +# 格式: idx_{table_name}_{fields_description} + +# 正确示例 +"idx_financial_income_ts_code" +"idx_financial_income_end_date" +"idx_financial_income_ts_period" + +# 错误示例 +"ts_code_idx" # 缺少表名前缀 +"income_ts" # 表名缩写不明确 +"index_1" # 无意义名称 +``` + +--- + +## 报表类型过滤 + +### 默认行为 + +**规范**: 默认只同步合并报表(`report_type='1'`)。 + +```python +class QuarterBasedSync(ABC): + # 目标报表类型(子类可覆盖) + # 默认只同步合并报表(report_type='1') + # 设为 None 则同步所有报表类型 + TARGET_REPORT_TYPE: Optional[str] = "1" +``` + +### 覆盖方式 + +子类可以通过覆盖类属性来修改默认行为: + +```python +class IncomeQuarterSync(QuarterBasedSync): + """利润表同步 - 只同步合并报表。""" + TARGET_REPORT_TYPE = "1" # 明确指定 + +class BalanceQuarterSync(QuarterBasedSync): + """资产负债表同步 - 同步所有报表类型。""" + TARGET_REPORT_TYPE = None # 不过滤 +``` + +### 过滤逻辑 + +过滤逻辑在基类中统一处理: + +```python +def sync_quarter(self, period: str, dry_run: bool = False) -> Dict: + # 1. 获取远程数据 + remote_df = self.fetch_single_quarter(period) + + # 2. 根据 TARGET_REPORT_TYPE 过滤 + if self.TARGET_REPORT_TYPE and 'report_type' in remote_df.columns: + remote_df = remote_df[remote_df['report_type'] == self.TARGET_REPORT_TYPE] + + # ... 后续处理 +``` + +### 报表类型说明 + +根据 Tushare 文档: + +| 代码 | 类型 | 说明 | +|------|------|------| +| 1 | 合并报表 | 上市公司最新报表(默认)| +| 2 | 单季合并 | 单一季度的合并报表 | +| 3 | 调整单季合并表 | 调整后的单季合并报表 | +| 4 | 调整合并报表 | 本年度公布上年同期的财务报表数据 | +| 5 | 调整前合并报表 | 数据发生变更,将原数据进行保留 | +| 6 | 母公司报表 | 该公司母公司的财务报表数据 | +| ... | ... | ... | + +--- + +## 代码示例 + +### 完整实现示例:利润表接口 + +```python +"""利润表数据接口 (VIP 版本) + +使用 Tushare VIP 接口 (income_vip) 获取利润表数据。 +按季度同步,一次请求获取一个季度的全部上市公司数据。 + +使用方式: + from src.data.api_wrappers.financial_data.api_income import ( + IncomeQuarterSync, + sync_income, + preview_income_sync + ) + + # 方式1: 使用类 + syncer = IncomeQuarterSync() + syncer.sync_incremental() # 增量同步 + syncer.sync_full() # 全量同步 + + # 方式2: 使用便捷函数 + sync_income() # 增量同步 + sync_income(force_full=True) # 全量同步 +""" + +from typing import Optional +import pandas as pd + +from src.data.client import TushareClient +from src.data.api_wrappers.base_financial_sync import ( + QuarterBasedSync, + sync_financial_data, + preview_financial_sync +) + + +class IncomeQuarterSync(QuarterBasedSync): + """利润表季度同步实现。 + + 使用 income_vip 接口按季度获取全部上市公司利润表数据。 + + 表结构: financial_income + 注意: 不设置主键和唯一索引,支持财务数据多次修正 + """ + + table_name = "financial_income" + api_name = "income_vip" + + # 只同步合并报表 + TARGET_REPORT_TYPE = "1" + + # 表结构定义 + TABLE_SCHEMA = { + "ts_code": "VARCHAR(16) NOT NULL", + "ann_date": "DATE", + "f_ann_date": "DATE", + "end_date": "DATE NOT NULL", + "report_type": "INTEGER", + "comp_type": "INTEGER", + "basic_eps": "DOUBLE", + "diluted_eps": "DOUBLE", + "total_revenue": "DOUBLE", + "revenue": "DOUBLE", + # ... 其他字段 + } + + # 普通索引(不要创建唯一索引) + TABLE_INDEXES = [ + ("idx_financial_income_ts_code", ["ts_code"]), + ("idx_financial_income_end_date", ["end_date"]), + ("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]), + ] + + def fetch_single_quarter(self, period: str) -> pd.DataFrame: + """获取单季度的全部上市公司利润表数据。 + + Args: + period: 报告期,季度最后一天日期(如 '20231231') + + Returns: + 包含该季度全部上市公司利润表数据的 DataFrame + """ + params = {"period": period} + return self.client.query(self.api_name, **params) + + +# ============================================================================= +# 便捷函数 +# ============================================================================= + + +def sync_income( + force_full: bool = False, + dry_run: bool = False, +) -> list: + """同步利润表数据(便捷函数)。 + + Args: + force_full: 若为 True,强制全量同步 + dry_run: 若为 True,仅预览不写入 + + Returns: + 同步结果列表 + """ + return sync_financial_data(IncomeQuarterSync, force_full, dry_run) + + +def preview_income_sync() -> dict: + """预览利润表同步信息。 + + Returns: + 预览信息字典 + """ + return preview_financial_sync(IncomeQuarterSync) + + +def get_income(period: str, fields: Optional[str] = None) -> pd.DataFrame: + """获取利润表数据(原始接口,单季度)。 + + 用于直接获取某个季度的数据,不进行同步管理。 + + Args: + period: 报告期,季度最后一天日期(如 '20231231') + fields: 指定返回字段,默认返回全部字段 + + Returns: + 包含利润表数据的 DataFrame + """ + client = TushareClient() + + if fields is None: + fields = "ts_code,ann_date,end_date,report_type,basic_eps,..." + + return client.query("income_vip", period=period, fields=fields) +``` + +### 预留接口示例 + +```python +"""资产负债表数据接口 (VIP 版本) - 预留 + +使用 Tushare VIP 接口 (balancesheet_vip) 获取资产负债表数据。 +""" + +from typing import Optional +import pandas as pd + +from src.data.api_wrappers.base_financial_sync import ( + QuarterBasedSync, + sync_financial_data, + preview_financial_sync +) + + +class BalanceQuarterSync(QuarterBasedSync): + """资产负债表季度同步实现(预留)。""" + + table_name = "financial_balance" + api_name = "balancesheet_vip" + TARGET_REPORT_TYPE = "1" + + TABLE_SCHEMA = { + "ts_code": "VARCHAR(16) NOT NULL", + "ann_date": "DATE", + "end_date": "DATE NOT NULL", + "report_type": "INTEGER", + # TODO: 补充完整字段定义 + } + + TABLE_INDEXES = [ + ("idx_financial_balance_ts_code", ["ts_code"]), + ("idx_financial_balance_end_date", ["end_date"]), + ("idx_financial_balance_ts_period", ["ts_code", "end_date", "report_type"]), + ] + + def fetch_single_quarter(self, period: str) -> pd.DataFrame: + """预留方法,尚未实现。""" + raise NotImplementedError( + "资产负债表同步尚未实现。需要 Tushare 5000 积分调用 balancesheet_vip 接口。" + ) + + +def sync_balance(force_full: bool = False, dry_run: bool = False) -> list: + """预留函数。""" + raise NotImplementedError("资产负债表同步尚未实现") + + +def preview_balance_sync() -> dict: + """预留函数。""" + raise NotImplementedError("资产负债表同步尚未实现") +``` + +--- + +## 常见问题 + +### Q1: 为什么不设置主键和唯一索引? + +**A**: 财务数据可能发生多次修正。例如: + +``` +第一次发布:600000.SH, 20240331, report_type='1', ann_date='20240428' +第二次修正:600000.SH, 20240331, report_type='1', ann_date='20240515' +``` + +如果设置了唯一约束(如 `PRIMARY KEY (ts_code, end_date, report_type)`),第二次插入会失败。因此采用"先删除后插入"策略,不设置唯一约束。 + +### Q2: 为什么增量同步不跳过已同步的季度? + +**A**: 财务数据与日频数据不同,可能会被修正。即使本地已有某季度的数据,也需要重新获取远程数据进行对比,确保数据完整性。 + +### Q3: 为什么要获取前一季度? + +**A**: 财务数据修正可能发生在发布后的一段时间内。获取前一季度可以捕获上一季度可能发生的修正数据。 + +**示例**: +``` +当前日期: 2024-04-15 +当前季度: 2024Q1 (20240331) +本地最新: 2023Q4 (20231231) + +同步范围: 2023Q3 (20230930) -> 2024Q1 (20240331) + (包含前一季度以确保数据完整性) +``` + +### Q4: 如何支持其他报表类型? + +**A**: 覆盖 `TARGET_REPORT_TYPE` 类属性: + +```python +class IncomeAllReportSync(QuarterBasedSync): + """同步所有报表类型。""" + TARGET_REPORT_TYPE = None # 不过滤 +``` + +或者同步特定类型: + +```python +class IncomeParentReportSync(QuarterBasedSync): + """只同步母公司报表。""" + TARGET_REPORT_TYPE = "6" # 母公司报表 +``` + +### Q5: 如何处理字段变更? + +**A**: 如果 Tushare API 字段发生变更: + +1. 更新 `TABLE_SCHEMA` 添加新字段 +2. 使用 `ALTER TABLE` 添加新列(对已存在的数据) +3. 更新 `fetch_single_quarter()` 中的字段列表(如果使用了 `fields` 参数) + +**注意**: 不要删除已有字段,保持向后兼容。 + +### Q6: 如何调试同步问题? + +**A**: 使用 `dry_run=True` 进行预览: + +```python +from src.data.api_wrappers.financial_data.api_income import sync_income + +# 预览同步,不写入数据 +result = sync_income(dry_run=True) +print(result) +``` + +查看日志输出,检查: +- 远程数据量 +- 本地数据量 +- 差异股票列表 +- 删除/插入记录数 + +--- + +## 附录 + +### 相关文档 + +- [财务数据 API 说明](financial_api.md) - Tushare 财务数据接口说明 +- [通用 API 接口规范](API_INTERFACE_SPEC.md) - 通用接口规范 +- [数据同步重构计划](../plan/2026-03-07-financial-sync-refactor.md) - 本次重构的详细计划 + +### 相关代码 + +- `src/data/api_wrappers/base_financial_sync.py` - QuarterBasedSync 基类 +- `src/data/api_wrappers/financial_data/api_income.py` - 利润表示例实现 +- `src/data/api_wrappers/financial_data/api_financial_sync.py` - 调度中心 + +### 变更历史 + +| 日期 | 版本 | 变更内容 | +|------|------|----------| +| 2026-03-07 | v1.0 | 初始版本,规范财务数据 API 封装要求 | + +--- + +**注意**: 本文档为强制性规范,所有财务数据 API 封装必须遵循。如有特殊情况需要例外,需经过架构评审。 diff --git a/docs/n_income_factor_lifecycle.md b/docs/n_income_factor_lifecycle.md new file mode 100644 index 0000000..92f33b3 --- /dev/null +++ b/docs/n_income_factor_lifecycle.md @@ -0,0 +1,600 @@ +# n_income 因子生命周期分析 + +## 概述 + +本文档详细分析 `src/experiment/regression.py` 中 `n_income` 因子的完整生命周期,从字符串定义到最终参与模型训练的全过程。 + +## 因子定义 + +在 `src/experiment/regression.py` 第 55 行定义: + +```python +FACTOR_DEFINITIONS = { + # ... 其他因子 ... + "n_income": "n_income", +} +``` + +`n_income` 是一个简单符号表达式,代表**净利润**(Net Income)财务指标。这是一个**点-in-time(PIT)数据**,需要从财务报表中获取。 + +--- + +## 第一阶段:因子注册 + +### 1.1 注册入口 + +**位置**: `src/experiment/regression.py:134` + +```python +def create_factors_with_strings(engine: FactorEngine) -> List[str]: + for name, expr in FACTOR_DEFINITIONS.items(): + engine.add_factor(name, expr) # 注册 n_income +``` + +### 1.2 add_factor 实现 + +**位置**: `src/factors/engine/factor_engine.py:62-86` + +```python +def add_factor( + self, + name: str, + expr: Union[str, Node], +) -> "FactorEngine": + """注册一个因子表达式""" + # 如果是字符串,先解析为 Node + if isinstance(expr, str): + expr = self.parser.parse(expr) # 关键步骤 + + # 创建执行计划 + plan = self.planner.create_plan(expr, name) + + # 缓存执行计划 + self._factor_plans[name] = plan + return self +``` + +对于 `n_income = "n_income"`,流程如下: +1. 检测到是字符串,调用 `parser.parse("n_income")` +2. 解析结果为 `Symbol("n_income")` 节点 +3. 通过 `planner.create_plan()` 生成执行计划 +4. 缓存计划供后续计算使用 + +--- + +## 第二阶段:表达式解析 + +### 2.1 解析器入口 + +**位置**: `src/factors/parser.py:38-52` + +```python +def parse(self, formula: str) -> Node: + """解析因子表达式字符串为 Node""" + tree = ast.parse(formula, mode='eval') + return self._visit(tree.body) +``` + +### 2.2 符号节点处理 + +**位置**: `src/factors/parser.py:96-99` + +```python +def _visit_Name(self, node: ast.Name) -> Symbol: + """处理名称节点(如变量名 n_income)""" + return Symbol(node.id) +``` + +对于 `"n_income"` 字符串: +1. `ast.parse("n_income", mode='eval')` 生成 AST +2. AST 节点类型为 `ast.Name`,id 为 `"n_income"` +3. 调用 `_visit_Name()` 创建 `Symbol("n_income")` + +### 2.3 Symbol 节点定义 + +**位置**: `src/factors/dsl.py:82-102` + +```python +@dataclass +class Symbol(Node): + """符号节点,代表命名变量""" + name: str + + def dependencies(self) -> Set[str]: + return {self.name} +``` + +`Symbol("n_income")` 的依赖集合为 `{"n_income"}`。 + +--- + +## 第三阶段:执行计划生成 + +### 3.1 计划器入口 + +**位置**: `src/factors/engine/planner.py:40-74` + +```python +def create_plan( + self, + node: Node, + output_name: Optional[str] = None, +) -> ExecutionPlan: + """创建完整执行计划""" + # 1. 提取依赖 + deps = self.extractor.extract_dependencies(node) + + # 2. 翻译为 Polars 表达式 + polars_expr = self.translator.translate(node) + + # 3. 推导数据规格 + data_specs = self._infer_data_specs(deps, schema_cache=self.schema_cache) + + return ExecutionPlan( + data_specs=data_specs, + polars_expr=polars_expr, + dependencies=deps, + output_name=output_name, + ) +``` + +### 3.2 提取依赖 + +**位置**: `src/factors/compiler.py:19-32` + +```python +def extract_dependencies(self, node: Node) -> Set[str]: + """从 AST 提取所有依赖的原始字段名""" + deps: Set[str] = set() + self._extract(node, deps) + return deps + +def _extract(self, node: Node, deps: Set[str]): + if isinstance(node, Symbol): + deps.add(node.name) # n_income 被加入依赖集 + # ... 其他节点类型处理 +``` + +对于 `Symbol("n_income")`,提取的依赖为 `{"n_income"}`。 + +### 3.3 推导数据规格 + +**位置**: `src/factors/engine/planner.py:86-148` + +```python +def _infer_data_specs( + self, + dependencies: Set[str], + schema_cache: SchemaCache, +) -> List[DataSpec]: + """推导数据规格""" + table_fields: Dict[str, List[str]] = defaultdict(list) + + for field in dependencies: + # 使用 SchemaCache 查找字段所属表 + table = schema_cache.get_table_for_field(field) + table_fields[table].append(field) + + # 为每张表创建 DataSpec + for table, fields in table_fields.items(): + if schema_cache.is_pit_table(table): # PIT 表(财务数据) + spec = DataSpec( + table=table, + columns=fields, + join_type="asof_backward", + left_on="trade_date", + right_on="f_ann_date", + ) + else: # 普通表(行情数据) + spec = DataSpec( + table=table, + columns=fields, + join_type="standard", + ) + specs.append(spec) +``` + +对于 `n_income`: +1. `schema_cache.get_table_for_field("n_income")` 返回 `"financial_income"` +2. `schema_cache.is_pit_table("financial_income")` 返回 `True` +3. 生成 DataSpec: + - `table="financial_income"` + - `columns=["n_income"]` + - `join_type="asof_backward"` + - `left_on="trade_date"` + - `right_on="f_ann_date"` + +### 3.4 SchemaCache 实现 + +**位置**: `src/data/catalog.py` + +```python +class SchemaCache: + """缓存数据库表结构信息,提供字段到表的映射""" + + def get_table_for_field(self, field: str) -> Optional[str]: + """根据字段名获取表名""" + return self._field_to_table.get(field) + + def is_pit_table(self, table: str) -> bool: + """判断是否为 PIT(Point-in-Time)表""" + info = self._table_info.get(table) + if info and info.table_type == TableType.PIT: + return True + return False +``` + +表类型识别逻辑: +- **PIT 表**: 包含 `ann_date` 或 `f_ann_date` 字段(财务数据表) +- **DAILY 表**: 包含 `trade_date` 字段(行情数据表) + +--- + +## 第四阶段:数据获取与拼接 + +### 4.1 compute() 执行入口 + +**位置**: `src/factors/engine/factor_engine.py:88-120` + +```python +def compute( + self, + factor_names: List[str], + start_date: str, + end_date: str, +) -> pl.DataFrame: + """计算指定因子""" + # 1. 获取所有需要的执行计划 + plans = [self._factor_plans[name] for name in factor_names] + + # 2. 合并数据规格(去重) + merged_specs = self._merge_data_specs(plans) + + # 3. 从路由器获取核心宽表 + core_wide = self.router.fetch_data( + specs=merged_specs, + start_date=start_date, + end_date=end_date, + ) + + # 4. 执行计算 + result = self._execute_with_dependencies(factor_names, core_wide) + + return result +``` + +### 4.2 数据路由器 fetch_data + +**位置**: `src/factors/engine/data_router.py:48-100` + +```python +def fetch_data( + self, + specs: List[DataSpec], + start_date: str, + end_date: str, +) -> pl.DataFrame: + """根据 DataSpec 列表获取并组装数据""" + # 分离标准表和 asof 表 + standard_specs = [s for s in specs if s.join_type == "standard"] + asof_specs = [s for s in specs if s.join_type == "asof_backward"] + + # 加载标准表(行情数据) + standard_frames = [] + for spec in standard_specs: + df = self._load_table_from_spec(spec, start_date, end_date) + standard_frames.append(df) + + # 以第一张标准表为基础(通常是 daily) + base_df = standard_frames[0] if standard_frames else None + + # 使用 FinancialLoader 加载和拼接财务数据 + if asof_specs and base_df is not None: + for spec in asof_specs: + financial_df = self.financial_loader.load_financial_data( + table_name=spec.table, + columns=spec.columns, + start_date=start_date, + end_date=end_date, + ) + base_df = self.financial_loader.merge_financial_with_price( + price_df=base_df, + financial_df=financial_df, + date_col="trade_date", + f_ann_col="f_ann_date", + ) + + return base_df +``` + +### 4.3 财务数据加载 + +**位置**: `src/data/financial_loader.py:26-83` + +```python +def load_financial_data( + self, + table_name: str, + columns: List[str], + start_date: str, + end_date: str, +) -> pl.DataFrame: + """从数据库加载并清洗财务数据""" + # 计算包含回看期的日期范围(默认1年) + adjusted_start = self.get_date_range_with_lookback(start_date, lookback_days=365) + + # 从数据库查询 + query = f""" + SELECT ts_code, f_ann_date, end_date, {', '.join(columns)} + FROM {table_name} + WHERE f_ann_date >= '{adjusted_start}' + AND f_ann_date <= '{end_date}' + ORDER BY ts_code, f_ann_date + """ + df = self.conn.execute(query).fetchdf() + df = pl.DataFrame(df) + + # 数据清洗:仅保留 report_type==1(合并报表) + if "report_type" in df.columns: + df = df.filter(pl.col("report_type") == 1) + + # 去重:按 (ts_code, end_date) 取 update_flag 最大的记录 + if "update_flag" in df.columns: + df = ( + df.sort(["ts_code", "end_date", "update_flag"], descending=[False, False, True]) + .unique(subset=["ts_code", "end_date"], keep="first") + ) + + return df +``` + +### 4.4 财务数据与行情数据拼接 + +**位置**: `src/data/financial_loader.py:85-136` + +```python +def merge_financial_with_price( + self, + price_df: pl.DataFrame, + financial_df: pl.DataFrame, + date_col: str = "trade_date", + f_ann_col: str = "f_ann_date", +) -> pl.DataFrame: + """使用 asof join 将财务数据拼接到行情数据""" + # 确保日期格式正确 + price_df = price_df.with_columns(pl.col(date_col).cast(pl.Date)) + financial_df = financial_df.with_columns(pl.col(f_ann_col).cast(pl.Date)) + + # 使用 join_asof 进行 PIT 对齐 + # strategy='backward': 对于每个 trade_date,找 f_ann_date <= trade_date 的最新财务数据 + result = price_df.join_asof( + financial_df, + left_on=date_col, + right_on=f_ann_col, + by="ts_code", # 按股票代码分组 + strategy="backward", # 向后查找最新公告 + ) + + return result +``` + +**拼接逻辑详解**: + +| trade_date | ts_code | close | n_income (拼接后) | 来源 f_ann_date | +|-----------|---------|-------|------------------|-----------------| +| 2024-01-15 | 000001.SZ | 10.5 | 1,000,000 | 2024-01-10 | +| 2024-01-16 | 000001.SZ | 10.6 | 1,000,000 | 2024-01-10 | +| 2024-01-20 | 000001.SZ | 10.8 | 1,200,000 | 2024-01-18 | + +上表展示了 `n_income` 的 PIT 拼接过程: +- 1月15日:使用前一次公告(1月10日)的净利润数据 +- 1月16日:继续使用1月10日的数据(无新公告) +- 1月20日:使用最新公告(1月18日)的净利润数据 + +--- + +## 第五阶段:因子计算 + +### 5.1 执行计算 + +**位置**: `src/factors/engine/factor_engine.py:188-227` + +```python +def _execute_with_dependencies( + self, + factor_names: List[str], + core_wide: pl.DataFrame, +) -> pl.DataFrame: + """按依赖顺序执行因子计算""" + # 拓扑排序确定计算顺序 + sorted_factors = self._topological_sort(factor_names) + + # 创建结果 DataFrame + result_exprs = [] + for name in sorted_factors: + plan = self._factor_plans[name] + # 执行 Polars 表达式 + expr = plan.polars_expr.alias(name) + result_exprs.append(expr) + + # 一次性执行所有表达式 + result = core_wide.with_columns(result_exprs) + + return result +``` + +### 5.2 翻译为 Polars 表达式 + +**位置**: `src/factors/translator.py` + +```python +def translate(self, node: Node) -> pl.Expr: + """将 DSL 节点翻译为 Polars 表达式""" + if isinstance(node, Symbol): + # Symbol 直接转为 pl.col() + return pl.col(node.name) + # ... 其他节点类型 +``` + +对于 `Symbol("n_income")`,翻译结果为 `pl.col("n_income")`。 + +### 5.3 n_income 计算 + +由于 `n_income` 是简单符号(无运算),计算过程: + +```python +# core_wide 已经包含 n_income 列(从财务表拼接而来) +result = core_wide.with_columns([ + pl.col("n_income").alias("n_income") # 直接引用 +]) +``` + +实际上,`n_income` 的值在数据拼接阶段已经确定,计算阶段只是确认输出列名。 + +--- + +## 第六阶段:参与模型训练 + +### 6.1 数据流回到训练流程 + +**位置**: `src/experiment/regression.py:176-183` + +```python +def prepare_data(...) -> pl.DataFrame: + factor_names = feature_cols + ["return_5"] # 包含 n_income + + data = engine.compute( + factor_names=factor_names, + start_date=start_date, + end_date=end_date, + ) + return data +``` + +### 6.2 完整数据流 + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ n_income 因子生命周期 │ +├─────────────────────────────────────────────────────────────────────┤ +│ │ +│ 1. 定义阶段 │ +│ regression.py:55 "n_income": "n_income" │ +│ ↓ │ +│ 2. 注册阶段 (FactorEngine.add_factor) │ +│ parser.parse("n_income") → Symbol("n_income") │ +│ ↓ │ +│ 3. 计划阶段 (ExecutionPlanner.create_plan) │ +│ extract_dependencies() → {"n_income"} │ +│ _infer_data_specs() → DataSpec(financial_income, asof_backward) │ +│ ↓ │ +│ 4. 数据获取阶段 (DataRouter.fetch_data) │ +│ FinancialLoader.load_financial_data() │ +│ 从 financial_income 表读取 n_income 字段 │ +│ ↓ │ +│ 5. 数据拼接阶段 (FinancialLoader.merge_financial_with_price) │ +│ join_asof(strategy='backward') │ +│ 按 trade_date 和 f_ann_date 对齐 │ +│ ↓ │ +│ 6. 计算阶段 (FactorEngine._execute_with_dependencies) │ +│ pl.col("n_income") → 输出到结果 DataFrame │ +│ ↓ │ +│ 7. 训练阶段 │ +│ 作为特征列传入 LightGBM 模型 │ +│ │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +--- + +## 关键技术点总结 + +### 1. PIT 数据处理 + +财务数据是**低频、公告驱动**的数据,与**高频、连续**的行情数据不同。系统使用 **asof_backward** 策略处理: + +- **asof**: As-Of,表示截至某个时点的有效数据 +- **backward**: 向后查找,确保使用最新公告的数据 +- **关键约束**: 不能使用未来数据(lookahead bias) + +### 2. SchemaCache 动态路由 + +系统自动识别字段所属表,无需手动指定: + +```python +# 系统自动识别 +n_income → financial_income 表 (PIT) +close → daily 表 (DAILY) +``` + +### 3. 财务数据清洗 + +`FinancialLoader` 自动处理: +- **报告类型过滤**: 仅使用合并报表(report_type=1) +- **去重策略**: 按 (ts_code, end_date) 取最新修订版(update_flag 最大) +- **日期对齐**: 使用公告日(f_ann_date)而非报告期(end_date) + +### 4. 扩展性设计 + +添加新的财务因子只需在字典中添加一行: + +```python +FACTOR_DEFINITIONS = { + "n_income": "n_income", + "revenue": "revenue", # 营业收入 + "eps": "eps", # 每股收益 + "roe": "roe", # 净资产收益率 +} +``` + +系统自动处理表路由、数据获取和拼接。 + +--- + +## 相关代码文件 + +| 文件 | 职责 | +|------|------| +| `src/experiment/regression.py` | 训练入口,因子定义 | +| `src/factors/engine/factor_engine.py` | 因子引擎统一入口 | +| `src/factors/parser.py` | 字符串表达式解析 | +| `src/factors/compiler.py` | AST 依赖提取 | +| `src/factors/engine/planner.py` | 执行计划生成 | +| `src/factors/engine/data_router.py` | 数据路由与组装 | +| `src/data/financial_loader.py` | 财务数据加载与拼接 | +| `src/data/catalog.py` | 数据库目录与表结构 | +| `src/data/api_wrappers/financial_data/api_income.py` | 利润表数据接口 | + +--- + +## 附录:数据表结构 + +### financial_income(利润表) + +```sql +CREATE TABLE financial_income ( + ts_code VARCHAR, -- 股票代码 + f_ann_date DATE, -- 公告日期(PIT关键字段) + end_date DATE, -- 报告期 + report_type INTEGER, -- 报告类型(1=合并报表) + update_flag INTEGER, -- 更新标识(越大越新) + n_income BIGINT, -- 净利润(本因子使用的字段) + revenue BIGINT, -- 营业收入 + ... -- 其他财务字段 +); +``` + +### daily(日线行情) + +```sql +CREATE TABLE daily ( + ts_code VARCHAR, -- 股票代码 + trade_date DATE, -- 交易日期 + open DOUBLE, -- 开盘价 + high DOUBLE, -- 最高价 + low DOUBLE, -- 最低价 + close DOUBLE, -- 收盘价 + vol BIGINT, -- 成交量 + ... -- 其他行情字段 +); +``` diff --git a/docs/plan/2026-03-07-financial-sync-refactor.md b/docs/plan/2026-03-07-financial-sync-refactor.md new file mode 100644 index 0000000..72c0899 --- /dev/null +++ b/docs/plan/2026-03-07-financial-sync-refactor.md @@ -0,0 +1,1490 @@ +# 财务数据同步模块重构计划 + +> **目标**: 重构财务数据同步模块,将同步逻辑从调度中心分离到对应的 API 文件中,建立统一的季度同步基类,实现数据差异检测机制。 + +**架构**: 新增 QuarterBasedSync 基类专门处理按季度同步的财务数据,各财务接口(income/balance/cashflow)实现具体的同步子类,调度中心仅负责协调各同步任务的执行顺序。 + +**影响范围**: +- 新建: `src/data/api_wrappers/base_financial_sync.py` (QuarterBasedSync 基类) +- 重构: `src/data/api_wrappers/financial_data/api_income.py` (实现 IncomeQuarterSync) +- 简化: `src/data/api_wrappers/financial_data/api_financial_sync.py` (仅保留调度) +- 新增: `src/data/api_wrappers/financial_data/api_balance.py` (预留资产负债表接口) +- 新增: `src/data/api_wrappers/financial_data/api_cashflow.py` (预留现金流量表接口) + +--- + +## 重构背景 + +### 当前问题 + +1. **职责混淆**: `api_financial_sync.py` 包含完整的同步逻辑(696行),违反了"调度中心只包含调度"的设计原则 +2. **缺乏统一基类**: 财务数据同步未继承任何公共基类,代码风格与 daily/pro_bar 等模块不一致 +3. **缺少差异检测**: 增量同步时未实现本地 vs 远程数据对比,可能导致财务修正数据遗漏 +4. **扩展性差**: 新增资产负债表、现金流量表时需要重复编写相似的同步逻辑 + +### 重构目标 + +1. **建立 QuarterBasedSync 基类**: 专门处理按季度同步的财务数据,支持增量/全量同步 +2. **实现数据差异检测**: 按股票+季度对比本地与远程数据量,识别差异并补充 +3. **采用先删除后插入策略**: 数据不一致时,先删除旧数据再插入新数据,确保数据一致性 +4. **移除"跳过同步"逻辑**: 财务数据必须每次都进行对比更新,不存在"不需要同步"的情况 +5. **支持报表类型过滤**: 通过 `TARGET_REPORT_TYPE` 类属性灵活配置同步的报表类型(默认合并报表) +6. **不设置唯一约束**: 不创建主键和唯一索引,因为财务数据可能发生多次修正(同一股票同一季度多版本) +7. **分离调度与实现**: 调度中心只负责任务协调,具体同步逻辑下沉到各 API 文件 +8. **统一代码风格**: 与 `StockBasedSync`/`DateBasedSync` 保持一致的结构和命名规范 + +--- + +## 任务清单 + +- [ ] Task 1: 创建 `base_financial_sync.py` - QuarterBasedSync 基类(含自动建表逻辑) +- [ ] Task 2: 重构 `api_income.py` - 实现 IncomeQuarterSync 类 +- [ ] Task 3: 重构 `api_financial_sync.py` - 仅保留调度逻辑 +- [ ] Task 4: 验证测试 - 确保重构后同步功能正常 + +--- + +## Task 1: 创建 QuarterBasedSync 基类 + +**文件**: `src/data/api_wrappers/base_financial_sync.py` (新建) + +**目标**: 创建专门用于财务数据季度同步的抽象基类,提供通用的季度计算、数据差异检测、增量/全量同步能力,以及首次同步时的自动建表逻辑。 + +**核心功能**: +1. 季度计算工具方法(当前季度、前一季度、下一季度) +2. 数据差异检测(本地 vs 远程数据对比) +3. 增量同步策略(获取当前季度+前一季度) +4. 表结构和索引管理 +5. **首次同步自动建表**: 当表不存在时自动创建表结构和索引 + +**代码实现**: + +```python +"""财务数据同步基础抽象模块。 + +提供专门用于按季度同步财务数据的基类 QuarterBasedSync。 +财务数据特点: +- 按季度发布(period: 20231231, 20230930, 20230630, 20230331) +- 使用 VIP 接口一次性获取某季度的全部上市公司数据 +- 数据可能会修正,增量同步需获取当前季度+前一季度 +- 主键为 (ts_code, end_date) + +使用方式: + class IncomeQuarterSync(QuarterBasedSync): + table_name = "financial_income" + api_name = "income_vip" + + def fetch_single_quarter(self, period: str) -> pd.DataFrame: + # 实现单季度数据获取 + ... +""" + +from abc import ABC, abstractmethod +from typing import Optional, Dict, List, Tuple, Set +from datetime import datetime +import pandas as pd +from tqdm import tqdm + +from src.data.client import TushareClient +from src.data.storage import ThreadSafeStorage, Storage +from src.data.utils import get_today_date, get_quarters_in_range, DEFAULT_START_DATE + + +class QuarterBasedSync(ABC): + """财务数据季度同步抽象基类。 + + 专门处理按季度同步的财务数据(利润表、资产负债表、现金流量表)。 + 财务数据同步特点: + 1. 按季度获取:使用 VIP 接口一次性获取某季度全部上市公司数据 + 2. 数据可修正:同一季度数据可能被更新,增量同步需获取当前季度+前一季度 + 3. 差异检测:需对比本地与远程数据量,识别缺失或变更的记录 + 4. 主键:(ts_code, end_date) + + 子类必须实现: + - table_name: 类属性,目标表名 + - api_name: 类属性,Tushare API 接口名 + - fetch_single_quarter(period) -> pd.DataFrame: 获取单季度数据 + - TABLE_SCHEMA: 类属性,表结构定义 + + Attributes: + table_name: 目标表名(子类必须覆盖) + api_name: Tushare API 接口名(子类必须覆盖) + DEFAULT_START_DATE: 默认起始日期(2018Q1) + TABLE_SCHEMA: 表结构定义 {列名: SQL类型} + TABLE_INDEXES: 索引定义 [(索引名, [列名列表]), ...] + TABLE_INDEXES: 索引定义 [(索引名, [列名列表]), ...] + 注意:不要创建唯一索引,因为财务数据可能发生多次修正 + """ + + table_name: str = "" # 子类必须覆盖 + api_name: str = "" # 子类必须覆盖 + DEFAULT_START_DATE = "20180331" # 2018年Q1 + + # 目标报表类型(子类可覆盖) + # 默认只同步合并报表(report_type='1') + # 设为 None 则同步所有报表类型 + TARGET_REPORT_TYPE: Optional[str] = "1" + + # 表结构定义(子类必须覆盖) + TABLE_SCHEMA: Dict[str, str] = {} + + # 索引定义(子类可覆盖) + # 格式: [("index_name", ["col1", "col2"]), ...] + # 注意:不要创建唯一索引,因为财务数据可能发生多次修正 + TABLE_INDEXES: List[Tuple[str, List[str]]] = [] + + def __init__(self): + """初始化季度同步管理器。""" + self.storage = ThreadSafeStorage() + self.client = TushareClient() + self._cached_data: Optional[pd.DataFrame] = None + + # ====================================================================== + # 抽象方法 - 子类必须实现 + # ====================================================================== + + @abstractmethod + def fetch_single_quarter(self, period: str) -> pd.DataFrame: + """获取单季度的全部上市公司数据。 + + Args: + period: 报告期,季度最后一天日期(如 '20231231') + + Returns: + 包含该季度全部上市公司财务数据的 DataFrame + """ + pass + + # ====================================================================== + # 季度计算工具方法 + # ====================================================================== + + def get_current_quarter(self) -> str: + """获取当前季度(考虑是否到季末)。 + + 如果当前日期未到季度最后一天,则返回前一季度。 + 这样可以避免获取尚无数据的未来季度。 + + Returns: + 当前季度字符串 (YYYYMMDD),如 '20231231' + """ + today = get_today_date() + year = int(today[:4]) + month = int(today[4:6]) + + # 确定当前季度 + if month <= 3: + current_q = f"{year}0331" + elif month <= 6: + current_q = f"{year}0630" + elif month <= 9: + current_q = f"{year}0930" + else: + current_q = f"{year}1231" + + # 如果今天还没到季末,返回前一季度 + if today < current_q: + return self.get_prev_quarter(current_q) + + return current_q + + def get_prev_quarter(self, quarter: str) -> str: + """获取前一季度。 + + Args: + quarter: 季度字符串 (YYYYMMDD),如 '20231231' + + Returns: + 前一季度字符串 (YYYYMMDD) + """ + year = int(quarter[:4]) + month_day = quarter[4:] + + if month_day == "0331": + return f"{year - 1}1231" + elif month_day == "0630": + return f"{year}0331" + elif month_day == "0930": + return f"{year}0630" + else: # "1231" + return f"{year}0930" + + def get_next_quarter(self, quarter: str) -> str: + """获取下一季度。 + + Args: + quarter: 季度字符串 (YYYYMMDD) + + Returns: + 下一季度字符串 (YYYYMMDD) + """ + year = int(quarter[:4]) + month_day = quarter[4:] + + if month_day == "0331": + return f"{year}0630" + elif month_day == "0630": + return f"{year}0930" + elif month_day == "0930": + return f"{year}1231" + else: # "1231" + return f"{year + 1}0331" + + # ====================================================================== + # 表结构管理 + # ====================================================================== + + def ensure_table_exists(self) -> None: + """确保表结构存在,如不存在则创建表和索引。 + + 注意:不设置主键和唯一索引,因为财务数据可能发生多次修正, + 同一支股票在同一季度可能有多个版本(不同的ann_date)。 + DuckDB 会自动创建 rowid 作为主键。 + """ + storage = Storage() + + if storage.exists(self.table_name): + return + + if not self.TABLE_SCHEMA: + print(f"[{self.__class__.__name__}] TABLE_SCHEMA not defined, skipping table creation") + return + + # 构建列定义(不设置主键) + columns_def = [] + for col_name, col_type in self.TABLE_SCHEMA.items(): + columns_def.append(f'"{col_name}" {col_type}') + + columns_sql = ", ".join(columns_def) + create_sql = f'CREATE TABLE IF NOT EXISTS "{self.table_name}" ({columns_sql})' + + try: + storage._connection.execute(create_sql) + print(f"[{self.__class__.__name__}] Created table '{self.table_name}'") + except Exception as e: + print(f"[{self.__class__.__name__}] Error creating table: {e}") + raise + + # 创建普通索引(不要创建唯一索引) + for idx_name, idx_cols in self.TABLE_INDEXES: + try: + idx_cols_sql = ", ".join(f'"{col}"' for col in idx_cols) + storage._connection.execute( + f'CREATE INDEX IF NOT EXISTS "{idx_name}" ON "{self.table_name}"({idx_cols_sql})' + ) + print(f"[{self.__class__.__name__}] Created index '{idx_name}' on {idx_cols}") + except Exception as e: + print(f"[{self.__class__.__name__}] Error creating index {idx_name}: {e}") + + # ====================================================================== + # 数据差异检测(核心逻辑) + # ====================================================================== + + def get_local_data_count_by_stock( + self, period: str + ) -> Dict[str, int]: + """获取本地数据库中某季度的各股票数据量。 + + Args: + period: 季度字符串 (YYYYMMDD) + + Returns: + 字典 {ts_code: 记录数} + """ + storage = Storage() + + try: + query = f''' + SELECT ts_code, COUNT(*) as cnt + FROM "{self.table_name}" + WHERE end_date = ? + GROUP BY ts_code + ''' + result = storage._connection.execute(query, [period]).fetchdf() + + if result.empty: + return {} + + return dict(zip(result['ts_code'], result['cnt'])) + except Exception as e: + print(f"[{self.__class__.__name__}] Error querying local data count: {e}") + return {} + + def get_local_records_by_key( + self, period: str + ) -> Dict[tuple, int]: + """获取本地数据库中某季度的记录(按主键分组计数)。 + + 用于更精确的差异检测,按 (ts_code, end_date, report_type) 分组。 + + Args: + period: 季度字符串 (YYYYMMDD) + + Returns: + 字典 {(ts_code, end_date, report_type): 记录数} + """ + storage = Storage() + + try: + query = f''' + SELECT ts_code, end_date, report_type, COUNT(*) as cnt + FROM "{self.table_name}" + WHERE end_date = ? + GROUP BY ts_code, end_date, report_type + ''' + result = storage._connection.execute(query, [period]).fetchdf() + + if result.empty: + return {} + + return { + (row['ts_code'], row['end_date'], row['report_type']): row['cnt'] + for _, row in result.iterrows() + } + except Exception as e: + print(f"[{self.__class__.__name__}] Error querying local records: {e}") + return {} + + def compare_and_find_differences( + self, + remote_df: pd.DataFrame, + period: str + ) -> Tuple[pd.DataFrame, pd.DataFrame]: + """对比远程数据与本地数据,找出差异。 + + 逻辑: + 1. 统计远程数据中每只股票的数据量 + 2. 查询本地数据库中该季度每只股票的数据量 + 3. 对比找出: + - 本地缺失的股票(新增) + - 数据量不一致的股票(有更新,可能包含财务修正) + 4. 返回需要插入的差异数据 + + 注意:主键为 (ts_code, end_date, report_type),因此同一支股票在同一季度 + 可能有多个 report_type 的记录。差异检测按股票级别进行,如果该股票的 + 记录总数不一致,则更新该股票的所有记录。 + + Args: + remote_df: 从 API 获取的远程数据 + period: 季度字符串 + + Returns: + (差异数据DataFrame, 统计信息DataFrame) + 统计信息包含:ts_code, remote_count, local_count, status + """ + if remote_df.empty: + return pd.DataFrame(), pd.DataFrame() + + # 1. 统计远程数据中每只股票的数据量 + remote_counts = remote_df.groupby('ts_code').size().to_dict() + + # 2. 获取本地数据量(按股票汇总) + local_counts = self.get_local_data_count_by_stock(period) + + # 3. 对比找出差异 + diff_stocks = [] # 需要更新的股票列表 + stats = [] + + for ts_code, remote_count in remote_counts.items(): + local_count = local_counts.get(ts_code, 0) + + if local_count == 0: + status = "new" # 本地不存在,全部插入 + diff_stocks.append(ts_code) + elif local_count != remote_count: + status = "modified" # 数据量不一致,可能包含财务修正 + diff_stocks.append(ts_code) + else: + status = "same" # 数据量一致,跳过 + + stats.append({ + 'ts_code': ts_code, + 'remote_count': remote_count, + 'local_count': local_count, + 'status': status + }) + + # 4. 提取差异数据 + if diff_stocks: + diff_df = remote_df[remote_df['ts_code'].isin(diff_stocks)].copy() + else: + diff_df = pd.DataFrame() + + stats_df = pd.DataFrame(stats) + + return diff_df, stats_df + + # ====================================================================== + # 同步核心逻辑 + # ====================================================================== + + def delete_stock_quarter_data( + self, + period: str, + ts_codes: Optional[List[str]] = None + ) -> int: + """删除指定季度和股票的数据。 + + 在同步前删除旧数据,然后插入新数据(先删除后插入策略)。 + + Args: + period: 季度字符串 (YYYYMMDD) + ts_codes: 股票代码列表,None 表示删除该季度所有数据 + + Returns: + 删除的记录数 + """ + storage = Storage() + + try: + if ts_codes: + # 删除指定股票的数据 + placeholders = ', '.join(['?' for _ in ts_codes]) + query = f''' + DELETE FROM "{self.table_name}" + WHERE end_date = ? AND ts_code IN ({placeholders}) + ''' + result = storage._connection.execute(query, [period] + ts_codes) + else: + # 删除整个季度的数据 + query = f'DELETE FROM "{self.table_name}" WHERE end_date = ?' + result = storage._connection.execute(query, [period]) + + deleted_count = result.rowcount if hasattr(result, 'rowcount') else 0 + return deleted_count + except Exception as e: + print(f"[{self.__class__.__name__}] Error deleting data: {e}") + return 0 + + def sync_quarter( + self, + period: str, + dry_run: bool = False + ) -> Dict: + """同步单个季度的数据。 + + 流程: + 1. 获取远程数据 + 2. 根据 TARGET_REPORT_TYPE 过滤报表类型 + 3. 对比本地数据,找出差异股票 + 4. 删除差异股票的旧数据 + 5. 插入新数据(先删除后插入) + + 注意:财务数据同步必须取当前季度和前一季度进行对比更新, + 不存在"不需要同步"的情况。 + + Args: + period: 季度字符串 (YYYYMMDD) + dry_run: 是否为预览模式 + + Returns: + 同步结果字典 { + 'period': 季度, + 'remote_total': 远程总记录数, + 'diff_count': 差异记录数, + 'deleted_count': 删除的记录数, + 'inserted_count': 插入的记录数, + 'dry_run': 是否预览模式 + } + """ + print(f"[{self.__class__.__name__}] Syncing quarter {period}...") + + # 1. 获取远程数据 + remote_df = self.fetch_single_quarter(period) + + if remote_df.empty: + print(f"[{self.__class__.__name__}] No data for quarter {period}") + return { + 'period': period, + 'remote_total': 0, + 'diff_count': 0, + 'deleted_count': 0, + 'inserted_count': 0, + 'dry_run': dry_run + } + + # 2. 根据 TARGET_REPORT_TYPE 过滤报表类型 + if self.TARGET_REPORT_TYPE and 'report_type' in remote_df.columns: + remote_df = remote_df[remote_df['report_type'] == self.TARGET_REPORT_TYPE] + + remote_total = len(remote_df) + print(f"[{self.__class__.__name__}] Fetched {remote_total} records from API") + + # 3. 对比找出差异股票 + diff_df, stats_df = self.compare_and_find_differences(remote_df, period) + + diff_stocks = list(diff_df['ts_code'].unique()) if not diff_df.empty else [] + unchanged_count = len(stats_df[stats_df['status'] == 'same']) if not stats_df.empty else 0 + + print(f"[{self.__class__.__name__}] Comparison result:") + print(f" - Stocks with differences: {len(diff_stocks)}") + print(f" - Unchanged stocks: {unchanged_count}") + + # 4. 执行同步(先删除后插入) + deleted_count = 0 + inserted_count = 0 + + if not dry_run and not diff_df.empty: + # 4.1 删除差异股票的旧数据 + deleted_count = self.delete_stock_quarter_data(period, diff_stocks) + print(f"[{self.__class__.__name__}] Deleted {deleted_count} old records") + + # 4.2 插入新数据 + self.storage.queue_save(self.table_name, diff_df) + self.storage.flush() + inserted_count = len(diff_df) + print(f"[{self.__class__.__name__}] Inserted {inserted_count} new records") + + return { + 'period': period, + 'remote_total': remote_total, + 'diff_count': len(diff_df), + 'deleted_count': deleted_count, + 'inserted_count': inserted_count, + 'dry_run': dry_run + } + + def sync_range( + self, + start_quarter: str, + end_quarter: str, + dry_run: bool = False + ) -> List[Dict]: + """同步指定季度范围的数据。 + + 注意:增量同步会自动包含前一季度以确保数据完整性。 + + Args: + start_quarter: 起始季度 (YYYYMMDD) + end_quarter: 结束季度 (YYYYMMDD) + dry_run: 是否为预览模式 + + Returns: + 各季度同步结果列表 + """ + quarters = get_quarters_in_range(start_quarter, end_quarter) + + if not quarters: + print(f"[{self.__class__.__name__}] No quarters to sync") + return [] + + print(f"[{self.__class__.__name__}] Syncing {len(quarters)} quarters: {quarters}") + + results = [] + for period in tqdm(quarters, desc=f"Syncing {self.table_name}"): + try: + result = self.sync_quarter(period, dry_run=dry_run) + results.append(result) + except Exception as e: + print(f"[{self.__class__.__name__}] Error syncing {period}: {e}") + results.append({ + 'period': period, + 'error': str(e) + }) + + return results + + def sync_incremental( + self, + dry_run: bool = False + ) -> List[Dict]: + """执行增量同步。 + + 策略: + 1. 确保表存在(首次同步时自动建表) + 2. 获取表中最新季度 + 3. 计算当前季度(考虑是否到季末) + 4. 确定同步范围:从最新季度到当前季度 + 5. **重要**:额外包含前一季度以确保数据完整性 + + 注意:财务数据同步与日线数据不同,必须每次都获取数据进行对比 + 更新,不存在"不需要同步"的情况。因为财务数据可能会被修正。 + + Args: + dry_run: 是否为预览模式 + + Returns: + 各季度同步结果列表 + """ + print(f"\n{'='*60}") + print(f"[{self.__class__.__name__}] Incremental Sync") + print(f"{'='*60}") + + # 0. 确保表存在(首次同步时自动建表) + self.ensure_table_exists() + + # 1. 获取最新季度 + storage = Storage() + try: + result = storage._connection.execute( + f'SELECT MAX(end_date) FROM "{self.table_name}"' + ).fetchone() + latest_quarter = result[0] if result and result[0] else None + if hasattr(latest_quarter, 'strftime'): + latest_quarter = latest_quarter.strftime('%Y%m%d') + except Exception as e: + print(f"[{self.__class__.__name__}] Error getting latest quarter: {e}") + latest_quarter = None + + # 2. 获取当前季度 + current_quarter = self.get_current_quarter() + + if latest_quarter is None: + # 无本地数据,执行全量同步 + print(f"[{self.__class__.__name__}] No local data, performing full sync") + return self.sync_range(self.DEFAULT_START_DATE, current_quarter, dry_run) + + print(f"[{self.__class__.__name__}] Latest local quarter: {latest_quarter}") + print(f"[{self.__class__.__name__}] Current quarter: {current_quarter}") + + # 3. 确定同步范围 + # 财务数据必须每次都进行对比更新,不存在"跳过"的情况 + # 同步范围:从最新季度到当前季度(包含前一季度以确保数据完整性) + start_quarter = latest_quarter + if start_quarter > current_quarter: + # 如果本地数据比当前季度还新,仍然需要同步(可能包含修正数据) + start_quarter = current_quarter + else: + # 正常情况:包含前一季度 + start_quarter = self.get_prev_quarter(latest_quarter) + + if start_quarter < self.DEFAULT_START_DATE: + start_quarter = self.DEFAULT_START_DATE + + print(f"[{self.__class__.__name__}] Sync range: {start_quarter} -> {current_quarter}") + print(f" (includes previous quarter for data integrity)") + + return self.sync_range(start_quarter, current_quarter, dry_run) + + def sync_full( + self, + dry_run: bool = False + ) -> List[Dict]: + """执行全量同步。 + + Args: + dry_run: 是否为预览模式 + + Returns: + 各季度同步结果列表 + """ + print(f"\n{'='*60}") + print(f"[{self.__class__.__name__}] Full Sync") + print(f"{'='*60}") + + # 确保表存在 + self.ensure_table_exists() + + current_quarter = self.get_current_quarter() + + return self.sync_range(self.DEFAULT_START_DATE, current_quarter, dry_run) + + # ====================================================================== + # 预览模式 + # ====================================================================== + + def preview_sync(self) -> Dict: + """预览同步信息(不实际同步)。 + + 注意:财务数据同步必须每次都进行,因为数据可能会被修正。 + 预览显示将要同步的季度范围。 + + Returns: + 预览信息字典 + """ + print(f"\n{'='*60}") + print(f"[{self.__class__.__name__}] Preview Mode") + print(f"{'='*60}") + + # 获取最新季度 + storage = Storage() + try: + result = storage._connection.execute( + f'SELECT MAX(end_date) FROM "{self.table_name}"' + ).fetchone() + latest_quarter = result[0] if result and result[0] else None + if hasattr(latest_quarter, 'strftime'): + latest_quarter = latest_quarter.strftime('%Y%m%d') + except Exception: + latest_quarter = None + + current_quarter = self.get_current_quarter() + + if latest_quarter is None: + # 无本地数据,需要全量同步 + start_quarter = self.DEFAULT_START_DATE + message = "No local data, full sync required" + else: + # 财务数据必须每次都进行对比更新 + # 同步范围:从最新季度到当前季度(包含前一季度) + start_quarter = self.get_prev_quarter(latest_quarter) + if start_quarter < self.DEFAULT_START_DATE: + start_quarter = self.DEFAULT_START_DATE + message = f"Incremental sync from {start_quarter} to {current_quarter}" + + preview = { + 'table_name': self.table_name, + 'api_name': self.api_name, + 'latest_quarter': latest_quarter, + 'current_quarter': current_quarter, + 'start_quarter': start_quarter, + 'end_quarter': current_quarter, + 'message': message + } + + print(f"Table: {self.table_name}") + print(f"API: {self.api_name}") + print(f"Latest local: {latest_quarter}") + print(f"Current quarter: {current_quarter}") + print(f"Sync range: {start_quarter} -> {current_quarter}") + print(f"Message: {message}") + print(f"{'='*60}") + + return preview + + +# ====================================================================== +# 便捷函数 +# ====================================================================== + +def sync_financial_data( + syncer_class: type, + force_full: bool = False, + dry_run: bool = False +) -> List[Dict]: + """通用的财务数据同步便捷函数。 + + Args: + syncer_class: QuarterBasedSync 的子类 + force_full: 是否强制全量同步 + dry_run: 是否为预览模式 + + Returns: + 同步结果列表 + """ + syncer = syncer_class() + + if force_full: + return syncer.sync_full(dry_run) + else: + return syncer.sync_incremental(dry_run) + + +def preview_financial_sync(syncer_class: type) -> Dict: + """预览财务数据同步信息。 + + Args: + syncer_class: QuarterBasedSync 的子类 + + Returns: + 预览信息字典 + """ + syncer = syncer_class() + return syncer.preview_sync() +``` + +**验证步骤:** +1. 检查文件是否正确创建 +2. 确认所有抽象方法已定义 +3. 验证类型提示完整 +4. 确认与现有 `base_sync.py` 风格一致 + +--- + +## Task 2: 重构 `api_income.py` + +**文件**: `src/data/api_wrappers/financial_data/api_income.py` (重写) + +**目标**: 重写为基于 `QuarterBasedSync` 的实现,移除旧的 `IncomeSync` 类。 + +**代码实现**: + +```python +"""利润表数据接口 (VIP 版本) + +使用 Tushare VIP 接口 (income_vip) 获取利润表数据。 +按季度同步,一次请求获取一个季度的全部上市公司数据。 + +接口说明: +- income_vip: 获取某一季度全部上市公司利润表数据 +- 需要 5000 积分才能调用 +- period 参数为报告期(季度最后一天,如 20231231) + +使用方式: + # 同步利润表数据 + from src.data.api_wrappers.financial_data.api_income import IncomeQuarterSync, sync_income + + # 方式1: 使用类 + syncer = IncomeQuarterSync() + syncer.sync_incremental() # 增量同步 + syncer.sync_full() # 全量同步 + + # 方式2: 使用便捷函数 + sync_income() # 增量同步 + sync_income(force_full=True) # 全量同步 +""" + +from typing import Optional +import pandas as pd + +from src.data.client import TushareClient +from src.data.api_wrappers.base_financial_sync import QuarterBasedSync, sync_financial_data, preview_financial_sync + + +class IncomeQuarterSync(QuarterBasedSync): + """利润表季度同步实现。 + + 使用 income_vip 接口按季度获取全部上市公司利润表数据。 + + 表结构: financial_income + 主键: (ts_code, end_date) + """ + + table_name = "financial_income" + api_name = "income_vip" + + # 目标报表类型:默认只同步合并报表 + TARGET_REPORT_TYPE = "1" + + # 表结构定义 + TABLE_SCHEMA = { + "ts_code": "VARCHAR(16) NOT NULL", + "ann_date": "DATE", + "f_ann_date": "DATE", + "end_date": "DATE NOT NULL", + "report_type": "INTEGER", + "comp_type": "INTEGER", + "end_type": "VARCHAR(10)", + "basic_eps": "DOUBLE", + "diluted_eps": "DOUBLE", + "total_revenue": "DOUBLE", + "revenue": "DOUBLE", + "int_income": "DOUBLE", + "prem_earned": "DOUBLE", + "comm_income": "DOUBLE", + "n_commis_income": "DOUBLE", + "n_oth_income": "DOUBLE", + "n_oth_b_income": "DOUBLE", + "prem_income": "DOUBLE", + "out_prem": "DOUBLE", + "une_prem_reser": "DOUBLE", + "reins_income": "DOUBLE", + "n_sec_tb_income": "DOUBLE", + "n_sec_uw_income": "DOUBLE", + "n_asset_mg_income": "DOUBLE", + "oth_b_income": "DOUBLE", + "fv_value_chg_gain": "DOUBLE", + "invest_income": "DOUBLE", + "ass_invest_income": "DOUBLE", + "forex_gain": "DOUBLE", + "total_cogs": "DOUBLE", + "oper_cost": "DOUBLE", + "int_exp": "DOUBLE", + "comm_exp": "DOUBLE", + "biz_tax_surchg": "DOUBLE", + "sell_exp": "DOUBLE", + "admin_exp": "DOUBLE", + "fin_exp": "DOUBLE", + "assets_impair_loss": "DOUBLE", + "prem_refund": "DOUBLE", + "compens_payout": "DOUBLE", + "reser_insur_liab": "DOUBLE", + "div_payt": "DOUBLE", + "reins_exp": "DOUBLE", + "oper_exp": "DOUBLE", + "compens_payout_refu": "DOUBLE", + "insur_reser_refu": "DOUBLE", + "reins_cost_refund": "DOUBLE", + "other_bus_cost": "DOUBLE", + "operate_profit": "DOUBLE", + "non_oper_income": "DOUBLE", + "non_oper_exp": "DOUBLE", + "nca_disploss": "DOUBLE", + "total_profit": "DOUBLE", + "income_tax": "DOUBLE", + "n_income": "DOUBLE", + "n_income_attr_p": "DOUBLE", + "minority_gain": "DOUBLE", + "oth_compr_income": "DOUBLE", + "t_compr_income": "DOUBLE", + "compr_inc_attr_p": "DOUBLE", + "compr_inc_attr_m_s": "DOUBLE", + "ebit": "DOUBLE", + "ebitda": "DOUBLE", + "insurance_exp": "DOUBLE", + "undist_profit": "DOUBLE", + "distable_profit": "DOUBLE", + "rd_exp": "DOUBLE", + "fin_exp_int_exp": "DOUBLE", + "fin_exp_int_inc": "DOUBLE", + "transfer_surplus_rese": "DOUBLE", + "transfer_housing_imprest": "DOUBLE", + "transfer_oth": "DOUBLE", + "adj_lossgain": "DOUBLE", + "withdra_legal_surplus": "DOUBLE", + "withdra_legal_pubfund": "DOUBLE", + "withdra_biz_devfund": "DOUBLE", + "withdra_rese_fund": "DOUBLE", + "withdra_oth_ersu": "DOUBLE", + "workers_welfare": "DOUBLE", + "distr_profit_shrhder": "DOUBLE", + "prfshare_payable_dvd": "DOUBLE", + "comshare_payable_dvd": "DOUBLE", + "capit_comstock_div": "DOUBLE", + "net_after_nr_lp_correct": "DOUBLE", + "credit_impa_loss": "DOUBLE", + "net_expo_hedging_benefits": "DOUBLE", + "oth_impair_loss_assets": "DOUBLE", + "total_opcost": "DOUBLE", + "amodcost_fin_assets": "DOUBLE", + "oth_income": "DOUBLE", + "asset_disp_income": "DOUBLE", + "continued_net_profit": "DOUBLE", + "end_net_profit": "DOUBLE", + "update_flag": "VARCHAR(1)", + } + + # 索引定义(不要创建唯一索引) + # 注意:财务数据可能发生多次修正,不设置主键和唯一索引 + TABLE_INDEXES = [ + ("idx_financial_income_ts_code", ["ts_code"]), + ("idx_financial_income_end_date", ["end_date"]), + ("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]), + ] + + def __init__(self): + """初始化利润表同步器。""" + super().__init__() + self._fields = None # 默认返回全部字段 + + def fetch_single_quarter(self, period: str) -> pd.DataFrame: + """获取单季度的全部上市公司利润表数据。 + + Args: + period: 报告期,季度最后一天日期(如 '20231231') + + Returns: + 包含该季度全部上市公司利润表数据的 DataFrame + """ + params = {"period": period} + + if self._fields: + params["fields"] = self._fields + + return self.client.query(self.api_name, **params) + + +# ============================================================================= +# 便捷函数 +# ============================================================================= + + +def sync_income( + force_full: bool = False, + dry_run: bool = False, +) -> list: + """同步利润表数据(便捷函数)。 + + Args: + force_full: 若为 True,强制全量同步 + dry_run: 若为 True,仅预览不写入 + + Returns: + 同步结果列表 + + Example: + >>> # 增量同步 + >>> sync_income() + >>> + >>> # 全量同步 + >>> sync_income(force_full=True) + >>> + >>> # 预览 + >>> sync_income(dry_run=True) + """ + return sync_financial_data(IncomeQuarterSync, force_full, dry_run) + + +def preview_income_sync() -> dict: + """预览利润表同步信息。 + + Returns: + 预览信息字典 + """ + return preview_financial_sync(IncomeQuarterSync) + + +def get_income(period: str, fields: Optional[str] = None) -> pd.DataFrame: + """获取利润表数据(原始接口,单季度)。 + + 用于直接获取某个季度的数据,不进行同步管理。 + + Args: + period: 报告期,季度最后一天日期(如 '20231231') + fields: 指定返回字段,默认返回全部字段 + + Returns: + 包含利润表数据的 DataFrame + """ + client = TushareClient() + + if fields is None: + fields = ( + "ts_code,ann_date,f_ann_date,end_date,report_type,comp_type,end_type," + "basic_eps,diluted_eps,total_revenue,revenue,int_income,prem_earned," + "comm_income,n_commis_income,n_oth_income,n_oth_b_income,prem_income," + "out_prem,une_prem_reser,reins_income,n_sec_tb_income,n_sec_uw_income," + "n_asset_mg_income,oth_b_income,fv_value_chg_gain,invest_income," + "ass_invest_income,forex_gain,total_cogs,oper_cost,int_exp,comm_exp," + "biz_tax_surchg,sell_exp,admin_exp,fin_exp,assets_impair_loss,prem_refund," + "compens_payout,reser_insur_liab,div_payt,reins_exp,oper_exp," + "compens_payout_refu,insur_reser_refu,reins_cost_refund,other_bus_cost," + "operate_profit,non_oper_income,non_oper_exp,nca_disploss,total_profit," + "income_tax,n_income,n_income_attr_p,minority_gain,oth_compr_income," + "t_compr_income,compr_inc_attr_p,compr_inc_attr_m_s,ebit,ebitda," + "insurance_exp,undist_profit,distable_profit,rd_exp,fin_exp_int_exp," + "fin_exp_int_inc,transfer_surplus_rese,transfer_housing_imprest," + "transfer_oth,adj_lossgain,withdra_legal_surplus,withdra_legal_pubfund," + "withdra_biz_devfund,withdra_rese_fund,withdra_oth_ersu,workers_welfare," + "distr_profit_shrhder,prfshare_payable_dvd,comshare_payable_dvd," + "capit_comstock_div,net_after_nr_lp_correct,credit_impa_loss," + "net_expo_hedging_benefits,oth_impair_loss_assets,total_opcost," + "amodcost_fin_assets,oth_income,asset_disp_income,continued_net_profit," + "end_net_profit,update_flag" + ) + + return client.query("income_vip", period=period, fields=fields) +``` + +**变更说明:** +1. 新增 `IncomeQuarterSync` 类继承 `QuarterBasedSync` +2. 定义 `TARGET_REPORT_TYPE = "1"` 只同步合并报表 +3. 定义表结构 `TABLE_SCHEMA`、普通索引 `TABLE_INDEXES` 支持自动建表 +4. 保留 `get_income()` 函数作为原始数据获取接口 +5. 移除旧的 `IncomeSync` 类 +6. 添加便捷函数 `sync_income()` 和 `preview_income_sync()` + +**重要说明:** +- **不设置主键和唯一索引**:财务数据可能发生多次修正,同一支股票同一季度可能有多个版本(不同 ann_date)。使用"先删除后插入"策略,避免主键冲突 +- 数据更新采用"先删除后插入"策略,确保数据一致性 +- `TARGET_REPORT_TYPE` 可覆盖以同步其他报表类型 + +**验证步骤:** +1. 确认文件可以被正确导入 +2. 检查类型提示完整 +3. 验证表结构与旧版本一致 + +--- + +## Task 3: 重构 `api_financial_sync.py` + +**文件**: `src/data/api_wrappers/financial_data/api_financial_sync.py` (重写) + +**目标**: 简化调度中心,只保留调度逻辑,移除具体同步实现。 + +**代码实现**: + +```python +"""财务数据统一同步调度中心。 + +该模块作为财务数据同步的调度中心,只负责任务协调和调度。 +具体的同步逻辑已下沉到各 API 文件中。 + +支持的财务数据类型: +- income: 利润表 (已实现) +- balance: 资产负债表 (预留) +- cashflow: 现金流量表 (预留) + +使用方式: + # 同步所有财务数据(增量) + from src.data.api_wrappers.financial_data.api_financial_sync import sync_financial + sync_financial() + + # 全量同步 + sync_financial(force_full=True) + + # 只同步利润表 + sync_financial(data_types=["income"]) + + # 预览同步 + from src.data.api_wrappers.financial_data.api_financial_sync import preview_sync + preview = preview_sync() +""" + +from typing import List, Dict, Optional + +from src.data.api_wrappers.financial_data.api_income import ( + IncomeQuarterSync, + sync_income, + preview_income_sync, +) + + +# 支持的财务数据类型映射 +FINANCIAL_SYNCERS = { + "income": { + "syncer_class": IncomeQuarterSync, + "sync_func": sync_income, + "preview_func": preview_income_sync, + "display_name": "利润表", + }, + # 预留:资产负债表 + # "balance": { + # "syncer_class": BalanceQuarterSync, + # "sync_func": sync_balance, + # "preview_func": preview_balance_sync, + # "display_name": "资产负债表", + # }, + # 预留:现金流量表 + # "cashflow": { + # "syncer_class": CashflowQuarterSync, + # "sync_func": sync_cashflow, + # "preview_func": preview_cashflow_sync, + # "display_name": "现金流量表", + # }, +} + + +def sync_financial( + data_types: Optional[List[str]] = None, + force_full: bool = False, + dry_run: bool = False, +) -> Dict[str, List]: + """同步财务数据(调度函数)。 + + 根据指定的数据类型,调度对应的同步器执行同步。 + + Args: + data_types: 数据类型列表,如 ["income", "balance"] + None 表示同步所有类型 + force_full: 若为 True,强制全量同步 + dry_run: 若为 True,仅预览不写入 + + Returns: + 各类型同步结果字典 {数据类型: 同步结果列表} + + Example: + >>> # 同步所有财务数据 + >>> sync_financial() + >>> + >>> # 只同步利润表 + >>> sync_financial(data_types=["income"]) + >>> + >>> # 全量同步 + >>> sync_financial(force_full=True) + """ + if data_types is None: + data_types = list(FINANCIAL_SYNCERS.keys()) + + results = {} + + print("\n" + "=" * 60) + print("[Financial Sync] 财务数据同步调度中心") + print("=" * 60) + print(f"数据类型: {', '.join(data_types)}") + print(f"同步模式: {'全量' if force_full else '增量'}") + print(f"写入模式: {'预览' if dry_run else '实际写入'}") + print("=" * 60) + + for data_type in data_types: + if data_type not in FINANCIAL_SYNCERS: + print(f"[WARN] 未知的数据类型: {data_type}") + results[data_type] = {"error": f"Unknown data type: {data_type}"} + continue + + config = FINANCIAL_SYNCERS[data_type] + sync_func = config["sync_func"] + display_name = config["display_name"] + + print(f"\n[{display_name}] 开始同步...") + + try: + result = sync_func(force_full=force_full, dry_run=dry_run) + results[data_type] = result + print(f"[{display_name}] 同步完成") + except Exception as e: + print(f"[ERROR] [{display_name}] 同步失败: {e}") + results[data_type] = {"error": str(e)} + + # 打印汇总 + print("\n" + "=" * 60) + print("[Financial Sync] 同步汇总") + print("=" * 60) + for data_type, result in results.items(): + if "error" in result: + status = f"失败: {result['error']}" + elif isinstance(result, list): + total_records = sum(r.get('diff_count', 0) for r in result if isinstance(r, dict)) + status = f"成功 ({len(result)} 个季度, {total_records} 条差异)" + else: + status = "完成" + + display_name = FINANCIAL_SYNCERS.get(data_type, {}).get("display_name", data_type) + print(f" {display_name}: {status}") + print("=" * 60) + + return results + + +def preview_sync(data_types: Optional[List[str]] = None) -> Dict[str, Dict]: + """预览财务数据同步信息。 + + Args: + data_types: 数据类型列表,None 表示所有类型 + + Returns: + 各类型预览信息字典 + + Example: + >>> preview = preview_sync() + >>> print(preview) + """ + if data_types is None: + data_types = list(FINANCIAL_SYNCERS.keys()) + + previews = {} + + print("\n" + "=" * 60) + print("[Financial Sync] 同步预览") + print("=" * 60) + + for data_type in data_types: + if data_type not in FINANCIAL_SYNCERS: + continue + + preview_func = FINANCIAL_SYNCERS[data_type]["preview_func"] + previews[data_type] = preview_func() + + return previews + + +def list_financial_types() -> List[Dict]: + """列出所有支持的财务数据类型。 + + Returns: + 数据类型信息列表 + """ + return [ + { + "name": name, + "display_name": config["display_name"], + } + for name, config in FINANCIAL_SYNCERS.items() + ] + + +# 保持向后兼容的别名 +sync_all_financial = sync_financial + + +if __name__ == "__main__": + import sys + + print("=" * 60) + print("财务数据同步调度中心") + print("=" * 60) + print("\n支持的财务数据类型:") + print("-" * 60) + + for info in list_financial_types(): + print(f" - {info['name']}: {info['display_name']}") + + print("-" * 60) + print("\n使用方式:") + print(" # 同步所有财务数据") + print(" sync_financial()") + print("") + print(" # 同步指定类型") + print(' sync_financial(data_types=["income"])') + print("") + print(" # 全量同步") + print(" sync_financial(force_full=True)") + print("") + print(" # 预览") + print(" preview_sync()") + print("=" * 60) + + # 默认执行预览 + if len(sys.argv) > 1 and sys.argv[1] == "--sync": + print("\n执行同步...") + force_full = "--full" in sys.argv + sync_financial(force_full=force_full) + else: + print("\n执行预览...") + preview_sync() +``` + +**变更说明:** +1. 完全移除 `FinancialSync` 类及其 600+ 行代码 +2. 引入 `FINANCIAL_SYNCERS` 注册表模式 +3. 调度函数 `sync_financial()` 和 `preview_sync()` 仅做任务分发 +4. 保持向后兼容(`sync_all_financial` 别名) + +**验证步骤:** +1. 确认旧接口 `sync_financial()` 仍可正常工作 +2. 检查预览功能正常 +3. 验证导入路径兼容 + +--- + +## Task 4: 验证测试 + +**目标**: 确保重构后的财务数据同步功能正常。 + +### 测试步骤 + +**Step 1: 导入测试** + +```python +# 验证所有模块可以正确导入 +from src.data.api_wrappers.base_financial_sync import QuarterBasedSync +from src.data.api_wrappers.financial_data.api_income import ( + IncomeQuarterSync, + sync_income, + preview_income_sync, + get_income +) +from src.data.api_wrappers.financial_data.api_financial_sync import ( + sync_financial, + preview_sync, + list_financial_types +) + +print("所有模块导入成功") +``` + +**Step 2: 基础功能测试** + +```python +# 测试 QuarterBasedSync 基类方法 +syncer = IncomeQuarterSync() + +# 测试季度计算 +assert syncer.get_prev_quarter("20231231") == "20230930" +assert syncer.get_next_quarter("20231231") == "20240331" +assert syncer.get_prev_quarter("20240331") == "20231231" + +print("季度计算测试通过") +``` + +**Step 3: 预览功能测试** + +```python +# 测试预览功能 +preview = preview_income_sync() +print(f"Preview result: {preview}") + +assert "table_name" in preview +assert "start_quarter" in preview +assert "end_quarter" in preview +assert preview["table_name"] == "financial_income" + +print("预览功能测试通过") +``` + +**Step 4: 调度中心测试** + +```python +# 测试调度中心 +result = preview_sync() +print(f"All previews: {result}") + +types = list_financial_types() +print(f"Available types: {types}") + +print("调度中心测试通过") +``` + +**Step 5: 向后兼容测试** + +```python +# 验证旧接口仍可工作 +from src.data.api_wrappers.financial_data.api_financial_sync import sync_financial + +# 只测试预览模式 +try: + result = sync_financial(data_types=["income"], dry_run=True) + print("向后兼容测试通过") +except Exception as e: + print(f"向后兼容测试失败: {e}") +``` + +### 验证清单 + +- [ ] 所有模块可以正确导入 +- [ ] 季度计算工具方法工作正常 +- [ ] 预览功能返回正确格式(包含 start_quarter, end_quarter) +- [ ] 同步方法始终执行(不跳过) +- [ ] 数据更新采用"先删除后插入"策略 +- [ ] TARGET_REPORT_TYPE 过滤功能正常 +- [ ] 不设置主键和唯一索引(支持财务数据多次修正) +- [ ] 调度中心可以列出所有类型 +- [ ] 旧接口保持向后兼容 +- [ ] 代码风格与现有模块一致 + +--- + +## 提交建议 + +建议按以下顺序提交,每次提交一个完整功能: + +1. **Commit 1**: Task 1 - 创建 `base_financial_sync.py` + ```bash + git add src/data/api_wrappers/base_financial_sync.py + git commit -m "feat: add QuarterBasedSync base class for financial data + + - Add abstract base class for quarter-based financial data sync + - Implement data diff detection (local vs remote comparison) + - Support incremental sync with previous quarter + - Add automatic table creation on first sync + - Add table schema and index management" + ``` + +2. **Commit 2**: Task 2 - 重构 `api_income.py` + ```bash + git add src/data/api_wrappers/financial_data/api_income.py + git commit -m "refactor: rewrite api_income.py with QuarterBasedSync + + - Replace old IncomeSync with IncomeQuarterSync + - Inherit from QuarterBasedSync for consistent patterns + - Add TABLE_SCHEMA and TABLE_INDEXES for auto table creation + - No PRIMARY_KEY to support financial data corrections + - Add convenient sync_income() and preview_income_sync() functions + - Keep get_income() for raw data access" + ``` + +3. **Commit 3**: Task 3 - 简化 `api_financial_sync.py` + ```bash + git add src/data/api_wrappers/financial_data/api_financial_sync.py + git commit -m "refactor: simplify api_financial_sync.py to scheduler only + + - Remove FinancialSync class (600+ lines) + - Add FINANCIAL_SYNCERS registry pattern + - Keep sync_financial() and preview_sync() as dispatchers + - Maintain backward compatibility" + ``` + +--- + +## 总结 + +本次重构将实现以下改进: + +1. **架构清晰**: 调度中心只负责调度,同步逻辑下沉到具体 API 文件 +2. **统一基类**: 新增 `QuarterBasedSync`,与 `StockBasedSync`/`DateBasedSync` 保持一致风格 +3. **先删除后插入**: 数据更新采用删除旧数据后插入新数据的策略,确保数据一致性 +4. **无跳过逻辑**: 财务数据同步必须每次都执行,不存在"不需要同步"的情况 +5. **报表类型过滤**: 通过 `TARGET_REPORT_TYPE` 灵活配置同步的报表类型(默认合并报表) +6. **主键设计**: 主键为 `(ts_code, end_date, report_type)`,支持财务修正数据 +7. **易于扩展**: 新增财务数据类型时只需继承 `QuarterBasedSync` 并注册到调度中心 + +**变更文件汇总:** +- 新建: `src/data/api_wrappers/base_financial_sync.py` (~680行) +- 重构: `src/data/api_wrappers/financial_data/api_income.py` (~160行) +- 简化: `src/data/api_wrappers/financial_data/api_financial_sync.py` (~150行) + +**重要变更说明:** +- **同步策略**: 财务数据同步与日线数据不同,必须每次都进行对比更新 +- **数据更新**: 采用"先删除后插入"策略,先删除旧数据再插入新数据,确保数据一致性 +- **无唯一约束**: 不设置主键和唯一索引,因为财务数据可能发生多次修正(同一股票同一季度多个版本) +- **报表过滤**: 默认只同步 `report_type='1'`(合并报表),可通过 `TARGET_REPORT_TYPE` 覆盖