`DataLoader` handles the Load phase of the ETL pipeline. Its single public method, `load_incremental()`, implements an upsert-free incremental load strategy: it reads the set of business key values already present in the target DW table, filters them out of the incoming DataFrame, and appends only the genuinely new rows. This avoids duplicates without requiring a primary-key conflict handler, so the load does not depend on database-specific upsert syntax such as `MERGE` (SQL Server) or `ON CONFLICT` (PostgreSQL, SQLite).
Class: DataLoader
Constructor
An initialised `DatabaseClient` instance. `DataLoader` stores it as `self.db_client` and calls `self.db_client.get_olap_connection()` to open a single OLAP connection, which serves both the key-fetch read and the append write within the same transaction.
Methods
load_incremental()
Loads new rows from a transformed DataFrame into a Data Warehouse table using an incremental (deduplication) strategy. The method performs all work inside a single OLAP connection and commits before returning.
Execution steps:
- Read existing keys: executes `SELECT {business_key} FROM {table_name}` via the OLAP connection and collects the result as a Python list.
- Filter new rows: applies `df[~df[business_key].isin(valid_keys)]` to keep only rows whose business key is not yet in the DW.
- Append: calls `df_load.to_sql(table_name, conn, if_exists='append', index=False)` to insert the filtered rows.
- Commit: calls `conn.commit()` explicitly to finalise the transaction.
- Return count: returns `len(df_load)`, the number of rows actually inserted.
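The five steps above can be sketched as a standalone function. This is a minimal illustration, not the project's actual implementation: the function name `load_incremental_sketch`, the in-memory SQLite connection standing in for the OLAP connection, and the `dim_customer` table are all assumptions made so the example runs on its own.

```python
import sqlite3

import pandas as pd


def load_incremental_sketch(conn, df, table_name, business_key):
    """Append only the rows of df whose business key is not yet in table_name."""
    # 1. Read existing keys. Identifiers are interpolated into the query,
    #    so table_name and business_key must come from trusted code.
    existing = pd.read_sql(f"SELECT {business_key} FROM {table_name}", conn)
    valid_keys = existing[business_key].tolist()

    # 2. Filter new rows: keep rows whose key is NOT already in the table.
    df_load = df[~df[business_key].isin(valid_keys)]

    # 3. Append the filtered rows, 4. commit, 5. return the inserted-row count.
    df_load.to_sql(table_name, conn, if_exists="append", index=False)
    conn.commit()
    return len(df_load)


# Hypothetical target table in an in-memory SQLite database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_customer (customer_id INTEGER, name TEXT)")

batch = pd.DataFrame({"customer_id": [1, 2, 3], "name": ["Ana", "Luis", "Eva"]})
first_run = load_incremental_sketch(conn, batch, "dim_customer", "customer_id")
second_run = load_incremental_sketch(conn, batch, "dim_customer", "customer_id")
print(first_run, second_run)  # 3 0
```

Running the same batch twice inserts three rows the first time and none the second, which is the idempotent behaviour the strategy is meant to provide.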
- `df_transformer`: The transformed DataFrame to load. It must contain a column matching the `business_key` name and all other columns that map to existing columns in the target table. The DataFrame should already be projected to only the target columns; extra columns not present in the DW table will cause a SQL error.
- `table_name`: Name of the destination table in the Data Warehouse. The string is interpolated directly into the `SELECT` query and passed to `pd.DataFrame.to_sql()`. It must match the table name exactly as it exists in the DW schema.
- `business_key`: The column name used for deduplication. This must be a column present in both `df_transformer` and the `table_name` table in the DW. Rows in `df_transformer` whose value for this column already appears in the DW are silently dropped before the insert.
- Returns: The number of rows inserted into the DW table during this invocation. Returns `0` if all incoming rows were already present (no-op run). This count is also surfaced by `ETLPipeline.run_dynamic_etl()` as the `"rows_loaded"` field in its return dict.
Usage Example
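A minimal usage sketch follows. Because the project's real import paths and `DatabaseClient` configuration are not shown here, the example uses two hypothetical stand-ins: `StubDatabaseClient` (an in-memory SQLite database exposing `get_olap_connection()`) and `DataLoaderSketch` (a class with the documented constructor and method shape). The `dim_product` table and its columns are also invented for illustration.

```python
import sqlite3

import pandas as pd


class StubDatabaseClient:
    """Hypothetical stand-in for DatabaseClient, backed by in-memory SQLite."""

    def __init__(self):
        self._conn = sqlite3.connect(":memory:")
        self._conn.execute("CREATE TABLE dim_product (product_id INTEGER, name TEXT)")

    def get_olap_connection(self):
        return self._conn


class DataLoaderSketch:
    """Hypothetical stand-in that mirrors the documented DataLoader interface."""

    def __init__(self, db_client):
        self.db_client = db_client

    def load_incremental(self, df_transformer, table_name, business_key):
        conn = self.db_client.get_olap_connection()
        keys = pd.read_sql(f"SELECT {business_key} FROM {table_name}", conn)
        is_new = ~df_transformer[business_key].isin(keys[business_key])
        df_load = df_transformer[is_new]
        df_load.to_sql(table_name, conn, if_exists="append", index=False)
        conn.commit()
        return len(df_load)


loader = DataLoaderSketch(StubDatabaseClient())

df_transformed = pd.DataFrame(
    {"product_id": [10, 11], "name": ["Desk", "Lamp"], "tmp_score": [0.4, 0.9]}
)
# Project to the target columns first: tmp_score does not exist in dim_product
# and would otherwise make the insert fail.
df_ready = df_transformed[["product_id", "name"]]

rows_loaded = loader.load_incremental(
    df_transformer=df_ready, table_name="dim_product", business_key="product_id"
)
print(f"rows_loaded={rows_loaded}")
```

Passing the arguments by keyword keeps the call readable and avoids relying on a particular positional order.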
Incremental Load Behaviour
The table below illustrates how `load_incremental()` responds to different scenarios based on the relationship between the incoming DataFrame and the existing DW data.
| Scenario | Keys already in DW | Incoming keys | Rows inserted |
|---|---|---|---|
| First load (empty DW table) | 0 | 150 000 | 150 000 |
| Subsequent run (no new data) | 150 000 | 150 000 | 0 |
| Partial new data | 150 000 | 150 312 | 312 |
| Full reload of new dataset | 0 | 5 000 | 5 000 |
The deduplication check is a Python-side `isin()` call, not a SQL `MERGE`. For very large tables (millions of rows), consider batching the incoming DataFrame or adding a date-range filter at the extraction step to limit the number of keys loaded into memory.
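One way to batch the incoming DataFrame is sketched below. It is an assumption-laden variant, not part of `DataLoader`: the function name `load_incremental_batched`, the `batch_size` parameter, and the `fact_sales` table are hypothetical, and the `?` placeholders follow the qmark style used by `sqlite3` (other drivers may use a different placeholder). Instead of reading every key from the table, each slice asks the database only about its own keys, so at most `batch_size` keys are held in memory at a time.

```python
import sqlite3

import pandas as pd


def load_incremental_batched(conn, df, table_name, business_key, batch_size=500):
    """Check and append df slice by slice, looking up only each slice's keys."""
    inserted = 0
    for start in range(0, len(df), batch_size):
        chunk = df.iloc[start:start + batch_size]
        keys = chunk[business_key].tolist()

        # Ask the database which of this slice's keys already exist.
        placeholders = ",".join("?" * len(keys))
        query = (
            f"SELECT {business_key} FROM {table_name} "
            f"WHERE {business_key} IN ({placeholders})"
        )
        present = {row[0] for row in conn.execute(query, keys)}

        # Append only the rows whose key was not found.
        new_rows = chunk[~chunk[business_key].isin(present)]
        new_rows.to_sql(table_name, conn, if_exists="append", index=False)
        inserted += len(new_rows)

    conn.commit()
    return inserted


# Hypothetical fact table pre-filled with keys 0..999.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_sales (sale_id INTEGER, amount REAL)")
conn.executemany("INSERT INTO fact_sales VALUES (?, ?)", [(i, 1.0) for i in range(1000)])

# Incoming keys 900..1199: the first 100 overlap with existing rows.
incoming = pd.DataFrame({"sale_id": range(900, 1200), "amount": 2.0})
inserted = load_incremental_batched(conn, incoming, "fact_sales", "sale_id", batch_size=100)
print(inserted)  # 200
```

The trade-off is one extra query per slice in exchange for bounded memory use; an index on the business key column keeps those lookups cheap.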