Neocarta connectors are the bridge between a data source and the Neo4j semantic layer graph. Every connector follows a shared contract — directory layout, stage API, state lifecycle, context manager support, and a required README — so that connectors are interoperable, testable, and maintainable. This guide covers how to build a new source connector from scratch, using the scaffold tooling to get a conformant skeleton quickly.

Connector Contract Overview

The full connector contract is documented in .claude/skills/neocarta-add-source-connector/connector-contract.md in the repository. The key rules are:

Directory layout

Connectors live under neocarta/connectors/<name>/. Source connectors use sub-folders per data type (e.g., schema/, glossary/). Format connectors use ingest/ and export/ sub-folders.

Public API

Every connector exposes extract(), transform(), load(), and ingest(). Format connectors additionally expose export(). Internal extractor, transformer, and loader classes are never exported.

Stage ordering

transform() requires a prior extract(). load() requires a prior transform(). Calling out of order raises StateError from neocarta.errors.

Context manager

All connectors implement __enter__, __exit__, and close(). The injected Neo4j driver is owned by the caller — close() must not close it.
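The stage-ordering and context-manager rules above can be sketched with a toy connector. Everything here is illustrative: StateError is a local stand-in for the class in neocarta.errors, and FakeDriver and DemoConnector are hypothetical names, not part of Neocarta.

```python
class StateError(RuntimeError):
    """Local stand-in for neocarta.errors.StateError (base class assumed)."""


class FakeDriver:
    """Hypothetical stand-in for an injected Neo4j driver."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class DemoConnector:
    """Toy connector that enforces extract -> transform -> load ordering."""

    def __init__(self, neo4j_driver: FakeDriver) -> None:
        self._neo4j_driver = neo4j_driver  # owned by the caller
        self._extracted = False
        self._transformed = False

    def extract(self) -> None:
        # A fresh extract invalidates any earlier transform.
        self._transformed = False
        self._extracted = True

    def transform(self) -> None:
        if not self._extracted:
            raise StateError("extract() must be called before transform()")
        self._transformed = True

    def load(self) -> None:
        if not self._transformed:
            raise StateError("transform() must be called before load()")

    def close(self) -> None:
        # Release connector-owned resources only; the driver is left open.
        pass

    def __enter__(self) -> "DemoConnector":
        return self

    def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None:
        self.close()


driver = FakeDriver()
with DemoConnector(neo4j_driver=driver) as connector:
    try:
        connector.load()  # out of order: nothing has been transformed yet
    except StateError as exc:
        out_of_order_message = str(exc)
    connector.extract()
    connector.transform()
    connector.load()

# Leaving the with-block called close(), which did not touch the driver.
driver_closed_by_connector = driver.closed
driver.close()  # the caller closes its own driver when finished
```

The point of the design is that one driver can be shared across several connectors: each connector cleans up only what it created, and the caller decides when the driver goes away.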

Required Files

A minimal source connector package contains:
neocarta/connectors/my_source/
├── __init__.py          # exports only MySourceConnector (and any warnings)
├── README.md            # required; see contract §12 for sections
└── schema/
    ├── __init__.py      # exports MySourceSchemaConnector
    ├── connector.py     # the connector class
    ├── extract.py       # Extractor class
    ├── transform.py     # Transformer class
    └── models.py        # TypedDicts for raw extract output (optional)
The __init__.py at the package root re-exports the connector class and any connector-specific warnings. It does not export the extractor, transformer, or loader.

Stage Responsibilities

The extractor reads raw metadata from the source and caches it for the transformer. Each extract_*_info method is decorated with @log_stage from neocarta._logging, which automatically logs the method name, an allowlisted target identifier (e.g., dataset_id), the row count returned, and the elapsed time.
from neocarta._logging import log_stage

class MySourceExtractor:
    def __init__(self, client, project_id: str) -> None:
        self._client = client
        self._project_id = project_id
        self._tables: list[dict] = []

    @log_stage
    def extract_table_info(self, dataset_id: str) -> list[dict]:
        raw = self._client.list_tables(self._project_id, dataset_id)
        self._tables = [{"name": t.name, "description": t.description} for t in raw]
        return self._tables

    @property
    def tables(self) -> list[dict]:
        return self._tables
Never log SQL text, row values, or credentials — only counts, labels, targets, and elapsed time.
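To make the logging rule concrete, here is a minimal sketch of what a decorator in the spirit of @log_stage could look like. It is not the implementation in neocarta._logging: the name log_stage_sketch, the _ALLOWED_TARGETS allowlist, and the log message format are all assumptions made for illustration.

```python
import functools
import logging
import time
from typing import Any, Callable

logger = logging.getLogger("demo.extract")

# Hypothetical allowlist: only these keyword arguments may appear in logs.
_ALLOWED_TARGETS = ("dataset_id",)


def log_stage_sketch(func: Callable[..., list]) -> Callable[..., list]:
    """Log method name, allowlisted targets, row count, and elapsed time."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> list:
        start = time.perf_counter()
        rows = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        # Keep only allowlisted keyword arguments; everything else is dropped.
        targets = {k: v for k, v in kwargs.items() if k in _ALLOWED_TARGETS}
        # Only the count of rows is logged, never the rows themselves.
        logger.info(
            "%s targets=%s rows=%d elapsed=%.3fs",
            func.__name__, targets, len(rows), elapsed,
        )
        return rows

    return wrapper


class DemoExtractor:
    @log_stage_sketch
    def extract_table_info(self, dataset_id: str, api_key: str = "") -> list[dict]:
        # A real extractor would call the source client here.
        return [{"name": "orders"}, {"name": "customers"}]
```

In this sketch only keyword arguments are inspected, so a target passed positionally would not be logged. The allowlist is what keeps a credential such as api_key out of the log line even when it is passed alongside dataset_id.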

SourceConnectorProtocol Conformance

The connector contract is enforced at runtime via SourceConnectorProtocol. You can check conformance in code:
from neocarta.connectors._base import SourceConnectorProtocol

connector = MySourceSchemaConnector(
    client=my_client,
    neo4j_driver=driver,
)

assert isinstance(connector, SourceConnectorProtocol)
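An isinstance check against a protocol only works when the protocol is decorated with typing.runtime_checkable, and it verifies that the members exist, not that their signatures match. The sketch below shows that behavior with a hypothetical SourceConnectorSketch; the member list of the real SourceConnectorProtocol in neocarta.connectors._base may differ.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SourceConnectorSketch(Protocol):
    """Hypothetical protocol; the real one lives in neocarta.connectors._base."""

    def extract(self) -> None: ...
    def transform(self) -> None: ...
    def load(self) -> None: ...
    def ingest(self) -> None: ...
    def close(self) -> None: ...


class Complete:
    """Defines every member the protocol names."""

    def extract(self) -> None: ...
    def transform(self) -> None: ...
    def load(self) -> None: ...
    def ingest(self) -> None: ...
    def close(self) -> None: ...


class MissingIngest:
    """Omits ingest(), so the structural check fails."""

    def extract(self) -> None: ...
    def transform(self) -> None: ...
    def load(self) -> None: ...
    def close(self) -> None: ...


complete_ok = isinstance(Complete(), SourceConnectorSketch)
missing_ok = isinstance(MissingIngest(), SourceConnectorSketch)
```

Because the check is structural, a connector does not need to inherit from the protocol. It also means a method with the wrong parameters still passes isinstance, which is one reason the conformance tests exercise the methods as well.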

Using the Scaffold Tool

The repository ships a scaffold driver that creates a conformant connector skeleton and a conformance test in one command. The scaffold passes verify as generated — you fill in the implementation without worrying about wiring.
# List all connectors and their detected kind (source / format)
uv run .claude/skills/neocarta-add-source-connector/scripts/driver.py list

# Scaffold a new flat source connector
uv run .claude/skills/neocarta-add-source-connector/scripts/driver.py scaffold my_source

# Scaffold a data-type sub-connector (e.g., schema connector under my_source)
uv run .claude/skills/neocarta-add-source-connector/scripts/driver.py scaffold my_source/schema

# Scaffold a format connector (adds export() orchestrator)
uv run .claude/skills/neocarta-add-source-connector/scripts/driver.py scaffold my_format --format

# Verify a connector against the contract (static checks + conformance pytest)
uv run .claude/skills/neocarta-add-source-connector/scripts/driver.py verify my_source
verify checks import success, protocol conformance (SourceConnectorProtocol or FormatConnectorProtocol), __all__ minimalism (no internal classes exported), README.md presence, and the absence of inline id f-strings and stray print() calls. It then runs the connector’s test_conformance.py and exits non-zero on any failure.
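As an illustration of what a static check of this kind might involve, the sketch below finds bare print() calls with the standard ast module. It is an assumption about one possible approach, not the driver's actual code; find_print_calls and the sample source are hypothetical.

```python
import ast


def find_print_calls(source: str) -> list[int]:
    """Return the line numbers of bare print(...) calls in Python source."""
    tree = ast.parse(source)
    return sorted(
        node.lineno
        for node in ast.walk(tree)
        if isinstance(node, ast.Call)
        and isinstance(node.func, ast.Name)
        and node.func.id == "print"
    )


# Hypothetical connector source containing one stray print() call.
sample = '''
import logging
logger = logging.getLogger(__name__)

def extract():
    print("debug")
    logger.info("rows=%d", 3)
'''

offending_lines = find_print_calls(sample)
```

Walking the syntax tree rather than searching the text avoids false positives from the word "print" appearing in strings or comments. Running verify locally before opening a PR remains the authoritative check.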

Minimal Connector Skeleton

The following shows the core structure of a source connector. The scaffold generates this; the TODO markers show what you fill in.
# neocarta/connectors/my_source/schema/connector.py

from __future__ import annotations

import logging
from typing import Any

from neo4j import Driver
from typing_extensions import Self

from neocarta._logging import log_transform_counts
from neocarta.errors import StateError
from neocarta.ingest.rdbms import Neo4jRDBMSLoader

from .extract import MySourceExtractor
from .transform import MySourceTransformer

logger = logging.getLogger(__name__)

_TRANSFORM_COUNTS = (
    ("Table", "tables"),
    ("Column", "columns"),
    ("HasTable", "has_tables"),
    ("HasColumn", "has_columns"),
    ("References", "references"),
)


class MySourceSchemaConnector:
    """Source connector for <MySource> schema metadata."""

    def __init__(
        self,
        client: Any,           # source-specific client
        neo4j_driver: Driver,
        database_name: str = "neo4j",
    ) -> None:
        self._client = client
        self._neo4j_driver = neo4j_driver
        self._database_name = database_name

        # Pass any extra arguments your extractor's __init__ requires (e.g., project_id).
        self._extractor = MySourceExtractor(client=client)
        self._transformer = MySourceTransformer(extractor=self._extractor)
        self._loader = Neo4jRDBMSLoader(
            neo4j_driver=neo4j_driver,
            database_name=database_name,
        )

        self._extracted = False
        self._transformed = False

    def extract(self, dataset_id: str) -> None:
        """Read raw schema metadata from the source."""
        self._extracted = False
        self._transformed = False
        self._extractor.extract_table_info(dataset_id=dataset_id)
        self._extractor.extract_column_info(dataset_id=dataset_id)
        self._extracted = True

    def transform(self) -> None:
        """Map raw metadata to Neocarta graph models."""
        if not self._extracted:
            raise StateError("extract() must be called before transform()")
        self._transformed = False
        self._transformer.transform_tables()
        self._transformer.transform_columns()
        log_transform_counts(logger, self._transformer, _TRANSFORM_COUNTS)
        self._transformed = True

    def load(self) -> None:
        """Write graph models into Neo4j using MERGE."""
        if not self._transformed:
            raise StateError("transform() must be called before load()")
        self._loader.load_tables(self._transformer.tables)
        self._loader.load_columns(self._transformer.columns)
        self._loader.load_references(self._transformer.references)

    def ingest(self, dataset_id: str) -> None:
        """Run extract → transform → load and record graph metadata."""
        self.extract(dataset_id=dataset_id)
        self.transform()
        self.load()
        self._loader.upsert_neocarta_graph_node()
        logger.info("Ingest completed for dataset %s", dataset_id)

    def run(self, dataset_id: str | None = None) -> None:
        """Deprecated — use ingest() instead."""
        import warnings
        warnings.warn(
            "run() is deprecated; use ingest() instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.ingest(dataset_id=dataset_id)  # type: ignore[arg-type]

    def close(self) -> None:
        """Release connector-owned resources (NOT the injected driver)."""
        # If this connector created its own HTTP client, close it here.
        # Do not close self._neo4j_driver — it belongs to the caller.
        pass

    def __enter__(self) -> Self:
        return self

    def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None:
        self.close()

Adding CLI Support

CLI integration is a separate PR from the connector library itself. When you’re ready to expose the connector via the CLI:
1. Add a Click command group

Create neocarta/_cli/commands/my_source.py with a Click group and one subcommand per connector verb (e.g., schema, logs). Follow the noun-verb grammar: neocarta <source> <verb>.
2. Register the group

Import and register your group in neocarta/_cli/main.py:
from .commands.my_source import my_source
cli.add_command(my_source)
3. Map env vars to options

Read NEO4J_* variables via neo4j_options() (the shared Click decorator) and source-specific variables from os.getenv with envvar= on Click options.
The connector library PR and the CLI integration PR are kept separate by convention. Submit the connector under neocarta/connectors/ first; add the CLI command in a follow-up PR.

Conformance Tests

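Every connector ships a conformance test at tests/unit/connectors/<name>/test_conformance.py. The scaffold generates this file. It covers protocol conformance, the public stage methods, stage-ordering errors, and the run() deprecation warning: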
# tests/unit/connectors/my_source/test_conformance.py
import warnings
import pytest
from neocarta.connectors._base import SourceConnectorProtocol
from neocarta.connectors.my_source.schema import MySourceSchemaConnector


def make_connector():
    """Build a connector with minimal mock dependencies."""
    from unittest.mock import MagicMock
    from neo4j import Driver
    # Spec the mock on the Driver class so no real driver is created or left open.
    driver = MagicMock(spec=Driver)
    return MySourceSchemaConnector(client=MagicMock(), neo4j_driver=driver)


def test_protocol_conformance():
    """Connector must satisfy SourceConnectorProtocol."""
    assert isinstance(make_connector(), SourceConnectorProtocol)


def test_public_stage_methods():
    """Connector must expose all required public stage methods."""
    connector = make_connector()
    assert callable(connector.extract)
    assert callable(connector.transform)
    assert callable(connector.load)
    assert callable(connector.ingest)
    assert callable(connector.close)


def test_transform_before_extract_raises():
    """transform() before extract() must raise StateError."""
    from neocarta.errors import StateError
    with pytest.raises(StateError):
        make_connector().transform()


def test_load_before_transform_raises():
    """load() before transform() must raise StateError."""
    from neocarta.errors import StateError
    with pytest.raises(StateError):
        make_connector().load()


def test_run_emits_deprecation_warning():
    """run() must emit a DeprecationWarning."""
    connector = make_connector()
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        try:
            connector.run()
        except Exception:
            pass
        assert any(issubclass(warning.category, DeprecationWarning) for warning in w)
Run the full unit suite to verify:
make test-unit       # all unit tests including conformance
make fmt && make lint  # must be clean before PR
