Dask

Dask erfüllt zwei verschiedene Aufgaben:

  1. die dynamische Aufgabenplanung wird optimiert, ähnlich wie bei Airflow, Luigi oder Celery

  2. Arrays, Dataframes und Lists werden parallel mit dynamischem Task Scheduling ausgeführt.

Skalierung von Laptops bis hin zu Clustern

Dask kann mit uv auf einem Laptop installiert werden und erweitert die Größe der Datensätze von passt in den Arbeitsspeicher zu passt auf die Festplatte. Dask kann jedoch auch auf einen Cluster mit Hunderten von Rechnern skaliert werden. Dask ist robust, flexibel, Data Local und hat eine geringe Latenzzeit. Weitere Informationen findet ihr in der Dokumentation zum Distributed Scheduler. Dieser einfache Übergang zwischen einer einzelnen Maschine und einem Cluster ermöglicht einen einfachen Start und ein Wachstum nach Bedarf.

Dask installieren

Ihr könnt alles installieren, was für die meisten gängigen Anwendungen von Dask erforderlich ist (Arrays, Dataframes, …). Dies installiert sowohl Dask als auch Abhängigkeiten wie NumPy, Pandas, usw., die für verschiedene Arbeiten benötigt werden:

$ uv add "dask[complete]"

Es können aber auch nur einzelne Subsets installiert werden:

$ uv add "dask[array]"
$ uv add "dask[dataframe]"
$ uv add "dask[diagnostics]"
$ uv add "dask[distributed]"

Vertraute Bedienung

Dask DataFrame

… imitiert Pandas

[1]:
import pandas as pd


df = pd.read_csv("tutorials.csv")
grouped = df.groupby("Title")
grouped.agg("mean")
[1]:
Unnamed: 0 2021-12 2022-01 2022-02
Title
Jupyter Tutorial 0.5 18103.5 20505.5 13099.0
PyViz Tutorial 2.0 4873.0 3930.0 2573.0
Python Basics 4.5 261.0 251.0 341.0
[2]:
import dask.dataframe as dd


df = dd.read_csv("tutorials.csv")
grouped = df.groupby("Title")
grouped.agg("mean").head()
[2]:
Unnamed: 0 2021-12 2022-01 2022-02
Title
Jupyter Tutorial 0.5 18103.5 20505.5 13099.0
PyViz Tutorial 2.0 4873.0 3930.0 2573.0
Python Basics 4.5 261.0 251.0 341.0

Dask Array

… imitiert NumPy

[3]:
import h5py
import numpy as np


f = h5py.File("mydata.h5")
x = np.array(f["."])
[4]:
import dask.array as da


f = h5py.File("mydata.h5")
x = da.array(f["."])

Dask Bag

… imitiert iterators, Toolz und PySpark.

[5]:
import dask.bag as db


b = db.from_sequence([10, 3, 5, 7, 11, 4])
list(b.topk(2))
[5]:
[11, 10]

Siehe auch

Dask Delayed

… imitiert loops und umschließt benutzerdefinierten Code, siehe auch Erstellen einer delayed-Pipeline.

concurrent.futures

Das Interface ermöglicht die Übermittlung von selbstdefinierten Aufgaben.

Bemerkung

Für das folgende Beispiel muss Dask mit der distributed-Option installiert werden, z.B.

$ uv add "dask[distributed]"
[6]:
from dask.distributed import Client
[7]:
client = Client()

Dadurch werden die lokalen Worker als Prozesse gestartet. Um die lokalen Worker als Threads auszuführen, könnt ihr processes=False als Parameter übergeben:

[8]:
client = Client(processes=False)
/Users/veit/cusy/trn/jupyter-tutorial/uvenvs/py313/.venv/lib/python3.13/site-packages/distributed/node.py:187: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 62320 instead
  warnings.warn(

Jetzt könnt ihr eure eigenen Aufgaben ausführen und Abhängigkeiten mithilfe der submit-Methode verketten:

[9]:
from math import pi


def inc(x):
    return x + 1


def circumference(x):
    return 2 * pi * x


increments = client.submit(inc, 10)
circumferences = client.submit(circumference, increments)
[10]:
circumferences.result()
[10]:
69.11503837897544