
DataLoader handles the Load phase of the ETL pipeline. Its single public method, load_incremental(), implements an upsert-free incremental load strategy. It reads the business key values already present in the target DW table, drops every incoming row whose key is among them, and appends only the genuinely new rows. This stops rows that are already in the DW from being inserted again. It does not rely on a primary-key conflict handler or on engine-specific upsert syntax such as SQL Server's MERGE or the ON CONFLICT clause of PostgreSQL and SQLite, so it runs on any SQL Server version whether or not MERGE is available.
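The core idea is an anti-join on the business key: keep only the incoming rows whose key is not already stored. This minimal sketch shows the same filtering with plain Python lists and a set instead of pandas. The new_rows helper and the sample data are hypothetical.

```python
def new_rows(incoming, existing_keys, business_key):
    """Return the incoming rows whose business key is not already stored."""
    seen = set(existing_keys)  # set lookups keep the filter linear in len(incoming)
    return [row for row in incoming if row[business_key] not in seen]


existing = [1, 2, 3]  # keys already in the DW table
incoming = [
    {"OrderID": 2, "Total": 10.0},
    {"OrderID": 3, "Total": 12.5},
    {"OrderID": 4, "Total": 8.0},
]
to_insert = new_rows(incoming, existing, "OrderID")
print(to_insert)  # [{'OrderID': 4, 'Total': 8.0}]
```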

Class: DataLoader

from src.infrastructure.db_client import DatabaseClient
from src.services.loader import DataLoader

db = DatabaseClient()
loader = DataLoader(db)

Constructor

db_client
DatabaseClient
required
An initialised DatabaseClient instance. DataLoader stores it as self.db_client. Each call to load_incremental() uses self.db_client.get_olap_connection() to open a single OLAP connection, which serves both the key-fetch read and the append write within the same transaction.

Methods

load_incremental()

Loads new rows from a transformed DataFrame into a Data Warehouse table using an incremental (deduplication) strategy. The method performs all work inside a single OLAP connection and commits before returning. Execution steps:
  1. Read existing keys — executes SELECT {business_key} FROM {table_name} via the OLAP connection and collects the result as a Python list.
  2. Filter new rows — applies df[~df[business_key].isin(valid_keys)] to keep only rows whose business key is not yet in the DW.
  3. Append — calls df_load.to_sql(table_name, conn, if_exists='append', index=False) to insert the filtered rows.
  4. Commit — calls conn.commit() explicitly to finalise the transaction.
  5. Return count — returns len(df_load), the number of rows actually inserted.
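The five steps can be sketched as a standalone function. This is a minimal sketch and not the project's source. It assumes a DB-API connection to an in-memory SQLite database from Python's standard library in place of the SQL Server OLAP connection returned by DatabaseClient.get_olap_connection(). The fact_orders table and its columns are illustrative.

```python
import sqlite3

import pandas as pd


def load_incremental(conn, df_transformer, table_name, business_key):
    """Sketch of DataLoader.load_incremental() over a DB-API connection.

    table_name and business_key are interpolated into SQL unescaped, so
    they must come from trusted configuration.
    """
    # 1. Read existing keys from the target table into a Python list.
    cur = conn.cursor()
    cur.execute(f"SELECT {business_key} FROM {table_name}")
    valid_keys = [row[0] for row in cur.fetchall()]

    # 2. Keep only rows whose business key is not yet in the table.
    df_load = df_transformer[~df_transformer[business_key].isin(valid_keys)]

    # 3. Append the filtered rows without writing the DataFrame index.
    df_load.to_sql(table_name, conn, if_exists="append", index=False)

    # 4. Commit explicitly, mirroring the documented behaviour.
    conn.commit()

    # 5. Report how many rows were handed to to_sql().
    return len(df_load)


# Illustrative target table with two rows already loaded.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_orders (OrderID INTEGER, Total REAL)")
conn.execute("INSERT INTO fact_orders VALUES (1, 10.0), (2, 20.0)")
conn.commit()

incoming = pd.DataFrame({"OrderID": [2, 3, 4], "Total": [20.0, 30.0, 40.0]})
print(load_incremental(conn, incoming, "fact_orders", "OrderID"))  # 2 (keys 3 and 4)
print(load_incremental(conn, incoming, "fact_orders", "OrderID"))  # 0 (no-op run)
```

The second call returns 0 because every incoming key is now in the table, which matches the documented no-op behaviour.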
df_transformer
pd.DataFrame
required
The transformed DataFrame to load. It must contain the business_key column, and every one of its columns must correspond to an existing column in the target table. Project the DataFrame to the target columns before calling the method, because extra columns not present in the DW table cause a SQL error on insert.
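One way to guarantee the projection is to read the target table's column names and select only those columns from the DataFrame. The project_to_table helper below is hypothetical. It assumes a DB-API driver that fills cursor.description for a query returning no rows, which sqlite3 documents, and it uses SQLite only so the example runs standalone.

```python
import sqlite3

import pandas as pd


def project_to_table(conn, df, table_name):
    """Hypothetical helper: keep only the DataFrame columns the table defines."""
    cur = conn.cursor()
    # WHERE 1 = 0 returns no rows but still populates cursor.description.
    cur.execute(f"SELECT * FROM {table_name} WHERE 1 = 0")
    table_cols = [col[0] for col in cur.description]
    # Preserve the table's column order; skip table columns the DataFrame lacks.
    return df[[c for c in table_cols if c in df.columns]]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_orders (OrderID INTEGER, CustomerID INTEGER, Total REAL)")

df = pd.DataFrame({
    "OrderID": [1],
    "CustomerID": [7],
    "OrderDate": ["2024-01-05"],  # source-only column, absent from the DW table
    "Total": [9.5],
})
df_ready = project_to_table(conn, df, "fact_orders")
print(list(df_ready.columns))  # ['OrderID', 'CustomerID', 'Total']
```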
table_name
str
required
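Name of the destination table in the Data Warehouse. It must match the table name exactly as it exists in the DW schema. The string is interpolated directly into the SELECT query, so it must come from trusted configuration and never from user input. It is also passed unchanged as the name argument of pd.DataFrame.to_sql(). That method treats the whole string as a single table name and takes the schema through its separate schema argument, so pass an unqualified name that lives in the connection's default schema.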
business_key
str
required
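The column name used for deduplication. It must be present both in df_transformer and in the table_name table in the DW. Rows in df_transformer whose value in this column already appears in the DW are silently dropped before the insert. The filter does not handle two cases. First, duplicate keys within df_transformer itself are all inserted. Second, keys stored as strings in the DataFrame but as integers in the DW (or the reverse) never match, so nothing is filtered. If either case can occur, deduplicate and cast the key column before calling the method.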
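The snippet below illustrates both caveats with pandas. The sample data, the string-typed key column, and the choice of keep="last" when dropping batch duplicates are illustrative assumptions. Which duplicate to keep is a business decision.

```python
import pandas as pd

existing_keys = [1, 2]  # keys as read from an INTEGER column in the DW

# Illustrative batch: key 3 appears twice.
incoming = pd.DataFrame({"OrderID": [2, 3, 3], "Total": [20.0, 30.0, 31.0]})

# The isin() filter alone keeps both rows with key 3.
naive = incoming[~incoming["OrderID"].isin(existing_keys)]

# Dropping batch duplicates first leaves one row per key (here the last one).
deduped = incoming.drop_duplicates(subset="OrderID", keep="last")
clean = deduped[~deduped["OrderID"].isin(existing_keys)]

# String keys never equal integer keys, so nothing is filtered out.
as_text = incoming.astype({"OrderID": str})
mismatched = as_text[~as_text["OrderID"].isin(existing_keys)]

print(len(naive), len(clean), len(mismatched))  # 2 1 3
```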
return
int
The number of rows inserted into the DW table during this invocation. Returns 0 if all incoming rows were already present (no-op run). This count is also surfaced by ETLPipeline.run_dynamic_etl() as the "rows_loaded" field in its return dict.
The target table (table_name) must already exist in the Data Warehouse before you call load_incremental(). DataLoader never creates tables and only appends to existing ones. On a missing table, the initial key SELECT fails before anything is written. Pre-create the schema with your DW migration tooling or a DDL script before running the ETL for the first time.
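As an illustration of pre-creating the schema, the script below creates a fact_orders table that matches the usage example's columns. The DDL is hypothetical and uses SQLite's column types; a SQL Server script would use its own types. Declaring the business key as PRIMARY KEY also makes the database reject a duplicate key outright. In that case the load raises an error instead of inserting the duplicate.

```python
import sqlite3

# Hypothetical DDL for the fact_orders table used in the usage example.
DDL = """
CREATE TABLE IF NOT EXISTS fact_orders (
    OrderID    INTEGER NOT NULL PRIMARY KEY,
    CustomerID INTEGER NOT NULL,
    OrderYear  INTEGER,
    OrderMonth INTEGER,
    Total      REAL
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(DDL)  # run once, before the first ETL load

tables = [r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")]
print(tables)  # ['fact_orders']
```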

Usage Example

import pandas as pd
from src.infrastructure.db_client import DatabaseClient
from src.services.extractor import DataExtractor
from src.services.transformer import DataTransformer
from src.services.loader import DataLoader

# ── Setup ─────────────────────────────────────────────────────────────────────
db          = DatabaseClient()
extractor   = DataExtractor(db)
transformer = DataTransformer()
loader      = DataLoader(db)

# ── Extract ───────────────────────────────────────────────────────────────────
df = extractor.extract_by_table(
    "Sales.Orders",
    columns=["OrderID", "CustomerID", "OrderDate", "Total"]
)
# df shape: (150 000, 4)

# ── Transform ─────────────────────────────────────────────────────────────────
transformer.date_transform(df, column="OrderDate", new_column="OrderYear",  operation="year")
transformer.date_transform(df, column="OrderDate", new_column="OrderMonth", operation="month")

# Keep only the columns that exist in the DW table
df_ready = df[["OrderID", "CustomerID", "OrderYear", "OrderMonth", "Total"]]

# ── Load ──────────────────────────────────────────────────────────────────────
rows_inserted = loader.load_incremental(
    df_transformer=df_ready,
    table_name="fact_orders",
    business_key="OrderID"
)

print(f"Inserted {rows_inserted} new rows into fact_orders.")
# Example output: "Inserted 312 new rows into fact_orders."
# (the other 149 688 extracted rows were already in the DW from a previous run and were skipped)

Incremental Load Behaviour

The table below illustrates how load_incremental() responds to different scenarios based on the relationship between the incoming DataFrame and the existing DW data.
| Scenario | Keys already in DW | Incoming keys | Rows inserted |
|---|---|---|---|
| First load (empty DW table) | 0 | 150 000 | 150 000 |
| Subsequent run (no new data) | 150 000 | 150 000 | 0 |
| Partial new data | 150 000 | 150 312 | 312 |
| Full reload of new dataset | 0 | 5 000 | 5 000 |
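Each scenario reduces to counting incoming keys that are absent from the DW. The sketch below reproduces the table's numbers using integer ranges as stand-in keys. The key values and the rows_inserted helper are hypothetical. The sketch also assumes the incoming keys are unique, which load_incremental() itself does not enforce.

```python
def rows_inserted(dw_keys, incoming_keys):
    """Count incoming keys not already in the DW (assumes unique incoming keys)."""
    existing = set(dw_keys)
    return sum(1 for key in incoming_keys if key not in existing)


# Integer ranges stand in for real business keys.
scenarios = {
    "First load (empty DW table)": (range(0), range(150_000)),
    "Subsequent run (no new data)": (range(150_000), range(150_000)),
    "Partial new data": (range(150_000), range(150_312)),
    "Full reload of new dataset": (range(0), range(5_000)),
}

results = {name: rows_inserted(dw, inc) for name, (dw, inc) in scenarios.items()}
for name, count in results.items():
    print(f"{name}: {count}")
```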
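The deduplication check is a Python-side isin() call, not a SQL MERGE. Step 1 reads every key in the target table, so memory use grows with the size of the DW table as well as with the incoming batch. For very large tables (millions of rows), a date-range filter at the extraction step shrinks the incoming DataFrame, and batching keeps each insert small. Neither reduces the key fetch itself, which requires narrowing the SELECT, for example to the key range of the incoming batch. The check also assumes that runs do not overlap. Two concurrent loads can both see a key as missing and insert it twice unless the table enforces a unique constraint on the business key.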
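The following sketch narrows the key fetch. It reads only the DW keys that fall between the batch's smallest and largest business key, then inserts in chunks through the chunksize argument of to_sql(). The load_incremental_ranged helper is hypothetical. It assumes an integer business key and a driver that uses ? placeholders, as sqlite3 and pyodbc both do. SQLite stands in for the DW so the example runs standalone.

```python
import sqlite3

import pandas as pd


def load_incremental_ranged(conn, df, table_name, business_key, chunksize=10_000):
    """Hypothetical variant that fetches only DW keys inside the batch's key range.

    Assumes an integer business key and a DB-API driver with "?" placeholders.
    """
    if df.empty:
        return 0

    # Bound the key fetch by the smallest and largest key in this batch.
    lo = int(df[business_key].min())
    hi = int(df[business_key].max())
    cur = conn.cursor()
    cur.execute(
        f"SELECT {business_key} FROM {table_name} WHERE {business_key} BETWEEN ? AND ?",
        (lo, hi),
    )
    existing = {row[0] for row in cur.fetchall()}

    # Same anti-join filter as load_incremental(), then a chunked append.
    df_load = df[~df[business_key].isin(existing)]
    df_load.to_sql(table_name, conn, if_exists="append", index=False, chunksize=chunksize)
    conn.commit()
    return len(df_load)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_orders (OrderID INTEGER, Total REAL)")
conn.executemany("INSERT INTO fact_orders VALUES (?, ?)", [(i, 1.0) for i in range(1, 1001)])
conn.commit()

# Keys 995..1005: six already loaded, five new; only DW keys in 995..1005 are fetched.
batch = pd.DataFrame({"OrderID": range(995, 1006), "Total": 2.0})
print(load_incremental_ranged(conn, batch, "fact_orders", "OrderID"))  # 5
```

A range filter helps most when keys grow monotonically, as with identity columns. For scattered keys, the range can still cover most of the table.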
