Distributed and parallel computing for larger-than-RAM datasets. Five components: DataFrames (parallel pandas), Arrays (parallel NumPy), Bags (unstructured data), Futures (task-based parallelism), Schedulers (threads/processes/distributed). Scales from laptops to HPC clusters. For in-memory speed on a single machine, use Polars; for out-of-core analytics without a cluster, use Vaex.
Dask is a Python library for parallel and distributed computing that scales familiar pandas/NumPy APIs to larger-than-memory datasets. It provides five main components (DataFrames, Arrays, Bags, Futures, Schedulers) and scales from single-machine multi-core to multi-node HPC clusters.
pip install "dask[complete]"    # All components
pip install "dask[dataframe]"   # DataFrames only
pip install "dask[distributed]" # Distributed scheduler + dashboard
pip install dask-jobqueue       # HPC cluster integration (SLURM, PBS)
import dask.dataframe as dd
# Read multiple files as a single DataFrame
ddf = dd.read_csv('data/2024-*.csv')
ddf = dd.read_parquet('data/', columns=['id', 'value', 'category'])
# Operations are lazy until .compute()
filtered = ddf[ddf['value'] > 100]
result = filtered.groupby('category').agg({'value': ['mean', 'sum']}).compute()
print(result.shape) # (n_categories, 2)
# Custom operations via map_partitions (preferred over apply)
def normalize_partition(df):
    df['norm_value'] = (df['value'] - df['value'].mean()) / df['value'].std()
    return df
ddf = ddf.map_partitions(normalize_partition)
# Joins
ddf_merged = ddf.merge(lookup_ddf, on='category', how='left')
# Write results
ddf.to_parquet('output/', engine='pyarrow')
# Repartitioning for optimal chunk sizes
ddf = ddf.repartition(npartitions=20) # By count
ddf = ddf.repartition(partition_size='100MB') # By size
# Index management for sorted operations
ddf = ddf.set_index('timestamp', sorted=True)
# Debugging
print(f"Partitions: {ddf.npartitions}")
print(f"Dtypes: {ddf.dtypes}")
sample = ddf.get_partition(0).compute() # Inspect first partition
import dask.array as da
import numpy as np
# Create from various sources
x = da.random.random((100000, 1000), chunks=(10000, 1000))
x = da.from_array(np_array, chunks=(10000, 1000))
x = da.from_zarr('large_dataset.zarr')
# Standard operations (lazy)
y = (x - x.mean(axis=0)) / x.std(axis=0) # Normalize
z = da.dot(x.T, x) # Matrix multiply
u, s, v = da.linalg.svd(x) # SVD
# Compute and persist
result = y.mean(axis=0).compute()
print(result.shape) # (1000,)
# Custom operations with map_blocks
def custom_filter(block):
    from scipy.ndimage import gaussian_filter
    return gaussian_filter(block, sigma=2)
filtered = da.map_blocks(custom_filter, x, dtype=x.dtype)
# Rechunking for different access patterns
x_rechunked = x.rechunk({0: 5000, 1: 500})
# Save to disk
da.to_zarr(y, 'normalized.zarr')
import dask.bag as db
import json
# Read unstructured data
bag = db.read_text('logs/*.json').map(json.loads)
# Functional operations
valid = bag.filter(lambda x: x['status'] == 'success')
ids = valid.pluck('user_id')
flat = bag.map(lambda x: x['tags']).flatten()
# Aggregation — use foldby instead of groupby (much faster)
totals = bag.foldby(
    key='category',
    binop=lambda total, x: total + x['amount'],
    initial=0,
    combine=lambda a, b: a + b,
    combine_initial=0
).compute()
# Convert to DataFrame for structured analysis
ddf = valid.to_dataframe(meta={'user_id': 'str', 'amount': 'float64', 'category': 'str'})
from dask.distributed import Client
client = Client() # Local cluster with all cores
print(client.dashboard_link) # http://localhost:8787
# Submit individual tasks (executes immediately, not lazy)
def process(x, param):
    return x ** param
future = client.submit(process, 42, param=2)
print(future.result()) # 1764
# Map over many inputs
futures = client.map(process, range(100), param=2)
results = client.gather(futures)
print(len(results)) # 100
# Scatter large data to workers (avoids repeated transfers)
import numpy as np
big_data = np.random.random((10000, 1000))
data_future = client.scatter(big_data, broadcast=True)
# Submit tasks using scattered data
futures = [client.submit(process_chunk, data_future, i) for i in range(10)]
results = client.gather(futures)
# Progressive result processing
from dask.distributed import as_completed
for future in as_completed(futures):
    result = future.result()
    print(f"Completed: {result}")
# Coordination primitives
from dask.distributed import Lock, Queue, Event
lock = Lock('resource-lock')
with lock:
    # Thread-safe operation across workers
    pass
client.close()
import dask
# Global scheduler setting
dask.config.set(scheduler='threads') # Default: GIL-releasing numeric work
dask.config.set(scheduler='processes') # Pure Python, GIL-bound work
dask.config.set(scheduler='synchronous') # Debugging with pdb
# Context manager for temporary change
with dask.config.set(scheduler='synchronous'):
    result = computation.compute()  # Can use pdb here
# Per-compute override
result = ddf.mean().compute(scheduler='processes')
# Distributed scheduler with resource control
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=2, memory_limit='4GB')
print(client.dashboard_link)
# HPC cluster integration
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
cluster = SLURMCluster(
    cores=24, memory='100GB',
    walltime='02:00:00', queue='regular'
)
cluster.scale(jobs=10) # Request 10 SLURM jobs
client = Client(cluster)
# Adaptive scaling
cluster.adapt(minimum=2, maximum=20)
result = computation.compute()
client.close()
| Data Type | Component | When to Use |
|---|---|---|
| Tabular (CSV, Parquet) | DataFrames | Standard pandas-like operations at scale |
| Numeric arrays (HDF5, Zarr) | Arrays | NumPy operations, linear algebra, image processing |
| Text, JSON, logs | Bags | ETL/cleaning → convert to DataFrame for analysis |
| Custom parallel tasks | Futures | Dynamic workflows, parameter sweeps, task dependencies |
| Any of above | Schedulers | Control execution backend (threads/processes/distributed) |
Control level: DataFrames/Arrays/Bags = high-level lazy API. Futures = low-level immediate execution.
All DataFrames, Arrays, and Bags build a task graph — nothing executes until .compute() or .persist().
- `.compute()` — execute and return the result to local memory
- `.persist()` — execute and keep the result on workers (for reuse across multiple computations)
- `dask.compute(a, b, c)` — compute multiple results in a single pass (shares intermediates)

Target: ~100 MB per chunk (or 10 chunks per core in worker memory).
| Chunk Size | Effect |
|---|---|
| Too large (>1 GB) | Memory overflow, poor parallelization |
| Optimal (~100 MB) | Good parallelism, manageable memory |
| Too small (<1 MB) | Excessive scheduling overhead |
Example: 8 cores, 32 GB RAM → target ~400 MB per chunk (32 GB / 8 cores / 10).
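The target can also be handed to Dask directly: the `array.chunk-size` config key drives `chunks='auto'` when creating arrays. A small sketch (the 100 MiB value follows the general guidance above; array shape is illustrative):

```python
import dask
import dask.array as da

# Ask auto-chunking to target ~100 MiB per chunk
with dask.config.set({'array.chunk-size': '100MiB'}):
    x = da.ones((50000, 50000), chunks='auto')  # lazy; ~18.6 GB of float64 total

print(x.chunksize, x.numblocks)
```

Nothing is allocated here; the config only shapes the chunking of the lazy graph.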
| Scheduler | Overhead | Best For | GIL |
|---|---|---|---|
| `threads` (default) | ~10 µs/task | NumPy, pandas, scikit-learn | Affected |
| `processes` | ~10 ms/task | Pure Python, text processing | Not affected |
| `synchronous` | ~1 µs/task | Debugging with pdb | N/A |
| `distributed` | ~1 ms/task | Dashboard, clusters, advanced features | Configurable |
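As a small illustration of the table: per-record string work is pure Python and GIL-bound, so the processes scheduler (also the Bag default) is the better fit, while the threads default suits NumPy-style code. The toy sequence below is illustrative:

```python
import dask.bag as db

# Pure-Python, GIL-bound work: per-record string processing
lines = db.from_sequence(['alpha beta', 'gamma delta epsilon'] * 100,
                         npartitions=4)
word_count = lines.map(lambda s: len(s.split())).sum()
print(word_count.compute(scheduler='processes'))  # 500
```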
import dask.dataframe as dd
import dask
# Extract: Read all CSV files
ddf = dd.read_csv('raw_data/*.csv', dtype={'amount': 'float64'})
# Transform: Clean and process
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].fillna(0)
ddf = ddf.dropna(subset=['category'])
# Aggregate
summary = ddf.groupby('category').agg({'amount': ['sum', 'mean', 'count']})
# Load: Save as Parquet (columnar, compressed)
summary.to_parquet('output/summary.parquet')
print(f"Processed {len(ddf)} rows across {ddf.npartitions} partitions")
import dask.array as da
# Load large scientific dataset
x = da.from_zarr('experiment_data.zarr') # e.g., (50000, 50000) float64
print(f"Shape: {x.shape}, Chunks: {x.chunks}")
# Normalize per-column
x_norm = (x - x.mean(axis=0)) / x.std(axis=0)
# Compute covariance matrix
cov = da.dot(x_norm.T, x_norm) / (x_norm.shape[0] - 1)
# SVD for dimensionality reduction (top-k)
u, s, v = da.linalg.svd_compressed(x_norm, k=50)
# Save results
da.to_zarr(u, 'pca_components.zarr')
print(f"Explained variance (top 5): {(s[:5]**2 / (s**2).sum()).compute()}")
This workflow is a simple combination of Bags (Section 3) → DataFrames (Section 1): read JSON logs with Bags, filter/transform, convert to DataFrame for groupby analysis. Each step maps directly to Core API examples above.
| Parameter | Module | Default | Description |
|---|---|---|---|
| `npartitions` | DataFrame | auto | Number of partitions (controls parallelism) |
| `partition_size` | DataFrame | — | Target size per partition (e.g., '100MB') |
| `chunks` | Array | required | Chunk dimensions (e.g., (10000, 1000)) |
| `blocksize` | Bag | '128 MiB' | File read block size |
| `scheduler` | All | 'threads' | Execution backend ('threads', 'processes', 'synchronous') |
| `n_workers` | Distributed | auto | Number of worker processes |
| `threads_per_worker` | Distributed | auto | Threads per worker |
| `memory_limit` | Distributed | auto | Per-worker memory limit (e.g., '4GB') |
| `sorted` | set_index | False | Whether data is pre-sorted (enables optimizations) |
| `meta` | map_partitions | — | Output DataFrame/Series structure template |
Let Dask handle data loading — Never load data into pandas/numpy first then convert. Use dd.read_csv() / da.from_zarr() directly.
Batch compute calls — Use dask.compute(a, b, c) instead of calling .compute() in loops. Allows sharing intermediates.
Use map_partitions over apply — ddf.apply(func, axis=1) creates one task per row. ddf.map_partitions(func) creates one task per partition.
Persist reused intermediates — Call .persist() on data accessed multiple times, then del when done.
Use the dashboard — client.dashboard_link shows task progress, memory usage, worker states. Essential for diagnosing performance issues.
Anti-pattern — Excessively large task graphs: If len(ddf.__dask_graph__()) returns millions, increase chunk sizes or use map_partitions/map_blocks to fuse operations.
Anti-pattern — Wrong scheduler for workload: Using threads for pure Python text processing (GIL-bound) or processes for NumPy operations (unnecessary serialization overhead).
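A minimal sketch of the batched-compute practice: both aggregates below reuse the `normalized` intermediate in a single graph walk instead of recomputing it twice (array shape is illustrative):

```python
import dask
import dask.array as da

x = da.random.random((1000, 100), chunks=(250, 100))
normalized = (x - x.mean(axis=0)) / x.std(axis=0)

# One pass computes both results and shares the `normalized` intermediate
col_means, col_stds = dask.compute(normalized.mean(axis=0),
                                   normalized.std(axis=0))
print(col_means.shape, col_stds.shape)  # (100,) (100,)
```

Calling `.compute()` on each result separately would rebuild and re-execute the normalization graph twice.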
from dask_ml.preprocessing import StandardScaler
from dask_ml.model_selection import train_test_split
import dask.array as da
X = da.random.random((100000, 50), chunks=(10000, 50))
y = da.random.randint(0, 2, size=100000, chunks=10000)
# Preprocessing
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)
print(f"Train: {X_train.shape}, Test: {X_test.shape}")
from dask.distributed import Client
client = Client()
class RunningStats:
    def __init__(self):
        self.count = 0
        self.total = 0.0

    def add(self, value):
        self.count += 1
        self.total += value
        return self.total / self.count
# Create actor on a worker
stats = client.submit(RunningStats, actor=True).result()
# Call methods (~1ms roundtrip)
for v in [10, 20, 30]:
    mean = stats.add(v).result()
    print(f"Running mean: {mean}")
client.close()
from dask_kubernetes import KubeCluster
from dask.distributed import Client
cluster = KubeCluster()
cluster.adapt(minimum=2, maximum=50)
client = Client(cluster)
# Run computation on auto-scaling cluster
result = large_computation.compute()
client.close()
| Problem | Cause | Solution |
|---|---|---|
| `MemoryError` during `.compute()` | Result too large for local memory | Use `.to_parquet()` or `.persist()` instead of `.compute()` |
| Slow computation start | Task graph has millions of tasks | Increase chunk sizes; use `map_partitions`/`map_blocks` |
| Poor parallelization | GIL contention with threads scheduler | Switch to `scheduler='processes'` for Python-heavy code |
| `TypeError` in `map_partitions` | Missing or wrong `meta` parameter | Provide `meta=pd.DataFrame({'col': pd.Series(dtype='float64')})` |
| Workers killed (OOM) | Chunks exceed worker memory | Decrease chunk size; increase `memory_limit` |
| `KilledWorker` exception | Worker process crashed | Check worker logs; reduce memory per task; increase `memory_limit` |
| Slow joins/merges | Data not pre-sorted on join key | Call `ddf.set_index('key', sorted=True)` before join |
| `NotImplementedError` | Operation not supported by Dask | Use `map_partitions` with the pandas equivalent |
| Dashboard not accessible | Distributed client not started | Use `client = Client()` to enable the distributed scheduler |
| Data type mismatch across partitions | Inconsistent CSV files | Specify `dtype` explicitly in `dd.read_csv()` |
- `references/collections_guide.md` — Detailed DataFrames, Arrays, and Bags guide with comprehensive code examples for reading, transforming, aggregating, and writing data. Covers map_partitions patterns, the meta parameter, chunking strategies, map_blocks, foldby, and collection conversion. Consolidated from the original dataframes.md, arrays.md, and bags.md.
- `references/distributed_computing.md` — Futures API, distributed coordination primitives (Locks, Queues, Events, Variables), Actors, scheduler configuration, HPC cluster setup (SLURM, Kubernetes), adaptive scaling, dashboard monitoring, and performance profiling. Consolidated from the original futures.md and schedulers.md.
- Not migrated: the original had 6 reference files. best-practices.md content was consolidated into the Best Practices section and Key Concepts inline (chunk strategy, scheduler selection); the remaining content is organized into the 2 reference files above, covering the 5 main components.