DataExtractor is responsible for the Extract phase of the ETL pipeline. It wraps a DatabaseClient instance and provides three extraction strategies: a table-based SELECT with an optional column list and row limit, a freeform SQL query path, and a catalogue scan that lists every user-accessible table in the OLTP database. The two data-extraction methods return Pandas DataFrame objects, which makes their output immediately compatible with DataTransformer; the catalogue scan returns a list of table names.

Class: DataExtractor

from src.infrastructure.db_client import DatabaseClient
from src.services.extractor import DataExtractor

db = DatabaseClient()
extractor = DataExtractor(db)

Constructor

db_client
DatabaseClient
required
An initialised DatabaseClient instance. DataExtractor stores it as self.db and calls self.db.get_oltp_connection() for every extraction, ensuring each operation uses a properly pooled connection from the shared SQLAlchemy pool.

Methods

extract_by_table()

Builds and executes a SELECT statement against a named OLTP table. Column selection and row count are both optional. Use this method when you know the table name and want to pull data with minimal SQL authoring.
SELECT TOP {limit} {cols} FROM {table_name}
SELECT {cols} FROM {table_name}          -- when limit is None
table_name
str
required
The fully qualified or plain table name to query (e.g., "Sales.Orders" or "Customers"). The string is interpolated directly into the SQL statement, so schema-qualified names work without any additional parsing.
columns
list[str] | None
default:"None"
A list of column names to include in the SELECT clause. When None (the default), the clause expands to * and every column is returned. When provided, the names are joined with commas to form the explicit column list.
limit
int | None
default:"None"
Maximum number of rows to return. When set, TOP {limit} is injected immediately after SELECT. When None, no TOP clause is added and all matching rows are returned. Useful for previewing large tables in the Streamlit UI without loading the full dataset.
return
pd.DataFrame
A Pandas DataFrame containing the query results. Column names match the database column names (or the aliases specified in columns).
df = extractor.extract_by_table("Sales.Orders")
# Executes: SELECT * FROM Sales.Orders
print(df.shape)           # (150000, 12)
print(df.columns.tolist())
# ['OrderID', 'CustomerID', 'OrderDate', 'Total', ...]
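The two SELECT templates above can be sketched as a small string-building function. This is only an illustration of the documented behaviour, not the library's source: build_select_sql is a hypothetical name, and treating an empty column list like None is an assumption of this sketch.

```python
from typing import Optional, Sequence


def build_select_sql(
    table_name: str,
    columns: Optional[Sequence[str]] = None,
    limit: Optional[int] = None,
) -> str:
    """Hypothetical helper mirroring the documented SELECT templates."""
    # No column list means every column. An empty list is also mapped
    # to "*" here, which is an assumption of this sketch.
    cols = ", ".join(columns) if columns else "*"
    # TOP goes immediately after SELECT, and only when a limit is given.
    top = f"TOP {int(limit)} " if limit is not None else ""
    # table_name is interpolated as-is, so "Sales.Orders" passes through.
    return f"SELECT {top}{cols} FROM {table_name}"


print(build_select_sql("Sales.Orders"))
print(build_select_sql("Sales.Orders", ["OrderID", "Total"], limit=100))
```

Because the table name is interpolated directly, a sketch like this should only ever receive names from a trusted source, such as the list returned by extract_tables().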

extract_by_query()

Executes an arbitrary SQL string against the OLTP database. When a limit is supplied, the original query is wrapped in a subquery so that TOP can be applied without modifying the caller’s SQL.
-- With limit:
SELECT TOP {limit} * FROM ({sql_query}) AS _preview

-- Without limit:
{sql_query}
sql_query
str
required
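A complete SQL SELECT statement to execute. The string is passed to sqlalchemy.text() before execution, so standard SQLAlchemy text-clause semantics apply. Multi-table joins, CTEs, and subqueries are all valid when the query runs as-is. When limit is set, the statement is embedded as a derived table and must be valid in that position; in SQL Server, for example, a statement that begins with WITH, or that ends in ORDER BY without TOP or OFFSET, cannot be wrapped this way.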
limit
int | None
default:"None"
When provided, the original sql_query is wrapped as a subquery aliased _preview and a TOP {limit} clause is applied to the outer query. This preserves the original query’s structure while still capping the result set. When None, the original query is executed as-is.
return
pd.DataFrame
A Pandas DataFrame containing all rows (or the top limit rows) returned by the query.
df = extractor.extract_by_query(
    "SELECT CustomerID, SUM(Total) AS Revenue FROM Sales.Orders GROUP BY CustomerID"
)
print(df.head())
#    CustomerID   Revenue
# 0        1001  45200.00
# 1        1042  12800.50
The wrapping subquery uses the alias _preview. If your original query already defines a subquery with that alias, rename it to avoid a SQL name collision.
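The limit handling described above comes down to one conditional string template. The following is a minimal sketch of that wrapping step only; wrap_with_limit is a hypothetical name, and the real method additionally passes the result to sqlalchemy.text() and executes it.

```python
from typing import Optional


def wrap_with_limit(sql_query: str, limit: Optional[int] = None) -> str:
    """Hypothetical helper mirroring the documented wrapping rule."""
    if limit is None:
        # Without a limit the caller's SQL is used unchanged.
        return sql_query
    # With a limit the caller's SQL becomes a derived table aliased
    # _preview, and TOP is applied to the outer query.
    return f"SELECT TOP {int(limit)} * FROM ({sql_query}) AS _preview"


query = "SELECT CustomerID, Total FROM Sales.Orders"
print(wrap_with_limit(query))
print(wrap_with_limit(query, limit=50))
```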

extract_tables()

Scans the OLTP database catalogue and returns every user-accessible table as a schema-qualified string ("schema.table"). System schemas are filtered out so that only application data surfaces. The filter condition is schema.lower() in system_schemas: each discovered schema name is lowercased and then checked against the exclusion set, which is defined in source exactly as follows.

Excluded system schemas (exact set):
sys              INFORMATION_SCHEMA   guest
db_owner         db_accessadmin       db_securityadmin
db_ddladmin      db_backupoperator    db_datareader
db_datawriter    db_denydatareader    db_denydatawriter
The filter calls schema.lower() before testing set membership, but the exclusion set stores "INFORMATION_SCHEMA" in uppercase. Matching is therefore case-insensitive only for the all-lowercase entries in the set ("sys", "guest", "db_owner", and so on). A lowercased name can never equal the uppercase entry, so a schema named INFORMATION_SCHEMA, in any casing, is not excluded by this check and its tables are passed through to the table list.
return
list[str]
A list of schema-qualified table names in "schema.table" format. The list is built by iterating over every non-system schema returned by Inspector.get_schema_names() and concatenating each schema name with its child table names.
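The filter and the name concatenation described above can be reproduced without a database. In this sketch the catalogue dictionary is a hypothetical stand-in for what the SQLAlchemy Inspector would report, list_user_tables is an illustrative name, and the exclusion set is copied from the list above, uppercase entry included.

```python
from typing import Dict, List

# Exclusion set as documented above, including the uppercase entry.
SYSTEM_SCHEMAS = {
    "sys", "INFORMATION_SCHEMA", "guest",
    "db_owner", "db_accessadmin", "db_securityadmin",
    "db_ddladmin", "db_backupoperator", "db_datareader",
    "db_datawriter", "db_denydatareader", "db_denydatawriter",
}


def list_user_tables(catalogue: Dict[str, List[str]]) -> List[str]:
    """Hypothetical helper: catalogue maps schema name -> table names."""
    tables: List[str] = []
    for schema, names in catalogue.items():
        # The schema name is lowercased before the membership test.
        if schema.lower() in SYSTEM_SCHEMAS:
            continue
        # Surviving schemas contribute "schema.table" strings.
        tables.extend(f"{schema}.{name}" for name in names)
    return tables


catalogue = {
    "dbo": ["Customers", "Products"],
    "SYS": ["objects"],                # lowercases to "sys": excluded
    "INFORMATION_SCHEMA": ["TABLES"],  # lowercases to a name not in the set
    "Sales": ["Orders"],
}
print(list_user_tables(catalogue))
# ['dbo.Customers', 'dbo.Products', 'INFORMATION_SCHEMA.TABLES', 'Sales.Orders']
```

Under these assumptions the INFORMATION_SCHEMA tables survive the filter, which is the case-handling behaviour the note above warns about.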
tables = extractor.extract_tables()
print(tables)
# [
#   "dbo.Customers",
#   "dbo.Products",
#   "Sales.Orders",
#   "Sales.OrderDetails",
#   "HR.Employees",
# ]

# Feed directly into a column-mapping UI or a pipeline loop
for table in tables:
    print(table)
The returned strings are already in the "schema.table" format expected by extract_by_table(), DatabaseClient.get_source_columns(), and ETLPipeline.run_dynamic_etl(). You can pass them through without modification.
