DataExtractor is the entry point of every ETL run. It opens a connection to the OLTP source database through db_client.get_oltp_connection(), executes a SQL query against SQL Server, and returns the result as a Pandas DataFrame ready for downstream transformation. For table extracts, SQL generation is handled internally: callers only need to supply a table name, an optional column list, and an optional row limit. Custom SQL can be passed through extract_by_query.

Class overview

DataExtractor is constructed with a single db_client argument (a DatabaseClient instance). It exposes three public methods: one for simple table selects, one for arbitrary SQL queries, and one for schema introspection.
from src.infrastructure.db_client import DatabaseClient
from src.services.extractor import DataExtractor

db_client = DatabaseClient()
extractor = DataExtractor(db_client)

Public methods

extract_by_table

extract_by_table(table_name: str, columns: list[str] | None = None, limit: int | None = None) -> pd.DataFrame
Builds a SELECT statement from its arguments and runs it against the OLTP database. The generated SQL depends on whether columns and limit are supplied:
Scenario                     | Generated SQL
No columns, no limit         | SELECT * FROM {table_name}
Columns provided, no limit   | SELECT col1, col2 FROM {table_name}
No columns, limit provided   | SELECT TOP {limit} * FROM {table_name}
Columns and limit provided   | SELECT TOP {limit} col1, col2 FROM {table_name}
  • When columns is None, the selector defaults to *.
  • When limit is set, SQL Server’s TOP clause is used. The limit is applied server-side rather than as a Python-side slice, so at most limit rows are transferred over the wire.
# Full table extract
df_all = extractor.extract_by_table("dbo.Customers")

# Specific columns only
df_cols = extractor.extract_by_table(
    "dbo.Customers",
    columns=["CustomerID", "CompanyName", "Country"]
)

# Preview: first 100 rows of specific columns
df_preview = extractor.extract_by_table(
    "Sales.SalesOrderHeader",
    columns=["SalesOrderID", "OrderDate", "TotalDue"],
    limit=100
)
table_name should be schema-qualified (e.g., dbo.Customers, Sales.SalesOrderHeader) to avoid ambiguity when the OLTP database contains multiple schemas with identically named tables.
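The four cases in the table above can be sketched as a small string builder. This is only an illustration of the documented behavior, not the actual implementation: build_select is a hypothetical helper name, and treating an empty column list like None is an assumption.

```python
from __future__ import annotations


def build_select(table_name: str,
                 columns: list[str] | None = None,
                 limit: int | None = None) -> str:
    """Hypothetical helper mirroring the four documented SQL shapes."""
    # With no column list (or an empty one, by assumption), select everything.
    selector = ", ".join(columns) if columns else "*"
    # TOP is SQL Server syntax and is emitted only when a limit is given.
    top = f"TOP {int(limit)} " if limit is not None else ""
    return f"SELECT {top}{selector} FROM {table_name}"


print(build_select("dbo.Customers"))
print(build_select("dbo.Customers", ["CustomerID", "Country"]))
print(build_select("dbo.Customers", limit=100))
print(build_select("dbo.Customers", ["CustomerID", "Country"], limit=100))
```

Because the table and column names are interpolated into the statement, a builder like this is only safe with identifiers from a trusted source, such as the list returned by extract_tables().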

extract_by_query

extract_by_query(sql_query: str, limit: int | None = None) -> pd.DataFrame
Executes any arbitrary SQL string against the OLTP database. When limit is provided, the original query is wrapped as a subquery and TOP is applied to the outer select:
-- With limit=50
SELECT TOP 50 * FROM (
    SELECT CustomerID, CompanyName FROM dbo.Customers WHERE Country = 'USA'
) AS _preview

-- Without limit
SELECT CustomerID, CompanyName FROM dbo.Customers WHERE Country = 'USA'
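This wrapping approach preserves the original query unchanged and is particularly useful when building previews in the Streamlit UI. Because the query becomes a derived table, it must be valid in that position: SQL Server rejects an ORDER BY inside a subquery unless TOP or OFFSET is also specified, so a query that ends in ORDER BY cannot be wrapped as-is.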
# Extract with a custom JOIN
df_joined = extractor.extract_by_query("""
    SELECT
        c.CustomerID,
        c.CompanyName,
        o.OrderDate,
        o.TotalDue
    FROM dbo.Customers c
    INNER JOIN dbo.Orders o ON c.CustomerID = o.CustomerID
    WHERE o.OrderDate >= '2023-01-01'
""")

# Same query but only the first 25 rows for a preview
df_preview = extractor.extract_by_query(
    sql_query="""
        SELECT c.CustomerID, c.CompanyName, o.OrderDate, o.TotalDue
        FROM dbo.Customers c
        INNER JOIN dbo.Orders o ON c.CustomerID = o.CustomerID
        WHERE o.OrderDate >= '2023-01-01'
    """,
    limit=25
)
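The wrapping behavior described for extract_by_query can be sketched as follows. The helper name wrap_with_limit is hypothetical, and the exact whitespace of the generated statement is an assumption; only the overall shape (SELECT TOP n * FROM (...) AS _preview) comes from the description above.

```python
from __future__ import annotations


def wrap_with_limit(sql_query: str, limit: int | None = None) -> str:
    """Hypothetical helper: apply TOP by wrapping the query as a subquery."""
    if limit is None:
        # Without a limit the query is passed through untouched.
        return sql_query
    # The original text is embedded verbatim; only an outer SELECT is added.
    return f"SELECT TOP {int(limit)} * FROM (\n{sql_query}\n) AS _preview"


inner = "SELECT CustomerID, CompanyName FROM dbo.Customers WHERE Country = 'USA'"
print(wrap_with_limit(inner))
print(wrap_with_limit(inner, limit=50))
```

Wrapping rather than rewriting means the caller's SQL never needs to be parsed, at the cost of requiring that it be valid as a derived table.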

extract_tables

extract_tables() -> list[str]
Introspects the OLTP database engine using SQLAlchemy’s inspect() and returns a list of all user-facing tables in schema.table format. It iterates over every schema name exposed by the engine and skips the following system schemas:

SQL Server built-in schemas

  • sys
  • INFORMATION_SCHEMA
  • guest

Fixed database roles

  • db_owner
  • db_accessadmin
  • db_securityadmin
  • db_ddladmin
  • db_backupoperator
  • db_datareader
  • db_datawriter
  • db_denydatareader
  • db_denydatawriter
Schema filtering applies schema.lower() before testing membership in system_schemas. All-lowercase entries in the set (sys, guest, db_owner, and the other db_* role names) are therefore reliably excluded, whatever case the engine reports them in. The set also contains "INFORMATION_SCHEMA" in uppercase. Because the incoming name is lowercased to "information_schema" before the membership test, and the set has no lowercase "information_schema" entry, the uppercase entry can never match. In practice, SQL Server’s SQLAlchemy dialect reports this schema as INFORMATION_SCHEMA, so it is not filtered out despite appearing in the list above. User schemas are unaffected unless their lowercased name coincides with one of the lowercase entries.
tables = extractor.extract_tables()
# Example output:
# ['dbo.Customers', 'dbo.Orders', 'dbo.Products',
#  'Sales.SalesOrderHeader', 'Sales.SalesTerritory',
#  'HumanResources.Employee']

for t in tables:
    print(t)
All entries in the returned list use the schema.table format — for example dbo.Customers or Sales.SalesTerritory. Pass these strings directly to extract_by_table() as the table_name argument.
The Streamlit UI calls extract_tables() at startup to populate the Source Table dropdown. Any user-visible table in the OLTP database will appear there automatically; schemas matched by the filter described above are hidden.
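The case-sensitivity behavior of the schema filter can be reproduced without a database. The sketch below is an assumption-laden stand-in: list_tables and the tables_by_schema mapping are hypothetical, replacing the SQLAlchemy inspector with a plain dictionary, and only the lowercase-then-test logic is taken from the description above.

```python
from __future__ import annotations

# Mirrors the documented set: every entry is lowercase except INFORMATION_SCHEMA.
SYSTEM_SCHEMAS = {
    "sys", "INFORMATION_SCHEMA", "guest",
    "db_owner", "db_accessadmin", "db_securityadmin", "db_ddladmin",
    "db_backupoperator", "db_datareader", "db_datawriter",
    "db_denydatareader", "db_denydatawriter",
}


def list_tables(tables_by_schema: dict[str, list[str]],
                system_schemas: set[str] = SYSTEM_SCHEMAS) -> list[str]:
    """Hypothetical stand-in for extract_tables(), fed by a dict."""
    result = []
    for schema, tables in tables_by_schema.items():
        # The incoming name is lowercased before the membership test.
        if schema.lower() in system_schemas:
            continue
        result.extend(f"{schema}.{table}" for table in tables)
    return result


catalog = {
    "dbo": ["Customers"],
    "sys": ["objects"],
    "INFORMATION_SCHEMA": ["TABLES"],
    "Sales": ["SalesTerritory"],
}

print(list_tables(catalog))
# → ['dbo.Customers', 'INFORMATION_SCHEMA.TABLES', 'Sales.SalesTerritory']

# One possible fix: normalize the set itself so both sides are lowercase.
normalized = {name.lower() for name in SYSTEM_SCHEMAS}
print(list_tables(catalog, normalized))
# → ['dbo.Customers', 'Sales.SalesTerritory']
```

Normalizing both sides of the comparison is a common way to make such a filter case-insensitive; whether the project adopts it is not stated in the source.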

How DataExtractor connects to the database

DataExtractor never manages connections directly. It delegates to db_client.get_oltp_connection(), which returns a SQLAlchemy connection context manager backed by oltp_engine. All connection pooling, pre-ping health checks, and teardown are handled by DatabaseClient.
# Inside extract_by_table — simplified view
with self.db.get_oltp_connection() as conn:
    df = pd.read_sql(text(query), conn)
    return df
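The text() wrapper from SQLAlchemy turns the query string into an executable SQL construct that is sent to the driver as written. One caveat: text() treats :name sequences as bind parameters, so a literal colon followed by a name inside the query must be escaped with a backslash.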
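The delegation pattern described above, where the extractor borrows a connection and the client owns its lifecycle, can be illustrated with a stand-in. Everything in this sketch is hypothetical: StubClient and MiniExtractor are invented names, and an in-memory SQLite database from the standard library replaces SQL Server, SQLAlchemy, and pandas so the example runs on its own.

```python
import sqlite3
from contextlib import contextmanager


class StubClient:
    """Stand-in for DatabaseClient, backed by in-memory SQLite (assumption)."""

    def __init__(self):
        self.open_connections = 0

    @contextmanager
    def get_oltp_connection(self):
        conn = sqlite3.connect(":memory:")
        conn.execute("CREATE TABLE customers (id INTEGER, name TEXT)")
        conn.execute("INSERT INTO customers VALUES (1, 'Ana'), (2, 'Luis')")
        self.open_connections += 1
        try:
            yield conn
        finally:
            # Teardown lives in the client, never in the extractor.
            conn.close()
            self.open_connections -= 1


class MiniExtractor:
    """Borrows a connection for one query and returns the rows."""

    def __init__(self, db_client):
        self.db = db_client

    def fetch(self, query):
        with self.db.get_oltp_connection() as conn:
            return conn.execute(query).fetchall()


client = StubClient()
rows = MiniExtractor(client).fetch("SELECT id, name FROM customers ORDER BY id")
print(rows)
print(client.open_connections)
```

Keeping connection handling in one place means the extractor can be exercised against any client that exposes get_oltp_connection(), which is what makes a stub like this possible.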
