Skip to content

Command Line Interface Commands

This page documents CLI commands available in orchestrator-core.

The syntax of a CLI command is:

python main.py <command> <sub_command>

Where:

  • <command> is one of the top-level headings in this page
  • <sub_command> is one of the secondary headings in this page

Some examples:

python main.py db migrate_tasks

python main.py generate workflows

python main.py scheduler show-schedule

Each command can also be run with --help to get information directly in the CLI.

Top level options:

--install-completion [bash|zsh|fish|powershell|pwsh]

Install completion for the specified shell. [default: None]

--show-completion [bash|zsh|fish|powershell|pwsh]

Show completion for the specified shell, to copy it or customize the installation. [default: None]

db

Interact with the application database. By default, does nothing, specify main.py db --help for more information.

downgrade

The downgrade command will downgrade the database to the previous revision or to the optionally specified revision.

CLI Options
Arguments:
    [REVISION]  Rev id to downgrade to  [default: -1]
Source code in .venv/lib/python3.12/site-packages/orchestrator/core/cli/database.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
@app.command()
def downgrade(revision: str = typer.Argument("-1", help="Rev id to downgrade to")) -> None:
    """The `downgrade` command will downgrade the database to the previous revision or to the optionally specified revision.

    Args:
        revision (str, optional): Optional argument to indicate where to downgrade to. [default: -1]

    Returns:
        None

    CLI Options:
        ```sh
        Arguments:
            [REVISION]  Rev id to downgrade to  [default: -1]
        ```

    """
    command.downgrade(alembic_cfg(), revision)

heads

The heads command shows the Alembic database heads.

CLI Options

None

Source code in .venv/lib/python3.12/site-packages/orchestrator/core/cli/database.py
112
113
114
115
116
117
118
119
120
@app.command(help="Get the database heads")
def heads() -> None:
    """The `heads` command shows the Alembic database heads.

    CLI Options:
        None

    """
    command.heads(alembic_cfg())

history

The history command lists Alembic revision history/changeset scripts in chronological order.

CLI Options
Options:
    -v, --verbose  Verbose output
    -c, --current  Indicate current revision
Source code in .venv/lib/python3.12/site-packages/orchestrator/core/cli/database.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
@app.command()
def history(
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
    indicate_current: bool = typer.Option(True, "--current", "-c", help="Indicate current revision"),
) -> None:
    """Show the Alembic revision history/changeset scripts in chronological order.

    Args:
        verbose (bool, optional): Verbose output. [default: False]
        indicate_current (bool, optional): Indicate the current revision. [default: True]

    Returns:
        None

    CLI Options:
        ```sh
        Options:
            -v, --verbose  Verbose output
            -c, --current  Indicate current revision
        ```
    """
    config = alembic_cfg()
    command.history(config, verbose=verbose, indicate_current=indicate_current)

init

Initialize the migrations directory.

This command will initialize a migration directory for the orchestrator core application and setup a correct migration environment. It will also throw an exception when it detects conflicting files and directories.

CLI Options

None

Source code in .venv/lib/python3.12/site-packages/orchestrator/core/cli/database.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
@app.command(
    help="Initialize an empty migrations environment. This command will throw an exception when it detects conflicting files and directories."
)
def init() -> None:
    """Initialize the `migrations` directory.

    Creates the migration directory tree for the orchestrator core application,
    copies the packaged migration environment files into it, and writes an
    `alembic.ini` in the current working directory unless one already exists.
    Raises an `OSError` when the migrations directory already exists and is
    not empty.

    Returns:
        None

    CLI Options:
        None

    """

    if os.access(migration_dir, os.F_OK) and os.listdir(migration_dir):
        raise OSError(f"Directory {migration_dir} already exists and is not empty")

    # Create the directory skeleton: migrations/, migrations/versions and migrations/versions/schema.
    for new_dir in (
        migration_dir,
        os.path.join(migration_dir, "versions"),
        os.path.join(migration_dir, "versions/schema"),
    ):
        logger.info("Creating directory", directory=os.path.abspath(new_dir))
        os.makedirs(new_dir)

    # Copy the packaged template files into the freshly created migrations directory.
    template_files = (
        (f"{migration_dir}/templates/env.py.j2", "env.py"),
        (f"{migration_dir}/script.py.mako", "script.py.mako"),
        (f"{migration_dir}/templates/helpers.py.j2", "helpers.py"),
    )
    for source_rel_path, target_name in template_files:
        target_path = os.path.join(migration_dir, target_name)
        logger.info("Creating file", file=os.path.abspath(target_path))
        copyfile(os.path.join(orchestrator_module_location, source_rel_path), target_path)

    template = jinja_env.get_template("alembic.ini.j2")

    alembic_ini_path = os.path.join(os.getcwd(), "alembic.ini")
    if os.access(alembic_ini_path, os.F_OK):
        logger.info("Skipping Alembic.ini file. It already exists")
    else:
        logger.info("Creating file", file=alembic_ini_path)
        with open(alembic_ini_path, "w") as alembic_ini:
            alembic_ini.write(template.render(migrations_dir=migration_dir))

merge

Merge database revisions.

It is possible when using multiple git branches in your WFO development lifecycle to have multiple Alembic heads emerge. This command will allow you to merge those two (or more) heads to resolve the issue. You also might need to run this after updating your version of orchestrator-core if there have been schema changes.

Read More Here

CLI Options
Arguments:
    [REVISIONS]  Add the revision you would like to merge to this command.

Options:
    -m, --message TEXT  The revision message
Source code in .venv/lib/python3.12/site-packages/orchestrator/core/cli/database.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
@app.command(help="Merge database revisions.")
def merge(
    revisions: str = typer.Argument(default=None, help="Add the revision you would like to merge to this command."),
    message: str = typer.Option(None, "--message", "-m", help="The revision message"),
) -> None:
    """Merge database revisions.

    Working with multiple git branches in your WFO development lifecycle can leave
    the database with more than one Alembic head. Use this command to merge those
    heads into a single revision. It may also be needed after upgrading
    orchestrator-core when the upgrade introduced schema changes.

    [Read More Here](https://alembic.sqlalchemy.org/en/latest/branches.html#merging-branches)

    Args:
        revisions (str, optional): Revision id(s) to merge.
        message (str, optional): Message for the merge revision.

    Returns:
        None

    CLI Options:
        ```sh
        Arguments:
            [REVISIONS]  Add the revision you would like to merge to this command.

        Options:
            -m, --message TEXT  The revision message
        ```
    """
    config = alembic_cfg()
    command.merge(config, revisions, message=message)

migrate_workflows

The migrate-workflows command creates a migration file based on the difference between workflows in the database and registered WorkflowInstances in your codebase.

BACKUP YOUR DATABASE BEFORE USING THE MIGRATION!

You will be prompted with inputs for new models and resource type updates. Resource type updates are only handled when it’s renamed in all product blocks.

Returns None unless --test is used, in which case it returns: - tuple: - list of workflows that would be added. - list of workflows that would be deleted.

CLI Arguments
Arguments:
    MESSAGE  Migration name  [required]

Options:
    --test / --no-test  Optional boolean if you don't want to generate a migration
    file  [default: no-test]
Source code in .venv/lib/python3.12/site-packages/orchestrator/core/cli/database.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
@app.command(help="Create migration file based on diff workflows in db")
def migrate_workflows(
    message: str = typer.Argument(..., help="Migration name"),
    test: bool = typer.Option(False, help="Optional boolean if you don't want to generate a migration file"),
) -> tuple[list[dict], list[dict]] | None:
    """The `migrate-workflows` command creates a migration file based on the difference between workflows in the database and registered WorkflowInstances in your codebase.

    !!! warning "BACKUP YOUR DATABASE BEFORE USING THE MIGRATION!"

    You will be prompted with inputs for new models and resource type updates.
    Resource type updates are only handled when it's renamed in all product blocks.

    Args:
        message: Message/description of the generated migration.
        test: Optional boolean if you don't want to generate a migration file.

    Returns None unless `--test` is used, in which case it returns:
        - tuple:
            - list of workflows that would be added (as dicts).
            - list of workflows that would be deleted (as dicts).

    CLI Arguments:
        ```sh
        Arguments:
            MESSAGE  Migration name  [required]

        Options:
            --test / --no-test  Optional boolean if you don't want to generate a migration
            file  [default: no-test]
        ```
    """
    if not app_settings.TESTING:
        init_database(app_settings)

    if test:
        print(  # noqa: T001, T201
            f"{str_fmt('NOTE:', flags=[COLOR.BOLD, COLOR.CYAN])} Running in test mode. No migration file will be generated.\n"
        )

    workflows_to_add, workflows_to_delete = create_workflows_migration_wizard()

    # String 'template' arguments
    import_str = "from orchestrator.core.migrations.helpers import create_workflow, delete_workflow\n"
    tpl_preamble_lines = []
    tpl_upgrade_lines = []
    tpl_downgrade_lines = []

    if workflows_to_add:
        tpl_preamble_lines.append(f"new_workflows = {json.dumps(workflows_to_add, indent=4)}\n")
        tpl_upgrade_lines.extend(
            [(" " * 4) + "for workflow in new_workflows:", (" " * 8) + "create_workflow(conn, workflow)"]
        )
        tpl_downgrade_lines.extend(
            [(" " * 4) + "for workflow in new_workflows:", (" " * 8) + 'delete_workflow(conn, workflow["name"])']
        )

    if workflows_to_delete:
        tpl_preamble_lines.append(f"old_workflows = {json.dumps(workflows_to_delete, indent=4)}\n")
        tpl_upgrade_lines.extend(
            [(" " * 4) + "for workflow in old_workflows:", (" " * 8) + 'delete_workflow(conn, workflow["name"])']
        )
        tpl_downgrade_lines.extend(
            [(" " * 4) + "for workflow in old_workflows:", (" " * 8) + "create_workflow(conn, workflow)"]
        )

    preamble = "\n".join(
        [
            import_str,
            *tpl_preamble_lines,
        ]
    )
    sql_upgrade_str = "\n".join(tpl_upgrade_lines)
    sql_downgrade_str = "\n".join(tpl_downgrade_lines)

    if test:
        return workflows_to_add, workflows_to_delete

    create_migration_file(alembic_cfg(), sql_upgrade_str, sql_downgrade_str, message, preamble=preamble)
    return None

migrate_tasks

The migrate-tasks command creates a migration file based on the difference between tasks in the database and registered TaskInstances in your codebase.

BACKUP YOUR DATABASE BEFORE USING THE MIGRATION!

You will be prompted with inputs for new models and resource type updates. Resource type updates are only handled when it’s renamed in all product blocks.

Returns None unless --test is used, in which case it returns: - tuple: - list of tasks that would be added. - list of tasks that would be deleted.

CLI Arguments
Arguments:
    MESSAGE  Migration name  [required]

Options:
    --test / --no-test  Optional boolean if you don't want to generate a migration
    file  [default: no-test]
Source code in .venv/lib/python3.12/site-packages/orchestrator/core/cli/database.py
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
@app.command(help="Create migration file based on diff tasks in db")
def migrate_tasks(
    message: str = typer.Argument(..., help="Migration name"),
    test: bool = typer.Option(False, help="Optional boolean if you don't want to generate a migration file"),
) -> tuple[list[dict], list[dict]] | None:
    """The `migrate-tasks` command creates a migration file based on the difference between tasks in the database and registered TaskInstances in your codebase.

    !!! warning "BACKUP YOUR DATABASE BEFORE USING THE MIGRATION!"

    You will be prompted with inputs for new models and resource type updates.
    Resource type updates are only handled when it's renamed in all product blocks.

    Args:
        message: Message/description of the generated migration.
        test: Optional boolean if you don't want to generate a migration file.

    Returns None unless `--test` is used, in which case it returns:
        - tuple:
            - list of tasks that would be added (as dicts).
            - list of tasks that would be deleted (as dicts).

    CLI Arguments:
        ```sh
        Arguments:
            MESSAGE  Migration name  [required]

        Options:
            --test / --no-test  Optional boolean if you don't want to generate a migration
            file  [default: no-test]
        ```
    """
    if not app_settings.TESTING:
        init_database(app_settings)

    if test:
        print(  # noqa: T001, T201
            f"{str_fmt('NOTE:', flags=[COLOR.BOLD, COLOR.CYAN])} Running in test mode. No migration file will be generated.\n"
        )

    tasks_to_add, tasks_to_delete = create_tasks_migration_wizard()

    # String 'template' arguments
    import_str = "from orchestrator.core.migrations.helpers import create_task, delete_workflow\n"
    tpl_preamble_lines = []
    tpl_upgrade_lines = []
    tpl_downgrade_lines = []

    if tasks_to_add:
        tpl_preamble_lines.append(f"new_tasks = {json.dumps(tasks_to_add, indent=4)}\n")
        tpl_upgrade_lines.extend([(" " * 4) + "for task in new_tasks:", (" " * 8) + "create_task(conn, task)"])
        tpl_downgrade_lines.extend(
            [(" " * 4) + "for task in new_tasks:", (" " * 8) + 'delete_workflow(conn, task["name"])']
        )

    if tasks_to_delete:
        tpl_preamble_lines.append(f"old_tasks = {json.dumps(tasks_to_delete, indent=4)}\n")
        tpl_upgrade_lines.extend(
            [(" " * 4) + "for task in old_tasks:", (" " * 8) + 'delete_workflow(conn, task["name"])']
        )
        tpl_downgrade_lines.extend([(" " * 4) + "for task in old_tasks:", (" " * 8) + "create_task(conn, task)"])

    preamble = "\n".join(
        [
            import_str,
            *tpl_preamble_lines,
        ]
    )
    sql_upgrade_str = "\n".join(tpl_upgrade_lines)
    sql_downgrade_str = "\n".join(tpl_downgrade_lines)

    if test:
        return tasks_to_add, tasks_to_delete

    create_migration_file(alembic_cfg(), sql_upgrade_str, sql_downgrade_str, message, preamble=preamble)
    return None

revision

The revision command creates a new Alembic revision file.

CLI Options
Options:
    -m, --message TEXT              The revision message
    --version-path TEXT             Specify specific path from config for version file
    --autogenerate / --no-autogenerate
                                    Detect schema changes and add migrations [default: no-autogenerate]
    --head TEXT                     Determine the head you need to add your migration to.
Source code in .venv/lib/python3.12/site-packages/orchestrator/core/cli/database.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
@app.command()
def revision(
    message: str = typer.Option(None, "--message", "-m", help="The revision message"),
    version_path: str = typer.Option(None, "--version-path", help="Specify specific path from config for version file"),
    autogenerate: bool = typer.Option(False, help="Detect schema changes and add migrations"),
    head: str = typer.Option("data@head", help="Determine the head you need to add your migration to."),
) -> None:
    """The `revision` command creates a new Alembic revision file.

    Args:
        message: The revision message
        version_path: Specify specific path from config for version file
        autogenerate: Whether to detect schema changes. [default: False]
        head: To which head the migration applies. [default: data@head]

    Returns:
        None

    CLI Options:
        ```sh
        Options:
            -m, --message TEXT              The revision message
            --version-path TEXT             Specify specific path from config for version file
            --autogenerate / --no-autogenerate
                                            Detect schema changes and add migrations [default: no-autogenerate]
            --head TEXT                     Determine the head you need to add your migration to.
        ```
    """
    # Ensure the "data" branch head exists before creating the revision (the default head is "data@head").
    create_data_head_if_not_exists({"writer": create_writer(), "environment": get_template_environment()})
    command.revision(alembic_cfg(), message, version_path=version_path, autogenerate=autogenerate, head=head)

upgrade

The upgrade command will upgrade the database to the specified revision.

CLI Options
Arguments:
    REVISION  Rev id to upgrade to  [required]

Options:
    --help  Show this message and exit.
Source code in .venv/lib/python3.12/site-packages/orchestrator/core/cli/database.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@app.command()
def upgrade(revision: str = typer.Argument(help="Rev id to upgrade to")) -> None:
    """The `upgrade` command will upgrade the database to the specified revision.

    Args:
        revision: Rev id to upgrade to (required; the argument has no default).

    Returns:
        None

    CLI Options:
        ```sh
        Arguments:
            REVISION  Rev id to upgrade to  [required]

        Options:
            --help  Show this message and exit.
        ```

    """
    command.upgrade(alembic_cfg(), revision)

migrate-domain-models

The migrate-domain-models CLI command is used to automatically generate the data migrations that you’ll need when you add or change a Domain Model. It will inspect your DB and the existing domain models, analyse the differences and it will generate an Alembic data migration in the correct folder.

Features:

  • detect a new Domain Model attribute / resource type
  • detect a renamed Domain Model attribute / resource type
  • detect a removed Domain Model attribute / resource type
  • detect a new Domain Model
  • detect a removed Domain Model
  • ability to ask for human input when needed

Below in the documentation these features are discussed in more detail.

BACKUP DATABASE BEFORE USING THE MIGRATION!

Arguments

  • message: Message/description of the generated migration.
  • --test: Optional boolean if you don’t want to generate a migration file.
  • --inputs: stringified dict to prefill inputs. The inputs and updates argument is mostly used for testing, prefilling the given inputs, here examples:
    • new product: inputs = { "new_product_name": { "description": "add description", "product_type": "add_type", "tag": "add_tag" }}
    • new product fixed input: inputs = { "new_product_name": { "new_fixed_input_name": "value" }}
    • new product block: inputs = { "new_product_block_name": { "description": "add description", "tag": "add_tag" } }
    • new resource type: inputs = { "new_resource_type_name": { "description": "add description", "value": "add default value", "new_product_block_name": "add default value for block" }}
      • new_product_block_name prop inserts value specifically for that block.
      • value prop is inserted as default for all existing instances it is added to.
  • --updates: stringified dict to prefill inputs.
    • renaming a fixed input:
      • updates = { "fixed_inputs": { "product_name": { "old_fixed_input_name": "new_fixed_input_name" } } }
    • renaming a resource type to a new resource type:
      • inputs = { "new_resource_type_name": { "description": "add description" }}
      • updates = { "resource_types": { "old_resource_type_name": "new_resource_type_name" } }
    • renaming a resource type to existing resource type: updates = { "resource_types": { "old_resource_type_name": "new_resource_type_name" } }

Example

You need products in the SUBSCRIPTION_MODEL_REGISTRY, for this example I will use these models (taken out of example-orchestrator):

UserGroup Block
from orchestrator.core.domain.base import SubscriptionModel, ProductBlockModel
from orchestrator.core.types import SubscriptionLifecycle


class UserGroupBlockInactive(
    ProductBlockModel,
    lifecycle=[SubscriptionLifecycle.INITIAL],
    product_block_name="UserGroupBlock",
):
    group_name: str | None = None
    group_id: int | None = None


class UserGroupBlockProvisioning(
    UserGroupBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]
):
    group_name: str
    group_id: int | None = None


class UserGroupBlock(
    UserGroupBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]
):
    group_name: str
    group_id: int
from orchestrator.domain.base import SubscriptionModel, ProductBlockModel
from orchestrator.types import SubscriptionLifecycle


class UserGroupBlockInactive(
    ProductBlockModel,
    lifecycle=[SubscriptionLifecycle.INITIAL],
    product_block_name="UserGroupBlock",
):
    group_name: str | None = None
    group_id: int | None = None


class UserGroupBlockProvisioning(
    UserGroupBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]
):
    group_name: str
    group_id: int | None = None


class UserGroupBlock(
    UserGroupBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]
):
    group_name: str
    group_id: int
UserGroup Product
from orchestrator.core.domain.base import SubscriptionModel
from orchestrator.core.types import SubscriptionLifecycle

from products.product_blocks.user_group import (
    UserGroupBlock,
    UserGroupBlockInactive,
    UserGroupBlockProvisioning,
)


class UserGroupInactive(
    SubscriptionModel, is_base=True, lifecycle=[SubscriptionLifecycle.INITIAL]
):
    settings: UserGroupBlockInactive


class UserGroupProvisioning(
    UserGroupInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]
):
    settings: UserGroupBlockProvisioning


class UserGroup(UserGroupProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    settings: UserGroupBlock
from orchestrator.domain.base import SubscriptionModel
from orchestrator.types import SubscriptionLifecycle

from products.product_blocks.user_group import (
    UserGroupBlock,
    UserGroupBlockInactive,
    UserGroupBlockProvisioning,
)


class UserGroupInactive(
    SubscriptionModel, is_base=True, lifecycle=[SubscriptionLifecycle.INITIAL]
):
    settings: UserGroupBlockInactive


class UserGroupProvisioning(
    UserGroupInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]
):
    settings: UserGroupBlockProvisioning


class UserGroup(UserGroupProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    settings: UserGroupBlock
User Block
from orchestrator.core.domain.base import ProductBlockModel
from orchestrator.core.types import SubscriptionLifecycle

from products.product_blocks.user_group import (
    UserGroupBlock,
    UserGroupBlockInactive,
    UserGroupBlockProvisioning,
)


class UserBlockInactive(
    ProductBlockModel,
    lifecycle=[SubscriptionLifecycle.INITIAL],
    product_block_name="UserBlock",
):
    group: UserGroupBlockInactive
    username: str | None = None
    age: int | None = None
    user_id: int | None = None


class UserBlockProvisioning(
    UserBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]
):
    group: UserGroupBlockProvisioning
    username: str
    age: int | None = None
    user_id: int | None = None


class UserBlock(UserBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    group: UserGroupBlock
    username: str
    age: int | None = None
    user_id: int
from orchestrator.domain.base import ProductBlockModel
from orchestrator.types import SubscriptionLifecycle

from products.product_blocks.user_group import (
    UserGroupBlock,
    UserGroupBlockInactive,
    UserGroupBlockProvisioning,
)


class UserBlockInactive(
    ProductBlockModel,
    lifecycle=[SubscriptionLifecycle.INITIAL],
    product_block_name="UserBlock",
):
    group: UserGroupBlockInactive
    username: str | None = None
    age: int | None = None
    user_id: int | None = None


class UserBlockProvisioning(
    UserBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]
):
    group: UserGroupBlockProvisioning
    username: str
    age: int | None = None
    user_id: int | None = None


class UserBlock(UserBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    group: UserGroupBlock
    username: str
    age: int | None = None
    user_id: int
User Product
from orchestrator.core.domain.base import SubscriptionModel
from orchestrator.core.types import SubscriptionLifecycle, strEnum

from products.product_blocks.user import (
    UserBlock,
    UserBlockInactive,
    UserBlockProvisioning,
)


class Affiliation(strEnum):
    internal = "internal"
    external = "external"


class UserInactive(SubscriptionModel, is_base=True):
    affiliation: Affiliation
    settings: UserBlockInactive


class UserProvisioning(UserInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
    affiliation: Affiliation
    settings: UserBlockProvisioning


class User(UserProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    affiliation: Affiliation
    settings: UserBlock
from orchestrator.domain.base import SubscriptionModel
from orchestrator.types import SubscriptionLifecycle, strEnum

from products.product_blocks.user import (
    UserBlock,
    UserBlockInactive,
    UserBlockProvisioning,
)


class Affiliation(strEnum):
    internal = "internal"
    external = "external"


class UserInactive(SubscriptionModel, is_base=True):
    affiliation: Affiliation
    settings: UserBlockInactive


class UserProvisioning(UserInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
    affiliation: Affiliation
    settings: UserBlockProvisioning


class User(UserProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    affiliation: Affiliation
    settings: UserBlock
SUBSCRIPTION_MODEL_REGISTRY
from orchestrator.core.domain import SUBSCRIPTION_MODEL_REGISTRY

from products.product_types.user import User
from products.product_types.user_group import UserGroup

# Register models to actual definitions for deserialization purposes
SUBSCRIPTION_MODEL_REGISTRY.update(
    {
        "User group": UserGroup,
        "User internal": User,
        "User external": User,
    }
)
from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY

from products.product_types.user import User
from products.product_types.user_group import UserGroup

# Register models to actual definitions for deserialization purposes.
# Both User product variants share the same domain model class.
SUBSCRIPTION_MODEL_REGISTRY["User group"] = UserGroup
SUBSCRIPTION_MODEL_REGISTRY["User internal"] = User
SUBSCRIPTION_MODEL_REGISTRY["User external"] = User

Running the command

  • only with a message

    python main.py db migrate-domain-models "message"
    

  • Running it as test

    python main.py db migrate-domain-models "message" --test
    

  • Running the command with inputs prefilled

    python main.py db migrate-domain-models "message" --inputs '{ "" }'
    

The command will first go through all products and map the differences with the database. debug log example:

2022-10-27 11:45:10 [debug] ProductTable blocks diff [orchestrator.core.domain.base] fixed_inputs_in_db=set() fixed_inputs_model=set() missing_fixed_inputs_in_db=set() missing_fixed_inputs_in_model=set() missing_product_blocks_in_db=set() missing_product_blocks_in_model=set() product_block_db=User group product_blocks_in_db={'UserGroupBlock'} product_blocks_in_model={'UserGroupBlock'}

You will be prompted with inputs when updates are found.

  • rename of resource type input (renaming age to user_age in User Block). Only works when the resource type is renamed in all Blocks:

    Update resource types
    Do you wish to rename resource type age to user_age? [y/N]:

  • rename of fixed input (renaming affiliation to affiliationing in User Product):

    Update fixed inputs
    Do you wish to rename fixed input affiliation to affiliationing for product User internal? [y/N]:

  • update of resource type per block (renaming age to user_age in User Block and not choosing to rename resource type). The input will loop until skipped or when there are no options anymore:

    • first you get to choose which old resource type to update, skip will create/delete all resource types. > Update block resource types
      Which resource type would you want to update in UserBlock Block?
      1) age
      q) skip
      ?
      
    • then you get to choose which new resource type to update with, skip will give you the first question again.
      Which resource type should update age?
      1) user_age
      q) skip
      ?
      
    • with 1 and 1, the log level difference would look like:
      2023-02-08 14:11:25 [info] update_block_resource_types [orchestrator.core.cli.migrate_domain_models] update_block_resource_types={'UserBlock': {'age': 'user_age'}}
      

It will log the differences on info level:

2022-10-27 11:45:10 [info] create_products                   [orchestrator.core.cli.migrate_domain_models] create_products={'User group': <class 'products.product_types.user_group.UserGroup'>, 'User internal': <class 'products.product_types.user.User'>, 'User external': <class 'products.product_types.user.User'>}
2022-10-27 11:45:10 [info] delete_products                   [orchestrator.core.cli.migrate_domain_models] delete_products=set()
2022-10-27 11:45:10 [info] create_product_fixed_inputs       [orchestrator.core.cli.migrate_domain_models] create_product_fixed_inputs={'affiliation': {'User external', 'User internal'}}
2022-10-27 11:45:10 [info] update_product_fixed_inputs       [orchestrator.core.cli.migrate_domain_models] update_product_fixed_inputs={}
2022-10-27 11:45:10 [info] delete_product_fixed_inputs       [orchestrator.core.cli.migrate_domain_models] delete_product_fixed_inputs={}
2022-10-27 11:45:10 [info] create_product_to_block_relations [orchestrator.core.cli.migrate_domain_models] create_product_to_block_relations={'UserGroupBlock': {'User group'}, 'UserBlock': {'User external', 'User internal'}}
2022-10-27 11:45:10 [info] delete_product_to_block_relations [orchestrator.core.cli.migrate_domain_models] delete_product_to_block_relations={}
2022-10-27 11:45:10 [info] create_resource_types             [orchestrator.core.cli.migrate_domain_models] create_resource_types={'username', 'age', 'group_name', 'user_id', 'group_id'}
2022-10-27 11:45:10 [info] rename_resource_types             [orchestrator.core.cli.migrate_domain_models] rename_resource_types={}
2022-10-27 11:45:10 [info] delete_resource_types             [orchestrator.core.cli.migrate_domain_models] delete_resource_types=set()
2022-10-27 11:45:10 [info] create_resource_type_relations    [orchestrator.core.cli.migrate_domain_models] create_resource_type_relations={'group_name': {'UserGroupBlock'}, 'group_id': {'UserGroupBlock'}, 'username': {'UserBlock'}, 'age': {'UserBlock'}, 'user_id': {'UserBlock'}}
2022-10-27 11:45:10 [info] delete_resource_type_relations    [orchestrator.core.cli.migrate_domain_models] delete_resource_type_relations={}
2022-10-27 11:45:10 [info] create_product_blocks             [orchestrator.core.cli.migrate_domain_models] create_blocks={'UserGroupBlock': <class 'products.product_blocks.user_group.UserGroupBlock'>, 'UserBlock': <class 'products.product_blocks.user.UserBlock'>}
2022-10-27 11:45:10 [info] delete_product_blocks             [orchestrator.core.cli.migrate_domain_models] delete_blocks=set()
2022-10-27 11:45:10 [info] create_product_block_relations    [orchestrator.core.cli.migrate_domain_models] create_product_block_relations={'UserGroupBlock': {'UserBlock'}}
2022-10-27 11:45:10 [info] delete_product_block_relations    [orchestrator.core.cli.migrate_domain_models] delete_product_block_relations={}

You will be asked to confirm the actions in order to continue:

WARNING: Deleting products will also delete its subscriptions.
Confirm the above actions [y/N]:

After confirming, it will start generating the SQL, logging the SQL on debug level and prompt the user for new resources:

  • new product example:

    Create new products
    Product: UserGroup User group
    Supply the production description: User group product
    Supply the product tag: GROUP

    2022-10-27 11:45:10 [debug] generated SQL [orchestrator.core.cli.domain_gen_helpers.helpers] sql_string=INSERT INTO products (name, description, product_type, tag, status) VALUES ('User group', 'User group product', 'UserGroup', 'GROUP', 'active') RETURNING products.product_id
    

  • new fixed input (the type isn’t checked, so typing an incorrect value will insert in db):

    Create fixed inputs
    Supply fixed input value for product User internal and fixed input affiliation: internal
    Supply fixed input value for product User external and fixed input affiliation: external

    2022-10-27 11:45:10 [debug] generated SQL [orchestrator.core.cli.domain_gen_helpers.helpers] sql_string=INSERT INTO fixed_inputs (name, value, product_id) VALUES ('affiliation', 'internal', (SELECT products.product_id FROM products WHERE products.name IN ('User internal'))), ('affiliation', 'external', (SELECT products.product_id FROM products WHERE products.name IN ('User external')))
    

  • new product block:

    Create product blocks
    Product block: UserGroupBlock
    Supply the product block description: User group settings
    Supply the product block tag: UGS

    2022-10-27 11:45:10 [debug] generated SQL [orchestrator.core.cli.domain_gen_helpers.helpers] sql_string=INSERT INTO product_blocks (name, description, tag, status) VALUES ('UserGroupBlock', 'User group settings', 'UGS', 'active') RETURNING product_blocks.product_block_id
    

  • new resource type:

    Create resource types
    Supply description for new resource type group_name: Unique name of user group

    2022-10-27 11:45:10 [debug] generated SQL [orchestrator.core.cli.domain_gen_helpers.helpers] sql_string=INSERT INTO resource_types (resource_type, description) VALUES ('group_name', 'Unique name of user group') RETURNING resource_types.resource_type_id
    

  • default value for resource type per product block (necessary for adding a default value to existing instances):

    Create subscription instance values
    Supply default subscription instance value for resource type group_name and product block UserGroupBlock: group

    2022-10-27 11:45:10 [debug] generated SQL [orchestrator.core.cli.domain_gen_helpers.resource_type_helpers] sql_string=
                    WITH subscription_instance_ids AS (
                        SELECT subscription_instances.subscription_instance_id
                        FROM   subscription_instances
                        WHERE  subscription_instances.product_block_id IN (
                            SELECT product_blocks.product_block_id
                            FROM   product_blocks
                            WHERE  product_blocks.name = 'UserGroupBlock'
                        )
                    )
    
                    INSERT INTO
                        subscription_instance_values (subscription_instance_id, resource_type_id, value)
                    SELECT
                        subscription_instance_ids.subscription_instance_id,
                        resource_types.resource_type_id,
                        'group'
                    FROM resource_types
                    CROSS JOIN subscription_instance_ids
                    WHERE resource_types.resource_type = 'group_name'
    

Last part generates the migration with the generated SQL:

Generating migration file
2022-10-27 11:45:10 [info] Version Locations [orchestrator.core.cli.database] locations=/home/tjeerddie/projects_surf/example-orchestrator/migrations/versions/schema /home/tjeerddie/projects_surf/example-orchestrator/.venv/lib/python3.10/site-packages/orchestrator/migrations/versions/schema
  Generating /home/tjeerddie/projects_surf/example-orchestrator/migrations/versions/schema/2022-10-27_a8946b2d1647_test.py ...  done
Migration generated. Don't forget to create a database backup before migrating!

If you are running with --test, the SQL file will not be generated.

generate

Generate products, workflows and other artifacts.

Products can be described in a YAML configuration file which makes it easy to generate product and product block domain models, and skeleton workflows and unit tests. Note that this is a one time thing, the generate commands do not support updating existing products, product-blocks, workflows and migrations, in this case have a look at the db migrate-domain-models and db migrate-workflows commands. But it does however help in defining new products with stakeholders, will generate code that conforms to current workfloworchestrator coding BCP, and will actually run (although limited in functionality of course).

After describing a new product in a configuration file, the following commands are typically run:

python main.py generate product-blocks
python main.py generate products
python main.py generate workflows
python main.py generate migration

The generate command should be called from the top level folder of your orchestrator implementation, this is the folder that contains the products sub folder, amongst others, except when the --prefix is used to point to that folder. In case there are product blocks defined that use other generated product blocks, the order in which generate product-blocks is run is important, the code for the blocks used in other blocks should be generated first.

config file

See the Generate Config File guide for full documentation on the YAML product configuration format used by the generate commands.

migration

The python main.py generate migration command creates a migration from a configuration file.

Options

--config-file - The configuration file [default: None] --python-version - Python version for generated code [default: 3.11] --skip-existing-blocks - If set, the migration will not contain product blocks for which a python implementation exists [default: False]

product

The python main.py generate product command creates a product domain model from a configuration file.

Options

--config-file - The configuration file [default: None] --dryrun | --no-dryrun - Dry run [default: dryrun] --force - Force overwrite of existing files --python-version - Python version for generated code [default: 3.11] --folder-prefix - Folder prefix, e.g. /workflows [default: None]

product-blocks

The python main.py generate product-blocks command creates product block domain models from a configuration file.

Options

--config-file - The configuration file [default: None] --dryrun | --no-dryrun - Dry run [default: dryrun] --force - Force overwrite of existing files --python-version - Python version for generated code [default: 3.11] --folder-prefix - Folder prefix, e.g. /workflows [default: None]

unit-tests

The python main.py generate unit-tests command creates unit tests from a configuration file.

Options

--config-file - The configuration file [default: None] --dryrun | --no-dryrun - Dry run [default: dryrun] --force - Force overwrite of existing files --python-version - Python version for generated code [default: 3.11] --tdd - Force test driven development with failing asserts [default: True]

workflows

The python main.py generate workflows command creates create, modify, terminate and validate workflows from a configuration file. The --custom-templates option can be used to specify a folder with custom templates to add additional import statements, input form fields and workflow steps to the create, modify and terminate workflows.

Options

--config-file - The configuration file [default: None] --dryrun | --no-dryrun - Dry run [default: dryrun] --force - Force overwrite of existing files --python-version - Python version for generated code [default: 3.11] --folder-prefix - Folder prefix, e.g. /workflows [default: None] --custom-templates - Custom templates folder [default: None]

Note

The workflows/__init__.py will only be extended with the needed LazyWorkflowInstance declarations when --force is used.

scheduler

Commands to interact with the scheduler and scheduled jobs.

run

Starts the scheduler in the foreground.

While running, this process will:

  • Periodically wake up when the next schedule is due for execution, and run it
  • Process schedule changes made through the schedule API
Source code in .venv/lib/python3.12/site-packages/orchestrator/core/cli/scheduler.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@app.command()
def run() -> None:
    """Starts the scheduler in the foreground.

    While running, this process will:

      * Periodically wake up when the next schedule is due for execution, and run it
      * Process schedule changes made through the schedule API
    """

    def _get_scheduled_task_item_from_queue(redis_conn: Redis) -> tuple[str, bytes] | None:
        """Get an item from the Redis Queue for scheduler tasks."""
        try:
            # Blocking pop with a 1-second timeout so the outer loop stays
            # responsive; brpop returns None on timeout (treated as "no work").
            return redis_conn.brpop(SCHEDULER_QUEUE, timeout=1)
        except ConnectionError as e:
            # NOTE(review): assumes Redis connection failures surface as this
            # ConnectionError — depends on which ConnectionError is imported here.
            # Back off a bit longer before retrying when Redis is unreachable.
            typer.echo(f"There was a connection error with Redis. Retrying in 3 seconds... {e}")
            time.sleep(3)
        except Exception as e:
            # Any other failure: log and retry quickly rather than crash the scheduler.
            typer.echo(f"There was an unexpected error with Redis. Retrying in 1 second... {e}")
            time.sleep(1)

        return None

    with get_scheduler() as scheduler_connection:
        redis_connection = create_redis_client(app_settings.CACHE_URI.get_secret_value())
        # Main scheduler loop: runs until the process is terminated.
        while True:
            item = _get_scheduled_task_item_from_queue(redis_connection)
            if not item:
                # Timeout or transient Redis error — poll again.
                continue

            # Handle each dequeued schedule item inside its own database session scope.
            with db.database_scope():
                workflow_scheduler_queue(item, scheduler_connection)

force

Force the execution of (a) scheduler(s) based on a schedule ID.

Use the show-schedule command to determine the ID of the schedule to execute.

CLI Arguments
Arguments:
    SCHEDULE_ID  ID of the schedule to execute
Source code in .venv/lib/python3.12/site-packages/orchestrator/core/cli/scheduler.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
@app.command()
def force(task_id: str) -> None:
    """Force the execution of (a) scheduler(s) based on a schedule ID.

    Use the `show-schedule` command to determine the ID of the schedule to execute.

    CLI Arguments:
        ```sh
        Arguments:
            SCHEDULE_ID  ID of the schedule to execute
        ```
    """
    # NOTE(review): the parameter is named `task_id`, so the CLI renders the
    # argument as TASK_ID while the help text above says SCHEDULE_ID —
    # consider aligning the two.
    task = get_scheduler_task(task_id)

    if not task:
        typer.echo(f"Task '{task_id}' not found.")
        raise typer.Exit(code=1)

    typer.echo(f"Running Task [{task.id}] now...")
    try:
        # Invoke the scheduled callable directly, bypassing the scheduler loop.
        # `or ()` / `or {}` guard against args/kwargs stored as None.
        task.func(*task.args or (), **task.kwargs or {})
        typer.echo("Task executed successfully.")
    except Exception as e:
        # Broad catch is deliberate: the task function is arbitrary registered code.
        typer.echo(f"Task execution failed: {e}")
        raise typer.Exit(code=1)

show_schedule

The show-schedule command shows an overview of the scheduled jobs.

Source code in .venv/lib/python3.12/site-packages/orchestrator/core/cli/scheduler.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@app.command()
def show_schedule() -> None:
    """The `show-schedule` command shows an overview of the scheduled jobs."""
    # Imported lazily so the command only pays the cost when actually invoked.
    from rich.console import Console
    from rich.table import Table

    from orchestrator.core.schedules.service import get_linker_entries_by_schedule_ids

    console = Console()

    table = Table(title="Scheduled Tasks")
    table.add_column("id", no_wrap=True)
    table.add_column("name")
    table.add_column("source")
    table.add_column("next run time")
    table.add_column("trigger")

    scheduled_tasks = get_all_scheduler_tasks()
    _schedule_ids = [task.id for task in scheduled_tasks]
    # Schedules present in the linker table were created through the schedule API;
    # anything else was registered in code via a decorator.
    api_managed = {str(i.schedule_id) for i in get_linker_entries_by_schedule_ids(_schedule_ids)}

    for task in scheduled_tasks:
        source = "API" if task.id in api_managed else "decorator"
        # A paused (or otherwise unscheduled) job has no next_run_time; guard
        # against None instead of crashing on .replace().
        next_run = task.next_run_time
        run_time = str(next_run.replace(microsecond=0)) if next_run is not None else "-"
        table.add_row(task.id, task.name, source, run_time, str(task.trigger))

    console.print(table)

load_initial_schedule

The load-initial-schedule command loads the initial schedule using the scheduler API.

The initial schedules are:
  • Task Resume Workflows
  • Task Clean Up Tasks
  • Task Validate Subscriptions
  • Task Validate Products

This command is idempotent since 4.7.1 when the scheduler is running. The schedules are only created when they do not already exist in the database.

Source code in .venv/lib/python3.12/site-packages/orchestrator/core/cli/scheduler.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
@app.command()
def load_initial_schedule() -> None:
    """The `load-initial-schedule` command loads the initial schedule using the scheduler API.

    The initial schedules are:
      - Task Resume Workflows
      - Task Clean Up Tasks
      - Task Validate Subscriptions
      - Task Validate Products

    This command is idempotent since 4.7.1 when the scheduler is running. The schedules are only
    created when they do not already exist in the database.
    """
    # Schedule definitions; `workflow_id` is intentionally empty here and is
    # filled in below once the workflow has been resolved by name.
    initial_schedules = [
        {
            "name": "Task Resume Workflows",
            "workflow_name": "task_resume_workflows",
            "workflow_id": "",
            "trigger": "interval",
            "trigger_kwargs": {"hours": 1},
        },
        {
            "name": "Task Clean Up Tasks",
            "workflow_name": "task_clean_up_tasks",
            "workflow_id": "",
            "trigger": "interval",
            "trigger_kwargs": {"hours": 6},
        },
        {
            "name": "Task Validate Subscriptions",
            "workflow_name": "task_validate_subscriptions",
            "workflow_id": "",
            "trigger": "cron",
            "trigger_kwargs": {"hour": 0, "minute": 10},
        },
        {
            "name": "Task Validate Products",
            "workflow_name": "task_validate_products",
            "workflow_id": "",
            "trigger": "cron",
            "trigger_kwargs": {"hour": 2, "minute": 30},
        },
    ]

    for schedule in initial_schedules:
        # enrich with workflow id
        workflow_name = cast(str, schedule.get("workflow_name"))
        workflow = get_workflow_by_name(workflow_name)

        if not workflow:
            # The workflow may not be registered in this deployment; skip rather than fail.
            typer.echo(f"Workflow '{schedule['workflow_name']}' not found. Skipping schedule.")
            continue

        schedule["workflow_id"] = workflow.workflow_id

        typer.echo(f"Initial Schedule: {schedule}")
        # Only enqueues when an identical schedule does not already exist (idempotent).
        add_unique_scheduled_task_to_queue(APSchedulerJobCreate(**schedule))  # type: ignore