ETLPipeline is the top-level orchestrator that wires together DataExtractor, DataTransformer, and DataLoader into a single, declarative call. Instead of invoking each stage manually, you describe what data to move and how to transform it through a column_mappings list — the pipeline takes care of extraction, column-by-column transformation, deduplication, and incremental loading in the correct order.

How ETLPipeline is constructed

ETLPipeline accepts a DatabaseClient and internally instantiates the three service classes:
from src.infrastructure.db_client import DatabaseClient
from src.services.pipeline import ETLPipeline

db_client = DatabaseClient()
pipeline = ETLPipeline(db_client)
# Internally creates:
#   self.extractor = DataExtractor(db_client)
#   self.transformer = DataTransformer()
#   self.loader = DataLoader(db_client)

run_dynamic_etl

run_dynamic_etl(
    source_table: str,
    target_table: str,
    business_key: str,
    column_mappings: list[dict],
    sql_query: str | None = None
) -> dict

Parameters

source_table (str, required)
Schema-qualified OLTP table name to extract from (e.g., "dbo.Customers", "Sales.SalesOrderHeader"). When using a custom sql_query instead, pass an empty string "" here; the pipeline then skips the table-based extraction path and uses the query.

target_table (str, required)
Name of the destination table in the OLAP Data Warehouse (e.g., "DimCustomers"). The loader uses if_exists='append': it appends rows to an existing table and creates the table automatically on the first run if it does not yet exist. For production use, pre-create the table with explicit column types and primary key constraints.

business_key (str, required)
Column name used to deduplicate records between the incoming DataFrame and the existing DW table. Must be present in column_mappings so it appears in the final DataFrame handed to the loader.

column_mappings (list[dict], required)
A list of mapping dicts that defines which source columns to extract, how to transform them, and what to name them in the target. See Column mappings structure below.

sql_query (str | None, optional, defaults to None)
A custom SQL query string. When provided, DataExtractor.extract_by_query() is called and its result becomes the working DataFrame. The pipeline evaluates source_table and sql_query independently: if both are non-empty, both execute and the query result overwrites the table result. Pass source_table="" to use only the query path.
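
The business_key requirement above can be checked before a run. The following is a minimal sketch, not part of ETLPipeline: target_columns and check_business_key are hypothetical helpers, and it assumes "present in column_mappings" means the key appears as a target_column.

```python
def target_columns(column_mappings):
    """Collect every target_column named in the mappings, in order."""
    return [m["target_column"] for m in column_mappings]


def check_business_key(business_key, column_mappings):
    """Raise ValueError if no mapping produces the business key (assumed rule)."""
    targets = target_columns(column_mappings)
    if business_key not in targets:
        raise ValueError(
            f"business_key {business_key!r} is not a target_column; got {targets}"
        )


mappings = [
    {"source_column": "CustomerID", "transform_type": "none", "target_column": "CustomerID"},
    {"type": "concat", "column1": "FirstName", "column2": "LastName", "target_column": "FullName"},
]

check_business_key("CustomerID", mappings)  # passes silently
```

Calling check_business_key("OrderID", mappings) would raise, because no mapping writes an OrderID column.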

Column mappings structure

Each dict in column_mappings follows one of two shapes depending on whether the mapping is a standard column transform or a string concatenation.

Standard mapping (pass-through, text case, and date transforms):
{
    "source_column": "OrderDate",   # Column name in the OLTP source
    "transform_type": "year",       # One of: none, upper, lower, year, month, day
    "target_column": "OrderYear"    # Column name to write in the target DW
}
Concat mapping — for joining two string columns with a space:
{
    "type": "concat",          # Must be exactly "concat"
    "column1": "FirstName",    # First source column
    "column2": "LastName",     # Second source column
    "target_column": "FullName"
}
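
To make the concat shape concrete, here is a rough pandas sketch of what such a mapping could do. apply_concat is a hypothetical helper, not the DataTransformer method itself, and the handling of missing values is an assumption: with plain string addition a missing value in either column propagates to the result.

```python
import pandas as pd


def apply_concat(df, mapping):
    """Write column1 + ' ' + column2 into target_column (hypothetical helper)."""
    df[mapping["target_column"]] = (
        df[mapping["column1"]] + " " + df[mapping["column2"]]
    )
    return df


people = pd.DataFrame(
    {"FirstName": ["Ana", "Luis"], "LastName": ["Trujillo", "Soto"]}
)
apply_concat(
    people,
    {"type": "concat", "column1": "FirstName", "column2": "LastName", "target_column": "FullName"},
)
```

After the call, people has a FullName column next to the two source columns.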

All valid transform_type values

| Value | Transformation applied | Output type |
| --- | --- | --- |
| none | Direct copy: df[target] = df[source] | Same as source |
| upper | .str.upper() | string |
| lower | .str.lower() | string |
| year | pd.to_datetime() then .dt.year | integer |
| month | pd.to_datetime() then .dt.month | integer |
| day | pd.to_datetime() then .dt.day | integer |
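
The table can be read as a small dispatch over transform_type. The sketch below uses only the pandas calls named in the table; apply_transform is a hypothetical stand-in for the DataTransformer methods, whose actual names and signatures are not shown here.

```python
import pandas as pd


def apply_transform(df, source, transform_type, target):
    """Apply one standard mapping to df in place (hypothetical helper)."""
    if transform_type == "none":
        df[target] = df[source]
    elif transform_type == "upper":
        df[target] = df[source].str.upper()
    elif transform_type == "lower":
        df[target] = df[source].str.lower()
    elif transform_type in ("year", "month", "day"):
        # Parse to datetime first, then pick the matching .dt component.
        parsed = pd.to_datetime(df[source])
        df[target] = getattr(parsed.dt, transform_type)
    else:
        raise ValueError(f"Unknown transform_type: {transform_type!r}")
    return df


orders = pd.DataFrame(
    {"OrderDate": ["2024-03-15", "2023-11-02"], "ShipCity": ["Lima", "Quito"]}
)
apply_transform(orders, "OrderDate", "year", "OrderYear")
apply_transform(orders, "OrderDate", "month", "OrderMonth")
apply_transform(orders, "ShipCity", "upper", "ShipCityUpper")
```

Raising on an unknown transform_type is a choice made for this sketch; the real pipeline may behave differently.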

Return value

run_dynamic_etl returns a dict with the following fields:
status (str)
Always "success" when the pipeline completes without raising an exception.

rows_extracted (int)
Total number of rows retrieved from the OLTP source (before deduplication filtering).

rows_loaded (int)
Number of net-new rows actually inserted into the DW table (after deduplication).

table_destination (str)
The target_table name, echoed back for logging and UI display.

sample_data (pd.DataFrame)
The first 5 rows of the final loaded DataFrame (df_load.head(5)), useful for previewing results in the Streamlit UI.

# Example return value
{
    "status": "success",
    "rows_extracted": 91,
    "rows_loaded": 12,
    "table_destination": "DimCustomers",
    "sample_data": DataFrame  # first 5 rows of loaded data
}
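
A caller might condense these fields into a single log line. This is only an illustrative sketch: summarize is a hypothetical helper, and it treats the difference between rows_extracted and rows_loaded as rows filtered out by deduplication, which follows from the field descriptions above.

```python
def summarize(result):
    """Build a one-line summary from a run_dynamic_etl result dict."""
    skipped = result["rows_extracted"] - result["rows_loaded"]
    return (
        f"{result['table_destination']}: loaded {result['rows_loaded']} "
        f"of {result['rows_extracted']} rows ({skipped} already present)"
    )


example_result = {
    "status": "success",
    "rows_extracted": 91,
    "rows_loaded": 12,
    "table_destination": "DimCustomers",
    "sample_data": None,  # placeholder; the real value is a pandas DataFrame
}
line = summarize(example_result)
```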

Pipeline execution order

1. Extract

If source_table is non-empty, DataExtractor.extract_by_table() is called. The pipeline builds the column list from column_mappings automatically: it unions the source_column values from standard mappings with the column1 and column2 values from concat mappings, so only the columns actually needed are fetched from OLTP. If sql_query is provided, DataExtractor.extract_by_query() is also called, and its full result becomes the working DataFrame, replacing any table result.

2. Transform

The pipeline iterates over column_mappings in order. For each dict it calls the appropriate DataTransformer method based on transform_type (or type="concat"). All transformations operate on the same in-memory DataFrame.

3. Select target columns

After all transformations, the DataFrame is narrowed to only the columns declared as target_column in column_mappings plus the business_key. Source columns that were not mapped as targets are dropped at this stage.

4. Load incrementally

DataLoader.load_incremental() deduplicates against the existing DW table and appends only new rows. The count of inserted rows is captured.

5. Return result dict

The pipeline assembles and returns the result dict containing status, row counts, destination table name, and a 5-row sample.
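
The column selection in step 3 can be sketched in a few lines of pandas. select_target_columns is a hypothetical helper written for illustration; the column order and the handling of duplicate target names in the real pipeline are assumptions.

```python
import pandas as pd


def select_target_columns(df, column_mappings, business_key):
    """Keep only mapped target columns plus the business key."""
    keep = [m["target_column"] for m in column_mappings]
    if business_key not in keep:
        keep.append(business_key)
    return df[keep]


# A DataFrame as it might look after the transform step: source columns
# and freshly written target columns side by side.
transformed = pd.DataFrame(
    {
        "CustomerID": ["ALFKI"],
        "CompanyName": ["Alfreds"],
        "CompanyNameUpper": ["ALFREDS"],
        "City": ["Berlin"],
        "Country": ["Germany"],
        "CityCountry": ["Berlin Germany"],
    }
)
mappings = [
    {"source_column": "CustomerID", "transform_type": "none", "target_column": "CustomerID"},
    {"source_column": "CompanyName", "transform_type": "upper", "target_column": "CompanyNameUpper"},
    {"type": "concat", "column1": "City", "column2": "Country", "target_column": "CityCountry"},
]
final = select_target_columns(transformed, mappings, "CustomerID")
```

Here CompanyName, City, and Country are dropped because none of them is declared as a target_column.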

Complete usage example

from src.infrastructure.db_client import DatabaseClient
from src.services.pipeline import ETLPipeline

db_client = DatabaseClient()
pipeline = ETLPipeline(db_client)

result = pipeline.run_dynamic_etl(
    source_table="dbo.Customers",
    target_table="DimCustomers",
    business_key="CustomerID",
    column_mappings=[
        {"source_column": "CustomerID",   "transform_type": "none",  "target_column": "CustomerID"},
        {"source_column": "CompanyName",  "transform_type": "upper", "target_column": "CompanyName"},
        {"source_column": "ContactName",  "transform_type": "upper", "target_column": "ContactName"},
        {"source_column": "OrderDate",    "transform_type": "year",  "target_column": "OrderYear"},
        {"type": "concat", "column1": "City", "column2": "Country",  "target_column": "CityCountry"},
    ]
)

print(f"Status:          {result['status']}")
print(f"Rows extracted:  {result['rows_extracted']}")
print(f"Rows loaded:     {result['rows_loaded']}")
print(f"Destination:     {result['table_destination']}")
print(result['sample_data'])

Using a custom SQL query instead of source_table

When the data you need cannot be expressed as a single table select — for example when you need a filtered subset, a JOIN across tables, or an aggregation — pass your SQL to sql_query and set source_table to an empty string.
result = pipeline.run_dynamic_etl(
    source_table="",
    target_table="DimTopCustomers",
    business_key="CustomerID",
    sql_query="SELECT CustomerID, CompanyName FROM dbo.Customers WHERE Country = 'USA'",
    column_mappings=[
        {"source_column": "CustomerID",  "transform_type": "none",  "target_column": "CustomerID"},
        {"source_column": "CompanyName", "transform_type": "upper", "target_column": "CompanyName"},
    ]
)
When sql_query is provided, the pipeline skips the automatic column selection optimisation (which normally queries only the columns referenced in column_mappings). The full result set of the query is fetched and then transformed. Ensure your SQL already selects only the columns you need.
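
For comparison, the automatic column selection that the table path performs could look roughly like this. source_columns is a hypothetical helper: it unions source_column with column1 and column2 as described earlier, but how extract_by_table turns that list into SQL is not shown in this page and is not modelled here.

```python
def source_columns(column_mappings):
    """Return the distinct source columns referenced by the mappings, in order."""
    columns = []
    for mapping in column_mappings:
        if mapping.get("type") == "concat":
            candidates = [mapping["column1"], mapping["column2"]]
        else:
            candidates = [mapping["source_column"]]
        for name in candidates:
            if name not in columns:  # skip columns already requested
                columns.append(name)
    return columns


mappings = [
    {"source_column": "CustomerID", "transform_type": "none", "target_column": "CustomerID"},
    {"source_column": "CompanyName", "transform_type": "upper", "target_column": "CompanyName"},
    {"type": "concat", "column1": "City", "column2": "Country", "target_column": "CityCountry"},
    {"source_column": "City", "transform_type": "upper", "target_column": "CityUpper"},
]
columns = source_columns(mappings)
```

City is referenced twice but requested once. With sql_query there is no equivalent step, so the SELECT list in your query plays this role.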

Error handling and edge cases

If both source_table and sql_query are provided, the pipeline will execute both extraction paths in sequence, with the sql_query result overwriting the source_table result. Use one or the other, not both.
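
Because the pipeline itself does not reject this combination, a caller can guard against it before invoking run_dynamic_etl. The helper below is a hypothetical pre-flight check, not part of the documented API.

```python
def extraction_mode(source_table, sql_query=None):
    """Return 'table' or 'query', raising if the inputs are ambiguous."""
    has_table = bool(source_table)
    has_query = bool(sql_query)
    if has_table and has_query:
        raise ValueError("Pass either source_table or sql_query, not both")
    if not has_table and not has_query:
        raise ValueError("Pass a source_table or a sql_query")
    return "query" if has_query else "table"


mode_table = extraction_mode("dbo.Customers")
mode_query = extraction_mode("", "SELECT CustomerID FROM dbo.Customers")
```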
On a first run against an empty DW table, rows_loaded will equal rows_extracted (all records are new). On subsequent runs with unchanged source data, rows_loaded will be 0 — this is expected behaviour, not an error.
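
The two-run behaviour can be reproduced in memory. This is a sketch of key-based deduplication only, assuming load_incremental filters on business_key membership; new_rows_only is a hypothetical helper and the set of existing keys stands in for the DW table.

```python
import pandas as pd


def new_rows_only(incoming, existing_keys, business_key):
    """Return the incoming rows whose business key is not already loaded."""
    return incoming[~incoming[business_key].isin(existing_keys)]


incoming = pd.DataFrame({"CustomerID": ["ALFKI", "ANATR", "ANTON"]})
existing_keys = set()  # empty DW table on the first run

first_run = new_rows_only(incoming, existing_keys, "CustomerID")
existing_keys.update(first_run["CustomerID"])  # simulate the append

second_run = new_rows_only(incoming, existing_keys, "CustomerID")
```

The first run keeps every row, and the second run with unchanged source data keeps none, matching rows_loaded values of 3 and 0.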
