Dask SLURMClusters

The MLeRP notebook environment uses Dask SLURMClusters to create a middle ground: the interactivity of a notebook, backed by the power of HPC. This notebook shows how you can use the lion service to run a CPU-based notebook session for basic analysis and code development. Then, when you’re ready to run tests, you use Dask to submit your Python functions to the SLURM queue.

This enables:

from dask_jobqueue import SLURMCluster
from distributed import Client, LocalCluster
import dask

# Point Dask to SLURM to use as its back end
cluster = SLURMCluster(
    memory="64g", processes=1, cores=8
)

# Scale out to 4 nodes
num_nodes = 4
cluster.scale(num_nodes)
client = Client(cluster)

Dask will now spin up jobs, at the scale you specify, in anticipation of work.

You can check in on your jobs like you would with any other SLURM job with squeue.

!squeue
             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
               933   BigCats dask-wor mhar0048 PD       0:00      1 (None)
               932   BigCats dask-wor mhar0048 PD       0:00      1 (None)
               931   BigCats dask-wor mhar0048 PD       0:00      1 (None)
               930   BigCats dask-wor mhar0048 PD       0:00      1 (None)
               919   BigCats Jupyter  ramachap  R      51:44      1 mlerp-monash-node00
               920   BigCats Jupyter  ramachap  R      47:22      1 mlerp-monash-node00
               916   BigCats Jupyter    charla  R    5:02:19      1 mlerp-monash-node00

You can modify the output of squeue to work for you by passing in flags.

!squeue --me --format "%.8P %.15j %.8T %.10M %.12L %.4C %.7m %R %q"
PARTITIO            NAME    STATE       TIME    TIME_LEFT CPUS MIN_MEM NODELIST(REASON) QOS
 BigCats     dask-worker  RUNNING       0:00        30:00    8     60G mlerp-monash-node00 cheetah
 BigCats     dask-worker  RUNNING       0:00        30:00    8     60G mlerp-monash-node00 cheetah
 BigCats     dask-worker  RUNNING       0:00        30:00    8     60G mlerp-monash-node00 cheetah
 BigCats     dask-worker  RUNNING       0:00        30:00    8     60G mlerp-monash-node00 cheetah
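
If you want to act on this output from Python, for example to check how many workers have started, the text can be tallied with the standard library. The following is a minimal sketch, not part of the MLeRP tooling: `worker_states` is a hypothetical helper, and it assumes the column order produced by the format string above (partition, name and state come first).

```python
from collections import Counter

# Output copied from the squeue call above. In a notebook you could capture it
# with subprocess.run([...], capture_output=True, text=True).stdout instead.
SAMPLE = """\
PARTITIO            NAME    STATE       TIME    TIME_LEFT CPUS MIN_MEM NODELIST(REASON) QOS
 BigCats     dask-worker  RUNNING       0:00        30:00    8     60G mlerp-monash-node00 cheetah
 BigCats     dask-worker  RUNNING       0:00        30:00    8     60G mlerp-monash-node00 cheetah
 BigCats     dask-worker  RUNNING       0:00        30:00    8     60G mlerp-monash-node00 cheetah
 BigCats     dask-worker  RUNNING       0:00        30:00    8     60G mlerp-monash-node00 cheetah
"""


def worker_states(squeue_text, job_name="dask-worker"):
    """Count jobs named `job_name` by state (hypothetical helper)."""
    counts = Counter()
    for line in squeue_text.splitlines()[1:]:  # skip the header row
        # Only the first three columns are needed; the reason column
        # near the end of the line may contain spaces, so it is not split.
        fields = line.split(maxsplit=3)
        if len(fields) >= 3 and fields[1] == job_name:
            counts[fields[2]] += 1
    return counts


print(worker_states(SAMPLE))
```

A count of four RUNNING workers here matches the `cluster.scale(num_nodes)` request made earlier.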

Alternatively, we can use the adapt method, which scales out when we need the compute and scales back when we’re idle, letting others use the cluster.

We recommend that you use the adapt method while you’re actively developing your code so that you don’t need to worry about cleaning up after yourself. The scale method can be used when you’re ready to run longer tests with higher utilisation.

cluster.adapt(minimum=0, maximum=num_nodes)
<distributed.deploy.adaptive.Adaptive at 0x7fcf13681270>
# You may need to run this cell a few times while waiting for Dask to clean up
!squeue
             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
               919   BigCats Jupyter  ramachap  R      51:44      1 mlerp-monash-node00
               920   BigCats Jupyter  ramachap  R      47:22      1 mlerp-monash-node00
               930   BigCats dask-wor mhar0048  R       0:00      1 mlerp-monash-node00
               931   BigCats dask-wor mhar0048  R       0:00      1 mlerp-monash-node00
               932   BigCats dask-wor mhar0048  R       0:00      1 mlerp-monash-node00
               933   BigCats dask-wor mhar0048  R       0:00      1 mlerp-monash-node00
               916   BigCats Jupyter    charla  R    5:02:19      1 mlerp-monash-node00
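
To try adaptive scaling without queueing SLURM jobs, the same pattern can be run against a LocalCluster (imported earlier), which here runs its workers as threads inside the notebook process. This is only a sketch under that assumption: the `add` function is purely illustrative, and on MLeRP you would keep using the SLURMCluster created above.

```python
from distributed import Client, LocalCluster


def add(a, b):
    return a + b


# Stand-in for SLURMCluster: start with no workers, using threads in this process.
with LocalCluster(n_workers=0, threads_per_worker=1, processes=False,
                  dashboard_address=None) as cluster:
    cluster.adapt(minimum=0, maximum=2)  # same call as on the SLURM cluster
    with Client(cluster) as client:
        workers_before = len(client.scheduler_info()["workers"])
        # Submitting work is what prompts the adaptive cluster to add a worker.
        result = client.submit(add, 2, 3).result(timeout=60)

print(workers_before, result)
```

The cluster starts with no workers and only requests one once a task is waiting, which is the behaviour that frees up the SLURM queue for others while you are idle.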

Dask has a UI that lets you see how tasks are being computed. You won’t be able to connect to it directly with your web browser, but VS Code and Jupyter have extensions that can connect to it.

Use the loopback address: http://127.0.0.1:8787 (if needed, adjust the port to the one listed when you create the client).

Now let’s define a Dask array and perform some computation. Dask arrays are parallelised across your worker nodes, so they can be larger than one worker’s memory. Dask evaluates lazily, returning ‘futures’ that record the tasks to be completed in the compute graph. These can be computed later to obtain their values.

Dask also has parallelised implementations of dataframes and collections of objects (called bags). These are written to be as similar as possible to familiar libraries like NumPy, pandas and PySpark. You can read more about arrays, dataframes and bags in Dask’s documentation.
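
The lazy behaviour described above can be seen on a small array that fits comfortably in memory. This sketch uses Dask's default local scheduler rather than the cluster, and the shape and chunk sizes are arbitrary choices for illustration.

```python
import dask.array as da

# A 1000 x 1000 array of ones, split into 250 x 250 chunks.
x = da.ones((1000, 1000), chunks=(250, 250))

# This only builds a task graph; no arithmetic has happened yet.
total = (x * 2).sum()

print(type(total).__name__)  # still a Dask array, not a number
print(x.numblocks)           # number of chunks along each axis

# compute() runs the graph and returns the concrete value.
value = total.compute()
print(value)
```

Until `compute()` is called, `total` is just a description of the work to be done, so it is cheap to build up long chains of operations before running anything.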

import dask.array as da
x = da.random.random((1000, 1000, 1000))
x  # Note how the value of the array hasn't been computed yet
             Array                Chunk
Bytes        7.45 GiB             126.51 MiB
Shape        (1000, 1000, 1000)   (255, 255, 255)
Dask graph   64 chunks in 1 graph layer
Data type    float64 numpy.ndarray
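
The figures in this summary follow from the array's shape and data type, and the arithmetic below reproduces them. It assumes 8 bytes per float64 element and the (255, 255, 255) chunk shape that Dask chose here.

```python
import math

shape = (1000, 1000, 1000)
chunk = (255, 255, 255)
itemsize = 8  # bytes per float64 element

# Total size of the array and of one full chunk.
array_gib = math.prod(shape) * itemsize / 2**30
chunk_mib = math.prod(chunk) * itemsize / 2**20

# Each axis needs ceil(1000 / 255) = 4 chunks, so 4 * 4 * 4 in total.
n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunk))

print(f"{array_gib:.2f} GiB, {chunk_mib:.2f} MiB, {n_chunks} chunks")
```

Because work is done one chunk at a time, a worker only needs room for a handful of roughly 127 MiB chunks rather than the full 7.45 GiB array.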

You can check squeue while the computation below is running to see the jobs dynamically spinning up to perform it.

x[0][0][:10].compute()
array([0.66750137, 0.25089681, 0.7443936 , 0.7159385 , 0.09395558,
       0.86980697, 0.11161041, 0.28340384, 0.19653293, 0.69794351])
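
Arrays are not the only way to use the cluster: as mentioned at the start, ordinary Python functions can be submitted through the client as well. A minimal sketch follows. `square` is an illustrative function, and a thread-based LocalCluster stands in for the SLURMCluster so that the example runs anywhere. With the `client` created earlier, the `submit`, `map` and `gather` calls are the same.

```python
from distributed import Client, LocalCluster


def square(n):
    return n * n


# Stand-in for the SLURMCluster: one worker running as threads in this process.
with LocalCluster(n_workers=1, threads_per_worker=2, processes=False,
                  dashboard_address=None) as cluster:
    with Client(cluster) as client:
        # submit() sends one call to a worker and returns a future immediately.
        future = client.submit(square, 12)
        single = future.result()

        # map() submits one task per input; gather() collects the results.
        futures = client.map(square, range(5))
        many = client.gather(futures)

print(single, many)
```

Functions submitted this way run on the workers rather than in the notebook session, so any data and packages they rely on need to be available there too.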

Finally, we can shut down the SLURMCluster now that we’re done with it.

# Shut down the cluster
client.shutdown()
2023-11-23 05:05:33,925 - distributed.deploy.adaptive_core - INFO - Adaptive stop