Filesystem Source
The filesystem source reads files from various storage locations including cloud storage (AWS S3, Google Cloud Storage, Azure Blob Storage) and local filesystems. It provides built-in readers for common file formats and supports custom file processing.
Quick Start
Load CSV files from an S3 bucket:
```python
import dlt
from dlt.sources.filesystem import readers

# Load CSV files from S3
csv_reader = readers(
    bucket_url="s3://my-bucket/data/",
    file_glob="*.csv",
).read_csv()

pipeline = dlt.pipeline(
    pipeline_name="s3_csv_pipeline",
    destination="duckdb",
    dataset_name="s3_data",
)

load_info = pipeline.run(csv_reader)
print(load_info)
```
Supported Storage Locations
AWS S3: `bucket_url="s3://bucket-name/path/"`
Google Cloud Storage: `bucket_url="gs://bucket-name/path/"`
Azure Blob Storage: `bucket_url="az://container-name/path/"`
Local Filesystem: `bucket_url="file:///local/path/"`
SFTP: `bucket_url="sftp://user@host/path/"`
Google Drive: `bucket_url="gdrive://folder-id/"`
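The same readers API works for any of these locations; only the bucket_url changes. As a minimal sketch (the local path and pipeline name are illustrative), reading CSV files from a local directory looks like this:

```python
import dlt
from dlt.sources.filesystem import readers

# Read CSV files from a local directory (path is illustrative)
local_csv = readers(
    bucket_url="file:///local/path/",
    file_glob="*.csv",
).read_csv()

pipeline = dlt.pipeline(pipeline_name="local_csv_pipeline", destination="duckdb")
pipeline.run(local_csv)
```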
Authentication
Configure credentials for cloud storage providers:
AWS S3

```python
import dlt
from dlt.sources.credentials import AwsCredentials
from dlt.sources.filesystem import readers

credentials = AwsCredentials(
    aws_access_key_id=dlt.secrets["aws_access_key_id"],
    aws_secret_access_key=dlt.secrets["aws_secret_access_key"],
)

csv_reader = readers(
    bucket_url="s3://my-bucket/data/",
    credentials=credentials,
).read_csv()
```

Or use environment variables:

```bash
export AWS_ACCESS_KEY_ID="your_access_key"
export AWS_SECRET_ACCESS_KEY="your_secret_key"
```

Google Cloud Storage

```python
import dlt
from dlt.sources.credentials import GcpServiceAccountCredentials
from dlt.sources.filesystem import readers

credentials = GcpServiceAccountCredentials(
    project_id="my-project",
    private_key=dlt.secrets["gcp_private_key"],
    client_email=dlt.secrets["gcp_client_email"],
)

csv_reader = readers(
    bucket_url="gs://my-bucket/data/",
    credentials=credentials,
).read_csv()
```

Or use a service account JSON file:

```bash
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
```

Azure Blob Storage

```python
import dlt
from dlt.sources.credentials import AzureCredentials
from dlt.sources.filesystem import readers

credentials = AzureCredentials(
    azure_storage_account_name=dlt.secrets["account_name"],
    azure_storage_account_key=dlt.secrets["account_key"],
)

csv_reader = readers(
    bucket_url="az://my-container/data/",
    credentials=credentials,
).read_csv()
```
Built-in File Readers
The readers source provides built-in support for common file formats:
CSV Files
```python
from dlt.sources.filesystem import readers

# Read CSV files with default settings
csv_data = readers(
    bucket_url="s3://my-bucket/",
    file_glob="**/*.csv",  # Recursive pattern
).read_csv()

# Customize CSV reading with pandas kwargs
csv_data = readers(
    bucket_url="s3://my-bucket/",
    file_glob="*.csv",
).read_csv(
    chunksize=10000,
    sep=";",
    encoding="utf-8",
    dtype={"id": str},
)

pipeline.run(csv_data)
```
Parquet Files
```python
# Read Parquet files
parquet_data = readers(
    bucket_url="s3://my-bucket/",
    file_glob="**/*.parquet",
).read_parquet(chunksize=50000)

pipeline.run(parquet_data)
```
JSONL Files
```python
# Read JSON Lines files
jsonl_data = readers(
    bucket_url="s3://my-bucket/",
    file_glob="**/*.jsonl",
).read_jsonl(chunksize=10000)

pipeline.run(jsonl_data)
```
DuckDB CSV Reader
For faster CSV processing, use the DuckDB reader:
```python
# Use DuckDB for fast CSV reading
csv_duckdb = readers(
    bucket_url="s3://my-bucket/",
    file_glob="*.csv",
).read_csv_duckdb(
    chunksize=100000,
    # Pass DuckDB-specific options
    header=True,
    delim=",",
)

pipeline.run(csv_duckdb)
```
File Filtering
Use glob patterns to filter files, for example to match all CSV files recursively, to target a specific directory, to cover multiple extensions, or to follow a date-based path. Matching all CSV files in all subdirectories looks like this (sketches of the other patterns follow below):

```python
# Match all CSV files in all subdirectories
file_glob = "**/*.csv"
```
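Sketches of the other filter categories, assuming illustrative directory layouts and bucket names:

```python
from dlt.sources.filesystem import readers

# Specific directory only (no recursion) -- layout is illustrative
daily_csv = readers(
    bucket_url="s3://my-bucket/",
    file_glob="data/2024/*.csv",
).read_csv()

# Date-partitioned paths, e.g. year/month folders -- layout is illustrative
january_csv = readers(
    bucket_url="s3://my-bucket/",
    file_glob="exports/2024/01/*.csv",
).read_csv()

# Multiple extensions: use one readers() call per extension and load both resources
csv_files = readers(bucket_url="s3://my-bucket/", file_glob="**/*.csv").read_csv()
jsonl_files = readers(bucket_url="s3://my-bucket/", file_glob="**/*.jsonl").read_jsonl()
```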
Using the Filesystem Resource Directly
For custom file processing, use the filesystem resource:
```python
import dlt
from dlt.sources.filesystem import filesystem

@dlt.resource
def process_files():
    """Custom file processing logic"""
    files = filesystem(
        bucket_url="s3://my-bucket/data/",
        file_glob="*.txt",
    )
    for file_batch in files:
        for file_item in file_batch:
            # Read file content
            content = file_item.read_bytes()
            # Process content with your own parser
            processed_data = custom_parser(content)
            yield processed_data

pipeline.run(process_files())
```
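The same processing can usually also be written with dlt's pipe syntax, as a transformer that receives pages of file items from the filesystem resource. A minimal sketch, assuming the FileItemDict class and the | operator behave as in recent dlt versions:

```python
import dlt
from dlt.sources.filesystem import filesystem, FileItemDict

@dlt.transformer
def parse_text_files(file_items: list[FileItemDict]):
    """Turn each non-empty line of each .txt file into a record."""
    for file_item in file_items:
        text = file_item.read_bytes().decode("utf-8")
        for line in text.splitlines():
            if line.strip():
                yield {"file_name": file_item["file_name"], "line": line}

txt_files = filesystem(bucket_url="s3://my-bucket/data/", file_glob="*.txt")

pipeline = dlt.pipeline(pipeline_name="txt_pipeline", destination="duckdb")
pipeline.run(txt_files | parse_text_files)
```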
Access file metadata from FileItem objects:
```python
import dlt
from dlt.sources.filesystem import filesystem

@dlt.resource
def file_metadata():
    """Extract file metadata"""
    files = filesystem(
        bucket_url="s3://my-bucket/",
        file_glob="**/*",
    )
    for file_batch in files:
        for file_item in file_batch:
            yield {
                "file_url": file_item["file_url"],
                "file_name": file_item["file_name"],
                "mime_type": file_item["mime_type"],
                "size_in_bytes": file_item["size_in_bytes"],
                "modification_date": file_item["modification_date"],
            }

pipeline.run(file_metadata())
```
Incremental Loading
Load only new or modified files:
```python
import dlt
from dlt.sources.filesystem import readers

# Load files incrementally based on modification date
csv_data = readers(
    bucket_url="s3://my-bucket/data/",
    file_glob="**/*.csv",
    incremental=dlt.sources.incremental(
        cursor_path="modification_date",
        initial_value="2024-01-01T00:00:00Z",
    ),
).read_csv()

pipeline.run(csv_data)
```
On subsequent runs, only files modified after the last run will be processed.
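When you work with the filesystem resource directly (for example piping into read_csv or a custom transformer), the same cursor can typically be attached as a hint instead. A sketch assuming apply_hints and the pipe syntax:

```python
import dlt
from dlt.sources.filesystem import filesystem, read_csv

new_files = filesystem(bucket_url="s3://my-bucket/data/", file_glob="**/*.csv")
# Only files modified after the stored cursor value are processed on later runs
new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))

pipeline = dlt.pipeline(pipeline_name="incremental_csv", destination="duckdb")
pipeline.run((new_files | read_csv()).with_name("csv_files"))
```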
Complete Example: S3 to DuckDB
A comprehensive example loading multiple file formats from S3:
```python
import dlt
from dlt.sources.filesystem import readers, filesystem
from dlt.sources.credentials import AwsCredentials

@dlt.source
def s3_data_source():
    """Load multiple file formats from S3"""
    # AWS credentials from secrets
    credentials = AwsCredentials(
        aws_access_key_id=dlt.secrets["aws_access_key_id"],
        aws_secret_access_key=dlt.secrets["aws_secret_access_key"],
    )

    # Load CSV files with incremental loading
    yield readers(
        bucket_url="s3://my-bucket/csv/",
        file_glob="**/*.csv",
        credentials=credentials,
        incremental=dlt.sources.incremental(
            cursor_path="modification_date",
            initial_value="2024-01-01T00:00:00Z",
        ),
    ).read_csv(
        chunksize=10000,
        dtype={"id": str},
    )

    # Load Parquet files
    yield readers(
        bucket_url="s3://my-bucket/parquet/",
        file_glob="**/*.parquet",
        credentials=credentials,
    ).read_parquet(chunksize=50000)

    # Load JSONL files
    yield readers(
        bucket_url="s3://my-bucket/jsonl/",
        file_glob="**/*.jsonl",
        credentials=credentials,
    ).read_jsonl(chunksize=10000)

# Create and run pipeline
pipeline = dlt.pipeline(
    pipeline_name="s3_pipeline",
    destination="duckdb",
    dataset_name="s3_data",
)

load_info = pipeline.run(s3_data_source())
print(load_info)
```
Custom File Processing
Implement custom logic for specific file formats:
```python
import json

import dlt
from dlt.sources.filesystem import filesystem

@dlt.resource
def process_custom_format():
    """Process custom file format"""
    files = filesystem(
        bucket_url="s3://my-bucket/custom/",
        file_glob="*.custom",
    )
    for file_batch in files:
        for file_item in file_batch:
            # Read and decode file
            content = file_item.read_bytes().decode("utf-8")
            # Custom parsing logic
            lines = content.split("\n")
            for line in lines:
                if line.strip():
                    # Parse custom format
                    record = parse_custom_line(line)
                    yield record

def parse_custom_line(line: str) -> dict:
    """Custom parsing logic"""
    # Implement your parsing logic here
    parts = line.split("|")
    return {
        "field1": parts[0],
        "field2": parts[1],
        "field3": int(parts[2]),
    }

pipeline.run(process_custom_format())
```
Advanced Configuration
Customize fsspec Options
Pass additional options to the underlying fsspec filesystem:
```python
csv_data = readers(
    bucket_url="s3://my-bucket/",
    file_glob="*.csv",
    kwargs={"use_ssl": True},  # fsspec options
    client_kwargs={"region_name": "us-west-2"},  # boto3 client options
).read_csv()
```
Extract File Content
Load file content directly into the data:

```python
files_with_content = filesystem(
    bucket_url="s3://my-bucket/",
    file_glob="*.txt",
    extract_content=True,  # Adds file_content field
)

for file_batch in files_with_content:
    for file_item in file_batch:
        content = file_item["file_content"]
        # Process content
```
Control Batch Size
Adjust the number of files processed per batch:
```python
files = filesystem(
    bucket_url="s3://my-bucket/",
    file_glob="*.csv",
    files_per_page=50,  # Process 50 files at a time
)
```
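Glob patterns filter only by path. To filter on other file metadata (size, name, modification date) before any file is read, a filter can usually be attached to the resource. A sketch assuming dlt's add_filter helper and an illustrative 1 GiB threshold:

```python
from dlt.sources.filesystem import filesystem

files = filesystem(
    bucket_url="s3://my-bucket/",
    file_glob="**/*.csv",
)
# Skip empty files and anything above ~1 GiB (threshold is illustrative)
files.add_filter(lambda item: 0 < item["size_in_bytes"] <= 1024**3)
```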
Best Practices
Use appropriate chunk sizes
Balance memory usage and performance with appropriate chunk sizes:

```python
# For large CSV files, use smaller chunks
csv_data = readers(
    bucket_url="s3://...",
    file_glob="*.csv",
).read_csv(chunksize=10000)

# For Parquet, larger chunks are often better
parquet_data = readers(
    bucket_url="s3://...",
    file_glob="*.parquet",
).read_parquet(chunksize=100000)
```

Use incremental loading for growing datasets
Always use incremental loading when files are continuously added:

```python
csv_data = readers(
    bucket_url="s3://...",
    file_glob="**/*.csv",
    incremental=dlt.sources.incremental(
        cursor_path="modification_date"
    ),
).read_csv()
```

Use specific glob patterns
Use specific glob patterns to minimize file scanning:

```python
# ✅ Good: Specific pattern
file_glob = "data/2024/**/*.csv"

# ❌ Bad: Too broad
file_glob = "**/*"
```

Protect your credentials
Never hardcode credentials. Use environment variables or secrets.toml:

```toml
# secrets.toml
[sources.aws]
aws_access_key_id = "your_access_key"
aws_secret_access_key = "your_secret_key"
```
Troubleshooting
Ensure credentials are correctly configured:

```python
# Test S3 access
from dlt.common.storages.fsspec_filesystem import fsspec_filesystem

fs, _ = fsspec_filesystem("s3://my-bucket/")
print(fs.ls("."))  # List files to test access
```
Check your glob pattern and bucket URL:

```python
# Debug: List all files
files = filesystem(
    bucket_url="s3://my-bucket/",
    file_glob="**/*",  # Match everything
)

for file_batch in files:
    for file_item in file_batch:
        print(file_item["file_url"])
```
Memory issues with large files
Reduce chunk size or process files individually:

```python
# Smaller chunks
csv_data = readers(
    bucket_url="s3://...",
    file_glob="*.csv",
).read_csv(chunksize=1000)
```
Next Steps