DataLoader is the final stage of the ETL pipeline. Its single public method, load_incremental(), writes transformed data into the OLAP Data Warehouse while ensuring that records already present in the target table are never duplicated. It achieves this by reading the existing set of business key values from the DW table before writing, then filtering the incoming DataFrame to keep only rows whose key has not been seen before. Only the net-new rows are appended.

What is a business key?

A business key is a column that uniquely identifies each record in the target Data Warehouse table: for example, CustomerID in a DimCustomers dimension, or TerritoryID in a DimSalesTerritory table. load_incremental() uses this key as the deduplication criterion: if a row arriving from the OLTP source already has a matching key in the DW table, the row is silently skipped; if the key is new, the row is inserted.
The business key column must be present in the transformed DataFrame and must be mapped in column_mappings so that ETLPipeline includes it in the final column selection before handing the data to DataLoader.
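A pre-flight check along these lines can catch a missing or ambiguous business key before load_incremental runs. This is a minimal sketch: check_business_key is a hypothetical helper, not part of DataLoader, and the non-null and uniqueness rules are assumptions drawn from the key "uniquely identifying" each record.

```python
import pandas as pd


def check_business_key(df: pd.DataFrame, business_key: str) -> None:
    """Hypothetical helper; not part of the documented DataLoader API."""
    if business_key not in df.columns:
        # Same failure load_incremental would hit at df_transformer[business_key].
        raise KeyError(f"business key {business_key!r} missing from DataFrame")
    if df[business_key].isna().any():
        raise ValueError(f"business key {business_key!r} contains nulls")
    if df[business_key].duplicated().any():
        raise ValueError(f"business key {business_key!r} is not unique in batch")


df = pd.DataFrame({"CustomerID": [1, 2, 3], "CompanyName": ["A", "B", "C"]})
check_business_key(df, "CustomerID")  # passes silently
```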

load_incremental

load_incremental(df_transformer, table_name: str, business_key: str) -> int
df_transformer (pd.DataFrame, required): The transformed DataFrame produced by DataTransformer. Must contain the column named by business_key.

table_name (str, required): Name of the target table in the OLAP Data Warehouse (e.g., "DimCustomers"). Rows are written with if_exists='append'.

business_key (str, required): The column name to use for deduplication. Must exist in both df_transformer and the target DW table.

Returns: int. The number of rows actually inserted into the DW table.

Step-by-step loading logic

1. Connect to the OLAP database

Opens a connection to the Data Warehouse via db_client.get_olap_connection(), which returns a SQLAlchemy connection context manager.

2. Read existing business keys

Fetches all current values of the business key column from the target table:
SELECT {business_key} FROM {table_name}
The result is loaded into a Python list (valid_keys).
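A minimal sketch of the key lookup, assuming SQLAlchemy 2.x and an in-memory SQLite database standing in for the OLAP warehouse; the DimCustomers columns and sample rows are hypothetical. Because table and column names cannot be passed as bound parameters, the query interpolates them into the SQL string, so business_key and table_name should only ever come from trusted configuration.

```python
from sqlalchemy import create_engine, text

# Hypothetical in-memory stand-in for the OLAP warehouse.
engine = create_engine("sqlite://")

with engine.connect() as conn:
    # Hypothetical DimCustomers table with two customers already loaded.
    conn.execute(text(
        "CREATE TABLE DimCustomers (CustomerID INTEGER PRIMARY KEY, CompanyName TEXT)"
    ))
    conn.execute(
        text("INSERT INTO DimCustomers (CustomerID, CompanyName) VALUES (:id, :name)"),
        [{"id": 1, "name": "Alfreds Futterkiste"}, {"id": 2, "name": "Ana Trujillo"}],
    )

    table_name, business_key = "DimCustomers", "CustomerID"
    # Identifiers cannot be bound parameters, so they are interpolated into
    # the SQL text; both values must come from trusted configuration.
    result = conn.execute(text(f"SELECT {business_key} FROM {table_name}"))
    valid_keys = result.scalars().all()  # list of key values, order not guaranteed

print(valid_keys)
```

ScalarResult.all() returns the first column of every row, which is the list that the filter step passes to isin().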

3. Filter for new rows only

Removes any rows from the incoming DataFrame whose business key already appears in the DW:
df_load = df_transformer[~df_transformer[business_key].isin(valid_keys)]
Rows with a key that matches an existing DW record are dropped. Only genuinely new rows remain in df_load.
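The filter can be tried on toy data with pandas alone; the sample keys below are hypothetical. Note that isin() compares the batch only against keys already in the DW, so a key repeated within the incoming batch survives the filter. The drop_duplicates() call at the end is an optional guard added here for illustration and is not part of the documented method.

```python
import pandas as pd

business_key = "CustomerID"
# Keys already in the DW, as returned by the step-2 query (hypothetical values).
valid_keys = [1, 2]

# Hypothetical incoming batch: 2 already exists, 3 and 4 are new,
# and 4 appears twice within the batch.
df_transformer = pd.DataFrame(
    {
        "CustomerID": [2, 3, 4, 4],
        "CompanyName": ["Ana Trujillo", "Antonio Moreno", "Around the Horn", "Around the Horn"],
    }
)

# Step 3 as documented: keep rows whose key is NOT already in the DW.
df_load = df_transformer[~df_transformer[business_key].isin(valid_keys)]
print(df_load[business_key].tolist())  # [3, 4, 4]

# Optional guard (not in the documented method): drop in-batch duplicates too.
df_load_unique = df_load.drop_duplicates(subset=business_key)
print(df_load_unique[business_key].tolist())  # [3, 4]
```

Key types also matter: isin() matches by value, so a key read back from the DW as an integer will not match the same key held as a string in the DataFrame.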

4. Append new rows

Writes df_load to the target DW table using pandas.DataFrame.to_sql with if_exists='append' and index=False:
df_load.to_sql(table_name, conn, if_exists='append', index=False)

5. Commit the transaction

Explicitly commits the connection to persist the insert:
conn.commit()

6. Return the insert count

Returns len(df_load), the number of rows that were actually written to the DW table.

Code example

from src.infrastructure.db_client import DatabaseClient
from src.services.loader import DataLoader
import pandas as pd

db_client = DatabaseClient()
loader = DataLoader(db_client)

# df already transformed by DataTransformer
rows_inserted = loader.load_incremental(
    df_transformer=df,
    table_name="DimCustomers",
    business_key="CustomerID"
)
print(f"Inserted {rows_inserted} new rows")

First run vs. subsequent runs

# First run — DimCustomers is empty, all 91 rows are new
rows_inserted = loader.load_incremental(df, "DimCustomers", "CustomerID")
print(rows_inserted)  # 91

# Second run — same data, all keys already exist
rows_inserted = loader.load_incremental(df, "DimCustomers", "CustomerID")
print(rows_inserted)  # 0

# Third run — 5 new customers added to OLTP since last run
rows_inserted = loader.load_incremental(df_with_new, "DimCustomers", "CustomerID")
print(rows_inserted)  # 5

Return value

rows_inserted (int): The number of rows appended to the target DW table during this call. It is 0 when all incoming records already exist in the table (a no-op run). This value is surfaced in the ETLPipeline result dict as "rows_loaded".

load_incremental uses if_exists='append' when writing to the Data Warehouse. pandas can create a missing target table on its own, inferring column types from the DataFrame; note, however, that step 2 queries the table before anything is written, so unless the implementation handles a missing table, that SELECT fails and the table must already exist (even if empty) before the first run. For production use, create the table in advance with explicit types and constraints: auto-created tables may use wider or looser types than intended, and the business key column will not be declared as a primary key or indexed unless you add that separately.
The business key must be present in both the incoming DataFrame and the target DW table. If the key column is missing from the target table, the SELECT in step 2 fails with a database error; if it is missing from the DataFrame, the column lookup df_transformer[business_key] in the filter step raises a KeyError. Ensure the key column is included in column_mappings when using ETLPipeline.run_dynamic_etl().
