Product Types

Defining a Product Type

A Product Type (often referred to simply as a product) is the top-level object of a domain model. A product is effectively the template used for creating subscription instances, and you can create as many instances of it as you want. For an example product model, see the very simple Node product type from the example workflow orchestrator:

# Copyright 2019-2023 SURF.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from orchestrator.domain.base import SubscriptionModel
from orchestrator.types import SubscriptionLifecycle, strEnum

from products.product_blocks.node import NodeBlock, NodeBlockInactive, NodeBlockProvisioning


class Node_Type(strEnum):
    Cisco = "Cisco"
    Nokia = "Nokia"
    Cumulus = "Cumulus"
    FRR = "FRR"


class NodeInactive(SubscriptionModel, is_base=True):
    node_type: Node_Type
    node: NodeBlockInactive


class NodeProvisioning(NodeInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
    node_type: Node_Type
    node: NodeBlockProvisioning


class Node(NodeProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    node_type: Node_Type
    node: NodeBlock

Type Hints

Notice how type hints are used on these classes. The WFO uses these types for pydantic validation and for type safety when serializing data into and out of the database. If you're not familiar with type hinting, PEP 484 describes its benefits.

Fixed Inputs

When a hard-coded value is stored on the product model, as Node_Type is here, it is called a Fixed Input. Read more in the Fixed Inputs documentation.
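As a rough illustration of the idea, the sketch below uses only the standard library rather than the orchestrator itself. `FixedInputRow` and `NodeSketch` are hypothetical stand-ins for a stored fixed-input row and a product model; the dict comprehension has the same shape as the one `from_product_id` uses to gather fixed inputs before constructing the model.

```python
from dataclasses import dataclass
from enum import Enum


class NodeType(str, Enum):
    """Stand-in for the Node_Type strEnum from the example product."""

    Cisco = "Cisco"
    Nokia = "Nokia"


@dataclass
class FixedInputRow:
    """Hypothetical stand-in for a fixed-input row stored with a product."""

    name: str
    value: str


@dataclass
class NodeSketch:
    """Hypothetical, simplified product model with a single fixed input."""

    node_type: NodeType

    def __post_init__(self) -> None:
        # Coerce the stored string into the enum; an unknown value raises ValueError.
        self.node_type = NodeType(self.node_type)


rows = [FixedInputRow(name="node_type", value="Nokia")]
# Collect the stored rows into keyword arguments: {name: value}
fixed_inputs = {fi.name: fi.value for fi in rows}
node = NodeSketch(**fixed_inputs)
```

Because the value lives with the product rather than with an individual subscription, every subscription built from the same product would see the same `node_type` under this model.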

Breaking this product down a bit more, we see three classes: NodeInactive, NodeProvisioning, and finally Node. These classes build on each other, with the lowest-level class (NodeInactive) based on the SubscriptionModel base class. Each class has two simple attributes: the Fixed Input node_type (of type Node_Type) and the root product block node. Each class represents the Node product in one of its lifecycle states, which are defined in the SubscriptionLifecycle enum:

orchestrator.types.SubscriptionLifecycle

Bases: pydantic_forms.types.strEnum

Source code in orchestrator/types.py
@strawberry.enum
class SubscriptionLifecycle(strEnum):
    INITIAL = "initial"
    ACTIVE = "active"
    MIGRATING = "migrating"
    DISABLED = "disabled"
    TERMINATED = "terminated"
    PROVISIONING = "provisioning"
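To make the link between lifecycle states and classes more concrete, here is a minimal standard-library sketch of how class keyword arguments such as `is_base` and `lifecycle` can be used to register one class per state. It is not the orchestrator's implementation (which relies on `register_specialized_type` and `lookup_specialized_type`); `Lifecycle`, `ModelSketch`, `_registry` and `for_status` are hypothetical names used only for illustration.

```python
from enum import Enum


class Lifecycle(str, Enum):
    """Stand-in for SubscriptionLifecycle, trimmed to three states."""

    INITIAL = "initial"
    PROVISIONING = "provisioning"
    ACTIVE = "active"


class ModelSketch:
    """Hypothetical base class that records which subclass serves which lifecycle."""

    _base = None
    _registry = {}

    def __init_subclass__(cls, is_base=False, lifecycle=None, **kwargs):
        super().__init_subclass__(**kwargs)
        if is_base:
            # Each product gets its own registry, rooted at its base class.
            cls._base = cls
            cls._registry = {}
        for state in lifecycle or []:
            # Subclasses share (and fill) the registry created by their base class.
            cls._registry[state] = cls

    @classmethod
    def for_status(cls, status):
        # Fall back to the base (least strict) class for unregistered states.
        return cls._registry.get(status, cls._base)


class NodeInactive(ModelSketch, is_base=True):
    pass


class NodeProvisioning(NodeInactive, lifecycle=[Lifecycle.PROVISIONING]):
    pass


class Node(NodeProvisioning, lifecycle=[Lifecycle.ACTIVE]):
    pass
```

In this sketch, `Node.for_status(Lifecycle.INITIAL)` falls back to `NodeInactive`, which matches the idea that the base class covers every state without a more specific class.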

To fully understand the Subscription Model, it's best to look at the SubscriptionModel itself in the code. Here you can also see the various methods available for use on these Subscription instances when you are using them in your workflow code:

orchestrator.domain.base.SubscriptionModel

Bases: orchestrator.domain.base.DomainModel

This is the base class for all product subscription models.

To use this class, see the examples below:

Defining a subscription model:

>>> class SubscriptionInactive(SubscriptionModel, product_type="SP"):  # doctest:+SKIP
...    block: Optional[ProductBlockModelInactive] = None

>>> class Subscription(SubscriptionInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]):  # doctest:+SKIP
...    block: ProductBlockModel

This example defines a subscription model with two different constraints based on lifecycle. Subscription is valid only for ACTIVE, and SubscriptionInactive for all other states. product_type must be defined on the base class and need not be defined on the others.

Create a new empty subscription:

>>> example1 = SubscriptionInactive.from_product_id(product_id, customer_id)  # doctest:+SKIP

Create a new instance based on a dict in the state:

>>> example2 = SubscriptionInactive(**state)  # doctest:+SKIP

To retrieve an existing subscription from the database:

>>> SubscriptionInactive.from_subscription(subscription_id)  # doctest:+SKIP
Source code in orchestrator/domain/base.py
class SubscriptionModel(DomainModel):
    r"""This is the base class for all product subscription models.

    To use this class, see the examples below:

    Defining a subscription model:

        >>> class SubscriptionInactive(SubscriptionModel, product_type="SP"):  # doctest:+SKIP
        ...    block: Optional[ProductBlockModelInactive] = None

        >>> class Subscription(SubscriptionInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]):  # doctest:+SKIP
        ...    block: ProductBlockModel

    This example defines a subscription model with two different constraints based on lifecycle. `Subscription` is valid only for `ACTIVE`,
    and `SubscriptionInactive` for all other states.
    `product_type` must be defined on the base class and need not be defined on the others.

    Create a new empty subscription:

        >>> example1 = SubscriptionInactive.from_product_id(product_id, customer_id)  # doctest:+SKIP

    Create a new instance based on a dict in the state:

        >>> example2 = SubscriptionInactive(**state)  # doctest:+SKIP

    To retrieve a ProductBlockModel from the database:

        >>> SubscriptionInactive.from_subscription(subscription_id)  # doctest:+SKIP
    """

    product: ProductModel
    customer_id: str
    _db_model: SubscriptionTable = PrivateAttr()
    subscription_id: UUID = Field(default_factory=uuid4)  # pragma: no mutate
    description: str = "Initial subscription"  # pragma: no mutate
    status: SubscriptionLifecycle = SubscriptionLifecycle.INITIAL  # pragma: no mutate
    insync: bool = False  # pragma: no mutate
    start_date: datetime | None = None  # pragma: no mutate
    end_date: datetime | None = None  # pragma: no mutate
    note: str | None = None  # pragma: no mutate

    def __new__(cls, *args: Any, status: SubscriptionLifecycle | None = None, **kwargs: Any) -> "SubscriptionModel":
        # status can be none if created during change_lifecycle
        if status and not issubclass(cls, lookup_specialized_type(cls, status)):
            raise ValueError(f"{cls} is not valid for status {status}")

        return super().__new__(cls)

    @classmethod
    def __pydantic_init_subclass__(  # type: ignore[override]
        cls, is_base: bool = False, lifecycle: list[SubscriptionLifecycle] | None = None, **kwargs: Any
    ) -> None:
        super().__pydantic_init_subclass__(lifecycle=lifecycle, **kwargs)

        if is_base:
            cls.__base_type__ = cls

        if is_base or lifecycle:
            register_specialized_type(cls, lifecycle)

        cls.__doc__ = make_subscription_model_docstring(cls, lifecycle)

    @classmethod
    def diff_product_in_database(cls, product_id: UUID) -> dict[str, dict[str, set[str] | dict[str, set[str]]]]:
        """Return any differences between the attrs defined on the domain model and those on product blocks in the database.

        This is only needed to check if the domain model and database models match which would be done during testing...
        """
        product_db = db.session.get(ProductTable, product_id)
        product_blocks_in_db = {pb.name for pb in product_db.product_blocks} if product_db else set()

        product_blocks_in_model = cls._get_depends_on_product_block_types()
        product_blocks_types_in_model = get_depends_on_product_block_type_list(product_blocks_in_model)

        product_blocks_in_model = set(flatten(map(attrgetter("__names__"), product_blocks_types_in_model)))  # type: ignore

        missing_product_blocks_in_db = product_blocks_in_model - product_blocks_in_db  # type: ignore
        missing_product_blocks_in_model = product_blocks_in_db - product_blocks_in_model  # type: ignore

        fixed_inputs_model = set(cls._non_product_block_fields_)
        fixed_inputs_in_db = {fi.name for fi in product_db.fixed_inputs} if product_db else set()

        missing_fixed_inputs_in_db = fixed_inputs_model - fixed_inputs_in_db
        missing_fixed_inputs_in_model = fixed_inputs_in_db - fixed_inputs_model

        logger.debug(
            "ProductTable blocks diff",
            product_block_db=product_db.name if product_db else None,
            product_blocks_in_db=product_blocks_in_db,
            product_blocks_in_model=product_blocks_in_model,
            fixed_inputs_in_db=fixed_inputs_in_db,
            fixed_inputs_model=fixed_inputs_model,
            missing_product_blocks_in_db=missing_product_blocks_in_db,
            missing_product_blocks_in_model=missing_product_blocks_in_model,
            missing_fixed_inputs_in_db=missing_fixed_inputs_in_db,
            missing_fixed_inputs_in_model=missing_fixed_inputs_in_model,
        )

        missing_data_depends_on_blocks: dict[str, set[str]] = {}
        for product_block_in_model in product_blocks_types_in_model:
            missing_data_depends_on_blocks.update(product_block_in_model.diff_product_block_in_database())

        diff: dict[str, set[str] | dict[str, set[str]]] = {
            k: v
            for k, v in {
                "missing_product_blocks_in_db": missing_product_blocks_in_db,
                "missing_product_blocks_in_model": missing_product_blocks_in_model,
                "missing_fixed_inputs_in_db": missing_fixed_inputs_in_db,
                "missing_fixed_inputs_in_model": missing_fixed_inputs_in_model,
                "missing_in_depends_on_blocks": missing_data_depends_on_blocks,
            }.items()
            if v
        }

        missing_data: dict[str, dict[str, set[str] | dict[str, set[str]]]] = {}
        if diff and product_db:
            missing_data[product_db.name] = diff

        return missing_data

    @classmethod
    def from_product_id(
        cls: type[S],
        product_id: UUID | UUIDstr,
        customer_id: str,
        status: SubscriptionLifecycle = SubscriptionLifecycle.INITIAL,
        description: str | None = None,
        insync: bool = False,
        start_date: datetime | None = None,
        end_date: datetime | None = None,
        note: str | None = None,
    ) -> S:
        """Use product_id (and customer_id) to return required fields of a new empty subscription."""
        # Caller wants a new instance and provided a product_id and customer_id
        product_db = db.session.get(ProductTable, product_id)
        if not product_db:
            raise KeyError("Could not find a product for the given product_id")

        product = ProductModel(
            product_id=product_db.product_id,
            name=product_db.name,
            description=product_db.description,
            product_type=product_db.product_type,
            tag=product_db.tag,
            status=product_db.status,
            created_at=product_db.created_at,
            end_date=product_db.end_date,
        )

        if description is None:
            description = f"Initial subscription of {product.description}"

        subscription_id = uuid4()
        subscription = SubscriptionTable(
            subscription_id=subscription_id,
            product_id=product_id,
            customer_id=customer_id,
            description=description,
            status=status.value,
            insync=insync,
            start_date=start_date,
            end_date=end_date,
            note=note,
        )
        db.session.add(subscription)

        fixed_inputs = {fi.name: fi.value for fi in product_db.fixed_inputs}
        instances = cls._init_instances(subscription_id)

        model = cls(
            product=product,
            customer_id=customer_id,
            subscription_id=subscription_id,
            description=description,
            status=status,
            insync=insync,
            start_date=start_date,
            end_date=end_date,
            note=note,
            **fixed_inputs,
            **instances,
        )
        model._db_model = subscription
        return model

    @classmethod
    def from_other_lifecycle(
        cls: type[S],
        other: "SubscriptionModel",
        status: SubscriptionLifecycle,
        skip_validation: bool = False,
    ) -> S:
        """Create new domain model from instance while changing the status.

        This makes sure we always have a specific instance.
        """
        if not cls.__base_type__:
            # Import here to prevent cyclic imports
            from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY

            cls = SUBSCRIPTION_MODEL_REGISTRY.get(other.product.name, cls)  # type:ignore
            cls = lookup_specialized_type(cls, status)

        # this will raise ValueError when wrong lifecycle transitions are detected in the new domain model
        if not skip_validation:
            validate_lifecycle_change(other, status)

        data = cls._data_from_lifecycle(other, status, other.subscription_id)
        data["status"] = status
        if data["start_date"] is None and status == SubscriptionLifecycle.ACTIVE:
            data["start_date"] = nowtz()
        if data["end_date"] is None and status == SubscriptionLifecycle.TERMINATED:
            data["end_date"] = nowtz()

        model = cls(**data)
        model._db_model = other._db_model

        return model

    # Some common functions shared by from_other_product and from_subscription
    @classmethod
    def _get_subscription(cls: type[S], subscription_id: UUID | UUIDstr) -> Any:
        return db.session.get(
            SubscriptionTable,
            subscription_id,
            options=[
                selectinload(SubscriptionTable.instances)
                .selectinload(SubscriptionInstanceTable.product_block)
                .selectinload(ProductBlockTable.resource_types),
                selectinload(SubscriptionTable.instances).selectinload(
                    SubscriptionInstanceTable.in_use_by_block_relations
                ),
                selectinload(SubscriptionTable.instances).selectinload(SubscriptionInstanceTable.values),
            ],
        )

    @classmethod
    def _to_product_model(cls: type[S], product: ProductTable) -> ProductModel:
        return ProductModel(
            product_id=product.product_id,
            name=product.name,
            description=product.description,
            product_type=product.product_type,
            tag=product.tag,
            status=product.status,
            created_at=product.created_at if product.created_at else None,
            end_date=product.end_date if product.end_date else None,
        )

    @classmethod
    def from_other_product(
        cls: type[S],
        old_instantiation: S,
        new_product_id: UUID | str,
        new_root: tuple[str, ProductBlockModel] | None = None,
    ) -> S:
        db_product = get_product_by_id(new_product_id)
        if not db_product:
            raise KeyError("Could not find a product for the given product_id")

        subscription = cls._get_subscription(old_instantiation.subscription_id)
        product = cls._to_product_model(db_product)

        status = SubscriptionLifecycle(subscription.status)

        if not cls.__base_type__:
            # Import here to prevent cyclic imports
            from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY

            cls = SUBSCRIPTION_MODEL_REGISTRY.get(subscription.product.name, cls)  # type:ignore
            cls = lookup_specialized_type(cls, status)
        elif not issubclass(cls, lookup_specialized_type(cls, status)):
            raise ValueError(f"{cls} is not valid for lifecycle {status}")

        fixed_inputs = {fi.name: fi.value for fi in db_product.fixed_inputs}

        if new_root:
            name, product_block = new_root
            instances = {name: product_block}
        else:
            instances = cls._load_instances(subscription.instances, status, match_domain_attr=False)  # type:ignore

        try:
            model = cls(
                product=product,
                customer_id=subscription.customer_id,
                subscription_id=subscription.subscription_id,
                description=subscription.description,
                status=status,
                insync=subscription.insync,
                start_date=subscription.start_date,
                end_date=subscription.end_date,
                note=subscription.note,
                **fixed_inputs,
                **instances,
            )
            model._db_model = subscription
            return model
        except ValidationError:
            logger.exception(
                "Subscription is not correct in database", loaded_fixed_inputs=fixed_inputs, loaded_instances=instances
            )
            raise

    @classmethod
    def from_subscription(cls: type[S], subscription_id: UUID | UUIDstr) -> S:
        """Use a subscription_id to return required fields of an existing subscription."""
        subscription = cls._get_subscription(subscription_id)
        if subscription is None:
            raise ValueError(f"Subscription with id: {subscription_id}, does not exist")
        product = cls._to_product_model(subscription.product)

        status = SubscriptionLifecycle(subscription.status)

        if not cls.__base_type__:
            # Import here to prevent cyclic imports
            from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY

            try:
                cls = SUBSCRIPTION_MODEL_REGISTRY[subscription.product.name]  # type:ignore
            except KeyError:
                raise ProductNotInRegistryError(
                    f"'{subscription.product.name}' is not found within the SUBSCRIPTION_MODEL_REGISTRY"
                )
            cls = lookup_specialized_type(cls, status)
        elif not issubclass(cls, lookup_specialized_type(cls, status)):
            raise ValueError(f"{cls} is not valid for lifecycle {status}")

        fixed_inputs = {fi.name: fi.value for fi in subscription.product.fixed_inputs}

        instances = cls._load_instances(subscription.instances, status, match_domain_attr=False)

        try:
            model = cls(
                product=product,
                customer_id=subscription.customer_id,
                subscription_id=subscription.subscription_id,
                description=subscription.description,
                status=status,
                insync=subscription.insync,
                start_date=subscription.start_date,
                end_date=subscription.end_date,
                note=subscription.note,
                **fixed_inputs,
                **instances,
            )
            model._db_model = subscription
            return model
        except ValidationError:
            logger.exception(
                "Subscription is not correct in database", loaded_fixed_inputs=fixed_inputs, loaded_instances=instances
            )
            raise

    def save(self) -> None:
        """Save the subscription to the database."""
        specialized_type = lookup_specialized_type(self.__class__, self.status)
        if specialized_type and not isinstance(self, specialized_type):
            raise ValueError(
                f"Lifecycle status {self.status.value} requires specialized type {specialized_type!r}, was: {type(self)!r}"
            )

        sub = db.session.get(
            SubscriptionTable,
            self.subscription_id,
            options=[
                selectinload(SubscriptionTable.instances)
                .selectinload(SubscriptionInstanceTable.product_block)
                .selectinload(ProductBlockTable.resource_types),
                selectinload(SubscriptionTable.instances).selectinload(SubscriptionInstanceTable.values),
            ],
        )
        if not sub:
            sub = self._db_model

        # Make sure we refresh the object and not use an already mapped object
        db.session.refresh(sub)

        self._db_model = sub
        sub.product_id = self.product.product_id
        sub.customer_id = self.customer_id
        sub.description = self.description
        sub.status = self.status.value
        sub.insync = self.insync
        sub.start_date = self.start_date
        sub.end_date = self.end_date
        sub.note = self.note

        db.session.add(sub)
        db.session.flush()  # Sends INSERT and returns subscription_id without committing transaction

        old_instances_dict = {instance.subscription_instance_id: instance for instance in sub.instances}

        saved_instances, depends_on_instances = self._save_instances(self.subscription_id, self.status)

        for instances in depends_on_instances.values():
            for instance in instances:
                if instance.subscription_id != self.subscription_id:
                    raise ValueError(
                        "Attempting to save a Foreign `Subscription Instance` directly below a subscription. "
                        "This is not allowed."
                    )
        sub.instances = saved_instances

        # Calculate what to remove
        instances_set = {instance.subscription_instance_id for instance in sub.instances}
        for instance_id in instances_set:
            old_instances_dict.pop(instance_id, None)

        # What's left should be removed
        for instance in old_instances_dict.values():
            db.session.delete(instance)

        db.session.flush()

    @property
    def db_model(self) -> SubscriptionTable:
        return self._db_model
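The instance clean-up at the end of `save()` follows a simple pattern: remember every instance that existed before saving, discard those that are still present afterwards, and delete whatever remains. A minimal sketch of that pattern, using plain dicts and sets instead of database rows, might look like this (`find_stale` and the sample ids are hypothetical):

```python
def find_stale(old_instances: dict[str, object], saved_ids: set[str]) -> list[object]:
    """Return previously stored instances that are absent from the saved set."""
    remaining = dict(old_instances)  # work on a copy; leave the caller's dict intact
    for instance_id in saved_ids:
        # pop with a default ignores ids that are new in this save
        remaining.pop(instance_id, None)
    return list(remaining.values())


old = {"a": "instance-a", "b": "instance-b", "c": "instance-c"}
stale = find_stale(old, saved_ids={"a", "c", "d"})
```

Here only `instance-b` is left over, so it is the only candidate for deletion; the new id `d` is ignored because it was never in the old mapping.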

diff_product_in_database classmethod

diff_product_in_database(
    product_id: UUID,
) -> dict[str, dict[str, set[str] | dict[str, set[str]]]]

Return any differences between the attrs defined on the domain model and those on product blocks in the database.

This is only needed to check if the domain model and database models match which would be done during testing...

Source code in orchestrator/domain/base.py
@classmethod
def diff_product_in_database(cls, product_id: UUID) -> dict[str, dict[str, set[str] | dict[str, set[str]]]]:
    """Return any differences between the attrs defined on the domain model and those on product blocks in the database.

    This is only needed to check if the domain model and database models match which would be done during testing...
    """
    product_db = db.session.get(ProductTable, product_id)
    product_blocks_in_db = {pb.name for pb in product_db.product_blocks} if product_db else set()

    product_blocks_in_model = cls._get_depends_on_product_block_types()
    product_blocks_types_in_model = get_depends_on_product_block_type_list(product_blocks_in_model)

    product_blocks_in_model = set(flatten(map(attrgetter("__names__"), product_blocks_types_in_model)))  # type: ignore

    missing_product_blocks_in_db = product_blocks_in_model - product_blocks_in_db  # type: ignore
    missing_product_blocks_in_model = product_blocks_in_db - product_blocks_in_model  # type: ignore

    fixed_inputs_model = set(cls._non_product_block_fields_)
    fixed_inputs_in_db = {fi.name for fi in product_db.fixed_inputs} if product_db else set()

    missing_fixed_inputs_in_db = fixed_inputs_model - fixed_inputs_in_db
    missing_fixed_inputs_in_model = fixed_inputs_in_db - fixed_inputs_model

    logger.debug(
        "ProductTable blocks diff",
        product_block_db=product_db.name if product_db else None,
        product_blocks_in_db=product_blocks_in_db,
        product_blocks_in_model=product_blocks_in_model,
        fixed_inputs_in_db=fixed_inputs_in_db,
        fixed_inputs_model=fixed_inputs_model,
        missing_product_blocks_in_db=missing_product_blocks_in_db,
        missing_product_blocks_in_model=missing_product_blocks_in_model,
        missing_fixed_inputs_in_db=missing_fixed_inputs_in_db,
        missing_fixed_inputs_in_model=missing_fixed_inputs_in_model,
    )

    missing_data_depends_on_blocks: dict[str, set[str]] = {}
    for product_block_in_model in product_blocks_types_in_model:
        missing_data_depends_on_blocks.update(product_block_in_model.diff_product_block_in_database())

    diff: dict[str, set[str] | dict[str, set[str]]] = {
        k: v
        for k, v in {
            "missing_product_blocks_in_db": missing_product_blocks_in_db,
            "missing_product_blocks_in_model": missing_product_blocks_in_model,
            "missing_fixed_inputs_in_db": missing_fixed_inputs_in_db,
            "missing_fixed_inputs_in_model": missing_fixed_inputs_in_model,
            "missing_in_depends_on_blocks": missing_data_depends_on_blocks,
        }.items()
        if v
    }

    missing_data: dict[str, dict[str, set[str] | dict[str, set[str]]]] = {}
    if diff and product_db:
        missing_data[product_db.name] = diff

    return missing_data
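At its core, the method above is a pair of set differences followed by a filter that drops empty results. The following standalone sketch shows only that core, with no database access; `diff_names` and its key names are illustrative and not part of the orchestrator API.

```python
def diff_names(in_model: set[str], in_db: set[str]) -> dict[str, set[str]]:
    """Report names present on one side only, dropping empty results."""
    candidates = {
        "missing_in_db": in_model - in_db,     # defined on the model, not stored
        "missing_in_model": in_db - in_model,  # stored, but unknown to the model
    }
    # Keep only non-empty differences, so an empty dict means "in sync".
    return {key: names for key, names in candidates.items() if names}


result = diff_names({"node_type", "speed"}, {"node_type", "vendor"})
in_sync = diff_names({"node_type"}, {"node_type"})
```

Returning an empty dict when nothing differs makes the result convenient to assert on in a test, which fits the docstring's remark that this check is mainly useful during testing.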

from_other_lifecycle classmethod

from_other_lifecycle(
    other: SubscriptionModel,
    status: SubscriptionLifecycle,
    skip_validation: bool = False,
) -> S

Create new domain model from instance while changing the status.

This makes sure we always have a specific instance.

Source code in orchestrator/domain/base.py
@classmethod
def from_other_lifecycle(
    cls: type[S],
    other: "SubscriptionModel",
    status: SubscriptionLifecycle,
    skip_validation: bool = False,
) -> S:
    """Create new domain model from instance while changing the status.

    This makes sure we always have a specific instance.
    """
    if not cls.__base_type__:
        # Import here to prevent cyclic imports
        from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY

        cls = SUBSCRIPTION_MODEL_REGISTRY.get(other.product.name, cls)  # type:ignore
        cls = lookup_specialized_type(cls, status)

    # this will raise ValueError when wrong lifecycle transitions are detected in the new domain model
    if not skip_validation:
        validate_lifecycle_change(other, status)

    data = cls._data_from_lifecycle(other, status, other.subscription_id)
    data["status"] = status
    if data["start_date"] is None and status == SubscriptionLifecycle.ACTIVE:
        data["start_date"] = nowtz()
    if data["end_date"] is None and status == SubscriptionLifecycle.TERMINATED:
        data["end_date"] = nowtz()

    model = cls(**data)
    model._db_model = other._db_model

    return model
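One detail worth noting in the method above is how dates are stamped: `start_date` is filled in only when the subscription becomes active and has no start date yet, and `end_date` likewise on termination. The sketch below isolates that rule. `stamp_dates` is a hypothetical helper, plain strings stand in for the lifecycle enum, and `datetime.now(timezone.utc)` is an assumed substitute for `nowtz()`.

```python
from datetime import datetime, timezone


def stamp_dates(data: dict, status: str) -> dict:
    """Fill start_date/end_date the first time a status calls for them."""
    stamped = dict(data)  # avoid mutating the caller's dict
    now = datetime.now(timezone.utc)  # assumed stand-in for nowtz()
    if stamped.get("start_date") is None and status == "active":
        stamped["start_date"] = now
    if stamped.get("end_date") is None and status == "terminated":
        stamped["end_date"] = now
    return stamped


activated = stamp_dates({"start_date": None, "end_date": None}, "active")
terminated = stamp_dates(activated, "terminated")
```

Because existing values are never overwritten, moving a subscription to terminated keeps the original start date and only adds the end date.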

from_product_id classmethod

from_product_id(
    product_id: UUID | UUIDstr,
    customer_id: str,
    status: SubscriptionLifecycle = SubscriptionLifecycle.INITIAL,
    description: str | None = None,
    insync: bool = False,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
    note: str | None = None,
) -> S

Use product_id (and customer_id) to return required fields of a new empty subscription.

Source code in orchestrator/domain/base.py
@classmethod
def from_product_id(
    cls: type[S],
    product_id: UUID | UUIDstr,
    customer_id: str,
    status: SubscriptionLifecycle = SubscriptionLifecycle.INITIAL,
    description: str | None = None,
    insync: bool = False,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
    note: str | None = None,
) -> S:
    """Use product_id (and customer_id) to return required fields of a new empty subscription."""
    # Caller wants a new instance and provided a product_id and customer_id
    product_db = db.session.get(ProductTable, product_id)
    if not product_db:
        raise KeyError("Could not find a product for the given product_id")

    product = ProductModel(
        product_id=product_db.product_id,
        name=product_db.name,
        description=product_db.description,
        product_type=product_db.product_type,
        tag=product_db.tag,
        status=product_db.status,
        created_at=product_db.created_at,
        end_date=product_db.end_date,
    )

    if description is None:
        description = f"Initial subscription of {product.description}"

    subscription_id = uuid4()
    subscription = SubscriptionTable(
        subscription_id=subscription_id,
        product_id=product_id,
        customer_id=customer_id,
        description=description,
        status=status.value,
        insync=insync,
        start_date=start_date,
        end_date=end_date,
        note=note,
    )
    db.session.add(subscription)

    fixed_inputs = {fi.name: fi.value for fi in product_db.fixed_inputs}
    instances = cls._init_instances(subscription_id)

    model = cls(
        product=product,
        customer_id=customer_id,
        subscription_id=subscription_id,
        description=description,
        status=status,
        insync=insync,
        start_date=start_date,
        end_date=end_date,
        note=note,
        **fixed_inputs,
        **instances,
    )
    model._db_model = subscription
    return model

from_subscription classmethod

from_subscription(subscription_id: UUID | UUIDstr) -> S

Use a subscription_id to return required fields of an existing subscription.

Source code in orchestrator/domain/base.py
@classmethod
def from_subscription(cls: type[S], subscription_id: UUID | UUIDstr) -> S:
    """Use a subscription_id to return required fields of an existing subscription."""
    subscription = cls._get_subscription(subscription_id)
    if subscription is None:
        raise ValueError(f"Subscription with id: {subscription_id}, does not exist")
    product = cls._to_product_model(subscription.product)

    status = SubscriptionLifecycle(subscription.status)

    if not cls.__base_type__:
        # Import here to prevent cyclic imports
        from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY

        try:
            cls = SUBSCRIPTION_MODEL_REGISTRY[subscription.product.name]  # type:ignore
        except KeyError:
            raise ProductNotInRegistryError(
                f"'{subscription.product.name}' is not found within the SUBSCRIPTION_MODEL_REGISTRY"
            )
        cls = lookup_specialized_type(cls, status)
    elif not issubclass(cls, lookup_specialized_type(cls, status)):
        raise ValueError(f"{cls} is not valid for lifecycle {status}")

    fixed_inputs = {fi.name: fi.value for fi in subscription.product.fixed_inputs}

    instances = cls._load_instances(subscription.instances, status, match_domain_attr=False)

    try:
        model = cls(
            product=product,
            customer_id=subscription.customer_id,
            subscription_id=subscription.subscription_id,
            description=subscription.description,
            status=status,
            insync=subscription.insync,
            start_date=subscription.start_date,
            end_date=subscription.end_date,
            note=subscription.note,
            **fixed_inputs,
            **instances,
        )
        model._db_model = subscription
        return model
    except ValidationError:
        logger.exception(
            "Subscription is not correct in database", loaded_fixed_inputs=fixed_inputs, loaded_instances=instances
        )
        raise
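
The class resolution in from_subscription has two steps: look the product name up in the registry, then narrow the registered class to the one that fits the subscription's lifecycle status. The sketch below imitates that with plain dictionaries. Lifecycle, REGISTRY, SPECIALIZED and resolve_model are hypothetical names, and the status-to-class mapping is an assumption made for illustration; the real lookup is done by lookup_specialized_type.

```python
from enum import Enum


class Lifecycle(str, Enum):
    """Hypothetical subset of SubscriptionLifecycle."""

    INITIAL = "initial"
    PROVISIONING = "provisioning"
    ACTIVE = "active"


class NodeInactive:
    pass


class NodeProvisioning(NodeInactive):
    pass


class Node(NodeProvisioning):
    pass


# Product name -> registered model class, as in SUBSCRIPTION_MODEL_REGISTRY.
REGISTRY = {"node Cisco": Node, "node Nokia": Node}

# Assumed mapping from registered class and status to the specialized class.
SPECIALIZED = {
    Node: {
        Lifecycle.INITIAL: NodeInactive,
        Lifecycle.PROVISIONING: NodeProvisioning,
        Lifecycle.ACTIVE: Node,
    }
}


def resolve_model(product_name: str, status: Lifecycle) -> type:
    try:
        registered = REGISTRY[product_name]
    except KeyError:
        raise LookupError(f"'{product_name}' is not found within the registry") from None
    # Fall back to the registered class when no specialization is known.
    return SPECIALIZED[registered].get(status, registered)


print(resolve_model("node Cisco", Lifecycle.PROVISIONING).__name__)
```

A product name that is missing from the registry fails immediately, which mirrors the ProductNotInRegistryError raised in the source above.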

save

save() -> None

Save the subscription to the database.

Source code in orchestrator/domain/base.py
def save(self) -> None:
    """Save the subscription to the database."""
    specialized_type = lookup_specialized_type(self.__class__, self.status)
    if specialized_type and not isinstance(self, specialized_type):
        raise ValueError(
            f"Lifecycle status {self.status.value} requires specialized type {specialized_type!r}, was: {type(self)!r}"
        )

    sub = db.session.get(
        SubscriptionTable,
        self.subscription_id,
        options=[
            selectinload(SubscriptionTable.instances)
            .selectinload(SubscriptionInstanceTable.product_block)
            .selectinload(ProductBlockTable.resource_types),
            selectinload(SubscriptionTable.instances).selectinload(SubscriptionInstanceTable.values),
        ],
    )
    if not sub:
        sub = self._db_model

    # Make sure we refresh the object and not use an already mapped object
    db.session.refresh(sub)

    self._db_model = sub
    sub.product_id = self.product.product_id
    sub.customer_id = self.customer_id
    sub.description = self.description
    sub.status = self.status.value
    sub.insync = self.insync
    sub.start_date = self.start_date
    sub.end_date = self.end_date
    sub.note = self.note

    db.session.add(sub)
    db.session.flush()  # Sends INSERT and returns subscription_id without committing transaction

    old_instances_dict = {instance.subscription_instance_id: instance for instance in sub.instances}

    saved_instances, depends_on_instances = self._save_instances(self.subscription_id, self.status)

    for instances in depends_on_instances.values():
        for instance in instances:
            if instance.subscription_id != self.subscription_id:
                raise ValueError(
                    "Attempting to save a Foreign `Subscription Instance` directly below a subscription. "
                    "This is not allowed."
                )
    sub.instances = saved_instances

    # Calculate what to remove
    instances_set = {instance.subscription_instance_id for instance in sub.instances}
    for instance_id in instances_set:
        old_instances_dict.pop(instance_id, None)

    # What's left should be removed
    for instance in old_instances_dict.values():
        db.session.delete(instance)

    db.session.flush()
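
The last part of save works out which previously stored instances are no longer part of the subscription and deletes them. That bookkeeping can be sketched in isolation. stale_instances is a hypothetical helper, and the string values stand in for SubscriptionInstanceTable rows.

```python
def stale_instances(old_instances: dict, saved_ids: set) -> list:
    """Return the old instances whose IDs were not saved again."""
    remaining = dict(old_instances)  # copy, so the caller's dict is untouched
    for instance_id in saved_ids:
        # IDs that are new in this save are simply absent; pop() ignores them.
        remaining.pop(instance_id, None)
    return list(remaining.values())


old = {"a": "instance-a", "b": "instance-b", "c": "instance-c"}
to_delete = stale_instances(old, {"a", "c", "d"})
print(to_delete)  # ['instance-b']
```

Here "d" is a newly added instance and "b" is the only one that disappeared, so it is the only candidate for deletion.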

It also helps to see how a Product Type is stored in the database. The ProductTable model lists every attribute that the WFO persists for a product:

orchestrator.db.models.ProductTable

Bases: orchestrator.db.database.BaseModel

Source code in orchestrator/db/models.py
class ProductTable(BaseModel):
    __tablename__ = "products"
    __allow_unmapped__ = True

    product_id = mapped_column(UUIDType, server_default=text("uuid_generate_v4()"), primary_key=True)
    name = mapped_column(String(), nullable=False, unique=True)
    description = mapped_column(Text(), nullable=False)
    product_type = mapped_column(String(255), nullable=False)
    tag = mapped_column(String(TAG_LENGTH), nullable=False, index=True)
    status = mapped_column(String(STATUS_LENGTH), nullable=False)
    created_at = mapped_column(UtcTimestamp, nullable=False, server_default=text("current_timestamp()"))
    end_date = mapped_column(UtcTimestamp)
    product_blocks = relationship(
        "ProductBlockTable",
        secondary=product_product_block_association,
        lazy="select",
        backref=backref("products", lazy=True),
        cascade_backrefs=False,
        passive_deletes=True,
    )
    workflows = relationship(
        "WorkflowTable",
        secondary=product_workflows_association,
        secondaryjoin="and_(products_workflows.c.workflow_id == WorkflowTable.workflow_id, "
        "WorkflowTable.deleted_at == None)",
        lazy="select",
        cascade_backrefs=False,
        passive_deletes=True,
    )
    fixed_inputs = relationship(
        "FixedInputTable", cascade="all, delete-orphan", backref=backref("product", lazy=True), passive_deletes=True
    )

    __table_args__ = {"extend_existing": True}

    def find_block_by_name(self, name: str) -> ProductBlockTable:
        if session := object_session(self):
            return session.query(ProductBlockTable).with_parent(self).filter(ProductBlockTable.name == name).one()
        raise AssertionError("Session should not be None")

    def fixed_input_value(self, name: str) -> str:
        if session := object_session(self):
            return (
                session.query(FixedInputTable)
                .with_parent(self)
                .filter(FixedInputTable.name == name)
                .with_entities(FixedInputTable.value)
                .scalar()
            )
        raise AssertionError("Session should not be None")

    def _subscription_workflow_key(self, target: Target) -> str | None:
        wfs = list(filter(lambda w: w.target == target, self.workflows))
        return wfs[0].name if len(wfs) > 0 else None

    def create_subscription_workflow_key(self) -> str | None:
        return self._subscription_workflow_key(Target.CREATE)

    def terminate_subscription_workflow_key(self) -> str | None:
        return self._subscription_workflow_key(Target.TERMINATE)

    def modify_subscription_workflow_key(self, name: str) -> str | None:
        wfs = list(filter(lambda w: w.target == Target.MODIFY and w.name == name, self.workflows))
        return wfs[0].name if len(wfs) > 0 else None

    def workflow_by_key(self, name: str) -> WorkflowTable | None:
        return first_true(self.workflows, None, lambda wf: wf.name == name)  # type: ignore
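
The workflow helper methods on ProductTable filter the product's workflows by target and return the name of the first match, or None when nothing matches. A rough standalone sketch of that selection follows. Target and Workflow here are hypothetical stand-ins for orchestrator.targets.Target and WorkflowTable, and the workflow names are borrowed from the example Node migration.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class Target(str, Enum):
    """Hypothetical stand-in for orchestrator.targets.Target."""

    CREATE = "CREATE"
    MODIFY = "MODIFY"
    TERMINATE = "TERMINATE"
    SYSTEM = "SYSTEM"


@dataclass
class Workflow:
    """Hypothetical stand-in for a WorkflowTable row."""

    name: str
    target: Target


def workflow_key(workflows: list[Workflow], target: Target) -> str | None:
    # Only the first workflow with a matching target is returned.
    matches = [w for w in workflows if w.target == target]
    return matches[0].name if matches else None


def modify_workflow_key(workflows: list[Workflow], name: str) -> str | None:
    # A product can have several modify workflows, so the name is part of the filter.
    matches = [w for w in workflows if w.target == Target.MODIFY and w.name == name]
    return matches[0].name if matches else None


node_workflows = [
    Workflow("create_node", Target.CREATE),
    Workflow("modify_node", Target.MODIFY),
    Workflow("modify_sync_ports", Target.MODIFY),
    Workflow("terminate_node", Target.TERMINATE),
    Workflow("validate_node", Target.SYSTEM),
]

print(workflow_key(node_workflows, Target.CREATE))
print(modify_workflow_key(node_workflows, "modify_sync_ports"))
```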

Subscription Model Registry

When you define a Product Type as a domain model in Python, you also need to register it in the subscription model registry by adding it to the SUBSCRIPTION_MODEL_REGISTRY dictionary. Each key is a product name as stored in the database, and several products can map to the same model class, as shown here in the example workflow orchestrator:

# Copyright 2019-2023 SURF.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY

from products.product_types.core_link import CoreLink
from products.product_types.l2vpn import L2vpn
from products.product_types.node import Node
from products.product_types.port import Port

SUBSCRIPTION_MODEL_REGISTRY.update(
    {
        "node Cisco": Node,
        "node Nokia": Node,
        "node Cumulus": Node,
        "node FRR": Node,
        "port 10G": Port,
        "port 100G": Port,
        "core link 10G": CoreLink,
        "core link 100G": CoreLink,
        "l2vpn": L2vpn,
    }
)

Automatically Generating Product Types

If all of this seems like too much work, there is good news: like all clever engineers before us, we've fixed that with YAML! Using the WFO CLI, you can generate your product types directly from a YAML file. For more information on how to do that, check out the CLI generate command documentation.

Creating Database Migrations

After defining all of the components of a Product Type, you also need to create a database migration to wire up the product in the orchestrator's database. A migration file for this example Node model looks like this:

Example: example-orchestrator/migrations/versions/schema/2023-10-27_a84ca2e5e4db_add_node.py
"""Add node product.

Revision ID: a84ca2e5e4db
Revises: a77227fe5455
Create Date: 2023-10-27 11:25:40.994878

"""
from uuid import uuid4

from alembic import op
from orchestrator.migrations.helpers import (
    create,
    create_workflow,
    delete,
    delete_workflow,
    ensure_default_workflows,
)
from orchestrator.targets import Target

# revision identifiers, used by Alembic.
revision = "a84ca2e5e4db"
down_revision = "a77227fe5455"
branch_labels = None
depends_on = None

new_products = {
    "products": {
        "node Cisco": {
            "product_id": uuid4(),
            "product_type": "Node",
            "description": "Network node",
            "tag": "NODE",
            "status": "active",
            "product_blocks": [
                "Node",
            ],
            "fixed_inputs": {
                "node_type": "Cisco",
            },
        },
        "node Nokia": {
            "product_id": uuid4(),
            "product_type": "Node",
            "description": "Network node",
            "tag": "NODE",
            "status": "active",
            "product_blocks": [
                "Node",
            ],
            "fixed_inputs": {
                "node_type": "Nokia",
            },
        },
        "node Cumulus": {
            "product_id": uuid4(),
            "product_type": "Node",
            "description": "Network node",
            "tag": "NODE",
            "status": "active",
            "product_blocks": [
                "Node",
            ],
            "fixed_inputs": {
                "node_type": "Cumulus",
            },
        },
        "node FRR": {
            "product_id": uuid4(),
            "product_type": "Node",
            "description": "Network node",
            "tag": "NODE",
            "status": "active",
            "product_blocks": [
                "Node",
            ],
            "fixed_inputs": {
                "node_type": "FRR",
            },
        },
    },
    "product_blocks": {
        "Node": {
            "product_block_id": uuid4(),
            "description": "node product block",
            "tag": "NODE",
            "status": "active",
            "resources": {
                "role_id": "ID in CMDB of role of the node in the network",
                "type_id": "ID in CMDB of type of the node",
                "site_id": "ID in CMDB of site where the node is located",
                "node_status": "Operational status of the node",
                "node_name": "Unique name of the node",
                "node_description": "Description of the node",
                "ims_id": "ID of the node in the inventory management system",
                "nrm_id": "ID of the node in the network resource manager",
                "ipv4_ipam_id": "ID of the node’s iPv4 loopback address in IPAM",
                "ipv6_ipam_id": "ID of the node’s iPv6 loopback address in IPAM",
            },
            "depends_on_block_relations": [],
        },
    },
    "workflows": {},
}

new_workflows = [
    {
        "name": "create_node",
        "target": Target.CREATE,
        "description": "Create node",
        "product_type": "Node",
    },
    {
        "name": "modify_node",
        "target": Target.MODIFY,
        "description": "Modify node",
        "product_type": "Node",
    },
    {
        "name": "modify_sync_ports",
        "target": Target.MODIFY,
        "description": "Update node interfaces",
        "product_type": "Node",
    },
    {
        "name": "terminate_node",
        "target": Target.TERMINATE,
        "description": "Terminate node",
        "product_type": "Node",
    },
    {
        "name": "validate_node",
        "target": Target.SYSTEM,
        "description": "Validate node",
        "product_type": "Node",
    },
]


def upgrade() -> None:
    conn = op.get_bind()
    create(conn, new_products)
    for workflow in new_workflows:
        create_workflow(conn, workflow)
    ensure_default_workflows(conn)


def downgrade() -> None:
    conn = op.get_bind()
    for workflow in new_workflows:
        delete_workflow(conn, workflow["name"])

    delete(conn, new_products)

Thankfully, you don't have to write these database migrations by hand. You can use the main.py db migrate-domain-models command that is part of the orchestrator CLI, documented here.
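
Since from_subscription looks models up by product name, the product names in a migration need matching keys in SUBSCRIPTION_MODEL_REGISTRY. A small consistency check along these lines can catch a mismatch early. This is a sketch only: unregistered_products is a hypothetical helper, the dictionaries are trimmed-down copies of the structures shown above, and "node Juniper" is an invented entry used to trigger the check.

```python
def unregistered_products(migration: dict, registry: dict) -> list:
    """Return product names from the migration that have no registry entry."""
    return sorted(name for name in migration["products"] if name not in registry)


# Trimmed-down migration payload; "node Juniper" is invented for this example.
new_products = {
    "products": {
        "node Cisco": {},
        "node Nokia": {},
        "node Juniper": {},
    }
}

# Stand-in for SUBSCRIPTION_MODEL_REGISTRY; the values would be model classes.
registry = {"node Cisco": object, "node Nokia": object}

missing = unregistered_products(new_products, registry)
print(missing)  # ['node Juniper']
```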