Advertisement
gatoatigrado3

add-closes-fds-debug-codes

Dec 2nd, 2013
173
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Diff 7.88 KB | None | 0 0
  1. commit f2703fb4c1a46151d1e24fb5368fb0b2d8ff5533
  2. Author: Nicholas Tung <ntung@yelp.com>
  3. Date:   Mon Dec 2 15:31:55 2013 -0800
  4.  
  5.     Make code for closing queues on worker threads symmetrical; make the closes-FDs test (tests/closes_fds_test.py) more informative when errors are encountered
  6.  
  7. diff --git a/tests/closes_fds_test.py b/tests/closes_fds_test.py
  8. index ee5624f..5ec134a 100644
  9. --- a/tests/closes_fds_test.py
  10. +++ b/tests/closes_fds_test.py
  11. @@ -5,11 +5,14 @@ Provides an interface for defining worker processes.
  12.  from __future__ import absolute_import
  13.  from __future__ import print_function
  14.  
  15. +import mock
  16.  import os
  17.  import stat
  18. +import testify as T
  19.  import vimap.pool
  20. +import vimap.queue_manager
  21.  import vimap.worker_process
  22. -import testify as T
  23. +from collections import namedtuple
  24.  
  25.  
  26.  # decrypt POSIX stuff
  27. @@ -23,30 +26,39 @@ readable_mode_strings = {
  28.      'socket': stat.S_ISSOCK}
  29.  
  30.  
  31. +FDInfo = namedtuple("FDInfo", ["modes", "symlink"])
  32. +
  33. +
  34.  def fd_type_if_open(fd_number):
  35.      """For a given open file descriptor, return a list of human-readable
  36.      strings describing the file type.
  37.      """
  38.      fd_stat = os.fstat(fd_number)
  39. -    return [
  40. -        k for k, v in readable_mode_strings.items()
  41. -        if v(fd_stat.st_mode)]
  42. +    return FDInfo(
  43. +        modes=[k for k, v in readable_mode_strings.items() if v(fd_stat.st_mode)],
  44. +        symlink=os.readlink("/proc/{0}/fd/{1}".format(os.getpid(), fd_number)))
  45.  
  46.  
  47. -def get_open_fds():
  48. +def get_open_fds(retries=3):
  49.      """
  50.      Returns a map,
  51.  
  52. -        fd (int) --> modes (list of human-readable strings)
  53. +        fd (int) --> FDInfo
  54.      """
  55.      unix_fd_dir = "/proc/{0}/fd".format(os.getpid())
  56.      fds = [(int(i), os.path.join(unix_fd_dir, i)) for i in os.listdir(unix_fd_dir)]
  57. -    # NOTE: Sometimes, an FD is used to list the above directory. Hence, we should
  58. -    # re-check whether the FD still exists (via os.path.exists)
  59. -    fds = [i for (i, path) in fds if (i >= 3 and os.path.exists(path))]
  60. -    return dict(filter(
  61. -        lambda (k, v): v is not None,
  62. -        ((i, fd_type_if_open(i)) for i in fds)))
  63. +
  64. +    try:
  65. +        # NOTE: Sometimes, an FD is used to list the above directory. Hence, we should
  66. +        # re-check whether the FD still exists (via os.path.exists)
  67. +        fds = [i for (i, path) in fds if (i >= 3 and os.path.exists(path))]
  68. +        return dict(filter(
  69. +            lambda (k, v): v is not None,
  70. +            ((i, fd_type_if_open(i)) for i in fds)))
  71. +    except OSError:
  72. +        if retries == 0:
  73. +            raise
  74. +        return get_open_fds(retries - 1)
  75.  
  76.  
  77.  def difference_open_fds(before, after):
  78. @@ -58,7 +70,8 @@ def difference_open_fds(before, after):
  79.      # "a - b" for dicts -- remove anything in 'a' that has a key in b
  80.      dict_diff = lambda a, b: dict((k, a[k]) for k in (frozenset(a) - frozenset(b)))
  81.      for k in (frozenset(after) & frozenset(before)):
  82. -        assert before[k] == after[k], "Changing FD types aren't supported!"
  83. +        if before[k] != after[k]:
  84. +            print("WARNING: FD {0} changed from {1} to {2}".format(k, before[k], after[k]))
  85.      return {
  86.          'closed': dict_diff(before, after),
  87.          'opened': dict_diff(after, before)}
  88. @@ -88,7 +101,31 @@ def basic_worker(xs):
  89.          yield x + 1
  90.  
  91.  
  92. +def repeat(times):
  93. +    """Repeats a test to help catch flakiness."""
  94. +    def fcn_helper(fcn):
  95. +        return lambda *args, **kwargs: [fcn(*args, **kwargs) for _ in xrange(times)]
  96. +    return fcn_helper
  97. +
  98. +
  99.  class TestBasicMapDoesntLeaveAroundFDs(T.TestCase):
  100. +    @T.setup_teardown
  101. +    def instrument_queue_initiation(self):
  102. +        old_init = vimap.queue_manager.VimapQueueManager.__init__
  103. +        def instrumented_init(*args, **kwargs):
  104. +            self.before_queue_manager_init = get_open_fds()
  105. +            old_init(*args, **kwargs)
  106. +            self.after_queue_manager_init = get_open_fds()
  107. +            self.queue_fds = difference_open_fds(
  108. +                self.before_queue_manager_init,
  109. +                self.after_queue_manager_init)['opened']
  110. +        with mock.patch.object(
  111. +                vimap.queue_manager.VimapQueueManager,
  112. +                '__init__',
  113. +                instrumented_init):
  114. +            yield
  115. +
  116. +    @repeat(30)
  117.      def test_all_fds_cleaned_up(self):
  118.          initial_open_fds = get_open_fds()
  119.          pool = vimap.pool.fork_identical(basic_worker, num_workers=1)
  120. @@ -101,12 +138,16 @@ class TestBasicMapDoesntLeaveAroundFDs(T.TestCase):
  121.          # T.assert_equal(after_fork['closed'], [])
  122.          T.assert_gte(len(after_fork['opened']), 2)  # should have at least 3 open fds
  123.          # All opened files should be FIFOs
  124. -        T.assert_equal(all(typ == ['fifo'] for typ in after_fork['opened'].values()), True)
  125. +        T.assert_equal(all(info.modes == ['fifo'] for info in after_fork['opened'].values()), True)
  126.  
  127.          after_cleanup = difference_open_fds(after_fork_open_fds, after_finish_open_fds)
  128.          T.assert_gte(len(after_cleanup['closed']), 2)
  129.  
  130.          left_around = difference_open_fds(initial_open_fds, after_finish_open_fds)
  131. +        if len(left_around['opened']) != 0:
  132. +            queue_fds_left_around = dict(
  133. +                item for item in self.queue_fds.items() if item[0] in left_around['opened'])
  134. +            print("Queue FDs left around: {0}".format(queue_fds_left_around))
  135.          T.assert_equal(len(left_around['opened']), 0)
  136.  
  137.  
  138. diff --git a/vimap/real_worker_routine.py b/vimap/real_worker_routine.py
  139. index ad58c2a..5e50f48 100644
  140. --- a/vimap/real_worker_routine.py
  141. +++ b/vimap/real_worker_routine.py
  142. @@ -54,33 +54,37 @@ class WorkerRoutine(object):
  143.                      file=sys.stderr)
  144.                  raise
  145.  
  146. -    def explicitly_close_queues(self):
  147. -        '''Explicitly join queues, so that we'll get "stuck" in something that's
  148. -        more easily debugged than multiprocessing.
  149. -
  150. -        NOTE: It's tempting to call self.output_queue.cancel_join_thread(),
  151. -        but this seems to leave us in a bad state in practice (reproducible
  152. -        via existing tests).
  153. -        '''
  154. -        self.input_queue.close()
  155. -        self.output_queue.close()
  156. +    def safe_close_queue(self, name, queue):
  157. +        self.debug("Closing queue {0}", name)
  158. +        queue.close()
  159.          try:
  160. -            self.debug("Joining input queue")
  161. -            self.input_queue.join_thread()
  162. -            self.debug("...done")
  163. +            self.debug("Joining thread for queue {0}", name)
  164.  
  165.              try:
  166.                  self.debug(
  167. -                    "Joining output queue (size {size}, full: {full})",
  168. -                    size=self.output_queue.qsize(),
  169. -                    full=self.output_queue.full())
  170. +                    "Joining queue {name} (size {size}, full: {full})",
  171. +                    name=name,
  172. +                    size=queue.qsize(),
  173. +                    full=queue.full())
  174.              except NotImplementedError:
  175.                  pass  # Mac OS X doesn't implement qsize()
  176. -            self.output_queue.join_thread()
  177. -            self.debug("...done")
  178. +            queue.join_thread()
  179.          # threads might have already been closed
  180.          except AssertionError as e:
  181. -            self.debug("Couldn't join threads; error {0}", e)
  182. +            self.debug("Couldn't join queue {0}; error {1}", name, e)
  183. +        else:
  184. +            self.debug("Done closing {0}, no exceptions.", name)
  185. +
  186. +    def explicitly_close_queues(self):
  187. +        '''Explicitly join queues, so that we'll get "stuck" in something that's
  188. +        more easily debugged than multiprocessing.
  189. +
  190. +        NOTE: It's tempting to call self.output_queue.cancel_join_thread(),
  191. +        but this seems to leave us in a bad state in practice (reproducible
  192. +        via existing tests).
  193. +        '''
  194. +        self.safe_close_queue('input', self.input_queue)
  195. +        self.safe_close_queue('output', self.output_queue)
  196.  
  197.      def run(self, input_queue, output_queue):
  198.          '''
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement