Overview

The module `nci-parallel` provides an MPI-based task-farming application for running embarrassingly parallel tasks in a single large PBS job. It maintains its own task queue: whenever subtasks in the PBS job finish, the next tasks in the list are dispatched to the freed cores.

We recommend it over `pbsdsh` for running tasks whose execution times vary widely. For example, if task execution times follow an exponential distribution, running at least 10 times fewer tasks concurrently than the total number of tasks reduces the time that would otherwise be wasted waiting for the longest tasks to finish. More specifically, if you have 2000 tasks to run within the same job, request resources for NO MORE than 200 concurrent tasks to achieve a better CPU utilisation rate.
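
The sizing rule above can be turned into a quick calculation before deciding how many cores to request. The following is a minimal sketch and not part of `nci-parallel`: the variable names are hypothetical, the 4 cores per task is an assumed value, and the factor of 10 is simply the rule of thumb stated above.

```shell
#!/bin/bash
# Hypothetical helper: upper bound on concurrency for a task farm.
ntasks=2000          # total number of tasks in the job
ncores_per_task=4    # assumption: cores needed by each task
ratio=10             # rule of thumb: at least 10x more tasks than concurrent slots

# Integer division gives the largest concurrency that respects the ratio.
max_concurrent=$(( ntasks / ratio ))
max_busy_cores=$(( max_concurrent * ncores_per_task ))

echo "run at most ${max_concurrent} tasks concurrently"
echo "that keeps at most ${max_busy_cores} cores busy"
```

With the values shown, this reports 200 concurrent tasks and 800 busy cores; the actual `ncpus` request still has to satisfy the limits of the queue you submit to.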

Usage

To use `nci-parallel`, please follow the example below.

  1. Create a text file containing the command line for every task, one task per line. For example, a file called `cmds.txt` contains 1000 lines. In each line, the script `./test.sh` takes a different input argument, 0-999, to define the task.

    ./test.sh 0 
    ./test.sh 1
    ./test.sh 2
    ...
    ./test.sh 998
    ./test.sh 999

    Make sure `test.sh` has execute permission, for example by running `chmod u+x /path/to/test.sh`.

  2. Edit the job submission script, called `job.sh` here, to run `nci-parallel`, which in turn launches the above 1000 tasks. The example below runs each task on 4 CPU cores and launches 96 concurrent tasks in a 384-core job (4*96=384), with a timeout threshold of 4000 seconds.

    #!/bin/bash
    
    #PBS -q normal
    #PBS -l ncpus=384
    #PBS -l walltime=04:00:00
    #PBS -l mem=380GB
    #PBS -l wd
    
    # Load module, always specify version number.
    module load nci-parallel/1.0.0a
    
    export ncores_per_task=4
    export ncores_per_numanode=12
    
    # Must include `#PBS -l storage=scratch/ab12+gdata/yz98` if the job
    # needs access to `/scratch/ab12/` and `/g/data/yz98/`. Details on:
    # https://opus.nci.org.au/display/Help/PBS+Directives+Explained
    
    mpirun -np $((PBS_NCPUS/ncores_per_task)) --map-by ppr:$((ncores_per_numanode/ncores_per_task)):NUMA:PE=${ncores_per_task} nci-parallel --input-file cmds.txt --timeout 4000

    Depending on the value of `ncores_per_task` in your own case, you might need to revise the argument passed to the `--map-by` flag. In the above example, it forces each task to use 4 CPU cores in the same NUMA node and launches 3 tasks on the current NUMA node before moving to the next one. On Cascade Lake nodes, 12 cores are attached to each NUMA node. If your `ncores_per_task` is at most 12 and divides `ncores_per_numanode` with no remainder (12 % ncores_per_task = 0), you can safely revise only the value of `ncores_per_task` in the above script. Otherwise, please consider how to launch your tasks and test the binding before running your production tasks.

    The timeout option is useful for cutting off long-running tasks. It sets the maximum execution time for every task wrapped in the job. Set it to the time beyond which it would be more economical to rerun the task in a new job than to wait for it to finish in the current job, wasting SUs on idle cores.

  3. Tune your own scripts to make sure each task utilises `ncores_per_task` CPU cores before submitting the job. For example, to run a Python script after testing the binding, the `test.sh` script could be

    #!/bin/bash
    
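    # /proc/self inside $(...) refers to the grep child process, so take the
    # script's own PID from $$; the CPU affinity list is inherited by the child.
    pid=$$
    corelist=$(grep Cpus_allowed_list: /proc/self/status | awk '{print $2}')
    host=$(hostname | sed 's/\.gadi\.nci\.org\.au//')
    echo "subtask $1 running in PID $pid using cores $corelist on compute node $host"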
    
    # Load module, always specify version number.
    module load python3/3.7.4
    
    python3 main.py $1

    where `$1` is the first argument passed to `test.sh` in the file `cmds.txt`. The Python script `main.py` needs to know how to distribute and balance the workload across the 4 cores assigned to it. You may use packages such as `multiprocessing` and `mpi4py` to parallelise across multiple cores within each task.

    In the above example `test.sh` file, everything written to the standard output stream goes to the job's STDOUT. The output of all tasks is concatenated and can be found at the beginning of the job's `.o` log. If the total size of the job's STDOUT approaches the GB scale, please explicitly redirect it to files. For jobs with frequent small IO operations, it is recommended to redirect it to files in the folder `$PBS_JOBFS`. This job-specific folder is located on the compute nodes hosting the job and is accessible exclusively to the tasks in the job. When using it, a final data transfer back to the shared filesystem is essential to keep the output files, because the folder `$PBS_JOBFS` and its contents are removed immediately after the job finishes.

  4. Submit the job. For example, to submit the job defined in the job submission script shown in step 2 above through the project xy11, run

    $ qsub -P xy11 job.sh
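
Before submitting, the arithmetic behind the `mpirun` line in step 2 can be checked on a login node. The sketch below is a hypothetical helper (the function name `check_layout` is an assumption, not part of `nci-parallel`). It only reproduces the shell arithmetic from the job script and the divisibility condition described in step 2; it does not test the actual core binding.

```shell
#!/bin/bash
# Hypothetical pre-submission check; not part of nci-parallel.
# Usage: check_layout NCPUS NCORES_PER_TASK NCORES_PER_NUMANODE
check_layout() {
    local ncpus=$1 per_task=$2 per_numa=$3

    # The simple ppr mapping assumes tasks fit evenly into one NUMA node.
    if (( per_task < 1 || per_task > per_numa || per_numa % per_task != 0 )); then
        echo "ncores_per_task=${per_task} does not divide ${per_numa}; revise --map-by" >&2
        return 1
    fi

    # Same integer arithmetic as the mpirun line in the job script.
    echo "-np $(( ncpus / per_task )) --map-by ppr:$(( per_numa / per_task )):NUMA:PE=${per_task}"
}

check_layout 384 4 12                            # prints: -np 96 --map-by ppr:3:NUMA:PE=4
check_layout 384 5 12 || echo "layout rejected"  # 12 % 5 != 0, so this is rejected
```

A rejected layout means the `--map-by` argument needs to be worked out by hand and the binding tested, as noted in step 2.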

There are other options available to define how to launch the MPI processes for every task; see the help information for more details. For example, to see the help for nci-parallel/1.0.0a, run

    $ module load nci-parallel/1.0.0a
    Loading nci-parallel/1.0.0a
      Loading requirement: openmpi/4.1.0

    $ nci-parallel -h
    NCI Parallel, version 1.0.0
    MPI-based farming for embarrassingly parallel tasks.

    Usage:
        nci-parallel [OPTIONS]

    Options:
      --append,-a           append to output files
      --dedicated,-d        dedicate rank 0 to task management
      --help,-h             print this message and exit
      --input-file,-i PATH  read task commands from PATH instead of stdin
      --no-clobber,-n       error if output file already exists
      --output-dir,-o PATH  redirect output to PATH/<id>.std{err,out}
      --poll,-p TIME        poll for messages on server every TIME interval
                            (specifying a value of 0 is equivalent to --dedicated)
      --shell,-S PATH       use PATH to interpret and launch tasks
      --status,-s PATH      read/write status of tasks in PATH (allows for restart)
      --timeout,-t TIME     kill tasks after SEC seconds (will send SIGTERM after
                            TIME period and then SIGKILL after 1 another second)
      --verbose,-v          increase verbosity (can be specified multiple times)

    Time based options accept these suffixes:
      s = seconds
      ms = milliseconds
      us = microseconds
      ns = nanoseconds
    Providing no suffix is equivalent to 's'.

    Default behaviour is to read from stdin and write to stdout and stderr. If you
    request --output-dir it will default to truncating output files that already
    exist; use the --append or --no-clobber flags to change this behaviour.

    As tasks complete, the server will poll for completion messages from the
    workers based on the --poll argument; if this is not specified, it defaults to
    every 100ms. For large jobs, or short tasks, it may be helpful to decrease this
    value to e.g. 10ms or less; this improves the latency in workers obtaining new
    tasks. For ultra large jobs or tiny tasks, you can dedicate one rank to the
    management of tasks by setting this to 0 (or using the --dedicated flag).
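
Several of the options in the help text can be combined, for instance to keep per-task output out of the job's STDOUT and to record task status for a restart. The following is a sketch only: the flags are taken from the help text above, but the paths `task_logs` and `status.txt` are hypothetical, creating the output directory in advance is an assumption made to be safe, and the `--map-by` argument from step 2 is left out for brevity. The command is printed rather than executed so that it can be inspected outside a PBS job.

```shell
#!/bin/bash
# Sketch only: assemble nci-parallel options listed in its help text.
outdir=task_logs          # hypothetical directory for per-task output
mkdir -p "$outdir"        # assumption: create it up front to be safe

nci_opts=(
    --input-file cmds.txt
    --timeout 4000
    --output-dir "$outdir"   # tasks write to task_logs/<id>.stdout and <id>.stderr
    --status status.txt      # hypothetical path; status record that allows a restart
    --poll 10ms              # poll more often than the 100ms default
)

# Print the command for inspection; inside a job script, drop the leading echo.
echo mpirun -np 96 nci-parallel "${nci_opts[@]}"
```

According to the help text, existing output files are truncated by default, so add `--append` or `--no-clobber` if a rerun must not overwrite earlier output.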