DatabaseClient is the single entry point for all database connectivity in the ETL pipeline. It holds two SQLAlchemy engines, one for the OLTP source (the SQL Server transactional database) and one for the OLAP target (the Data Warehouse), and exposes connection context managers plus schema introspection helpers used by every downstream service. Both engines are created with pool_pre_ping=True, so each pooled connection is tested when it is checked out and transparently replaced if it has gone stale.
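
To make the shape of the class concrete, here is a minimal sketch of a two-engine client. It is not the project's implementation: SketchClient and its URL arguments are hypothetical, and in-memory SQLite URLs stand in for the SQL Server connection strings so the snippet runs without a database server.

```python
from sqlalchemy import create_engine, inspect, text


class SketchClient:
    """Hypothetical stand-in for DatabaseClient; not the project's code."""

    def __init__(self, oltp_url: str = "sqlite://", olap_url: str = "sqlite://"):
        # pool_pre_ping=True tests a pooled connection on checkout and
        # replaces it if the test fails.
        self.oltp_engine = create_engine(oltp_url, pool_pre_ping=True)
        self.olap_engine = create_engine(olap_url, pool_pre_ping=True)

    def get_oltp_connection(self):
        return self.oltp_engine.connect()

    def get_olap_connection(self):
        return self.olap_engine.connect()

    def get_oltp_tables(self) -> list[str]:
        return inspect(self.oltp_engine).get_table_names()

    def get_olap_tables(self) -> list[str]:
        return inspect(self.olap_engine).get_table_names()


client = SketchClient()

# Create one table on the "OLTP" side only; engine.begin() commits on success.
with client.oltp_engine.begin() as conn:
    conn.execute(text("CREATE TABLE Customers (CustomerID INTEGER PRIMARY KEY)"))

oltp_tables = client.get_oltp_tables()
olap_tables = client.get_olap_tables()
```

Because each engine owns its own pool and database, the table created through the OLTP engine is not visible through the OLAP engine.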

Class: DatabaseClient

from src.infrastructure.db_client import DatabaseClient

db = DatabaseClient()

Constructor

DatabaseClient.__init__() reads connection strings from the global settings singleton and creates two SQLAlchemy engines immediately at instantiation time. No parameters are accepted.
# settings.py builds the connection URLs from environment variables:
#   OLTP_DRIVER, OLTP_SERVER, OLTP_DATABASE
#   OLAP_DRIVER, OLAP_SERVER, OLAP_DATABASE
# Both use Windows Authentication (Trusted_Connection=yes)

from src.infrastructure.db_client import DatabaseClient

db = DatabaseClient()
# db.oltp_engine  →  mssql+pyodbc engine pointing at the OLTP database
# db.olap_engine  →  mssql+pyodbc engine pointing at the OLAP / DW database
Connection URLs are assembled by src/config/settings.py using urllib.parse.quote_plus to safely encode the raw pyodbc connection string, including backslash-separated server names. Set the required environment variables (or a .env file) before instantiating DatabaseClient.
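
As an illustration of the encoding step, the sketch below builds a mssql+pyodbc URL with urllib.parse.quote_plus. The function name build_mssql_url, the exact layout of the ODBC string, and the sample driver, server, and database values are assumptions for illustration; the real settings.py may differ.

```python
from urllib.parse import quote_plus, unquote_plus


def build_mssql_url(driver: str, server: str, database: str) -> str:
    """Build a mssql+pyodbc URL from a raw ODBC connection string (hypothetical helper)."""
    odbc = (
        f"DRIVER={{{driver}}};"
        f"SERVER={server};"
        f"DATABASE={database};"
        "Trusted_Connection=yes;"  # Windows Authentication
    )
    # quote_plus percent-encodes characters such as '\', ';', '=' and braces,
    # so the whole string can travel as a single query parameter.
    return "mssql+pyodbc:///?odbc_connect=" + quote_plus(odbc)


# Sample values only; in the project these come from environment variables.
url = build_mssql_url("ODBC Driver 17 for SQL Server", r"MYHOST\SQLEXPRESS", "SalesDB")
encoded = url.split("odbc_connect=", 1)[1]
```

Decoding the parameter with unquote_plus recovers the original ODBC string, including the backslash in the server name.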

Connection Methods

These two methods return live SQLAlchemy Connection objects. Use them as context managers so the connection is automatically returned to the pool when the with block exits.
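
The effect of the with block on the pool can be observed directly. The sketch below uses a stand-in SQLite engine with an explicit QueuePool (an assumption made so the example runs without SQL Server) and reads the pool's checked-out count inside and after the block.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Stand-in engine; the real client points at SQL Server instead.
engine = create_engine("sqlite://", poolclass=QueuePool, pool_pre_ping=True)

with engine.connect() as conn:
    value = conn.execute(text("SELECT 1")).scalar()
    in_use_inside = engine.pool.checkedout()  # connection is checked out here

# Leaving the block closes the Connection, which returns it to the pool.
in_use_after = engine.pool.checkedout()
```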

get_oltp_connection()

Returns a context-manager-compatible SQLAlchemy connection to the OLTP (source) database.
from sqlalchemy import text

with db.get_oltp_connection() as conn:
    result = conn.execute(text("SELECT TOP 1 * FROM Sales.Orders"))
    row = result.fetchone()  # first row, or None if the table is empty
Returns: sqlalchemy.engine.Connection
A SQLAlchemy Connection context manager bound to oltp_engine. Commits or rolls back automatically on exit depending on whether an exception was raised.

get_olap_connection()

Returns a context-manager-compatible SQLAlchemy connection to the OLAP (Data Warehouse) target database.
from sqlalchemy import text

with db.get_olap_connection() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM dim_customers"))
    print(result.scalar())  # row count of dim_customers
Returns: sqlalchemy.engine.Connection
A SQLAlchemy Connection context manager bound to olap_engine. Commits or rolls back automatically on exit depending on whether an exception was raised.

Table Discovery Methods

These methods use SQLAlchemy’s inspect() to query the database catalogue without writing raw SQL.

get_oltp_tables()

Returns the list of user table names in the OLTP database’s default schema.
tables = db.get_oltp_tables()
# Example output: ['Customers', 'Orders', 'Products', 'OrderDetails']
Returns: list[str]
Unqualified table names (no schema prefix) as reported by Inspector.get_table_names() against the OLTP engine.
get_oltp_tables() queries only the default schema. To discover tables across all schemas — including schema-qualified names like Sales.Orders — use DataExtractor.extract_tables() instead.
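
For comparison, a cross-schema listing can be sketched with the Inspector alone. This is not DataExtractor.extract_tables(), whose implementation is not shown here; list_qualified_tables is a hypothetical helper, and SQLite (whose only schema is main) stands in for SQL Server.

```python
from sqlalchemy import create_engine, inspect, text


def list_qualified_tables(engine) -> list[str]:
    """Return 'schema.table' names for every schema the Inspector reports."""
    inspector = inspect(engine)
    names = []
    for schema in inspector.get_schema_names():
        for table in inspector.get_table_names(schema=schema):
            names.append(f"{schema}.{table}")
    return names


engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE Orders (OrderID INTEGER PRIMARY KEY)"))

qualified = list_qualified_tables(engine)
```

On SQL Server the schema list may also include system schemas, so a real implementation would likely filter them out.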

get_olap_tables()

Returns the list of user table names in the OLAP / Data Warehouse database’s default schema.
tables = db.get_olap_tables()
# Example output: ['dim_customers', 'dim_products', 'fact_orders']
Returns: list[str]
Unqualified table names as reported by Inspector.get_table_names() against the OLAP engine.

Schema Introspection Methods

These methods return column metadata for a given table and are used by the Streamlit UI to populate column-mapping dropdowns at runtime.
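
As one possible use of this metadata, the sketch below proposes a default column mapping by matching names case-insensitively. The suggest_mapping helper is hypothetical and is not part of DatabaseClient or the Streamlit UI; it only assumes that each column is described by a dict with a name key.

```python
def suggest_mapping(source_cols: list[dict], target_cols: list[dict]) -> dict:
    """Map each target column to a same-named source column, or None."""
    by_name = {col["name"].lower(): col["name"] for col in source_cols}
    return {col["name"]: by_name.get(col["name"].lower()) for col in target_cols}


# Sample descriptors in the shape returned by the introspection methods.
source = [
    {"name": "CustomerID", "type": "INTEGER"},
    {"name": "CustomerName", "type": "VARCHAR(100)"},
]
target = [
    {"name": "CustomerKey", "type": "INTEGER"},
    {"name": "customername", "type": "NVARCHAR(100)"},
]

mapping = suggest_mapping(source, target)
```

Target columns without a name match map to None, which a UI could present as an empty dropdown selection.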

get_source_columns(table_name)

Fetches column names and their SQL types from the OLTP database. Accepts both plain table names and schema-qualified names separated by a dot.
table_name (str, required)
The table to inspect. Supply a plain name ("Customers") or a schema-qualified name ("Sales.Orders"). When a dot is present the string is split on the first . to derive schema and table arguments for Inspector.get_columns().
Returns: list[dict]
A list of column descriptor dicts, each containing a name key (the column name) and a type key (the SQL type as a string):
cols = db.get_source_columns("Customers")
# [
#   {"name": "CustomerID",   "type": "INTEGER"},
#   {"name": "CustomerName", "type": "VARCHAR(100)"},
#   {"name": "CreatedAt",    "type": "DATETIME"},
# ]
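
The name handling described above can be sketched as follows. Both helpers are hypothetical: split_table_name mirrors the documented split on the first dot, and to_descriptors assumes the type string is produced with str(), which the source does not confirm.

```python
def split_table_name(table_name: str) -> tuple:
    """Return (schema, table); schema is None for a plain name."""
    if "." in table_name:
        schema, table = table_name.split(".", 1)  # split on the first dot only
        return schema, table
    return None, table_name


def to_descriptors(raw_columns) -> list[dict]:
    """Reduce Inspector.get_columns() style dicts to name/type pairs."""
    return [{"name": col["name"], "type": str(col["type"])} for col in raw_columns]
```

The resulting pair would then be passed along the lines of inspect(engine).get_columns(table, schema=schema).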

get_target_columns(table_name)

Identical behaviour to get_source_columns but inspects the OLAP (Data Warehouse) engine instead.
table_name (str, required)
The DW table to inspect. Accepts plain ("dim_customers") or schema-qualified ("dbo.fact_orders") names.
Returns: list[dict]
A list of column descriptor dicts, each containing a name key (the column name) and a type key (the SQL type as a string):
cols = db.get_target_columns("dim_customers")
# [
#   {"name": "CustomerKey",  "type": "INTEGER"},
#   {"name": "CustomerName", "type": "NVARCHAR(100)"},
#   {"name": "LoadDate",     "type": "DATE"},
# ]
The target table must already exist in the Data Warehouse before calling get_target_columns(). The method does not create tables — it only reads the catalogue. Use the same precaution before calling DataLoader.load_incremental().
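
One way to apply this precaution is a small guard that runs before introspection or loading. The require_target_table helper below is hypothetical; it relies only on get_olap_tables(), so it can vouch only for tables in the default schema, and it compares names case-insensitively on the assumption that the database collation does the same. FakeClient exists solely to make the sketch runnable.

```python
def require_target_table(db, table_name: str) -> None:
    """Raise ValueError if table_name is missing from db.get_olap_tables()."""
    plain_name = table_name.split(".", 1)[-1]  # drop an optional schema prefix
    existing = {name.lower() for name in db.get_olap_tables()}
    if plain_name.lower() not in existing:
        raise ValueError(
            f"Target table {table_name!r} does not exist in the Data Warehouse"
        )


class FakeClient:
    """Stand-in for DatabaseClient with a fixed table list."""

    def get_olap_tables(self):
        return ["dim_customers", "fact_orders"]


require_target_table(FakeClient(), "dbo.fact_orders")  # passes silently
```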
