from dask_jobqueue import SLURMCluster
from distributed import Client, LocalCluster
import dask
# Point Dask at SLURM to use as its backend
cluster = SLURMCluster(
    memory="64g", processes=1, cores=8
)
# Scale out to 4 workers (one SLURM job each, since processes=1)
num_nodes = 4
cluster.scale(num_nodes)
# Connect this notebook session to the cluster
client = Client(cluster)
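As a rough sketch of what the cell above requests, assume dask-jobqueue's convention that `cores` and `memory` describe one whole SLURM job, split evenly across that job's `processes` worker processes. Under that assumption the per-worker and total resources work out as below. The helper `worker_layout` is hypothetical and only does arithmetic; it does not talk to SLURM.

```python
# Hypothetical helper: estimate the resources requested by a dask-jobqueue
# style configuration, ASSUMING `cores` and `memory` are per-job totals that
# are split evenly across `processes` worker processes in each job.

def worker_layout(cores, processes, memory_gb, num_jobs):
    if cores % processes != 0:
        raise ValueError("cores should divide evenly across processes")
    return {
        "workers": num_jobs * processes,           # Dask worker processes in total
        "threads_per_worker": cores // processes,  # threads inside each worker
        "memory_per_worker_gb": memory_gb / processes,
        "total_threads": num_jobs * cores,
        "total_memory_gb": num_jobs * memory_gb,
    }

# Mirrors the cell above: memory="64g", processes=1, cores=8, four jobs
layout = worker_layout(cores=8, processes=1, memory_gb=64, num_jobs=4)
print(layout)
```

With `processes=1` each job hosts exactly one worker, so scaling to 4 workers and requesting 4 jobs amount to the same thing here.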
Dask SLURMClusters
The MLeRP notebook environment uses Dask SLURMClusters to create a middle ground: the interactivity of a notebook backed by the power of a GPU cluster. This notebook shows how you can use the lion service to run a CPU-based notebook session for basic analysis and code development. Then, when you're ready to run tests, you can use Dask to submit your Python functions to the SLURM queue.
This enables:
- Flexibility to experiment with your dataset interactively
- Ability to change compute requirements (RAM, GPU size, number of processes and so on) without ever leaving the notebook environment
- Elastic scaling of compute
- Efficient utilisation of the hardware
- Release of resources when they're not in use
Dask will now spin up jobs, in anticipation of work, to the scale you specified.
You can check in on your jobs as you would with any other SLURM job, using squeue.
!squeue
JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
933 BigCats dask-wor mhar0048 PD 0:00 1 (None)
932 BigCats dask-wor mhar0048 PD 0:00 1 (None)
931 BigCats dask-wor mhar0048 PD 0:00 1 (None)
930 BigCats dask-wor mhar0048 PD 0:00 1 (None)
919 BigCats Jupyter ramachap R 51:44 1 mlerp-monash-node00
920 BigCats Jupyter ramachap R 47:22 1 mlerp-monash-node00
916 BigCats Jupyter charla R 5:02:19 1 mlerp-monash-node00
You can tailor the output of squeue to suit you by passing in flags. Here, --me limits the list to your own jobs and --format chooses which columns to show.
!squeue --me --format "%.8P %.15j %.8T %.10M %.12L %.4C %.7m %R %q"
PARTITIO NAME STATE TIME TIME_LEFT CPUS MIN_MEM NODELIST(REASON) QOS
BigCats dask-worker RUNNING 0:00 30:00 8 60G mlerp-monash-node00 cheetah
BigCats dask-worker RUNNING 0:00 30:00 8 60G mlerp-monash-node00 cheetah
BigCats dask-worker RUNNING 0:00 30:00 8 60G mlerp-monash-node00 cheetah
BigCats dask-worker RUNNING 0:00 30:00 8 60G mlerp-monash-node00 cheetah
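If you want to act on this output programmatically, a minimal sketch is to split each line on whitespace and key the fields by the header row. The helper `parse_squeue` is hypothetical, and it assumes no field contains spaces (true for the columns requested above, but not guaranteed for arbitrary job names). The sample text is copied from the output above.

```python
# Hypothetical helper: turn whitespace-separated `squeue --format` output into
# a list of dicts keyed by the header row. Assumes no field contains spaces.

def parse_squeue(text):
    lines = [line.split() for line in text.strip().splitlines()]
    header, rows = lines[0], lines[1:]
    return [dict(zip(header, row)) for row in rows]

sample = """\
PARTITIO NAME STATE TIME TIME_LEFT CPUS MIN_MEM NODELIST(REASON) QOS
BigCats dask-worker RUNNING 0:00 30:00 8 60G mlerp-monash-node00 cheetah
BigCats dask-worker RUNNING 0:00 30:00 8 60G mlerp-monash-node00 cheetah
"""

jobs = parse_squeue(sample)
running = [j for j in jobs if j["STATE"] == "RUNNING"]
print(len(running), sum(int(j["CPUS"]) for j in running))  # 2 16
```

In a live notebook you could feed it the text from `subprocess.run(["squeue", "--me", "--format", ...], capture_output=True, text=True).stdout`, passing the same format string as the cell above.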
Alternatively, we can use the adapt method, which scales out as we need compute and scales back when we're idle, letting others use the cluster.
We recommend using the adapt method while you're actively developing your code so that you don't need to worry about cleaning up after yourself. The scale method is better suited to longer tests with higher utilisation.
cluster.adapt(minimum=0, maximum=num_nodes)
<distributed.deploy.adaptive.Adaptive at 0x7fcf13681270>
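The idea behind adapt can be pictured with a deliberately simplified policy: size the worker pool to the pending work, then clamp it between `minimum` and `maximum`. This is a toy model, not Dask's actual algorithm, which roughly estimates its target from expected task durations and memory use and waits for several consecutive checks before retiring workers. The function `target_workers` and its `tasks_per_worker` parameter are hypothetical.

```python
import math

# Toy model of adaptive scaling (NOT Dask's real algorithm): request enough
# workers for the queued tasks, then clamp the result to [minimum, maximum].

def target_workers(pending_tasks, tasks_per_worker, minimum=0, maximum=4):
    wanted = math.ceil(pending_tasks / tasks_per_worker)
    return max(minimum, min(maximum, wanted))

# Idle notebook: no work queued, so the pool shrinks to the minimum
print(target_workers(0, tasks_per_worker=8))     # 0
# A large burst of work: capped at the maximum
print(target_workers(1000, tasks_per_worker=8))  # 4
# A small job: only as many workers as needed
print(target_workers(10, tasks_per_worker=8))    # 2
```

Setting `minimum=0` is what lets an idle session release every SLURM job back to the queue.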
# You may need to run this cell a few times while waiting for Dask to clean up
!squeue
JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
919 BigCats Jupyter ramachap R 51:44 1 mlerp-monash-node00
920 BigCats Jupyter ramachap R 47:22 1 mlerp-monash-node00
930 BigCats dask-wor mhar0048 R 0:00 1 mlerp-monash-node00
931 BigCats dask-wor mhar0048 R 0:00 1 mlerp-monash-node00
932 BigCats dask-wor mhar0048 R 0:00 1 mlerp-monash-node00
933 BigCats dask-wor mhar0048 R 0:00 1 mlerp-monash-node00
916 BigCats Jupyter charla R 5:02:19 1 mlerp-monash-node00
Dask has a dashboard that shows how tasks are being computed. You won't be able to connect to it directly from your web browser, but VS Code and Jupyter have extensions that can connect to it.
Use the loopback address http://127.0.0.1:8787 (if needed, adjust the port to the one listed when you create the client).
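If the dashboard doesn't load in your extension, a quick check is whether anything is listening on that loopback port at all. This is a minimal standard-library sketch assuming the default port 8787; the helper `port_is_open` is hypothetical. In a live session, `client.dashboard_link` reports the address Dask actually chose.

```python
import socket

def port_is_open(host="127.0.0.1", port=8787, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused or timed out: nothing is listening there
        return False

# True only if the Dask dashboard (or some other service) is listening
print(port_is_open())
```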
Now let's define a Dask array and perform some computation. Dask arrays are split into chunks that are distributed across your worker nodes, so they can be larger than one worker's memory. Dask evaluates lazily: operations return a lazy collection that records the tasks needed in a task graph, and the actual values are only computed when you ask for them (for example with .compute()).
Dask also has parallelised implementations of dataframes and of collections of generic Python objects (called bags). These are designed to be as similar as possible to familiar libraries such as NumPy, pandas and PySpark. You can read more about arrays, dataframes and bags in Dask's documentation.
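To make lazy evaluation concrete, here is a toy task graph in plain Python, loosely modelled on the idea of a dict-based graph: each key maps either to a value or to a tuple of (function, *dependency keys), and nothing runs until `compute` walks the graph. This only mimics the concept; Dask's own graphs and schedulers are far more capable, and the names here are hypothetical.

```python
import operator

# Toy task graph: key -> literal value, or a tuple (function, *dependency_keys)
graph = {
    "a": 2,
    "b": 3,
    "zero": 0,
    "sum": (operator.add, "a", "b"),
    "result": (operator.mul, "sum", "sum"),
    # Would raise ZeroDivisionError if it were ever computed
    "unused": (operator.truediv, "a", "zero"),
}

def compute(graph, key, cache=None):
    """Evaluate `key`, recursively computing only the tasks it depends on."""
    if cache is None:
        cache = {}
    if key in cache:
        return cache[key]  # each task runs at most once
    task = graph[key]
    if isinstance(task, tuple):
        func, *deps = task
        value = func(*(compute(graph, dep, cache) for dep in deps))
    else:
        value = task
    cache[key] = value
    return value

# Building the graph did no arithmetic; computing "result" never touches "unused"
print(compute(graph, "result"))  # 25
```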
import dask.array as da
x = da.random.random((1000, 1000, 1000))
# Note how the value of the array hasn't been computed yet
x
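A back-of-the-envelope calculation shows why chunking matters here. `da.random.random` produces float64 values (8 bytes each), so the full array is about 8 GB. The chunk shape below is an assumption chosen for illustration; Dask picks chunks automatically unless you pass `chunks=`, and displaying `x` in the notebook shows the layout it actually used.

```python
import math

shape = (1000, 1000, 1000)
itemsize = 8               # bytes per float64 element
chunks = (250, 250, 250)   # hypothetical chunk shape, for illustration only

total_bytes = math.prod(shape) * itemsize
chunk_bytes = math.prod(chunks) * itemsize
num_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))

print(f"array: {total_bytes / 1e9:.1f} GB")     # array: 8.0 GB
print(f"chunk: {chunk_bytes / 2**20:.1f} MiB")  # chunk: 119.2 MiB
print(f"chunks: {num_chunks}")                  # chunks: 64
```

Each chunk becomes its own task, so the work can be spread over the workers instead of needing the whole array in one process.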
You can check squeue while this is running to see the jobs dynamically spinning up to perform the computation.
x[0][0][:10].compute()
array([0.66750137, 0.25089681, 0.7443936 , 0.7159385 , 0.09395558,
0.86980697, 0.11161041, 0.28340384, 0.19653293, 0.69794351])
Finally, we can shut down the SLURMCluster now that we’re done with it.
# Shut down the cluster
client.shutdown()
2023-11-23 05:05:33,925 - distributed.deploy.adaptive_core - INFO - Adaptive stop