Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- commit f2703fb4c1a46151d1e24fb5368fb0b2d8ff5533
- Author: Nicholas Tung <ntung@yelp.com>
- Date: Mon Dec 2 15:31:55 2013 -0800
- Make the code for closing queues on worker threads symmetrical; make the closes-FDs test (tests/closes_fds_test.py) more informative when errors are encountered
- diff --git a/tests/closes_fds_test.py b/tests/closes_fds_test.py
- index ee5624f..5ec134a 100644
- --- a/tests/closes_fds_test.py
- +++ b/tests/closes_fds_test.py
- @@ -5,11 +5,14 @@ Provides an interface for defining worker processes.
- from __future__ import absolute_import
- from __future__ import print_function
- +import mock
- import os
- import stat
- +import testify as T
- import vimap.pool
- +import vimap.queue_manager
- import vimap.worker_process
- -import testify as T
- +from collections import namedtuple
- # decrypt POSIX stuff
- @@ -23,30 +26,39 @@ readable_mode_strings = {
- 'socket': stat.S_ISSOCK}
- +FDInfo = namedtuple("FDInfo", ["modes", "symlink"])
- +
- +
- def fd_type_if_open(fd_number):
- """For a given open file descriptor, return a list of human-readable
- strings describing the file type.
- """
- fd_stat = os.fstat(fd_number)
- - return [
- - k for k, v in readable_mode_strings.items()
- - if v(fd_stat.st_mode)]
- + return FDInfo(
- + modes=[k for k, v in readable_mode_strings.items() if v(fd_stat.st_mode)],
- + symlink=os.readlink("/proc/{0}/fd/{1}".format(os.getpid(), fd_number)))
- -def get_open_fds():
- +def get_open_fds(retries=3):
- """
- Returns a map,
- - fd (int) --> modes (list of human-readable strings)
- + fd (int) --> FDInfo
- """
- unix_fd_dir = "/proc/{0}/fd".format(os.getpid())
- fds = [(int(i), os.path.join(unix_fd_dir, i)) for i in os.listdir(unix_fd_dir)]
- - # NOTE: Sometimes, an FD is used to list the above directory. Hence, we should
- - # re-check whether the FD still exists (via os.path.exists)
- - fds = [i for (i, path) in fds if (i >= 3 and os.path.exists(path))]
- - return dict(filter(
- - lambda (k, v): v is not None,
- - ((i, fd_type_if_open(i)) for i in fds)))
- +
- + try:
- + # NOTE: Sometimes, an FD is used to list the above directory. Hence, we should
- + # re-check whether the FD still exists (via os.path.exists)
- + fds = [i for (i, path) in fds if (i >= 3 and os.path.exists(path))]
- + return dict(filter(
- + lambda (k, v): v is not None,
- + ((i, fd_type_if_open(i)) for i in fds)))
- + except OSError:
- + if retries == 0:
- + raise
- + return get_open_fds(retries - 1)
- def difference_open_fds(before, after):
- @@ -58,7 +70,8 @@ def difference_open_fds(before, after):
- # "a - b" for dicts -- remove anything in 'a' that has a key in b
- dict_diff = lambda a, b: dict((k, a[k]) for k in (frozenset(a) - frozenset(b)))
- for k in (frozenset(after) & frozenset(before)):
- - assert before[k] == after[k], "Changing FD types aren't supported!"
- + if before[k] != after[k]:
- + print("WARNING: FD {0} changed from {1} to {2}".format(k, before[k], after[k]))
- return {
- 'closed': dict_diff(before, after),
- 'opened': dict_diff(after, before)}
- @@ -88,7 +101,31 @@ def basic_worker(xs):
- yield x + 1
- +def repeat(times):
- + """Repeats a test to help catch flakiness."""
- + def fcn_helper(fcn):
- + return lambda *args, **kwargs: [fcn(*args, **kwargs) for _ in xrange(times)]
- + return fcn_helper
- +
- +
- class TestBasicMapDoesntLeaveAroundFDs(T.TestCase):
- + @T.setup_teardown
- + def instrument_queue_initiation(self):
- + old_init = vimap.queue_manager.VimapQueueManager.__init__
- + def instrumented_init(*args, **kwargs):
- + self.before_queue_manager_init = get_open_fds()
- + old_init(*args, **kwargs)
- + self.after_queue_manager_init = get_open_fds()
- + self.queue_fds = difference_open_fds(
- + self.before_queue_manager_init,
- + self.after_queue_manager_init)['opened']
- + with mock.patch.object(
- + vimap.queue_manager.VimapQueueManager,
- + '__init__',
- + instrumented_init):
- + yield
- +
- + @repeat(30)
- def test_all_fds_cleaned_up(self):
- initial_open_fds = get_open_fds()
- pool = vimap.pool.fork_identical(basic_worker, num_workers=1)
- @@ -101,12 +138,16 @@ class TestBasicMapDoesntLeaveAroundFDs(T.TestCase):
- # T.assert_equal(after_fork['closed'], [])
- T.assert_gte(len(after_fork['opened']), 2) # should have at least 3 open fds
- # All opened files should be FIFOs
- - T.assert_equal(all(typ == ['fifo'] for typ in after_fork['opened'].values()), True)
- + T.assert_equal(all(info.modes == ['fifo'] for info in after_fork['opened'].values()), True)
- after_cleanup = difference_open_fds(after_fork_open_fds, after_finish_open_fds)
- T.assert_gte(len(after_cleanup['closed']), 2)
- left_around = difference_open_fds(initial_open_fds, after_finish_open_fds)
- + if len(left_around['opened']) != 0:
- + queue_fds_left_around = dict(
- + item for item in self.queue_fds.items() if item[0] in left_around['opened'])
- + print("Queue FDs left around: {0}".format(queue_fds_left_around))
- T.assert_equal(len(left_around['opened']), 0)
- diff --git a/vimap/real_worker_routine.py b/vimap/real_worker_routine.py
- index ad58c2a..5e50f48 100644
- --- a/vimap/real_worker_routine.py
- +++ b/vimap/real_worker_routine.py
- @@ -54,33 +54,37 @@ class WorkerRoutine(object):
- file=sys.stderr)
- raise
- - def explicitly_close_queues(self):
- - '''Explicitly join queues, so that we'll get "stuck" in something that's
- - more easily debugged than multiprocessing.
- -
- - NOTE: It's tempting to call self.output_queue.cancel_join_thread(),
- - but this seems to leave us in a bad state in practice (reproducible
- - via existing tests).
- - '''
- - self.input_queue.close()
- - self.output_queue.close()
- + def safe_close_queue(self, name, queue):
- + self.debug("Closing queue {0}", name)
- + queue.close()
- try:
- - self.debug("Joining input queue")
- - self.input_queue.join_thread()
- - self.debug("...done")
- + self.debug("Joining thread for queue {0}", name)
- try:
- self.debug(
- - "Joining output queue (size {size}, full: {full})",
- - size=self.output_queue.qsize(),
- - full=self.output_queue.full())
- + "Joining queue {name} (size {size}, full: {full})",
- + name=name,
- + size=queue.qsize(),
- + full=queue.full())
- except NotImplementedError:
- pass # Mac OS X doesn't implement qsize()
- - self.output_queue.join_thread()
- - self.debug("...done")
- + queue.join_thread()
- # threads might have already been closed
- except AssertionError as e:
- - self.debug("Couldn't join threads; error {0}", e)
- + self.debug("Couldn't join queue {0}; error {1}", name, e)
- + else:
- + self.debug("Done closing {0}, no exceptions.", name)
- +
- + def explicitly_close_queues(self):
- + '''Explicitly join queues, so that we'll get "stuck" in something that's
- + more easily debugged than multiprocessing.
- +
- + NOTE: It's tempting to call self.output_queue.cancel_join_thread(),
- + but this seems to leave us in a bad state in practice (reproducible
- + via existing tests).
- + '''
- + self.safe_close_queue('input', self.input_queue)
- + self.safe_close_queue('output', self.output_queue)
- def run(self, input_queue, output_queue):
- '''
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement