Documentation Index
Fetch the complete documentation index at: https://mintlify.com/RaviTejaMedarametla/nba-data-preprocessing/llms.txt
Use this file to discover all available pages before exploring further.
Overview
The DataIngestor class handles data loading from multiple sources (CSV files, paths, or DataFrames) and provides dataset fingerprinting for reproducibility tracking.
Class Definition
class DataIngestor:
def __init__(self, random_seed: int = 42)
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/ingestion/loader.py:19
Constructor
Random seed for reproducible operations (currently unused but reserved for future randomized sampling)
Methods
load
def load(self, source: str | Path | pd.DataFrame) -> pd.DataFrame
Loads data from various source types into a pandas DataFrame.
source
str | Path | pd.DataFrame
required
Data source to load:
str or Path: Path to CSV file
pd.DataFrame: Returns a copy of the DataFrame
Loaded DataFrame with all columns and rows from the source
Example:
from pipeline.ingestion import DataIngestor
import pandas as pd
from pathlib import Path
ingestor = DataIngestor(random_seed=42)
# Load from CSV file (string path)
df1 = ingestor.load('data/nba_salaries.csv')
print(f"Loaded {len(df1)} rows from CSV")
# Load from Path object
df2 = ingestor.load(Path('data/nba_salaries.csv'))
print(f"Loaded {len(df2)} rows from Path")
# Load from existing DataFrame (returns copy)
existing_df = pd.DataFrame({'salary': [1000000, 2000000]})
df3 = ingestor.load(existing_df)
print(f"Loaded {len(df3)} rows from DataFrame")
assert df3 is not existing_df # Confirms it's a copy
stream_chunks
def stream_chunks(
self,
source: str | Path | pd.DataFrame,
chunk_size: int
) -> Iterator[pd.DataFrame]
Creates an iterator that yields data in chunks for streaming processing.
source
str | Path | pd.DataFrame
required
Data source to stream:
str or Path: Path to CSV file (uses pandas chunksize)
pd.DataFrame: Splits DataFrame into chunks
Iterator yielding DataFrame chunks, each containing up to chunk_size rows
Example:
from pipeline.ingestion import DataIngestor
import pandas as pd
ingestor = DataIngestor()
# Stream from CSV file
for i, chunk in enumerate(ingestor.stream_chunks('nba_data.csv', chunk_size=100)):
print(f"Chunk {i}: {len(chunk)} rows")
# Process chunk...
# Stream from DataFrame
df = pd.DataFrame({'salary': range(1000)})
chunks = list(ingestor.stream_chunks(df, chunk_size=250))
print(f"Split into {len(chunks)} chunks") # 4 chunks
print(f"Last chunk size: {len(chunks[-1])}") # 250 rows
fingerprint
def fingerprint(self, source: str | Path | pd.DataFrame) -> DatasetFingerprint
Generates a cryptographic fingerprint of the dataset for reproducibility verification.
source
str | Path | pd.DataFrame
required
Data source to fingerprint
Dataclass containing:
path (str): Source path or “<in-memory>” for DataFrames
sha256 (str): SHA-256 hash of CSV representation
rows (int): Number of rows
columns (int): Number of columns
Example:
from pipeline.ingestion import DataIngestor
import pandas as pd
ingestor = DataIngestor()
# Fingerprint CSV file
fp1 = ingestor.fingerprint('nba_salaries.csv')
print(f"Path: {fp1.path}")
print(f"SHA256: {fp1.sha256}")
print(f"Shape: {fp1.rows} rows × {fp1.columns} columns")
# Fingerprint DataFrame
df = pd.DataFrame({
'name': ['Player A', 'Player B'],
'salary': [1000000, 2000000]
})
fp2 = ingestor.fingerprint(df)
print(f"Path: {fp2.path}") # <in-memory>
print(f"SHA256: {fp2.sha256}")
print(f"Shape: {fp2.rows}x{fp2.columns}") # 2x2
# Verify data integrity
fp3 = ingestor.fingerprint('nba_salaries.csv')
if fp1.sha256 == fp3.sha256:
print("✓ Dataset unchanged")
else:
print("✗ Dataset modified!")
DatasetFingerprint
@dataclass(frozen=True)
class DatasetFingerprint:
path: str
sha256: str
rows: int
columns: int
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/ingestion/loader.py:12
Immutable dataclass representing a dataset’s unique fingerprint.
Source path or “<in-memory>” for DataFrames
SHA-256 cryptographic hash of the CSV representation
Number of rows in the dataset
Number of columns in the dataset
Example:
from dataclasses import asdict
from pipeline.ingestion import DataIngestor
import json
ingestor = DataIngestor()
fp = ingestor.fingerprint('nba_data.csv')
# Convert to dict for serialization
fp_dict = asdict(fp)
print(json.dumps(fp_dict, indent=2))
# {
# "path": "nba_data.csv",
# "sha256": "a1b2c3d4...",
# "rows": 500,
# "columns": 15
# }
# Access fields
print(f"Dataset has {fp.rows:,} rows")
print(f"Fingerprint: {fp.sha256[:16]}...") # First 16 chars
Usage Patterns
Batch Loading
from pipeline.ingestion import DataIngestor
from pipeline.config import PipelineConfig
config = PipelineConfig()
ingestor = DataIngestor(config.random_seed)
# Load entire dataset
df = ingestor.load('large_dataset.csv')
print(f"Loaded {len(df):,} rows into memory")
# Generate fingerprint for provenance
fp = ingestor.fingerprint(df)
print(f"Dataset fingerprint: {fp.sha256}")
Streaming Processing
from pipeline.ingestion import DataIngestor
ingestor = DataIngestor()
processed_rows = 0
for chunk in ingestor.stream_chunks('huge_dataset.csv', chunk_size=1000):
# Process each chunk independently
processed = process_chunk(chunk) # Your processing function
processed_rows += len(processed)
print(f"Processed {processed_rows:,} rows so far...")
print(f"Total processed: {processed_rows:,} rows")
Data Verification
from pipeline.ingestion import DataIngestor
import json
from pathlib import Path
ingestor = DataIngestor()
# Save fingerprint for later verification
fp_original = ingestor.fingerprint('dataset_v1.csv')
metadata = {
'fingerprint': asdict(fp_original),
'version': '1.0'
}
with open('metadata.json', 'w') as f:
json.dump(metadata, f, indent=2)
# Later: verify dataset hasn't changed
with open('metadata.json') as f:
saved_metadata = json.load(f)
fp_current = ingestor.fingerprint('dataset_v1.csv')
if fp_current.sha256 == saved_metadata['fingerprint']['sha256']:
print("✓ Dataset integrity verified")
else:
print("✗ Warning: Dataset has been modified!")
print(f" Expected: {saved_metadata['fingerprint']['sha256']}")
print(f" Got: {fp_current.sha256}")
Notes
- All methods return copies of DataFrames to prevent unintended mutations
- The
load() method uses pandas’ default CSV parsing with no special options
- Fingerprinting converts the DataFrame to CSV format before hashing for consistency
- The SHA-256 hash ensures cryptographic-strength verification of data integrity
stream_chunks() returns an iterator for memory-efficient processing of large files
- Chunk copies are created to prevent modifications from affecting subsequent chunks