Python provides a built-in package called multiprocessing, which supports spawning processes. It can be used to speed up problem solving through parallel execution. However, it is limited to the parallelism of a single node. Rewriting an existing Python multiprocessing script with another parallel framework, such as MPI or Dask, to utilise compute resources across multiple nodes may take extra effort.

Ray addresses this issue through the ray.util.multiprocessing.Pool API. Only minor changes are needed to extend an existing multiprocessing script to utilise multiple nodes. Here we show an example of how to do this with the NCI-data-analysis module.

The example scripts for Monte Carlo Pi estimation are taken from this blog post, which describes the use of ray.util.multiprocessing.Pool. We are interested in how to use the ray.util.multiprocessing.Pool API and in verifying its performance benefits on Gadi.

Serial run

The serial Monte Carlo Pi estimation script is shown below.

Serial Monte Carlo Pi estimation
import math
import random
import time

def sample(num_samples):
    num_inside = 0
    for _ in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    return num_inside

def approximate_pi(num_samples):
    start = time.time()
    num_inside = sample(num_samples)
    
    print("pi ~= {}".format((4*num_inside)/num_samples))
    print("Finished in: {:.2f}s".format(time.time()-start))
approximate_pi(200000000)

The above script utilises 1 CPU core and takes about 96.31s of elapsed time to complete.

pi ~= 3.14156054
Finished in: 96.31s
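The estimate rests on an area ratio: a point drawn uniformly from the square [-1, 1] x [-1, 1] lands inside the unit circle with probability pi/4, so 4 * num_inside / num_samples approximates pi. As a rough sketch of how much the printed value can be expected to fluctuate between runs, the snippet below computes the binomial standard error of that estimate. The function name pi_standard_error is ours and is not part of the original scripts.

```python
import math

def pi_standard_error(num_samples):
    """Standard error of the estimate 4 * num_inside / num_samples."""
    p = math.pi / 4  # probability that a uniform point falls inside the circle
    return 4 * math.sqrt(p * (1 - p) / num_samples)

if __name__ == "__main__":
    # Sample count taken from the serial script above.
    print("standard error ~= {:.2e}".format(pi_standard_error(200000000)))
```

With 200000000 samples the standard error is on the order of 1e-4, so differences in the fourth decimal place between runs are expected.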

Parallel computing with Multiprocessing

We can parallelise the above serial script with multiprocessing.Pool so that it utilises the full compute resources of a single node (run within a PBS job requesting 1 node):

Parallel on single node using multiprocessing.Pool
import math
import random
import time

def sample(num_samples):
    num_inside = 0
    for _ in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    return num_inside

def approximate_pi_parallel(num_samples):
    from multiprocessing.pool import Pool
    pool = Pool()
    
    start = time.time()
    num_inside = 0
    sample_batch_size = 100000
    for result in pool.map(sample, [sample_batch_size for _ in range(num_samples//sample_batch_size)]):
        num_inside += result
        
    print("pi ~= {}".format((4*num_inside)/num_samples))
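    print("Finished in: {:.2f}s".format(time.time()-start))

# The guard keeps worker processes from re-running this call when they
# import the script (needed with the "spawn" and "forkserver" start methods).
if __name__ == "__main__":
    approximate_pi_parallel(200000000)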

It takes about 2.78s to complete on a single node (Gadi "normal" queue).

pi ~= 3.14166092
Finished in: 2.78s
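Note that num_samples//sample_batch_size discards any remainder, so the script above draws exactly num_samples points only when num_samples is a multiple of the batch size (as it is here: 200000000 / 100000 = 2000 tasks). A minimal sketch of a batching helper that keeps the remainder is shown below; make_batches is a hypothetical name, not part of the original script.

```python
def make_batches(num_samples, batch_size):
    """Split num_samples into batch sizes that sum exactly to num_samples."""
    full_batches, remainder = divmod(num_samples, batch_size)
    batches = [batch_size] * full_batches
    if remainder:
        batches.append(remainder)  # final, smaller batch
    return batches

if __name__ == "__main__":
    print(len(make_batches(200000000, 100000)))  # 2000 tasks, as in the script above
    print(make_batches(250, 100))                # [100, 100, 50]
```

The resulting list could be passed to pool.map(sample, ...) in place of the list comprehension.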

The speedup relative to the serial script is about 34.64 (96.31s / 2.78s), which corresponds to a parallel efficiency of about 72% on the 48 cores of a single node.

Distributed computing with Ray

Extending the above multiprocessing script across multiple nodes takes 2 steps.

Step 1: Set up a pre-defined Ray cluster 

Please refer to here for how to set up a pre-defined Ray cluster with 2 nodes in ARE or on Gadi.
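Before moving on, it can help to confirm that the cluster really spans the expected nodes. The sketch below assumes Ray is installed and a cluster is already running. It uses ray.init(address="auto") and ray.nodes(), and assumes each node entry carries the "Alive" and "Resources" fields. The helper names summarise_nodes and check_cluster are illustrative.

```python
def summarise_nodes(nodes):
    """Return (number of alive nodes, total CPUs) from a ray.nodes()-style list."""
    alive = [n for n in nodes if n.get("Alive")]
    total_cpus = sum(n.get("Resources", {}).get("CPU", 0) for n in alive)
    return len(alive), total_cpus

def check_cluster():
    """Connect to the running Ray cluster and print its size."""
    import ray  # imported here so the helper above works without Ray installed
    ray.init(address="auto")  # attach to the existing cluster, do not start a new one
    num_nodes, num_cpus = summarise_nodes(ray.nodes())
    print("alive nodes: {}, CPUs: {}".format(num_nodes, int(num_cpus)))
```

Calling check_cluster() from inside the job should report 2 alive nodes for the setup described here.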

Step 2: Create multiprocessing pool with ray.util.multiprocessing.Pool API

Simply replace "from multiprocessing.pool import Pool" in the above script with "from ray.util.multiprocessing.pool import Pool" as shown below.

Then you can connect the Pool to the pre-defined Ray cluster via the argument ray_address="auto".

Distributed across multiple nodes with ray.util.multiprocessing.pool
import math
import random
import time

def sample(num_samples):
    num_inside = 0
    for _ in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    return num_inside

def approximate_pi_distributed(num_samples):
    from ray.util.multiprocessing.pool import Pool  # NOTE: the import is changed to Ray's Pool
    pool = Pool(ray_address="auto")  # NOTE: ray_address connects to the pre-defined Ray cluster
        
    start = time.time()
    num_inside = 0
    sample_batch_size = 100000
    for result in pool.map(sample, [sample_batch_size for _ in range(num_samples//sample_batch_size)]):
        num_inside += result
        
    print("pi ~= {}".format((4*num_inside)/num_samples))
    print("Finished in: {:.2f}s".format(time.time()-start))


approximate_pi_distributed(200000000)

Running on 2 nodes, the above distributed script takes about 1.62s to complete.

pi ~= 3.1414136 
Finished in: 1.62s
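Because the two Pool classes are used in the same way here, one script can serve both the single-node and the multi-node case. The following is a minimal sketch, assuming a hypothetical make_pool helper and backend names of our choosing; the "ray" branch additionally assumes that Ray is installed and a cluster is already running.

```python
def make_pool(backend="multiprocessing"):
    """Create a process pool for the chosen backend (hypothetical helper)."""
    if backend == "multiprocessing":
        from multiprocessing.pool import Pool
        return Pool()  # worker count defaults to the local CPU count
    if backend == "ray":
        from ray.util.multiprocessing.pool import Pool
        return Pool(ray_address="auto")  # connect to the existing Ray cluster
    raise ValueError("unknown backend: {!r}".format(backend))
```

approximate_pi_distributed could then call make_pool("ray") instead of constructing the Pool directly, leaving the rest of the function unchanged.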

The speedup of the 2-node run relative to the 1-node run is about 1.72 (2.78s / 1.62s), and the parallel efficiency from 1 node to 2 nodes is about 86%.
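The speedup and efficiency figures quoted on this page follow from the measured timings. As a worked check, the snippet below recomputes them, taking speedup as the ratio of elapsed times and efficiency as speedup divided by the factor by which the resources grew. The function names are ours; the figure of 48 cores per node is the one quoted above.

```python
def speedup(t_before, t_after):
    """Ratio of elapsed times: how many times faster the second run is."""
    return t_before / t_after

def efficiency(t_before, t_after, resource_factor):
    """Speedup divided by the factor by which the compute resources grew."""
    return speedup(t_before, t_after) / resource_factor

# Elapsed times in seconds, as measured on this page.
T_SERIAL, T_ONE_NODE, T_TWO_NODES = 96.31, 2.78, 1.62

if __name__ == "__main__":
    rows = [
        ("1 core -> 1 node (48 cores)", T_SERIAL, T_ONE_NODE, 48),
        ("1 node -> 2 nodes", T_ONE_NODE, T_TWO_NODES, 2),
        ("1 core -> 2 nodes (96 cores)", T_SERIAL, T_TWO_NODES, 96),
    ]
    for label, t0, t1, factor in rows:
        print("{}: speedup {:.2f}, efficiency {:.0%}".format(
            label, speedup(t0, t1), efficiency(t0, t1, factor)))
```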

Conclusion

The ray.util.multiprocessing.Pool API can be used to efficiently extend an existing multiprocessing script to utilise the resources of multiple nodes.


Please Note

  • As the compute load in this example is relatively small, your scalability measurements may vary somewhat from run to run.
  • The measured scalability may vary from task to task when you test other scripts.