# Scaling the Orchestrator
By default the Orchestrator is capable of handling a reasonable number of workflows and tasks. For a larger and more distributed workload we introduced the Celery library.
This document describes the two modes in which an Orchestrator instance can run and what you need to configure:

1. running tasks and workflows in a threadpool (default)
2. running the Orchestrator with a number of workflow workers
## Running workflows or tasks within a threadpool
This is the default configuration. Workflows and tasks are both scheduled by the same threadpool with equal priority. If you need tasks to run with a lower priority, you can for example use a scheduler and run them during a quiet period.
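As an illustration, a heavy task could be started outside business hours from cron or another scheduler through the REST API. This is only a rough sketch: the workflow name `task_validate_products` and the `/api/processes/<workflow_key>` endpoint path are assumptions you should adapt to your own deployment.

```bash
# Start a long-running task at 02:00 so it does not compete with interactive
# workflows; the '[{}]' body assumes the task needs no initial user input.
0 2 * * * curl -X POST -H "Content-Type: application/json" -d '[{}]' http://localhost:8080/api/processes/task_validate_products
```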
In AppSettings you will notice the default:
```python
class AppSettings(BaseSettings):
    # fields omitted
    EXECUTOR: str = "threadpool"
    # fields omitted
```
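Since `AppSettings` is a `BaseSettings` class, the executor can also be selected through an environment variable instead of code, which is how the local example further down switches to Celery:

```bash
# The threadpool executor needs no configuration; to prepare for the
# worker setup described in the next section, override it in the environment:
export EXECUTOR="celery"
```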
## Running workflows or tasks using a worker
Celery concepts are introduced in the Celery documentation.
When using Celery, the Orchestrator is split into two parts: the orchestrator-api and the orchestrator-worker.
The orchestrator-api functionality is now limited to handling REST requests and delegating them (via one or more queues) to the orchestrator-worker. The workflows are executed in the orchestrator-worker.
### Queues
Tasks and workflows are submitted on different queues. This allows for independent scaling of workers that handle low priority tasks and high priority tasks, simply by letting the workers listen to different queues (see the example after this list). Currently, there are two queues defined:
- workflows: starting or resuming workflows
- tasks: starting or resuming tasks
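For instance, you could dedicate one worker to the high priority `workflows` queue and scale out separate workers for the `tasks` queue. A sketch, assuming the default queue names and the `surf.tasks` module introduced below:

```bash
# Worker dedicated to (interactive) workflows
celery -A surf.tasks worker --loglevel=info -Q workflows

# Separately scaled worker(s) for lower priority background tasks
celery -A surf.tasks worker --loglevel=info -Q tasks --concurrency=2
```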
By default, Redis is used for the Celery broker and backend. See the next section on implementing the worker for how to change this behaviour.
### Implementing the worker
The orchestrator-core needs to know what workflows a user has defined. This information is only available in the actual application using the orchestrator-core. Here we will use an example as used within SURF. The file is called `tasks.py`. First we define our own class derived from the Celery base class:
```python
from celery import Celery


class OrchestratorCelery(Celery):
    def on_init(self) -> None:
        from orchestrator import OrchestratorCore

        app = OrchestratorCore(base_settings=AppSettings())
        init_app(app)  # This will load the workflows
```
The `init_app` should be replaced by your own function that at least makes sure that all the workflows are imported (which ensures that they are registered) so that the worker can recognize them. This is the minimum implementation you need, but you might want to add other initialisation that is needed to execute workflows.
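For illustration, such a function can be as small as importing your workflow (and product) packages for their registration side effects. A minimal sketch, with `surf.products` and `surf.workflows` as hypothetical package names:

```python
def init_app(app: OrchestratorCore) -> None:
    """Load the application's workflows so the worker can execute them."""
    # Registration happens as an import side effect, so importing the
    # packages is enough for the worker to recognize every workflow.
    import surf.products  # noqa: F401  (hypothetical package name)
    import surf.workflows  # noqa: F401  (hypothetical package name)
```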
Next we instantiate Celery using our own `OrchestratorCelery` class:
```python
broker = f"redis://{AppSettings().CACHE_URI}"
backend = f"rpc://{AppSettings().CACHE_URI}/0"

celery = OrchestratorCelery(
    "proj", broker=broker, backend=backend, include=["orchestrator.services.tasks"]
)

celery.conf.update(result_expires=3600)
```
As you can see in the code above we are using Redis as the broker. You can of course replace this with RabbitMQ or another broker of your choice; see the Celery documentation for more details.
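For example, switching to a RabbitMQ broker only requires a different broker URL (the hostname and credentials below are placeholders):

```python
# RabbitMQ as the Celery broker instead of Redis; the rpc:// backend
# returns results as messages over the same broker.
broker = "amqp://guest:guest@rabbitmq-host:5672//"

celery = OrchestratorCelery(
    "proj", broker=broker, backend="rpc://", include=["orchestrator.services.tasks"]
)
```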
"orchestrator.services.tasks"
is the namespace in orchestrator-core where the Celery tasks can be found. At the
moment 4 tasks are defined:
tasks.new_task
: start a new task (delivered on the Task queue)tasks.new_workflow
: start a new workflow (delivered on the Workflow queue)tasks.resume_task
: resume an existing task (delivered on the Task queue)tasks.resume_workflow
: resume an existing workflow (delivered on the Workflow queue)
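Once a worker is running you can verify that it picked up these task definitions with Celery's inspection command (using the `surf.tasks` module from this example):

```bash
# Lists the tasks registered on each running worker; the four
# orchestrator-core tasks above should appear in the output.
celery -A surf.tasks inspect registered
```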
Finally, we initialise the orchestrator core:
```python
def init_celery() -> None:
    from orchestrator.services.tasks import initialise_celery

    initialise_celery(celery)


# Needed if we load this as a Celery worker because in that case, the application
# is not started with a user-specified top-level `__main__` module
init_celery()
```
The code above sets our local Celery instance (which initialises the workflows) as the Celery instance that is going to be used by the orchestrator-core. Without this code, the orchestrator-core would only be aware of the limited set of workflows that are part of orchestrator-core itself.
### An example implementation
When using Celery and websockets you can use the following example and adapt it to your needs.
"""This module contains functions and classes necessary for celery worker processes.
When the orchestrator-core's thread process executor is specified as "celery", the `OrchestratorCore` FastAPI
application registers celery-specific task functions and `start_process` and `resume_process` now defer to the
celery task queue.
Celery's task queue enables features like nightly validations by providing a task queue and workers to execute
workflows that are all started in parallel, which would crash a single-threaded orchestrator-core.
The application flow looks like this when "celery" is the executor (and websockets are enabled):
- FastAPI application validates form input, and places a task on celery queue (create new process).
- If websockets are enabled, a connection should exist already b/t the client and backend.
- FastAPI application begins watching redis pubsub channel for process updates from celery.
- Celery worker picks up task from queue and begins executing.
- On each step completion, it publishes state information to redis pubsub channel.
- FastAPI application grabs this information and publishes it to the client websocket connection.
A celery worker container will start by calling this module instead of `main.py` like so:
```sh
celery -A esnetorch.celery_worker worker -E -l INFO -Q new_tasks,resume_tasks,new_workflows,resume_workflows
-A
points to this module where the worker class is defined-E
sends task-related events (capturable and monitorable)-l
is the short flag for --loglevel-Q
specifies the queues which the worker should watch for new tasks
See https://workfloworchestrator.org/orchestrator-core/reference-docs/app/scaling for more information. """
from uuid import UUID
from celery import Celery from celery.signals import worker_shutting_down from orchestrator.db import init_database from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY from orchestrator.services.tasks import initialise_celery from orchestrator.types import BroadcastFunc from orchestrator.websocket import broadcast_process_update_to_websocket, init_websocket_manager from orchestrator.websocket.websocket_manager import WebSocketManager from orchestrator.workflows import ALL_WORKFLOWS from structlog import get_logger
from esnetorch.settings import app_settings
logger = get_logger(name)
def process_broadcast_fn(process_id: UUID) -> None: # Catch all exceptions as broadcasting failure is noncritical to workflow completion try: broadcast_process_update_to_websocket(process_id) except Exception as e: logger.exception(e)
class OrchestratorWorker(Celery): websocket_manager: WebSocketManager process_broadcast_fn: BroadcastFunc
def on_init(self) -> None:
init_database(app_settings)
# Prepare the wrapped_websocket_manager
# Note: cannot prepare the redis connections here as broadcasting is async
self.websocket_manager = init_websocket_manager(app_settings)
self.process_broadcast_fn = process_broadcast_fn
# Load the products and load the workflows
import esnetorch.products # noqa: F401 Side-effects
import esnetorch.workflows # noqa: F401 Side-effects
logger.info(
"Loaded the workflows and products",
workflows=len(ALL_WORKFLOWS.values()),
products=len(SUBSCRIPTION_MODEL_REGISTRY.values()),
)
def close(self) -> None:
super().close()
celery = OrchestratorWorker( f"{app_settings.SERVICE_NAME}-worker", broker=str(app_settings.CACHE_URI), include=["orchestrator.services.tasks"] )
if app_settings.TESTING: celery.conf.update(backend=str(app_settings.CACHE_URI), task_ignore_result=False) else: celery.conf.update(task_ignore_result=True)
celery.conf.update( result_expires=3600, worker_prefetch_multiplier=1, worker_send_task_event=True, task_send_sent_event=True, )
### Running locally
If you want to test your application locally you have to start both the orchestrator-api and one or more workers.
For example:
Start the orchestrator api:
```bash
EXECUTOR="celery" uvicorn --reload --host 127.0.0.1 --port 8080 main:app
```

Notice that we are setting `EXECUTOR` to `celery`. Without that variable the API resorts to the default threadpool.
Start a single worker that listens on both the `tasks` and `workflows` queues (indicated by the `-Q` flag):

```bash
celery -A surf.tasks worker --loglevel=info -Q new_tasks,resume_tasks,new_workflows,resume_workflows
```
Notice that `-A surf.tasks` indicates the module that contains your `celery.Celery` instance.
The queues are defined in the Celery config (see `services/tasks.py` in orchestrator-core):
```python
celery.conf.task_routes = {
    NEW_TASK: {"queue": "tasks"},
    NEW_WORKFLOW: {"queue": "workflows"},
    RESUME_TASK: {"queue": "tasks"},
    RESUME_WORKFLOW: {"queue": "workflows"},
}
```
If you decide to override the queue names in this configuration, you also have to make sure that you update the names accordingly after the `-Q` flag.
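A sketch of such an override, assuming you want the dedicated `new_tasks`/`resume_tasks`/`new_workflows`/`resume_workflows` queues used in the worker commands above:

```python
# Overridden queue names; every worker's -Q flag must list the same names,
# e.g. -Q new_tasks,resume_tasks,new_workflows,resume_workflows
celery.conf.task_routes = {
    NEW_TASK: {"queue": "new_tasks"},
    NEW_WORKFLOW: {"queue": "new_workflows"},
    RESUME_TASK: {"queue": "resume_tasks"},
    RESUME_WORKFLOW: {"queue": "resume_workflows"},
}
```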