Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import ast
- import greenlet
- import logging
- import neovim
- import os
- import re
- from .base import Base
- from functools import partial
- from jupyter_client import KernelManager
- from jupyter_client.consoleapp import JupyterConsoleApp
- from jupyter_client.threaded import ThreadedKernelClient
- from jupyter_core.application import JupyterApp
- imports = re.compile(
- r'^\s*(from\s+\.*\w+(\.\w+)*\s+import\s+(\w+,\s+)*|import\s+)')
- split_pattern = re.compile('[^= \r\n*()@-]')
- keyword = re.compile('[A-Za-z0-9_]')
- request = '''
- try:
- _completions = completion_metadata(get_ipython())
- except Exception:
- pass
- '''
- logger = logging.getLogger(__name__)
- error, debug, info, warn = (
- logger.error, logger.debug, logger.info, logger.warn,)
- if 'NVIM_IPY_DEBUG_FILE' in os.environ:
- logfile = os.environ['NVIM_IPY_DEBUG_FILE'].strip()
- logger.addHandler(logging.FileHandler(logfile, 'w'))
- logger.level = logging.DEBUG
- class RedirectingKernelManager(KernelManager):
- def _launch_kernel(self, cmd, **b):
- # stdout is used to communicate with nvim, redirect it somewhere else
- self._null = open("/dev/null", "wb", 0)
- b['stdout'] = self._null.fileno()
- b['stderr'] = self._null.fileno()
- return super(RedirectingKernelManager, self)._launch_kernel(cmd, **b)
- class JupyterVimApp(JupyterApp, JupyterConsoleApp):
- # don't use blocking client; we override call_handlers below
- kernel_client_class = ThreadedKernelClient
- kernel_manager_class = RedirectingKernelManager
- aliases = JupyterConsoleApp.aliases # this the way?
- flags = JupyterConsoleApp.flags
- def init_kernel_client(self):
- # TODO: cleanup this (by subclassing kernel_clint or something)
- if self.kernel_manager is not None:
- self.kernel_client = self.kernel_manager.client()
- else:
- self.kernel_client = self.kernel_client_class(
- session=self.session,
- ip=self.ip,
- transport=self.transport,
- shell_port=self.shell_port,
- iopub_port=self.iopub_port,
- hb_port=self.hb_port,
- connection_file=self.connection_file,
- parent=self,
- )
- self.kernel_client.shell_channel.call_handlers = self.target.on_shell_msg
- self.kernel_client.hb_channel.call_handlers = self.target.on_hb_msg
- self.kernel_client.start_channels()
- def initialize(self, target, argv):
- self.target = target
- super(JupyterVimApp, self).initialize(argv)
- JupyterConsoleApp.initialize(self, argv)
- class Async(object):
- """Wrapper that defers all method calls on a plugin object to the event
- loop, given that the object has vim attribute"""
- def __init__(self, wraps):
- self.wraps = wraps
- def __getattr__(self, name):
- return partial(self.wraps.vim.async_call, getattr(self.wraps, name))
- @neovim.plugin
- @neovim.encoding(True)
- class IPythonPlugin(object):
- def __init__(self, vim):
- self.vim = vim
- self.has_connection = False
- self.pending_shell_msgs = {}
- def connect(self, argv):
- self.ip_app = JupyterVimApp.instance()
- self.ip_app.initialize(Async(self), argv)
- self.ip_app.start()
- self.kc = self.ip_app.kernel_client
- self.km = self.ip_app.kernel_manager
- self.has_connection = True
- def handle(self, msg_id, handler):
- self.pending_shell_msgs[msg_id] = handler
- def waitfor(self, msg_id, retval=None):
- # FIXME: add some kind of timeout
- gr = greenlet.getcurrent()
- self.handle(msg_id, gr)
- return gr.parent.switch(retval)
- def ignore(self, msg_id):
- self.handle(msg_id, None)
- @neovim.function("IPyConnect", sync=True)
- def ipy_connect(self, args):
- # 'connect' waits for kernelinfo, and so must be async
- Async(self).connect(args)
- def on_shell_msg(self, m):
- self.last_msg = m
- debug('shell %s: %r', m['msg_type'], m['content'])
- msg_id = m['parent_header']['msg_id']
- try:
- handler = self.pending_shell_msgs.pop(msg_id)
- except KeyError:
- debug('unexpected shell msg: %r', m)
- return
- if isinstance(handler, greenlet.greenlet):
- handler.parent = greenlet.getcurrent()
- handler.switch(m)
- elif handler is not None:
- handler(m)
- # this gets called when heartbeat is lost
- def on_hb_msg(self, time_since):
- raise Exception()
- class Source(Base):
- def __init__(self, vim):
- super().__init__(vim)
- self.__plug = IPythonPlugin(vim)
- self.__plug.ipy_connect(['--existing'])
- self.name = 'ipython'
- self.mark = '[IPy]'
- self.filetypes = ['python']
- self.is_volatile = True
- self.input_pattern = '.*'
- self.rank = 2000
- def get_complete_position(self, context):
- line = self.vim.current.line
- # return immediately for imports
- if imports.match(context['input']):
- start = len(context['input'])
- while start and re.match('[._A-Za-z0-9]', line[start - 1]):
- start -= 1
- return start
- # locate the start of the word
- col = self.vim.funcs.col('.')
- start = col - 1
- if start == 0 or (len(line) == start and
- not split_pattern.match(line[start - 2]) and
- not (start >= 2 and keyword.match(line[start - 3]))
- and line[start - 3:start - 1] != '].'):
- start = -1
- return start
- line = self.vim.funcs.getline('.')
- start = self.vim.funcs.strchars(line[:col]) - 1
- bracket_level = 0
- while start > 0 and (
- split_pattern.match(line[start - 1])
- or (line[start - 1] == '.'
- and start >= 2 and keyword.match(line[start - 2])
- or (line[start - 1] == '-'
- and start >= 2 and line[start - 2] == '[')
- or ''.join(line[start - 2:start]) == '].')):
- if line[start - 1] == '[':
- if (start == 1 or not re.match(
- '[A-Za-z0-9_\]]', line[start-2])) or bracket_level > 0:
- break
- bracket_level += 1
- elif line[start-1] == ']':
- bracket_level -= 1
- start -= 1
- return start
- def gather_candidates(self, context):
- plug = self.__plug
- plug.waitfor(plug.kc.complete(context['input'], len(context['input'])))
- reply = plug.waitfor(plug.kc.execute(
- request, silent=True,
- user_expressions={'_completions': '_completions'}))
- metadata = reply['content']['user_expressions']['_completions']
- return ast.literal_eval(metadata['data']['text/plain'])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement