Session 35 (Season 2, Episode 8 — April 2025) elevates the KG construction pipeline to a fully agentic workflow using LangGraph. The central problem it solves is ontology selection: given an arbitrary piece of text, how do you automatically find the best existing ontology from a catalog to guide KG extraction — and what do you do when none exists? The answer is a StateGraph with four nodes, a conditional branch, and a fall-through that proposes a new candidate ontology using a local Ollama model.

Watch the Recording

Full live-stream replay on YouTube

Session Code

Python: flow.py and onto_utils.py

The State Schema

The workflow’s shared state is defined as a TypedDict. Every node reads from and writes to this dictionary — it is the single shared context across the graph:
from typing_extensions import TypedDict

class State(TypedDict, total=False):
    user_text: str               # The input text provided by the user
    extracted_ontology: str      # JSON: key categories and relationship types
    text_coverage: float         # Fraction of extracted categories matched in the catalog
    matched_ontologies: list[str]  # Ontology URLs found in the catalog (a list, not a string)
    validation_response: str     # User's yes/no on adding a new ontology to the catalog
total=False makes all fields optional, which is required because each node only populates the fields it is responsible for.
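
As a rough illustration of why optional fields matter, the sketch below mimics how a partial update returned by a node could be folded into the shared state. The `merge_update` helper is hypothetical: it only approximates LangGraph's default behaviour for keys without a custom reducer (the returned value overwrites the previous one) and is not part of LangGraph's API.

```python
from typing import TypedDict


class State(TypedDict, total=False):
    user_text: str
    extracted_ontology: str
    text_coverage: float
    matched_ontologies: list
    validation_response: str


def merge_update(state: State, update: dict) -> State:
    """Hypothetical helper: return a new state with the node's keys overwritten."""
    merged = dict(state)
    merged.update(update)
    return merged  # type: ignore[return-value]


# The initial state carries only the user's text; every other key is absent.
state: State = {"user_text": "Alex Erdl and Jesus Barrasa work for Neo4j"}

# Each node returns only the keys it owns.
state = merge_update(state, {"extracted_ontology": '{"categories": ["Person"]}'})
state = merge_update(state, {"matched_ontologies": [], "text_coverage": 0.0})

print(sorted(state))
```

Keys that no node has written yet (here `validation_response`) are simply missing, which is why downstream code should read them with `state.get(...)` rather than indexing.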

Node 1: extract_ontology

The first node calls a local Ollama gemma3:4b model to parse the user’s text and extract a rudimentary ontology — a list of entity categories and the relationship types between them. The output is a JSON string conforming to a simple schema:
from ollama import chat, ChatResponse
import re, json

def extract_ontology(state: State):
    PROMPT = f"""
    Analyze the following text and extract a rudimentary ontology:
    1. Categories of entities mentioned in the text (persons, objects, locations, events, etc).
    2. Relationship types between these categories.

    Format:
    {{ "categories": ["category1", "category2", "category3"],
       "relationshipTypes": [
        {{ "name": "relType1", "fromCat": "category1", "to": "category2" }},
        {{ "name": "relType2", "fromCat": "category3", "to": "category1" }}
       ]
    }}
    Do not generate any additional notes or comments.

    Text:
    \"\"\"{state['user_text']}\"\"\"
    """

    response: ChatResponse = chat(model='gemma3:4b', messages=[
        {'role': 'user', 'content': PROMPT},
    ])

    cleaned = re.sub(
        r"```json\s*([\s\S]+?)\s*```", r"\1",
        response['message']['content'].strip(),
        flags=re.IGNORECASE
    )

    try:
        json.loads(cleaned)
    except json.JSONDecodeError as e:
        print("Failed to parse as JSON:", e)
        cleaned = "{}"

    return {"extracted_ontology": cleaned}
Using a local Ollama model (gemma3:4b) for the initial extraction step keeps the lightweight ontology parsing offline and cost-free — reserving the more capable (and more expensive) OpenAI models for the KG construction step downstream.
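
The cleanup and validation step can be tried in isolation, without calling Ollama. The sketch below applies the same regular expression to a made-up model reply; the `clean_model_reply` helper and the sample reply are assumptions for illustration, not part of the session code.

```python
import json
import re

FENCE = "`" * 3  # three backticks, built this way so the sample is easy to embed
FENCED_JSON = re.compile(FENCE + r"json\s*([\s\S]+?)\s*" + FENCE, re.IGNORECASE)


def clean_model_reply(reply: str) -> str:
    """Hypothetical helper: unwrap a fenced JSON reply, fall back to "{}" if invalid."""
    cleaned = FENCED_JSON.sub(r"\1", reply.strip())
    try:
        json.loads(cleaned)
    except json.JSONDecodeError:
        return "{}"
    return cleaned


# Made-up reply in the shape the prompt asks for, wrapped in a Markdown fence.
sample_reply = (
    FENCE + "json\n"
    '{"categories": ["Person", "Organization"],\n'
    ' "relationshipTypes": [{"name": "WORKS_FOR", "fromCat": "Person", "to": "Organization"}]}\n'
    + FENCE
)

ontology = json.loads(clean_model_reply(sample_reply))
print(ontology["categories"])

# A reply that is not JSON at all degrades to an empty ontology.
print(clean_model_reply("Sorry, I cannot help with that."))
```

Falling back to `"{}"` keeps the graph running, but it also means the next node has to cope with an ontology that has no `categories` key.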

Node 2: lookup_ontology

The second node takes the extracted categories and queries a vector index on an ontology catalog in Neo4j. It uses genai.vector.encode() to embed each category on the fly and compares it against class descriptions stored in the index:
from neo4j import GraphDatabase
import os

def lookup_ontology(state: State):
    if state["extracted_ontology"]:
        NEO4J_URI = "bolt://localhost:7687"
        driver = GraphDatabase.driver(NEO4J_URI, auth=("neo4j", "neoneoneo"), database="onto")

        query = f"""
        UNWIND $categories AS cat
        CALL db.index.vector.queryNodes("label_and_desc", 1,
            genai.vector.encode(cat, 'OpenAI', {{ token: "{os.getenv("OPENAI_API_KEY")}" }}))
        YIELD node, score
        WHERE score > 0.92
        WITH $categories AS lookup_cats,
             collect({{cat: cat, matching_uri: node.uri, score: score, prov: node.prov}}) AS results
        RETURN lookup_cats,
               results AS detailed_results,
               size(results) * 1.0 / size(lookup_cats) AS coverage,
               apoc.convert.toSet([x IN results | x.prov]) AS onto_list
        """

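        # Pass only the categories; if extraction fell back to "{}", an empty list
        # yields no rows instead of a missing-parameter error for $categories.
        categories = json.loads(state["extracted_ontology"]).get("categories", [])
        with driver.session() as session:
            query_result = session.run(query, {"categories": categories})
            result = [record.data() for record in query_result]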
            print("Detailed results of Ontology lookup:")
            print(result)

        return {
            "matched_ontologies": result[0]['onto_list'] if result else [],
            "text_coverage": float(result[0]['coverage'] if result else 0)
        }
    else:
        return {"matched_ontologies": [], "text_coverage": 0}
The coverage metric (number of matched categories / total categories) becomes the decision variable at the conditional branch.
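
To make the metric concrete, the sketch below recomputes coverage in plain Python from made-up lookup results. Only the 0.92 cutoff and the matched/total ratio come from the query; the scores, the example.org URLs, and the `coverage_from_matches` helper are hypothetical.

```python
SCORE_CUTOFF = 0.92  # same cutoff as the WHERE clause in the Cypher query


def coverage_from_matches(categories, best_matches):
    """Hypothetical helper mirroring the query's coverage and onto_list columns.

    best_matches maps a category to (score, ontology_url) for its nearest
    neighbour in the vector index.
    """
    kept = [
        best_matches[cat]
        for cat in categories
        if cat in best_matches and best_matches[cat][0] > SCORE_CUTOFF
    ]
    coverage = len(kept) / len(categories) if categories else 0.0
    ontologies = sorted({url for _, url in kept})
    return coverage, ontologies


categories = ["Person", "Organization", "Spaceship"]
best_matches = {  # made-up scores and URLs, for illustration only
    "Person": (0.97, "http://example.org/onto/people"),
    "Organization": (0.95, "http://example.org/onto/people"),
    "Spaceship": (0.61, "http://example.org/onto/vehicles"),
}

coverage, ontologies = coverage_from_matches(categories, best_matches)
print(round(coverage, 2), ontologies)
```

In this made-up case two of the three categories clear the cutoff, so coverage is 2/3 and a single ontology is reported, because both matches share the same provenance.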

Conditional Branch: ontology_exists

A simple threshold gate decides whether to proceed with an existing ontology or propose a new one:
def ontology_exists(state: State):
    # Default to 0.0 so a missing coverage value routes to FAIL instead of raising a TypeError.
    return "PASS" if state.get("text_coverage", 0.0) > 0.3 else "FAIL"
A coverage above 30% means an existing ontology covers enough of the input’s vocabulary to be useful. Below that threshold, the workflow routes to propose_candidate_ontology.
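
The comparison is strict, so a coverage of exactly 0.3 still routes to FAIL. The short check below exercises the rule on a few made-up states; the `route` function is a stand-in for `ontology_exists`, and treating a missing coverage value as 0 is an assumption of this sketch.

```python
COVERAGE_THRESHOLD = 0.3


def route(state: dict) -> str:
    """Same rule as ontology_exists, treating a missing coverage value as 0."""
    return "PASS" if state.get("text_coverage", 0.0) > COVERAGE_THRESHOLD else "FAIL"


# Above the threshold, exactly on it, and with no coverage recorded at all.
for sample in ({"text_coverage": 0.67}, {"text_coverage": 0.3}, {}):
    print(sample, "->", route(sample))
```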

Node 3: extract_graph (PASS branch)

When a matching ontology is found, this node downloads each matched ontology from its URL, parses them all into a single rdflib graph, converts that graph to a neo4j-graphrag SchemaConfig using getSchemaFromOnto(), and runs SimpleKGPipeline on the user's text:
import asyncio  # needed for asyncio.run() at the end of extract_graph
from rdflib import Graph
from neo4j_graphrag.experimental.pipeline.kg_builder import SimpleKGPipeline
from neo4j_graphrag.embeddings import OpenAIEmbeddings
from neo4j_graphrag.llm.openai_llm import OpenAILLM
from neo4j_graphrag.experimental.components.text_splitters.fixed_size_splitter import FixedSizeSplitter
from onto_utils import getSchemaFromOnto

def extract_graph(state: State):
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "neoneoneo"))

    g = Graph()
    for url in state['matched_ontologies']:
        print("Retrieving onto:", url)
        g.parse(url)

    neo4j_schema = getSchemaFromOnto(g)

    splitter = FixedSizeSplitter(chunk_size=2500, chunk_overlap=10)
    embedder = OpenAIEmbeddings(model="text-embedding-3-small")
    llm = OpenAILLM(
        model_name="gpt-4o",
        model_params={
            "max_tokens": 3000,
            "response_format": {"type": "json_object"},
            "temperature": 0,
        },
    )

    kg_builder = SimpleKGPipeline(
        llm=llm,
        driver=driver,
        text_splitter=splitter,
        embedder=embedder,
        entities=list(neo4j_schema.entities.values()),
        relations=list(neo4j_schema.relations.values()),
        potential_schema=neo4j_schema.potential_schema,
        on_error="IGNORE",
        from_pdf=False,
    )

    st = asyncio.run(kg_builder.run_async(text=state['user_text']))
    return {}
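
The `chunk_size=2500` and `chunk_overlap=10` settings only matter for inputs longer than one chunk. The sketch below is a simplified stand-in for fixed-size splitting with overlap, written to show the arithmetic; it is not the neo4j-graphrag implementation, and the `fixed_size_chunks` name is hypothetical.

```python
def fixed_size_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list:
    """Simplified stand-in for a fixed-size splitter (not the library's code)."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    # Each new chunk starts chunk_overlap characters before the previous one ends.
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]


short = "Alex Erdl and Jesus Barrasa work for Neo4j"
print(len(fixed_size_chunks(short, chunk_size=2500, chunk_overlap=10)))

long_text = "x" * 6000
print([len(c) for c in fixed_size_chunks(long_text, chunk_size=2500, chunk_overlap=10)])
```

Under these assumptions the one-sentence sample used later on this page fits in a single chunk, while a 6000-character input is cut into three chunks that share 10 characters at each boundary.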

Node 4: propose_candidate_ontology (FAIL branch)

When no catalog match is found, the workflow uses gemma3:4b again to convert the extracted rudimentary ontology (categories + relationship types) into an OWL ontology serialized as Turtle. It then prints the result, asks the user whether it should be added to the catalog, and records the answer in validation_response:
def propose_candidate_ontology(state: State):
    PROMPT = f"""
    Analyze the following rudimentary ontology...
    Produce an OWL based serialisation in Turtle format for that description.
    Essentially create a owl:Class out of each category and an owl:ObjectProperty out of each relationship.
    Then add rdfs:domain to the category in 'fromCat' and rdfs:range to the category in 'to'.
    Do not generate any additional notes or comments.

    Ontology:
    {state['extracted_ontology']}
    """

    response: ChatResponse = chat(model='gemma3:4b', messages=[
        {'role': 'user', 'content': PROMPT},
    ])

    cleaned = re.sub(
        r"```turtle\s*([\s\S]+?)\s*```", r"\1",
        response['message']['content'].strip(),
        flags=re.IGNORECASE
    )
    print(cleaned)

    # Use a separate name so the ChatResponse above is not overwritten by a plain string.
    answer = input("Do you want to add this ontology to the catalog? (yes/no): ")
    return {"validation_response": answer}
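
For comparison, the mapping the prompt describes is mechanical enough to write by hand. The sketch below is not what the session code does (it delegates the conversion to gemma3:4b), but it could serve as a baseline to check the model's output against. The `ontology_to_turtle` helper and the example.org namespace are hypothetical, and the sketch assumes category and relationship names are already valid Turtle local names (no spaces or punctuation).

```python
import json

BASE = "http://example.org/candidate#"  # hypothetical namespace, not from the session


def ontology_to_turtle(extracted_ontology: str) -> str:
    """Hypothetical helper: categories become owl:Class, relationship types
    become owl:ObjectProperty with rdfs:domain / rdfs:range."""
    onto = json.loads(extracted_ontology)
    lines = [
        f"@prefix : <{BASE}> .",
        "@prefix owl: <http://www.w3.org/2002/07/owl#> .",
        "@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .",
        "",
    ]
    for cat in onto.get("categories", []):
        lines.append(f":{cat} a owl:Class .")
    for rel in onto.get("relationshipTypes", []):
        lines.append(
            f":{rel['name']} a owl:ObjectProperty ; "
            f"rdfs:domain :{rel['fromCat']} ; rdfs:range :{rel['to']} ."
        )
    return "\n".join(lines)


sample = json.dumps({
    "categories": ["Person", "Organization"],
    "relationshipTypes": [
        {"name": "WORKS_FOR", "fromCat": "Person", "to": "Organization"}
    ],
})
turtle = ontology_to_turtle(sample)
print(turtle)
```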

Assembling the StateGraph

The full workflow wires the nodes together with edges and a conditional branch:
from langgraph.graph import StateGraph, START, END

workflow = StateGraph(State)

workflow.add_node("extract_ontology", extract_ontology)
workflow.add_node("lookup_ontology", lookup_ontology)
workflow.add_node("extract_graph", extract_graph)
workflow.add_node("propose_candidate_ontology", propose_candidate_ontology)

workflow.add_edge(START, "extract_ontology")
workflow.add_edge("extract_ontology", "lookup_ontology")
workflow.add_conditional_edges(
    "lookup_ontology",
    ontology_exists,
    {"PASS": "extract_graph", "FAIL": "propose_candidate_ontology"}
)
workflow.add_edge("extract_graph", END)
workflow.add_edge("propose_candidate_ontology", END)

chain = workflow.compile()
A Mermaid diagram of the compiled graph is exported as flow.png:
from IPython.display import Image

flow_graph = Image(chain.get_graph().draw_mermaid_png())
with open("flow.png", "wb") as f:
    f.write(flow_graph.data)

Invoking the Workflow

initial_state = {
    "user_text": "Alex Erdl and Jesus Barrasa work for Neo4j"
}
state = chain.invoke(initial_state)
print(state)
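
Because extract_graph returns an empty update and only propose_candidate_ontology writes validation_response, the printed state also reveals which branch ran. The sketch below shows one way to read that off; the `branch_taken` helper and both sample states are made up for illustration.

```python
def branch_taken(final_state: dict) -> str:
    """Hypothetical helper: infer the branch from which keys ended up in the state."""
    if "validation_response" in final_state:
        return "FAIL (candidate ontology proposed)"
    return "PASS (graph extracted with a catalog ontology)"


# Made-up final states in the shape the nodes above return.
pass_state = {
    "user_text": "Alex Erdl and Jesus Barrasa work for Neo4j",
    "extracted_ontology": '{"categories": ["Person", "Organization"]}',
    "matched_ontologies": ["http://example.org/onto/people"],
    "text_coverage": 1.0,
}
fail_state = {**pass_state, "matched_ontologies": [], "text_coverage": 0.0,
              "validation_response": "no"}

print(branch_taken(pass_state))
print(branch_taken(fail_state))
```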

Workflow Design Highlights

Local LLM for extraction

Ollama gemma3:4b runs locally — ontology extraction and candidate proposal happen without any external API calls, keeping costs low and latency predictable.

Vector catalog matching

The ontology catalog is stored in Neo4j with vector embeddings of class labels and descriptions. Coverage scoring enables a principled, threshold-based go/no-go decision.

Human-in-the-loop gate

The FAIL branch asks for human approval before adding a new ontology to the catalog — preventing the catalog from being polluted by low-quality or duplicate ontologies.

SimpleKGPipeline integration

The PASS branch feeds directly into SimpleKGPipeline from neo4j-graphrag, using the ontology-derived schema for constrained extraction and Neo4j ingestion.
Session 36 addresses what happens when two existing ontologies in the catalog partially overlap — covering the structural patterns for merging, aligning, and reconciling them into a consistent schema.
