Importers

cognite.neat._rules.importers #

BaseImporter #

Bases: ABC, Generic[T_InputRules]

BaseImporter class which all importers inherit from.

Source code in cognite/neat/_rules/importers/_base.py
class BaseImporter(ABC, Generic[T_InputRules]):
    """
    BaseImporter class which all importers inherit from.
    """

    @abstractmethod
    def to_rules(self) -> ReadRules[T_InputRules]:
        """Creates `Rules` object from the data for target role."""
        raise NotImplementedError()

    def _default_metadata(self) -> dict[str, Any]:
        creator = "UNKNOWN"
        with suppress(KeyError, ImportError):
            import getpass

            creator = getpass.getuser()

        return {
            "prefix": "neat",
            "schema": "partial",
            "namespace": Namespace("http://purl.org/cognite/neat/"),
            "version": "0.1.0",
            "title": "Neat Imported Data Model",
            "created": datetime.now().replace(microsecond=0).isoformat(),
            "updated": datetime.now().replace(microsecond=0).isoformat(),
            "creator": creator,
            "description": f"Imported using {type(self).__name__}",
        }

    @classmethod
    def _repr_html_(cls) -> str:
        return class_html_doc(cls)

to_rules() abstractmethod #

Creates Rules object from the data for target role.

Source code in cognite/neat/_rules/importers/_base.py
@abstractmethod
def to_rules(self) -> ReadRules[T_InputRules]:
    """Creates `Rules` object from the data for target role."""
    raise NotImplementedError()
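
As a rough illustration of the contract, a minimal custom importer only needs to implement to_rules and return a ReadRules object. The sketch below is hypothetical: it assumes the InformationInputRules and InformationInputMetadata input types and the three-argument ReadRules constructor shown further down this page, and the class name and raw-data handling are illustrative only.

from typing import Any

# BaseImporter, ReadRules, IssueList, InformationInputRules and InformationInputMetadata
# are the types shown elsewhere on this page; their exact import paths are assumed.


class DictImporter(BaseImporter[InformationInputRules]):
    """Hypothetical importer that builds rules from an already-parsed dictionary."""

    def __init__(self, raw_data: dict[str, Any]) -> None:
        self.raw_data = raw_data

    def to_rules(self) -> ReadRules[InformationInputRules]:
        issues = IssueList()
        metadata = self._default_metadata()  # prefix, version, creator, timestamps, ...
        rules = InformationInputRules(
            metadata=InformationInputMetadata.load(metadata),
            properties=self.raw_data.get("properties", []),
            classes=self.raw_data.get("classes", []),
        )
        return ReadRules(rules, issues, {})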

DMSImporter #

Bases: BaseImporter[DMSInputRules]

Imports a Data Model from Cognite Data Fusion.

Parameters:

    schema (DMSSchema): The schema containing the data model. Required.
    read_issues (Sequence[NeatIssue] | None): A list of issues that occurred during the import. Defaults to None.
    metadata (DMSInputMetadata | None): Metadata for the data model. Defaults to None.
    ref_metadata (DMSInputMetadata | None): Metadata for the reference data model. Defaults to None.

Source code in cognite/neat/_rules/importers/_dms2rules.py
class DMSImporter(BaseImporter[DMSInputRules]):
    """Imports a Data Model from Cognite Data Fusion.

    Args:
        schema: The schema containing the data model.
        read_issues: A list of issues that occurred during the import.
        metadata: Metadata for the data model.
        ref_metadata: Metadata for the reference data model.

    """

    def __init__(
        self,
        schema: DMSSchema,
        read_issues: Sequence[NeatIssue] | None = None,
        metadata: DMSInputMetadata | None = None,
        ref_metadata: DMSInputMetadata | None = None,
    ):
        # Calling this root schema to distinguish it from
        # * User Schema
        # * Reference Schema
        self.root_schema = schema
        self.metadata = metadata
        self.ref_metadata = ref_metadata
        self.issue_list = IssueList(read_issues)
        self._all_containers_by_id = schema.containers.copy()
        self._all_views_by_id = schema.views.copy()
        if schema.reference:
            self._all_containers_by_id.update(schema.reference.containers.items())
            self._all_views_by_id.update(schema.reference.views.items())

    @classmethod
    def from_data_model_id(
        cls,
        client: CogniteClient,
        data_model_id: DataModelIdentifier,
        reference_model_id: DataModelIdentifier | None = None,
    ) -> "DMSImporter":
        """Create a DMSImporter ready to convert the given data model to rules.

        Args:
            client: Instantiated CogniteClient to retrieve data model.
            reference_model_id: The reference data model to retrieve. This is the data model that
                the given data model is built on top of, typically, an enterprise data model.
            data_model_id: Data Model to retrieve.

        Returns:
            DMSImporter: DMSImporter instance
        """
        data_model_ids = [data_model_id, reference_model_id] if reference_model_id else [data_model_id]
        data_models = client.data_modeling.data_models.retrieve(data_model_ids, inline_views=True)

        user_models = cls._find_model_in_list(data_models, data_model_id)
        if len(user_models) == 0:
            return cls(
                DMSSchema(),
                [
                    ResourceRetrievalError(
                        dm.DataModelId.load(data_model_id),  # type: ignore[arg-type]
                        "data model",
                        "Data Model is missing in CDF",
                    )
                ],
            )
        user_model = user_models.latest_version()

        if reference_model_id:
            ref_models = cls._find_model_in_list(data_models, reference_model_id)
            if len(ref_models) == 0:
                return cls(
                    DMSSchema(),
                    [
                        ResourceRetrievalError(
                            dm.DataModelId.load(reference_model_id), "data model", "Data Model is missing in CDF"
                        )
                    ],
                )
            ref_model: dm.DataModel[dm.View] | None = ref_models.latest_version()
        else:
            ref_model = None

        issue_list = IssueList()
        with _handle_issues(issue_list) as result:
            schema = DMSSchema.from_data_model(client, user_model, ref_model)

        if result.result == "failure" or issue_list.has_errors:
            return cls(DMSSchema(), issue_list)

        metadata = cls._create_metadata_from_model(user_model, has_reference=ref_model is not None)
        ref_metadata = cls._create_metadata_from_model(ref_model) if ref_model else None

        return cls(schema, issue_list, metadata, ref_metadata)

    @classmethod
    def _find_model_in_list(
        cls, data_models: dm.DataModelList[dm.View], model_id: DataModelIdentifier
    ) -> dm.DataModelList[dm.View]:
        identifier = DataModelId.load(model_id)
        return dm.DataModelList[dm.View](
            [
                model
                for model in data_models
                if (model.space, model.external_id) == (identifier.space, identifier.external_id)
            ]
        )

    @classmethod
    def _create_metadata_from_model(
        cls,
        model: dm.DataModel[dm.View] | dm.DataModelApply,
        has_reference: bool = False,
    ) -> DMSInputMetadata:
        description, creator = DMSInputMetadata._get_description_and_creator(model.description)

        if isinstance(model, dm.DataModel):
            created = ms_to_datetime(model.created_time)
            updated = ms_to_datetime(model.last_updated_time)
        else:
            now = datetime.now().replace(microsecond=0)
            created = now
            updated = now
        return DMSInputMetadata(
            schema_="complete",
            data_model_type="solution" if has_reference else "enterprise",
            extension="addition",
            space=model.space,
            external_id=model.external_id,
            name=model.name or model.external_id,
            version=model.version or "0.1.0",
            updated=updated,
            created=created,
            creator=",".join(creator),
            description=description,
        )

    @classmethod
    def from_directory(cls, directory: str | Path) -> "DMSImporter":
        issue_list = IssueList()
        with _handle_issues(issue_list) as _:
            schema = DMSSchema.from_directory(directory)
        # If there were errors during the import, the to_rules method will return ReadRules with rules=None.
        return cls(schema, issue_list)

    @classmethod
    def from_zip_file(cls, zip_file: str | Path) -> "DMSImporter":
        if Path(zip_file).suffix != ".zip":
            return cls(DMSSchema(), [FileTypeUnexpectedError(Path(zip_file), frozenset([".zip"]))])
        issue_list = IssueList()
        with _handle_issues(issue_list) as _:
            schema = DMSSchema.from_zip(zip_file)
        return cls(schema, issue_list)

    def to_rules(self) -> ReadRules[DMSInputRules]:
        if self.issue_list.has_errors:
            # In case there were errors during the import, the to_rules method will return None
            return ReadRules(None, self.issue_list, {})

        if not self.root_schema.data_model:
            self.issue_list.append(ResourceMissingIdentifierError("data model", type(self.root_schema).__name__))
            return ReadRules(None, self.issue_list, {})

        model = self.root_schema.data_model

        schema_completeness = SchemaCompleteness.complete
        data_model_type = DataModelType.enterprise
        reference: DMSInputRules | None = None
        if (ref_schema := self.root_schema.reference) and (ref_model := ref_schema.data_model):
            # Reference should always be an enterprise model.
            reference = self._create_rule_components(
                ref_model,
                ref_schema,
                self.ref_metadata or self._create_default_metadata(list(ref_schema.views.values()), is_ref=True),
                DataModelType.enterprise,
            )
            data_model_type = DataModelType.solution

        user_rules = self._create_rule_components(
            model,
            self.root_schema,
            self.metadata,
            data_model_type,
            schema_completeness,
            has_reference=reference is not None,
        )
        user_rules.reference = reference

        return ReadRules(user_rules, self.issue_list, {})

    def _create_rule_components(
        self,
        data_model: dm.DataModelApply,
        schema: DMSSchema,
        metadata: DMSInputMetadata | None = None,
        data_model_type: DataModelType | None = None,
        schema_completeness: SchemaCompleteness | None = None,
        has_reference: bool = False,
    ) -> DMSInputRules:
        properties: list[DMSInputProperty] = []
        for view_id, view in schema.views.items():
            view_entity = ViewEntity.from_id(view_id)
            class_entity = view_entity.as_class()
            for prop_id, prop in (view.properties or {}).items():
                dms_property = self._create_dms_property(prop_id, prop, view_entity, class_entity)
                if dms_property is not None:
                    properties.append(dms_property)

        data_model_view_ids: set[dm.ViewId] = {
            view.as_id() if isinstance(view, dm.View | dm.ViewApply) else view for view in data_model.views or []
        }

        metadata = metadata or DMSInputMetadata.from_data_model(data_model, has_reference)
        if data_model_type is not None:
            metadata.data_model_type = str(data_model_type)  # type: ignore[assignment]
        if schema_completeness is not None:
            metadata.schema_ = str(schema_completeness)  # type: ignore[assignment]

        enum = self._create_enum_collections(schema.containers.values())

        return DMSInputRules(
            metadata=metadata,
            properties=properties,
            containers=[DMSInputContainer.from_container(container) for container in schema.containers.values()],
            views=[
                DMSInputView.from_view(view, in_model=view_id in data_model_view_ids)
                for view_id, view in schema.views.items()
            ],
            nodes=[DMSInputNode.from_node_type(node_type) for node_type in schema.node_types.values()],
            enum=enum,
        )

    @classmethod
    def _create_default_metadata(
        cls, views: Sequence[dm.View | dm.ViewApply], is_ref: bool = False
    ) -> DMSInputMetadata:
        now = datetime.now().replace(microsecond=0)
        space = Counter(view.space for view in views).most_common(1)[0][0]
        return DMSInputMetadata(
            schema_="complete",
            extension="addition",
            data_model_type="enterprise" if is_ref else "solution",
            space=space,
            external_id="Unknown",
            version="0.1.0",
            creator="Unknown",
            created=now,
            updated=now,
        )

    def _create_dms_property(
        self, prop_id: str, prop: ViewPropertyApply, view_entity: ViewEntity, class_entity: ClassEntity
    ) -> DMSInputProperty | None:
        if isinstance(prop, dm.MappedPropertyApply) and prop.container not in self._all_containers_by_id:
            self.issue_list.append(
                ResourceNotFoundWarning[dm.ContainerId, dm.PropertyId](
                    dm.ContainerId.load(prop.container),
                    "container",
                    view_entity.to_property_id(prop_id),
                    "view property",
                )
            )
            return None
        if (
            isinstance(prop, dm.MappedPropertyApply)
            and prop.container_property_identifier not in self._all_containers_by_id[prop.container].properties
        ):
            self.issue_list.append(
                PropertyNotFoundWarning(prop.container, "container", prop_id, view_entity.as_id(), "view"),
            )
            return None
        if not isinstance(
            prop,
            dm.MappedPropertyApply
            | SingleEdgeConnectionApply
            | MultiEdgeConnectionApply
            | SingleReverseDirectRelationApply
            | MultiReverseDirectRelationApply,
        ):
            self.issue_list.append(
                PropertyTypeNotSupportedWarning[dm.ViewId](view_entity.as_id(), "view", prop_id, type(prop).__name__)
            )
            return None

        value_type = self._get_value_type(prop, view_entity, prop_id)
        if value_type is None:
            return None

        return DMSInputProperty(
            class_=str(class_entity),
            property_=prop_id,
            description=prop.description,
            name=prop.name,
            connection=self._get_connection_type(prop_id, prop, view_entity.as_id()),
            value_type=str(value_type),
            is_list=self._get_is_list(prop),
            nullable=self._get_nullable(prop),
            immutable=self._get_immutable(prop),
            default=self._get_default(prop),
            container=str(ContainerEntity.from_id(prop.container))
            if isinstance(prop, dm.MappedPropertyApply)
            else None,
            container_property=prop.container_property_identifier if isinstance(prop, dm.MappedPropertyApply) else None,
            view=str(view_entity),
            view_property=prop_id,
            index=self._get_index(prop, prop_id),
            constraint=self._get_constraint(prop, prop_id),
        )

    def _container_prop_unsafe(self, prop: dm.MappedPropertyApply) -> dm.ContainerProperty:
        """This method assumes you have already checked that the container with property exists."""
        return self._all_containers_by_id[prop.container].properties[prop.container_property_identifier]

    def _get_connection_type(
        self, prop_id: str, prop: ViewPropertyApply, view_id: dm.ViewId
    ) -> Literal["direct"] | ReverseConnectionEntity | EdgeEntity | None:
        if isinstance(prop, SingleEdgeConnectionApply | MultiEdgeConnectionApply) and prop.direction == "outwards":
            properties = ViewEntity.from_id(prop.edge_source) if prop.edge_source is not None else None
            return EdgeEntity(properties=properties, type=DMSNodeEntity.from_reference(prop.type), direction="outwards")
        elif isinstance(prop, SingleEdgeConnectionApply | MultiEdgeConnectionApply) and prop.direction == "inwards":
            if reverse_prop := self._find_reverse_edge(prop_id, prop, view_id):
                return ReverseConnectionEntity(property=reverse_prop)
            else:
                properties = ViewEntity.from_id(prop.source) if prop.edge_source is not None else None
                return EdgeEntity(
                    properties=properties, type=DMSNodeEntity.from_reference(prop.type), direction="inwards"
                )
        elif isinstance(prop, SingleReverseDirectRelationApply | MultiReverseDirectRelationApply):
            return ReverseConnectionEntity(property=prop.through.property)
        elif isinstance(prop, dm.MappedPropertyApply) and isinstance(
            self._container_prop_unsafe(prop).type, dm.DirectRelation
        ):
            return "direct"
        else:
            return None

    def _get_value_type(
        self, prop: ViewPropertyApply, view_entity: ViewEntity, prop_id
    ) -> DataType | ViewEntity | DMSUnknownEntity | None:
        if isinstance(
            prop,
            SingleEdgeConnectionApply
            | MultiEdgeConnectionApply
            | SingleReverseDirectRelationApply
            | MultiReverseDirectRelationApply,
        ):
            return ViewEntity.from_id(prop.source)
        elif isinstance(prop, dm.MappedPropertyApply):
            container_prop = self._container_prop_unsafe(cast(dm.MappedPropertyApply, prop))
            if isinstance(container_prop.type, dm.DirectRelation):
                if prop.source is None or prop.source not in self._all_views_by_id:
                    return DMSUnknownEntity()
                else:
                    return ViewEntity.from_id(prop.source)
            elif isinstance(container_prop.type, PropertyTypeWithUnit) and container_prop.type.unit:
                return DataType.load(f"{container_prop.type._type}(unit={container_prop.type.unit.external_id})")
            elif isinstance(container_prop.type, DMSEnum):
                return Enum(collection=ClassEntity(suffix=prop_id), unknownValue=container_prop.type.unknown_value)
            else:
                return DataType.load(container_prop.type._type)
        else:
            self.issue_list.append(
                PropertyTypeNotSupportedWarning[dm.ViewId](view_entity.as_id(), "view", prop_id, type(prop).__name__)
            )
            return None

    def _get_nullable(self, prop: ViewPropertyApply) -> bool | None:
        if isinstance(prop, dm.MappedPropertyApply):
            return self._container_prop_unsafe(prop).nullable
        else:
            return None

    def _get_immutable(self, prop: ViewPropertyApply) -> bool | None:
        if isinstance(prop, dm.MappedPropertyApply):
            return self._container_prop_unsafe(prop).immutable
        else:
            return None

    def _get_is_list(self, prop: ViewPropertyApply) -> bool | None:
        if isinstance(prop, dm.MappedPropertyApply):
            prop_type = self._container_prop_unsafe(prop).type
            return isinstance(prop_type, ListablePropertyType) and prop_type.is_list
        elif isinstance(prop, MultiEdgeConnectionApply | MultiReverseDirectRelationApply):
            return True
        elif isinstance(prop, SingleEdgeConnectionApply | SingleReverseDirectRelationApply):
            return False
        else:
            return None

    def _get_default(self, prop: ViewPropertyApply) -> str | None:
        if isinstance(prop, dm.MappedPropertyApply):
            default = self._container_prop_unsafe(prop).default_value
            if default is not None:
                return str(default)
        return None

    def _get_index(self, prop: ViewPropertyApply, prop_id) -> list[str] | None:
        if not isinstance(prop, dm.MappedPropertyApply):
            return None
        container = self._all_containers_by_id[prop.container]
        index: list[str] = []
        for index_name, index_obj in (container.indexes or {}).items():
            if isinstance(index_obj, BTreeIndex | InvertedIndex) and prop_id in index_obj.properties:
                index.append(index_name)
        return index or None

    def _get_constraint(self, prop: ViewPropertyApply, prop_id: str) -> list[str] | None:
        if not isinstance(prop, dm.MappedPropertyApply):
            return None
        container = self._all_containers_by_id[prop.container]
        unique_constraints: list[str] = []
        for constraint_name, constraint_obj in (container.constraints or {}).items():
            if isinstance(constraint_obj, dm.RequiresConstraint):
                # This is handled in the .from_container method of DMSContainer
                continue
            elif isinstance(constraint_obj, dm.UniquenessConstraint) and prop_id in constraint_obj.properties:
                unique_constraints.append(constraint_name)
            elif isinstance(constraint_obj, dm.UniquenessConstraint):
                # This does not apply to this property
                continue
            else:
                self.issue_list.append(
                    PropertyTypeNotSupportedWarning[dm.ContainerId](
                        prop.container, "container", prop_id, type(constraint_obj).__name__
                    )
                )
        return unique_constraints or None

    def _find_reverse_edge(
        self, prop_id: str, prop: SingleEdgeConnectionApply | MultiEdgeConnectionApply, view_id: dm.ViewId
    ) -> str | None:
        if prop.source not in self._all_views_by_id:
            return None
        view = self._all_views_by_id[prop.source]
        candidates = []
        for prop_name, reverse_prop in (view.properties or {}).items():
            if isinstance(reverse_prop, SingleEdgeConnectionApply | MultiEdgeConnectionApply):
                if (
                    reverse_prop.type == prop.type
                    and reverse_prop.source == view_id
                    and reverse_prop.direction != prop.direction
                ):
                    candidates.append(prop_name)
        if len(candidates) == 0:
            self.issue_list.append(
                PropertyNotFoundWarning(
                    prop.source,
                    "view property",
                    f"reverse edge of {prop_id}",
                    dm.PropertyId(view_id, prop_id),
                    "view property",
                )
            )
            return None
        if len(candidates) > 1:
            self.issue_list.append(
                ResourcesDuplicatedWarning(
                    frozenset(dm.PropertyId(view.as_id(), candidate) for candidate in candidates),
                    "view property",
                    default_action="Multiple reverse edges found for "
                    f"{dm.PropertyId(view_id, prop_id)!r}. Will use {candidates[0]}",
                )
            )

        return candidates[0]

    @staticmethod
    def _create_enum_collections(containers: Collection[dm.ContainerApply]) -> list[DMSInputEnum] | None:
        enum_collections: list[DMSInputEnum] = []
        for container in containers:
            for prop_id, prop in container.properties.items():
                if isinstance(prop.type, DMSEnum):
                    for identifier, value in prop.type.values.items():
                        enum_collections.append(
                            DMSInputEnum(
                                collection=prop_id, value=identifier, name=value.name, description=value.description
                            )
                        )
        return enum_collections

from_data_model_id(client, data_model_id, reference_model_id=None) classmethod #

Create a DMSImporter ready to convert the given data model to rules.

Parameters:

    client (CogniteClient): Instantiated CogniteClient to retrieve data model. Required.
    data_model_id (DataModelIdentifier): Data Model to retrieve. Required.
    reference_model_id (DataModelIdentifier | None): The reference data model to retrieve. This is the data model that the given data model is built on top of, typically an enterprise data model. Defaults to None.

Returns:

    DMSImporter: DMSImporter instance

Source code in cognite/neat/_rules/importers/_dms2rules.py
@classmethod
def from_data_model_id(
    cls,
    client: CogniteClient,
    data_model_id: DataModelIdentifier,
    reference_model_id: DataModelIdentifier | None = None,
) -> "DMSImporter":
    """Create a DMSImporter ready to convert the given data model to rules.

    Args:
        client: Instantiated CogniteClient to retrieve data model.
        reference_model_id: The reference data model to retrieve. This is the data model that
            the given data model is built on top of, typically, an enterprise data model.
        data_model_id: Data Model to retrieve.

    Returns:
        DMSImporter: DMSImporter instance
    """
    data_model_ids = [data_model_id, reference_model_id] if reference_model_id else [data_model_id]
    data_models = client.data_modeling.data_models.retrieve(data_model_ids, inline_views=True)

    user_models = cls._find_model_in_list(data_models, data_model_id)
    if len(user_models) == 0:
        return cls(
            DMSSchema(),
            [
                ResourceRetrievalError(
                    dm.DataModelId.load(data_model_id),  # type: ignore[arg-type]
                    "data model",
                    "Data Model is missing in CDF",
                )
            ],
        )
    user_model = user_models.latest_version()

    if reference_model_id:
        ref_models = cls._find_model_in_list(data_models, reference_model_id)
        if len(ref_models) == 0:
            return cls(
                DMSSchema(),
                [
                    ResourceRetrievalError(
                        dm.DataModelId.load(reference_model_id), "data model", "Data Model is missing in CDF"
                    )
                ],
            )
        ref_model: dm.DataModel[dm.View] | None = ref_models.latest_version()
    else:
        ref_model = None

    issue_list = IssueList()
    with _handle_issues(issue_list) as result:
        schema = DMSSchema.from_data_model(client, user_model, ref_model)

    if result.result == "failure" or issue_list.has_errors:
        return cls(DMSSchema(), issue_list)

    metadata = cls._create_metadata_from_model(user_model, has_reference=ref_model is not None)
    ref_metadata = cls._create_metadata_from_model(ref_model) if ref_model else None

    return cls(schema, issue_list, metadata, ref_metadata)
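
A hedged usage sketch: the data model identifier and client configuration below are placeholders, the (space, external_id, version) tuple is assumed to be a valid DataModelIdentifier, and the rules/issues attribute names on the returned ReadRules are assumptions rather than documented here.

from cognite.client import CogniteClient

client = CogniteClient()  # assumes credentials are configured elsewhere
importer = DMSImporter.from_data_model_id(client, ("my_space", "MyDataModel", "v1"))

read = importer.to_rules()
for issue in read.issues:  # attribute name assumed
    print(issue)
if read.rules is not None:
    ...  # work with the imported DMSInputRules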

DTDLImporter #

Bases: BaseImporter[InformationInputRules]

Importer from Azure Digital Twin - DTDL (Digital Twin Definition Language).

This importer supports DTDL v2.0 and v3.0.

It is recommended to use the class methods from_directory and from_zip to create an instance of this class.

Parameters:

    items (Sequence[DTDLBase]): A sequence of DTDLBase objects. Required.
    title (str): Title of the data model. Defaults to None.
    read_issues (list[ValidationIssue]): A list of issues that occurred during reading. Defaults to None.
    schema (SchemaCompleteness): Schema completeness. Defaults to SchemaCompleteness.partial.

Source code in cognite/neat/_rules/importers/_dtdl2rules/dtdl_importer.py
class DTDLImporter(BaseImporter[InformationInputRules]):
    """Importer from Azure Digital Twin - DTDL (Digital Twin Definition Language).

    This importer supports DTDL v2.0 and v3.0.

    It is recommended to use the class methods `from_directory` and `from_zip` to create an instance of this class.

    Args:
        items (Sequence[DTDLBase]): A sequence of DTDLBase objects.
        title (str, optional): Title of the data model. Defaults to None.
        read_issues (list[ValidationIssue], optional): A list of issues that occurred during reading. Defaults to None.
        schema (SchemaCompleteness, optional): Schema completeness. Defaults to SchemaCompleteness.partial.

    """

    def __init__(
        self,
        items: Sequence[DTDLBase],
        title: str | None = None,
        read_issues: list[NeatIssue] | None = None,
        schema: SchemaCompleteness = SchemaCompleteness.partial,
    ) -> None:
        self._items = items
        self.title = title
        self._read_issues = IssueList(read_issues)
        self._schema_completeness = schema

    @classmethod
    def _from_file_content(cls, file_content: str, filepath: Path) -> Iterable[DTDLBase | NeatIssue]:
        raw = json.loads(file_content)
        if isinstance(raw, dict):
            if (context := raw.get("@context")) is None:
                yield FileMissingRequiredFieldWarning(filepath, "@context", "Missing '@context' key.")
                return
            raw_list = [raw]
        elif isinstance(raw, list):
            context = next(
                (entry["@context"] for entry in raw if isinstance(entry, dict) and "@context" in entry), None
            )
            if context is None:
                yield FileMissingRequiredFieldWarning(filepath, "@context", "Missing '@context' key.")
                return
            raw_list = raw
        else:
            yield FileTypeUnexpectedWarning(filepath, frozenset(["dict", "list"]), "Content is not an object or array.")
            return

        if isinstance(context, list):
            context = context[0]
        Interface.default_context = context
        spec_version = context.split(";")[1]
        try:
            cls_by_type = DTDL_CLS_BY_TYPE_BY_SPEC[spec_version]
        except KeyError:
            yield NeatValueWarning(
                f"Unsupported DTDL spec version: {spec_version} in {filepath}. "
                f"Supported versions are {humanize_collection(DTDL_CLS_BY_TYPE_BY_SPEC.keys())}."
                " The file will be skipped."
            )
            return

        for item in raw_list:
            if not (type_ := item.get("@type")):
                yield FileMissingRequiredFieldWarning(filepath, "@type", "Missing '@type' key.")
                continue
            cls_ = cls_by_type.get(type_)
            if cls_ is None:
                yield FileItemNotSupportedWarning(f"Unknown '@type' {type_}.", filepath=filepath)
                continue
            try:
                yield cls_.model_validate(item)
            except ValidationError as e:
                yield FileTypeUnexpectedWarning(filepath, frozenset([cls.__name__]), str(e))
            except Exception as e:
                yield FileReadWarning(filepath=filepath, reason=str(e))

    @classmethod
    def from_directory(cls, directory: Path) -> "DTDLImporter":
        items: list[DTDLBase] = []
        issues: list[NeatIssue] = []
        for filepath in directory.glob("**/*.json"):
            for item in cls._from_file_content(filepath.read_text(), filepath):
                if isinstance(item, NeatIssue):
                    issues.append(item)
                else:
                    items.append(item)
        return cls(items, directory.stem, read_issues=issues)

    @classmethod
    def from_zip(cls, zip_file: Path) -> "DTDLImporter":
        items: list[DTDLBase] = []
        issues: list[NeatIssue] = []
        with zipfile.ZipFile(zip_file) as z:
            for filepath in z.namelist():
                if filepath.endswith(".json"):
                    for item in cls._from_file_content(z.read(filepath).decode(), Path(filepath)):
                        if isinstance(item, NeatIssue):
                            issues.append(item)
                        else:
                            items.append(item)
        return cls(items, zip_file.stem, read_issues=issues)

    def to_rules(self) -> ReadRules[InformationInputRules]:
        converter = _DTDLConverter(self._read_issues)

        converter.convert(self._items)

        metadata = self._default_metadata()
        metadata["schema"] = self._schema_completeness.value

        if self.title:
            metadata["title"] = to_pascal(self.title)
        try:
            most_common_prefix = converter.get_most_common_prefix()
        except ValueError:
            # No prefixes are defined so we just use the default prefix...
            ...
        else:
            metadata["prefix"] = most_common_prefix

        rules = InformationInputRules(
            metadata=InformationInputMetadata.load(metadata),
            properties=converter.properties,
            classes=converter.classes,
        )

        return ReadRules(rules, converter.issues, {})
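
A usage sketch based on the factory methods above; the paths are placeholders.

from pathlib import Path

importer = DTDLImporter.from_directory(Path("dtdl_models"))
# or: importer = DTDLImporter.from_zip(Path("dtdl_models.zip"))

read = importer.to_rules()  # ReadRules[InformationInputRules]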

IMFImporter #

Bases: BaseRDFImporter

Convert SHACL shapes to tables / transformation rules / Excel file.

Parameters:

    filepath: Path to the RDF file containing the SHACL shapes.

Note

OWL ontologies are information models whose completeness varies. As such, constructing a functional data model directly will often be impossible, and the produced Rules object would therefore be ill-formed. To avoid this, neat will automatically attempt to make the imported rules compliant by adding default values for missing information, attaching dangling properties to default containers based on the property type, etc.

One has to be aware that NEAT will be opinionated about how to make the ontology compliant, and that the resulting rules may not be what you expect.

Source code in cognite/neat/_rules/importers/_rdf/_imf2rules/_imf2rules.py
class IMFImporter(BaseRDFImporter):
    """Convert SHACL shapes to tables/ transformation rules / Excel file.

        Args:
            filepath: Path to RDF file containing the SHACL Shapes

    !!! Note
        Rewrite to fit the SHACL rules we apply
        OWL Ontologies are information models which completeness varies. As such, constructing functional
        data model directly will often be impossible, therefore the produced Rules object will be ill formed.
        To avoid this, neat will automatically attempt to make the imported rules compliant by adding default
        values for missing information, attaching dangling properties to default containers based on the
        property type, etc.

        One has to be aware that NEAT will be opinionated about how to make the ontology
        compliant, and that the resulting rules may not be what you expect.

    """

    def _to_rules_components(
        self,
    ) -> dict:
        components = {
            "Metadata": parse_imf_metadata(),
            "Classes": parse_imf_to_classes(self.graph),
            "Properties": parse_imf_to_properties(self.graph),
        }

        return make_components_compliant(components)

InferenceImporter #

Bases: BaseRDFImporter

Infers rules from a triple store.

Rules inference through analysis of knowledge graph provided in various formats. Use the factory methods to create a triple store from sources such as RDF files, JSON files, YAML files, XML files, or directly from a graph store.

Parameters:

    issue_list (IssueList): Issue list to store issues. Required.
    graph (Graph): Knowledge graph. Required.
    max_number_of_instance (int): Maximum number of instances to be used in inference. Required.
    prefix (str): Prefix to be used for the inferred model. Required.

Source code in cognite/neat/_rules/importers/_rdf/_inference2rules.py
class InferenceImporter(BaseRDFImporter):
    """Infers rules from a triple store.

    Rules inference through analysis of knowledge graph provided in various formats.
    Use the factory methods to create a triple store from sources such as
    RDF files, JSON files, YAML files, XML files, or directly from a graph store.

    Args:
        issue_list: Issue list to store issues
        graph: Knowledge graph
        max_number_of_instance: Maximum number of instances to be used in inference
        prefix: Prefix to be used for the inferred model
    """

    @classmethod
    def from_graph_store(
        cls,
        store: NeatGraphStore,
        prefix: str = "inferred",
        max_number_of_instance: int = -1,
        non_existing_node_type: UnknownEntity | AnyURI = DEFAULT_NON_EXISTING_NODE_TYPE,
    ) -> "InferenceImporter":
        return super().from_graph_store(store, prefix, max_number_of_instance, non_existing_node_type)

    @classmethod
    def from_file(
        cls,
        filepath: Path,
        prefix: str = "inferred",
        max_number_of_instance: int = -1,
        non_existing_node_type: UnknownEntity | AnyURI = DEFAULT_NON_EXISTING_NODE_TYPE,
    ) -> "InferenceImporter":
        return super().from_file(filepath, prefix, max_number_of_instance, non_existing_node_type)

    @classmethod
    def from_json_file(
        cls,
        filepath: Path,
        prefix: str = "inferred",
        max_number_of_instance: int = -1,
    ) -> "InferenceImporter":
        raise NotImplementedError("JSON file format is not supported yet.")

    @classmethod
    def from_yaml_file(
        cls,
        filepath: Path,
        prefix: str = "inferred",
        max_number_of_instance: int = -1,
    ) -> "InferenceImporter":
        raise NotImplementedError("YAML file format is not supported yet.")

    @classmethod
    def from_xml_file(
        cls,
        filepath: Path,
        prefix: str = "inferred",
        max_number_of_instance: int = -1,
    ) -> "InferenceImporter":
        raise NotImplementedError("XML file format is not supported yet.")

    def _to_rules_components(
        self,
    ) -> dict:
        """Convert RDF graph to dictionary defining data model and prefixes of the graph

        Args:
            graph: RDF graph to be converted to TransformationRules object
            max_number_of_instance: Max number of instances to be considered for each class

        Returns:
            Tuple of data model and prefixes of the graph
        """
        classes: dict[str, dict] = {}
        properties: dict[str, dict] = {}
        count_by_value_type_by_property: dict[str, dict[str, int]] = defaultdict(Counter)

        # Infers all the classes in the graph
        for class_uri, no_instances in self.graph.query(ORDERED_CLASSES_QUERY):  # type: ignore[misc]
            if (class_id := remove_namespace_from_uri(class_uri)) in classes:
                # handles cases when class id is already present in classes
                class_id = f"{class_id}_{len(classes)+1}"

            classes[class_id] = {
                "class_": class_id,
                "reference": class_uri,
                "match_type": MatchType.exact,
                "comment": f"Inferred from knowledge graph, where this class has <{no_instances}> instances",
            }

        # Infers all the properties of the class
        for class_id, class_definition in classes.items():
            for (instance,) in self.graph.query(  # type: ignore[misc]
                INSTANCES_OF_CLASS_QUERY.replace("class", class_definition["reference"])
                if self.max_number_of_instance < 0
                else INSTANCES_OF_CLASS_QUERY.replace("class", class_definition["reference"])
                + f" LIMIT {self.max_number_of_instance}"
            ):
                for property_uri, occurrence, data_type_uri, object_type_uri in self.graph.query(  # type: ignore[misc]
                    INSTANCE_PROPERTIES_DEFINITION.replace("instance_id", instance)
                ):  # type: ignore[misc]
                    # this is to skip rdf:type property
                    if property_uri == RDF.type:
                        continue

                    property_id = remove_namespace_from_uri(property_uri)

                    if value_type_uri := (data_type_uri or object_type_uri):
                        value_type_id = remove_namespace_from_uri(value_type_uri)

                    # this handles situations when property points to node that is not present in graph
                    else:
                        value_type_id = str(self.non_existing_node_type)

                        issue = PropertyValueTypeUndefinedWarning(
                            resource_type="Property",
                            identifier=f"{class_id}:{property_id}",
                            property_name=property_id,
                            default_action="Remove the property from the rules",
                            recommended_action="Make sure that graph is complete",
                        )

                        if issue not in self.issue_list:
                            self.issue_list.append(issue)

                    id_ = f"{class_id}:{property_id}"

                    definition = {
                        "class_": class_id,
                        "property_": property_id,
                        "max_count": cast(RdfLiteral, occurrence).value,
                        "value_type": value_type_id,
                        "reference": property_uri,
                    }

                    count_by_value_type_by_property[id_][value_type_id] += 1

                    # USE CASE 1: If property is not present in properties
                    if id_ not in properties:
                        properties[id_] = definition

                    # USE CASE 2: first time redefinition, value type change to multi
                    elif id_ in properties and definition["value_type"] not in properties[id_]["value_type"]:
                        properties[id_]["value_type"] = properties[id_]["value_type"] + " | " + definition["value_type"]

                    # USE CASE 3: existing but max count is different
                    elif (
                        id_ in properties
                        and definition["value_type"] in properties[id_]["value_type"]
                        and properties[id_]["max_count"] != definition["max_count"]
                    ):
                        properties[id_]["max_count"] = max(properties[id_]["max_count"], definition["max_count"])

        # Add comments
        for id_, property_ in properties.items():
            if id_ not in count_by_value_type_by_property:
                continue

            count_by_value_type = count_by_value_type_by_property[id_]
            count_list = sorted(count_by_value_type.items(), key=lambda item: item[1], reverse=True)
            # Make the comment more readable by adapting to the number of value types
            base_string = "<{value_type}> which occurs <{count}> times"
            if len(count_list) == 1:
                type_, count = count_list[0]
                counts_str = f"with value type {base_string.format(value_type=type_, count=count)} in the graph"
            elif len(count_list) == 2:
                first = base_string.format(value_type=count_list[0][0], count=count_list[0][1])
                second = base_string.format(value_type=count_list[1][0], count=count_list[1][1])
                counts_str = f"with value types {first} and {second} in the graph"
            else:
                first_part = ", ".join(
                    base_string.format(value_type=type_, count=count) for type_, count in count_list[:-1]
                )
                last = base_string.format(value_type=count_list[-1][0], count=count_list[-1][1])
                counts_str = f"with value types {first_part} and {last} in the graph"

            class_id = property_["class_"]
            property_id = property_["property_"]
            property_["comment"] = f"Class <{class_id}> has property <{property_id}> {counts_str}"

        return {
            "metadata": self._default_metadata().model_dump(),
            "classes": list(classes.values()),
            "properties": list(properties.values()),
        }

    def _default_metadata(self):
        return InformationMetadata(
            name="Inferred Model",
            creator="NEAT",
            version="inferred",
            created=datetime.now(),
            updated=datetime.now(),
            description="Inferred model from knowledge graph",
            prefix=self.prefix,
            namespace=DEFAULT_NAMESPACE,
        )
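
A usage sketch based on the from_file factory above; the file path is a placeholder and the keyword values only illustrate how inference can be bounded.

from pathlib import Path

importer = InferenceImporter.from_file(
    Path("knowledge_graph.ttl"),
    prefix="inferred",
    max_number_of_instance=1000,  # sample at most 1000 instances per class; -1 means no limit
)
read = importer.to_rules()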

OWLImporter #

Bases: BaseRDFImporter

Convert OWL ontology to tables / transformation rules / Excel file.

Parameters:

    filepath: Path to the OWL ontology.

Note

OWL ontologies are information models whose completeness varies. As such, constructing a functional data model directly will often be impossible, and the produced Rules object would therefore be ill-formed. To avoid this, neat will automatically attempt to make the imported rules compliant by adding default values for missing information, attaching dangling properties to default containers based on the property type, etc.

One has to be aware that NEAT will be opinionated about how to make the ontology compliant, and that the resulting rules may not be what you expect.

Source code in cognite/neat/_rules/importers/_rdf/_owl2rules/_owl2rules.py
class OWLImporter(BaseRDFImporter):
    """Convert OWL ontology to tables/ transformation rules / Excel file.

        Args:
            filepath: Path to OWL ontology

    !!! Note
        OWL Ontologies are information models which completeness varies. As such, constructing functional
        data model directly will often be impossible, therefore the produced Rules object will be ill formed.
        To avoid this, neat will automatically attempt to make the imported rules compliant by adding default
        values for missing information, attaching dangling properties to default containers based on the
        property type, etc.

        One has to be aware that NEAT will be opinionated about how to make the ontology
        compliant, and that the resulting rules may not be what you expect.

    """

    def _to_rules_components(
        self,
    ) -> dict:
        components = {
            "Metadata": parse_owl_metadata(self.graph),
            "Classes": parse_owl_classes(self.graph),
            "Properties": parse_owl_properties(self.graph),
        }

        return make_components_compliant(components)
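
A hedged usage sketch: it assumes OWLImporter inherits the same from_file factory that BaseRDFImporter exposes (as seen on InferenceImporter above); verify this against the installed version.

from pathlib import Path

importer = OWLImporter.from_file(Path("ontology.owl"))  # factory assumed from BaseRDFImporter
read = importer.to_rules()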

ExcelImporter #

Bases: BaseImporter[T_InputRules]

Import rules from an Excel file.

Parameters:

    filepath (Path): The path to the Excel file. Required.

Source code in cognite/neat/_rules/importers/_spreadsheet2rules.py
class ExcelImporter(BaseImporter[T_InputRules]):
    """Import rules from an Excel file.

    Args:
        filepath (Path): The path to the Excel file.
    """

    def __init__(self, filepath: Path):
        self.filepath = filepath

    def to_rules(self) -> ReadRules[T_InputRules]:
        issue_list = IssueList(title=f"'{self.filepath.name}'")
        if not self.filepath.exists():
            issue_list.append(FileNotFoundNeatError(self.filepath))
            return ReadRules(None, issue_list, {})

        with pd.ExcelFile(self.filepath) as excel_file:
            user_reader = SpreadsheetReader(issue_list)

            user_read = user_reader.read(excel_file, self.filepath)
            if user_read is None or issue_list.has_errors:
                return ReadRules(None, issue_list, {})

            last_read: ReadResult | None = None
            if any(sheet_name.startswith("Last") for sheet_name in user_reader.seen_sheets):
                last_read = SpreadsheetReader(issue_list, required=False, sheet_prefix="Last").read(
                    excel_file, self.filepath
                )
            reference_read: ReadResult | None = None
            if any(sheet_name.startswith("Ref") for sheet_name in user_reader.seen_sheets):
                reference_read = SpreadsheetReader(issue_list, sheet_prefix="Ref").read(excel_file, self.filepath)

        if issue_list.has_errors:
            return ReadRules(None, issue_list, {})

        if reference_read and user_read.role != reference_read.role:
            issue_list.append(
                PropertyDefinitionDuplicatedError(
                    self.filepath.as_posix(),
                    "spreadsheet.metadata",  # type: ignore[arg-type]
                    "role",
                    frozenset({user_read.role, reference_read.role}),
                    ("user", "reference"),
                    "sheet",
                )
            )
            return ReadRules(None, issue_list, {})

        sheets = user_read.sheets
        original_role = user_read.role
        read_info_by_sheet = user_read.read_info_by_sheet
        if last_read:
            sheets["last"] = last_read.sheets
            read_info_by_sheet.update(last_read.read_info_by_sheet)
            if reference_read:
                # The last rules will also be validated against the reference rules
                sheets["last"]["reference"] = reference_read.sheets  # type: ignore[call-overload]
        if reference_read:
            sheets["reference"] = reference_read.sheets
            read_info_by_sheet.update(reference_read.read_info_by_sheet)

        rules_cls = INPUT_RULES_BY_ROLE[original_role]
        rules = cast(T_InputRules, rules_cls.load(sheets))
        return ReadRules(rules, issue_list, {"read_info_by_sheet": read_info_by_sheet})
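
Typical use is a single call chain; the file name below is a placeholder, and the rules/issues attribute names on the returned ReadRules are assumed.

from pathlib import Path

importer = ExcelImporter(Path("information_rules.xlsx"))
read = importer.to_rules()
if read.rules is None:
    ...  # inspect read.issues: the file was missing or contained errors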

GoogleSheetImporter #

Bases: BaseImporter[T_InputRules]

Import rules from a Google Sheet.

Warning

This importer is experimental and may not work as expected.

Parameters:

    sheet_id (str): The Google Sheet ID. Required.
    skiprows (int): The number of rows to skip when reading the Google Sheet. Defaults to 1.

Source code in cognite/neat/_rules/importers/_spreadsheet2rules.py
class GoogleSheetImporter(BaseImporter[T_InputRules]):
    """Import rules from a Google Sheet.

    .. warning::

        This importer is experimental and may not work as expected.

    Args:
        sheet_id (str): The Google Sheet ID.
        skiprows (int): The number of rows to skip when reading the Google Sheet.
    """

    def __init__(self, sheet_id: str, skiprows: int = 1):
        self.sheet_id = sheet_id
        self.skiprows = skiprows

    def to_rules(self) -> ReadRules[T_InputRules]:
        raise NotImplementedError("Google Sheet Importer is not yet implemented.")

    def _get_sheets(self) -> dict[str, pd.DataFrame]:
        local_import("gspread", "google")
        import gspread  # type: ignore[import]

        client_google = gspread.service_account()
        google_sheet = client_google.open_by_key(self.sheet_id)
        return {worksheet.title: pd.DataFrame(worksheet.get_all_records()) for worksheet in google_sheet.worksheets()}

YAMLImporter #

Bases: BaseImporter[T_InputRules]

Imports the rules from a YAML file.

Parameters:

    raw_data (dict[str, Any]): The raw data to be imported. Required.

Note

YAML files are typically used for storing rules when checked into version control systems, e.g., git-history. The advantage of using YAML files over Excel is that tools like git can show the differences between different versions of the rules.

Source code in cognite/neat/_rules/importers/_yaml2rules.py
class YAMLImporter(BaseImporter[T_InputRules]):
    """Imports the rules from a YAML file.

    Args:
        raw_data: The raw data to be imported.

    .. note::

        YAML files are typically used for storing rules when checked into version control systems, e.g., git-history.
        The advantage of using YAML files over Excel is that tools like git can show the differences between different
        versions of the rules.

    """

    def __init__(
        self,
        raw_data: dict[str, Any],
        read_issues: list[NeatIssue] | None = None,
        filepaths: list[Path] | None = None,
    ) -> None:
        self.raw_data = raw_data
        self._read_issues = IssueList(read_issues)
        self._filepaths = filepaths

    @classmethod
    def from_file(cls, filepath: Path):
        if not filepath.exists():
            return cls({}, [FileNotFoundNeatError(filepath)])
        elif not filepath.is_file():
            return cls({}, [FileNotAFileError(filepath)])
        elif filepath.suffix not in [".yaml", ".yml"]:
            return cls({}, [FileTypeUnexpectedError(filepath, frozenset([".yaml", ".yml"]))])
        return cls(yaml.safe_load(filepath.read_text()), filepaths=[filepath])

    def to_rules(self) -> ReadRules[T_InputRules]:
        if self._read_issues.has_errors or not self.raw_data:
            return ReadRules(None, self._read_issues, {})
        issue_list = IssueList(title="YAML Importer", issues=self._read_issues)

        if not self._filepaths:
            issue_list.append(
                NeatValueWarning(
                    f"{type(self).__name__} was called without filepaths when there is content",
                )
            )
            metadata_file = Path()
        else:
            metadata_file_nullable = next((file for file in self._filepaths if file.stem == "metadata"), None)
            metadata_file = metadata_file_nullable or self._filepaths[0]

        if "metadata" not in self.raw_data:
            self._read_issues.append(FileMissingRequiredFieldError(metadata_file, "section", "metadata"))
            return ReadRules(None, self._read_issues, {})

        metadata = self.raw_data["metadata"]

        if "role" not in metadata:
            self._read_issues.append(FileMissingRequiredFieldError(metadata, "metadata", "role"))
            return ReadRules(None, self._read_issues, {})

        role_input = RoleTypes(metadata["role"])
        role_enum = RoleTypes(role_input)
        rules_cls = INPUT_RULES_BY_ROLE[role_enum]

        rules = cast(T_InputRules, rules_cls.load(self.raw_data))

        return ReadRules(rules, issue_list, {})
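
A usage sketch mirroring from_file above; the path is a placeholder.

from pathlib import Path

importer = YAMLImporter.from_file(Path("rules.yaml"))
read = importer.to_rules()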