Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/obedc295/proyect_dw/llms.txt

Use this file to discover all available pages before exploring further.

The ETL Dinámico para Data Warehouse system is organized into four distinct layers, each with a single, clearly bounded responsibility. Configuration lives at the bottom, providing connection URLs to the infrastructure layer above it. The infrastructure layer hands live SQLAlchemy engines to the services layer, which handles all ETL logic. The presentation layer sits at the top, calling the pipeline and displaying results — it never touches the database directly. This strict separation means you can swap the UI framework, change a database driver, or add a new transformation type without touching any other layer.

The Four Layers

Configuration Layer

File: src/config/settings.pyReads OLTP_DRIVER, OLTP_SERVER, OLTP_DATABASE, OLAP_DRIVER, OLAP_SERVER, and OLAP_DATABASE from a .env file via python-dotenv. Assembles raw ODBC connection strings with Windows Authentication (Trusted_Connection=yes; TrustServerCertificate=yes) and URL-encodes them with urllib.parse.quote_plus so backslashes in server names survive the SQLAlchemy URL parser. Exposes a ready-to-use global settings singleton — no other module needs to touch os.getenv directly.

Infrastructure Layer

Class: src/infrastructure/db_client.py → DatabaseClientHolds one SQLAlchemy engine for the OLTP source and one for the OLAP Data Warehouse, both created with pool_pre_ping=True. Provides get_oltp_connection() and get_olap_connection() as context managers for safe resource cleanup, plus four schema-introspection helpers — get_oltp_tables(), get_olap_tables(), get_source_columns(), and get_target_columns() — that use SQLAlchemy inspect() and handle dotted schema.table names automatically.

Services Layer

Classes: src/services/DataExtractor, DataTransformer, DataLoader, ETLPipelineEach service class handles exactly one ETL concern. DataExtractor queries the OLTP source and returns a Pandas DataFrame. DataTransformer mutates the DataFrame in place according to a mapping specification. DataLoader performs incremental upsert logic against the OLAP target. ETLPipeline orchestrates the three into a single run_dynamic_etl() call, assembling results into a structured return dictionary.

Presentation Layer

File: app.py — Streamlit UIBuilds the complete user interface: source selection (table picker or raw SQL editor), column mapping rows with per-column transformation dropdowns, concat mapping builder, business-key selector, and an Ejecutar ETL button. Initializes DatabaseClient and ETLPipeline once per session through @st.cache_resource on init_etl(), then calls pipeline.run_dynamic_etl() and renders the resulting metrics and sample data preview.

Dependency Direction

Every dependency arrow points downward — upper layers call into lower ones; lower layers never import from upper ones.
Streamlit UI  (app.py)
     │  calls init_etl(), pipeline.run_dynamic_etl()

ETLPipeline  (src/services/pipeline.py)
     │  composes and orchestrates

DataExtractor / DataTransformer / DataLoader  (src/services/)
     │  receive a DatabaseClient at construction time

DatabaseClient  (src/infrastructure/db_client.py)
     │  imports settings singleton

Settings  (src/config/settings.py)
     │  reads environment variables

.env  /  OS environment
DataTransformer is the only service that does not receive a DatabaseClient. It operates purely on in-memory Pandas DataFrames and has no I/O dependency at all, making it trivially testable in isolation.

Key Design Decisions

pool_pre_ping=True for Robust Connections

Both SQLAlchemy engines in DatabaseClient are created with pool_pre_ping=True:
# src/infrastructure/db_client.py
class DatabaseClient:
    def __init__(self):
        self.oltp_engine = create_engine(settings.OLTP_URL, pool_pre_ping=True)
        self.olap_engine = create_engine(settings.OLAP_URL, pool_pre_ping=True)
Before SQLAlchemy checks out a connection from the pool, it issues a lightweight “ping” query. If the connection has gone stale (e.g., the SQL Server restarted or a network timeout occurred), SQLAlchemy transparently recycles it rather than handing a broken connection to application code. This is especially important in a Streamlit environment where sessions can be long-lived and idle for extended periods.

@st.cache_resource — Engines Created Once Per Session

The init_etl() function in app.py is decorated with @st.cache_resource:
# app.py
@st.cache_resource
def init_etl():
    db_client = DatabaseClient()
    return db_client, ETLPipeline(db_client)
@st.cache_resource is Streamlit’s mechanism for sharing expensive, non-serializable objects (like database engine pools) across reruns of the same session. The first time a user opens the app, DatabaseClient() is called — creating both SQLAlchemy engines and initializing their connection pools. On every subsequent rerun (e.g., when the user changes a dropdown or clicks a button), Streamlit returns the same cached (db_client, pipeline) tuple without re-establishing connections. This prevents connection pool exhaustion and keeps the UI responsive.
If you need to reset the connection — for example, after rotating credentials — call st.cache_resource.clear() programmatically or add a “Reconectar” button in the sidebar that clears the cache and triggers a rerun.

URL-Encoding the ODBC Connection String

SQL Server instance names often contain backslashes (e.g., SERVER\SQLEXPRESS). SQLAlchemy’s URL parser treats backslashes as escape characters, which corrupts the connection string. settings.py solves this with urllib.parse.quote_plus:
# src/config/settings.py
raw_oltp = (
    f"DRIVER={os.getenv('OLTP_DRIVER')};"
    f"SERVER={os.getenv('OLTP_SERVER')};"
    f"DATABASE={os.getenv('OLTP_DATABASE')};"
    f"Trusted_Connection=yes;"
    f"TrustServerCertificate=yes;"
)

OLTP_URL = f"mssql+pyodbc:///?odbc_connect={urllib.parse.quote_plus(raw_oltp)}"
The quote_plus call converts \ to %5C (and spaces to +), producing a URL-safe string that SQLAlchemy and pyodbc can round-trip without data loss.

Schema Introspection for Dynamic UI

DatabaseClient exposes get_source_columns(table_name) and get_target_columns(table_name), both of which use SQLAlchemy’s inspect() API:
# src/infrastructure/db_client.py
def get_source_columns(self, table_name: str):
    inspector = inspect(self.oltp_engine)
    if "." in table_name:
        schema, tbl = table_name.split(".", 1)
        cols = inspector.get_columns(tbl, schema=schema)
    else:
        cols = inspector.get_columns(table_name)
    return [{"name": c["name"], "type": str(c["type"])} for c in cols]
The UI uses the returned [{"name": ..., "type": ...}] lists to populate the column-mapping dropdowns at runtime — no hard-coded schema knowledge is required anywhere in the application.
extract_tables() in DataExtractor filters out SQL Server system schemas (sys, INFORMATION_SCHEMA, db_owner, etc.) so only user-created tables appear in the source table picker.

Build docs developers (and LLMs) love