Skip to content

Workflow Steps

Workflows are what actually takes a product definition and populates your domain models. To read more about the architectural design of workflows check out the architecture page on workflows. To see more details about the types of steps that are available for use, read on to the next section.

Step Types

orchestrator.workflow

step

step(name: str) -> Callable[[StepFunc], Step]

Mark a function as a workflow step.

Source code in orchestrator/workflow.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def step(name: str) -> Callable[[StepFunc], Step]:
    """Mark a function as a workflow step."""

    def decorator(func: StepFunc) -> Step:
        @functools.wraps(func)
        def wrapper(state: State) -> Process:
            with bound_contextvars(func=func.__qualname__):
                step_in_inject_args = inject_args(func)
                try:
                    with transactional(db, logger):
                        result = step_in_inject_args(state)
                        return Success(result)
                except Exception as ex:
                    logger.warning("Step failed", exc_info=ex)
                    return Failed(ex)

        return make_step_function(wrapper, name)

    return decorator

retrystep

retrystep(name: str) -> Callable[[StepFunc], Step]

Mark a function as a retryable workflow step.

If this step fails it goes to Waiting were it will be retried periodically. If it Success it acts as a normal step.

Source code in orchestrator/workflow.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def retrystep(name: str) -> Callable[[StepFunc], Step]:
    """Mark a function as a retryable workflow step.

    If this step fails it goes to `Waiting` were it will be retried periodically. If it `Success` it acts as a normal
    step.
    """

    def decorator(func: StepFunc) -> Step:
        @functools.wraps(func)
        def wrapper(state: State) -> Process:
            with bound_contextvars(func=func.__qualname__):
                step_in_inject_args = inject_args(func)
                try:
                    with transactional(db, logger):
                        result = step_in_inject_args(state)
                        return Success(result)
                except Exception as ex:
                    return Waiting(ex)

        return make_step_function(wrapper, name)

    return decorator

inputstep

inputstep(name: str, assignee: Assignee) -> Callable[[InputStepFunc], Step]

Add user input step to workflow.

IMPORTANT: In contrast to other workflow steps, the @inputstep wrapped function will not run in the workflow engine! This means that it must be free of side effects!

Example::

@inputstep("User step", assignee=Assignee.NOC)
def user_step(state: State) -> FormGenerator:
    class Form(FormPage):
        name: str
    user_input = yield Form
    return {**user_input.model_dump(), "some extra key": True}
Source code in orchestrator/workflow.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def inputstep(name: str, assignee: Assignee) -> Callable[[InputStepFunc], Step]:
    """Add user input step to workflow.

    IMPORTANT: In contrast to other workflow steps, the `@inputstep` wrapped function will not run in the
    workflow engine! This means that it must be free of side effects!

    Example::

        @inputstep("User step", assignee=Assignee.NOC)
        def user_step(state: State) -> FormGenerator:
            class Form(FormPage):
                name: str
            user_input = yield Form
            return {**user_input.model_dump(), "some extra key": True}

    """

    def decorator(func: InputStepFunc) -> Step:
        def wrapper(state: State) -> FormGenerator:
            form_generator_in_form_inject_args = form_inject_args(func)

            form_generator = _handle_simple_input_form_generator(form_generator_in_form_inject_args)

            return form_generator(state)

        @functools.wraps(func)
        def suspend(state: State) -> Process:
            return Suspend(state)

        return make_step_function(suspend, name, wrapper, assignee)

    return decorator

conditional

conditional(p: Callable[[State], bool]) -> Callable[..., StepList]

Use a predicate to control whether a step is run.

Source code in orchestrator/workflow.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
def conditional(p: Callable[[State], bool]) -> Callable[..., StepList]:
    """Use a predicate to control whether a step is run."""

    def _conditional(steps_or_func: StepList | Step) -> StepList:
        if isinstance(steps_or_func, Step):
            steps = StepList([steps_or_func])
        else:
            steps = steps_or_func

        def wrap(step: Step) -> Step:
            @functools.wraps(step)
            def wrapper(state: State) -> Process:
                return step(state) if p(state) else Skipped(state)

            return make_step_function(wrapper, step.name, step.form, step.assignee)

        return steps.map(wrap)

    return _conditional

Database Implications

Because a workflow is tied to a product type, this means that you will need a new database migration when adding new workflows. Thankfully, the CLI tool will help you with this! Check out the CLI docs for db migrate-workflows for more information.

Building a Step Function Signature

One important detail that is very helpful to understand is how your Step Function's python function signature is used to deserialize objects from the database into something you can use in your workflow python code. The WFO achieves this by introspecting the step function signature for the type hints you've defined and then tries to populate the appropriate objects for you. To understand the details of how this works, look at this method:

orchestrator.utils.state.inject_args

inject_args(func: StepFunc) -> Callable[[State], State]

Allow functions to specify values from the state dict as parameters named after the state keys.

Note

Domain models are subject to special processing (see: :ref:domain models processing <domain-models-processing>)

What this decorator does is better explained with an example than lots of text. So normally we do this::

def load_initial_state_for_modify(state: State) -> State:
    customer_id = state["customer_id"]
    subscription_id = state["subscription_id"]
    ....
    # build new_state
    ...
    return {**state, **new_state}

With this decorator we can do::

@inject_args
def load_initial_state_for_modify(customer_id: UUID, subscription_id: UUID) -> State:
    ....
    # build new_state
    ...
    return new_state

So any parameters specified to the step function are looked up in the state dict supplied by the step decorator and passed as values to the step function. The dict new_state returned by the step function will be merged with that of the original state dict and returned as the final result.

It knows how to deal with parameters that have a default. Eg, given::

@inject_args
def do_stuff_with_saps(subscription_id: UUID, sap1: Dict, sap2: Optional[Dict] = None) -> State:
    ....
    # build new_state
    ...
    return new_state

Both subscription_id and sap1 need to be present in the state. However, sap2 can be present but does not need to be. If it is not present in the state it will get the value None

.. _domain-models-processing:

Domain models as parameters are subject to special processing. Eg, given::

@inject_args
def do_stuff(light_path: Sn8LightPath) -> State:
    ...
    return {'light_path': light_path}  # <- required for any changes to be saved to the DB

Then the key 'light_path' is looked up in the state. If it is present, it is expected to be either:

  • a UUID (or str representation of a UUID)
  • a dictionary with at least a key 'subscription_id', representing a domain model.

It will use the UUID found to retrieve the domain model from the DB and inject it into the step function. None of the other data from the domain model (in case of it being a dict representation) will be used! At the end of the step function any domain models explicitly returned will be automatically saved to the DB; this includes any new domain models that might be created in the step and returned by the step. Hence, the automatic save is not limited to domain models requested as part of the step parameter list.

If the key light_path was not found in the state, the parameter is interpreted as a request to create a domain model of the given type. For that to work correctly the keys product and customer_id need to be present in the state. This will not work for more than one domain model. E.g. you can't request two domain models to be created as we will not know to which of the two domain models product is applicable to.

Also supported is wrapping a domain model in Optional or List. Other types are not supported.

Args: func: a step function with parameters (that should be keys into the state dict, except for optional ones)

Returns: The original state dict merged with the state that step function returned.

Source code in orchestrator/utils/state.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def inject_args(func: StepFunc) -> Callable[[State], State]:
    """Allow functions to specify values from the state dict as parameters named after the state keys.

    !!! note
        Domain models are subject to special processing (see: :ref:`domain models processing
        <domain-models-processing>`)

    What this decorator does is better explained with an example than lots of text. So normally we do this::

        def load_initial_state_for_modify(state: State) -> State:
            customer_id = state["customer_id"]
            subscription_id = state["subscription_id"]
            ....
            # build new_state
            ...
            return {**state, **new_state}

    With this decorator we can do::

        @inject_args
        def load_initial_state_for_modify(customer_id: UUID, subscription_id: UUID) -> State:
            ....
            # build new_state
            ...
            return new_state

    So any parameters specified to the step function are looked up in the `state` dict supplied by the `step` decorator
    and passed as values to the step function. The dict `new_state` returned by the step function will be merged with
    that of the original `state` dict and returned as the final result.

    It knows how to deal with parameters that have a default. Eg, given::

        @inject_args
        def do_stuff_with_saps(subscription_id: UUID, sap1: Dict, sap2: Optional[Dict] = None) -> State:
            ....
            # build new_state
            ...
            return new_state

    Both `subscription_id` and `sap1` need to be present in the state. However, `sap2` can be present but does not need
    to be. If it is not present in the state it will get the value `None`

    .. _domain-models-processing:

    Domain models as parameters are subject to special processing. Eg, given::

        @inject_args
        def do_stuff(light_path: Sn8LightPath) -> State:
            ...
            return {'light_path': light_path}  # <- required for any changes to be saved to the DB

    Then the key 'light_path' is looked up in the state. If it is present, it is expected to be either:

    - a UUID (or str representation of a UUID)
    - a dictionary with at least a key 'subscription_id', representing a domain model.

    It will use the UUID found to retrieve the domain model from the DB and inject it into the step function. None of
    the other data from the domain model (in case of it being a dict representation) will be used! At the end of the
    step function any domain models explicitly returned will be automatically saved to the DB; this includes any new
    domain models that might be created in the step and returned by the step. Hence, the automatic save is not limited
    to domain models requested as part of the step parameter list.

    If the key `light_path` was not found in the state, the parameter is interpreted as a request to create a
    domain model of the given type. For that to work correctly the keys `product` and `customer_id` need to be
    present in the state. This will not work for more than one domain model. E.g. you can't request two domain
    models to be created as we will not know to which of the two domain models `product` is applicable to.

    Also supported is wrapping a domain model in ``Optional`` or ``List``. Other types are not supported.

    Args:
        func: a step function with parameters (that should be keys into the state dict, except for optional ones)

    Returns:
        The original state dict merged with the state that step function returned.

    """

    @wraps(func)
    def wrapper(state: State) -> State:
        args = _build_arguments(func, state)
        new_state = func(*args)

        # Support step functions that don't return anything
        if new_state is None:
            new_state = {}

        _save_models(new_state)

        return {**state, **new_state}

    return wrapper