%reload_ext autoreload %autoreload 2 #export from nb_007 import * import pandas as pd, re, html, os import warnings warnings.filterwarnings("ignore", message="numpy.dtype size changed") warnings.filterwarnings("ignore", message="numpy.ufunc size changed") import spacy from spacy.symbols import ORTH from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor PATH = Path('../data/aclImdb/') CLAS_PATH = PATH/'clas' LM_PATH = PATH/'lm' os.makedirs(CLAS_PATH, exist_ok=True) os.makedirs(LM_PATH, exist_ok=True) #export BOS,FLD,UNK,PAD = 'xxbos','xxfld','xxunk','xxpad' TK_UP,TK_REP,TK_WREP = 'xxup','xxrep','xxwrep' CLASSES = ['neg', 'pos', 'unsup'] def get_texts(path): texts,labels = [],[] for idx,label in enumerate(CLASSES): for fname in (path/label).glob('*.*'): texts.append(fname.open('r', encoding='utf8').read()) labels.append(idx) return np.array(texts),np.array(labels) train_texts,train_labels = get_texts(PATH/'train') valid_texts,valid_labels = get_texts(PATH/'test') train_idx = np.random.permutation(len(train_texts)) valid_idx = np.random.permutation(len(valid_texts)) train_texts,train_labels = train_texts[train_idx],train_labels[train_idx] valid_texts,valid_labels = valid_texts[valid_idx],valid_labels[valid_idx] train_df = pd.DataFrame({'text':train_texts, 'labels':train_labels}, columns=['labels','text']) valid_df = pd.DataFrame({'text':valid_texts, 'labels':valid_labels}, columns=['labels','text']) train_df[train_df['labels']!=2].to_csv(CLAS_PATH/'train.csv', header=False, index=False) valid_df.to_csv(CLAS_PATH/'valid.csv', header=False, index=False) all_texts = np.concatenate([train_texts,valid_texts]) idx = np.random.permutation(len(all_texts)) cut = int(0.1 * len(idx)) train_df = pd.DataFrame({'text':all_texts[idx[cut:]], 'labels':[0] * (len(all_texts)-cut)}, columns=['labels','text']) valid_df = pd.DataFrame({'text':all_texts[idx[:cut]], 'labels':[0] * cut}, columns=['labels','text']) train_df.to_csv(LM_PATH/'train.csv', header=False, index=False) valid_df.to_csv(LM_PATH/'valid.csv', header=False, index=False) #export def partition(a:Collection, sz:int) -> List[Collection]: "Splits iterables a in equal parts of size sz" return [a[i:i+sz] for i in range(0, len(a), sz)] def partition_by_cores(a:Collection, n_cpus:int) -> List[Collection]: "Split data equally among CPU cores" return partition(a, len(a)//n_cpus + 1) def num_cpus() -> int: "Return the number of CPUs in the system" try: return len(os.sched_getaffinity(0)) except AttributeError: return os.cpu_count() #export class BaseTokenizer(): "Basic class for a tokenizer function." def __init__(self, lang:str): self.lang = lang def tokenizer(self, t:spacy.tokens.doc.Doc) -> List[str]: raise NotImplementedError def add_special_cases(self, toks:Collection[str]): raise NotImplementedError #export class SpacyTokenizer(BaseTokenizer): "Little wrapper around a `spacy` tokenizer" def __init__(self, lang:str): self.tok = spacy.load(lang) def tokenizer(self, t:spacy.tokens.doc.Doc) -> List[str]: return [t.text for t in self.tok.tokenizer(t)] def add_special_cases(self, toks:Collection[str]): for w in toks: self.tok.tokenizer.add_special_case(w, [{ORTH: w}]) train_df = pd.read_csv(LM_PATH/'train.csv', header=None, chunksize=10) trn_df = next(train_df) test_tok = SpacyTokenizer('en') test_txt = trn_df.iloc[0][1] test_tok.tokenizer(test_txt) #export def sub_br(t:str) -> str: "Replaces the
by \n" re_br = re.compile(r'<\s*br\s*/?>', re.IGNORECASE) return re_br.sub("\n", t) def spec_add_spaces(t:str) -> str: "Add spaces between special characters" return re.sub(r'([/#])', r' \1 ', t) def rm_useless_spaces(t:str) -> str: "Remove multiple spaces" return re.sub(' {2,}', ' ', t) def replace_rep(t:str) -> str: "Replace repetitions at the character level" def _replace_rep(m:Collection[str]) -> str: c,cc = m.groups() return f' {TK_REP} {len(cc)+1} {c} ' re_rep = re.compile(r'(\S)(\1{3,})') return re_rep.sub(_replace_rep, t) def replace_wrep(t:str) -> str: "Replace word repetitions" def _replace_wrep(m:Collection[str]) -> str: c,cc = m.groups() return f' {TK_WREP} {len(cc.split())+1} {c} ' re_wrep = re.compile(r'(\b\w+\W+)(\1{3,})') return re_wrep.sub(_replace_wrep, t) def deal_caps(t:str) -> str: "Replace words in all caps" res = [] for s in re.findall(r'\w+|\W+', t): res += ([f' {TK_UP} ',s.lower()] if (s.isupper() and (len(s)>2)) else [s.lower()]) return ''.join(res) def fixup(x:str) -> str: "List of replacements from html strings" re1 = re.compile(r' +') x = x.replace('#39;', "'").replace('amp;', '&').replace('#146;', "'").replace( 'nbsp;', ' ').replace('#36;', '$').replace('\\n', "\n").replace('quot;', "'").replace( '
', "\n").replace('\\"', '"').replace('',UNK).replace(' @.@ ','.').replace( ' @-@ ','-').replace('\\', ' \\ ') return re1.sub(' ', html.unescape(x)) default_rules = [fixup, replace_rep, replace_wrep, deal_caps, spec_add_spaces, rm_useless_spaces, sub_br] default_spec_tok = [BOS, FLD, UNK, PAD] #export ListRules = Collection[Callable[[str],str]] class Tokenizer(): "Puts together rules, a tokenizer function and a language to process text with multiprocessing" def __init__(self, tok_fn:Callable=SpacyTokenizer, lang:str='en', rules:ListRules=None, special_cases:Collection[str]=None, n_cpus:int=None): self.tok_fn,self.lang,self.special_cases = tok_fn,lang,special_cases self.rules = rules if rules else default_rules self.special_cases = special_cases if special_cases else default_spec_tok self.n_cpus = n_cpus or num_cpus()//2 def __repr__(self) -> str: res = f'Tokenizer {self.tok_fn.__name__} in {self.lang} with the following rules:\n' for rule in self.rules: res += f' - {rule.__name__}\n' return res def proc_text(self, t:str, tok:BaseTokenizer) -> List[str]: "Processes one text" for rule in self.rules: t = rule(t) return tok.tokenizer(t) def process_all_1(self, texts:Collection[str]) -> List[List[str]]: "Processes a list of texts in one process" tok = self.tok_fn(self.lang) if self.special_cases: tok.add_special_cases(self.special_cases) return [self.proc_text(t, tok) for t in texts] def process_all(self, texts:Collection[str]) -> List[List[str]]: "Processes a list of texts in several processes" if self.n_cpus <= 1: return self.process_all_1(texts) with ProcessPoolExecutor(self.n_cpus) as e: return sum(e.map(self.process_all_1, partition_by_cores(texts, self.n_cpus)), []) tokenizer = Tokenizer(rules=default_rules, special_cases=[BOS, FLD, 'xxunk', 'xxpad'], n_cpus=1) tokenizer assert sub_br('end

begins again')=='end \n\n begins again' assert spec_add_spaces('\#%') == '\\ # %' assert rm_useless_spaces('this is') == 'this is' assert replace_rep('ffffffive .') == ' xxrep 6 f ive .' assert replace_wrep('five five five five .') == ' xxwrep 4 five .' assert deal_caps('ANGRY') == ' xxup angry' #export def get_chunk_length(csv_name:PathOrStr, chunksize:int) -> int: "Reads the number of chunks in a pandas `DataFrame`" dfs = pd.read_csv(csv_name, header=None, chunksize=chunksize) l = 0 for _ in dfs: l+=1 return l def get_total_length(csv_name:PathOrStr, chunksize:int) -> int: "Reads the the total length of a pandas `DataFrame`" dfs = pd.read_csv(csv_name, header=None, chunksize=chunksize) l = 0 for df in dfs: l+=len(df) return l #export def maybe_copy(old_fnames:Collection[PathOrStr], new_fnames:Collection[PathOrStr]): "Copies the `old_fnames` to `new_fnames` location if new_fnames don't exist or are less recent." os.makedirs(os.path.dirname(new_fnames[0]), exist_ok=True) for old_fname,new_fname in zip(old_fnames, new_fnames): if not os.path.isfile(new_fname) or os.path.getmtime(new_fname) < os.path.getmtime(old_fname): shutil.copyfile(old_fname, new_fname) #export import hashlib Tokens = Collection[Collection[str]] class Vocab(): "Contains the correspondance between numbers and tokens and numericalizes" def __init__(self, path:PathOrStr): self.itos = pickle.load(open(path/'itos.pkl', 'rb')) self.stoi = collections.defaultdict(int,{v:k for k,v in enumerate(self.itos)}) def numericalize(self, t:Collection[str]) -> List[int]: "Converts a list of tokens to their ids" return [self.stoi[w] for w in t] def textify(self, nums:Collection[int]) -> List[str]: "Converts a list of ids to their tokens" return ' '.join([self.itos[i] for i in nums]) @classmethod def create(cls, path:PathOrStr, tokens:Tokens, max_vocab:int, min_freq:int) -> 'Vocab': "Create a vocabulary from a set of tokens." freq = Counter(p for o in tokens for p in o) itos = [o for o,c in freq.most_common(max_vocab) if c > min_freq] itos.insert(0, PAD) if UNK in itos: itos.remove(UNK) itos.insert(0, UNK) pickle.dump(itos, open(path/'itos.pkl', 'wb')) h = hashlib.sha1(np.array(itos)) with open(path/'numericalize.log','w') as f: f.write(h.hexdigest()) return cls(path) #export TextMtd = IntEnum('TextMtd', 'CSV TOK IDS') import shutil class TextDataset(): "Basic dataset for NLP tasks." def __init__(self, path:PathOrStr, tokenizer:Tokenizer, vocab:Vocab=None, max_vocab:int=60000, chunksize:int=10000, name:str='train', min_freq:int=2, n_labels:int=1, create_mtd:TextMtd=TextMtd.CSV, classes:Classes=None): self.path,self.tokenizer,self.max_vocab,self.min_freq = Path(path/'tmp'),tokenizer,max_vocab,min_freq self.chunksize,self.name,self.n_labels,self.create_mtd = chunksize,name,n_labels,create_mtd self.vocab=vocab os.makedirs(self.path, exist_ok=True) if not self.check_toks(): self.tokenize() if not self.check_ids(): self.numericalize() if self.vocab is None: self.vocab = Vocab(self.path) self.ids = np.load(self.path/f'{self.name}_ids.npy') if os.path.isfile(self.path/f'{self.name}_lbl.npy'): self.labels = np.load(self.path/f'{self.name}_lbl.npy') else: self.labels = np.zeros((len(self.ids),), dtype=np.int64) self.classes = classes if classes else np.unique(self.labels) def __getitem__(self, idx:int) -> Tuple[int,int]: return self.ids[idx],self.labels[idx] def __len__(self) -> int: return len(self.ids) def general_check(self, pre_files:Collection[PathOrStr], post_files:Collection[PathOrStr]): "Checks that post_files exist and were modified after all the prefiles." if not np.all([os.path.isfile(fname) for fname in post_files]): return False for pre_file in pre_files: if os.path.getmtime(pre_file) > os.path.getmtime(post_files[0]): return False return True def check_ids(self) -> bool: "Checks if a new numericalization is needed." if self.create_mtd >= TextMtd.IDS: return True if not self.general_check([self.tok_files[0],self.id_files[1]], self.id_files): return False itos = pickle.load(open(self.id_files[1], 'rb')) h = hashlib.sha1(np.array(itos)) with open(self.id_files[2]) as f: if h.hexdigest() != f.read() or len(itos) > self.max_vocab + 2: return False toks,ids = np.load(self.tok_files[0]),np.load(self.id_files[0]) if len(toks) != len(ids): return False return True def check_toks(self) -> bool: "Checks if a new tokenization is needed." if self.create_mtd >= TextMtd.TOK: return True if not self.general_check([self.csv_file], self.tok_files): return False with open(self.tok_files[1]) as f: if repr(self.tokenizer) != f.read(): return False return True def tokenize(self): "Tokenizes the texts in the csv file" print(f'Tokenizing {self.name}. This might take a while so you should grab a coffee.') curr_len = get_chunk_length(self.csv_file, self.chunksize) dfs = pd.read_csv(self.csv_file, header=None, chunksize=self.chunksize) tokens,labels = [],[] for _ in progress_bar(range(curr_len), leave=False): df = next(dfs) lbls = df.iloc[:,range(self.n_labels)].values.astype(np.int64) texts = f'\n{BOS} {FLD} 1 ' + df[self.n_labels].astype(str) for i in range(self.n_labels+1, len(df.columns)): texts += f' {FLD} {i-n_lbls} ' + df[i].astype(str) toks = self.tokenizer.process_all(texts) tokens += toks labels += list(np.squeeze(lbls)) np.save(self.tok_files[0], np.array(tokens)) np.save(self.path/f'{self.name}_lbl.npy', np.array(labels)) with open(self.tok_files[1],'w') as f: f.write(repr(self.tokenizer)) def numericalize(self): "Numericalizes the tokens in the token file" print(f'Numericalizing {self.name}.') toks = np.load(self.tok_files[0]) if self.vocab is None: self.vocab = Vocab.create(self.path, toks, self.max_vocab, self.min_freq) ids = np.array([self.vocab.numericalize(t) for t in toks]) np.save(self.id_files[0], ids) def clear(self): "Removes temporary files" files = [self.path/f'{self.name}_{suff}.npy' for suff in ['ids','tok','lbl']] files.append(self.path/f'{self.name}.csv') for file in files: if os.path.isfile(file): os.remove(file) @property def csv_file(self) -> Path: return self.path/f'{self.name}.csv' @property def tok_files(self) -> List[Path]: return [self.path/f'{self.name}_tok.npy', self.path/'tokenize.log'] @property def id_files(self) -> List[Path]: return [self.path/f'{self.name}_ids.npy', self.path/'itos.pkl', self.path/'numericalize.log'] @classmethod def from_ids(cls, folder:PathOrStr, name:str, id_suff:str='_ids', lbl_suff:str='_lbl', itos:str='itos.pkl', **kwargs) -> 'TextDataset': "Creates a dataset from an id, a dictionary and label file." orig = [Path(folder/file) for file in [f'{name}{id_suff}.npy', f'{name}{lbl_suff}.npy', itos]] dest = [Path(folder)/'tmp'/file for file in [f'{name}_ids.npy', f'{name}_lbl.npy', 'itos.pkl']] maybe_copy(orig, dest) return cls(folder, None, name=name, create_mtd=TextMtd.IDS, **kwargs) @classmethod def from_tokens(cls, folder:PathOrStr, name:str, tok_suff:str='_tok', lbl_suff:str='_lbl', **kwargs) -> 'TextDataset': "Creates a dataset from a token and label file." orig = [Path(folder/file) for file in [f'{name}{tok_suff}.npy', f'{name}{lbl_suff}.npy']] dest = [Path(folder)/'tmp'/file for file in [f'{name}_tok.npy', f'{name}_lbl.npy']] maybe_copy(orig, dest) return cls(folder, None, name=name, create_mtd=TextMtd.TOK, **kwargs) @classmethod def from_csv(cls, folder:PathOrStr, tokenizer:Tokenizer, name:str, **kwargs) -> 'TextDataset': "Creates a dataset from texts in a csv file." orig = [Path(folder)/f'{name}.csv'] dest = [Path(folder)/'tmp'/f'{name}.csv'] maybe_copy(orig, dest) return cls(folder, tokenizer, name=name, **kwargs) @classmethod def from_folder(cls, folder:PathOrStr, tokenizer:Tokenizer, name:str, classes:Classes=None, shuffle:bool=True, **kwargs) -> 'TextDataset': "Creates a dataset from a folder" path = Path(folder)/'tmp' os.makedirs(path, exist_ok=True) if classes is None: classes = [cls.name for cls in find_classes(Path(folder)/name)] texts,labels = [],[] for idx,label in enumerate(classes): for fname in (Path(folder)/name/label).glob('*.*'): texts.append(fname.open('r', encoding='utf8').read()) labels.append(idx) texts,labels = np.array(texts),np.array(labels) if shuffle: idx = np.random.permutation(len(texts)) texts,labels = texts[idx],labels[idx] df = pd.DataFrame({'text':texts, 'labels':labels}, columns=['labels','text']) if os.path.isfile(path/f'{name}.csv'): if get_total_length(path/f'{name}.csv', 10000) != len(df): df.to_csv(path/f'{name}.csv', index=False, header=False) else: df.to_csv(path/f'{name}.csv', index=False, header=False) return cls(folder, tokenizer, name=name, classes=classes, **kwargs) #export def extract_kwargs(names:Collection[str], kwargs:KWArgs): "Extracts the keys in names from the kwargs." new_kwargs = {} for arg_name in names: if arg_name in kwargs: arg_val = kwargs.pop(arg_name) new_kwargs[arg_name] = arg_val return new_kwargs, kwargs #export class LanguageModelLoader(): "Creates a dataloader with bptt slightly changing." def __init__(self, dataset:TextDataset, bs:int=64, bptt:int=70, backwards:bool=False): self.dataset,self.bs,self.bptt,self.backwards = dataset,bs,bptt,backwards self.data = self.batchify(np.concatenate(dataset.ids)) self.first,self.i,self.iter = True,0,0 self.n = len(self.data) def __iter__(self): self.i,self.iter = 0,0 while self.i < self.n-1 and self.iter int: return (self.n-1) // self.bptt def batchify(self, data:np.ndarray) -> LongTensor: "Splits the corpus in batches." nb = data.shape[0] // self.bs data = np.array(data[:nb*self.bs]).reshape(self.bs, -1).T if self.backwards: data=data[::-1] return LongTensor(data) def get_batch(self, i:int, seq_len:int) -> Tuple[LongTensor, LongTensor]: "Creates a batch of a given seq_len" seq_len = min(seq_len, len(self.data) - 1 - i) return self.data[i:i+seq_len], self.data[i+1:i+1+seq_len].contiguous().view(-1) #export def standard_data(datasets:Collection[DatasetBase], path:PathOrStr, **kwargs) -> DataBunch: "Simply creates a `DataBunch` from the `datasets`" return DataBunch.create(*datasets, path=path, **kwargs) def lm_data(datasets:Collection[TextDataset], path:PathOrStr, **kwargs) -> DataBunch: "Creates a `DataBunch` from the `datasets` for language modelling" dataloaders = [LanguageModelLoader(ds, **kwargs) for ds in datasets] return DataBunch(*dataloaders, path=path) #export DataFunc = Callable[[Collection[DatasetBase], PathOrStr, KWArgs], DataBunch] def data_from_textids(path:PathOrStr, train:str='train', valid:str='valid', test:Optional[str]=None, data_func:DataFunc=standard_data, itos:str='itos.pkl', **kwargs) -> DataBunch: "Creates a `DataBunch` from ids, labels and a dictionary." path=Path(path) txt_kwargs, kwargs = extract_kwargs(['max_vocab', 'chunksize', 'min_freq', 'n_labels', 'id_suff', 'lbl_suff'], kwargs) train_ds = TextDataset.from_ids(path, train, itos=itos, **txt_kwargs) datasets = [train_ds, TextDataset.from_ids(path, valid, itos=itos, **txt_kwargs)] if test: datasets.append(TextDataset.from_ids(path, test, itos=itos, **txt_kwargs)) return data_func(datasets, path, **kwargs) def data_from_texttokens(path:PathOrStr, train:str='train', valid:str='valid', test:Optional[str]=None, data_func:DataFunc=standard_data, vocab:Vocab=None, **kwargs) -> DataBunch: "Creates a `DataBunch` from tokens and labels." path=Path(path) txt_kwargs, kwargs = extract_kwargs(['max_vocab', 'chunksize', 'min_freq', 'n_labels', 'tok_suff', 'lbl_suff'], kwargs) train_ds = TextDataset.from_tokens(path, train, vocab=vocab, **txt_kwargs) datasets = [train_ds, TextDataset.from_tokens(path, valid, vocab=train_ds.vocab, **txt_kwargs)] if test: datasets.append(TextDataset.from_tokens(path, test, vocab=train_ds.vocab, **txt_kwargs)) return data_func(datasets, path, **kwargs) def data_from_textcsv(path:PathOrStr, tokenizer:Tokenizer, train:str='train', valid:str='valid', test:Optional[str]=None, data_func:DataFunc=standard_data, vocab:Vocab=None, **kwargs) -> DataBunch: "Creates a `DataBunch` from texts in csv files." path=Path(path) txt_kwargs, kwargs = extract_kwargs(['max_vocab', 'chunksize', 'min_freq', 'n_labels'], kwargs) train_ds = TextDataset.from_csv(path, tokenizer, train, vocab=vocab, **txt_kwargs) datasets = [train_ds, TextDataset.from_csv(path, tokenizer, valid, vocab=train_ds.vocab, **txt_kwargs)] if test: datasets.append(TextDataset.from_csv(path, tokenizer, test, vocab=train_ds.vocab, **txt_kwargs)) return data_func(datasets, path, **kwargs) tokenizer = Tokenizer(rules=default_rules, special_cases=[BOS, FLD, 'xxunk', 'xxpad']) data = data_from_textcsv(LM_PATH, tokenizer, chunksize=1000) data = data_from_texttokens(LM_PATH, chunksize=1000) data = data_from_textids(LM_PATH, chunksize=1000) #data[0].clear() #data[1].clear() def make_sample(dir_name='tst_folders'): os.makedirs(PATH/dir_name, exist_ok=True) PATH1 = PATH/dir_name for name,name1 in zip(['train', 'valid'],['train', 'test']): os.makedirs(PATH1/name, exist_ok=True) for clas in ['neg', 'pos']: os.makedirs(PATH1/name/clas, exist_ok=True) fnames = list((PATH/name1/clas).glob('*.txt')) for i in range(2000): shutil.copy(fnames[i], PATH1/name/clas/fnames[i].name) #make_sample() train_ds = TextDataset.from_folder(PATH/'tst_folders', tokenizer, 'train', chunksize=1000) valid_ds = TextDataset.from_folder(PATH/'tst_folders', tokenizer, 'valid', chunksize=1000, vocab=train_ds.vocab) #export def data_from_textfolder(path:PathOrStr, tokenizer:Tokenizer, train:str='train', valid:str='valid', test:Optional[str]=None, shuffle:bool=True, data_func:DataFunc=standard_data, vocab:Vocab=None, **kwargs): "Creates a `DataBunch` from text files in folders." path=Path(path) txt_kwargs, kwargs = extract_kwargs(['max_vocab', 'chunksize', 'min_freq', 'n_labels'], kwargs) train_ds = TextDataset.from_folder(path, tokenizer, train, shuffle=shuffle, vocab=vocab, **txt_kwargs) datasets = [train_ds, TextDataset.from_folder(path, tokenizer, valid, classes=train_ds.classes, shuffle=shuffle, vocab=train_ds.vocab, **txt_kwargs)] if test: datasets.append(TextDataset.from_folder(path, tokenizer, test, classes=train_ds.classes, shuffle=shuffle, vocab=train_ds.vocab, **txt_kwargs)) return data_func(datasets, path, **kwargs) data = data_from_textfolder(PATH/'tst_folders', tokenizer, chunksize=1000)