#default_exp data.load
#export
from local.torch_basics import *
from local.test import *
from torch.utils.data.dataloader import _MultiProcessingDataLoaderIter,_SingleProcessDataLoaderIter,_DatasetKind
_loaders = (_MultiProcessingDataLoaderIter,_SingleProcessDataLoaderIter)
from local.notebook.showdoc import *
bs = 4  # a small batch size for the examples
letters = [*string.ascii_lowercase]  # ['a', 'b', ..., 'z']
#export
def _wif(worker_id):
    "`worker_init_fn`: run single-threaded, tell the wrapped `DataLoader` its worker count/index, seed, then call its `wif` hook"
    # `worker_id` is unused: the same index is read from `get_worker_info()` below
    set_num_threads(1)
    worker = get_worker_info()
    # The worker's dataset is the `_FakeLoader`, whose `.d` is the real `DataLoader`
    dl = worker.dataset.d
    dl.nw, dl.offs = worker.num_workers, worker.id
    set_seed(worker.seed)
    dl.wif()
class _FakeLoader(GetAttr):
    "Stand-in loader handed to PyTorch's loader iterators; the batches themselves come from `d`"
    # Class attributes that PyTorch's (private) loader iterators look up on the loader they are given:
    # the source is treated as an iterable that already yields finished batches (no collation, no sampler)
    _auto_collation = False
    collate_fn = noops
    drop_last = False
    dataset_kind = _dataset_kind = _DatasetKind.Iterable
    _index_sampler = Inf.count

    def __init__(self, d, pin_memory, num_workers, timeout):
        self.dataset = self         # the iterators iterate `self.dataset`, i.e. this object
        self.default = d            # presumably the `GetAttr` delegation target, so unknown attributes fall through to `d`
        self.worker_init_fn = _wif
        # `store_attr` is given only names, so it presumably reads the values from these arguments
        store_attr(self, 'd,pin_memory,num_workers,timeout')

    def __iter__(self):
        batches = self.d.create_batches(self.d.sample())
        return iter(batches)

    @property
    def multiprocessing_context(self):
        return multiprocessing if self.num_workers > 0 else None

    @contextmanager
    def no_multiproc(self):
        "Temporarily set `num_workers` to 0 (yielding `d`), restoring the previous value on exit"
        saved = self.num_workers
        try:
            self.num_workers = 0
            yield self.d
        finally:
            self.num_workers = saved
_collate_types = (ndarray, Tensor, typing.Mapping, str)
#export
def fa_collate(t):
    "Collate the list of samples `t` like `default_collate`, but keep the container type of (nested) sequences"
    # Raises IndexError when `t` is empty (there is no sample to inspect)
    b = t[0]
    # Arrays, tensors, mappings and strings are left entirely to PyTorch
    if isinstance(b, _collate_types): return default_collate(t)
    if isinstance(b, Sequence):
        parts = [fa_collate(s) for s in zip(*t)]
        # namedtuples take their fields positionally; other sequence types accept a single iterable
        if isinstance(b, tuple) and hasattr(b, '_fields'): return type(b)(*parts)
        return type(b)(parts)
    return default_collate(t)
# e.g. x is an int, y is a (possibly nested) tuple: collation matches `default_collate` but keeps tuples
for t in ([(1,(2,3)),(1,(2,3))], [(1,(2,(3,4))),(1,(2,(3,4)))]):
    test_eq(fa_collate(t), default_collate(t))
    test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])
# The inner tuple of the nested case is preserved as well
test_eq(L(fa_collate(t)[1]).map(type), [Tensor,tuple])
#export
def fa_convert(t):
    "Convert `t` like `default_convert` (e.g. arrays to tensors), but keep the container type of (nested) sequences"
    # Arrays, tensors, mappings and strings are left entirely to PyTorch
    if isinstance(t, _collate_types): return default_convert(t)
    if isinstance(t, Sequence):
        parts = [fa_convert(s) for s in t]
        # namedtuples take their fields positionally; other sequence types accept a single iterable
        if isinstance(t, tuple) and hasattr(t, '_fields'): return type(t)(*parts)
        return type(t)(parts)
    return default_convert(t)
# An array next to a tuple of arrays: arrays become tensors and the tuple stays a tuple
t0 = array([1,2])
t = [t0, (t0,)*2]
test_eq(fa_convert(t), default_convert(t))
test_eq(L(fa_convert(t)).map(type), [Tensor,tuple])
#export
class SkipItemException(Exception):
    "Raise from `create_item`/`after_item` to drop the current item instead of adding it to a batch"
#export
@funcs_kwargs
class DataLoader(GetAttr):
    "Load items of `dataset` (by index or by iteration) and assemble them into batches, optionally in worker processes"
    # Hooks default to no-ops; these and the other names in `_methods` can be replaced via constructor kwargs
    wif=before_iter=after_item=before_batch=after_batch=after_iter = noops
    _methods = 'wif before_iter create_batches create_item after_item before_batch create_batch retain after_batch after_iter'.split()
    _default = 'dataset'  # unknown attributes are delegated to `dataset`

    def __init__(self, dataset=None, bs=None, num_workers=0, pin_memory=False, timeout=0,
                 shuffle=False, drop_last=False, indexed=None, n=None, **kwargs):
        assert not (bs is None and drop_last), "`drop_last=True` requires `bs` to be set"
        # Index into `dataset` when it supports `__getitem__`, otherwise iterate over it
        if indexed is None: indexed = dataset is not None and hasattr(dataset,'__getitem__')
        if n is None:
            # An unsized dataset leaves `n` as None: the index stream then has no fixed length
            try: n = len(dataset)
            except TypeError: pass
        store_attr(self, 'dataset,bs,shuffle,drop_last,indexed,n,pin_memory,timeout')
        # Single-process worker count/offset; `_wif` overwrites them inside each worker
        self.rng,self.nw,self.offs = random.Random(),1,0
        self.fake_l = _FakeLoader(self, pin_memory, num_workers, timeout)

    def __len__(self):
        "Number of batches (number of items when `bs` is None); TypeError when the length is unknown"
        if self.n is None: raise TypeError('length of this DataLoader is unknown (`n` is None)')
        if self.bs is None: return self.n
        return self.n//self.bs + (0 if self.drop_last or self.n%self.bs==0 else 1)

    def get_idxs(self):
        "Indices to load (`None` placeholders when not indexed), truncated to `n` and shuffled if requested"
        idxs = Inf.count if self.indexed else Inf.nones
        if self.n is not None: idxs = list(itertools.islice(idxs, self.n))
        if self.shuffle: idxs = self.shuffle_fn(idxs)
        return idxs

    def sample(self):
        "Indices for this process: worker `offs` takes every `nw`-th chunk of `bs` indices"
        idxs = self.get_idxs()
        return (b for i,b in enumerate(idxs) if i//(self.bs or 1)%self.nw==self.offs)

    def __iter__(self):
        self.randomize()
        self.before_iter()
        # `_loaders[True]` is the single-process iterator, used when there are no workers
        for b in _loaders[self.fake_l.num_workers==0](self.fake_l): yield self.after_batch(b)
        self.after_iter()
        if hasattr(self, 'it'): delattr(self, 'it')

    def create_batches(self, samps):
        "Turn the sample indices `samps` into batches, dropping items for which `do_item` returned None"
        self.it = iter(self.dataset) if self.dataset is not None else None
        res = filter(lambda o:o is not None, map(self.do_item, samps))
        yield from map(self.do_batch, self.chunkify(res))

    def new(self, dataset=None, cls=None, **kwargs):
        "A copy of this loader (optionally of class `cls` over `dataset`), with `kwargs` overriding current settings"
        if dataset is None: dataset = self.dataset
        if cls is None: cls = type(self)
        cur_kwargs = dict(dataset=dataset, num_workers=self.fake_l.num_workers, pin_memory=self.pin_memory, timeout=self.timeout,
                          bs=self.bs, shuffle=self.shuffle, drop_last=self.drop_last, indexed=self.indexed)
        for meth in self._methods: cur_kwargs[meth] = getattr(self, meth)
        return cls(**merge(cur_kwargs, kwargs))

    @property
    def prebatched(self): return self.bs is None

    def do_item(self, s):
        "Create and post-process one item; None if it raised `SkipItemException`"
        try: return self.after_item(self.create_item(s))
        except SkipItemException: return None

    def chunkify(self, b): return b if self.prebatched else chunked(b, self.bs, self.drop_last)
    def shuffle_fn(self, idxs): return self.rng.sample(idxs, len(idxs))
    # Replace `rng` with a new generator seeded from the current one (called at the start of each epoch)
    def randomize(self): self.rng = random.Random(self.rng.randint(0,2**32-1))
    # Give the collated batch the type(s) of the input items (of the first element when `b` is listy)
    def retain(self, res, b): return retain_types(res, b[0] if is_listy(b) else b)
    # `s` is None when not indexed: take the next element of the dataset iterator instead
    def create_item(self, s): return next(self.it) if s is None else self.dataset[s]
    # Prebatched items only need converting; otherwise the list of items is collated
    def create_batch(self, b): return (fa_collate,fa_convert)[self.prebatched](b)
    def do_batch(self, b): return self.retain(self.create_batch(self.before_batch(b)), b)

    def one_batch(self):
        "The first batch, computed in the main process"
        with self.fake_l.no_multiproc(): res = first(self)
        # `first` abandons the generator before `__iter__` can clean up, so drop the iterator `create_batches` stored
        if hasattr(self, 'it'): delattr(self, 'it')
        return res
Override `create_item` and use the default infinite sampler to get a stream of unknown length
(call `stop()` when you want to stop the stream).
class RandDL(DataLoader):
    def create_item(self, s):
        "Ignore `s`; return a random float, or end the stream via `stop()` once a draw reaches 0.95"
        x = random.random()
        if x >= 0.95: return stop()
        return x
# No `bs`: random floats are yielded one at a time until `create_item` calls `stop()`, so the length varies per run
L(RandDL())
(#119) [0.6499190875330292,0.6920993143601468,0.454128557091131,0.4621040162263076,0.701786166549396,0.09402992729389326,0.7352702711396566,0.5721437125266009,0.639819551789149,0.4898752410585695...]
# Batches of 4; with `drop_last=True` a final partial batch is discarded
L(RandDL(bs=4, drop_last=True)).map(len)
(#1) [4]
# The same random stream, now loaded by 4 worker processes
dl = RandDL(bs=4, num_workers=4, drop_last=True)
L(dl).map(len)
(#19) [4,4,4,4,4,4,4,4,4,4...]
# `no_multiproc` sets `num_workers` to 0 only inside the `with` block and restores it afterwards
test_eq(dl.fake_l.num_workers, 4)
with dl.fake_l.no_multiproc():
    test_eq(dl.fake_l.num_workers, 0)
    L(dl).map(len)
test_eq(dl.fake_l.num_workers, 4)
def _rand_item(s):
    "Ignore `s`; return a random float, or end the stream via `stop()` once a draw reaches 0.95"
    x = random.random()
    if x >= 0.95: return stop()
    return x
# Passing `create_item` as a keyword replaces the method, so no subclass is needed
L(DataLoader(create_item=_rand_item))
(#2) [0.12919553532781858,0.8398403767094285]
If you don't set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__`
that returns a batch.
# Indexed access (the default for a list) and plain iteration both yield the letters one at a time
for ds1 in (DataLoader(letters), DataLoader(letters, indexed=False)):
    test_eq(L(ds1), letters)
    test_eq(len(ds1), 26)
test_shuffled(L(DataLoader(letters, shuffle=True)), letters)
# Without `bs` each element is already a batch: tensors pass through unchanged...
t2 = L(tensor([0,1,2]),tensor([3,4,5]))
ds2 = DataLoader(t2)
test_eq_type(L(ds2), t2)
# ...and arrays are converted to tensors
t3 = L(array([0,1,2]),array([3,4,5]))
ds3 = DataLoader(t3)
test_eq_type(L(ds3), t3.map(tensor))
# `create_batch=noop` skips the conversion; `after_iter` runs once the epoch has finished
ds4 = DataLoader(t3, create_batch=noop, after_iter=lambda: setattr(t3, 'f', 1))
test_eq_type(L(ds4), t3)
test_eq(t3.f, 1)
If you do set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__`
that returns a single item of a batch.
def twoepochs(d):
    "Iterate `d` twice, joining each batch into a string and separating batches with spaces"
    batches = [''.join(o) for _ in range(2) for o in d]
    return ' '.join(batches)
# With `bs`, items are grouped into batches; `drop_last` discards the final partial batch ('yz')
ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx abcd efgh ijkl mnop qrst uvwx')
# Worker processes keep the batch order; without `drop_last` the partial batch is kept
ds1 = DataLoader(letters,4,num_workers=2)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx yz abcd efgh ijkl mnop qrst uvwx yz')
ds1 = DataLoader(range(12), bs=4, num_workers=3)
test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])))
# Strings are collated into lists; `after_iter` runs at the end of the epoch
ds1 = DataLoader([str(i) for i in range(11)], bs=4, after_iter=lambda: setattr(t3, 'f', 2))
test_eq_type(L(ds1), L(['0','1','2','3'],['4','5','6','7'],['8','9','10']))
test_eq(t3.f, 2)
# An unsized, unindexed iterator (`map`) can be consumed batch by batch
it = iter(DataLoader(map(noop,range(20)), bs=4, num_workers=1))
test_eq_type([next(it) for _ in range(3)], [tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])])
class SleepyDL(list):
    "A `list` whose item access first sleeps for a random delay (under 20ms), simulating slow loading"
    def __getitem__(self, i):
        delay = random.random()/50
        time.sleep(delay)
        return super().__getitem__(i)
# Random per-item latency: the letters must come back in order whatever the worker count
t = SleepyDL(letters)
%time test_eq(DataLoader(t, num_workers=0), letters)
%time test_eq(DataLoader(t, num_workers=2), letters)
%time test_eq(DataLoader(t, num_workers=4), letters)
# Shuffled loading, compared against the letters and between two epochs with `test_shuffled`
dl = DataLoader(t, shuffle=True, num_workers=1)
test_shuffled(L(dl), letters)
test_shuffled(L(dl), L(dl))
CPU times: user 4.3 ms, sys: 64 µs, total: 4.36 ms Wall time: 243 ms CPU times: user 10.4 ms, sys: 13.8 ms, total: 24.2 ms Wall time: 195 ms CPU times: user 8.62 ms, sys: 26.4 ms, total: 35 ms Wall time: 133 ms
class SleepyQueue():
    "Simulate a queue with varying latency"
    def __init__(self, q): self.q = q

    def __iter__(self):
        # Sleep up to 10ms before each read; the stream ends as soon as the queue is empty
        while True:
            time.sleep(random.random()/100)
            try: item = self.q.get_nowait()
            except queues.Empty: return
            yield item
# Drain a queue holding 0..29 through an unindexed, unsized DataLoader with 4 workers;
# the comparison uses `test_shuffled`, presumably because arrival order depends on worker timing
q = Queue()
for o in range(30): q.put(o)
it = SleepyQueue(q)
%time test_shuffled(L(DataLoader(it, num_workers=4)), range(30))
CPU times: user 20.6 ms, sys: 17.5 ms, total: 38.1 ms Wall time: 83.2 ms
class A(TensorBase): pass
# The tensor subclass `A` must survive batching, with and without worker processes
for nw in (0,2):
    # a bare `A` item
    t = A(tensor([1,2]))
    dl = DataLoader([t]*8, bs=4, num_workers=nw)
    b = first(dl)
    test_eq(type(b), A)
    # an `A` wrapped in a 1-tuple
    t = (A(tensor([1,2])),)
    dl = DataLoader([t]*8, bs=4, num_workers=nw)
    b = first(dl)
    test_eq(type(b[0]), A)
class A(TensorBase): pass
t = A(tensor(1,2))
# `after_batch=to_device` must not lose the subclass either
tdl = DataLoader([t]*8, bs=4, num_workers=2, after_batch=to_device)
b = first(tdl)
test_eq(type(b), A)
# Unknown attributes are delegated to `dataset`: `pop` here is the list's own method
test_eq(tdl.pop(), tensor(1,2))
#hide
# Regenerate the library from the notebooks; `all_fs=True` presumably processes every notebook
# (see the "Converted ..." list printed below)
from local.notebook.export import notebook2script
notebook2script(all_fs=True)
Converted 00_test.ipynb. Converted 01_core_foundation.ipynb. Converted 01a_core_utils.ipynb. Converted 01b_core_dispatch.ipynb. Converted 01c_core_transform.ipynb. Converted 02_core_script.ipynb. Converted 03_torchcore.ipynb. Converted 03a_layers.ipynb. Converted 04_data_load.ipynb. Converted 05_data_core.ipynb. Converted 06_data_transforms.ipynb. Converted 07_data_block.ipynb. Converted 08_vision_core.ipynb. Converted 09_vision_augment.ipynb. Converted 09a_vision_data.ipynb. Converted 10_pets_tutorial.ipynb. Converted 11_vision_models_xresnet.ipynb. Converted 12_optimizer.ipynb. Converted 13_learner.ipynb. Converted 13a_metrics.ipynb. Converted 14_callback_schedule.ipynb. Converted 14a_callback_data.ipynb. Converted 15_callback_hook.ipynb. Converted 15a_vision_models_unet.ipynb. Converted 16_callback_progress.ipynb. Converted 17_callback_tracker.ipynb. Converted 18_callback_fp16.ipynb. Converted 19_callback_mixup.ipynb. Converted 20_interpret.ipynb. Converted 20a_distributed.ipynb. Converted 21_vision_learner.ipynb. Converted 22_tutorial_imagenette.ipynb. Converted 23_tutorial_transfer_learning.ipynb. Converted 30_text_core.ipynb. Converted 31_text_data.ipynb. Converted 32_text_models_awdlstm.ipynb. Converted 33_text_models_core.ipynb. Converted 34_callback_rnn.ipynb. Converted 35_tutorial_wikitext.ipynb. Converted 36_text_models_qrnn.ipynb. Converted 37_text_learner.ipynb. Converted 38_tutorial_ulmfit.ipynb. Converted 40_tabular_core.ipynb. Converted 41_tabular_model.ipynb. Converted 42_tabular_rapids.ipynb. Converted 50_data_block_examples.ipynb. Converted 60_medical_imaging.ipynb. Converted 65_medical_text.ipynb. Converted 70_callback_wandb.ipynb. Converted 90_notebook_core.ipynb. Converted 91_notebook_export.ipynb. Converted 92_notebook_showdoc.ipynb. Converted 93_notebook_export2html.ipynb. Converted 94_notebook_test.ipynb. Converted 95_index.ipynb. Converted 96_data_external.ipynb. Converted 97_utils_test.ipynb. Converted notebook2jekyll.ipynb.