Every ETL run in ETL Dinámico follows the same linear path: raw rows leave the OLTP source as a Pandas DataFrame, pass through a configurable series of column-level transformations, get narrowed to only the target columns, and are finally written to the Data Warehouse with duplicate-safe incremental logic. The entire flow is driven by a singleDocumentation Index
Fetch the complete documentation index at: https://mintlify.com/obedc295/proyect_dw/llms.txt
Use this file to discover all available pages before exploring further.
column_mappings list that the user assembles in the UI — no YAML files, no code changes required. This page traces each step with the exact method signatures and data shapes involved.
Step-by-Step Pipeline Execution
User Configures the ETL Job in the Streamlit UI
The user opens
app.py and provides four pieces of information that together fully describe the ETL job:- Source — either a table selected from the OLTP dropdown (populated by
pipeline.extractor.extract_tables(), which filters out SQL Server system schemas) or a hand-written SQL query entered in the text area. - Target table — a table selected from the Data Warehouse dropdown (populated by
db_client.get_olap_tables()). - Column mappings — one row per source column, each specifying a transformation type and a target column name. Concat mappings are added separately and use a different dict structure (see Column Mappings below).
- Business key — a single target column used as the uniqueness guard during incremental loading.
app.py calls _validar_y_ejecutar(), which validates all four inputs and then invokes:Extraction — OLTP → Pandas DataFrame
ETLPipeline.run_dynamic_etl() delegates extraction to DataExtractor. The extraction path depends on whether a table name or a raw SQL query was provided.Path A — Table extraction (source_table is set):The pipeline first computes the minimal set of source columns needed by scanning the column_mappings list — it collects source_column values from regular mappings and both column1 / column2 values from concat mappings. It then calls:sql_query is set):The user-supplied SQL is wrapped in a subquery so that a row limit can be applied safely to any arbitrary query shape:DataFrame with column names matching the OLTP schema.The UI uses
limit=10 for the Vista previa button and limit=50 for SQL preview runs. The full ETL execution (triggered by Ejecutar ETL) calls extract_by_table without a limit, fetching all rows.Transformation — DataFrame Mutated In Place
ETLPipeline iterates through column_mappings in order, calling the appropriate DataTransformer method for each entry. All methods write their output into a new column on the existing DataFrame — the original source columns are preserved unless overwritten by a same-name mapping.The full set of supported transform_type values and their implementations:transform_type | Method called | What it does |
|---|---|---|
"none" | Direct assignment | df[target_col] = df[source_col] — rename only, no data change |
"upper" | capitalize_transform(..., operation="upper") | df[new_col] = df[col].str.upper() |
"lower" | capitalize_transform(..., operation="lower") | df[new_col] = df[col].str.lower() |
"year" | date_transform(..., operation="year") | Parses the column with pd.to_datetime, then df[new_col] = df[col].dt.year |
"month" | date_transform(..., operation="month") | Same parse step, then .dt.month |
"day" | date_transform(..., operation="day") | Same parse step, then .dt.day |
"concat" (special) | concat_transform(...) | df[new_col] = df[col1] + ' ' + df[col2] |
run_dynamic_etl:Column Selection — Only Target Columns Survive
After all transformations are applied, the DataFrame still contains the original source columns (and any intermediate columns created during transformation). Before loading, the pipeline trims the DataFrame to only the columns that matter in the Data Warehouse:
target_cols is built from every target_column value in the mappings list, with the business_key added as a guarantee (the loader requires it). Using set() ensures no column appears twice even if the business key is also a mapped target column.This step is what makes the pipeline “dynamic” — the shape of
df_load is entirely determined at runtime by the user’s mapping configuration, not by any hard-coded column list.Incremental Loading — New Rows Only Appended to the DW
DataLoader.load_incremental() implements a key-based deduplication strategy: it reads the current set of business key values already present in the OLAP target table, filters them out of the incoming DataFrame, and appends only the genuinely new rows.~df[business_key].isin(valid_keys) boolean mask is the core of the incremental strategy. Rows whose business key already exists in df_dw are silently dropped; only the remainder reaches df.to_sql(..., if_exists='append'). The method returns the count of rows actually written, which the pipeline surfaces as rows_loaded.if_exists='append' means to_sql will never drop or recreate the target table — it only inserts new rows. The target table must already exist in the Data Warehouse with the correct schema before running an ETL job.Results Surfaced in the Streamlit UI
run_dynamic_etl() returns a structured result dictionary:app.py, _validar_y_ejecutar() unpacks this dictionary and renders:- A success banner via
st.success() - Two
st.metric()cards side-by-side showing Filas extraídas and Filas cargadas - The target table name in a markdown line
- An expandable JSON view of the full result dict
- A
st.dataframe()preview ofsample_data(up to 5 rows)
rows_extracted and rows_loaded tells you at a glance how many rows were already present in the DW and therefore skipped.Column Mappings — The Transformation Contract
Thecolumn_mappings list is the single configuration object that drives the entire pipeline. Every entry is a Python dict; the required keys differ depending on whether the mapping is a standard field mapping or a concatenation.
Standard Mapping (all non-concat transforms)
transform_type of "none" is a pure rename — the value is copied byte-for-byte from source_column to target_column with no mutation.
Concat Mapping (special structure)
Concat mappings use a completely different key set from standard mappings — there is no
source_column or transform_type key. The pipeline uses mapping.get("type") == "concat" as the discriminator before deciding which transformer method to call.Complete Example — All Mapping Types
ETLPipeline processes this list against a source DataFrame, the resulting df_load will have four columns: CustomerID, CompanyName, OrderYear, and FullName — regardless of how many columns the original OLTP table contained.
End-to-End Data Shape Summary
The table below shows how the data object changes at each stage of the pipeline for the example mappings above, assuming a source table with columnsCustomerID, CompanyName, OrderDate, FirstName, LastName, and several other columns.
| Stage | Object | Columns present |
|---|---|---|
| After extraction | df (full source) | CustomerID, CompanyName, OrderDate, FirstName, LastName (only needed cols fetched) |
After "none" mapping | df | + CustomerID (target col written, same value) |
After "upper" mapping | df | + CompanyName (target col overwritten with uppercased value) |
After "year" mapping | df | + OrderYear (integer year extracted from OrderDate) |
After concat mapping | df | + FullName (FirstName + ' ' + LastName) |
| After column selection | df_load | CustomerID, CompanyName, OrderYear, FullName only |
| After incremental filter | df_load (subset) | Same columns, rows already in DW removed |
| Written to DW | rows appended | New rows only, appended to target table |