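# File: ProStock/src/data/sync.py (370 lines, 13 KiB, Python)

"""Data synchronization dispatch center.

This module is the central dispatcher for data synchronization: it manages
the sync workflow of every data type in one place.

[Important] Scope of sync.py
    This module contains **only data interfaces that are updated daily**.
    Quarterly / low-frequency data must not be placed in this file.

Sync logic included in this module (updated daily):
    - api_daily.py: daily bar data sync (DailySync)
    - api_daily_basic.py: daily indicator data sync (DailyBasicSync)
    - api_bak_basic.py: historical stock list sync (BakBasicSync)
    - api_pro_bar.py: Pro Bar data sync (ProBarSync)
    - api_stock_basic.py: stock basic information sync
    - api_trade_cal.py: trade calendar sync
    - api_stock_st.py: ST stock list sync (StockSTSync)

Sync logic that must NOT be included (quarterly / low-frequency updates):
    - financial_data/: financial data (income statement, balance sheet,
      cash flow statement, etc.)
      Usage:
          from src.data.api_wrappers.financial_data.api_financial_sync import sync_financial
          sync_financial()
    - api_namechange.py: stock name changes (infrequent)
      Usage:
          from src.data.api_wrappers import sync_namechange
          sync_namechange(force=True)

Usage:
    # Preview a sync (check the data volume, write nothing)
    from src.data.sync import preview_sync
    preview = preview_sync()

    # Sync all daily-updated data (excludes financial data and namechange)
    from src.data.sync import sync_all_data
    result = sync_all_data()

    # Force a full reload
    result = sync_all_data(force_full=True)
"""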
from typing import Optional, Dict, Union, Any
import pandas as pd
from src.data.api_wrappers import sync_all_stocks
from src.data.api_wrappers.api_daily import sync_daily, preview_daily_sync
from src.data.api_wrappers.api_pro_bar import sync_pro_bar
from src.data.api_wrappers.api_bak_basic import sync_bak_basic
from src.data.api_wrappers.api_daily_basic import sync_daily_basic
from src.data.api_wrappers.api_stock_st import sync_stock_st
def preview_sync(
    force_full: bool = False,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    sample_size: int = 3,
    max_workers: Optional[int] = None,
) -> dict[str, Any]:
    """Preview the daily-bar sync (volume and samples) without syncing.

    This is a thin wrapper that delegates to ``preview_daily_sync``; use it
    to inspect what a sync would fetch before actually running one.

    Args:
        force_full: If True, preview a full sync (the original docs state the
            full range starts at 20180101 -- defined by the daily wrapper,
            not here).
        start_date: Manually chosen start date; overrides auto-detection.
        end_date: Manually chosen end date (documented default: today).
        sample_size: Number of sample stocks to include in the preview.
        max_workers: Accepted for signature symmetry with ``sync_all`` but
            NOT forwarded to ``preview_daily_sync``; it currently has no
            effect on the preview.

    Returns:
        Whatever ``preview_daily_sync`` returns. According to the original
        documentation this is a dict shaped like::

            {
                'sync_needed': bool,
                'stock_count': int,
                'start_date': str,
                'end_date': str,
                'estimated_records': int,
                'sample_data': pd.DataFrame,
                'mode': str,  # 'full', 'incremental', 'partial' or 'none'
            }

    Example:
        >>> # Preview what would be synced
        >>> preview = preview_sync()
        >>>
        >>> # Preview a full sync
        >>> preview = preview_sync(force_full=True)
        >>>
        >>> # Preview with more sample stocks
        >>> preview = preview_sync(sample_size=5)
    """
    # Collect the forwarded options first; max_workers is deliberately absent
    # because the original call never passed it on.
    preview_options = dict(
        force_full=force_full,
        start_date=start_date,
        end_date=end_date,
        sample_size=sample_size,
    )
    return preview_daily_sync(**preview_options)
def sync_all(
    force_full: bool = False,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    max_workers: Optional[int] = None,
    dry_run: bool = False,
) -> dict[str, pd.DataFrame]:
    """Sync daily bar data for all stocks.

    Main entry point for the daily-bar sync; every argument is passed
    straight through to ``sync_daily``.

    Args:
        force_full: If True, force a complete reload (the original docs state
            the reload starts at 20180101 -- defined by ``sync_daily``).
        start_date: Manually chosen start date, ``YYYYMMDD``.
        end_date: Manually chosen end date (documented default: today).
        max_workers: Number of worker threads. ``None`` leaves the choice to
            ``sync_daily`` (the original docs state a default of 10 --
            verify against ``sync_daily``).
        dry_run: If True, only preview what would be synced; no data is
            written (as documented for ``sync_daily``).

    Returns:
        The result of ``sync_daily``; documented as a dict mapping
        ``ts_code`` to its DataFrame.

    Example:
        >>> # First sync (full load from 20180101)
        >>> result = sync_all()
        >>>
        >>> # Later syncs (incremental - new data only)
        >>> result = sync_all()
        >>>
        >>> # Force a complete reload
        >>> result = sync_all(force_full=True)
        >>>
        >>> # Manually chosen date range
        >>> result = sync_all(start_date='20240101', end_date='20240131')
        >>>
        >>> # Custom number of threads
        >>> result = sync_all(max_workers=20)
        >>>
        >>> # Dry run (preview only)
        >>> result = sync_all(dry_run=True)
    """
    daily_options = {
        "force_full": force_full,
        "start_date": start_date,
        "end_date": end_date,
        "max_workers": max_workers,
        "dry_run": dry_run,
    }
    return sync_daily(**daily_options)
def sync_all_data(
    force_full: bool = False,
    max_workers: Optional[int] = None,
    dry_run: bool = False,
) -> dict[str, Any]:
    """Sync every daily-updated data type, one step after another.

    Important: only daily-updated data is synced here; quarterly and
    low-frequency data are excluded.

    Steps, in execution order (labels keep the historical ``[n/7]``
    numbering):

        1. Trade calendar (``sync_trade_cal_cache``)
        2. Stock basic info (``sync_all_stocks``)
        3. Daily bar data (``sync_daily``) -- currently DISABLED in this
           function; call ``sync_all()`` to sync daily bars separately.
        4. Pro Bar data (``sync_pro_bar``)
        5. Daily indicators (``sync_daily_basic``)
        6. Historical stock list (``sync_bak_basic``)
        7. ST stock list (``sync_stock_st``)

    Not included (call separately):

        - Financial data (income statement, balance sheet, cash flow
          statement; quarterly):
              from src.data.api_wrappers.financial_data.api_financial_sync import sync_financial
              sync_financial()
        - Name changes (former stock names; low frequency):
              from src.data.api_wrappers import sync_namechange
              sync_namechange(force=True)

    A failing step never aborts the run: its error is printed, an empty
    ``pd.DataFrame`` is stored under the step's key, and the summary marks
    the step as FAILED.

    Args:
        force_full: If True, request a complete reload. Forwarded to the
            Pro Bar, daily basic, bak_basic and ST steps; the trade calendar
            and stock basic steps take no arguments.
        max_workers: Worker thread count, forwarded to the Pro Bar step only.
        dry_run: Forwarded to the Pro Bar and daily basic steps only. All
            other steps (including the ``ensure_table_exists()`` calls) run
            normally even when this is True.

    Returns:
        Dict with the keys ``trade_cal``, ``stock_basic``, ``pro_bar``,
        ``daily_basic``, ``bak_basic`` and ``stock_st``. ``trade_cal`` and
        ``stock_basic`` always hold an empty DataFrame; the other keys hold
        whatever the respective sync function returned (``pro_bar`` is
        treated as a mapping of DataFrames, the rest as sized record
        containers), or an empty DataFrame if the step failed.

    Example:
        >>> result = sync_all_data()
        >>>
        >>> # Force a complete reload
        >>> result = sync_all_data(force_full=True)
        >>>
        >>> # Dry run
        >>> result = sync_all_data(dry_run=True)
    """

    # Each step returns (value stored in results, suffix for the OK line).
    # Project classes are imported lazily inside the steps so that an import
    # problem only fails the step it belongs to.
    def _trade_cal_step() -> tuple[Any, str]:
        from src.data.api_wrappers import sync_trade_cal_cache

        sync_trade_cal_cache()
        return pd.DataFrame(), ""

    def _stock_basic_step() -> tuple[Any, str]:
        sync_all_stocks()
        return pd.DataFrame(), ""

    def _pro_bar_step() -> tuple[Any, str]:
        from src.data.api_wrappers.api_pro_bar import ProBarSync

        # Make sure the target table exists before syncing.
        ProBarSync().ensure_table_exists()
        fetched = sync_pro_bar(
            force_full=force_full,
            max_workers=max_workers,
            dry_run=dry_run,
        )
        record_total = sum(len(df) for df in fetched.values()) if fetched else 0
        return fetched, f" ({record_total} records from {len(fetched)} stocks)"

    def _daily_basic_step() -> tuple[Any, str]:
        from src.data.api_wrappers.api_daily_basic import DailyBasicSync

        DailyBasicSync().ensure_table_exists()
        fetched = sync_daily_basic(force_full=force_full, dry_run=dry_run)
        return fetched, f" ({len(fetched)} records)"

    def _bak_basic_step() -> tuple[Any, str]:
        from src.data.api_wrappers.api_bak_basic import BakBasicSync

        BakBasicSync().ensure_table_exists()
        fetched = sync_bak_basic(force_full=force_full)
        return fetched, f" ({len(fetched)} records)"

    def _stock_st_step() -> tuple[Any, str]:
        from src.data.api_wrappers.api_stock_st import StockSTSync

        StockSTSync().ensure_table_exists()
        fetched = sync_stock_st(force_full=force_full)
        return fetched, f" ({len(fetched)} records)"

    # (progress tag, results key, label, start banner, step callable)
    steps = [
        (
            "1/7",
            "trade_cal",
            "Trade calendar",
            "\n[1/7] Syncing trade calendar cache...",
            _trade_cal_step,
        ),
        (
            "2/7",
            "stock_basic",
            "Stock basic",
            "\n[2/7] Syncing stock basic info...",
            _stock_basic_step,
        ),
        # Step 3 (daily bars) is disabled; see the SKIPPED notice below.
        (
            "4/7",
            "pro_bar",
            "Pro Bar data",
            "\n[4/7] Syncing Pro Bar data (with adj, tor, vr)...",
            _pro_bar_step,
        ),
        (
            "5/7",
            "daily_basic",
            "Daily basic",
            "\n[5/7] Syncing daily basic indicators (PE, PB, turnover rate, market value)...",
            _daily_basic_step,
        ),
        (
            "6/7",
            "bak_basic",
            "Bak basic",
            "\n[6/7] Syncing stock historical list (bak_basic)...",
            _bak_basic_step,
        ),
        (
            "7/7",
            "stock_st",
            "ST stock list",
            "\n[7/7] Syncing ST stock list...",
            _stock_st_step,
        ),
    ]

    results: dict[str, Any] = {}
    failed: set[str] = set()

    print("\n" + "=" * 60)
    print("[sync_all_data] Starting full data synchronization...")
    print("=" * 60)
    if dry_run:
        # Be explicit: only two steps receive dry_run, the rest still run.
        print(
            "[sync_all_data] NOTE: dry_run is only forwarded to the Pro Bar and "
            "daily basic steps; all other steps run normally."
        )

    for tag, key, label, banner, step in steps:
        if tag == "4/7":
            # Explain the gap in the numbering instead of skipping silently.
            print(
                "\n[3/7] Daily market data: SKIPPED "
                "(step disabled; use sync_all() to sync daily bars)"
            )
        print(banner)
        try:
            outcome, detail = step()
        except Exception as e:
            # Best effort by design: one failing data type must not stop the
            # remaining ones. Keep the historical empty-DataFrame placeholder.
            print(f"[{tag}] {label}: FAILED - {e}")
            results[key] = pd.DataFrame()
            failed.add(key)
        else:
            results[key] = outcome
            print(f"[{tag}] {label}: OK{detail}")

    # Summary
    print("\n" + "=" * 60)
    print("[sync_all_data] Sync Summary")
    print("=" * 60)
    for data_type, data in results.items():
        if data_type in failed:
            # Do not report a failure as "0 records".
            print(f" {data_type}: FAILED (see error above)")
        elif isinstance(data, dict):
            # Pro Bar returns a mapping of per-stock DataFrames.
            total_records = sum(len(df) for df in data.values())
            print(f" {data_type}: {len(data)} stocks, {total_records} total records")
        else:
            # The remaining steps return a single sized container.
            print(f" {data_type}: {len(data)} records")
    print("=" * 60)
    print("\nNote: namechange is NOT in auto-sync. To sync manually:")
    print(" from src.data.api_wrappers import sync_namechange")
    print(" sync_namechange(force=True)")
    return results
if __name__ == "__main__":
    # Script entry point: print a short usage guide, then run
    # sync_all_data() with its default arguments.
    rule = "=" * 60
    usage_lines = (
        rule,
        "Data Sync Module",
        rule,
        "\nUsage:",
        " # Sync all data types at once (RECOMMENDED)",
        " from src.data.sync import sync_all_data",
        " result = sync_all_data() # Incremental sync all",
        " result = sync_all_data(force_full=True) # Full reload",
        "",
        " # Or sync individual data types:",
        " from src.data.sync import sync_all, preview_sync",
        " from src.data.api_wrappers import sync_daily_basic, sync_bak_basic",
        "",
        " # Preview before sync (recommended)",
        " preview = preview_sync()",
        "",
        " # Dry run (preview only)",
        " result = sync_all(dry_run=True)",
        "",
        " # Actual sync",
        " result = sync_all() # Incremental sync",
        " result = sync_all(force_full=True) # Full reload",
        "",
        " # bak_basic sync",
        " result = sync_bak_basic() # Incremental sync",
        " result = sync_bak_basic(force_full=True) # Full reload",
        "\n" + rule,
    )
    for usage_line in usage_lines:
        print(usage_line)

    # Run sync_all_data by default
    print("\n[Main] Running sync_all_data()...")
    result = sync_all_data()
    print("\n[Main] Sync completed!")
    print(f"Total data types synced: {len(result)}")