Python provides the built-in multiprocessing package, which supports spawning processes to solve a problem in parallel. However, it is limited to the parallelism of a single node. Rewriting an existing Python multiprocessing script with another parallel framework, such as MPI or Dask, in order to utilise compute resources across multiple nodes can take considerable extra effort.
Ray addresses this issue via its ray.util.multiprocessing.Pool API: only minor changes are needed to extend an existing multiprocessing script to utilise multiple nodes. Here we show an example of how to make this happen with the NCI-data-analysis module.
The example Monte Carlo Pi estimation scripts are taken from this blog post, which describes the use of ray.util.multiprocessing.Pool. We are interested in how to utilise the ray.util.multiprocessing.Pool API and verify its performance benefits on Gadi.
Serial run
The serial Monte Carlo Pi estimation script below draws random points in the square [-1, 1] x [-1, 1] and estimates Pi as 4 times the fraction of points that fall inside the unit circle:
import math
import random
import time

def sample(num_samples):
    # Count how many random points fall inside the unit circle.
    num_inside = 0
    for _ in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    return num_inside

def approximate_pi(num_samples):
    start = time.time()
    num_inside = sample(num_samples)
    # The fraction of points inside the circle approximates pi/4.
    print("pi ~= {}".format((4*num_inside)/num_samples))
    print("Finished in: {:.2f}s".format(time.time()-start))

approximate_pi(200000000)
The above script utilises a single CPU core and takes about 96.31s of elapsed time to complete:
pi ~= 3.14156054
Finished in: 96.31s
Parallel computing with Multiprocessing
We can parallelise the above serial script with multiprocessing.Pool so that it utilises the full compute resources of a single node (run within a PBS job requesting one node):
import math
import random
import time

def sample(num_samples):
    num_inside = 0
    for _ in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    return num_inside

def approximate_pi_parallel(num_samples):
    from multiprocessing.pool import Pool
    pool = Pool()
    start = time.time()
    num_inside = 0
    # Split the work into batches so each pool task has a reasonable size.
    sample_batch_size = 100000
    for result in pool.map(sample, [sample_batch_size for _ in range(num_samples//sample_batch_size)]):
        num_inside += result
    print("pi ~= {}".format((4*num_inside)/num_samples))
    print("Finished in: {:.2f}s".format(time.time()-start))

approximate_pi_parallel(200000000)
It takes about 2.78s to complete on a single node (Gadi "normal" queue):
pi ~= 3.14166092
Finished in: 2.78s
The speedup relative to the serial script is about 34.64, which corresponds to a parallel efficiency of about 72% when going from a serial run to parallel computing on a single node (48 cores).
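For reference, these figures follow directly from the measured wall times. The small helper below is a minimal sketch, with the timings quoted on this page hard-coded, that reproduces them.

def speedup_and_efficiency(t_baseline, t_parallel, resource_ratio):
    """Speedup = t_baseline / t_parallel; efficiency = speedup / resource_ratio."""
    speedup = t_baseline / t_parallel
    return speedup, speedup / resource_ratio

# Serial (1 core) -> 1 node (48 cores): ~34.64x speedup, ~72% efficiency.
print(speedup_and_efficiency(96.31, 2.78, 48))

# 1 node -> 2 nodes (timings reported below): ~1.72x speedup, ~86% efficiency.
print(speedup_and_efficiency(2.78, 1.62, 2))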
Distributed computing with Ray
To extend the above multiprocessing script across multiple nodes, two steps are required.
Step 1: Set up a pre-defined Ray cluster
Please refer to here for instructions on setting up a pre-defined Ray cluster with 2 nodes in ARE or on Gadi.
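Once the cluster is up, you can optionally confirm from Python that both nodes have joined before creating the pool. This is a minimal sketch using the standard ray.init and ray.nodes calls:

import ray

# Connect to the already-running Ray cluster from Step 1.
ray.init(address="auto")

# Each entry in ray.nodes() describes one node; expect 2 alive nodes here.
alive_nodes = [node for node in ray.nodes() if node["Alive"]]
print("Alive nodes:", len(alive_nodes))
print("Total CPUs:", ray.cluster_resources().get("CPU"))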
Step 2: Create the multiprocessing pool with the ray.util.multiprocessing.Pool API
Simply replace "from multiprocessing.pool import Pool" in the above script with "from ray.util.multiprocessing.pool import Pool", as shown below.
You can then connect the pool to the pre-defined Ray cluster via the argument ray_address="auto".
import math
import random
import time

def sample(num_samples):
    num_inside = 0
    for _ in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    return num_inside

def approximate_pi_distributed(num_samples):
    from ray.util.multiprocessing.pool import Pool  # NOTE: Only the import statement is changed.
    # Connect to the pre-defined Ray cluster set up in Step 1.
    pool = Pool(ray_address="auto")
    start = time.time()
    num_inside = 0
    sample_batch_size = 100000
    for result in pool.map(sample, [sample_batch_size for _ in range(num_samples//sample_batch_size)]):
        num_inside += result
    print("pi ~= {}".format((4*num_inside)/num_samples))
    print("Finished in: {:.2f}s".format(time.time()-start))

approximate_pi_distributed(200000000)
Running on 2 nodes, the above distributed computing script takes about 1.62s to complete:
pi ~= 3.1414136
Finished in: 1.62s
Its speedup relative to the single-node run is about 1.72, giving a parallel efficiency of about 86% when going from 1 node to 2 nodes.
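As a side note, by default the pool sizes itself to the total number of CPUs in the Ray cluster. If you want to cap the number of workers, Ray's Pool also accepts a processes argument; the snippet below is a sketch, where 96 is an illustrative value for two 48-core Gadi nodes:

from ray.util.multiprocessing.pool import Pool

# Limit the pool to 96 workers across the cluster (illustrative value);
# omit processes to let the pool use every CPU in the Ray cluster.
pool = Pool(processes=96, ray_address="auto")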
Conclusion
The ray.util.multiprocessing.Pool API can be used to efficiently extend an existing multiprocessing script to utilise the resources of multiple nodes.
Please Note
- As the compute load in this example is relatively small, the measured scalability may vary somewhat between runs.
- The measured scalability may vary from task to task when you test other scripts.