This notebook covers the documentation for Nd-Multicore.
This module is currently used for running embarrassingly parallel pipelines on data stored in the BOSS. However, the modularity of the code should allow additional parallel functions in the future.
Specifically, the current pipeline accomplishes the following things: it loads a resource from the BOSS, breaks the data into blocks small enough to pull (under the BOSS 2 GB request limit), and pulls each block and runs a user-supplied function on it in parallel.
The rest of the notebook walks through these steps and how the code works.
The first step is to load the resource from the BOSS. We do this through the NeuroDataResource.py file and class. Look at neurodata.cfg.example to see which parameters you have to provide. This code is mostly handled for you, and you should need to make few changes to it. For reference, here is how the NeuroDataResource class works:
If you are modifying this code, you should mainly concern yourself with the class variables and the get_cutout function, so we list only those below. If you want the entire source code, just open the Python file.
class NeuroDataResource:
    def __init__(self, host, token, collection, experiment, requested_channels,
                 x_range,
                 y_range,
                 z_range):
        # Create BOSS remote
        self._bossRemote = BossRemote({'protocol': 'https',
                                       'host': host,
                                       'token': token})
        # Specify collection
        self.collection = collection
        # Specify experiment
        self.experiment = experiment
        # Pull all channels (used to validate requested channels)
        self.channels = self._bossRemote.list_channels(collection, experiment)
        # Specify which channels you want to pull
        if len(requested_channels) == 0:
            self.requested_channels = self.channels
        else:
            self.requested_channels = requested_channels
        self._get_coord_frame_details()
        # Validate range
        if not self.correct_range(z_range, y_range, x_range):
            raise Exception("Error: Incorrect dimension range")
        # Range of data you are pulling from
        self.x_range = x_range
        self.y_range = y_range
        self.z_range = z_range
    def get_cutout(self, chan, zRange=None, yRange=None, xRange=None):
        if chan not in self.channels:
            print('Error: Channel Not Found in this Resource')
            return
        if zRange is None or yRange is None or xRange is None:
            print('Error: You must supply zRange, yRange, xRange kwargs in list format')
            return
        channel_resource = self._get_channel(chan)
        datatype = channel_resource.datatype
        data = self._bossRemote.get_cutout(channel_resource,
                                           0,
                                           xRange,
                                           yRange,
                                           zRange)
        # Datatype check: recast to the original datatype if data came back as float64
        if data.dtype == datatype:
            return data
        else:
            return data.astype(datatype)
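The datatype check at the end of get_cutout can be illustrated in isolation. This is a minimal sketch with made-up values, not BOSS data: it simulates a cutout that came back as float64 even though the channel's datatype is uint8, and applies the same recast.

```python
import numpy as np

# Hypothetical channel datatype, as intern reports it (a string).
datatype = "uint8"

# Simulated cutout that came back as float64.
data = np.array([[0.0, 255.0], [17.0, 42.0]])

# Same check get_cutout performs: recast only when the dtypes differ.
if data.dtype == datatype:
    result = data
else:
    result = data.astype(datatype)

print(result.dtype)  # uint8
```

The recast matters downstream because algorithms (and memory budgets) usually expect the channel's native integer type, not float64.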
After a NeuroDataResource object is created, the next step is to compute block sizes for your data. The function is shown below:
'''
This function computes proper block sizes (each under 2 GB)
for a given NeuroDataResource.
Input:
    resource    NeuroDataResource object containing the necessary parameters
    block_size  (x, y, z) tuple specifying the size of blocks
'''
def compute_blocks(resource, block_size):
    x_start, x_end = resource.x_range
    y_start, y_end = resource.y_range
    z_start, z_end = resource.z_range
    blocks = intern.block_compute(x_start, x_end, y_start, y_end, z_start, z_end, (0, 0, 0), block_size)
    ### IMPORTANT: blocks are returned as x, y, z ###
    for i in range(len(blocks)):
        x_range, y_range, z_range = blocks[i]
        # Create a Block object to preserve the original location of the block
        blocks[i] = Block(z_range, y_range, x_range)
    return blocks
The purpose of this function is to break the total data cube into smaller blocks that can be processed in parallel. This is especially useful because the BOSS only allows you to pull 2 GB per request.
The core work is done by the block_compute function that the intern package provides. We wrap the resulting ranges in Block objects, which preserve metadata about where each block originally came from in the BOSS.
Note that this function does nothing with the data variable of each block.
class Block:
    def __init__(self, z_range, y_range, x_range):
        self.x_start = x_range[0]
        self.x_end = x_range[1]
        self.y_start = y_range[0]
        self.y_end = y_range[1]
        self.z_start = z_range[0]
        self.z_end = z_range[1]
        self.data = None
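The partitioning that block_compute performs can be sketched in pure Python. This is an illustration of the idea, not intern's actual implementation, and it returns plain (x_range, y_range, z_range) triples; compute_blocks additionally wraps each triple in a Block.

```python
def split_range(start, end, step):
    # Split [start, end) into consecutive (lo, hi) chunks of at most `step`.
    return [(lo, min(lo + step, end)) for lo in range(start, end, step)]

def compute_blocks_sketch(x_range, y_range, z_range, block_size):
    # One (x_range, y_range, z_range) triple per block, covering the
    # whole cube with chunks no larger than block_size along each axis.
    bx, by, bz = block_size
    return [(x, y, z)
            for z in split_range(*z_range, bz)
            for y in split_range(*y_range, by)
            for x in split_range(*x_range, bx)]

# Hypothetical extents, with the pipeline's default block size:
blocks = compute_blocks_sketch((0, 2000), (0, 1500), (0, 25), (1000, 1000, 10))
print(len(blocks))  # 2 x-chunks * 2 y-chunks * 3 z-chunks = 12
```

Note how trailing chunks are clipped to the range's end, so edge blocks can be smaller than block_size.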
'''
This is the main driver function that starts the multiprocessing.
Input:
    config_file  Neurodata config file
    function     function to be run; must take in a data dictionary!
    cpus         number of CPUs to use
    block_size   size of blocks (x, y, z)
'''
def run_parallel(config_file, function, cpus=None, block_size=(1000, 1000, 10)):
    ## Make resource and compute blocks
    resource = ndr.get_boss_resource(config_file)
    blocks = compute_blocks(resource, block_size)
    ## Prepare job by fixing the NeuroDataResource and function arguments
    task = partial(job, resource=resource, function=function)
    ## Prepare pool
    num_workers = cpus
    if num_workers is None:
        num_workers = mp.cpu_count() - 1
    pool = mp.Pool(num_workers)
    try:
        print(pool.map(task, blocks))
    except:
        pool.terminate()
        print("Parallel failed, closing pool and exiting")
        raise
    pool.close()
    pool.join()
First, notice that this function handles instantiating the resource and computing all the blocks (get_boss_resource and compute_blocks).
The next step is a trick with Python functions. We use functools.partial to fix certain parameters of the function "job". In particular, we specify that job should receive the NeuroDataResource object we just created and the function passed in as a parameter.
The resulting callable, "task", is what will be run across each Block object in parallel.
Some other things to note: a default block size is provided, and if cpus is not specified, we automatically use one less than the number of CPUs your node has.
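The partial trick can be demonstrated in isolation. The names below (a toy job, a toy function) are hypothetical stand-ins, and the built-in map stands in for pool.map, which is the parallel drop-in:

```python
from functools import partial

def job(block, resource, function=None):
    # Toy stand-in for the real job: run `function` on the block,
    # tagged with which resource it came from.
    return (resource, function(block))

def double(x):
    return x * 2

# Fix the `resource` and `function` arguments, leaving only `block` free,
# exactly as run_parallel does before handing the task to the pool.
task = partial(job, resource="demo", function=double)

results = list(map(task, [1, 2, 3]))  # pool.map(task, blocks) is the parallel version
print(results)  # [('demo', 2), ('demo', 4), ('demo', 6)]
```

This works because pool.map only accepts a function of one argument; partial turns the three-argument job into exactly that.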
Here is the job we are running:
'''
This function pulls data from the BOSS and runs a function on it.
Input:
    block     Block object without raw data
    resource  NeuroDataResource object
    function  pipeline to be run on the data
Output:
    String block key (z_start_y_start_x_start)
'''
def job(block, resource, function=None):
    print("Starting job, retrieving data")
    block = get_data(resource, block)
    print("Starting algorithm")
    try:
        result = function(block.data)
    except Exception as ex:
        print(ex)
        print("Ran into an error in the algorithm, exiting this block")
        return
    key = str(block.z_start) + "_" + str(block.y_start) + "_" + str(block.x_start)
    print("Done with job")
    return key
All this job does is populate the Block object with a data dictionary through the get_data function, then run the specified function on that dictionary. Note that if the provided function raises an exception, the parallel run does not end; that block is simply skipped.
On the other hand, if there is an error while getting the data or in the overall job function, the entire pool will terminate!
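The per-block error handling can be illustrated serially. Everything here is a hypothetical stand-in (a toy job, a deliberately flaky function); real runs happen through pool.map:

```python
def job_sketch(block, function):
    # Mirrors job's semantics: an exception in the user-supplied function
    # skips this block (returns None) instead of killing the whole run.
    try:
        result = function(block)
    except Exception as ex:
        print(f"Skipping block {block}: {ex}")
        return None
    return f"key_{block}"

def flaky(block):
    # Simulates an algorithm that fails on one block's data.
    if block == 2:
        raise ValueError("bad data in this block")
    return block

keys = [job_sketch(b, flaky) for b in [1, 2, 3]]
print(keys)  # ['key_1', None, 'key_3']
```

The returned keys make it easy to see afterwards which blocks completed and which were skipped.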
One final note: the function you provide should also handle saving your results, merging, etc.