%load_ext autoreload
%autoreload 2
import seaborn as sns
import matplotlib.pyplot as plt
from bokeh.io import output_notebook
from bokeh.resources import CDN
output_notebook(CDN, hide_banner=True)
local_cluster = False
if local_cluster:
    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
plt.rcParams["figure.figsize"] = (8, 8)
plt.rcParams["figure.dpi"] = 150
plt.rcParams["font.size"] = 14
plt.rcParams["figure.constrained_layout.use"] = True
plt.rcParams['font.family'] = ['sans-serif']
plt.rcParams['font.sans-serif'] = ['DejaVu Sans']
plt.style.use('ggplot')
sns.set_style("whitegrid", {'axes.grid': False})
Motivation
Computer Science Principles
Parallelism
Distributed Computing
Imperative Programming
Declarative Programming
Organization
Queue Systems / Cluster Computing
Parameterization
Databases
Big Data
MapReduce
Spark
Streaming
Cloud Computing
Beyond / The future
There are three different types of problems that we will run into.
Full adult animal at cellular resolution: 1000s of samples of full adult animals, imaged at 0.74 $\mu m$ resolution $\longrightarrow$ images of $11500 \times 2800 \times 628$ voxels, 20-40 GVx / sample
Whole brain with cellular resolution: 1 $cm^3$ scanned at 1 $\mu m$ resolution $\longrightarrow$ 1000 GVx / sample
Normally, when a problem is approached, it is solved for a single task as quickly as possible
im_in = imread('test.jpg');           % read the image
im_filter = medfilt2(im_in, [5, 5]);  % median filter to remove noise
cl_img = bwlabel(im_filter > 10);     % threshold and label components
cl_count = hist(cl_img(:), 1:100);    % histogram of the label values
dlmwrite('out.txt', cl_count);        % write the counts to a file
You have to rewrite everything, every time
If you start with a bad approach, it is very difficult to fix later; big data and reproducibility must be considered from the beginning
Parallelism is when you can divide a task into separate pieces which can then be worked on at the same time.
Some tasks are easy to parallelize while others are very difficult. Rather than focusing on programming, real-life examples are good indicators of difficulty.
Distributed computing is very similar to parallel computing, but a bit more particular. Parallel means you process many tasks at the same time, while distributed means you are no longer on the same CPU, process, or even on the same machine.
Being distributed has some important implications: once you are no longer on the same machine, variables like network delay, file system issues, and other users become major problems.
The largest issue with parallel / distributed tasks is the need to access the same resources at the same time
Parallel computing requires a significant amount of coordination between computers for tasks that are not easily parallelizable.
The second major issue is mutability: if you have two cores / computers trying to write the same information at the same time, the result is no longer deterministic (not good)
The simple act of taking turns and waiting for every independent process to take its turn can completely negate the benefits of parallel computing
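The cost of taking turns can be made concrete with a small sketch (hypothetical, using Python's `threading` module): four threads increment a shared counter, and every increment must wait for the lock, so the work is effectively serialized again.

```python
import threading

counter = 0
lock = threading.Lock()

def safe_increment(n):
    """Increment the shared counter n times, taking the lock each time."""
    global counter
    for _ in range(n):
        with lock:  # every thread must wait its turn here
            counter += 1

threads = [threading.Thread(target=safe_increment, args=(10000,))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 40000 -- correct, but the lock serialized the increments
```

Without the lock the final count is not deterministic; with it, the threads spend most of their time waiting rather than computing.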
It inherits all of the problems of parallel programming along with a whole variety of new issues.
If you have 1000 computers working on solving a problem and one fails, you do not want your whole job to crash
How can you access and process data from many different computers quickly without very expensive infrastructure?
Directly coordinating tasks on a computer.
They look fairly similar, so what is the difference? The second is needlessly complicated for one person, but what if you have a team? How can several people make an imperative soup faster (chopping vegetables together)?
How can many people make a declarative soup faster? Give everyone a different task (not completely efficient since some tasks have to wait on others)
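The soup analogy maps directly onto code. A minimal sketch: the imperative version spells out every step and the order they happen in, while the declarative version only states the desired result, leaving the runtime free to decide how to compute it (and potentially in parallel).

```python
numbers = [1, 2, 3, 4, 5]

# Imperative: specify every step and the order in which they happen
total = 0
for n in numbers:
    total = total + n * n

# Declarative: state *what* we want; *how* it is computed is left open
total_decl = sum(n * n for n in numbers)

print(total, total_decl)  # 55 55
```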
One of the major challenges of scaling up experiments and analysis is keeping all of the results organized in a clear manner. As we have seen in the last lectures, many of the analyses produced large numbers of text files
Queue processing systems (like Sun Grid Engine, Oracle Grid Engine, Apple XGrid, Condor, etc.) are used to manage shared computing resources and the tasks submitted to them.
Based on a set of rules for how to share the resources to the users to run tasks.
The node with which every other node communicates, the main address.
The nodes where the computation is performed.
The actual process that decides which jobs will run using which resources (worker nodes, memory, bandwidth) at which time
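A scheduler can be sketched as a priority queue of jobs, each requesting some resources; the highest-priority job that fits in the free resources is started. This is a hypothetical toy model, not how Sun Grid Engine or Condor actually work; the job names and core counts are made up.

```python
import heapq

# (priority, job name, cores requested) -- lower number = higher priority
job_queue = []
heapq.heappush(job_queue, (2, 'render_frames', 8))
heapq.heappush(job_queue, (1, 'nightly_backup', 1))
heapq.heappush(job_queue, (3, 'train_model', 16))

free_cores = 10
started = []
# start jobs in priority order as long as they fit in the free cores
while job_queue and job_queue[0][2] <= free_cores:
    priority, name, cores = heapq.heappop(job_queue)
    free_cores -= cores
    started.append(name)

print(started)     # jobs that fit, in priority order
print(free_cores)  # cores left over for the next scheduling round
```

Real schedulers add fairness between users, backfilling of small jobs, and time limits on top of this basic idea.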
A database is a collection of data stored in the format of tables: a number of columns and rows.
from IPython.display import display, Markdown
import pandas as pd
display(Markdown('### Animals\nHere we have a table of the animals measured in an experiment and their weights'))
display(pd.DataFrame(dict(id=(1, 2, 3),
                          Weight=(100, 40, 80))))
display(Markdown(
'### Cells\nThe cells table is then the result of an analysis looking at the cellular structures'))
display(pd.DataFrame(dict(
    Animal=(1, 1, 2),
    Type=("Cancer", "Healthy", "Cancer"),
    Anisotropy=(0.5, 1.0, 0.5),
    Volume=(1, 2, 0.95))))
SQL (pronounced Sequel) stands for Structured Query Language and is nearly universal for both searching (called querying) and adding (called inserting) data into databases. SQL is used in various forms, from Firefox storing its preferences locally (using SQLite) to Facebook storing some of its user information (MySQL and Hive). So, referring to the two tables we defined in the last entry, we can use SQL to get information about the tables independently of how they are stored (a single machine, a supercomputer, or in the cloud)
SELECT Volume FROM Cells
SELECT AVG(Volume) FROM Cells WHERE Type = 'Cancer'
We could have done these easily without SQL using Excel, Matlab or R
SELECT Volume FROM Cells WHERE Animal IN
    (SELECT id FROM Animals WHERE Weight > 80)
SELECT ATable.Weight, CTable.Volume FROM Animals AS ATable
INNER JOIN Cells AS CTable ON (ATable.id = CTable.Animal)
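The same inner join can be written in pandas (which we used above to build the tables); `merge` is the DataFrame counterpart of SQL's `INNER JOIN`.

```python
import pandas as pd

# the two tables defined earlier
animals = pd.DataFrame(dict(id=(1, 2, 3), Weight=(100, 40, 80)))
cells = pd.DataFrame(dict(Animal=(1, 1, 2),
                          Type=("Cancer", "Healthy", "Cancer"),
                          Anisotropy=(0.5, 1.0, 0.5),
                          Volume=(1, 2, 0.95)))

# INNER JOIN Cells ON Animals.id = Cells.Animal
joined = animals.merge(cells, left_on='id', right_on='Animal')
print(joined[['Weight', 'Volume']])
```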
If we expand our SQL example to cellular networks with an additional table explicitly describing the links between cells
from IPython.display import display, Markdown
import pandas as pd
from collections import OrderedDict
display(Markdown('### Table View'))
pd.DataFrame(OrderedDict(id1=(1, 1, 3),
                         id2=(2, 3, 1),
                         No_Juncs=(800, 40, 300)))
|   | id1 | id2 | No_Juncs |
|---|-----|-----|----------|
| 0 | 1   | 2   | 800      |
| 1 | 1   | 3   | 40       |
| 2 | 3   | 1   | 300      |
Now to calculate how many connections each cell has
SELECT id, COUNT(*) AS connection_count FROM Cells
INNER JOIN Network ON (id = id1 OR id = id2)
GROUP BY id
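What this query computes can be checked by hand: a cell participates in a link whether it appears as `id1` or `id2`, so counting occurrences of each id over both columns gives the connection count. A plain-Python sketch using the network table above:

```python
from collections import Counter

# links between cells, as (id1, id2, number of junctions)
network = [(1, 2, 800), (1, 3, 40), (3, 1, 300)]

# a cell participates in a link whether it appears as id1 or id2
connection_count = Counter()
for id1, id2, _ in network:
    connection_count[id1] += 1
    connection_count[id2] += 1

print(dict(connection_count))  # {1: 3, 2: 1, 3: 2}
```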
Basic networks can be entered and queried using SQL, but relatively simple-sounding requests can get complicated very quickly
How many cells are within two connections of each cell
SELECT id, COUNT(*) AS connection_count FROM Cells
INNER JOIN Network AS NetA ON (id = NetA.id1)
INNER JOIN Network AS NetB ON (NetA.id2 = NetB.id1)
GROUP BY id
This is still readable but quickly becomes very cumbersome and difficult to manage
A new generation of database software extends the functionality of SQL to allow for more scalability (MongoDB) or for specific problems like networks and graphs (generally called graph databases)
When a ton of heterogeneous data is coming in fast.
Performant, scalable, and flexible
Scaling to 10X, 100X, 1000X should take the same amount of effort
The director of AMPLab said their rate-limiting factor is always having enough interesting data
Google ran into 'big data' and its associated problems years ago: peta- and exabytes of websites to collect and make sense of. Google uses an algorithm called PageRank(tm) for evaluating the quality of websites. They could probably have used existing tools if PageRank were some magic program that could read and determine the quality of a site on its own
for every_site_on_internet
    current_site.rank = secret_pagerank_function(current_site)
end
Just divide all the websites into a bunch of groups and have each computer run a group, easy!
While the actual internals of PageRank are not public, the general idea is that sites are ranked based on how many sites link to them
for current_site in every_site_on_internet
    current_pagerank = new SecretPageRankObj(current_site);
    for other_site in every_site_on_internet
        if current_site is_linked_to other_site
            current_pagerank.add_site(other_site);
        end
    end
    current_site.rank = current_pagerank.rank();
end
How do you divide this task?
Some people claim to have had the idea before; Google is certainly the first to have done it at scale
Several engineers at Google recognized common elements in many of the tasks being performed. They then proceeded to divide all tasks into two classes Map and Reduce
Map is where a function is applied to every element in the list and the function depends only on exactly that element $$ \vec{L} = \begin{bmatrix} 1,2,3,4,5 \end{bmatrix} $$ $$ f(x) = x^2 $$ $$ map(f \rightarrow \vec{L}) = \begin{bmatrix} 1,4,9,16,25 \end{bmatrix} $$
Reduce is more complicated and involves aggregating a number of different elements and summarizing them. For example the $\Sigma$ function can be written as a reduce function $$ \vec{L} = \begin{bmatrix} 1,2,3,4,5 \end{bmatrix} $$ $$ g(a,b) = a+b $$ Reduce then applies the function to the first two elements, and then to the result of the first two with the third and so on until all the elements are done $$ reduce(g \rightarrow \vec{L}) = g(g(g(g(1,2),3),4),5) $$
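Both definitions correspond directly to Python's built-in `map` and `functools.reduce`:

```python
from functools import reduce

L = [1, 2, 3, 4, 5]

# map: apply f(x) = x^2 to every element independently
squared = list(map(lambda x: x ** 2, L))
print(squared)  # [1, 4, 9, 16, 25]

# reduce: fold g(a, b) = a + b over the list, i.e. g(g(g(g(1,2),3),4),5)
total = reduce(lambda a, b: a + b, L)
print(total)  # 15
```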
They designed a framework for handling distributing and running these types of jobs on clusters. So for each job a dataset ($\vec{L}$), Map-task ($f$), a grouping, and Reduce-task ($g$) are specified
All of the steps in between can be written once in a robust, safe manner and then reused for every task which can be described using this MapReduce paradigm. Such a task $\langle \vec{L}, f(x), g(a,b) \rangle$ is referred to as a job.
The initial job was very basic, for more complicated jobs, a new notion of Key-value (KV) pairs must be introduced. A KV pair is made up of a key and value. A key must be comparable / hashable (a number, string, immutable list of numbers, etc) and is used for grouping data. The value is the associated information to this key.
Using MapReduce on a folder full of text-documents.
f(x) = [(word,1) for word in x.split(" ")]
L = ["cat dog car",
     "dog car dog"]
$$ \downarrow \textbf{ Map } : f(x) $$[("cat",1),("dog",1),("car",1),("dog",1),("car",1),("dog",1)]
$$ \downarrow \textrm{ Shuffle / Group} $$"cat": (1)
"dog": (1,1,1)
"car": (1,1)
$$ \downarrow \textbf{ Reduce } : g(a,b) $$[("cat",1),("dog",3),("car",2)]
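The map / shuffle / reduce pipeline above can be sketched in plain Python, with a dictionary playing the role of the shuffle step that groups the key-value pairs by key:

```python
from collections import defaultdict

L = ["cat dog car", "dog car dog"]

# Map: each line -> list of (word, 1) key-value pairs
mapped = [(word, 1) for line in L for word in line.split(" ")]

# Shuffle / Group: collect the values for each key
groups = defaultdict(list)
for word, count in mapped:
    groups[word].append(count)

# Reduce: apply g(a, b) = a + b within each group
word_counts = {word: sum(counts) for word, counts in groups.items()}
print(word_counts)  # {'cat': 1, 'dog': 3, 'car': 2}
```

In a real MapReduce framework, the map and reduce steps would run on different machines, and the shuffle would move data over the network.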
Here we make a word count example using all the lines of Shakespeare
import os
shake_path = os.path.join('..', 'common', 'data', 'shakespeare.txt')
with open(shake_path, 'r') as f:
    all_lines = f.readlines()
print(all_lines[:5])
["A MIDSUMMER-NIGHT'S DREAM\n", '\n', 'Now , fair Hippolyta , our nuptial hour \n', 'Draws on apace : four happy days bring in \n', 'Another moon ; but O ! methinks how slow \n']
Here we run the code in an imperative fashion one line at a time.
from tqdm import tqdm
from collections import defaultdict
import string
word_count = defaultdict(lambda: 0) # default count is 0
for c_line in tqdm(all_lines):
    for c_word in c_line.lower().strip().split(' '):
        # keep only lowercase ascii letters
        v_word = ''.join([c for c in c_word if c in string.ascii_lowercase])
        if len(v_word) > 0:
            word_count[v_word] += 1
100%|███████████████████████████████████████████████████████████████████████| 129107/129107 [00:02<00:00, 48048.73it/s]
print('Shakespeare used', len(word_count), 'different words')
print('Most frequent')
for w, count in sorted(word_count.items(), key=lambda x: -x[1])[:10]:
    print(w, '\t', count)
print('Least frequent')
for w, count in sorted(word_count.items(), key=lambda x: x[1])[:10]:
    print(w, '\t', count)
Shakespeare used 26982 different words
Most frequent
the 	 26851
and 	 24077
i 	 20535
to 	 18561
of 	 16013
you 	 13856
a 	 13840
my 	 12282
that 	 10761
in 	 10537
Least frequent
midsummernights 	 1
wanes 	 1
newbent 	 1
solemnities 	 1
merriments 	 1
interchangd 	 1
lovetokens 	 1
prevailment 	 1
unhardend 	 1
filchd 	 1
Here we use the Map Reduce approach to divide the function up into Map and Reduce components
import doctest
import copy
import functools
# tests are very important for map reduce
def autotest(func):
    globs = copy.copy(globals())
    globs.update({func.__name__: func})
    doctest.run_docstring_examples(
        func, globs, verbose=True, name=func.__name__)
    return func
# map function
@autotest
def line_to_words(in_line):
    """
    Takes a single line and returns the words and counts
    >>> line_to_words("hi i am. bob . ")
    ['hi', 'i', 'am', 'bob']
    """
    words = in_line.lower().strip().split(' ')
    v_words = [''.join([c for c in c_word if c in string.ascii_lowercase])
               for c_word in words]
    return [c_word for c_word in v_words if len(c_word) > 0]
Finding tests in line_to_words
Trying:
    line_to_words("hi i am. bob . ")
Expecting:
    ['hi', 'i', 'am', 'bob']
ok
import dask.bag as dbag
line_bag = dbag.from_sequence(all_lines, partition_size=10000)
line_bag
dask.bag<from_se..., npartitions=13>
map_output = line_bag.map(line_to_words).flatten()
map_output
dask.bag<flatten..., npartitions=13>
# we cheat a bit for the reduce step
reduce_output = map_output.frequencies()
top10 = reduce_output.topk(10, lambda x: x[1])
bot10 = reduce_output.topk(10, lambda x: -x[1])
import dask.diagnostics as diag
with diag.ProgressBar(), diag.Profiler() as prof, diag.ResourceProfiler(0.5) as rprof:
    print('Top 10\n', top10.compute(num_workers=4))
    print('Bottom 10\n', bot10.compute(num_workers=4))
[########################################] | 100% Completed |  5.5s
Top 10
 [('the', 26851), ('and', 24077), ('i', 20535), ('to', 18561), ('of', 16013), ('you', 13856), ('a', 13840), ('my', 12282), ('that', 10761), ('in', 10537)]
[########################################] | 100% Completed |  5.9s
Bottom 10
 [('midsummernights', 1), ('wanes', 1), ('newbent', 1), ('solemnities', 1), ('merriments', 1), ('interchangd', 1), ('lovetokens', 1), ('prevailment', 1), ('unhardend', 1), ('filchd', 1)]
diag.visualize([prof, rprof])
Hadoop is the open-source version of MapReduce, developed by Yahoo and released as an Apache project. It provides the underlying infrastructure and filesystem that handle storing and distributing data, so each machine stores some of the data locally and processing jobs run where the data is stored.
Zaharia, M., et. al (2012). Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing
In the pure Python ecosystem, there has been a recent development called Dask which aims to bring fault-tolerant, robust distributed computing to numerical Python code. In particular, the focus has been on taking libraries like numpy and scipy and making them run as easily as possible in a distributed setting. We will use these for the examples, but the same ideas apply equally well to Spark and Hadoop-like problems.
More general than the MapReduce structure is the idea of building directed acyclic graphs (DAGs) of tasks. These are used in Spark and Dask for distributed computing and in Tensorflow and PyTorch for massively parallel computing, since they allow complex operations to be defined in a declarative way, which lets them be optimized later depending on the actual resources available (and re-executed if some of those resources crash).
Facebook shows an example of why such representations are useful: they allow the operations to be optimized later, giving massive performance improvements even for fairly basic operations.
import dask.array as da
from dask.dot import dot_graph
image_1 = da.zeros((5, 5), chunks=(5, 5))
image_2 = da.ones((5, 5), chunks=(5, 5))
dot_graph(image_1.dask)
image_4 = (image_1-10) + (image_2*50)
dot_graph(image_4.dask)
image_5 = da.matmul(image_1, image_4)
dot_graph(image_5.dask)
The initial examples were shown on very simple image problems. Here we can see how it looks for real imaging issues.
%matplotlib inline
import dask.array as da
from dask.dot import dot_graph
import numpy as np
from skimage.io import imread
import matplotlib.pyplot as plt
from skimage.util import montage as montage2d
# for showing results
import dask.diagnostics as diag
foam_stack = imread('../common/data/plateau_border.tif')[:-54, 52:-52, 52:-52]
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, 10))
ax1.imshow(np.sum(foam_stack, 0), cmap='bone_r')
ax2.imshow(montage2d(foam_stack[::10]), cmap='bone_r')
from itkwidgets import view
import itk
view(itk.GetImageFromArray(foam_stack))
da_foam = da.from_array(
foam_stack/255.0, chunks=(20, 400, 400), name='FoamImage')
dot_graph(da_foam.dask)
import dask_ndfilters as da_ndfilt
image_filt = da_ndfilt.gaussian_filter(1-da_foam, (3, 6, 6))
dot_graph(image_filt.dask)
import dask_ndmorph as ndmorph
from skimage.morphology import ball
erode_foam = ndmorph.binary_erosion(image_filt > 0.9, ball(12))
dot_graph(erode_foam.dask)
from scipy.ndimage import label
def block_label(in_block, block_id=None):
    # label each chunk separately and shift the labels by an offset based on
    # the chunk position so labels stay unique across the whole image
    slice_no = block_id[0]
    offset = (np.prod(in_block.shape) * slice_no).astype(np.int32)
    label_img = label(in_block)[0].astype(np.int32)
    label_img[label_img > 0] += offset
    return label_img
lab_bubbles = erode_foam.map_blocks(block_label, dtype=np.int32)
with diag.ProgressBar(), diag.Profiler() as prof, diag.ResourceProfiler(0.5) as rprof:
    processed_stack = lab_bubbles.compute(num_workers=4)
[########################################] | 100% Completed | 58.6s
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, 10))
ax1.imshow(np.sum(processed_stack, 1), cmap='bone_r')
ax2.imshow(montage2d(processed_stack[::20]), cmap='nipy_spectral')
diag.visualize([prof, rprof])
foam_slices_da = da.from_array(
foam_stack/255.0, chunks=(10, 500, 500), name='FoamSlices')
filt_slices = da_ndfilt.gaussian_filter(foam_slices_da, (1.0, 9, 9))
single_slice = filt_slices[50]
single_slice.visualize(optimize_graph=True)
with diag.ProgressBar():
    plt.imshow(single_slice.compute(num_workers=4))
[########################################] | 100% Completed | 0.5s
with diag.ProgressBar():
    plt.imshow(filt_slices.compute(num_workers=4)[50])
[########################################] | 100% Completed | 2.1s
Tools built for table-like data structures and much better adapted to them.
Dozens of major companies (Apple, Google, Facebook, Cisco, ...) donate over $30M a year to development of Spark and the Berkeley Data Analytics Stack
Can handle static data or live data coming in from a 'streaming' device like a camera to do real-time analysis. The exact same code can be used for real-time and static analysis.
Projects at AMPLab like Spark and BlinkDB are moving towards approximate results.
mean(volume)
mean(volume).within_time(5)
mean(volume).within_ci(0.95)
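The idea behind `within_ci` can be illustrated with a sketch (hypothetical, not the actual BlinkDB API): instead of averaging every value, average a random sample and report a confidence interval that shrinks as the sample grows.

```python
import random
import statistics

random.seed(42)
volumes = [random.gauss(100, 15) for _ in range(100000)]  # full dataset

# Exact answer: touch every value
exact_mean = statistics.mean(volumes)

# Approximate answer: a 1% sample plus a 95% confidence interval
sample = random.sample(volumes, 1000)
approx_mean = statistics.mean(sample)
stderr = statistics.stdev(sample) / len(sample) ** 0.5
ci_95 = (approx_mean - 1.96 * stderr, approx_mean + 1.96 * stderr)

print(round(exact_mean, 1), round(approx_mean, 1), ci_95)
```

The approximate query reads 100x less data; whether the interval is tight enough depends on the question being asked.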
For real-time image processing it might be the only feasible solution and could drastically reduce the amount of time spent on analysis.