Websockets
Orchestrator provides a websocket interface through which the frontend can receive real-time updates for the following:
- The process overview pages
- The process detail page
- Engine status
Implementation
To function properly in a scalable architecture, the websocket implementation consists of multiple layers.
The main component is the WebSocketManager (WSM), which has the following responsibilities:
- Keep track of connected frontend clients
- Forward messages to all frontend clients
- Provide an interface to pass messages from a backend process (workflow/task)
In a setup with multiple isolated Orchestrator instances, the WSM is initialized once per instance, so a client can be connected to any of the WSM instances. Letting a backend process broadcast messages to all clients therefore requires a message broker, for which we use Redis Pub/Sub.
There are two WSM implementations: a MemoryWebsocketManager for development/testing, and a BroadcastWebsocketManager that connects to Redis. The rest of this section discusses the latter.
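The three WSM responsibilities can be illustrated with a minimal in-memory sketch. This is not the actual MemoryWebsocketManager: the class names InMemoryManager and FakeWebSocket, the connect()/disconnect() methods, and the JSON message format are assumptions made for illustration only.

```python
import asyncio
import json


class FakeWebSocket:
    """Hypothetical stand-in for a websocket connection; records what is sent."""

    def __init__(self) -> None:
        self.received: list[str] = []

    async def send_text(self, text: str) -> None:
        self.received.append(text)


class InMemoryManager:
    """Illustrative manager; not the actual MemoryWebsocketManager."""

    def __init__(self) -> None:
        self.connections: list[FakeWebSocket] = []

    def connect(self, websocket: FakeWebSocket) -> None:
        # Keep track of connected frontend clients.
        self.connections.append(websocket)

    def disconnect(self, websocket: FakeWebSocket) -> None:
        self.connections.remove(websocket)

    async def broadcast_data(self, data: dict) -> None:
        # Entry point for backend processes: forward to all connected clients.
        message = json.dumps(data)
        for websocket in list(self.connections):
            await websocket.send_text(message)


async def demo() -> list[list[str]]:
    manager = InMemoryManager()
    first, second, third = FakeWebSocket(), FakeWebSocket(), FakeWebSocket()
    for websocket in (first, second, third):
        manager.connect(websocket)
    manager.disconnect(third)  # a disconnected client no longer gets updates
    await manager.broadcast_data({"process_id": "example", "status": "running"})
    return [first.received, second.received, third.received]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the client list lives in one process, this approach only reaches clients connected to the same instance, which is why it is limited to development and testing.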
- BroadcastWebsocketManager.broadcast_data() is called by backend processes, and publishes messages to a channel in Redis [1]
- BroadcastWebsocketManager.sender() starts in a loop for each connected client, subscribes to a channel in Redis, and forwards messages into the websocket connection
[1] When using EXECUTOR="threadpool" this function is not called directly; refer to the ProcessDataBroadcastThread section.
Roughly speaking a message travels through these components:
Process
-> BroadcastWebsocketManager.broadcast_data()
-> Redis channel
-> BroadcastWebsocketManager.sender()
-> Websocket connection
-> Frontend client
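The path above can be sketched without a Redis server by replacing the channel with an in-process fan-out. Everything here is a hypothetical stand-in: FakeBroker imitates the publish/subscribe behaviour of a Redis channel, SketchManager is not the real BroadcastWebsocketManager, and a plain list plays the role of the websocket connection. Only the method names broadcast_data() and sender() are taken from the text.

```python
import asyncio


class FakeBroker:
    """Hypothetical in-process stand-in for a Redis Pub/Sub channel."""

    def __init__(self) -> None:
        self.subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        subscription: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(subscription)
        return subscription

    async def publish(self, message: str) -> None:
        # Every subscriber gets its own copy of the message.
        for subscription in self.subscribers:
            await subscription.put(message)

    async def close(self) -> None:
        # None is a sentinel used only by this sketch to stop sender loops.
        for subscription in self.subscribers:
            await subscription.put(None)


class SketchManager:
    """Illustrative manager; imagine one per Orchestrator instance."""

    def __init__(self, broker: FakeBroker) -> None:
        self.broker = broker

    async def broadcast_data(self, message: str) -> None:
        # Called by a backend process: publish to the shared channel.
        await self.broker.publish(message)

    async def sender(self, subscription: asyncio.Queue, client_inbox: list) -> None:
        # One loop per connected client: forward channel messages to it.
        while True:
            message = await subscription.get()
            if message is None:
                break
            client_inbox.append(message)


async def demo() -> tuple:
    broker = FakeBroker()
    manager_a, manager_b = SketchManager(broker), SketchManager(broker)
    inbox_a: list = []
    inbox_b: list = []
    senders = [
        asyncio.create_task(manager_a.sender(broker.subscribe(), inbox_a)),
        asyncio.create_task(manager_b.sender(broker.subscribe(), inbox_b)),
    ]
    # The process only talks to manager_a, yet the client on manager_b is reached.
    await manager_a.broadcast_data("process 1: running")
    await manager_a.broadcast_data("process 1: completed")
    await broker.close()
    await asyncio.gather(*senders)
    return inbox_a, inbox_b


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the sketch is that the publishing side never needs to know which manager instance a client is connected to; the shared channel takes care of the fan-out.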
ProcessDataBroadcastThread
Note: this section is only relevant when the orchestrator is configured with EXECUTOR="threadpool".
Backend processes are executed in a threadpool and therefore all access the same WSM instance. This caused asyncio RuntimeErrors, because the async Redis Pub/Sub implementation is not thread-safe.
To solve this, there is a dedicated ProcessDataBroadcastThread (attached to and managed by the OrchestratorCore app) that performs the actual broadcast_data() call.
The API endpoints that start, resume, or abort a process call api_broadcast_process_data(request) to acquire a function that can be used to submit process updates into a thread-safe queue (queue.Queue) on which ProcessDataBroadcastThread listens.
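The queue-plus-dedicated-thread pattern described here can be sketched as follows. This is a simplified illustration under assumptions, not the actual ProcessDataBroadcastThread: BroadcastThreadSketch, make_submit_function(), the _STOP sentinel, and fake_broadcast_data() are hypothetical names, and the fake broadcast appends to a list instead of publishing to Redis.

```python
import asyncio
import queue
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Awaitable, Callable

_STOP = object()  # sentinel used only by this sketch to shut the thread down


class BroadcastThreadSketch(threading.Thread):
    """Illustrative only; not the actual ProcessDataBroadcastThread."""

    def __init__(self, broadcast: Callable[[dict], Awaitable[None]]) -> None:
        super().__init__(daemon=True)
        self.updates: queue.Queue = queue.Queue()
        self.broadcast = broadcast  # async callable, e.g. a manager's broadcast_data

    def run(self) -> None:
        # The event loop is created and used in this thread only, so the
        # async broadcast is never driven from threadpool workers.
        loop = asyncio.new_event_loop()
        try:
            while True:
                item = self.updates.get()  # blocks until an update is submitted
                if item is _STOP:
                    break
                loop.run_until_complete(self.broadcast(item))
        finally:
            loop.close()

    def stop(self) -> None:
        self.updates.put(_STOP)
        self.join()


def make_submit_function(thread: BroadcastThreadSketch) -> Callable[[dict], None]:
    # Hypothetical analogue of the function acquired from
    # api_broadcast_process_data(request).
    def submit(update: dict) -> None:
        thread.updates.put(update)

    return submit


def demo() -> list:
    sent: list = []

    async def fake_broadcast_data(update: dict) -> None:
        sent.append(update)  # stand-in for publishing to Redis

    thread = BroadcastThreadSketch(fake_broadcast_data)
    thread.start()
    submit = make_submit_function(thread)
    # Workers only touch the thread-safe queue, never the async code.
    with ThreadPoolExecutor(max_workers=4) as pool:
        for step in range(5):
            pool.submit(submit, {"process_id": "example", "step": step})
    thread.stop()
    return sent


if __name__ == "__main__":
    print(demo())
```

Funnelling all updates through one queue means the order in which workers submit them is the order in which they are broadcast, and the async Pub/Sub client is only ever used from a single thread.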