

Every Neocarta connector follows the same Extract–Transform–Load (ETL) pattern. The three stages are implemented as separate classes — Extractor, Transformer, Loader — orchestrated by a top-level connector class. This separation keeps each stage’s responsibility narrow and testable, and makes the flow predictable regardless of which data source or format is being ingested.

The Three Stages

1. Extract

The extractor connects to the data source (via SQL queries against information schema tables, REST API calls to a catalog, file reads, or log stream reads) and caches the raw metadata in memory. The raw objects are source-specific (unvalidated dicts, TypedDicts, or API response models) and are not yet in the Neocarta graph schema.

Each extractor method (e.g., extract_table_info, extract_column_info) is decorated with @log_stage, which logs a one-line summary of the method name, its target (e.g., dataset id), the number of objects produced, and the elapsed time. Extract methods log counts and targets only, never SQL text, row values, or credentials.

After extract completes, the extractor’s cache is populated and an _extracted flag is set on the connector. Calling extract() a second time replaces the cache and resets downstream flags.
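The logging behavior described above can be approximated with a small decorator. This is a minimal sketch and not Neocarta's implementation: the body of log_stage, the logger name, the log line format, and the SketchExtractor class with its made-up rows are all assumptions for illustration.

```python
import functools
import logging
import time

logger = logging.getLogger("neocarta.sketch")  # hypothetical logger name


def log_stage(func):
    """Hypothetical stand-in for the @log_stage decorator."""

    @functools.wraps(func)
    def wrapper(self, target, *args, **kwargs):
        start = time.perf_counter()
        result = func(self, target, *args, **kwargs)
        elapsed = time.perf_counter() - start
        # Log only the method name, target, object count, and elapsed time.
        # The extracted objects themselves are never written to the log.
        logger.info(
            "%s target=%s count=%d elapsed=%.3fs",
            func.__name__, target, len(result), elapsed,
        )
        return result

    return wrapper


class SketchExtractor:
    """Hypothetical extractor that caches raw metadata in memory."""

    def __init__(self):
        self.cache = {}

    @log_stage
    def extract_table_info(self, dataset_id):
        # A real extractor would query the source here; these rows are made up.
        rows = [{"table_name": "orders"}, {"table_name": "customers"}]
        self.cache["table_info"] = rows
        return rows
```

Because the wrapper only formats the count and the target, nothing inside the returned rows can leak into the log line, which matches the "counts and targets only" rule stated above.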
2. Transform

The transformer reads from the extractor cache and validates each raw object against the shared Pydantic models in neocarta.data_model. Invalid fields are coerced or dropped according to the field validators defined on those models (e.g., None/NaN normalization, uppercasing of platform names).

The output is a collection of typed Pydantic instances (Database, Schema, Table, Column, References, BusinessTerm, Query, and so on) ready to be written into Neo4j.

At the end of transform(), the connector calls log_transform_counts(...) to emit a per-type summary (e.g., Transformed 42 Table, Transformed 187 Column). Zero-count types are skipped.
Calling transform() before extract() raises a StateError. The connector enforces stage ordering through the _extracted flag set during extract.
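To make the two normalization rules mentioned for the transform stage concrete, here is a dependency-free sketch. It uses a plain dataclass in place of the Pydantic models so that it runs without third-party packages. The class name TableSketch, its fields, and the helper none_if_missing are hypothetical and do not reflect the actual definitions in neocarta.data_model.

```python
import math
from dataclasses import dataclass
from typing import Any, Optional


def none_if_missing(value: Any) -> Any:
    """Map None and float NaN to None; pass every other value through."""
    if value is None:
        return None
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


@dataclass
class TableSketch:
    """Hypothetical, simplified stand-in for a validated model."""

    name: str
    platform: Optional[str] = None
    description: Optional[str] = None

    def __post_init__(self) -> None:
        # Normalize missing values first, then uppercase the platform name.
        self.description = none_if_missing(self.description)
        self.platform = none_if_missing(self.platform)
        if self.platform is not None:
            self.platform = str(self.platform).upper()


# A raw, source-specific dict as an extractor might cache it (made-up values).
raw = {"name": "orders", "platform": "bigquery", "description": float("nan")}
table = TableSketch(**raw)
```

In the real models the same rules would presumably live in Pydantic field validators; the dataclass only illustrates what the coercion does to a raw object.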
3. Load

The loader writes the transformed Pydantic models into Neo4j using MERGE (upsert) statements. Each node type and relationship pattern is handled separately, and the loader logs per-pattern merge counters (created, properties set) for each write. Loading is idempotent: running the same connector twice against the same source will not create duplicate nodes.

During load, the Neo4j full-text and vector indexes are created if they don’t exist yet. Vector indexes are only created when the source data contains embeddings.
Calling load() before transform() raises a StateError. The connector enforces stage ordering through the _transformed flag set during transform.
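The stage-ordering rules from the three steps above can be sketched with two boolean flags. This is an illustrative sketch under stated assumptions: the StateError class defined here, the StageOrderSketch class, and its toy transform logic are hypothetical; only the flag names _extracted and _transformed and the described behavior come from the text.

```python
class StateError(RuntimeError):
    """Hypothetical stand-in for the StateError the connector raises."""


class StageOrderSketch:
    """Toy connector that enforces extract -> transform -> load ordering."""

    def __init__(self):
        self._extracted = False
        self._transformed = False
        self._raw = []
        self._models = []
        self.loaded = []

    def extract(self, rows):
        # Re-running extract replaces the cache and resets downstream flags.
        self._raw = list(rows)
        self._models = []
        self._extracted = True
        self._transformed = False

    def transform(self):
        if not self._extracted:
            raise StateError("transform() called before extract()")
        # Placeholder transformation; real connectors validate into models.
        self._models = [str(row).upper() for row in self._raw]
        self._transformed = True

    def load(self):
        if not self._transformed:
            raise StateError("load() called before transform()")
        self.loaded = list(self._models)
```

Note that a second extract() clears _transformed, so a load() issued right after it fails until transform() has run again on the new cache.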

The ingest() Orchestrator

Most callers interact with ingest() rather than calling the three stages individually. ingest() runs extract() → transform() → load() in sequence, then calls loader.upsert_neocarta_graph_node() to record that Neocarta has touched the graph (a bookkeeping node used by the MCP server for diagnostics).
import os
from neo4j import GraphDatabase
from google.cloud import bigquery
from neocarta.connectors.bigquery import BigQuerySchemaConnector

driver = GraphDatabase.driver(
    os.getenv("NEO4J_URI"),
    auth=(os.getenv("NEO4J_USERNAME"), os.getenv("NEO4J_PASSWORD")),
)
client = bigquery.Client(project=os.getenv("GCP_PROJECT_ID"))

# ingest() runs extract → transform → load in one call
BigQuerySchemaConnector(
    client=client,
    project_id=os.getenv("GCP_PROJECT_ID"),
    neo4j_driver=driver,
).ingest(dataset_id=os.getenv("BIGQUERY_DATASET_ID"))
Source-specific parameters (dataset id, query time window, file path) are passed to ingest(). Parameters stable for the connector’s lifetime (the Neo4j driver, the BigQuery client, the project id) are passed to the constructor.

Format Connectors: The Export Direction

Format connectors (CSV, OSI YAML) additionally support an export() orchestrator. Export runs in the opposite direction: it reads from the Neo4j graph, builds a source-format object from the subgraph, and writes it to a file. The internal stages of export (graph read, format build, file write) are private helpers — only export() is part of the public surface.
from neocarta.connectors.osi import OsiConnector

with OsiConnector(neo4j_driver=driver) as connector:
    # ingest direction: OSI YAML → Neo4j
    connector.ingest(spec_source="./datasets/osi/acme_semantic_model.yaml")

    # export direction: Neo4j → OSI YAML
    connector.export(
        semantic_model_name="acme_corp_model",
        output_path="./acme_export.yaml",
    )
Source connectors do not support export() — Neocarta does not write back to external catalogs.

Selective Loading with include_nodes / include_relationships

Connectors that support selective loading accept include_nodes and include_relationships parameters on both extract() and ingest(). Pass lists of NodeLabel and RelationshipType enum values to control which entity types are ingested.
from neocarta import NodeLabel as nl, RelationshipType as rt
from neocarta.connectors.csv import CSVConnector

connector = CSVConnector(
    csv_directory="./datasets/csv",
    neo4j_driver=driver,
)

# Load only the core structural schema — skip glossary and query history
connector.ingest(
    include_nodes=[nl.DATABASE, nl.SCHEMA, nl.TABLE, nl.COLUMN],
    include_relationships=[rt.HAS_SCHEMA, rt.HAS_TABLE, rt.HAS_COLUMN, rt.REFERENCES],
)
None (the default) means “include everything the connector can produce.”
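One way to picture the None default is as a filter that falls back to the full set of labels. The sketch below is hypothetical: NodeLabelSketch is a made-up subset of labels, and select_labels and filter_nodes are illustrative helpers, not part of Neocarta. Treating an empty list as "include nothing" is an assumption of this sketch; the text above only defines the behavior for None.

```python
from enum import Enum
from typing import Iterable, Optional


class NodeLabelSketch(str, Enum):
    """Hypothetical subset of node labels, for illustration only."""

    DATABASE = "Database"
    TABLE = "Table"
    COLUMN = "Column"
    BUSINESS_TERM = "BusinessTerm"


def select_labels(include_nodes: Optional[Iterable[NodeLabelSketch]] = None) -> set:
    """Resolve include_nodes; None means every label that can be produced."""
    if include_nodes is None:
        return set(NodeLabelSketch)
    return set(include_nodes)


def filter_nodes(nodes, include_nodes=None):
    """Keep only (label, payload) pairs whose label was selected."""
    selected = select_labels(include_nodes)
    return [(label, payload) for label, payload in nodes if label in selected]


nodes = [
    (NodeLabelSketch.TABLE, "orders"),
    (NodeLabelSketch.BUSINESS_TERM, "revenue"),
]
structural_only = filter_nodes(nodes, [NodeLabelSketch.TABLE, NodeLabelSketch.COLUMN])
everything = filter_nodes(nodes)
```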

Context Manager Protocol

All connectors implement the context manager protocol. Using a connector as a context manager ensures that any resources the connector owns are released when the block exits, even if an exception occurs.
with BigQuerySchemaConnector(
    client=client,
    project_id=os.getenv("GCP_PROJECT_ID"),
    neo4j_driver=driver,
) as connector:
    connector.ingest(dataset_id=os.getenv("BIGQUERY_DATASET_ID"))
# connector-owned resources are released here
The three methods involved are:
Method | Behavior
__enter__() | Returns self; no resource acquisition happens here
__exit__() | Calls close() unconditionally
close() | Releases only resources the connector created (e.g., an HTTP client). The injected Neo4j driver is never closed by the connector; it is owned by the caller.
Connectors must not close the injected Neo4j driver inside close(). The driver’s lifecycle belongs to the caller, who may share it across multiple connectors or reuse it after the with block exits.
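The ownership rule can be illustrated with a toy connector that owns one resource and borrows another. Everything here is a sketch: FakeResource stands in for both the Neo4j driver and an HTTP client, and OwnershipSketch is a hypothetical class, not a real Neocarta connector.

```python
class FakeResource:
    """Stand-in for anything with a close() method (driver, HTTP client)."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class OwnershipSketch:
    def __init__(self, neo4j_driver):
        self._driver = neo4j_driver         # injected: owned by the caller
        self._http_client = FakeResource()  # created here: owned by the connector

    def __enter__(self):
        # No resource acquisition happens here.
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # never suppress the caller's exception

    def close(self):
        # Release only what this object created; leave the driver alone.
        self._http_client.close()


driver = FakeResource()
try:
    with OwnershipSketch(neo4j_driver=driver) as connector:
        raise ValueError("simulated failure inside the block")
except ValueError:
    pass
```

After the block exits, connector._http_client.closed is True while driver.closed is still False, so the caller can keep using the driver with another connector.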

SourceConnectorProtocol and FormatConnectorProtocol

The connector contract is made executable through two runtime-checkable protocols defined in neocarta.connectors._base:
from neocarta.connectors._base import SourceConnectorProtocol, FormatConnectorProtocol

# Check at runtime whether an object conforms
assert isinstance(my_connector, SourceConnectorProtocol)

# Format connectors satisfy both protocols
assert isinstance(osi_connector, FormatConnectorProtocol)
assert isinstance(osi_connector, SourceConnectorProtocol)  # also true
FormatConnectorProtocol extends SourceConnectorProtocol with the export() method. Every connector in neocarta/connectors/ ships a tests/unit/connectors/<name>/test_conformance.py that asserts protocol conformance, verifies all public stage methods are present, and confirms that out-of-order stage calls raise StateError.
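For readers unfamiliar with runtime-checkable protocols, the sketch below shows the general mechanism using Python's typing module. The protocol names and the method lists are assumptions made for illustration; the actual members of SourceConnectorProtocol and FormatConnectorProtocol are defined in neocarta.connectors._base and may differ.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SourceProtocolSketch(Protocol):
    """Hypothetical member list; the real protocol may differ."""

    def extract(self, **kwargs: Any) -> None: ...
    def transform(self) -> None: ...
    def load(self) -> None: ...
    def ingest(self, **kwargs: Any) -> None: ...
    def close(self) -> None: ...


@runtime_checkable
class FormatProtocolSketch(SourceProtocolSketch, Protocol):
    """Extends the source protocol with export()."""

    def export(self, **kwargs: Any) -> None: ...


class SourceOnly:
    def extract(self, **kwargs: Any) -> None: ...
    def transform(self) -> None: ...
    def load(self) -> None: ...
    def ingest(self, **kwargs: Any) -> None: ...
    def close(self) -> None: ...


class FormatLike(SourceOnly):
    def export(self, **kwargs: Any) -> None: ...


source_ok = isinstance(SourceOnly(), SourceProtocolSketch)
source_is_format = isinstance(SourceOnly(), FormatProtocolSketch)
format_ok = isinstance(FormatLike(), FormatProtocolSketch)
format_is_source = isinstance(FormatLike(), SourceProtocolSketch)
```

An isinstance check against a runtime-checkable protocol only verifies that the named members exist; it does not check signatures or behavior. That limitation is one reason a conformance test suite that also exercises out-of-order stage calls is useful alongside the protocol check.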
