Skip to content

app.py

The app.py module in orchestrator-core runs the entire WFO FastAPI backend and provides the entry point for the CLI.

FastAPI Backend

The code for the WFO's FastAPI backend is well documented; the functions used in this module are listed here:

orchestrator.app

The main application module.

This module contains the main OrchestratorCore class for the FastAPI backend and provides the ability to run the CLI.

OrchestratorCore

Bases: fastapi.applications.FastAPI

Source code in orchestrator/app.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
class OrchestratorCore(FastAPI):
    """FastAPI application that wires together the orchestrator backend.

    Constructing an instance initialises logging, the model loaders and the database,
    registers startup/shutdown hooks for the distributed-lock manager (and, when enabled,
    the websocket manager and broadcast thread), mounts the REST API under ``/api`` and
    installs the middleware and exception handlers. The ``register_*`` methods and
    ``add_sentry`` customise the instance after construction.
    """

    # Router created by the first ``register_graphql`` call; stays ``None`` until then.
    graphql_router: Any | None = None
    # Only assigned in ``__init__`` when the threadpool executor is configured.
    broadcast_thread: ProcessDataBroadcastThread | None = None

    def __init__(
        self,
        title: str = "The Orchestrator",
        description: str = "The orchestrator is a project that enables users to run workflows.",
        openapi_url: str = "/api/openapi.json",
        docs_url: str = "/api/docs",
        redoc_url: str = "/api/redoc",
        version: str = __version__,
        default_response_class: type[Response] = JSONResponse,
        base_settings: AppSettings = app_settings,
        **kwargs: Any,
    ) -> None:
        """Build and configure the application.

        Args:
            title: Forwarded to ``FastAPI`` as ``title``.
            description: Forwarded to ``FastAPI`` as ``description``.
            openapi_url: Forwarded to ``FastAPI`` as ``openapi_url``.
            docs_url: Forwarded to ``FastAPI`` as ``docs_url``.
            redoc_url: Forwarded to ``FastAPI`` as ``redoc_url``.
            version: Forwarded to ``FastAPI`` as ``version``.
            default_response_class: Forwarded to ``FastAPI`` as ``default_response_class``.
            base_settings: Settings object that drives executor selection, websocket and
                distlock managers, database initialisation, session secret and CORS options.
            **kwargs: Any additional keyword arguments, forwarded to ``FastAPI`` unchanged.

        Returns:
            None
        """
        initialise_logging(LOGGER_OVERRIDES)
        init_model_loaders()
        if base_settings.ENABLE_GRAPHQL_STATS_EXTENSION:
            monitor_sqlalchemy_queries()

        self.auth_manager = AuthManager()
        self.base_settings = base_settings
        websocket_manager = init_websocket_manager(base_settings)
        distlock_manager = init_distlock_manager(base_settings)

        # The distlock manager is always connected; the other hooks are conditional.
        startup_functions: list[Callable] = [distlock_manager.connect_redis]
        shutdown_functions: list[Callable] = [distlock_manager.disconnect_redis]
        if websocket_manager.enabled:
            startup_functions.append(websocket_manager.connect_redis)
            shutdown_functions.extend([websocket_manager.disconnect_all, websocket_manager.disconnect_redis])

        if base_settings.EXECUTOR == ExecutorType.THREADPOOL:
            # Only need broadcast thread when using threadpool executor
            self.broadcast_thread = ProcessDataBroadcastThread(websocket_manager)
            startup_functions.append(self.broadcast_thread.start)
            shutdown_functions.append(self.broadcast_thread.stop)

        super().__init__(
            title=title,
            description=description,
            openapi_url=openapi_url,
            docs_url=docs_url,
            redoc_url=redoc_url,
            version=version,
            default_response_class=default_response_class,
            on_startup=startup_functions,
            on_shutdown=shutdown_functions,
            **kwargs,
        )

        self.include_router(api_router, prefix="/api")

        init_database(base_settings)

        self.add_middleware(ClearStructlogContextASGIMiddleware)
        self.add_middleware(SessionMiddleware, secret_key=base_settings.SESSION_SECRET)
        self.add_middleware(DBSessionMiddleware, database=db)
        # Strip whitespace around each entry and drop empty ones, so a value such as
        # "https://a.example, https://b.example" yields origins that can actually match.
        origins = [origin.strip() for origin in base_settings.CORS_ORIGINS.split(",") if origin.strip()]
        self.add_middleware(
            CORSMiddleware,
            allow_origins=origins,
            allow_methods=base_settings.CORS_ALLOW_METHODS,
            allow_headers=base_settings.CORS_ALLOW_HEADERS,
            expose_headers=base_settings.CORS_EXPOSE_HEADERS,
        )

        self.add_exception_handler(FormException, form_error_handler)  # type: ignore[arg-type]
        self.add_exception_handler(ProblemDetailException, problem_detail_handler)  # type: ignore[arg-type]
        add_exception_handler(self)

        # Root endpoint returning a fixed string; excluded from the OpenAPI schema.
        @self.router.get("/", response_model=str, response_class=JSONResponse, include_in_schema=False)
        def _index() -> str:
            return "Orchestrator orchestrator"

    def add_sentry(
        self,
        sentry_dsn: str,
        trace_sample_rate: float,
        server_name: str,
        environment: str,
        release: str | None = GIT_COMMIT_HASH,
    ) -> None:
        """Initialise the Sentry SDK for this application.

        The base ``sentry_integrations`` are extended with the Celery integration when the
        worker executor is configured, and with the Strawberry integration when a GraphQL
        router has been registered on this instance.

        Args:
            sentry_dsn: Passed to ``sentry_sdk.init`` as ``dsn``.
            trace_sample_rate: Used for both ``traces_sample_rate`` and ``profiles_sample_rate``.
            server_name: Passed to ``sentry_sdk.init`` as ``server_name``.
            environment: Passed to ``sentry_sdk.init`` as ``environment``.
            release: Interpolated into the release identifier ``orchestrator@<release>``.

        Returns:
            None
        """
        logger.info("Adding Sentry middleware to app", app=self.title)
        # Work on a copy: appending to the shared ``sentry_integrations`` list would
        # accumulate duplicate integrations every time this method is called.
        integrations = list(sentry_integrations)
        if self.base_settings.EXECUTOR == ExecutorType.WORKER:
            # Imported lazily so the Celery integration is only loaded for the worker executor.
            from sentry_sdk.integrations.celery import CeleryIntegration

            integrations.append(CeleryIntegration())

        if self.graphql_router:
            integrations.append(StrawberryIntegration(async_execution=True))

        sentry_sdk.init(
            dsn=sentry_dsn,
            traces_sample_rate=trace_sample_rate,
            server_name=server_name,
            environment=environment,
            release=f"orchestrator@{release}",
            integrations=integrations,
            propagate_traces=True,
            profiles_sample_rate=trace_sample_rate,
        )

    @staticmethod
    def register_subscription_models(product_to_subscription_model_mapping: dict[str, type[SubscriptionModel]]) -> None:
        """Register your subscription models.

        This method is needed to register your subscription models inside the orchestrator core.

        Args:
            product_to_subscription_model_mapping: The dictionary should contain a mapping of products to SubscriptionModels.
                The selection will be done depending on the name of the product.

        Returns:
            None:

        Examples:
            >>> product_to_subscription_model_mapping = { # doctest:+SKIP
            ...     "Generic Product One": GenericProductModel,
            ...     "Generic Product Two": GenericProductModel,
            ... }

        """
        SUBSCRIPTION_MODEL_REGISTRY.update(product_to_subscription_model_mapping)

    def register_graphql(
        self: "OrchestratorCore",
        query: Any = Query,
        mutation: Any = Mutation,
        register_models: bool = True,
        subscription_interface: Any = SubscriptionInterface,
        graphql_models: StrawberryModelType | None = None,
        scalar_overrides: ScalarOverrideType | None = None,
        extensions: list | None = None,
        custom_context_getter: ContextGetterFactory | None = None,
    ) -> None:
        """Create the GraphQL router and attach it to the application.

        The first call stores the new router on ``self.graphql_router`` and mounts it under
        ``/api/graphql``. Later calls keep the already mounted router and only replace its
        schema with the newly built one.

        Args:
            query: Forwarded to ``create_graphql_router``.
            mutation: Forwarded to ``create_graphql_router``.
            register_models: Forwarded to ``create_graphql_router``.
            subscription_interface: Forwarded to ``create_graphql_router``.
            graphql_models: Forwarded to ``create_graphql_router``.
            scalar_overrides: Forwarded to ``create_graphql_router``.
            extensions: Forwarded to ``create_graphql_router`` as ``extensions``.
            custom_context_getter: Forwarded to ``create_graphql_router`` as ``custom_context_getter``.

        Returns:
            None
        """
        new_router = create_graphql_router(
            self.auth_manager,
            query,
            mutation,
            register_models,
            subscription_interface,
            self.broadcast_thread,
            graphql_models,
            scalar_overrides,
            extensions=extensions,
            custom_context_getter=custom_context_getter,
        )
        if not self.graphql_router:
            self.graphql_router = new_router
            self.include_router(new_router, prefix="/api/graphql")
        else:
            # A router is already mounted; swap in the new schema instead of mounting twice.
            self.graphql_router.schema = new_router.schema

    def register_authentication(self, authentication_instance: OIDCAuth) -> None:
        """Registers a custom authentication instance for the application.

        Use this method to replace the default OIDC authentication mechanism with a custom one,
        enhancing the security and tailoring user authentication to specific needs of the application.

        Args:
            authentication_instance (OIDCAuth): The custom OIDCAuth instance to use.

        Returns:
            None
        """
        self.auth_manager.authentication = authentication_instance

    def register_authorization(self, authorization_instance: Authorization) -> None:
        """Registers a custom authorization instance to manage user permissions and access controls.

        This method enables customization of the authorization logic, defining what authenticated users
        can do within the application. It integrates with the application's security framework to enforce
        permission checks tailored to your requirements.

        Args:
            authorization_instance (Authorization): The custom Authorization instance to use.

        Returns:
            None
        """
        self.auth_manager.authorization = authorization_instance

    def register_graphql_authorization(self, graphql_authorization_instance: GraphqlAuthorization) -> None:
        """Registers a custom GraphQL-specific authorization instance for managing access controls in GraphQL operations.

        This provides an opportunity to apply specialized authorization rules and policies for GraphQL interactions,
        enhancing security where the default settings do not suffice.

        Args:
            graphql_authorization_instance (GraphqlAuthorization): The instance responsible for GraphQL-specific authorization.

        Returns:
            None
        """
        self.auth_manager.graphql_authorization = graphql_authorization_instance
register_authentication
register_authentication(authentication_instance: OIDCAuth) -> None

Registers a custom authentication instance for the application.

Use this method to replace the default OIDC authentication mechanism with a custom one, enhancing the security and tailoring user authentication to specific needs of the application.

Parameters:

  • authentication_instance (oauth2_lib.fastapi.OIDCAuth) –

    The custom OIDCAuth instance to use.

Returns:

  • None

    None

Source code in orchestrator/app.py
229
230
231
232
233
234
235
236
237
238
239
240
241
def register_authentication(self, authentication_instance: OIDCAuth) -> None:
    """Install a custom authentication instance on the application's auth manager.

    Call this to swap the default OIDC authentication mechanism for your own, so that
    user authentication matches the specific needs of the application.

    Args:
        authentication_instance (OIDCAuth): The custom OIDCAuth instance to use.

    Returns:
        None
    """
    manager = self.auth_manager
    manager.authentication = authentication_instance
register_authorization
register_authorization(authorization_instance: Authorization) -> None

Registers a custom authorization instance to manage user permissions and access controls.

This method enables customization of the authorization logic, defining what authenticated users can do within the application. It integrates with the application's security framework to enforce permission checks tailored to your requirements.

Parameters:

  • authorization_instance (oauth2_lib.fastapi.Authorization) –

    The custom Authorization instance to use.

Returns:

  • None

    None

Source code in orchestrator/app.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def register_authorization(self, authorization_instance: Authorization) -> None:
    """Install a custom authorization instance on the application's auth manager.

    The authorization instance defines what authenticated users may do within the
    application; registering your own lets permission checks follow your requirements.

    Args:
        authorization_instance (Authorization): The custom Authorization instance to use.

    Returns:
        None
    """
    manager = self.auth_manager
    manager.authorization = authorization_instance
register_graphql_authorization
register_graphql_authorization(
    graphql_authorization_instance: GraphqlAuthorization,
) -> None

Registers a custom GraphQL-specific authorization instance for managing access controls in GraphQL operations.

This provides an opportunity to apply specialized authorization rules and policies for GraphQL interactions, enhancing security where the default settings do not suffice.

Parameters:

  • graphql_authorization_instance (oauth2_lib.fastapi.GraphqlAuthorization) –

    The instance responsible for GraphQL-specific authorization.

Returns:

  • None

    None

Source code in orchestrator/app.py
258
259
260
261
262
263
264
265
266
267
268
269
270
def register_graphql_authorization(self, graphql_authorization_instance: GraphqlAuthorization) -> None:
    """Install a custom GraphQL-specific authorization instance on the application's auth manager.

    Use this when the default settings do not suffice and GraphQL operations need their
    own access-control rules and policies.

    Args:
        graphql_authorization_instance (GraphqlAuthorization): The instance responsible for GraphQL-specific authorization.

    Returns:
        None
    """
    manager = self.auth_manager
    manager.graphql_authorization = graphql_authorization_instance
register_subscription_models staticmethod
register_subscription_models(
    product_to_subscription_model_mapping: dict[str, type[SubscriptionModel]]
) -> None

Register your subscription models.

This method is needed to register your subscription models inside the orchestrator core.

Parameters:

  • product_to_subscription_model_mapping (dict[str, type[orchestrator.domain.SubscriptionModel]]) –

    The dictionary should contain a mapping of products to SubscriptionModels. The selection will be done depending on the name of the product.

Returns:

  • None ( None ) –

Examples:

>>> product_to_subscription_model_mapping = {
...     "Generic Product One": GenericProductModel,
...     "Generic Product Two": GenericProductModel,
... }
Source code in orchestrator/app.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
@staticmethod
def register_subscription_models(product_to_subscription_model_mapping: dict[str, type[SubscriptionModel]]) -> None:
    """Add subscription models to the orchestrator core's registry.

    The orchestrator core needs this registration to know about your subscription models.

    Args:
        product_to_subscription_model_mapping: Mapping of products to SubscriptionModels.
            The model is selected by the name of the product.

    Returns:
        None:

    Examples:
        >>> product_to_subscription_model_mapping = { # doctest:+SKIP
        ...     "Generic Product One": GenericProductModel,
        ...     "Generic Product Two": GenericProductModel,
        ... }

    """
    registry = SUBSCRIPTION_MODEL_REGISTRY
    registry.update(product_to_subscription_model_mapping)

A good example of how to use the functions in app.py from your own main.py, when you instantiate your own orchestrator, is the main.py file in the example orchestrator repository:

# Copyright 2019-2023 SURF.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from orchestrator import OrchestratorCore
from orchestrator.cli.main import app as core_cli
from orchestrator.settings import AppSettings

from graphql_federation import CUSTOM_GRAPHQL_MODELS
import products  # noqa: F401  Side-effects
import workflows  # noqa: F401  Side-effects

# Build the orchestrator application from a freshly instantiated settings object.
app = OrchestratorCore(base_settings=AppSettings())
# Register GraphQL on the app, passing in the project's custom GraphQL models.
app.register_graphql(graphql_models=CUSTOM_GRAPHQL_MODELS)

# When this file is executed as a script, hand control to the orchestrator CLI.
if __name__ == "__main__":
    core_cli()

CLI App

The orchestrator core also has a CLI application that is documented in detail here. You can bring this into your main.py file so that you can run the orchestrator CLI for development like so:

# Run the orchestrator CLI only when this file is executed directly, not when imported.
if __name__ == "__main__":
    core_cli()