From 28fa404639a702c32a18a1a411409f1aab8ab464 Mon Sep 17 00:00:00 2001
From: Lisandro Dalcin
Date: Wed, 29 Nov 2017 22:40:47 +0300
Subject: [PATCH] Backport fixes to as_completed and map iterators (bpo-27144) (#66)

Python issues:
+ python/cpython#1560
+ python/cpython#3270
+ python/cpython#3830
---
 concurrent/futures/_base.py | 53 ++++++++++++++++++++++++++++++-------
 1 file changed, 43 insertions(+), 10 deletions(-)

diff --git a/concurrent/futures/_base.py b/concurrent/futures/_base.py
index 401e488..510ffa5 100644
--- a/concurrent/futures/_base.py
+++ b/concurrent/futures/_base.py
@@ -172,6 +172,29 @@ def _create_and_install_waiters(fs, return_when):
 
     return waiter
 
+
+def _yield_finished_futures(fs, waiter, ref_collect):
+    """
+    Iterate on the list *fs*, yielding finished futures one by one in
+    reverse order.
+    Before yielding a future, *waiter* is removed from its waiters
+    and the future is removed from each set in the collection of sets
+    *ref_collect*.
+
+    The aim of this function is to avoid keeping stale references after
+    the future is yielded and before the iterator resumes.
+    """
+    while fs:
+        f = fs[-1]
+        for futures_set in ref_collect:
+            futures_set.remove(f)
+        with f._condition:
+            f._waiters.remove(waiter)
+        del f
+        # Careful not to keep a reference to the popped value
+        yield fs.pop()
+
+
 def as_completed(fs, timeout=None):
     """An iterator over the given futures that yields each as it completes.
 
@@ -194,16 +217,19 @@ def as_completed(fs, timeout=None):
         end_time = timeout + time.time()
 
     fs = set(fs)
+    total_futures = len(fs)
     with _AcquireFutures(fs):
         finished = set(
                 f for f in fs
                 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
         pending = fs - finished
         waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
-
+    finished = list(finished)
     try:
-        for future in finished:
-            yield future
+        for f in _yield_finished_futures(finished, waiter,
+                                         ref_collect=(fs,)):
+            f = [f]
+            yield f.pop()
 
         while pending:
             if timeout is None:
@@ -213,7 +239,7 @@ def as_completed(fs, timeout=None):
                 if wait_timeout < 0:
                     raise TimeoutError(
                             '%d (of %d) futures unfinished' % (
-                            len(pending), len(fs)))
+                            len(pending), total_futures))
 
             waiter.event.wait(wait_timeout)
 
@@ -222,11 +248,15 @@ def as_completed(fs, timeout=None):
                 waiter.finished_futures = []
                 waiter.event.clear()
 
-            for future in finished:
-                yield future
-                pending.remove(future)
+            # reverse to keep finishing order
+            finished.reverse()
+            for f in _yield_finished_futures(finished, waiter,
+                                             ref_collect=(fs, pending)):
+                f = [f]
+                yield f.pop()
 
     finally:
+        # Remove waiter from unfinished futures
         for f in fs:
             with f._condition:
                 f._waiters.remove(waiter)
@@ -603,11 +633,14 @@ class Executor(object):
         # before the first iterator value is required.
         def result_iterator():
             try:
-                for future in fs:
+                # reverse to keep finishing order
+                fs.reverse()
+                while fs:
+                    # Careful not to keep a reference to the popped future
                     if timeout is None:
-                        yield future.result()
+                        yield fs.pop().result()
                     else:
-                        yield future.result(end_time - time.time())
+                        yield fs.pop().result(end_time - time.time())
             finally:
                 for future in fs:
                     future.cancel()
-- 
2.39.0.windows.2