ETLPipeline is the top-level orchestrator that wires together the three ETL services — DataExtractor, DataTransformer, and DataLoader — into a single callable workflow. Rather than hard-coding column relationships at design time, the pipeline is driven by a column_mappings list that you supply at runtime, making it possible for a Streamlit UI (or any other caller) to configure extraction, transformation, and loading entirely through data. A single call to run_dynamic_etl() handles all four pipeline stages: extract, transform, column selection, and incremental load.
Class: ETLPipeline
Constructor
- db_client (DatabaseClient): An initialised DatabaseClient instance. The pipeline uses it to create three internal service objects at construction time:
  - self.extractor: a DataExtractor(db_client) for the Extract phase
  - self.transformer: a DataTransformer() for the Transform phase
  - self.loader: a DataLoader(db_client) for the Load phase
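The constructor wiring can be sketched as follows. This is a minimal illustration, not the project's source: DatabaseClient and the three service classes are hypothetical stubs that only keep what they are given, so the composition can be checked without a database.

```python
class DatabaseClient:
    """Hypothetical stand-in for the project's DatabaseClient."""

    def __init__(self, name: str) -> None:
        self.name = name


class DataExtractor:
    """Stub: keeps the client used in the Extract phase."""

    def __init__(self, db_client: DatabaseClient) -> None:
        self.db_client = db_client


class DataTransformer:
    """Stub: constructed without a client (Transform phase)."""


class DataLoader:
    """Stub: keeps the client used in the Load phase."""

    def __init__(self, db_client: DatabaseClient) -> None:
        self.db_client = db_client


class ETLPipeline:
    def __init__(self, db_client: DatabaseClient) -> None:
        # All three services are created once, when the pipeline is built.
        self.extractor = DataExtractor(db_client)
        self.transformer = DataTransformer()
        self.loader = DataLoader(db_client)


pipeline = ETLPipeline(DatabaseClient("demo"))
print(pipeline.extractor.db_client is pipeline.loader.db_client)  # True
```

In this sketch the extractor and the loader share the same client object, while the transformer is built without one, mirroring the three constructor calls listed above.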
Methods
run_dynamic_etl()
Executes a complete Extract → Transform → Select → Load cycle. The behaviour of every phase is controlled by the arguments you pass; no subclassing or monkey-patching is required to customise the pipeline for a new table pair.
Execution order:
- Extract: if source_table is non-empty, calls self.extractor.extract_by_table() with the union of all columns referenced in column_mappings. If sql_query is also provided, self.extractor.extract_by_query() runs immediately afterwards and its result overwrites the table-based extract, giving sql_query effective precedence. When only sql_query is supplied (and source_table is an empty string), only extract_by_query() runs.
- Transform: iterates over column_mappings in order, dispatching each entry to the appropriate DataTransformer method.
- Select: builds a final column list from all target_column values plus business_key, and projects the DataFrame to those columns only.
- Load: calls self.loader.load_incremental() with the projected DataFrame, target_table, and business_key.
Parameters
- source_table (str): Schema-qualified or plain OLTP table name to extract from (e.g., "Sales.Orders" or "dbo.Customers"). The pipeline automatically collects the union of all source columns referenced in column_mappings and passes them to the extractor, avoiding a full SELECT *. Pass an empty string ("") when you want to use sql_query exclusively.
- target_table (str): Name of the destination table in the Data Warehouse (e.g., "fact_orders" or "dim_customers"). Passed directly to DataLoader.load_incremental(). The table must already exist in the DW.
- business_key (str): Column name used for incremental deduplication. It must exist both in the transformed DataFrame and in target_table in the DW; rows whose business key already appears in the DW are not re-inserted. The pipeline adds business_key to the final column projection automatically, so it is always present in the data passed to DataLoader. However, the business key column must still appear as a target_column in at least one column_mappings entry (typically a "transform_type": "none" direct copy) so that the column exists in the DataFrame before the projection step runs.
- column_mappings (list of dict): A list of mapping dictionaries that define how source columns become target columns. Two dict formats are supported: a standard mapping that applies one of the transform_type values (for example "none" for a direct copy), and a concat mapping.
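Two of the rules above, collecting the union of source columns for extraction and building the final projection from the target_column values plus business_key, can be sketched as small helpers. The "source_column" key is a hypothetical name, and check_business_key() is a hypothetical pre-flight check (not part of ETLPipeline) that a caller such as the Streamlit UI could run to catch a missing business-key mapping before the pipeline reaches the projection step.

```python
from typing import Any

ColumnMapping = dict[str, Any]


def source_columns(column_mappings: list[ColumnMapping]) -> list[str]:
    """Ordered, duplicate-free union of source columns (avoids SELECT *)."""
    # "source_column" is a hypothetical key name.
    return list(dict.fromkeys(m["source_column"] for m in column_mappings))


def final_columns(column_mappings: list[ColumnMapping], business_key: str) -> list[str]:
    """Every target_column plus business_key, without duplicates."""
    targets = [m["target_column"] for m in column_mappings]
    return list(dict.fromkeys(targets + [business_key]))


def check_business_key(column_mappings: list[ColumnMapping], business_key: str) -> None:
    """Hypothetical pre-flight check: some mapping must produce the key column."""
    if business_key not in {m["target_column"] for m in column_mappings}:
        raise ValueError(
            f"business_key {business_key!r} is not a target_column in column_mappings")


mappings = [
    {"source_column": "CustomerID", "target_column": "customer_id", "transform_type": "none"},
    {"source_column": "FirstName", "target_column": "first_name", "transform_type": "none"},
    {"source_column": "CustomerID", "target_column": "customer_code", "transform_type": "none"},
]
print(source_columns(mappings))                # ['CustomerID', 'FirstName']
print(final_columns(mappings, "customer_id"))  # ['customer_id', 'first_name', 'customer_code']
check_business_key(mappings, "customer_id")    # passes silently
```

dict.fromkeys keeps first-seen order while dropping repeats, so a source column used by two mappings is requested from the extractor only once.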
- sql_query (str, optional): A raw SQL SELECT string. When provided, the pipeline calls self.extractor.extract_by_query(sql_query) and that result takes precedence: if source_table is also non-empty, the table-based extract still runs first but is overwritten. This is useful when the source data requires a multi-table join, a CTE, or any transformation that is easier to express in SQL than through column_mappings. Set source_table to an empty string when using this parameter to skip the redundant table extract.

Returns
A result dictionary summarising the completed run:
- Status: always "success" on normal completion. An unhandled exception propagates to the caller rather than being reported through an error status.
- Rows extracted (rows_extracted): total number of rows returned from the source, before any deduplication. Equals the row count of the DataFrame immediately after extraction.
- Rows loaded: number of rows actually inserted into the DW during this run. May be less than rows_extracted if some keys were already present in the target table, and is 0 for a no-op run with no new data.
- Target table: the target_table value passed in, included for logging and UI feedback so that the caller does not need to track the argument separately.
- Preview: the first five rows of the projected, load-ready DataFrame (df_load.head(5)), useful for displaying a preview in the Streamlit UI to confirm the transformation results before inspecting the DW directly.

Complete Example
The example below demonstrates all six transform_type values plus a concat mapping, loading a customers dimension table into the DW.
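Separately from that full example, the incremental-load rule and the shape of the result dictionary can be illustrated on their own. This sketch uses Python's built-in sqlite3 module as a stand-in for the Data Warehouse and plain dicts instead of a DataFrame. load_incremental() here is a hypothetical re-implementation of the documented behaviour (skip rows whose business key already exists in the target table), not the project's DataLoader, and every result key except rows_extracted is an illustrative name.

```python
import sqlite3
from typing import Any

Row = dict[str, Any]


def load_incremental(conn: sqlite3.Connection, rows: list[Row],
                     target_table: str, business_key: str) -> int:
    """Insert only rows whose business key is not yet in target_table.

    Hypothetical re-implementation of the documented rule. Table and column
    names are interpolated into SQL, so they must come from trusted config.
    """
    existing = {key for (key,) in conn.execute(
        f"SELECT {business_key} FROM {target_table}")}
    new_rows = [r for r in rows if r[business_key] not in existing]
    if not new_rows:
        return 0  # no-op run: every key is already in the DW
    cols = list(new_rows[0])
    sql = (f"INSERT INTO {target_table} ({', '.join(cols)}) "
           f"VALUES ({', '.join('?' for _ in cols)})")
    conn.executemany(sql, [tuple(r[c] for c in cols) for r in new_rows])
    conn.commit()
    return len(new_rows)


# In-memory stand-in for the DW, already holding customer 1.
dw = sqlite3.connect(":memory:")
dw.execute("CREATE TABLE dim_customers (customer_id INTEGER, customer_name TEXT)")
dw.execute("INSERT INTO dim_customers VALUES (1, 'ANA')")

df_load = [{"customer_id": 1, "customer_name": "ANA"},
           {"customer_id": 2, "customer_name": "BO"}]
inserted = load_incremental(dw, df_load, "dim_customers", "customer_id")

result = {
    "status": "success",              # illustrative key name
    "rows_extracted": len(df_load),   # projection keeps every extracted row
    "rows_loaded": inserted,          # illustrative key name
    "target_table": "dim_customers",  # illustrative key name
    "preview": df_load[:5],           # mirrors df_load.head(5)
}
print(result["rows_extracted"], result["rows_loaded"])  # 2 1
```

Running load_incremental() a second time with the same rows inserts nothing and returns 0, which corresponds to the no-op run described in the Returns section.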
Using a Custom SQL Query
When the source data requires a join or aggregation that cannot be expressed through a single table name, pass a sql_query and set source_table to an empty string.
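A self-contained sketch of the query path follows. Python's sqlite3 stands in for the OLTP source (so plain table names replace schema-qualified ones such as "Sales.Orders"), extract_by_query() is a hypothetical helper that returns rows as dicts rather than a DataFrame, and the Customers/Orders tables and the fact_customer_sales target are invented for illustration. The commented call at the end uses the parameter names described on this page; their positional order is not documented, so keywords are used.

```python
import sqlite3
from typing import Any


def extract_by_query(conn: sqlite3.Connection, sql_query: str) -> list[dict[str, Any]]:
    """Run a raw SELECT and return rows as dicts (stand-in for a DataFrame)."""
    cur = conn.execute(sql_query)
    names = [d[0] for d in cur.description]  # column names, including aliases
    return [dict(zip(names, row)) for row in cur.fetchall()]


oltp = sqlite3.connect(":memory:")
oltp.executescript("""
    CREATE TABLE Customers (CustomerID INTEGER PRIMARY KEY, Name TEXT);
    CREATE TABLE Orders (OrderID INTEGER PRIMARY KEY, CustomerID INTEGER, Amount REAL);
    INSERT INTO Customers VALUES (1, 'Ana'), (2, 'Bo');
    INSERT INTO Orders VALUES (10, 1, 5.0), (11, 1, 7.5), (12, 2, 3.0);
""")

# A join plus aggregation that a single source_table name cannot express.
SQL = """
    SELECT c.CustomerID AS customer_id, c.Name AS customer_name,
           SUM(o.Amount) AS total_amount
    FROM Customers AS c
    JOIN Orders AS o ON o.CustomerID = c.CustomerID
    GROUP BY c.CustomerID, c.Name
    ORDER BY c.CustomerID
"""
rows = extract_by_query(oltp, SQL)
print(rows)
# [{'customer_id': 1, 'customer_name': 'Ana', 'total_amount': 12.5},
#  {'customer_id': 2, 'customer_name': 'Bo', 'total_amount': 3.0}]

# With the real pipeline, the same query would be passed roughly like this:
# pipeline.run_dynamic_etl(source_table="", target_table="fact_customer_sales",
#                          business_key="customer_id",
#                          column_mappings=mappings, sql_query=SQL)
```

Aliasing columns in the SELECT (customer_id, customer_name) lets the query output line up with the names the column_mappings entries and business_key expect.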