feat(data): 添加 DuckDB 只读模式支持
- Storage 类默认使用 read_only=True 模式,允许多进程并发读取 - ThreadSafeStorage 自动使用 read_only=False 模式,用于数据同步写入 - catalog.query_duckdb_to_polars 函数使用只读连接
This commit is contained in:
@@ -439,6 +439,7 @@ def query_duckdb_to_polars(query: str, db_path: str) -> pl.LazyFrame:
|
||||
"""执行 DuckDB 查询并返回 Polars LazyFrame。
|
||||
|
||||
使用 duckdb.connect().sql(query).pl() 实现高速数据流转。
|
||||
默认使用 read_only=True 模式,允许多进程并发读取。
|
||||
|
||||
Args:
|
||||
query: SQL 查询语句
|
||||
@@ -447,7 +448,7 @@ def query_duckdb_to_polars(query: str, db_path: str) -> pl.LazyFrame:
|
||||
Returns:
|
||||
Polars LazyFrame
|
||||
"""
|
||||
conn = duckdb.connect(db_path)
|
||||
conn = duckdb.connect(db_path, read_only=True)
|
||||
try:
|
||||
# DuckDB -> Polars 高速转换
|
||||
df = conn.sql(query).pl()
|
||||
|
||||
@@ -10,6 +10,10 @@ from datetime import datetime
|
||||
from src.config.settings import get_settings
|
||||
|
||||
|
||||
# Type alias for DuckDB connection
|
||||
DuckDBConnection = duckdb.DuckDBPyConnection
|
||||
|
||||
|
||||
# Default column type mapping for automatic schema inference
|
||||
DEFAULT_TYPE_MAPPING = {
|
||||
"ts_code": "VARCHAR(16)",
|
||||
@@ -38,10 +42,14 @@ class Storage:
|
||||
- 新增 load_polars() 方法支持 Polars 零拷贝导出
|
||||
- 使用单例模式管理数据库连接
|
||||
- 并发写入通过队列管理(见 ThreadSafeStorage)
|
||||
|
||||
注意:
|
||||
- 默认使用 read_only=True 模式,允许多进程并发读取
|
||||
- 只有在数据同步时才使用 read_only=False 模式
|
||||
"""
|
||||
|
||||
_instance = None
|
||||
_connection = None
|
||||
_connection: Optional[DuckDBConnection] = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
"""Singleton to ensure single connection."""
|
||||
@@ -49,8 +57,14 @@ class Storage:
|
||||
cls._instance = super().__new__(cls)
|
||||
return cls._instance
|
||||
|
||||
def __init__(self, path: Optional[Path] = None):
|
||||
"""Initialize storage."""
|
||||
def __init__(self, path: Optional[Path] = None, read_only: bool = True):
|
||||
"""Initialize storage.
|
||||
|
||||
Args:
|
||||
path: 数据库文件路径,默认为配置中的路径
|
||||
read_only: 是否为只读模式,默认为 True
|
||||
只有在数据同步时才需要设置为 False
|
||||
"""
|
||||
if hasattr(self, "_initialized"):
|
||||
return
|
||||
|
||||
@@ -58,6 +72,7 @@ class Storage:
|
||||
self.base_path = path or cfg.data_path_resolved
|
||||
self.base_path.mkdir(parents=True, exist_ok=True)
|
||||
self.db_path = self.base_path / "prostock.db"
|
||||
self._read_only = read_only
|
||||
|
||||
self._init_db()
|
||||
self._initialized = True
|
||||
@@ -73,7 +88,7 @@ class Storage:
|
||||
- api_bak_basic.py: BakBasicSync.TABLE_SCHEMA
|
||||
- api_financial_sync.py: FinancialSync.TABLE_SCHEMAS
|
||||
"""
|
||||
self._connection = duckdb.connect(str(self.db_path))
|
||||
self._connection = duckdb.connect(str(self.db_path), read_only=self._read_only)
|
||||
|
||||
def save(
|
||||
self,
|
||||
@@ -304,10 +319,15 @@ class ThreadSafeStorage:
|
||||
|
||||
DuckDB 写入时不支持并发,使用队列收集写入请求,
|
||||
在 sync 结束时统一批量写入。
|
||||
|
||||
注意:
|
||||
- 此类自动使用 read_only=False 模式,用于数据同步
|
||||
- 不要在多进程中同时使用此类,只应在单进程中用于批量写入
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.storage = Storage()
|
||||
# 使用 read_only=False 模式创建 Storage,用于写入操作
|
||||
self.storage = Storage(read_only=False)
|
||||
self._pending_writes: List[tuple] = [] # [(name, data, use_upsert), ...]
|
||||
|
||||
def queue_save(self, name: str, data: pd.DataFrame, use_upsert: bool = True):
|
||||
|
||||
Reference in New Issue
Block a user