ETL Dinámico's test suite is organized into four files, one per architectural layer. This reference documents every test function: what it exercises, what external dependencies it requires, which objects are mocked, and exactly what assertions it makes. Use this page as a map when a test fails and you need to understand what the failure is telling you about a specific component.
test_connection.py
This module contains a single integration test that directly exercises the DatabaseClient class. Unlike the other three modules, it does not use any mocking: it requires real SQL Server endpoints to be reachable.
test_conexiones
What it tests
Both the OLTP and OLAP database connections are alive and capable of executing a basic query.
Requirements
A live SQL Server instance reachable from the host machine, with correct credentials in .env.
The test instantiates DatabaseClient and uses each engine's context manager (get_oltp_connection, get_olap_connection) to execute SELECT 1 via SQLAlchemy's text() construct. The scalar result is compared to 1 in a ternary expression that sets a liveness flag. If the connection or query raises any exception, the flag is set to False and the final assert fails cleanly rather than raising an unhandled exception.
Assertions:
- conexion_viva_1 is True: the OLTP engine returned 1 for SELECT 1
- conexion_viva_2 is True: the OLAP engine returned 1 for SELECT 1
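The flag-based liveness check can be sketched as follows. This is a minimal illustration, not the project's test: it uses Python's built-in sqlite3 in place of SQL Server and SQLAlchemy so that it runs anywhere, and the helper name conexion_viva and the simulated outage are hypothetical.

```python
import sqlite3
from contextlib import closing


def conexion_viva(connect) -> bool:
    """Return True only if SELECT 1 comes back as 1; never raise."""
    try:
        # closing() releases the connection on exit, like a context manager
        with closing(connect()) as conn:
            resultado = conn.execute("SELECT 1").fetchone()[0]
            return True if resultado == 1 else False
    except Exception:
        # Any connection or query error becomes a failed flag, not a crash
        return False


def conexion_rota():
    # Hypothetical stand-in for an unreachable server
    raise ConnectionError("server unreachable")


conexion_viva_1 = conexion_viva(lambda: sqlite3.connect(":memory:"))
conexion_viva_2 = conexion_viva(conexion_rota)

print(conexion_viva_1, conexion_viva_2)  # True False
```

Turning exceptions into a False flag means an unreachable server shows up as a plain assertion failure on the flag rather than as a driver stack trace.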
test_transformer.py
This module contains a pure unit test for the DataTransformer service class. It operates entirely on an in-memory Pandas DataFrame and makes no network calls, database connections, or file I/O of any kind.
test_transformaciones_pandas
What it tests
All three DataTransformer methods (capitalize_transform, concat_transform, and date_transform) produce correct output on the same DataFrame.
Requirements
No database required. Pure Pandas: passes in any environment with dependencies installed.
| Nombre | CodigoRegion | Fecha |
|---|---|---|
| Honduras | HN | 06-11-2026 |
| Cabo Verde | CBV | 06-26-2026 |
capitalize_transform (uppercase)
Calls transformer.capitalize_transform(df, 'Nombre', 'NOMBRE MAYUSCULA', 'upper'). Reads the Nombre column, converts each value to uppercase, and writes the result into a new column called NOMBRE MAYUSCULA.
concat_transform (column concatenation)
Calls transformer.concat_transform(df, 'Nombre CodigoRegion', 'Nombre', 'CodigoRegion'). Concatenates the Nombre and CodigoRegion columns with a space separator and writes the result into a new column named Nombre CodigoRegion.
Assertions:
- df.iloc[0]['NOMBRE MAYUSCULA'] == 'HONDURAS': uppercase applied to row 0
- df.iloc[1]['Nombre CodigoRegion'] == 'Cabo Verde CBV': concat applied to row 1
- df.iloc[0]['Anio'] == 2026: year extracted from row 0's date
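For orientation, the three transformations can be reproduced with plain Pandas operations on the same sample data. This is a sketch of the expected results only, not DataTransformer's implementation; the month-day-year date format is an assumption inferred from the sample values.

```python
import pandas as pd

df = pd.DataFrame({
    "Nombre": ["Honduras", "Cabo Verde"],
    "CodigoRegion": ["HN", "CBV"],
    "Fecha": ["06-11-2026", "06-26-2026"],
})

# Uppercase copy of Nombre, the result described for capitalize_transform
df["NOMBRE MAYUSCULA"] = df["Nombre"].str.upper()

# Nombre and CodigoRegion joined with a single space, as for concat_transform
df["Nombre CodigoRegion"] = df["Nombre"] + " " + df["CodigoRegion"]

# Year extraction; the MM-DD-YYYY format is assumed from the sample dates
df["Anio"] = pd.to_datetime(df["Fecha"], format="%m-%d-%Y").dt.year

# The same three checks the test is documented to make
assert df.iloc[0]["NOMBRE MAYUSCULA"] == "HONDURAS"
assert df.iloc[1]["Nombre CodigoRegion"] == "Cabo Verde CBV"
assert df.iloc[0]["Anio"] == 2026
```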
test_loader.py
This module tests the incremental-load logic inside DataLoader. The core behavior under test is the deduplication step: rows whose business key already exists in the Data Warehouse must be silently dropped before to_sql is called. The test uses MagicMock and patch to replace all real database I/O.
test_carga_incremental_filtra_correctamente
What it tests
The incremental loader correctly identifies rows already present in the DW by their business key, excludes them from the insert, and calls to_sql exactly once.
Mocking strategy
DatabaseClient is replaced by a MagicMock, pd.read_sql is patched to return a simulated existing-DW DataFrame, and DataFrame.to_sql is patched to capture the call without touching any database.
Step-by-step mock configuration
- mock_db_client = MagicMock(): replaces the real DatabaseClient so no connection string is needed.
- mock_connection = MagicMock(): stands in for the SQLAlchemy connection object.
- mock_db_client.get_olap_connection.return_value.__enter__.return_value = mock_connection: wires the context manager protocol so with db_client.get_olap_connection() as conn returns mock_connection.
- pd.read_sql = MagicMock(return_value=df_dw_simulado): causes any call to pd.read_sql to return a DataFrame containing only CustomerID=10, simulating the current state of the DimCustomer table.
- patch.object(pd.DataFrame, "to_sql"): intercepts the to_sql call so nothing is actually written; the mock records the call for assertion.
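The context-manager wiring is the least obvious of these steps, so here it is in isolation. This sketch uses only unittest.mock; the SELECT 1 call is a placeholder to show that the mock records whatever the code under test sends.

```python
from unittest.mock import MagicMock

mock_db_client = MagicMock()
mock_connection = MagicMock()

# get_olap_connection() returns a mock whose __enter__ yields mock_connection
mock_db_client.get_olap_connection.return_value.__enter__.return_value = mock_connection

with mock_db_client.get_olap_connection() as conn:
    conn.execute("SELECT 1")  # recorded by the mock, never sent to a database

assert conn is mock_connection
mock_connection.execute.assert_called_once_with("SELECT 1")
mock_db_client.get_olap_connection.assert_called_once_with()
```

Without the `__enter__` line, the with statement would bind conn to an auto-generated mock instead of mock_connection, and assertions made against mock_connection would fail.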
| CustomerID | Nombre |
|---|---|
| 10 | Juan |
| 20 | Pedro |
The simulated DW already contains CustomerID=10. The loader must detect this, drop the first row, and insert only CustomerID=20.
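One common way to express this filter in Pandas is an isin anti-join on the business key. The sketch below assumes that approach for illustration; DataLoader's actual implementation may differ.

```python
import pandas as pd

# Incoming batch and the simulated current contents of the DW table
df_entrante = pd.DataFrame({"CustomerID": [10, 20], "Nombre": ["Juan", "Pedro"]})
df_dw_simulado = pd.DataFrame({"CustomerID": [10]})

# Flag incoming rows whose business key is already in the DW
ya_existe = df_entrante["CustomerID"].isin(df_dw_simulado["CustomerID"])

# Keep only the rows that are new
df_nuevos = df_entrante[~ya_existe]

assert df_nuevos["CustomerID"].tolist() == [20]
assert df_nuevos["Nombre"].tolist() == ["Pedro"]
```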
Assertion:
- mock_to_sql.assert_called_once(): to_sql was invoked exactly one time, confirming the incremental load path executed a single insert call.
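The sketch below reproduces this check against a hypothetical stand-in loader (cargar_incremental is not part of the project) and also reads the inserted rows back from the mock. It patches to_sql with autospec=True, which is an assumption of this sketch: a plain patch.object mock is not bound to the DataFrame instance, so the DataFrame itself would not appear in call_args.

```python
from unittest.mock import MagicMock, patch

import pandas as pd


def cargar_incremental(df, df_existente, conn, tabla, clave):
    """Hypothetical stand-in loader: insert only rows whose key is new."""
    nuevos = df[~df[clave].isin(df_existente[clave])]
    if not nuevos.empty:
        nuevos.to_sql(tabla, con=conn, if_exists="append", index=False)
    return len(nuevos)


df_entrante = pd.DataFrame({"CustomerID": [10, 20], "Nombre": ["Juan", "Pedro"]})
df_dw_simulado = pd.DataFrame({"CustomerID": [10]})
mock_connection = MagicMock()

# autospec=True makes the mock behave like a method, so self is recorded
with patch.object(pd.DataFrame, "to_sql", autospec=True) as mock_to_sql:
    insertadas = cargar_incremental(
        df_entrante, df_dw_simulado, mock_connection, "DimCustomer", "CustomerID"
    )

mock_to_sql.assert_called_once()

# First positional argument is the DataFrame that to_sql was called on
df_insertado = mock_to_sql.call_args.args[0]
assert df_insertado["CustomerID"].tolist() == [20]
```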
assert_called_once() only verifies the count of calls; it does not inspect which rows were actually passed. To also assert the content of the inserted DataFrame, you can use mock_to_sql.call_args to retrieve the arguments and run further assertions on them. Note that a plain patch.object mock is not bound to the DataFrame instance, so the DataFrame itself appears in call_args only if to_sql is patched with autospec=True.
test_pipelines.py
This module tests the complete ETL orchestration path through ETLPipeline.run_dynamic_etl(). Both the extractor and loader are replaced with mocks, so the test focuses entirely on whether the pipeline correctly wires extraction → transformation → loading and returns the expected result dictionary.
test_flujo_completo_con_datos_reales
What it tests
Full ETL orchestration: a dynamic column mapping is applied through the transformer, the correct result metadata is returned, and the DataFrame passed to the loader contains properly uppercased column values.
Mocking strategy
extract_by_table is replaced with a MagicMock that returns a fixed 3-row DataFrame; load_incremental is replaced with a MagicMock that returns 3. No database calls of any kind are made.
| Parameter | Value |
|---|---|
| source_table | Sales.SalesTerritory |
| target_table | DimTerritory |
| business_key | TerritoryID |
| source_column | transform_type | target_column |
|---|---|---|
| Name | upper | New_Name |
| TerritoryID | none | TerritoryID |
| TerritoryID | Name | CountryRegionCode |
|---|---|---|
| 1 | northwest | US |
| 2 | southeast | US |
| 3 | central | US |
The upper mapping means the Name column is uppercased and written into New_Name. TerritoryID passes through unchanged. The CountryRegionCode column is not included in the mappings and is therefore dropped during transformation.
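The effect of these mappings on the sample data can be sketched with a small helper. The function aplicar_mapeos and the dictionary layout of the mappings are hypothetical, chosen only to mirror the table columns above; they are not the pipeline's API.

```python
import pandas as pd


def aplicar_mapeos(df, mapeos):
    """Hypothetical helper: build the output from mapped columns only."""
    salida = pd.DataFrame(index=df.index)
    for m in mapeos:
        columna = df[m["source_column"]]
        if m["transform_type"] == "upper":
            columna = columna.str.upper()
        # transform_type "none" copies the column through unchanged
        salida[m["target_column"]] = columna
    return salida


df_origen = pd.DataFrame({
    "TerritoryID": [1, 2, 3],
    "Name": ["northwest", "southeast", "central"],
    "CountryRegionCode": ["US", "US", "US"],
})
mapeos = [
    {"source_column": "Name", "transform_type": "upper", "target_column": "New_Name"},
    {"source_column": "TerritoryID", "transform_type": "none", "target_column": "TerritoryID"},
]

df_transformado = aplicar_mapeos(df_origen, mapeos)

# Unmapped columns such as CountryRegionCode never reach the output
assert list(df_transformado.columns) == ["New_Name", "TerritoryID"]
```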
Assertions:
Result metadata
- resultado['status'] == 'success': the pipeline completed without error.
- resultado['rows_extracted'] == 3: the full source DataFrame (3 rows) was extracted.
Transformed DataFrame structure
- 'New_Name' in df_cargado.columns: the uppercase mapping created the target column correctly.
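A compact sketch of how such a test can be put together with mocks is shown below. The run_dynamic_etl function here is a hypothetical stand-in with an assumed signature and a hard-coded transformation, not ETLPipeline's method; only the result keys status and rows_extracted and the mock return values come from the description above.

```python
from unittest.mock import MagicMock

import pandas as pd


def run_dynamic_etl(extractor, loader, source_table, target_table, business_key):
    """Hypothetical stand-in: extract, transform, load, report."""
    df = extractor.extract_by_table(source_table)
    df_transformado = pd.DataFrame({
        "New_Name": df["Name"].str.upper(),
        "TerritoryID": df["TerritoryID"],
    })
    loader.load_incremental(df_transformado, target_table, business_key)
    return {"status": "success", "rows_extracted": len(df)}


extractor = MagicMock()
extractor.extract_by_table.return_value = pd.DataFrame({
    "TerritoryID": [1, 2, 3],
    "Name": ["northwest", "southeast", "central"],
    "CountryRegionCode": ["US", "US", "US"],
})
loader = MagicMock()
loader.load_incremental.return_value = 3

resultado = run_dynamic_etl(
    extractor, loader, "Sales.SalesTerritory", "DimTerritory", "TerritoryID"
)

# The DataFrame handed to the loader is the first positional argument
df_cargado = loader.load_incremental.call_args.args[0]

assert resultado["status"] == "success"
assert resultado["rows_extracted"] == 3
assert "New_Name" in df_cargado.columns
assert df_cargado["New_Name"].tolist() == ["NORTHWEST", "SOUTHEAST", "CENTRAL"]
```

Because load_incremental is a standalone MagicMock attribute rather than a patched class method, its call_args holds the DataFrame directly.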