ETLPipeline is the top-level orchestrator that wires together DataExtractor, DataTransformer, and DataLoader into a single, declarative call. Instead of invoking each stage manually, you describe what data to move and how to transform it through a column_mappings list — the pipeline takes care of extraction, column-by-column transformation, deduplication, and incremental loading in the correct order.
How ETLPipeline is constructed
ETLPipeline accepts a DatabaseClient and internally instantiates the three service classes:
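As a rough illustration of that wiring, the sketch below uses empty stand-in classes rather than the project's real ones. The attribute names (`extractor`, `transformer`, `loader`) and the idea that the extractor and loader each receive the client are assumptions made for the example, not confirmed details of the library.

```python
# Stand-in classes: the real DatabaseClient, DataExtractor, DataTransformer and
# DataLoader live in the project; only the wiring pattern is illustrated here.
class DatabaseClient:
    """Placeholder for the project's database client (connections omitted)."""


class DataExtractor:
    def __init__(self, db_client):
        self.db_client = db_client  # assumed: reads from OLTP through the client


class DataTransformer:
    """Assumed to be stateless: it only operates on DataFrames passed to it."""


class DataLoader:
    def __init__(self, db_client):
        self.db_client = db_client  # assumed: writes to the DW through the client


class ETLPipeline:
    def __init__(self, db_client):
        # One shared client, three service objects (attribute names are hypothetical).
        self.extractor = DataExtractor(db_client)
        self.transformer = DataTransformer()
        self.loader = DataLoader(db_client)


pipeline = ETLPipeline(DatabaseClient())
```

The point of the pattern is that callers only ever construct the client and the pipeline; the three stages are never instantiated by hand.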
run_dynamic_etl
Parameters
source_table
Schema-qualified OLTP table name to extract from (e.g., "dbo.Customers", "Sales.SalesOrderHeader"). When using a custom sql_query instead, pass an empty string "" here: the pipeline will skip the table-based extraction path and use the query instead.
target_table
Name of the destination table in the OLAP Data Warehouse (e.g., "DimCustomers"). The loader uses if_exists='append': it appends rows to an existing table and creates the table automatically on the first run if it does not yet exist. For production use, pre-create the table with explicit column types and primary key constraints.
business_key
Column name used to deduplicate records between the incoming DataFrame and the existing DW table. Must be present in column_mappings so it appears in the final DataFrame handed to the loader.
column_mappings
A list of mapping dicts that defines which source columns to extract, how to transform them, and what to name them in the target. See Column mappings structure below.
sql_query
An optional custom SQL query string. When provided, DataExtractor.extract_by_query() is called and its result becomes the working DataFrame. The pipeline evaluates source_table and sql_query independently: if both are non-empty, both execute and the query result overwrites the table result. Pass source_table="" to use only the query path. Defaults to None.
Column mappings structure
Each dict in column_mappings follows one of two shapes depending on whether the mapping is a standard column transform or a string concatenation.
Standard mapping — for pass-through, text case, and date transforms:
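The two shapes might look like the sketch below. The keys `source_column`, `target_column`, `transform_type`, `type`, `column1` and `column2` are the ones named on this page; the column names used as values are hypothetical, and any extra keys the concat shape may accept (a separator, for instance) are not shown because they are not documented here.

```python
# Standard mapping: one source column, one target column, one transform_type.
standard_mapping = {
    "source_column": "CreatedDate",   # hypothetical OLTP column
    "target_column": "CreatedYear",   # name of the column in the DW table
    "transform_type": "year",         # one of: none, upper, lower, year, month, day
}

# Concat mapping: two source columns joined into one target column.
concat_mapping = {
    "type": "concat",
    "column1": "FirstName",           # hypothetical OLTP column
    "column2": "LastName",            # hypothetical OLTP column
    "target_column": "FullName",
}

# A column_mappings list mixes both shapes; the business key is passed through
# unchanged so that it reaches the loader.
column_mappings = [
    {"source_column": "CustomerID", "target_column": "CustomerID", "transform_type": "none"},
    standard_mapping,
    concat_mapping,
]
```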
All valid transform_type values
| Value | Transformation applied | Output type |
|---|---|---|
| none | Direct copy: df[target] = df[source] | Same as source |
| upper | .str.upper() | string |
| lower | .str.lower() | string |
| year | pd.to_datetime() then .dt.year | integer |
| month | pd.to_datetime() then .dt.month | integer |
| day | pd.to_datetime() then .dt.day | integer |
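The table can be read as a small dispatch over pandas operations. The helper below, `apply_transform`, is a hypothetical name written for this illustration and is not the DataTransformer API; it only mirrors the pandas calls listed in the table.

```python
import pandas as pd


def apply_transform(df, source, target, transform_type):
    """Write df[target] from df[source] following the table above (illustrative only)."""
    if transform_type == "none":
        df[target] = df[source]                      # direct copy
    elif transform_type == "upper":
        df[target] = df[source].str.upper()
    elif transform_type == "lower":
        df[target] = df[source].str.lower()
    elif transform_type in ("year", "month", "day"):
        # Parse to datetime first, then take the matching .dt component.
        df[target] = getattr(pd.to_datetime(df[source]).dt, transform_type)
    else:
        raise ValueError(f"Unsupported transform_type: {transform_type!r}")
    return df


# Hypothetical sample data.
df = pd.DataFrame({
    "City": ["Lima", "Quito"],
    "OrderDate": ["2024-03-15", "2023-11-02"],
})
apply_transform(df, "City", "CityUpper", "upper")
apply_transform(df, "OrderDate", "OrderYear", "year")
apply_transform(df, "OrderDate", "OrderMonth", "month")
```

Because the date transforms go through pd.to_datetime(), source values stored as ISO-formatted strings work as well as native datetime columns.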
Return value
run_dynamic_etl returns a dict with the following fields:
A status string that is always "success" when the pipeline completes without raising an exception.
The total number of rows retrieved from the OLTP source (before deduplication filtering).
The number of net-new rows actually inserted into the DW table (after deduplication).
The target_table name, echoed back for logging and UI display.
The first 5 rows of the final loaded DataFrame (df_load.head(5)), useful for previewing results in the Streamlit UI.
Pipeline execution order
Extract
If source_table is non-empty, DataExtractor.extract_by_table() is called. The pipeline automatically builds the column list from column_mappings: it unions source_column values from standard mappings with column1 and column2 values from concat mappings, so only the columns actually needed are fetched from OLTP. If sql_query is provided, DataExtractor.extract_by_query() is also called, and the full query result replaces any table result as the working DataFrame.
Transform
The pipeline iterates over column_mappings in order. For each dict it calls the appropriate DataTransformer method based on transform_type (or type="concat"). All transformations operate on the same in-memory DataFrame.
Select target columns
After all transformations, the DataFrame is narrowed to only the columns declared as target_column in column_mappings plus the business_key. Source columns that were not mapped as targets are dropped at this stage.
Load incrementally
DataLoader.load_incremental() deduplicates against the existing DW table and appends only new rows. The count of inserted rows is captured.
Complete usage example
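The four steps can be traced end to end with in-memory DataFrames standing in for the OLTP source and the DW table. This sketch does not call ETLPipeline; it only mimics the documented behaviour with plain pandas. The sample data, the variable names, and the single-space separator used for the concat mapping are assumptions.

```python
import pandas as pd

column_mappings = [
    {"source_column": "CustomerID", "target_column": "CustomerID", "transform_type": "none"},
    {"source_column": "City", "target_column": "City", "transform_type": "upper"},
    {"type": "concat", "column1": "FirstName", "column2": "LastName", "target_column": "FullName"},
]
business_key = "CustomerID"

# Stand-ins for the OLTP source table and the existing DW table.
oltp = pd.DataFrame({
    "CustomerID": [1, 2, 3],
    "FirstName": ["Ana", "Luis", "Eva"],
    "LastName": ["Diaz", "Mora", "Ruiz"],
    "City": ["Lima", "Quito", "Cusco"],
    "Phone": ["111", "222", "333"],  # not referenced by any mapping, so not fetched
})
dw = pd.DataFrame({"CustomerID": [1], "City": ["LIMA"], "FullName": ["Ana Diaz"]})

# 1. Extract: fetch only the union of columns the mappings reference.
needed = []
for m in column_mappings:
    keys = ("column1", "column2") if m.get("type") == "concat" else ("source_column",)
    for key in keys:
        if m[key] not in needed:
            needed.append(m[key])
df = oltp[needed].copy()

# 2. Transform: apply each mapping in order on the same DataFrame.
for m in column_mappings:
    if m.get("type") == "concat":
        # Assumed separator: a single space.
        df[m["target_column"]] = df[m["column1"]].astype(str) + " " + df[m["column2"]].astype(str)
    elif m["transform_type"] == "upper":
        df[m["target_column"]] = df[m["source_column"]].str.upper()
    else:  # "none"
        df[m["target_column"]] = df[m["source_column"]]

# 3. Select target columns: keep declared targets plus the business key.
targets = [m["target_column"] for m in column_mappings]
if business_key not in targets:
    targets.append(business_key)
df_load = df[targets]

# 4. Load incrementally: append only rows whose business key is not in the DW yet.
new_rows = df_load[~df_load[business_key].isin(dw[business_key])]
dw = pd.concat([dw, new_rows], ignore_index=True)
rows_inserted = len(new_rows)
```

In this run, customer 1 already exists in the stand-in DW table, so only customers 2 and 3 are appended.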
Using a custom SQL query instead of source_table
When the data you need cannot be expressed as a single table select — for example when you need a filtered subset, a JOIN across tables, or an aggregation — pass your SQL to sql_query and set source_table to an empty string.
When sql_query is provided, the pipeline skips the automatic column selection optimisation (which normally queries only the columns referenced in column_mappings). The full result set of the query is fetched and then transformed. Ensure your SQL already selects only the columns you need.
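A query-based call might be assembled as in the sketch below. The keyword names follow the parameter names used on this page; the SQL text, the `IsActive` filter and the column names are hypothetical. The call itself is left as a comment because it needs a live pipeline and database connection.

```python
# Hypothetical query: select only the columns the mappings reference, since the
# pipeline will not narrow the column list for you on the query path.
sql_query = """
SELECT c.CustomerID, c.FirstName, c.LastName, c.City
FROM dbo.Customers AS c
WHERE c.IsActive = 1
"""

etl_kwargs = {
    "source_table": "",               # empty string: skip the table-based path
    "target_table": "DimCustomers",
    "business_key": "CustomerID",
    "column_mappings": [
        {"source_column": "CustomerID", "target_column": "CustomerID", "transform_type": "none"},
        {"source_column": "City", "target_column": "City", "transform_type": "upper"},
        {"type": "concat", "column1": "FirstName", "column2": "LastName", "target_column": "FullName"},
    ],
    "sql_query": sql_query,
}

# With a configured pipeline instance (not created here), the call would be:
# result = pipeline.run_dynamic_etl(**etl_kwargs)
```

Every column named in column_mappings must appear in the SELECT list, because the query result is the only input the transform step sees.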