# -*- coding: utf-8 -*-
# @Author: Theo Lemaire
# @Email: theo.lemaire@epfl.ch
# @Date:   2020-04-30 13:40:16
# @Last Modified by:   Theo Lemaire
# @Last Modified time: 2020-04-30 13:53:09

''' Wrapper module around nbconvert allowing to run a notebook and redirect print/logging
    statements to the command line.

    Courtesy of Matthew Wardrop:
    https://gist.github.com/matthewwardrop/fe2148923048baabe14edacb2eda0b74
'''

import contextlib
import io
import logging
import sys

import nbformat
from IPython.core.formatters import format_display_data
from IPython.terminal.interactiveshell import InteractiveShell


class TeeOutput:
    ''' Writable text sink that records everything written to it in an in-memory buffer
        while forwarding the same data to a set of other file-like objects.
    '''

    def __init__(self, *orig_files):
        ''' :param orig_files: file-like objects that also receive every write '''
        self.captured = io.StringIO()
        self.orig_files = orig_files

    def __getattr__(self, attr):
        # Only reached for attributes not defined on TeeOutput itself: delegate them to
        # the capture buffer so this object can stand in for a regular text stream.
        return getattr(self.captured, attr)

    def write(self, data):
        ''' Write data to the capture buffer and to every forwarded file. '''
        self.captured.write(data)
        for f in self.orig_files:
            f.write(data)

    def flush(self):
        ''' Flush the capture buffer and every forwarded file. '''
        self.captured.flush()
        for f in self.orig_files:
            f.flush()

    def get_output(self):
        ''' Return everything written so far as a single string. '''
        return self.captured.getvalue()


@contextlib.contextmanager
def redirect_logging(fh):
    ''' Context manager that temporarily points the stream handlers of the root logger to fh.

        Every `logging.StreamHandler` attached to the root logger on entry has its stream
        replaced by fh. The previous streams are put back on exit, even if the body raises.

        :param fh: file-like object receiving the log records while the context is active
    '''
    # Remember exactly which handlers were touched, so they are restored even if they get
    # detached from the root logger while the context is active.
    redirected = []
    for handler in logging.getLogger().handlers:
        if isinstance(handler, logging.StreamHandler):
            redirected.append((handler, handler.stream))
            handler.stream = fh
    try:
        yield
    finally:
        for handler, stream in redirected:
            handler.stream = stream


class NotebookRunner:
    ''' Run the code cells of a notebook in an IPython interactive shell and record the
        resulting outputs in the notebook cells.
    '''

    def __init__(self, namespace=None):
        ''' :param namespace: optional user namespace handed to the interactive shell '''
        self.shell = InteractiveShell(user_ns=namespace)

    @property
    def user_ns(self):
        ''' User namespace of the underlying interactive shell. '''
        return self.shell.user_ns

    def run(self, nb, as_version=None, output=None, stop_on_error=True):
        ''' Execute a notebook.

            :param nb: notebook node, or notebook file path (string)
            :param as_version: notebook format version used when reading from a file path
                (None: read the file without conversion)
            :param output: destination handed to `nbformat.write` (None: do not write)
            :param stop_on_error: whether to stop at the first cell raising an error
            :return: the executed notebook
            :raises ValueError: if nb is neither a notebook node nor a string
        '''
        if isinstance(nb, nbformat.NotebookNode):
            # NOTE(review): this looks like a shallow copy, i.e. the cells (and hence the
            # recorded outputs) are shared with the caller's notebook -- confirm.
            nb = nb.copy()
        elif isinstance(nb, str):
            # nbformat.read needs an explicit target version: fall back to "no conversion"
            if as_version is None:
                as_version = nbformat.NO_CONVERT
            nb = nbformat.read(nb, as_version=as_version)
        else:
            raise ValueError(f"Unknown notebook reference: `{nb}`")

        # Only code cells carry an execution count and outputs
        code_cells = [cell for cell in nb.cells if cell.cell_type == 'code']

        # Clean notebook
        for cell in code_cells:
            cell.execution_count = None
            cell.outputs = []

        # Run all notebook code cells
        for cell in code_cells:
            if not self._run_cell(cell) and stop_on_error:
                break

        # Output the notebook if requested
        if output is not None:
            nbformat.write(nb, output)

        return nb

    def _run_cell(self, cell):
        ''' Run a single cell and store its outputs on the cell.

            :param cell: notebook cell (cells that are not code cells are left untouched)
            :return: False if the cell raised an error during execution, True otherwise
        '''
        if cell.cell_type != 'code':
            return True

        cell.outputs = []

        # Actually run the cell code, echoing its output streams to the real ones
        stdout = TeeOutput(sys.stdout)
        stderr = TeeOutput(sys.stderr)
        with contextlib.redirect_stdout(stdout), contextlib.redirect_stderr(stderr), \
                redirect_logging(stderr):
            result = self.shell.run_cell(cell.source, store_history=True)

        # Record the execution count on the cell
        cell.execution_count = result.execution_count

        # Include stdout and stderr streams (stdout without the result echo / traceback,
        # which are recorded as dedicated outputs below)
        streams = {
            'stdout': self._strip_stdout(cell, stdout.get_output()),
            'stderr': stderr.get_output(),
        }
        for name, text in streams.items():
            if text:
                cell.outputs.append(nbformat.v4.new_output('stream', name=name, text=text))

        # Include execution results
        error = result.error_in_exec
        if result.result is not None:
            cell.outputs.append(nbformat.v4.new_output(
                'execute_result',
                execution_count=result.execution_count,
                data=format_display_data(result.result)[0]
            ))
        elif error is not None:
            # Exceptions may carry no argument, or a non-string one
            evalue = str(error.args[0]) if error.args else ''
            cell.outputs.append(nbformat.v4.new_output(
                'error',
                ename=type(error).__name__,
                evalue=evalue,
                traceback=self._render_traceback(type(error), error, sys.last_traceback)
            ))

        return error is None

    def _strip_stdout(self, cell, stdout):
        ''' Cut the captured standard output at the result echo or traceback separator.

            The text is truncated at the last-starting of these two markers, if present:
            the `Out[<execution count>]: ` prompt of the cell and the dashed separator line.

            :param cell: cell that was run (provides the execution count)
            :param stdout: captured standard output (None is returned as is)
            :return: truncated standard output
        '''
        if stdout is None:
            return None
        idx = max(
            stdout.find(f'Out[{cell.execution_count}]: '),
            stdout.find("---------------------------------------------------------------------------")
        )
        # find() yields -1 when a marker is absent; 0 is a valid match at the very start
        if idx >= 0:
            stdout = stdout[:idx]
        return stdout

    def _render_traceback(self, etype, value, tb):
        ''' Render a traceback for an exception.

            This method is lifted from `InteractiveShell.showtraceback`, extracting only
            the functionality needed by this runner.

            :param etype: exception type
            :param value: exception value
            :param tb: traceback object
            :return: structured traceback
        '''
        try:
            # Exceptions may provide their own rendering
            stb = value._render_traceback_()
        except Exception:
            stb = self.shell.InteractiveTB.structured_traceback(
                etype, value, tb, tb_offset=None)
        return stb


def runNotebook(fname, save=False):
    ''' Open and run a notebook, and save "in-place" if specified.

        :param fname: path to the notebook file
        :param save: whether to write the executed notebook back to fname
    '''
    # Load notebook file without conversion
    with open(fname, encoding='utf-8') as f:
        nb = nbformat.read(f, nbformat.NO_CONVERT)

    # Execute notebook
    # NOTE(review): the returned notebook is discarded, so saving below relies on run()
    # recording the outputs on the cells of nb itself -- confirm if run() ever deep-copies.
    NotebookRunner().run(nb)

    # Save notebook in-place if specified
    if save:
        with open(fname, 'w', encoding='utf-8') as f:
            nbformat.write(nb, f)