The pipeline module contains every public function called from the notebooks in the OpenAVM Kit project. This is the primary interface for building automated valuation models.

Initialization

init_notebook()

Initialize the notebook environment for a specific locality.
from openavmkit import pipeline

pipeline.init_notebook("us-nc-guilford")
Parameters:
  locality (str, required): The locality slug (e.g., "us-nc-guilford")

load_settings()

Load and return the settings dictionary for the locality.
settings = pipeline.load_settings()
Parameters:
  settings_file (str, default: "in/settings.json"): Path to the settings file
  settings_object (dict, default: None): Optional settings object to use instead of loading from a file
  error (bool, default: True): If True, raises an error if the settings file cannot be loaded
  warning (bool, default: True): If True, emits a warning if the settings file cannot be loaded
Returns:
  settings (dict): The fully resolved settings dictionary
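
As a mental model for the error and warning flags, their documented semantics roughly correspond to this hypothetical sketch (not OpenAVM Kit's actual implementation):

```python
import json
import warnings

def load_settings_sketch(settings_file="in/settings.json", settings_object=None,
                         error=True, warning=True):
    """Illustrative sketch of the documented flag semantics (not the library's code)."""
    if settings_object is not None:
        return settings_object  # an explicit object wins over the file
    try:
        with open(settings_file) as f:
            return json.load(f)
    except OSError as exc:
        if error:
            raise               # error=True: propagate the failure
        if warning:
            warnings.warn(f"Could not load {settings_file}: {exc}")
        return {}               # otherwise fall back to an empty settings dict
```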

Data Loading & Processing

load_dataframes()

Load dataframes based on the provided settings and return them in a dictionary.
dataframes = pipeline.load_dataframes(settings, verbose=True)
Parameters:
  settings (dict, required): Settings dictionary
  verbose (bool, default: False): If True, prints detailed logs during data loading
Returns:
  dataframes (dict): Dictionary mapping keys to loaded DataFrames

process_dataframes()

Process dataframes according to settings and return a SalesUniversePair.
sup = pipeline.process_dataframes(dataframes, settings, verbose=True)
Parameters:
  dataframes (dict, required): Dictionary of DataFrames
  settings (dict, required): Settings dictionary for data processing
  verbose (bool, default: False): If True, prints detailed logs during processing
Returns:
  sup (SalesUniversePair): A SalesUniversePair object containing the processed sales and universe data

tag_model_groups_sup()

Tag model groups for a SalesUniversePair based on user-specified filters.
sup = pipeline.tag_model_groups_sup(sup, settings, verbose=True)
Parameters:
  sup (SalesUniversePair, required): Sales and universe data
  settings (dict, required): Configuration settings
  verbose (bool, default: False): If True, enables verbose output
Returns:
  sup (SalesUniversePair): Updated SalesUniversePair with tagged model groups

process_sales()

Process sales data within a SalesUniversePair by cleaning invalid sales and applying time adjustments.
sup = pipeline.process_sales(sup, settings, write=True, verbose=True)
Parameters:
  sup (SalesUniversePair, required): Sales and universe data
  settings (dict, required): Configuration settings
  write (bool, default: False): Whether to write out data during processing
  verbose (bool, default: False): If True, prints verbose output during processing
Returns:
  sup (SalesUniversePair): Updated SalesUniversePair with processed sales data
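
The time adjustments applied here follow a standard AVM idea: index each sale price to a common valuation date. This toy sketch illustrates the general concept only; the index values are invented and this is not OpenAVM Kit's implementation:

```python
# Index sale prices to a common valuation date using a market price index.
# The index values below are invented for illustration.
price_index = {"2022": 1.00, "2023": 1.08, "2024": 1.15}  # market level by year
valuation_year = "2024"

sales = [
    {"key": "A", "sale_year": "2022", "sale_price": 200_000},
    {"key": "B", "sale_year": "2024", "sale_price": 250_000},
]

for sale in sales:
    # Ratio of market level at the valuation date to market level at sale date
    factor = price_index[valuation_year] / price_index[sale["sale_year"]]
    sale["sale_price_time_adj"] = round(sale["sale_price"] * factor, 2)

# Sale A is brought forward to 2024 market levels: 200_000 * 1.15 = 230_000;
# sale B already sold in the valuation year, so it is unchanged.
```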

Enrichment Functions

enrich_sup_spatial_lag()

Enrich the sales and universe DataFrames with spatial lag features.
sup = pipeline.enrich_sup_spatial_lag(sup, settings, verbose=True)
Parameters:
  sup (SalesUniversePair, required): SalesUniversePair containing sales and universe DataFrames
  settings (dict, required): Settings dictionary
  verbose (bool, default: False): If True, prints progress information
Returns:
  sup (SalesUniversePair): Enriched SalesUniversePair with spatial lag features
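
A spatial lag feature is, broadly, a weighted average of a variable over each parcel's neighbors. This toy sketch illustrates the general concept with a k-nearest-neighbor mean; it is not the library's implementation:

```python
import math

# Toy parcels: (x, y) location and an observed value (e.g., price per square foot)
parcels = [
    {"id": 0, "x": 0.0, "y": 0.0, "value": 100.0},
    {"id": 1, "x": 1.0, "y": 0.0, "value": 110.0},
    {"id": 2, "x": 0.0, "y": 1.0, "value": 120.0},
    {"id": 3, "x": 5.0, "y": 5.0, "value": 300.0},
]

def spatial_lag(parcels, k=2):
    """For each parcel, average `value` over its k nearest other parcels."""
    out = {}
    for p in parcels:
        dists = sorted(
            (math.hypot(p["x"] - q["x"], p["y"] - q["y"]), q["value"])
            for q in parcels if q["id"] != p["id"]
        )
        neighbors = dists[:k]  # the k closest parcels by Euclidean distance
        out[p["id"]] = sum(v for _, v in neighbors) / len(neighbors)
    return out

lag = spatial_lag(parcels, k=2)
# Parcel 0's two nearest neighbors are parcels 1 and 2, so its lag is (110 + 120) / 2
```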

enrich_sup_streets()

Enrich the universe GeoDataFrame within a SalesUniversePair with street network data.
This function can be very computationally expensive and memory-intensive for large datasets.
sup = pipeline.enrich_sup_streets(sup, settings, verbose=True)
Parameters:
  sup (SalesUniversePair, required): The data you want to enrich
  settings (dict, required): Settings dictionary
  verbose (bool, default: False): If True, prints verbose output during processing
Returns:
  sup (SalesUniversePair): Enriched SalesUniversePair with street-related metrics

fill_unknown_values_sup()

Fill unknown values with default values as specified in settings.
sup = pipeline.fill_unknown_values_sup(sup, settings)
Parameters:
  sup (SalesUniversePair, required): The SalesUniversePair containing sales and universe data
  settings (dict, required): The settings dictionary containing configuration for filling unknown values
Returns:
  sup (SalesUniversePair): The updated SalesUniversePair with filled unknown values

Sales Scrutiny & Clustering

run_sales_scrutiny()

Run sales scrutiny analysis for each model group within a SalesUniversePair.
sup = pipeline.run_sales_scrutiny(
    sup, 
    settings, 
    drop_cluster_outliers=True,
    drop_heuristic_outliers=True,
    verbose=True
)
Parameters:
  sup (SalesUniversePair, required): Sales and universe data
  settings (dict, required): Configuration settings
  drop_cluster_outliers (bool, default: False): If True, drops invalid sales identified through cluster analysis
  drop_heuristic_outliers (bool, default: True): If True, drops invalid sales identified through heuristics
  verbose (bool, default: False): If True, enables verbose logging
Returns:
  sup (SalesUniversePair): Updated SalesUniversePair after sales scrutiny analysis

mark_ss_ids_per_model_group_sup()

Cluster parcels for a sales scrutiny study by assigning sales scrutiny IDs.
sup = pipeline.mark_ss_ids_per_model_group_sup(sup, settings, verbose=True)
Parameters:
  sup (SalesUniversePair, required): Sales and universe data
  settings (dict, required): Configuration settings
  verbose (bool, default: False): If True, prints verbose output during processing
Returns:
  sup (SalesUniversePair): Updated SalesUniversePair with marked sales scrutiny IDs

mark_horizontal_equity_clusters_per_model_group_sup()

Cluster parcels for a horizontal equity study by assigning horizontal equity cluster IDs.
sup = pipeline.mark_horizontal_equity_clusters_per_model_group_sup(
    sup,
    settings,
    verbose=True,
    do_land_clusters=True,
    do_impr_clusters=True
)
Parameters:
  sup (SalesUniversePair, required): Sales and universe data
  settings (dict, required): Configuration settings
  verbose (bool, default: False): If True, prints verbose output
  do_land_clusters (bool, default: True): If True, enables land clustering
  do_impr_clusters (bool, default: True): If True, enables improvement clustering
Returns:
  sup (SalesUniversePair): Updated SalesUniversePair with horizontal equity clusters marked

Modeling Functions

try_variables()

Run tests on variables to determine which are likely to be the most predictive.
pipeline.try_variables(sup, settings, verbose=True, plot=True, do_report=True)
Parameters:
  sup (SalesUniversePair, required): Your data
  settings (dict, required): Settings dictionary
  verbose (bool, default: False): If True, prints detailed logs
  plot (bool, default: False): If True, displays plots
  do_report (bool, default: False): If True, generates PDF reports

try_models()

Try out predictive models on the given SalesUniversePair. Optimized for speed and iteration.
pipeline.try_models(
    sup,
    settings,
    save_params=True,
    use_saved_params=True,
    verbose=True,
    run_main=True,
    run_vacant=True,
    run_hedonic=True,
    run_ensemble=True,
    do_shaps=False,
    do_plots=False
)
Parameters:
  sup (SalesUniversePair, required): Sales and universe data
  settings (dict, required): Configuration settings
  save_params (bool, default: True): Whether to save model parameters
  use_saved_params (bool, default: True): Whether to use saved model parameters
  verbose (bool, default: False): If True, enables verbose output
  run_main (bool, default: True): Flag to run main models
  run_vacant (bool, default: True): Flag to run vacant models
  run_hedonic (bool, default: True): Flag to run hedonic models
  run_ensemble (bool, default: True): Flag to run ensemble models
  do_shaps (bool, default: False): Flag to run SHAP analysis
  do_plots (bool, default: False): Flag to plot scatterplots

Checkpoint & Cloud Functions

from_checkpoint()

Read cached data from a checkpoint file or generate it via a function.
df = pipeline.from_checkpoint(
    "out/checkpoints/data.parquet",
    load_and_process_data,
    {"settings": settings}
)
Parameters:
  path (str, required): Path to the checkpoint file
  func (callable, required): Function to run if the checkpoint is not available. Should return a DataFrame
  params (dict, required): Parameters to pass to func when generating the data
Returns:
  df (pd.DataFrame): The resulting DataFrame, loaded from the checkpoint or generated
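
The compute-or-load pattern behind from_checkpoint can be pictured with this minimal sketch. It uses pickle only to stay self-contained; the real function works with parquet files, and this is not the library's code:

```python
import os
import pickle

def from_checkpoint_sketch(path, func, params):
    """If `path` exists, load and return the cached result; otherwise call
    func(**params), save the result to `path`, and return it."""
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f)
    result = func(**params)
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(result, f)
    return result
```

On the second call with the same path, the expensive function is skipped entirely and the cached result is returned.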

write_checkpoint()

Write data to a checkpoint file.
pipeline.write_checkpoint(data, "out/checkpoints/data.parquet")
Parameters:
  data (Any, required): Data to be checkpointed
  path (str, required): File path for saving the checkpoint

delete_checkpoints()

Delete all checkpoints that match the given prefix.
pipeline.delete_checkpoints("out/checkpoints/")
Parameters:
  prefix (str, required): The prefix used to identify checkpoints to delete
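
Conceptually, prefix-based deletion is an ordinary filesystem glob; a minimal sketch of the idea (not the library's implementation):

```python
import glob
import os

def delete_checkpoints_sketch(prefix):
    """Remove every file whose path starts with `prefix`."""
    for path in glob.glob(prefix + "*"):
        if os.path.isfile(path):
            os.remove(path)
```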

cloud_sync()

Synchronize local files to cloud storage.
pipeline.cloud_sync(
    locality="us-nc-guilford",
    verbose=True,
    dry_run=False,
    ignore_paths=["*.tmp"]
)
Parameters:
  locality (str, required): The locality identifier used to form remote paths
  verbose (bool, default: False): If True, prints detailed log messages
  dry_run (bool, default: False): If True, simulates the sync without performing any changes
  ignore_paths (list, default: None): List of file paths or patterns to ignore during sync
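
The ignore_paths entries look like shell-style glob patterns (e.g., "*.tmp"). This sketch shows how such filtering and a dry run plausibly interact; plan_sync is a hypothetical helper illustrating assumed semantics, not part of the pipeline module:

```python
import fnmatch

def plan_sync(local_files, ignore_paths=None, dry_run=False):
    """Return the files that would be synced, skipping anything that matches
    an ignore pattern. With dry_run=True, only report the plan."""
    ignore_paths = ignore_paths or []
    plan = [
        f for f in local_files
        if not any(fnmatch.fnmatch(f, pat) for pat in ignore_paths)
    ]
    if dry_run:
        for f in plan:
            print(f"[dry run] would sync {f}")
    return plan

files = ["data.parquet", "scratch.tmp", "settings.json"]
plan = plan_sync(files, ignore_paths=["*.tmp"])
# scratch.tmp is filtered out; data.parquet and settings.json remain
```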

Data Examination

examine_sup()

Print examination details of the sales and universe data from a SalesUniversePair.
pipeline.examine_sup(sup, settings)
Parameters:
  sup (SalesUniversePair, required): Object containing 'sales' and 'universe' DataFrames
  s (dict, required): Settings dictionary

examine_df()

Print examination details of a DataFrame.
pipeline.examine_df(df, settings)
Parameters:
  df (pd.DataFrame, required): The data you wish to examine
  s (dict, required): Settings dictionary

Output Functions

write_notebook_output_sup()

Write notebook output to disk.
pipeline.write_notebook_output_sup(
    sup,
    prefix="1-assemble",
    parquet=True,
    gpkg=False,
    shp=False,
    csv=False
)
Parameters:
  sup (SalesUniversePair, required): Sales and universe data
  prefix (str, default: "1-assemble"): File prefix for naming output files
  parquet (bool, default: True): Whether to write to parquet format
  gpkg (bool, default: False): Whether to write to gpkg format
  shp (bool, default: False): Whether to write to ESRI shapefile format
  csv (bool, default: False): Whether to write to CSV format

write_parquet()

Write data to a parquet file.
pipeline.write_parquet(df, "out/data.parquet")
Parameters:
  df (pd.DataFrame, required): Data to be written
  path (str, required): File path for saving the parquet file
