Extractors

cognite.neat._graph.extractors #

BaseExtractor #

This is the base class for all extractors. It defines the interface that extractors must implement.

Source code in cognite/neat/_graph/extractors/_base.py
class BaseExtractor:
    """This is the base class for all extractors. It defines the interface that
    extractors must implement.
    """

    @abstractmethod
    def extract(self) -> Iterable[Triple]:
        raise NotImplementedError()

    @classmethod
    def _repr_html_(cls) -> str:
        return class_html_doc(cls)
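
Any custom extractor only needs to implement extract(). Below is a minimal illustrative sketch (the namespace and triples are hypothetical; Triple is neat's (subject, predicate, object) tuple alias):

from typing import Iterable

from rdflib import RDF, Literal, Namespace

EX = Namespace("http://example.org/")  # hypothetical namespace


class StaticTriplesExtractor(BaseExtractor):
    """Example extractor that yields a fixed set of triples."""

    def extract(self) -> Iterable[Triple]:
        yield (EX["pump_1"], RDF.type, EX["Asset"])
        yield (EX["pump_1"], EX["name"], Literal("Pump 1"))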

AssetsExtractor #

Bases: ClassicCDFBaseExtractor[Asset]

Extract data from Cognite Data Fusion's Assets into Neat.

Parameters:

- `items` (`Iterable[Asset]`, required): An iterable of assets.
- `namespace` (`Namespace`, default `None`): The namespace to use. Defaults to `DEFAULT_NAMESPACE`.
- `to_type` (`Callable[[Asset], str | None]`, default `None`): A function to convert an asset to a type. If `None`, or if the function returns `None`, the asset is set to the default type `"Asset"`.
- `total` (`int`, default `None`): The total number of assets to load. If passed, you will get a progress bar if `rich` is installed.
- `limit` (`int`, default `None`): The maximum number of assets to load. This is typically used for testing the setup of the extractor. For example, if you are extracting 100 000 assets, you might want to limit the extraction to 1000 assets to test the setup.
- `unpack_metadata` (`bool`, default `True`): Whether to unpack metadata. If `False`, the metadata is yielded as a JSON string.
- `skip_metadata_values` (`set[str] | frozenset[str] | None`, default `DEFAULT_SKIP_METADATA_VALUES`): A set of values to skip when unpacking metadata. Defaults to `frozenset({"nan", "null", "none", ""})`.
Source code in cognite/neat/_graph/extractors/_classic_cdf/_assets.py
class AssetsExtractor(ClassicCDFBaseExtractor[Asset]):
    """Extract data from Cognite Data Fusions Assets into Neat.

    Args:
        items (Iterable[Asset]): An iterable of assets.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[Asset], str | None], optional): A function to convert an asset to a type. Defaults to None.
            If None or if the function returns None, the asset will be set to the default type "Asset".
        total (int, optional): The total number of assets to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of assets to load. Defaults to None. This is typically used for
            testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to
            limit the extraction to 1000 assets to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata is
            yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): A set of values to skip when unpacking
            metadata. Defaults to frozenset({"nan", "null", "none", ""}).
    """

    _default_rdf_type = "Asset"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Asset], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        total = client.assets.aggregate_count(filter=AssetFilter(data_set_ids=[{"externalId": data_set_external_id}]))

        return cls(
            client.assets(data_set_external_ids=data_set_external_id),
            namespace,
            to_type,
            total,
            limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_hierarchy(
        cls,
        client: CogniteClient,
        root_asset_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Asset], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        total = client.assets.aggregate_count(
            filter=AssetFilter(asset_subtree_ids=[{"externalId": root_asset_external_id}])
        )

        return cls(
            cast(
                Iterable[Asset],
                client.assets(asset_subtree_external_ids=root_asset_external_id),
            ),
            namespace,
            to_type,
            total,
            limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Asset], str] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        assets = AssetList.load(Path(file_path).read_text())
        return cls(
            assets,
            namespace,
            to_type,
            total=len(assets),
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, asset: Asset) -> list[Triple]:
        """Converts an asset to triples."""
        id_ = self.namespace[f"{InstanceIdPrefix.asset}{asset.id}"]

        type_ = self._get_rdf_type(asset)

        triples: list[Triple] = [(id_, RDF.type, self.namespace[type_])]

        # Create attributes
        if asset.name:
            triples.append((id_, self.namespace.name, Literal(asset.name)))

        if asset.description:
            triples.append((id_, self.namespace.description, Literal(asset.description)))

        if asset.external_id:
            triples.append((id_, self.namespace.external_id, Literal(asset.external_id)))

        if asset.source:
            triples.append((id_, self.namespace.source, Literal(asset.source)))

        # properties' ref creation and update
        triples.append(
            (
                id_,
                self.namespace.created_time,
                Literal(datetime.fromtimestamp(asset.created_time / 1000, timezone.utc)),
            )
        )
        triples.append(
            (
                id_,
                self.namespace.last_updated_time,
                Literal(datetime.fromtimestamp(asset.last_updated_time / 1000, timezone.utc)),
            )
        )

        if asset.labels:
            for label in asset.labels:
                # external_id can create ill-formed URIs, so we create websafe URIs
                # since labels do not have internal ids, we use the external_id as the id
                triples.append(
                    (
                        id_,
                        self.namespace.label,
                        self.namespace[f"{InstanceIdPrefix.label}{LabelsExtractor._label_id(label)}"],
                    )
                )

        if asset.metadata:
            triples.extend(self._metadata_to_triples(id_, asset.metadata))

        # Create connections:
        if asset.parent_id:
            triples.append((id_, self.namespace.parent, self.namespace[f"{InstanceIdPrefix.asset}{asset.parent_id}"]))

        if asset.root_id:
            triples.append((id_, self.namespace.root, self.namespace[f"{InstanceIdPrefix.asset}{asset.root_id}"]))

        if asset.data_set_id:
            triples.append(
                (
                    id_,
                    self.namespace.dataset,
                    self.namespace[f"{InstanceIdPrefix.data_set}{asset.data_set_id}"],
                )
            )

        return triples
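
A short usage sketch (assumes a configured CogniteClient; the data set external id "my_data_set" and the metadata key "category" are hypothetical):

from cognite.client import CogniteClient

client = CogniteClient()  # assumes credentials are configured in the environment


def asset_to_type(asset: Asset) -> str | None:
    # Type assets by a metadata field; returning None falls back to "Asset".
    return (asset.metadata or {}).get("category")


extractor = AssetsExtractor.from_dataset(client, "my_data_set", to_type=asset_to_type)
for triple in extractor.extract():
    ...  # e.g. add the triple to an rdflib Graph

Note that CDF timestamps are milliseconds since the Unix epoch, which is why _item2triples divides by 1000 before calling datetime.fromtimestamp.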

ClassicGraphExtractor #

Bases: BaseExtractor

This extractor extracts all classic CDF Resources.

The Classic Graph consists of the following core resource types.

Classic Node CDF Resources
  • Assets
  • TimeSeries
  • Sequences
  • Events
  • Files

All the classic node CDF resources can have one or more connections to one or more assets. This will match a direct relationship in the data modeling of CDF.

In addition, you have relationships between the classic node CDF resources. This matches an edge in the data modeling of CDF.

Finally, labels and data sets organize the graph. Data sets play a similar, but distinct, role to spaces in data modeling, while labels can be compared to node types in data modeling and are used to quickly filter and find nodes/edges.

This extractor will extract the classic CDF graph into Neat starting from either a data set or a root asset.

It works as follows:

  1. Extract all core nodes (assets, time series, sequences, events, files) filtered by the given data set or root asset.
  2. Extract all relationships starting from any of the extracted core nodes.
  3. Extract all core nodes that are targets of the relationships that are not already extracted.
  4. Extract all labels that are connected to the extracted core nodes/relationships.
  5. Extract all data sets that are connected to the extracted core nodes/relationships.
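
A minimal usage sketch (assumes a configured CogniteClient; the data set external id is hypothetical). Exactly one of data_set_external_id or root_asset_external_id must be provided:

from cognite.client import CogniteClient
from rdflib import Graph

client = CogniteClient()  # assumes credentials are configured in the environment
extractor = ClassicGraphExtractor(client, data_set_external_id="my_data_set")

graph = Graph()
for triple in extractor.extract():
    graph.add(triple)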

Parameters:

- `client` (`CogniteClient`, required): The Cognite client to use.
- `data_set_external_id` (`str`, default `None`): The data set external id to extract from.
- `root_asset_external_id` (`str`, default `None`): The root asset external id to extract from.
- `namespace` (`Namespace`, default `None`): The namespace to use. Defaults to `DEFAULT_NAMESPACE`.
Source code in cognite/neat/_graph/extractors/_classic_cdf/_classic.py
class ClassicGraphExtractor(BaseExtractor):
    """This extractor extracts all classic CDF Resources.

    The Classic Graph consists of the following core resource types.

    Classic Node CDF Resources:
     - Assets
     - TimeSeries
     - Sequences
     - Events
     - Files

    All the classic node CDF resources can have one or more connections to one or more assets. This
    will match a direct relationship in the data modeling of CDF.

    In addition, you have relationships between the classic node CDF resources. This matches an edge
    in the data modeling of CDF.

    Finally, you have labels and data sets that organize the graph. Data sets play a similar, but
    distinct, role to spaces in data modeling, while labels can be compared to node types in data
    modeling and are used to quickly filter and find nodes/edges.

    This extractor will extract the classic CDF graph into Neat starting from either a data set or a root asset.

    It works as follows:

    1. Extract all core nodes (assets, time series, sequences, events, files) filtered by the given data set or
       root asset.
    2. Extract all relationships starting from any of the extracted core nodes.
    3. Extract all core nodes that are targets of the relationships that are not already extracted.
    4. Extract all labels that are connected to the extracted core nodes/relationships.
    5. Extract all data sets that are connected to the extracted core nodes/relationships.

    Args:
        client (CogniteClient): The Cognite client to use.
        data_set_external_id (str, optional): The data set external id to extract from. Defaults to None.
        root_asset_external_id (str, optional): The root asset external id to extract from. Defaults to None.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
    """

    # These are the core resource types in the classic CDF.
    _classic_node_types: ClassVar[tuple[_ClassicCoreType, ...]] = (
        _ClassicCoreType(AssetsExtractor, InstanceIdPrefix.asset, "assets"),
        _ClassicCoreType(TimeSeriesExtractor, InstanceIdPrefix.time_series, "time_series"),
        _ClassicCoreType(SequencesExtractor, InstanceIdPrefix.sequence, "sequences"),
        _ClassicCoreType(EventsExtractor, InstanceIdPrefix.event, "events"),
        _ClassicCoreType(FilesExtractor, InstanceIdPrefix.file, "files"),
    )

    def __init__(
        self,
        client: CogniteClient,
        data_set_external_id: str | None = None,
        root_asset_external_id: str | None = None,
        namespace: Namespace | None = None,
    ):
        self._client = client
        if sum([bool(data_set_external_id), bool(root_asset_external_id)]) != 1:
            raise ValueError("Exactly one of data_set_external_id or root_asset_external_id must be set.")
        self._root_asset_external_id = root_asset_external_id
        self._data_set_external_id = data_set_external_id
        self._namespace = namespace or DEFAULT_NAMESPACE

        self._source_external_ids_by_type: dict[InstanceIdPrefix, set[str]] = defaultdict(set)
        self._target_external_ids_by_type: dict[InstanceIdPrefix, set[str]] = defaultdict(set)
        self._labels: set[str] = set()
        self._data_set_ids: set[int] = set()

    def extract(self) -> Iterable[Triple]:
        """Extracts all classic CDF Resources."""
        yield from self._extract_core_start_nodes()

        yield from self._extract_start_node_relationships()

        yield from self._extract_core_end_nodes()

        yield from self._extract_labels()
        yield from self._extract_data_sets()

    def _extract_core_start_nodes(self):
        for core_node in self._classic_node_types:
            if self._data_set_external_id:
                extractor = core_node.extractor_cls.from_dataset(
                    self._client, self._data_set_external_id, self._namespace, unpack_metadata=False
                )
            elif self._root_asset_external_id:
                extractor = core_node.extractor_cls.from_hierarchy(
                    self._client, self._root_asset_external_id, self._namespace, unpack_metadata=False
                )
            else:
                raise ValueError("Exactly one of data_set_external_id or root_asset_external_id must be set.")

            yield from self._extract_with_logging_label_dataset(extractor, core_node.resource_type)

    def _extract_start_node_relationships(self):
        for start_resource_type, source_external_ids in self._source_external_ids_by_type.items():
            start_type = start_resource_type.removesuffix("_")
            for chunk in self._chunk(list(source_external_ids), description=f"Extracting {start_type} relationships"):
                relationship_iterator = self._client.relationships(
                    source_external_ids=list(chunk), source_types=[start_type]
                )
                extractor = RelationshipsExtractor(relationship_iterator, self._namespace, unpack_metadata=False)
                # This is a private attribute, but we need to set it to log the target nodes.
                extractor._log_target_nodes = True

                yield from extractor.extract()

                # After the extraction is done, we need to update all the new target nodes so
                # we can extract them in the next step.
                for end_type, target_external_ids in extractor._target_external_ids_by_type.items():
                    for external_id in target_external_ids:
                        # We only want to extract the target nodes that are not already extracted.
                        # Even though _source_external_ids_by_type is a defaultdict, we have to check if the key exists.
                        # This is because we might not have extracted any nodes of that type yet, and looking up
                        # a key that does not exist will create it. We are iterating over this dictionary, and
                        # we do not want to create new keys while iterating.
                        if (
                            end_type not in self._source_external_ids_by_type
                            or external_id not in self._source_external_ids_by_type[end_type]
                        ):
                            self._target_external_ids_by_type[end_type].add(external_id)

    def _extract_core_end_nodes(self):
        for core_node in self._classic_node_types:
            target_external_ids = self._target_external_ids_by_type[core_node.resource_type]
            api = getattr(self._client, core_node.api_name)
            for chunk in self._chunk(
                list(target_external_ids),
                description=f"Extracting end nodes {core_node.resource_type.removesuffix('_')}",
            ):
                resource_iterator = api.retrieve_multiple(external_ids=list(chunk), ignore_unknown_ids=True)
                extractor = core_node.extractor_cls(resource_iterator, self._namespace, unpack_metadata=False)
                yield from self._extract_with_logging_label_dataset(extractor)

    def _extract_labels(self):
        for chunk in self._chunk(list(self._labels), description="Extracting labels"):
            label_iterator = self._client.labels.retrieve(external_id=list(chunk), ignore_unknown_ids=True)
            yield from LabelsExtractor(label_iterator, self._namespace).extract()

    def _extract_data_sets(self):
        for chunk in self._chunk(list(self._data_set_ids), description="Extracting data sets"):
            data_set_iterator = self._client.data_sets.retrieve_multiple(ids=list(chunk), ignore_unknown_ids=True)
            yield from DataSetExtractor(data_set_iterator, self._namespace, unpack_metadata=False).extract()

    def _extract_with_logging_label_dataset(
        self, extractor: ClassicCDFBaseExtractor, resource_type: InstanceIdPrefix | None = None
    ) -> Iterable[Triple]:
        for triple in extractor.extract():
            if triple[1] == self._namespace.external_id and resource_type is not None:
                self._source_external_ids_by_type[resource_type].add(remove_namespace_from_uri(triple[2]))
            elif triple[1] == self._namespace.label:
                self._labels.add(remove_namespace_from_uri(triple[2]).removeprefix(InstanceIdPrefix.label))
            elif triple[1] == self._namespace.dataset:
                self._data_set_ids.add(
                    int(remove_namespace_from_uri(triple[2]).removeprefix(InstanceIdPrefix.data_set))
                )
            yield triple

    @staticmethod
    def _chunk(items: Sequence, description: str) -> Iterable:
        to_iterate: Iterable = chunker(items, chunk_size=1000)
        try:
            from rich.progress import track
        except ModuleNotFoundError:
            ...
        else:
            to_iterate = track(
                to_iterate,
                total=(len(items) // 1000) + 1,
                description=description,
            )
        return to_iterate
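
The _chunk helper batches ids into groups of 1000 before calling the API, and wraps the iteration in a progress bar when rich is available. A stand-in sketch of the chunker utility it relies on (an assumption; the real helper lives in neat's utilities and may differ):

from itertools import islice
from typing import Iterable, Sequence


def chunker(items: Sequence, chunk_size: int) -> Iterable[list]:
    """Yield successive lists of at most chunk_size items."""
    iterator = iter(items)
    while batch := list(islice(iterator, chunk_size)):
        yield batch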

extract() #

Extracts all classic CDF Resources.

Source code in cognite/neat/_graph/extractors/_classic_cdf/_classic.py
def extract(self) -> Iterable[Triple]:
    """Extracts all classic CDF Resources."""
    yield from self._extract_core_start_nodes()

    yield from self._extract_start_node_relationships()

    yield from self._extract_core_end_nodes()

    yield from self._extract_labels()
    yield from self._extract_data_sets()

DataSetExtractor #

Bases: ClassicCDFBaseExtractor[DataSet]

Extract DataSets from Cognite Data Fusion into Neat.

Parameters:

- `items` (`Iterable[DataSet]`, required): An iterable of data sets.
- `namespace` (`Namespace`, default `None`): The namespace to use. Defaults to `DEFAULT_NAMESPACE`.
- `to_type` (`Callable[[DataSet], str | None]`, default `None`): A function to convert a data set to a type. If `None`, or if the function returns `None`, the data set is set to the default type `"DataSet"`.
- `total` (`int`, default `None`): The total number of data sets to load. If passed, you will get a progress bar if `rich` is installed.
- `limit` (`int`, default `None`): The maximum number of data sets to load. This is typically used for testing the setup of the extractor.
- `unpack_metadata` (`bool`, default `True`): Whether to unpack metadata. If `False`, the metadata is yielded as a JSON string.
- `skip_metadata_values` (`set[str] | frozenset[str] | None`, default `DEFAULT_SKIP_METADATA_VALUES`): A set of values to skip when unpacking metadata. Defaults to `frozenset({"nan", "null", "none", ""})`.
Source code in cognite/neat/_graph/extractors/_classic_cdf/_data_sets.py
class DataSetExtractor(ClassicCDFBaseExtractor[DataSet]):
    """Extract DataSets from Cognite Data Fusions into Neat.

    Args:
        items (Iterable[DataSet]): An iterable of data sets.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[DataSet], str | None], optional): A function to convert a data set to a type.
            Defaults to None. If None or if the function returns None, the data set will be set to the
            default type "DataSet".
        total (int, optional): The total number of assets to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of assets to load. Defaults to None. This is typically used for
            testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to
            limit the extraction to 1000 assets to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata is
            yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): A set of values to skip when unpacking
            metadata. Defaults to frozenset({"nan", "null", "none", ""}).
    """

    _default_rdf_type = "DataSet"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: SequenceNotStr[str],
        namespace: Namespace | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        return cls(
            client.data_sets.retrieve_multiple(external_ids=data_set_external_id),
            namespace=namespace,
            total=len(data_set_external_id),
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        data_sets = DataSetList.load(Path(file_path).read_text())
        return cls(
            data_sets,
            namespace=namespace,
            total=len(data_sets),
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, item: DataSet) -> list[Triple]:
        """Converts an asset to triples."""
        id_ = self.namespace[f"{InstanceIdPrefix.data_set}{item.id}"]

        type_ = self._get_rdf_type(item)

        triples: list[Triple] = [(id_, RDF.type, self.namespace[type_])]

        # Create attributes
        if item.name:
            triples.append((id_, self.namespace.name, Literal(item.name)))

        if item.description:
            triples.append((id_, self.namespace.description, Literal(item.description)))

        if item.external_id:
            triples.append((id_, self.namespace.external_id, Literal(item.external_id)))

        # properties' ref creation and update
        triples.append(
            (
                id_,
                self.namespace.created_time,
                Literal(datetime.fromtimestamp(item.created_time / 1000, timezone.utc)),
            )
        )
        triples.append(
            (
                id_,
                self.namespace.last_updated_time,
                Literal(datetime.fromtimestamp(item.last_updated_time / 1000, timezone.utc)),
            )
        )

        if item.write_protected:
            triples.append((id_, self.namespace.write_protected, Literal(item.write_protected)))

        if item.metadata:
            triples.extend(self._metadata_to_triples(id_, item.metadata))

        return triples
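
A short sketch of loading data sets from a local file instead of the API (assumes the file contains a JSON dump loadable by DataSetList.load; the path is hypothetical):

extractor = DataSetExtractor.from_file("data_sets.json")
for triple in extractor.extract():
    ...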

EventsExtractor #

Bases: ClassicCDFBaseExtractor[Event]

Extract data from Cognite Data Fusion's Events into Neat.

Parameters:

- `items` (`Iterable[Event]`, required): An iterable of items.
- `namespace` (`Namespace`, default `None`): The namespace to use. Defaults to `DEFAULT_NAMESPACE`.
- `to_type` (`Callable[[Event], str | None]`, default `None`): A function to convert an item to a type. If `None`, or if the function returns `None`, the item is set to the default type.
- `total` (`int`, default `None`): The total number of items to load. If passed, you will get a progress bar if `rich` is installed.
- `limit` (`int`, default `None`): The maximum number of items to load. This is typically used for testing the setup of the extractor. For example, if you are extracting 100 000 items, you might want to limit the extraction to 1000 to test the setup.
- `unpack_metadata` (`bool`, default `True`): Whether to unpack metadata. If `False`, the metadata is yielded as a JSON string.
- `skip_metadata_values` (`set[str] | frozenset[str] | None`, default `DEFAULT_SKIP_METADATA_VALUES`): If you are unpacking metadata, values in this set will be skipped.
Source code in cognite/neat/_graph/extractors/_classic_cdf/_events.py
class EventsExtractor(ClassicCDFBaseExtractor[Event]):
    """Extract data from Cognite Data Fusions Events into Neat.

    Args:
        items (Iterable[Event]): An iterable of items.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[Event], str | None], optional): A function to convert an item to a type.
            Defaults to None. If None or if the function returns None, the item will be set to the default type.
        total (int, optional): The total number of items to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of items to load. Defaults to None. This is typically used for
            testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to
            limit the extraction to 1000 assets to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata is
            yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): If you are unpacking metadata, then
           values in this set will be skipped.
    """

    _default_rdf_type = "Event"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Event], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        total = client.events.aggregate_count(filter=EventFilter(data_set_ids=[{"externalId": data_set_external_id}]))

        return cls(
            client.events(data_set_external_ids=data_set_external_id),
            namespace,
            to_type,
            total=total,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_hierarchy(
        cls,
        client: CogniteClient,
        root_asset_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Event], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        total = client.events.aggregate_count(
            filter=EventFilter(asset_subtree_ids=[{"externalId": root_asset_external_id}])
        )

        return cls(
            client.events(asset_subtree_external_ids=[root_asset_external_id]),
            namespace,
            to_type,
            total,
            limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Event], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        events = EventList.load(Path(file_path).read_text())

        return cls(
            events,
            namespace,
            to_type,
            total=len(events),
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, event: Event) -> list[Triple]:
        id_ = self.namespace[f"{InstanceIdPrefix.event}{event.id}"]

        type_ = self._get_rdf_type(event)

        # Set rdf type
        triples: list[Triple] = [(id_, RDF.type, self.namespace[type_])]

        # Create attributes

        if event.external_id:
            triples.append((id_, self.namespace.external_id, Literal(event.external_id)))

        if event.source:
            triples.append((id_, self.namespace.source, Literal(event.source)))

        if event.type:
            triples.append((id_, self.namespace.type, Literal(event.type)))

        if event.subtype:
            triples.append((id_, self.namespace.subtype, Literal(event.subtype)))

        if event.metadata:
            triples.extend(self._metadata_to_triples(id_, event.metadata))

        if event.description:
            triples.append((id_, self.namespace.description, Literal(event.description)))

        if event.created_time:
            triples.append(
                (
                    id_,
                    self.namespace.created_time,
                    Literal(datetime.fromtimestamp(event.created_time / 1000, timezone.utc)),
                )
            )

        if event.last_updated_time:
            triples.append(
                (
                    id_,
                    self.namespace.last_updated_time,
                    Literal(datetime.fromtimestamp(event.last_updated_time / 1000, timezone.utc)),
                )
            )

        if event.start_time:
            triples.append(
                (
                    id_,
                    self.namespace.start_time,
                    Literal(datetime.fromtimestamp(event.start_time / 1000, timezone.utc)),
                )
            )

        if event.end_time:
            triples.append(
                (
                    id_,
                    self.namespace.end_time,
                    Literal(datetime.fromtimestamp(event.end_time / 1000, timezone.utc)),
                )
            )

        if event.data_set_id:
            triples.append(
                (
                    id_,
                    self.namespace.data_set_id,
                    self.namespace[f"{InstanceIdPrefix.data_set}{event.data_set_id}"],
                )
            )

        if event.asset_ids:
            for asset_id in event.asset_ids:
                triples.append((id_, self.namespace.asset, self.namespace[f"{InstanceIdPrefix.asset}{asset_id}"]))

        return triples
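
Events carry their own type and subtype fields, which makes them a natural fit for a custom to_type. A hedged sketch (assumes a configured client; the data set external id is hypothetical):

from cognite.client import CogniteClient

client = CogniteClient()


def event_to_type(event: Event) -> str | None:
    # Use the CDF event type as the RDF type; None falls back to "Event".
    return event.type.title() if event.type else None


extractor = EventsExtractor.from_dataset(client, "my_data_set", to_type=event_to_type)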

FilesExtractor #

Bases: ClassicCDFBaseExtractor[FileMetadata]

Extract data from Cognite Data Fusion's file metadata into Neat.

Parameters:

- `items` (`Iterable[FileMetadata]`, required): An iterable of items.
- `namespace` (`Namespace`, default `None`): The namespace to use. Defaults to `DEFAULT_NAMESPACE`.
- `to_type` (`Callable[[FileMetadata], str | None]`, default `None`): A function to convert an item to a type. If `None`, or if the function returns `None`, the item is set to the default type.
- `total` (`int`, default `None`): The total number of items to load. If passed, you will get a progress bar if `rich` is installed.
- `limit` (`int`, default `None`): The maximum number of items to load. This is typically used for testing the setup of the extractor. For example, if you are extracting 100 000 items, you might want to limit the extraction to 1000 to test the setup.
- `unpack_metadata` (`bool`, default `True`): Whether to unpack metadata. If `False`, the metadata is yielded as a JSON string.
- `skip_metadata_values` (`set[str] | frozenset[str] | None`, default `DEFAULT_SKIP_METADATA_VALUES`): If you are unpacking metadata, values in this set will be skipped.
Source code in cognite/neat/_graph/extractors/_classic_cdf/_files.py
class FilesExtractor(ClassicCDFBaseExtractor[FileMetadata]):
    """Extract data from Cognite Data Fusions files metadata into Neat.

    Args:
        items (Iterable[FileMetadata]): An iterable of items.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[FileMetadata], str | None], optional): A function to convert an item to a type.
            Defaults to None. If None or if the function returns None, the item will be set to the default type.
        total (int, optional): The total number of items to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of items to load. Defaults to None. This is typically used for
            testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to
            limit the extraction to 1000 assets to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata is
            yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): If you are unpacking metadata, then
           values in this set will be skipped.
    """

    _default_rdf_type = "File"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[FileMetadata], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        return cls(
            client.files(data_set_external_ids=data_set_external_id),
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_hierarchy(
        cls,
        client: CogniteClient,
        root_asset_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[FileMetadata], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        total = client.files.aggregate(
            filter=FileMetadataFilter(asset_subtree_ids=[{"externalId": root_asset_external_id}])
        )[0].count

        return cls(
            client.files(asset_subtree_external_ids=[root_asset_external_id]),
            namespace,
            to_type,
            total,
            limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[FileMetadata], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        file_metadata = FileMetadataList.load(Path(file_path).read_text())
        return cls(
            file_metadata,
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            total=len(file_metadata),
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, file: FileMetadata) -> list[Triple]:
        id_ = self.namespace[f"{InstanceIdPrefix.file}{file.id}"]

        type_ = self._get_rdf_type(file)

        # Set rdf type
        triples: list[Triple] = [(id_, RDF.type, self.namespace[type_])]

        # Create attributes

        if file.external_id:
            triples.append((id_, self.namespace.external_id, Literal(file.external_id)))

        if file.mime_type:
            triples.append((id_, self.namespace.mime_type, Literal(file.mime_type)))

        if file.uploaded:
            triples.append((id_, self.namespace.uploaded, Literal(file.uploaded)))

        if file.source:
            triples.append((id_, self.namespace.source, Literal(file.source)))

        if file.metadata:
            triples.extend(self._metadata_to_triples(id_, file.metadata))

        if file.source_created_time:
            triples.append(
                (
                    id_,
                    self.namespace.source_created_time,
                    Literal(datetime.fromtimestamp(file.source_created_time / 1000, timezone.utc)),
                )
            )
        if file.source_modified_time:
            triples.append(
                (
                    id_,
                    self.namespace.source_modified_time,
                    Literal(datetime.fromtimestamp(file.source_modified_time / 1000, timezone.utc)),
                )
            )
        if file.uploaded_time:
            triples.append(
                (
                    id_,
                    self.namespace.uploaded_time,
                    Literal(datetime.fromtimestamp(file.uploaded_time / 1000, timezone.utc)),
                )
            )

        if file.created_time:
            triples.append(
                (
                    id_,
                    self.namespace.created_time,
                    Literal(datetime.fromtimestamp(file.created_time / 1000, timezone.utc)),
                )
            )

        if file.last_updated_time:
            triples.append(
                (
                    id_,
                    self.namespace.last_updated_time,
                    Literal(datetime.fromtimestamp(file.last_updated_time / 1000, timezone.utc)),
                )
            )

        if file.labels:
            for label in file.labels:
                # external_id can create ill-formed URIs, so we create websafe URIs
                # since labels do not have internal ids, we use the external_id as the id
                triples.append(
                    (
                        id_,
                        self.namespace.label,
                        self.namespace[f"{InstanceIdPrefix.label}{LabelsExtractor._label_id(label)}"],
                    )
                )

        if file.security_categories:
            for category in file.security_categories:
                triples.append((id_, self.namespace.security_categories, Literal(category)))

        if file.data_set_id:
            triples.append(
                (
                    id_,
                    self.namespace.data_set_id,
                    self.namespace[f"{InstanceIdPrefix.data_set}{file.data_set_id}"],
                )
            )

        if file.asset_ids:
            for asset_id in file.asset_ids:
                triples.append((id_, self.namespace.asset, self.namespace[f"{InstanceIdPrefix.asset}{asset_id}"]))

        return triples

LabelsExtractor #

Bases: ClassicCDFBaseExtractor[LabelDefinition]

Extract data from Cognite Data Fusion's Labels into Neat.

Parameters:

- `items` (`Iterable[LabelDefinition]`, required): An iterable of items.
- `namespace` (`Namespace`, default `None`): The namespace to use. Defaults to `DEFAULT_NAMESPACE`.
- `to_type` (`Callable[[LabelDefinition], str | None]`, default `None`): A function to convert an item to a type. If `None`, or if the function returns `None`, the item is set to the default type.
- `total` (`int`, default `None`): The total number of items to load. If passed, you will get a progress bar if `rich` is installed.
- `limit` (`int`, default `None`): The maximum number of items to load. This is typically used for testing the setup of the extractor. For example, if you are extracting 100 000 items, you might want to limit the extraction to 1000 to test the setup.
- `unpack_metadata` (`bool`, default `True`): Whether to unpack metadata. If `False`, the metadata is yielded as a JSON string.
- `skip_metadata_values` (`set[str] | frozenset[str] | None`, default `DEFAULT_SKIP_METADATA_VALUES`): If you are unpacking metadata, values in this set will be skipped.
Source code in cognite/neat/_graph/extractors/_classic_cdf/_labels.py
class LabelsExtractor(ClassicCDFBaseExtractor[LabelDefinition]):
    """Extract data from Cognite Data Fusions Labels into Neat.

    Args:
        items (Iterable[LabelDefinition]): An iterable of items.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[LabelDefinition], str | None], optional): A function to convert an item to a type.
            Defaults to None. If None or if the function returns None, the item will be set to the default type.
        total (int, optional): The total number of items to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of items to load. Defaults to None. This is typically used for
            testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to
            limit the extraction to 1000 assets to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata is
            yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): If you are unpacking metadata, then
           values in this set will be skipped.
    """

    _default_rdf_type = "Label"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[LabelDefinition], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        return cls(
            client.labels(data_set_external_ids=data_set_external_id),
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[LabelDefinition], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        labels = LabelDefinitionList.load(Path(file_path).read_text())
        return cls(
            labels,
            total=len(labels),
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, label: LabelDefinition) -> list[Triple]:
        if not label.external_id:
            return []

        id_ = self.namespace[f"{InstanceIdPrefix.label}{self._label_id(label)}"]

        type_ = self._get_rdf_type(label)
        # Set rdf type
        triples: list[Triple] = [(id_, RDF.type, self.namespace[type_])]

        # Create attributes
        triples.append((id_, self.namespace.external_id, Literal(label.external_id)))

        if label.name:
            triples.append((id_, self.namespace.name, Literal(label.name)))

        if label.description:
            triples.append((id_, self.namespace.description, Literal(label.description)))

        if label.created_time:
            triples.append(
                (
                    id_,
                    self.namespace.created_time,
                    Literal(datetime.fromtimestamp(label.created_time / 1000, timezone.utc)),
                )
            )

        if label.data_set_id:
            triples.append(
                (
                    id_,
                    self.namespace.data_set_id,
                    self.namespace[f"{InstanceIdPrefix.data_set}{label.data_set_id}"],
                )
            )

        return triples

    @staticmethod
    def _label_id(label: Label | LabelDefinition) -> str:
        # external_id can create ill-formed URIs, so we create websafe URIs
        # since labels do not have internal ids, we use the external_id as the id
        if label.external_id is None:
            raise ValueError("External id must be set on the label")
        return quote(label.external_id)
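
Because label external ids may contain characters that are illegal in URIs, _label_id percent-encodes them. Assuming quote above is urllib.parse.quote, the encoding behaves like this:

from urllib.parse import quote

quote("pump station/unit 1")  # -> 'pump%20station/unit%201'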

RelationshipsExtractor #

Bases: ClassicCDFBaseExtractor[Relationship]

Extract data from Cognite Data Fusion's Relationships into Neat.

Parameters:

- `items` (`Iterable[Relationship]`, required): An iterable of items.
- `namespace` (`Namespace`, default `None`): The namespace to use. Defaults to `DEFAULT_NAMESPACE`.
- `to_type` (`Callable[[Relationship], str | None]`, default `None`): A function to convert an item to a type. If `None`, or if the function returns `None`, the item is set to the default type.
- `total` (`int`, default `None`): The total number of items to load. If passed, you will get a progress bar if `rich` is installed.
- `limit` (`int`, default `None`): The maximum number of items to load. This is typically used for testing the setup of the extractor. For example, if you are extracting 100 000 items, you might want to limit the extraction to 1000 to test the setup.
- `unpack_metadata` (`bool`, default `True`): Whether to unpack metadata. If `False`, the metadata is yielded as a JSON string.
- `skip_metadata_values` (`set[str] | frozenset[str] | None`, default `DEFAULT_SKIP_METADATA_VALUES`): If you are unpacking metadata, values in this set will be skipped.
Source code in cognite/neat/_graph/extractors/_classic_cdf/_relationships.py
class RelationshipsExtractor(ClassicCDFBaseExtractor[Relationship]):
    """Extract data from Cognite Data Fusions Relationships into Neat.

    Args:
        items (Iterable[Relationship]): An iterable of items.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[Relationship], str | None], optional): A function to convert an item to a type.
            Defaults to None. If None or if the function returns None, the item will be set to the default type.
        total (int, optional): The total number of items to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of items to load. Defaults to None. This is typically used for
            testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to
            limit the extraction to 1000 assets to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata is
            yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): If you are unpacking metadata, then
           values in this set will be skipped.
    """

    _default_rdf_type = "Relationship"

    def __init__(
        self,
        items: Iterable[Relationship],
        namespace: Namespace | None = None,
        to_type: Callable[[Relationship], str | None] | None = None,
        total: int | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        super().__init__(
            items,
            namespace=namespace,
            to_type=to_type,
            total=total,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )
        # This is used by the ClassicExtractor to log the target nodes, such
        # that it can extract them.
        # It is private to avoid exposing it to the user.
        self._log_target_nodes = False
        self._target_external_ids_by_type: dict[InstanceIdPrefix, set[str]] = defaultdict(set)

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Relationship], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        return cls(
            client.relationships(data_set_external_ids=data_set_external_id),
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Relationship], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        relationships = RelationshipList.load(Path(file_path).read_text())
        return cls(
            relationships,
            namespace=namespace,
            total=len(relationships),
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, relationship: Relationship) -> list[Triple]:
        """Converts an asset to triples."""

        if relationship.external_id and relationship.source_external_id and relationship.target_external_id:
            if self._log_target_nodes and relationship.target_type and relationship.target_external_id:
                self._target_external_ids_by_type[InstanceIdPrefix.from_str(relationship.target_type)].add(
                    relationship.target_external_id
                )

            # relationships do not have an internal id, so we generate one
            id_ = self.namespace[f"{InstanceIdPrefix.relationship}{create_sha256_hash(relationship.external_id)}"]

            type_ = self._get_rdf_type(relationship)
            # Set rdf type
            triples: list[Triple] = [(id_, RDF.type, self.namespace[type_])]

            # Set source and target types
            if source_type := relationship.source_type:
                triples.append(
                    (
                        id_,
                        self.namespace.source_type,
                        self.namespace[source_type.title()],
                    )
                )

            if target_type := relationship.target_type:
                triples.append(
                    (
                        id_,
                        self.namespace.target_type,
                        self.namespace[target_type.title()],
                    )
                )

            # Create attributes

            triples.append((id_, self.namespace.external_id, Literal(relationship.external_id)))

            triples.append(
                (
                    id_,
                    self.namespace.source_external_id,
                    Literal(relationship.source_external_id),
                )
            )

            triples.append(
                (
                    id_,
                    self.namespace.target_external_id,
                    Literal(relationship.target_external_id),
                )
            )

            if relationship.start_time:
                triples.append(
                    (
                        id_,
                        self.namespace.start_time,
                        Literal(datetime.fromtimestamp(relationship.start_time / 1000, timezone.utc)),
                    )
                )

            if relationship.end_time:
                triples.append(
                    (
                        id_,
                        self.namespace.end_time,
                        Literal(datetime.fromtimestamp(relationship.end_time / 1000, timezone.utc)),
                    )
                )

            if relationship.created_time:
                triples.append(
                    (
                        id_,
                        self.namespace.created_time,
                        Literal(datetime.fromtimestamp(relationship.created_time / 1000, timezone.utc)),
                    )
                )

            if relationship.last_updated_time:
                triples.append(
                    (
                        id_,
                        self.namespace.last_updated_time,
                        Literal(datetime.fromtimestamp(relationship.last_updated_time / 1000, timezone.utc)),
                    )
                )

            if relationship.confidence:
                triples.append(
                    (
                        id_,
                        self.namespace.confidence,
                        Literal(relationship.confidence),
                    )
                )

            if relationship.labels:
                for label in relationship.labels:
                    # external_id can create ill-formed URIs, so we create websafe URIs
                    # since labels do not have internal ids, we use the external_id as the id
                    triples.append(
                        (
                            id_,
                            self.namespace.label,
                            self.namespace[f"{InstanceIdPrefix.label}{LabelsExtractor._label_id(label)}"],
                        )
                    )

            # Create connection
            if relationship.data_set_id:
                triples.append(
                    (
                        id_,
                        self.namespace.dataset,
                        self.namespace[f"{InstanceIdPrefix.data_set}{relationship.data_set_id}"],
                    )
                )

            return triples
        return []
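
Relationships have no internal id, so a stable URI is derived by hashing the external id. A stand-in sketch for the create_sha256_hash helper (an assumption; the real utility may encode or truncate differently):

import hashlib


def create_sha256_hash(value: str) -> str:
    """Hex digest of the UTF-8 encoded value; stable across runs."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()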

SequencesExtractor #

Bases: ClassicCDFBaseExtractor[Sequence]

Extract data from Cognite Data Fusion's Sequences into Neat.

Parameters:

- `items` (`Iterable[Sequence]`, required): An iterable of items.
- `namespace` (`Namespace`, default `None`): The namespace to use. Defaults to `DEFAULT_NAMESPACE`.
- `to_type` (`Callable[[Sequence], str | None]`, default `None`): A function to convert an item to a type. If `None`, or if the function returns `None`, the item is set to the default type.
- `total` (`int`, default `None`): The total number of items to load. If passed, you will get a progress bar if `rich` is installed.
- `limit` (`int`, default `None`): The maximum number of items to load. This is typically used for testing the setup of the extractor. For example, if you are extracting 100 000 items, you might want to limit the extraction to 1000 to test the setup.
- `unpack_metadata` (`bool`, default `True`): Whether to unpack metadata. If `False`, the metadata is yielded as a JSON string.
- `skip_metadata_values` (`set[str] | frozenset[str] | None`, default `DEFAULT_SKIP_METADATA_VALUES`): If you are unpacking metadata, values in this set will be skipped.
Source code in cognite/neat/_graph/extractors/_classic_cdf/_sequences.py
class SequencesExtractor(ClassicCDFBaseExtractor[Sequence]):
    """Extract data from Cognite Data Fusions Sequences into Neat.

    Args:
        items (Iterable[Sequence]): An iterable of items.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[Sequence], str | None], optional): A function to convert an item to a type.
            Defaults to None. If None or if the function returns None, the item will be set to the default type.
        total (int, optional): The total number of items to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of items to load. Defaults to None. This is typically used for
            testing the setup of the extractor. For example, if you are extracting 100 000 sequences, you might
            want to limit the extraction to 1000 sequences to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata
            is yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): If you are unpacking metadata, then
           values in this set will be skipped.
    """

    _default_rdf_type = "Sequence"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Sequence], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        total = client.sequences.aggregate_count(
            filter=SequenceFilter(data_set_ids=[{"externalId": data_set_external_id}])
        )
        return cls(
            client.sequences(data_set_external_ids=data_set_external_id),
            total=total,
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_hierarchy(
        cls,
        client: CogniteClient,
        root_asset_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Sequence], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        total = client.sequences.aggregate_count(
            filter=SequenceFilter(asset_subtree_ids=[{"externalId": root_asset_external_id}])
        )

        return cls(
            client.sequences(asset_subtree_external_ids=[root_asset_external_id]),
            namespace,
            to_type,
            total,
            limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Sequence], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        sequences = SequenceList.load(Path(file_path).read_text())
        return cls(
            sequences,
            total=len(sequences),
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, sequence: Sequence) -> list[Triple]:
        id_ = self.namespace[f"{InstanceIdPrefix.sequence}{sequence.id}"]

        type_ = self._get_rdf_type(sequence)
        # Set rdf type
        triples: list[Triple] = [(id_, RDF.type, self.namespace[type_])]

        # Create attributes

        if sequence.external_id:
            triples.append((id_, self.namespace.external_id, Literal(sequence.external_id)))

        if sequence.name:
            triples.append((id_, self.namespace.name, Literal(sequence.name)))

        if sequence.metadata:
            triples.extend(self._metadata_to_triples(id_, sequence.metadata))

        if sequence.description:
            triples.append((id_, self.namespace.description, Literal(sequence.description)))

        if sequence.created_time:
            triples.append(
                (
                    id_,
                    self.namespace.created_time,
                    Literal(datetime.fromtimestamp(sequence.created_time / 1000, timezone.utc)),
                )
            )

        if sequence.last_updated_time:
            triples.append(
                (
                    id_,
                    self.namespace.last_updated_time,
                    Literal(datetime.fromtimestamp(sequence.last_updated_time / 1000, timezone.utc)),
                )
            )

        if sequence.data_set_id:
            triples.append(
                (
                    id_,
                    self.namespace.data_set_id,
                    self.namespace[f"{InstanceIdPrefix.data_set}{sequence.data_set_id}"],
                )
            )

        if sequence.asset_id:
            triples.append(
                (
                    id_,
                    self.namespace.asset,
                    self.namespace[f"{InstanceIdPrefix.asset}{sequence.asset_id}"],
                )
            )

        return triples
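
A minimal usage sketch, assuming a configured CogniteClient and a placeholder data set external id; the extracted triples can be added to an rdflib Graph:

from cognite.client import CogniteClient
from rdflib import Graph

from cognite.neat._graph.extractors import SequencesExtractor

client = CogniteClient()  # assumes credentials are configured in the environment
extractor = SequencesExtractor.from_dataset(client, data_set_external_id="my_data_set")

graph = Graph()
for triple in extractor.extract():
    graph.add(triple)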

TimeSeriesExtractor #

Bases: ClassicCDFBaseExtractor[TimeSeries]

Extract data from Cognite Data Fusion's TimeSeries into Neat.

Parameters:

Name Type Description Default
items Iterable[TimeSeries]

An iterable of items.

required
namespace Namespace

The namespace to use. Defaults to DEFAULT_NAMESPACE.

None
to_type Callable[[TimeSeries], str | None]

A function to convert an item to a type. Defaults to None. If None or if the function returns None, the item will be set to the default type "TimeSeries".

None
total int

The total number of items to load. If passed, you will get a progress bar if rich is installed. Defaults to None.

None
limit int

The maximal number of items to load. Defaults to None. This is typically used for testing the setup of the extractor. For example, if you are extracting 100 000 time series, you might want to limit the extraction to 1000 time series to test the setup.

None
unpack_metadata bool

Whether to unpack metadata. Defaults to True. If False, the metadata is yielded as a JSON string.

True
skip_metadata_values set[str] | frozenset[str] | None

If you are unpacking metadata, then values in this set will be skipped.

DEFAULT_SKIP_METADATA_VALUES
Source code in cognite/neat/_graph/extractors/_classic_cdf/_timeseries.py
class TimeSeriesExtractor(ClassicCDFBaseExtractor[TimeSeries]):
    """Extract data from Cognite Data Fusions TimeSeries into Neat.

    Args:
        items (Iterable[TimeSeries]): An iterable of items.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[TimeSeries], str | None], optional): A function to convert an item to a type.
            Defaults to None. If None or if the function returns None, the item will be set to the default
            type "TimeSeries".
        total (int, optional): The total number of items to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of items to load. Defaults to None. This is typically used for
            testing the setup of the extractor. For example, if you are extracting 100 000 time series, you might
            want to limit the extraction to 1000 time series to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata
            is yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): If you are unpacking metadata, then
           values in this set will be skipped.
    """

    _default_rdf_type = "TimeSeries"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[TimeSeries], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        total = client.time_series.aggregate_count(
            filter=TimeSeriesFilter(data_set_ids=[{"externalId": data_set_external_id}])
        )

        return cls(
            client.time_series(data_set_external_ids=data_set_external_id),
            total=total,
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_hierarchy(
        cls,
        client: CogniteClient,
        root_asset_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[TimeSeries], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        total = client.time_series.aggregate_count(
            filter=TimeSeriesFilter(asset_subtree_ids=[{"externalId": root_asset_external_id}])
        )

        return cls(
            client.time_series(asset_subtree_external_ids=[root_asset_external_id]),
            namespace,
            to_type,
            total,
            limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[TimeSeries], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        timeseries = TimeSeriesList.load(Path(file_path).read_text())
        return cls(
            timeseries,
            total=len(timeseries),
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, timeseries: TimeSeries) -> list[Triple]:
        id_ = self.namespace[f"{InstanceIdPrefix.time_series}{timeseries.id}"]

        # Set rdf type
        type_ = self._get_rdf_type(timeseries)
        triples: list[Triple] = [(id_, RDF.type, self.namespace[type_])]

        # Create attributes
        if timeseries.external_id:
            triples.append((id_, self.namespace.external_id, Literal(timeseries.external_id)))

        if timeseries.name:
            triples.append((id_, self.namespace.name, Literal(timeseries.name)))

        if timeseries.is_string:
            triples.append((id_, self.namespace.is_string, Literal(timeseries.is_string)))

        if timeseries.metadata:
            triples.extend(self._metadata_to_triples(id_, timeseries.metadata))

        if timeseries.unit:
            triples.append((id_, self.namespace.unit, Literal(timeseries.unit)))

        if timeseries.is_step is not None:
            triples.append((id_, self.namespace.is_step, Literal(timeseries.is_step)))

        if timeseries.description:
            triples.append((id_, self.namespace.description, Literal(timeseries.description)))

        if timeseries.security_categories:
            for category in timeseries.security_categories:
                triples.append((id_, self.namespace.security_categories, Literal(category)))

        if timeseries.created_time:
            triples.append(
                (
                    id_,
                    self.namespace.created_time,
                    Literal(datetime.fromtimestamp(timeseries.created_time / 1000, timezone.utc)),
                )
            )

        if timeseries.last_updated_time:
            triples.append(
                (
                    id_,
                    self.namespace.last_updated_time,
                    Literal(datetime.fromtimestamp(timeseries.last_updated_time / 1000, timezone.utc)),
                )
            )

        if timeseries.legacy_name:
            triples.append((id_, self.namespace.legacy_name, Literal(timeseries.legacy_name)))

        # Create connections
        if timeseries.unit_external_id:
            # try to create connection to QUDT unit catalog
            try:
                triples.append(
                    (
                        id_,
                        self.namespace.unit_external_id,
                        URIRef(str(AnyHttpUrl(timeseries.unit_external_id))),
                    )
                )
            except ValidationError:
                triples.append(
                    (
                        id_,
                        self.namespace.unit_external_id,
                        Literal(timeseries.unit_external_id),
                    )
                )

        if timeseries.data_set_id:
            triples.append(
                (
                    id_,
                    self.namespace.dataset,
                    self.namespace[f"{InstanceIdPrefix.data_set}{timeseries.data_set_id}"],
                )
            )

        if timeseries.asset_id:
            triples.append(
                (
                    id_,
                    self.namespace.asset,
                    self.namespace[f"{InstanceIdPrefix.asset}{timeseries.asset_id}"],
                )
            )

        return triples
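
A minimal usage sketch, assuming a configured CogniteClient and a placeholder root asset external id:

from cognite.client import CogniteClient

from cognite.neat._graph.extractors import TimeSeriesExtractor

client = CogniteClient()  # assumes credentials are configured in the environment
extractor = TimeSeriesExtractor.from_hierarchy(client, root_asset_external_id="my_root_asset")
triples = list(extractor.extract())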

DexpiExtractor #

Bases: BaseExtractor

DEXPI-XML extractor of RDF triples

Parameters:

Name Type Description Default
root Element

XML root element of DEXPI file.

required
namespace Namespace | None

Optional custom namespace to use for extracted triples that define data model instances. Defaults to DEFAULT_NAMESPACE.

None
Source code in cognite/neat/_graph/extractors/_dexpi.py
class DexpiExtractor(BaseExtractor):
    """
    DEXPI-XML extractor of RDF triples

    Args:
        root: XML root element of DEXPI file.
        namespace: Optional custom namespace to use for extracted triples that define data
                    model instances. Defaults to DEFAULT_NAMESPACE.
    """

    def __init__(
        self,
        root: Element,
        namespace: Namespace | None = None,
    ):
        self.root = root
        self.namespace = namespace or DEFAULT_NAMESPACE

    @classmethod
    def from_file(cls, filepath: str | Path, namespace: Namespace | None = None):
        return cls(ET.parse(filepath).getroot(), namespace)

    @classmethod
    def from_url(cls, url: str, namespace: Namespace | None = None):
        from io import BytesIO

        import requests

        response = requests.get(url)
        response.raise_for_status()
        return cls(ET.parse(BytesIO(response.content)).getroot(), namespace)

    def extract(self) -> Iterable[Triple]:
        """Extracts RDF triples from DEXPI XML file."""

        for element in iterate_tree(self.root):
            yield from self._element2triples(element, self.namespace)

    @classmethod
    def _element2triples(cls, element: Element, namespace: Namespace) -> list[Triple]:
        """Converts an element to triples."""
        triples: list[Triple] = []

        if (
            "ComponentClass" in element.attrib
            and element.attrib["ComponentClass"] != "Label"
            and "ID" in element.attrib
        ):
            id_ = namespace[element.attrib["ID"]]

            if node_triples := cls._element2node_triples(id_, element):
                triples.extend(node_triples)

            if edge_triples := cls._element2edge_triples(id_, element, namespace):
                triples.extend(edge_triples)

        return triples

    @classmethod
    def _element2edge_triples(cls, id_: URIRef, element: Element, namespace: Namespace) -> list[Triple]:
        triples: list[Triple] = []

        # connection triples
        if connections := get_children(element, "Connection"):
            for connection in connections:
                if "FromID" in connection.attrib and "ToID" in connection.attrib:
                    triples.append(
                        (
                            namespace[connection.attrib["FromID"]],
                            DEXPI.connection,
                            namespace[connection.attrib["ToID"]],
                        )
                    )

        # association triples
        if associations := get_children(element, "Association"):
            for association in associations:
                if "Type" in association.attrib and "ItemID" in association.attrib:
                    association_type = cls._to_uri_friendly_association_type(association)

                    triples.append(
                        (
                            id_,
                            DEXPI[f"association/{association_type}"],
                            namespace[association.attrib["ItemID"]],
                        )
                    )

        # children-parent triples
        for child in element:
            if "ID" in child.attrib and child.tag != "Label":
                camel_case_property = child.tag[0].lower() + child.tag[1:]
                triples.append(
                    (
                        id_,
                        DEXPI[f"children/{camel_case_property}"],
                        namespace[child.attrib["ID"]],
                    )
                )

        return triples

    @classmethod
    def _to_uri_friendly_association_type(cls, association: Element):
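        # Camel-cases the association Type, e.g. "is the parent of" becomes "isTheParentOf"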
        association_type = "".join(
            [word.capitalize() if i != 0 else word for i, word in enumerate(association.attrib["Type"].split(" "))]
        )

        return association_type

    @classmethod
    def _element2node_triples(cls, id_: URIRef, element: Element) -> list[Triple]:
        """Converts an XML element to triples."""
        triples: list[Triple] = []

        # set to None; the fallback order below determines the rdf type
        component_class: str | None = None
        component_name: str | None = None
        tag: str | None = None

        # adding tag triple if exists
        if tag := element.tag:
            triples.append((id_, DEXPI.tag, Literal(str(tag))))

        # adding attributes triples
        if attributes := element.attrib:
            if component_class := attributes.get("ComponentClass", None):
                triples.append((id_, DEXPI.ComponentClass, Literal(component_class)))
            if component_name := attributes.get("ComponentName", None):
                triples.append((id_, DEXPI.ComponentName, Literal(component_name)))
            if component_class_uri := attributes.get("ComponentClassURI", None):
                triples.append((id_, DEXPI.ComponentClassURI, URIRef(component_class_uri)))

        triples.append(
            (
                id_,
                RDF.type,
                as_neat_compliant_uri(DEFAULT_NAMESPACE[component_class or component_name or tag or "Unknown"]),
            )
        )

        # add label triple
        if label := cls._get_element_label(element):
            triples.append((id_, RDFS.label, Literal(label)))

        # add generic attributes triples
        if generic_attributes := cls._get_element_generic_attributes(element):
            for attribute, value_definitions in generic_attributes.items():
                predicate = as_neat_compliant_uri(attribute)
                for value_definition in value_definitions:
                    if literal := cls._value_definition2literal(value_definition):
                        triples.append((id_, predicate, literal))

        return triples

    @classmethod
    def _value_definition2literal(cls, definition: dict, make_unit_datatype: bool = False) -> Literal | None:
        if "Value" not in definition or "Format" not in definition:
            return None

        if "Units" in definition and "Value" in definition:
            if make_unit_datatype and "UnitsURI" in definition:
                return Literal(definition["Value"], datatype=URIRef(definition["UnitsURI"]))

            else:
                return Literal(definition["Value"], datatype=XSD.float)

        # case: when Language is present we add a language tag to the literal
        elif "Language" in definition and "Value" in definition:
            return Literal(definition["Value"], lang=definition["Language"])

        # case: when ValueURI is present we use it instead of Value
        # this would be candidate for ENUMs in CDF
        elif "ValueURI" in definition:
            return Literal(definition["ValueURI"], datatype=XSD[definition["Format"]])

        # case: when Format is not string we make sure to add the datatype
        elif definition["Format"].lower() != "string":
            return Literal(definition["Value"], datatype=XSD[definition["Format"]])

        # case: when Format is string we add the literal without datatype (easier to read triples, less noise)
        else:
            return Literal(definition["Value"])

    @classmethod
    def _get_element_label(cls, element: Element) -> str | None:
        if children := get_children(element, "Label", no_children=1):
            if grandchildren := get_children(children[0], "Text", no_children=1):
                if "String" in grandchildren[0].attrib:
                    return grandchildren[0].attrib["String"]

        # extension for schema version 3.3, where text is used to "label" without a <label> parent
        elif children := get_children(element, "Text", no_children=1):
            if "String" in children[0].attrib:
                return children[0].attrib["String"]

        return None

    @classmethod
    def _get_element_generic_attributes(cls, element: Element) -> dict:
        # TODO: This requires more work as there are multiple groupings of GenericAttributes

        attributes = defaultdict(list)
        if children := get_children(element, "GenericAttributes", no_children=1):
            if grandchildren := get_children(children[0], "GenericAttribute"):
                for generic_attribute in grandchildren:
                    # extension for schema version 3.3, where "AttributeURI" is not included
                    if name := generic_attribute.attrib.get("Name", None):
                        attribute_uri = as_neat_compliant_uri(DEFAULT_NAMESPACE[name])
                        if attribute_uri not in attributes:
                            attributes[attribute_uri] = [generic_attribute.attrib]
                        else:
                            attributes[attribute_uri].append(generic_attribute.attrib)

        return attributes

extract() #

Extracts RDF triples from DEXPI XML file.

Source code in cognite/neat/_graph/extractors/_dexpi.py
def extract(self) -> Iterable[Triple]:
    """Extracts RDF triples from DEXPI XML file."""

    for element in iterate_tree(self.root):
        yield from self._element2triples(element, self.namespace)
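
A minimal usage sketch, assuming a local DEXPI file (the file path is a placeholder):

from rdflib import Graph

from cognite.neat._graph.extractors import DexpiExtractor

extractor = DexpiExtractor.from_file("my_plant.xml")

graph = Graph()
for triple in extractor.extract():
    graph.add(triple)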

DMSExtractor #

Bases: BaseExtractor

Extract data from Cognite Data Fusion DMS instances into Neat.

Parameters:

Name Type Description Default
items Iterable[Instance]

The items to extract.

required
total int | None

The total number of items to extract. If provided, this will be used to estimate the progress.

None
limit int | None

The maximum number of items to extract.

None
overwrite_namespace Namespace | None

If provided, this will overwrite the space of the extracted items.

None
Source code in cognite/neat/_graph/extractors/_dms.py
class DMSExtractor(BaseExtractor):
    """Extract data from Cognite Data Fusion DMS instances into Neat.

    Args:
        items: The items to extract.
        total: The total number of items to extract. If provided, this will be used to estimate the progress.
        limit: The maximum number of items to extract.
        overwrite_namespace: If provided, this will overwrite the space of the extracted items.
    """

    def __init__(
        self,
        items: Iterable[Instance],
        total: int | None = None,
        limit: int | None = None,
        overwrite_namespace: Namespace | None = None,
    ) -> None:
        self.items = items
        self.total = total
        self.limit = limit
        self.overwrite_namespace = overwrite_namespace

    @classmethod
    def from_data_model(
        cls, client: CogniteClient, data_model: DataModelIdentifier, limit: int | None = None
    ) -> "DMSExtractor":
        """Create an extractor from a data model.

        Args:
            client: The Cognite client to use.
            data_model: The data model to extract.
            limit: The maximum number of instances to extract.
        """
        retrieved = client.data_modeling.data_models.retrieve(data_model, inline_views=True)
        if not retrieved:
            raise ResourceRetrievalError(dm.DataModelId.load(data_model), "data model", "Data Model is missing in CDF")
        return cls.from_views(client, retrieved.latest_version().views, limit)

    @classmethod
    def from_views(cls, client: CogniteClient, views: Iterable[dm.View], limit: int | None = None) -> "DMSExtractor":
        """Create an extractor from a set of views.

        Args:
            client: The Cognite client to use.
            views: The views to extract.
            limit: The maximum number of instances to extract.
        """
        return cls(_InstanceIterator(client, views), total=None, limit=limit)

    def extract(self) -> Iterable[Triple]:
        for count, item in enumerate(self.items, 1):
            if self.limit and count > self.limit:
                break
            yield from self._extract_instance(item)

    def _extract_instance(self, instance: Instance) -> Iterable[Triple]:
        if isinstance(instance, dm.Edge):
            if not instance.properties:
                yield (
                    self._as_uri_ref(instance.start_node),
                    self._as_uri_ref(instance.type),
                    self._as_uri_ref(instance.end_node),
                )
                return
            else:
                # If the edge has properties, we create a node for the edge and connect it to the start and end nodes.
                id_ = self._as_uri_ref(instance)
                yield id_, RDF.type, self._as_uri_ref(instance.type)
                yield id_, RDF.type, self._get_namespace(instance.space).Edge
                yield (
                    id_,
                    self._as_uri_ref(dm.DirectRelationReference(instance.space, "startNode")),
                    self._as_uri_ref(instance.start_node),
                )
                yield (
                    id_,
                    self._as_uri_ref(dm.DirectRelationReference(instance.space, "endNode")),
                    self._as_uri_ref(instance.end_node),
                )

        elif isinstance(instance, dm.Node):
            id_ = self._as_uri_ref(instance)
            if instance.type:
                type_ = self._as_uri_ref(cast(dm.DirectRelationReference, instance.type))
            else:
                type_ = self._get_namespace(instance.space).Node

            yield id_, RDF.type, type_
        else:
            raise NotImplementedError(f"Unknown instance type {type(instance)}")

        for view_id, properties in instance.properties.items():
            namespace = self._get_namespace(view_id.space)
            for key, value in properties.items():
                for object_ in self._get_objects(value):
                    yield id_, namespace[key], object_

    def _get_objects(self, value: PropertyValue) -> Iterable[Literal | URIRef]:
        if isinstance(value, str | float | bool | int):
            yield Literal(value)
        elif isinstance(value, dict) and "space" in value and "externalId" in value:
            yield self._as_uri_ref(dm.DirectRelationReference.load(value))
        elif isinstance(value, dict):
            # This object is a json object.
            yield Literal(str(value), datatype=XSD._NS["json"])
        elif isinstance(value, list):
            for item in value:
                yield from self._get_objects(item)

    def _as_uri_ref(self, instance: Instance | dm.DirectRelationReference) -> URIRef:
        return self._get_namespace(instance.space)[instance.external_id]

    def _get_namespace(self, space: str) -> Namespace:
        if self.overwrite_namespace:
            return self.overwrite_namespace
        return Namespace(DEFAULT_SPACE_URI.format(space=space))

from_data_model(client, data_model, limit=None) classmethod #

Create an extractor from a data model.

Parameters:

Name Type Description Default
client CogniteClient

The Cognite client to use.

required
data_model DataModelIdentifier

The data model to extract.

required
limit int | None

The maximum number of instances to extract.

None
Source code in cognite/neat/_graph/extractors/_dms.py
@classmethod
def from_data_model(
    cls, client: CogniteClient, data_model: DataModelIdentifier, limit: int | None = None
) -> "DMSExtractor":
    """Create an extractor from a data model.

    Args:
        client: The Cognite client to use.
        data_model: The data model to extract.
        limit: The maximum number of instances to extract.
    """
    retrieved = client.data_modeling.data_models.retrieve(data_model, inline_views=True)
    if not retrieved:
        raise ResourceRetrievalError(dm.DataModelId.load(data_model), "data model", "Data Model is missing in CDF")
    return cls.from_views(client, retrieved.latest_version().views, limit)

from_views(client, views, limit=None) classmethod #

Create an extractor from a set of views.

Parameters:

Name Type Description Default
client CogniteClient

The Cognite client to use.

required
views Iterable[View]

The views to extract.

required
limit int | None

The maximum number of instances to extract.

None
Source code in cognite/neat/_graph/extractors/_dms.py
@classmethod
def from_views(cls, client: CogniteClient, views: Iterable[dm.View], limit: int | None = None) -> "DMSExtractor":
    """Create an extractor from a set of views.

    Args:
        client: The Cognite client to use.
        views: The views to extract.
        limit: The maximum number of instances to extract.
    """
    return cls(_InstanceIterator(client, views), total=None, limit=limit)
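
A minimal usage sketch, assuming a configured CogniteClient and a placeholder data model identifier given as a (space, external id, version) tuple:

from cognite.client import CogniteClient

from cognite.neat._graph.extractors import DMSExtractor

client = CogniteClient()  # assumes credentials are configured in the environment
extractor = DMSExtractor.from_data_model(client, ("my_space", "my_data_model", "v1"), limit=1000)
triples = list(extractor.extract())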

IODDExtractor #

Bases: BaseExtractor

IODD-XML extractor of RDF triples

Each IODD sheet describes an IODD device. This extractor extracts rdf triples that describe the device, and the sensors connected to the device. This data is described under the elements "DeviceIdentity" and "ProcessDataCollection". In addition, triples extracted from "DeviceIdentity" and "ProcessDataCollection" may reference "Text" elements which are found under "ExternalTextCollection". Edges to these Text element nodes are also extracted.

Parameters:

Name Type Description Default
root Element

XML root element of IODD XML file.

required
namespace Namespace | None

Optional custom namespace to use for extracted triples that define data model instances. Defaults to DEFAULT_NAMESPACE.

None
device_id str | None

Optional user specified unique id/tag for actual equipment instance. If not provided, a randomly generated UUID will be used. The device_id must be WEB compliant, meaning that the characters /&?=: % are not allowed.

None
Source code in cognite/neat/_graph/extractors/_iodd.py
class IODDExtractor(BaseExtractor):
    """
    IODD-XML extractor of RDF triples

    Each IODD sheet describes an IODD device. This extractor extracts rdf triples that describe the device, and the
    sensors connected to the device.
    This data is described under the elements "DeviceIdentity" and "ProcessDataCollection".
    In addition, triples extracted from "DeviceIdentity" and
    "ProcessDataCollection" may reference "Text" elements which are found under "ExternalTextCollection". Edges to
    these Text element nodes are also extracted.

    Args:
        root: XML root element of IODD XML file.
        namespace: Optional custom namespace to use for extracted triples that define data
                    model instances. Defaults to DEFAULT_NAMESPACE.
        device_id: Optional user specified unique id/tag for actual equipment instance. If not provided, a randomly
                    generated UUID will be used. The device_id must be WEB compliant,
                    meaning that the characters /&?=: % are not allowed.
    """

    device_elements_with_text_nodes: ClassVar[list[str]] = ["VendorText", "VendorUrl", "DeviceName", "DeviceFamily"]
    std_variable_elements_to_extract: ClassVar[list[str]] = ["V_SerialNumber", "V_ApplicationSpecificTag"]
    text_elements_language: LiteralType["en", "de"] = "en"

    def __init__(
        self,
        root: Element,
        namespace: Namespace | None = None,
        device_id: str | None = None,
    ):
        self.root = root
        self.namespace = namespace or DEFAULT_NAMESPACE

        if device_id and device_id != re.sub(r"[^a-zA-Z0-9-_.]", "", device_id):
            raise NeatValueError("Specified device_id is not web compliant. Please exclude characters: /&?=: %")

        self.device_id = (
            self.namespace[device_id] if device_id else self.namespace[f"Device_{str(uuid.uuid4()).replace('-', '_')}"]
        )

    @cached_property
    def _text_id_2value_mapping(self) -> dict[str, str]:
        """
        !!! note used for "Prototype Solution" !!!
        A mapping for text_id references to Text elements under ExternalTextCollection.
        The mapping can be used to find the Text element with matching id, and returns
        the value associated with the Text element.
        """
        mapping = {}
        if et_root := get_children(
            self.root, "ExternalTextCollection", ignore_namespace=True, include_nested_children=True, no_children=1
        ):
            if language_element := get_children(et_root[0], "PrimaryLanguage", ignore_namespace=True, no_children=1):
                if (
                    language_element[0].attrib.get("{http://www.w3.org/XML/1998/namespace}lang")
                    == self.text_elements_language
                ):
                    if text_elements := get_children(
                        language_element[0], child_tag="Text", ignore_namespace=True, include_nested_children=True
                    ):
                        for element in text_elements:
                            if id := element.attrib.get("id"):
                                if value := element.attrib.get("value"):
                                    mapping[id] = value
        return mapping

    @classmethod
    def from_file(cls, filepath: Path, namespace: Namespace | None = None, device_id: str | None = None):
        if filepath.suffix != ".xml":
            raise FileReadError(filepath, "File is not XML.")
        return cls(ET.parse(filepath).getroot(), namespace, device_id)

    @classmethod
    def _from_root2triples(cls, root: Element, namespace: Namespace, device_id: URIRef) -> list[Triple]:
        """Loops through the relevant elements of the IODD XML sheet to create rdf triples that describes the IODD
        device by starting at the root element.
        """
        triples: list[Triple] = []

        # Extract DeviceIdentity triples
        if di_root := get_children(
            root, "DeviceIdentity", ignore_namespace=True, include_nested_children=True, no_children=1
        ):
            triples.extend(cls._iodd_device_identity2triples(di_root[0], namespace, device_id))

        # Extract VariableCollection triples -
        # this element holds the information about the sensors connected to the device that collects data such as
        # temperature, voltage, leakage etc.
        if vc_root := get_children(
            root, "VariableCollection", ignore_namespace=True, include_nested_children=True, no_children=1
        ):
            triples.extend(cls._variables_data_collection2triples(vc_root[0], namespace, device_id))

        if pc_root := get_children(
            root, "ProcessDataCollection", ignore_namespace=True, include_nested_children=True, no_children=1
        ):
            triples.extend(cls._process_data_collection2triples(pc_root[0], namespace, device_id))

        if et_root := get_children(
            root, "ExternalTextCollection", ignore_namespace=True, include_nested_children=True, no_children=1
        ):
            triples.extend(cls._text_elements2triples(et_root[0], namespace))

        return triples

    @classmethod
    def _process_data_collection2triples(
        cls, pc_root: Element, namespace: Namespace, device_id: URIRef
    ) -> list[Triple]:
        """
        Will only collect ProcessDataIn elements at this point. The data from the IO-master is transmitted as an
        array related to a ProcessDataIn item.
        """
        triples: list[Triple] = []

        if process_data_in := get_children(
            pc_root, "ProcessDataIn", ignore_namespace=True, include_nested_children=True
        ):
            for process_data_element in process_data_in:
                if p_id := process_data_element.attrib.get("id"):
                    device_id_str = remove_namespace_from_uri(device_id)
                    process_data_in_id = namespace[f"{device_id_str}.{p_id}"]

                    # Create ProcessDataIn node
                    triples.append((process_data_in_id, RDF.type, IODD.ProcessDataIn))

                    # Create connection from device to node
                    triples.append((device_id, IODD.processDataIn, process_data_in_id))

                    # Connect record items (essentially an array of indexed variables) to the ProcessDataIn node
                    triples.extend(cls._process_data_in_records2triples(process_data_element, process_data_in_id))

        return triples

    @classmethod
    def _device_2text_elements_edges(cls, di_root: Element, id: URIRef, namespace: Namespace) -> list[Triple]:
        """
        Create edges from the device node to text nodes.
        """
        triples: list[Triple] = []

        for element_tag in cls.device_elements_with_text_nodes:
            if child := get_children(
                di_root, child_tag=element_tag, ignore_namespace=True, include_nested_children=True, no_children=1
            ):
                if text_id := child[0].attrib.get("textId"):
                    # Create connection from device to textId node
                    element_tag = to_camel(element_tag)
                    triples.append((id, IODD[element_tag], namespace[text_id]))

        return triples

    @classmethod
    def _text_elements2triples(cls, et_root: Element, namespace: Namespace) -> list[Triple]:
        """
        This method extracts all text item triples under the ExternalTextCollection element. This will create a node
        for each text item, and add the text value as a property to the node.
        """
        triples: list[Triple] = []

        if language_element := get_children(et_root, "PrimaryLanguage", ignore_namespace=True, no_children=1):
            if (
                language_element[0].attrib.get("{http://www.w3.org/XML/1998/namespace}lang")
                == cls.text_elements_language
            ):
                if text_elements := get_children(
                    language_element[0], child_tag="Text", ignore_namespace=True, include_nested_children=True
                ):
                    for element in text_elements:
                        if id := element.attrib.get("id"):
                            text_id = namespace[id]

                            # Create Text node
                            triples.append((text_id, RDF.type, IODD.TextObject))

                            # Resolve text value related to the text item
                            if value := element.attrib.get("value"):
                                triples.append((text_id, IODD.value, Literal(value)))
        return triples

    @classmethod
    def _std_variables2triples(cls, vc_root: Element, namespace: Namespace, device_id: URIRef) -> list[Triple]:
        """
        For simplicity, only extract the two items we want for this use case - V_ApplicationSpecificTag and
        V_SerialNumber
        """
        triples: list[Triple] = []

        if std_variable_elements := get_children(vc_root, child_tag="StdVariableRef", ignore_namespace=True):
            for element in std_variable_elements:
                if id := element.attrib.get("id"):
                    if id in cls.std_variable_elements_to_extract:
                        if object := element.attrib.get("defaultValue"):
                            predicate = to_camel(id.replace("V_", ""))
                            triples.append((device_id, IODD[predicate], Literal(object)))
        return triples

    @classmethod
    def _variables_data_collection2triples(
        cls, vc_root: Element, namespace: Namespace, device_id: URIRef
    ) -> list[Triple]:
        """
        VariableCollection contains elements that references Variables and StdVariables. The StdVariables
        can be resolved by looking up the ID in the IODD-StandardDefinitions1.1.xml sheet.

        The Variable elements are descriptions of the sensors collecting data for the device.
        """
        triples: list[Triple] = []

        # StdVariableRef elements of interest
        triples.extend(cls._std_variables2triples(vc_root, namespace, device_id))

        # Variable elements (these are the descriptions of the sensors)
        if variable_elements := get_children(vc_root, child_tag="Variable", ignore_namespace=True):
            for element in variable_elements:
                if id := element.attrib.get("id"):
                    device_id_str = remove_namespace_from_uri(device_id)
                    variable_id = f"{device_id_str}.{id}"

                    # Create connection from device node to time series
                    triples.append((device_id, IODD.variable, Literal(variable_id, datatype=XSD["timeseries"])))

        return triples

    @classmethod
    def _iodd_device_identity2triples(cls, di_root: Element, namespace: Namespace, device_id: URIRef) -> list[Triple]:
        """
        Properties and metadata related to the IO Device are described under the 'DeviceIdentity' element in the XML.
        This method extracts the triples that describe the device's identity which is found under the
        DeviceIdentity element and its child elements.

        """
        triples: list[Triple] = []

        # Create rdf type triple for IODD
        triples.append(
            (
                device_id,
                RDF.type,
                IODD.IoddDevice,
            )
        )

        for attribute_name, attribute_value in di_root.attrib.items():
            triples.append((device_id, IODD[attribute_name], Literal(attribute_value)))

        triples.extend(cls._device_2text_elements_edges(di_root, device_id, namespace))
        return triples

    @classmethod
    def _process_data_in_records2triples(cls, pc_in_root: Element, process_data_in_id: URIRef):
        """
        Extract RecordItems related to a ProcessDataIn element. Each record item is indexed. Will use this index
        as the identifier for the time series in CDF.
        """
        triples: list[Triple] = []

        if record_items := get_children(pc_in_root, "RecordItem", ignore_namespace=True, include_nested_children=True):
            for record in record_items:
                if index := record.attrib.get("subindex"):
                    process_id_str = remove_namespace_from_uri(process_data_in_id)
                    record_id = f"{process_id_str}.{index}"
                    # Create connection from device node to time series
                    triples.append((process_data_in_id, IODD.variable, Literal(record_id, datatype=XSD["timeseries"])))

        return triples

    def extract(self) -> list[Triple]:
        """
        Extract RDF triples from IODD XML
        """
        return self._from_root2triples(self.root, self.namespace, self.device_id)

    def _variable2info(self, variable_element: Element) -> dict:
        """
        !!! note used for "Prototype Solution" !!!
        Extracts information relevant to a CDF time series type from a Variable element
        """

        variable_dict = {}

        if name := get_children(
            variable_element, child_tag="Name", ignore_namespace=True, include_nested_children=False, no_children=1
        ):
            if text_id := name[0].get("textId"):
                variable_dict["name"] = self._text_id_2value_mapping[text_id]
        if description := get_children(
            variable_element,
            child_tag="Description",
            ignore_namespace=True,
            include_nested_children=False,
            no_children=1,
        ):
            if text_id := description[0].get("textId"):
                variable_dict["description"] = self._text_id_2value_mapping[text_id]
        if data_type := get_children(
            variable_element, child_tag="Datatype", ignore_namespace=True, include_nested_children=False, no_children=1
        ):
            variable_dict["data_type"] = data_type[0].attrib[f"{XSI_XML_PREFIX}type"]

        return variable_dict

    def _process_record2info(self, record_element: Element) -> dict:
        """
        !!! note used for "Prototype Solution" !!!
        Extracts information relevant to a CDF time series type from a Record element
        """
        record_dict = {}

        if name := get_children(
            record_element, child_tag="Name", ignore_namespace=True, include_nested_children=False, no_children=1
        ):
            if text_id := name[0].get("textId"):
                record_dict["name"] = self._text_id_2value_mapping[text_id]
        if description := get_children(
            record_element, child_tag="Description", ignore_namespace=True, include_nested_children=False, no_children=1
        ):
            if text_id := description[0].get("textId"):
                record_dict["description"] = self._text_id_2value_mapping[text_id]
        if data_type := get_children(
            record_element,
            child_tag="SimpleDatatype",
            ignore_namespace=True,
            include_nested_children=False,
            no_children=1,
        ):
            record_dict["data_type"] = data_type[0].attrib[f"{XSI_XML_PREFIX}type"]
        if index := record_element.attrib.get("subindex"):
            record_dict["index"] = index

        return record_dict

    def _extract_enhanced_ts_information(self, json_file_path: Path):
        """
        Extract additional information like name, description and data type for Variables and ProcessDataIn
        record elements in the IODD. The purpose is for the resulting file to be used for enhancing time series with more
        information when they are created in CDF.

        Args:
            json_file_path: file path for where to write the extracted information about all time series
                            in the IODD

        !!! note "Prototype Solution" !!!
        This is an intermediate solution while better support for adding this information directly
        into the knowledge graph for the timeseries node type is under development.
        """
        import json

        ts_ext_id2_info_map = {}

        # Variable elements (these are the descriptions of the sensors)
        if variable_elements := get_children(
            self.root, child_tag="Variable", ignore_namespace=True, include_nested_children=True
        ):
            for element in variable_elements:
                if id := element.attrib.get("id"):
                    device_id_str = remove_namespace_from_uri(self.device_id)
                    variable_id = f"{device_id_str}.{id}"
                    ts_ext_id2_info_map[variable_id] = self._variable2info(element)

        if process_data_in := get_children(
            self.root, "ProcessDataIn", ignore_namespace=True, include_nested_children=True
        ):
            for process_data_element in process_data_in:
                if p_id := process_data_element.attrib.get("id"):
                    device_id_str = remove_namespace_from_uri(self.device_id)
                    process_data_in_id = f"{device_id_str}.{p_id}"
                    if record_items := get_children(
                        process_data_element, "RecordItem", ignore_namespace=True, include_nested_children=True
                    ):
                        for record in record_items:
                            if index := record.attrib.get("subindex"):
                                process_record_id = f"{process_data_in_id}.{index}"
                                ts_ext_id2_info_map[process_record_id] = self._process_record2info(record)

        with Path.open(json_file_path, "w") as fp:
            json.dump(ts_ext_id2_info_map, fp, indent=2)

extract() #

Extract RDF triples from IODD XML

Source code in cognite/neat/_graph/extractors/_iodd.py
def extract(self) -> list[Triple]:
    """
    Extract RDF triples from IODD XML
    """
    return self._from_root2triples(self.root, self.namespace, self.device_id)
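
A minimal usage sketch, assuming a local IODD XML file and a web-compliant device id (both placeholders):

from pathlib import Path

from cognite.neat._graph.extractors import IODDExtractor

extractor = IODDExtractor.from_file(Path("my_device.xml"), device_id="Pump_42")
triples = extractor.extract()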

MockGraphGenerator #

Bases: BaseExtractor

Class used to generate mock graph data for purposes of testing of NEAT.

Parameters:

Name Type Description Default
rules InformationRules | DMSRules

Transformation rules defining the classes with their properties.

required
class_count dict[str | ClassEntity, int] | None

Target class count for each class in the ontology

None
stop_on_exception bool

Whether to stop if an exception is encountered. Defaults to False.

False
allow_isolated_classes bool

Whether to allow generation of instances for classes that are not connected to any other class. Defaults to True.

True
Source code in cognite/neat/_graph/extractors/_mock_graph_generator.py
class MockGraphGenerator(BaseExtractor):
    """
    Class used to generate mock graph data for purposes of testing of NEAT.

    Args:
        rules: Transformation rules defining the classes with their properties.
        class_count: Target class count for each class in the ontology
        stop_on_exception: Whether to stop if an exception is encountered. Defaults to False.
        allow_isolated_classes: Whether to allow generation of instances for classes that are not
                                 connected to any other class. Defaults to True.
    """

    def __init__(
        self,
        rules: InformationRules | DMSRules,
        class_count: dict[str | ClassEntity, int] | None = None,
        stop_on_exception: bool = False,
        allow_isolated_classes: bool = True,
    ):
        if isinstance(rules, DMSRules):
            # fixes potential issues with circular dependencies
            from cognite.neat._rules.transformers import DMSToInformation

            self.rules = DMSToInformation().transform(rules).rules
        elif isinstance(rules, InformationRules):
            self.rules = rules
        else:
            raise ValueError("Rules must be of type InformationRules or DMSRules!")

        if not class_count:
            self.class_count = {
                class_: 1 for class_ in InformationAnalysis(self.rules).defined_classes(consider_inheritance=True)
            }
        elif all(isinstance(key, str) for key in class_count.keys()):
            self.class_count = {
                ClassEntity.load(f"{self.rules.metadata.prefix}:{key}"): value for key, value in class_count.items()
            }
        elif all(isinstance(key, ClassEntity) for key in class_count.keys()):
            self.class_count = cast(dict[ClassEntity, int], class_count)
        else:
            raise ValueError("Class count keys must be of type str! or ClassEntity! or empty dict!")

        self.stop_on_exception = stop_on_exception
        self.allow_isolated_classes = allow_isolated_classes

    def extract(self) -> list[Triple]:
        """Generate mock triples based on data model defined transformation rules and desired number
        of class instances

        Returns:
            List of RDF triples, represented as tuples `(subject, predicate, object)`, that define data model instances
        """
        return generate_triples(
            self.rules,
            self.class_count,
            stop_on_exception=self.stop_on_exception,
            allow_isolated_classes=self.allow_isolated_classes,
        )

extract() #

Generate mock triples based on data model defined transformation rules and desired number of class instances

Returns:

Type Description
list[Triple]

List of RDF triples, represented as tuples (subject, predicate, object), that define data model instances

Source code in cognite/neat/_graph/extractors/_mock_graph_generator.py
def extract(self) -> list[Triple]:
    """Generate mock triples based on data model defined transformation rules and desired number
    of class instances

    Returns:
        List of RDF triples, represented as tuples `(subject, predicate, object)`, that define data model instances
    """
    return generate_triples(
        self.rules,
        self.class_count,
        stop_on_exception=self.stop_on_exception,
        allow_isolated_classes=self.allow_isolated_classes,
    )
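
A minimal usage sketch; rules is assumed to be an InformationRules or DMSRules instance obtained elsewhere, and the class names in class_count are placeholders:

from cognite.neat._graph.extractors import MockGraphGenerator

# rules: an InformationRules or DMSRules instance, e.g. from an imported and verified data model
generator = MockGraphGenerator(rules, class_count={"Pump": 10, "Valve": 20})
triples = generator.extract()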

RdfFileExtractor #

Bases: BaseExtractor

Extract data from RDF files into Neat.

Parameters:

Name Type Description Default
filepath Path

The path to the RDF file.

required
base_uri URIRef

The base URI to use. Defaults to DEFAULT_BASE_URI.

DEFAULT_BASE_URI
issue_list IssueList | None

An issue list to append file-reading issues to. Defaults to None, in which case a new IssueList named after the file is created.

None
Source code in cognite/neat/_graph/extractors/_rdf_file.py
class RdfFileExtractor(BaseExtractor):
    """Extract data from RDF files into Neat.

    Args:
        filepath (Path): The path to the RDF file. The MIME type is inferred from the file suffix.
        base_uri (URIRef, optional): The base URI to use. Defaults to DEFAULT_BASE_URI.
        issue_list (IssueList, optional): An issue list to append file-reading issues to. Defaults to None,
            in which case a new IssueList named after the file is created.
    """

    def __init__(
        self,
        filepath: Path,
        base_uri: URIRef = DEFAULT_BASE_URI,
        issue_list: IssueList | None = None,
    ):
        self.issue_list = issue_list or IssueList(title=f"{filepath.name}")

        self.filepath = filepath
        self.mime_type = rdflib_to_mime_types(cast(str, guess_format(str(self.filepath))))
        self.base_uri = base_uri

        if not self.filepath.exists():
            self.issue_list.append(FileNotFoundNeatError(self.filepath))

        if not self.mime_type:
            self.issue_list.append(
                FileTypeUnexpectedError(
                    self.filepath,
                    frozenset([".rdf", ".ttl", ".nt", ".n3", ".owl", ".nq", ".trig"]),
                )
            )

    def extract(self) -> Iterable[Triple]:
        raise NotImplementedError()
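
As shown above, extract() raises NotImplementedError on this class, so the actual parsing is expected to come from a subclass or a concrete reader. A minimal construction sketch, assuming a local Turtle file (placeholder path) and that IssueList iterates like a regular list:

from pathlib import Path

from cognite.neat._graph.extractors import RdfFileExtractor

extractor = RdfFileExtractor(Path("instances.ttl"))
for issue in extractor.issue_list:
    print(issue)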