In this notebook, we show a simple application of the dask chunking experiments for particle reading within yt itself.
Several changes are necessary for this to work, in two areas:

- the BaseIOHandler, which reads in particle data from files
- the ParticleContainer, so that our chunks can be effectively pickled

BaseIOHandler

One way to use dask effectively is to stack each chunk's read into a delayed dataframe. In this approach, we have a delayed dataframe for each particle type we're reading, where each dataframe is partitioned by dask over the chunks we supply. Dask delayed dataframes don't require knowing the data sizes a priori, which simplifies building the dataframe, but we do need to supply a metadata dictionary designating the expected datatype of each column. The modified _read_particle_selection is not yet on a committed branch, so here's the function:
def _read_particle_selection(self, chunks, selector, fields):
    rv = {}
    ind = {}
    # We first need a set of masks for each particle type
    ptf = defaultdict(list)  # ptype -> on-disk fields to read
    fsize = defaultdict(lambda: 0)  # ptype -> size of return value
    psize = defaultdict(lambda: 0)  # ptype -> particle count on disk
    field_maps = defaultdict(list)  # ptype -> fields (including unions)
    chunks = list(chunks)
    unions = self.ds.particle_unions
    # What we need is a mapping from particle types to return types
    for field in fields:
        ftype, fname = field
        fsize[field] = 0
        # We should add a check for p.fparticle_unions or something here
        if ftype in unions:
            for pt in unions[ftype]:
                ptf[pt].append(fname)
                field_maps[pt, fname].append(field)
        else:
            ptf[ftype].append(fname)
            field_maps[field].append(field)
    # Now we have our full listing

    # an experimental dask approach:
    # build the meta dictionary for each chunk's dataframe so that empty chunks
    # don't cause problems.
    ptype_meta = {}
    for ptype, flds in ptf.items():
        meta_dict = {}
        for fld in flds:
            meta_dict[fld] = pd.Series([], dtype=np.float64)
        ptype_meta[ptype] = meta_dict

    # build the delayed chunk reader (still has parallel issues...)
    # particle_field_args = self._read_particle_field_args()
    ptypes = list(ptf.keys())
    delayed_dfs = {}
    for ptype in ptypes:
        # build a dataframe from delayed for each particle type
        this_ptf = {ptype: ptf[ptype]}
        delayed_chunks = [
            # all these things need to be pickleable...
            dask.delayed(self._read_single_ptype)(
                ch, this_ptf, selector, ptype_meta[ptype]
            )
            for ch in chunks
        ]
        delayed_dfs[ptype] = ddf.from_delayed(delayed_chunks, meta=ptype_meta[ptype])

    # up to here, everything is delayed with dask; now we need to read into
    # memory across chunks to return rv:
    rv = {}
    for ptype in ptypes:
        for col in delayed_dfs[ptype].columns:
            rv[(ptype, col)] = delayed_dfs[ptype][col].values.compute()

    # but if we returned the delayed dataframes, could do things like this:
    # delayed_dfs['PartType0'].Density.mean().compute()

    return rv

def _read_single_ptype(self, chunk, ptf, selector, meta_dict):
    # read a single chunk and single particle type into a pandas dataframe so that
    # we can use dask.dataframe.from_delayed! fields within a particle type should
    # have the same length?
    chunk_results = pd.DataFrame(meta_dict)
    # each particle type could be a different dataframe...
    for field_r, vals in self._read_particle_fields([chunk], ptf, selector):
        chunk_results[field_r[1]] = vals
    return chunk_results
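To make the pattern above concrete, here's a minimal, standalone sketch of building a dask dataframe from per-chunk delayed reads with a meta dictionary. The read_one_chunk helper and its random contents are fabricated stand-ins for an actual yt chunk read, not yt code:

import dask
import dask.dataframe as ddf
import numpy as np
import pandas as pd

# meta tells dask the expected column names and dtypes, so empty chunks
# don't cause problems
meta = {"Density": np.float64}

@dask.delayed
def read_one_chunk(chunk_id):
    # stand-in for _read_single_ptype: return one chunk's values as a
    # single-column pandas dataframe
    return pd.DataFrame({"Density": np.random.random(10)})

delayed_chunks = [read_one_chunk(i) for i in range(12)]
df = ddf.from_delayed(delayed_chunks, meta=meta)

# pulling everything into memory, as _read_particle_selection does:
values = df["Density"].values.compute()

# or operating on the still-delayed dataframe, as noted in the comments above:
mean_density = df["Density"].mean().compute()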
ParticleContainer

A single chunk is composed of various particle containers, and there are several modifications we need to make to the ParticleContainer related to pickleability. First off, the ParticleContainer on yt's master branch currently cannot be pickled: it inherits a __reduce__ method from the Dataset class that requires the class to be registered, which the ParticleContainer is not. The particleIndexRefactor branch at github.com/chrishavlin/yt fixes this by overriding the __reduce__ method in ParticleContainer.
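For illustration, here's a minimal, hypothetical sketch of that kind of __reduce__ override (the PickleableContainer class and its arguments are made up for this example and are not the actual yt implementation):

class PickleableContainer:
    def __init__(self, data_files, base_selector=None):
        self.data_files = data_files
        self.base_selector = base_selector

    def __reduce__(self):
        # pickle stores (callable, args) and calls callable(*args) on
        # unpickling, so no class registry lookup is required
        return (self.__class__, (self.data_files, self.base_selector))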
The other important change in the particleIndexRefactor branch is that it adds an explicit base_selector argument to __init__ rather than pulling the selector object out of the base_region argument. The reason is that when the arguments are unpickled, the base_region object will not have an initialized index, so accessing base_region.selector triggers an index read, and re-reading the index in every dask process is very slow. Since the majority of selector objects can now be pickled directly, passing the selector as its own argument lets us avoid that index read.
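Here's a hypothetical sketch of that signature change (the real ParticleContainer takes more arguments than shown here):

class ParticleContainerSketch:
    def __init__(self, base_region, data_files, base_selector=None):
        self.base_region = base_region
        self.data_files = data_files
        if base_selector is not None:
            # use the selector that was pickled alongside the other arguments
            self.selector = base_selector
        else:
            # old behavior: accessing base_region.selector on an un-indexed
            # dataset triggers a (slow) index read in each dask process
            self.selector = base_region.selector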
When yt pickles a Dataset object, it uses a __reduce__ method that stores the arguments used to build the Dataset and then re-initializes the dataset on unpickling. Those arguments are kept in the ParameterStore object, which under normal operation is an in-memory cache of parameters. Since the parameter store is in memory, it's not available to the separate dask tasks. But there is a yt config option that stores these parameters in an on-disk csv instead: setting StoreParameterFiles = True in the yt config allows the separate dask tasks to read the parameter file and reconstitute the dataset as needed.
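As a quick sanity check, here's a sketch of the pickle round-trip the dask workers rely on, assuming StoreParameterFiles = True is already set in the yt config:

import pickle

import yt

ds = yt.load_sample("snapshot_033")

# pickling stores the arguments used to build the dataset; unpickling
# (whether here or inside a dask worker) re-initializes the dataset from
# the stored parameters
ds_restored = pickle.loads(pickle.dumps(ds))
print(ds_restored.domain_right_edge)  # same domain as the original dataset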
Ok, so first let's use a single processor, in which case nothing changes in terms of user experience:
import yt
ds = yt.load_sample("snapshot_033")
ad = ds.all_data()
yt : [WARNING ] 2020-11-06 15:09:12,944 tqdm is not installed, progress bar can not be displayed.
yt : [INFO ] 2020-11-06 15:09:13,303 Files located at /home/chris/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.
yt : [INFO ] 2020-11-06 15:09:13,304 Default to loading snap_033.0.hdf5 for snapshot_033 dataset
yt : [INFO ] 2020-11-06 15:09:13,397 Parameters: current_time = 4.343952725460923e+17 s
yt : [INFO ] 2020-11-06 15:09:13,399 Parameters: domain_dimensions = [1 1 1]
yt : [INFO ] 2020-11-06 15:09:13,400 Parameters: domain_left_edge = [0. 0. 0.]
yt : [INFO ] 2020-11-06 15:09:13,400 Parameters: domain_right_edge = [25. 25. 25.]
yt : [INFO ] 2020-11-06 15:09:13,402 Parameters: cosmological_simulation = 1
yt : [INFO ] 2020-11-06 15:09:13,402 Parameters: current_redshift = -4.811891664902035e-05
yt : [INFO ] 2020-11-06 15:09:13,403 Parameters: omega_lambda = 0.762
yt : [INFO ] 2020-11-06 15:09:13,403 Parameters: omega_matter = 0.238
yt : [INFO ] 2020-11-06 15:09:13,404 Parameters: omega_radiation = 0.0
yt : [INFO ] 2020-11-06 15:09:13,404 Parameters: hubble_constant = 0.73
yt : [INFO ] 2020-11-06 15:09:13,499 Allocating for 4.194e+06 particles
Loading particle index: 100%|██████████| 12/12 [00:00<00:00, 170.64it/s]
Let's check out some chunk info:
ds.index._identify_base_chunk(ad)
chunks = list(ds.index._chunk_io(ad))
len(chunks)
12
So we have 12 chunks for this dataset. Ok, let's read in a field:
%%time
si = ad[('PartType4','Silicon')]
CPU times: user 109 ms, sys: 24.4 ms, total: 134 ms Wall time: 127 ms
si
unyt_array([0.00313673, 0.0035234 , 0.00203385, ..., 0.00464466, 0.00060355, 0.00029561], '(dimensionless)')
OK, so now if we spin up a dask client, the delayed dataframes that we build in _read_particle_selection (already partitioned by chunk) will automatically have their chunks read in parallel across the workers! So let's spin up a client with 4 workers and 3 threads per worker, so that each of the 12 chunks gets its own task slot:
from dask.distributed import Client
c = Client(threads_per_worker=3,n_workers=4)
c
and read in a couple of different fields (so that we're not just grabbing the already-cached field):
%%time
d = ad[('PartType0','Density')]
CPU times: user 305 ms, sys: 60.2 ms, total: 365 ms Wall time: 3.89 s
d
unyt_array([ 6577205. , 15850306. , 6765328.5, ..., 4304681.5, 5155429.5, 7586393.5], 'code_mass/code_length**3')
%%time
ox = ad[('PartType4','Oxygen')]
CPU times: user 126 ms, sys: 17.5 ms, total: 144 ms Wall time: 572 ms
%%time
T = ad[('PartType0','Temperature')]
CPU times: user 151 ms, sys: 32.1 ms, total: 183 ms Wall time: 646 ms
That initial density read takes a bit of extra time (perhaps due to some internal dask initialization?), but subsequent reads of other fields are much faster. The CPU times are fairly close to the single-processor read, but the wall times are 400-500 milliseconds slower for the parallel reads, which can be attributed to the extra communication overhead between processes.
Here's a screenshot of the Task Stream during the Temperature read:
c.close()

Let's also try a client with 4 single-threaded workers:

c = Client(threads_per_worker=1,n_workers=4)
c
%%time
met = ad[('PartType4','Metallicity')]
CPU times: user 340 ms, sys: 26.4 ms, total: 366 ms Wall time: 4.1 s
%%time
hel = ad[('PartType4','Helium')]
CPU times: user 147 ms, sys: 3.92 ms, total: 151 ms Wall time: 661 ms
c.close()