"""DuckDB Database Inspector Tool Usage: uv run python -c "from src.data.db_inspector import get_db_info; get_db_info()" Or as standalone script: cd D:\\PyProject\\ProStock && uv run python -c "import sys; sys.path.insert(0, '.'); from src.data.db_inspector import get_db_info; get_db_info()" Features: - List all tables - Show row count for each table - Show database file size - Show column information for each table """ import duckdb import pandas as pd from pathlib import Path from datetime import datetime from typing import Optional def get_db_info(db_path: Optional[Path] = None): """Get complete summary of DuckDB database Args: db_path: Path to database file, uses default if None Returns: DataFrame: Summary of all tables """ # Get database path if db_path is None: from src.config.settings import get_settings cfg = get_settings() db_path = cfg.data_path_resolved / "prostock.db" cfg = get_settings() db_path = cfg.data_path_resolved / "prostock.db" else: db_path = Path(db_path) if not db_path.exists(): print(f"[ERROR] Database file not found: {db_path}") return None # Connect to database (read-only mode) conn = duckdb.connect(str(db_path), read_only=True) try: print("=" * 80) print("ProStock DuckDB Database Summary") print("=" * 80) print(f"Database Path: {db_path}") print(f"Check Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # Get database file size db_size_bytes = db_path.stat().st_size db_size_mb = db_size_bytes / (1024 * 1024) print(f"Database Size: {db_size_mb:.2f} MB ({db_size_bytes:,} bytes)") print("=" * 80) # Get all table information tables_query = """ SELECT table_name, table_type FROM information_schema.tables WHERE table_schema = 'main' ORDER BY table_name """ tables_df = conn.execute(tables_query).fetchdf() if tables_df.empty: print("\n[WARNING] No tables found in database") return pd.DataFrame() print(f"\nTable List (Total: {len(tables_df)} tables)") print("-" * 80) # Store summary information summary_data = [] for _, row in tables_df.iterrows(): table_name = row["table_name"] table_type = row["table_type"] # Get row count for table try: count_result = conn.execute( f'SELECT COUNT(*) FROM "{table_name}"' ).fetchone() row_count = count_result[0] if count_result else 0 except Exception as e: row_count = f"Error: {e}" # Get column count try: columns_query = f""" SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table_name}' AND table_schema = 'main' """ col_result = conn.execute(columns_query).fetchone() col_count = col_result[0] if col_result else 0 except Exception: col_count = 0 # Get date range (for daily table) date_range = "-" if ( table_name == "daily" and row_count and isinstance(row_count, int) and row_count > 0 ): try: date_query = """ SELECT MIN(trade_date) as min_date, MAX(trade_date) as max_date FROM daily """ date_result = conn.execute(date_query).fetchone() if date_result and date_result[0] and date_result[1]: date_range = f"{date_result[0]} ~ {date_result[1]}" except Exception: pass summary_data.append( { "Table Name": table_name, "Type": table_type, "Row Count": row_count if isinstance(row_count, int) else 0, "Column Count": col_count, "Date Range": date_range, } ) # Print single line info row_str = f"{row_count:,}" if isinstance(row_count, int) else str(row_count) print(f" * {table_name:<20} | Rows: {row_str:>12} | Cols: {col_count}") print("-" * 80) # Calculate total rows total_rows = sum( item["Row Count"] for item in summary_data if isinstance(item["Row Count"], int) ) print(f"\nData Summary") print(f" Total Tables: {len(summary_data)}") print(f" Total Rows: {total_rows:,}") print( f" Avg Rows/Table: {total_rows // len(summary_data):,}" if summary_data else " Avg Rows/Table: 0" ) # Detailed table structure print("\nDetailed Table Structure") print("=" * 80) for item in summary_data: table_name = item["Table Name"] print(f"\n[{table_name}]") # Get column information columns_query = f""" SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_name = '{table_name}' AND table_schema = 'main' ORDER BY ordinal_position """ columns_df = conn.execute(columns_query).fetchdf() if not columns_df.empty: print(f" Columns: {len(columns_df)}") print(f" {'Column':<20} {'Data Type':<20} {'Nullable':<10}") print(f" {'-' * 20} {'-' * 20} {'-' * 10}") for _, col in columns_df.iterrows(): nullable = "YES" if col["is_nullable"] == "YES" else "NO" print( f" {col['column_name']:<20} {col['data_type']:<20} {nullable:<10}" ) # For daily table, show extra statistics if ( table_name == "daily" and isinstance(item["Row Count"], int) and item["Row Count"] > 0 ): try: stats_query = """ SELECT COUNT(DISTINCT ts_code) as stock_count, COUNT(DISTINCT trade_date) as date_count FROM daily """ stats = conn.execute(stats_query).fetchone() if stats: print(f"\n Statistics:") print(f" - Unique Stocks: {stats[0]:,}") print(f" - Trade Dates: {stats[1]:,}") print( f" - Avg Records/Stock/Date: {item['Row Count'] // stats[0] if stats[0] > 0 else 0}" ) except Exception as e: print(f"\n Statistics query failed: {e}") print("\n" + "=" * 80) print("Check Complete") print("=" * 80) # Return DataFrame for further use return pd.DataFrame(summary_data) finally: conn.close() def get_table_sample(table_name: str, limit: int = 5, db_path: Optional[Path] = None): """Get sample data from specified table Args: table_name: Name of table limit: Number of rows to return db_path: Path to database file """ if db_path is None: from src.config.settings import get_settings cfg = get_settings() db_path = cfg.data_path_resolved / "prostock.db" cfg = get_settings() db_path = cfg.data_path_resolved / "prostock.db" else: db_path = Path(db_path) if not db_path.exists(): print(f"[ERROR] Database file not found: {db_path}") return None conn = duckdb.connect(str(db_path), read_only=True) try: query = f'SELECT * FROM "{table_name}" LIMIT {limit}' df = conn.execute(query).fetchdf() print(f"\nTable [{table_name}] Sample Data (first {len(df)} rows):") print(df.to_string()) return df except Exception as e: print(f"[ERROR] Query failed: {e}") return None finally: conn.close() if __name__ == "__main__": # Display database summary summary_df = get_db_info() # If daily table exists, show sample data if ( summary_df is not None and not summary_df.empty and "daily" in summary_df["Table Name"].values ): print("\n") get_table_sample("daily", limit=5)