{ "cells": [ { "cell_type": "markdown", "id": "22d177dc-6cfb-4de2-9509-f1eb45e10cf2", "metadata": {}, "source": [ "# Scheduler" ] }, { "cell_type": "markdown", "id": "58696c91", "metadata": {}, "source": [ "There are many situations where you may want to control the number of calcjobs that are running at the same time. For example:\n", "\n", "- On an HPC cluster, a user may have a limit on the maximum number of submissions that can be running at the same time.\n", "- On a local workstation, a user may want to limit the number of calcjobs running at the same time to avoid overloading the system.\n", "\n", "## Managing the Scheduler\n", "\n", "Start a scheduler named `test`:\n", "```\n", "aiida-scheduler scheduler start test\n", "```\n", "\n", "Stop the scheduler:\n", "```\n", "aiida-scheduler scheduler stop test\n", "```\n", "\n", "Show the status of the scheduler:\n", "```\n", "aiida-scheduler scheduler status test\n", "```\n", "\n", "Show details of the processes submitted to the scheduler:\n", "```\n", "aiida-scheduler scheduler show test\n", "```\n", "\n", "Set the maximum number of calcjobs that can be running at the same time:\n", "```\n", "aiida-scheduler scheduler set-max-calcjobs test 5\n", "```\n", "\n", "Set the maximum number of workflows (top-level WorkGraphs) that can be running at the same time:\n", "```\n", "aiida-scheduler scheduler set-max-workflows test 5\n", "```\n", "\n", "Let's start a scheduler that runs at most 2 calcjobs and 10 workflows at the same time:" ] }, { "cell_type": "code", "execution_count": null, "id": "2f30294e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting the scheduler ...\n" ] } ], "source": [ "!aiida-scheduler scheduler start test --max-calcjobs 2 --max-workflows 10" ] }, { "cell_type": "markdown", "id": "85c4c12e", "metadata": {}, "source": [ "Check the status of the scheduler:" ] }, { "cell_type": "code", "execution_count": null, "id": "161f3696", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", 
"text": [ "\u001b[22mName status pk waiting process calcjob workflow\n", "test \u001b[32mRunning\u001b[0m 122897 0 0/10000 0/2 0/10\u001b[0m\n" ] } ], "source": [ "!aiida-scheduler scheduler status test" ] }, { "cell_type": "markdown", "id": "ae3f359e", "metadata": {}, "source": [ "\n", "## Example Usage\n", "\n", "Let's walk through an example where we creates four WorkGraphs with five calcjobs each.\n" ] }, { "cell_type": "code", "execution_count": 3, "id": "03912de1", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "WorkGraph process created, PK: 122898\n", "WorkGraph process created, PK: 122899\n", "WorkGraph process created, PK: 122900\n", "WorkGraph process created, PK: 122909\n" ] } ], "source": [ "from aiida_workgraph import WorkGraph\n", "from aiida import load_profile, orm\n", "from aiida.calculations.arithmetic.add import ArithmeticAddCalculation\n", "\n", "load_profile()\n", "\n", "# Use the calcjob: ArithmeticAddCalculation\n", "code = orm.load_code(\"add@localhost\")\n", "\n", "for i in range(4):\n", " wg = WorkGraph(\"test_max_number_jobs\")\n", " # Create N tasks\n", " for i in range(5):\n", " temp = wg.add_task(ArithmeticAddCalculation, name=f\"add{i}\", x=1, y=1, code=code)\n", " # Set a sleep option for each job (e.g., 10 seconds per job)\n", " temp.set({\"metadata.options.sleep\": 10})\n", " # submit the workgraph to a scheduler called \"test-scheduler\"\n", " wg.submit(scheduler=\"test\")" ] }, { "cell_type": "markdown", "id": "208e13f9", "metadata": {}, "source": [ "Note, all the WorkGraphs are submitted to a scheduler named `test`. 
Now, you can check the progress of the Scheduler using the following command:\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "1f86cb2a", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[34m\u001b[1mReport\u001b[0m: \u001b[22mScheduler: test\u001b[0m\n", "\u001b[22m PK Created Process label Process State Priorities\n", "------ --------- ------------------------------- --------------- ------------\n", "122898 6s ago WorkGraph ⏵ Waiting\n", "122899 5s ago WorkGraph ⏵ Waiting\n", "122900 4s ago WorkGraph ⏹ Created\n", "122903 4s ago ArithmeticAddCalculation ⏹ Created\n", "122906 4s ago ArithmeticAddCalculation ⏹ Created\n", "122909 3s ago WorkGraph ⏹ Created -3\n", "122910 3s ago ArithmeticAddCalculation ⏹ Created 0\n", "122913 3s ago ArithmeticAddCalculation ⏹ Created 0\n", "122916 3s ago ArithmeticAddCalculation ⏹ Created 0\n", "122919 1s ago ArithmeticAddCalculation ⏹ Created -1\n", "122922 1s ago ArithmeticAddCalculation ⏹ Created -1\n", "122925 1s ago ArithmeticAddCalculation ⏹ Created -1\n", "122928 1s ago ArithmeticAddCalculation ⏹ Created -1\n", "122931 0s ago ArithmeticAddCalculation ⏹ Created -1\u001b[0m\n", "\u001b[22m\n", "Total results: 14\n", "\u001b[0m\n", "\u001b[22mname: test\u001b[0m\n", "\u001b[22mpk: 122897\u001b[0m\n", "\u001b[22mrunning_process: 5\u001b[0m\n", "\u001b[22mwaiting_process: 9\u001b[0m\n", "\u001b[22mrunning_workflow: 3\u001b[0m\n", "\u001b[22mrunning_calcjob: 2\u001b[0m\n", "\u001b[22mmax_calcjobs: 2\u001b[0m\n", "\u001b[22mmax_workflows: 10\u001b[0m\n", "\u001b[22mmax_processes: 10000\u001b[0m\n" ] } ], "source": [ "! 
aiida-scheduler scheduler show test" ] }, { "cell_type": "markdown", "id": "23644a50", "metadata": {}, "source": [ "\n", "This command lists the processes submitted to the scheduler, followed by a summary of the scheduler's state. In the output above, `running_calcjob: 2` matches `max_calcjobs: 2`, which shows that at most 2 calcjobs run simultaneously.\n", "\n", "## Managing the Scheduler in the AiiDA GUI\n", "Open a terminal and run:\n", "\n", "```\n", "aiida-gui start\n", "```\n", "\n", "You can monitor the progress visually at `http://127.0.0.1:8000/scheduler/`. Click the scheduler name to open its detail page, which shows the status of the scheduler.\n", "\n", "![Scheduler](_static/images/web-scheduler.png)\n", "\n", "\n", "## WorkChain Support in the Scheduler\n", "\n", "The scheduler in `aiida-workgraph` can also be used with WorkChains. However, this requires two additional steps compared to standard WorkChain usage:\n", "\n", "1. Use the `submit_to_scheduler` function from `aiida_workgraph.utils.control`.\n", "2. 
Override the `submit` method of the WorkChain you want to use.\n", "\n", "\n", "### Example\n", "\n", "Here we override the `submit` method of `aiida.workflows.arithmetic.multiply_add.MultiplyAddWorkChain` so that it submits its child processes through the scheduler.\n", "\n", "```python\n", "from typing import Any, Type\n", "\n", "from aiida.engine import Process, WorkChain\n", "from aiida.orm import ProcessNode\n", "\n", "\n", "class MultiplyAddWorkChain(WorkChain):\n", "    \"\"\"WorkChain to multiply two numbers and add a third, adapted for scheduling.\"\"\"\n", "\n", "    # other methods and attributes...\n", "\n", "    def submit(\n", "        self,\n", "        process: Type[Process],\n", "        inputs: dict[str, Any] | None = None,\n", "        **kwargs,\n", "    ) -> ProcessNode:\n", "        \"\"\"Submit a process inside the workchain via the scheduler.\"\"\"\n", "        from aiida_workgraph.utils.control import submit_to_scheduler_inside_workchain\n", "\n", "        return submit_to_scheduler_inside_workchain(self, process, inputs, **kwargs)\n", "```\n", "\n", "\n", "### Submit via Scheduler\n", "\n", "You can now submit the patched WorkChain using:\n", "\n", "```python\n", "from aiida_workgraph.utils.control import submit_to_scheduler\n", "\n", "# `code` is the `add@localhost` code loaded in the example above\n", "x = 1\n", "y = 2\n", "z = 3\n", "submit_to_scheduler(\n", "    MultiplyAddWorkChain,\n", "    inputs={\"x\": x, \"y\": y, \"z\": z, \"code\": code},\n", "    scheduler=\"test\",\n", ")\n", "```\n", "\n", "\n", "### ⚠️ Warning: Nested WorkChains\n", "\n", "**If the WorkChain calls other WorkChains internally (i.e. nested WorkChains), this approach will not work out of the box**. You must also:\n", "\n", "- Patch all nested WorkChains in the same way (override their `submit` method).\n", "- Restart the AiiDA daemon.\n", "\n", "Otherwise, some processes may not be tracked by the scheduler or submitted through it.\n", "\n", "\n", "## ⚠️ Warning: `verdi process repair`\n", "\n", "Currently, the command `verdi process repair` sends stuck processes to the normal AiiDA daemon worker queue instead of the scheduler. 
As a result, the scheduler can no longer track those processes, and the same process may run multiple times.\n", "\n", "\n", "## Persistent Scheduler\n", "\n", "Finally, the scheduler is **persistent**. You can stop and restart it at any time using the same scheduler name, and all associated information will be preserved automatically." ] } ], "metadata": { "kernelspec": { "display_name": "aiida", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.0" } }, "nbformat": 4, "nbformat_minor": 5 }