From an ARE JupyterLab session, you can set up one of the following:
- a Local Dask cluster (local to the node where JupyterLab is running)
- a "dynamic" Dask cluster, controlled from the ARE JupyterLab session, that runs as separate Gadi batch jobs alongside the JupyterLab session
- a "pre-defined" Dask cluster using multiple nodes directly from the ARE when starting the JupyterLab session
Note that to run Dask on Gadi's GPU nodes you should use RAPIDS; the cases above apply to CPUs only.
You can start a JupyterLab session by clicking the "JupyterLab" button after logging in to the ARE website.
Local Dask Cluster
In the JupyterLab launch form, you can request single-node resources as below:
[Screenshots: Step 1 and Step 2 of the JupyterLab launch form]
Wait until the "Open JupyterLab" button is highlighted.
Click "Open JupyterLab" button, you will enter the JupyterLab session with single node. You can use the resources on this local node hosting JupyterLab session to start a Local Dask Cluster on-the-fly.
After importing the necessary Dask modules, the essential lines needed in the Jupyter code are:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)
After it is set up, you can check the configuration via the Jupyter command "print(client)" as shown below
print(client)
<Client: 'tcp://127.0.0.1:41179' processes=2 threads=2, memory=5.62 GiB>
This output shows a local Dask cluster (via the node-local loopback network interface 127.0.0.1) running on 2 CPU Cores - which are the resources that were requested as part of the setup of the current JupyterLab session of this example (i.e., 2 CPU Cores and 6 GB memory).
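As a quick sanity check (not part of the original setup; the array sizes here are arbitrary), you could run a small Dask computation against this local cluster:

import dask.array as da

# Build a chunked random array and compute its mean; the work is spread
# across the 2 workers reported by print(client) above.
x = da.random.random((10000, 10000), chunks=(1000, 1000))
print(x.mean().compute())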
If you need a Dask cluster spanning multiple compute nodes, please refer to the pre-defined Dask cluster section below.
Dynamic Dask cluster
You can request a small amount of compute resources to run just the JupyterLab interactive session itself. After the JupyterLab session starts, you can then set up a "dynamic" Dask cluster on a standard Gadi queue from your Jupyter notebook. This allows your JupyterLab session to keep running after the "heavy compute" executed on the Gadi compute nodes has completed.
In the example below, we will set up a two-node Dask cluster, where each node has 48 cores:
import os            # needed for os.environ["DASK_PYTHON"] below
import dask.config
from dask.distributed import Client, LocalCluster
from dask_jobqueue import PBSCluster

walltime = '01:00:00'
cores = 48
memory = '192GB'

cluster = PBSCluster(walltime=str(walltime), cores=cores, memory=str(memory),
                     processes=cores,
                     job_extra=['-q <PBS queue name>', '-P <project code>',
                                '-l ncpus='+str(cores), '-l mem='+str(memory),
                                '-l storage=gdata/<project code>+gdata/<project code>+gdata/<project code>...'],
                     local_directory='$TMPDIR',
                     header_skip=["select"],
                     python=os.environ["DASK_PYTHON"])
cluster.scale(jobs=2)
The PBSCluster function defines the resources of a single compute node, and the subsequent call to cluster.scale defines how many compute nodes to use.
To break down this example, let's start with the definition of a single node. We could start a PBSCluster on a single node, i.e. 48 cores and 192GB of memory in the normal queue, as follows:
cluster = PBSCluster(walltime="10:00:00", cores=48, memory="192GB",
                     processes=48,
                     job_extra=['-q normal', '-P fp0', '-l ncpus=48',
                                '-l mem=192GB', '-l storage=gdata/dk92+scratch/fp0'],
                     local_directory='$TMPDIR',
                     header_skip=["select"],
                     python=os.environ["DASK_PYTHON"])
The PBSCluster flags that follow "job_extra" (e.g. local_directory) define the configuration of the Dask cluster. The "cores" argument specifies how many CPU cores in total are used by the Dask cluster on one node, and the "processes" argument specifies how many Dask workers are started on that single physical node. Usually "cores" and "processes" are set to the same value, as shown above; in that case each Dask worker is single-threaded. If you intend to run multiple threads per worker, you should reduce the number of processes. For example, "cores=48" and "processes=16" mean there are 16 Dask workers, each worker runs 3 threads, and each thread occupies a CPU core. In some instances, users want to assign more memory to each Dask worker thread. In that case, you could reduce the number of cores: for example, keeping the job_extra flags unchanged, setting "cores=24" assigns 192GB/24 = 8GB to each CPU core, so each Dask thread can access 8GB of memory.
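For instance, the following sketch shows the 16-worker layout described above; the queue, project and storage values are placeholders carried over from the earlier examples, so adjust them to your own:

# Sketch: 48 cores split into 16 Dask workers, i.e. 3 threads per worker,
# with the node's 192GB of memory divided evenly among the 16 workers.
cluster = PBSCluster(walltime='01:00:00', cores=48, memory='192GB',
                     processes=16,
                     job_extra=['-q normal', '-P <project code>',
                                '-l ncpus=48', '-l mem=192GB',
                                '-l storage=gdata/<project code>'],
                     local_directory='$TMPDIR',
                     header_skip=["select"],
                     python=os.environ["DASK_PYTHON"])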
It is also necessary to specify "python=os.environ["DASK_PYTHON"]" to make sure all PBS jobs run the same Python environment used in the current JupyterLab session. Note that the environment variable DASK_PYTHON is only available with the NCI-data-analysis module; if you are using other modules, please remove it.
Run your own Python Environment
If you are using your own Python environment rather than NCI-data-analysis, please remove "python=os.environ["DASK_PYTHON"]" from the argument list of the "PBSCluster" method, as sketched below.
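For example, here is a sketch of the earlier single-node PBSCluster call with that flag dropped, so the Python interpreter from your own environment is used by the workers:

# Sketch: same single-node cluster as before, but without
# python=os.environ["DASK_PYTHON"], for use with your own Python environment.
cluster = PBSCluster(walltime="10:00:00", cores=48, memory="192GB",
                     processes=48,
                     job_extra=['-q normal', '-P fp0', '-l ncpus=48',
                                '-l mem=192GB', '-l storage=gdata/dk92+scratch/fp0'],
                     local_directory='$TMPDIR',
                     header_skip=["select"])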
After specifying how a single node's resources are used to set up the Dask cluster via the PBSCluster function, we can scale it out across multiple nodes via the cluster.scale() function. For example, the following lines scale the Dask cluster to span 2 nodes. You will see that 2 PBS jobs have been submitted when running the 'qstat' command.
cluster.scale(jobs=2)
client = Client(cluster)
You can use the command "print(client)" in your notebook to view the status of the cluster (as described in the Local Dask Cluster case). Keep re-running "print(client)" until the values of "processes" and "threads" change from 0 to a positive number before moving on to the next cell.
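Alternatively, instead of re-running "print(client)" by hand, you could block until the workers have joined. A sketch, assuming the two-node, 48-workers-per-node setup above:

# Sketch: wait until the 96 expected workers (2 nodes x 48 single-threaded
# workers) have connected before continuing with the rest of the notebook.
client.wait_for_workers(n_workers=96)
print(client)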
Pre-defined Dask cluster
If you want to use the ARE to directly set up a Dask cluster (e.g., for interactive debugging over 2 nodes, i.e. 96 cores), then you request all the resources needed when starting the ARE JupyterLab session.
Note that the easiest way to configure this "pre-defined" Dask cluster is to use the dk92 "Gadi_jupyterlab" module, as it contains all the scripts needed to make this work.
You can start a JupyterLab session with multiple nodes and set up a pre-defined Dask cluster as below:
[Screenshots: Step 1 and Step 2 of the multi-node JupyterLab launch form]
Wait until the "Open JupyterLab" button is highlighted.
Click "Open JupyterLab" button you will open the JupyterLab interface.
Start a new Python notebook or open an existing one. You can connect to the pre-defined Dask cluster by adding the following lines:
import os
from dask.distributed import Client, LocalCluster

client = Client(scheduler_file=os.environ["DASK_PBS_SCHEDULER"])
print(client)
You can see in this example that the Dask cluster consists of 96 processes (workers) and 96 threads (1 thread per worker) across 2 nodes.
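If you prefer to confirm this programmatically rather than by reading the print(client) output, a small sketch such as the following could be used:

# Sketch: count the connected workers and their threads using the
# scheduler information held by the client.
info = client.scheduler_info()
print(len(info["workers"]), "workers")
print(sum(w["nthreads"] for w in info["workers"].values()), "threads")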
Using Dask Dashboard
You can monitor the status of either the dynamic or the pre-defined Dask cluster via the Dask-dashboard.
Open the "Dask-dashboard" tag in the left-side toolbar, and click "Search" button, you will see a list of Dask cluster metrics.
You can click any metric in the list. For example, clicking the "WORKERS" button opens a new page showing all Dask worker activity.
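The dashboard address can also be retrieved directly from the client object in your notebook (a sketch; the exact URL differs for each session):

# Sketch: print the address of the Dask dashboard for the current client.
print(client.dashboard_link)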
For more details on the Dask dashboard, please see here.