Skip to content

Exporters

cognite.neat.rules.exporters #

DMSExporter #

Bases: CDFExporter[DMSRules, DMSSchema]

Export rules to Cognite Data Fusion's Data Model Storage (DMS) service.

Parameters:

Name Type Description Default
export_components frozenset[Literal['all', 'spaces', 'data_models', 'views', 'containers']]

Which components to export. Defaults to frozenset({"all"}).

'all'
include_space set[str]

If set, only export components in the given spaces. Defaults to None which means all spaces.

None
existing_handling Literal['fail', 'skip', 'update', 'force']

How to handle existing components. Defaults to "update". See below for details.

'update'
export_pipeline bool

Whether to export the pipeline. Defaults to False. This means setting up transformations, RAW databases and tables to populate the data model.

False
instance_space str

The space to use for the instance. Defaults to None.

None
suppress_warnings bool

Suppress warnings. Defaults to False.

False

... note::

- "fail": If any component already exists, the export will fail.
- "skip": If any component already exists, it will be skipped.
- "update": If any component already exists, it will be updated.
- "force": If any component already exists, it will be deleted and recreated.
Source code in cognite/neat/rules/exporters/_rules2dms.py
class DMSExporter(CDFExporter[DMSRules, DMSSchema]):
    """Export rules to Cognite Data Fusion's Data Model Storage (DMS) service.

    Args:
        export_components (frozenset[Literal["all", "spaces", "data_models", "views", "containers"]], optional):
            Which components to export. Defaults to frozenset({"all"}).
        include_space (set[str], optional):
            If set, only export components in the given spaces. Defaults to None which means all spaces.
        existing_handling (Literal["fail", "skip", "update", "force"], optional): How to handle existing components.
            Defaults to "update". See below for details.
        export_pipeline (bool, optional): Whether to export the pipeline. Defaults to False. This means setting
            up transformations, RAW databases and tables to populate the data model.
        instance_space (str, optional): The space to use for the instance. Defaults to None.
        suppress_warnings (bool, optional): Suppress warnings. Defaults to False.

    ... note::

        - "fail": If any component already exists, the export will fail.
        - "skip": If any component already exists, it will be skipped.
        - "update": If any component already exists, it will be updated.
        - "force": If any component already exists, it will be deleted and recreated.

    """

    def __init__(
        self,
        export_components: Component | Collection[Component] = "all",
        include_space: set[str] | None = None,
        existing_handling: Literal["fail", "skip", "update", "force"] = "update",
        export_pipeline: bool = False,
        instance_space: str | None = None,
        suppress_warnings: bool = False,
    ):
        self.export_components = {export_components} if isinstance(export_components, str) else set(export_components)
        self.include_space = include_space
        self.existing_handling = existing_handling
        self.export_pipeline = export_pipeline
        self.instance_space = instance_space
        self.suppress_warnings = suppress_warnings
        self._schema: DMSSchema | None = None

    def export_to_file(self, rules: DMSRules, filepath: Path) -> None:
        """Export the rules to a file(s).

        If the file is a directory, the components will be exported to separate files, otherwise they will be
        exported to a zip file.

        Args:
            filepath: Directory or zip file path to export to.
            rules:
        """
        if filepath.is_dir():
            self._export_to_directory(filepath, rules)
        else:
            self._export_to_zip_file(filepath, rules)

    def _export_to_directory(self, directory: Path, rules: DMSRules) -> None:
        schema = self.export(rules)
        exclude = self._create_exclude_set()
        schema.to_directory(directory, exclude=exclude, new_line=self._new_line, encoding=self._encoding)

    def _export_to_zip_file(self, filepath: Path, rules: DMSRules) -> None:
        if filepath.suffix not in {".zip"}:
            warnings.warn("File extension is not .zip, adding it to the file name", stacklevel=2)
            filepath = filepath.with_suffix(".zip")
        schema = self.export(rules)
        exclude = self._create_exclude_set()
        schema.to_zip(filepath, exclude=exclude)

    def _create_exclude_set(self):
        if "all" in self.export_components:
            exclude = set()
        else:
            exclude = {"spaces", "data_models", "views", "containers", "node_types"} - self.export_components
        return exclude

    def export(self, rules: DMSRules) -> DMSSchema:
        return rules.as_schema(include_pipeline=self.export_pipeline, instance_space=self.instance_space)

    def delete_from_cdf(self, rules: DMSRules, client: CogniteClient, dry_run: bool = False) -> Iterable[UploadResult]:
        to_export = self._prepare_exporters(rules, client)

        # we need to reverse order in which we are picking up the items to delete
        # as they are sorted in the order of creation and we need to delete them in reverse order
        for items, loader in reversed(to_export):
            item_ids = loader.get_ids(items)
            existing_items = loader.retrieve(item_ids)
            existing_ids = loader.get_ids(existing_items)
            to_delete: list[Hashable] = []
            for item_id in item_ids:
                if (
                    isinstance(loader, DataModelingLoader)
                    and self.include_space is not None
                    and not loader.in_space(item_id, self.include_space)
                ):
                    continue

                if item_id in existing_ids:
                    to_delete.append(item_id)

            deleted: set[Hashable] = set()
            failed_deleted: set[Hashable] = set()
            error_messages: list[str] = []
            if dry_run:
                deleted.update(to_delete)
            elif to_delete:
                try:
                    loader.delete(to_delete)
                except CogniteAPIError as e:
                    failed_deleted.update(loader.get_id(item) for item in e.failed + e.unknown)
                    deleted.update(loader.get_id(item) for item in e.successful)
                    error_messages.append(f"Failed delete: {e.message}")
                else:
                    deleted.update(to_delete)

            yield UploadResult(
                name=loader.resource_name,
                deleted=deleted,
                failed_deleted=failed_deleted,
                error_messages=error_messages,
            )

    def export_to_cdf_iterable(
        self, rules: DMSRules, client: CogniteClient, dry_run: bool = False
    ) -> Iterable[UploadResult]:
        to_export = self._prepare_exporters(rules, client)

        redeploy_data_model = False
        for items, loader in to_export:
            # The conversion from DMS to GraphQL does not seem to be triggered even if the views
            # are changed. This is a workaround to force the conversion.
            is_redeploying = loader is DataModelingLoader and redeploy_data_model

            to_create, to_delete, to_update, unchanged = self._categorize_items_for_upload(
                loader, items, is_redeploying
            )

            issue_list = IssueList()
            warning_list = self._validate(loader, items)
            issue_list.extend(warning_list)

            created: set[Hashable] = set()
            skipped: set[Hashable] = set()
            changed: set[Hashable] = set()
            failed_created: set[Hashable] = set()
            failed_changed: set[Hashable] = set()
            error_messages: list[str] = []
            if dry_run:
                if self.existing_handling in ["update", "force"]:
                    changed.update(loader.get_id(item) for item in to_update)
                elif self.existing_handling == "skip":
                    skipped.update(loader.get_id(item) for item in to_update)
                elif self.existing_handling == "fail":
                    failed_changed.update(loader.get_id(item) for item in to_update)
                else:
                    raise ValueError(f"Unsupported existing_handling {self.existing_handling}")
            else:
                if to_delete:
                    try:
                        loader.delete(to_delete)
                    except CogniteAPIError as e:
                        error_messages.append(f"Failed delete: {e.message}")

                if isinstance(loader, DataModelingLoader):
                    to_create = loader.sort_by_dependencies(to_create)

                try:
                    loader.create(to_create)
                except CogniteAPIError as e:
                    failed_created.update(loader.get_id(item) for item in e.failed + e.unknown)
                    created.update(loader.get_id(item) for item in e.successful)
                    error_messages.append(e.message)
                else:
                    created.update(loader.get_id(item) for item in to_create)

                if self.existing_handling in ["update", "force"]:
                    try:
                        loader.update(to_update)
                    except CogniteAPIError as e:
                        failed_changed.update(loader.get_id(item) for item in e.failed + e.unknown)
                        changed.update(loader.get_id(item) for item in e.successful)
                        error_messages.append(e.message)
                    else:
                        changed.update(loader.get_id(item) for item in to_update)
                elif self.existing_handling == "skip":
                    skipped.update(loader.get_id(item) for item in to_update)
                elif self.existing_handling == "fail":
                    failed_changed.update(loader.get_id(item) for item in to_update)

            yield UploadResult(
                name=loader.resource_name,
                created=created,
                changed=changed,
                unchanged={loader.get_id(item) for item in unchanged},
                skipped=skipped,
                failed_created=failed_created,
                failed_changed=failed_changed,
                error_messages=error_messages,
                issues=issue_list,
            )

            if loader is ViewLoader and (created or changed):
                redeploy_data_model = True

    def _categorize_items_for_upload(
        self, loader: ResourceLoader, items: Sequence[CogniteResource], is_redeploying
    ) -> tuple[list[CogniteResource], list[CogniteResource], list[CogniteResource], list[CogniteResource]]:
        item_ids = loader.get_ids(items)
        cdf_items = loader.retrieve(item_ids)
        cdf_item_by_id = {loader.get_id(item): item for item in cdf_items}
        to_create, to_update, unchanged, to_delete = [], [], [], []
        for item in items:
            if (
                isinstance(loader, DataModelingLoader)
                and self.include_space is not None
                and not loader.in_space(item, self.include_space)
            ):
                continue

            cdf_item = cdf_item_by_id.get(loader.get_id(item))
            if cdf_item is None:
                to_create.append(item)
            elif is_redeploying:
                to_update.append(item)
                to_delete.append(cdf_item)
            elif loader.are_equal(item, cdf_item):
                unchanged.append(item)
            else:
                to_update.append(item)
        return to_create, to_delete, to_update, unchanged

    def _prepare_exporters(self, rules, client) -> list[tuple[CogniteResourceList, ResourceLoader]]:
        schema = self.export(rules)
        to_export: list[tuple[CogniteResourceList, ResourceLoader]] = []
        if self.export_components.intersection({"all", "spaces"}):
            to_export.append((SpaceApplyList(schema.spaces.values()), SpaceLoader(client)))
        if self.export_components.intersection({"all", "containers"}):
            to_export.append((ContainerApplyList(schema.containers.values()), ContainerLoader(client)))
        if self.export_components.intersection({"all", "views"}):
            to_export.append((ViewApplyList(schema.views.values()), ViewLoader(client, self.existing_handling)))
        if self.export_components.intersection({"all", "data_models"}):
            to_export.append((DataModelApplyList([schema.data_model]), DataModelLoader(client)))
        if isinstance(schema, PipelineSchema):
            to_export.append((schema.databases, RawDatabaseLoader(client)))
            to_export.append((schema.raw_tables, RawTableLoader(client)))
            to_export.append((schema.transformations, TransformationLoader(client)))
        return to_export

    def _validate(self, loader: ResourceLoader, items: CogniteResourceList) -> IssueList:
        issue_list = IssueList()
        if isinstance(loader, DataModelLoader):
            models = cast(list[DataModelApply], items)
            if other_models := self._exist_other_data_models(loader, models):
                warning = PrincipleOneModelOneSpaceWarning(
                    f"There are multiple data models in the same space {models[0].space}. "
                    f"Other data models in the space are {other_models}.",
                )
                if not self.suppress_warnings:
                    warnings.warn(warning, stacklevel=2)
                issue_list.append(warning)

        return issue_list

    @classmethod
    def _exist_other_data_models(cls, loader: DataModelLoader, models: list[DataModelApply]) -> list[DataModelId]:
        if not models:
            return []
        space = models[0].space
        external_id = models[0].external_id
        try:
            data_models = loader.client.data_modeling.data_models.list(space=space, limit=25, all_versions=False)
        except CogniteAPIError as e:
            warnings.warn(ResourceRetrievalWarning(frozenset({space}), "space", str(e)), stacklevel=2)
            return []
        else:
            return [
                data_model.as_id()
                for data_model in data_models
                if (data_model.space, data_model.external_id) != (space, external_id)
            ]

export_to_file(rules, filepath) #

Export the rules to a file(s).

If the file is a directory, the components will be exported to separate files, otherwise they will be exported to a zip file.

Parameters:

Name Type Description Default
filepath Path

Directory or zip file path to export to.

required
rules DMSRules
required
Source code in cognite/neat/rules/exporters/_rules2dms.py
def export_to_file(self, rules: DMSRules, filepath: Path) -> None:
    """Export the rules to a file(s).

    If the file is a directory, the components will be exported to separate files, otherwise they will be
    exported to a zip file.

    Args:
        filepath: Directory or zip file path to export to.
        rules:
    """
    if filepath.is_dir():
        self._export_to_directory(filepath, rules)
    else:
        self._export_to_zip_file(filepath, rules)

ExcelExporter #

Bases: BaseExporter[VerifiedRules, Workbook]

Export rules to Excel.

Parameters:

Name Type Description Default
styling Style

The styling to use for the Excel file. Defaults to "default". See below for details on the different styles.

'default'
output_role

The role to use for the exported spreadsheet. If provided, the rules will be converted to this role formate before being written to excel. If not provided, the role from the rules will be used.

required
dump_as DumpOptions

This determines how the rules are written to the Excel file. An Excel file has up to three sets of sheets: user, last, and reference. The user sheets are used for inputting rules from a user. The last sheets are used for the last version of the same model as the user, while the reference sheets are used for the model the user is building on. The options are: * "user": The rules are written to the user sheets. This is used when you want to modify the rules directly and potentially change the model. This is useful when you have imported the data model from outside CDF and you want to modify it before you write it to CDF. * "last": The rules are written to the last sheets. This is used when you want to extend the rules, but have validation that you are not breaking the existing model. This is used when you want to change a model that has already been published to CDF and that model is in production. * "reference": The rules are written to the reference sheets. This is typically used when you want to build a new solution on top of an enterprise model.

'user'
new_model_id tuple[str, str] | None

The new model ID to use for the exported spreadsheet. This is only applicable if the input rules have 'is_reference' set. If provided, the model ID will be used to automatically create the new metadata sheet in the Excel file. The model id is expected to be a tuple of (prefix, title) (space, external_id) for InformationRules and DMSRules respectively.

None

The following styles are available:

  • "none": No styling is applied.
  • "minimal": Column widths are adjusted to fit the content, and the header row(s) is frozen.
  • "default": Minimal + headers are bold, increased size, and colored.
  • "maximal": Default + alternating row colors in the properties sheet for each class in addition to extra blank rows between classes and borders
Source code in cognite/neat/rules/exporters/_rules2excel.py
class ExcelExporter(BaseExporter[VerifiedRules, Workbook]):
    """Export rules to Excel.

    Args:
        styling: The styling to use for the Excel file. Defaults to "default". See below for details
            on the different styles.
        output_role: The role to use for the exported spreadsheet. If provided, the rules will be converted to
            this role formate before being written to excel. If not provided, the role from the rules will be used.
        dump_as: This determines how the rules are written to the Excel file. An Excel file has up to three sets of
           sheets: user, last, and reference. The user sheets are used for inputting rules from a user. The last sheets
           are used for the last version of the same model as the user, while the reference sheets are used for
           the model the user is building on. The options are:
             * "user": The rules are written to the user sheets. This is used when you want to modify the rules
                directly and potentially change the model. This is useful when you have imported the data model
                from outside CDF and you want to modify it before you write it to CDF.
             * "last": The rules are written to the last sheets. This is used when you want to extend the rules,
               but have validation that you are not breaking the existing model. This is used when you want to
               change a model that has already been published to CDF and that model is in production.
             * "reference": The rules are written to the reference sheets. This is typically used when you want to build
               a new solution on top of an enterprise model.
        new_model_id: The new model ID to use for the exported spreadsheet. This is only applicable if the input
            rules have 'is_reference' set. If provided, the model ID will be used to automatically create the
            new metadata sheet in the Excel file. The model id is expected to be a tuple of (prefix, title)
            (space, external_id) for InformationRules and DMSRules respectively.

    The following styles are available:

    - "none":    No styling is applied.
    - "minimal": Column widths are adjusted to fit the content, and the header row(s) is frozen.
    - "default": Minimal + headers are bold, increased size, and colored.
    - "maximal": Default + alternating row colors in the properties sheet for each class in addition to extra
                 blank rows between classes and borders
    """

    Style = Literal["none", "minimal", "default", "maximal"]
    DumpOptions = Literal["user", "last", "reference"]
    _main_header_by_sheet_name: ClassVar[dict[str, str]] = {
        "Properties": "Definition of Properties per Class",
        "Classes": "Definition of Classes",
        "Views": "Definition of Views",
        "Containers": "Definition of Containers",
        "Nodes": "Definition of Nodes",
        "Enum": "Definition of Enum Collections",
    }
    style_options = get_args(Style)
    dump_options = get_args(DumpOptions)

    def __init__(
        self, styling: Style = "default", dump_as: DumpOptions = "user", new_model_id: tuple[str, str] | None = None
    ):
        if styling not in self.style_options:
            raise ValueError(f"Invalid styling: {styling}. Valid options are {self.style_options}")
        if dump_as not in self.dump_options:
            raise ValueError(f"Invalid dump_as: {dump_as}. Valid options are {self.dump_options}")
        self.styling = styling
        self._styling_level = self.style_options.index(styling)
        self.new_model_id = new_model_id
        self.dump_as = dump_as

    def export_to_file(self, rules: VerifiedRules, filepath: Path) -> None:
        """Exports transformation rules to excel file."""
        data = self.export(rules)
        try:
            data.save(filepath)
        finally:
            data.close()
        return None

    def export(self, rules: VerifiedRules) -> Workbook:
        workbook = Workbook()
        # Remove default sheet named "Sheet"
        workbook.remove(workbook["Sheet"])

        dumped_user_rules: dict[str, Any]
        dumped_last_rules: dict[str, Any] | None = None
        dumped_reference_rules: dict[str, Any] | None = None
        if self.dump_as != "user":
            action = {"last": "update", "reference": "create"}[self.dump_as]
            metadata_creator = _MetadataCreator(action, self.new_model_id)  # type: ignore[arg-type]

            dumped_user_rules = {
                "Metadata": metadata_creator.create(rules.metadata),
            }

            if self.dump_as == "last":
                dumped_last_rules = rules.dump(by_alias=True)
                if rules.reference:
                    dumped_reference_rules = rules.reference.dump(by_alias=True, as_reference=True)
            elif self.dump_as == "reference":
                dumped_reference_rules = rules.dump(by_alias=True, as_reference=True)
        else:
            dumped_user_rules = rules.dump(by_alias=True)
            if rules.last:
                dumped_last_rules = rules.last.dump(by_alias=True)
            if rules.reference:
                dumped_reference_rules = rules.reference.dump(by_alias=True, as_reference=True)

        self._write_metadata_sheet(workbook, dumped_user_rules["Metadata"])
        self._write_sheets(workbook, dumped_user_rules, rules)
        if dumped_last_rules:
            self._write_sheets(workbook, dumped_last_rules, rules, sheet_prefix="Last")
            self._write_metadata_sheet(workbook, dumped_last_rules["Metadata"], sheet_prefix="Last")

        if dumped_reference_rules:
            self._write_sheets(workbook, dumped_reference_rules, rules, sheet_prefix="Ref")
            self._write_metadata_sheet(workbook, dumped_reference_rules["Metadata"], sheet_prefix="Ref")

        if self._styling_level > 0:
            self._adjust_column_widths(workbook)

        return workbook

    def _write_sheets(
        self,
        workbook: Workbook,
        dumped_rules: dict[str, Any],
        rules: VerifiedRules,
        sheet_prefix: str = "",
    ):
        for sheet_name, headers in rules.headers_by_sheet(by_alias=True).items():
            if sheet_name in ("Metadata", "Prefixes", "Reference", "Last"):
                continue
            sheet = workbook.create_sheet(f"{sheet_prefix}{sheet_name}")

            main_header = self._main_header_by_sheet_name[sheet_name]
            sheet.append([main_header] + [""] * (len(headers) - 1))
            sheet.merge_cells(start_row=1, start_column=1, end_row=1, end_column=len(headers))
            sheet.append(headers)

            fill_colors = itertools.cycle(["CADCFC", "FFFFFF"])
            fill_color = next(fill_colors)
            last_class: str | None = None
            item: dict[str, Any]
            for item in dumped_rules.get(sheet_name) or []:
                row = list(item.values())
                class_ = row[0]

                is_properties = sheet_name == "Properties"
                is_new_class = class_ != last_class and last_class is not None
                if self._styling_level > 2 and is_new_class and is_properties:
                    sheet.append([""] * len(headers))
                    for cell in sheet[sheet.max_row]:
                        cell.fill = PatternFill(fgColor=fill_color, patternType="solid")
                        side = Side(style="thin", color="000000")
                        cell.border = Border(left=side, right=side, top=side, bottom=side)
                    fill_color = next(fill_colors)

                sheet.append(row)
                if self._styling_level > 2 and is_properties:
                    for cell in sheet[sheet.max_row]:
                        cell.fill = PatternFill(fgColor=fill_color, patternType="solid")
                        side = Side(style="thin", color="000000")
                        cell.border = Border(left=side, right=side, top=side, bottom=side)
                last_class = class_

            if self._styling_level > 0:
                # This freezes all rows above the given row
                sheet.freeze_panes = sheet["A3"]

                sheet["A1"].alignment = Alignment(horizontal="center")

            if self._styling_level > 1:
                # Make the header row bold, larger, and colored
                sheet["A1"].font = Font(bold=True, size=20)
                sheet["A1"].fill = PatternFill(fgColor="FFC000", patternType="solid")
                for cell in sheet["2"]:
                    cell.font = Font(bold=True, size=14)

    def _write_metadata_sheet(self, workbook: Workbook, metadata: dict[str, Any], sheet_prefix: str = "") -> None:
        # Excel does not support timezone in datetime strings
        if isinstance(metadata.get("created"), datetime):
            metadata["created"] = metadata["created"].replace(tzinfo=None)
        if isinstance(metadata.get("updated"), datetime):
            metadata["updated"] = metadata["updated"].replace(tzinfo=None)

        metadata_sheet = workbook.create_sheet(f"{sheet_prefix}Metadata")
        for key, value in metadata.items():
            metadata_sheet.append([key, value])

        if self._styling_level > 1:
            for cell in metadata_sheet["A"]:
                cell.font = Font(bold=True, size=12)

    @classmethod
    def _get_item_class(cls, annotation: GenericAlias) -> type[SheetEntity]:
        if not isinstance(annotation, GenericAlias):
            raise ValueError(f"Expected annotation to be a GenericAlias, but got {type(annotation)}")
        args = get_args(annotation)
        if len(args) != 1:
            raise ValueError(f"Expected annotation to have exactly one argument, but got {len(args)}")
        arg = args[0]
        if not issubclass(arg, SheetEntity):
            raise ValueError(f"Expected annotation to have a BaseModel argument, but got {type(arg)}")
        return arg

    @classmethod
    def _adjust_column_widths(cls, workbook: Workbook) -> None:
        for sheet_ in workbook:
            sheet = cast(Worksheet, sheet_)
            for column_cells in sheet.columns:
                try:
                    max_length = max(len(str(cell.value)) for cell in column_cells if cell.value is not None)
                except ValueError:
                    max_length = 0

                selected_column = column_cells[0]
                if isinstance(selected_column, MergedCell):
                    selected_column = column_cells[1]

                current = sheet.column_dimensions[selected_column.column_letter].width or (max_length + 0.5)
                sheet.column_dimensions[selected_column.column_letter].width = max(current, max_length + 0.5)
        return None

export_to_file(rules, filepath) #

Exports transformation rules to excel file.

Source code in cognite/neat/rules/exporters/_rules2excel.py
def export_to_file(self, rules: VerifiedRules, filepath: Path) -> None:
    """Exports transformation rules to excel file."""
    data = self.export(rules)
    try:
        data.save(filepath)
    finally:
        data.close()
    return None

OWLExporter #

Bases: GraphExporter

Exports verified information rules to an OWL ontology.

Source code in cognite/neat/rules/exporters/_rules2ontology.py
class OWLExporter(GraphExporter):
    """Exports verified information rules to an OWL ontology."""

    def export(self, rules: InformationRules) -> Graph:
        return Ontology.from_rules(rules).as_owl()

SemanticDataModelExporter #

Bases: GraphExporter

Exports verified information rules to a semantic data model.

Source code in cognite/neat/rules/exporters/_rules2ontology.py
class SemanticDataModelExporter(GraphExporter):
    """Exports verified information rules to a semantic data model."""

    def export(self, rules: InformationRules) -> Graph:
        return Ontology.from_rules(rules).as_semantic_data_model()

SHACLExporter #

Bases: GraphExporter

Exports rules to a SHACL graph.

Source code in cognite/neat/rules/exporters/_rules2ontology.py
class SHACLExporter(GraphExporter):
    """Exports rules to a SHACL graph."""

    def export(self, rules: InformationRules) -> Graph:
        return Ontology.from_rules(rules).as_shacl()

YAMLExporter #

Bases: BaseExporter[VerifiedRules, str]

Export rules (Information, DMS or Domain) to YAML.

Parameters:

Name Type Description Default
files Files

The number of files to output. Defaults to "single".

'single'
output Format

The format to output the rules. Defaults to "yaml".

'yaml'
output_role

The role to output the rules for. Defaults to None, which means that the role of the input rules will be used.

required

The following formats are available:

  • "single": A single YAML file will contain the entire rules.

.. note::

YAML files are typically used for storing rules when checked into version control systems, e.g., git-history.
The advantage of using YAML files over Excel is that tools like git can show the differences between different
versions of the rules.
Source code in cognite/neat/rules/exporters/_rules2yaml.py
class YAMLExporter(BaseExporter[VerifiedRules, str]):
    """Export rules (Information, DMS or Domain) to YAML.

    Args:
        files: The number of files to output. Defaults to "single".
        output: The format to output the rules. Defaults to "yaml".
        output_role: The role to output the rules for. Defaults to None, which means that the role
            of the input rules will be used.

    The following formats are available:

    - "single": A single YAML file will contain the entire rules.

    .. note::

        YAML files are typically used for storing rules when checked into version control systems, e.g., git-history.
        The advantage of using YAML files over Excel is that tools like git can show the differences between different
        versions of the rules.

    """

    Files = Literal["single"]
    Format = Literal["yaml", "json"]

    file_option = get_args(Files)
    format_option = get_args(Format)

    def __init__(self, files: Files = "single", output: Format = "yaml"):
        if files not in self.file_option:
            raise ValueError(f"Invalid files: {files}. Valid options are {self.file_option}")
        if output not in self.format_option:
            raise ValueError(f"Invalid output: {output}. Valid options are {self.format_option}")
        self.files = files
        self.output = output

    def export_to_file(self, rules: VerifiedRules, filepath: Path) -> None:
        """Exports transformation rules to YAML/JSON file(s)."""
        if self.files == "single":
            if filepath.suffix != f".{self.output}":
                warnings.warn(f"File extension is not .{self.output}, adding it to the file name", stacklevel=2)
                filepath = filepath.with_suffix(f".{self.output}")
            filepath.write_text(self.export(rules), encoding=self._encoding, newline=self._new_line)
        else:
            raise NotImplementedError(f"Exporting to {self.files} files is not supported")

    def export(self, rules: VerifiedRules) -> str:
        """Export rules to YAML (or JSON) format.

        Args:
            rules: The rules to be exported.

        Returns:
            str: The rules in YAML (or JSON) format.
        """
        # model_dump_json ensures that the output is in JSON format,
        # if we don't do this, we will get Enums and other types that are not serializable to YAML
        json_output = rules.dump(mode="json")
        if self.output == "json":
            return json.dumps(json_output)
        elif self.output == "yaml":
            return yaml.safe_dump(json_output)
        else:
            raise ValueError(f"Invalid output: {self.output}. Valid options are {self.format_option}")

export_to_file(rules, filepath) #

Exports transformation rules to YAML/JSON file(s).

Source code in cognite/neat/rules/exporters/_rules2yaml.py
def export_to_file(self, rules: VerifiedRules, filepath: Path) -> None:
    """Exports transformation rules to YAML/JSON file(s)."""
    if self.files == "single":
        if filepath.suffix != f".{self.output}":
            warnings.warn(f"File extension is not .{self.output}, adding it to the file name", stacklevel=2)
            filepath = filepath.with_suffix(f".{self.output}")
        filepath.write_text(self.export(rules), encoding=self._encoding, newline=self._new_line)
    else:
        raise NotImplementedError(f"Exporting to {self.files} files is not supported")

export(rules) #

Export rules to YAML (or JSON) format.

Parameters:

Name Type Description Default
rules VerifiedRules

The rules to be exported.

required

Returns:

Name Type Description
str str

The rules in YAML (or JSON) format.

Source code in cognite/neat/rules/exporters/_rules2yaml.py
def export(self, rules: VerifiedRules) -> str:
    """Export rules to YAML (or JSON) format.

    Args:
        rules: The rules to be exported.

    Returns:
        str: The rules in YAML (or JSON) format.
    """
    # model_dump_json ensures that the output is in JSON format,
    # if we don't do this, we will get Enums and other types that are not serializable to YAML
    json_output = rules.dump(mode="json")
    if self.output == "json":
        return json.dumps(json_output)
    elif self.output == "yaml":
        return yaml.safe_dump(json_output)
    else:
        raise ValueError(f"Invalid output: {self.output}. Valid options are {self.format_option}")