Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- {
- "cells": [
- {
- "cell_type": "code",
- "execution_count": 73,
- "metadata": {},
- "outputs": [],
- "source": [
- " import nbformat, time, jsonpatch, jsonpointer, itertools, jsondiff, json, logging\n",
- " from IPython import get_ipython\n",
- " import datetime, notebook, traitlets, pathlib, jsonpatch, pathlib, mimetypes\n",
- " import ipywidgets, nbconvert, IPython, html"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 74,
- "metadata": {},
- "outputs": [],
- "source": [
- " def strip_remove_patch_value(patches):\n",
- " [(patch['op'] == 'remove') and patch.pop('value', None) for patch in patches]"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 75,
- "metadata": {},
- "outputs": [],
- "source": [
- " cache = {}\n",
- " last_patch = []"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 76,
- "metadata": {},
- "outputs": [],
- "source": [
- " def remove_trust(nb):\n",
- " for cell in nb['cells']: cell['metadata'].pop('trusted', None)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 77,
- "metadata": {},
- "outputs": [],
- "source": [
- " def source2lines(nb):\n",
- " for cell in nb['cells']:\n",
- " key = 'text' if 'text' in cell else 'source'\n",
- " if isinstance(cell[key], str):\n",
- " cell[key] = cell[key].splitlines(True)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 78,
- "metadata": {},
- "outputs": [],
- "source": [
- " def write_resources(source, target, parent, *patches):\n",
- " t = target['metadata']['last_modified']\n",
- " for i, (src, tgt) in enumerate(\n",
- " itertools.zip_longest((source or target)['cells'], target['cells'])\n",
- " ):\n",
- " orig, new = (src or {}).get('outputs', []), (tgt or {}).get('outputs', [])\n",
- " if orig == new and not (orig is new): continue\n",
- " \n",
- " for j, (p, n) in enumerate(itertools.zip_longest(orig, new)):\n",
- " if p == n and not (p is n): continue\n",
- " if (n or {}).get('output_type', None) not in {'execute_result', 'display_data'}: \n",
- " continue\n",
- " for type, value in (n or {}).get('data', {}).items():\n",
- " if type == 'text/plain': continue\n",
- " resource_path = F\"{t}_{i}_{j}{mimetypes.guess_extension(type)}\"\n",
- " path = parent / jsonpointer.escape(type) / resource_path\n",
- " path.parent.mkdir(parents=True, exist_ok=True)\n",
- " if isinstance(value, str): path.write_text(value)\n",
- " elif isinstance(value, bytes): path.write_bytes(value)\n",
- " patches += {\n",
- " \"op\": \"replace\", \"value\": str(path), \"path\": F\"/cells/{i}/outputs/{j}/data/{jsonpointer.escape(type)}\"\n",
- " },\n",
- " \n",
- " return patches"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 79,
- "metadata": {},
- "outputs": [],
- "source": [
- " def pre_save_hook(model=None, path=None, contents_manager=None):\n",
- " \n",
- " if model[\"type\"] == \"notebook\":\n",
- " model = {**contents_manager.get(path), 'content': model['content']}\n",
- " global cache, last_patch\n",
- "\n",
- " path = pathlib.Path(path)\n",
- " nb = model['content']\n",
- " \n",
- " \n",
- " copy = nbformat.from_dict(nb)\n",
- " source2lines(copy)\n",
- " \n",
- " remove_trust(copy)\n",
- " copy['metadata'].update(last_modified=str(int(model[\"last_modified\"].timestamp())))\n",
- " cache[path] = cache.get(path, nbformat.v4.new_notebook())\n",
- " resource_path = path.parent / '.nbstream' / path.stem\n",
- " copy = nbformat.from_dict(copy)\n",
- " files_patch = write_resources(cache[path], copy, resource_path)\n",
- " \n",
- " change = jsonpatch.JsonPatch.from_diff(\n",
- " jsonpatch.apply_patch(copy, files_patch), \n",
- " jsonpatch.apply_patch(cache[path], last_patch)\n",
- " )\n",
- " \n",
- " if len(change.patch) == 1 and change.patch[0]['path'] == '/metadata/last_modified': return\n",
- "\n",
- " log_path = path.parent / '.nbstream' / path.with_suffix(\".json\").name\n",
- " if not log_path.exists():\n",
- " log_path.parent.mkdir(parents=True, exist_ok=True)\n",
- " log_path.touch()\n",
- "\n",
- " if change.patch:\n",
- " with open(log_path, \"a\") as file:\n",
- " file.write(\"\".join(json.dumps(change.patch).splitlines()))\n",
- " file.write(\"\\n\")\n",
- " cache[path], last_patch = copy, files_patch"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 80,
- "metadata": {},
- "outputs": [],
- "source": [
- " def join_source(nb):\n",
- " for cell in nb['cells']:\n",
- " if 'source' in cell:\n",
- " cell['source'] = ''.join(cell['source'])"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 85,
- "metadata": {},
- "outputs": [],
- "source": [
- " def time_masheen(name):\n",
- " changes = list(map(json.loads, pathlib.Path(F'.nbstream/{name}.json').read_text().splitlines()))\n",
- "\n",
- " NB = json.loads(pathlib.Path(F'.ipynb_checkpoints/{name}-checkpoint.ipynb').read_text())\n",
- "\n",
- " @ipywidgets.interact\n",
- " def _(i=(0, len(changes)), e=nbconvert.get_export_names()):\n",
- " nonlocal NB\n",
- " nb = nbformat.from_dict(NB)\n",
- " for change in reversed(changes[-i:]):\n",
- " nb = jsonpatch.apply_patch(nb, change)\n",
- " join_source(nb)\n",
- " try:\n",
- " %pushd ~\n",
- " resolve_output_files(nb)\n",
- " finally:\n",
- " %popd\n",
- " object = nbconvert.get_exporter(e)().from_notebook_node(nbformat.from_dict(nb))[0]\n",
- "\n",
- " if e.startswith('html'):\n",
- " object = IPython.display.HTML(\n",
- " F\"\"\"<iframe srcdoc=\"{html.escape(object)}\" width=\"100%\" height=\"600\"></iframe>\"\"\"\n",
- " )\n",
- " if e in {'python', 'script'}: object = IPython.display.Code(object)\n",
- "\n",
- " if e in {'markdown'}: object = IPython.display.Markdown(object)\n",
- " \n",
- " if e in {'notebook', 'json'}: object = IPython.display.JSON(object)\n",
- "\n",
- " IPython.display.display(object)\n",
- " return _"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 86,
- "metadata": {},
- "outputs": [],
- "source": [
- " def resolve_output_files(nb):\n",
- " for cell in nb['cells']:\n",
- " for output in cell.get('outputs', []):\n",
- " for type, value in output.get('data', {}).items():\n",
- " try:\n",
- " if isinstance(value, str) and pathlib.Path(value).exists():\n",
- " output['data'][type] = pathlib.Path(value).read_text()\n",
- " except OSError: ..."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 87,
- "metadata": {},
- "outputs": [],
- "source": [
- "# c = cm.get('2019-04-15-Untitled.ipynb', True)\n",
- "\n",
- "# jsonpatch.apply_patch(\n",
- "# c['content'],\n",
- "# write_resources(nbformat.v4.new_notebook(), c['content'], pathlib.Path('.nbstream/xxx'))\n",
- "# )"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- " if __name__ == '__main__':\n",
- " !jupyter nbconvert --to python napp.ipynb\n",
- " !black napp.py\n",
- " !isort napp.py"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": []
- }
- ],
- "metadata": {
- "kernelspec": {
- "display_name": "p6",
- "language": "python",
- "name": "other-env"
- },
- "language_info": {
- "codemirror_mode": {
- "name": "ipython",
- "version": 3
- },
- "file_extension": ".py",
- "mimetype": "text/x-python",
- "name": "python",
- "nbconvert_exporter": "python",
- "pygments_lexer": "ipython3",
- "version": "3.6.8"
- },
- "last_modified": "1555354753",
- "varInspector": {
- "cols": {
- "lenName": 16,
- "lenType": 16,
- "lenVar": 40
- },
- "kernels_config": {
- "python": {
- "delete_cmd_postfix": "",
- "delete_cmd_prefix": "del ",
- "library": "var_list.py",
- "varRefreshCmd": "print(var_dic_list())"
- },
- "r": {
- "delete_cmd_postfix": ") ",
- "delete_cmd_prefix": "rm(",
- "library": "var_list.r",
- "varRefreshCmd": "cat(var_dic_list()) "
- }
- },
- "types_to_exclude": [
- "module",
- "function",
- "builtin_function_or_method",
- "instance",
- "_Feature"
- ],
- "window_display": false
- }
- },
- "nbformat": 4,
- "nbformat_minor": 4
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement