Skip to content

Transformers

cognite.neat._graph.transformers

AddAssetDepth

Bases: BaseTransformer

Source code in cognite/neat/_graph/transformers/_classic_cdf.py
class AddAssetDepth(BaseTransformer):
    """Adds the depth of each asset in the asset hierarchy to the graph.

    An asset whose ``root_prop`` points at itself gets depth 1, its children depth 2, and so on.
    The depth is stored as a literal under ``DEFAULT_NAMESPACE.depth``. If ``depth_typing`` maps
    the computed depth to a type name, all existing ``rdf:type`` triples of the asset are replaced
    by ``DEFAULT_NAMESPACE[type_name]``.
    """

    description: str = "Adds depth of asset in the asset hierarchy to the graph"
    _use_only_once: bool = True
    _need_changes = frozenset({str(extractors.AssetsExtractor.__name__)})

    # Despite its name, ?child binds to the asset's *direct parent*. ?parent (optional) binds to
    # each ancestor above that direct parent. A single row with ?parent unbound therefore means
    # the direct parent has no parent of its own.
    _parent_template: str = """SELECT ?child ?parent WHERE {{
                              <{asset_id}> <{parent_prop}> ?child .
                              OPTIONAL{{?child <{parent_prop}>+ ?parent .}}}}"""

    # Binds ?root to the root(s) recorded on the asset via root_prop.
    _root_template: str = """SELECT ?root WHERE {{
                             <{asset_id}> <{root_prop}> ?root .}}"""

    def __init__(
        self,
        asset_type: URIRef | None = None,
        root_prop: URIRef | None = None,
        parent_prop: URIRef | None = None,
        depth_typing: dict[int, str] | None = None,
    ):
        """Configure which type and properties describe the asset hierarchy.

        Args:
            asset_type: Type of the asset nodes. Defaults to ``DEFAULT_NAMESPACE.Asset``.
            root_prop: Property linking an asset to its root. Defaults to ``DEFAULT_NAMESPACE.root``.
            parent_prop: Property linking an asset to its parent. Defaults to ``DEFAULT_NAMESPACE.parent``.
            depth_typing: Optional mapping from depth to a type name (local to ``DEFAULT_NAMESPACE``)
                that replaces the asset's existing types.
        """
        self.asset_type = asset_type or DEFAULT_NAMESPACE.Asset
        self.root_prop = root_prop or DEFAULT_NAMESPACE.root
        self.parent_prop = parent_prop or DEFAULT_NAMESPACE.parent
        self.depth_typing = depth_typing

    def transform(self, graph: Graph) -> None:
        """Adds depth of asset in the asset hierarchy to the graph."""
        # Collect all asset ids up front. The loop below adds and removes triples (including
        # rdf:type, which the query itself matches on), and the SELECT result may be evaluated
        # lazily, so mutating the graph while iterating it is unsafe.
        asset_ids = [
            cast(tuple, row)[0]
            for row in graph.query(f"SELECT DISTINCT ?asset_id WHERE {{?asset_id a <{self.asset_type}>}}")
        ]
        for asset_id in asset_ids:
            depth = self.get_depth(graph, asset_id, self.root_prop, self.parent_prop)
            if not depth:
                # Asset is neither a root nor connected to a parent: leave it untouched.
                continue
            graph.add((asset_id, DEFAULT_NAMESPACE.depth, Literal(depth)))

            type_ = self.depth_typing.get(depth) if self.depth_typing else None
            if type_:
                # Replace every existing rdf:type of the asset with the depth-specific type.
                graph.remove((asset_id, RDF.type, None))
                graph.add((asset_id, RDF.type, DEFAULT_NAMESPACE[type_]))

    @classmethod
    def get_depth(
        cls,
        graph: Graph,
        asset_id: URIRef,
        root_prop: URIRef,
        parent_prop: URIRef,
    ) -> int | None:
        """Get asset depth in the asset hierarchy.

        Returns:
            1 for a root asset (its only root is itself), ``2 + number of ancestors above the
            direct parent`` for an asset with a parent, and ``None`` otherwise.
        """
        # Non-root assets: one row per ancestor above the direct parent, or a single row with an
        # unbound ?parent when the direct parent is itself top-level.
        parent_rows = cast(
            list[tuple],
            list(graph.query(cls._parent_template.format(asset_id=asset_id, parent_prop=parent_prop))),
        )
        if parent_rows:
            return len(parent_rows) + 2 if parent_rows[0][1] else 2

        # Root assets: exactly one root, and it is the asset itself.
        root_rows = cast(
            list[tuple],
            list(graph.query(cls._root_template.format(asset_id=asset_id, root_prop=root_prop))),
        )
        if len(root_rows) == 1 and root_rows[0][0] == asset_id:
            return 1
        return None

transform(graph)

Adds depth of asset in the asset hierarchy to the graph.

Source code in cognite/neat/_graph/transformers/_classic_cdf.py
def transform(self, graph: Graph) -> None:
    """Adds depth of asset in the asset hierarchy to the graph."""
    # Collect all asset ids up front. The loop below adds and removes triples (including
    # rdf:type, which the query itself matches on), and the SELECT result may be evaluated
    # lazily, so mutating the graph while iterating it is unsafe.
    asset_ids = [
        cast(tuple, row)[0]
        for row in graph.query(f"SELECT DISTINCT ?asset_id WHERE {{?asset_id a <{self.asset_type}>}}")
    ]
    for asset_id in asset_ids:
        depth = self.get_depth(graph, asset_id, self.root_prop, self.parent_prop)
        if not depth:
            # Asset is neither a root nor connected to a parent: leave it untouched.
            continue
        graph.add((asset_id, DEFAULT_NAMESPACE.depth, Literal(depth)))

        type_ = self.depth_typing.get(depth) if self.depth_typing else None
        if type_:
            # Replace every existing rdf:type of the asset with the depth-specific type.
            graph.remove((asset_id, RDF.type, None))
            graph.add((asset_id, RDF.type, DEFAULT_NAMESPACE[type_]))

get_depth(graph, asset_id, root_prop, parent_prop) (classmethod)

Get asset depth in the asset hierarchy.

Source code in cognite/neat/_graph/transformers/_classic_cdf.py
@classmethod
def get_depth(
    cls,
    graph: Graph,
    asset_id: URIRef,
    root_prop: URIRef,
    parent_prop: URIRef,
) -> int | None:
    """Get asset depth in the asset hierarchy.

    Returns:
        1 for a root asset (its only root is itself), ``2 + number of ancestors above the
        direct parent`` for an asset with a parent, and ``None`` otherwise.
    """
    # Non-root assets: one row per ancestor above the direct parent, or a single row with an
    # unbound ?parent when the direct parent is itself top-level.
    parent_rows = cast(
        list[tuple],
        list(graph.query(cls._parent_template.format(asset_id=asset_id, parent_prop=parent_prop))),
    )
    if parent_rows:
        return len(parent_rows) + 2 if parent_rows[0][1] else 2

    # Root assets: exactly one root, and it is the asset itself.
    root_rows = cast(
        list[tuple],
        list(graph.query(cls._root_template.format(asset_id=asset_id, root_prop=root_prop))),
    )
    if len(root_rows) == 1 and root_rows[0][0] == asset_id:
        return 1
    return None

RelationshipToSchemaTransformer

Bases: BaseTransformer

Replaces relationships with a schema.

This transformer analyzes the relationships in the graph and modifies them to be part of the schema for Assets, Events, Files, Sequences, and TimeSeries. Relationships without any properties are replaced by a simple relationship between the source and target nodes. Relationships with properties are replaced by a schema that contains the properties as attributes.

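Parameters:

- limit (int, default: 1): The minimum number of relationships between a given pair of source and target types that must be present for them to be converted into a schema.
- namespace (Namespace, default: CLASSIC_CDF_NAMESPACE): The namespace used to match the classic CDF types and properties in the graph, and to create the new nodes and predicates.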
Source code in cognite/neat/_graph/transformers/_classic_cdf.py
class RelationshipToSchemaTransformer(BaseTransformer):
    """Replaces relationships with a schema.

    This transformer analyzes the relationships in the graph and modifies them to be part of the schema
    for Assets, Events, Files, Sequences, and TimeSeries. Each converted relationship is replaced by an
    intermediate ``Edge`` node that carries the relationship's properties as attributes. The source node
    links to the ``Edge`` node via ``relationship<TargetType>``, and the ``Edge`` node links to the
    target via ``end_node``.

    Args:
        limit: The minimum number of relationships between a given source type and target type that
            need to be present for them to be converted into a schema. Default is 1.
        namespace: The namespace used to match the classic CDF types/properties and to create the new
            nodes and predicates. Default is ``CLASSIC_CDF_NAMESPACE``.

    """

    def __init__(self, limit: int = 1, namespace: Namespace = CLASSIC_CDF_NAMESPACE) -> None:
        self._limit = limit
        self._namespace = namespace

    # Relationship attributes that describe the link itself and are therefore not copied onto the
    # new Edge node.
    _NOT_PROPERTIES: frozenset[str] = frozenset(
        {"source_external_id", "target_external_id", "external_id", "source_type", "target_type"}
    )
    _RELATIONSHIP_NODE_TYPES: tuple[str, ...] = tuple(["Asset", "Event", "File", "Sequence", "TimeSeries"])
    description = "Replaces relationships with a schema"
    _use_only_once: bool = True
    _need_changes = frozenset({str(extractors.RelationshipsExtractor.__name__)})

    # Characters that must be escaped inside a double-quoted SPARQL string literal
    # (SPARQL 1.1 grammar: STRING_LITERAL2 excludes ", \, LF and CR unless written as ECHAR).
    _SPARQL_STRING_ESCAPES: dict[int, str] = str.maketrans(
        {"\\": "\\\\", '"': '\\"', "\n": "\\n", "\r": "\\r"}
    )

    _count_by_source_target = """PREFIX classic: <{namespace}>

SELECT (COUNT(?instance) AS ?instanceCount)
WHERE {{
  ?instance a classic:Relationship .
  ?instance classic:source_type classic:{source_type} .
  ?instance classic:target_type classic:{target_type} .
}}"""

    _instances = """PREFIX classic: <{namespace}>

SELECT ?instance
WHERE {{
    ?instance a classic:Relationship .
    ?instance classic:source_type classic:{source_type} .
    ?instance classic:target_type classic:{target_type} .
}}"""
    _lookup_entity_query = """PREFIX classic: <{namespace}>

SELECT ?entity
WHERE {{
    ?entity a classic:{entity_type} .
    ?entity classic:external_id "{external_id}" .
}}"""

    def transform(self, graph: Graph) -> None:
        """Convert every relationship whose source/target type pair occurs at least ``limit`` times."""
        for source_type in self._RELATIONSHIP_NODE_TYPES:
            for target_type in self._RELATIONSHIP_NODE_TYPES:
                count_query = self._count_by_source_target.format(
                    namespace=self._namespace, source_type=source_type, target_type=target_type
                )
                # Materialized so no query result is still being iterated while the graph is mutated below.
                for instance_count in list(graph.query(count_query)):
                    if int(instance_count[0]) < self._limit:  # type: ignore[index, arg-type]
                        continue
                    instances_query = self._instances.format(
                        namespace=self._namespace, source_type=source_type, target_type=target_type
                    )
                    # Collect all ids first: each conversion removes and adds triples, and the
                    # SELECT result may be evaluated lazily.
                    instance_ids = [
                        cast(URIRef, result[0])  # type: ignore[index, misc]
                        for result in graph.query(instances_query)
                    ]
                    for instance_id in instance_ids:
                        self._convert_relationship_to_schema(graph, instance_id, source_type, target_type)

    def _convert_relationship_to_schema(
        self, graph: Graph, instance_id: URIRef, source_type: str, target_type: str
    ) -> None:
        """Replace the relationship node ``instance_id`` by an intermediate Edge node.

        If the source or target entity cannot be resolved, a ``ResourceNotFoundWarning`` is emitted
        and the relationship is left unchanged.
        """
        result = cast(list[ResultRow], list(graph.query(f"DESCRIBE <{instance_id}>")))
        object_by_predicates = cast(
            dict[str, URIRef | Literal], {remove_namespace_from_uri(row[1]): row[2] for row in result}
        )
        source_external_id = cast(URIRef, object_by_predicates["source_external_id"])
        target_external_id = cast(URIRef, object_by_predicates["target_external_id"])
        try:
            source_id = self._lookup_entity(graph, source_type, source_external_id)
        except ValueError:
            warnings.warn(ResourceNotFoundWarning(source_external_id, "class", str(instance_id), "class"), stacklevel=2)
            return None
        try:
            target_id = self._lookup_entity(graph, target_type, target_external_id)
        except ValueError:
            warnings.warn(ResourceNotFoundWarning(target_external_id, "class", str(instance_id), "class"), stacklevel=2)
            return None
        external_id = str(object_by_predicates["external_id"])

        # Remove the original relationship triples BEFORE creating the replacement node. Otherwise,
        # if the new node's URI coincided with the relationship's URI, the properties just copied
        # onto it would be deleted again.
        for triple in result:
            graph.remove(triple)  # type: ignore[arg-type]

        # An intermediate Edge node is always created (even when the relationship carries no extra
        # properties beyond those in _NOT_PROPERTIES).
        self._create_node(graph, object_by_predicates, external_id, source_id, target_id, self._predicate(target_type))

    def _lookup_entity(self, graph: Graph, entity_type: str, external_id: str) -> URIRef:
        """Return the unique node of ``entity_type`` whose ``external_id`` equals ``external_id``.

        Raises:
            ValueError: If zero or more than one matching node is found.
        """
        query = self._lookup_entity_query.format(
            namespace=self._namespace,
            entity_type=entity_type,
            # Escape so ids containing quotes, backslashes or line breaks still form a valid literal.
            external_id=self._escape_sparql_string(str(external_id)),
        )
        result = list(graph.query(query))
        if len(result) == 1:
            return cast(URIRef, result[0][0])  # type: ignore[index]
        raise ValueError(f"Could not find entity with external_id {external_id} and type {entity_type}")

    @classmethod
    def _escape_sparql_string(cls, value: str) -> str:
        """Escape ``value`` for use inside a double-quoted SPARQL string literal."""
        return value.translate(cls._SPARQL_STRING_ESCAPES)

    def _create_node(
        self,
        graph: Graph,
        objects_by_predicates: dict[str, URIRef | Literal],
        external_id: str,
        source_id: URIRef,
        target_id: URIRef,
        predicate: URIRef,
    ) -> None:
        """Create an ``Edge`` node for the relationship and wire it between source and target.

        Every entry of ``objects_by_predicates`` not listed in ``_NOT_PROPERTIES`` is copied onto the
        new node. The source links to the node via ``predicate``, and the node links to the target
        via ``end_node``.
        """
        instance_id = self._namespace[external_id]
        graph.add((instance_id, RDF.type, self._namespace["Edge"]))
        # NOTE(review): objects_by_predicates likely also holds the relationship's rdf:type under the
        # key returned by remove_namespace_from_uri(RDF.type); if so it is copied here too — confirm.
        for prop_name, object_ in objects_by_predicates.items():
            if prop_name in self._NOT_PROPERTIES:
                continue
            graph.add((instance_id, self._namespace[prop_name], object_))

        # Connect the new node to the source and target nodes
        graph.add((source_id, predicate, instance_id))
        graph.add((instance_id, self._namespace["end_node"], target_id))

    def _predicate(self, target_type: str) -> URIRef:
        """Return the predicate linking a source node to an Edge whose target has ``target_type``."""
        # NOTE(review): str.capitalize lower-cases the rest, so "TimeSeries" -> "relationshipTimeseries".
        return self._namespace[f"relationship{target_type.capitalize()}"]