Product Types

Defining a Product Type

A Product Type (often referred to simply as a product) is the top-level object of a domain model. A product is effectively the template used for creating subscription instances, and you can create as many instances from it as you want. For an example product model, see this very simple Node product type from the example workflow orchestrator:

# Copyright 2019-2023 SURF.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from pydantic_forms.types import strEnum
from orchestrator.domain.base import SubscriptionModel
from orchestrator.types import SubscriptionLifecycle

from products.product_blocks.node import NodeBlock, NodeBlockInactive, NodeBlockProvisioning


class Node_Type(strEnum):
    Cisco = "Cisco"
    Nokia = "Nokia"
    Cumulus = "Cumulus"
    FRR = "FRR"


class NodeInactive(SubscriptionModel, is_base=True):
    node_type: Node_Type
    node: NodeBlockInactive


class NodeProvisioning(NodeInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
    node_type: Node_Type
    node: NodeBlockProvisioning


class Node(NodeProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    node_type: Node_Type
    node: NodeBlock

Type Hints

Notice how type hints are used on these classes. The WFO uses these types for pydantic validation and for type safety when serializing data into and out of the database. If you're not familiar with type hinting, learn about the benefits from PEP 484!

Fixed Inputs

When a hard-coded value is stored on the product model, as Node_Type is here, it is called a Fixed Input. Read more about Fixed Inputs here.

Breaking this product down a bit more, we see three classes: NodeInactive, NodeProvisioning, and finally Node. These three classes build on each other, with the lowest-level class (NodeInactive) based on the SubscriptionModel base class. Each class has two simple attributes: node_type, a Fixed Input of type Node_Type, and node, the root product block. Each class represents the Node product in one of its lifecycle states, which are defined in the SubscriptionLifecycle enum:

orchestrator.types.SubscriptionLifecycle

Bases: pydantic_forms.types.strEnum

Source code in orchestrator/types.py
@strawberry.enum
class SubscriptionLifecycle(strEnum):
    INITIAL = "initial"
    ACTIVE = "active"
    MIGRATING = "migrating"
    DISABLED = "disabled"
    TERMINATED = "terminated"
    PROVISIONING = "provisioning"

To fully understand the Subscription Model, it's best to look at the SubscriptionModel itself in the code. Here you can also see the various methods available for use on these Subscription instances when you are using them in your workflow code:

orchestrator.domain.base.SubscriptionModel

Bases: orchestrator.domain.base.DomainModel

This is the base class for all product subscription models.

To use this class, see the examples below:

Defining a subscription model:

>>> class SubscriptionInactive(SubscriptionModel, product_type="SP"):  # doctest:+SKIP
...    block: Optional[ProductBlockModelInactive] = None

>>> class Subscription(SubscriptionInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]):  # doctest:+SKIP
...    block: ProductBlockModel

This example defines a subscription model with two different constraints based on lifecycle. Subscription is valid only for ACTIVE, and SubscriptionInactive for all other states. product_type must be defined on the base class and need not be defined on the others.

Create a new empty subscription:

>>> example1 = SubscriptionInactive.from_product_id(product_id, customer_id)  # doctest:+SKIP

Create a new instance based on a dict in the state:

>>> example2 = SubscriptionInactive(**state)  # doctest:+SKIP

To retrieve an existing subscription from the database:

>>> SubscriptionInactive.from_subscription(subscription_id)  # doctest:+SKIP
Source code in orchestrator/domain/base.py
class SubscriptionModel(DomainModel):
    r"""This is the base class for all product subscription models.

    To use this class, see the examples below:

    Defining a subscription model:

        >>> class SubscriptionInactive(SubscriptionModel, product_type="SP"):  # doctest:+SKIP
        ...    block: Optional[ProductBlockModelInactive] = None

        >>> class Subscription(SubscriptionInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]):  # doctest:+SKIP
        ...    block: ProductBlockModel

    This example defines a subscription model with two different constraints based on lifecycle. `Subscription` is valid only for `ACTIVE`,
    and `SubscriptionInactive` for all other states.
    `product_type` must be defined on the base class and need not be defined on the others.

    Create a new empty subscription:

        >>> example1 = SubscriptionInactive.from_product_id(product_id, customer_id)  # doctest:+SKIP

    Create a new instance based on a dict in the state:

        >>> example2 = SubscriptionInactive(**state)  # doctest:+SKIP

    To retrieve an existing subscription from the database:

        >>> SubscriptionInactive.from_subscription(subscription_id)  # doctest:+SKIP
    """

    __model_dump_cache__: ClassVar[dict[UUID, "SubscriptionModel"] | None] = None

    product: ProductModel
    customer_id: str
    _db_model: SubscriptionTable | None = PrivateAttr(default=None)
    subscription_id: UUID = Field(default_factory=uuid4)  # pragma: no mutate
    description: str = "Initial subscription"  # pragma: no mutate
    status: SubscriptionLifecycle = SubscriptionLifecycle.INITIAL  # pragma: no mutate
    insync: bool = False  # pragma: no mutate
    start_date: datetime | None = None  # pragma: no mutate
    end_date: datetime | None = None  # pragma: no mutate
    note: str | None = None  # pragma: no mutate
    version: int = 1  # pragma: no mutate

    def __new__(cls, *args: Any, status: SubscriptionLifecycle | None = None, **kwargs: Any) -> "SubscriptionModel":
        # status can be none if created during change_lifecycle
        if status and not issubclass(cls, lookup_specialized_type(cls, status)):
            raise ValueError(f"{cls} is not valid for status {status}")

        return super().__new__(cls)

    @classmethod
    def __pydantic_init_subclass__(  # type: ignore[override]
        cls, is_base: bool = False, lifecycle: list[SubscriptionLifecycle] | None = None, **kwargs: Any
    ) -> None:
        super().__pydantic_init_subclass__(lifecycle=lifecycle, **kwargs)

        if is_base:
            cls.__base_type__ = cls

        if is_base or lifecycle:
            register_specialized_type(cls, lifecycle)

        cls.__doc__ = make_subscription_model_docstring(cls, lifecycle)

    @classmethod
    def diff_product_in_database(cls, product_id: UUID) -> dict[str, dict[str, set[str] | dict[str, set[str]]]]:
        """Return any differences between the attrs defined on the domain model and those on product blocks in the database.

        This is only needed to check if the domain model and database models match which would be done during testing...
        """
        product_db = db.session.get(ProductTable, product_id)
        product_blocks_in_db = {pb.name for pb in product_db.product_blocks} if product_db else set()

        product_blocks_in_model = cls._get_depends_on_product_block_types()
        product_blocks_types_in_model = get_depends_on_product_block_type_list(product_blocks_in_model)

        product_blocks_in_model = set(
            flatten(map(attrgetter("__names__"), product_blocks_types_in_model))
        )  # type: ignore

        missing_product_blocks_in_db = product_blocks_in_model - product_blocks_in_db  # type: ignore
        missing_product_blocks_in_model = product_blocks_in_db - product_blocks_in_model  # type: ignore

        fixed_inputs_model = set(cls._non_product_block_fields_)
        fixed_inputs_in_db = {fi.name for fi in product_db.fixed_inputs} if product_db else set()

        missing_fixed_inputs_in_db = fixed_inputs_model - fixed_inputs_in_db
        missing_fixed_inputs_in_model = fixed_inputs_in_db - fixed_inputs_model

        logger.debug(
            "ProductTable blocks diff",
            product_block_db=product_db.name if product_db else None,
            product_blocks_in_db=product_blocks_in_db,
            product_blocks_in_model=product_blocks_in_model,
            fixed_inputs_in_db=fixed_inputs_in_db,
            fixed_inputs_model=fixed_inputs_model,
            missing_product_blocks_in_db=missing_product_blocks_in_db,
            missing_product_blocks_in_model=missing_product_blocks_in_model,
            missing_fixed_inputs_in_db=missing_fixed_inputs_in_db,
            missing_fixed_inputs_in_model=missing_fixed_inputs_in_model,
        )

        missing_data_depends_on_blocks: dict[str, set[str]] = {}
        for product_block_in_model in product_blocks_types_in_model:
            missing_data_depends_on_blocks.update(product_block_in_model.diff_product_block_in_database())

        diff: dict[str, set[str] | dict[str, set[str]]] = {
            k: v
            for k, v in {
                "missing_product_blocks_in_db": missing_product_blocks_in_db,
                "missing_product_blocks_in_model": missing_product_blocks_in_model,
                "missing_fixed_inputs_in_db": missing_fixed_inputs_in_db,
                "missing_fixed_inputs_in_model": missing_fixed_inputs_in_model,
                "missing_in_depends_on_blocks": missing_data_depends_on_blocks,
            }.items()
            if v
        }

        missing_data: dict[str, dict[str, set[str] | dict[str, set[str]]]] = {}
        if diff and product_db:
            missing_data[product_db.name] = diff

        return missing_data

    @classmethod
    def _load_root_instances(
        cls,
        subscription_id: UUID | UUIDstr,
    ) -> dict[str, Optional[dict] | list[dict]]:
        """Load root subscription instance(s) for this subscription model.

        When a new subscription model is loaded from an existing subscription, this function loads the entire root
        subscription instance(s) from database using an optimized postgres function. The result of that function
        is used to instantiate the root product block(s).

        The "old" method DomainModel._load_instances() would recursively load subscription instances from the
        database and individually instantiate nested blocks, more or less "manually" reconstructing the subscription.

        The "new" method SubscriptionModel._load_root_instances() takes a different approach; since it has all
        data for the root subscription instance, it can rely on Pydantic to instantiate the root block and all
        nested blocks in one go. This is also why it does not have the params `status` and `match_domain_attr` because
        this information is already encoded in the domain model of a product.
        """
        root_block_instance_ids = get_root_blocks_to_instance_ids(subscription_id)

        root_block_types = {
            field_name: list(flatten_product_block_types(product_block_type).keys())
            for field_name, product_block_type in cls._product_block_fields_.items()
        }

        def get_instances_by_block_names(block_names: list[str]) -> Iterable[dict]:
            for block_name in block_names:
                for instance_id in root_block_instance_ids.get(block_name, []):
                    yield get_subscription_instance_dict(instance_id)

        # Map root product block fields to subscription instance(s) dicts
        instances = {
            field_name: list(get_instances_by_block_names(block_names))
            for field_name, block_names in root_block_types.items()
        }

        # Transform values according to domain models (list[dict] -> dict, add None as default for optionals)
        rules = {
            klass.name: field_transformation_rules(klass) for klass in ProductBlockModel.registry.values() if klass.name
        }
        for instance_list in instances.values():
            for instance in instance_list:
                transform_instance_fields(rules, instance)

        # Support the (theoretical?) usecase of a list of root product blocks
        def unpack_instance_list(field_name: str, instance_list: list[dict]) -> list[dict] | dict | None:
            field_type = cls._product_block_fields_[field_name]
            if is_list_type(field_type):
                return instance_list
            return only(instance_list)

        return {
            field_name: unpack_instance_list(field_name, instance_list)
            for field_name, instance_list in instances.items()
        }

    @classmethod
    def from_product_id(
        cls: type[S],
        product_id: UUID | UUIDstr,
        customer_id: str,
        status: SubscriptionLifecycle = SubscriptionLifecycle.INITIAL,
        description: str | None = None,
        insync: bool = False,
        start_date: datetime | None = None,
        end_date: datetime | None = None,
        note: str | None = None,
        version: int = 1,
    ) -> S:
        """Use product_id (and customer_id) to return required fields of a new empty subscription."""
        # Caller wants a new instance and provided a product_id and customer_id
        product_db = db.session.get(ProductTable, product_id)
        if not product_db:
            raise KeyError("Could not find a product for the given product_id")

        product = ProductModel(
            product_id=product_db.product_id,
            name=product_db.name,
            description=product_db.description,
            product_type=product_db.product_type,
            tag=product_db.tag,
            status=product_db.status,
            created_at=product_db.created_at,
            end_date=product_db.end_date,
        )

        if description is None:
            description = f"Initial subscription of {product.description}"

        subscription_id = uuid4()
        subscription = SubscriptionTable(
            subscription_id=subscription_id,
            product_id=product_id,
            customer_id=customer_id,
            description=description,
            status=status.value,
            insync=insync,
            start_date=start_date,
            end_date=end_date,
            note=note,
            version=version,
        )
        db.session.add(subscription)

        fixed_inputs = {fi.name: fi.value for fi in product_db.fixed_inputs}
        instances = cls._init_instances(subscription_id)

        model = cls(
            product=product,
            customer_id=customer_id,
            subscription_id=subscription_id,
            description=description,
            status=status,
            insync=insync,
            start_date=start_date,
            end_date=end_date,
            note=note,
            version=version,
            **fixed_inputs,
            **instances,
        )
        model.db_model = subscription
        return model

    @classmethod
    def from_other_lifecycle(
        cls: type[S],
        other: "SubscriptionModel",
        status: SubscriptionLifecycle,
        skip_validation: bool = False,
    ) -> S:
        """Create new domain model from instance while changing the status.

        This makes sure we always have a specific instance.
        """
        if not cls.__base_type__:
            # Import here to prevent cyclic imports
            from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY

            cls = SUBSCRIPTION_MODEL_REGISTRY.get(other.product.name, cls)  # type:ignore
            cls = lookup_specialized_type(cls, status)

        # this will raise ValueError when wrong lifecycle transitions are detected in the new domain model
        if not skip_validation:
            validate_lifecycle_change(other, status)

        data = cls._data_from_lifecycle(other, status, other.subscription_id)
        data["status"] = status
        if data["start_date"] is None and status == SubscriptionLifecycle.ACTIVE:
            data["start_date"] = nowtz()
        if data["end_date"] is None and status == SubscriptionLifecycle.TERMINATED:
            data["end_date"] = nowtz()

        model = cls(**data)
        model.db_model = other._db_model

        return model

    # Some common functions shared by from_other_product and from_subscription
    @classmethod
    def _get_subscription(cls: type[S], subscription_id: UUID | UUIDstr) -> SubscriptionTable | None:

        if not isinstance(subscription_id, UUID | UUIDstr):
            raise TypeError(f"subscription_id is of type {type(subscription_id)} instead of UUID | UUIDstr")

        loaders = [
            joinedload(SubscriptionTable.product).selectinload(ProductTable.fixed_inputs),
        ]

        return db.session.get(SubscriptionTable, subscription_id, options=loaders)

    @classmethod
    def _to_product_model(cls: type[S], product: ProductTable) -> ProductModel:
        return ProductModel(
            product_id=product.product_id,
            name=product.name,
            description=product.description,
            product_type=product.product_type,
            tag=product.tag,
            status=product.status,
            created_at=product.created_at if product.created_at else None,
            end_date=product.end_date if product.end_date else None,
        )

    @classmethod
    def from_other_product(
        cls: type[S],
        old_instantiation: S,
        new_product_id: UUID | str,
        new_root: tuple[str, ProductBlockModel] | None = None,
    ) -> S:
        db_product = get_product_by_id(new_product_id)
        if not db_product:
            raise KeyError("Could not find a product for the given product_id")

        old_subscription_id = old_instantiation.subscription_id
        if not (subscription := cls._get_subscription(old_subscription_id)):
            raise ValueError(f"Subscription with id: {old_subscription_id}, does not exist")
        product = cls._to_product_model(db_product)

        status = SubscriptionLifecycle(subscription.status)

        if not cls.__base_type__:
            # Import here to prevent cyclic imports
            from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY

            cls = SUBSCRIPTION_MODEL_REGISTRY.get(subscription.product.name, cls)  # type:ignore
            cls = lookup_specialized_type(cls, status)
        elif not issubclass(cls, lookup_specialized_type(cls, status)):
            raise ValueError(f"{cls} is not valid for lifecycle {status}")

        fixed_inputs = {fi.name: fi.value for fi in db_product.fixed_inputs}

        if new_root:
            name, product_block = new_root
            instances = {name: product_block}
        else:
            # TODO test using cls._load_root_instances() here as well
            instances = cls._load_instances(subscription.instances, status, match_domain_attr=False)  # type:ignore

        try:
            model = cls(
                product=product,
                customer_id=subscription.customer_id,
                subscription_id=subscription.subscription_id,
                description=subscription.description,
                status=status,
                insync=subscription.insync,
                start_date=subscription.start_date,
                end_date=subscription.end_date,
                note=subscription.note,
                version=subscription.version,
                **fixed_inputs,
                **instances,
            )
            model.db_model = subscription
            return model
        except ValidationError:
            logger.exception(
                "Subscription is not correct in database", loaded_fixed_inputs=fixed_inputs, loaded_instances=instances
            )
            raise

    @classmethod
    def from_subscription(cls: type[S], subscription_id: UUID | UUIDstr) -> S:
        """Use a subscription_id to return required fields of an existing subscription."""
        from orchestrator.domain.context_cache import get_from_cache, store_in_cache

        if cached_model := get_from_cache(subscription_id):
            return cast(S, cached_model)

        if not (subscription := cls._get_subscription(subscription_id)):
            raise ValueError(f"Subscription with id: {subscription_id}, does not exist")
        product = cls._to_product_model(subscription.product)

        status = SubscriptionLifecycle(subscription.status)

        if not cls.__base_type__:
            # Import here to prevent cyclic imports
            from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY

            try:
                cls = SUBSCRIPTION_MODEL_REGISTRY[subscription.product.name]  # type:ignore
            except KeyError:
                raise ProductNotInRegistryError(
                    f"'{subscription.product.name}' is not found within the SUBSCRIPTION_MODEL_REGISTRY"
                )
            cls = lookup_specialized_type(cls, status)
        elif not issubclass(cls, lookup_specialized_type(cls, status)):
            raise ValueError(f"{cls} is not valid for lifecycle {status}")

        fixed_inputs = {fi.name: fi.value for fi in subscription.product.fixed_inputs}

        instances = cls._load_root_instances(subscription_id)

        try:
            model = cls(
                product=product,
                customer_id=subscription.customer_id,
                subscription_id=subscription.subscription_id,
                description=subscription.description,
                status=status,
                insync=subscription.insync,
                start_date=subscription.start_date,
                end_date=subscription.end_date,
                note=subscription.note,
                version=subscription.version,
                **fixed_inputs,
                **instances,
            )
            model.db_model = subscription

            store_in_cache(model)

            return model
        except ValidationError:
            logger.exception(
                "Subscription is not correct in database", loaded_fixed_inputs=fixed_inputs, loaded_instances=instances
            )
            raise

    def save(self) -> None:
        """Save the subscription to the database."""
        specialized_type = lookup_specialized_type(self.__class__, self.status)
        if specialized_type and not isinstance(self, specialized_type):
            raise ValueError(
                f"Lifecycle status {self.status.value} requires specialized type {specialized_type!r}, was: {type(self)!r}"
            )

        existing_sub = db.session.get(
            SubscriptionTable,
            self.subscription_id,
            options=[
                selectinload(SubscriptionTable.instances)
                .joinedload(SubscriptionInstanceTable.product_block)
                .selectinload(ProductBlockTable.resource_types),
                selectinload(SubscriptionTable.instances).selectinload(SubscriptionInstanceTable.values),
            ],
        )
        if not (sub := (existing_sub or self.db_model)):
            raise ValueError("Cannot save SubscriptionModel without a db_model")

        # Make sure we refresh the object and not use an already mapped object
        db.session.refresh(sub)

        self.db_model = sub
        sub.product_id = self.product.product_id
        sub.customer_id = self.customer_id
        sub.description = self.description
        sub.status = self.status.value
        sub.insync = self.insync
        sub.start_date = self.start_date
        sub.end_date = self.end_date
        sub.note = self.note

        db.session.add(sub)
        db.session.flush()  # Sends INSERT and returns subscription_id without committing transaction

        old_instances_dict = {instance.subscription_instance_id: instance for instance in sub.instances}

        saved_instances, depends_on_instances = self._save_instances(self.subscription_id, self.status)

        for instances in depends_on_instances.values():
            for instance in instances:
                if instance.subscription_id != self.subscription_id:
                    raise ValueError(
                        "Attempting to save a Foreign `Subscription Instance` directly below a subscription. "
                        "This is not allowed."
                    )
        sub.instances = saved_instances

        # Calculate what to remove
        instances_set = {instance.subscription_instance_id for instance in sub.instances}
        for instance_id in instances_set:
            old_instances_dict.pop(instance_id, None)

        # What's left should be removed
        for instance in old_instances_dict.values():
            db.session.delete(instance)

        db.session.flush()

    @property
    def db_model(self) -> SubscriptionTable | None:
        if not self._db_model:
            self._db_model = self._get_subscription(self.subscription_id)
        return self._db_model

    @db_model.setter
    def db_model(self, value: SubscriptionTable) -> None:
        self._db_model = value
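
The clean-up step at the end of `save()` (the "Calculate what to remove" part) can be read as a small dictionary operation: index the previously stored instances by ID, drop every ID that was just saved, and delete whatever remains. The sketch below mirrors that idea on plain values; `instances_to_delete` is a hypothetical helper for illustration and does not exist in the orchestrator.

```python
def instances_to_delete(old_instances: dict, saved_ids: set) -> list:
    """Return the old instances whose IDs were not part of the latest save."""
    remaining = dict(old_instances)  # copy, so the caller's dict stays untouched
    for instance_id in saved_ids:
        # IDs that are new in this save are simply not in the old dict.
        remaining.pop(instance_id, None)
    return list(remaining.values())


# Instance "a" was stored before but is no longer part of the subscription.
old = {"a": "instance-a", "b": "instance-b"}
stale = instances_to_delete(old, saved_ids={"b", "c"})
print(stale)
```

In the real method the values are database rows and each leftover row is passed to `db.session.delete`.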

diff_product_in_database classmethod

diff_product_in_database(
    product_id: uuid.UUID,
) -> dict[str, dict[str, set[str] | dict[str, set[str]]]]

Return any differences between the attrs defined on the domain model and those on product blocks in the database.

This is only needed to check if the domain model and database models match which would be done during testing...

Source code in orchestrator/domain/base.py
@classmethod
def diff_product_in_database(cls, product_id: UUID) -> dict[str, dict[str, set[str] | dict[str, set[str]]]]:
    """Return any differences between the attrs defined on the domain model and those on product blocks in the database.

    This is only needed to check if the domain model and database models match which would be done during testing...
    """
    product_db = db.session.get(ProductTable, product_id)
    product_blocks_in_db = {pb.name for pb in product_db.product_blocks} if product_db else set()

    product_blocks_in_model = cls._get_depends_on_product_block_types()
    product_blocks_types_in_model = get_depends_on_product_block_type_list(product_blocks_in_model)

    product_blocks_in_model = set(
        flatten(map(attrgetter("__names__"), product_blocks_types_in_model))
    )  # type: ignore

    missing_product_blocks_in_db = product_blocks_in_model - product_blocks_in_db  # type: ignore
    missing_product_blocks_in_model = product_blocks_in_db - product_blocks_in_model  # type: ignore

    fixed_inputs_model = set(cls._non_product_block_fields_)
    fixed_inputs_in_db = {fi.name for fi in product_db.fixed_inputs} if product_db else set()

    missing_fixed_inputs_in_db = fixed_inputs_model - fixed_inputs_in_db
    missing_fixed_inputs_in_model = fixed_inputs_in_db - fixed_inputs_model

    logger.debug(
        "ProductTable blocks diff",
        product_block_db=product_db.name if product_db else None,
        product_blocks_in_db=product_blocks_in_db,
        product_blocks_in_model=product_blocks_in_model,
        fixed_inputs_in_db=fixed_inputs_in_db,
        fixed_inputs_model=fixed_inputs_model,
        missing_product_blocks_in_db=missing_product_blocks_in_db,
        missing_product_blocks_in_model=missing_product_blocks_in_model,
        missing_fixed_inputs_in_db=missing_fixed_inputs_in_db,
        missing_fixed_inputs_in_model=missing_fixed_inputs_in_model,
    )

    missing_data_depends_on_blocks: dict[str, set[str]] = {}
    for product_block_in_model in product_blocks_types_in_model:
        missing_data_depends_on_blocks.update(product_block_in_model.diff_product_block_in_database())

    diff: dict[str, set[str] | dict[str, set[str]]] = {
        k: v
        for k, v in {
            "missing_product_blocks_in_db": missing_product_blocks_in_db,
            "missing_product_blocks_in_model": missing_product_blocks_in_model,
            "missing_fixed_inputs_in_db": missing_fixed_inputs_in_db,
            "missing_fixed_inputs_in_model": missing_fixed_inputs_in_model,
            "missing_in_depends_on_blocks": missing_data_depends_on_blocks,
        }.items()
        if v
    }

    missing_data: dict[str, dict[str, set[str] | dict[str, set[str]]]] = {}
    if diff and product_db:
        missing_data[product_db.name] = diff

    return missing_data
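
The core of this method is a pair of set differences in each direction, followed by a filter that drops empty results. The sketch below reproduces only that part with plain sets of names. `diff_names` is a hypothetical helper, it ignores the nested "depends on" block check, and it assumes the names have already been collected from the model and the database.

```python
def diff_names(
    model_blocks: set[str],
    db_blocks: set[str],
    model_fixed_inputs: set[str],
    db_fixed_inputs: set[str],
) -> dict[str, set[str]]:
    """Compare names in both directions and keep only non-empty differences."""
    candidates = {
        "missing_product_blocks_in_db": model_blocks - db_blocks,
        "missing_product_blocks_in_model": db_blocks - model_blocks,
        "missing_fixed_inputs_in_db": model_fixed_inputs - db_fixed_inputs,
        "missing_fixed_inputs_in_model": db_fixed_inputs - model_fixed_inputs,
    }
    # An empty set means "nothing missing", so it is left out of the report.
    return {key: names for key, names in candidates.items() if names}


# The model declares a fixed input that was never added to the database.
report = diff_names({"Node"}, {"Node"}, {"node_type"}, set())
print(report)
```

An empty result means the model and the database agree on these names, which is why a check like this fits naturally in a test suite.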

from_other_lifecycle classmethod

from_other_lifecycle(
    other: orchestrator.domain.base.SubscriptionModel,
    status: orchestrator.types.SubscriptionLifecycle,
    skip_validation: bool = False,
) -> S

Create new domain model from instance while changing the status.

This makes sure we always have a specific instance.

Source code in orchestrator/domain/base.py
@classmethod
def from_other_lifecycle(
    cls: type[S],
    other: "SubscriptionModel",
    status: SubscriptionLifecycle,
    skip_validation: bool = False,
) -> S:
    """Create new domain model from instance while changing the status.

    This makes sure we always have a specific instance.
    """
    if not cls.__base_type__:
        # Import here to prevent cyclic imports
        from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY

        cls = SUBSCRIPTION_MODEL_REGISTRY.get(other.product.name, cls)  # type:ignore
        cls = lookup_specialized_type(cls, status)

    # this will raise ValueError when wrong lifecycle transitions are detected in the new domain model
    if not skip_validation:
        validate_lifecycle_change(other, status)

    data = cls._data_from_lifecycle(other, status, other.subscription_id)
    data["status"] = status
    if data["start_date"] is None and status == SubscriptionLifecycle.ACTIVE:
        data["start_date"] = nowtz()
    if data["end_date"] is None and status == SubscriptionLifecycle.TERMINATED:
        data["end_date"] = nowtz()

    model = cls(**data)
    model.db_model = other._db_model

    return model
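
The date handling in `from_other_lifecycle` is easy to miss: `start_date` is filled in only when the subscription becomes active and has no start date yet, and `end_date` only when it is terminated and has no end date yet. The sketch below isolates that rule using a stand-in enum and a plain dict. `Lifecycle` and `apply_lifecycle_dates` are illustrative names rather than orchestrator APIs, and `datetime.now(timezone.utc)` stands in for `nowtz()`.

```python
from datetime import datetime, timezone
from enum import Enum


class Lifecycle(str, Enum):
    # Stand-in for orchestrator.types.SubscriptionLifecycle (subset of values).
    PROVISIONING = "provisioning"
    ACTIVE = "active"
    TERMINATED = "terminated"


def apply_lifecycle_dates(data: dict, status: Lifecycle) -> dict:
    """Copy the data, set the new status, and default the matching date if unset."""
    updated = dict(data)
    updated["status"] = status
    if updated["start_date"] is None and status == Lifecycle.ACTIVE:
        updated["start_date"] = datetime.now(timezone.utc)
    if updated["end_date"] is None and status == Lifecycle.TERMINATED:
        updated["end_date"] = datetime.now(timezone.utc)
    return updated


provisioning = {"status": Lifecycle.PROVISIONING, "start_date": None, "end_date": None}
active = apply_lifecycle_dates(provisioning, Lifecycle.ACTIVE)
terminated = apply_lifecycle_dates(active, Lifecycle.TERMINATED)
```

A start date that is already set is left alone, so terminating the subscription keeps the moment it first became active.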

from_product_id classmethod

from_product_id(
    product_id: uuid.UUID | pydantic_forms.types.UUIDstr,
    customer_id: str,
    status: orchestrator.types.SubscriptionLifecycle = SubscriptionLifecycle.INITIAL,
    description: str | None = None,
    insync: bool = False,
    start_date: datetime.datetime | None = None,
    end_date: datetime.datetime | None = None,
    note: str | None = None,
    version: int = 1,
) -> S

Use product_id (and customer_id) to return required fields of a new empty subscription.

Source code in orchestrator/domain/base.py
@classmethod
def from_product_id(
    cls: type[S],
    product_id: UUID | UUIDstr,
    customer_id: str,
    status: SubscriptionLifecycle = SubscriptionLifecycle.INITIAL,
    description: str | None = None,
    insync: bool = False,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
    note: str | None = None,
    version: int = 1,
) -> S:
    """Use product_id (and customer_id) to return required fields of a new empty subscription."""
    # Caller wants a new instance and provided a product_id and customer_id
    product_db = db.session.get(ProductTable, product_id)
    if not product_db:
        raise KeyError("Could not find a product for the given product_id")

    product = ProductModel(
        product_id=product_db.product_id,
        name=product_db.name,
        description=product_db.description,
        product_type=product_db.product_type,
        tag=product_db.tag,
        status=product_db.status,
        created_at=product_db.created_at,
        end_date=product_db.end_date,
    )

    if description is None:
        description = f"Initial subscription of {product.description}"

    subscription_id = uuid4()
    subscription = SubscriptionTable(
        subscription_id=subscription_id,
        product_id=product_id,
        customer_id=customer_id,
        description=description,
        status=status.value,
        insync=insync,
        start_date=start_date,
        end_date=end_date,
        note=note,
        version=version,
    )
    db.session.add(subscription)

    fixed_inputs = {fi.name: fi.value for fi in product_db.fixed_inputs}
    instances = cls._init_instances(subscription_id)

    model = cls(
        product=product,
        customer_id=customer_id,
        subscription_id=subscription_id,
        description=description,
        status=status,
        insync=insync,
        start_date=start_date,
        end_date=end_date,
        note=note,
        version=version,
        **fixed_inputs,
        **instances,
    )
    model.db_model = subscription
    return model
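
One detail worth spelling out is how fixed inputs reach the model: the rows attached to the product are turned into a name-to-value dict and unpacked as keyword arguments to the constructor. The sketch below shows that step alone, with dataclasses standing in for the database rows and the subscription model. `FixedInputRow` and `NodeSketch` are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass


@dataclass
class FixedInputRow:
    # Stand-in for one fixed input row of a product: a name/value pair.
    name: str
    value: str


@dataclass
class NodeSketch:
    # Stand-in for a subscription model with a single fixed input field.
    description: str
    node_type: str


rows = [FixedInputRow(name="node_type", value="Cisco")]

# Same comprehension as in from_product_id: rows become a name -> value dict.
fixed_inputs = {fi.name: fi.value for fi in rows}

# The dict is then unpacked into keyword arguments of the model constructor.
model = NodeSketch(description="Initial subscription of Network node", **fixed_inputs)
print(model.node_type)
```

This is why the fixed input names in the database have to line up with the field names on the product type class.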

from_subscription classmethod

from_subscription(
    subscription_id: uuid.UUID | pydantic_forms.types.UUIDstr,
) -> S

Use a subscription_id to return required fields of an existing subscription.

Source code in orchestrator/domain/base.py
@classmethod
def from_subscription(cls: type[S], subscription_id: UUID | UUIDstr) -> S:
    """Use a subscription_id to return required fields of an existing subscription."""
    from orchestrator.domain.context_cache import get_from_cache, store_in_cache

    if cached_model := get_from_cache(subscription_id):
        return cast(S, cached_model)

    if not (subscription := cls._get_subscription(subscription_id)):
        raise ValueError(f"Subscription with id: {subscription_id}, does not exist")
    product = cls._to_product_model(subscription.product)

    status = SubscriptionLifecycle(subscription.status)

    if not cls.__base_type__:
        # Import here to prevent cyclic imports
        from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY

        try:
            cls = SUBSCRIPTION_MODEL_REGISTRY[subscription.product.name]  # type:ignore
        except KeyError:
            raise ProductNotInRegistryError(
                f"'{subscription.product.name}' is not found within the SUBSCRIPTION_MODEL_REGISTRY"
            )
        cls = lookup_specialized_type(cls, status)
    elif not issubclass(cls, lookup_specialized_type(cls, status)):
        raise ValueError(f"{cls} is not valid for lifecycle {status}")

    fixed_inputs = {fi.name: fi.value for fi in subscription.product.fixed_inputs}

    instances = cls._load_root_instances(subscription_id)

    try:
        model = cls(
            product=product,
            customer_id=subscription.customer_id,
            subscription_id=subscription.subscription_id,
            description=subscription.description,
            status=status,
            insync=subscription.insync,
            start_date=subscription.start_date,
            end_date=subscription.end_date,
            note=subscription.note,
            version=subscription.version,
            **fixed_inputs,
            **instances,
        )
        model.db_model = subscription

        store_in_cache(model)

        return model
    except ValidationError:
        logger.exception(
            "Subscription is not correct in database", loaded_fixed_inputs=fixed_inputs, loaded_instances=instances
        )
        raise
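
When `cls` already has a `__base_type__` (the `elif` branch above), the `issubclass` check rejects classes that are less specific than the subscription's lifecycle requires. The sketch below reproduces the check with plain classes that follow the inheritance chain of the Node example. The `SPECIALIZED` dict is a simplifying assumption: it spells out by hand the status-to-class mapping that `lookup_specialized_type` resolves in the orchestrator.

```python
class NodeInactive:
    """Stand-in for the base lifecycle model."""


class NodeProvisioning(NodeInactive):
    """Stand-in for the provisioning lifecycle model."""


class Node(NodeProvisioning):
    """Stand-in for the active lifecycle model."""


# Assumed mapping from lifecycle status to the class required for that status.
SPECIALIZED = {
    "initial": NodeInactive,
    "provisioning": NodeProvisioning,
    "active": Node,
}


def is_valid_for_lifecycle(cls: type, status: str) -> bool:
    """Mirror the `issubclass(cls, lookup_specialized_type(cls, status))` check."""
    return issubclass(cls, SPECIALIZED[status])


print(is_valid_for_lifecycle(Node, "provisioning"))    # Node subclasses NodeProvisioning
print(is_valid_for_lifecycle(NodeInactive, "active"))  # NodeInactive is not a Node
```

Because `Node` inherits from `NodeProvisioning`, the strictest class passes the check for earlier lifecycle states, while a less specific class cannot be used to load an active subscription.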

save

save() -> None

Save the subscription to the database.

Source code in orchestrator/domain/base.py
def save(self) -> None:
    """Save the subscription to the database."""
    specialized_type = lookup_specialized_type(self.__class__, self.status)
    if specialized_type and not isinstance(self, specialized_type):
        raise ValueError(
            f"Lifecycle status {self.status.value} requires specialized type {specialized_type!r}, was: {type(self)!r}"
        )

    existing_sub = db.session.get(
        SubscriptionTable,
        self.subscription_id,
        options=[
            selectinload(SubscriptionTable.instances)
            .joinedload(SubscriptionInstanceTable.product_block)
            .selectinload(ProductBlockTable.resource_types),
            selectinload(SubscriptionTable.instances).selectinload(SubscriptionInstanceTable.values),
        ],
    )
    if not (sub := (existing_sub or self.db_model)):
        raise ValueError("Cannot save SubscriptionModel without a db_model")

    # Make sure we refresh the object and not use an already mapped object
    db.session.refresh(sub)

    self.db_model = sub
    sub.product_id = self.product.product_id
    sub.customer_id = self.customer_id
    sub.description = self.description
    sub.status = self.status.value
    sub.insync = self.insync
    sub.start_date = self.start_date
    sub.end_date = self.end_date
    sub.note = self.note

    db.session.add(sub)
    db.session.flush()  # Sends INSERT and returns subscription_id without committing transaction

    old_instances_dict = {instance.subscription_instance_id: instance for instance in sub.instances}

    saved_instances, depends_on_instances = self._save_instances(self.subscription_id, self.status)

    for instances in depends_on_instances.values():
        for instance in instances:
            if instance.subscription_id != self.subscription_id:
                raise ValueError(
                    "Attempting to save a Foreign `Subscription Instance` directly below a subscription. "
                    "This is not allowed."
                )
    sub.instances = saved_instances

    # Calculate what to remove
    instances_set = {instance.subscription_instance_id for instance in sub.instances}
    for instance_id in instances_set:
        old_instances_dict.pop(instance_id, None)

    # What's left should be removed
    for instance in old_instances_dict.values():
        db.session.delete(instance)

    db.session.flush()
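
The clean-up at the end of `save` works by elimination: start from every instance the subscription had before saving, drop the ones that were just saved, and delete whatever is left. A minimal sketch of that bookkeeping with plain dicts and sets follows; `stale_instances` and the instance ids are made up for illustration.

```python
def stale_instances(old_instances: dict[str, str], saved_ids: set[str]) -> dict[str, str]:
    """Return the old instances that are not among the freshly saved ids."""
    remaining = dict(old_instances)
    for instance_id in saved_ids:
        # pop with a default so that newly created ids are ignored quietly.
        remaining.pop(instance_id, None)
    return remaining


old = {"id-1": "node block", "id-2": "removed block", "id-3": "another block"}
saved = {"id-1", "id-3", "id-4"}  # id-4 is new, id-2 was dropped from the model

to_delete = stale_instances(old, saved)
print(to_delete)
```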

It also helps to see how a Product Type is stored in the database. The ProductTable model below lists every attribute that the WFO persists for a product:

orchestrator.db.models.ProductTable

Bases: orchestrator.db.database.BaseModel

Source code in orchestrator/db/models.py
class ProductTable(BaseModel):
    __tablename__ = "products"
    __table_args__ = {"extend_existing": True}

    __allow_unmapped__ = True

    product_id = mapped_column(UUIDType, server_default=text("uuid_generate_v4()"), primary_key=True)
    name = mapped_column(String(), nullable=False, unique=True)
    description = mapped_column(Text(), nullable=False)
    product_type = mapped_column(String(255), nullable=False)
    tag = mapped_column(String(TAG_LENGTH), nullable=False, index=True)
    status = mapped_column(String(STATUS_LENGTH), nullable=False)
    created_at = mapped_column(UtcTimestamp, nullable=False, server_default=text("current_timestamp()"))
    end_date = mapped_column(UtcTimestamp)

    product_blocks = relationship(
        "ProductBlockTable",
        secondary=product_product_block_association,
        back_populates="products",
        passive_deletes=True,
    )
    workflows = relationship(
        "WorkflowTable",
        secondary=product_workflows_association,
        secondaryjoin="and_(products_workflows.c.workflow_id == WorkflowTable.workflow_id, "
        "WorkflowTable.deleted_at == None)",
        back_populates="products",
        passive_deletes=True,
    )
    fixed_inputs = relationship(
        "FixedInputTable", cascade="all, delete-orphan", back_populates="product", passive_deletes=True
    )

    def find_block_by_name(self, name: str) -> ProductBlockTable:
        if session := object_session(self):
            return session.query(ProductBlockTable).with_parent(self).filter(ProductBlockTable.name == name).one()
        raise AssertionError("Session should not be None")

    def fixed_input_value(self, name: str) -> str:
        if session := object_session(self):
            return (
                session.query(FixedInputTable)
                .with_parent(self)
                .filter(FixedInputTable.name == name)
                .with_entities(FixedInputTable.value)
                .scalar()
            )
        raise AssertionError("Session should not be None")

    def _subscription_workflow_key(self, target: Target) -> str | None:
        wfs = list(filter(lambda w: w.target == target, self.workflows))
        return wfs[0].name if len(wfs) > 0 else None

    def create_subscription_workflow_key(self) -> str | None:
        return self._subscription_workflow_key(Target.CREATE)

    def terminate_subscription_workflow_key(self) -> str | None:
        return self._subscription_workflow_key(Target.TERMINATE)

    def modify_subscription_workflow_key(self, name: str) -> str | None:
        wfs = list(filter(lambda w: w.target == Target.MODIFY and w.name == name, self.workflows))
        return wfs[0].name if len(wfs) > 0 else None

    def workflow_by_key(self, name: str) -> WorkflowTable | None:
        return first_true(self.workflows, None, lambda wf: wf.name == name)  # type: ignore
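
The workflow key helpers differ in one respect: create and terminate return the first workflow with the matching target, while modify also needs a name, because a product can have several modify workflows. The sketch below shows both lookups on plain dataclasses. `Workflow`, `first_workflow_name`, `modify_workflow_name`, and the local `Target` enum are stand-ins written for this example; the workflow names are taken from the Node migration on this page.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Target(str, Enum):
    # Stand-in for orchestrator.targets.Target (subset used here).
    CREATE = "CREATE"
    MODIFY = "MODIFY"
    TERMINATE = "TERMINATE"
    SYSTEM = "SYSTEM"


@dataclass
class Workflow:
    name: str
    target: Target


def first_workflow_name(workflows: list, target: Target) -> Optional[str]:
    """Return the name of the first workflow with the given target, if any."""
    return next((wf.name for wf in workflows if wf.target == target), None)


def modify_workflow_name(workflows: list, name: str) -> Optional[str]:
    """Return the name only if a modify workflow with that name exists."""
    matches = (wf.name for wf in workflows if wf.target == Target.MODIFY and wf.name == name)
    return next(matches, None)


node_workflows = [
    Workflow("create_node", Target.CREATE),
    Workflow("modify_node", Target.MODIFY),
    Workflow("modify_sync_ports", Target.MODIFY),
    Workflow("terminate_node", Target.TERMINATE),
]

print(first_workflow_name(node_workflows, Target.CREATE))
print(modify_workflow_name(node_workflows, "modify_sync_ports"))
```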

Subscription Model Registry

When you define a Product Type as a domain model in Python, you also need to register it in the SUBSCRIPTION_MODEL_REGISTRY dictionary. Each key is the name of a product as stored in the database, and each value is the domain model class to use for it, as shown here in the example workflow orchestrator:

# Copyright 2019-2023 SURF.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY

from products.product_types.core_link import CoreLink
from products.product_types.l2vpn import L2vpn
from products.product_types.node import Node
from products.product_types.port import Port

SUBSCRIPTION_MODEL_REGISTRY.update(
    {
        "node Cisco": Node,
        "node Nokia": Node,
        "node Cumulus": Node,
        "node FRR": Node,
        "port 10G": Port,
        "port 100G": Port,
        "core link 10G": CoreLink,
        "core link 100G": CoreLink,
        "l2vpn": L2vpn,
    }
)
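
Since `from_subscription` looks up the class by the product's name, a product whose name is missing from the registry (or spelled differently) cannot be loaded. The sketch below imitates that lookup with a plain dict. `REGISTRY`, `model_for_product`, and the empty `Node` and `Port` classes are stand-ins for this illustration only.

```python
class ProductNotInRegistryError(Exception):
    """Stand-in for the orchestrator exception of the same name."""


class Node:
    """Stand-in for the Node product type."""


class Port:
    """Stand-in for the Port product type."""


REGISTRY: dict = {}
# Several product names may share one domain model class.
REGISTRY.update({"node Cisco": Node, "node Nokia": Node, "port 10G": Port})


def model_for_product(product_name: str) -> type:
    """Look up the domain model class for a product name, or fail loudly."""
    try:
        return REGISTRY[product_name]
    except KeyError:
        raise ProductNotInRegistryError(f"'{product_name}' is not found within the registry") from None


print(model_for_product("node Cisco").__name__)
```

The lookup is an exact string match, so `"node cisco"` would raise even though `"node Cisco"` is registered.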

Automatically Generating Product Types

If all of this seems like too much work, then good news: like all clever engineers before us, we've fixed that with YAML! Using the WFO CLI, you can generate your product types directly from a YAML file. For more information on how to do that, check out the CLI generate command documentation.

Creating Database Migrations

After defining all of the components of a Product Type, you'll also need to create a database migration to properly wire up the product in the orchestrator's database. A migration file for this example Node model looks like this:

Example: example-orchestrator/migrations/versions/schema/2023-10-27_a84ca2e5e4db_add_node.py
"""Add node product.

Revision ID: a84ca2e5e4db
Revises: a77227fe5455
Create Date: 2023-10-27 11:25:40.994878

"""
from uuid import uuid4

from alembic import op
from orchestrator.migrations.helpers import (
    create,
    create_workflow,
    delete,
    delete_workflow,
    ensure_default_workflows,
)
from orchestrator.targets import Target

# revision identifiers, used by Alembic.
revision = "a84ca2e5e4db"
down_revision = "a77227fe5455"
branch_labels = None
depends_on = None

new_products = {
    "products": {
        "node Cisco": {
            "product_id": uuid4(),
            "product_type": "Node",
            "description": "Network node",
            "tag": "NODE",
            "status": "active",
            "product_blocks": [
                "Node",
            ],
            "fixed_inputs": {
                "node_type": "Cisco",
            },
        },
        "node Nokia": {
            "product_id": uuid4(),
            "product_type": "Node",
            "description": "Network node",
            "tag": "NODE",
            "status": "active",
            "product_blocks": [
                "Node",
            ],
            "fixed_inputs": {
                "node_type": "Nokia",
            },
        },
        "node Cumulus": {
            "product_id": uuid4(),
            "product_type": "Node",
            "description": "Network node",
            "tag": "NODE",
            "status": "active",
            "product_blocks": [
                "Node",
            ],
            "fixed_inputs": {
                "node_type": "Cumulus",
            },
        },
        "node FRR": {
            "product_id": uuid4(),
            "product_type": "Node",
            "description": "Network node",
            "tag": "NODE",
            "status": "active",
            "product_blocks": [
                "Node",
            ],
            "fixed_inputs": {
                "node_type": "FRR",
            },
        },
    },
    "product_blocks": {
        "Node": {
            "product_block_id": uuid4(),
            "description": "node product block",
            "tag": "NODE",
            "status": "active",
            "resources": {
                "role_id": "ID in CMDB of role of the node in the network",
                "type_id": "ID in CMDB of type of the node",
                "site_id": "ID in CMDB of site where the node is located",
                "node_status": "Operational status of the node",
                "node_name": "Unique name of the node",
                "node_description": "Description of the node",
                "ims_id": "ID of the node in the inventory management system",
                "nrm_id": "ID of the node in the network resource manager",
                "ipv4_ipam_id": "ID of the node’s iPv4 loopback address in IPAM",
                "ipv6_ipam_id": "ID of the node’s iPv6 loopback address in IPAM",
            },
            "depends_on_block_relations": [],
        },
    },
    "workflows": {},
}

new_workflows = [
    {
        "name": "create_node",
        "target": Target.CREATE,
        "description": "Create node",
        "product_type": "Node",
    },
    {
        "name": "modify_node",
        "target": Target.MODIFY,
        "description": "Modify node",
        "product_type": "Node",
    },
    {
        "name": "modify_sync_ports",
        "target": Target.MODIFY,
        "description": "Update node interfaces",
        "product_type": "Node",
    },
    {
        "name": "terminate_node",
        "target": Target.TERMINATE,
        "description": "Terminate node",
        "product_type": "Node",
    },
    {
        "name": "validate_node",
        "target": Target.SYSTEM,
        "description": "Validate node",
        "product_type": "Node",
    },
]


def upgrade() -> None:
    conn = op.get_bind()
    create(conn, new_products)
    for workflow in new_workflows:
        create_workflow(conn, workflow)
    ensure_default_workflows(conn)


def downgrade() -> None:
    conn = op.get_bind()
    for workflow in new_workflows:
        delete_workflow(conn, workflow["name"])

    delete(conn, new_products)
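
The four product entries in this migration differ only in their name and their `node_type` fixed input. As a rough illustration of that structure, the same `products` mapping could be built from a list of node types. `NODE_TYPES` and `node_product` are hypothetical names for this sketch and do not appear in the example orchestrator.

```python
from uuid import uuid4

NODE_TYPES = ["Cisco", "Nokia", "Cumulus", "FRR"]


def node_product(node_type: str) -> dict:
    """Build one product entry; only the fixed input varies per node type."""
    return {
        "product_id": uuid4(),
        "product_type": "Node",
        "description": "Network node",
        "tag": "NODE",
        "status": "active",
        "product_blocks": ["Node"],
        "fixed_inputs": {"node_type": node_type},
    }


# Product names follow the "node <type>" pattern used in the migration above.
products = {f"node {node_type}": node_product(node_type) for node_type in NODE_TYPES}
print(list(products))
```

The resulting keys are the same product names that appear in the subscription model registry.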

Thankfully, you don't have to write these database migrations by hand. You can use the main.py db migrate-domain-models command that is part of the orchestrator CLI, documented here.