Scikit-Image and Dask Experiment

This notebook investigates performance parallelizing scikit-image operations. We find that Dask provides a very small speedup, despite consuming 400% CPU. We benchmark various components of the computation and find that we are likely bound by something other than CPU processing, like memory access.

This work was done in collaboration with the following scikit-image developers at the SciPy 2017 sprints

  • Emmanuelle Gouillart
  • Stefan Van der Walt
  • Nelle Varoquaux

Download and load data

This data is courtesy of Emmanuelle Gouillart: 250MB download

In [1]:
import numpy as np
dat = np.fromfile('Al8Cu_1000_g13_t4_200_250.vol',
                  dtype=np.float32).reshape((50, 1104, 1104))
dat /= abs(dat).max()
In [2]:
dat.shape
Out[2]:
(50, 1104, 1104)
In [3]:
%matplotlib inline
import matplotlib.pyplot as plt
fig = plt.figure(figsize=(10, 10))
plt.imshow(dat[40, :, :], cmap='gray')
Out[3]:
<matplotlib.image.AxesImage at 0x7f55f84ff710>

Split array into equal chunks, perform overlapping gaussian filter

Lets look at a common operation, smoothing the image with a gaussian filter. We find that calling this normally on the image takes around 2.5 seconds.

In [4]:
import skimage.filters
%time _ = skimage.filters.gaussian(dat, sigma=3)
CPU times: user 2.24 s, sys: 64 ms, total: 2.3 s
Wall time: 2.3 s

Parallelize with Dask.array

We split this array into four chunks in the large spatial dimensions, leaving the short dimension (50) full.

We then map the skimage.filters.gaussian function on each block, providing an overlap of 9, which should provide enough space for the gaussian filter to be smooth over chunk transitions.

In [5]:
import dask.array as da
In [6]:
%%time
x = da.from_array(dat, chunks=(50, dat.shape[1] // 2, dat.shape[2] // 2), name=False)
y = x.map_overlap(skimage.filters.gaussian, depth=9, sigma=3, boundary='none')
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 5.18 ms
In [7]:
y
Out[7]:
dask.array<trim, shape=(50, 1104, 1104), dtype=float64, chunksize=(50, 552, 552)>
In [8]:
%time _ = y.compute()
CPU times: user 5.49 s, sys: 668 ms, total: 6.16 s
Wall time: 1.94 s

We see a modest speedup (20-30%) which is far below the 300% speedup we might expect on our four-core machine.

The rest of this notebook tries to identify the burdens involved in this computation.

Task generation costs

In [9]:
%%time
x = da.from_array(dat, chunks=(50, dat.shape[1] // 2, dat.shape[2] // 2), name=False)
y = x.map_overlap(skimage.filters.gaussian, depth=9, sigma=3, boundary='none')
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 6.46 ms

Overlapping introduces some graph overhead

There are about 100 tasks for four chunks. We can probably bring this down.

In [10]:
len(y.dask)
Out[10]:
109
In [11]:
y.visualize(optimize_graph=True)
Out[11]:

Memory copy costs from stitching

The dask.array.map_overlap function (which is the main component of skimage.apply_parallel) does a fair amount of getitem and np.concatenate calls. These can be wasteful. Lets benchmark with a simple no-op computation.

In [12]:
%%time
x = da.from_array(dat, chunks=(50, dat.shape[1] // 2, dat.shape[2] // 2), name=False)
# y = x.map_overlap(skimage.filters.gaussian, depth=9, sigma=3, boundary='none')
y = x.map_overlap(lambda x: x, depth=9, boundary='none')
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 6.76 ms
In [13]:
%time _ = y.compute()
CPU times: user 468 ms, sys: 280 ms, total: 748 ms
Wall time: 309 ms

This is non-trivial, and something that we can work to reduce, but also isn't our limiting factor.

Look at single-machine diagnostics

Lets look at a profile of our computation in many threads

In [14]:
from dask.diagnostics import Profiler, ResourceProfiler, visualize

x = da.from_array(dat, chunks=(50, dat.shape[1] // 2, dat.shape[2] // 2), name=False)
y = x.map_overlap(skimage.filters.gaussian, depth=9, sigma=3, boundary='none')

with Profiler() as prof, ResourceProfiler(dt=0.1) as rprof:
    y.compute(optimize_graph=False)
In [15]:
from bokeh.plotting import show, output_notebook
output_notebook()
show(visualize([prof, rprof], show=False))
Loading BokehJS ...

Lets make some observations:

  1. All of our cores report being busy at the same time (400% cpu usage) so our lack of speedup isn't a GIL issue
  2. The green bars are for skimage.filters.gaussian. These take up most of the time.
  3. The blue-purple bars on the right are the re-stitching component that is from dask.array. These are significant and should be reduced, but will only offer modest improvements.
  4. We're using 3GB of memory for what is only a 250MB dataset
In [16]:
dat.nbytes / 1e6
Out[16]:
243.7632

So how well does skimage.filters.gaussian parallelize normally?

Lets just call gaussian on dat in four threads at the same time. This is weak-scaling instead of strong-scaling. If gaussian isn't amenable to being run in multiple threads at the same time then this should take four times as long. If gaussian is entirely bound by CPU and doesn't introduce other constraints then this should run in the same time as a single computation

There is no stitching here, so this is best case.

In [17]:
%time _ = skimage.filters.gaussian(dat, sigma=3)  # single computation in single thread
CPU times: user 2.41 s, sys: 76 ms, total: 2.49 s
Wall time: 2.51 s
In [18]:
from concurrent.futures import ThreadPoolExecutor, wait
e = ThreadPoolExecutor()
In [19]:
%%time
futures = [e.submit(skimage.filters.gaussian, dat, sigma=3) for _ in range(4)]
wait(futures)
CPU times: user 18.1 s, sys: 660 ms, total: 18.8 s
Wall time: 4.72 s

It looks like we're seeing a difference of 2x, so this is right in the middle. This Scikit-image function is partially parallelizable.

How about a different algorithm, like denoising

Lets try another algorithm to see if we get a different reaction. We try the skimage.restoration.denoise_tv_chambolle function. This seems to use a great deal of RAM, so we choose to work on a subset of the data.

In [20]:
%time _ = skimage.restoration.denoise_tv_chambolle(dat[:10, ...])
CPU times: user 4.18 s, sys: 980 ms, total: 5.16 s
Wall time: 5.17 s
In [21]:
%%time
with ResourceProfiler(dt=0.1) as rprof:
    futures = [e.submit(skimage.restoration.denoise_tv_chambolle, dat[:10, ...]) for _ in range(4)]
    wait(futures)
CPU times: user 55.1 s, sys: 12.6 s, total: 1min 7s
Wall time: 17.7 s
In [22]:
show(visualize([rprof], show=False))
W-1005 (SNAPPED_TOOLBAR_ANNOTATIONS): Snapped toolbars and annotations on the same side MAY overlap visually: Figure(id='acda2fa8-8b21-4c8a-8fca-bb3ae95e6890', ...)

Lest try this with the distributed scheduler

This will use processes, which might have better properties

In [23]:
from dask.distributed import Client
client = Client(set_as_default=False)
client
Out[23]:

Client

Cluster

  • Workers: 4
  • Cores: 4
  • Memory: 10.01 GB
In [24]:
%%time
from dask.distributed import wait as dask_wait

futures = [client.submit(skimage.restoration.denoise_tv_chambolle, dat[:10, ...], pure=False)
           for _ in range(4)]
dask_wait(futures)
/home/mrocklin/workspace/distributed/distributed/worker.py:652: UserWarning: Serialized function is very large.  This is likely due to closing over large local variables. Serialized function is 49 MB
  warnings.warn(large_function_warning % (len(b) / 1e6))
CPU times: user 2.61 s, sys: 592 ms, total: 3.2 s
Wall time: 19.3 s

Lets try pre-scattering data around.

In [25]:
%%time

future = client.scatter(dat[:10, ...], broadcast=True)
futures = [client.submit(skimage.restoration.denoise_tv_chambolle, future, pure=False)
           for _ in range(4)]
dask_wait(futures)
CPU times: user 2.22 s, sys: 388 ms, total: 2.61 s
Wall time: 18.6 s

Doesn't help that much

This is a strong indication that the issue isn't GIL-bound

Lets profile the gaussian function on a single machine

In [26]:
%load_ext snakeviz
In [27]:
%%snakeviz
_ = skimage.filters.gaussian(dat, sigma=3)
 
*** Profile stats marshalled to file '/tmp/tmpyqxp_1hp'. 

This renders a visual diagnostic in a separate window that shows that almost all time is taken within the scipy.ndimage._nd_image.correlate1d function. Unfortunately CProfile seems unable to dive more deeply than this.

Conclusions

Concrete next steps

I suspect that these scikit-image functions are bound by some other resource than CPU processing power. My first guess would be that they are accessing or creating memory in inefficient ways. It might be worth rewriting one or two functions in a way that avoided memory allocation and access and then retrying this sort of experiment.

On the dask side we can reduce the task graph overhead a bit more. We might also consider just rewriting the skimage.apply_parallel function by hand with dask.delayed to remove some of the generalities of dask.array.map_blocks that may not be relevant here.

General take-aways

This is a good example of how parallel computing may be ineffective, especially if you are not bound by raw computing power.

Fortunately, the variety of visual diagnostic and parallel computing tools at our disposal allows us to get a variety of perspectives on the problem and help guide our work. In particular we used the following:

  1. Snakeviz to visualize CProfile outputs that help us understand which functions take up the most time in a single thread.
  2. The single-machine Dask diagnostics. These provide useful static plots that help us understand when each task ran and the CPU and Memory use during the computation. The resource profiler (cpu and memory use) in particular was helpful for normal computations, not just when using Dask.
  3. The Dask.distributed dashboard was useful to identify communication and load balancing issues, though it is difficult to include in static documents like this one, so we did not include much discussion here.