Page tree

There exists a plethora of online material related to RAPIDS and one such example is this blog [] which showcases how to accelerate single cell genomic analysis using RAPIDS. The authors of this blog released several notebooks to showcase their work and these notebooks can be run on Gadi with some modifications. For the following example, we will look at the notebook [] and show how to modify our RAPIDS environment and run this on Gadi.

Install Missing Packages 

Most of the packages imported in this notebook are available in rapids/22.02. There are only two missing packages, scanpy and anndata. Please follow the instructions provided on this page under `Work with Other Python Packages` to gain an understanding of how to install additional packages.

For this example, we will use pip to install scanpy:

login-node $ python3 -m pip install -v --upgrade-strategy only-if-needed --prefix $INSTALL_DIR scanpy

where INSTALL_DIR defines where scanpy will be installed.

As scanpy installs anndata automatically as part of its dependencies, you should see both of these packages available in $INSTALL_DIR/lib/python3.9/site-packages after successfully running the command above. 

login-node $ ls $INSTALL_DIR/lib/python3.9/site-packages
anndata                  natsort-8.1.0.dist-info  scanpy                  sinfo-0.3.4.dist-info        tables
anndata-0.8.0.dist-info  numexpr                  scanpy-1.8.2.dist-info  stdlib_list                  tables-3.7.0.dist-info
natsort                  numexpr-2.8.1.dist-info  sinfo                   stdlib_list-0.8.0.dist-info  tables.libs

To test scanpy, you can try running the test suite that comes with it:

git clone -b 1.8.2
cd scanpy
py.test > ../scanpy.test.log

Given the scanpy tests take less than 10 minutes to complete, you should be able to run these tests on the login node. For more intensive tests, please run using a PBS job.

There are some tests in the scanpy test suite that fail as their RMS values are greater than the expected tolerance. These failed tests are a good demonstration of the risks involved when running applications that are not built from source on Gadi. As the failed tests are not exceeding the tolerance limit by a large amount, we will proceed with using this installation for the following example.

Download Input Data and Auxiliary Function Code

The example notebook has a short section of code that downloads the input data. On Gadi, the downloading would need to be run on either the login nodes or the copyq nodes as Gadi compute nodes have no access to external networks. Given the dataset in this example is small, we will download it on the login node directly:

login-node $ mkdir -p $WORKDIR
login-node $ wget ‐P $WORKDIR

where WORKDIR defines the path to the working directory in which you will run the notebook.

The notebook also calls auxiliary functions defined in another file hosted in the same GitHub repository. This file would also need to be downloaded to your working directory: 

login-node $ wget -P $WORKDIR

Note that we use the v2021.12.0 version notebook and this file may be different for other branches.

Start an Interactive Job on a GPU Node and Initiate the Dask LocalCUDACluster

Once the required python packages, input data and auxiliary functions all available on Gadi, we can run our example notebook on a GPU node. To gain a deeper understanding of the inner workings of this notebook, it is recommended to run it interactively.

To submit the interactive job:

login-node $ cd $WORKDIR
login-node $ qsub -I -P${PROJECT} -vINSTALL_DIR=$INSTALL_DIR -qgpuvolta -lwalltime=00:30:00,ncpus=24,ngpus=2,mem=180GB,jobfs=200GB,storage=gdata/dk92+gdata/${PROJECT},other=hyperthread,wd

Note that this interactive job assumes your default project $PROJECT has enough SU to support a 2-GPU job for half an hour. If this is not the case, please replace $PROJECT with a project code that has sufficient compute resources. For more information on how to look up resource availability in projects, please see the Gadi User Guide.

The interactive job also assumes the directory $INSTALL_DIR is located inside /g/data/${PROJECT} and $WORKDIR inside /scratch/${PROJECT} where ${PROJECT} defines the project code that supports this job. If this is not the case, please revise the string passed to the PBS -lstorage directive accordingly. More information on PBS directives can be found here.

Once the job is ready, prepare the environment and initiate the dask cluster inside python3. Please note that when editing your PYTHONPATH, the variable INSTALL_DIR that was passed through the PBS job submission line is used. If this INSTALL_DIR is not accessible from the job, importing the scanpy and anndata packages would fail. 

gpu-node $ module use /g/data/dk92/apps/Modules/modulefiles/
gpu-node $ module load rapids/22.02
gpu-node $ export PYTHONPATH=$INSTALL_DIR/lib/python3.9/site-packages:$PYTHONPATH
gpu-node $ python3
python3 >>> from dask_cuda import initialize, LocalCUDACluster
python3 >>> from dask.distributed import Client, default_client
python3 >>> cluster = LocalCUDACluster()
python3 >>> client = Client(cluster)
python3 >>> client
<Client: 'tcp://' processes=2 threads=2, memory=180.00 GiB>

When initiating the LocalCUDACluster, no argument is required as long as it expects workers to run on the same compute node. Running LocalCUDACluster() will start a local scheduler ready to connect with the same number of workers as the number of GPUs available inside the job. Since the Gadi gpuvolta queue has 4 GPUs per node, this method is only valid for jobs that require no more than 4 GPUs. To learn how to run tasks using more than 4 GPUs on multiple nodes, follow the instructions in Example 2 on this page.

Load Input Data using all the Dask CUDA Workers

In the example notebook, the input is ingested by calling the function read_with_filter defined in the file

python3 >>> input_file = "1M_brain_cells_10X.sparse.h5ad"
python3 >>> min_genes_per_cell = 200
python3 >>> max_genes_per_cell = 6000
python3 >>> dask_sparse_arr, genes, query = rapids_scanpy_funcs.read_with_filter(client,
...                                                        input_file,
...                                                        min_genes_per_cell=min_genes_per_cell,
...                                                        max_genes_per_cell=max_genes_per_cell,
...                                                        partial_post_processor=partial_post_processor)

This function first opens the h5 file to find out the total number of cells, then defines the batch of data read by each worker accordingly. The parallel read is initiated by calling dask.array.from_delayed which in turn launches all the individual reads. Every read fetches a piece of data no more than the default batch size of 50000.  Given the input file contains 1306127 cell records, all of the first 26 tasks read info from 50000 cells while the 27th read fetches the remaining 6127. By modifying the batch size, you can minimise the time spent for data ingestion.  Below is a benchmark showing the batch size and associated normalised read walltime:

batch size10000250005000073000100000165000200000
normalised read walltime1.0840.99011.0061.0501.0381.114

If data ingestion takes a considerable amount of time in your production jobs, you might want to go through this optimisation carefully. In general, IO operations can benefit from a larger batch size on Lustre filesystems as too many small reads and writes can result in very poor performance in terms of both efficiency and robustness. However, in the case of larger batch sizes leading to workload imbalance, the overall read walltime increases as the entire IO has to wait for the last task to finish. For example, from our benchmark above, a batch size of 200K results in 7 read tasks scheduled on 2 workers and it's very likely that one worker has to wait for the other while it performs the fourth read task.

Even though read_with_filter does more than IO operations, the trend is clear that time spent in IO dominants the overall overhead.

Run Data Analysis on Multiple GPUs

There are several computational intensive tasks in this notebook and we take PCA as an example to show how to run it on multiple GPUs on Gadi.

In the notebook, there are two PCA decomposition functions shipped with the `cuML` package:

from cuml.decomposition import PCA
from cuml.dask.decomposition import PCA

Only the cuml.dask.decompostiton package runs on multiple GPUs. You can find similar functions in this section of the cuml API references.

Save Plots to Files 

The example notebook can visualise the cluster results with plots. Given the current interactive job has no display set, the following modification to save the plots to files is necessary. Once the results are saved to a file, you can download this file to your local PC and inspect the results:

python3 >>>, color=["kmeans"],show=False,save="_kmeans.pdf")

The code above tells the scanpy plotting tool not to show the figure but  instead, save it to a pdf file. Once it finishes writing, this image can be found under `figures/tsne_kmeans.pdf` inside your working directory.

  • No labels