Advertisement
Guest User

transport/sftp.py

a guest
Jul 27th, 2012
29
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 38.94 KB | None | 0 0
  1. # Copyright (C) 2005-2010 Canonical Ltd
  2. #
  3. # This program is free software; you can redistribute it and/or modify
  4. # it under the terms of the GNU General Public License as published by
  5. # the Free Software Foundation; either version 2 of the License, or
  6. # (at your option) any later version.
  7. #
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  11. # GNU General Public License for more details.
  12. #
  13. # You should have received a copy of the GNU General Public License
  14. # along with this program; if not, write to the Free Software
  15. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  16.  
  17. """Implementation of Transport over SFTP, using paramiko."""
  18.  
  19. from __future__ import absolute_import
  20.  
  21. # TODO: Remove the transport-based lock_read and lock_write methods.  They'll
  22. # then raise TransportNotPossible, which will break remote access to any
  23. # formats which rely on OS-level locks.  That should be fine as those formats
  24. # are pretty old, but these combinations may have to be removed from the test
  25. # suite.  Those formats all date back to 0.7; so we should be able to remove
  26. # these methods when we officially drop support for those formats.
  27.  
  28. import bisect
  29. import errno
  30. import itertools
  31. import os
  32. import random
  33. import stat
  34. import sys
  35. import time
  36. import warnings
  37.  
  38. from bzrlib import (
  39.     config,
  40.     debug,
  41.     errors,
  42.     urlutils,
  43.     )
  44. from bzrlib.errors import (FileExists,
  45.                            NoSuchFile,
  46.                            TransportError,
  47.                            LockError,
  48.                            PathError,
  49.                            ParamikoNotPresent,
  50.                            )
  51. from bzrlib.osutils import fancy_rename
  52. from bzrlib.trace import mutter, warning
  53. from bzrlib.transport import (
  54.     FileFileStream,
  55.     _file_streams,
  56.     ssh,
  57.     ConnectedTransport,
  58.     )
  59.  
  60. # Disable one particular warning that comes from paramiko in Python2.5; if
  61. # this is emitted at the wrong time it tends to cause spurious test failures
  62. # or at least noise in the test case::
  63. #
  64. # [1770/7639 in 86s, 1 known failures, 50 skipped, 2 missing features]
  65. # test_permissions.TestSftpPermissions.test_new_files
  66. # /var/lib/python-support/python2.5/paramiko/message.py:226: DeprecationWarning: integer argument expected, got float
  67. #  self.packet.write(struct.pack('>I', n))
  68. warnings.filterwarnings('ignore',
  69.         'integer argument expected, got float',
  70.         category=DeprecationWarning,
  71.         module='paramiko.message')
  72.  
  73. try:
  74.     import paramiko
  75. except ImportError, e:
  76.     raise ParamikoNotPresent(e)
  77. else:
  78.     from paramiko.sftp import (SFTP_FLAG_WRITE, SFTP_FLAG_CREATE,
  79.                                SFTP_FLAG_EXCL, SFTP_FLAG_TRUNC,
  80.                                SFTP_OK, CMD_HANDLE, CMD_OPEN)
  81.     from paramiko.sftp_attr import SFTPAttributes
  82.     from paramiko.sftp_file import SFTPFile
  83.  
  84.  
  85. _paramiko_version = getattr(paramiko, '__version_info__', (0, 0, 0))
  86. # don't use prefetch unless paramiko version >= 1.5.5 (there were bugs earlier)
  87. _default_do_prefetch = (_paramiko_version >= (1, 5, 5))
  88.  
  89.  
  90. class SFTPLock(object):
  91.     """This fakes a lock in a remote location.
  92.  
  93.    A present lock is indicated just by the existence of a file.  This
  94.    doesn't work well on all transports and they are only used in
  95.    deprecated storage formats.
  96.    """
  97.  
  98.     __slots__ = ['path', 'lock_path', 'lock_file', 'transport']
  99.  
  100.     def __init__(self, path, transport):
  101.         self.lock_file = None
  102.         self.path = path
  103.         self.lock_path = path + '.write-lock'
  104.         self.transport = transport
  105.         try:
  106.             # RBC 20060103 FIXME should we be using private methods here ?
  107.             abspath = transport._remote_path(self.lock_path)
  108.             self.lock_file = transport._sftp_open_exclusive(abspath)
  109.         except FileExists:
  110.             raise LockError('File %r already locked' % (self.path,))
  111.  
  112.     def unlock(self):
  113.         if not self.lock_file:
  114.             return
  115.         self.lock_file.close()
  116.         self.lock_file = None
  117.         try:
  118.             self.transport.delete(self.lock_path)
  119.         except (NoSuchFile,):
  120.             # What specific errors should we catch here?
  121.             pass
  122.  
  123.  
  124. class _SFTPReadvHelper(object):
  125.     """A class to help with managing the state of a readv request."""
  126.  
  127.     # See _get_requests for an explanation.
  128.     _max_request_size = 32768
  129.  
  130.     def __init__(self, original_offsets, relpath, _report_activity):
  131.         """Create a new readv helper.
  132.  
  133.        :param original_offsets: The original requests given by the caller of
  134.            readv()
  135.        :param relpath: The name of the file (if known)
  136.        :param _report_activity: A Transport._report_activity bound method,
  137.            to be called as data arrives.
  138.        """
  139.         self.original_offsets = list(original_offsets)
  140.         self.relpath = relpath
  141.         self._report_activity = _report_activity
  142.  
  143.     def _get_requests(self):
  144.         """Break up the offsets into individual requests over sftp.
  145.  
  146.        The SFTP spec only requires implementers to support 32kB requests. We
  147.        could try something larger (openssh supports 64kB), but then we have to
  148.        handle requests that fail.
  149.        So instead, we just break up our maximum chunks into 32kB chunks, and
  150.        asyncronously requests them.
  151.        Newer versions of paramiko would do the chunking for us, but we want to
  152.        start processing results right away, so we do it ourselves.
  153.        """
  154.         # TODO: Because we issue async requests, we don't 'fudge' any extra
  155.         #       data.  I'm not 100% sure that is the best choice.
  156.  
  157.         # The first thing we do, is to collapse the individual requests as much
  158.         # as possible, so we don't issues requests <32kB
  159.         sorted_offsets = sorted(self.original_offsets)
  160.         coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
  161.                                                         limit=0, fudge_factor=0))
  162.         requests = []
  163.         for c_offset in coalesced:
  164.             start = c_offset.start
  165.             size = c_offset.length
  166.  
  167.             # Break this up into 32kB requests
  168.             while size > 0:
  169.                 next_size = min(size, self._max_request_size)
  170.                 requests.append((start, next_size))
  171.                 size -= next_size
  172.                 start += next_size
  173.         if 'sftp' in debug.debug_flags:
  174.             mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
  175.                 self.relpath, len(sorted_offsets), len(coalesced),
  176.                 len(requests))
  177.         return requests
  178.  
  179.     def request_and_yield_offsets(self, fp):
  180.         """Request the data from the remote machine, yielding the results.
  181.  
  182.        :param fp: A Paramiko SFTPFile object that supports readv.
  183.        :return: Yield the data requested by the original readv caller, one by
  184.            one.
  185.        """
  186.         requests = self._get_requests()
  187.         offset_iter = iter(self.original_offsets)
  188.         cur_offset, cur_size = offset_iter.next()
  189.         # paramiko .readv() yields strings that are in the order of the requests
  190.         # So we track the current request to know where the next data is
  191.         # being returned from.
  192.         input_start = None
  193.         last_end = None
  194.         buffered_data = []
  195.         buffered_len = 0
  196.  
  197.         # This is used to buffer chunks which we couldn't process yet
  198.         # It is (start, end, data) tuples.
  199.         data_chunks = []
  200.         # Create an 'unlimited' data stream, so we stop based on requests,
  201.         # rather than just because the data stream ended. This lets us detect
  202.         # short readv.
  203.         data_stream = itertools.chain(fp.readv(requests),
  204.                                       itertools.repeat(None))
  205.         for (start, length), data in itertools.izip(requests, data_stream):
  206.             if data is None:
  207.                 if cur_coalesced is not None:
  208.                     raise errors.ShortReadvError(self.relpath,
  209.                         start, length, len(data))
  210.             if len(data) != length:
  211.                 raise errors.ShortReadvError(self.relpath,
  212.                     start, length, len(data))
  213.             self._report_activity(length, 'read')
  214.             if last_end is None:
  215.                 # This is the first request, just buffer it
  216.                 buffered_data = [data]
  217.                 buffered_len = length
  218.                 input_start = start
  219.             elif start == last_end:
  220.                 # The data we are reading fits neatly on the previous
  221.                 # buffer, so this is all part of a larger coalesced range.
  222.                 buffered_data.append(data)
  223.                 buffered_len += length
  224.             else:
  225.                 # We have an 'interrupt' in the data stream. So we know we are
  226.                 # at a request boundary.
  227.                 if buffered_len > 0:
  228.                     # We haven't consumed the buffer so far, so put it into
  229.                     # data_chunks, and continue.
  230.                     buffered = ''.join(buffered_data)
  231.                     data_chunks.append((input_start, buffered))
  232.                 input_start = start
  233.                 buffered_data = [data]
  234.                 buffered_len = length
  235.             last_end = start + length
  236.             if input_start == cur_offset and cur_size <= buffered_len:
  237.                 # Simplify the next steps a bit by transforming buffered_data
  238.                 # into a single string. We also have the nice property that
  239.                 # when there is only one string ''.join([x]) == x, so there is
  240.                 # no data copying.
  241.                 buffered = ''.join(buffered_data)
  242.                 # Clean out buffered data so that we keep memory
  243.                 # consumption low
  244.                 del buffered_data[:]
  245.                 buffered_offset = 0
  246.                 # TODO: We *could* also consider the case where cur_offset is in
  247.                 #       in the buffered range, even though it doesn't *start*
  248.                 #       the buffered range. But for packs we pretty much always
  249.                 #       read in order, so you won't get any extra data in the
  250.                 #       middle.
  251.                 while (input_start == cur_offset
  252.                        and (buffered_offset + cur_size) <= buffered_len):
  253.                     # We've buffered enough data to process this request, spit it
  254.                     # out
  255.                     cur_data = buffered[buffered_offset:buffered_offset + cur_size]
  256.                     # move the direct pointer into our buffered data
  257.                     buffered_offset += cur_size
  258.                     # Move the start-of-buffer pointer
  259.                     input_start += cur_size
  260.                     # Yield the requested data
  261.                     yield cur_offset, cur_data
  262.                     cur_offset, cur_size = offset_iter.next()
  263.                 # at this point, we've consumed as much of buffered as we can,
  264.                 # so break off the portion that we consumed
  265.                 if buffered_offset == len(buffered_data):
  266.                     # No tail to leave behind
  267.                     buffered_data = []
  268.                     buffered_len = 0
  269.                 else:
  270.                     buffered = buffered[buffered_offset:]
  271.                     buffered_data = [buffered]
  272.                     buffered_len = len(buffered)
  273.         # now that the data stream is done, close the handle
  274.         fp.close()
  275.         if buffered_len:
  276.             buffered = ''.join(buffered_data)
  277.             del buffered_data[:]
  278.             data_chunks.append((input_start, buffered))
  279.         if data_chunks:
  280.             if 'sftp' in debug.debug_flags:
  281.                 mutter('SFTP readv left with %d out-of-order bytes',
  282.                     sum(map(lambda x: len(x[1]), data_chunks)))
  283.             # We've processed all the readv data, at this point, anything we
  284.             # couldn't process is in data_chunks. This doesn't happen often, so
  285.             # this code path isn't optimized
  286.             # We use an interesting process for data_chunks
  287.             # Specifically if we have "bisect_left([(start, len, entries)],
  288.             #                                       (qstart,)])
  289.             # If start == qstart, then we get the specific node. Otherwise we
  290.             # get the previous node
  291.             while True:
  292.                 idx = bisect.bisect_left(data_chunks, (cur_offset,))
  293.                 if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset:
  294.                     # The data starts here
  295.                     data = data_chunks[idx][1][:cur_size]
  296.                 elif idx > 0:
  297.                     # The data is in a portion of a previous page
  298.                     idx -= 1
  299.                     sub_offset = cur_offset - data_chunks[idx][0]
  300.                     data = data_chunks[idx][1]
  301.                     data = data[sub_offset:sub_offset + cur_size]
  302.                 else:
  303.                     # We are missing the page where the data should be found,
  304.                     # something is wrong
  305.                     data = ''
  306.                 if len(data) != cur_size:
  307.                     raise AssertionError('We must have miscalulated.'
  308.                         ' We expected %d bytes, but only found %d'
  309.                         % (cur_size, len(data)))
  310.                 yield cur_offset, data
  311.                 cur_offset, cur_size = offset_iter.next()
  312.  
  313.  
  314. class SFTPTransport(ConnectedTransport):
  315.     """Transport implementation for SFTP access."""
  316.  
  317.     _do_prefetch = _default_do_prefetch
  318.     # TODO: jam 20060717 Conceivably these could be configurable, either
  319.     #       by auto-tuning at run-time, or by a configuration (per host??)
  320.     #       but the performance curve is pretty flat, so just going with
  321.     #       reasonable defaults.
  322.     _max_readv_combine = 200
  323.     # Having to round trip to the server means waiting for a response,
  324.     # so it is better to download extra bytes.
  325.     # 8KiB had good performance for both local and remote network operations
  326.     _bytes_to_read_before_seek = 8192
  327.  
  328.     # The sftp spec says that implementations SHOULD allow reads
  329.     # to be at least 32K. paramiko.readv() does an async request
  330.     # for the chunks. So we need to keep it within a single request
  331.     # size for paramiko <= 1.6.1. paramiko 1.6.2 will probably chop
  332.     # up the request itself, rather than us having to worry about it
  333.     _max_request_size = 32768
  334.  
  335.     def _remote_path(self, relpath):
  336.         """Return the path to be passed along the sftp protocol for relpath.
  337.  
  338.        :param relpath: is a urlencoded string.
  339.        """
  340.         remote_path = self._parsed_url.clone(relpath).path
  341.         # the initial slash should be removed from the path, and treated as a
  342.         # homedir relative path (the path begins with a double slash if it is
  343.         # absolute).  see draft-ietf-secsh-scp-sftp-ssh-uri-03.txt
  344.         # RBC 20060118 we are not using this as its too user hostile. instead
  345.         # we are following lftp and using /~/foo to mean '~/foo'
  346.         # vila--20070602 and leave absolute paths begin with a single slash.
  347.         if remote_path.startswith('/~/'):
  348.             remote_path = remote_path[3:]
  349.         elif remote_path == '/~':
  350.             remote_path = ''
  351.         return remote_path
  352.  
  353.     def _create_connection(self, credentials=None):
  354.         """Create a new connection with the provided credentials.
  355.  
  356.        :param credentials: The credentials needed to establish the connection.
  357.  
  358.        :return: The created connection and its associated credentials.
  359.  
  360.        The credentials are only the password as it may have been entered
  361.        interactively by the user and may be different from the one provided
  362.        in base url at transport creation time.
  363.        """
  364.         if credentials is None:
  365.             password = self._parsed_url.password
  366.         else:
  367.             password = credentials
  368.  
  369.         vendor = ssh._get_ssh_vendor()
  370.         user = self._parsed_url.user
  371.         if user is None:
  372.             auth = config.AuthenticationConfig()
  373.             user = auth.get_user('ssh', self._parsed_url.host,
  374.                 self._parsed_url.port)
  375.         connection = vendor.connect_sftp(self._parsed_url.user, password,
  376.             self._parsed_url.host, self._parsed_url.port)
  377.         return connection, (user, password)
  378.  
  379.     def disconnect(self):
  380.         connection = self._get_connection()
  381.         if connection is not None:
  382.             connection.close()
  383.  
  384.     def _get_sftp(self):
  385.         """Ensures that a connection is established"""
  386.         connection = self._get_connection()
  387.         if connection is None:
  388.             # First connection ever
  389.             connection, credentials = self._create_connection()
  390.             self._set_connection(connection, credentials)
  391.         return connection
  392.  
  393.     def has(self, relpath):
  394.         """
  395.        Does the target location exist?
  396.        """
  397.         try:
  398.             self._get_sftp().stat(self._remote_path(relpath))
  399.             # stat result is about 20 bytes, let's say
  400.             self._report_activity(20, 'read')
  401.             return True
  402.         except IOError:
  403.             return False
  404.  
  405.     def get(self, relpath):
  406.         """Get the file at the given relative path.
  407.  
  408.        :param relpath: The relative path to the file
  409.        """
  410.         try:
  411.             path = self._remote_path(relpath)
  412.             f = self._get_sftp().file(path, mode='rb')
  413.             if self._do_prefetch and (getattr(f, 'prefetch', None) is not None):
  414.                 f.prefetch()
  415.             return f
  416.         except (IOError, paramiko.SSHException), e:
  417.             self._translate_io_exception(e, path, ': error retrieving',
  418.                 failure_exc=errors.ReadError)
  419.  
  420.     def get_bytes(self, relpath):
  421.         # reimplement this here so that we can report how many bytes came back
  422.         f = self.get(relpath)
  423.         try:
  424.             bytes = f.read()
  425.             self._report_activity(len(bytes), 'read')
  426.             return bytes
  427.         finally:
  428.             f.close()
  429.  
  430.     def _readv(self, relpath, offsets):
  431.         """See Transport.readv()"""
  432.         # We overload the default readv() because we want to use a file
  433.         # that does not have prefetch enabled.
  434.         # Also, if we have a new paramiko, it implements an async readv()
  435.         if not offsets:
  436.             return
  437.  
  438.         try:
  439.             path = self._remote_path(relpath)
  440.             fp = self._get_sftp().file(path, mode='rb')
  441.             readv = getattr(fp, 'readv', None)
  442.             if readv:
  443.                 return self._sftp_readv(fp, offsets, relpath)
  444.             if 'sftp' in debug.debug_flags:
  445.                 mutter('seek and read %s offsets', len(offsets))
  446.             return self._seek_and_read(fp, offsets, relpath)
  447.         except (IOError, paramiko.SSHException), e:
  448.             self._translate_io_exception(e, path, ': error retrieving')
  449.  
  450.     def recommended_page_size(self):
  451.         """See Transport.recommended_page_size().
  452.  
  453.        For SFTP we suggest a large page size to reduce the overhead
  454.        introduced by latency.
  455.        """
  456.         return 64 * 1024
  457.  
  458.     def _sftp_readv(self, fp, offsets, relpath):
  459.         """Use the readv() member of fp to do async readv.
  460.  
  461.        Then read them using paramiko.readv(). paramiko.readv()
  462.        does not support ranges > 64K, so it caps the request size, and
  463.        just reads until it gets all the stuff it wants.
  464.        """
  465.         helper = _SFTPReadvHelper(offsets, relpath, self._report_activity)
  466.         return helper.request_and_yield_offsets(fp)
  467.  
  468.     def put_file(self, relpath, f, mode=None):
  469.         """
  470.        Copy the file-like object into the location.
  471.  
  472.        :param relpath: Location to put the contents, relative to base.
  473.        :param f:       File-like object.
  474.        :param mode: The final mode for the file
  475.        """
  476.         final_path = self._remote_path(relpath)
  477.         return self._put(final_path, f, mode=mode)
  478.  
  479.     def _put(self, abspath, f, mode=None):
  480.         """Helper function so both put() and copy_abspaths can reuse the code"""
  481.         tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
  482.                         os.getpid(), random.randint(0,0x7FFFFFFF))
  483.         fout = self._sftp_open_exclusive(tmp_abspath, mode=mode)
  484.         closed = False
  485.         try:
  486.             try:
  487.                 fout.set_pipelined(True)
  488.                 length = self._pump(f, fout)
  489.             except (IOError, paramiko.SSHException), e:
  490.                 self._translate_io_exception(e, tmp_abspath)
  491.             # XXX: This doesn't truly help like we would like it to.
  492.             #      The problem is that openssh strips sticky bits. So while we
  493.             #      can properly set group write permission, we lose the group
  494.             #      sticky bit. So it is probably best to stop chmodding, and
  495.             #      just tell users that they need to set the umask correctly.
  496.             #      The attr.st_mode = mode, in _sftp_open_exclusive
  497.             #      will handle when the user wants the final mode to be more
  498.             #      restrictive. And then we avoid a round trip. Unless
  499.             #      paramiko decides to expose an async chmod()
  500.  
  501.             # This is designed to chmod() right before we close.
  502.             # Because we set_pipelined() earlier, theoretically we might
  503.             # avoid the round trip for fout.close()
  504.             if mode is not None:
  505.                 self._get_sftp().chmod(tmp_abspath, mode)
  506.             fout.close()
  507.             closed = True
  508.             self._rename_and_overwrite(tmp_abspath, abspath)
  509.             return length
  510.         except Exception, e:
  511.             # If we fail, try to clean up the temporary file
  512.             # before we throw the exception
  513.             # but don't let another exception mess things up
  514.             # Write out the traceback, because otherwise
  515.             # the catch and throw destroys it
  516.             import traceback
  517.             mutter(traceback.format_exc())
  518.             try:
  519.                 if not closed:
  520.                     fout.close()
  521.                 self._get_sftp().remove(tmp_abspath)
  522.             except:
  523.                 # raise the saved except
  524.                 raise e
  525.             # raise the original with its traceback if we can.
  526.             raise
  527.  
  528.     def _put_non_atomic_helper(self, relpath, writer, mode=None,
  529.                                create_parent_dir=False,
  530.                                dir_mode=None):
  531.         abspath = self._remote_path(relpath)
  532.  
  533.         # TODO: jam 20060816 paramiko doesn't publicly expose a way to
  534.         #       set the file mode at create time. If it does, use it.
  535.         #       But for now, we just chmod later anyway.
  536.  
  537.         def _open_and_write_file():
  538.             """Try to open the target file, raise error on failure"""
  539.             fout = None
  540.             try:
  541.                 try:
  542.                     fout = self._get_sftp().file(abspath, mode='wb')
  543.                     fout.set_pipelined(True)
  544.                     writer(fout)
  545.                 except (paramiko.SSHException, IOError), e:
  546.                     self._translate_io_exception(e, abspath,
  547.                                                  ': unable to open')
  548.  
  549.                 # This is designed to chmod() right before we close.
  550.                 # Because we set_pipelined() earlier, theoretically we might
  551.                 # avoid the round trip for fout.close()
  552.                 if mode is not None:
  553.                     self._get_sftp().chmod(abspath, mode)
  554.             finally:
  555.                 if fout is not None:
  556.                     fout.close()
  557.  
  558.         if not create_parent_dir:
  559.             _open_and_write_file()
  560.             return
  561.  
  562.         # Try error handling to create the parent directory if we need to
  563.         try:
  564.             _open_and_write_file()
  565.         except NoSuchFile:
  566.             # Try to create the parent directory, and then go back to
  567.             # writing the file
  568.             parent_dir = os.path.dirname(abspath)
  569.             self._mkdir(parent_dir, dir_mode)
  570.             _open_and_write_file()
  571.  
  572.     def put_file_non_atomic(self, relpath, f, mode=None,
  573.                             create_parent_dir=False,
  574.                             dir_mode=None):
  575.         """Copy the file-like object into the target location.
  576.  
  577.        This function is not strictly safe to use. It is only meant to
  578.        be used when you already know that the target does not exist.
  579.        It is not safe, because it will open and truncate the remote
  580.        file. So there may be a time when the file has invalid contents.
  581.  
  582.        :param relpath: The remote location to put the contents.
  583.        :param f:       File-like object.
  584.        :param mode:    Possible access permissions for new file.
  585.                        None means do not set remote permissions.
  586.        :param create_parent_dir: If we cannot create the target file because
  587.                        the parent directory does not exist, go ahead and
  588.                        create it, and then try again.
  589.        """
  590.         def writer(fout):
  591.             self._pump(f, fout)
  592.         self._put_non_atomic_helper(relpath, writer, mode=mode,
  593.                                     create_parent_dir=create_parent_dir,
  594.                                     dir_mode=dir_mode)
  595.  
  596.     def put_bytes_non_atomic(self, relpath, bytes, mode=None,
  597.                              create_parent_dir=False,
  598.                              dir_mode=None):
  599.         def writer(fout):
  600.             fout.write(bytes)
  601.         self._put_non_atomic_helper(relpath, writer, mode=mode,
  602.                                     create_parent_dir=create_parent_dir,
  603.                                     dir_mode=dir_mode)
  604.  
  605.     def iter_files_recursive(self):
  606.         """Walk the relative paths of all files in this transport."""
  607.         # progress is handled by list_dir
  608.         queue = list(self.list_dir('.'))
  609.         while queue:
  610.             relpath = queue.pop(0)
  611.             st = self.stat(relpath)
  612.             if stat.S_ISDIR(st.st_mode):
  613.                 for i, basename in enumerate(self.list_dir(relpath)):
  614.                     queue.insert(i, relpath+'/'+basename)
  615.             else:
  616.                 yield relpath
  617.  
  618.     def _mkdir(self, abspath, mode=None):
  619.         if mode is None:
  620.             local_mode = 0777
  621.         else:
  622.             local_mode = mode
  623.         try:
  624.             self._report_activity(len(abspath), 'write')
  625.             self._get_sftp().mkdir(abspath, local_mode)
  626.             self._report_activity(1, 'read')
  627.             if mode is not None:
  628.                 # chmod a dir through sftp will erase any sgid bit set
  629.                 # on the server side.  So, if the bit mode are already
  630.                 # set, avoid the chmod.  If the mode is not fine but
  631.                 # the sgid bit is set, report a warning to the user
  632.                 # with the umask fix.
  633.                 stat = self._get_sftp().lstat(abspath)
  634.                 mode = mode & 0777 # can't set special bits anyway
  635.                 if mode != stat.st_mode & 0777:
  636.                     if stat.st_mode & 06000:
  637.                         warning('About to chmod %s over sftp, which will result'
  638.                                 ' in its suid or sgid bits being cleared.  If'
  639.                                 ' you want to preserve those bits, change your '
  640.                                 ' environment on the server to use umask 0%03o.'
  641.                                 % (abspath, 0777 - mode))
  642.                     self._get_sftp().chmod(abspath, mode=mode)
  643.         except (paramiko.SSHException, IOError), e:
  644.             self._translate_io_exception(e, abspath, ': unable to mkdir',
  645.                 failure_exc=FileExists)
  646.  
  647.     def mkdir(self, relpath, mode=None):
  648.         """Create a directory at the given path."""
  649.         self._mkdir(self._remote_path(relpath), mode=mode)
  650.  
  651.     def open_write_stream(self, relpath, mode=None):
  652.         """See Transport.open_write_stream."""
  653.         # initialise the file to zero-length
  654.         # this is three round trips, but we don't use this
  655.         # api more than once per write_group at the moment so
  656.         # it is a tolerable overhead. Better would be to truncate
  657.         # the file after opening. RBC 20070805
  658.         self.put_bytes_non_atomic(relpath, "", mode)
  659.         abspath = self._remote_path(relpath)
  660.         # TODO: jam 20060816 paramiko doesn't publicly expose a way to
  661.         #       set the file mode at create time. If it does, use it.
  662.         #       But for now, we just chmod later anyway.
  663.         handle = None
  664.         try:
  665.             handle = self._get_sftp().file(abspath, mode='wb')
  666.             handle.set_pipelined(True)
  667.         except (paramiko.SSHException, IOError), e:
  668.             self._translate_io_exception(e, abspath,
  669.                                          ': unable to open')
  670.         _file_streams[self.abspath(relpath)] = handle
  671.         return FileFileStream(self, relpath, handle)
  672.  
  673.     def _translate_io_exception(self, e, path, more_info='',
  674.                                 failure_exc=PathError, operation=None):
  675.         """Translate a paramiko or IOError into a friendlier exception.
  676.  
  677.        :param e: The original exception
  678.        :param path: The path in question when the error is raised
  679.        :param more_info: Extra information that can be included,
  680.                          such as what was going on
  681.        :param failure_exc: Paramiko has the super fun ability to raise completely
  682.                           opaque errors that just set "e.args = ('Failure',)" with
  683.                           no more information.
  684.                           If this parameter is set, it defines the exception
  685.                           to raise in these cases.
  686.        :param operation: Operation that failed (needs to check if it's supported)
  687.        """
  688.         # paramiko seems to generate detailless errors.
  689.         self._translate_error(e, path, raise_generic=False)
  690.         if getattr(e, 'args', None) is not None:
  691.             if (e.args == ('No such file or directory',) or
  692.                 e.args == ('No such file',)):
  693.                 raise NoSuchFile(path, str(e) + more_info)
  694.             if (e.args == ('mkdir failed',) or
  695.                 e.args[0].startswith('syserr: File exists')):
  696.                 raise FileExists(path, str(e) + more_info)
  697.             # strange but true, for the paramiko server.
  698.             if (e.args == ('Failure',)):
  699.                 raise failure_exc(path, str(e) + more_info)
  700.             # Can be something like args = ('Directory not empty:
  701.             # '/srv/bazaar.launchpad.net/blah...: '
  702.             # [Errno 39] Directory not empty',)
  703.             if (e.args[0].startswith('Directory not empty: ')
  704.                 or getattr(e, 'errno', None) == errno.ENOTEMPTY):
  705.                 raise errors.DirectoryNotEmpty(path, str(e))
  706.             if e.args == ('Operation unsupported',):
  707.                 raise errors.TransportOperationNotSupported(operation, more_info)
  708.                 # raise errors.TransportNotPossible(more_info)
  709.             mutter('Raising exception with args %s', e.args)
  710.         if getattr(e, 'errno', None) is not None:
  711.             mutter('Raising exception with errno %s', e.errno)
  712.         raise e
  713.  
  714.     def append_file(self, relpath, f, mode=None):
  715.         """
  716.        Append the text in the file-like object into the final
  717.        location.
  718.        """
  719.         try:
  720.             path = self._remote_path(relpath)
  721.             fout = self._get_sftp().file(path, 'ab')
  722.             if mode is not None:
  723.                 self._get_sftp().chmod(path, mode)
  724.             result = fout.tell()
  725.             self._pump(f, fout)
  726.             return result
  727.         except (IOError, paramiko.SSHException), e:
  728.             self._translate_io_exception(e, relpath, ': unable to append')
  729.  
  730.     def rename(self, rel_from, rel_to):
  731.         """Rename without special overwriting"""
  732.         try:
  733.             self._get_sftp().rename(self._remote_path(rel_from),
  734.                               self._remote_path(rel_to))
  735.         except (IOError, paramiko.SSHException), e:
  736.             self._translate_io_exception(e, rel_from,
  737.                     ': unable to rename to %r' % (rel_to), "rename")
  738.  
  739.     def _rename_and_overwrite(self, abs_from, abs_to):
  740.         """Do a fancy rename on the remote server.
  741.  
  742.        Using the implementation provided by osutils.
  743.        """
  744.         try:
  745.             sftp = self._get_sftp()
  746.             fancy_rename(abs_from, abs_to,
  747.                          rename_func=sftp.rename,
  748.                          unlink_func=sftp.remove)
  749.         except (IOError, paramiko.SSHException), e:
  750.             self._translate_io_exception(e, abs_from,
  751.                     ': unable to rename to %r' % (abs_to), operation="rename")
  752.  
  753.     def move(self, rel_from, rel_to):
  754.         """Move the item at rel_from to the location at rel_to"""
  755.         path_from = self._remote_path(rel_from)
  756.         path_to = self._remote_path(rel_to)
  757.         self._rename_and_overwrite(path_from, path_to)
  758.  
  759.     def delete(self, relpath):
  760.         """Delete the item at relpath"""
  761.         path = self._remote_path(relpath)
  762.         try:
  763.             self._get_sftp().remove(path)
  764.         except (IOError, paramiko.SSHException), e:
  765.             self._translate_io_exception(e, path, ': unable to delete',
  766.                          operation="delete")
  767.  
  768.     def external_url(self):
  769.         """See bzrlib.transport.Transport.external_url."""
  770.         # the external path for SFTP is the base
  771.         return self.base
  772.  
  773.     def listable(self):
  774.         """Return True if this store supports listing."""
  775.         return True
  776.  
  777.     def list_dir(self, relpath):
  778.         """
  779.        Return a list of all files at the given location.
  780.        """
  781.         # does anything actually use this?
  782.         # -- Unknown
  783.         # This is at least used by copy_tree for remote upgrades.
  784.         # -- David Allouche 2006-08-11
  785.         path = self._remote_path(relpath)
  786.         try:
  787.             entries = self._get_sftp().listdir(path)
  788.             self._report_activity(sum(map(len, entries)), 'read')
  789.         except (IOError, paramiko.SSHException), e:
  790.             self._translate_io_exception(e, path, ': failed to list_dir')
  791.         return [urlutils.escape(entry) for entry in entries]
  792.  
  793.     def rmdir(self, relpath):
  794.         """See Transport.rmdir."""
  795.         path = self._remote_path(relpath)
  796.         try:
  797.             return self._get_sftp().rmdir(path)
  798.         except (IOError, paramiko.SSHException), e:
  799.             self._translate_io_exception(e, path, ': failed to rmdir',
  800.                         operation="rmdir")
  801.  
  802.     def stat(self, relpath):
  803.         """Return the stat information for a file."""
  804.         path = self._remote_path(relpath)
  805.         try:
  806.             return self._get_sftp().lstat(path)
  807.         except (IOError, paramiko.SSHException), e:
  808.             self._translate_io_exception(e, path, ': unable to stat')
  809.  
  810.     def readlink(self, relpath):
  811.         """See Transport.readlink."""
  812.         path = self._remote_path(relpath)
  813.         try:
  814.             return self._get_sftp().readlink(path)
  815.         except (IOError, paramiko.SSHException), e:
  816.             self._translate_io_exception(e, path, ': unable to readlink',
  817.                     operation="readlink")
  818.  
  819.     def symlink(self, source, link_name):
  820.         """See Transport.symlink."""
  821.         try:
  822.             conn = self._get_sftp()
  823.             sftp_retval = conn.symlink(source, link_name)
  824.             if SFTP_OK != sftp_retval:
  825.                 raise TransportError(
  826.                     '%r: unable to create symlink to %r' % (link_name, source),
  827.                     sftp_retval
  828.                 )
  829.         except (IOError, paramiko.SSHException), e:
  830.             self._translate_io_exception(e, link_name,
  831.                     ': unable to create symlink to %r' % (source),
  832.                     operation="symlink")
  833.  
  834.     def lock_read(self, relpath):
  835.         """
  836.        Lock the given file for shared (read) access.
  837.        :return: A lock object, which has an unlock() member function
  838.        """
  839.         # FIXME: there should be something clever i can do here...
  840.         class BogusLock(object):
  841.             def __init__(self, path):
  842.                 self.path = path
  843.             def unlock(self):
  844.                 pass
  845.         return BogusLock(relpath)
  846.  
  847.     def lock_write(self, relpath):
  848.         """
  849.        Lock the given file for exclusive (write) access.
  850.        WARNING: many transports do not support this, so trying avoid using it
  851.  
  852.        :return: A lock object, which has an unlock() member function
  853.        """
  854.         # This is a little bit bogus, but basically, we create a file
  855.         # which should not already exist, and if it does, we assume
  856.         # that there is a lock, and if it doesn't, the we assume
  857.         # that we have taken the lock.
  858.         return SFTPLock(relpath, self)
  859.  
  860.     def _sftp_open_exclusive(self, abspath, mode=None):
  861.         """Open a remote path exclusively.
  862.  
  863.        SFTP supports O_EXCL (SFTP_FLAG_EXCL), which fails if
  864.        the file already exists. However it does not expose this
  865.        at the higher level of SFTPClient.open(), so we have to
  866.        sneak away with it.
  867.  
  868.        WARNING: This breaks the SFTPClient abstraction, so it
  869.        could easily break against an updated version of paramiko.
  870.  
  871.        :param abspath: The remote absolute path where the file should be opened
  872.        :param mode: The mode permissions bits for the new file
  873.        """
  874.         # TODO: jam 20060816 Paramiko >= 1.6.2 (probably earlier) supports
  875.         #       using the 'x' flag to indicate SFTP_FLAG_EXCL.
  876.         #       However, there is no way to set the permission mode at open
  877.         #       time using the sftp_client.file() functionality.
  878.         path = self._get_sftp()._adjust_cwd(abspath)
  879.         # mutter('sftp abspath %s => %s', abspath, path)
  880.         attr = SFTPAttributes()
  881.         if mode is not None:
  882.             attr.st_mode = mode
  883.         omode = (SFTP_FLAG_WRITE | SFTP_FLAG_CREATE
  884.                 | SFTP_FLAG_TRUNC | SFTP_FLAG_EXCL)
  885.         try:
  886.             t, msg = self._get_sftp()._request(CMD_OPEN, path, omode, attr)
  887.             if t != CMD_HANDLE:
  888.                 raise TransportError('Expected an SFTP handle')
  889.             handle = msg.get_string()
  890.             return SFTPFile(self._get_sftp(), handle, 'wb', -1)
  891.         except (paramiko.SSHException, IOError), e:
  892.             self._translate_io_exception(e, abspath, ': unable to open',
  893.                 failure_exc=FileExists, operation="open")
  894.  
  895.     def _can_roundtrip_unix_modebits(self):
  896.         if sys.platform == 'win32':
  897.             # anyone else?
  898.             return False
  899.         else:
  900.             return True
  901.  
  902.  
  903. def get_test_permutations():
  904.     """Return the permutations to be used in testing."""
  905.     from bzrlib.tests import stub_sftp
  906.     return [(SFTPTransport, stub_sftp.SFTPAbsoluteServer),
  907.             (SFTPTransport, stub_sftp.SFTPHomeDirServer),
  908.             (SFTPTransport, stub_sftp.SFTPSiblingAbsoluteServer),
  909.             ]
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement