Dask is an open-source parallel computing library for Python that enables the processing of large datasets and complex computations across multiple cores and distributed systems. It provides a flexible and dynamic task scheduling system that allows users to build complex workflows and handle large-scale data processing.
Dask is a compelling alternative to PySpark for distributed computing, particularly for those who work primarily within the Python ecosystem. Dask doesn’t require a Java runtime, and can be imported like any other third-party package. It is much lighter weight because it doesn’t require the heavy infrastructure needed to support Spark computing environments. PySpark typically needs a full Spark cluster setup with distributed resource management and coordination (e.g., YARN or Kubernetes), which involves significant overhead. In contrast, Dask operates within a Python environment and can scale from a single machine to a cluster as needed, without requiring the setup of a full distributed system.
In particular, I was interested in setting up a Dask cluster to leverage my own computing resources rather than rely on cloud resources or deployment software such as Coiled, which I would almost certainly opt for if creating a cloud-based cluster in a real-world setting.
There were a number of laptops at my disposal in various states of underutilization and disrepair, and I figured setting up a compute cluster would be a great way to gain familiarity with Dask as well as give new life to these otherwise unused machines. In particular, my cluster consists of:
Lenovo Slim 7 Pro X (Ubuntu 22.04, 20GB RAM, 1TB SSD, 16 CPU): client/scheduler
It is straightforward to extend the cluster to additional workers. The only requirements are 1) the machine be accessible from the client via ssh, and 2) the availability of a Python environment consistent with the other worker nodes in terms of install location and package versions.
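As a quick sanity check before adding a node, both requirements can be verified from the client in a single command. The hostname below is a placeholder for whichever worker you are adding:

# "worker1" is a placeholder hostname; this confirms passwordless SSH works and
# that the worker exposes the same Python environment path as the other nodes.
ssh worker1 '~/miniforge3/envs/dask/bin/python --version'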
I attempted to include an additional worker using a Windows client running the same dask conda environment via WSL, but couldn’t get it to work. I’m sure it’s possible; I just didn’t spend the time to configure it properly. I’ll pick this up again down the road and will publish an update.
In order to ensure a consistent environment across workers, I installed Miniforge to the same location on the client and worker nodes, and used an environment file saved to a GitHub Gist, which was referenced at the time of environment creation:
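The Gist URL below is a placeholder rather than the one I actually used; the idea is simply to pull the same environment.yml onto every node and create the environment from it:

# Placeholder URL: substitute the raw link to your own environment.yml Gist
wget https://gist.githubusercontent.com/&lt;user&gt;/&lt;gist-id&gt;/raw/environment.yml
conda env create -f environment.yml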
The dask-labextension is a JupyterLab extension that provides a user-friendly interface for monitoring Dask clusters from within JupyterLab. It offers real-time visualizations of cluster performance, displays worker status, memory usage, and task progress, and is highly configurable. I highly recommend it if you manage Dask clusters from JupyterLab.
DaskLab extension
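For reference, the extension is distributed as a regular package on conda-forge and PyPI; on a recent JupyterLab (3+, which ships prebuilt extensions) installing the package into the same environment as JupyterLab should be all that's required:

# Install the Dask JupyterLab extension alongside JupyterLab
conda install -c conda-forge dask-labextension
# or: pip install dask-labextension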
All nodes run within the dask conda environment located at:
~/miniforge3/envs/dask/bin/python
To demonstrate my cluster, I’ll use the Flight Prices dataset available on Kaggle. This is a 31GB dataset containing one-way flight itineraries found on Expedia between 2022-04-16 and 2022-10-05 for airport codes ATL, DFW, DEN, ORD, LAX, CLT, MIA, JFK, EWR, SFO, DTW, BOS, PHL, LGA, IAD and OAK. The file is larger than the client’s available memory, so it can’t be analyzed with Pandas, which requires the entire dataset to be loaded into RAM. Dask can spill intermediate results to disk for datasets too large to fit into memory, giving us the ability to work with much larger datasets than would otherwise be possible.
To get an idea of what the data looks like, we can read the first 100 records using Pandas:
import numpy as np
import pandas as pd

pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
np.set_printoptions(suppress=True, precision=5)
pd.options.mode.chained_assignment = None

# Obtained from https://www.kaggle.com/datasets/dilwong/flightprices
data_path = "~/shared/data/airlines/itineraries.csv"

df = pd.read_csv(data_path, nrows=100)
df.head(7)
|   | legId | searchDate | flightDate | startingAirport | destinationAirport | fareBasisCode | travelDuration | elapsedDays | isBasicEconomy | isRefundable | isNonStop | baseFare | totalFare | seatsRemaining | totalTravelDistance | segmentsDepartureTimeEpochSeconds | segmentsDepartureTimeRaw | segmentsArrivalTimeEpochSeconds | segmentsArrivalTimeRaw | segmentsArrivalAirportCode | segmentsDepartureAirportCode | segmentsAirlineName | segmentsAirlineCode | segmentsEquipmentDescription | segmentsDurationInSeconds | segmentsDistance | segmentsCabinCode |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 9ca0e81111c683bec1012473feefd28f | 2022-04-16 | 2022-04-17 | ATL | BOS | LA0NX0MC | PT2H29M | 0 | False | False | True | 217.67 | 248.6 | 9 | 947.0 | 1650214620 | 2022-04-17T12:57:00.000-04:00 | 1650223560 | 2022-04-17T15:26:00.000-04:00 | BOS | ATL | Delta | DL | Airbus A321 | 8940 | 947 | coach |
| 1 | 98685953630e772a098941b71906592b | 2022-04-16 | 2022-04-17 | ATL | BOS | LA0NX0MC | PT2H30M | 0 | False | False | True | 217.67 | 248.6 | 4 | 947.0 | 1650191400 | 2022-04-17T06:30:00.000-04:00 | 1650200400 | 2022-04-17T09:00:00.000-04:00 | BOS | ATL | Delta | DL | Airbus A321 | 9000 | 947 | coach |
| 2 | 98d90cbc32bfbb05c2fc32897c7c1087 | 2022-04-16 | 2022-04-17 | ATL | BOS | LA0NX0MC | PT2H30M | 0 | False | False | True | 217.67 | 248.6 | 9 | 947.0 | 1650209700 | 2022-04-17T11:35:00.000-04:00 | 1650218700 | 2022-04-17T14:05:00.000-04:00 | BOS | ATL | Delta | DL | Boeing 757-200 | 9000 | 947 | coach |
| 3 | 969a269d38eae583f455486fa90877b4 | 2022-04-16 | 2022-04-17 | ATL | BOS | LA0NX0MC | PT2H32M | 0 | False | False | True | 217.67 | 248.6 | 8 | 947.0 | 1650218340 | 2022-04-17T13:59:00.000-04:00 | 1650227460 | 2022-04-17T16:31:00.000-04:00 | BOS | ATL | Delta | DL | Airbus A321 | 9120 | 947 | coach |
| 4 | 980370cf27c89b40d2833a1d5afc9751 | 2022-04-16 | 2022-04-17 | ATL | BOS | LA0NX0MC | PT2H34M | 0 | False | False | True | 217.67 | 248.6 | 9 | 947.0 | 1650203940 | 2022-04-17T09:59:00.000-04:00 | 1650213180 | 2022-04-17T12:33:00.000-04:00 | BOS | ATL | Delta | DL | Airbus A321 | 9240 | 947 | coach |
| 5 | 79eda9f841e226a1e2121d74211e595c | 2022-04-16 | 2022-04-17 | ATL | BOS | VH0AUEL1 | PT2H38M | 0 | False | False | True | 217.67 | 248.6 | 7 | 947.0 | 1650206700 | 2022-04-17T10:45:00.000-04:00 | 1650216180 | 2022-04-17T13:23:00.000-04:00 | BOS | ATL | JetBlue Airways | B6 | NaN | 9480 | 947 | coach |
| 6 | 9335fae376c38bb61263281779f469ec | 2022-04-16 | 2022-04-17 | ATL | BOS | V0AJZNN1 | PT4H12M | 0 | False | False | False | 213.02 | 251.1 | 3 | 956.0 | 1650198000\|\|1650205620 | 2022-04-17T08:20:00.000-04:00\|\|2022-04-17T10:2... | 1650203400\|\|1650213120 | 2022-04-17T09:50:00.000-04:00\|\|2022-04-17T12:3... | CLT\|\|BOS | ATL\|\|CLT | American Airlines\|\|American Airlines | AA\|\|AA | Airbus A320\|\|Airbus A320 | 5400\|\|7500 | 228\|\|728 | coach\|\|coach |
Using LocalCluster
Before demonstrating the use of SSHCluster, we’ll look at LocalCluster, which allows us to create a multi-core cluster for parallel computing on a single machine. It distributes tasks across multiple CPU cores or threads on the local machine, giving us the ability to scale computations beyond a single core:
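A minimal sketch of the kind of cell that produces the output below follows; the worker and thread counts are illustrative, and data_path is the CSV path defined earlier:

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Illustrative sizing: 4 worker processes with 2 threads each on the local machine
cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)

# Read the 31GB CSV lazily in partitions, then compute mean total fare by origin airport
ddf = dd.read_csv(data_path)
ddf.groupby("startingAirport")["totalFare"].mean().compute()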
startingAirport
CLT 321.456261
MIA 299.955595
OAK 534.211396
DFW 294.077856
ATL 303.774077
IAD 370.046151
JFK 375.406488
DTW 330.940539
DEN 335.077884
LGA 299.220774
BOS 285.865775
ORD 281.691875
PHL 344.088743
EWR 302.986457
LAX 379.254937
SFO 434.504077
Name: totalFare, dtype: float64
client.shutdown()
Using SSHCluster
SSHCluster is used to create our distributed cluster on the local network. It accepts a list of hostnames, the first of which will be used for the scheduler and the rest as workers. We can repeat the name of the first host to have it also serve as a worker.
The full list of worker options can be found here. Note that when using LocalCluster, n_workers specifies the total number of workers, whereas with SSHCluster, n_workers is the number of workers per host. In the next cell, we create a cluster with two nodes, one worker each and 4 threads per worker:
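A minimal sketch of such a cell is shown below. The hostnames are placeholders: repeating the first host makes it serve as both scheduler and worker, and "worker1" stands in for the second machine. The remote_python path matches the conda environment described earlier:

import dask.dataframe as dd
from dask.distributed import Client, SSHCluster

# Placeholder hostnames: scheduler on localhost, workers on localhost and "worker1"
cluster = SSHCluster(
    ["localhost", "localhost", "worker1"],
    connect_options={"known_hosts": None},
    worker_options={"n_workers": 1, "nthreads": 4},
    remote_python="~/miniforge3/envs/dask/bin/python",
)
client = Client(cluster)

# Same aggregation as before, now distributed across both machines
ddf = dd.read_csv(data_path)
ddf.groupby("startingAirport")["totalFare"].mean().compute()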
startingAirport
CLT 321.456261
MIA 299.955595
OAK 534.211396
DFW 294.077856
ATL 303.774077
IAD 370.046151
JFK 375.406488
DTW 330.940539
DEN 335.077884
LGA 299.220774
BOS 285.865775
ORD 281.691875
PHL 344.088743
EWR 302.986457
LAX 379.254937
SFO 434.504077
Name: totalFare, dtype: float64
client.shutdown()
The Dask lab extension didn’t seem to work when using SSHCluster. This probably has more to do with my setup than any issue with the extension itself, but even after tinkering with it for a while, I couldn’t get it to display usage metrics.
While I was able to get SSHCluster set up and running on my local network, I’ll probably opt for LocalCluster for my distributed computing needs: first, my data processing needs aren’t that significant and this was intended as a proof of concept; second, not having access to the Dask lab extension when using SSHCluster is limiting. In addition, I occasionally noticed a mismatch between the actual number of workers and the number of workers reported by the SSHCluster client, so it was difficult to determine whether the cluster was properly configured without diagnostic tooling.
I plan on researching how to set up an additional worker on WSL within an SSHCluster, but that will be the subject of a future post.