#!/usr/bin/env python
# coding: utf-8

# # Treating notebooks as data

# In[111]:


# NOTE: the order of these imports is significant. Later star-imports shadow
# names bound by earlier ones (for example, `concat` is called below with
# pandas semantics even though toolz also provides a `concat`).
# NOTE(review): `from nbformat.v4 import *` follows `from nbformat import reads`
# and may rebind `reads`; keep the order so `Notebook.loader` resolves the
# same function as before.
from toolz.curried import *
from pandas import *
from pathlib import Path
from nbformat import reads
from collections import UserDict
from nbformat.v4 import *
from nbformat import v4
from functools import partialmethod
from operator import methodcaller, ne
from nbconvert import exporters
from traitlets import import_item, config
import json
from nbformat import NotebookNode
from IPython.display import *
from markupsafe import escape
# Imported explicitly: `np` is used by `Notebook.cellify` and is not
# guaranteed to be provided by `from pandas import *`.
import numpy as np


# In[96]:


class Loader(UserDict):
    """Registry mapping a file suffix (e.g. ``'.ipynb'``) to a loader callable.

    Items are looked up with a ``Path``; the path's ``suffix`` is the actual
    key.  Calling the registry with a path loads that path with the matching
    loader.
    """

    def __missing__(self, path):
        """Return ``None`` for unregistered suffixes instead of raising."""
        return None

    def __getitem__(self, path):
        """Return the loader registered for ``path.suffix`` (or ``None``)."""
        return super().__getitem__(path.suffix)

    def __call__(self, path):
        """Load *path* with the loader registered for its suffix.

        Raises:
            TypeError: if no loader is registered for the suffix.  The type
                matches what calling the ``None`` placeholder used to raise,
                but the message now names the offending path.
        """
        load = self[path]
        if load is None:
            raise TypeError(
                'No loader registered for suffix {!r}: {}'.format(
                    path.suffix, path))
        return load(path)


# In[97]:


class Notebook(DataFrame):
    """A ``DataFrame`` with one row per notebook cell.

    Notebook-level metadata is kept in ``meta`` and is written back out by
    :meth:`to_notebook`.
    """

    @property
    def _constructor(self):
        # Keep pandas operations returning this subclass.
        return type(self)

    _metadata = ['loader', 'meta']

    loader = Loader({
        '.ipynb': compose(reads, Path.read_text)
    })

    @property
    def _constructor_sliced(self):
        return Series

    def __init__(self, data=None, index=None, columns=None, dtype=None,
                 copy=False):
        """Build the frame from a ``Path``, a ``NotebookNode`` or frame data.

        A ``Path`` is first read through ``loader``.  A ``NotebookNode`` is
        split into its ``metadata`` (stored on ``meta``) and its ``cells``
        (used as the frame data).  Anything else is forwarded to
        ``DataFrame`` unchanged.
        """
        self.meta = {}
        if isinstance(data, Path):
            data = self.loader(data)
        if isinstance(data, NotebookNode):
            self.meta, data = data['metadata'], data['cells']
        super().__init__(data, index, columns, dtype, copy)

    def _absorb_meta(self, source):
        """Merge ``source.meta`` into ``self.meta`` when it is a mapping."""
        meta = getattr(source, 'meta', None)
        # A plain DataFrame with a column named 'meta' would return a Series
        # here; only real mappings are merged.
        if isinstance(meta, dict):
            self.meta.update(meta)

    def __finalize__(self, other=None, method=None, **kwargs):
        """Carry notebook metadata over to results of pandas operations.

        * ``merge``: metadata of the left operand is kept.
        * ``concat``: metadata of every concatenated object is merged, in
          order.
        * anything else (copying, slicing, ...): metadata of the source
          ``Notebook`` is merged so it is no longer silently dropped.

        ``**kwargs`` is accepted and ignored so extra keyword arguments passed
        by pandas do not break the override.
        """
        if method == 'merge':
            self._absorb_meta(getattr(other, 'left', None))
        elif method == 'concat':
            for obj in getattr(other, 'objs', ()):
                self._absorb_meta(obj)
        elif isinstance(other, Notebook):
            self._absorb_meta(other)
        return self

    def to_notebook(self, to=None):
        """Convert the rows back into a v4 notebook.

        Args:
            to: optional path; when truthy the serialized notebook is written
                there.

        Returns:
            The notebook built by ``new_notebook``.
        """
        cells = [self.cellify(row) for _, row in self.iterrows()]
        nb = new_notebook(cells=cells, metadata=self.meta)
        if to:
            Path(to).write_text(writes(nb))
        return nb

    def raw(self, type='raw'):
        """Return the rows whose ``cell_type`` equals *type*."""
        # The original referenced an undefined global `df` here.
        return self[self.cell_type.eq(type)]

    code, markdown = partialmethod(raw, 'code'), partialmethod(raw, 'markdown')

    def find(self, query=''):
        """Return the rows whose ``source`` contains *query*.

        Rows with a missing ``source`` are treated as non-matching.
        """
        return self[self.source.str.contains(query, na=False)]

    @staticmethod
    def cellify(dict):
        """Turn one row (``Series`` or mapping) into a v4 cell.

        The parameter name shadows the builtin; it is kept so existing keyword
        callers continue to work.  The input mapping is never mutated.

        For code cells a missing or NaN ``execution_count`` becomes ``None``
        and any other value is coerced to ``int``.  For every other cell type
        the code-only fields ``execution_count`` and ``outputs`` are dropped
        when present.
        """
        fields = dict.to_dict() if isinstance(dict, Series) else {**dict}
        kind = fields['cell_type']
        if kind == 'code':
            count = fields.get('execution_count')
            missing = count is None or np.isnan(count)
            fields['execution_count'] = None if missing else int(count)
        else:
            fields.pop('execution_count', None)
            fields.pop('outputs', None)
        return getattr(v4, 'new_{}_cell'.format(kind))(**fields)

    def preprocess(self, preprocessors=None, resources=None):
        """Run preprocessors over the notebook and return a new ``Notebook``.

        Args:
            preprocessors: iterable of dotted import strings, classes or
                factories (called with no arguments), or ready-made objects
                exposing ``preprocess(nb, resources)``.
            resources: resources mapping handed to the first preprocessor; a
                fresh dict is created when omitted.
        """
        resources = {} if resources is None else resources
        nb = self.to_notebook()
        for pre in preprocessors or ():
            if isinstance(pre, str):
                pre = import_item(pre)
            # Instantiate classes and factories, but leave objects that
            # already expose `preprocess` alone: such instances may themselves
            # be callable and must not be invoked with no arguments.
            if isinstance(pre, type) or (
                    callable(pre) and not hasattr(pre, 'preprocess')):
                pre = pre()
            nb, resources = pre.preprocess(nb, resources)
        return Notebook(nb)

    def append(self, source, type='code', **kwargs):
        """Return a new ``Notebook`` with one extra cell at the end.

        Args:
            source: source text of the new cell.
            type: cell type; selects ``v4.new_<type>_cell``.
            **kwargs: forwarded to the cell factory.

        The index of the result is reset.  ``concat`` is used instead of
        ``DataFrame.append`` so this also works on pandas versions where that
        method no longer exists.
        """
        cell = getattr(v4, 'new_{}_cell'.format(type))(source, **kwargs)
        # `type` is shadowed by the parameter, hence `self.__class__`.  The
        # new row is wrapped in the subclass so the result stays a Notebook
        # even when `self` is empty.
        row = self.__class__([cell])
        return concat([self, row], ignore_index=True)

    def to_export(self, name='html', **kwargs):
        """Export through the nbconvert exporter *name*.

        Returns the first element of what ``exporters.export`` returns.
        """
        exported = exporters.export(
            exporters.get_exporter(name), self.to_notebook(), **kwargs)
        return first(exported)


# Add a `to_<name>` shortcut for every exporter nbconvert knows about,
# without overriding attributes that already exist (e.g. DataFrame.to_html).
for name in exporters.get_export_names():
    if 'to_' + name not in dir(Notebook):
        setattr(Notebook, 'to_' + name,
                partialmethod(Notebook.to_export, name))


# In[98]:


def read_notebooks(paths=('.',), suffixes=('.ipynb',)):
    """Load notebooks into one frame keyed by the path of each notebook.

    Args:
        paths: a single path (``str`` or ``Path``) or an iterable of paths.
            Directories are searched, non-recursively, for files ending in
            any of *suffixes*; any other path is loaded directly.
        suffixes: a suffix or an iterable of suffixes used when globbing
            directories.

    Returns:
        The concatenation of the loaded notebooks keyed by path, or an empty
        ``Notebook`` when nothing was found.  A file that fails with
        ``json.JSONDecodeError`` contributes an empty ``Series``.
    """
    # A bare string is iterable and would otherwise be split into characters.
    if isinstance(paths, (str, Path)) or not isiterable(paths):
        paths = [paths]
    if isinstance(suffixes, str):
        suffixes = [suffixes]

    load = excepts(json.JSONDecodeError, Notebook, lambda e: Series({}))
    frames = {}
    for path in map(Path, paths):
        if path.is_dir():
            found = concatv(*(path.glob('*' + suffix) for suffix in suffixes))
        else:
            found = [path]
        for notebook in found:
            frames[notebook] = load(notebook)

    if not frames:
        return Notebook()
    return concat(frames)


# # Create a Notebook from a DataFrame

# In[99]:


Notebook().append("a = 42").to_notebook('part.ipynb')


# # Load the Notebook

# In[100]:


nb = read_notebooks([Path('part.ipynb')])


# # Manually append source code to it

# In[101]:


new = (
    nb
    .append("print(a)")
    .append("""__import__("IPython").display.HTML(str(a))""")
)
new


# # Execute the code

# In[102]:


new.preprocess(['nbconvert.preprocessors.execute.ExecutePreprocessor'])


# # Modify the source and rerun the code

# In[118]:


new2 = new.copy()
new2.source = new.source.str.replace('42', '100')
newest = new2.preprocess(
    ['nbconvert.preprocessors.execute.ExecutePreprocessor'])


# # Creating Slides

# In[119]:


def make_slide(object):
    """Mark a cell-metadata mapping as a slide, in place, and return it."""
    object.update(slideshow=dict(slide_type='slide'))
    return object


# In[120]:


newest.metadata.apply(make_slide)


# ## Export the slides with a specific configuration.

# In[121]:


# `config` is the module imported from traitlets above; the bare name
# `traitlets` is never imported in this file.
slides = newest.to_slides(config=config.Config(
    SlidesExporter=dict(
        reveal_url_prefix="https://cdn.jsdelivr.net/npm/reveal.js@3.5.0")))


# ## Use `srcdoc` to embed an IFramed presentation.

# In[122]:


# NOTE(review): the template literal was empty in the source, so nothing was
# ever rendered.  It is reconstructed here as an iframe using `srcdoc`, as the
# heading describes; the width/height values are assumptions - confirm.
HTML(
    """<iframe srcdoc="{}" width="100%" height="600"></iframe>""".format(
        escape(slides)))


# In[ ]: