#default_exp torch_core

#export
from local.test import *
from local.core.all import *
from local.torch_imports import *
from fastprogress import progress_bar,master_bar
from local.notebook.showdoc import *
from PIL import Image

#export
_all_ = ['progress_bar','master_bar']

#export
if torch.cuda.is_available():
    if torch.cuda.current_device()==0:
        torch.cuda.set_device(int(os.environ.get('DEFAULT_GPU') or 0))
    torch.backends.cudnn.benchmark = True

#export
@patch
def __array_eq__(self:Tensor,b):
    return torch.equal(self,b) if self.dim() else self==b

#export
def _array2tensor(x):
    if x.dtype==np.uint16: x = x.astype(np.float32)  # PyTorch has no uint16 dtype
    return torch.from_numpy(x)

#export
def tensor(x, *rest, **kwargs):
    "Like `torch.as_tensor`, but handles lists too, and can pass multiple vector elements directly."
    if len(rest): x = (x,)+rest
    # There was a PyTorch bug in dataloader using num_workers>0; not yet confirmed fixed.
    # if isinstance(x, (tuple,list)) and len(x)==0: return tensor(0)
    res = (x if isinstance(x, Tensor)
           else torch.tensor(x, **kwargs) if isinstance(x, (tuple,list))
           else _array2tensor(x) if isinstance(x, ndarray)
           else as_tensor(x.values, **kwargs) if isinstance(x, (pd.Series, pd.DataFrame))
           else as_tensor(x, **kwargs) if hasattr(x, '__array__') or is_iter(x)
           else _array2tensor(array(x), **kwargs))
    if res.dtype is torch.float64: return res.float()
    return res

test_eq(tensor(torch.tensor([1,2,3])), torch.tensor([1,2,3]))
test_eq(tensor(array([1,2,3])), torch.tensor([1,2,3]))
test_eq(tensor(1,2,3), torch.tensor([1,2,3]))
test_eq_type(tensor(1.0), torch.tensor(1.0))

#export
def set_seed(s):
    "Set random seed for `random`, `torch`, and `numpy` (where available)"
    try: torch.manual_seed(s)
    except NameError: pass
    try: np.random.seed(s%(2**32-1))
    except NameError: pass
    random.seed(s)

set_seed(2*33)
a1 = np.random.random()
a2 = torch.rand(())
a3 = random.random()
set_seed(2*33)
b1 = np.random.random()
b2 = torch.rand(())
b3 = random.random()
test_eq(a1,b1)
test_eq(a2,b2)
test_eq(a3,b3)

#export
def unsqueeze(x, dim=-1, n=1):
    "Same as `torch.unsqueeze` but can add `n` dims"
    for _ in range(n): x = x.unsqueeze(dim)
    return x

t = tensor([1])
t2 = unsqueeze(t, n=2)
test_eq(t2,t[:,None,None])

#export
def unsqueeze_(x, dim=-1, n=1):
    "Same as `torch.unsqueeze_` but can add `n` dims"
    for _ in range(n): x.unsqueeze_(dim)
    return x

t = tensor([1])
unsqueeze_(t, n=2)
test_eq(t, tensor([1]).view(1,1,1))

#export
def _fa_rebuild_tensor (cls, *args, **kwargs): return cls(torch._utils._rebuild_tensor_v2(*args, **kwargs))
def _fa_rebuild_qtensor(cls, *args, **kwargs): return cls(torch._utils._rebuild_qtensor (*args, **kwargs))

#export
def apply(func, x, *args, **kwargs):
    "Apply `func` recursively to `x`, passing on args"
    if is_listy(x): return type(x)([apply(func, o, *args, **kwargs) for o in x])
    if isinstance(x,dict): return {k: apply(func, v, *args, **kwargs) for k,v in x.items()}
    res = func(x, *args, **kwargs)
    return res if x is None else retain_type(res, x)

#export
def to_detach(b, cpu=True):
    "Recursively detach lists of tensors in `b`; put them on the CPU if `cpu=True`."
    def _inner(x, cpu=True):
        if not isinstance(x,Tensor): return x
        x = x.detach()
        return x.cpu() if cpu else x
    return apply(_inner, b, cpu=cpu)

#export
def to_half(b):
    "Recursively map lists of tensors in `b` to FP16."
    return apply(lambda x: x.half() if torch.is_floating_point(x) else x, b)

#export
def to_float(b):
    "Recursively map lists of float tensors in `b` back to FP32."
    return apply(lambda x: x.float() if torch.is_floating_point(x) else x, b)
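# Illustrative checks, not from the original notebook: the helpers above recurse
# through tuples/lists, and the half/float conversions leave non-float tensors alone.
x = tensor([1.,2.]).requires_grad_()
test_eq(to_detach((x,[x]))[0].requires_grad, False)
t = to_half((tensor([1,2]), [tensor([3.,4.])]))
test_eq(t[0].dtype,    torch.int64)    # ints are untouched by `to_half`
test_eq(t[1][0].dtype, torch.float16)
test_eq(to_float(t)[1][0].dtype, torch.float32)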
#export
# None: True if available; True: error if not available; False: use CPU
defaults.use_cuda = None

#export
def default_device(use_cuda=-1):
    "Return or set default device; `use_cuda`: None - CUDA if available; True - error if not available; False - CPU"
    if use_cuda != -1: defaults.use_cuda=use_cuda
    use = defaults.use_cuda or (torch.cuda.is_available() and defaults.use_cuda is None)
    assert torch.cuda.is_available() or not use
    return torch.device(torch.cuda.current_device()) if use else torch.device('cpu')

#cuda
_td = torch.device(torch.cuda.current_device())
test_eq(default_device(None), _td)
test_eq(default_device(True), _td)
test_eq(default_device(False), torch.device('cpu'))
default_device(None);

#export
def to_device(b, device=None):
    "Recursively put `b` on `device`."
    if device is None: device=default_device()
    def _inner(o): return o.to(device, non_blocking=True) if isinstance(o,Tensor) else o
    return apply(_inner, b)

t = to_device((3,(tensor(3),tensor(2))))
t1,(t2,t3) = t
test_eq_type(t,(3,(tensor(3).cuda(),tensor(2).cuda())))
test_eq(t2.type(), "torch.cuda.LongTensor")
test_eq(t3.type(), "torch.cuda.LongTensor")

#export
def to_cpu(b):
    "Recursively map lists of tensors in `b` to the CPU."
    return to_device(b,'cpu')

t3 = to_cpu(t3)
test_eq(t3.type(), "torch.LongTensor")
test_eq(t3, 2)

#export
def to_np(x):
    "Convert a tensor to a numpy array."
    return apply(lambda o: o.data.cpu().numpy(), x)

t3 = to_np(t3)
test_eq(type(t3), np.ndarray)
test_eq(t3, 2)

#export
class TensorBase(Tensor):
    def __new__(cls, x, **kwargs):
        res = torch.Tensor._make_subclass(cls, tensor(x))
        res._meta = kwargs
        return res

    def __reduce_ex__(self,proto):
        torch.utils.hooks.warn_if_has_hooks(self)
        args = (type(self), self.storage(), self.storage_offset(), tuple(self.size()), self.stride())
        if self.is_quantized: args = args + (self.q_scale(), self.q_zero_point())
        f = _fa_rebuild_qtensor if self.is_quantized else _fa_rebuild_tensor
        return (f, args + (self.requires_grad, OrderedDict()))

    def gi(self, i):
        res = self[i]
        return type(self)(res) if isinstance(res,Tensor) else res

#export
def _patch_tb():
    if getattr(TensorBase,'_patched',False): return
    TensorBase._patched = True

    def get_f(fn):
        def _f(self, *args, **kwargs):
            cls = self.__class__
            res = getattr(super(TensorBase, self), fn)(*args, **kwargs)
            return cls(res) if isinstance(res,Tensor) else res
        return _f

    t = tensor([1])
    skips = '__getitem__ __class__ __deepcopy__ __delattr__ __dir__ __doc__ __getattribute__ __hash__ __init__ \
__init_subclass__ __new__ __reduce__ __reduce_ex__ __module__ __setstate__'.split()

    for fn in dir(t):
        if fn in skips: continue
        f = getattr(t, fn)
        if isinstance(f, (MethodWrapperType, BuiltinFunctionType, BuiltinMethodType, MethodType, FunctionType)):
            setattr(TensorBase, fn, get_f(fn))

_patch_tb()

#export
class TensorCategory(TensorBase): pass

#export
class TensorMultiCategory(TensorCategory): pass

class _T(TensorBase): pass
t = _T(range(5))
test_eq(t[0], 0)
test_eq_type(t.gi(0), _T(0))
test_eq_type(t.gi(slice(2)), _T([0,1]))
test_eq_type(t+1, _T(range(1,6)))
test_eq(type(pickle.loads(pickle.dumps(t))), _T)

t = tensor([1,2,3])
m = TensorBase([False,True,True])
test_eq(t[m], tensor([2,3]))
t = tensor([[1,2,3],[1,2,3]])
m = TensorBase([[False,True,True], [False,True,True]])
test_eq(t[m], tensor([2,3,2,3]))

t = TensorBase([[1,2,3],[1,2,3]], a=1)
test_eq(t._meta, {'a': 1})
x = retain_type(tensor([4,5,6]), t)
test_eq(x._meta, {'a': 1})
#export
class TensorImageBase(TensorBase):
    _show_args = ArrayImageBase._show_args
    def show(self, ctx=None, **kwargs):
        return show_image(self, ctx=ctx, **{**self._show_args, **kwargs})

#export
class TensorImage(TensorImageBase): pass

#export
class TensorImageBW(TensorImage): _show_args = ArrayImageBW._show_args

#export
class TensorMask(TensorImageBase): _show_args = ArrayMask._show_args

im = Image.open(TEST_IMAGE)
im_t = TensorImage(array(im))
test_eq(type(im_t), TensorImage)

im_t2 = TensorMask(tensor(1))
test_eq(type(im_t2), TensorMask)
test_eq(im_t2, tensor(1))

ax = im_t.show(figsize=(2,2))
test_fig_exists(ax)

#export
@patch
def tensored(self:L):
    "`mapped(tensor)`"
    return self.map(tensor)
@patch
def stack(self:L, dim=0):
    "Same as `torch.stack`"
    return torch.stack(list(self.tensored()), dim=dim)
@patch
def cat  (self:L, dim=0):
    "Same as `torch.cat`"
    return torch.cat  (list(self.tensored()), dim=dim)

show_doc(L.tensored)

t = L(([1,2],[3,4]))
test_eq(t.tensored(), [tensor(1,2),tensor(3,4)])

show_doc(L.stack)
test_eq(t.stack(), tensor([[1,2],[3,4]]))

show_doc(L.cat)
test_eq(t.cat(), tensor([1,2,3,4]))

#export
def concat(*ls):
    "Concatenate tensors, arrays, lists, or tuples"
    if not len(ls): return []
    it = ls[0]
    if isinstance(it,torch.Tensor): res = torch.cat(ls)
    elif isinstance(it,ndarray): res = np.concatenate(ls)
    else:
        res = itertools.chain.from_iterable(map(L,ls))
        if isinstance(it,(tuple,list)): res = type(it)(res)
        else: res = L(res)
    return retain_type(res, it)

a,b,c = [1],[1,2],[1,1,2]
test_eq(concat(a,b), c)
test_eq_type(concat(tuple (a),tuple (b)), tuple (c))
test_eq_type(concat(array (a),array (b)), array (c))
test_eq_type(concat(tensor(a),tensor(b)), tensor(c))
test_eq_type(concat(TensorBase(a),TensorBase(b)), TensorBase(c))
test_eq_type(concat([1,1],1), [1,1,1])
test_eq_type(concat(1,1,1), L(1,1,1))
test_eq_type(concat(L(1,2),1), L(1,2,1))

#export
class Chunks:
    "Slice and int indexing into a list of lists"
    def __init__(self, chunks, lens=None):
        self.chunks = chunks
        self.lens = L(map(len,self.chunks) if lens is None else lens)
        self.cumlens = np.cumsum(0+self.lens)
        self.totlen = self.cumlens[-1]

    def __getitem__(self,i):
        if isinstance(i,slice): return retain_type(self.getslice(i), old=self.chunks[0])
        di,idx = self.doc_idx(i)
        return retain_type(self.chunks[di][idx], old=self.chunks[0])

    def getslice(self, i):
        st_d,st_i = self.doc_idx(ifnone(i.start,0))
        en_d,en_i = self.doc_idx(ifnone(i.stop,self.totlen+1))
        res = [self.chunks[st_d][st_i:(en_i if st_d==en_d else sys.maxsize)]]
        for b in range(st_d+1,en_d): res.append(self.chunks[b])
        if st_d!=en_d and en_d<len(self.chunks): res.append(self.chunks[en_d][:en_i])
        return concat(*res)

    def doc_idx(self, i):
        if i<0: i=self.totlen+i # count from the end
        docidx = np.searchsorted(self.cumlens, i+1)-1
        cl = self.cumlens[docidx]
        return docidx,i-cl
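# A minimal illustrative check of `Chunks` indexing (not from the original tests;
# the chunk boundaries used here are arbitrary):
import string
docs = L(list(string.ascii_lowercase[a:b]) for a,b in ((0,3),(3,7),(7,26)))
b = Chunks(docs)
test_eq(b[5], 'f')
test_eq(b[0:3], ['a','b','c'])
test_eq(b[2:5], ['c','d','e'])   # a slice can span chunk boundaries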
#export
def one_hot(x, c):
    "One-hot encode `x` with `c` classes."
    res = torch.zeros(c, dtype=torch.uint8)
    if isinstance(x, Tensor) and x.numel()>0: res[x] = 1.
    else: res[list(L(x, use_list=None))] = 1.
    return res

test_eq(one_hot([1,4], 5), tensor(0,1,0,0,1).byte())
test_eq(one_hot(torch.tensor([]), 5), tensor(0,0,0,0,0).byte())
test_eq(one_hot(2, 5), tensor(0,0,1,0,0).byte())

#export
def one_hot_decode(x, vocab=None):
    "Return the indices (or `vocab` items) set to 1 in `x`"
    return L(vocab[i] if vocab else i for i,x_ in enumerate(x) if x_==1)

test_eq(one_hot_decode(tensor(0,1,0,0,1)), [1,4])
test_eq(one_hot_decode(tensor(0,0,0,0,0)), [])
test_eq(one_hot_decode(tensor(0,0,1,0,0)), [2])

#export
def params(m):
    "Return all parameters of `m`"
    return [p for p in m.parameters()]

#export
def trainable_params(m):
    "Return all trainable parameters of `m`"
    return [p for p in m.parameters() if p.requires_grad]

m = nn.Linear(4,5)
test_eq(trainable_params(m), [m.weight, m.bias])
m.weight.requires_grad_(False)
test_eq(trainable_params(m), [m.bias])

#export
bn_types = (nn.BatchNorm1d, nn.BatchNorm2d, nn.BatchNorm3d)

#export
def bn_bias_params(m, with_bias=True):
    "Return all bias and BatchNorm parameters"
    if isinstance(m, bn_types): return L(m.parameters()) if with_bias else L(m.weight)
    res = L(m.children()).map(bn_bias_params, with_bias=with_bias).concat()
    #if with_bias and hasattr(m, 'bias'): res.append(m.bias)
    return res

model = nn.Sequential(nn.Linear(10,20), nn.BatchNorm1d(20), nn.Conv1d(3,4, 3))
test_eq(bn_bias_params(model), [model[1].weight, model[1].bias])
model = nn.ModuleList([nn.Linear(10,20), nn.Sequential(nn.BatchNorm1d(20), nn.Conv1d(3,4, 3))])
test_eq(bn_bias_params(model), [model[1][0].weight, model[1][0].bias])
model = nn.ModuleList([nn.Linear(10,20), nn.Sequential(nn.BatchNorm1d(20), nn.Conv1d(3,4, 3))])
test_eq(bn_bias_params(model, with_bias=False), [model[1][0].weight])

#export
def batch_to_samples(b, max_n=10):
    "'Transposes' a batch to (at most `max_n`) samples"
    if isinstance(b, Tensor): return retain_types(list(b[:max_n]), [b])
    else:
        res = L(b).map(partial(batch_to_samples,max_n=max_n))
        return retain_types(res.zip(), [b])

t = tensor([1,2,3])
test_eq(batch_to_samples([t,t+1], max_n=2), ([1,2],[2,3]))
test_eq(batch_to_samples(tensor([1,2,3]), 10), [1, 2, 3])
test_eq(batch_to_samples([tensor([1,2,3]), tensor([4,5,6])], 10), [(1, 4), (2, 5), (3, 6)])
test_eq(batch_to_samples([tensor([1,2,3]), tensor([4,5,6])], 2), [(1, 4), (2, 5)])
test_eq(batch_to_samples([tensor([1,2,3]), [tensor([4,5,6]),tensor([7,8,9])]], 10),
        [(1, (4, 7)), (2, (5, 8)), (3, (6, 9))])
test_eq(batch_to_samples([tensor([1,2,3]), [tensor([4,5,6]),tensor([7,8,9])]], 2),
        [(1, (4, 7)), (2, (5, 8))])

t = Tuple(tensor([1,2,3]),TensorBase([2,3,4]))
test_eq_type(batch_to_samples(t)[0][1], TensorBase(2))
test_eq(batch_to_samples(t).map(type), [Tuple]*3)

#export
@patch
def interp_1d(x:Tensor, xp, fp):
    "Same as `np.interp`"
    slopes = (fp[1:]-fp[:-1])/(xp[1:]-xp[:-1])
    incx = fp[:-1] - (slopes*xp[:-1])
    locs = (x[:,None]>=xp[None,:]).long().sum(1)-1
    locs = locs.clamp(0,len(slopes)-1)
    return slopes[locs]*x + incx[locs]

brks = tensor(0,1,2,4,8,64).float()
ys = tensor(range_of(brks)).float()
ys /= ys[-1].item()
pts = tensor(0.2,0.5,0.8,3,5,63)

preds = pts.interp_1d(brks, ys)
test_close(preds.numpy(), np.interp(pts.numpy(), brks.numpy(), ys.numpy()))

plt.scatter(brks,ys)
plt.scatter(pts,preds)
plt.legend(['breaks','preds']);

#export
def logit(x):
    "Logit of `x`, clamped to avoid inf."
    x = x.clamp(1e-7, 1-1e-7)
    return -(1/x-1).log()
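# Illustrative check (not in the original notebook): `logit` inverts `sigmoid`
# away from the clamp boundaries.
x = tensor([-2.,0.,2.])
test_close(logit(torch.sigmoid(x)), x, eps=1e-4)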
#export
def num_distrib():
    "Return the number of processes in distributed training (if applicable)."
    return int(os.environ.get('WORLD_SIZE', 0))

#export
def rank_distrib():
    "Return the distributed rank of this process (if applicable)."
    return int(os.environ.get('RANK', 0))

#export
def make_cross_image(bw=True):
    "Create a tensor containing a cross image, either `bw` (True) or color"
    if bw:
        im = torch.zeros(5,5)
        im[2,:] = 1.
        im[:,2] = 1.
    else:
        im = torch.zeros(3,5,5)
        im[0,2,:] = 1.
        im[1,:,2] = 1.
    return im

plt.imshow(make_cross_image(), cmap="Greys");

plt.imshow(make_cross_image(False).permute(1,2,0));

#export
def show_image_batch(b, show=show_titled_image, items=9, cols=3, figsize=None, **kwargs):
    "Display batch `b` in a grid of size `items` with `cols` width"
    if items<cols: cols=items
    rows = (items+cols-1) // cols
    if figsize is None: figsize = (cols*3, rows*3)
    fig,axs = plt.subplots(rows, cols, figsize=figsize)
    for *o,ax in zip(*to_cpu(b), axs.flatten()): show(o, ax=ax, **kwargs)
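# A hedged usage sketch (assuming `show_titled_image` accepts (image,title) pairs,
# so a batch here is a tuple of images and titles); `im_t` is the `TensorImage`
# created earlier:
show_image_batch(([im_t,im_t], ['a','b']), items=2)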
#export
def requires_grad(m):
    "Check if the first parameter of `m` requires grad or not"
    ps = list(m.parameters())
    return ps[0].requires_grad if len(ps)>0 else False

tst = nn.Linear(4,5)
assert requires_grad(tst)
for p in tst.parameters(): p.requires_grad_(False)
assert not requires_grad(tst)

#export
def init_default(m, func=nn.init.kaiming_normal_):
    "Initialize `m` weights with `func` and set `bias` to 0."
    if func:
        if hasattr(m, 'weight'): func(m.weight)
        if hasattr(m, 'bias') and hasattr(m.bias, 'data'): m.bias.data.fill_(0.)
    return m

tst = nn.Linear(4,5)
tst.weight.data.uniform_(-1,1)
tst.bias.data.uniform_(-1,1)
tst = init_default(tst, func = lambda x: x.data.fill_(1.))
test_eq(tst.weight, torch.ones(5,4))
test_eq(tst.bias, torch.zeros(5))

#export
def cond_init(m, func):
    "Apply `init_default` to `m` unless it's a batchnorm module"
    if (not isinstance(m, bn_types)) and requires_grad(m): init_default(m, func)

tst = nn.Linear(4,5)
tst.weight.data.uniform_(-1,1)
tst.bias.data.uniform_(-1,1)
cond_init(tst, func = lambda x: x.data.fill_(1.))
test_eq(tst.weight, torch.ones(5,4))
test_eq(tst.bias, torch.zeros(5))

tst = nn.BatchNorm2d(5)
init = [tst.weight.clone(), tst.bias.clone()]
cond_init(tst, func = lambda x: x.data.fill_(1.))
test_eq(tst.weight, init[0])
test_eq(tst.bias, init[1])

#export
def apply_leaf(m, f):
    "Apply `f` to children of `m`."
    c = m.children()
    if isinstance(m, nn.Module): f(m)
    for l in c: apply_leaf(l,f)

tst = nn.Sequential(nn.Linear(4,5), nn.Sequential(nn.Linear(4,5), nn.Linear(4,5)))
apply_leaf(tst, partial(init_default, func=lambda x: x.data.fill_(1.)))
for l in [tst[0], *tst[1]]: test_eq(l.weight, torch.ones(5,4))
for l in [tst[0], *tst[1]]: test_eq(l.bias,   torch.zeros(5))

#export
def apply_init(m, func=nn.init.kaiming_normal_):
    "Initialize all non-batchnorm layers of `m` with `func`."
    apply_leaf(m, partial(cond_init, func=func))

tst = nn.Sequential(nn.Linear(4,5), nn.Sequential(nn.Linear(4,5), nn.BatchNorm1d(5)))
init = [tst[1][1].weight.clone(), tst[1][1].bias.clone()]
apply_init(tst, func=lambda x: x.data.fill_(1.))
for l in [tst[0], tst[1][0]]: test_eq(l.weight, torch.ones(5,4))
for l in [tst[0], tst[1][0]]: test_eq(l.bias,   torch.zeros(5))
test_eq(tst[1][1].weight, init[0])
test_eq(tst[1][1].bias,   init[1])

#export
from multiprocessing import Process, Queue

#export
def set_num_threads(nt):
    "Get numpy (and others) to use `nt` threads"
    try: import mkl; mkl.set_num_threads(nt)
    except: pass
    torch.set_num_threads(1)
    os.environ['IPC_ENABLE']='1'
    for o in ['OPENBLAS_NUM_THREADS','NUMEXPR_NUM_THREADS','OMP_NUM_THREADS','MKL_NUM_THREADS']:
        os.environ[o] = str(nt)

#export
@delegates(concurrent.futures.ProcessPoolExecutor)
class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    def __init__(self, max_workers=None, on_exc=print, **kwargs):
        self.not_parallel = max_workers==0
        self.on_exc = on_exc
        if self.not_parallel: max_workers=1
        super().__init__(max_workers, **kwargs)

    def map(self, f, items, *args, **kwargs):
        g = partial(f, *args, **kwargs)
        if self.not_parallel: return map(g, items)
        try: return super().map(g, items)
        except Exception as e: self.on_exc(e)

#export
def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=True, **kwargs):
    "Applies `f` in parallel to `items`, using `n_workers`"
    with ProcessPoolExecutor(n_workers) as ex:
        r = ex.map(f,items, *args, **kwargs)
        if progress:
            if total is None: total = len(items)
            r = progress_bar(r, total=total, leave=False)
        return L(r)

def add_one(x, a=1):
    time.sleep(random.random()/100)
    return x+a

inp,exp = range(50),range(1,51)
test_eq(parallel(add_one, inp, n_workers=2), exp)
test_eq(parallel(add_one, inp, n_workers=0), exp)
test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2,52))
test_eq(parallel(add_one, inp, n_workers=0, a=2), range(2,52))

#export
def run_procs(f, f_done, args):
    "Call `f` for each item in `args` in parallel, then yield from `f_done`"
    processes = L(args).map(Process, args=arg0, target=f)
    for o in processes: o.start()
    try: yield from f_done()
    except Exception as e: print(e)
    finally: processes.map(Self.join())

#export
def parallel_gen(cls, items, n_workers=defaults.cpus, as_gen=False, **kwargs):
    "Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel."
    batches = np.array_split(items, n_workers)
    idx = np.cumsum(0 + L(batches).map(len))
    queue = Queue()
    def f(batch, start_idx):
        for i,b in enumerate(cls(**kwargs)(batch)): queue.put((start_idx+i,b))
    def done(): return (queue.get() for _ in progress_bar(items, leave=False))
    yield from run_procs(f, done, L(batches,idx).zip())

class SleepyBatchFunc:
    def __init__(self): self.a=1
    def __call__(self, batch):
        for k in batch:
            time.sleep(random.random()/4)
            yield k+self.a

x = np.linspace(0,0.99,20)
res = L(parallel_gen(SleepyBatchFunc, x, n_workers=2))
test_eq(res.sorted().itemgot(1), x+1)

#export
def script_use_ctx(f):
    "Decorator: create jit script and pass everything in `ctx.saved_variables` to `f`, after `*args`"
    sf = torch.jit.script(f)
    def _f(ctx, *args, **kwargs): return sf(*args, *ctx.saved_variables, **kwargs)
    return update_wrapper(_f,f)

#export
def script_save_ctx(static, *argidx):
    "Decorator: create jit script and save args with indices `argidx` using `ctx.save_for_backward`"
    def _dec(f):
        sf = torch.jit.script(f)
        def _f(ctx, *args, **kwargs):
            if argidx:
                save = [args[o] for o in argidx]
                ctx.save_for_backward(*save)
            if not argidx: args = [ctx]+list(args)
            return sf(*args, **kwargs)
        if static: _f = staticmethod(_f)
        return update_wrapper(_f,f)
    return _dec

#export
def script_fwd(*argidx):
    "Decorator: create static jit script and save args with indices `argidx` using `ctx.save_for_backward`"
    return script_save_ctx(True, *argidx)

#export
def script_bwd(f):
    "Decorator: create static jit script and pass everything in `ctx.saved_variables` to `f`, after `*args`"
    return staticmethod(script_use_ctx(f))

#export
def grad_module(cls):
    "Decorator: convert `cls` into an autograd function"
    class _c(nn.Module):
        def forward(self, *args, **kwargs): return cls.apply(*args, **kwargs)
    return _c
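# A minimal sketch (not from the original notebook) of `grad_module` on a plain
# autograd Function; the squaring op `_TstSq` is hypothetical, and the explicit
# `ctx.save_for_backward` boilerplate here is what the `script_*` decorators
# above are meant to automate:
class _TstSq(torch.autograd.Function):
    @staticmethod
    def forward(ctx, x):
        ctx.save_for_backward(x)   # keep the input for the backward pass
        return x*x
    @staticmethod
    def backward(ctx, grad_out):
        x, = ctx.saved_tensors
        return 2*x*grad_out        # d(x^2)/dx = 2x

sq = grad_module(_TstSq)()         # wrap the autograd function as an nn.Module
x = tensor([2.,3.]).requires_grad_()
sq(x).sum().backward()
test_eq(x.grad, tensor([4.,6.]))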
#hide
from local.notebook.export import notebook2script
notebook2script(all_fs=True)