import dask.array as da
import numpy as np
Dask arrays can be created in a few different ways, but here we'll use the from_array function. from_array accepts anything that supports numpy-style slicing, so we could pass in an HDF5 or netCDF4 Dataset instead of a numpy array. We'll use a numpy array here just for a quick demo. We also pass in a chunks keyword, which tells dask how to split the array into blocks (here 750 x 750 blocks, giving a 4 x 4 grid).
x = np.arange(3000**2).reshape((3000, 3000))
a = da.from_array(x, chunks=(750, 750))
a is now a dask.array.Array object, which looks a lot like a numpy array. It even has many of the same methods:
print([f for f in dir(a) if not f.startswith('_')])
['T', 'all', 'any', 'argmax', 'argmin', 'astype', 'cache', 'chunks', 'compute', 'conj', 'dask', 'dot', 'dtype', 'flatten', 'imag', 'map_blocks', 'map_overlap', 'max', 'mean', 'min', 'moment', 'name', 'nbytes', 'ndim', 'numblocks', 'prod', 'ravel', 'real', 'rechunk', 'reshape', 'shape', 'size', 'squeeze', 'std', 'store', 'sum', 'to_hdf5', 'topk', 'transpose', 'var', 'vindex', 'visualize', 'vnorm']
a.shape
(3000, 3000)
a.dtype
dtype('int64')
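To make the effect of the chunks keyword concrete, here is a minimal sketch on a much smaller array. The names x_small, a_small and b_small are illustrative only and do not appear elsewhere in this notebook. It inspects the chunks and numblocks attributes listed above, and shows that a chunk size that does not divide the shape evenly simply leaves smaller blocks at the edges.

```python
import numpy as np
import dask.array as da

# A small 6 x 6 array, so the block layout is easy to read.
x_small = np.arange(36).reshape((6, 6))

# Blocks of 3 rows by 2 columns: 2 blocks down, 3 blocks across.
a_small = da.from_array(x_small, chunks=(3, 2))
print(a_small.chunks)     # ((3, 3), (2, 2, 2))
print(a_small.numblocks)  # (2, 3)

# A chunk size that does not divide the shape evenly leaves
# smaller blocks at the trailing edge of each axis.
b_small = da.from_array(x_small, chunks=(4, 4))
print(b_small.chunks)     # ((4, 2), (4, 2))
```

Applying the same reasoning to the 3000 x 3000 array with 750 x 750 chunks gives 4 blocks along each axis.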
Computations on the array are not run immediately. Instead, they are recorded in a task graph, which is stored as the dask attribute on the array. This is common to all dask collections (dask.array, dask.dataframe, dask.bag and dask.imperative). To see the graph, one can use the visualize method:
a.visualize()
The graph of a just shows several slices being taken out of x, one for each chunk. Let's create a larger computation and visualize it:
expr = a.dot(a.T).sum(axis=0)
expr.visualize()
That's quite a complicated graph! Before the computation is actually run (by calling the compute method on the array), the graph is simplified by several optimization passes to improve efficiency. To visualize the final graph that will be run, we can set the optimize_graph keyword to True:
expr.visualize(optimize_graph=True)
This is a much simpler computation, and is the result of inlining, fusing, and rewriting tasks.
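The lazy behaviour described above can be checked directly. The following is a small sketch, using a hypothetical 4 x 4 array (x_demo, a_demo and lazy are names made up for this illustration): building the expression returns another dask array without doing any arithmetic, and only compute produces a concrete numpy result, which should match what numpy computes on its own.

```python
import numpy as np
import dask.array as da

# Tiny input so the example runs instantly.
x_demo = np.arange(16).reshape((4, 4))
a_demo = da.from_array(x_demo, chunks=(2, 2))

# This only records tasks in a graph; no arithmetic has happened yet.
lazy = a_demo.dot(a_demo.T).sum(axis=0)
print(isinstance(lazy, da.Array))  # True: still a lazy dask array

# compute() optimizes the graph, executes it, and returns a numpy array.
result = lazy.compute()
print(type(result))

# The result agrees with the same expression evaluated eagerly by numpy.
expected = x_demo.dot(x_demo.T).sum(axis=0)
print(np.array_equal(result, expected))  # True
```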
from dask.diagnostics import Profiler, ResourceProfiler, ProgressBar, visualize
from bokeh.io import output_notebook
ProgressBar().register()
output_notebook()
with ResourceProfiler(dt=0.5) as rprof, Profiler() as prof:
    out = expr.compute(num_workers=4)
[########################################] | 100% Completed | 4.5s
Here we'll plot a profile of the computation as executed by dask. The top plot has a box for each task showing the duration of the task (hover over the box with your mouse to see the task description). The bottom plot shows our resource usage.
As the plot shows, CPU usage reached 400%, indicating that all 4 cores of my computer were fully used.
visualize([prof, rprof])
<bokeh.models.plots.GridPlot at 0x110caa7d0>
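If the bokeh plot is not available, the raw profiling data can still be inspected. The sketch below assumes that Profiler.results is a list of records with start_time, end_time and worker_id fields, as described in the dask.diagnostics documentation. The array is deliberately smaller than the one above, and the names x_prof, a_prof, expr_prof and prof_demo are illustrative only.

```python
import numpy as np
import dask.array as da
from dask.diagnostics import Profiler

# A smaller version of the computation above, so it finishes quickly.
x_prof = np.arange(400 ** 2, dtype=np.int64).reshape((400, 400))
a_prof = da.from_array(x_prof, chunks=(100, 100))
expr_prof = a_prof.dot(a_prof.T).sum(axis=0)

# Record one entry per executed task while the graph runs.
with Profiler() as prof_demo:
    expr_prof.compute(num_workers=4)

# Each record carries the task's start and end timestamps and the
# id of the worker thread that ran it.
durations = [t.end_time - t.start_time for t in prof_demo.results]
workers = {t.worker_id for t in prof_demo.results}

print("tasks executed:", len(durations))
print("summed task time (s):", sum(durations))
print("distinct worker ids:", len(workers))
```

When the summed task time is noticeably larger than the wall time of the computation, tasks were overlapping, which is what the top plot shows graphically.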
Let's run the same computation in NumPy:
%time _ = x.dot(x.T).sum(axis=0)
CPU times: user 16 s, sys: 48.5 ms, total: 16.1 s
Wall time: 16.1 s
This took about 3.6 times as long as the computation in dask (16.1 s versus 4.5 s), showing that for large enough arrays, the parallelism from dask provides a significant speedup.
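The comparison can also be reproduced outside IPython, where %time is not available. The following is a rough sketch using time.perf_counter. The array size n and the variable names are assumptions chosen so that it runs quickly. On an array this small, scheduling overhead may outweigh the benefit of parallelism, so the timings it prints should not be read as a benchmark. Increase n to approach the setup used above.

```python
import time

import numpy as np
import dask.array as da

# Illustrative size only; the notebook above uses n = 3000.
n = 500
x_t = np.arange(n ** 2, dtype=np.int64).reshape((n, n))
a_t = da.from_array(x_t, chunks=(n // 4, n // 4))

# Time the dask version (graph construction plus execution).
start = time.perf_counter()
dask_result = a_t.dot(a_t.T).sum(axis=0).compute(num_workers=4)
dask_seconds = time.perf_counter() - start

# Time the same expression evaluated directly by numpy.
start = time.perf_counter()
numpy_result = x_t.dot(x_t.T).sum(axis=0)
numpy_seconds = time.perf_counter() - start

# Both approaches should give identical results.
print("results match:", np.array_equal(dask_result, numpy_result))
print("dask wall time (s):", dask_seconds)
print("numpy wall time (s):", numpy_seconds)
```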