Conditional Steps
conditional lets you skip individual workflow steps at runtime based on the
current workflow state. Unlike run predicates, which prevent a
workflow from starting, conditional controls whether individual steps execute
during an already-running workflow.
When a step is skipped:
- The state dictionary is passed through unchanged
- The step is recorded in the process log with a Skipped status
- The workflow continues normally; a skipped step is not a failure
How it works
conditional(predicate) takes a callable (State) -> bool and returns a
wrapper callable. That wrapper accepts either a single Step or a StepList
(built with begin >> ...) and returns a new StepList where every step is
individually guarded by the predicate.
The predicate is evaluated once per step, not once per group: when wrapping multiple steps, it is called again for each step in the group. If an earlier wrapped step modifies the state in a way that changes the predicate result, later steps in the same group may therefore be skipped or executed differently.
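The per-step evaluation described above can be illustrated with a small pure-Python model. It is only a sketch: conditional_model, run, and the Success status string are hypothetical stand-ins invented for this illustration, not the orchestrator's implementation, and steps are simplified to plain functions that return the next state.

```python
from functools import wraps
from typing import Any, Callable

State = dict[str, Any]
Step = Callable[[State], State]                     # hypothetical: a step maps state to new state
GuardedStep = Callable[[State], tuple[str, State]]  # returns (status, state)


def conditional_model(predicate: Callable[[State], bool]) -> Callable[..., list[GuardedStep]]:
    """Toy model: wrap each step so the predicate is checked right before it runs."""

    def wrap(*steps: Step) -> list[GuardedStep]:
        def guard(step: Step) -> GuardedStep:
            @wraps(step)
            def guarded(state: State) -> tuple[str, State]:
                if predicate(state):      # evaluated per step, at execution time
                    return "Success", step(state)
                return "Skipped", state   # state passes through unchanged

            return guarded

        return [guard(s) for s in steps]

    return wrap


def run(steps: list[GuardedStep], state: State) -> tuple[State, list[tuple[str, str]]]:
    """Run the guarded steps in order and collect (step name, status) entries."""
    log = []
    for guarded in steps:
        status, state = guarded(state)
        log.append((guarded.__name__, status))
    return state, log


def dry_run(state: State) -> State:
    # Flips the flag that the predicate depends on.
    return {**state, "config_enabled": False}


def live_run(state: State) -> State:
    return {**state, "deployed": True}


if_config_enabled = conditional_model(lambda s: s.get("config_enabled") is True)
final_state, log = run(if_config_enabled(dry_run, live_run), {"config_enabled": True})

print(log)          # [('dry_run', 'Success'), ('live_run', 'Skipped')]
print(final_state)  # {'config_enabled': False}
```

Because dry_run clears config_enabled, the second predicate check fails and live_run is skipped, even though both steps were wrapped by the same guard.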
Usage patterns
Wrapping a single step
from orchestrator.core.workflow import StepList, begin, conditional
from pydantic_forms.core import FormPage
from pydantic_forms.types import FormGenerator, UUIDstr


def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    class ModifyDnsForm(FormPage):
        description: str
        ttl: int = 3600
        new_cname: str | None = None  # leave empty to keep current CNAME

    user_input = yield ModifyDnsForm
    return user_input.model_dump()


# ... step definitions ...

# Guard: only true when the user supplied a new CNAME in the form.
if_cname_changed = conditional(lambda state: state.get("new_cname") is not None)


@modify_workflow(
    "Modify DNS Record",
    initial_input_form=initial_input_form_generator,
)
def modify_dns_record() -> StepList:
    return (
        begin
        >> update_subscription
        >> if_cname_changed(notify_customer)
        >> update_dns_in_ipam
        >> set_status_provisioning
    )
The same example written against the orchestrator.workflow import path differs only in its first line:
from orchestrator.workflow import StepList, begin, conditional
Wrapping multiple steps
Pass a StepList (built with begin >> ...) to guard multiple steps with the
same predicate. The predicate is re-evaluated for each step individually,
so if a step within the StepList modifies the state, later steps in the
group may be skipped or executed differently:
from orchestrator.core.workflow import StepList, begin, conditional

if_config_enabled = conditional(lambda state: state.get("config_enabled") is True)


@terminate_workflow("Terminate Product", initial_input_form=initial_input_form_generator)
def terminate_product() -> StepList:
    return (
        begin
        >> if_config_enabled(
            begin
            >> ansible_dryrun
            >> ansible_live
        )
        >> cleanup
    )
With the orchestrator.workflow import path, only the import line changes:
from orchestrator.workflow import StepList, begin, conditional
Predicate depending on a prior step’s output
A step can write a value to the state, and a subsequent conditional can read
it. This pattern is useful when the condition depends on information that must be
fetched at runtime:
from orchestrator.core.workflow import State, StepList, begin, conditional, step


@step("Check deployment status")
def check_deployment_status(subscription: MySubscription) -> State:
    return {"is_deployed": check_if_deployed(subscription)}


if_deployed = conditional(lambda state: state["is_deployed"])


@modify_workflow("Modify Product", initial_input_form=initial_input_form_generator)
def modify_product() -> StepList:
    return (
        begin
        >> check_deployment_status
        >> assemble_payload
        >> if_deployed(remove_legacy_config)
        >> deploy_changes
    )
With the orchestrator.workflow import path, only the import line changes:
from orchestrator.workflow import State, StepList, begin, conditional, step
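The if_deployed predicate above indexes the state with state["is_deployed"], which in plain Python raises KeyError when no earlier step has written that key. The sketch below contrasts this with a lenient variant built on dict.get. Both helper names are hypothetical, and how the orchestrator treats an exception raised inside a predicate is not covered in this section.

```python
from typing import Any

State = dict[str, Any]


def strict_is_deployed(state: State) -> bool:
    # Raises KeyError if "is_deployed" was never written to the state.
    return state["is_deployed"]


def lenient_is_deployed(state: State) -> bool:
    # Treats a missing key as "not deployed" instead of raising.
    return bool(state.get("is_deployed", False))


# Hypothetical state in which the status-check step has not run.
state_without_key: State = {"subscription_id": "hypothetical-id"}

try:
    strict_is_deployed(state_without_key)
    strict_raised = False
except KeyError:
    strict_raised = True

print(strict_raised)                           # True
print(lenient_is_deployed(state_without_key))  # False
```

The strict form surfaces a missing prerequisite step early, while the lenient form quietly skips the guarded step; which is preferable depends on whether a missing key should count as an error in the workflow.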
Reusing a conditional across workflows
conditional can be used as a decorator on a named predicate function. The
resulting wrapper can then be imported and applied to guard different steps in
different workflow modules, keeping predicate logic in one place and avoiding
duplicated lambda expressions:
# conditions.py — shared conditional predicates
from orchestrator.core.workflow import State, conditional


@conditional
def if_core_node(state: State) -> bool:
    """Guard steps that only apply to core (non-edge) node types."""
    return state["subscription"]["node"]["node_type"] == "core"


# workflows/redeploy_baseconfig.py
from orchestrator.core.workflow import StepList, begin
from products.node.conditions import if_core_node


@modify_workflow("Redeploy Baseconfig", initial_input_form=input_form_generator)
def redeploy_baseconfig() -> StepList:
    return (
        begin
        >> generate_config
        >> deploy_config
        >> if_core_node(deploy_boilerplate)
        >> update_descriptions
    )


# workflows/modify_node.py
from orchestrator.core.workflow import StepList, begin
from products.node.conditions import if_core_node


@modify_workflow("Modify Node", initial_input_form=input_form_generator)
def modify_node() -> StepList:
    return (
        begin
        >> validate_node
        >> if_core_node(sync_upstream_peers)
        >> apply_changes
    )
With the orchestrator.workflow import path, only the orchestrator import lines change:
# conditions.py
from orchestrator.workflow import State, conditional
# workflows/redeploy_baseconfig.py and workflows/modify_node.py
from orchestrator.workflow import StepList, begin
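Since a predicate is an ordinary (State) -> bool callable, it can be exercised with hand-built state dictionaries before it is wrapped. The sketch below assumes the predicate is kept as an undecorated function (the name is_core_node and the sample states are hypothetical) and that the wrapper is created separately with conditional(is_core_node), which by Python's decorator semantics is equivalent to the @conditional form above.

```python
from typing import Any

State = dict[str, Any]


def is_core_node(state: State) -> bool:
    """Plain predicate: true only for core (non-edge) node types."""
    return state["subscription"]["node"]["node_type"] == "core"


# In conditions.py the shared guard would then be created with:
#     if_core_node = conditional(is_core_node)
# (left as a comment so this sketch runs without the orchestrator installed)

# Hand-built sample states; only the keys the predicate reads are filled in.
core_state: State = {"subscription": {"node": {"node_type": "core"}}}
edge_state: State = {"subscription": {"node": {"node_type": "edge"}}}

core_result = is_core_node(core_state)
edge_result = is_core_node(edge_state)

print(core_result)  # True
print(edge_result)  # False
```

Keeping the undecorated function importable lets the predicate logic be checked on its own, while workflow modules continue to import only the wrapped guard.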
API Reference
orchestrator.core.workflow.conditional
conditional(p: collections.abc.Callable[[pydantic_forms.types.State], bool]) -> Callable[..., StepList]
Use a predicate to conditionally skip workflow steps at runtime.
When the predicate p returns True for the current workflow state,
the wrapped step(s) execute normally. When it returns False, each
wrapped step returns Skipped — the step is recorded but does not
modify the state or halt the workflow.
Can wrap a single step or multiple steps (via a StepList). When
wrapping multiple steps the predicate is evaluated independently for
each step.