Load CSV, JSON, Parquet, and custom-format files from local or cloud storage. The filesystem source works with S3, Google Cloud Storage, Azure Blob Storage, and local files.
## Quick Start

Load CSV files from a directory:

```python
import dlt
from dlt.sources.filesystem import readers

pipeline = dlt.pipeline(
    pipeline_name="csv_loader",
    destination="duckdb",
    dataset_name="file_data",
)

# Load CSV files with automatic schema detection
csv_files = readers(
    bucket_url="data/csv_files",
    file_glob="*.csv",
).read_csv()

load_info = pipeline.run(csv_files)
print(load_info)
```
## File Formats

The `readers` source exposes one reader per format: `read_csv()` for CSV files, `read_jsonl()` for JSON Lines, and `read_parquet()` for Parquet. Reading CSV files:

```python
from dlt.sources.filesystem import readers

csv_data = readers(
    bucket_url="data/csv",
    file_glob="*.csv",
).read_csv()

pipeline.run(csv_data)
```
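The JSON Lines and Parquet readers follow the same pattern. A minimal sketch, assuming JSONL files under `data/jsonl` and Parquet files under `data/parquet`:

```python
from dlt.sources.filesystem import readers

# JSON Lines files, read in chunks
jsonl_data = readers(
    bucket_url="data/jsonl",
    file_glob="*.jsonl",
).read_jsonl(chunksize=10000)

# Parquet files
parquet_data = readers(
    bucket_url="data/parquet",
    file_glob="*.parquet",
).read_parquet()

# Reuses the pipeline from the quick start
pipeline.run([jsonl_data, parquet_data])
```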
## Cloud Storage

### AWS S3

```python
from dlt.sources.filesystem import readers

# S3 bucket
s3_data = readers(
    bucket_url="s3://my-bucket/data/",
    file_glob="*.csv",
).read_csv()

pipeline.run(s3_data)
```

Configure credentials in `.dlt/secrets.toml`:

```toml
[sources.filesystem.credentials]
aws_access_key_id = "YOUR_ACCESS_KEY"
aws_secret_access_key = "YOUR_SECRET_KEY"
```
### Google Cloud Storage

```python
from dlt.sources.filesystem import readers

# GCS bucket
gcs_data = readers(
    bucket_url="gs://my-bucket/data/",
    file_glob="*.parquet",
).read_parquet()

pipeline.run(gcs_data)
```

Configure credentials in `.dlt/secrets.toml`:

```toml
[sources.filesystem.credentials]
project_id = "my-project"
client_email = "service-account@my-project.iam.gserviceaccount.com"
private_key = "-----BEGIN PRIVATE KEY-----\n..."
```
### Azure Blob Storage

```python
from dlt.sources.filesystem import readers

# Azure Blob Storage container
azure_data = readers(
    bucket_url="az://my-container/data/",
    file_glob="*.jsonl",
).read_jsonl()

pipeline.run(azure_data)
```

Configure credentials in `.dlt/secrets.toml`:

```toml
[sources.filesystem.credentials]
azure_storage_account_name = "myaccount"
azure_storage_account_key = "YOUR_ACCOUNT_KEY"
```
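Local directories need no credentials; a relative path or a `file://` URL works as the `bucket_url`. A minimal sketch, assuming exports live under `/var/data/exports`:

```python
from dlt.sources.filesystem import readers

# Local directory; relative paths and file:// URLs both work
local_data = readers(
    bucket_url="file:///var/data/exports",
    file_glob="*.csv",
).read_csv()

pipeline.run(local_data)
```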
## Merge CSV Files

Load and merge CSV files based on a key column:

```python
import dlt
from dlt.sources.filesystem import readers

pipeline = dlt.pipeline(
    pipeline_name="csv_merge",
    destination="duckdb",
    dataset_name="met_data",
)

# Load CSV files and merge on the 'date' column
met_files = readers(
    bucket_url="samples",
    file_glob="met_csv/A801/*.csv",
).read_csv()

# Configure merge behavior
met_files.apply_hints(
    write_disposition="merge",
    merge_key="date",
)

load_info = pipeline.run(met_files.with_name("met_csv"))
print(load_info)
```
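If rows should be deduplicated rather than upserted on a range key, hint a primary key instead; a sketch assuming a hypothetical unique `id` column in the CSV files:

```python
# Deduplicate on a unique column instead of merging on 'date'
# ('id' is a hypothetical column, not part of the sample data)
met_files.apply_hints(
    write_disposition="merge",
    primary_key="id",
)
```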
## Incremental File Loading

Track which files have been loaded to avoid reprocessing:

```python
import dlt
from dlt.sources.filesystem import filesystem, read_csv

pipeline = dlt.pipeline(
    pipeline_name="incremental_files",
    destination="duckdb",
    dataset_name="file_tracker",
)

# Track files by modification time
new_files = filesystem(
    bucket_url="data/csv",
    file_glob="csv/*",
)

# Add an incremental hint on modification_date
new_files.apply_hints(
    incremental=dlt.sources.incremental("modification_date")
)

load_info = pipeline.run(
    (new_files | read_csv()).with_name("csv_files")
)
print(load_info)

# Second run - only new or modified files are loaded
load_info = pipeline.run(
    (new_files | read_csv()).with_name("csv_files")
)
print(load_info)  # No new files
```
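The incremental cursor can also be bounded so that older files are never considered; a sketch using `initial_value` with an illustrative cutoff date:

```python
from datetime import datetime, timezone

import dlt

# Only load files modified after the (illustrative) cutoff;
# 'new_files' is the filesystem resource defined above
new_files.apply_hints(
    incremental=dlt.sources.incremental(
        "modification_date",
        initial_value=datetime(2024, 1, 1, tzinfo=timezone.utc),
    )
)
```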
## Custom File Processing

Create custom transformers for specialized file formats:

```python
import dlt
from typing import Iterator
from dlt.sources import TDataItems
from dlt.sources.filesystem import FileItemDict, filesystem

@dlt.transformer
def read_excel(
    items: Iterator[FileItemDict],
    sheet_name: str,
) -> Iterator[TDataItems]:
    """Read Excel files using pandas."""
    import pandas as pd

    for file_obj in items:
        with file_obj.open() as file:
            yield pd.read_excel(file, sheet_name).to_dict(orient="records")

# Use the custom transformer
freshman_xls = filesystem(
    bucket_url="samples",
    file_glob="../custom/freshman_kgs.xlsx",
) | read_excel("freshman_table")

load_info = dlt.run(
    freshman_xls.with_name("freshman"),
    destination="duckdb",
    dataset_name="freshman_data",
)
print(load_info)
```
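The same transformer pattern extends to any format pandas can parse; a hypothetical `read_xml` sketch built on `pandas.read_xml`:

```python
import dlt
from typing import Iterator
from dlt.sources import TDataItems
from dlt.sources.filesystem import FileItemDict

@dlt.transformer
def read_xml(items: Iterator[FileItemDict]) -> Iterator[TDataItems]:
    """Hypothetical reader: parse each XML file into records with pandas."""
    import pandas as pd

    for file_obj in items:
        with file_obj.open() as file:
            yield pd.read_xml(file).to_dict(orient="records")
```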
## Copy Files While Loading

Download files locally while tracking them in the database:

```python
import os

import dlt
from dlt.sources.filesystem import FileItemDict, filesystem

pipeline = dlt.pipeline(
    pipeline_name="file_copier",
    destination="duckdb",
    dataset_name="file_metadata",
)

def _copy(item: FileItemDict) -> FileItemDict:
    """Download the file and return its metadata."""
    dest_file = os.path.join("_storage", item["relative_path"])
    os.makedirs(os.path.dirname(dest_file), exist_ok=True)
    # Download the file
    item.fsspec.download(item["file_url"], dest_file)
    return item

# Add the copy step to the filesystem source
downloader = filesystem(
    bucket_url="samples",
    file_glob="**",
).add_map(_copy)

# Load file metadata into the 'listing' table
load_info = pipeline.run(
    downloader.with_name("listing"),
    write_disposition="replace",
)
print(load_info)
```
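To copy only a subset of the listed files, a filter can run before the copy step; a sketch using `add_filter` with an illustrative 10 MB size limit:

```python
from dlt.sources.filesystem import filesystem

# Skip files larger than 10 MB (illustrative threshold);
# '_copy' is the map function defined above
downloader = (
    filesystem(bucket_url="samples", file_glob="**")
    .add_filter(lambda item: item["size_in_bytes"] < 10 * 1024 * 1024)
    .add_map(_copy)
)
```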
## DuckDB Fast CSV Reading

Use DuckDB's native CSV reader for maximum performance:

```python
import dlt
from dlt.sources.filesystem import readers

pipeline = dlt.pipeline(
    pipeline_name="fast_csv",
    destination="duckdb",
    dataset_name="csv_data",
)

# Use DuckDB's native CSV reader
csv_files = readers(
    bucket_url="samples",
    file_glob="met_csv/A801/*.csv",
).read_csv_duckdb(
    chunk_size=1000,
    header=True,
)

load_info = pipeline.run(csv_files)
print(load_info)
```
## Compressed Files

Automatically handle compressed files:

```python
import dlt
from dlt.sources.filesystem import readers

pipeline = dlt.pipeline(
    pipeline_name="compressed_files",
    destination="duckdb",
    dataset_name="taxi_data",
)

# Automatically decompress .gz files
compressed_files = readers(
    bucket_url="samples",
    file_glob="gzip/*",
).read_csv_duckdb()

load_info = pipeline.run(compressed_files)
print(load_info)
```
## File Glob Patterns

Use glob patterns to filter files:

```python
# Single directory
readers(bucket_url="data", file_glob="*.csv")

# Recursive search
readers(bucket_url="data", file_glob="**/*.json")

# Date-prefixed directories
readers(bucket_url="data", file_glob="2024-*/*.csv")

# Specific subdirectory
readers(bucket_url="data", file_glob="exports/2024/**/*.parquet")
```
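To preview what a pattern matches before loading anything, the plain `filesystem` resource can be iterated directly; a minimal sketch:

```python
from dlt.sources.filesystem import filesystem

# Print matched file names and sizes without loading their contents
for item in filesystem(bucket_url="data", file_glob="**/*.csv"):
    print(item["file_name"], item["size_in_bytes"])
```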
## Multiple Formats

Combine several formats in one pipeline run:

```python
import dlt
from dlt.sources.filesystem import readers

def load_all_files():
    pipeline = dlt.pipeline(
        pipeline_name="multi_format",
        destination="duckdb",
        dataset_name="data_lake",
    )

    # Load JSONL files
    jsonl_data = readers(
        bucket_url="s3://my-bucket/data",
        file_glob="**/*.jsonl",
    ).read_jsonl(chunksize=10000)

    # Load Parquet files
    parquet_data = readers(
        bucket_url="s3://my-bucket/data",
        file_glob="**/*.parquet",
    ).read_parquet()

    # Load CSV files with merge
    csv_data = readers(
        bucket_url="s3://my-bucket/data",
        file_glob="**/*.csv",
    ).read_csv()
    csv_data.apply_hints(
        write_disposition="merge",
        merge_key="id",
    )

    # Load all together
    load_info = pipeline.run([
        jsonl_data.with_name("events"),
        parquet_data.with_name("analytics"),
        csv_data.with_name("reference_data"),
    ])
    print(load_info)

if __name__ == "__main__":
    load_all_files()
```
## Next Steps

- **Incremental Loading**: Track processed files automatically
- **Schema Evolution**: Handle changing file schemas