Skip to content

Workflows

Workflows are what actually take a product definition and populate your domain models. To read more about the architectural design of workflows, check out the architecture page on workflows. To see more details about the workflow lifecycle states and functions, read on to the next section.

orchestrator.workflow.ProcessStatus

Bases: pydantic_forms.types.strEnum

Source code in orchestrator/workflow.py
590
591
592
593
594
595
596
597
598
599
600
601
602
@strawberry.enum
class ProcessStatus(strEnum):
    """String-valued enumeration of the statuses a process can have.

    The class is decorated with ``strawberry.enum``. Every member's value is the
    lower-case form of its name.
    """

    CREATED = "created"
    RUNNING = "running"
    SUSPENDED = "suspended"
    WAITING = "waiting"
    AWAITING_CALLBACK = "awaiting_callback"
    ABORTED = "aborted"
    FAILED = "failed"
    API_UNAVAILABLE = "api_unavailable"
    INCONSISTENT_DATA = "inconsistent_data"
    COMPLETED = "completed"
    RESUMED = "resumed"

orchestrator.workflows.utils

create_workflow

create_workflow(
    description: str,
    initial_input_form: pydantic_forms.types.InputStepFunc | None = None,
    status: orchestrator.types.SubscriptionLifecycle = SubscriptionLifecycle.ACTIVE,
    additional_steps: orchestrator.workflow.StepList | None = None,
    authorize_callback: orchestrator.utils.auth.Authorizer | None = None,
    retry_auth_callback: orchestrator.utils.auth.Authorizer | None = None,
) -> Callable[[Callable[[], StepList]], Workflow]

Transform an initial_input_form and a step list into a workflow with a target=Target.CREATE.

Use this for create workflows only.

Example::

@create_workflow("create service port")
def create_service_port() -> StepList:
    return (
        begin
        >> do_something
        >> do_something_else
    )
Source code in orchestrator/workflows/utils.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def create_workflow(
    description: str,
    initial_input_form: InputStepFunc | None = None,
    status: SubscriptionLifecycle = SubscriptionLifecycle.ACTIVE,
    additional_steps: StepList | None = None,
    authorize_callback: Authorizer | None = None,
    retry_auth_callback: Authorizer | None = None,
) -> Callable[[Callable[[], StepList]], Workflow]:
    """Build a decorator that turns a step-list function into a workflow with ``target=Target.CREATE``.

    The steps returned by the decorated function are placed after ``init`` and followed by
    ``additional_steps`` (when given), ``set_status(status)``, ``resync``,
    ``refresh_subscription_search_index`` and ``done``.

    Use this for create workflows only.

    Args:
        description: Description handed to ``make_workflow``.
        initial_input_form: Optional input form; it is passed through ``wrap_create_initial_input_form``
            before being handed to ``make_workflow``.
        status: Lifecycle status given to ``set_status`` after the workflow's own steps.
        additional_steps: Optional steps inserted directly after the decorated function's steps.
        authorize_callback: Forwarded unchanged to ``make_workflow``.
        retry_auth_callback: Forwarded unchanged to ``make_workflow``.

    Returns:
        A decorator that takes a zero-argument function returning a ``StepList`` and returns the ``Workflow``.

    Example::

        @create_workflow("create service port")
        def create_service_port() -> StepList:
            return (
                begin
                >> do_something
                >> do_something_else
            )
    """
    wrapped_input_form = wrap_create_initial_input_form(initial_input_form)

    def _create_workflow(f: Callable[[], StepList]) -> Workflow:
        # Assemble the pipeline one stage at a time, left to right, as a single ``>>`` chain would.
        steps = init >> f()
        steps = steps >> (additional_steps or StepList())
        steps = steps >> set_status(status)
        steps = steps >> resync
        steps = steps >> refresh_subscription_search_index
        steps = steps >> done

        return make_workflow(
            f,
            description,
            wrapped_input_form,
            Target.CREATE,
            steps,
            authorize_callback=authorize_callback,
            retry_auth_callback=retry_auth_callback,
        )

    return _create_workflow

modify_workflow

modify_workflow(
    description: str,
    initial_input_form: pydantic_forms.types.InputStepFunc | None = None,
    additional_steps: orchestrator.workflow.StepList | None = None,
    authorize_callback: orchestrator.utils.auth.Authorizer | None = None,
    retry_auth_callback: orchestrator.utils.auth.Authorizer | None = None,
) -> Callable[[Callable[[], StepList]], Workflow]

Transform an initial_input_form and a step list into a workflow.

Use this for modify workflows.

Example::

@modify_workflow("modify service port")
def modify_service_port() -> StepList:
    return (
        begin
        >> do_something
        >> do_something_else
    )
Source code in orchestrator/workflows/utils.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
def modify_workflow(
    description: str,
    initial_input_form: InputStepFunc | None = None,
    additional_steps: StepList | None = None,
    authorize_callback: Authorizer | None = None,
    retry_auth_callback: Authorizer | None = None,
) -> Callable[[Callable[[], StepList]], Workflow]:
    """Build a decorator that turns a step-list function into a workflow with ``target=Target.MODIFY``.

    The resulting step list is ``init``, ``store_process_subscription()``, ``unsync``, the decorated
    function's steps, ``additional_steps`` (when given), ``resync``,
    ``refresh_subscription_search_index`` and ``done``.

    Use this for modify workflows.

    Args:
        description: Description handed to ``make_workflow``.
        initial_input_form: Optional input form; it is passed through ``wrap_modify_initial_input_form``
            before being handed to ``make_workflow``.
        additional_steps: Optional steps inserted directly after the decorated function's steps.
        authorize_callback: Forwarded unchanged to ``make_workflow``.
        retry_auth_callback: Forwarded unchanged to ``make_workflow``.

    Returns:
        A decorator that takes a zero-argument function returning a ``StepList`` and returns the ``Workflow``.

    Example::

        @modify_workflow("modify service port")
        def modify_service_port() -> StepList:
            return (
                begin
                >> do_something
                >> do_something_else
            )
    """
    wrapped_input_form = wrap_modify_initial_input_form(initial_input_form)

    def _modify_workflow(f: Callable[[], StepList]) -> Workflow:
        # Fixed prologue, then the caller's steps, then the fixed epilogue -- composed left to right.
        steps = init >> store_process_subscription()
        steps = steps >> unsync
        steps = steps >> f()
        steps = steps >> (additional_steps or StepList())
        steps = steps >> resync
        steps = steps >> refresh_subscription_search_index
        steps = steps >> done

        return make_workflow(
            f,
            description,
            wrapped_input_form,
            Target.MODIFY,
            steps,
            authorize_callback=authorize_callback,
            retry_auth_callback=retry_auth_callback,
        )

    return _modify_workflow

terminate_workflow

terminate_workflow(
    description: str,
    initial_input_form: pydantic_forms.types.InputStepFunc | None = None,
    additional_steps: orchestrator.workflow.StepList | None = None,
    authorize_callback: orchestrator.utils.auth.Authorizer | None = None,
    retry_auth_callback: orchestrator.utils.auth.Authorizer | None = None,
) -> Callable[[Callable[[], StepList]], Workflow]

Transform an initial_input_form and a step list into a workflow.

Use this for terminate workflows.

Example::

@terminate_workflow("terminate service port")
def terminate_service_port() -> StepList:
    return (
        begin
        >> do_something
        >> do_something_else
    )
Source code in orchestrator/workflows/utils.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def terminate_workflow(
    description: str,
    initial_input_form: InputStepFunc | None = None,
    additional_steps: StepList | None = None,
    authorize_callback: Authorizer | None = None,
    retry_auth_callback: Authorizer | None = None,
) -> Callable[[Callable[[], StepList]], Workflow]:
    """Build a decorator that turns a step-list function into a workflow with ``target=Target.TERMINATE``.

    The resulting step list is ``init``, ``store_process_subscription()``, ``unsync``, the decorated
    function's steps, ``additional_steps`` (when given), ``set_status(SubscriptionLifecycle.TERMINATED)``,
    ``resync``, ``refresh_subscription_search_index`` and ``done``.

    Use this for terminate workflows.

    Args:
        description: Description handed to ``make_workflow``.
        initial_input_form: Optional input form; it is passed through ``wrap_modify_initial_input_form``
            (the same wrapper ``modify_workflow`` uses) before being handed to ``make_workflow``.
        additional_steps: Optional steps inserted directly after the decorated function's steps.
        authorize_callback: Forwarded unchanged to ``make_workflow``.
        retry_auth_callback: Forwarded unchanged to ``make_workflow``.

    Returns:
        A decorator that takes a zero-argument function returning a ``StepList`` and returns the ``Workflow``.

    Example::

        @terminate_workflow("terminate service port")
        def terminate_service_port() -> StepList:
            return (
                begin
                >> do_something
                >> do_something_else
            )
    """
    wrapped_input_form = wrap_modify_initial_input_form(initial_input_form)

    def _terminate_workflow(f: Callable[[], StepList]) -> Workflow:
        # Composed left to right; the status change to TERMINATED happens after the caller's steps.
        steps = init >> store_process_subscription()
        steps = steps >> unsync
        steps = steps >> f()
        steps = steps >> (additional_steps or StepList())
        steps = steps >> set_status(SubscriptionLifecycle.TERMINATED)
        steps = steps >> resync
        steps = steps >> refresh_subscription_search_index
        steps = steps >> done

        return make_workflow(
            f,
            description,
            wrapped_input_form,
            Target.TERMINATE,
            steps,
            authorize_callback=authorize_callback,
            retry_auth_callback=retry_auth_callback,
        )

    return _terminate_workflow

validate_workflow

validate_workflow(
    description: str,
) -> Callable[[Callable[[], StepList]], Workflow]

Transform an initial_input_form and a step list into a workflow.

Use this for subscription validate workflows.

Example::

@validate_workflow("validate service port")
def validate_service_port() -> StepList:
    return (
        begin
        >> do_something
        >> do_something_else
    )
Source code in orchestrator/workflows/utils.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
def validate_workflow(description: str) -> Callable[[Callable[[], StepList]], Workflow]:
    """Build a decorator that turns a step-list function into a workflow with ``target=Target.VALIDATE``.

    The resulting step list is ``init``, ``store_process_subscription()``, ``unsync_unchecked``, the
    decorated function's steps, ``resync`` and ``done``. The input form is always
    ``validate_initial_input_form_generator``; no authorization callbacks are passed to ``make_workflow``.

    Use this for subscription validate workflows.

    Args:
        description: Description handed to ``make_workflow``.

    Returns:
        A decorator that takes a zero-argument function returning a ``StepList`` and returns the ``Workflow``.

    Example::

        @validate_workflow("validate service port")
        def validate_service_port() -> StepList:
            return (
                begin
                >> do_something
                >> do_something_else
            )
    """

    def _validate_workflow(f: Callable[[], StepList]) -> Workflow:
        steps = (
            init
            >> store_process_subscription()
            >> unsync_unchecked
            >> f()
            >> resync
            >> done
        )
        return make_workflow(
            f,
            description,
            validate_initial_input_form_generator,
            Target.VALIDATE,
            steps,
        )

    return _validate_workflow

reconcile_workflow

reconcile_workflow(
    description: str,
    additional_steps: orchestrator.workflow.StepList | None = None,
    authorize_callback: orchestrator.utils.auth.Authorizer | None = None,
    retry_auth_callback: orchestrator.utils.auth.Authorizer | None = None,
) -> Callable[[Callable[[], StepList]], Workflow]

Similar to a modify_workflow, but without requiring user input: it performs a sync with external systems based on the subscription's existing configuration.

Use this for subscription reconcile workflows.

Example::

@reconcile_workflow("Reconcile l2vpn")
def reconcile_l2vpn() -> StepList:
    return (
        begin
        >> update_l2vpn_in_external_systems
    )
Source code in orchestrator/workflows/utils.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
def reconcile_workflow(
    description: str,
    additional_steps: StepList | None = None,
    authorize_callback: Authorizer | None = None,
    retry_auth_callback: Authorizer | None = None,
) -> Callable[[Callable[[], StepList]], Workflow]:
    """Build a decorator that turns a step-list function into a workflow with ``target=Target.RECONCILE``.

    Similar to ``modify_workflow``, but without a caller-supplied input form: the form generator is
    ``wrap_modify_initial_input_form(None)``. Intended for syncing external systems based on the
    subscription's existing configuration.

    The resulting step list is ``init``, ``store_process_subscription()``, ``unsync``, the decorated
    function's steps, ``additional_steps`` (when given), ``resync``,
    ``refresh_subscription_search_index`` and ``done``.

    Use this for subscription reconcile workflows.

    Args:
        description: Description handed to ``make_workflow``.
        additional_steps: Optional steps inserted directly after the decorated function's steps.
        authorize_callback: Forwarded unchanged to ``make_workflow``.
        retry_auth_callback: Forwarded unchanged to ``make_workflow``.

    Returns:
        A decorator that takes a zero-argument function returning a ``StepList`` and returns the ``Workflow``.

    Example::

        @reconcile_workflow("Reconcile l2vpn")
        def reconcile_l2vpn() -> StepList:
            return (
                begin
                >> update_l2vpn_in_external_systems
            )
    """
    wrapped_input_form = wrap_modify_initial_input_form(None)

    def _reconcile_workflow(f: Callable[[], StepList]) -> Workflow:
        # Same stage order as a modify workflow, composed left to right.
        steps = init >> store_process_subscription()
        steps = steps >> unsync
        steps = steps >> f()
        steps = steps >> (additional_steps or StepList())
        steps = steps >> resync
        steps = steps >> refresh_subscription_search_index
        steps = steps >> done

        return make_workflow(
            f,
            description,
            wrapped_input_form,
            Target.RECONCILE,
            steps,
            authorize_callback=authorize_callback,
            retry_auth_callback=retry_auth_callback,
        )

    return _reconcile_workflow

orchestrator.workflow.workflow

workflow(
    description: str,
    initial_input_form: pydantic_forms.types.InputStepFunc | None = None,
    target: orchestrator.targets.Target = Target.SYSTEM,
    authorize_callback: orchestrator.utils.auth.Authorizer | None = None,
    retry_auth_callback: orchestrator.utils.auth.Authorizer | None = None,
) -> Callable[[Callable[[], StepList]], Workflow]

Transform an initial_input_form and a step list into a workflow.

Use this for other workflows. For create workflows use :func:create_workflow

Example::

@workflow("create service port")
def create_service_port() -> StepList:
    return (
        init
        >> do_something
        >> done
    )
Source code in orchestrator/workflow.py
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
def workflow(
    description: str,
    initial_input_form: InputStepFunc | None = None,
    target: Target = Target.SYSTEM,
    authorize_callback: Authorizer | None = None,
    retry_auth_callback: Authorizer | None = None,
) -> Callable[[Callable[[], StepList]], Workflow]:
    """Build a decorator that turns a step-list function into a workflow.

    Unlike the specialised decorators, the step list returned by the decorated function is handed to
    ``make_workflow`` as-is: nothing is added before or after it.

    Use this for other workflows. For create workflows use :func:`create_workflow`

    Args:
        description: Description handed to ``make_workflow``.
        initial_input_form: Optional input form. When given it is wrapped with ``form_inject_args``;
            when ``None``, ``None`` is handed to ``make_workflow``.
        target: Workflow target handed to ``make_workflow``.
        authorize_callback: Forwarded unchanged to ``make_workflow``.
        retry_auth_callback: Forwarded unchanged to ``make_workflow``.

    Returns:
        A decorator that takes a zero-argument function returning a ``StepList`` and returns the ``Workflow``.

    Example::

        @workflow("create service port")
        def create_service_port() -> StepList:
            return (
                init
                >> do_something
                >> done
            )
    """
    wrapped_input_form = None if initial_input_form is None else form_inject_args(initial_input_form)

    def _workflow(f: Callable[[], StepList]) -> Workflow:
        steps = f()
        return make_workflow(
            f,
            description,
            wrapped_input_form,
            target,
            steps,
            authorize_callback=authorize_callback,
            retry_auth_callback=retry_auth_callback,
        )

    return _workflow

Workflow helpers to register them in DB

orchestrator.migrations.helpers

add_product_block_relation_between_products_by_id

add_product_block_relation_between_products_by_id(
    conn: sqlalchemy.engine.Connection,
    in_use_by_id: uuid.UUID | pydantic_forms.types.UUIDstr,
    depends_on_id: uuid.UUID | pydantic_forms.types.UUIDstr,
) -> None

Add product block relation by product block id.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file.

  • in_use_by_id (uuid.UUID | pydantic_forms.types.UUIDstr) –

    ID of the product block that uses another product block.

  • depends_on_id (uuid.UUID | pydantic_forms.types.UUIDstr) –

    ID of the product block that is used as dependency.

Usage:

in_use_by_id = "in_use_by_id"
depends_on_id = "depends_on_id"
add_product_block_relation_between_products_by_id(conn, in_use_by_id, depends_on_id)

Source code in orchestrator/migrations/helpers.py
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
def add_product_block_relation_between_products_by_id(
    conn: sa.engine.Connection, in_use_by_id: UUID | UUIDstr, depends_on_id: UUID | UUIDstr
) -> None:
    """Insert one row into ``product_block_relations`` that links two product blocks by ID.

    Args:
        conn: DB connection as available in migration main file.
        in_use_by_id: ID of the product block that uses another product block.
        depends_on_id: ID of the product block that is used as dependency.

    Usage:
    ```python
    in_use_by_id = "in_use_by_id"
    depends_on_id = "depends_on_id"
    add_product_block_relation_between_products_by_id(conn, in_use_by_id, depends_on_id)
    ```
    """
    # The SQL text is kept verbatim; both IDs are supplied as bound parameters.
    insert_relation = sa.text(
        """
            INSERT INTO product_block_relations (in_use_by_id, depends_on_id)
            VALUES (:in_use_by_id, :depends_on_id)
            """
    )
    relation_ids = {"in_use_by_id": in_use_by_id, "depends_on_id": depends_on_id}

    conn.execute(insert_relation, relation_ids)

add_products_to_workflow_by_product_tag

add_products_to_workflow_by_product_tag(
    conn: sqlalchemy.engine.Connection,
    workflow_name: str,
    product_tag: str,
    product_name_like: str = "%%",
) -> None

Add products to a workflow by product tag.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file.

  • workflow_name (str) –

    Name of the workflow to add the products to.

  • product_tag (str) –

    Tag of the product to add to the workflow.

  • product_name_like (optional, default: '%%' ) –

    Part of the product name to get more specific products (necessary for fw v2)

Usage:

product_tag = "product_tag"
workflow_name = "workflow_name"
add_products_to_workflow_by_product_tag(conn, workflow_name, product_tag)

Source code in orchestrator/migrations/helpers.py
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
def add_products_to_workflow_by_product_tag(
    conn: sa.engine.Connection, workflow_name: str, product_tag: str, product_name_like: str = "%%"
) -> None:
    """Link every product with a given tag to a workflow.

    For each row in ``products`` whose ``tag`` equals ``product_tag`` and whose ``name`` matches
    ``product_name_like``, a row is inserted into ``products_workflows`` for the workflow named
    ``workflow_name``.

    Args:
        conn: DB connection as available in migration main file.
        workflow_name: Name of the workflow to add the products to.
        product_tag: Tag of the product to add to the workflow.
        product_name_like (optional): SQL ``LIKE`` pattern for the product name, to select more specific
            products (necessary for fw v2). The default consists only of wildcards.

    Usage:
    ```python
    product_tag = "product_tag"
    workflow_name = "workflow_name"
    add_products_to_workflow_by_product_tag(conn, workflow_name, product_tag)
    ```
    """
    # The SQL text is kept verbatim; all three values are supplied as bound parameters.
    link_products = sa.text(
        """
            WITH workflow AS (SELECT workflow_id
                              FROM workflows
                              WHERE name = :workflow_name)
            INSERT
            INTO products_workflows (product_id, workflow_id)
            SELECT p.product_id,
                   nw.workflow_id
            FROM products AS p
                     CROSS JOIN workflow AS nw
            WHERE p.tag = :product_tag
              AND name LIKE :product_name_like
            """
    )
    bind_values = {
        "workflow_name": workflow_name,
        "product_tag": product_tag,
        "product_name_like": product_name_like,
    }

    conn.execute(link_products, bind_values)

convert_instance_relations_to_resource_type_relations_by_domain_model_attr

convert_instance_relations_to_resource_type_relations_by_domain_model_attr(
    conn: sqlalchemy.engine.Connection,
    domain_model_attr: str,
    resource_type_id: uuid.UUID | pydantic_forms.types.UUIDstr,
    cleanup: bool = True,
) -> None

Move instance type relations to resource type relations by domain model attribute.

Note: It removes the instance relations after moving! Use the cleanup argument if you don't want this

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file.

  • domain_model_attr (str) –

    Name of the domain model attribute that connects the product blocks together.

  • resource_type_id (uuid.UUID | pydantic_forms.types.UUIDstr) –

    ID of the resource type that you want to move the instance relations to.

  • cleanup (bool, default: True ) –

    remove old instance relations after the migrate?

Usage

domain_model_attr = "domain_model_attr"
resource_type_id = "id"
convert_instance_relations_to_resource_type_relations_by_domain_model_attr(
    conn, domain_model_attr, resource_type_id
)

Source code in orchestrator/migrations/helpers.py
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
def convert_instance_relations_to_resource_type_relations_by_domain_model_attr(
    conn: sa.engine.Connection, domain_model_attr: str, resource_type_id: UUID | UUIDstr, cleanup: bool = True
) -> None:
    """Move instance type relations to resource type relations by domain model attribute.

    For every row in ``subscription_instance_relations`` with the given ``domain_model_attr``, a row is
    inserted into ``subscription_instance_values``: it belongs to the ``in_use_by`` instance, has the given
    ``resource_type_id``, and its value is the ``subscription_id`` of the ``depends_on`` instance.

    Note: It removes the instance relations after moving! Pass ``cleanup=False`` if you don't want this.

    Args:
        conn: DB connection as available in migration main file.
        domain_model_attr: Name of the domain model attribute that connects the product blocks together.
        resource_type_id: ID of the resource type that you want to move the instance relations to.
        cleanup: remove old instance relations after the migrate?

    Usage:
    ```python
    domain_model_attr = "domain_model_attr"
    resource_type_id = "id"
    convert_instance_relations_to_resource_type_relations_by_domain_model_attr(
        conn, domain_model_attr, resource_type_id
    )
    ```
    """
    # The SQL text is kept verbatim; values are supplied as bound parameters.
    copy_relations = sa.text(
        """
            INSERT INTO subscription_instance_values (subscription_instance_id, resource_type_id, value)
            WITH instance_relations AS (SELECT in_use_by_id, depends_on_id
                                        FROM subscription_instance_relations
                                        WHERE domain_model_attr = :domain_model_attr)
            SELECT ir.in_use_by_id    AS subscription_instance_id,
                   :resource_type_id  AS resource_type_id,
                   si.subscription_id AS value
            from subscription_instances as si
                     inner join instance_relations as ir on si.subscription_instance_id = ir.depends_on_id
            """
    )
    copy_params = {"domain_model_attr": domain_model_attr, "resource_type_id": resource_type_id}
    conn.execute(copy_relations, copy_params)

    if not cleanup:
        return

    # Drop the instance relations that were just copied.
    delete_relations = sa.text("DELETE FROM subscription_instance_relations WHERE domain_model_attr=:attr")
    conn.execute(delete_relations, {"attr": domain_model_attr})

convert_resource_type_relations_to_instance_relations

convert_resource_type_relations_to_instance_relations(
    conn: sqlalchemy.engine.Connection,
    resource_type_id: uuid.UUID | pydantic_forms.types.UUIDstr,
    domain_model_attr: str,
    cleanup: bool = True,
) -> None

Move resource type relations to instance type relations using resource type id.

Note: It removes the resource type relations after moving! Pass cleanup=False if you don't want this.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file.

  • resource_type_id (uuid.UUID | pydantic_forms.types.UUIDstr) –

    ID of the resource type that you want to move to instance relations.

  • domain_model_attr (str) –

    Name of the domain model attribute that connects the product blocks together.

  • cleanup (bool, default: True ) –

    remove old resource type relations after the migrate?

Usage

resource_type_id = "id"
domain_model_attr = "domain_model_attr"
convert_resource_type_relations_to_instance_relations(
    conn, resource_type_id, domain_model_attr
)

Source code in orchestrator/migrations/helpers.py
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
def convert_resource_type_relations_to_instance_relations(
    conn: sa.engine.Connection, resource_type_id: UUID | UUIDstr, domain_model_attr: str, cleanup: bool = True
) -> None:
    """Move resource type relations to instance type relations using resource type id.

    Each ``subscription_instance_values`` row with the given ``resource_type_id`` has its value read as a
    subscription ID. For every subscription instance of that subscription, a row is inserted into
    ``subscription_instance_relations`` with the value's owning instance as ``in_use_by_id``, the given
    ``domain_model_attr``, and a zero-based ``order_id`` numbered per ``in_use_by`` instance. Conflicting
    rows are skipped (``ON CONFLICT DO NOTHING``).

    Note: It removes the resource type relations after moving! Pass ``cleanup=False`` if you don't want this.

    Args:
        conn: DB connection as available in migration main file.
        resource_type_id: ID of the resource type that you want to move to instance relations.
        domain_model_attr: Name of the domain model attribute that connects the product blocks together.
        cleanup: remove old resource type relations after the migrate?

    Usage:
    ```python
    resource_type_id = "id"
    domain_model_attr = "domain_model_attr"
    convert_resource_type_relations_to_instance_relations(
        conn, resource_type_id, domain_model_attr
    )
    ```
    """
    # The SQL text is kept verbatim; values are supplied as bound parameters.
    insert_relations = sa.text(
        """
            INSERT INTO subscription_instance_relations (in_use_by_id, depends_on_id, order_id, domain_model_attr)
            WITH dependencies AS (SELECT siv.value                    AS subscription_id,
                                         siv.subscription_instance_id AS in_use_by_instance_id,
                                         si.product_block_id
                                  FROM subscription_instance_values AS siv
                                           LEFT JOIN subscription_instances AS si
                                                     on siv.subscription_instance_id = si.subscription_instance_id
                                  WHERE siv.resource_type_id = :resource_type_id)
            SELECT in_use_by_instance_id                                        AS in_use_by_id,
                   sii.subscription_instance_id                                 AS depends_on_id,
                   (row_number() OVER (PARTITION BY in_use_by_instance_id) - 1) AS order_id,
                   :domain_model_attr                                           AS domain_model_attr
            FROM subscription_instances AS sii
                     INNER JOIN dependencies AS dep ON sii.subscription_id = uuid(dep.subscription_id)
            ON CONFLICT DO NOTHING
            """
    )
    insert_params = {"resource_type_id": resource_type_id, "domain_model_attr": domain_model_attr}
    conn.execute(insert_relations, insert_params)

    if not cleanup:
        return

    # Drop the resource type values that were just converted.
    delete_values = sa.text(
        """
                DELETE
                FROM subscription_instance_values
                WHERE resource_type_id = :resource_type_id
                """
    )
    conn.execute(delete_values, {"resource_type_id": resource_type_id})

create

create(conn: sqlalchemy.engine.Connection, new: dict) -> None

Call other functions in this file based on the schema.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file

  • new (dict) –

    a dict with everything you want to make and link

Example

new_stuff = { "products": { "Example Product": { "product_id": "c9dc2374-514c-11eb-b685-acde48001122", "product_type": "ProductType1", "description": "Product description", "tag": "ProductType", "status": "active", "product_blocks": [ "Example Product Block" ], "fixed_inputs": { "fixed_input_1": ("value", "f6a4f529-ad17-4ad8-b8ba-45684e2354ba"), "fixed_input_2": ("value", "5a67321d-45d5-4921-aa93-b8708b5d74c6") } }, "Example Product 2": { "product_id": "c9dc2374-514c-11eb-b685-acde48001122", "product_type": "ProductType1", "description": "Product description", "tag": "ProductType", "status": "active", "product_block_ids": [ "37afe017-5a04-4d87-96b0-b8f88a328d7a" ] } }, "product_blocks": { "Example Product Block": { "product_block_id": "37afe017-5a04-4d87-96b0-b8f88a328d7a", "description": "Product description", "tag": "ProductType", "status": "active", "resources": { "resource_type1": ("Resource description", "a47a3f96-c32f-4e4d-8e8c-11596451e878"), "resource_type2": ("Resource description", "dffe1890-e0f8-4ed5-8d0b-e769c3f726cc") } }, "Generated UUID Product Block": { "product_block_id": "37afe017-5a04-4d87-96b0-b8f88a328d7a", "description": "Product description", "tag": "ProductType", "status": "active", "resources": { "resource_type1": ("Resource description", "a47a3f96-c32f-4e4d-8e8c-11596451e878"), "resource_type2": ("Resource description", "dffe1890-e0f8-4ed5-8d0b-e769c3f726cc") } } }, "resources": { "Existing Product": { "resource_type1": ("Resource description", "a47a3f96-c32f-4e4d-8e8c-11596451e878"), "resource_type2": ("Resource description", "dffe1890-e0f8-4ed5-8d0b-e769c3f726cc") } }, "workflows": { "workflow_name": { "workflow_id": "f2702074-3203-454c-b298-6dfa7675423d", "target": "CREATE", "description": "Workflow description", "tag": "ProductType1", "search_phrase": "Search Phrase%", } } }

Source code in orchestrator/migrations/helpers.py
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
def create(conn: sa.engine.Connection, new: dict) -> None:  # noqa: C901
    """Create product blocks, products, resource types and workflows from a single schema dict.

    All top-level keys of ``new`` are optional. They are processed in this order:

    1. ``product_blocks``: a ``resources`` entry inside a product block is moved into the shared
       resources dict (keyed by the product block name) and removed from the product block, after
       which ``create_product_blocks`` is called.
    2. ``products``: product block names are resolved to ids and stored under ``product_block_ids``,
       after which ``create_products`` is called. When a product has a ``root_product_block`` name,
       its id replaces ``product_block_ids`` (a warning is logged if a different, non-empty list was
       given) and ``product_blocks`` is not processed for that product.
    3. ``resources``: passed, together with the resources collected in step 1, to
       ``create_resource_types_for_product_blocks``.
    4. ``workflows``: passed to ``create_workflows``.

    ``ensure_default_workflows`` is always called at the end.

    Note that ``new`` and the dicts nested in it are modified in place.

    Args:
        conn: DB connection as available in migration main file
        new: a dict with everything you want to make and link

    Raises:
        ValueError: if a product refers to a product block name that was not created by this call
            and cannot be looked up by name.

    Example:
        >>> new_stuff = {
                "products": {
                    "Example Product": {
                        "product_id": "c9dc2374-514c-11eb-b685-acde48001122",
                        "product_type": "ProductType1",
                        "description": "Product description",
                        "tag": "ProductType",
                        "status": "active",
                        "product_blocks": [
                            "Example Product Block"
                        ],
                        "fixed_inputs": {
                            "fixed_input_1": ("value", "f6a4f529-ad17-4ad8-b8ba-45684e2354ba"),
                            "fixed_input_2": ("value", "5a67321d-45d5-4921-aa93-b8708b5d74c6")
                        }
                    },
                    "Example Product 2": {
                        "product_id": "0f2bd1d4-514d-11eb-b685-acde48001122",
                        "product_type": "ProductType1",
                        "description": "Product description",
                        "tag": "ProductType",
                        "status": "active",
                        "product_block_ids": [
                            "37afe017-5a04-4d87-96b0-b8f88a328d7a"
                        ]
                    }
                },
                "product_blocks": {
                    "Example Product Block": {
                        "product_block_id": "37afe017-5a04-4d87-96b0-b8f88a328d7a",
                        "description": "Product description",
                        "tag": "ProductType",
                        "status": "active",
                        "resources": {
                            "resource_type1": ("Resource description", "a47a3f96-c32f-4e4d-8e8c-11596451e878"),
                            "resource_type2": ("Resource description", "dffe1890-e0f8-4ed5-8d0b-e769c3f726cc")
                        }
                    },
                    "Example Product Block 2": {
                        "product_block_id": "e8312243-f5cc-4560-adf0-63be4cefeccd",
                        "description": "Product description",
                        "tag": "ProductType",
                        "status": "active",
                        "resources": {
                            "resource_type1": ("Resource description", "a47a3f96-c32f-4e4d-8e8c-11596451e878"),
                            "resource_type2": ("Resource description", "dffe1890-e0f8-4ed5-8d0b-e769c3f726cc")
                        }
                    }
                },
                "resources": {
                    "Existing Product Block": {
                        "resource_type1": ("Resource description", "a47a3f96-c32f-4e4d-8e8c-11596451e878"),
                        "resource_type2": ("Resource description", "dffe1890-e0f8-4ed5-8d0b-e769c3f726cc")
                    }
                },
                "workflows": {
                    "workflow_name": {
                        "workflow_id": "f2702074-3203-454c-b298-6dfa7675423d",
                        "target": "CREATE",
                        "description": "Workflow description",
                        "tag": "ProductType1",
                        "search_phrase": "Search Phrase%",
                    }
                }
            }
        >>> create(conn, new_stuff)
    """
    resources = new.get("resources", {})
    product_block_uuids = {}

    # Create defined product blocks
    if "product_blocks" in new:
        for product_block_name, product_block in new["product_blocks"].items():
            # Move resources into one dict, keyed by product block name, so that all resource
            # types are created and linked in a single pass further down.
            if "resources" in product_block:
                resources[product_block_name] = product_block.pop("resources")
        product_block_uuids = create_product_blocks(conn, new["product_blocks"])

    def get_product_block_id(product_block_name: str) -> str:
        # Prefer the id of a product block created by this call; otherwise look it up by name.
        if product_block_id := product_block_uuids.get(product_block_name):
            return product_block_id

        try:
            return str(get_product_block_id_by_name(conn, product_block_name))
        except Exception as exc:
            # Keep the original lookup error as the cause so the failing query stays visible.
            raise ValueError(f"{product_block_name} is not a valid product block.") from exc

    # Create defined products
    if "products" in new:
        for product in new["products"].values():
            # The best practice is for a product to have only 1 root product block.
            # Migrations created through the generator adhere to this practice.
            product_block_ids = product.get("product_block_ids", [])
            if root_product_block := product.get("root_product_block"):
                root_product_block_id = get_product_block_id(root_product_block)
                if product_block_ids and product_block_ids != [root_product_block_id]:
                    logger.warning("Overriding hardcoded product_block_ids with root product block id")
                product["product_block_ids"] = [root_product_block_id]
                continue

            # To avoid forcing users to re-write old migrations or their products, this function will still
            # allow inserting multiple root product blocks.
            if "product_blocks" in product:
                product.setdefault("product_block_ids", [])
                for product_block_name in product["product_blocks"]:
                    product["product_block_ids"].append(get_product_block_id(product_block_name))
                del product["product_blocks"]
        create_products(conn, new["products"])

    # Create defined resource types
    if resources:
        create_resource_types_for_product_blocks(conn, resources)

    # Create defined workflows
    if "workflows" in new:
        create_workflows(conn, new["workflows"])

    # Ensure default workflows exist for all products
    ensure_default_workflows(conn)

create_fixed_inputs

create_fixed_inputs(
    conn: sqlalchemy.engine.Connection,
    product_id: uuid.UUID | pydantic_forms.types.UUIDstr,
    new: dict,
) -> dict[str, str]

Create fixed inputs for a given product.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file

  • product_id (uuid.UUID | pydantic_forms.types.UUIDstr) –

    UUID of the product to link to

  • new (dict) –

    a dict of your fixed input data

Example

product_id = "id" new = { "fixed_input_1": ("value", "f6a4f529-ad17-4ad8-b8ba-45684e2354ba"), "fixed_input_2": ("value", "5a67321d-45d5-4921-aa93-b8708b5d74c6") } create_fixed_inputs(conn, product_id, new)

Without extra IDs you don't need the tuple:

>>> product_id = "id"
>>> new = {
    "fixed_input_1": "value",
    "fixed_input_2": "value",
}
>>> create_fixed_inputs(conn, product_id, new)
Source code in orchestrator/migrations/helpers.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
def create_fixed_inputs(conn: sa.engine.Connection, product_id: UUID | UUIDstr, new: dict) -> dict[str, str]:
    """Create fixed inputs for a given product.

    Each entry of ``new`` maps a fixed input name to either a plain value or a
    ``(value, fixed_input_id)`` tuple. A tuple is inserted with the given id. A plain value is
    inserted without an id, and its id is looked up by name afterwards. Both inserts use
    ``ON CONFLICT DO NOTHING``.

    Args:
        conn: DB connection as available in migration main file
        product_id: UUID of the product to link to
        new: a dict of fixed input names to values or ``(value, fixed_input_id)`` tuples

    Returns:
        A dict mapping each fixed input name to its id: the given id for tuples, otherwise the
        result of ``get_fixed_input_id_by_name``.

    Example:
        >>> product_id = "id"
        >>> new = {
                "fixed_input_1": ("value", "f6a4f529-ad17-4ad8-b8ba-45684e2354ba"),
                "fixed_input_2": ("value", "5a67321d-45d5-4921-aa93-b8708b5d74c6")
            }
        >>> create_fixed_inputs(conn, product_id, new)

    Without extra IDs you don't need the tuple:

        >>> product_id = "id"
        >>> new = {
            "fixed_input_1": "value",
            "fixed_input_2": "value",
        }
        >>> create_fixed_inputs(conn, product_id, new)

    """
    insert_fixed_input_with_id = sa.text(
        """INSERT INTO fixed_inputs (fixed_input_id, name, value, created_at, product_id)
           VALUES (:fixed_input_id, :key, :value, now(), :product_id)
           ON CONFLICT DO NOTHING;"""
    )
    insert_fixed_input_without_id = sa.text(
        """INSERT INTO fixed_inputs (name, value, created_at, product_id)
           VALUES (:key, :value, now(), :product_id)
           ON CONFLICT DO NOTHING;"""
    )

    uuids = {}
    for key, values in new.items():
        if isinstance(values, tuple):
            # Explicit id supplied by the caller: insert it as-is and report it back.
            value, fixed_input_id = values
            uuids[key] = fixed_input_id
            conn.execute(
                insert_fixed_input_with_id,
                {"fixed_input_id": fixed_input_id, "key": key, "value": value, "product_id": product_id},
            )
        else:
            conn.execute(insert_fixed_input_without_id, {"key": key, "value": values, "product_id": product_id})
            # NOTE(review): the lookup is by name only, not by product_id - confirm this returns the
            # row for this product when several products share a fixed input name.
            uuids[key] = get_fixed_input_id_by_name(conn, key)

    return uuids

create_product_blocks

create_product_blocks(
    conn: sqlalchemy.engine.Connection, new: dict
) -> dict[str, UUIDstr]

Create new product blocks.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file

  • new (dict) –

    a dict of your product block data

  • products

    list of product block ids to link these product blocks to

Example

new = { "Example Product Block": { "product_block_id": "37afe017-5a04-4d87-96b0-b8f88a328d7a", "description": "Product description", "tag": "ProductType", "status": "active", }, "Example Product Block Two": { "product_block_id": "e8312243-f5cc-4560-adf0-63be4cefeccd", "description": "Product description", "tag": "ProductType", "status": "active", } }

Source code in orchestrator/migrations/helpers.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
def create_product_blocks(conn: sa.engine.Connection, new: dict) -> dict[str, UUIDstr]:
    """Create new product blocks.

    Args:
        conn: DB connection as available in migration main file
        new: an dict of your workflow data
        products: list of product block ids to link these product blocks to

    Example:
        >>> new = {
                "Example Product Block": {
                    "product_block_id": "37afe017-5a04-4d87-96b0-b8f88a328d7a",
                    "description": "Product description",
                    "tag": "ProductType",
                    "status": "active",
                },
                "Example Product Block Two": {
                    "product_block_id": "e8312243-f5cc-4560-adf0-63be4cefeccd",
                    "description": "Product description",
                    "tag": "ProductType",
                    "status": "active",
                }
            }
    """
    uuids = {}
    for name, product_block in new.items():
        product_block["name"] = name
        uuids[name] = product_block["product_block_id"]
        conn.execute(
            sa.text(
                """
                INSERT INTO product_blocks (product_block_id, name, description, tag, status, created_at)
                VALUES (:product_block_id, :name, :description, :tag, :status, now())
                ON CONFLICT DO NOTHING;
                """
            ),
            product_block,
        )
        if "resource_types" in product_block:
            block_resource_types = {name: product_block["resource_types"]}
            create_resource_types_for_product_blocks(conn, block_resource_types)

        if "in_use_by_block_relations" in product_block:
            for in_use_by_block_name in product_block["in_use_by_block_relations"]:
                in_use_by_block_id = get_product_block_id_by_name(conn, in_use_by_block_name)
                add_product_block_relation_between_products_by_id(
                    conn, str(in_use_by_block_id), str(product_block["product_block_id"])
                )

        if "depends_on_block_relations" in product_block:
            for depends_on_block_name in product_block["depends_on_block_relations"]:
                depends_on_block_id = get_product_block_id_by_name(conn, depends_on_block_name)
                add_product_block_relation_between_products_by_id(
                    conn, str(product_block["product_block_id"]), str(depends_on_block_id)
                )

    return uuids

create_products

create_products(
    conn: sqlalchemy.engine.Connection, new: dict
) -> dict[str, UUIDstr]

Create new products with their fixed inputs.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file

  • new (dict) –

    a dict of your product data

Example

new = { "Example Product": { "product_id": "c9dc2374-514c-11eb-b685-acde48001122", "product_type": "ProductType1", "description": "Product description", "tag": "ProductType", "status": "active", "fixed_inputs": { "fixed_input_1": "value", "fixed_input_2": "value2" } }, "Example Product 2": { "product_id": "0f2bd1d4-514d-11eb-b685-acde48001122", "product_type": "ProductType1", "description": "Product description", "tag": "ProductType", "status": "active", "product_block_ids": [ "37afe017-5a04-4d87-96b0-b8f88a328d7a" ] } }

Source code in orchestrator/migrations/helpers.py
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
def create_products(conn: sa.engine.Connection, new: dict) -> dict[str, UUIDstr]:
    """Create new products with their fixed inputs.

    Args:
        conn: DB connection as available in migration main file
        new: an dict of your workflow data

    Example:
        >>> new = {
                "Example Product": {
                    "product_id": "c9dc2374-514c-11eb-b685-acde48001122",
                    "product_type": "ProductType1",
                    "description": "Product description",
                    "tag": "ProductType",
                    "status": "active",
                    "fixed_inputs": {
                        "fixed_input_1": "value",
                        "fixed_input_2": "value2"
                    }
                },
                "Example Product 2": {
                    "product_type": "ProductType1",
                    "description": "Product description",
                    "tag": "ProductType",
                    "status": "active",
                    "product_block_ids": [
                        "37afe017-5a04-4d87-96b0-b8f88a328d7a"
                    ]
                }
            }
    """
    uuids = {}
    for name, product in new.items():
        product["name"] = name
        current_uuid = product["product_id"]
        uuids[name] = current_uuid
        conn.execute(
            sa.text(
                """
                INSERT INTO products (product_id, name, description, product_type, tag, status, created_at)
                VALUES (:product_id, :name, :description, :product_type, :tag, :status, now())
                ON CONFLICT DO NOTHING;
                """
            ),
            product,
        )
        for product_block_uuid in product.get("product_block_ids", []):
            conn.execute(
                sa.text("INSERT INTO product_product_blocks VALUES (:product_id, :product_block_id)"),
                {
                    "product_id": current_uuid,
                    "product_block_id": product_block_uuid,
                },
            )
        if "fixed_inputs" in product:
            create_fixed_inputs(conn, current_uuid, product["fixed_inputs"])
    return uuids

create_resource_types_for_product_blocks

create_resource_types_for_product_blocks(
    conn: sqlalchemy.engine.Connection, new: dict
) -> list[UUID]

Create new resource types and link them to existing product_blocks by product_block name.

Note: If the resource type already exists it will still add the resource type to the provided Product Blocks.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file

  • new (dict) –

    a dict with your product blocks and resource types

Returns: List of resource type ids in order of insertion

Usage examples

new_stuff = { "ProductBlockName1": { "resource_type1": ("Resource description", "a47a3f96-c32f-4e4d-8e8c-11596451e878") }, "ProductBlockName2": { "resource_type1": ("Resource description", "a47a3f96-c32f-4e4d-8e8c-11596451e878"), "resource_type2": ("Resource description", "dffe1890-e0f8-4ed5-8d0b-e769c3f726cc") } } create_resource_types_for_product_blocks(conn, new_stuff)

Without extra IDs you don't need the tuple:

>>> new_stuff = {
    "ProductBlockName1": {
        "resource_type1": "Resource description"
    },
    "ProductBlockName2": {
        "resource_type1": "Resource description",
        "resource_type2": "Resource description"
    }
}

>>> create_resource_types_for_product_blocks(conn, new_stuff)
Source code in orchestrator/migrations/helpers.py
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
def create_resource_types_for_product_blocks(conn: sa.engine.Connection, new: dict) -> list[UUID]:
    """Create new resource types and link them to existing product_blocks by product_block name.

    Each entry of ``new`` maps a product block name to a dict of resource types. A resource type
    maps its name to either a description or a ``(description, resource_type_id)`` tuple. All
    resource types are inserted first (``ON CONFLICT DO NOTHING``); afterwards every product block
    is linked to the resource types listed under it.

    Note: If the resource type already exists it will still add the resource type to the provided Product Blocks.

    Args:
        conn: DB connection as available in migration main file
        new: a dict with your product blocks and resource types
    Returns:
        List of resource type ids in order of insertion

    Usage examples:
        >>> new_stuff = {
            "ProductBlockName1": {
                "resource_type1": ("Resource description", "a47a3f96-c32f-4e4d-8e8c-11596451e878")
            },
            "ProductBlockName2": {
                "resource_type1": ("Resource description", "a47a3f96-c32f-4e4d-8e8c-11596451e878"),
                "resource_type2": ("Resource description", "dffe1890-e0f8-4ed5-8d0b-e769c3f726cc")
            }
        }
        >>> create_resource_types_for_product_blocks(conn, new_stuff)

    Without extra IDs you don't need the tuple:

        >>> new_stuff = {
            "ProductBlockName1": {
                "resource_type1": "Resource description"
            },
            "ProductBlockName2": {
                "resource_type1": "Resource description",
                "resource_type2": "Resource description"
            }
        }

        >>> create_resource_types_for_product_blocks(conn, new_stuff)

    """
    insert_resource_type_with_id = sa.text(
        """INSERT INTO resource_types (resource_type_id, resource_type, description)
           VALUES (:resource_type_id, :resource_type, :description)
           ON CONFLICT DO NOTHING;"""
    )
    insert_resource_type_without_id = sa.text(
        """INSERT INTO resource_types (resource_type, description)
           VALUES (:resource_type, :description)
           ON CONFLICT DO NOTHING;"""
    )

    # First pass: make sure every resource type exists and collect its id.
    resource_type_ids = []
    for resource_types in new.values():
        for resource_type, values in resource_types.items():
            if isinstance(values, tuple):
                description, resource_type_id = values
                conn.execute(
                    insert_resource_type_with_id,
                    {
                        "resource_type_id": resource_type_id,
                        "resource_type": resource_type,
                        "description": description,
                    },
                )
            else:
                conn.execute(insert_resource_type_without_id, {"resource_type": resource_type, "description": values})
            # Look the id up by name in both cases, so an already existing row is reported as well.
            resource_type_ids.append(get_resource_type_id_by_name(conn, resource_type))

    # Second pass: link each product block (matched by name) to the resource types listed under it.
    # NOTE(review): unlike the inserts above, this statement has no ON CONFLICT clause.
    for product_block, resource_types in new.items():
        conn.execute(
            sa.text(
                """
                WITH resource_type_ids AS (SELECT resource_types.resource_type_id
                                           FROM resource_types
                                           WHERE resource_types.resource_type = ANY (:new_resource_types)),
                     service_attach_point AS (SELECT product_blocks.product_block_id
                                              FROM product_blocks
                                              WHERE product_blocks.name = :product_block_name)

                INSERT
                INTO product_block_resource_types (product_block_id, resource_type_id)
                SELECT service_attach_point.product_block_id,
                       resource_type_ids.resource_type_id
                FROM service_attach_point
                         CROSS JOIN
                     resource_type_ids
                """
            ),
            {
                "product_block_name": product_block,
                "new_resource_types": list(resource_types.keys()),
            },
        )
    return resource_type_ids

create_task

create_task(conn: sqlalchemy.engine.Connection, task: dict) -> None

Create a new task.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file.

  • task (dict) –

    Dict with data for a new task. name: Name of the task. description: Description of the task.

Usage

task = { "name": "task_name", "description": "task description", } create_task(conn, task)

Source code in orchestrator/migrations/helpers.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def create_task(conn: sa.engine.Connection, task: dict) -> None:
    """Insert a task into the workflows table.

    The row is inserted with target 'SYSTEM'. When the workflows table has an ``is_task``
    column, that column is set to TRUE; otherwise it is left out of the insert. An existing
    conflicting row is left untouched (``ON CONFLICT DO NOTHING``).

    Args:
        conn: DB connection as available in migration main file.
        task: Dict with data for the new task.
            name: Name of the task.
            description: Description of the task.

    Usage:
        >>> task = {
            "name": "task_name",
            "description": "task description",
        }
        >>> create_task(conn, task)
    """
    insert_with_is_task = """
                INSERT INTO workflows(name, target, is_task, description)
                VALUES (:name, 'SYSTEM', TRUE, :description)
                ON CONFLICT DO NOTHING
                RETURNING workflow_id
                """
    insert_without_is_task = """
                INSERT INTO workflows(name, target, description)
                VALUES (:name, 'SYSTEM', :description)
                ON CONFLICT DO NOTHING
                RETURNING workflow_id
                """

    # Schemas without the is_task column get the statement that does not mention it.
    is_task_supported = has_table_column(table_name="workflows", column_name="is_task", conn=conn)
    statement = insert_with_is_task if is_task_supported else insert_without_is_task

    conn.execute(sa.text(statement), task)

create_workflow

create_workflow(conn: sqlalchemy.engine.Connection, workflow: dict) -> None

Create a new workflow for a product.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file.

  • workflow (dict) –

    Dict with data for a new workflow. name: Name of the workflow. target: Target of the workflow ("CREATE", "MODIFY", "RECONCILE", "TERMINATE", "SYSTEM") description: Description of the workflow. product_type: Product type to add the workflow to.

Usage

workflow = { "name": "workflow_name", "target": "SYSTEM", "is_task": False, "description": "workflow description", "product_type": "product_type", "product_tag": "product_tag", } create_workflow(conn, workflow)

Source code in orchestrator/migrations/helpers.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def create_workflow(conn: sa.engine.Connection, workflow: dict) -> None:
    """Insert a workflow and link it to the products of a product type.

    The workflow is inserted with ``ON CONFLICT DO NOTHING`` and then linked to every product
    whose ``product_type`` matches, optionally narrowed down by product tag. The ``workflow``
    dict itself is not modified; a copy is used for the bind parameters.

    Args:
        conn: DB connection as available in migration main file.
        workflow: Dict with data for a new workflow.
            name: Name of the workflow.
            target: Target of the workflow ("CREATE", "MODIFY", "RECONCILE", "TERMINATE", "SYSTEM")
            is_task: Optional, defaults to False. Left out when the workflows table has no
                ``is_task`` column.
            description: Description of the workflow.
            product_type: Product type to add the workflow to.
            product_tag: Optional. When given (not None), only products with this tag are linked.

    Usage:
        >>> workflow = {
            "name": "workflow_name",
            "target": "SYSTEM",
            "is_task": False,
            "description": "workflow description",
            "product_type": "product_type",
            "product_tag": "product_tag",
        }
        >>> create_workflow(conn, workflow)
    """
    bind_params = workflow.copy()
    bind_params.setdefault("is_task", False)
    bind_params.setdefault("product_tag", None)

    # Pick the CTE that inserts the workflow, depending on whether the schema has is_task.
    if has_table_column(table_name="workflows", column_name="is_task", conn=conn):
        insert_cte = """
            WITH new_workflow AS (
                INSERT INTO workflows (name, target, is_task, description)
                    VALUES (:name, :target, :is_task, :description)
                    ON CONFLICT DO NOTHING
                    RETURNING workflow_id
            )
            """
    else:
        bind_params.pop("is_task", None)
        insert_cte = """
            WITH new_workflow AS (
                INSERT INTO workflows (name, target, description)
                    VALUES (:name, :target, :description)
                    ON CONFLICT DO NOTHING
                    RETURNING workflow_id
            )
            """

    link_products = """
        INSERT INTO products_workflows (product_id, workflow_id)
        SELECT p.product_id, nw.workflow_id
        FROM products AS p
                 CROSS JOIN new_workflow AS nw
        """

    fragments = [insert_cte, link_products, "WHERE p.product_type = :product_type"]

    # The tag filter and its bind parameter are only kept when a tag was actually supplied.
    if bind_params.get("product_tag") is None:
        bind_params.pop("product_tag", None)
    else:
        fragments.append("AND p.tag = :product_tag")

    fragments.append("ON CONFLICT DO NOTHING")

    conn.execute(sa.text("\n".join(fragments)), bind_params)

create_workflows

create_workflows(conn: sqlalchemy.engine.Connection, new: dict) -> None

Create new workflows.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file

  • new (dict) –

    a dict of your workflow data

Example

new_workflows = { "workflow_name": { "workflow_id": "f2702074-3203-454c-b298-6dfa7675423d", "target": "CREATE", "is_task": False, "description": "Workflow description", "tag": "ProductBlockName1", "search_phrase": "Search Phrase%", } }

Source code in orchestrator/migrations/helpers.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def create_workflows(conn: sa.engine.Connection, new: dict) -> None:
    """Create new workflows.

    Args:
        conn: DB connection as available in migration main file
        new: an dict of your workflow data

    Example:
        >>> new_workflows = {
                "workflow_name": {
                    "workflow_id": "f2702074-3203-454c-b298-6dfa7675423d",
                    "target": "CREATE",
                    "is_task": False,
                    "description": "Workflow description",
                    "tag": "ProductBlockName1",
                    "search_phrase": "Search Phrase%",
                }
            }
    """
    for name, workflow in new.items():
        workflow["name"] = name

        if not workflow.get("is_task", False):
            workflow["is_task"] = False

        if has_table_column(table_name="workflows", column_name="is_task", conn=conn):
            query = """
                    WITH new_workflow AS (
                        INSERT INTO workflows (workflow_id, name, target, is_task, description)
                            VALUES (:workflow_id, :name, :target, :is_task, :description)
                            RETURNING workflow_id)
                    INSERT
                    INTO products_workflows (product_id, workflow_id)
                    SELECT p.product_id,
                           nw.workflow_id
                    FROM products AS p
                             CROSS JOIN new_workflow AS nw
                    WHERE p.tag = :tag
                      AND p.name LIKE :search_phrase
                    """
        else:
            # Remove is_task from workflow dict and insert SQL
            workflow = {k: v for k, v in workflow.items() if k != "is_task"}
            query = """
                    WITH new_workflow AS (
                        INSERT INTO workflows (workflow_id, name, target, description)
                            VALUES (:workflow_id, :name, :target, :description)
                            RETURNING workflow_id)
                    INSERT
                    INTO products_workflows (product_id, workflow_id)
                    SELECT p.product_id,
                           nw.workflow_id
                    FROM products AS p
                             CROSS JOIN new_workflow AS nw
                    WHERE p.tag = :tag
                      AND p.name LIKE :search_phrase
                    """

        conn.execute(sa.text(query), workflow)

delete

delete(conn: sqlalchemy.engine.Connection, obsolete: dict) -> None

Delete multiple products, product_blocks, resources, and workflows.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file

  • obsolete (dict) –

    a dict with everything you want to delete

Example

obsolete = { "products": [ "Example Product", "Example Product 2" ], "product_blocks": [ "Example Product Block", "Example Product Block 2" ], "resource_types": [ "resource_type4", "resource_type5" ], "workflows": [ "workflow_name_a", "workflow_name_b", ] }

Source code in orchestrator/migrations/helpers.py
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
def delete(conn: sa.engine.Connection, obsolete: dict) -> None:
    """Delete multiple products, product_blocks, resource types, and workflows.

    Only the keys ``resource_types``, ``product_blocks``, ``products`` and ``workflows``
    are read, in that order; any other key in ``obsolete`` is ignored. Each name is
    handed to the matching single-item delete helper.

    Args:
        conn: DB connection as available in migration main file
        obsolete: a dict with everything you want to delete; every value is an
            iterable of names

    Example:
        >>> obsolete = {
        ...     "products": [
        ...         "Example Product",
        ...         "Example Product 2",
        ...     ],
        ...     "product_blocks": [
        ...         "Example Product Block",
        ...         "Example Product Block 2",
        ...     ],
        ...     "resource_types": [
        ...         "resource_type4",
        ...         "resource_type5",
        ...     ],
        ...     "workflows": [
        ...         "workflow_name_a",
        ...         "workflow_name_b",
        ...     ],
        ... }
        >>> delete(conn, obsolete)
    """
    # A missing key falls back to an empty tuple, so its loop body never runs.
    for res_type in obsolete.get("resource_types", ()):
        delete_resource_type(conn, res_type)

    for product_block_name in obsolete.get("product_blocks", ()):
        delete_product_block(conn, product_block_name)

    for product in obsolete.get("products", ()):
        delete_product(conn, product)

    for workflow in obsolete.get("workflows", ()):
        delete_workflow(conn, workflow)

delete_product

delete_product(conn: sqlalchemy.engine.Connection, name: str) -> None

Delete a product and its occurrences in workflows and product_blocks.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file

  • name (str) –

    a product name you want to delete

Example

obsolete_stuff = "name_1" delete_product(conn, obsolete_stuff)

Source code in orchestrator/migrations/helpers.py
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
def delete_product(conn: sa.engine.Connection, name: str) -> None:
    """Delete a product and its occurrences in workflows and product_blocks.

    One statement removes the row from ``products`` and, using the returned
    ``product_id``, the matching rows in ``product_product_blocks`` and
    ``products_workflows``.

    Args:
        conn: DB connection as available in migration main file
        name: a product name you want to delete

    Example:
        >>> obsolete_stuff = "name_1"
        >>> delete_product(conn, obsolete_stuff)
    """
    statement = sa.text(
            """
            WITH deleted_p AS (
                DELETE FROM products WHERE name = :name
                    RETURNING product_id),
                 deleted_p_pb AS (
                     DELETE FROM product_product_blocks WHERE product_id IN (SELECT product_id FROM deleted_p)),
                 deleted_pb_rt AS (
                     DELETE FROM products_workflows WHERE product_id IN (SELECT product_id FROM deleted_p))
            SELECT *
            from deleted_p;
            """
    )
    bind_params = {"name": name}
    conn.execute(statement, bind_params)

delete_product_block

delete_product_block(conn: sqlalchemy.engine.Connection, name: str) -> None

Delete a product block and its occurrences in resource types and products.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file

  • name (str) –

    a product_block name you want to delete

Example

obsolete_stuff = "name_1" delete_product_block(conn, obsolete_stuff)

Source code in orchestrator/migrations/helpers.py
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
def delete_product_block(conn: sa.engine.Connection, name: str) -> None:
    """Delete a product block and its occurrences in resource types and products.

    One statement removes the row from ``product_blocks`` and, using the returned
    ``product_block_id``, the matching rows in ``product_product_blocks`` and
    ``product_block_resource_types``.

    Args:
        conn: DB connection as available in migration main file
        name: a product_block name you want to delete

    Example:
        >>> obsolete_stuff = "name_1"
        >>> delete_product_block(conn, obsolete_stuff)
    """
    statement = sa.text(
            """
            WITH deleted_pb AS (
                DELETE FROM product_blocks WHERE name = :name
                    RETURNING product_block_id),
                 deleted_p_pb AS (
                     DELETE FROM product_product_blocks WHERE product_block_id IN (SELECT product_block_id FROM deleted_pb)),
                 deleted_pb_rt AS (
                     DELETE FROM product_block_resource_types WHERE product_block_id IN (SELECT product_block_id FROM deleted_pb))
            SELECT *
            from deleted_pb;
            """
    )
    bind_params = {"name": name}
    conn.execute(statement, bind_params)

delete_resource_type

delete_resource_type(
    conn: sqlalchemy.engine.Connection, resource_type: str
) -> None

Delete a resource type and its occurrences in product blocks.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file

  • resource_type (str) –

    a resource_type name you want to delete

Example

resource_type = "name_1" delete_resource_type(conn, resource_type)

Source code in orchestrator/migrations/helpers.py
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
def delete_resource_type(conn: sa.engine.Connection, resource_type: str) -> None:
    """Delete a resource type and its occurrences in product blocks.

    One statement removes the row from ``resource_types`` and, using the returned
    ``resource_type_id``, the matching rows in ``product_block_resource_types``.

    Args:
        conn: DB connection as available in migration main file
        resource_type: a resource_type name you want to delete

    Example:
        >>> resource_type = "name_1"
        >>> delete_resource_type(conn, resource_type)
    """
    conn.execute(
        sa.text(
            """
            WITH deleted_pb AS (
                DELETE FROM resource_types WHERE resource_type = :resource_type
                    RETURNING resource_type_id),
                 deleted_pb_rt AS (
                     DELETE FROM product_block_resource_types WHERE resource_type_id IN (SELECT resource_type_id FROM deleted_pb))
            SELECT *
            from deleted_pb;
            """
        ),
        {"resource_type": resource_type},
    )

delete_resource_type_by_id

delete_resource_type_by_id(
    conn: sqlalchemy.engine.Connection,
    id: uuid.UUID | pydantic_forms.types.UUIDstr,
) -> None

Delete resource type by resource type id.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file.

  • id (uuid.UUID | pydantic_forms.types.UUIDstr) –

    ID of the resource type to delete.

Usage:

resource_type_id = "id"
delete_resource_type_by_id(conn, resource_type_id)

Source code in orchestrator/migrations/helpers.py
866
867
868
869
870
871
872
873
874
875
876
877
878
879
def delete_resource_type_by_id(conn: sa.engine.Connection, id: UUID | UUIDstr) -> None:
    """Delete the ``resource_types`` row that has the given resource type id.

    Args:
        conn: DB connection as available in migration main file.
        id: ID of the resource type to delete.

    Usage:
    ```python
    resource_type_id = "id"
    delete_resource_type_by_id(conn, resource_type_id)
    ```
    """
    statement = sa.text("DELETE FROM resource_types WHERE resource_type_id=:id")
    bind_params = {"id": id}
    conn.execute(statement, bind_params)

delete_resource_types

delete_resource_types(
    conn: sqlalchemy.engine.Connection, delete: collections.abc.Iterable
) -> None

Delete resource types and their occurrences in product blocks.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file

  • delete (collections.abc.Iterable) –

    list of resource_type names you want to delete

Example

obsolete_stuff = ["name_1", "name_2"] delete_resource_types(conn, obsolete_stuff)

Source code in orchestrator/migrations/helpers.py
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
def delete_resource_types(conn: sa.engine.Connection, delete: Iterable) -> None:
    """Delete resource types and their occurrences in product blocks.

    The links in ``product_block_resource_types`` are removed first, then the rows in
    ``resource_types`` themselves. Nothing is executed when ``delete`` is empty.

    Args:
        conn: DB connection as available in migration main file
        delete: iterable of resource_type names you want to delete; it is consumed once

    Example:
        >>> obsolete_stuff = ["name_1", "name_2"]
        >>> delete_resource_types(conn, obsolete_stuff)
    """
    # Materialise once: `delete` may be a one-shot iterator and is bound to two statements.
    obsolete = list(delete)
    if not obsolete:
        return

    # Both statements bind a list to `= ANY (...)`, the same form that
    # delete_resource_types_from_product_blocks uses.
    bind_params = {"obsolete": obsolete}
    conn.execute(
        sa.text(
            """DELETE
               FROM product_block_resource_types
                   USING resource_types
               WHERE resource_types.resource_type_id = product_block_resource_types.resource_type_id
                 AND resource_types.resource_type = ANY (:obsolete)"""
        ),
        bind_params,
    )
    conn.execute(sa.text("DELETE FROM resource_types WHERE resource_type = ANY (:obsolete);"), bind_params)

delete_resource_types_from_product_blocks

delete_resource_types_from_product_blocks(
    conn: sqlalchemy.engine.Connection, delete: dict
) -> None

Delete resource type from product blocks.

Note: the resource_type itself will not be deleted.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file

  • delete (dict) –

    dict of product_blocks and resource_types names that you want to unlink

Example

obsolete_stuff = { "ProductBlockName1": { "resource_type1": "Resource description" }, "ProductBlockName2": { "resource_type1": "Resource description", "resource_type2": "Resource description" } } delete_resource_types_from_product_blocks(conn, obsolete_stuff)

Source code in orchestrator/migrations/helpers.py
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
def delete_resource_types_from_product_blocks(conn: sa.engine.Connection, delete: dict) -> None:
    """Unlink resource types from product blocks.

    Note: the resource_type itself will not be deleted.

    Args:
        conn: DB connection as available in migration main file
        delete: dict keyed by product block name; each value is a dict whose keys are
            the resource_type names to unlink from that block (the values are not read)

    Example:
        >>> obsolete_stuff = {
        ...     "ProductBlockName1": {
        ...         "resource_type1": "Resource description"
        ...     },
        ...     "ProductBlockName2": {
        ...         "resource_type1": "Resource description",
        ...         "resource_type2": "Resource description"
        ...     }
        ... }
        >>> delete_resource_types_from_product_blocks(conn, obsolete_stuff)
    """
    # The statement text is the same for every product block; only the bind values differ.
    unlink_statement = sa.text(
                """DELETE
                   FROM product_block_resource_types
                       USING resource_types
                   WHERE
                       product_block_id = (SELECT product_block_id FROM product_blocks WHERE name = :product_block_name)
                     AND resource_types.resource_type_id = product_block_resource_types.resource_type_id
                     AND resource_types.resource_type = ANY (:obsolete_resource_types)"""
    )
    for block_name, block_resource_types in delete.items():
        bind_params = {
            "product_block_name": block_name,
            "obsolete_resource_types": list(block_resource_types),
        }
        conn.execute(unlink_statement, bind_params)

delete_workflow

delete_workflow(conn: sqlalchemy.engine.Connection, name: str) -> None

Delete a workflow and its occurrences in products.

Note: the cascading delete rules in postgres will also ensure removal from products_workflows.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file

  • name (str) –

    a workflow name you want to delete

Example

obsolete_stuff = "name_1" delete_workflow(conn, obsolete_stuff)

Source code in orchestrator/migrations/helpers.py
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
def delete_workflow(conn: sa.engine.Connection, name: str) -> None:
    """Delete a workflow and its occurrences in products.

    Only the ``workflows`` row is deleted here.

    Note: the cascading delete rules in postgres will also ensure removal from `products_workflows`.

    Args:
        conn: DB connection as available in migration main file
        name: a workflow name you want to delete

    Example:
        >>> obsolete_stuff = "name_1"
        >>> delete_workflow(conn, obsolete_stuff)
    """
    statement = sa.text(
            """
            DELETE
            FROM workflows
            WHERE name = :name;
            """
    )
    bind_params = {"name": name}
    conn.execute(statement, bind_params)

ensure_default_workflows

ensure_default_workflows(conn: sqlalchemy.engine.Connection) -> None

Ensure the products_workflows table contains a link between every product and each workflow named in the DEFAULT_PRODUCT_WORKFLOWS app_setting.

Note that the 0th element of the uuids are taken when generating product_workflow_table_rows because sqlalchemy returns a row tuple even if selecting for a single column.

Source code in orchestrator/migrations/helpers.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def ensure_default_workflows(conn: sa.engine.Connection) -> None:
    """Ensure products_workflows links every product to each workflow named in the DEFAULT_PRODUCT_WORKFLOWS app_setting.

    All rows of ``products`` are selected (no status filter is applied) and combined with
    every workflow whose name is in ``app_settings.DEFAULT_PRODUCT_WORKFLOWS``. Pairs that
    already exist are left untouched via ``ON CONFLICT DO NOTHING``. When there are no
    products or no matching workflows, nothing is inserted.

    Note that the 0th element of the uuids are taken when generating product_workflow_table_rows because sqlalchemy returns a row tuple even if selecting for a single column.

    Args:
        conn: DB connection as available in migration main file
    """
    products = sa.Table("products", sa.MetaData(), autoload_with=conn)
    workflows = sa.Table("workflows", sa.MetaData(), autoload_with=conn)
    product_workflows_table = sa.Table("products_workflows", sa.MetaData(), autoload_with=conn)

    all_product_uuids = conn.execute(sa.select(products.c.product_id)).fetchall()
    default_workflow_ids = conn.execute(
        sa.select(workflows.c.workflow_id).where(workflows.c.name.in_(app_settings.DEFAULT_PRODUCT_WORKFLOWS))
    ).fetchall()
    product_workflow_table_rows = [
        (product_uuid[0], workflow_uuid[0])
        for product_uuid in all_product_uuids
        for workflow_uuid in default_workflow_ids
    ]
    if not product_workflow_table_rows:
        # Nothing to link; skip the INSERT rather than hand it an empty VALUES list.
        return

    conn.execute(
        sa.dialects.postgresql.insert(product_workflows_table)
        .values(product_workflow_table_rows)
        .on_conflict_do_nothing(index_elements=("product_id", "workflow_id"))
    )

get_all_active_products_and_ids

get_all_active_products_and_ids(
    conn: sqlalchemy.engine.Connection,
) -> list[dict[str, UUID | UUIDstr | str]]

Return a list, with dicts containing keys product_id and name of active products.

Source code in orchestrator/migrations/helpers.py
120
121
122
123
def get_all_active_products_and_ids(conn: sa.engine.Connection) -> list[dict[str, UUID | UUIDstr | str]]:
    """Return one dict per product whose status is 'active', with keys `product_id` and `name`."""
    active_rows = conn.execute(sa.text("SELECT product_id, name  FROM products WHERE status='active'")).fetchall()
    return [{"product_id": product_id, "name": product_name} for product_id, product_name in active_rows]

has_table_column

has_table_column(
    table_name: str, column_name: str, conn: sqlalchemy.engine.Connection
) -> bool

Checks if the specified column exists in a given table.

The check queries information_schema.columns for a row matching both the table name and the column name, so a table that does not exist simply yields no row and the function returns False. This is useful for migrations where you want to ensure that a column exists before performing operations on it.

:param table_name: Name of the database table :param column_name: Name of the column to check :param conn: SQLAlchemy database Connection :return: True if the column exists, False otherwise

Source code in orchestrator/migrations/helpers.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def has_table_column(table_name: str, column_name: str, conn: sa.engine.Connection) -> bool:
    """Check whether the specified column exists in a given table.

    The check queries ``information_schema.columns`` for a row matching both names, so a
    table that does not exist simply yields no row and the function returns False.
    The query filters on table and column name only; it does not restrict the schema.
    This is useful for migrations where you want to ensure that a column exists before
    performing operations on it.

    Args:
        table_name: Name of the database table
        column_name: Name of the column to check
        conn: SQLAlchemy database Connection

    Returns:
        True if the column exists, False otherwise
    """
    result = conn.execute(
        sa.text(
            """
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = :table_name and column_name = :column_name
            """
        ),
        {
            "table_name": table_name,
            "column_name": column_name,
        },
    )
    return result.first() is not None

insert_resource_type

insert_resource_type(
    conn: sqlalchemy.engine.Connection, resource_type: str, description: str
) -> None

Create a new resource type.

Source code in orchestrator/migrations/helpers.py
105
106
107
108
109
110
111
112
113
114
115
116
117
def insert_resource_type(conn: sa.engine.Connection, resource_type: str, description: str) -> None:
    """Create a new resource type; the INSERT does nothing on conflict.

    Args:
        conn: DB connection as available in migration main file
        resource_type: name stored in the ``resource_type`` column
        description: text stored in the ``description`` column
    """
    statement = sa.text(
            """INSERT INTO resource_types (resource_type, description)
               VALUES (:resource_type, :description)
               ON CONFLICT DO NOTHING;"""
    )
    bind_params = {"resource_type": resource_type, "description": description}
    conn.execute(statement, bind_params)

remove_product_block_relation_between_products_by_id

remove_product_block_relation_between_products_by_id(
    conn: sqlalchemy.engine.Connection,
    in_use_by_id: uuid.UUID | pydantic_forms.types.UUIDstr,
    depends_on_id: uuid.UUID | pydantic_forms.types.UUIDstr,
) -> None

Remove product block relation by id.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file.

  • in_use_by_id (uuid.UUID | pydantic_forms.types.UUIDstr) –

    ID of the product block that uses another product block.

  • depends_on_id (uuid.UUID | pydantic_forms.types.UUIDstr) –

    ID of the product block that is used as dependency.

Usage

in_use_by_id = "in_use_by_id" depends_on_id = "depends_on_id" remove_product_block_relation_between_products_by_id( conn, in_use_by_id, depends_on_id )

Source code in orchestrator/migrations/helpers.py
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
def remove_product_block_relation_between_products_by_id(
    conn: sa.engine.Connection, in_use_by_id: UUID | UUIDstr, depends_on_id: UUID | UUIDstr
) -> None:
    """Remove the product block relation identified by both of its ids.

    Deletes the rows of ``product_block_relations`` that match both ids.

    Args:
        conn: DB connection as available in migration main file.
        in_use_by_id: ID of the product block that uses another product block.
        depends_on_id: ID of the product block that is used as dependency.

    Usage:
        >>> in_use_by_id = "in_use_by_id"
        >>> depends_on_id = "depends_on_id"
        >>> remove_product_block_relation_between_products_by_id(
        ...     conn, in_use_by_id, depends_on_id
        ... )
    """
    statement = sa.text(
            """
            DELETE
            FROM product_block_relations
            WHERE in_use_by_id = :in_use_by_id
              AND depends_on_id = :depends_on_id
            """
    )
    bind_params = {"in_use_by_id": in_use_by_id, "depends_on_id": depends_on_id}
    conn.execute(statement, bind_params)

remove_products_from_workflow_by_product_tag

remove_products_from_workflow_by_product_tag(
    conn: sqlalchemy.engine.Connection,
    workflow_name: str,
    product_tag: str,
    product_name_like: str = "%%",
) -> None

Delete products from a workflow by product tag.

Parameters:

  • conn (sqlalchemy.engine.Connection) –

    DB connection as available in migration main file.

  • workflow_name (str) –

    Name of the workflow that the products need to be removed from.

  • product_tag (str) –

    Tag of the product to remove from the workflow.

  • product_name_like (optional, default: '%%' ) –

    Part of the product name to get more specific products (necessary for fw v2)

Usage:

product_tag = "product_tag"
workflow_name = "workflow_name"
remove_products_from_workflow_by_product_tag(conn, workflow_name, product_tag)

Source code in orchestrator/migrations/helpers.py
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
def remove_products_from_workflow_by_product_tag(
    conn: sa.engine.Connection, workflow_name: str, product_tag: str, product_name_like: str = "%%"
) -> None:
    """Delete products from a workflow by product tag.

    Removes the ``products_workflows`` rows that link the named workflow to every product
    with the given tag whose name matches ``product_name_like``. The products and the
    workflow themselves are not deleted.

    Args:
        conn: DB connection as available in migration main file.
        workflow_name: Name of the workflow that the products need to be removed from.
        product_tag: Tag of the product to remove from the workflow.
        product_name_like (optional): SQL ``LIKE`` pattern for the product name, to get more
            specific products (necessary for fw v2).

    Usage:
    ```python
    workflow_name = "workflow_name"
    product_tag = "product_tag"
    remove_products_from_workflow_by_product_tag(conn, workflow_name, product_tag)
    ```
    """

    conn.execute(
        sa.text(
            """
            DELETE
            FROM products_workflows
            WHERE workflow_id = (SELECT workflow_id
                                 FROM workflows
                                 where name = :workflow_name)
              AND product_id IN (SELECT product_id
                                 FROM products
                                 WHERE tag = :product_tag
                                   AND name LIKE :product_name_like)
            """
        ),
        {
            "workflow_name": workflow_name,
            "product_tag": product_tag,
            "product_name_like": product_name_like,
        },
    )