Every ETL run in ETL Dinámico follows the same linear path. Raw rows leave the OLTP source as a Pandas DataFrame, pass through a configurable series of column-level transformations, are narrowed to only the target columns, and are finally appended to the Data Warehouse by incremental logic that skips business keys already present there. The entire flow is driven by a single column_mappings list that the user assembles in the UI, with no YAML files and no code changes required. This page traces each step with the exact method signatures and data shapes involved.

Step-by-Step Pipeline Execution

Step 1: User Configures the ETL Job in the Streamlit UI

The user opens app.py and provides four pieces of information that together fully describe the ETL job:
  1. Source — either a table selected from the OLTP dropdown (populated by pipeline.extractor.extract_tables(), which filters out SQL Server system schemas) or a hand-written SQL query entered in the text area.
  2. Target table — a table selected from the Data Warehouse dropdown (populated by db_client.get_olap_tables()).
  3. Column mappings — one row per source column, each specifying a transformation type and a target column name. Concat mappings are added separately and use a different dict structure (see Column Mappings below).
  4. Business key — a single target column used as the uniqueness guard during incremental loading.
When the user clicks Ejecutar ETL, app.py calls _validar_y_ejecutar(), which validates all four inputs and then invokes:
resultado = pipeline.run_dynamic_etl(
    source_table=source_table or "",
    target_table=target_table,
    business_key=business_key,
    column_mappings=column_mappings,  # list of dicts assembled by the UI
    sql_query=sql_query,
)
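This page states that _validar_y_ejecutar() validates all four inputs but does not list its rules. The sketch below shows checks it might plausibly perform. The function name validate_etl_inputs and every rule in it are assumptions for illustration, not the code in app.py. In particular, requiring the business key to be one of the mapped target columns is an assumption.

```python
from typing import Optional


# Hypothetical validator: the name and the rules are assumptions,
# not the checks implemented in app.py.
def validate_etl_inputs(
    source_table: Optional[str],
    sql_query: Optional[str],
    target_table: Optional[str],
    column_mappings: list[dict],
    business_key: Optional[str],
) -> list[str]:
    """Return a list of problems; an empty list means the job can run."""
    errors: list[str] = []

    # Exactly one extraction source: an OLTP table or a custom SQL query.
    has_table = bool(source_table)
    has_query = bool(sql_query and sql_query.strip())
    if has_table == has_query:
        errors.append("Provide exactly one source: an OLTP table or a SQL query.")

    if not target_table:
        errors.append("Select a target table in the Data Warehouse.")
    if not column_mappings:
        errors.append("Add at least one column mapping.")

    targets = [m.get("target_column") for m in column_mappings]
    if not all(targets):
        errors.append("Every mapping needs a target_column.")
    if len(set(targets)) != len(targets):
        errors.append("Two mappings write to the same target_column.")

    # The loader filters on this column, so (by assumption) it must be mapped.
    if business_key not in targets:
        errors.append("The business key must be one of the mapped target columns.")
    return errors


mappings = [
    {"source_column": "CustomerID", "transform_type": "none", "target_column": "CustomerID"},
]
print(validate_etl_inputs("Customers", None, "DimCustomer", mappings, "CustomerID"))  # → []
```

Returning a list instead of raising on the first problem lets a UI show every issue at once, for example through one st.error() call per message.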

Step 2: Extraction (OLTP → Pandas DataFrame)

ETLPipeline.run_dynamic_etl() delegates extraction to DataExtractor. The extraction path depends on whether a table name or a raw SQL query was provided.

Path A: Table extraction (source_table is set)

The pipeline first computes the minimal set of source columns needed by scanning the column_mappings list. It collects source_column values from regular mappings and both column1 / column2 values from concat mappings. It then calls:
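# src/services/extractor.py (excerpt)
import pandas as pd
from sqlalchemy import text


def extract_by_table(
    self,
    table_name: str,
    columns: list[str] | None = None,
    limit: int | None = None,
):
    # table_name and columns are interpolated into the SQL text, so they must
    # come from trusted metadata (such as the OLTP table dropdown).
    cols = ", ".join(columns) if columns else "*"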
    if limit:
        query = f"SELECT TOP {limit} {cols} FROM {table_name}"
    else:
        query = f"SELECT {cols} FROM {table_name}"

    with self.db.get_oltp_connection() as conn:
        df = pd.read_sql(text(query), conn)
        return df
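The column-pruning step that precedes the extract_by_table call can be sketched as follows. The helper name required_source_columns is hypothetical. Two details are choices made in this sketch, not documented behavior of ETLPipeline: it preserves first-seen order instead of using a set, and it skips names created by an earlier mapping's target_column, since those do not exist in the OLTP table.

```python
def required_source_columns(column_mappings: list[dict]) -> list[str]:
    """Return the OLTP columns the mappings read, in first-seen order.

    Standard mappings contribute source_column; concat mappings contribute
    column1 and column2. Names produced by an earlier mapping's target_column
    are skipped, because they do not exist in the source table.
    """
    needed: list[str] = []
    produced: set[str] = set()  # target columns created by earlier mappings
    for mapping in column_mappings:
        if mapping.get("type") == "concat":
            candidates = [mapping["column1"], mapping["column2"]]
        else:
            candidates = [mapping["source_column"]]
        for col in candidates:
            if col not in produced and col not in needed:
                needed.append(col)
        produced.add(mapping["target_column"])
    return needed


mappings = [
    {"source_column": "CustomerID", "transform_type": "none", "target_column": "CustomerID"},
    {"source_column": "OrderDate", "transform_type": "year", "target_column": "OrderYear"},
    {"source_column": "OrderDate", "transform_type": "month", "target_column": "OrderMonth"},
    {"type": "concat", "column1": "FirstName", "column2": "LastName", "target_column": "FullName"},
]
print(required_source_columns(mappings))
# → ['CustomerID', 'OrderDate', 'FirstName', 'LastName']
```

The returned list is the kind of value that would be passed as the columns argument of extract_by_table.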
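Only the columns that are actually referenced in the mappings are fetched, so unnecessary columns never leave the source database.

Path B: Custom SQL extraction (sql_query is set)

The user-supplied SQL is wrapped in a subquery so that a row limit can be applied without parsing the query. This works for ordinary SELECT statements. SQL Server rejects a derived table that begins with a CTE (WITH ...), ends with a semicolon, or contains an ORDER BY without TOP or OFFSET, so such queries fail whenever a limit is applied.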
# src/services/extractor.py
def extract_by_query(self, sql_query: str, limit: int | None = None):
    if limit:
        query = f"SELECT TOP {limit} * FROM ({sql_query}) AS _preview"
    else:
        query = sql_query
    with self.db.get_oltp_connection() as conn:
        df = pd.read_sql(text(query), conn)
    return df
In both paths the result is a standard Pandas DataFrame whose column names match the OLTP schema (for a custom query, the query's output column names).
The UI uses limit=10 for the Vista previa button and limit=50 for SQL preview runs. The full ETL execution (triggered by Ejecutar ETL) passes no limit, so extract_by_table (or extract_by_query, when a custom SQL query was given) fetches all rows.
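The subquery wrapping in extract_by_query can be made somewhat more defensive. The sketch below is a hypothetical helper, not part of extractor.py. It strips a trailing semicolon, refuses to wrap a query that starts with a CTE, and coerces the limit to an integer before interpolating it. It does not detect an ORDER BY inside the query, which SQL Server also rejects in a derived table unless TOP or OFFSET is present. Handling that would need real SQL parsing.

```python
from typing import Optional


def build_preview_query(sql_query: str, limit: Optional[int] = None) -> str:
    """Wrap a user query in a derived table so SQL Server's TOP can cap the rows."""
    # A trailing semicolon is fine at the end of a statement but is a syntax
    # error inside FROM (...), so strip it before wrapping.
    cleaned = sql_query.strip().rstrip(";").rstrip()
    if not limit:  # None or 0: run the query as written, mirroring `if limit:`
        return cleaned

    # A query that starts with a CTE cannot be nested inside a derived table.
    first_word = cleaned.split(maxsplit=1)[0].upper() if cleaned else ""
    if first_word == "WITH":
        raise ValueError("queries that start with a CTE cannot be wrapped for a preview")

    n = int(limit)  # only an integer is ever interpolated into the SQL text
    if n < 0:
        raise ValueError("limit must be non-negative")
    return f"SELECT TOP {n} * FROM ({cleaned}) AS _preview"


print(build_preview_query("SELECT * FROM Sales.Orders;", 10))
# → SELECT TOP 10 * FROM (SELECT * FROM Sales.Orders) AS _preview
```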

Step 3: Transformation (DataFrame Mutated In Place)

ETLPipeline iterates through column_mappings in order, calling the appropriate DataTransformer method for each entry (or assigning directly for "none"). Each transformation writes its output to the mapping's target column on the existing DataFrame. The original source columns are preserved unless a mapping's target_column reuses a source column's name, in which case that column is overwritten.

The full set of supported transform_type values and their implementations:
| transform_type | Method called | What it does |
| --- | --- | --- |
| "none" | Direct assignment | df[target_col] = df[source_col] (rename only, no data change) |
| "upper" | capitalize_transform(..., operation="upper") | df[new_col] = df[col].str.upper() |
| "lower" | capitalize_transform(..., operation="lower") | df[new_col] = df[col].str.lower() |
| "year" | date_transform(..., operation="year") | Parses the column with pd.to_datetime, then df[new_col] = df[col].dt.year |
| "month" | date_transform(..., operation="month") | Same parse step, then .dt.month |
| "day" | date_transform(..., operation="day") | Same parse step, then .dt.day |
| "concat" (special) | concat_transform(...) | df[new_col] = df[col1] + ' ' + df[col2] |
The dispatch logic inside run_dynamic_etl:
# src/services/pipeline.py  (inside run_dynamic_etl)
for mapping in column_mappings:
    if mapping.get("type") == "concat":
        self.transformer.concat_transform(
            df,
            new_column=mapping["target_column"],
            column1=mapping["column1"],
            column2=mapping["column2"],
        )
    else:
        ttype = mapping["transform_type"]
        source_col = mapping["source_column"]
        target_col = mapping["target_column"]

        if ttype == "none":
            df[target_col] = df[source_col]
        elif ttype in ("upper", "lower"):
            self.transformer.capitalize_transform(
                df, column=source_col, new_column=target_col, operation=ttype
            )
        elif ttype in ("year", "month", "day"):
            self.transformer.date_transform(
                df, column=source_col, new_column=target_col, operation=ttype
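            )
        # No else branch: an unrecognized transform_type is skipped silently, so
        # target_col is never created and column selection usually fails later.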
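Because mappings are applied in list order, a later mapping can read the output of an earlier one. For example, an "upper" mapping can write FirstNameUpper, and a concat mapping placed after it can join FirstNameUpper with LastName. With table extraction, however, the column-pruning step described for Path A collects every source_column and column1 / column2 value. Unless the pipeline filters out names created by earlier mappings, FirstNameUpper would also be requested from the OLTP table, where it does not exist.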
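The ordering rule can be illustrated with a plain-pandas sketch of two chained mappings. The mapping dicts follow the structures documented on this page. The column name FirstNameUpper and the data are illustrative, and the pipeline's own transformer methods are replaced by direct pandas calls.

```python
import pandas as pd

df = pd.DataFrame({"FirstName": ["maria", "ana"], "LastName": ["Anders", "Trujillo"]})

# Mapping 1: {"source_column": "FirstName", "transform_type": "upper",
#             "target_column": "FirstNameUpper"}
df["FirstNameUpper"] = df["FirstName"].str.upper()

# Mapping 2 reads the column mapping 1 just created:
# {"type": "concat", "column1": "FirstNameUpper", "column2": "LastName",
#  "target_column": "FullName"}
df["FullName"] = df["FirstNameUpper"] + " " + df["LastName"]
print(df["FullName"].tolist())  # → ['MARIA Anders', 'ANA Trujillo']

# In the opposite order the concat runs first and cannot find its input column.
fresh = pd.DataFrame({"FirstName": ["maria"], "LastName": ["Anders"]})
try:
    fresh["FullName"] = fresh["FirstNameUpper"] + " " + fresh["LastName"]
except KeyError as exc:
    print("KeyError:", exc)
```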

Step 4: Column Selection (Only Target Columns Survive)

After all transformations are applied, the DataFrame still contains the original source columns (and any intermediate columns created during transformation). Before loading, the pipeline trims the DataFrame to only the columns that matter in the Data Warehouse:
# src/services/pipeline.py
target_cols = list(set(
    [m["target_column"] for m in column_mappings] + [business_key]
))
df_load = df[target_cols]
target_cols is built from every target_column value in the mappings list, with the business_key added as a guarantee (the loader requires it). Using set() ensures no column appears twice even if the business key is also a mapped target column.
This step is what makes the pipeline “dynamic” — the shape of df_load is entirely determined at runtime by the user’s mapping configuration, not by any hard-coded column list.
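One side effect of set() is that it does not preserve insertion order. The column order of df_load can therefore differ between runs, because string hashing is randomized per process. This does not change what gets loaded, since to_sql inserts by column name, but it does affect how sample_data is displayed. An order-preserving variant is sketched below. The function name select_target_columns and its clearer error message are hypothetical additions, not the pipeline's code.

```python
import pandas as pd


def select_target_columns(
    df: pd.DataFrame, column_mappings: list[dict], business_key: str
) -> pd.DataFrame:
    # dict.fromkeys drops duplicates like set() does, but keeps first-seen
    # order, so df_load's columns follow the order of the mappings.
    target_cols = list(
        dict.fromkeys([m["target_column"] for m in column_mappings] + [business_key])
    )
    missing = [c for c in target_cols if c not in df.columns]
    if missing:
        raise KeyError(f"target columns not present in the DataFrame: {missing}")
    return df[target_cols]


df = pd.DataFrame({
    "CustomerID": ["ALFKI"], "CompanyName": ["ALFREDS FUTTERKISTE"],
    "OrderDate": ["1996-07-04"], "OrderYear": [1996], "FullName": ["Maria Anders"],
})
mappings = [
    {"source_column": "CustomerID", "transform_type": "none", "target_column": "CustomerID"},
    {"source_column": "CompanyName", "transform_type": "upper", "target_column": "CompanyName"},
    {"source_column": "OrderDate", "transform_type": "year", "target_column": "OrderYear"},
    {"type": "concat", "column1": "FirstName", "column2": "LastName", "target_column": "FullName"},
]
df_load = select_target_columns(df, mappings, business_key="CustomerID")
print(list(df_load.columns))  # → ['CustomerID', 'CompanyName', 'OrderYear', 'FullName']
```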

Step 5: Incremental Loading (Only New Rows Appended to the DW)

DataLoader.load_incremental() implements a key-based deduplication strategy: it reads the current set of business key values already present in the OLAP target table, filters them out of the incoming DataFrame, and appends only the genuinely new rows.
# src/services/loader.py
def load_incremental(
    self, df_transformer, table_name: str, business_key: str
) -> int:
    query = f"SELECT {business_key} from {table_name}"

    with self.db_client.get_olap_connection() as conn:
        # 1. Fetch all existing keys from the DW target table
        df_dw = pd.read_sql(text(query), conn)
        valid_keys = df_dw[business_key].tolist()

        # 2. Keep only rows whose key is NOT already in the DW
        df_load = df_transformer[~df_transformer[business_key].isin(valid_keys)]

        # 3. Append new rows; never overwrite existing data
        df_load.to_sql(table_name, conn, if_exists='append', index=False)
        conn.commit()
        return len(df_load)
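The ~df_transformer[business_key].isin(valid_keys) boolean mask is the core of the incremental strategy. Rows whose business key already exists in df_dw are silently dropped, and only the remainder reaches df_load.to_sql(..., if_exists='append'). The method returns the count of rows actually written, which the pipeline surfaces as rows_loaded. The mask only compares incoming rows against keys already stored in the DW, so if the batch itself contains the same new key more than once, every copy is appended.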
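if_exists='append' means to_sql never drops or recreates an existing target table; it only inserts new rows. The target table must already exist in the Data Warehouse with the correct schema before running an ETL job. If it is missing, the initial SELECT in load_incremental fails. On its own, to_sql with if_exists='append' would create the table with column types inferred from the DataFrame rather than the intended DW schema.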
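The same key-based strategy can be run end to end against an in-memory SQLite database standing in for the Data Warehouse. This is a variation for illustration, not the project's DataLoader. It takes a sqlite3 connection instead of db_client, and it also drops duplicate keys within the incoming batch, which load_incremental does not do. The table name, key, and rows are made up.

```python
import sqlite3

import pandas as pd


def load_incremental(
    df: pd.DataFrame, conn: sqlite3.Connection, table_name: str, business_key: str
) -> int:
    """Append rows whose business key is not yet in table_name; return the count written."""
    # Identifiers are interpolated, so table_name and business_key must be trusted values.
    existing = pd.read_sql(f"SELECT {business_key} FROM {table_name}", conn)[business_key]

    # Same mask as DataLoader.load_incremental: keep keys not yet in the DW...
    new_rows = df[~df[business_key].isin(existing)]
    # ...plus de-duplication inside the batch (an addition in this sketch).
    new_rows = new_rows.drop_duplicates(subset=business_key, keep="first")

    if not new_rows.empty:
        new_rows.to_sql(table_name, conn, if_exists="append", index=False)
        conn.commit()
    return len(new_rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE DimCustomer (CustomerID TEXT PRIMARY KEY, CompanyName TEXT)")
conn.execute("INSERT INTO DimCustomer VALUES ('ALFKI', 'ALFREDS FUTTERKISTE')")
conn.commit()

batch = pd.DataFrame({
    "CustomerID": ["ALFKI", "ANATR", "ANATR", "ANTON"],
    "CompanyName": ["ALFREDS FUTTERKISTE", "ANA TRUJILLO", "ANA TRUJILLO", "ANTONIO MORENO"],
})
print(load_incremental(batch, conn, "DimCustomer", "CustomerID"))  # → 2
```

Because CustomerID is a PRIMARY KEY here, the second ANATR row would raise an IntegrityError if drop_duplicates were removed. A DW table without such a constraint would silently store both rows. Running the same batch a second time writes nothing, because all three keys are now present.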

Step 6: Results Surfaced in the Streamlit UI

run_dynamic_etl() returns a structured result dictionary:
return {
    "status": "success",
    "rows_extracted": total_extraidos,   # len(df) immediately after extraction
    "rows_loaded": rows_loaded,           # rows actually written to the DW
    "table_destination": target_table,
    "sample_data": df_load.head(5),       # first 5 rows of the trimmed DataFrame
}
Back in app.py, _validar_y_ejecutar() unpacks this dictionary and renders:
  • A success banner via st.success()
  • Two st.metric() cards side-by-side showing Filas extraídas and Filas cargadas
  • The target table name in a markdown line
  • An expandable JSON view of the full result dict
  • A st.dataframe() preview of sample_data (up to 5 rows)
The difference between rows_extracted and rows_loaded tells you at a glance how many rows were already present in the DW and therefore skipped.
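The arithmetic behind that comparison is a single subtraction on the result dictionary. The helper summarize_run below is hypothetical (it is not part of app.py), and the numbers in the example are illustrative, not output from a real run.

```python
def summarize_run(result: dict) -> str:
    """Build a one-line summary from run_dynamic_etl()'s result dictionary."""
    extracted = result["rows_extracted"]
    loaded = result["rows_loaded"]
    skipped = extracted - loaded  # rows filtered out by the incremental key check
    return (
        f"{loaded} of {extracted} rows loaded into "
        f"{result['table_destination']}; {skipped} skipped"
    )


# Illustrative numbers only.
example = {
    "status": "success",
    "rows_extracted": 830,
    "rows_loaded": 20,
    "table_destination": "FactOrders",
}
print(summarize_run(example))  # → 20 of 830 rows loaded into FactOrders; 810 skipped
```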

Column Mappings — The Transformation Contract

The column_mappings list is the single configuration object that drives the entire pipeline. Every entry is a Python dict; the required keys differ depending on whether the mapping is a standard field mapping or a concatenation.

Standard Mapping (all non-concat transforms)

{
    "source_column": "OrderDate",   # column name as it exists in the OLTP source
    "transform_type": "year",       # one of: "none", "upper", "lower", "year", "month", "day"
    "target_column": "OrderYear",   # column name to write in the DW target table
}
A transform_type of "none" is a pure rename: the values are copied unchanged from source_column to target_column.

Concat Mapping (special structure)

{
    "type": "concat",               # presence of "type": "concat" is the discriminator
    "column1": "FirstName",         # first source column
    "column2": "LastName",          # second source column joined with a space
    "target_column": "FullName",    # resulting column in the DW
}
Concat mappings use a completely different key set from standard mappings — there is no source_column or transform_type key. The pipeline uses mapping.get("type") == "concat" as the discriminator before deciding which transformer method to call.
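Because the two structures share only target_column, a malformed dict is easy to build by hand. A small up-front check can classify each mapping with the same discriminator and reject missing keys or unknown transform types before extraction starts. The helper classify_mapping below is hypothetical. The dispatch loop in run_dynamic_etl has no else branch and simply skips an unknown transform_type.

```python
TRANSFORM_TYPES = {"none", "upper", "lower", "year", "month", "day"}
STANDARD_KEYS = {"source_column", "transform_type", "target_column"}
CONCAT_KEYS = {"type", "column1", "column2", "target_column"}


def classify_mapping(mapping: dict) -> str:
    """Return "concat" or "standard", raising ValueError for a malformed mapping."""
    # Same discriminator the pipeline uses: "type" == "concat".
    if mapping.get("type") == "concat":
        missing = CONCAT_KEYS - set(mapping)
        if missing:
            raise ValueError(f"concat mapping is missing {sorted(missing)}")
        return "concat"

    missing = STANDARD_KEYS - set(mapping)
    if missing:
        raise ValueError(f"standard mapping is missing {sorted(missing)}")
    if mapping["transform_type"] not in TRANSFORM_TYPES:
        raise ValueError(f"unknown transform_type {mapping['transform_type']!r}")
    return "standard"


print(classify_mapping(
    {"type": "concat", "column1": "FirstName", "column2": "LastName", "target_column": "FullName"}
))  # → concat
```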

Complete Example — All Mapping Types

column_mappings = [
    # Direct copy (rename only) — "none" transform copies the value unchanged
    {"source_column": "CustomerID",   "transform_type": "none",  "target_column": "CustomerID"},

    # Text transformation — converts CompanyName to ALL CAPS in the DW
    {"source_column": "CompanyName",  "transform_type": "upper", "target_column": "CompanyName"},

    # Date component extraction — stores only the year integer from OrderDate
    {"source_column": "OrderDate",    "transform_type": "year",  "target_column": "OrderYear"},

    # Column concatenation — different dict structure, no source_column / transform_type
    {"type": "concat", "column1": "FirstName", "column2": "LastName", "target_column": "FullName"},
]
When ETLPipeline processes this list against a source DataFrame, the resulting df_load will have four columns: CustomerID, CompanyName, OrderYear, and FullName — regardless of how many columns the original OLTP table contained.

End-to-End Data Shape Summary

The table below shows how the data object changes at each stage of the pipeline for the example mappings above, assuming a source table with columns CustomerID, CompanyName, OrderDate, FirstName, LastName, and several other columns.
| Stage | Object | Columns present |
| --- | --- | --- |
| After extraction | df (referenced columns only) | CustomerID, CompanyName, OrderDate, FirstName, LastName |
| After "none" mapping | df | No new column: CustomerID is reassigned to itself (same values) |
| After "upper" mapping | df | No new column: CompanyName is overwritten with the uppercased values |
| After "year" mapping | df | + OrderYear (integer year extracted from OrderDate) |
| After concat mapping | df | + FullName (FirstName + ' ' + LastName) |
| After column selection | df_load | CustomerID, CompanyName, OrderYear, FullName only |
| After incremental filter | df_load (subset) | Same columns; rows whose key is already in the DW are removed |
| Written to DW | Appended rows | Only the new rows, appended to the target table |
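The column shapes in the table above can be reproduced by running the complete example mappings against a made-up two-row source. apply_mappings is a compact, hypothetical stand-in for the dispatch loop in run_dynamic_etl. Unlike that loop, it raises on an unknown transform_type. Column selection uses an order-preserving dict.fromkeys so the printed order is stable. The incremental filter is omitted because it needs a database.

```python
import pandas as pd


def apply_mappings(df: pd.DataFrame, column_mappings: list[dict]) -> pd.DataFrame:
    """Hypothetical, compact stand-in for the dispatch loop in run_dynamic_etl."""
    for m in column_mappings:
        if m.get("type") == "concat":
            df[m["target_column"]] = df[m["column1"]] + " " + df[m["column2"]]
            continue
        src, dst, ttype = m["source_column"], m["target_column"], m["transform_type"]
        if ttype == "none":
            df[dst] = df[src]
        elif ttype in ("upper", "lower"):
            df[dst] = getattr(df[src].str, ttype)()  # .str.upper() / .str.lower()
        elif ttype in ("year", "month", "day"):
            df[dst] = getattr(pd.to_datetime(df[src]).dt, ttype)  # .dt.year etc.
        else:
            raise ValueError(f"unsupported transform_type: {ttype!r}")
    return df


column_mappings = [
    {"source_column": "CustomerID", "transform_type": "none", "target_column": "CustomerID"},
    {"source_column": "CompanyName", "transform_type": "upper", "target_column": "CompanyName"},
    {"source_column": "OrderDate", "transform_type": "year", "target_column": "OrderYear"},
    {"type": "concat", "column1": "FirstName", "column2": "LastName", "target_column": "FullName"},
]

# Made-up rows; only the referenced columns are present, as after table extraction.
df = pd.DataFrame({
    "CustomerID": ["ALFKI", "ANATR"],
    "CompanyName": ["Alfreds Futterkiste", "Ana Trujillo"],
    "OrderDate": ["1996-07-04", "1997-12-25"],
    "FirstName": ["Maria", "Ana"],
    "LastName": ["Anders", "Trujillo"],
})
print("after extraction:      ", list(df.columns))

df = apply_mappings(df, column_mappings)
print("after transformation:  ", list(df.columns))

business_key = "CustomerID"
target_cols = list(dict.fromkeys([m["target_column"] for m in column_mappings] + [business_key]))
df_load = df[target_cols]
print("after column selection:", list(df_load.columns))
```

Assigning to an existing column name (the "none" and "upper" mappings here) keeps that column in place, while new target columns are appended at the end. That is why only OrderYear and FullName are added during transformation.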
