Session 5 of Going Meta (broadcast June 13, 2022) tackles a practical engineering problem: when you have an ontology describing a domain, how do you use it to drive the construction of a knowledge graph rather than writing bespoke ETL code by hand? Using a UK rail network as the example domain, Jesús builds a Python pipeline that reads rail.ttl with RDFLib, queries the ontology for its classes and properties, auto-generates parameterised Cypher MERGE statements, applies a data mapping configuration, and loads Neo4j in batched transactions via the official Python driver.

What You Will Learn

  • Reading an OWL ontology from a URL with RDFLib and querying it with SPARQL
  • Extracting classes, datatype properties, and object properties from an ontology programmatically
  • Auto-generating Cypher MERGE / SET statements from ontology structure
  • Applying a mapping dictionary to connect ontology property names to CSV column names
  • Loading Neo4j in batched transactions using session.write_transaction for reliability
  • Exporting n10s namespace and mapping definitions so the graph can also be queried as RDF

The Rail Ontology

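The rail.ttl OWL ontology defines two classes, two object properties, and nine datatype properties. In the Neo4j graph, the classes become node labels, the object properties become relationships, and the datatype properties become node properties: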
@prefix :    <http://onto.neo4j.com/rail#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .

# Object properties (become relationships)
:affects rdf:type owl:ObjectProperty ;
         rdfs:domain :Event ;
         rdfs:range  :Station .

:link rdf:type owl:ObjectProperty ;
      rdfs:domain :Station ;
      rdfs:range  :Station .

# Datatype properties on Station (become node attributes)
:stationCode rdf:type owl:DatatypeProperty ;
             rdfs:domain :Station ;
             rdfs:range xsd:string .

:stationName rdf:type owl:DatatypeProperty ;
             rdfs:domain :Station ;
             rdfs:range xsd:string .

:stationAddress rdf:type owl:DatatypeProperty ;
                rdfs:domain :Station ;
                rdfs:range xsd:string .

:lat rdf:type owl:DatatypeProperty ;
     rdfs:domain :Station ;
     rdfs:range xsd:float .

:long rdf:type owl:DatatypeProperty ;
      rdfs:domain :Station ;
      rdfs:range xsd:float .

# Datatype properties on Event (become node attributes)
:eventId rdf:type owl:DatatypeProperty ;
         rdfs:domain :Event ;
         rdfs:range xsd:string .

:eventType rdf:type owl:DatatypeProperty ;
           rdfs:domain :Event ;
           rdfs:range xsd:string .

:eventDescription rdf:type owl:DatatypeProperty ;
                  rdfs:domain :Event ;
                  rdfs:range xsd:string .

:timestamp rdf:type owl:DatatypeProperty ;
           rdfs:domain :Event .

# Classes
:Station rdf:type owl:Class .
:Event   rdf:type owl:Class .

Python ETL Pipeline

Step 1 — Install Dependencies

pip install rdflib neo4j

Step 2 — URI Utility Helpers

import time

import rdflib

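def getLocalPart(uri):
    """Return the part of a URI after the last '#', '/' or ':' (tried in that order)."""
    pos = uri.rfind('#')
    if pos < 0:
        pos = uri.rfind('/')
    if pos < 0:
        pos = uri.rindex(':')  # raises ValueError if no separator is present
    return uri[pos + 1:]

def getNamespacePart(uri):
    """Return the URI up to and including the last '#', '/' or ':' separator."""
    pos = uri.rfind('#')
    if pos < 0:
        pos = uri.rfind('/')
    if pos < 0:
        pos = uri.rindex(':')  # raises ValueError if no separator is present
    return uri[0:pos + 1]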
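
As a quick illustration of how the two helpers split a URI, the sketch below repeats their definitions so it is self-contained and applies them to a few inputs. Only the first URI comes from the rail ontology; the slash-terminated and URN examples are hypothetical and are there to exercise the fallback separators.

```python
def getLocalPart(uri):
    pos = uri.rfind('#')
    if pos < 0:
        pos = uri.rfind('/')
    if pos < 0:
        pos = uri.rindex(':')
    return uri[pos + 1:]


def getNamespacePart(uri):
    pos = uri.rfind('#')
    if pos < 0:
        pos = uri.rfind('/')
    if pos < 0:
        pos = uri.rindex(':')
    return uri[0:pos + 1]


examples = [
    "http://onto.neo4j.com/rail#Station",  # hash namespace, as in rail.ttl
    "http://example.org/vocab/Thing",      # hypothetical slash namespace
    "urn:isbn:0451450523",                 # hypothetical URN, split on ':'
]

for uri in examples:
    print(getNamespacePart(uri), "|", getLocalPart(uri))
```

Concatenating the two parts always gives back the original URI, which is what allows the pipeline to register namespaces and local names separately.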

Step 3 — Define the Data Mappings

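The mapping dictionary connects each ontology class or property name to the corresponding CSV column header. Special keys control the ETL behaviour: @fileName is the CSV source, @uniqueId names the ontology property used as the node's merge key, and @from / @to name the CSV columns holding the identifiers of a relationship's source and target nodes.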
railMappings = {}

stationMapping = {
    "@fileName":  "https://raw.githubusercontent.com/jbarrasa/goingmeta/main/session05/data/nr-stations-all.csv",
    "@uniqueId":  "stationCode",
    "lat":            "lat",
    "long":           "long",
    "stationAddress": "address",
    "stationCode":    "crs",
    "stationName":    "name",
}
railMappings["Station"] = stationMapping

eventMapping = {
    "@fileName":      "https://raw.githubusercontent.com/jbarrasa/goingmeta/main/session05/data/nr-events.csv",
    "@uniqueId":      "eventId",
    "eventDescription": "desc",
    "eventId":          "id",
    "timestamp":        "ts",
    "eventType":        "type",
}
railMappings["Event"] = eventMapping

linkMapping = {
    "@fileName": "https://raw.githubusercontent.com/jbarrasa/goingmeta/main/session05/data/nr-station-links.csv",
    "@from": "origin",
    "@to":   "destination",
}
railMappings["link"] = linkMapping

affectsMapping = {
    "@fileName": "https://raw.githubusercontent.com/jbarrasa/goingmeta/main/session05/data/nr-events.csv",
    "@from": "id",
    "@to":   "Station",
}
railMappings["affects"] = affectsMapping
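
Because the loader generator looks these special keys up directly, a missing key only surfaces as a KeyError during generation. The following is a minimal sketch of an up-front check, under the assumption that a class mapping is identified by @uniqueId and a relationship mapping by @from and @to. The function `check_mappings`, the sample dictionary, and its file names are hypothetical and not part of the session code.

```python
def check_mappings(mappings):
    """Return a list of human-readable problems found in a mapping dictionary."""
    problems = []
    for name, m in mappings.items():
        if "@fileName" not in m:
            problems.append(f"{name}: missing @fileName")
        if "@uniqueId" in m:
            # Class mapping: the unique-id property must itself map to a CSV column.
            if m["@uniqueId"] not in m:
                problems.append(
                    f"{name}: @uniqueId '{m['@uniqueId']}' has no column mapping"
                )
        elif not ("@from" in m and "@to" in m):
            # Relationship mapping: both endpoints are required.
            problems.append(f"{name}: needs either @uniqueId or both @from and @to")
    return problems


# Hypothetical sample; the Event entry is deliberately incomplete.
sample = {
    "Station": {"@fileName": "stations.csv", "@uniqueId": "stationCode", "stationCode": "crs"},
    "link": {"@fileName": "links.csv", "@from": "origin", "@to": "destination"},
    "Event": {"@fileName": "events.csv", "@uniqueId": "eventId"},
}

print(check_mappings(sample))
```

Running a check like this before generating loaders turns a mid-pipeline failure into a readable list of configuration problems.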

Step 4 — Generate Cypher Loaders from the Ontology

The getLoadersFromOnto function parses the ontology with RDFLib, queries it with SPARQL, and returns two things: a dictionary of parameterised Cypher statements (one per class and one per object property) ready to be executed against Neo4j, and a list of n10s calls that register the namespaces and mappings.
def getLoadersFromOnto(onto, rdf_format, mappings):
    g = rdflib.Graph()
    g.parse(onto, format=rdf_format)

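    # Properties without an rdfs:range (such as :timestamp in the listing above)
    # do not match the OPTIONAL block, so no SET clause is generated for them.
    classes_and_props_query = """
    prefix owl: <http://www.w3.org/2002/07/owl#>
    prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
    prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>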

    SELECT DISTINCT ?curi (GROUP_CONCAT(DISTINCT ?propTypePair ; SEPARATOR=",") AS ?props)
    WHERE {
        ?curi rdf:type owl:Class .
        OPTIONAL {
          ?prop rdfs:domain ?curi ;
                a owl:DatatypeProperty ;
                rdfs:range ?range .
          BIND (concat(str(?prop),';',str(?range)) AS ?propTypePair)
        }
    } GROUP BY ?curi
    """

    cypher_import   = {}
    export_ns       = set()
    export_mappings = {}

    for row in g.query(classes_and_props_query):
        export_ns.add(getNamespacePart(str(row.curi)))
        export_mappings[getLocalPart(str(row.curi))] = str(row.curi)
        cls = getLocalPart(str(row.curi))
        uid_col = mappings[cls]["@uniqueId"]
        uid_prop = mappings[cls][uid_col]

        cypher = [
            "unwind $records AS record",
            f"merge (n:{cls} {{ `{uid_col}`: record.`{uid_prop}`}} )",
        ]
        for pair in str(row.props).split(","):
            # Classes without datatype properties yield no "uri;range" pair here.
            if ";" not in pair:
                continue
            propName, propType = pair.split(";", 1)  # propType is not used yet
            export_ns.add(getNamespacePart(propName))
            export_mappings[getLocalPart(propName)] = propName
            lp = getLocalPart(propName)
            if lp in mappings[cls] and lp != uid_col:
                cypher.append(f"set n.{lp} = record.`{mappings[cls][lp]}`")
        cypher.append("return count(*) as total")
        cypher_import[cls] = " \n".join(cypher)

    rels_query = """
    prefix owl: <http://www.w3.org/2002/07/owl#>
    prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
    prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>

    SELECT DISTINCT ?rel ?dom ?ran
    WHERE {
        ?rel a ?propertyClass .
        FILTER(?propertyClass IN (rdf:Property, owl:ObjectProperty,
               owl:FunctionalProperty, owl:AsymmetricProperty,
               owl:InverseFunctionalProperty, owl:IrreflexiveProperty,
               owl:ReflexiveProperty, owl:SymmetricProperty,
               owl:TransitiveProperty))
        ?rel rdfs:domain ?dom ;
             rdfs:range  ?ran .
    }
    """

    for row in g.query(rels_query):
        rel = getLocalPart(str(row.rel))
        dom = getLocalPart(str(row.dom))
        ran = getLocalPart(str(row.ran))
        export_ns.add(getNamespacePart(str(row.rel)))
        export_mappings[rel] = str(row.rel)
        src_uid = mappings[dom]["@uniqueId"]
        tgt_uid = mappings[ran]["@uniqueId"]
        cypher = [
            "unwind $records AS record",
            f"match (source:{dom} {{ `{src_uid}`: record.`{mappings[rel]['@from']}`}} )",
            f"match (target:{ran} {{ `{tgt_uid}`: record.`{mappings[rel]['@to']}`}} )",
            f"merge (source)-[r:`{rel}`]->(target)",
            "return count(*) as total",
        ]
        cypher_import[rel] = " \n".join(cypher)

    nscount = 0
    mapping_export_cypher = []
    for ns in export_ns:
        mapping_export_cypher.append(f"call n10s.nsprefixes.add('ns{nscount}','{ns}');")
        nscount += 1
    for k in export_mappings:
        mapping_export_cypher.append(f"call n10s.mapping.add('{export_mappings[k]}','{k}');")

    return cypher_import, mapping_export_cypher
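
To show the shape of the statements this function produces, here is a small sketch that reproduces only the string-building part of the class loop, without RDFLib. `build_node_loader` is a hypothetical helper, and the property list is passed in by hand rather than read from the ontology.

```python
def build_node_loader(cls, prop_names, mapping):
    """Build a parameterised MERGE/SET statement for one class."""
    uid_prop = mapping["@uniqueId"]  # ontology property used as the merge key
    uid_col = mapping[uid_prop]      # CSV column that holds that key
    lines = [
        "unwind $records AS record",
        f"merge (n:{cls} {{ `{uid_prop}`: record.`{uid_col}`}} )",
    ]
    for prop in prop_names:
        # Only mapped properties are set; the key is already set by the MERGE.
        if prop in mapping and prop != uid_prop:
            lines.append(f"set n.{prop} = record.`{mapping[prop]}`")
    lines.append("return count(*) as total")
    return " \n".join(lines)


station_mapping = {
    "@uniqueId": "stationCode",
    "stationCode": "crs",
    "stationName": "name",
    "lat": "lat",
}

statement = build_node_loader(
    "Station", ["stationCode", "stationName", "lat", "long"], station_mapping
)
print(statement)
```

In this example `long` is listed as a property but has no entry in the mapping, so no SET clause is emitted for it. The ontology decides which properties are candidates, and the mapping decides which of them actually receive data.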

Step 5 — Load Neo4j in Batches
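
The loader in this step slices the input into fixed-size chunks and sends each chunk as the `$records` parameter of one write transaction, so a failure rolls back a single batch rather than the whole load. The slicing on its own can be sketched with a plain list as follows; `iter_batches` and the sample rows are hypothetical, and the session code slices a pandas DataFrame with an explicit counter instead.

```python
def iter_batches(records, batch_size):
    """Yield consecutive slices of at most batch_size records."""
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]


# Hypothetical rows standing in for the CSV contents.
rows = [{"crs": f"S{i:03d}"} for i in range(7)]

sizes = [len(batch) for batch in iter_batches(rows, batch_size=3)]
print(sizes)  # [3, 3, 1]
```

The last batch is simply shorter than the others, so no padding or special handling is needed.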

import pandas as pd
from neo4j import GraphDatabase, basic_auth

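def insert_data(session, query, frame, batch_size=500):
    """Run `query` once per batch of rows, each batch in its own write transaction."""
    total, batch, start = 0, 0, time.time()
    while batch * batch_size < len(frame):
        # Slice the next batch and convert it to a list of dicts for $records.
        records = frame[batch*batch_size:(batch+1)*batch_size].to_dict('records')
        # Note: write_transaction is deprecated in 5.x drivers in favour of execute_write.
        res = session.write_transaction(
            lambda tx: tx.run(query, parameters={'records': records}).data()
        )
        total += res[0]['total']
        batch += 1
        print({"total": total, "batches": batch, "time": time.time() - start})
    return total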

driver = GraphDatabase.driver(
    "bolt://<your-neo4j-host>:7687",
    auth=basic_auth("neo4j", "<your-password>")
)
session = driver.session(database="neo4j")

cypher_import, mapping_defs = getLoadersFromOnto(
    "https://raw.githubusercontent.com/jbarrasa/goingmeta/main/session05/ontos/rail.ttl",
    "turtle",
    railMappings
)

for q in cypher_import:
    print("Importing:", q, "from", railMappings[q]["@fileName"])
    df = pd.read_csv(railMappings[q]["@fileName"])
    insert_data(session, cypher_import[q], df, batch_size=300)

for md in mapping_defs:
    session.run(md)

# Release the connection resources once the load is finished.
session.close()
driver.close()
The mapping definitions executed at the end register n10s namespace prefixes and element mappings, so that the loaded graph can also be exported as RDF through n10s using the ontology's original URIs.
Replace the hardcoded connection string with environment variables or a secrets manager before running in production. The bolt:// address and credentials in the notebook are from a temporary session instance.
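
One way to follow this advice is to read the connection settings from environment variables. The sketch below is a minimal example under stated assumptions: the variable names (`NEO4J_URI`, `NEO4J_USER`, `NEO4J_PASSWORD`, `NEO4J_DATABASE`) and the `neo4j_settings` helper are choices made for this illustration, not a convention defined by the driver or the session notebook.

```python
import os


def neo4j_settings(env=os.environ):
    """Collect connection settings from an environment-like mapping.

    The variable names are assumptions made for this sketch.
    """
    password = env.get("NEO4J_PASSWORD")
    if not password:
        # Fail early instead of falling back to a hardcoded secret.
        raise RuntimeError("NEO4J_PASSWORD is not set")
    return {
        "uri": env.get("NEO4J_URI", "bolt://localhost:7687"),
        "user": env.get("NEO4J_USER", "neo4j"),
        "password": password,
        "database": env.get("NEO4J_DATABASE", "neo4j"),
    }


# Demonstration with an explicit mapping instead of the real environment.
settings = neo4j_settings({"NEO4J_PASSWORD": "example-only"})
print(settings["uri"], settings["user"], settings["database"])
```

The resulting values can then be passed to `GraphDatabase.driver(settings["uri"], auth=basic_auth(settings["user"], settings["password"]))` and `driver.session(database=settings["database"])` in place of the literals used above.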

Resources

Watch the Recording

Full live-stream recording of Going Meta Session 5 on YouTube.

Session Code on GitHub

The Jupyter notebook, rail ontology, and CSV data files from this session.
