Workflow Steps
Workflows are what actually takes a product definition and populates your domain models. To read more about the architectural design of workflows check out the architecture page on workflows. To see more details about the types of steps that are available for use, read on to the next section.
Step Types
orchestrator.workflow
step
step(name: str) -> Callable[[StepFunc], Step]
Mark a function as a workflow step.
Source code in orchestrator/workflow.py
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
|
retrystep
retrystep(name: str) -> Callable[[StepFunc], Step]
Mark a function as a retryable workflow step.
If this step fails it goes to Waiting
were it will be retried periodically. If it Success
it acts as a normal
step.
Source code in orchestrator/workflow.py
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
|
inputstep
inputstep(name: str, assignee: Assignee) -> Callable[[InputStepFunc], Step]
Add user input step to workflow.
IMPORTANT: In contrast to other workflow steps, the @inputstep
wrapped function will not run in the
workflow engine! This means that it must be free of side effects!
Example::
@inputstep("User step", assignee=Assignee.NOC)
def user_step(state: State) -> FormGenerator:
class Form(FormPage):
name: str
user_input = yield Form
return {**user_input.model_dump(), "some extra key": True}
Source code in orchestrator/workflow.py
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 |
|
conditional
conditional(p: Callable[[State], bool]) -> Callable[..., StepList]
Use a predicate to control whether a step is run.
Source code in orchestrator/workflow.py
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 |
|
Database Implications
Because a workflow is tied to a product type, this means that you will need a new database migration when adding new workflows. Thankfully, the CLI tool will help you with this! Check out the CLI docs for db migrate-workflows
for more information.
Building a Step Function Signature
One important detail that is very helpful to understand is how your Step Function's python function signature is used to deserialize objects from the database into something you can use in your workflow python code. The WFO achieves this by introspecting the step function signature for the type hints you've defined and then tries to populate the appropriate objects for you. To understand the details of how this works, look at this method:
orchestrator.utils.state.inject_args
inject_args(func: StepFunc) -> Callable[[State], State]
Allow functions to specify values from the state dict as parameters named after the state keys.
Note
Domain models are subject to special processing (see: :ref:domain models processing
<domain-models-processing>
)
What this decorator does is better explained with an example than lots of text. So normally we do this::
def load_initial_state_for_modify(state: State) -> State:
customer_id = state["customer_id"]
subscription_id = state["subscription_id"]
....
# build new_state
...
return {**state, **new_state}
With this decorator we can do::
@inject_args
def load_initial_state_for_modify(customer_id: UUID, subscription_id: UUID) -> State:
....
# build new_state
...
return new_state
So any parameters specified to the step function are looked up in the state
dict supplied by the step
decorator
and passed as values to the step function. The dict new_state
returned by the step function will be merged with
that of the original state
dict and returned as the final result.
It knows how to deal with parameters that have a default. Eg, given::
@inject_args
def do_stuff_with_saps(subscription_id: UUID, sap1: Dict, sap2: Optional[Dict] = None) -> State:
....
# build new_state
...
return new_state
Both subscription_id
and sap1
need to be present in the state. However, sap2
can be present but does not need
to be. If it is not present in the state it will get the value None
.. _domain-models-processing:
Domain models as parameters are subject to special processing. Eg, given::
@inject_args
def do_stuff(light_path: Sn8LightPath) -> State:
...
return {'light_path': light_path} # <- required for any changes to be saved to the DB
Then the key 'light_path' is looked up in the state. If it is present, it is expected to be either:
- a UUID (or str representation of a UUID)
- a dictionary with at least a key 'subscription_id', representing a domain model.
It will use the UUID found to retrieve the domain model from the DB and inject it into the step function. None of the other data from the domain model (in case of it being a dict representation) will be used! At the end of the step function any domain models explicitly returned will be automatically saved to the DB; this includes any new domain models that might be created in the step and returned by the step. Hence, the automatic save is not limited to domain models requested as part of the step parameter list.
If the key light_path
was not found in the state, the parameter is interpreted as a request to create a
domain model of the given type. For that to work correctly the keys product
and customer_id
need to be
present in the state. This will not work for more than one domain model. E.g. you can't request two domain
models to be created as we will not know to which of the two domain models product
is applicable to.
Also supported is wrapping a domain model in Optional
or List
. Other types are not supported.
Args: func: a step function with parameters (that should be keys into the state dict, except for optional ones)
Returns: The original state dict merged with the state that step function returned.
Source code in orchestrator/utils/state.py
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
|