Skip to content

Extractors

cognite.neat.graph.extractors #

BaseExtractor #

This is the base class for all extractors. It defines the interface that extractors must implement.

Source code in cognite/neat/graph/extractors/_base.py
class BaseExtractor:
    """Common interface that every extractor implements.

    Concrete extractors override ``extract`` to produce triples.

    NOTE: this class declares no ``ABC`` base, so ``@abstractmethod`` is not
    enforced when instantiating; the ``NotImplementedError`` raised by
    ``extract`` is what signals a missing override.
    """

    @abstractmethod
    def extract(self) -> Iterable[Triple]:
        # Must be overridden by concrete extractors.
        raise NotImplementedError()

    @classmethod
    def _repr_html_(cls) -> str:
        # Delegates the HTML representation to ``class_html_doc``.
        return class_html_doc(cls)

AssetsExtractor #

Bases: ClassicCDFExtractor[Asset]

Extract data from Cognite Data Fusion assets into Neat.

Parameters:

Name Type Description Default
items Iterable[Asset]

An iterable of assets.

required
namespace Namespace

The namespace to use. Defaults to DEFAULT_NAMESPACE.

None
to_type Callable[[Asset], str | None]

A function to convert an asset to a type. Defaults to None. If None or if the function returns None, the asset will be set to the default type "Asset".

None
total int

The total number of assets to load. If passed, you will get a progress bar if rich is installed. Defaults to None.

None
limit int

The maximal number of assets to load. Defaults to None. This is typically used for testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to limit the extraction to 1000 assets to test the setup.

None
unpack_metadata bool

Whether to unpack metadata. Defaults to True. If False, the metadata is yielded as a JSON string.

True
skip_metadata_values set[str] | frozenset[str] | None

A set of values to skip when unpacking metadata. Defaults to frozenset({"nan", "null", "none", ""}).

DEFAULT_SKIP_METADATA_VALUES
Source code in cognite/neat/graph/extractors/_classic_cdf/_assets.py
class AssetsExtractor(ClassicCDFExtractor[Asset]):
    """Extract data from Cognite Data Fusion assets into Neat.

    Args:
        items (Iterable[Asset]): An iterable of assets.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[Asset], str | None], optional): A function to convert an asset to a type. Defaults to None.
            If None or if the function returns None, the asset will be set to the default type "Asset".
        total (int, optional): The total number of assets to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of assets to load. Defaults to None. This is typically used for
            testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to
            limit the extraction to 1000 assets to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata
            is yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): A set of values to skip when unpacking
            metadata. Defaults to frozenset({"nan", "null", "none", ""}).
    """

    _default_rdf_type = "Asset"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Asset], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Create an extractor over the assets of a data set.

        The asset count for the data set is fetched with ``aggregate_count`` and
        passed on as ``total``.

        Args:
            client: Client used to count and iterate the assets.
            data_set_external_id: External id of the data set to read from.
            namespace: Forwarded to the constructor.
            to_type: Forwarded to the constructor.
            limit: Forwarded to the constructor.
            unpack_metadata: Forwarded to the constructor.
            skip_metadata_values: Forwarded to the constructor.
        """
        total = client.assets.aggregate_count(filter=AssetFilter(data_set_ids=[{"externalId": data_set_external_id}]))

        return cls(
            cast(
                Iterable[Asset],
                client.assets(data_set_external_ids=data_set_external_id),
            ),
            namespace,
            to_type,
            total=total,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_hierarchy(
        cls,
        client: CogniteClient,
        root_asset_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Asset], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Create an extractor over the subtree rooted at an asset.

        The asset count for the subtree is fetched with ``aggregate_count`` and
        passed on as ``total``.

        Args:
            client: Client used to count and iterate the assets.
            root_asset_external_id: External id of the subtree's root asset.
            namespace: Forwarded to the constructor.
            to_type: Forwarded to the constructor.
            limit: Forwarded to the constructor.
            unpack_metadata: Forwarded to the constructor.
            skip_metadata_values: Forwarded to the constructor.
        """
        total = client.assets.aggregate_count(
            filter=AssetFilter(asset_subtree_ids=[{"externalId": root_asset_external_id}])
        )

        return cls(
            cast(
                Iterable[Asset],
                client.assets(asset_subtree_external_ids=root_asset_external_id),
            ),
            namespace,
            to_type,
            total=total,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Asset], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Create an extractor from a file holding a serialized asset list.

        The file content is read as text and loaded with ``AssetList.load``; the
        number of loaded assets is passed on as ``total``.

        Args:
            file_path: Path of the file to read.
            namespace: Forwarded to the constructor.
            to_type: Forwarded to the constructor.
            limit: Forwarded to the constructor.
            unpack_metadata: Forwarded to the constructor.
            skip_metadata_values: Forwarded to the constructor.
        """
        assets = AssetList.load(Path(file_path).read_text())
        return cls(
            assets,
            namespace,
            to_type,
            total=len(assets),
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, asset: Asset) -> list[Triple]:
        """Converts an asset to triples.

        The first triple is always the RDF type. Every other triple is emitted
        only when the corresponding asset field is set.
        """
        id_ = self.namespace[f"Asset_{asset.id}"]

        type_ = self._get_rdf_type(asset)

        triples: list[Triple] = [(id_, RDF.type, self.namespace[type_])]

        # Create attributes
        if asset.name:
            triples.append((id_, self.namespace.name, Literal(asset.name)))

        if asset.description:
            triples.append((id_, self.namespace.description, Literal(asset.description)))

        if asset.external_id:
            triples.append((id_, self.namespace.external_id, Literal(asset.external_id)))

        if asset.source:
            triples.append((id_, self.namespace.source, Literal(asset.source)))

        # Timestamps are in milliseconds. A missing timestamp is skipped rather
        # than raising TypeError on the division below.
        for predicate, millis in (
            (self.namespace.created_time, asset.created_time),
            (self.namespace.last_updated_time, asset.last_updated_time),
        ):
            if millis is not None:
                triples.append((id_, predicate, Literal(datetime.fromtimestamp(millis / 1000, timezone.utc))))

        if asset.labels:
            for label in asset.labels:
                # external_id can create ill-formed URIs, so we create websafe URIs
                # since labels do not have internal ids, we use the external_id as the id
                triples.append(
                    (
                        id_,
                        self.namespace.label,
                        self.namespace[f"Label_{create_sha256_hash(label.dump()['externalId'])}"],
                    )
                )

        if asset.metadata:
            triples.extend(self._metadata_to_triples(id_, asset.metadata))

        # Create connections:
        if asset.parent_id:
            triples.append((id_, self.namespace.parent, self.namespace[f"Asset_{asset.parent_id}"]))

        if asset.root_id:
            triples.append((id_, self.namespace.root, self.namespace[f"Asset_{asset.root_id}"]))

        if asset.data_set_id:
            triples.append(
                (
                    id_,
                    self.namespace.dataset,
                    self.namespace[f"Dataset_{asset.data_set_id}"],
                )
            )

        return triples

EventsExtractor #

Bases: ClassicCDFExtractor[Event]

Extract data from Cognite Data Fusion events into Neat.

Parameters:

Name Type Description Default
items Iterable[Event]

An iterable of items.

required
namespace Namespace

The namespace to use. Defaults to DEFAULT_NAMESPACE.

None
to_type Callable[[Event], str | None]

A function to convert an item to a type. Defaults to None. If None or if the function returns None, the item will be set to the default type.

None
total int

The total number of items to load. If passed, you will get a progress bar if rich is installed. Defaults to None.

None
limit int

The maximal number of items to load. Defaults to None. This is typically used for testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to limit the extraction to 1000 assets to test the setup.

None
unpack_metadata bool

Whether to unpack metadata. Defaults to True. If False, the metadata is yielded as a JSON string.

True
skip_metadata_values set[str] | frozenset[str] | None

If you are unpacking metadata, then values in this set will be skipped.

DEFAULT_SKIP_METADATA_VALUES
Source code in cognite/neat/graph/extractors/_classic_cdf/_events.py
class EventsExtractor(ClassicCDFExtractor[Event]):
    """Extract data from Cognite Data Fusion events into Neat.

    Args:
        items (Iterable[Event]): An iterable of items.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[Event], str | None], optional): A function to convert an item to a type.
            Defaults to None. If None or if the function returns None, the item will be set to the default type.
        total (int, optional): The total number of items to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of items to load. Defaults to None. This is typically used for
            testing setup of the extractor. For example, if you are extracting 100 000 items, you might want to
            limit the extraction to 1000 items to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata
            is yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): If you are unpacking metadata, then
           values in this set will be skipped.
    """

    _default_rdf_type = "Event"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Event], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Create an extractor over the events of a data set.

        The event count for the data set is fetched with ``aggregate_count`` and
        passed on as ``total``.

        Args:
            client: Client used to count and iterate the events.
            data_set_external_id: External id of the data set to read from.
            namespace: Forwarded to the constructor.
            to_type: Forwarded to the constructor.
            limit: Forwarded to the constructor.
            unpack_metadata: Forwarded to the constructor.
            skip_metadata_values: Forwarded to the constructor.
        """
        total = client.events.aggregate_count(filter=EventFilter(data_set_ids=[{"externalId": data_set_external_id}]))

        return cls(
            client.events(data_set_external_ids=data_set_external_id),
            namespace,
            to_type,
            total=total,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Event], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Create an extractor from a file holding a serialized event list.

        The file content is read as text and loaded with ``EventList.load``; the
        number of loaded events is passed on as ``total``.

        Args:
            file_path: Path of the file to read.
            namespace: Forwarded to the constructor.
            to_type: Forwarded to the constructor.
            limit: Forwarded to the constructor.
            unpack_metadata: Forwarded to the constructor.
            skip_metadata_values: Forwarded to the constructor.
        """
        events = EventList.load(Path(file_path).read_text())

        return cls(
            events,
            namespace,
            to_type,
            total=len(events),
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, event: Event) -> list[Triple]:
        """Convert an event to triples.

        The first triple is always the RDF type. Every other triple is emitted
        only when the corresponding event field is truthy.
        """
        id_ = self.namespace[f"Event_{event.id}"]

        type_ = self._get_rdf_type(event)

        # Set rdf type
        triples: list[Triple] = [(id_, RDF.type, self.namespace[type_])]

        # Create attributes

        if event.external_id:
            triples.append((id_, self.namespace.external_id, Literal(event.external_id)))

        if event.source:
            # The source goes under its own predicate; it was previously written
            # to ``type``, where it collided with ``event.type`` below.
            triples.append((id_, self.namespace.source, Literal(event.source)))

        if event.type:
            triples.append((id_, self.namespace.type, Literal(event.type)))

        if event.subtype:
            triples.append((id_, self.namespace.subtype, Literal(event.subtype)))

        if event.metadata:
            triples.extend(self._metadata_to_triples(id_, event.metadata))

        if event.description:
            triples.append((id_, self.namespace.description, Literal(event.description)))

        # Timestamps are in milliseconds and converted to UTC datetimes.
        for predicate, millis in (
            (self.namespace.created_time, event.created_time),
            (self.namespace.last_updated_time, event.last_updated_time),
            (self.namespace.start_time, event.start_time),
            (self.namespace.end_time, event.end_time),
        ):
            if millis:
                triples.append((id_, predicate, Literal(datetime.fromtimestamp(millis / 1000, timezone.utc))))

        if event.data_set_id:
            triples.append(
                (
                    id_,
                    self.namespace.data_set_id,
                    self.namespace[f"Dataset_{event.data_set_id}"],
                )
            )

        if event.asset_ids:
            for asset_id in event.asset_ids:
                triples.append((id_, self.namespace.asset, self.namespace[f"Asset_{asset_id}"]))

        return triples

FilesExtractor #

Bases: ClassicCDFExtractor[FileMetadata]

Extract data from Cognite Data Fusion files metadata into Neat.

Parameters:

Name Type Description Default
items Iterable[FileMetadata]

An iterable of items.

required
namespace Namespace

The namespace to use. Defaults to DEFAULT_NAMESPACE.

None
to_type Callable[[FileMetadata], str | None]

A function to convert an item to a type. Defaults to None. If None or if the function returns None, the item will be set to the default type.

None
total int

The total number of items to load. If passed, you will get a progress bar if rich is installed. Defaults to None.

None
limit int

The maximal number of items to load. Defaults to None. This is typically used for testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to limit the extraction to 1000 assets to test the setup.

None
unpack_metadata bool

Whether to unpack metadata. Defaults to True. If False, the metadata is yielded as a JSON string.

True
skip_metadata_values set[str] | frozenset[str] | None

If you are unpacking metadata, then values in this set will be skipped.

DEFAULT_SKIP_METADATA_VALUES
Source code in cognite/neat/graph/extractors/_classic_cdf/_files.py
class FilesExtractor(ClassicCDFExtractor[FileMetadata]):
    """Extract data from Cognite Data Fusion files metadata into Neat.

    Args:
        items (Iterable[FileMetadata]): An iterable of items.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[FileMetadata], str | None], optional): A function to convert an item to a type.
            Defaults to None. If None or if the function returns None, the item will be set to the default type.
        total (int, optional): The total number of items to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of items to load. Defaults to None. This is typically used for
            testing setup of the extractor. For example, if you are extracting 100 000 items, you might want to
            limit the extraction to 1000 items to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata
            is yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): If you are unpacking metadata, then
           values in this set will be skipped.
    """

    _default_rdf_type = "File"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[FileMetadata], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Create an extractor over the file metadata of a data set.

        No ``total`` is passed to the constructor here.

        Args:
            client: Client used to iterate the file metadata.
            data_set_external_id: External id of the data set to read from.
            namespace: Forwarded to the constructor.
            to_type: Forwarded to the constructor.
            limit: Forwarded to the constructor.
            unpack_metadata: Forwarded to the constructor.
            skip_metadata_values: Forwarded to the constructor.
        """
        return cls(
            client.files(data_set_external_ids=data_set_external_id),
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[FileMetadata], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Create an extractor from a file holding a serialized file-metadata list.

        The file content is read as text and loaded with
        ``FileMetadataList.load``; the number of loaded items is passed on as
        ``total``.

        Args:
            file_path: Path of the file to read.
            namespace: Forwarded to the constructor.
            to_type: Forwarded to the constructor.
            limit: Forwarded to the constructor.
            unpack_metadata: Forwarded to the constructor.
            skip_metadata_values: Forwarded to the constructor.
        """
        file_metadata = FileMetadataList.load(Path(file_path).read_text())
        return cls(
            file_metadata,
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            total=len(file_metadata),
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, file: FileMetadata) -> list[Triple]:
        """Convert a file metadata item to triples.

        The first triple is always the RDF type. Every other triple is emitted
        only when the corresponding field is truthy.
        """
        id_ = self.namespace[f"File_{file.id}"]

        type_ = self._get_rdf_type(file)

        # Set rdf type
        triples: list[Triple] = [(id_, RDF.type, self.namespace[type_])]

        # Create attributes

        if file.external_id:
            triples.append((id_, self.namespace.external_id, Literal(file.external_id)))

        if file.mime_type:
            triples.append((id_, self.namespace.mime_type, Literal(file.mime_type)))

        # Emitted only when truthy, so ``uploaded=False`` yields no triple.
        if file.uploaded:
            triples.append((id_, self.namespace.uploaded, Literal(file.uploaded)))

        # The source is written once, under ``source``. An earlier duplicate
        # wrote it to the ``type`` predicate as well.
        if file.source:
            triples.append((id_, self.namespace.source, Literal(file.source)))

        if file.metadata:
            triples.extend(self._metadata_to_triples(id_, file.metadata))

        # Timestamps are in milliseconds and converted to UTC datetimes. Each
        # goes to its own predicate (``source_modified_time`` was previously
        # written to ``source_created_time``).
        for predicate, millis in (
            (self.namespace.source_created_time, file.source_created_time),
            (self.namespace.source_modified_time, file.source_modified_time),
            (self.namespace.uploaded_time, file.uploaded_time),
            (self.namespace.created_time, file.created_time),
            (self.namespace.last_updated_time, file.last_updated_time),
        ):
            if millis:
                triples.append((id_, predicate, Literal(datetime.fromtimestamp(millis / 1000, timezone.utc))))

        if file.labels:
            for label in file.labels:
                # external_id can create ill-formed URIs, so we create websafe URIs
                # since labels do not have internal ids, we use the external_id as the id
                # NOTE(review): label ids are percent-quoted here; if other extractors
                # derive label ids differently (e.g. by hashing), the URIs will not
                # match - confirm which scheme is intended.
                triples.append(
                    (
                        id_,
                        self.namespace.label,
                        self.namespace[f"Label_{quote(label.dump()['externalId'])}"],
                    )
                )

        if file.security_categories:
            for category in file.security_categories:
                triples.append((id_, self.namespace.security_categories, Literal(category)))

        if file.data_set_id:
            triples.append(
                (
                    id_,
                    self.namespace.data_set_id,
                    self.namespace[f"Dataset_{file.data_set_id}"],
                )
            )

        if file.asset_ids:
            for asset_id in file.asset_ids:
                triples.append((id_, self.namespace.asset, self.namespace[f"Asset_{asset_id}"]))

        return triples

LabelsExtractor #

Bases: ClassicCDFExtractor[LabelDefinition]

Extract data from Cognite Data Fusion labels into Neat.

Parameters:

Name Type Description Default
items Iterable[LabelDefinition]

An iterable of items.

required
namespace Namespace

The namespace to use. Defaults to DEFAULT_NAMESPACE.

None
to_type Callable[[LabelDefinition], str | None]

A function to convert an item to a type. Defaults to None. If None or if the function returns None, the item will be set to the default type.

None
total int

The total number of items to load. If passed, you will get a progress bar if rich is installed. Defaults to None.

None
limit int

The maximal number of items to load. Defaults to None. This is typically used for testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to limit the extraction to 1000 assets to test the setup.

None
unpack_metadata bool

Whether to unpack metadata. Defaults to True. If False, the metadata is yielded as a JSON string.

True
skip_metadata_values set[str] | frozenset[str] | None

If you are unpacking metadata, then values in this set will be skipped.

DEFAULT_SKIP_METADATA_VALUES
Source code in cognite/neat/graph/extractors/_classic_cdf/_labels.py
class LabelsExtractor(ClassicCDFExtractor[LabelDefinition]):
    """Extract data from Cognite Data Fusion labels into Neat.

    Args:
        items (Iterable[LabelDefinition]): An iterable of items.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[LabelDefinition], str | None], optional): A function to convert an item to a type.
            Defaults to None. If None or if the function returns None, the item will be set to the default type.
        total (int, optional): The total number of items to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of items to load. Defaults to None. This is typically used for
            testing setup of the extractor. For example, if you are extracting 100 000 items, you might want to
            limit the extraction to 1000 items to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata
            is yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): If you are unpacking metadata, then
           values in this set will be skipped.
    """

    _default_rdf_type = "Label"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[LabelDefinition], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Build an extractor over the label definitions of a data set.

        No ``total`` is passed to the constructor here.
        """
        label_iterator = client.labels(data_set_external_ids=data_set_external_id)
        return cls(
            label_iterator,
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[LabelDefinition], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Build an extractor from a file holding a serialized label-definition list.

        The file is read as text and loaded with ``LabelDefinitionList.load``;
        the number of loaded labels is passed on as ``total``.
        """
        raw_text = Path(file_path).read_text()
        labels = LabelDefinitionList.load(raw_text)
        return cls(
            labels,
            total=len(labels),
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, label: LabelDefinition) -> list[Triple]:
        """Convert a label definition to triples.

        A label without an external id yields no triples, since the subject
        URI is derived from a hash of the external id.
        """
        if not label.external_id:
            return []

        ns = self.namespace
        subject = ns[f"Label_{create_sha256_hash(label.external_id)}"]
        rdf_type = self._get_rdf_type(label)

        # The RDF type and external id are always present; the rest is optional.
        triples: list[Triple] = [
            (subject, RDF.type, ns[rdf_type]),
            (subject, ns.external_id, Literal(label.external_id)),
        ]

        if label.name:
            triples.append((subject, ns.name, Literal(label.name)))

        if label.description:
            triples.append((subject, ns.description, Literal(label.description)))

        if label.created_time:
            # created_time is in milliseconds; convert to a UTC datetime.
            created = datetime.fromtimestamp(label.created_time / 1000, timezone.utc)
            triples.append((subject, ns.created_time, Literal(created)))

        if label.data_set_id:
            dataset_ref = ns[f"Dataset_{label.data_set_id}"]
            triples.append((subject, ns.data_set_id, dataset_ref))

        return triples

RelationshipsExtractor #

Bases: ClassicCDFExtractor[Relationship]

Extract data from Cognite Data Fusion relationships into Neat.

Parameters:

Name Type Description Default
items Iterable[Relationship]

An iterable of items.

required
namespace Namespace

The namespace to use. Defaults to DEFAULT_NAMESPACE.

None
to_type Callable[[Relationship], str | None]

A function to convert an item to a type. Defaults to None. If None or if the function returns None, the item will be set to the default type.

None
total int

The total number of items to load. If passed, you will get a progress bar if rich is installed. Defaults to None.

None
limit int

The maximal number of items to load. Defaults to None. This is typically used for testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to limit the extraction to 1000 assets to test the setup.

None
unpack_metadata bool

Whether to unpack metadata. Defaults to True. If False, the metadata is yielded as a JSON string.

True
skip_metadata_values set[str] | frozenset[str] | None

If you are unpacking metadata, then values in this set will be skipped.

DEFAULT_SKIP_METADATA_VALUES
Source code in cognite/neat/graph/extractors/_classic_cdf/_relationships.py
class RelationshipsExtractor(ClassicCDFExtractor[Relationship]):
    """Extract data from Cognite Data Fusions Relationships into Neat.

    Args:
        items (Iterable[Relationship]): An iterable of items.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[Relationship], str | None], optional): A function to convert an item to a type.
            Defaults to None. If None or if the function returns None, the item will be set to the default type.
        total (int, optional): The total number of items to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of items to load. Defaults to None. This is typically used for
            testing setup of the extractor. For example, if you are extracting 100 000 items, you might want to
            limit the extraction to 1000 items to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata
            is yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): If you are unpacking metadata, then
           values in this set will be skipped.
    """

    _default_rdf_type = "Relationship"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Relationship], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Create an extractor over the relationships of a CDF data set.

        The relationships are obtained by calling ``client.relationships`` with the
        given data set external id. No ``total`` is passed to the extractor here.
        """
        return cls(
            client.relationships(data_set_external_ids=data_set_external_id),
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Relationship], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Create an extractor from a file whose text is loaded with ``RelationshipList.load``.

        The number of loaded relationships is passed on as ``total``.
        """
        relationships = RelationshipList.load(Path(file_path).read_text())
        return cls(
            relationships,
            namespace=namespace,
            total=len(relationships),
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, relationship: Relationship) -> list[Triple]:
        """Converts a relationship to triples.

        Returns an empty list when the relationship lacks an external id, a source
        external id or a target external id.
        """
        if not (relationship.external_id and relationship.source_external_id and relationship.target_external_id):
            return []

        # relationships do not have an internal id, so we generate one
        id_ = self.namespace[f"Relationship_{create_sha256_hash(relationship.external_id)}"]

        type_ = self._get_rdf_type(relationship)
        # Set rdf type
        triples: list[Triple] = [(id_, RDF.type, self.namespace[type_])]

        # Set source and target types
        if source_type := relationship.source_type:
            triples.append((id_, self.namespace.source_type, self.namespace[source_type.title()]))

        if target_type := relationship.target_type:
            triples.append((id_, self.namespace.target_type, self.namespace[target_type.title()]))

        # Create attributes
        triples.append((id_, self.namespace.external_id, Literal(relationship.external_id)))
        triples.append((id_, self.namespace.source_external_id, Literal(relationship.source_external_id)))
        triples.append((id_, self.namespace.target_external_id, Literal(relationship.target_external_id)))

        # Timestamps are divided by 1000 before conversion to a UTC datetime.
        # A falsy timestamp (None or 0) is skipped, exactly as before.
        for field in ("start_time", "end_time", "created_time", "last_updated_time"):
            if timestamp := getattr(relationship, field):
                triples.append(
                    (id_, self.namespace[field], Literal(datetime.fromtimestamp(timestamp / 1000, timezone.utc)))
                )

        # Compare against None so that a confidence of 0.0 is kept instead of
        # being dropped by a truthiness check.
        if relationship.confidence is not None:
            triples.append((id_, self.namespace.confidence, Literal(relationship.confidence)))

        if relationship.labels:
            for label in relationship.labels:
                # external_id can create ill-formed URIs, so we create websafe URIs
                # since labels do not have internal ids, we use the external_id as the id
                triples.append(
                    (
                        id_,
                        self.namespace.label,
                        self.namespace[f"Label_{quote(label.dump()['externalId'])}"],
                    )
                )

        # Create connection
        if relationship.data_set_id:
            triples.append((id_, self.namespace.dataset, self.namespace[f"Dataset_{relationship.data_set_id}"]))

        return triples

SequencesExtractor #

Bases: ClassicCDFExtractor[Sequence]

Extract data from Cognite Data Fusions Sequences into Neat.

Parameters:

Name Type Description Default
items Iterable[Sequence]

An iterable of items.

required
namespace Namespace

The namespace to use. Defaults to DEFAULT_NAMESPACE.

None
to_type Callable[[Sequence], str | None]

A function to convert an item to a type. Defaults to None. If None or if the function returns None, the item will be set to the default type.

None
total int

The total number of items to load. If passed, you will get a progress bar if rich is installed. Defaults to None.

None
limit int

The maximal number of items to load. Defaults to None. This is typically used for testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to limit the extraction to 1000 assets to test the setup.

None
unpack_metadata bool

Whether to unpack metadata. Defaults to True. If False, the metadata is yielded as a JSON string.

True
skip_metadata_values set[str] | frozenset[str] | None

If you are unpacking metadata, then values in this set will be skipped.

DEFAULT_SKIP_METADATA_VALUES
Source code in cognite/neat/graph/extractors/_classic_cdf/_sequences.py
class SequencesExtractor(ClassicCDFExtractor[Sequence]):
    """Extract data from Cognite Data Fusions Sequences into Neat.

    Args:
        items (Iterable[Sequence]): An iterable of items.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[Sequence], str | None], optional): A function to convert an item to a type.
            Defaults to None. If None or if the function returns None, the item will be set to the default type.
        total (int, optional): The total number of items to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of items to load. Defaults to None. This is typically used for
            testing setup of the extractor. For example, if you are extracting 100 000 items, you might want to
            limit the extraction to 1000 items to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata
            is yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): If you are unpacking metadata, then
           values in this set will be skipped.
    """

    _default_rdf_type = "Sequence"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Sequence], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Create an extractor over the sequences of a CDF data set.

        The aggregate count for the data set is requested first and handed to the
        extractor as ``total``.
        """
        dataset_filter = SequenceFilter(data_set_ids=[{"externalId": data_set_external_id}])
        item_count = client.sequences.aggregate_count(filter=dataset_filter)
        items = client.sequences(data_set_external_ids=data_set_external_id)
        return cls(
            items,
            total=item_count,
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[Sequence], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Create an extractor from a file whose text is loaded with ``SequenceList.load``."""
        raw_text = Path(file_path).read_text()
        loaded = SequenceList.load(raw_text)
        return cls(
            loaded,
            total=len(loaded),
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, sequence: Sequence) -> list[Triple]:
        """Converts a sequence to triples."""
        subject = self.namespace[f"Sequence_{sequence.id}"]

        # The rdf:type triple always comes first
        rdf_type = self._get_rdf_type(sequence)
        triples: list[Triple] = [(subject, RDF.type, self.namespace[rdf_type])]

        # Plain literal attributes; falsy values are left out
        for field in ("external_id", "name"):
            if value := getattr(sequence, field):
                triples.append((subject, self.namespace[field], Literal(value)))

        if sequence.metadata:
            triples.extend(self._metadata_to_triples(subject, sequence.metadata))

        if sequence.description:
            triples.append((subject, self.namespace.description, Literal(sequence.description)))

        # Timestamps are divided by 1000 before conversion to a UTC datetime
        for field in ("created_time", "last_updated_time"):
            if timestamp := getattr(sequence, field):
                moment = datetime.fromtimestamp(timestamp / 1000, timezone.utc)
                triples.append((subject, self.namespace[field], Literal(moment)))

        # Links to other resources
        if sequence.data_set_id:
            dataset = self.namespace[f"Dataset_{sequence.data_set_id}"]
            triples.append((subject, self.namespace.data_set_id, dataset))

        if sequence.asset_id:
            asset = self.namespace[f"Asset_{sequence.asset_id}"]
            triples.append((subject, self.namespace.asset, asset))

        return triples

TimeSeriesExtractor #

Bases: ClassicCDFExtractor[TimeSeries]

Extract data from Cognite Data Fusions TimeSeries into Neat.

Parameters:

Name Type Description Default
items Iterable[TimeSeries]

An iterable of items.

required
namespace Namespace

The namespace to use. Defaults to DEFAULT_NAMESPACE.

None
to_type Callable[[TimeSeries], str | None]

A function to convert an item to a type. Defaults to None. If None or if the function returns None, the item will be set to the default type.

None
total int

The total number of items to load. If passed, you will get a progress bar if rich is installed. Defaults to None.

None
limit int

The maximal number of items to load. Defaults to None. This is typically used for testing setup of the extractor. For example, if you are extracting 100 000 assets, you might want to limit the extraction to 1000 assets to test the setup.

None
unpack_metadata bool

Whether to unpack metadata. Defaults to True. If False, the metadata is yielded as a JSON string.

True
skip_metadata_values set[str] | frozenset[str] | None

If you are unpacking metadata, then values in this set will be skipped.

DEFAULT_SKIP_METADATA_VALUES
Source code in cognite/neat/graph/extractors/_classic_cdf/_timeseries.py
class TimeSeriesExtractor(ClassicCDFExtractor[TimeSeries]):
    """Extract data from Cognite Data Fusions TimeSeries into Neat.

    Args:
        items (Iterable[TimeSeries]): An iterable of items.
        namespace (Namespace, optional): The namespace to use. Defaults to DEFAULT_NAMESPACE.
        to_type (Callable[[TimeSeries], str | None], optional): A function to convert an item to a type.
            Defaults to None. If None or if the function returns None, the item will be set to the default type.
        total (int, optional): The total number of items to load. If passed, you will get a progress bar if rich
            is installed. Defaults to None.
        limit (int, optional): The maximal number of items to load. Defaults to None. This is typically used for
            testing setup of the extractor. For example, if you are extracting 100 000 items, you might want to
            limit the extraction to 1000 items to test the setup.
        unpack_metadata (bool, optional): Whether to unpack metadata. Defaults to True. If False, the metadata
            is yielded as a JSON string.
        skip_metadata_values (set[str] | frozenset[str] | None, optional): If you are unpacking metadata, then
           values in this set will be skipped.
    """

    _default_rdf_type = "TimeSeries"

    @classmethod
    def from_dataset(
        cls,
        client: CogniteClient,
        data_set_external_id: str,
        namespace: Namespace | None = None,
        to_type: Callable[[TimeSeries], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Create an extractor over the time series of a CDF data set.

        The aggregate count for the data set is requested first and handed to the
        extractor as ``total``.
        """
        total = client.time_series.aggregate_count(
            filter=TimeSeriesFilter(data_set_ids=[{"externalId": data_set_external_id}])
        )

        return cls(
            client.time_series(data_set_external_ids=data_set_external_id),
            total=total,
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    @classmethod
    def from_file(
        cls,
        file_path: str,
        namespace: Namespace | None = None,
        to_type: Callable[[TimeSeries], str | None] | None = None,
        limit: int | None = None,
        unpack_metadata: bool = True,
        skip_metadata_values: Set[str] | None = DEFAULT_SKIP_METADATA_VALUES,
    ):
        """Create an extractor from a file whose text is loaded with ``TimeSeriesList.load``."""
        timeseries = TimeSeriesList.load(Path(file_path).read_text())
        return cls(
            timeseries,
            total=len(timeseries),
            namespace=namespace,
            to_type=to_type,
            limit=limit,
            unpack_metadata=unpack_metadata,
            skip_metadata_values=skip_metadata_values,
        )

    def _item2triples(self, timeseries: TimeSeries) -> list[Triple]:
        """Converts a time series to triples."""
        id_ = self.namespace[f"TimeSeries_{timeseries.id}"]

        # Set rdf type
        type_ = self._get_rdf_type(timeseries)
        triples: list[Triple] = [(id_, RDF.type, self.namespace[type_])]

        # Create attributes
        if timeseries.external_id:
            triples.append((id_, self.namespace.external_id, Literal(timeseries.external_id)))

        if timeseries.name:
            triples.append((id_, self.namespace.name, Literal(timeseries.name)))

        if timeseries.is_string:
            triples.append((id_, self.namespace.is_string, Literal(timeseries.is_string)))

        if timeseries.metadata:
            triples.extend(self._metadata_to_triples(id_, timeseries.metadata))

        if timeseries.unit:
            triples.append((id_, self.namespace.unit, Literal(timeseries.unit)))

        # Test the time series, not the namespace term: the old check on
        # `self.namespace.is_step` was always true, so a triple was emitted even
        # when is_step was None. An explicit False is still emitted, as before.
        if timeseries.is_step is not None:
            triples.append((id_, self.namespace.is_step, Literal(timeseries.is_step)))

        if timeseries.description:
            triples.append((id_, self.namespace.description, Literal(timeseries.description)))

        if timeseries.security_categories:
            triples.extend(
                (id_, self.namespace.security_categories, Literal(category))
                for category in timeseries.security_categories
            )

        # Timestamps are divided by 1000 before conversion to a UTC datetime
        for field in ("created_time", "last_updated_time"):
            if timestamp := getattr(timeseries, field):
                triples.append(
                    (id_, self.namespace[field], Literal(datetime.fromtimestamp(timestamp / 1000, timezone.utc)))
                )

        if timeseries.legacy_name:
            triples.append((id_, self.namespace.legacy_name, Literal(timeseries.legacy_name)))

        # Create connections
        if timeseries.unit_external_id:
            # try to create connection to QUDT unit catalog
            try:
                triples.append(
                    (
                        id_,
                        self.namespace.unit_external_id,
                        URIRef(str(AnyHttpUrl(timeseries.unit_external_id))),
                    )
                )
            except ValidationError:
                # Not a valid http(s) URL: keep the value as a plain literal instead
                triples.append(
                    (
                        id_,
                        self.namespace.unit_external_id,
                        Literal(timeseries.unit_external_id),
                    )
                )

        if timeseries.data_set_id:
            triples.append((id_, self.namespace.dataset, self.namespace[f"Dataset_{timeseries.data_set_id}"]))

        if timeseries.asset_id:
            triples.append((id_, self.namespace.asset, self.namespace[f"Asset_{timeseries.asset_id}"]))

        return triples

DexpiExtractor #

Bases: BaseExtractor

DEXPI-XML extractor of RDF triples

Parameters:

Name Type Description Default
root Element

XML root element of DEXPI file.

required
namespace Namespace | None

Optional custom namespace to use for extracted triples that define data model instances. Defaults to DEFAULT_NAMESPACE.

None
Source code in cognite/neat/graph/extractors/_dexpi.py
class DexpiExtractor(BaseExtractor):
    """
    DEXPI-XML extractor of RDF triples

    Args:
        root: XML root element of DEXPI file.
        namespace: Optional custom namespace to use for extracted triples that define data
                    model instances. Defaults to DEFAULT_NAMESPACE.
    """

    def __init__(
        self,
        root: Element,
        namespace: Namespace | None = None,
    ):
        self.root = root
        self.namespace = namespace or DEFAULT_NAMESPACE

    @classmethod
    def from_file(cls, filepath: str | Path, namespace: Namespace | None = None):
        """Create an extractor from a DEXPI XML file on disk."""
        return cls(ET.parse(filepath).getroot(), namespace)

    @classmethod
    def from_url(cls, url: str, namespace: Namespace | None = None, timeout: float | None = 30.0):
        """Create an extractor from a DEXPI XML document downloaded from a URL.

        Args:
            url: Location of the DEXPI XML document.
            namespace: Optional custom namespace, see the class documentation.
            timeout: Seconds to wait for the server, forwarded to ``requests.get``.
                Pass None to wait indefinitely (the previous behaviour).

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        from io import BytesIO

        import requests

        # Without a timeout a stalled server would block this call forever.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        return cls(ET.parse(BytesIO(response.content)).getroot(), namespace)

    def extract(self) -> Iterable[Triple]:
        """Extracts RDF triples from DEXPI XML file."""

        for element in iterate_tree(self.root):
            yield from self._element2triples(element, self.namespace)

    @classmethod
    def _element2triples(cls, element: Element, namespace: Namespace) -> list[Triple]:
        """Converts an element to triples.

        Only elements that carry both a ``ComponentClass`` attribute (other than
        ``"Label"``) and an ``ID`` attribute produce triples.
        """
        triples: list[Triple] = []

        if (
            "ComponentClass" in element.attrib
            and element.attrib["ComponentClass"] != "Label"
            and "ID" in element.attrib
        ):
            id_ = namespace[element.attrib["ID"]]

            if node_triples := cls._element2node_triples(id_, element):
                triples.extend(node_triples)

            if edge_triples := cls._element2edge_triples(id_, element, namespace):
                triples.extend(edge_triples)

        return triples

    @classmethod
    def _element2edge_triples(cls, id_: URIRef, element: Element, namespace: Namespace) -> list[Triple]:
        """Converts the connections, associations and ID-carrying children of an element to triples."""
        triples: list[Triple] = []

        # connection triples
        if connections := get_children(element, "Connection"):
            for connection in connections:
                if "FromID" in connection.attrib and "ToID" in connection.attrib:
                    triples.append(
                        (
                            namespace[connection.attrib["FromID"]],
                            DEXPI.connection,
                            namespace[connection.attrib["ToID"]],
                        )
                    )

        # association triples
        if associations := get_children(element, "Association"):
            for association in associations:
                if "Type" in association.attrib and "ItemID" in association.attrib:
                    association_type = cls._to_uri_friendly_association_type(association)

                    triples.append(
                        (
                            id_,
                            DEXPI[f"association/{association_type}"],
                            namespace[association.attrib["ItemID"]],
                        )
                    )

        # children-parent triples
        for child in element:
            if "ID" in child.attrib and child.tag != "Label":
                # lower-case only the first character of the tag
                camel_case_property = child.tag[0].lower() + child.tag[1:]
                triples.append(
                    (
                        id_,
                        DEXPI[f"children/{camel_case_property}"],
                        namespace[child.attrib["ID"]],
                    )
                )

        return triples

    @classmethod
    def _to_uri_friendly_association_type(cls, association: Element) -> str:
        """Joins the space-separated words of the ``Type`` attribute into one token.

        The first word is kept as is, every following word is passed through
        ``str.capitalize``.
        """
        words = association.attrib["Type"].split(" ")
        return "".join(word.capitalize() if i != 0 else word for i, word in enumerate(words))

    @classmethod
    def _element2node_triples(cls, id_: URIRef, element: Element) -> list[Triple]:
        """Converts an XML element to triples."""
        triples: list[Triple] = []

        # adding tag triple if exists
        tag: str | None = element.tag
        if tag:
            triples.append((id_, DEXPI.tag, Literal(str(tag))))

        # adding attributes triples
        attributes = element.attrib
        component_class: str | None = attributes.get("ComponentClass")
        component_name: str | None = attributes.get("ComponentName")
        if component_class:
            triples.append((id_, DEXPI.ComponentClass, Literal(component_class)))
        if component_name:
            triples.append((id_, DEXPI.ComponentName, Literal(component_name)))
        if component_class_uri := attributes.get("ComponentClassURI"):
            triples.append((id_, DEXPI.ComponentClassURI, URIRef(component_class_uri)))

        # the type is taken from the first non-empty of: component class,
        # component name, tag; otherwise "Unknown"
        triples.append(
            (
                id_,
                RDF.type,
                as_neat_compliant_uri(DEFAULT_NAMESPACE[component_class or component_name or tag or "Unknown"]),
            )
        )

        # add label triple
        if label := cls._get_element_label(element):
            triples.append((id_, RDFS.label, Literal(label)))

        # add generic attributes triples
        if generic_attributes := cls._get_element_generic_attributes(element):
            for attribute, value_definitions in generic_attributes.items():
                predicate = as_neat_compliant_uri(attribute)
                for value_definition in value_definitions:
                    if literal := cls._value_definition2literal(value_definition):
                        triples.append((id_, predicate, literal))

        return triples

    @classmethod
    def _value_definition2literal(cls, definition: dict, make_unit_datatype: bool = False) -> Literal | None:
        """Converts a generic attribute definition to a literal.

        Returns None when the definition has no ``Value`` or no ``Format`` key.
        """
        if "Value" not in definition or "Format" not in definition:
            return None

        value = definition["Value"]

        # case: when units are present the value is typed as a float, or with the
        # unit URI as datatype when requested and available
        if "Units" in definition:
            if make_unit_datatype and "UnitsURI" in definition:
                return Literal(value, datatype=URIRef(definition["UnitsURI"]))
            return Literal(value, datatype=XSD.float)

        # case: when language is present we add a language tag to the literal
        if "Language" in definition:
            return Literal(value, lang=definition["Language"])

        # case: when ValueURI is present we use it instead of Value
        # this would be candidate for ENUMs in CDF
        if "ValueURI" in definition:
            return Literal(definition["ValueURI"], datatype=XSD[definition["Format"]])

        # case: when Format is not string we make sure to add the datatype
        if definition["Format"].lower() != "string":
            return Literal(value, datatype=XSD[definition["Format"]])

        # case: when Format is string we add the literal without datatype (easier to read triples, less noise)
        return Literal(value)

    @classmethod
    def _get_element_label(cls, element: Element) -> str | None:
        """Returns the ``String`` attribute of the element's label text, if any.

        NOTE(review): the third argument to ``get_children`` is presumably a limit
        on the number of children returned; confirm against the helper.
        """
        if children := get_children(element, "Label", 1):
            if grandchildren := get_children(children[0], "Text", 1):
                if "String" in grandchildren[0].attrib:
                    return grandchildren[0].attrib["String"]

        # extension for schema version 3.3, where text is used to "label" without a <label> parent
        # (only consulted when the "Label" lookup above returned nothing)
        elif children := get_children(element, "Text", 1):
            if "String" in children[0].attrib:
                return children[0].attrib["String"]

        return None

    @classmethod
    def _get_element_generic_attributes(cls, element: Element) -> dict:
        """Groups the attribute mappings of ``GenericAttribute`` children by attribute URI.

        Only the first ``GenericAttributes`` group returned by ``get_children`` is read.
        """
        # TODO: This requires more work as there are multiple groupings of GenericAttributes

        attributes = defaultdict(list)
        if children := get_children(element, "GenericAttributes", 1):
            if grandchildren := get_children(children[0], "GenericAttribute"):
                for generic_attribute in grandchildren:
                    # extension for schema version 3.3, where "AttributeURI" is not included
                    if name := generic_attribute.attrib.get("Name", None):
                        attribute_uri = as_neat_compliant_uri(DEFAULT_NAMESPACE[name])
                        # defaultdict creates the list on first use
                        attributes[attribute_uri].append(generic_attribute.attrib)

        return attributes

extract() #

Extracts RDF triples from DEXPI XML file.

Source code in cognite/neat/graph/extractors/_dexpi.py
def extract(self) -> Iterable[Triple]:
    """Yield the RDF triples of every element visited in the DEXPI XML tree."""
    for node in iterate_tree(self.root):
        # each element may contribute zero or more triples
        for triple in self._element2triples(node, self.namespace):
            yield triple

DMSExtractor #

Bases: BaseExtractor

Extract data from Cognite Data Fusion DMS instances into Neat.

Parameters:

Name Type Description Default
items Iterable[Instance]

The items to extract.

required
total int | None

The total number of items to extract. If provided, this will be used to estimate the progress.

None
limit int | None

The maximum number of items to extract.

None
overwrite_namespace Namespace | None

If provided, this will overwrite the space of the extracted items.

None
Source code in cognite/neat/graph/extractors/_dms.py
class DMSExtractor(BaseExtractor):
    """Extract data from Cognite Data Fusion DMS instances into Neat.

    Args:
        items: The items to extract.
        total: The total number of items to extract. If provided, this will be used to estimate the progress.
        limit: The maximum number of items to extract.
        overwrite_namespace: If provided, this will overwrite the space of the extracted items.
    """

    def __init__(
        self,
        items: Iterable[Instance],
        total: int | None = None,
        limit: int | None = None,
        overwrite_namespace: Namespace | None = None,
    ) -> None:
        self.items = items
        self.total = total
        self.limit = limit
        self.overwrite_namespace = overwrite_namespace

    @classmethod
    def from_data_model(
        cls, client: CogniteClient, data_model: DataModelIdentifier, limit: int | None = None
    ) -> "DMSExtractor":
        """Create an extractor from a data model.

        Args:
            client: The Cognite client to use.
            data_model: The data model to extract.
            limit: The maximum number of instances to extract.

        Raises:
            ResourceRetrievalError: If the data model lookup returns nothing.
        """
        retrieved = client.data_modeling.data_models.retrieve(data_model, inline_views=True)
        if not retrieved:
            raise ResourceRetrievalError(dm.DataModelId.load(data_model), "data model", "Data Model is missing in CDF")
        return cls.from_views(client, retrieved.latest_version().views, limit)

    @classmethod
    def from_views(cls, client: CogniteClient, views: Iterable[dm.View], limit: int | None = None) -> "DMSExtractor":
        """Create an extractor from a set of views.

        Args:
            client: The Cognite client to use.
            views: The views to extract.
            limit: The maximum number of instances to extract.
        """
        return cls(_InstanceIterator(client, views), total=None, limit=limit)

    def extract(self) -> Iterable[Triple]:
        """Yield the triples of each instance in ``self.items``.

        At most ``self.limit`` instances are processed. A limit of ``None`` or
        ``0`` means no limit; a negative limit yields nothing.
        """
        from itertools import islice

        # islice stops after `limit` items without requesting one more from the
        # source, so a lazy iterator is not asked for an item that would be discarded.
        items = islice(self.items, max(self.limit, 0)) if self.limit else self.items
        for item in items:
            yield from self._extract_instance(item)

    def _extract_instance(self, instance: Instance) -> Iterable[Triple]:
        """Yield the triples describing a single node or edge.

        An edge without properties becomes one ``(start, type, end)`` triple.
        An edge with properties is given its own subject, typed and linked to
        its start and end nodes, followed by its property triples. A node is
        typed and followed by its property triples.

        Raises:
            NotImplementedError: If ``instance`` is neither an edge nor a node.
        """
        if isinstance(instance, dm.Edge):
            if not instance.properties:
                yield (
                    self._as_uri_ref(instance.start_node),
                    self._as_uri_ref(instance.type),
                    self._as_uri_ref(instance.end_node),
                )
                return
            else:
                # If the edge has properties, we create a node for the edge and connect it to the start and end nodes.
                id_ = self._as_uri_ref(instance)
                yield id_, RDF.type, self._as_uri_ref(instance.type)
                yield id_, RDF.type, self._get_namespace(instance.space).Edge
                yield (
                    id_,
                    self._as_uri_ref(dm.DirectRelationReference(instance.space, "startNode")),
                    self._as_uri_ref(instance.start_node),
                )
                yield (
                    id_,
                    self._as_uri_ref(dm.DirectRelationReference(instance.space, "endNode")),
                    self._as_uri_ref(instance.end_node),
                )

        elif isinstance(instance, dm.Node):
            id_ = self._as_uri_ref(instance)
            if instance.type:
                type_ = self._as_uri_ref(cast(dm.DirectRelationReference, instance.type))
            else:
                # Untyped nodes fall back to the generic ``Node`` term of their namespace.
                type_ = self._get_namespace(instance.space).Node

            yield id_, RDF.type, type_
        else:
            raise NotImplementedError(f"Unknown instance type {type(instance)}")

        # Properties are grouped per view; the predicate lives in the namespace of the view's space.
        for view_id, properties in instance.properties.items():
            namespace = self._get_namespace(view_id.space)
            for key, value in properties.items():
                for object_ in self._get_objects(value):
                    yield id_, namespace[key], object_

    def _get_objects(self, value: PropertyValue) -> Iterable[Literal | URIRef]:
        """Convert a property value into zero or more RDF objects.

        Scalars become literals, dicts holding ``space`` and ``externalId``
        become URI references, other dicts become JSON-typed literals, and
        lists are flattened recursively. Any other value yields nothing.
        """
        import json

        if isinstance(value, str | float | bool | int):
            yield Literal(value)
        elif isinstance(value, dict) and "space" in value and "externalId" in value:
            yield self._as_uri_ref(dm.DirectRelationReference.load(value))
        elif isinstance(value, dict):
            # This object is a json object. Serialise it with json.dumps: str() would
            # emit a Python repr (single quotes, True/None), which is not valid JSON.
            yield Literal(json.dumps(value), datatype=XSD._NS["json"])
        elif isinstance(value, list):
            for item in value:
                yield from self._get_objects(item)

    def _as_uri_ref(self, instance: Instance | dm.DirectRelationReference) -> URIRef:
        """Return the URI formed from the namespace of the instance's space and its external id."""
        return self._get_namespace(instance.space)[instance.external_id]

    def _get_namespace(self, space: str) -> Namespace:
        """Return ``overwrite_namespace`` when set, otherwise the default namespace for ``space``."""
        if self.overwrite_namespace:
            return self.overwrite_namespace
        return Namespace(DEFAULT_SPACE_URI.format(space=space))

from_data_model(client, data_model, limit=None) classmethod #

Create an extractor from a data model.

Parameters:

Name Type Description Default
client CogniteClient

The Cognite client to use.

required
data_model DataModelIdentifier

The data model to extract.

required
limit int | None

The maximum number of instances to extract.

None
Source code in cognite/neat/graph/extractors/_dms.py
@classmethod
def from_data_model(
    cls, client: CogniteClient, data_model: DataModelIdentifier, limit: int | None = None
) -> "DMSExtractor":
    """Build an extractor covering the views of a data model.

    Args:
        client: The Cognite client to use.
        data_model: The data model to extract.
        limit: The maximum number of instances to extract.

    Raises:
        ResourceRetrievalError: If the data model lookup returns nothing.
    """
    found = client.data_modeling.data_models.retrieve(data_model, inline_views=True)
    if found:
        latest = found.latest_version()
        return cls.from_views(client, latest.views, limit)
    raise ResourceRetrievalError(dm.DataModelId.load(data_model), "data model", "Data Model is missing in CDF")

from_views(client, views, limit=None) classmethod #

Create an extractor from a set of views.

Parameters:

Name Type Description Default
client CogniteClient

The Cognite client to use.

required
views Iterable[View]

The views to extract.

required
limit int | None

The maximum number of instances to extract.

None
Source code in cognite/neat/graph/extractors/_dms.py
@classmethod
def from_views(cls, client: CogniteClient, views: Iterable[dm.View], limit: int | None = None) -> "DMSExtractor":
    """Build an extractor over the instances of the given views.

    Args:
        client: The Cognite client to use.
        views: The views to extract.
        limit: The maximum number of instances to extract.
    """
    instances = _InstanceIterator(client, views)
    return cls(instances, total=None, limit=limit)

MockGraphGenerator #

Bases: BaseExtractor

Class used to generate mock graph data for purposes of testing of NEAT.

Parameters:

Name Type Description Default
rules InformationRules | DMSRules

Transformation rules defining the classes with their properties.

required
class_count dict[str | ClassEntity, int] | None

Target class count for each class in the ontology

None
stop_on_exception bool

Whether to stop when an exception is encountered. Defaults to False.

False
allow_isolated_classes bool

Whether to generate instances for classes that are not connected to any other class. Defaults to True.

True
Source code in cognite/neat/graph/extractors/_mock_graph_generator.py
class MockGraphGenerator(BaseExtractor):
    """Generate mock graph data for testing NEAT.

    Args:
        rules: Transformation rules defining the classes with their properties.
            ``DMSRules`` are converted to ``InformationRules`` on construction.
        class_count: Target number of instances per class. Keys must be either
            all strings or all ``ClassEntity`` objects. When omitted or empty,
            every defined class gets a count of 1.
        stop_on_exception: Whether to stop when an exception is encountered. Defaults to False.
        allow_isolated_classes: Whether to generate instances for classes that are not
            connected to any other class. Defaults to True.
    """

    def __init__(
        self,
        rules: InformationRules | DMSRules,
        class_count: dict[str | ClassEntity, int] | None = None,
        stop_on_exception: bool = False,
        allow_isolated_classes: bool = True,
    ):
        if not isinstance(rules, (DMSRules, InformationRules)):
            raise ValueError("Rules must be of type InformationRules or DMSRules!")

        if isinstance(rules, DMSRules):
            # fixes potential issues with circular dependencies
            from cognite.neat.rules.transformers import DMSToInformation

            rules = DMSToInformation().transform(rules).rules
        self.rules = rules

        if not class_count:
            # No counts requested: one instance for every defined class.
            defined = InformationAnalysis(self.rules).defined_classes(consider_inheritance=True)
            self.class_count = {class_: 1 for class_ in defined}
        elif all(isinstance(key, str) for key in class_count):
            # String keys are qualified with the rules' prefix before loading.
            prefix = self.rules.metadata.prefix
            self.class_count = {ClassEntity.load(f"{prefix}:{name}"): count for name, count in class_count.items()}
        elif all(isinstance(key, ClassEntity) for key in class_count):
            self.class_count = cast(dict[ClassEntity, int], class_count)
        else:
            raise ValueError("Class count keys must be of type str! or ClassEntity! or empty dict!")

        self.stop_on_exception = stop_on_exception
        self.allow_isolated_classes = allow_isolated_classes

    def extract(self) -> list[Triple]:
        """Generate mock triples from the rules and the requested instance count per class.

        Returns:
            List of RDF triples, represented as tuples `(subject, predicate, object)`, that define data model instances
        """
        triples = generate_triples(
            self.rules,
            self.class_count,
            stop_on_exception=self.stop_on_exception,
            allow_isolated_classes=self.allow_isolated_classes,
        )
        return triples

extract() #

Generate mock triples based on the transformation rules that define the data model and on the desired number of instances per class.

Returns:

Type Description
list[Triple]

List of RDF triples, represented as tuples (subject, predicate, object), that define data model instances

Source code in cognite/neat/graph/extractors/_mock_graph_generator.py
def extract(self) -> list[Triple]:
    """Generate mock triples from the rules and the requested instance count per class.

    Returns:
        List of RDF triples, represented as tuples `(subject, predicate, object)`, that define data model instances
    """
    triples = generate_triples(
        self.rules,
        self.class_count,
        stop_on_exception=self.stop_on_exception,
        allow_isolated_classes=self.allow_isolated_classes,
    )
    return triples

RdfFileExtractor #

Bases: BaseExtractor

Extract data from RDF files into Neat.

Parameters:

Name Type Description Default
filepath Path

The path to the RDF file.

required
mime_type MIMETypes

The MIME type of the RDF file. Defaults to "application/rdf+xml".

'application/rdf+xml'
base_uri URIRef

The base URI to use. Defaults to None.

None
Source code in cognite/neat/graph/extractors/_rdf_file.py
class RdfFileExtractor(BaseExtractor):
    """Extract data from RDF files into Neat.

    Args:
        filepath (Path): The path to the RDF file.
        mime_type (MIMETypes, optional): The MIME type of the RDF file. Defaults to "application/rdf+xml".
        base_uri (URIRef, optional): The base URI to use. Defaults to None.
    """

    def __init__(
        self,
        filepath: Path,
        mime_type: MIMETypes = "application/rdf+xml",
        base_uri: URIRef | None = None,
    ):
        self.filepath = filepath
        self.mime_type = mime_type
        self.base_uri = base_uri