CONTEXT I have setup a neo4j vector index in combination with cypher query to fetch specific nodes and and traverse through their paths to return specific data. Given this data will be passed to an LLM for summarisation, the output should be as clean as possible.
FUNCTION
def generate_employee_cypher_query(schema)
: fetches specific paths from entity Èmployee` by following the schema/ontology paths:
def generate_employee_cypher_query(schema):
employee_schema = schema["Employee"]
match_clauses = []
return_blocks = []
# Initial vector search
base_query = """
CALL db.index.vector.queryNodes($index_name, $n_results, $query_embedding)
YIELD node AS employee, score
WHERE employee:Employee AND employee.user_id = $user_id
MATCH (employee)
"""
# Generate MATCH clauses from schema relationships
for rel in employee_schema["relationships"]:
rel_type = rel["type"]
end_node = rel["endNode"]
cardinality = rel.get("cardinality", "0..n") # Default to 0..n if not specified
# Handle different cardinalities and relationship directions
if rel["startNode"] == "Employee":
# Outgoing relationship from Employee
match_clauses.append(
f"OPTIONAL MATCH (employee)-[{rel_type.lower()}Rel:{rel_type}]->"
f"({rel_type.lower()}:{end_node})"
)
# Special handling for relationships with TimeFrame
if end_node in ["Availability", "Unavailability"]:
match_clauses.append(
f"OPTIONAL MATCH ({rel_type.lower()})-[:HAS_TIMEFRAME]->"
f"({rel_type.lower()}TimeFrame:TimeFrame)"
)
else:
# Incoming relationship to Employee
match_clauses.append(
f"OPTIONAL MATCH ({rel_type.lower()}:{end_node})"
f"-[{rel_type.lower()}Rel:{rel_type}]->(employee)"
)
# Generate return blocks for each relationship
return_blocks.append("""
employee.name AS employeeName,
score,
apoc.convert.toJson(
CASE WHEN employee IS NOT NULL
THEN apoc.map.removeKeys(properties(employee),
['embedding', 'id', 'elementId', 'user_id', 'timestamp', 'created', 'updated']
)
ELSE {}
END
) AS employeeJson,
""")
# Start connections array
return_blocks.append("apoc.convert.toJson([")
# Generate individual connection blocks
connection_blocks = []
for rel in employee_schema["relationships"]:
rel_type = rel["type"]
end_node = rel["endNode"]
cardinality = rel.get("cardinality", "0..n")
# Handle cardinality in return statement
is_single = cardinality in ["1..1", "0..1"]
collection_suffix = "[0]" if is_single else ""
if end_node in ["Availability", "Unavailability"]:
# Special handling for timeframe relationships
connection_blocks.append(f"""{{
type: '{rel_type}',
{rel_type.lower()}: collect(DISTINCT CASE WHEN {rel_type.lower()} IS NOT NULL
THEN {{
employeeName: {rel_type.lower()}.employeeName,
timeframe: CASE WHEN {rel_type.lower()}TimeFrame IS NOT NULL
THEN {{
dateIndicator: {rel_type.lower()}TimeFrame.dateIndicator,
type: {rel_type.lower()}TimeFrame.type,
recurring: {rel_type.lower()}TimeFrame.recurring
}}
ELSE null
END
}}
ELSE null END){collection_suffix}
}}""")
else:
# Standard relationship handling
connection_blocks.append(f"""{{
type: '{rel_type}',
{end_node.lower()}: collect(DISTINCT CASE WHEN {rel_type.lower()} IS NOT NULL
THEN apoc.map.removeKeys(properties({rel_type.lower()}),
['embedding', 'id', 'elementId', 'user_id', 'timestamp', 'created', 'updated']
)
ELSE null END){collection_suffix}
}}""")
# Close connections array
return_blocks.append(",\n".join(connection_blocks))
return_blocks.append("]) AS connectionsJson")
# Combine all query parts
full_query = (
base_query +
"\n".join(match_clauses) +
"\nRETURN " +
"\n".join(return_blocks)
)
return full_query
Now, the output of this specific function can be seen in the example below:
Employee Vector Search Results:
[
{
"employeeName": "Emma Williams",
"score": 0.6321649551391602,
"employee": {
"name": "Emma Williams",
"email": "emma.w@hotelexample.com"
},
"connections": [
{
"contract": {
"contractType": "Part-time"
},
"type": "HAS_CONTRACT_TYPE"
},
{
"has_unavailability": [],
"type": "HAS_UNAVAILABILITY"
},
{
"has_availability": [
{
"employeeName": "Emma Williams",
"timeframe": {
"recurring": true,
"dateIndicator": "Thu-Sat 16:00-00:00",
"type": "DayOfWeek"
}
}
],
"type": "HAS_AVAILABILITY"
},
{
"team": {
"name": "F&B"
},
"type": "BELONGS_TO"
},
{
"education": [],
"type": "HAS_EDUCATION"
},
{
"type": "HAS_CERTIFICATION",
"certification": [
{
"name": "Alcohol Service"
},
{
"name": "Mixology Certificate"
}
]
},
notice in the above output the empty strings in "education": [],
or "has_unavailability": [],
. I want those values to not be included in the output.
thank you in advance Manuel
I tried using CASE expressions or apoc.map.clean but did not seem to lead to the results i wanted.
While it would be possible to do that by changing your Python and Cypher code, it would be much simpler to just modify the JSON Employee Vector Search Results
.
For example, here is a sample function that takes your Employee Vector Search Results
and filters out, from every employee's connections
list, all objects that have a type
key and another key whose value is an empty list:
def filter_employee_connections(json_result):
result = json.loads(json_result)
for employee in result:
employee["connections"] = [
c for c in employee["connections"]
if not any(isinstance(val, list) and not val for key, val in c.items() if key != "type")
]
return json.dumps(result, indent=2)
The resulting JSON would look something like this:
[
{
"employeeName": "Emma Williams",
"score": 0.6321649551391602,
"employee": {
"name": "Emma Williams",
"email": "emma.w@hotelexample.com"
},
"connections": [
{
"contract": {
"contractType": "Part-time"
},
"type": "HAS_CONTRACT_TYPE"
},
{
"has_availability": [
{
"employeeName": "Emma Williams",
"timeframe": {
"recurring": true,
"dateIndicator": "Thu-Sat 16:00-00:00",
"type": "DayOfWeek"
}
}
],
"type": "HAS_AVAILABILITY"
},
{
"team": {
"name": "F&B"
},
"type": "BELONGS_TO"
},
{
"type": "HAS_CERTIFICATION",
"certification": [
{
"name": "Alcohol Service"
},
{
"name": "Mixology Certificate"
}
]
}
]
}
]