Compare commits

...

3 Commits

Author SHA1 Message Date
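592126c376 feat(training): implement a train/val/test three-way split and add training-metric visualization
- DateSplitter supports a three-way split, fixing test-data leakage
- Add training-metric curve plotting and 100-round early stopping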
2026-03-08 01:09:47 +08:00
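85044a74c6 refactor(financial-sync): restructure the financial data sync architecture
- Add base_financial_sync.py, the abstract base class for sync
- Refactor api_financial_sync.py to simplify the scheduling logic
- Rename IncomeSync to IncomeQuarterSync, inheriting from the new base class
- Extend storage.py to support a use_upsert parameter
- Update the exported symbols in __init__.py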
2026-03-08 00:30:04 +08:00
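c01bf76a3d docs: add documents for the financial data sync module refactor
- Add the financial data API wrapper specification (FINANCIAL_API_SPEC.md)
  Covers architecture design principles, class design rules, sync strategy, data difference detection, etc.

- Add the n_income factor lifecycle analysis
  Traces the factor through the full pipeline from definition to training

- Add the financial data sync module refactor plan
  Specifies the QuarterBasedSync base class design and the refactor task list

These documents provide the design basis and implementation plan for the upcoming financial data sync module refactor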
2026-03-07 22:14:04 +08:00
10 changed files with 4711 additions and 956 deletions


@@ -0,0 +1,907 @@
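# Financial Data API Wrapper Specification
> **Document version**: v1.0
> **Scope**: all financial data APIs (income statement, balance sheet, cash flow statement, etc.)
> **Last updated**: 2026-03-07
---
## Table of Contents
1. [Overview](#overview)
2. [Architecture Design Principles](#architecture-design-principles)
3. [File Structure](#file-structure)
4. [Class Design](#class-design)
5. [Sync Strategy](#sync-strategy)
6. [Data Difference Detection](#data-difference-detection)
7. [Table Schema Design](#table-schema-design)
8. [Index Design](#index-design)
9. [Report Type Filtering](#report-type-filtering)
10. [Code Examples](#code-examples)
11. [FAQ](#faq)
---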
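## Overview
### Characteristics of financial data
Financial data differs fundamentally from daily-frequency data (daily bars, minute bars):
| Property | Daily-frequency data | Financial data |
|------|----------|----------|
| Update frequency | Daily | Quarterly |
| Fetch method | Loop over stocks | One VIP-endpoint call returns the whole market |
| Revisions | Very rare | Frequent (restated financial reports) |
| Data volume | Large (5000+ stocks × 250 trading days) | Small (5000+ stocks × 4 quarters) |
| Versioning | None | Multiple versions (report_type) |
### Core requirements
1. **Must subclass `QuarterBasedSync`**: every financial data sync must inherit from this base class
2. **No uniqueness constraints**: do not create a primary key or unique index, so that data can be revised multiple times
3. **Delete, then insert**: an update deletes the old rows and then inserts the new ones
4. **No skip logic**: financial data must be compared and updated on every run
5. **Report type filtering**: only consolidated statements are synced by default; this is configurable
---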
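## Architecture Design Principles
### 1. Separation of responsibilities
```
Scheduler (api_financial_sync.py)
|
v
Per-API files (api_income.py, api_balance.py, api_cashflow.py)
|
v
Base class (base_financial_sync.py - QuarterBasedSync)
|
v
Storage layer (storage.py - ThreadSafeStorage)
```
**Rules**:
- The scheduler only coordinates tasks; it contains no concrete sync logic
- Each API file implements its own `fetch_single_quarter()` method
- Shared logic lives in the `QuarterBasedSync` base class
### 2. Uniform inheritance
Sync classes **must** inherit from the `QuarterBasedSync` base class:
```python
from src.data.api_wrappers.base_financial_sync import QuarterBasedSync


class IncomeQuarterSync(QuarterBasedSync):
    """Quarterly income statement sync."""
    pass
```
Re-implementing sync logic inside an API file is **forbidden**.
---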
## File Structure
### File locations
Financial data API files must live under:
```
src/data/api_wrappers/financial_data/
├── __init__.py # optional: exports the public interface
├── api_income.py # income statement API (implemented)
├── api_balance.py # balance sheet API (reserved)
├── api_cashflow.py # cash flow statement API (reserved)
└── api_financial_sync.py # scheduler (scheduling logic only)
```
### Base class locations
```
src/data/api_wrappers/
├── base_sync.py # StockBasedSync, DateBasedSync
└── base_financial_sync.py # QuarterBasedSync (the core of this specification)
```
### File content layout
Each API file must contain the following parts, in this order:
```python
"""Module docstring.

Covers: module purpose, usage, caveats.
"""
# 1. Standard library imports
from typing import Optional

# 2. Third-party imports
import pandas as pd

# 3. Local imports
from src.data.client import TushareClient
from src.data.api_wrappers.base_financial_sync import (
    QuarterBasedSync,
    sync_financial_data,
    preview_financial_sync,
)


# 4. Sync class implementation
class XXXQuarterSync(QuarterBasedSync):
    """Concrete financial data sync class."""
    pass


# 5. Convenience functions
def sync_xxx(force_full: bool = False, dry_run: bool = False) -> list:
    """Convenience function: run the sync."""
    pass


def preview_xxx_sync() -> dict:
    """Convenience function: preview what a sync would do."""
    pass


# 6. Raw data accessor (optional, kept for backward compatibility)
def get_xxx(period: str, fields: Optional[str] = None) -> pd.DataFrame:
    """Fetch the raw data."""
    pass
```
---
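## Class Design
### Class naming
Class names **must** follow this pattern:
```python
# Pattern: {DataType}QuarterSync
# Correct
IncomeQuarterSync    # income statement
BalanceQuarterSync   # balance sheet
CashflowQuarterSync  # cash flow statement
# Incorrect
IncomeSync  # missing "Quarter"; inconsistent
SyncIncome  # starts with a verb; violates class naming conventions
```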
### Required class attributes
Subclasses **must** define the following class attributes (item 3 is optional):
```python
class IncomeQuarterSync(QuarterBasedSync):
    """Quarterly income statement sync."""

    # 1. Table name (required)
    table_name = "financial_income"
    # 2. API endpoint name (required)
    api_name = "income_vip"
    # 3. Target report type (optional, defaults to "1")
    TARGET_REPORT_TYPE = "1"
    # 4. Table schema (required)
    TABLE_SCHEMA = {
        "ts_code": "VARCHAR(16) NOT NULL",
        "end_date": "DATE NOT NULL",
        "report_type": "INTEGER",
        # ... other fields
    }
    # 5. Index definitions (required)
    TABLE_INDEXES = [
        ("idx_financial_income_ts_code", ["ts_code"]),
        ("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]),
    ]
```
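### Abstract methods that must be implemented
Subclasses **must** implement the following abstract method:
```python
# Declared with @abstractmethod in the base class; this is the subclass implementation.
def fetch_single_quarter(self, period: str) -> pd.DataFrame:
    """Fetch one quarter of data for all listed companies.

    Args:
        period: reporting period, the last day of the quarter (e.g. '20231231')

    Returns:
        DataFrame with that quarter's financial data for all listed companies

    Notes:
        - Use the VIP endpoint (e.g. income_vip)
        - Do not filter report_type here; the base class handles it centrally
        - The returned DataFrame must contain the ts_code and end_date columns
    """
    params = {"period": period}
    return self.client.query(self.api_name, **params)
```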
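### Forbidden operations
Subclasses **must not** override or modify the following methods:
```python
# Core base-class methods; do not override:
#   sync_quarter()                  single-quarter sync flow
#   sync_range()                    range sync
#   sync_incremental()              incremental sync
#   sync_full()                     full sync
#   delete_stock_quarter_data()     data deletion
#   compare_and_find_differences()  difference detection
#   ensure_table_exists()           table creation
```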
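The two rules above (required class attributes, core methods that must not be overridden) could be checked at class-definition time instead of by review. The following is only a sketch under assumptions: `QuarterBasedSyncSketch`, `_FINAL_METHODS` and `_REQUIRED_ATTRS` are hypothetical names, and the real `QuarterBasedSync` is not shown to contain such a check.

```python
from abc import ABC, abstractmethod


class QuarterBasedSyncSketch(ABC):
    """Hypothetical stand-in for QuarterBasedSync, used only to show the checks."""

    table_name: str = ""
    api_name: str = ""
    TABLE_SCHEMA: dict = {}

    # Core methods that subclasses are not allowed to override (shortened list).
    _FINAL_METHODS = ("sync_quarter", "sync_range", "sync_incremental", "sync_full")
    # Class attributes every concrete subclass has to fill in.
    _REQUIRED_ATTRS = ("table_name", "api_name", "TABLE_SCHEMA")

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Reject subclasses that redefine a core method in their own body.
        for name in cls._FINAL_METHODS:
            if name in cls.__dict__:
                raise TypeError(f"{cls.__name__} must not override {name}()")
        # Reject subclasses that leave a required attribute empty.
        for attr in cls._REQUIRED_ATTRS:
            if not getattr(cls, attr):
                raise TypeError(f"{cls.__name__} must define {attr}")

    @abstractmethod
    def fetch_single_quarter(self, period: str):
        """Return the raw data for one quarter."""

    def sync_quarter(self, period: str):
        # Placeholder flow: the real method also filters, diffs and writes.
        return self.fetch_single_quarter(period)
```

With this in place a violating subclass fails as soon as its module is imported, which is earlier than the first sync run.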
---
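## Sync Strategy
### Incremental sync
**Rule**: a financial data sync **must** run every time; there is no "skip" case.
**Reason**: financial data can be revised, so even when local data already exists it has to be compared against the remote data again.
**Flow**:
```python
def sync_incremental(self, dry_run: bool = False) -> List[Dict]:
    """Incremental sync flow.

    Note: financial data must be compared and updated on every run,
    because it may have been revised.
    """
    # 1. Make sure the table exists (created automatically on the first sync)
    self.ensure_table_exists()
    # 2. Latest quarter stored locally
    latest_quarter = self._get_latest_quarter()
    # 3. Current quarter
    current_quarter = self.get_current_quarter()
    # 4. Determine the sync range (no "is a sync needed" check; always run)
    #    Note: run even when latest_quarter >= current_quarter
    start_quarter = self.get_prev_quarter(latest_quarter)
    # 5. Run the sync
    return self.sync_range(start_quarter, current_quarter, dry_run)
```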
### Full sync
**Rule**: a full sync runs from the default start date (2018Q1) to the current quarter.
**Flow**:
```python
def sync_full(self, dry_run: bool = False) -> List[Dict]:
    """Full sync flow."""
    # 1. Create the table if it does not exist
    self.ensure_table_exists()
    # 2. Clear the table (optional, depending on requirements)
    # self.storage.clear_table(self.table_name)
    # 3. Determine the sync range
    start_quarter = self.DEFAULT_START_DATE  # "20180331"
    end_quarter = self.get_current_quarter()
    # 4. Run the sync
    return self.sync_range(start_quarter, end_quarter, dry_run)
```
### Single-quarter sync
**Rule**: a single-quarter sync uses the "delete, then insert" strategy.
**Flow**:
```python
def sync_quarter(self, period: str, dry_run: bool = False) -> Dict:
    """Single-quarter sync flow (the core routine)."""
    # 1. Fetch the remote data
    remote_df = self.fetch_single_quarter(period)
    # 2. Filter by TARGET_REPORT_TYPE
    if self.TARGET_REPORT_TYPE and 'report_type' in remote_df.columns:
        remote_df = remote_df[remote_df['report_type'] == self.TARGET_REPORT_TYPE]
    # 3. Compare and find the stocks that differ
    diff_df, stats_df = self.compare_and_find_differences(remote_df, period)
    # 4. Apply the sync (delete, then insert)
    if not dry_run and not diff_df.empty:
        diff_stocks = list(diff_df['ts_code'].unique())
        # 4.1 Delete the old rows of the differing stocks
        self.delete_stock_quarter_data(period, diff_stocks)
        # 4.2 Insert the new rows
        self.storage.queue_save(self.table_name, diff_df)
        self.storage.flush()
    return {...}
```
**Important**: UPSERT (INSERT OR REPLACE) is forbidden; always delete first, then insert.
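A minimal sketch of the "delete, then insert" step, assuming both statements should succeed or fail together. It uses the standard-library `sqlite3` module as a stand-in for the project's DuckDB storage layer, and `replace_stock_quarter` is a hypothetical helper, not part of `QuarterBasedSync`.

```python
import sqlite3


def replace_stock_quarter(conn, table, period, rows):
    """Delete the old rows of the affected stocks, then insert the new ones.

    rows: list of (ts_code, end_date, report_type, ann_date, n_income) tuples.
    Both statements run in one transaction, so a failed insert also rolls
    the delete back.
    """
    ts_codes = sorted({row[0] for row in rows})
    if not ts_codes:
        return 0
    placeholders = ", ".join("?" for _ in ts_codes)
    with conn:  # commits on success, rolls back on an exception
        conn.execute(
            f'DELETE FROM "{table}" WHERE end_date = ? AND ts_code IN ({placeholders})',
            [period, *ts_codes],
        )
        conn.executemany(f'INSERT INTO "{table}" VALUES (?, ?, ?, ?, ?)', rows)
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE financial_income "
    "(ts_code TEXT, end_date TEXT, report_type TEXT, ann_date TEXT, n_income REAL)"
)
# First release, then a revision of the same stock and quarter.
replace_stock_quarter(
    conn, "financial_income", "20240331",
    [("600000.SH", "20240331", "1", "20240428", 100.0)],
)
replace_stock_quarter(
    conn, "financial_income", "20240331",
    [("600000.SH", "20240331", "1", "20240515", 120.0)],
)
stored = conn.execute("SELECT ann_date, n_income FROM financial_income").fetchall()
```

After the second call only the revised row remains, even though the table declares no primary key or unique index.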
---
## Data Difference Detection
### Detection logic
**Rule**: compare local and remote row counts per stock to identify differences.
**Algorithm**:
```python
def compare_and_find_differences(
    self,
    remote_df: pd.DataFrame,
    period: str
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Compare remote data with local data and find the differences.

    Logic:
    1. Count the remote rows of each stock
    2. Query the local row count of each stock for this quarter
    3. Find the stocks that differ (new, or row count mismatch)
    4. Return the differing rows that need to be inserted

    Note: the logical key is (ts_code, end_date, report_type), although no
    PRIMARY KEY constraint is declared; detection works at stock level.
    If a stock's total row count differs, all of its rows are replaced.
    """
    # 1. Count the remote rows of each stock
    remote_counts = remote_df.groupby('ts_code').size().to_dict()
    # 2. Local row counts (aggregated per stock)
    local_counts = self.get_local_data_count_by_stock(period)
    # 3. Compare
    diff_stocks = []
    stats = []
    for ts_code, remote_count in remote_counts.items():
        local_count = local_counts.get(ts_code, 0)
        if local_count == 0:
            status = "new"  # not present locally
            diff_stocks.append(ts_code)
        elif local_count != remote_count:
            status = "modified"  # row count differs; may contain revisions
            diff_stocks.append(ts_code)
        else:
            status = "same"  # row count matches
        stats.append({
            'ts_code': ts_code,
            'remote_count': remote_count,
            'local_count': local_count,
            'status': status
        })
    # 4. Extract the differing rows
    diff_df = remote_df[remote_df['ts_code'].isin(diff_stocks)].copy()
    stats_df = pd.DataFrame(stats)
    return diff_df, stats_df
```
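One consequence of comparing row counts is that a revision which changes values but not the number of rows is classified as "same". A possible alternative, sketched here purely as an illustration and not as what the base class does, is to compare a per-stock content fingerprint. `fingerprints` and `find_changed_stocks` are hypothetical helpers working on plain dict rows.

```python
import hashlib
from collections import defaultdict


def fingerprints(rows):
    """Map ts_code -> digest of that stock's rows (independent of row order)."""
    grouped = defaultdict(list)
    for row in rows:
        # Sort the items so that dict key order does not affect the digest.
        grouped[row["ts_code"]].append(repr(sorted(row.items())))
    return {
        code: hashlib.sha256("\n".join(sorted(items)).encode("utf-8")).hexdigest()
        for code, items in grouped.items()
    }


def find_changed_stocks(remote_rows, local_rows):
    """Classify every remote stock as 'new', 'modified' or 'same'."""
    remote, local = fingerprints(remote_rows), fingerprints(local_rows)
    status = {}
    for code, digest in remote.items():
        if code not in local:
            status[code] = "new"
        elif local[code] != digest:
            status[code] = "modified"
        else:
            status[code] = "same"
    return status
```

The trade-off is that this reads every local row of the quarter instead of a per-stock count, which is acceptable only because a quarter holds a few thousand rows.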
### Deletion strategy
**Rule**: delete all rows of the given quarter for the given stocks.
```python
def delete_stock_quarter_data(
    self,
    period: str,
    ts_codes: Optional[List[str]] = None
) -> int:
    """Delete the rows of the given quarter and stocks.

    Args:
        period: quarter string (YYYYMMDD)
        ts_codes: list of stock codes; None deletes the whole quarter

    Returns:
        Number of deleted rows
    """
    # Same storage handle that ensure_table_exists() uses
    storage = Storage()
    if ts_codes:
        # Delete the rows of the given stocks
        placeholders = ', '.join(['?' for _ in ts_codes])
        query = f'''
            DELETE FROM "{self.table_name}"
            WHERE end_date = ? AND ts_code IN ({placeholders})
        '''
        result = storage._connection.execute(query, [period] + ts_codes)
    else:
        # Delete the whole quarter
        query = f'DELETE FROM "{self.table_name}" WHERE end_date = ?'
        result = storage._connection.execute(query, [period])
    return result.rowcount
```
---
## Table Schema Design
### Mandatory fields
A financial data table **must** contain the following fields:
```python
TABLE_SCHEMA = {
    "ts_code": "VARCHAR(16) NOT NULL",  # stock code
    "end_date": "DATE NOT NULL",        # reporting period (last day of the quarter)
    "report_type": "INTEGER",           # report type
    "ann_date": "DATE",                 # announcement date (optional)
    # ... other business fields
}
```
### Field naming
Use the field names returned by the Tushare API unchanged.
**Correct**:
```python
"basic_eps": "DOUBLE",       # basic earnings per share
"total_revenue": "DOUBLE",   # total operating revenue
"operate_profit": "DOUBLE",  # operating profit
```
**Incorrect**:
```python
"basicEPS": "DOUBLE",         # camelCase; not allowed
"basic_eps_value": "DOUBLE",  # superfluous suffix
"eps_basic": "DOUBLE",        # word order reversed
```
### Data types
| Tushare type | DuckDB type |
|--------------|-------------|
| str | VARCHAR(n) |
| float | DOUBLE |
| int | INTEGER |
| date | DATE |
**Example**:
```python
TABLE_SCHEMA = {
    "ts_code": "VARCHAR(16) NOT NULL",
    "ann_date": "DATE",
    "report_type": "INTEGER",
    "basic_eps": "DOUBLE",
}
```
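To make the role of `TABLE_SCHEMA` concrete, the sketch below turns such a dict into a `CREATE TABLE` statement, following the same quoting approach that `ensure_table_exists()` in `base_financial_sync.py` uses. `build_create_table_sql` is a hypothetical standalone helper written for illustration.

```python
def build_create_table_sql(table_name, schema):
    """Render a TABLE_SCHEMA dict as a CREATE TABLE statement without a primary key."""
    # Dicts keep insertion order, so columns appear as they were declared.
    columns_sql = ", ".join(
        f'"{col_name}" {col_type}' for col_name, col_type in schema.items()
    )
    return f'CREATE TABLE IF NOT EXISTS "{table_name}" ({columns_sql})'


sql = build_create_table_sql(
    "financial_income",
    {"ts_code": "VARCHAR(16) NOT NULL", "end_date": "DATE NOT NULL", "basic_eps": "DOUBLE"},
)
```

Because the SQL type strings are pasted in verbatim, anything that is valid in a column definition (such as `NOT NULL`) can be placed in the dict value.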
---
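## Index Design
### No unique indexes
Creating a primary key or a unique index is **strictly forbidden**:
```python
# Do not declare a primary key
PRIMARY_KEY = ("ts_code", "end_date", "report_type")  # wrong!
# Do not declare unique indexes
("idx_unique", ["ts_code", "end_date"], True)  # wrong!
# CREATE UNIQUE INDEX ...  # wrong!
```
**Reason**: financial data may be revised several times, so one stock can have several versions for the same quarter (with different `ann_date` values); a uniqueness constraint would make the insert fail.
### Recommended indexes
The following indexes **must** be created:
```python
TABLE_INDEXES = [
    # 1. Stock code index (single-field lookups)
    ("idx_financial_income_ts_code", ["ts_code"]),
    # 2. Reporting period index (time range queries)
    ("idx_financial_income_end_date", ["end_date"]),
    # 3. Composite index (stock + period + report type; the most common query)
    ("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]),
]
```
### Index naming
Index names **must** follow this format:
```python
# Format: idx_{table_name}_{fields_description}
# Correct
"idx_financial_income_ts_code"
"idx_financial_income_end_date"
"idx_financial_income_ts_period"
# Incorrect
"ts_code_idx"  # missing the table name prefix
"income_ts"    # ambiguous table name abbreviation
"index_1"      # meaningless name
```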
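The naming format and the ban on unique indexes lend themselves to a mechanical check. The sketch below validates one `TABLE_INDEXES` entry and renders the plain `CREATE INDEX` statement; `check_index` is a hypothetical helper, and the statement shape mirrors the one issued by `ensure_table_exists()`.

```python
def check_index(table_name, index):
    """Validate one TABLE_INDEXES entry and return its CREATE INDEX statement."""
    # A third element (e.g. a unique flag) is rejected outright.
    if len(index) != 2:
        raise ValueError("expected (index_name, [columns]); unique flags are not allowed")
    name, columns = index
    prefix = f"idx_{table_name}_"
    # The name needs the table prefix plus a non-empty field description.
    if not name.startswith(prefix) or len(name) == len(prefix):
        raise ValueError(f"index name {name!r} must look like {prefix}<fields>")
    cols_sql = ", ".join(f'"{col}"' for col in columns)
    return f'CREATE INDEX IF NOT EXISTS "{name}" ON "{table_name}"({cols_sql})'


statement = check_index(
    "financial_income", ("idx_financial_income_ts_code", ["ts_code"])
)
```

Running such a check over every entry when the class is defined would catch a misnamed index before any table is created.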
---
## Report Type Filtering
### Default behavior
**Rule**: only consolidated statements (`report_type='1'`) are synced by default.
```python
class QuarterBasedSync(ABC):
    # Target report type (subclasses may override)
    # Default: sync consolidated statements only (report_type='1')
    # Set to None to sync every report type
    TARGET_REPORT_TYPE: Optional[str] = "1"
```
### Overriding
A subclass changes the default behavior by overriding the class attribute:
```python
class IncomeQuarterSync(QuarterBasedSync):
    """Income statement sync: consolidated statements only."""
    TARGET_REPORT_TYPE = "1"  # explicit


class BalanceQuarterSync(QuarterBasedSync):
    """Balance sheet sync: all report types."""
    TARGET_REPORT_TYPE = None  # no filtering
```
### Filter logic
Filtering is handled centrally in the base class:
```python
def sync_quarter(self, period: str, dry_run: bool = False) -> Dict:
    # 1. Fetch the remote data
    remote_df = self.fetch_single_quarter(period)
    # 2. Filter by TARGET_REPORT_TYPE
    if self.TARGET_REPORT_TYPE and 'report_type' in remote_df.columns:
        remote_df = remote_df[remote_df['report_type'] == self.TARGET_REPORT_TYPE]
    # ... further processing
```
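The filter compares `report_type` with the string `"1"`, while `TABLE_SCHEMA` declares the column as `INTEGER`. This specification does not state which type the fetched values actually have, so the following is only a defensive sketch under that uncertainty: normalising both sides to strings keeps the filter working for either representation. `filter_report_type` is a hypothetical helper operating on plain dict rows.

```python
def filter_report_type(rows, target):
    """Keep rows whose report_type equals target, tolerating int or str values."""
    if target is None:
        # None means "sync every report type".
        return list(rows)
    return [row for row in rows if str(row.get("report_type")) == str(target)]


rows = [
    {"ts_code": "600000.SH", "report_type": "1"},
    {"ts_code": "000001.SZ", "report_type": 1},
    {"ts_code": "000002.SZ", "report_type": "6"},
]
consolidated = filter_report_type(rows, "1")
everything = filter_report_type(rows, None)
```

A strict `==` between the integer `1` and the string `"1"` is `False` in Python, which is the silent failure mode this guards against.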
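### Report type reference
According to the Tushare documentation:
| Code | Type | Description |
|------|------|------|
| 1 | Consolidated statement | The listed company's latest statement (default) |
| 2 | Single-quarter consolidated | Consolidated statement for a single quarter |
| 3 | Adjusted single-quarter consolidated | Single-quarter consolidated statement after adjustment |
| 4 | Adjusted consolidated statement | Prior-year same-period figures as published in the current year |
| 5 | Pre-adjustment consolidated statement | The original figures, retained after the data changed |
| 6 | Parent company statement | Financial statement of the company's parent entity |
| ... | ... | ... |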
---
## Code Examples
### Complete implementation example: income statement API
```python
"""Income statement data API (VIP version)

Uses the Tushare VIP endpoint (income_vip) to fetch income statement data.
Syncs by quarter; one request returns one quarter for all listed companies.

Usage:
    from src.data.api_wrappers.financial_data.api_income import (
        IncomeQuarterSync,
        sync_income,
        preview_income_sync
    )
    # Option 1: use the class
    syncer = IncomeQuarterSync()
    syncer.sync_incremental()  # incremental sync
    syncer.sync_full()         # full sync
    # Option 2: use the convenience functions
    sync_income()                 # incremental sync
    sync_income(force_full=True)  # full sync
"""
from typing import Optional

import pandas as pd

from src.data.client import TushareClient
from src.data.api_wrappers.base_financial_sync import (
    QuarterBasedSync,
    sync_financial_data,
    preview_financial_sync,
)


class IncomeQuarterSync(QuarterBasedSync):
    """Quarterly income statement sync.

    Uses the income_vip endpoint to fetch one quarter of income statement
    data for all listed companies.
    Table: financial_income
    Note: no primary key or unique index, so the data can be revised repeatedly
    """

    table_name = "financial_income"
    api_name = "income_vip"
    # Sync consolidated statements only
    TARGET_REPORT_TYPE = "1"
    # Table schema
    TABLE_SCHEMA = {
        "ts_code": "VARCHAR(16) NOT NULL",
        "ann_date": "DATE",
        "f_ann_date": "DATE",
        "end_date": "DATE NOT NULL",
        "report_type": "INTEGER",
        "comp_type": "INTEGER",
        "basic_eps": "DOUBLE",
        "diluted_eps": "DOUBLE",
        "total_revenue": "DOUBLE",
        "revenue": "DOUBLE",
        # ... other fields
    }
    # Plain indexes (do not create unique indexes)
    TABLE_INDEXES = [
        ("idx_financial_income_ts_code", ["ts_code"]),
        ("idx_financial_income_end_date", ["end_date"]),
        ("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]),
    ]

    def fetch_single_quarter(self, period: str) -> pd.DataFrame:
        """Fetch one quarter of income statement data for all listed companies.

        Args:
            period: reporting period, the last day of the quarter (e.g. '20231231')

        Returns:
            DataFrame with that quarter's income statement data
        """
        params = {"period": period}
        return self.client.query(self.api_name, **params)


# =============================================================================
# Convenience functions
# =============================================================================
def sync_income(
    force_full: bool = False,
    dry_run: bool = False,
) -> list:
    """Sync income statement data (convenience function).

    Args:
        force_full: if True, force a full sync
        dry_run: if True, preview only and write nothing

    Returns:
        List of sync results
    """
    return sync_financial_data(IncomeQuarterSync, force_full, dry_run)


def preview_income_sync() -> dict:
    """Preview the income statement sync.

    Returns:
        Dict with preview information
    """
    return preview_financial_sync(IncomeQuarterSync)


def get_income(period: str, fields: Optional[str] = None) -> pd.DataFrame:
    """Fetch income statement data (raw accessor, single quarter).

    Fetches one quarter directly, without any sync management.

    Args:
        period: reporting period, the last day of the quarter (e.g. '20231231')
        fields: fields to return; all fields by default

    Returns:
        DataFrame with income statement data
    """
    client = TushareClient()
    if fields is None:
        fields = "ts_code,ann_date,end_date,report_type,basic_eps,..."
    return client.query("income_vip", period=period, fields=fields)
```
### Reserved API example
```python
"""Balance sheet data API (VIP version), reserved

Uses the Tushare VIP endpoint (balancesheet_vip) to fetch balance sheet data.
"""
from typing import Optional

import pandas as pd

from src.data.api_wrappers.base_financial_sync import (
    QuarterBasedSync,
    sync_financial_data,
    preview_financial_sync,
)


class BalanceQuarterSync(QuarterBasedSync):
    """Quarterly balance sheet sync (reserved)."""

    table_name = "financial_balance"
    api_name = "balancesheet_vip"
    TARGET_REPORT_TYPE = "1"
    TABLE_SCHEMA = {
        "ts_code": "VARCHAR(16) NOT NULL",
        "ann_date": "DATE",
        "end_date": "DATE NOT NULL",
        "report_type": "INTEGER",
        # TODO: complete the field definitions
    }
    TABLE_INDEXES = [
        ("idx_financial_balance_ts_code", ["ts_code"]),
        ("idx_financial_balance_end_date", ["end_date"]),
        ("idx_financial_balance_ts_period", ["ts_code", "end_date", "report_type"]),
    ]

    def fetch_single_quarter(self, period: str) -> pd.DataFrame:
        """Reserved method; not implemented yet."""
        raise NotImplementedError(
            "Balance sheet sync is not implemented yet. "
            "Calling balancesheet_vip requires 5000 Tushare points."
        )


def sync_balance(force_full: bool = False, dry_run: bool = False) -> list:
    """Reserved function."""
    raise NotImplementedError("Balance sheet sync is not implemented yet")


def preview_balance_sync() -> dict:
    """Reserved function."""
    raise NotImplementedError("Balance sheet sync is not implemented yet")
```
---
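## FAQ
### Q1: Why is there no primary key or unique index?
**A**: Financial data may be revised several times. For example:
```
First release:   600000.SH, 20240331, report_type='1', ann_date='20240428'
Second revision: 600000.SH, 20240331, report_type='1', ann_date='20240515'
```
With a uniqueness constraint (such as `PRIMARY KEY (ts_code, end_date, report_type)`) the second insert would fail. The design therefore uses "delete, then insert" and declares no uniqueness constraint.
### Q2: Why does an incremental sync not skip quarters that were already synced?
**A**: Unlike daily-frequency data, financial data can be revised. Even when a quarter already exists locally, the remote data has to be fetched again and compared to keep the local copy complete.
### Q3: Why is the previous quarter fetched as well?
**A**: Revisions can appear for some time after publication. Including the previous quarter picks up revisions that were made to it in the meantime.
**Example**:
```
Current date:    2024-04-15
Current quarter: 2024Q1 (20240331)
Local latest:    2023Q4 (20231231)
Sync range:      2023Q3 (20230930) -> 2024Q1 (20240331)
(includes the previous quarter to keep the data complete)
```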
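The range in this example can be reproduced with a few lines of quarter arithmetic. The sketch below follows the rules described for `get_current_quarter()` and `get_prev_quarter()` but is a standalone illustration; `incremental_range` and the other function names are hypothetical, and dates are `YYYYMMDD` strings so that plain string comparison orders them correctly.

```python
QUARTER_ENDS = ("0331", "0630", "0930", "1231")


def prev_quarter(quarter):
    """Quarter end that precedes the given quarter end."""
    year, idx = int(quarter[:4]), QUARTER_ENDS.index(quarter[4:])
    return f"{year - 1}1231" if idx == 0 else f"{year}{QUARTER_ENDS[idx - 1]}"


def next_quarter(quarter):
    """Quarter end that follows the given quarter end."""
    year, idx = int(quarter[:4]), QUARTER_ENDS.index(quarter[4:])
    return f"{year + 1}0331" if idx == 3 else f"{year}{QUARTER_ENDS[idx + 1]}"


def current_quarter(today):
    """Latest quarter end that is not after today."""
    year, month = int(today[:4]), int(today[4:6])
    candidate = f"{year}{QUARTER_ENDS[(month - 1) // 3]}"
    # Before the quarter's last day there is no data yet, so step back one quarter.
    return prev_quarter(candidate) if today < candidate else candidate


def incremental_range(today, local_latest):
    """Quarters to sync: from one quarter before the local latest up to the current one."""
    end = current_quarter(today)
    quarters = []
    quarter = prev_quarter(local_latest)
    while quarter <= end:
        quarters.append(quarter)
        quarter = next_quarter(quarter)
    return quarters


example = incremental_range("20240415", "20231231")
```

For the dates in the example above, `example` holds the three quarter ends 2023Q3, 2023Q4 and 2024Q1.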
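### Q4: How are other report types supported?
**A**: Override the `TARGET_REPORT_TYPE` class attribute:
```python
class IncomeAllReportSync(QuarterBasedSync):
    """Sync every report type."""
    TARGET_REPORT_TYPE = None  # no filtering
```
Or sync one specific type:
```python
class IncomeParentReportSync(QuarterBasedSync):
    """Sync parent company statements only."""
    TARGET_REPORT_TYPE = "6"  # parent company statement
```
### Q5: How are field changes handled?
**A**: If the Tushare API fields change:
1. Update `TABLE_SCHEMA` with the new fields
2. Use `ALTER TABLE` to add the new columns (for tables that already hold data)
3. Update the field list in `fetch_single_quarter()` (if the `fields` parameter is used)
**Note**: do not drop existing fields; keep backward compatibility.
### Q6: How can sync problems be debugged?
**A**: Preview with `dry_run=True`:
```python
from src.data.api_wrappers.financial_data.api_income import sync_income

# Preview the sync without writing any data
result = sync_income(dry_run=True)
print(result)
```
Then check the log output for:
- remote row counts
- local row counts
- the list of differing stocks
- the number of deleted/inserted rows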
---
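## Appendix
### Related documents
- [Financial data API notes](financial_api.md) - description of the Tushare financial data endpoints
- [General API interface specification](API_INTERFACE_SPEC.md) - general interface rules
- [Data sync refactor plan](../plan/2026-03-07-financial-sync-refactor.md) - detailed plan for this refactor
### Related code
- `src/data/api_wrappers/base_financial_sync.py` - QuarterBasedSync base class
- `src/data/api_wrappers/financial_data/api_income.py` - income statement reference implementation
- `src/data/api_wrappers/financial_data/api_financial_sync.py` - scheduler
### Change history
| Date | Version | Change |
|------|------|----------|
| 2026-03-07 | v1.0 | Initial version; defines the financial data API wrapper requirements |
---
**Note**: this document is a mandatory specification that every financial data API wrapper must follow. Exceptions require an architecture review.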


@@ -0,0 +1,600 @@
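# n_income Factor Lifecycle Analysis
## Overview
This document traces the full lifecycle of the `n_income` factor in `src/experiment/regression.py`, from its string definition to its use in model training.
## Factor definition
Defined at line 55 of `src/experiment/regression.py`:
```python
FACTOR_DEFINITIONS = {
    # ... other factors ...
    "n_income": "n_income",
}
```
`n_income` is a bare symbol expression that represents the **net income** financial metric. It is **point-in-time (PIT) data** and has to be read from the financial statements.
---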
## Stage 1: Factor registration
### 1.1 Registration entry point
**Location**: `src/experiment/regression.py:134`
```python
def create_factors_with_strings(engine: FactorEngine) -> List[str]:
    for name, expr in FACTOR_DEFINITIONS.items():
        engine.add_factor(name, expr)  # registers n_income
```
### 1.2 add_factor implementation
**Location**: `src/factors/engine/factor_engine.py:62-86`
```python
def add_factor(
    self,
    name: str,
    expr: Union[str, Node],
) -> "FactorEngine":
    """Register a factor expression."""
    # If it is a string, parse it into a Node first
    if isinstance(expr, str):
        expr = self.parser.parse(expr)  # key step
    # Build the execution plan
    plan = self.planner.create_plan(expr, name)
    # Cache the execution plan
    self._factor_plans[name] = plan
    return self
```
For `n_income = "n_income"` the flow is:
1. The expression is a string, so `parser.parse("n_income")` is called
2. Parsing yields a `Symbol("n_income")` node
3. `planner.create_plan()` builds the execution plan
4. The plan is cached for later computation
---
## Stage 2: Expression parsing
### 2.1 Parser entry point
**Location**: `src/factors/parser.py:38-52`
```python
def parse(self, formula: str) -> Node:
    """Parse a factor expression string into a Node."""
    tree = ast.parse(formula, mode='eval')
    return self._visit(tree.body)
```
### 2.2 Symbol node handling
**Location**: `src/factors/parser.py:96-99`
```python
def _visit_Name(self, node: ast.Name) -> Symbol:
    """Handle a name node (such as the variable name n_income)."""
    return Symbol(node.id)
```
For the string `"n_income"`:
1. `ast.parse("n_income", mode='eval')` produces the AST
2. The AST node is an `ast.Name` whose id is `"n_income"`
3. `_visit_Name()` is called and creates `Symbol("n_income")`
### 2.3 Symbol node definition
**Location**: `src/factors/dsl.py:82-102`
```python
@dataclass
class Symbol(Node):
    """Symbol node representing a named variable."""
    name: str

    def dependencies(self) -> Set[str]:
        return {self.name}
```
The dependency set of `Symbol("n_income")` is `{"n_income"}`.
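The parsing step can be tried in isolation with the standard-library `ast` module. This is a reduced sketch: the `Symbol` and `parse` below are simplified stand-ins written for this example (no `Node` base class, only bare names supported), not the project's parser.

```python
import ast
from dataclasses import dataclass
from typing import Set


@dataclass(frozen=True)
class Symbol:
    """Simplified symbol node: a named variable."""
    name: str

    def dependencies(self) -> Set[str]:
        return {self.name}


def parse(formula: str) -> Symbol:
    """Parse a formula that consists of a single bare name."""
    body = ast.parse(formula, mode="eval").body
    if isinstance(body, ast.Name):
        return Symbol(body.id)
    # Operators, calls etc. are out of scope for this sketch.
    raise NotImplementedError(f"unsupported expression: {ast.dump(body)}")


node = parse("n_income")
deps = node.dependencies()
```

Using Python's own grammar means factor names have to be valid Python identifiers, which `n_income` is.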
---
## Stage 3: Execution plan generation
### 3.1 Planner entry point
**Location**: `src/factors/engine/planner.py:40-74`
```python
def create_plan(
    self,
    node: Node,
    output_name: Optional[str] = None,
) -> ExecutionPlan:
    """Build the complete execution plan."""
    # 1. Extract the dependencies
    deps = self.extractor.extract_dependencies(node)
    # 2. Translate into a Polars expression
    polars_expr = self.translator.translate(node)
    # 3. Infer the data specs
    data_specs = self._infer_data_specs(deps, schema_cache=self.schema_cache)
    return ExecutionPlan(
        data_specs=data_specs,
        polars_expr=polars_expr,
        dependencies=deps,
        output_name=output_name,
    )
```
### 3.2 Dependency extraction
**Location**: `src/factors/compiler.py:19-32`
```python
def extract_dependencies(self, node: Node) -> Set[str]:
    """Extract every raw field name the AST depends on."""
    deps: Set[str] = set()
    self._extract(node, deps)
    return deps

def _extract(self, node: Node, deps: Set[str]):
    if isinstance(node, Symbol):
        deps.add(node.name)  # n_income is added to the dependency set
    # ... handling of other node types
```
For `Symbol("n_income")` the extracted dependencies are `{"n_income"}`.
### 3.3 Data spec inference
**Location**: `src/factors/engine/planner.py:86-148`
```python
def _infer_data_specs(
    self,
    dependencies: Set[str],
    schema_cache: SchemaCache,
) -> List[DataSpec]:
    """Infer the data specs."""
    table_fields: Dict[str, List[str]] = defaultdict(list)
    for field in dependencies:
        # Use the SchemaCache to find the table that owns the field
        table = schema_cache.get_table_for_field(field)
        table_fields[table].append(field)
    # Create one DataSpec per table
    specs: List[DataSpec] = []
    for table, fields in table_fields.items():
        if schema_cache.is_pit_table(table):  # PIT table (financial data)
            spec = DataSpec(
                table=table,
                columns=fields,
                join_type="asof_backward",
                left_on="trade_date",
                right_on="f_ann_date",
            )
        else:  # regular table (market data)
            spec = DataSpec(
                table=table,
                columns=fields,
                join_type="standard",
            )
        specs.append(spec)
    return specs
```
For `n_income`:
1. `schema_cache.get_table_for_field("n_income")` returns `"financial_income"`
2. `schema_cache.is_pit_table("financial_income")` returns `True`
3. The resulting DataSpec is:
- `table="financial_income"`
- `columns=["n_income"]`
- `join_type="asof_backward"`
- `left_on="trade_date"`
- `right_on="f_ann_date"`
### 3.4 SchemaCache implementation
**Location**: `src/data/catalog.py`
```python
class SchemaCache:
    """Caches database schema information and maps fields to tables."""

    def get_table_for_field(self, field: str) -> Optional[str]:
        """Return the table that owns the given field."""
        return self._field_to_table.get(field)

    def is_pit_table(self, table: str) -> bool:
        """Return whether the table is a PIT (point-in-time) table."""
        info = self._table_info.get(table)
        if info and info.table_type == TableType.PIT:
            return True
        return False
```
Table type detection:
- **PIT tables**: contain the announcement-date fields `ann_date`, `f_ann_date` (financial data tables)
- **DAILY tables**: contain the `trade_date` field (market data tables)
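The detection rule and the field-to-table mapping can be illustrated without a database. The sketch below is an assumption-laden stand-in for `SchemaCache`: `classify_table` and `build_field_index` are hypothetical helpers, the `UNKNOWN` member is added for the example, and checking the PIT rule before the DAILY rule is a choice made here rather than something the source states.

```python
from enum import Enum


class TableType(Enum):
    PIT = "pit"
    DAILY = "daily"
    UNKNOWN = "unknown"


def classify_table(columns):
    """Classify a table from its column names."""
    cols = set(columns)
    # Assumption: an announcement-date column takes precedence over trade_date.
    if cols & {"ann_date", "f_ann_date"}:
        return TableType.PIT
    if "trade_date" in cols:
        return TableType.DAILY
    return TableType.UNKNOWN


def build_field_index(tables):
    """Map each column to the first table that declares it."""
    index = {}
    for table, columns in tables.items():
        for column in columns:
            # Shared key columns such as ts_code stay with the first table seen.
            index.setdefault(column, table)
    return index


tables = {
    "daily": ["ts_code", "trade_date", "close"],
    "financial_income": ["ts_code", "f_ann_date", "end_date", "n_income"],
}
field_index = build_field_index(tables)
income_type = classify_table(tables[field_index["n_income"]])
```

Here `n_income` resolves to `financial_income`, which classifies as a PIT table and would therefore get an `asof_backward` DataSpec.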
---
## Stage 4: Data fetching and joining
### 4.1 compute() entry point
**Location**: `src/factors/engine/factor_engine.py:88-120`
```python
def compute(
    self,
    factor_names: List[str],
    start_date: str,
    end_date: str,
) -> pl.DataFrame:
    """Compute the requested factors."""
    # 1. Collect all required execution plans
    plans = [self._factor_plans[name] for name in factor_names]
    # 2. Merge the data specs (deduplicated)
    merged_specs = self._merge_data_specs(plans)
    # 3. Fetch the core wide table from the router
    core_wide = self.router.fetch_data(
        specs=merged_specs,
        start_date=start_date,
        end_date=end_date,
    )
    # 4. Run the computation
    result = self._execute_with_dependencies(factor_names, core_wide)
    return result
```
### 4.2 Data router fetch_data
**Location**: `src/factors/engine/data_router.py:48-100`
```python
def fetch_data(
    self,
    specs: List[DataSpec],
    start_date: str,
    end_date: str,
) -> pl.DataFrame:
    """Fetch and assemble data according to the DataSpec list."""
    # Split into standard tables and asof tables
    standard_specs = [s for s in specs if s.join_type == "standard"]
    asof_specs = [s for s in specs if s.join_type == "asof_backward"]
    # Load the standard tables (market data)
    standard_frames = []
    for spec in standard_specs:
        df = self._load_table_from_spec(spec, start_date, end_date)
        standard_frames.append(df)
    # Use the first standard table as the base (usually daily)
    base_df = standard_frames[0] if standard_frames else None
    # Load and join the financial data through FinancialLoader
    if asof_specs and base_df is not None:
        for spec in asof_specs:
            financial_df = self.financial_loader.load_financial_data(
                table_name=spec.table,
                columns=spec.columns,
                start_date=start_date,
                end_date=end_date,
            )
            base_df = self.financial_loader.merge_financial_with_price(
                price_df=base_df,
                financial_df=financial_df,
                date_col="trade_date",
                f_ann_col="f_ann_date",
            )
    return base_df
```
### 4.3 Loading financial data
**Location**: `src/data/financial_loader.py:26-83`
```python
def load_financial_data(
    self,
    table_name: str,
    columns: List[str],
    start_date: str,
    end_date: str,
) -> pl.DataFrame:
    """Load and clean financial data from the database."""
    # Date range including the lookback window (1 year by default)
    adjusted_start = self.get_date_range_with_lookback(start_date, lookback_days=365)
    # Query the database
    query = f"""
        SELECT ts_code, f_ann_date, end_date, {', '.join(columns)}
        FROM {table_name}
        WHERE f_ann_date >= '{adjusted_start}'
        AND f_ann_date <= '{end_date}'
        ORDER BY ts_code, f_ann_date
    """
    df = self.conn.execute(query).fetchdf()
    df = pl.DataFrame(df)
    # Note: as excerpted, the SELECT above returns neither report_type nor
    # update_flag, so the two guarded steps below only take effect when those
    # columns are part of the requested `columns`.
    # Cleaning: keep report_type == 1 only (consolidated statements)
    if "report_type" in df.columns:
        df = df.filter(pl.col("report_type") == 1)
    # Deduplication: per (ts_code, end_date), keep the row with the largest update_flag
    if "update_flag" in df.columns:
        df = (
            df.sort(["ts_code", "end_date", "update_flag"], descending=[False, False, True])
            .unique(subset=["ts_code", "end_date"], keep="first")
        )
    return df
```
### 4.4 Joining financial data with market data
**Location**: `src/data/financial_loader.py:85-136`
```python
def merge_financial_with_price(
    self,
    price_df: pl.DataFrame,
    financial_df: pl.DataFrame,
    date_col: str = "trade_date",
    f_ann_col: str = "f_ann_date",
) -> pl.DataFrame:
    """Attach financial data to market data with an asof join."""
    # Make sure the date columns have the right type
    price_df = price_df.with_columns(pl.col(date_col).cast(pl.Date))
    financial_df = financial_df.with_columns(pl.col(f_ann_col).cast(pl.Date))
    # PIT alignment via join_asof
    # strategy='backward': for each trade_date, take the latest financial row
    # with f_ann_date <= trade_date
    result = price_df.join_asof(
        financial_df,
        left_on=date_col,
        right_on=f_ann_col,
        by="ts_code",          # group by stock code
        strategy="backward",   # look back to the most recent announcement
    )
    return result
```
**Join logic in detail**:
| trade_date | ts_code | close | n_income (after join) | Source f_ann_date |
|-----------|---------|-------|------------------|-----------------|
| 2024-01-15 | 000001.SZ | 10.5 | 1,000,000 | 2024-01-10 |
| 2024-01-16 | 000001.SZ | 10.6 | 1,000,000 | 2024-01-10 |
| 2024-01-20 | 000001.SZ | 10.8 | 1,200,000 | 2024-01-18 |
The table shows how `n_income` is joined point-in-time:
- January 15 uses the net income from the previous announcement (January 10)
- January 16 still uses the January 10 figure (no new announcement)
- January 20 uses the net income from the latest announcement (January 18)
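The backward as-of rule can be spelled out for a single stock with the standard-library `bisect` module. This is a plain-Python sketch of the matching rule, not the Polars implementation; `asof_backward` is a hypothetical helper and the figures are the illustrative ones from the table above.

```python
from bisect import bisect_right
from datetime import date


def asof_backward(trade_dates, announcements):
    """For each trade date, pick the value announced most recently on or before it.

    announcements: list of (f_ann_date, value) pairs for ONE stock.
    Trade dates earlier than the first announcement get None.
    """
    announcements = sorted(announcements)
    ann_dates = [ann_date for ann_date, _ in announcements]
    result = []
    for day in trade_dates:
        # Number of announcements with f_ann_date <= day.
        pos = bisect_right(ann_dates, day)
        result.append(announcements[pos - 1][1] if pos else None)
    return result


announcements = [(date(2024, 1, 10), 1_000_000), (date(2024, 1, 18), 1_200_000)]
trade_dates = [date(2024, 1, 9), date(2024, 1, 15), date(2024, 1, 16), date(2024, 1, 20)]
joined = asof_backward(trade_dates, announcements)
```

The extra trade date of January 9 precedes every announcement and therefore gets no value, which is exactly the behavior that prevents lookahead.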
---
## Stage 5: Factor computation
### 5.1 Running the computation
**Location**: `src/factors/engine/factor_engine.py:188-227`
```python
def _execute_with_dependencies(
    self,
    factor_names: List[str],
    core_wide: pl.DataFrame,
) -> pl.DataFrame:
    """Compute the factors in dependency order."""
    # Topological sort determines the computation order
    sorted_factors = self._topological_sort(factor_names)
    # Collect the result expressions
    result_exprs = []
    for name in sorted_factors:
        plan = self._factor_plans[name]
        # Polars expression for this factor
        expr = plan.polars_expr.alias(name)
        result_exprs.append(expr)
    # Evaluate all expressions in one pass
    result = core_wide.with_columns(result_exprs)
    return result
```
### 5.2 Translation into a Polars expression
**Location**: `src/factors/translator.py`
```python
def translate(self, node: Node) -> pl.Expr:
    """Translate a DSL node into a Polars expression."""
    if isinstance(node, Symbol):
        # A Symbol maps directly to pl.col()
        return pl.col(node.name)
    # ... other node types
```
For `Symbol("n_income")` the translation result is `pl.col("n_income")`.
### 5.3 Computing n_income
Because `n_income` is a bare symbol (no operations), the computation is:
```python
# core_wide already contains the n_income column (joined from the financial table)
result = core_wide.with_columns([
    pl.col("n_income").alias("n_income")  # direct reference
])
```
In effect, the value of `n_income` is already fixed in the join stage; the computation stage only confirms the output column name.
---
## Stage 6: Model training
### 6.1 Data flows back into the training pipeline
**Location**: `src/experiment/regression.py:176-183`
```python
def prepare_data(...) -> pl.DataFrame:
    factor_names = feature_cols + ["return_5"]  # includes n_income
    data = engine.compute(
        factor_names=factor_names,
        start_date=start_date,
        end_date=end_date,
    )
    return data
```
### 6.2 Complete data flow
```
┌─────────────────────────────────────────────────────────────────────┐
│ n_income factor lifecycle │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 1. Definition │
│ regression.py:55 "n_income": "n_income" │
│ ↓ │
│ 2. Registration (FactorEngine.add_factor) │
│ parser.parse("n_income") → Symbol("n_income") │
│ ↓ │
│ 3. Planning (ExecutionPlanner.create_plan) │
│ extract_dependencies() → {"n_income"} │
│ _infer_data_specs() → DataSpec(financial_income, asof_backward) │
│ ↓ │
│ 4. Data fetching (DataRouter.fetch_data) │
│ FinancialLoader.load_financial_data() │
│ reads the n_income field from the financial_income table │
│ ↓ │
│ 5. Data joining (FinancialLoader.merge_financial_with_price) │
│ join_asof(strategy='backward') │
│ aligned on trade_date and f_ann_date │
│ ↓ │
│ 6. Computation (FactorEngine._execute_with_dependencies) │
│ pl.col("n_income") → written to the result DataFrame │
│ ↓ │
│ 7. Training │
│ passed to the LightGBM model as a feature column │
│ │
└─────────────────────────────────────────────────────────────────────┘
```
---
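## Key Technical Points
### 1. PIT data handling
Financial data is **low-frequency and announcement-driven**, unlike market data, which is **high-frequency and continuous**. The system handles it with the **asof_backward** strategy:
- **asof**: "as of", meaning the data that was valid at a given point in time
- **backward**: search backward in time, so each trade date uses the most recent announcement on or before it
- **Key constraint**: no future data may be used (lookahead bias)
### 2. Dynamic routing through SchemaCache
The system determines the owning table of each field automatically; nothing has to be specified by hand:
```python
# Resolved automatically by the system:
#   n_income -> financial_income (PIT)
#   close    -> daily (DAILY)
```
### 3. Financial data cleaning
`FinancialLoader` handles the following automatically:
- **Report type filter**: only consolidated statements are used (report_type=1)
- **Deduplication**: per (ts_code, end_date), keep the latest revision (largest update_flag)
- **Date alignment**: uses the announcement date (f_ann_date) rather than the reporting period (end_date)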
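The deduplication rule can be written down in a few lines of plain Python. This is an illustrative sketch on dict rows, not the Polars code used by `FinancialLoader`; `keep_latest_revision` is a hypothetical helper.

```python
def keep_latest_revision(rows):
    """Per (ts_code, end_date), keep the row with the largest update_flag."""
    best = {}
    for row in rows:
        key = (row["ts_code"], row["end_date"])
        # Replace the stored row only when this one is a newer revision.
        if key not in best or row["update_flag"] > best[key]["update_flag"]:
            best[key] = row
    return list(best.values())


rows = [
    {"ts_code": "000001.SZ", "end_date": "20231231", "update_flag": 0, "n_income": 1_000_000},
    {"ts_code": "000001.SZ", "end_date": "20231231", "update_flag": 1, "n_income": 1_200_000},
    {"ts_code": "600000.SH", "end_date": "20231231", "update_flag": 0, "n_income": 500_000},
]
latest = keep_latest_revision(rows)
```

Each (ts_code, end_date) pair ends up with exactly one row, so the later as-of join sees a single value per reporting period.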
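### 4. Extensibility
Adding a new financial factor takes one extra line in the dictionary:
```python
FACTOR_DEFINITIONS = {
    "n_income": "n_income",
    "revenue": "revenue",  # operating revenue
    "eps": "eps",          # earnings per share
    "roe": "roe",          # return on equity
}
```
Table routing, data fetching and joining are handled automatically.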
---
## Related Code Files
| File | Responsibility |
|------|------|
| `src/experiment/regression.py` | Training entry point, factor definitions |
| `src/factors/engine/factor_engine.py` | Unified entry point of the factor engine |
| `src/factors/parser.py` | String expression parsing |
| `src/factors/compiler.py` | AST dependency extraction |
| `src/factors/engine/planner.py` | Execution plan generation |
| `src/factors/engine/data_router.py` | Data routing and assembly |
| `src/data/financial_loader.py` | Financial data loading and joining |
| `src/data/catalog.py` | Database catalog and table schemas |
| `src/data/api_wrappers/financial_data/api_income.py` | Income statement data API |
---
## Appendix: Table Schemas
### financial_income (income statement)
```sql
CREATE TABLE financial_income (
    ts_code VARCHAR,      -- stock code
    f_ann_date DATE,      -- announcement date (key PIT field)
    end_date DATE,        -- reporting period
    report_type INTEGER,  -- report type (1 = consolidated statement)
    update_flag INTEGER,  -- update flag (larger is newer)
    n_income BIGINT,      -- net income (the field this factor uses)
    revenue BIGINT,       -- operating revenue
    ...                   -- other financial fields
);
```
### daily (daily market data)
```sql
CREATE TABLE daily (
    ts_code VARCHAR,  -- stock code
    trade_date DATE,  -- trade date
    open DOUBLE,      -- open price
    high DOUBLE,      -- high price
    low DOUBLE,       -- low price
    close DOUBLE,     -- close price
    vol BIGINT,       -- volume
    ...               -- other market data fields
);
```

File diff suppressed because it is too large


@@ -47,7 +47,8 @@ from src.data.api_wrappers.api_pro_bar import (
 from src.data.api_wrappers.financial_data.api_income import (
     get_income,
     sync_income,
-    IncomeSync,
+    preview_income_sync,
+    IncomeQuarterSync,
 )
 from src.data.api_wrappers.api_bak_basic import get_bak_basic, sync_bak_basic
 from src.data.api_wrappers.api_namechange import get_namechange, sync_namechange
@@ -84,7 +85,8 @@ __all__ = [
     # Income statement
     "get_income",
     "sync_income",
-    "IncomeSync",
+    "preview_income_sync",
+    "IncomeQuarterSync",
     # Historical stock list
     "get_bak_basic",
     "sync_bak_basic",


@@ -0,0 +1,756 @@
"""Base abstractions for financial data sync.

Provides QuarterBasedSync, a base class dedicated to syncing financial data by quarter.

Characteristics of financial data:
- Published by quarter (period: 20231231, 20230930, 20230630, 20230331)
- A VIP endpoint returns one quarter of data for all listed companies in a single call
- Data may be revised, so an incremental sync fetches the current quarter plus the previous one
- Logical key: (ts_code, end_date); no PRIMARY KEY constraint is created

Usage:
    class IncomeQuarterSync(QuarterBasedSync):
        table_name = "financial_income"
        api_name = "income_vip"

        def fetch_single_quarter(self, period: str) -> pd.DataFrame:
            # Fetch one quarter of data
            ...
"""
from abc import ABC, abstractmethod
from typing import Optional, Dict, List, Tuple, Set
from datetime import datetime

import pandas as pd
from tqdm import tqdm

from src.data.client import TushareClient
from src.data.storage import ThreadSafeStorage, Storage
from src.data.utils import get_today_date, get_quarters_in_range, DEFAULT_START_DATE
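class QuarterBasedSync(ABC):
    """Abstract base class for quarterly financial data sync.

    Handles financial data that is synced by quarter (income statement,
    balance sheet, cash flow statement).

    Sync characteristics:
    1. Fetch by quarter: a VIP endpoint returns one quarter for all listed companies
    2. Data can be revised: the same quarter may be updated, so an incremental
       sync fetches the current quarter plus the previous one
    3. Difference detection: compare local and remote row counts to find
       missing or changed records
    4. Logical key: (ts_code, end_date); no PRIMARY KEY constraint is created

    Subclasses must provide:
    - table_name: class attribute, target table name
    - api_name: class attribute, Tushare API endpoint name
    - fetch_single_quarter(period) -> pd.DataFrame: fetch one quarter of data
    - TABLE_SCHEMA: class attribute, table schema

    Attributes:
        table_name: target table name (subclasses must override)
        api_name: Tushare API endpoint name (subclasses must override)
        DEFAULT_START_DATE: default start date (2018Q1)
        TABLE_SCHEMA: table schema {column name: SQL type}
        TABLE_INDEXES: index definitions [(index name, [column names]), ...]
            Note: do not create unique indexes, because financial data may
            be revised multiple times
    """

    table_name: str = ""  # subclasses must override
    api_name: str = ""  # subclasses must override
    DEFAULT_START_DATE = "20180331"  # 2018 Q1
    # Target report type (subclasses may override)
    # Default: sync consolidated statements only (report_type='1')
    # Set to None to sync every report type
    TARGET_REPORT_TYPE: Optional[str] = "1"
    # Table schema (subclasses must override)
    TABLE_SCHEMA: Dict[str, str] = {}
    # Index definitions (subclasses may override)
    # Format: [("index_name", ["col1", "col2"]), ...]
    # Note: do not create unique indexes, because financial data may be revised multiple times
    TABLE_INDEXES: List[Tuple[str, List[str]]] = []

    def __init__(self):
        """Initialize the quarterly sync manager."""
        self.storage = ThreadSafeStorage()
        self.client = TushareClient()
        self._cached_data: Optional[pd.DataFrame] = None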

    # ======================================================================
    # Abstract methods - subclasses must implement
    # ======================================================================
    @abstractmethod
    def fetch_single_quarter(self, period: str) -> pd.DataFrame:
        """Fetch one quarter of data for all listed companies.

        Args:
            period: reporting period, the last day of the quarter (e.g. '20231231')

        Returns:
            DataFrame with that quarter's financial data for all listed companies
        """
        pass
# ======================================================================
# 季度计算工具方法
# ======================================================================
def get_current_quarter(self) -> str:
"""获取当前季度(考虑是否到季末)。
如果当前日期未到季度最后一天,则返回前一季度。
这样可以避免获取尚无数据的未来季度。
Returns:
当前季度字符串 (YYYYMMDD),如 '20231231'
"""
today = get_today_date()
year = int(today[:4])
month = int(today[4:6])
# 确定当前季度
if month <= 3:
current_q = f"{year}0331"
elif month <= 6:
current_q = f"{year}0630"
elif month <= 9:
current_q = f"{year}0930"
else:
current_q = f"{year}1231"
# 如果今天还没到季末,返回前一季度
if today < current_q:
return self.get_prev_quarter(current_q)
return current_q
    def get_prev_quarter(self, quarter: str) -> str:
        """Get the previous quarter.

        Args:
            quarter: Quarter string (YYYYMMDD), e.g. '20231231'.

        Returns:
            Previous quarter string (YYYYMMDD).
        """
        year = int(quarter[:4])
        month_day = quarter[4:]
        if month_day == "0331":
            return f"{year - 1}1231"
        elif month_day == "0630":
            return f"{year}0331"
        elif month_day == "0930":
            return f"{year}0630"
        else:  # "1231"
            return f"{year}0930"
    def get_next_quarter(self, quarter: str) -> str:
        """Get the next quarter.

        Args:
            quarter: Quarter string (YYYYMMDD).

        Returns:
            Next quarter string (YYYYMMDD).
        """
        year = int(quarter[:4])
        month_day = quarter[4:]
        if month_day == "0331":
            return f"{year}0630"
        elif month_day == "0630":
            return f"{year}0930"
        elif month_day == "0930":
            return f"{year}1231"
        else:  # "1231"
            return f"{year + 1}0331"
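The quarter helpers above all operate on quarter-end strings in YYYYMMDD form. The following is a minimal standalone sketch of the same arithmetic, not the project's code: the module-level names `prev_quarter`, `next_quarter` and `latest_closed_quarter`, the `QUARTER_ENDS` list, and the explicit `today` argument are hypothetical, introduced only so the logic can be exercised without `get_today_date()`.

```python
"""Standalone sketch of quarter-end arithmetic (all names are illustrative)."""

QUARTER_ENDS = ["0331", "0630", "0930", "1231"]


def prev_quarter(quarter: str) -> str:
    """Return the quarter-end string (YYYYMMDD) that precedes `quarter`."""
    year, idx = int(quarter[:4]), QUARTER_ENDS.index(quarter[4:])
    if idx == 0:
        return f"{year - 1}{QUARTER_ENDS[-1]}"  # Q1 -> Q4 of the previous year
    return f"{year}{QUARTER_ENDS[idx - 1]}"


def next_quarter(quarter: str) -> str:
    """Return the quarter-end string (YYYYMMDD) that follows `quarter`."""
    year, idx = int(quarter[:4]), QUARTER_ENDS.index(quarter[4:])
    if idx == len(QUARTER_ENDS) - 1:
        return f"{year + 1}{QUARTER_ENDS[0]}"  # Q4 -> Q1 of the next year
    return f"{year}{QUARTER_ENDS[idx + 1]}"


def latest_closed_quarter(today: str) -> str:
    """Return the most recent quarter end that is not after `today`."""
    year, month = int(today[:4]), int(today[4:6])
    current = f"{year}{QUARTER_ENDS[(month - 1) // 3]}"
    # Fixed-width YYYYMMDD strings compare in chronological order.
    return prev_quarter(current) if today < current else current


print(latest_closed_quarter("20260308"))  # 20251231
```

Unlike the `else` branches above, this sketch raises `ValueError` for a string that does not end on a quarter boundary, which may or may not be the behavior wanted here.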
    # ======================================================================
    # Table schema management
    # ======================================================================
    def ensure_table_exists(self) -> None:
        """Ensure the table exists; create the table and its indexes if it does not.

        Note: no primary key or unique index is set, because financial data
        may be revised multiple times, so the same stock can have several
        versions (with different ann_date values) for the same quarter.
        DuckDB provides an implicit rowid for each row.
        """
        storage = Storage()
        if storage.exists(self.table_name):
            return
        if not self.TABLE_SCHEMA:
            print(
                f"[{self.__class__.__name__}] TABLE_SCHEMA not defined, skipping table creation"
            )
            return
        # Build the column definitions (no primary key).
        columns_def = []
        for col_name, col_type in self.TABLE_SCHEMA.items():
            columns_def.append(f'"{col_name}" {col_type}')
        columns_sql = ", ".join(columns_def)
        create_sql = f'CREATE TABLE IF NOT EXISTS "{self.table_name}" ({columns_sql})'
        try:
            storage._connection.execute(create_sql)
            print(f"[{self.__class__.__name__}] Created table '{self.table_name}'")
        except Exception as e:
            print(f"[{self.__class__.__name__}] Error creating table: {e}")
            raise
        # Create ordinary indexes (do not create unique indexes).
        for idx_name, idx_cols in self.TABLE_INDEXES:
            try:
                idx_cols_sql = ", ".join(f'"{col}"' for col in idx_cols)
                storage._connection.execute(
                    f'CREATE INDEX IF NOT EXISTS "{idx_name}" ON "{self.table_name}"({idx_cols_sql})'
                )
                print(
                    f"[{self.__class__.__name__}] Created index '{idx_name}' on {idx_cols}"
                )
            except Exception as e:
                print(
                    f"[{self.__class__.__name__}] Error creating index {idx_name}: {e}"
                )
    # ======================================================================
    # Data difference detection (core logic)
    # ======================================================================
    def get_local_data_count_by_stock(self, period: str) -> Dict[str, int]:
        """Get the per-stock row count for a quarter in the local database.

        Args:
            period: Quarter string (YYYYMMDD).

        Returns:
            Dict {ts_code: row count}.
        """
        storage = Storage()
        try:
            # Convert YYYYMMDD to YYYY-MM-DD.
            period_formatted = f"{period[:4]}-{period[4:6]}-{period[6:]}"
            query = f'''
                SELECT ts_code, COUNT(*) as cnt
                FROM "{self.table_name}"
                WHERE end_date = ?
                GROUP BY ts_code
            '''
            result = storage._connection.execute(query, [period_formatted]).fetchdf()
            if result.empty:
                return {}
            return dict(zip(result["ts_code"], result["cnt"]))
        except Exception as e:
            print(f"[{self.__class__.__name__}] Error querying local data count: {e}")
            return {}
    def get_local_records_by_key(self, period: str) -> Dict[tuple, int]:
        """Get the local records for a quarter, counted per logical key.

        Used for finer-grained difference detection; rows are grouped by
        (ts_code, end_date, report_type).

        Args:
            period: Quarter string (YYYYMMDD).

        Returns:
            Dict {(ts_code, end_date, report_type): row count}.
        """
        storage = Storage()
        try:
            # Convert YYYYMMDD to YYYY-MM-DD.
            period_formatted = f"{period[:4]}-{period[4:6]}-{period[6:]}"
            query = f'''
                SELECT ts_code, end_date, report_type, COUNT(*) as cnt
                FROM "{self.table_name}"
                WHERE end_date = ?
                GROUP BY ts_code, end_date, report_type
            '''
            result = storage._connection.execute(query, [period_formatted]).fetchdf()
            if result.empty:
                return {}
            return {
                (row["ts_code"], row["end_date"], row["report_type"]): row["cnt"]
                for _, row in result.iterrows()
            }
        except Exception as e:
            print(f"[{self.__class__.__name__}] Error querying local records: {e}")
            return {}
    def compare_and_find_differences(
        self, remote_df: pd.DataFrame, period: str
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Compare remote data with local data and find the differences.

        Logic:
            1. Count the rows per stock in the remote data.
            2. Query the local database for the row count per stock in this quarter.
            3. Compare the two to find:
               - stocks missing locally (new);
               - stocks whose row counts differ (updated, possibly including
                 financial restatements).
            4. Return the difference data that needs to be inserted.

        Note: the logical key is (ts_code, end_date, report_type), so one stock
        may have records for several report_type values in the same quarter.
        Differences are detected at the stock level: if a stock's total row
        count differs, all of that stock's records are updated.

        Args:
            remote_df: Remote data fetched from the API.
            period: Quarter string.

        Returns:
            (difference DataFrame, statistics DataFrame).
            The statistics contain: ts_code, remote_count, local_count, status.
        """
        if remote_df.empty:
            return pd.DataFrame(), pd.DataFrame()
        # 1. Count the rows per stock in the remote data.
        remote_counts = remote_df.groupby("ts_code").size().to_dict()
        # 2. Get the local row counts (aggregated per stock).
        local_counts = self.get_local_data_count_by_stock(period)
        # 3. Compare to find the differences.
        diff_stocks = []  # stocks that need updating
        stats = []
        for ts_code, remote_count in remote_counts.items():
            local_count = local_counts.get(ts_code, 0)
            if local_count == 0:
                status = "new"  # missing locally, insert everything
                diff_stocks.append(ts_code)
            elif local_count != remote_count:
                status = "modified"  # row counts differ, possibly a restatement
                diff_stocks.append(ts_code)
            else:
                status = "same"  # row counts match, skip
            stats.append(
                {
                    "ts_code": ts_code,
                    "remote_count": remote_count,
                    "local_count": local_count,
                    "status": status,
                }
            )
        # 4. Extract the difference data.
        if diff_stocks:
            diff_df = remote_df[remote_df["ts_code"].isin(diff_stocks)].copy()
        else:
            diff_df = pd.DataFrame()
        stats_df = pd.DataFrame(stats)
        return diff_df, stats_df
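The count-based comparison above can be illustrated without pandas or a database. This is a minimal sketch under stated assumptions: `classify_by_count` is a hypothetical name, the remote side is reduced to a plain list of `ts_code` values (one per remote row), and the local side to the `{ts_code: count}` dictionary that `get_local_data_count_by_stock` returns.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def classify_by_count(
    remote_codes: Iterable[str], local_counts: Dict[str, int]
) -> Tuple[List[str], Dict[str, str]]:
    """Classify each remote stock as 'new', 'modified' or 'same' by row count."""
    remote_counts = Counter(remote_codes)  # rows per stock on the remote side
    status: Dict[str, str] = {}
    for ts_code, remote_count in remote_counts.items():
        local_count = local_counts.get(ts_code, 0)
        if local_count == 0:
            status[ts_code] = "new"  # nothing stored locally yet
        elif local_count != remote_count:
            status[ts_code] = "modified"  # row counts differ
        else:
            status[ts_code] = "same"  # row counts match
    to_refresh = [code for code, s in status.items() if s != "same"]
    return to_refresh, status


remote = ["000001.SZ", "000001.SZ", "000002.SZ", "600000.SH"]
local = {"000001.SZ": 1, "000002.SZ": 1}
print(classify_by_count(remote, local))
# (['000001.SZ', '600000.SH'],
#  {'000001.SZ': 'modified', '000002.SZ': 'same', '600000.SH': 'new'})
```

One consequence worth keeping in mind: a revision that changes values but leaves the row count unchanged is classified as `same` by this scheme, so it would not be refreshed. Comparing a per-stock content hash instead of a count is one possible alternative, at the cost of reading the local rows.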
    # ======================================================================
    # Core sync logic
    # ======================================================================
    def delete_stock_quarter_data(
        self, period: str, ts_codes: Optional[List[str]] = None
    ) -> int:
        """Delete the data for the given quarter and stocks.

        Old data is deleted before the new data is inserted
        (delete-then-insert strategy).

        Args:
            period: Quarter string (YYYYMMDD).
            ts_codes: List of stock codes; None deletes all data for the quarter.

        Returns:
            Number of rows deleted (0 if the delete failed).
        """
        storage = Storage()
        try:
            # Convert YYYYMMDD to YYYY-MM-DD.
            period_formatted = f"{period[:4]}-{period[4:6]}-{period[6:]}"
            where_sql = "end_date = ?"
            params = [period_formatted]
            if ts_codes:
                # Restrict the delete to the given stocks.
                placeholders = ", ".join(["?" for _ in ts_codes])
                where_sql += f" AND ts_code IN ({placeholders})"
                params += list(ts_codes)
            # DuckDB's rowcount may return -1, so count the matching rows
            # before deleting them instead of estimating the number.
            count_row = storage._connection.execute(
                f'SELECT COUNT(*) FROM "{self.table_name}" WHERE {where_sql}', params
            ).fetchone()
            deleted_count = count_row[0] if count_row else 0
            storage._connection.execute(
                f'DELETE FROM "{self.table_name}" WHERE {where_sql}', params
            )
            return deleted_count
        except Exception as e:
            print(f"[{self.__class__.__name__}] Error deleting data: {e}")
            return 0
    def sync_quarter(self, period: str, dry_run: bool = False) -> Dict:
        """Sync the data for a single quarter.

        Flow:
            1. Fetch the remote data.
            2. Filter report types according to TARGET_REPORT_TYPE.
            3. Compare with local data to find the stocks that differ.
            4. Delete the old data for those stocks.
            5. Insert the new data (delete first, then insert).

        Note: financial data sync must always fetch the current and previous
        quarters and compare them; there is no "nothing to sync" case.

        Args:
            period: Quarter string (YYYYMMDD).
            dry_run: Whether to run in preview mode.

        Returns:
            Sync result dict {
                'period': quarter,
                'remote_total': total number of remote records,
                'diff_count': number of differing records,
                'deleted_count': number of deleted records,
                'inserted_count': number of inserted records,
                'dry_run': whether preview mode was used
            }
        """
        print(f"[{self.__class__.__name__}] Syncing quarter {period}...")
        # 1. Fetch the remote data.
        remote_df = self.fetch_single_quarter(period)
        if remote_df.empty:
            print(f"[{self.__class__.__name__}] No data for quarter {period}")
            return {
                "period": period,
                "remote_total": 0,
                "diff_count": 0,
                "deleted_count": 0,
                "inserted_count": 0,
                "dry_run": dry_run,
            }
        # 2. Filter report types according to TARGET_REPORT_TYPE.
        if self.TARGET_REPORT_TYPE and "report_type" in remote_df.columns:
            remote_df = remote_df[remote_df["report_type"] == self.TARGET_REPORT_TYPE]
        remote_total = len(remote_df)
        print(f"[{self.__class__.__name__}] Fetched {remote_total} records from API")
        # 3. Check whether this quarter already has local data.
        local_counts = self.get_local_data_count_by_stock(period)
        is_first_sync_for_period = len(local_counts) == 0
        # 4. Run the sync.
        deleted_count = 0
        inserted_count = 0
        if is_first_sync_for_period:
            # First sync for this quarter: insert all data directly.
            print(
                f"[{self.__class__.__name__}] First sync for quarter {period}, inserting all data directly"
            )
            if not dry_run:
                self.storage.queue_save(self.table_name, remote_df, use_upsert=False)
                self.storage.flush()
                inserted_count = len(remote_df)
                print(
                    f"[{self.__class__.__name__}] Inserted {inserted_count} new records"
                )
            return {
                "period": period,
                "remote_total": remote_total,
                "diff_count": len(remote_df),
                "deleted_count": 0,
                "inserted_count": inserted_count,
                "dry_run": dry_run,
            }
        # 5. Not the first sync: compare to find the stocks that differ.
        diff_df, stats_df = self.compare_and_find_differences(remote_df, period)
        diff_stocks = list(diff_df["ts_code"].unique()) if not diff_df.empty else []
        unchanged_count = (
            len(stats_df[stats_df["status"] == "same"]) if not stats_df.empty else 0
        )
        print(f"[{self.__class__.__name__}] Comparison result:")
        print(f" - Stocks with differences: {len(diff_stocks)}")
        print(f" - Unchanged stocks: {unchanged_count}")
        if not dry_run and not diff_df.empty:
            # 5.1 Delete the old data for the stocks that differ.
            deleted_stocks_count = len(diff_stocks)
            self.delete_stock_quarter_data(period, diff_stocks)
            deleted_count = len(diff_df)
            print(
                f"[{self.__class__.__name__}] Deleted {deleted_stocks_count} stocks' old records (approx {deleted_count} rows)"
            )
            # 5.2 Insert the new data (plain INSERT, since the old data is already deleted).
            self.storage.queue_save(self.table_name, diff_df, use_upsert=False)
            self.storage.flush()
            inserted_count = len(diff_df)
            print(f"[{self.__class__.__name__}] Inserted {inserted_count} new records")
        return {
            "period": period,
            "remote_total": remote_total,
            "diff_count": len(diff_df),
            "deleted_count": deleted_count,
            "inserted_count": inserted_count,
            "dry_run": dry_run,
        }
    def sync_range(
        self, start_quarter: str, end_quarter: str, dry_run: bool = False
    ) -> List[Dict]:
        """Sync the data for a range of quarters.

        Note: incremental sync automatically includes the previous quarter to
        ensure data completeness.

        Args:
            start_quarter: Start quarter (YYYYMMDD).
            end_quarter: End quarter (YYYYMMDD).
            dry_run: Whether to run in preview mode.

        Returns:
            List of per-quarter sync results.
        """
        quarters = get_quarters_in_range(start_quarter, end_quarter)
        if not quarters:
            print(f"[{self.__class__.__name__}] No quarters to sync")
            return []
        print(
            f"[{self.__class__.__name__}] Syncing {len(quarters)} quarters: {quarters}"
        )
        results = []
        for period in tqdm(quarters, desc=f"Syncing {self.table_name}"):
            try:
                result = self.sync_quarter(period, dry_run=dry_run)
                results.append(result)
            except Exception as e:
                print(f"[{self.__class__.__name__}] Error syncing {period}: {e}")
                results.append({"period": period, "error": str(e)})
        return results
    def sync_incremental(self, dry_run: bool = False) -> List[Dict]:
        """Run an incremental sync.

        Strategy:
            1. Ensure the table exists (it is created on the first sync).
            2. Get the latest quarter in the table.
            3. Compute the current quarter (taking quarter end into account).
            4. Determine the sync range: from the latest quarter to the current quarter.
            5. **Important**: additionally include the previous quarter to
               ensure data completeness.

        Note: unlike daily data, financial data must be fetched and compared
        on every run; there is no "nothing to sync" case, because financial
        data may be revised.

        Args:
            dry_run: Whether to run in preview mode.

        Returns:
            List of per-quarter sync results.
        """
        print(f"\n{'=' * 60}")
        print(f"[{self.__class__.__name__}] Incremental Sync")
        print(f"{'=' * 60}")
        # 0. Ensure the table exists (it is created on the first sync).
        self.ensure_table_exists()
        # 1. Get the latest quarter.
        storage = Storage()
        try:
            result = storage._connection.execute(
                f'SELECT MAX(end_date) FROM "{self.table_name}"'
            ).fetchone()
            latest_quarter = result[0] if result and result[0] else None
            if hasattr(latest_quarter, "strftime"):
                latest_quarter = latest_quarter.strftime("%Y%m%d")
        except Exception as e:
            print(f"[{self.__class__.__name__}] Error getting latest quarter: {e}")
            latest_quarter = None
        # 2. Get the current quarter.
        current_quarter = self.get_current_quarter()
        if latest_quarter is None:
            # No local data: run a full sync.
            print(f"[{self.__class__.__name__}] No local data, performing full sync")
            return self.sync_range(self.DEFAULT_START_DATE, current_quarter, dry_run)
        print(f"[{self.__class__.__name__}] Latest local quarter: {latest_quarter}")
        print(f"[{self.__class__.__name__}] Current quarter: {current_quarter}")
        # 3. Determine the sync range.
        # Financial data must be compared and updated on every run; nothing is skipped.
        # Range: from the latest quarter to the current quarter (including the
        # previous quarter to ensure data completeness).
        start_quarter = latest_quarter
        if start_quarter > current_quarter:
            # Local data is newer than the current quarter; still sync
            # (it may contain revised data).
            start_quarter = current_quarter
        else:
            # Normal case: include the previous quarter.
            start_quarter = self.get_prev_quarter(latest_quarter)
            if start_quarter < self.DEFAULT_START_DATE:
                start_quarter = self.DEFAULT_START_DATE
        # Print the quarter range that will be synced.
        print(f"\n[{self.__class__.__name__}] Syncing financial reports for the following quarter range:")
        print(f" - Start quarter: {start_quarter}")
        print(f" - Current quarter: {current_quarter}")
        print(f" (the previous quarter is included to ensure data completeness)")
        print()
        return self.sync_range(start_quarter, current_quarter, dry_run)
    def sync_full(self, dry_run: bool = False) -> List[Dict]:
        """Run a full sync.

        Args:
            dry_run: Whether to run in preview mode.

        Returns:
            List of per-quarter sync results.
        """
        print(f"\n{'=' * 60}")
        print(f"[{self.__class__.__name__}] Full Sync")
        print(f"{'=' * 60}")
        # Ensure the table exists.
        self.ensure_table_exists()
        current_quarter = self.get_current_quarter()
        return self.sync_range(self.DEFAULT_START_DATE, current_quarter, dry_run)
    # ======================================================================
    # Preview mode
    # ======================================================================
    def preview_sync(self) -> Dict:
        """Preview the sync (nothing is actually synced).

        Note: financial data must be synced on every run, because the data may
        be revised. The preview shows the quarter range that would be synced.

        Returns:
            Preview information dict.
        """
        print(f"\n{'=' * 60}")
        print(f"[{self.__class__.__name__}] Preview Mode")
        print(f"{'=' * 60}")
        # Get the latest quarter.
        storage = Storage()
        try:
            result = storage._connection.execute(
                f'SELECT MAX(end_date) FROM "{self.table_name}"'
            ).fetchone()
            latest_quarter = result[0] if result and result[0] else None
            if hasattr(latest_quarter, "strftime"):
                latest_quarter = latest_quarter.strftime("%Y%m%d")
        except Exception:
            latest_quarter = None
        current_quarter = self.get_current_quarter()
        if latest_quarter is None:
            # No local data: a full sync is required.
            start_quarter = self.DEFAULT_START_DATE
            message = "No local data, full sync required"
        else:
            # Financial data must be compared and updated on every run.
            # Range: from the latest quarter to the current quarter
            # (including the previous quarter).
            start_quarter = self.get_prev_quarter(latest_quarter)
            if start_quarter < self.DEFAULT_START_DATE:
                start_quarter = self.DEFAULT_START_DATE
            message = f"Incremental sync from {start_quarter} to {current_quarter}"
        preview = {
            "table_name": self.table_name,
            "api_name": self.api_name,
            "latest_quarter": latest_quarter,
            "current_quarter": current_quarter,
            "start_quarter": start_quarter,
            "end_quarter": current_quarter,
            "message": message,
        }
        print(f"Table: {self.table_name}")
        print(f"API: {self.api_name}")
        print(f"Latest local: {latest_quarter}")
        print(f"Current quarter: {current_quarter}")
        print(f"Sync range: {start_quarter} -> {current_quarter}")
        print(f"Message: {message}")
        print(f"{'=' * 60}")
        return preview
# ======================================================================
# Convenience functions
# ======================================================================
def sync_financial_data(
    syncer_class: type, force_full: bool = False, dry_run: bool = False
) -> List[Dict]:
    """Generic convenience function for syncing financial data.

    Args:
        syncer_class: A subclass of QuarterBasedSync.
        force_full: Whether to force a full sync.
        dry_run: Whether to run in preview mode.

    Returns:
        List of sync results.
    """
    syncer = syncer_class()
    if force_full:
        return syncer.sync_full(dry_run)
    else:
        return syncer.sync_incremental(dry_run)


def preview_financial_sync(syncer_class: type) -> Dict:
    """Preview the financial data sync.

    Args:
        syncer_class: A subclass of QuarterBasedSync.

    Returns:
        Preview information dict.
    """
    syncer = syncer_class()
    return syncer.preview_sync()
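The delete-then-insert strategy used by this base class can be sketched end to end with the standard library. This is an illustration only and makes several assumptions: `sqlite3` stands in for DuckDB, the three-column `financial_income` table is a cut-down stand-in for the real schema, and `replace_quarter_rows` and `demo` are hypothetical names. It counts the matching rows before deleting them, so the reported number of deleted rows is exact rather than estimated, and it assumes `ts_codes` is non-empty.

```python
import sqlite3
from typing import List, Sequence, Tuple

Row = Tuple[str, str, float]  # (ts_code, end_date, n_income)


def replace_quarter_rows(
    conn: sqlite3.Connection,
    end_date: str,
    ts_codes: List[str],
    new_rows: Sequence[Row],
) -> Tuple[int, int]:
    """Delete the rows for (end_date, ts_codes), insert new_rows, return (deleted, inserted)."""
    placeholders = ", ".join("?" for _ in ts_codes)
    where = f"end_date = ? AND ts_code IN ({placeholders})"
    params = [end_date, *ts_codes]
    with conn:  # commits on success, rolls back if any statement raises
        deleted = conn.execute(
            f"SELECT COUNT(*) FROM financial_income WHERE {where}", params
        ).fetchone()[0]
        conn.execute(f"DELETE FROM financial_income WHERE {where}", params)
        conn.executemany(
            "INSERT INTO financial_income (ts_code, end_date, n_income) VALUES (?, ?, ?)",
            new_rows,
        )
    return deleted, len(new_rows)


def demo() -> Tuple[int, int, int]:
    """Seed two versions for one stock, replace them with one revised row."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE financial_income (ts_code TEXT, end_date TEXT, n_income REAL)"
    )
    conn.executemany(
        "INSERT INTO financial_income VALUES (?, ?, ?)",
        [
            ("000001.SZ", "2023-12-31", 1.0),
            ("000001.SZ", "2023-12-31", 1.5),
            ("000002.SZ", "2023-12-31", 2.0),
        ],
    )
    conn.commit()
    deleted, inserted = replace_quarter_rows(
        conn, "2023-12-31", ["000001.SZ"], [("000001.SZ", "2023-12-31", 1.7)]
    )
    total = conn.execute("SELECT COUNT(*) FROM financial_income").fetchone()[0]
    return deleted, inserted, total


print(demo())  # (2, 1, 2)
```

Wrapping the delete and the insert in one transaction means a failed insert does not leave the quarter empty. Whether the project's `ThreadSafeStorage` offers an equivalent guarantee is not shown in this diff.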

View File

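@@ -1,696 +1,234 @@
"""Unified scheduling center for financial data sync.
This module serves as the scheduling center for financial data sync and manages the sync flow of every financial data type in one place,
supporting both full sync and incremental sync.
This module serves as the scheduling center for financial data sync and is only responsible for task coordination and scheduling.
The concrete sync logic has been pushed down into the individual API files.
Financial data types
Supported financial data types:
- income: Income statement (implemented)
- balance: Balance sheet (reserved)
- cashflow: Cash flow statement (reserved)
Sync modes:
1. Full sync (force_full=True):
- Check whether the table exists; if not, create the table and its indexes
- Sync from the default start date (20180101) to the current quarter
2. Incremental sync (force_full=False):
- Get the latest quarter in the table (MAX(end_date))
- Compute the current quarter (if today is before quarter end, use the previous quarter)
- If the latest quarter == the current quarter, do not sync (to avoid consuming API quota)
- Otherwise sync from the latest quarter + 1 to the current quarter
Usage:
# Incremental sync of income statement data (recommended)
Usage:
# Sync all financial data (incremental)
from src.data.api_wrappers.financial_data.api_financial_sync import sync_financial
sync_financial()
# Full sync of income statement data
from src.data.api_wrappers.financial_data.api_financial_sync import sync_financial
# Full sync
sync_financial(force_full=True)
# Sync the income statement only
sync_financial(data_types=["income"])
# Preview the sync
from src.data.api_wrappers.financial_data.api_financial_sync import preview_sync
preview = preview_sync()
"""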
from typing import Optional, Dict, List
from datetime import datetime
from typing import List, Dict, Optional
import pandas as pd
from src.data.storage import Storage, ThreadSafeStorage
from src.data.utils import (
get_today_date,
get_quarters_in_range,
date_to_quarter,
DEFAULT_START_DATE,
from src.data.api_wrappers.financial_data.api_income import (
IncomeQuarterSync,
sync_income,
preview_income_sync,
)
from src.data.api_wrappers.financial_data.api_income import get_income
# =============================================================================
# Financial data table definitions
# =============================================================================
# Table name and field definitions for each financial data table
FINANCIAL_TABLES = {
# Mapping of supported financial data types
FINANCIAL_SYNCERS = {
"income": {
"table_name": "financial_income",
"api_name": "income_vip",
"period_field": "end_date", # field used to store the latest quarter
"get_data_func": get_income,
"syncer_class": IncomeQuarterSync,
"sync_func": sync_income,
"preview_func": preview_income_sync,
"display_name": "Income statement",
},
# Reserved: balance sheet
# "balance": {
# "table_name": "financial_balance",
# "api_name": "balancesheet_vip",
# "period_field": "end_date",
# "get_data_func": get_balance,
# "syncer_class": BalanceQuarterSync,
# "sync_func": sync_balance,
# "preview_func": preview_balance_sync,
# "display_name": "Balance sheet",
# },
# Reserved: cash flow statement
# "cashflow": {
# "table_name": "financial_cashflow",
# "api_name": "cashflow_vip",
# "period_field": "end_date",
# "get_data_func": get_cashflow,
# "syncer_class": CashflowQuarterSync,
# "sync_func": sync_cashflow,
# "preview_func": preview_cashflow_sync,
# "display_name": "Cash flow statement",
# },
}
# =============================================================================
# Core financial data sync class
# =============================================================================
def sync_financial(
data_types: Optional[List[str]] = None,
force_full: bool = False,
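dry_run: bool = False,
) -> Dict[str, List]:
"""Sync financial data (dispatch function).
Dispatch the matching syncer for each requested data type.
class FinancialSync:
"""Unified financial data sync manager.
Args:
data_types: List of data types, e.g. ["income", "balance"];
None means sync all types
force_full: If True, force a full sync
dry_run: If True, preview only without writing
Supports full and incremental sync, automatically detecting the data state and choosing the best sync strategy.
Features:
- Automatic switching between full and incremental sync
- Automatic table and index creation (if missing)
- Smart quarter calculation (uses the previous quarter when the current quarter has not ended)
- Quota protection (no API request when the latest quarter == the current quarter)
Returns:
Dict of sync results per type {data type: list of sync results}
Example:
>>> sync = FinancialSync()
>>> sync.sync_all() # incremental sync of all financial data
>>> sync.sync_all(force_full=True) # full sync
>>> sync.sync_income() # sync the income statement only
>>> # Sync all financial data
>>> sync_financial()
>>>
>>> # Sync the income statement only
>>> sync_financial(data_types=["income"])
>>>
>>> # Full sync
>>> sync_financial(force_full=True)
"""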
if data_types is None:
data_types = list(FINANCIAL_SYNCERS.keys())
# Table schema definitions (keyed by table name)
TABLE_SCHEMAS = {
"financial_income": {
"columns": {
"ts_code": "VARCHAR(16) NOT NULL",
"ann_date": "DATE",
"f_ann_date": "DATE",
"end_date": "DATE NOT NULL",
"report_type": "INTEGER",
"comp_type": "INTEGER",
"end_type": "VARCHAR(10)",
"basic_eps": "DOUBLE",
"diluted_eps": "DOUBLE",
"total_revenue": "DOUBLE",
"revenue": "DOUBLE",
"int_income": "DOUBLE",
"prem_earned": "DOUBLE",
"comm_income": "DOUBLE",
"n_commis_income": "DOUBLE",
"n_oth_income": "DOUBLE",
"n_oth_b_income": "DOUBLE",
"prem_income": "DOUBLE",
"out_prem": "DOUBLE",
"une_prem_reser": "DOUBLE",
"reins_income": "DOUBLE",
"n_sec_tb_income": "DOUBLE",
"n_sec_uw_income": "DOUBLE",
"n_asset_mg_income": "DOUBLE",
"oth_b_income": "DOUBLE",
"fv_value_chg_gain": "DOUBLE",
"invest_income": "DOUBLE",
"ass_invest_income": "DOUBLE",
"forex_gain": "DOUBLE",
"total_cogs": "DOUBLE",
"oper_cost": "DOUBLE",
"int_exp": "DOUBLE",
"comm_exp": "DOUBLE",
"biz_tax_surchg": "DOUBLE",
"sell_exp": "DOUBLE",
"admin_exp": "DOUBLE",
"fin_exp": "DOUBLE",
"assets_impair_loss": "DOUBLE",
"prem_refund": "DOUBLE",
"compens_payout": "DOUBLE",
"reser_insur_liab": "DOUBLE",
"div_payt": "DOUBLE",
"reins_exp": "DOUBLE",
"oper_exp": "DOUBLE",
"compens_payout_refu": "DOUBLE",
"insur_reser_refu": "DOUBLE",
"reins_cost_refund": "DOUBLE",
"other_bus_cost": "DOUBLE",
"operate_profit": "DOUBLE",
"non_oper_income": "DOUBLE",
"non_oper_exp": "DOUBLE",
"nca_disploss": "DOUBLE",
"total_profit": "DOUBLE",
"income_tax": "DOUBLE",
"n_income": "DOUBLE",
"n_income_attr_p": "DOUBLE",
"minority_gain": "DOUBLE",
"oth_compr_income": "DOUBLE",
"t_compr_income": "DOUBLE",
"compr_inc_attr_p": "DOUBLE",
"compr_inc_attr_m_s": "DOUBLE",
"ebit": "DOUBLE",
"ebitda": "DOUBLE",
"insurance_exp": "DOUBLE",
"undist_profit": "DOUBLE",
"distable_profit": "DOUBLE",
"rd_exp": "DOUBLE",
"fin_exp_int_exp": "DOUBLE",
"fin_exp_int_inc": "DOUBLE",
"transfer_surplus_rese": "DOUBLE",
"transfer_housing_imprest": "DOUBLE",
"transfer_oth": "DOUBLE",
"adj_lossgain": "DOUBLE",
"withdra_legal_surplus": "DOUBLE",
"withdra_legal_pubfund": "DOUBLE",
"withdra_biz_devfund": "DOUBLE",
"withdra_rese_fund": "DOUBLE",
"withdra_oth_ersu": "DOUBLE",
"workers_welfare": "DOUBLE",
"distr_profit_shrhder": "DOUBLE",
"prfshare_payable_dvd": "DOUBLE",
"comshare_payable_dvd": "DOUBLE",
"capit_comstock_div": "DOUBLE",
"net_after_nr_lp_correct": "DOUBLE",
"credit_impa_loss": "DOUBLE",
"net_expo_hedging_benefits": "DOUBLE",
"oth_impair_loss_assets": "DOUBLE",
"total_opcost": "DOUBLE",
"amodcost_fin_assets": "DOUBLE",
"oth_income": "DOUBLE",
"asset_disp_income": "DOUBLE",
"continued_net_profit": "DOUBLE",
"end_net_profit": "DOUBLE",
"update_flag": "VARCHAR(1)",
},
"primary_key": ("ts_code", "end_date"),
"indexes": [
("idx_financial_ann", ["ts_code", "ann_date"]),
],
},
}
def __init__(self):
"""Initialize the sync manager."""
self.storage = Storage()
self.thread_storage = ThreadSafeStorage()
def _create_table_if_not_exists(self, table_name: str) -> None:
"""Create the table and its indexes if the table does not exist.
Args:
table_name: Table name
"""
if self.storage.exists(table_name):
print(f"[FinancialSync] Table {table_name} already exists, skipping table creation")
return
if table_name not in self.TABLE_SCHEMAS:
print(f"[FinancialSync] Table {table_name} has no schema defined, skipping table creation")
return
schema = self.TABLE_SCHEMAS[table_name]
print(f"[FinancialSync] Table {table_name} does not exist, creating table and indexes...")
# Build the column definitions
columns_def = []
for col_name, col_type in schema["columns"].items():
columns_def.append(f'"{col_name}" {col_type}')
# Add the primary key constraint
if schema.get("primary_key"):
pk_cols = ', '.join(f'"{col}"' for col in schema["primary_key"])
columns_def.append(f"PRIMARY KEY ({pk_cols})")
columns_sql = ", ".join(columns_def)
create_sql = f'CREATE TABLE IF NOT EXISTS "{table_name}" ({columns_sql})'
try:
self.storage._connection.execute(create_sql)
print(f"[FinancialSync] Table {table_name} created")
except Exception as e:
print(f"[FinancialSync] Failed to create table {table_name}: {e}")
raise
# Create the indexes
for idx_name, idx_cols in schema.get("indexes", []):
try:
idx_cols_sql = ', '.join(f'"{col}"' for col in idx_cols)
self.storage._connection.execute(
f'CREATE INDEX IF NOT EXISTS "{idx_name}" ON "{table_name}"({idx_cols_sql})'
)
print(f"[FinancialSync] Index {idx_name} created")
except Exception as e:
print(f"[FinancialSync] Failed to create index {idx_name}: {e}")
def _get_latest_quarter(
self, table_name: str, period_field: str = "end_date"
) -> Optional[str]:
"""Get the latest quarter in the table.
Args:
table_name: Table name
period_field: Name of the quarter field
Returns:
Latest quarter string (YYYYMMDD), or None if there is no data
"""
try:
result = self.storage._connection.execute(f"""
SELECT MAX({period_field}) FROM {table_name}
""").fetchone()
if result and result[0]:
# Convert to string format
latest = result[0]
if hasattr(latest, "strftime"):
return latest.strftime("%Y%m%d")
return str(latest)
return None
except Exception as e:
print(f"[FinancialSync] Failed to get the latest quarter: {e}")
return None
def _get_current_quarter(self) -> str:
"""Get the current quarter (taking into account whether quarter end has been reached).
If today's date is before the last day of the quarter, return the previous quarter.
This avoids requesting a future quarter that has no data yet.
Returns:
Current quarter string (YYYYMMDD)
"""
today = get_today_date()
current_quarter = date_to_quarter(today)
# Check whether today has reached the last day of the current quarter
if today < current_quarter:
# Quarter end not reached yet; return the previous quarter
return self._get_prev_quarter(current_quarter)
return current_quarter
def _get_prev_quarter(self, quarter: str) -> str:
"""Get the previous quarter.
Args:
quarter: Quarter string (YYYYMMDD)
Returns:
Previous quarter string (YYYYMMDD)
"""
year = int(quarter[:4])
month_day = quarter[4:]
if month_day == "0331":
# Q1 -> Q4 of the previous year
return f"{year - 1}1231"
elif month_day == "0630":
# Q2 -> Q1
return f"{year}0331"
elif month_day == "0930":
# Q3 -> Q2
return f"{year}0630"
else: # "1231"
# Q4 -> Q3
return f"{year}0930"
def _get_next_quarter(self, quarter: str) -> str:
"""Get the next quarter.
Args:
quarter: Quarter string (YYYYMMDD)
Returns:
Next quarter string (YYYYMMDD)
"""
year = int(quarter[:4])
month_day = quarter[4:]
if month_day == "0331":
# Q1 -> Q2
return f"{year}0630"
elif month_day == "0630":
# Q2 -> Q3
return f"{year}0930"
elif month_day == "0930":
# Q3 -> Q4
return f"{year}1231"
else: # "1231"
# Q4 -> Q1 of the next year
return f"{year + 1}0331"
def _check_incremental_needed(
self,
table_name: str,
period_field: str = "end_date",
) -> tuple[bool, Optional[str], Optional[str]]:
"""Check whether an incremental sync is needed.
Args:
table_name: Table name
period_field: Name of the quarter field
Returns:
(whether a sync is needed, start quarter, target quarter)
- If no sync is needed, returns (False, None, None)
"""
# Get the latest quarter in the table
latest_quarter = self._get_latest_quarter(table_name, period_field)
if latest_quarter is None:
# No local data; a full sync is needed
print(f"[FinancialSync] Table {table_name} has no data, full sync required")
return (True, DEFAULT_START_DATE, self._get_current_quarter())
print(f"[FinancialSync] Table {table_name} latest quarter: {latest_quarter}")
# Get the current quarter (taking quarter end into account)
current_quarter = self._get_current_quarter()
print(f"[FinancialSync] Current quarter: {current_quarter}")
# Compare: if the latest quarter >= the current quarter, no sync is needed
if latest_quarter >= current_quarter:
print(
f"[FinancialSync] Latest quarter {latest_quarter} >= current quarter {current_quarter}, skipping incremental sync"
)
return (False, None, None)
# Incremental sync needed: from the latest quarter + 1 to the current quarter
start_quarter = self._get_next_quarter(latest_quarter)
print(f"[FinancialSync] Incremental sync: {start_quarter} -> {current_quarter}")
return (True, start_quarter, current_quarter)
def _sync_single_table(
self,
table_config: Dict,
start_quarter: str,
end_quarter: str,
) -> int:
"""Sync a single financial data table.
Args:
table_config: Table configuration dict
start_quarter: Start quarter
end_quarter: Target quarter
Returns:
Number of records synced
"""
table_name = table_config["table_name"]
get_data_func = table_config["get_data_func"]
# Get the list of quarters to sync
quarters = get_quarters_in_range(start_quarter, end_quarter)
print(f"[FinancialSync] Planning to sync {len(quarters)} quarters: {quarters}")
total_records = 0
# Call the API for each quarter to fetch its data
for period in quarters:
try:
df = get_data_func(period)
if df.empty:
print(f"[WARN] No data for quarter {period}")
continue
# Keep consolidated statements only (report_type='1'; note that it is a string)
if "report_type" in df.columns:
df = df[df["report_type"] == "1"]
if not df.empty:
self.thread_storage.queue_save(table_name, df)
print(f"[FinancialSync] Quarter {period} -> {len(df)} records")
total_records += len(df)
except Exception as e:
print(f"[ERROR] Failed to fetch data for quarter {period}: {e}")
# Flush the cache to the database
self.thread_storage.flush()
return total_records
def sync_income(
self,
force_full: bool = False,
) -> Dict:
"""Sync income statement data.
Args:
force_full: If True, force a full sync
Returns:
Sync result dict
"""
table_config = FINANCIAL_TABLES["income"]
table_name = table_config["table_name"]
period_field = table_config["period_field"]
print("\n" + "=" * 60)
print(f"[FinancialSync] Starting income statement sync (force_full={force_full})")
print("=" * 60)
# 1. Full sync: create the table
if force_full:
self._create_table_if_not_exists(table_name)
start_quarter = DEFAULT_START_DATE
end_quarter = self._get_current_quarter()
else:
# 2. Incremental sync: check whether it is needed
needed, start_quarter, end_quarter = self._check_incremental_needed(
table_name, period_field
)
if not needed:
return {
"status": "skipped",
"message": "Data is already up to date",
"table": table_name,
}
# Check whether the table exists; create it if not
if not self.storage.exists(table_name):
self._create_table_if_not_exists(table_name)
# 3. Run the sync
print(f"[FinancialSync] Sync range: {start_quarter} -> {end_quarter}")
total_records = self._sync_single_table(
table_config, start_quarter, end_quarter
)
result = {
"status": "success",
"table": table_name,
"start_quarter": start_quarter,
"end_quarter": end_quarter,
"records": total_records,
}
print(f"[FinancialSync] Income statement sync finished: {total_records} records")
return result
def sync_all(
self,
force_full: bool = False,
) -> Dict[str, Dict]:
"""Sync all financial data tables.
Args:
force_full: If True, force a full sync
Returns:
Dict of sync results per table
"""
results = {}
print("\n" + "=" * 60)
print(f"[FinancialSync] Starting sync of all financial data (force_full={force_full})")
print("[Financial Sync] Financial data sync scheduling center")
print("=" * 60)
print(f"Data types: {', '.join(data_types)}")
print(f"Sync mode: {'full' if force_full else 'incremental'}")
print(f"Write mode: {'preview' if dry_run else 'write'}")
print("=" * 60)
# Sync each financial data table
for data_type, table_config in FINANCIAL_TABLES.items():
for data_type in data_types:
if data_type not in FINANCIAL_SYNCERS:
print(f"[WARN] Unknown data type: {data_type}")
results[data_type] = {"error": f"Unknown data type: {data_type}"}
continue
config = FINANCIAL_SYNCERS[data_type]
sync_func = config["sync_func"]
display_name = config["display_name"]
print(f"\n[{display_name}] Starting sync...")
try:
if data_type == "income":
result = self.sync_income(force_full=force_full)
else:
# Reserved: sync logic for other tables
print(f"[FinancialSync] {data_type} is not implemented yet, skipping")
result = {"status": "not_implemented"}
result = sync_func(force_full=force_full, dry_run=dry_run)
results[data_type] = result
print(f"[{display_name}] Sync finished")
except Exception as e:
print(f"[ERROR] Failed to sync {data_type}: {e}")
results[data_type] = {"status": "error", "error": str(e)}
print(f"[ERROR] [{display_name}] Sync failed: {e}")
results[data_type] = {"error": str(e)}
# Print the summary
print("\n" + "=" * 60)
print("[Financial Sync] Sync summary")
print("=" * 60)
for data_type, result in results.items():
status = result.get("status", "unknown")
records = result.get("records", 0)
print(f" {data_type}: {status} ({records} records)")
if "error" in result:
status = f"Failed: {result['error']}"
elif isinstance(result, list):
total_records = sum(
r.get("diff_count", 0) for r in result if isinstance(r, dict)
)
status = f"Succeeded ({len(result)} quarters, {total_records} differing records)"
else:
status = "Done"
display_name = FINANCIAL_SYNCERS.get(data_type, {}).get(
"display_name", data_type
)
print(f" {display_name}: {status}")
print("=" * 60)
return results
# =============================================================================
# Convenience functions
# =============================================================================
def sync_financial(
data_type: str = "income",
force_full: bool = False,
) -> Dict:
"""Sync financial data (convenience function).
def preview_sync(data_types: Optional[List[str]] = None) -> Dict[str, Dict]:
"""Preview the financial data sync.
Args:
data_type: Financial data type ('income', 'balance', 'cashflow')
force_full: If True, force a full sync
data_types: List of data types; None means all types
Returns:
Sync result dict
Dict of preview information per type
Example:
>>> # Incremental sync of the income statement
>>> sync_financial()
>>> # Full sync
>>> sync_financial(force_full=True)
>>> preview = preview_sync()
>>> print(preview)
"""
syncer = FinancialSync()
if data_types is None:
data_types = list(FINANCIAL_SYNCERS.keys())
if data_type == "income":
return syncer.sync_income(force_full=force_full)
else:
raise ValueError(f"Unsupported financial data type: {data_type}")
previews = {}
print("\n" + "=" * 60)
print("[Financial Sync] Sync preview")
print("=" * 60)
def sync_all_financial(force_full: bool = False) -> Dict[str, Dict]:
"""Sync all financial data (convenience function).
Args:
force_full: If True, force a full sync
Returns:
Dict of sync results per table
Example:
>>> # Incremental sync of all financial data
>>> sync_all_financial()
>>> # Full sync
>>> sync_all_financial(force_full=True)
"""
syncer = FinancialSync()
return syncer.sync_all(force_full=force_full)
def preview_sync() -> Dict:
"""Preview the sync (nothing is actually synced).
Returns:
Preview information dict:
{
'income': {
'sync_needed': bool,
'latest_quarter': str,
'current_quarter': str,
'start_quarter': str,
'end_quarter': str,
},
...
}
"""
syncer = FinancialSync()
preview = {}
for data_type, table_config in FINANCIAL_TABLES.items():
if data_type != "income":
for data_type in data_types:
if data_type not in FINANCIAL_SYNCERS:
continue
table_name = table_config["table_name"]
period_field = table_config["period_field"]
preview_func = FINANCIAL_SYNCERS[data_type]["preview_func"]
previews[data_type] = preview_func()
# Get the latest quarter
latest_quarter = syncer._get_latest_quarter(table_name, period_field)
current_quarter = syncer._get_current_quarter()
return previews
# Check whether a sync is needed
needed, start_quarter, end_quarter = syncer._check_incremental_needed(
table_name, period_field
)
preview[data_type] = {
"sync_needed": needed,
"latest_quarter": latest_quarter,
"current_quarter": current_quarter,
"start_quarter": start_quarter,
"end_quarter": end_quarter,
def list_financial_types() -> List[Dict]:
"""List all supported financial data types.
Returns:
List of data type information
"""
return [
{
"name": name,
"display_name": config["display_name"],
}
return preview
for name, config in FINANCIAL_SYNCERS.items()
]
# =============================================================================
# Main entry point
# =============================================================================
# Alias kept for backward compatibility
sync_all_financial = sync_financial
if __name__ == "__main__":
import sys
print("=" * 60)
print("Financial data sync module")
print("Financial data sync scheduling center")
print("=" * 60)
print("\nSupported financial data types:")
print("-" * 60)
for info in list_financial_types():
print(f" - {info['name']}: {info['display_name']}")
print("-" * 60)
print("\n使用方式:")
print(" # 预览同步信息")
print(
" from src.data.api_wrappers.financial_data.api_financial_sync import preview_sync"
)
print(" preview = preview_sync()")
print(" # 同步所有财务数据(默认)")
print(" python api_financial_sync.py")
print("")
print(" # 量同步(推荐)")
print(" # 量同步")
print(" python api_financial_sync.py --full")
print("")
print(" # 预览模式")
print(" python api_financial_sync.py --preview")
print("")
print(" # Python 代码调用")
print(
" from src.data.api_wrappers.financial_data.api_financial_sync import sync_financial"
)
print(" sync_financial()")
print("")
print(" # 全量同步")
print(" sync_financial(force_full=True)")
print("")
print(" # 同步所有财务数据")
print(
" from src.data.api_wrappers.financial_data.api_financial_sync import sync_all_financial"
)
print(" sync_all_financial()")
print("=" * 60)
# Run an incremental sync by default
if len(sys.argv) > 1 and sys.argv[1] == "--full":
print("\n[Main] 执行全量同步...")
result = sync_all_financial(force_full=True)
# Run a sync by default
if len(sys.argv) > 1 and sys.argv[1] == "--preview":
print("\n执行预览...")
preview_sync()
else:
print("\n[Main] 执行增量同步...")
result = sync_financial()
print("\n[Main] 执行完成!")
print(f"结果: {result}")
print("\n执行同步...")
force_full = "--full" in sys.argv
sync_financial(force_full=force_full)
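
The entry point above inspects `sys.argv` by hand to choose between a preview, a full sync, and the default incremental sync. A minimal sketch of the same dispatch with `argparse` follows. The names `build_parser`, `run`, `sync_fn`, and `preview_fn` are hypothetical and not part of this commit; the two callables stand in for `sync_financial` and `preview_sync`.

```python
import argparse
from typing import Any, Callable, List, Optional


def build_parser() -> argparse.ArgumentParser:
    """Build a parser for the two flags shown in the usage banner."""
    parser = argparse.ArgumentParser(description="Financial data sync dispatcher")
    # --full and --preview are treated as alternatives (an assumption of this sketch).
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--full", action="store_true", help="force a full sync")
    mode.add_argument("--preview", action="store_true", help="preview only")
    return parser


def run(
    argv: Optional[List[str]],
    sync_fn: Callable[..., Any],
    preview_fn: Callable[[], Any],
) -> Any:
    """Dispatch to preview or sync; sync_fn and preview_fn are injected stand-ins."""
    args = build_parser().parse_args(argv)
    if args.preview:
        return preview_fn()
    # With no flag, force_full is False, i.e. the default incremental sync.
    return sync_fn(force_full=args.full)
```

Injecting the two callables keeps the dispatch testable without touching the database or the Tushare API.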

View File

@@ -7,133 +7,252 @@
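- income_vip: fetch income statement data for all listed companies for a given quarter
- Requires 5000 points to call
- The period parameter is the reporting period (last day of the quarter, e.g. 20231231)
Usage:
# Sync income statement data
from src.data.api_wrappers.financial_data.api_income import IncomeQuarterSync, sync_income
# Option 1: use the class
syncer = IncomeQuarterSync()
syncer.sync_incremental() # incremental sync
syncer.sync_full() # full sync
# Option 2: use the convenience function
sync_income() # incremental sync
sync_income(force_full=True) # full sync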
"""
from typing import Optional
import pandas as pd
from typing import Optional, List
from tqdm import tqdm
from src.data.client import TushareClient
from src.data.storage import ThreadSafeStorage
from src.data.utils import get_today_date, get_quarters_in_range
from src.data.api_wrappers.base_financial_sync import (
QuarterBasedSync,
sync_financial_data,
preview_financial_sync,
)
def get_income(
period: str,
fields: Optional[str] = None,
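) -> pd.DataFrame:
"""Fetch income statement data (VIP endpoint)
class IncomeQuarterSync(QuarterBasedSync):
"""Quarterly sync implementation for the income statement.
Fetches income statement data for all listed companies for a given quarter from Tushare.
Uses the income_vip endpoint to fetch income statement data for all listed companies by quarter.
Args:
period: reporting period, the last day of the quarter (e.g. '20231231', '20230930')
- 0331: Q1 report
- 0630: semi-annual report
- 0930: Q3 report
- 1231: annual report
fields: fields to return; all fields are returned by default
Returns:
pd.DataFrame containing income statement data:
- ts_code: stock code
- ann_date: announcement date
- end_date: reporting period
- basic_eps: basic earnings per share
- report_type: report type (1 = consolidated statement)
Example:
>>> data = get_income('20231231')
>>> print(data[['ts_code', 'ann_date', 'basic_eps']].head())
Table: financial_income
Logical key: (ts_code, end_date); not enforced as a primary key or unique index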
"""
client = TushareClient()
# By default, request all fields (the income statement has 100+ fields)
if fields is None:
fields = "ts_code,ann_date,f_ann_date,end_date,report_type,comp_type,end_type,basic_eps,diluted_eps,total_revenue,revenue,int_income,prem_earned,comm_income,n_commis_income,n_oth_income,n_oth_b_income,prem_income,out_prem,une_prem_reser,reins_income,n_sec_tb_income,n_sec_uw_income,n_asset_mg_income,oth_b_income,fv_value_chg_gain,invest_income,ass_invest_income,forex_gain,total_cogs,oper_cost,int_exp,comm_exp,biz_tax_surchg,sell_exp,admin_exp,fin_exp,assets_impair_loss,prem_refund,compens_payout,reser_insur_liab,div_payt,reins_exp,oper_exp,compens_payout_refu,insur_reser_refu,reins_cost_refund,other_bus_cost,operate_profit,non_oper_income,non_oper_exp,nca_disploss,total_profit,income_tax,n_income,n_income_attr_p,minority_gain,oth_compr_income,t_compr_income,compr_inc_attr_p,compr_inc_attr_m_s,ebit,ebitda,insurance_exp,undist_profit,distable_profit,rd_exp,fin_exp_int_exp,fin_exp_int_inc,transfer_surplus_rese,transfer_housing_imprest,transfer_oth,adj_lossgain,withdra_legal_surplus,withdra_legal_pubfund,withdra_biz_devfund,withdra_rese_fund,withdra_oth_ersu,workers_welfare,distr_profit_shrhder,prfshare_payable_dvd,comshare_payable_dvd,capit_comstock_div,net_after_nr_lp_correct,credit_impa_loss,net_expo_hedging_benefits,oth_impair_loss_assets,total_opcost,amodcost_fin_assets,oth_income,asset_disp_income,continued_net_profit,end_net_profit,update_flag"
params = {"fields": fields, "period": period}
return client.query("income_vip", **params)
table_name = "financial_income"
api_name = "income_vip"
# Target report type: sync only consolidated statements by default
TARGET_REPORT_TYPE = "1"
# =============================================================================
# IncomeSync - batch sync class for income statement data
# =============================================================================
# Table schema definition
TABLE_SCHEMA = {
"ts_code": "VARCHAR(16) NOT NULL",
"ann_date": "DATE",
"f_ann_date": "DATE",
"end_date": "DATE NOT NULL",
"report_type": "INTEGER",
"comp_type": "INTEGER",
"end_type": "VARCHAR(10)",
"basic_eps": "DOUBLE",
"diluted_eps": "DOUBLE",
"total_revenue": "DOUBLE",
"revenue": "DOUBLE",
"int_income": "DOUBLE",
"prem_earned": "DOUBLE",
"comm_income": "DOUBLE",
"n_commis_income": "DOUBLE",
"n_oth_income": "DOUBLE",
"n_oth_b_income": "DOUBLE",
"prem_income": "DOUBLE",
"out_prem": "DOUBLE",
"une_prem_reser": "DOUBLE",
"reins_income": "DOUBLE",
"n_sec_tb_income": "DOUBLE",
"n_sec_uw_income": "DOUBLE",
"n_asset_mg_income": "DOUBLE",
"oth_b_income": "DOUBLE",
"fv_value_chg_gain": "DOUBLE",
"invest_income": "DOUBLE",
"ass_invest_income": "DOUBLE",
"forex_gain": "DOUBLE",
"total_cogs": "DOUBLE",
"oper_cost": "DOUBLE",
"int_exp": "DOUBLE",
"comm_exp": "DOUBLE",
"biz_tax_surchg": "DOUBLE",
"sell_exp": "DOUBLE",
"admin_exp": "DOUBLE",
"fin_exp": "DOUBLE",
"assets_impair_loss": "DOUBLE",
"prem_refund": "DOUBLE",
"compens_payout": "DOUBLE",
"reser_insur_liab": "DOUBLE",
"div_payt": "DOUBLE",
"reins_exp": "DOUBLE",
"oper_exp": "DOUBLE",
"compens_payout_refu": "DOUBLE",
"insur_reser_refu": "DOUBLE",
"reins_cost_refund": "DOUBLE",
"other_bus_cost": "DOUBLE",
"operate_profit": "DOUBLE",
"non_oper_income": "DOUBLE",
"non_oper_exp": "DOUBLE",
"nca_disploss": "DOUBLE",
"total_profit": "DOUBLE",
"income_tax": "DOUBLE",
"n_income": "DOUBLE",
"n_income_attr_p": "DOUBLE",
"minority_gain": "DOUBLE",
"oth_compr_income": "DOUBLE",
"t_compr_income": "DOUBLE",
"compr_inc_attr_p": "DOUBLE",
"compr_inc_attr_m_s": "DOUBLE",
"ebit": "DOUBLE",
"ebitda": "DOUBLE",
"insurance_exp": "DOUBLE",
"undist_profit": "DOUBLE",
"distable_profit": "DOUBLE",
"rd_exp": "DOUBLE",
"fin_exp_int_exp": "DOUBLE",
"fin_exp_int_inc": "DOUBLE",
"transfer_surplus_rese": "DOUBLE",
"transfer_housing_imprest": "DOUBLE",
"transfer_oth": "DOUBLE",
"adj_lossgain": "DOUBLE",
"withdra_legal_surplus": "DOUBLE",
"withdra_legal_pubfund": "DOUBLE",
"withdra_biz_devfund": "DOUBLE",
"withdra_rese_fund": "DOUBLE",
"withdra_oth_ersu": "DOUBLE",
"workers_welfare": "DOUBLE",
"distr_profit_shrhder": "DOUBLE",
"prfshare_payable_dvd": "DOUBLE",
"comshare_payable_dvd": "DOUBLE",
"capit_comstock_div": "DOUBLE",
"net_after_nr_lp_correct": "DOUBLE",
"credit_impa_loss": "DOUBLE",
"net_expo_hedging_benefits": "DOUBLE",
"oth_impair_loss_assets": "DOUBLE",
"total_opcost": "DOUBLE",
"amodcost_fin_assets": "DOUBLE",
"oth_income": "DOUBLE",
"asset_disp_income": "DOUBLE",
"continued_net_profit": "DOUBLE",
"end_net_profit": "DOUBLE",
"update_flag": "VARCHAR(1)",
}
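class IncomeSync:
"""Batch sync manager for income statement data (VIP version)
Features:
- Syncs by quarter; each request fetches data for all listed companies in that quarter
- Uses the income_vip endpoint
- Keeps only consolidated statements (report_type=1)
- Writes safely through ThreadSafeStorage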
Example:
>>> sync = IncomeSync()
>>> sync.sync(start_date='20200101', end_date='20231231')
"""
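# Index definitions (do not create unique indexes)
# Note: financial data may be revised multiple times, so no primary key or unique index is set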
TABLE_INDEXES = [
("idx_financial_income_ts_code", ["ts_code"]),
("idx_financial_income_end_date", ["end_date"]),
("idx_financial_income_ts_period", ["ts_code", "end_date", "report_type"]),
]
def __init__(self):
"""Initialize the sync manager"""
self.storage = ThreadSafeStorage()
self.client = TushareClient()
"""Initialize the income statement syncer."""
super().__init__()
self._fields = None # return all fields by default
def sync(
self,
start_date: str,
end_date: Optional[str] = None,
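) -> None:
"""Sync income statement data
def fetch_single_quarter(self, period: str) -> pd.DataFrame:
"""Fetch income statement data for all listed companies for a single quarter.
Args:
start_date: start date, YYYYMMDD
end_date: end date, YYYYMMDD; defaults to today
period: reporting period, the last day of the quarter (e.g. '20231231')
Returns:
DataFrame containing income statement data for all listed companies in that quarter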
"""
if end_date is None:
end_date = get_today_date()
params = {"period": period}
# Get all quarters within the date range
quarters = get_quarters_in_range(start_date, end_date)
print(f"[IncomeSync] 计划同步 {len(quarters)} 个季度: {quarters}")
if self._fields:
params["fields"] = self._fields
# For each quarter, call income_vip to fetch data for all stocks
for period in tqdm(quarters, desc="Syncing income by quarter"):
try:
df = get_income(period)
if df.empty:
print(f"[WARN] 季度 {period} 无数据")
continue
return self.client.query(self.api_name, **params)
# Keep only consolidated statements (report_type='1'; note that it is a string)
if "report_type" in df.columns:
df = df[df["report_type"] == "1"]
if not df.empty:
self.storage.queue_save("financial_income", df)
print(f"[IncomeSync] 季度 {period} -> {len(df)} 条记录")
except Exception as e:
print(f"[ERROR] 获取季度 {period} 数据失败: {e}")
# Flush the queued writes to the database
self.storage.flush()
print(f"[IncomeSync] 同步完成,共处理 {len(quarters)} 个季度")
# =============================================================================
# Convenience functions
# =============================================================================
def sync_income(
start_date: str,
end_date: Optional[str] = None,
) -> None:
"""Sync income statement data (convenience function)
force_full: bool = False,
dry_run: bool = False,
) -> list:
"""Sync income statement data (convenience function)
Args:
start_date: start date, YYYYMMDD
end_date: end date, YYYYMMDD; defaults to today
force_full: if True, force a full sync
dry_run: if True, preview only without writing
Returns:
List of sync results
Example:
>>> sync_income('20200101')
>>> sync_income('20200101', '20231231')
>>> # Incremental sync
>>> sync_income()
>>>
>>> # Full sync
>>> sync_income(force_full=True)
>>>
>>> # Preview
>>> sync_income(dry_run=True)
"""
syncer = IncomeSync()
syncer.sync(start_date, end_date)
return sync_financial_data(IncomeQuarterSync, force_full, dry_run)
def preview_income_sync() -> dict:
"""Preview income statement sync information.
Returns:
Preview information dict
"""
return preview_financial_sync(IncomeQuarterSync)
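def get_income(period: str, fields: Optional[str] = None) -> pd.DataFrame:
"""Fetch income statement data (raw endpoint, single quarter).
Fetches one quarter of data directly, without any sync management.
Args:
period: reporting period, the last day of the quarter (e.g. '20231231')
fields: fields to return; all fields are returned by default
Returns:
DataFrame containing income statement data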
"""
client = TushareClient()
if fields is None:
fields = (
"ts_code,ann_date,f_ann_date,end_date,report_type,comp_type,end_type,"
"basic_eps,diluted_eps,total_revenue,revenue,int_income,prem_earned,"
"comm_income,n_commis_income,n_oth_income,n_oth_b_income,prem_income,"
"out_prem,une_prem_reser,reins_income,n_sec_tb_income,n_sec_uw_income,"
"n_asset_mg_income,oth_b_income,fv_value_chg_gain,invest_income,"
"ass_invest_income,forex_gain,total_cogs,oper_cost,int_exp,comm_exp,"
"biz_tax_surchg,sell_exp,admin_exp,fin_exp,assets_impair_loss,prem_refund,"
"compens_payout,reser_insur_liab,div_payt,reins_exp,oper_exp,"
"compens_payout_refu,insur_reser_refu,reins_cost_refund,other_bus_cost,"
"operate_profit,non_oper_income,non_oper_exp,nca_disploss,total_profit,"
"income_tax,n_income,n_income_attr_p,minority_gain,oth_compr_income,"
"t_compr_income,compr_inc_attr_p,compr_inc_attr_m_s,ebit,ebitda,"
"insurance_exp,undist_profit,distable_profit,rd_exp,fin_exp_int_exp,"
"fin_exp_int_inc,transfer_surplus_rese,transfer_housing_imprest,"
"transfer_oth,adj_lossgain,withdra_legal_surplus,withdra_legal_pubfund,"
"withdra_biz_devfund,withdra_rese_fund,withdra_oth_ersu,workers_welfare,"
"distr_profit_shrhder,prfshare_payable_dvd,comshare_payable_dvd,"
"capit_comstock_div,net_after_nr_lp_correct,credit_impa_loss,"
"net_expo_hedging_benefits,oth_impair_loss_assets,total_opcost,"
"amodcost_fin_assets,oth_income,asset_disp_income,continued_net_profit,"
"end_net_profit,update_flag"
)
return client.query("income_vip", period=period, fields=fields)
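
The removed `IncomeSync.sync` relied on `get_quarters_in_range` from `src.data.utils` to turn a date range into `period` values, and `income_vip` expects each `period` to be the last day of a quarter. That helper's implementation is not shown in this diff, so the following is only a sketch of what such a function might do; the name `quarter_ends_in_range` and its exact behavior are assumptions.

```python
from datetime import date, datetime
from typing import List

# Month and day of each quarter end: Q1, half-year, Q3, annual.
QUARTER_ENDS = ((3, 31), (6, 30), (9, 30), (12, 31))


def quarter_ends_in_range(start_date: str, end_date: str) -> List[str]:
    """Return quarter-end dates (YYYYMMDD) that fall within [start_date, end_date]."""
    start = datetime.strptime(start_date, "%Y%m%d").date()
    end = datetime.strptime(end_date, "%Y%m%d").date()
    periods: List[str] = []
    for year in range(start.year, end.year + 1):
        for month, day in QUARTER_ENDS:
            quarter_end = date(year, month, day)
            if start <= quarter_end <= end:
                periods.append(quarter_end.strftime("%Y%m%d"))
    return periods
```

Each returned string could then be passed as `period` to `get_income` or `fetch_single_quarter`.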

View File

@@ -1,4 +1,5 @@
"""DuckDB storage for data persistence."""
import pandas as pd
import polars as pl
import duckdb
@@ -73,13 +74,22 @@ class Storage:
- api_financial_sync.py: FinancialSync.TABLE_SCHEMAS
"""
self._connection = duckdb.connect(str(self.db_path))
def save(self, name: str, data: pd.DataFrame, mode: str = "append") -> dict:
def save(
self,
name: str,
data: pd.DataFrame,
mode: str = "append",
use_upsert: bool = True,
) -> dict:
"""Save data to DuckDB.
Args:
name: Table name
data: DataFrame to save
mode: 'append' (UPSERT) or 'replace' (DELETE + INSERT)
mode: 'append' or 'replace' (DELETE + INSERT)
use_upsert: if True, use INSERT OR REPLACE (upsert);
if False, use a plain INSERT (relies on external delete logic)
Returns:
Dict with save result
@@ -123,15 +133,18 @@ class Storage:
if mode == "replace":
self._connection.execute(f"DELETE FROM {name}")
# UPSERT: INSERT OR REPLACE
columns = ', '.join(f'"{col}"' for col in data.columns)
columns = ", ".join(f'"{col}"' for col in data.columns)
if use_upsert:
# UPSERT: INSERT OR REPLACE (requires a primary key constraint)
self._connection.execute(f"""
INSERT OR REPLACE INTO {name} ({columns})
SELECT {columns} FROM temp_data
""")
columns = ", ".join(data.columns)
else:
# Plain INSERT (relies on external delete logic to guarantee no duplicates)
self._connection.execute(f"""
INSERT OR REPLACE INTO {name} ({columns})
INSERT INTO {name} ({columns})
SELECT {columns} FROM temp_data
""")
@@ -220,8 +233,8 @@ class Storage:
# Build query
conditions = []
if start_date and end_date:
start = pd.to_datetime(start_date, format='%Y%m%d').date()
end = pd.to_datetime(end_date, format='%Y%m%d').date()
start = pd.to_datetime(start_date, format="%Y%m%d").date()
end = pd.to_datetime(end_date, format="%Y%m%d").date()
conditions.append(f"trade_date BETWEEN '{start}' AND '{end}'")
if ts_code:
conditions.append(f"ts_code = '{ts_code}'")
@@ -295,12 +308,18 @@ class ThreadSafeStorage:
def __init__(self):
self.storage = Storage()
self._pending_writes: List[tuple] = [] # [(name, data), ...]
self._pending_writes: List[tuple] = [] # [(name, data, use_upsert), ...]
def queue_save(self, name: str, data: pd.DataFrame):
"""Queue data for writing (not written immediately)"""
def queue_save(self, name: str, data: pd.DataFrame, use_upsert: bool = True):
"""Queue data for writing (not written immediately)
Args:
name: table name
data: DataFrame to queue
use_upsert: if True, use INSERT OR REPLACE; if False, use a plain INSERT
"""
if not data.empty:
self._pending_writes.append((name, data))
self._pending_writes.append((name, data, use_upsert))
def flush(self):
"""Write all queued data in batch.
@@ -310,21 +329,22 @@ class ThreadSafeStorage:
if not self._pending_writes:
return
# Merge data for the same table
# Group by table name and use_upsert
table_data = defaultdict(list)
for name, data in self._pending_writes:
table_data[name].append(data)
for name, data, use_upsert in self._pending_writes:
key = (name, use_upsert)
table_data[key].append(data)
# Batch-write each table
for name, data_list in table_data.items():
for (name, use_upsert), data_list in table_data.items():
combined = pd.concat(data_list, ignore_index=True)
# Deduplicate within the batch first
if "ts_code" in combined.columns and "trade_date" in combined.columns:
combined = combined.drop_duplicates(
subset=["ts_code", "trade_date"], keep="last"
)
self.storage.save(name, combined, mode="append")
self.storage.save(name, combined, mode="append", use_upsert=use_upsert)
self._pending_writes.clear()
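
With `use_upsert=False` the storage layer issues a plain `INSERT` and leaves it to the caller to delete stale rows first, because the financial tables carry no primary key. The sketch below illustrates that delete-then-insert pattern. It uses the standard-library `sqlite3` module as a stand-in for DuckDB, and the helper names `make_table` and `replace_period` as well as the three-column table are assumptions made for illustration only.

```python
import sqlite3
from typing import Iterable, Tuple

Row = Tuple[str, str, float]  # (ts_code, end_date, n_income)


def make_table(conn: sqlite3.Connection) -> None:
    """Create a table with no primary key or unique index."""
    conn.execute(
        "CREATE TABLE financial_income ("
        "ts_code TEXT NOT NULL, end_date TEXT NOT NULL, n_income REAL)"
    )


def replace_period(conn: sqlite3.Connection, period: str, rows: Iterable[Row]) -> int:
    """Delete all rows for one period, insert the fresh rows, return the new row count."""
    with conn:  # one transaction: commit on success, roll back on error
        conn.execute("DELETE FROM financial_income WHERE end_date = ?", (period,))
        conn.executemany(
            "INSERT INTO financial_income (ts_code, end_date, n_income) VALUES (?, ?, ?)",
            list(rows),
        )
    cursor = conn.execute(
        "SELECT COUNT(*) FROM financial_income WHERE end_date = ?", (period,)
    )
    return cursor.fetchone()[0]
```

Running the delete and the insert in one transaction means a failed insert does not leave the period empty.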

File diff suppressed because one or more lines are too long

View File

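@@ -1,30 +1,39 @@
"""Data splitter
Provides date-range-based data splitting, supporting a one-shot train/test split
Provides date-range-based data splitting, supporting a three-way train/val/test split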
"""
from typing import Tuple
from typing import Tuple, Optional
import polars as pl
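class DateSplitter:
"""One-shot split based on date ranges
"""One-shot split based on date ranges (supports a three-way train/val/test split)
Splits data by date into a training set and a test set, without rolling.
Splits data by date into training, validation, and test sets, without rolling.
A proper three-way split:
- Train: used to fit model parameters
- Val: used for validation, early stopping, and tuning (taken from the period after training)
- Test: used only for the final evaluation, fully independent of the training process
Example:
train_start: "20200101", train_end: "20221231" (training set: 3 years)
train_start: "20200101", train_end: "20211231" (training set: 2 years)
val_start: "20220101", val_end: "20221231" (validation set: 1 year)
test_start: "20230101", test_end: "20231231" (test set: 1 year)
Features:
- One-shot split, no rolling
- The training and test sets do not overlap
- The training, validation, and test sets are mutually non-overlapping
- The validation and test sets follow the training set in chronological order
- Based on actual date ranges rather than row counts
Attributes:
train_start: training period start date, format "YYYYMMDD"
train_end: training period end date, format "YYYYMMDD"
val_start: validation period start date, format "YYYYMMDD" (optional)
val_end: validation period end date, format "YYYYMMDD" (optional)
test_start: test period start date, format "YYYYMMDD"
test_end: test period end date, format "YYYYMMDD"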
"""
@@ -35,6 +44,8 @@ class DateSplitter:
train_end: str,
test_start: str,
test_end: str,
val_start: Optional[str] = None,
val_end: Optional[str] = None,
):
"""Initialize the date splitter
@@ -43,17 +54,31 @@ class DateSplitter:
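train_end: training period end date "YYYYMMDD"
test_start: test period start date "YYYYMMDD"
test_end: test period end date "YYYYMMDD"
val_start: validation period start date "YYYYMMDD" (optional; if omitted, a window right after train_end is derived automatically)
val_end: validation period end date "YYYYMMDD" (optional; if omitted, a window right after train_end is derived automatically)
Raises:
ValueError: invalid date format or invalid date range
Note:
A proper three-way split:
- Train: used to fit model parameters
- Val: used for validation, early stopping, and tuning (must fall after train and before test)
- Test: used only for the final evaluation, fully independent of the training process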
"""
# Validate the date format (a simple length check)
for name, value in [
dates_to_check = [
("train_start", train_start),
("train_end", train_end),
("test_start", test_start),
("test_end", test_end),
]:
]
if val_start is not None:
dates_to_check.append(("val_start", val_start))
if val_end is not None:
dates_to_check.append(("val_end", val_end))
for name, value in dates_to_check:
if not isinstance(value, str) or len(value) != 8:
raise ValueError(
f"{name} 必须是格式为 'YYYYMMDD' 的8位字符串得到: {value}"
@@ -68,31 +93,83 @@ class DateSplitter:
raise ValueError(
f"test_start ({test_start}) 必须早于或等于 test_end ({test_end})"
)
if test_start <= train_end:
# Validate the val dates (if provided)
if val_start is not None and val_end is not None:
if val_start > val_end:
raise ValueError(
f"测试集开始日期 ({test_start}) 必须晚于训练集结束日期 ({train_end})"
"以确保训练集和测试集不重叠"
f"val_start ({val_start}) 必须早于或等于 val_end ({val_end})"
)
if val_start <= train_end:
raise ValueError(
f"验证集开始日期 ({val_start}) 必须晚于训练集结束日期 ({train_end})"
"以确保验证集在训练集之后"
)
if test_start <= val_end:
raise ValueError(
f"测试集开始日期 ({test_start}) 必须晚于验证集结束日期 ({val_end})"
"以确保测试集在验证集之后"
)
elif val_start is not None or val_end is not None:
raise ValueError("val_start 和 val_end 必须同时提供或同时省略")
# If no val dates are given, carve out a window right after train as val.
# Default length: 20% of the train span (at least 30 days), always ending before test.
if val_start is None:
# Compute the train span (approximate, in calendar days)
from datetime import datetime, timedelta
train_start_dt = datetime.strptime(train_start, "%Y%m%d")
train_end_dt = datetime.strptime(train_end, "%Y%m%d")
test_start_dt = datetime.strptime(test_start, "%Y%m%d")
train_days = (train_end_dt - train_start_dt).days
val_duration = max(int(train_days * 0.2), 30) # at least 30 days
val_start_dt = train_end_dt + timedelta(days=1)
val_end_dt = val_start_dt + timedelta(days=val_duration)
# Ensure val ends strictly before test starts
if val_end_dt >= test_start_dt:
# Fall back to the midpoint of the gap between train and test
gap_days = (test_start_dt - train_end_dt).days
val_end_dt = train_end_dt + timedelta(days=gap_days // 2)
# Reject ranges that leave no room for val instead of overlapping test
if val_end_dt < val_start_dt:
raise ValueError(
f"no dates available for val between train_end ({train_end}) and test_start ({test_start})"
)
val_start = val_start_dt.strftime("%Y%m%d")
val_end = val_end_dt.strftime("%Y%m%d")
self.train_start = train_start
self.train_end = train_end
self.val_start = val_start
self.val_end = val_end
self.test_start = test_start
self.test_end = test_end
def split(
self, data: pl.DataFrame, date_col: str = "trade_date"
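) -> Tuple[pl.DataFrame, pl.DataFrame]:
"""Split data into training and test sets
) -> Tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]:
"""Split data into training, validation, and test sets
Args:
data: input data; must contain the date column
date_col: name of the date column, defaults to "trade_date"
Returns:
(train_data, test_data) tuple
(train_data, val_data, test_data) tuple
Raises:
ValueError: the data does not contain the specified date column
Note:
A proper three-way split:
- train_data: used to fit model parameters
- val_data: used for validation, early stopping, and tuning
- test_data: used only for the final evaluation, fully independent of the training process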
"""
if date_col not in data.columns:
raise ValueError(f"数据中不包含列 '{date_col}',可用列: {data.columns}")
@@ -103,20 +180,43 @@ class DateSplitter:
& (pl.col(date_col) <= self.train_end)
)
# Filter the validation set
val_data = data.filter(
(pl.col(date_col) >= self.val_start) & (pl.col(date_col) <= self.val_end)
)
# Filter the test set
test_data = data.filter(
(pl.col(date_col) >= self.test_start) & (pl.col(date_col) <= self.test_end)
)
return train_data, test_data
return train_data, val_data, test_data
def split_train_test(
self, data: pl.DataFrame, date_col: str = "trade_date"
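) -> Tuple[pl.DataFrame, pl.DataFrame]:
"""Split data into training and test sets (validation set merged into training)
Suited to cases that need no validation set, or that use cross-validation.
Args:
data: input data; must contain the date column
date_col: name of the date column, defaults to "trade_date"
Returns:
(train_val_data, test_data) tuple, where train_val_data contains train + val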
"""
train_data, val_data, test_data = self.split(data, date_col)
# Merge train and val
train_val_data = pl.concat([train_data, val_data])
return train_val_data, test_data
def __repr__(self) -> str:
"""Return the string representation of the splitter"""
return (
f"DateSplitter("
f"train_start='{self.train_start}', "
f"train_end='{self.train_end}', "
f"test_start='{self.test_start}', "
f"test_end='{self.test_end}'"
f"train='{self.train_start}-{self.train_end}', "
f"val='{self.val_start}-{self.val_end}', "
f"test='{self.test_start}-{self.test_end}'"
f")"
)
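
The splitter compares `YYYYMMDD` strings directly. This works because fixed-width, zero-padded date strings sort lexicographically in chronological order. A minimal, dependency-free sketch of the same three-way split on plain dicts is given below. The names `check_windows` and `split_rows` are hypothetical and only mirror the validation and filtering logic of `DateSplitter`; they are not part of this commit.

```python
from typing import Dict, List, Tuple

Window = Tuple[str, str]  # inclusive (start, end), both "YYYYMMDD"
Rows = List[Dict[str, str]]


def check_windows(train: Window, val: Window, test: Window) -> None:
    """Raise ValueError unless train, val, test are valid and strictly ordered in time."""
    ordered = [("train", train), ("val", val), ("test", test)]
    for name, (start, end) in ordered:
        if len(start) != 8 or len(end) != 8 or start > end:
            raise ValueError(f"invalid {name} window: {start}-{end}")
    for (prev_name, prev), (next_name, nxt) in zip(ordered, ordered[1:]):
        if nxt[0] <= prev[1]:
            raise ValueError(f"{next_name} must start after {prev_name} ends")


def split_rows(
    rows: Rows, train: Window, val: Window, test: Window, date_col: str = "trade_date"
) -> Tuple[Rows, Rows, Rows]:
    """Return (train_rows, val_rows, test_rows) using inclusive date windows."""
    check_windows(train, val, test)

    def pick(window: Window) -> Rows:
        start, end = window
        return [row for row in rows if start <= row[date_col] <= end]

    return pick(train), pick(val), pick(test)
```

Because the windows are checked to be strictly ordered, no row can land in more than one of the three sets.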