ETLPipeline is the top-level orchestrator that wires together the three ETL services — DataExtractor, DataTransformer, and DataLoader — into a single callable workflow. Rather than hard-coding column relationships at design time, the pipeline is driven by a column_mappings list that you supply at runtime, making it possible for a Streamlit UI (or any other caller) to configure extraction, transformation, and loading entirely through data. A single call to run_dynamic_etl() handles all four pipeline stages: extract, transform, column selection, and incremental load.

Class: ETLPipeline

from src.infrastructure.db_client import DatabaseClient
from src.services.pipeline import ETLPipeline

db = DatabaseClient()
pipeline = ETLPipeline(db)

Constructor

db_client
DatabaseClient
required
An initialised DatabaseClient instance. The pipeline uses it to create three internal service objects at construction time:
  • self.extractor — a DataExtractor(db_client) for the Extract phase
  • self.transformer — a DataTransformer() for the Transform phase
  • self.loader — a DataLoader(db_client) for the Load phase
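The wiring described above could look roughly like the following sketch. The three service classes here are empty stand-ins written for illustration, not the project's real implementations; only the attribute names (extractor, transformer, loader) and the constructor arguments come from the description above.

```python
class DataExtractor:
    """Stand-in for the real extractor; it only stores the client."""

    def __init__(self, db_client):
        self.db_client = db_client


class DataTransformer:
    """Stand-in for the real transformer; it takes no constructor arguments."""


class DataLoader:
    """Stand-in for the real loader; it only stores the client."""

    def __init__(self, db_client):
        self.db_client = db_client


class ETLPipelineSketch:
    """Hypothetical mirror of the constructor behaviour described above."""

    def __init__(self, db_client):
        # All three services are created once, at construction time.
        self.extractor = DataExtractor(db_client)
        self.transformer = DataTransformer()
        self.loader = DataLoader(db_client)


fake_client = object()  # placeholder for an initialised DatabaseClient
pipeline_sketch = ETLPipelineSketch(fake_client)
```

The extractor and the loader share the same client object, while the transformer holds no client at all.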

Methods

run_dynamic_etl()

Executes a complete Extract → Transform → Select → Load cycle. The behaviour of every phase is controlled by the arguments you pass; no subclassing or monkey-patching is required to customise the pipeline for a new table pair. Execution order:
  1. Extract — if source_table is non-empty, calls self.extractor.extract_by_table() with the union of all columns referenced in column_mappings. If sql_query is also provided, self.extractor.extract_by_query() runs immediately after and its result overwrites the table-based extract, giving sql_query effective precedence. When only sql_query is supplied (and source_table is an empty string), only extract_by_query() runs.
  2. Transform — iterates over column_mappings in order, dispatching each entry to the appropriate DataTransformer method.
  3. Select — builds a final column list from all target_column values plus business_key, and projects the DataFrame to those columns only.
  4. Load — calls self.loader.load_incremental() with the projected DataFrame, target_table, and business_key.
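The extraction precedence in step 1 can be sketched as below. RecordingExtractor and extract_sketch are hypothetical names, and the argument order of extract_by_table is an assumption; only the two method names and the "query result overwrites table result" rule come from the description above.

```python
class RecordingExtractor:
    """Hypothetical stand-in that records which extract methods were called."""

    def __init__(self):
        self.calls = []

    def extract_by_table(self, table, columns):
        # Assumed signature: table name plus the list of columns to read.
        self.calls.append("extract_by_table")
        return [{"source": "table"}]

    def extract_by_query(self, sql_query):
        self.calls.append("extract_by_query")
        return [{"source": "query"}]


def extract_sketch(extractor, source_table, columns, sql_query=None):
    """Apply the precedence rule: a query result replaces a table result."""
    data = None
    if source_table:
        data = extractor.extract_by_table(source_table, columns)
    if sql_query:
        # Runs after the table extract and overwrites its result.
        data = extractor.extract_by_query(sql_query)
    return data


both = RecordingExtractor()
rows = extract_sketch(both, "dbo.Customers", ["CustomerID"], "SELECT 1")
print(both.calls)         # ['extract_by_table', 'extract_by_query']
print(rows[0]["source"])  # query
```

Under this rule, passing both arguments costs an extra table read whose result is discarded, which is why an empty source_table is recommended when sql_query is used.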
source_table
str
required
Schema-qualified or plain OLTP table name to extract from (e.g., "Sales.Orders" or "dbo.Customers"). The pipeline automatically collects the union of all source columns referenced in column_mappings and passes them to the extractor, avoiding a full SELECT *. Pass an empty string ("") when you want to use sql_query exclusively.
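As an illustration of the column collection, the helper below gathers the source columns referenced by the two mapping shapes used in the Complete Example. collect_source_columns is a hypothetical name, and keeping first-seen order is an assumption; the real pipeline may order the columns differently.

```python
def collect_source_columns(column_mappings):
    """Return the de-duplicated source columns in first-seen order."""
    seen = []
    for mapping in column_mappings:
        if mapping.get("type") == "concat":
            # A concat mapping reads from two source columns.
            candidates = [mapping["column1"], mapping["column2"]]
        else:
            candidates = [mapping["source_column"]]
        for name in candidates:
            if name not in seen:
                seen.append(name)
    return seen


mappings = [
    {"source_column": "CustomerID", "transform_type": "none", "target_column": "CustomerID"},
    {"source_column": "CreatedAt", "transform_type": "year", "target_column": "RegistrationYear"},
    {"source_column": "CreatedAt", "transform_type": "month", "target_column": "RegistrationMonth"},
    {"type": "concat", "column1": "FirstName", "column2": "LastName", "target_column": "FullName"},
]

print(collect_source_columns(mappings))
# ['CustomerID', 'CreatedAt', 'FirstName', 'LastName']
```

CreatedAt appears only once even though two mappings read from it, which is the point of taking a union.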
target_table
str
required
Name of the destination table in the Data Warehouse (e.g., "fact_orders" or "dim_customers"). Passed directly to DataLoader.load_incremental(). The table must already exist in the DW.
business_key
str
required
Column name used for incremental deduplication. It must exist in both the transformed DataFrame and the target_table in the DW. Rows whose business key already appears in the DW are not re-inserted. The pipeline adds business_key to the final column projection automatically, so it is always present in the data passed to DataLoader. The projection cannot create the column, however: business_key must still appear as a target_column in at least one column_mappings entry (typically a "transform_type": "none" direct copy) so that it exists in the DataFrame before the projection step runs.
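The interaction between the projection and the business key can be sketched as follows. Both helper names are hypothetical; the sketch only mirrors the rule stated above and is not taken from the pipeline's source.

```python
def final_columns(column_mappings, business_key):
    """Target columns plus the business key, without duplicates."""
    columns = []
    for mapping in column_mappings:
        target = mapping["target_column"]
        if target not in columns:
            columns.append(target)
    if business_key not in columns:
        # The projection always includes the key, mapped or not.
        columns.append(business_key)
    return columns


def business_key_is_mapped(column_mappings, business_key):
    """True if some mapping actually produces the business key column."""
    return any(m["target_column"] == business_key for m in column_mappings)


good = [
    {"source_column": "CustomerID", "transform_type": "none", "target_column": "CustomerID"},
    {"source_column": "Email", "transform_type": "lower", "target_column": "EmailLower"},
]
bad = [
    {"source_column": "Email", "transform_type": "lower", "target_column": "EmailLower"},
]

print(final_columns(good, "CustomerID"))          # ['CustomerID', 'EmailLower']
print(final_columns(bad, "CustomerID"))           # ['EmailLower', 'CustomerID']
print(business_key_is_mapped(bad, "CustomerID"))  # False
```

In the second case the projection asks for CustomerID even though no mapping created it, so a check like business_key_is_mapped run before the ETL call can catch the misconfiguration early.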
column_mappings
list[dict]
required
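A list of mapping dictionaries that define how source columns become target columns. Two dict formats are supported, both shown in the Complete Example: a standard mapping with the keys source_column, transform_type, and target_column, where transform_type is one of "none", "upper", "lower", "year", "month", or "day"; and a concat mapping with the keys type (set to "concat"), column1, column2, and target_column, which combines two source columns into one target column.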
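To make the two mapping shapes used in the Complete Example concrete, here is a minimal sketch of how a single mapping might be applied to a single source row. It uses a plain dict in place of a DataFrame row, apply_mapping is a hypothetical helper rather than a DataTransformer method, and joining concat values with a single space is an assumption inferred from the example output.

```python
from datetime import date


def apply_mapping(row, mapping):
    """Return the target value for one mapping applied to one source row."""
    if mapping.get("type") == "concat":
        # Assumption: the two values are joined with a single space.
        return f"{row[mapping['column1']]} {row[mapping['column2']]}"

    value = row[mapping["source_column"]]
    kind = mapping["transform_type"]
    if kind == "none":
        return value
    if kind == "upper":
        return value.upper()
    if kind == "lower":
        return value.lower()
    if kind in ("year", "month", "day"):
        # date and datetime objects expose these as attributes.
        return getattr(value, kind)
    raise ValueError(f"Unsupported transform_type: {kind!r}")


row = {
    "CustomerName": "Alice Smith",
    "CreatedAt": date(2021, 3, 15),
    "FirstName": "Alice",
    "LastName": "Smith",
}

upper = {"source_column": "CustomerName", "transform_type": "upper", "target_column": "CustomerNameUpper"}
year = {"source_column": "CreatedAt", "transform_type": "year", "target_column": "RegistrationYear"}
concat = {"type": "concat", "column1": "FirstName", "column2": "LastName", "target_column": "FullName"}

print(apply_mapping(row, upper))   # ALICE SMITH
print(apply_mapping(row, year))    # 2021
print(apply_mapping(row, concat))  # Alice Smith
```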
sql_query
str | None
default:"None"
An optional raw SQL SELECT string. When provided, the pipeline calls self.extractor.extract_by_query(sql_query) and uses its result as the extracted data. This is useful when the source data requires a multi-table join, a CTE, or any transformation that is easier to express in SQL than through column_mappings. Set source_table to an empty string when using this parameter; if source_table is non-empty, the table-based extract still runs first and its result is then overwritten by the query result.
return
dict
A result dictionary summarising the completed run:
status
str
Always "success" on normal completion. An unhandled exception propagates up rather than returning an error status.
rows_extracted
int
Total number of rows returned from the source (before any deduplication). Equals the row count of the DataFrame immediately after extraction.
rows_loaded
int
Number of rows actually inserted into the DW during this run. May be less than rows_extracted if some keys were already present in the target table, and is 0 when the run finds no new data.
table_destination
str
The target_table value passed in — included for logging and UI feedback without the caller needing to track the argument separately.
sample_data
pd.DataFrame
The first five rows of the projected, load-ready DataFrame (df_load.head(5)). Useful for displaying a preview in the Streamlit UI to confirm the transformation results before inspecting the DW directly.
The target_table must already exist in the Data Warehouse before calling run_dynamic_etl(). The pipeline never creates tables. Pre-create all destination tables with the correct schema before executing the ETL for the first time.
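The relationship between rows_extracted and rows_loaded can be illustrated with an in-memory sketch of key-based incremental loading. load_incremental_sketch is a hypothetical stand-in for DataLoader.load_incremental(): it works on plain dicts and a set of existing keys rather than querying the DW, and it does not deduplicate repeated keys within the same batch, since the source does not say how the real loader treats that case.

```python
def load_incremental_sketch(rows, existing_keys, business_key):
    """Return only the rows whose business key is not already present."""
    return [row for row in rows if row[business_key] not in existing_keys]


extracted = [{"CustomerID": 1001}, {"CustomerID": 1002}, {"CustomerID": 1003}]
already_in_dw = {1001, 1002}  # keys assumed to exist in the target table

inserted = load_incremental_sketch(extracted, already_in_dw, "CustomerID")
summary = {"rows_extracted": len(extracted), "rows_loaded": len(inserted)}

print(summary)  # {'rows_extracted': 3, 'rows_loaded': 1}
```

Running the same batch again with 1003 added to already_in_dw would yield rows_loaded of 0, which corresponds to the no-op run described above.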

Complete Example

The example below demonstrates all six transform_type values plus a concat mapping, loading a customers dimension table into the DW.
from src.infrastructure.db_client import DatabaseClient
from src.services.pipeline import ETLPipeline

db = DatabaseClient()
pipeline = ETLPipeline(db)

# Define how each DW column is derived from the OLTP source
column_mappings = [
    # Direct copy — no transformation
    {
        "source_column": "CustomerID",
        "transform_type": "none",
        "target_column": "CustomerID",
    },
    # Text uppercase
    {
        "source_column": "CustomerName",
        "transform_type": "upper",
        "target_column": "CustomerNameUpper",
    },
    # Text lowercase
    {
        "source_column": "Email",
        "transform_type": "lower",
        "target_column": "EmailLower",
    },
    # Date component — year
    {
        "source_column": "CreatedAt",
        "transform_type": "year",
        "target_column": "RegistrationYear",
    },
    # Date component — month
    {
        "source_column": "CreatedAt",
        "transform_type": "month",
        "target_column": "RegistrationMonth",
    },
    # Date component — day
    {
        "source_column": "CreatedAt",
        "transform_type": "day",
        "target_column": "RegistrationDay",
    },
    # Concat — build FullName from two source columns
    {
        "type": "concat",
        "column1": "FirstName",
        "column2": "LastName",
        "target_column": "FullName",
    },
]

result = pipeline.run_dynamic_etl(
    source_table="dbo.Customers",
    target_table="dim_customers",
    business_key="CustomerID",
    column_mappings=column_mappings,
)

print(result["status"])          # success
print(result["rows_extracted"])  # 8500
print(result["rows_loaded"])     # 143  (new customers since last run)
print(result["table_destination"])  # dim_customers
print(result["sample_data"])
#    CustomerID  CustomerNameUpper          EmailLower  RegistrationYear  RegistrationMonth  RegistrationDay         FullName
# 0        1001        ALICE SMITH  [email protected]              2021                  3               15      Alice Smith
# 1        1002          BOB JONES    [email protected]              2022                  7                4        Bob Jones
# 2        1003        CAROL WHITE  [email protected]             2020                 11               28      Carol White

Using a Custom SQL Query

When the source data requires a join or aggregation that cannot be expressed through a single table name, pass a sql_query and set source_table to an empty string.
sql = """
    SELECT
        o.OrderID,
        o.CustomerID,
        c.CustomerName,
        o.OrderDate,
        o.Total
    FROM Sales.Orders o
    JOIN dbo.Customers c ON o.CustomerID = c.CustomerID
    WHERE o.OrderDate >= '2024-01-01'
"""

column_mappings = [
    {"source_column": "OrderID",      "transform_type": "none",  "target_column": "OrderID"},
    {"source_column": "CustomerName", "transform_type": "upper", "target_column": "CustomerNameUpper"},
    {"source_column": "OrderDate",    "transform_type": "year",  "target_column": "OrderYear"},
    {"source_column": "Total",        "transform_type": "none",  "target_column": "TotalAmount"},
]

result = pipeline.run_dynamic_etl(
    source_table="",           # empty — sql_query takes over extraction
    target_table="fact_orders",
    business_key="OrderID",
    column_mappings=column_mappings,
    sql_query=sql,
)

print(f"Loaded {result['rows_loaded']} of {result['rows_extracted']} extracted rows.")
# Loaded 890 of 1240 extracted rows.
The sample_data field in the return dict contains a real Pandas DataFrame, not a serialised representation. You can pass it directly to st.dataframe() in Streamlit or call .to_dict() to serialise it for an API response.
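Because status is always "success" on normal completion and failures surface as exceptions, a caller such as a Streamlit page may want to translate exceptions into its own error result. The wrapper below is one possible sketch: run_with_error_status, the "error" status value, and the "error" key are all hypothetical and are not part of the pipeline's return contract.

```python
def run_with_error_status(run, **kwargs):
    """Call run(**kwargs) and convert any exception into an error dict."""
    try:
        return run(**kwargs)
    except Exception as exc:  # broad on purpose: this is a UI boundary
        return {
            "status": "error",  # hypothetical value, never set by the pipeline
            "error": str(exc),
            "table_destination": kwargs.get("target_table"),
        }


def failing_run(**kwargs):
    """Stand-in for a pipeline call that fails."""
    raise RuntimeError("target table not found")


outcome = run_with_error_status(failing_run, target_table="dim_customers")
print(outcome["status"])  # error
print(outcome["error"])   # target table not found
```

With a real pipeline, the first argument would be pipeline.run_dynamic_etl and the keyword arguments would be the ones documented above.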
