class RelationshipToSchemaTransformer(BaseTransformer):
    """Replaces relationships with a schema.

    For every (source type, target type) combination of Asset, Event, File,
    Sequence and TimeSeries, the ``Relationship`` instances of that combination
    are counted. When there are at least ``limit`` of them, each relationship is
    replaced by an intermediate ``Edge`` node:

    * the source entity points to the edge node through a
      ``relationship<TargetType>`` predicate,
    * the edge node points to the target entity through ``end_node``,
    * all relationship properties except the bookkeeping ones listed in
      ``_NOT_PROPERTIES`` are copied onto the edge node,
    * the triples describing the original relationship are removed.

    A relationship whose source or target entity cannot be resolved to exactly
    one node is left untouched and a ``ResourceNotFoundWarning`` is issued.

    Args:
        limit: The minimum number of relationships that need to be present for it
            to be converted into a schema. Default is 1.
        namespace: Namespace used as the ``classic:`` prefix in the SPARQL queries
            and for the nodes and predicates that are created.
    """

    description = "Replaces relationships with a schema"
    _use_only_once: bool = True
    _need_changes = frozenset({str(extractors.RelationshipsExtractor.__name__)})

    # Relationship predicates that are consumed by the conversion itself and
    # therefore not copied onto the new edge node.
    _NOT_PROPERTIES: frozenset[str] = frozenset(
        {"source_external_id", "target_external_id", "external_id", "source_type", "target_type"}
    )
    # Entity types that can appear as source or target of a relationship.
    _RELATIONSHIP_NODE_TYPES: tuple[str, ...] = ("Asset", "Event", "File", "Sequence", "TimeSeries")
    # Characters that must be escaped inside a double-quoted SPARQL string literal.
    _SPARQL_STRING_ESCAPES: dict[int, str] = str.maketrans(
        {"\\": "\\\\", '"': '\\"', "\n": "\\n", "\r": "\\r", "\t": "\\t"}
    )

    _count_by_source_target = """PREFIX classic: <{namespace}>
SELECT (COUNT(?instance) AS ?instanceCount)
WHERE {{
?instance a classic:Relationship .
?instance classic:source_type classic:{source_type} .
?instance classic:target_type classic:{target_type} .
}}"""
    _instances = """PREFIX classic: <{namespace}>
SELECT ?instance
WHERE {{
?instance a classic:Relationship .
?instance classic:source_type classic:{source_type} .
?instance classic:target_type classic:{target_type} .
}}"""
    _lookup_entity_query = """PREFIX classic: <{namespace}>
SELECT ?entity
WHERE {{
?entity a classic:{entity_type} .
?entity classic:external_id "{external_id}" .
}}"""

    def __init__(self, limit: int = 1, namespace: Namespace = CLASSIC_CDF_NAMESPACE) -> None:
        self._limit = limit
        self._namespace = namespace

    def transform(self, graph: Graph) -> None:
        """Convert the relationships in ``graph`` in place.

        Args:
            graph: The graph to modify.
        """
        for source_type in self._RELATIONSHIP_NODE_TYPES:
            for target_type in self._RELATIONSHIP_NODE_TYPES:
                count_query = self._count_by_source_target.format(
                    namespace=self._namespace, source_type=source_type, target_type=target_type
                )
                for instance_count in graph.query(count_query):
                    if int(instance_count[0]) < self._limit:  # type: ignore[index, arg-type]
                        continue
                    instances_query = self._instances.format(
                        namespace=self._namespace, source_type=source_type, target_type=target_type
                    )
                    # Materialize the ids first: the conversion below adds and removes
                    # triples, and query results must not be consumed lazily while the
                    # underlying graph is being mutated.
                    instance_ids = [
                        cast(URIRef, row[0])  # type: ignore[index, misc]
                        for row in graph.query(instances_query)
                    ]
                    for instance_id in instance_ids:
                        self._convert_relationship_to_schema(graph, instance_id, source_type, target_type)

    def _convert_relationship_to_schema(
        self, graph: Graph, instance_id: URIRef, source_type: str, target_type: str
    ) -> None:
        """Replace a single relationship instance with an edge node.

        Args:
            graph: The graph to modify.
            instance_id: Identifier of the relationship instance.
            source_type: Entity type of the relationship source.
            target_type: Entity type of the relationship target.
        """
        result = cast(list[ResultRow], list(graph.query(f"DESCRIBE <{instance_id}>")))
        # Each row is a (subject, predicate, object) triple; index by predicate name.
        object_by_predicates = cast(
            dict[str, URIRef | Literal], {remove_namespace_from_uri(row[1]): row[2] for row in result}
        )
        source_external_id = cast(URIRef, object_by_predicates["source_external_id"])
        target_external_id = cast(URIRef, object_by_predicates["target_external_id"])
        try:
            source_id = self._lookup_entity(graph, source_type, source_external_id)
        except ValueError:
            warnings.warn(ResourceNotFoundWarning(source_external_id, "class", str(instance_id), "class"), stacklevel=2)
            return None
        try:
            target_id = self._lookup_entity(graph, target_type, target_external_id)
        except ValueError:
            warnings.warn(ResourceNotFoundWarning(target_external_id, "class", str(instance_id), "class"), stacklevel=2)
            return None
        external_id = str(object_by_predicates["external_id"])
        # The relationship is always represented by a new intermediate node that
        # carries its properties.
        self._create_node(graph, object_by_predicates, external_id, source_id, target_id, self._predicate(target_type))
        # Drop the triples of the original relationship now that it is replaced.
        for triple in result:
            graph.remove(triple)  # type: ignore[arg-type]

    def _lookup_entity(self, graph: Graph, entity_type: str, external_id: str) -> URIRef:
        """Find the node of ``entity_type`` with the given ``external_id``.

        Args:
            graph: The graph to search.
            entity_type: Name of the entity class, e.g. ``Asset``.
            external_id: External id the entity must have.

        Returns:
            The identifier of the matching entity.

        Raises:
            ValueError: If the number of matching entities is not exactly one.
        """
        # Escape the value so ids containing quotes, backslashes or line breaks
        # still yield a well-formed string literal that matches the exact id.
        escaped_external_id = str(external_id).translate(self._SPARQL_STRING_ESCAPES)
        query = self._lookup_entity_query.format(
            namespace=self._namespace, entity_type=entity_type, external_id=escaped_external_id
        )
        result = list(graph.query(query))
        if len(result) == 1:
            return cast(URIRef, result[0][0])  # type: ignore[index]
        raise ValueError(f"Could not find entity with external_id {external_id} and type {entity_type}")

    def _create_node(
        self,
        graph: Graph,
        objects_by_predicates: dict[str, URIRef | Literal],
        external_id: str,
        source_id: URIRef,
        target_id: URIRef,
        predicate: URIRef,
    ) -> None:
        """Creates a new intermediate node for the relationship with properties.

        Args:
            graph: The graph to add the node to.
            objects_by_predicates: Objects of the relationship keyed by predicate name.
            external_id: External id of the relationship; used as the id of the new node.
            source_id: Identifier of the source entity.
            target_id: Identifier of the target entity.
            predicate: Predicate connecting the source entity to the new node.
        """
        # Create new node
        instance_id = self._namespace[external_id]
        graph.add((instance_id, RDF.type, self._namespace["Edge"]))
        for prop_name, object_ in objects_by_predicates.items():
            if prop_name in self._NOT_PROPERTIES:
                continue
            graph.add((instance_id, self._namespace[prop_name], object_))
        # Connect the new node to the source and target nodes
        graph.add((source_id, predicate, instance_id))
        graph.add((instance_id, self._namespace["end_node"], target_id))

    def _predicate(self, target_type: str) -> URIRef:
        """Return the predicate linking a source entity to an edge node.

        Note that ``str.capitalize`` lower-cases everything after the first
        character, so ``TimeSeries`` yields ``relationshipTimeseries``.
        """
        return self._namespace[f"relationship{target_type.capitalize()}"]