Standing Up Ad-hoc Compute Clusters with Dask

Creating ad-hoc compute clusters with dask
Python
Published: January 2, 2025

Dask is an open-source parallel computing library for Python that enables the processing of large datasets and complex computations across multiple cores and distributed systems. It provides a flexible and dynamic task scheduling system that allows users to build complex workflows and handle large-scale data processing.

Dask is a compelling alternative to PySpark for distributed computing, particularly for those who work primarily within the Python ecosystem. Dask doesn’t require a Java runtime, and can be imported like any other third-party package. It is much lighter weight because it doesn’t require the heavy infrastructure needed to support Spark computing environments. PySpark typically needs a full Spark cluster setup with distributed resource management and coordination (e.g., YARN or Kubernetes), which involves significant overhead. In contrast, Dask operates within a Python environment and can scale from a single machine to a cluster as needed, without requiring the setup of a full distributed system.
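As a quick illustration of how lightweight this is, the sketch below parallelizes a plain Python computation with dask.delayed on a single machine; nothing beyond installing dask is required (the function and values are made up for illustration):

# A minimal sketch: parallelizing plain Python functions with dask.delayed.
# No cluster, scheduler, or JVM required; dask's default threaded scheduler is used.
from dask import delayed

def square(x):
    return x ** 2

# Build a lazy task graph; nothing executes yet.
tasks = [delayed(square)(i) for i in range(10)]
total = delayed(sum)(tasks)

# Trigger execution across local threads.
print(total.compute())  # 285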

In particular, I was interested in setting up a Dask cluster to leverage my own computing resources rather than rely on cloud resources or deployment software such as Coiled, which I would almost certainly opt for if creating a cloud-based cluster in a real-world setting.

There were a number of laptops at my disposal in various states of underutilization and disrepair, and I figured setting up a compute cluster would be a great way to gain familiarity with Dask while giving new life to these otherwise unused machines. My cluster consists of:

  1. Lenovo Slim 7 Pro X (Ubuntu 22.04, 20GB RAM, 1TB SSD, 16 CPU): client/scheduler
  2. Sony VAIO (Ubuntu 22.04, 16GB RAM, 512GB SSD, 4 CPU): worker
  3. Raspberry Pi 5 (Debian 12.7, 4GB RAM, 128GB SSD, 4 CPU): worker

It is straightforward to extend the cluster to additional workers. The only requirements are 1) the machine be accessible from the client via ssh, and 2) the availability of a Python environment consistent with the other worker nodes in terms of install location and package versions.
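As a sanity check before adding a node, something like the following can confirm both requirements from the client by invoking the worker's Python over ssh (a rough sketch; the host address is a placeholder, and key-based ssh access is assumed):

# Rough sketch: verify a prospective worker is reachable over ssh and runs
# a compatible dask environment at the expected location.
import subprocess

host = "192.168.86.147"  # placeholder worker address
remote_python = "~/miniforge3/envs/dask/bin/python"

result = subprocess.run(
    ["ssh", host, f"{remote_python} -c 'import dask; print(dask.__version__)'"],
    capture_output=True, text=True
)
print(result.stdout.strip() or result.stderr.strip())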

I attempted to include an additional worker using a Windows machine running the same dask conda environment via WSL, but couldn't get it to work. I'm sure it's possible; I just didn't spend the time to configure it properly. I'll pick this up again down the road and publish an update.

In order to ensure a consistent environment across nodes, I installed Miniforge to the same location on the client and worker nodes, and used an environment file saved to a GitHub Gist, which was referenced at environment creation time:

(base) $ conda env create --file=https://gist.githubusercontent.com/jtrive84/f0d23fcf22bb590caefca1e10243aba2/raw/53328d955a4f543581448c6b6033da94058eb245/dask.yaml

The dask environment contains:

name: dask
channels:
- conda-forge
- defaults
dependencies:
- python=3.12
- dask=2024.12.0
- dask-labextension=7.0.0
- numpy=2.2.1
- asyncssh=2.19.0
- pyarrow=18.1.0
- matplotlib=3.9.4
- ipykernel
- watermark
- jupyterlab

The dask-labextension package is a JupyterLab extension that provides a user-friendly interface for monitoring Dask clusters. It offers real-time visualizations of cluster performance, displays worker status, memory usage, and task progress, and is highly configurable. I highly recommend it if you manage Dask clusters from JupyterLab.

DaskLab extension


All nodes run within the dask conda environment located at:

~/miniforge3/envs/dask/bin/python

To demonstrate my cluster, I'll use the Flight Prices dataset available on Kaggle. This is a 31GB dataset containing one-way flight itineraries found on Expedia between 2022-04-16 and 2022-10-05 for airport codes ATL, DFW, DEN, ORD, LAX, CLT, MIA, JFK, EWR, SFO, DTW, BOS, PHL, LGA, IAD and OAK. The file is larger than the client's available memory, so it can't be analyzed with Pandas, which requires the entire dataset to be loaded into RAM. Dask can spill intermediate results to disk for datasets too large to fit into memory, letting us work with much larger datasets than would otherwise be possible.
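The spill behavior is controlled by the distributed worker memory settings. The fractions below mirror the library defaults rather than anything I tuned for this cluster, but they show where the knobs live if a workload starts thrashing:

# Sketch of the worker memory thresholds that govern spilling to disk.
# Fractions are of each worker's memory limit; these are the library defaults.
import dask

dask.config.set({
    "distributed.worker.memory.target": 0.60,    # start spilling managed memory to disk
    "distributed.worker.memory.spill": 0.70,     # spill based on total process memory
    "distributed.worker.memory.pause": 0.80,     # pause accepting new tasks
    "distributed.worker.memory.terminate": 0.95  # restart the worker
})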

To get an idea of what the data looks like, we can read the first 100 records using Pandas:


import numpy as np
import pandas as pd

pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
np.set_printoptions(suppress=True, precision=5)
pd.options.mode.chained_assignment = None

# Obtained from https://www.kaggle.com/datasets/dilwong/flightprices
data_path = "~/shared/data/airlines/itineraries.csv"

df = pd.read_csv(data_path, nrows=100)

df.head(7)
legId searchDate flightDate startingAirport destinationAirport fareBasisCode travelDuration elapsedDays isBasicEconomy isRefundable isNonStop baseFare totalFare seatsRemaining totalTravelDistance segmentsDepartureTimeEpochSeconds segmentsDepartureTimeRaw segmentsArrivalTimeEpochSeconds segmentsArrivalTimeRaw segmentsArrivalAirportCode segmentsDepartureAirportCode segmentsAirlineName segmentsAirlineCode segmentsEquipmentDescription segmentsDurationInSeconds segmentsDistance segmentsCabinCode
0 9ca0e81111c683bec1012473feefd28f 2022-04-16 2022-04-17 ATL BOS LA0NX0MC PT2H29M 0 False False True 217.67 248.6 9 947.0 1650214620 2022-04-17T12:57:00.000-04:00 1650223560 2022-04-17T15:26:00.000-04:00 BOS ATL Delta DL Airbus A321 8940 947 coach
1 98685953630e772a098941b71906592b 2022-04-16 2022-04-17 ATL BOS LA0NX0MC PT2H30M 0 False False True 217.67 248.6 4 947.0 1650191400 2022-04-17T06:30:00.000-04:00 1650200400 2022-04-17T09:00:00.000-04:00 BOS ATL Delta DL Airbus A321 9000 947 coach
2 98d90cbc32bfbb05c2fc32897c7c1087 2022-04-16 2022-04-17 ATL BOS LA0NX0MC PT2H30M 0 False False True 217.67 248.6 9 947.0 1650209700 2022-04-17T11:35:00.000-04:00 1650218700 2022-04-17T14:05:00.000-04:00 BOS ATL Delta DL Boeing 757-200 9000 947 coach
3 969a269d38eae583f455486fa90877b4 2022-04-16 2022-04-17 ATL BOS LA0NX0MC PT2H32M 0 False False True 217.67 248.6 8 947.0 1650218340 2022-04-17T13:59:00.000-04:00 1650227460 2022-04-17T16:31:00.000-04:00 BOS ATL Delta DL Airbus A321 9120 947 coach
4 980370cf27c89b40d2833a1d5afc9751 2022-04-16 2022-04-17 ATL BOS LA0NX0MC PT2H34M 0 False False True 217.67 248.6 9 947.0 1650203940 2022-04-17T09:59:00.000-04:00 1650213180 2022-04-17T12:33:00.000-04:00 BOS ATL Delta DL Airbus A321 9240 947 coach
5 79eda9f841e226a1e2121d74211e595c 2022-04-16 2022-04-17 ATL BOS VH0AUEL1 PT2H38M 0 False False True 217.67 248.6 7 947.0 1650206700 2022-04-17T10:45:00.000-04:00 1650216180 2022-04-17T13:23:00.000-04:00 BOS ATL JetBlue Airways B6 NaN 9480 947 coach
6 9335fae376c38bb61263281779f469ec 2022-04-16 2022-04-17 ATL BOS V0AJZNN1 PT4H12M 0 False False False 213.02 251.1 3 956.0 1650198000||1650205620 2022-04-17T08:20:00.000-04:00||2022-04-17T10:2... 1650203400||1650213120 2022-04-17T09:50:00.000-04:00||2022-04-17T12:3... CLT||BOS ATL||CLT American Airlines||American Airlines AA||AA Airbus A320||Airbus A320 5400||7500 228||728 coach||coach


Using LocalCluster

Before demonstrating the use of SSHCluster, we'll look at LocalCluster, which creates a multi-core cluster for parallel computing on a single machine. It distributes tasks across multiple CPUs or threads locally, letting us scale computations beyond a single core:


# Creating LocalCluster instance.
from dask.distributed import LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=4) 
client = cluster.get_client()

client

Client: Client-a5ec0bd6-c9fe-11ef-8e58-3003c82ce6ed
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

2025-01-03 12:16:44,335 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8b04cd7a3e831297147d5c00d79f8bb9 initialized by task ('shuffle-transfer-8b04cd7a3e831297147d5c00d79f8bb9', 295) executed on worker tcp://127.0.0.1:33431
2025-01-03 12:18:07,698 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8b04cd7a3e831297147d5c00d79f8bb9 deactivated due to stimulus 'task-finished-1735928287.6946151'
2025-01-03 12:19:33,219 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle b7f5ce97ecf82f18ee01140b59d55007 initialized by task ('shuffle-transfer-b7f5ce97ecf82f18ee01140b59d55007', 99) executed on worker tcp://127.0.0.1:33431
2025-01-03 12:20:26,159 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle b7f5ce97ecf82f18ee01140b59d55007 deactivated due to stimulus 'task-finished-1735928426.1586232'
2025-01-03 12:20:35,652 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8b04cd7a3e831297147d5c00d79f8bb9 initialized by task ('shuffle-transfer-8b04cd7a3e831297147d5c00d79f8bb9', 405) executed on worker tcp://127.0.0.1:36301
2025-01-03 12:21:36,894 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 8b04cd7a3e831297147d5c00d79f8bb9 deactivated due to stimulus 'task-finished-1735928496.8918214'

Then we can read in the itineraries dataset and perform an aggregation. First we get the number of records by starting airport:


import dask
import dask.dataframe as ddf

data_path = "~/shared/data/airlines/itineraries.csv"

df = ddf.read_csv(
    data_path, 
    usecols=["startingAirport", "destinationAirport", "baseFare", "totalFare"],
    assume_missing=True
)

dfstart = df.groupby("startingAirport").startingAirport.count()

dfstart.compute()
startingAirport
CLT    5494510
MIA    4930213
OAK    3809884
DFW    5674959
ATL    5312028
IAD    3464378
JFK    4425164
DTW    4547052
DEN    4697143
LGA    5919323
BOS    5883876
ORD    5503476
PHL    4726187
EWR    3970797
LAX    8073281
SFO    5706482
Name: startingAirport, dtype: int64


Next we compute the average total fare by starting airport:


dffare = df.groupby("startingAirport").totalFare.mean()
dffare.compute()
startingAirport
CLT    321.456261
MIA    299.955595
OAK    534.211396
DFW    294.077856
ATL    303.774077
IAD    370.046151
JFK    375.406488
DTW    330.940539
DEN    335.077884
LGA    299.220774
BOS    285.865775
ORD    281.691875
PHL    344.088743
EWR    302.986457
LAX    379.254937
SFO    434.504077
Name: totalFare, dtype: float64

client.shutdown()


Using SSHCluster

SSHCluster is used to create our distributed cluster on the local network. It accepts a list of hostnames, the first of which will be used for the scheduler and the rest as workers. We can repeat the name of the first host to have it also serve as a worker.
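For example, a minimal hosts list that reuses the first machine as both scheduler and worker might look like the following (the addresses are placeholders):

# Hypothetical hosts list: the first entry runs the scheduler; repeating it
# makes the same machine also contribute a worker alongside the remote node.
hosts = ["localhost", "localhost", "192.168.86.138"]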

The full list of worker options can be found here. Note that when using LocalCluster, n_workers specifies the total number of workers, whereas with SSHCluster, n_workers is the number of workers per host. In the next cell, we create a cluster with two worker nodes, one worker per host and 4 threads per worker:


from dask.distributed import Client, SSHCluster

# Path to dask environment executable for all nodes. 
remote_python = "~/miniforge3/envs/dask/bin/python"

hosts = ["localhost", "192.168.86.138", "192.168.86.147"]

worker_opts = {"n_workers": 1, "nthreads": 4}  # optionally add "memory_limit", e.g. "8GB"

connect_opts = {
    "known_hosts": None, 
    "username": "jtriz", 
    "password": "xxxxx",
}

cluster = SSHCluster(
    hosts=hosts,
    worker_options=worker_opts,
    connect_options=connect_opts,
    remote_python=remote_python
)

client = Client(cluster)

client
2025-01-03 12:24:23,300 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:23,299 - distributed.scheduler - INFO - State start
2025-01-03 12:24:23,304 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:23,303 - distributed.scheduler - INFO -   Scheduler at: tcp://192.168.86.154:46219
2025-01-03 12:24:24,150 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:24,149 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.86.154:35955'
2025-01-03 12:24:24,161 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:24,160 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.86.154:45391'
2025-01-03 12:24:24,538 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:24,536 - distributed.worker - INFO -       Start worker at: tcp://192.168.86.154:40165
2025-01-03 12:24:24,553 - distributed.deploy.ssh - INFO - 2025-01-03 12:24:24,551 - distributed.worker - INFO -       Start worker at: tcp://192.168.86.154:39183

Client: Client-f59132cc-c9ff-11ef-8e58-3003c82ce6ed
Connection method: Cluster object
Cluster type: distributed.SpecCluster
Dashboard: http://192.168.86.154:8787/status


Getting a count of the number of records by starting airport:


data_path = "~/shared/data/airlines/itineraries.csv"

df = ddf.read_csv(
    data_path, 
    usecols=["startingAirport", "destinationAirport", "baseFare", "totalFare"],
    assume_missing=True
)

dfstart = df.groupby("startingAirport").startingAirport.count()

dfstart.compute()
startingAirport
CLT    5494510
MIA    4930213
OAK    3809884
DFW    5674959
ATL    5312028
IAD    3464378
JFK    4425164
DTW    4547052
DEN    4697143
LGA    5919323
BOS    5883876
ORD    5503476
PHL    4726187
EWR    3970797
LAX    8073281
SFO    5706482
Name: startingAirport, dtype: int64


As before, computing the average total fare by starting airport:


dffare = df.groupby("startingAirport").totalFare.mean()
dffare.compute()
startingAirport
CLT    321.456261
MIA    299.955595
OAK    534.211396
DFW    294.077856
ATL    303.774077
IAD    370.046151
JFK    375.406488
DTW    330.940539
DEN    335.077884
LGA    299.220774
BOS    285.865775
ORD    281.691875
PHL    344.088743
EWR    302.986457
LAX    379.254937
SFO    434.504077
Name: totalFare, dtype: float64

client.shutdown()


The Dask lab extension didn't seem to work when using SSHCluster. This probably has more to do with my setup than any issue with the extension itself, but even after tinkering with it for a while, I couldn't get it to display usage metrics.

While I was able to get SSHCluster set up and running on my local network, I'll probably opt for LocalCluster for my distributed computing needs: first, because my data processing needs aren't that significant and this was intended as a proof of concept; and second, because not having access to the Dask lab extension when using SSHCluster is limiting. In addition, at times I noticed a mismatch between the actual number of workers and the number of workers displayed by the SSHCluster client, so it was difficult to determine whether the cluster was properly configured without diagnostic tooling.

I plan on researching how to set up an additional worker on WSL within an SSHCluster, but that will be a future post.