# export
from local.core.imports import *
from local.notebook.core import *
import nbformat,inspect
from nbformat.sign import NotebookNotary
# default_exp notebook.export
# default_cls_lvl 3
The functions that transform the dev notebooks in the fastai library
A jupyter notebook is a json file behind the scenes. We can just read it with the json module, which will return a nested dictionary of dictionaries/lists of dictionaries, but there are some small differences between reading the json and using the tools from nbformat
so we'll use this one.
#export
def read_nb(fname):
"Read the notebook in `fname`."
with open(Path(fname),'r', encoding='utf8') as f: return nbformat.reads(f.read(), as_version=4)
fname
can be a string or a pathlib object.
test_nb = read_nb('91_notebook_export.ipynb')
The root has four keys: cells
contains the cells of the notebook, metadata
some stuff around the version of python used to execute the notebook, nbformat
and nbformat_minor
the version of nbformat.
test_nb.keys()
dict_keys(['cells', 'metadata', 'nbformat', 'nbformat_minor'])
test_nb['metadata']
{'kernelspec': {'display_name': 'Python 3', 'language': 'python', 'name': 'python3'}, 'language_info': {'codemirror_mode': {'name': 'ipython', 'version': 3}, 'file_extension': '.py', 'mimetype': 'text/x-python', 'name': 'python', 'nbconvert_exporter': 'python', 'pygments_lexer': 'ipython3', 'version': '3.6.9'}}
f"{test_nb['nbformat']}.{test_nb['nbformat_minor']}"
'4.4'
The cells key then contains a list of cells. Each one is a new dictionary that contains entries like the type (code or markdown), the source (what is written in the cell) and the output (for code cells).
test_nb['cells'][0]
{'cell_type': 'code', 'execution_count': None, 'metadata': {'hide_input': False}, 'outputs': [], 'source': '# export\nfrom local.core.imports import *\nfrom local.notebook.core import *\nimport nbformat,inspect\nfrom nbformat.sign import NotebookNotary'}
def _test_eq(a,b): assert a==b, f'{a}, {b}'
# export
def check_re(cell, pat, code_only=True):
"Check if `cell` contains a line with regex `pat`"
if code_only and cell['cell_type'] != 'code': return
if isinstance(pat, str): pat = re.compile(pat, re.IGNORECASE | re.MULTILINE)
return pat.search(cell['source'])
pat
can be a string or a compiled regex, if code_only=True
, ignores markdown cells.
cell = test_nb['cells'][0].copy()
assert check_re(cell, '# export') is not None
assert check_re(cell, re.compile('# export')) is not None
assert check_re(cell, '# bla') is None
cell['cell_type'] = 'markdown'
assert check_re(cell, '# export') is None
assert check_re(cell, '# export', code_only=False) is not None
# export
_re_blank_export = re.compile(r"""
# Matches any line with #export or #exports without any module name:
^ # beginning of line (since re.MULTILINE is passed)
\s* # any number of whitespace
\#\s* # # then any number of whitespace
exports? # export or exports
\s* # any number of whitespace
$ # end of line (since re.MULTILINE is passed)
""", re.IGNORECASE | re.MULTILINE | re.VERBOSE)
# export
_re_mod_export = re.compile(r"""
# Matches any line with #export or #exports with a module name and catches it in group 1:
^ # beginning of line (since re.MULTILINE is passed)
\s* # any number of whitespace
\#\s* # # then any number of whitespace
exports? # export or exports
\s* # any number of whitespace
(\S+) # catch a group with any non-whitespace chars
\s* # any number of whitespace
$ # end of line (since re.MULTILINE is passed)
""", re.IGNORECASE | re.MULTILINE | re.VERBOSE)
# export
def is_export(cell, default):
"Check if `cell` is to be exported and returns the name of the module."
if check_re(cell, _re_blank_export):
if default is None:
print(f"This cell doesn't have an export destination and was ignored:\n{cell['source'][1]}")
return default
tst = check_re(cell, _re_mod_export)
return os.path.sep.join(tst.groups()[0].split('.')) if tst else None
The cells to export are marked with an #export
or #exports
code, potentially with a module name where we want it exported. The default is given in a cell of the form #default_exp bla
inside the notebook (usually at the top), though in this function, it needs the be passed (the final script will read the whole notebook to find it).
cell = test_nb['cells'][0].copy()
assert is_export(cell, 'export') == 'export'
cell['source'] = "# exports"
assert is_export(cell, 'export') == 'export'
cell['source'] = "# export mod"
assert is_export(cell, 'export') == 'mod'
cell['source'] = "# export mod.file"
assert is_export(cell, 'export') == 'mod/file'
cell['source'] = "# expt mod.file"
assert is_export(cell, 'export') is None
# export
_re_default_exp = re.compile(r"""
# Matches any line with #default_exp with a module name and catches it in group 1:
^ # beginning of line (since re.MULTILINE is passed)
\s* # any number of whitespace
\#\s* # # then any number of whitespace
default_exp # export or exports
\s* # any number of whitespace
(\S+) # catch a group with any non-whitespace chars
\s* # any number of whitespace
$ # end of line (since re.MULTILINE is passed)
""", re.IGNORECASE | re.MULTILINE | re.VERBOSE)
# export
def find_default_export(cells):
"Find in `cells` the default export module."
for cell in cells:
tst = check_re(cell, _re_default_exp)
if tst: return tst.groups()[0]
Stops at the first cell containing a #default_exp
code and return the value behind. Returns None
if there are no cell with that code.
_test_eq(find_default_export(test_nb['cells']), 'notebook.export')
assert find_default_export(test_nb['cells'][2:]) is None
We're now ready to export notebooks!
# export
def _create_mod_file(fname, nb_path):
"Create a module file for `fname`."
fname.parent.mkdir(parents=True, exist_ok=True)
with open(fname, 'w') as f:
f.write(f"#AUTOGENERATED! DO NOT EDIT! File to edit: dev/{nb_path.name} (unless otherwise specified).")
f.write('\n\n__all__ = []')
#export
_re_patch_func = re.compile(r"""
# Catches any function decorated with @patch, its name in group 1 and the patched class in group 2
@patch # At any place in the cell, something that begins with @patch
\s*def # Any number of whitespace (including a new line probably) followed by def
\s+ # One whitespace or more
([^\(\s]*) # Catch a group composed of anything but whitespace or an opening parenthesis (name of the function)
\s*\( # Any number of whitespace followed by an opening parenthesis
[^:]* # Any number of character different of : (the name of the first arg that is type-annotated)
:\s* # A column followed by any number of whitespace
(?: # Non-catching group with either
([^,\s\(\)]*) # a group composed of anything but a comma, a parenthesis or whitespace (name of the class)
| # or
(\([^\)]*\))) # a group composed of something between parenthesis (tuple of classes)
\s* # Any number of whitespace
(?:,|\)) # Non-catching group with either a comma or a closing parenthesis
""", re.VERBOSE)
#hide
tst = _re_patch_func.search("""
@patch
def func(obj:Class):""")
_test_eq(tst.groups(), ("func", "Class", None))
tst = _re_patch_func.search("""
@patch
def func (obj:Class, a)""")
_test_eq(tst.groups(), ("func", "Class", None))
tst = _re_patch_func.search("""
@patch
def func (obj:(Class1, Class2), a)""")
_test_eq(tst.groups(), ("func", None, "(Class1, Class2)"))
#export
_re_typedispatch_func = re.compile(r"""
# Catches any function decorated with @typedispatch
(@typedispatch # At any place in the cell, catch a group with something that begins with @patch
\s*def # Any number of whitespace (including a new line probably) followed by def
\s+ # One whitespace or more
[^\(]* # Anything but whitespace or an opening parenthesis (name of the function)
\s*\( # Any number of whitespace followed by an opening parenthesis
[^\)]* # Any number of character different of )
\)\s*:) # A closing parenthesis followed by whitespace and :
""", re.VERBOSE)
#hide
assert _re_typedispatch_func.search("@typedispatch\ndef func(a, b):").groups() == ('@typedispatch\ndef func(a, b):',)
#export
_re_class_func_def = re.compile(r"""
# Catches any 0-indented function or class definition with its name in group 1
^ # Beginning of a line (since re.MULTILINE is passed)
(?:def|class) # Non-catching group for def or class
\s+ # One whitespace or more
([^\(\s]*) # Catching group with any character except an opening parenthesis or a whitespace (name)
\s* # Any number of whitespace
(?:\(|:) # Non-catching group with either an opening parenthesis or a : (classes don't need ())
""", re.MULTILINE | re.VERBOSE)
#hide
assert _re_class_func_def.search("class Class:").groups() == ('Class',)
assert _re_class_func_def.search("def func(a, b):").groups() == ('func',)
#export
_re_obj_def = re.compile(r"""
# Catches any 0-indented object definition (bla = thing) with its name in group 1
^ # Beginning of a line (since re.MULTILINE is passed)
([^=\s]*) # Catching group with any character except a whitespace or an equal sign
\s*= # Any number of whitespace followed by an =
""", re.MULTILINE | re.VERBOSE)
#hide
assert _re_obj_def.search("a = 1").groups() == ('a',)
_test_eq(_re_obj_def.search("a=1").groups(), ('a',))
# export
def _not_private(n):
for t in n.split('.'):
if t.startswith('_') or t.startswith('@'): return False
return '\\' not in t and '^' not in t and '[' not in t
def export_names(code, func_only=False):
"Find the names of the objects, functions or classes defined in `code` that are exported."
#Format monkey-patches with @patch
def _f(gps):
nm, cls, t = gps.groups()
if cls is not None: return f"def {cls}.{nm}():"
return '\n'.join([f"def {c}.{nm}():" for c in re.split(', *', t[1:-1])])
code = _re_typedispatch_func.sub('', code)
code = _re_patch_func.sub(_f, code)
names = _re_class_func_def.findall(code)
if not func_only: names += _re_obj_def.findall(code)
return [n for n in names if _not_private(n)]
This function only picks the zero-indented objects, functions or classes (we don't want the class methods for instance) and excludes private names (that begin with _
). It only returns func and class names when func_only=True
.
assert export_names("def my_func(x):\n pass\nclass MyClass():") == ["my_func", "MyClass"]
#Indented funcs are ignored (funcs inside a class)
assert export_names(" def my_func(x):\n pass\nclass MyClass():") == ["MyClass"]
#Private funcs are ignored
assert export_names("def _my_func():\n pass\nclass MyClass():") == ["MyClass"]
#trailing spaces
assert export_names("def my_func ():\n pass\nclass MyClass():") == ["my_func", "MyClass"]
#class without parenthesis
assert export_names("def my_func ():\n pass\nclass MyClass:") == ["my_func", "MyClass"]
#object and funcs
assert export_names("def my_func ():\n pass\ndefault_bla=[]:") == ["my_func", "default_bla"]
assert export_names("def my_func ():\n pass\ndefault_bla=[]:", func_only=True) == ["my_func"]
#Private objects are ignored
assert export_names("def my_func ():\n pass\n_default_bla = []:") == ["my_func"]
#Objects with dots are privates if one part is private
assert export_names("def my_func ():\n pass\ndefault.bla = []:") == ["my_func", "default.bla"]
assert export_names("def my_func ():\n pass\ndefault._bla = []:") == ["my_func"]
#Monkey-path with @patch are properly renamed
assert export_names("@patch\ndef my_func(x:Class):\n pass") == ["Class.my_func"]
assert export_names("@patch\ndef my_func(x:Class):\n pass", func_only=True) == ["Class.my_func"]
assert export_names("some code\n@patch\ndef my_func(x:Class, y):\n pass") == ["Class.my_func"]
assert export_names("some code\n@patch\ndef my_func(x:(Class1,Class2), y):\n pass") == ["Class1.my_func", "Class2.my_func"]
#Check delegates
assert export_names("@delegates(keep=True)\nclass someClass:\n pass") == ["someClass"]
#Typedispatch decorated functions shouldn't be added
assert export_names("@patch\ndef my_func(x:Class):\n pass\n@typedispatch\ndef func(x: TensorImage): pass") == ["Class.my_func"]
#export
_re_all_def = re.compile(r"""
# Catches a cell with defines \_all\_ = [\*\*] and get that \*\* in group 1
^_all_ # Beginning of line (since re.MULTILINE is passed)
\s*=\s* # Any number of whitespace, =, any number of whitespace
\[ # Opening [
([^\n\]]*) # Catching group with anything except a ] or newline
\] # Closing ]
""", re.MULTILINE | re.VERBOSE)
#Same with __all__
_re__all__def = re.compile(r'^__all__\s*=\s*\[([^\]]*)\]', re.MULTILINE)
# export
def extra_add(code):
"Catch adds to `__all__` required by a cell with `_all_=`"
if _re_all_def.search(code):
names = _re_all_def.search(code).groups()[0]
names = re.sub('\s*,\s*', ',', names)
names = names.replace('"', "'")
code = _re_all_def.sub('', code)
code = re.sub(r'([^\n]|^)\n*$', r'\1', code)
return names.split(','),code
return [],code
assert extra_add('_all_ = ["func", "func1", "func2"]') == (["'func'", "'func1'", "'func2'"],'')
assert extra_add('_all_ = ["func", "func1" , "func2"]') == (["'func'", "'func1'", "'func2'"],'')
assert extra_add("_all_ = ['func','func1', 'func2']\n") == (["'func'", "'func1'", "'func2'"],'')
assert extra_add('code\n\n_all_ = ["func", "func1", "func2"]') == (["'func'", "'func1'", "'func2'"],'code')
#export
def _add2add(fname, names, line_width=120):
if len(names) == 0: return
with open(fname, 'r', encoding='utf8') as f: text = f.read()
tw = TextWrapper(width=120, initial_indent='', subsequent_indent=' '*11, break_long_words=False)
re_all = _re__all__def.search(text)
start,end = re_all.start(),re_all.end()
text_all = tw.wrap(f"{text[start:end-1]}{'' if text[end-2]=='[' else ', '}{', '.join(names)}]")
with open(fname, 'w', encoding='utf8') as f: f.write(text[:start] + '\n'.join(text_all) + text[end:])
fname = 'test_add.txt'
with open(fname, 'w', encoding='utf8') as f: f.write("Bla\n__all__ = [my_file, MyClas]\nBli")
_add2add(fname, ['new_function'])
with open(fname, 'r', encoding='utf8') as f:
_test_eq(f.read(), "Bla\n__all__ = [my_file, MyClas, new_function]\nBli")
_add2add(fname, [f'new_function{i}' for i in range(10)])
with open(fname, 'r', encoding='utf8') as f:
_test_eq(f.read(), """Bla
__all__ = [my_file, MyClas, new_function, new_function0, new_function1, new_function2, new_function3, new_function4,
new_function5, new_function6, new_function7, new_function8, new_function9]
Bli""")
os.remove(fname)
# export
def _relative_import(name, fname):
mods = name.split('.')
splits = str(fname).split(os.path.sep)
if mods[0] not in splits: return name
splits = splits[splits.index(mods[0]):]
while len(mods)>0 and splits[0] == mods[0]: splits,mods = splits[1:],mods[1:]
return '.' * (len(splits)) + '.'.join(mods)
assert _relative_import('local.core', Path('local')/'data.py') == '.core'
assert _relative_import('local.core', Path('local')/'vision'/'data.py') == '..core'
assert _relative_import('local.vision.transform', Path('local')/'vision'/'data.py') == '.transform'
assert _relative_import('local.notebook.core', Path('local')/'data'/'external.py') == '..notebook.core'
assert _relative_import('local.vision', Path('local')/'vision'/'learner.py') == '.'
#export
#Catches any from local.bla import something and catches local.bla in group 1, the imported thing(s) in group 2.
_re_import = re.compile(r'^(\s*)from (local.\S*) import (.*)$')
# export
def _deal_import(code_lines, fname):
pat = re.compile(r'from (local.\S*) import (\S*)$')
lines = []
def _replace(m):
sp,mod,obj = m.groups()
return f"{sp}from {_relative_import(mod, fname)} import {obj}"
for line in code_lines:
line = re.sub('_'+'file_', '__'+'file__', line) #Need to break _file_ or that line will be treated
lines.append(_re_import.sub(_replace,line))
return lines
#hide
lines = ["from local.core import *", "nothing to see", " from local.vision import bla1, bla2", "from local.vision import models"]
assert _deal_import(lines, Path('local')/'data.py') == [
"from .core import *", "nothing to see", " from .vision import bla1, bla2", "from .vision import models"
]
#hide
#Tricking jupyter notebook to have a __file__ attribute. All _file_ will be replaced by __file__
_file_ = Path('local').absolute()/'notebook'/'export.py'
#export
def _get_index():
if not (Path(_file_).parent/'index.txt').exists(): return {}
return json.load(open(Path(_file_).parent/'index.txt', 'r', encoding='utf8'))
def _save_index(index):
fname = Path(_file_).parent/'index.txt'
fname.parent.mkdir(parents=True, exist_ok=True)
json.dump(index, open(fname, 'w', encoding='utf8'), indent=2)
def _reset_index():
if (Path(_file_).parent/'index.txt').exists():
os.remove(Path(_file_).parent/'index.txt')
#hide
ind,ind_bak = Path(_file_).parent/'index.txt',Path(_file_).parent/'index.bak'
if ind.exists(): shutil.move(ind, ind_bak)
_test_eq(_get_index(), {})
_save_index({'foo':'bar'})
_test_eq(_get_index(), {'foo':'bar'})
if ind_bak.exists(): shutil.move(ind_bak, ind)
#export
def _notebook2script(fname, silent=False, to_pkl=False):
"Finds cells starting with `#export` and puts them into a new module"
if os.environ.get('IN_TEST',0): return # don't export if running tests
fname = Path(fname)
nb = read_nb(fname)
default = find_default_export(nb['cells'])
if default is not None:
default = os.path.sep.join(default.split('.'))
if not to_pkl: _create_mod_file(Path.cwd()/'local'/f'{default}.py', fname)
index = _get_index()
exports = [is_export(c, default) for c in nb['cells']]
cells = [(i,c,e) for i,(c,e) in enumerate(zip(nb['cells'],exports)) if e is not None]
for i,c,e in cells:
fname_out = Path.cwd()/'local'/f'{e}.py'
orig = ('#C' if e==default else f'#Comes from {fname.name}, c') + 'ell\n'
code = '\n\n' + orig + '\n'.join(_deal_import(c['source'].split('\n')[1:], fname_out))
# remove trailing spaces
names = export_names(code)
extra,code = extra_add(code)
if not to_pkl: _add2add(fname_out, [f"'{f}'" for f in names if '.' not in f and len(f) > 0] + extra)
index.update({f: fname.name for f in names})
code = re.sub(r' +$', '', code, flags=re.MULTILINE)
if code != '\n\n' + orig[:-1]:
if to_pkl: _update_pkl(fname_out, (i, fname, code))
else:
with open(fname_out, 'a', encoding='utf8') as f: f.write(code)
_save_index(index)
if not silent: print(f"Converted {fname}.")
#export
def _get_sorted_files(all_fs: Union[bool,str], up_to=None):
"Return the list of files corresponding to `g` in the current dir."
if (all_fs==True): ret = glob.glob('*.ipynb') # Checks both that is bool type and that is True
else: ret = glob.glob(all_fs) if isinstance(g,str) else []
if len(ret)==0: print('WARNING: No files found')
ret = [f for f in ret if not f.startswith('_')]
if up_to is not None: ret = [f for f in ret if str(f)<=str(up_to)]
return sorted(ret)
_notebook2script('03a_layers.ipynb')
Converted 03a_layers.ipynb.
#export
def notebook2script(fname=None, all_fs=None, up_to=None, silent=False, to_pkl=False):
"Convert `fname` or all the notebook satisfying `all_fs`."
# initial checks
if os.environ.get('IN_TEST',0): return # don't export if running tests
assert fname or all_fs
if all_fs: _reset_index()
if (all_fs is None) and (up_to is not None): all_fs=True # Enable allFiles if upTo is present
fnames = _get_sorted_files(all_fs, up_to=up_to) if all_fs else [fname]
[_notebook2script(f, silent=silent, to_pkl=to_pkl) for f in fnames]
Finds cells starting with #export
and puts them into the appropriate module.
fname
: the filename of one notebook to convertall_fs
: True
if you want to convert all notebook files in the folder or a glob expressionup_to
: converts all notebooks respecting the previous arg up to a certain numberExamples of use in console:
notebook2script # Parse all files
notebook2script --fname 00_export.ipynb # Parse 00_export.ipynb
notebook2script --all_fs=nb* # Parse all files starting with nb*
notebook2script --up_to=10 # Parse all files with (name<='10')
notebook2script --all_fs=*_*.ipynb --up_to=10 # Parse all files with an '_' and (name<='10')
We need to get the name of the object we are looking for, and then we'll try to find it in our index file.
#export
def _get_property_name(p):
"Get the name of property `p`"
if hasattr(p, 'fget'):
return p.fget.func.__qualname__ if hasattr(p.fget, 'func') else p.fget.__qualname__
else: return next(iter(re.findall(r'\'(.*)\'', str(p)))).split('.')[-1]
def get_name(obj):
"Get the name of `obj`"
if hasattr(obj, '__name__'): return obj.__name__
elif getattr(obj, '_name', False): return obj._name
elif hasattr(obj,'__origin__'): return str(obj.__origin__).split('.')[-1] #for types
elif type(obj)==property: return _get_property_name(obj)
else: return str(obj).split('.')[-1]
# export
def qual_name(obj):
"Get the qualified name of `obj`"
if hasattr(obj,'__qualname__'): return obj.__qualname__
if inspect.ismethod(obj): return f"{get_name(obj.__self__)}.{get_name(fn)}"
return get_name(obj)
_test_eq(get_name(in_ipython), 'in_ipython')
_test_eq(get_name(DocsTestClass.test), 'test')
# assert get_name(Union[Tensor, float]) == 'Union'
For properties defined using property
or our own add_props
helper, we approximate the name by looking at their getter functions, since we don't seem to have access to the property name itself. If everything fails (a getter cannot be found), we return the name of the object that contains the property. This suffices for source_nb
to work.
#hide
class PropertyClass:
p_lambda = property(lambda x: x)
def some_getter(self): return 7
p_getter = property(some_getter)
_test_eq(get_name(PropertyClass.p_lambda), 'PropertyClass.<lambda>')
_test_eq(get_name(PropertyClass.p_getter), 'PropertyClass.some_getter')
_test_eq(get_name(PropertyClass), 'PropertyClass')
# export
def source_nb(func, is_name=None, return_all=False):
"Return the name of the notebook where `func` was defined"
is_name = is_name or isinstance(func, str)
index = _get_index()
name = func if is_name else qual_name(func)
while len(name) > 0:
if name in index: return (name,index[name]) if return_all else index[name]
name = '.'.join(name.split('.')[:-1])
_test_eq(qual_name(DocsTestClass), 'DocsTestClass')
_test_eq(qual_name(DocsTestClass.test), 'DocsTestClass.test')
# export
_re_default_nb = re.compile(r'File to edit: dev/(\S+)\s+')
_re_cell = re.compile(r'^#Cell|^#Comes from\s+(\S+), cell')
You can either pass an object or its name (by default is_name
will look if func
is a string or not, but you can override if there is some inconsistent behavior).
If passed a method of a class, the function will return the notebook in which the largest part of the function was defined in case there is a monkey-matching that defines class.method
in a different notebook than class
. If return_all=True
, the function will return a tuple with the name by which the function was found and the notebook.
from local.core.transform import Transform
from local.test import test_fail
_test_eq(source_nb(test_fail), '00_test.ipynb')
_test_eq(source_nb(Transform), '01c_transform.ipynb')
_test_eq(source_nb(Transform.decode), '01c_transform.ipynb')
#opt_call is in the core module but defined in 02
# from local.core import opt_call
# _test_eq(source_nb(opt_call), '02_data_pipeline.ipynb' # TODO: find something else)
assert source_nb(int) is None
#Added through a monkey-patch
_test_eq(source_nb('Path.ls'), '01a_utils.ipynb')
#Test with name TODO:Investigate
#_test_eq(source_nb('DocsTestClass'), '90_notebook_core.ipynb')
#_test_eq(source_nb('DocsTestClass.test'), '90_notebook_core.ipynb')
#Test return_all
#assert source_nb(DocsTestClass, return_all=True) == ('DocsTestClass','90_notebook_core.ipynb')
#assert source_nb(DocsTestClass.test, return_all=True) == ('DocsTestClass','90_notebook_core.ipynb')
#hide
# Commented out to avoid circ ref - uncomment to test manually
# from local.data.core import *
# _test_eq(source_nb(DataBunch.train_dl), '05_data_core.ipynb')
If someone decides to change a module instead of the notebooks, the following functions help update the notebooks accordingly.
# export
def _split(code):
lines = code.split('\n')
default_nb = _re_default_nb.search(lines[0])
if not default_nb: set_trace()
default_nb = default_nb.groups()[0]
s,res = 1,[]
while _re_cell.search(lines[s]) is None: s += 1
e = s+1
while e < len(lines):
while e < len(lines) and _re_cell.search(lines[e]) is None: e += 1
grps = _re_cell.search(lines[s]).groups()
nb = grps[0] or default_nb
content = lines[s+1:e]
while len(content) > 1 and content[-1] == '': content = content[:-1]
res.append((nb, '\n'.join(content)))
s,e = e,e+1
return res
with open(Path.cwd()/'local'/'core'/'foundation.py') as f: code = f.read()
#export
def _relimport2name(name, mod_name):
if mod_name.endswith('.py'): mod_name = mod_name[:-3]
mods = mod_name.split(os.path.sep)
mods = mods[mods.index('local'):]
if name=='.':
print("###",'.'.join(mods[:-1]))
return '.'.join(mods[:-1])
i = 0
while name[i] == '.': i += 1
return '.'.join(mods[:-i] + [name[i:]])
# export
#Catches any from .bla import something and catches local.bla in group 1, the imported thing(s) in group 2.
_re_loc_import = re.compile(r'(^\s*)from (\.\S*) import (.*)$')
assert _relimport2name('.core', 'local/data.py') == 'local.core'
assert _relimport2name('.core', 'home/sgugger/fastai_dev/dev/local/data.py') == 'local.core'
assert _relimport2name('..core', 'local/vision/data.py') == 'local.core'
assert _relimport2name('.transform', 'local/vision/data.py') == 'local.vision.transform'
assert _relimport2name('..notebook.core', 'local/data/external.py') == 'local.notebook.core'
#export
def _deal_loc_import(code, fname):
lines = []
def _replace(m):
sp,mod,obj = m.groups()
return f"{sp}from {_relimport2name(mod, fname)} import {obj}"
for line in code.split('\n'):
line = re.sub('__'+'file__', '_'+'file_', line) #Need to break _file_ or that line will be treated
lines.append(_re_loc_import.sub(_replace,line))
return '\n'.join(lines)
# export
def _update_pkl(fname, cell):
dic = pickle.load(open((Path.cwd()/'lib.pkl'), 'rb')) if (Path.cwd()/'lib.pkl').exists() else collections.defaultdict(list)
dic[fname].append(cell)
pickle.dump(dic, open((Path.cwd()/'lib.pkl'), 'wb'))
#hide
code = "from .core import *\nnothing to see\n from .vision import bla1, bla2"
assert _deal_loc_import(code, 'local/data.py') == "from local.core import *\nnothing to see\n from local.vision import bla1, bla2"
#export
def _script2notebook(fname, dic, silent=False):
"Put the content of `fname` back in the notebooks it came from."
if os.environ.get('IN_TEST',0): return # don't export if running tests
if not silent: print(f"Converting {fname}.")
fname = Path(fname)
with open(fname, encoding='utf8') as f: code = f.read()
splits = _split(code)
assert len(splits)==len(dic[fname]), f"Exported file from notebooks should have {len(dic[fname])} cells but has {len(splits)}."
assert np.all([c1[0]==c2[1]] for c1,c2 in zip(splits, dic[fname]))
splits = [(c2[0],c1[0],c1[1]) for c1,c2 in zip(splits, dic[fname])]
nb_fnames = {s[1] for s in splits}
for nb_fname in nb_fnames:
nb = read_nb(nb_fname)
for i,f,c in splits:
c = _deal_loc_import(c, str(fname))
if f == nb_fname:
l = nb['cells'][i]['source'].split('\n')[0]
nb['cells'][i]['source'] = l + '\n' + c
NotebookNotary().sign(nb)
nbformat.write(nb, nb_fname, version=4)
if (Path.cwd()/'lib.pkl').exists(): os.remove(Path.cwd()/'lib.pkl')
notebook2script(all_fs=True, silent=True, to_pkl=True)
dic = pickle.load(open(Path.cwd()/'lib.pkl', 'rb'))
_script2notebook(Path().cwd()/'local/tabular/core.py', dic)
Converting /root/workspace/fastai_dev_fork/dev/local/tabular/core.py.
#export
_manual_mods = 'version.py __init__.py imports.py torch_imports.py patch_tables.py all.py torch_basics.py fp16_utils.py test_utils.py basics.py launch.py'.split()
#export
def script2notebook(folder='local', silent=False):
if (Path.cwd()/'lib.pkl').exists(): os.remove(Path.cwd()/'lib.pkl')
notebook2script(all_fs=True, silent=True, to_pkl=True)
dic = pickle.load(open(Path.cwd()/'lib.pkl', 'rb'))
os.remove(Path.cwd()/'lib.pkl')
if os.environ.get('IN_TEST',0): return # don't export if running tests
for f in (Path.cwd()/folder).glob('**/*.py'):
if f.name not in _manual_mods: _script2notebook(f, dic, silent=silent)
#script2notebook()
#export
import subprocess
#export
def _print_diff(code1, code2, fname):
diff = difflib.ndiff(code1, code2)
sys.stdout.writelines(diff)
#for l in difflib.context_diff(code1, code2): print(l)
#_print_diff_py(code1, code2, fname) if fname.endswith('.py') else _print_diff_txt(code1, code2, fname)
#export
def diff_nb_script(lib_folder='local'):
"Print the diff between the notebooks and the library in `lib_folder`"
tmp_path1,tmp_path2 = Path.cwd()/'tmp_lib',Path.cwd()/'tmp_lib1'
shutil.copytree(Path.cwd()/lib_folder, tmp_path1)
try:
notebook2script(all_fs=True, silent=True)
shutil.copytree(Path.cwd()/lib_folder, tmp_path2)
shutil.rmtree(Path.cwd()/lib_folder)
shutil.copytree(tmp_path1, Path.cwd()/lib_folder)
res = subprocess.run(['diff', '-ru', 'tmp_lib1', lib_folder], stdout=subprocess.PIPE)
print(res.stdout.decode('utf-8'))
finally:
shutil.rmtree(tmp_path1)
shutil.rmtree(tmp_path2)
diff_nb_script()
#hide
notebook2script(all_fs=True)