python-futures/0003-Backport-fixes-to-as_completed-and-map-iterators-bpo.patch
zhang-liang-pengkun a26854a04f Backport fixes to as_completed and map iterators (bpo-27144) (#66)
Signed-off-by: zhang-liang-pengkun <zhangliangpengkun@xfusion.com>
2023-12-21 15:12:40 +08:00

122 lines
4.0 KiB
Diff

From 28fa404639a702c32a18a1a411409f1aab8ab464 Mon Sep 17 00:00:00 2001
From: Lisandro Dalcin <dalcinl@gmail.com>
Date: Wed, 29 Nov 2017 22:40:47 +0300
Subject: [PATCH] Backport fixes to as_completed and map iterators (bpo-27144) (#66)
Python issues:
+ python/cpython#1560
+ python/cpython#3270
+ python/cpython#3830
---
concurrent/futures/_base.py | 53 ++++++++++++++++++++++++++++++-------
1 file changed, 43 insertions(+), 10 deletions(-)
diff --git a/concurrent/futures/_base.py b/concurrent/futures/_base.py
index 401e488..510ffa5 100644
--- a/concurrent/futures/_base.py
+++ b/concurrent/futures/_base.py
@@ -172,6 +172,29 @@ def _create_and_install_waiters(fs, return_when):
return waiter
+
+def _yield_finished_futures(fs, waiter, ref_collect):
+ """
+ Iterate on the list *fs*, yielding finished futures one by one in
+ reverse order.
+ Before yielding a future, *waiter* is removed from its waiters
+ and the future is removed from each set in the collection of sets
+ *ref_collect*.
+
+ The aim of this function is to avoid keeping stale references after
+ the future is yielded and before the iterator resumes.
+ """
+ while fs:
+ f = fs[-1]
+ for futures_set in ref_collect:
+ futures_set.remove(f)
+ with f._condition:
+ f._waiters.remove(waiter)
+ del f
+ # Careful not to keep a reference to the popped value
+ yield fs.pop()
+
+
def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
@@ -194,16 +217,19 @@ def as_completed(fs, timeout=None):
end_time = timeout + time.time()
fs = set(fs)
+ total_futures = len(fs)
with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
-
+ finished = list(finished)
try:
- for future in finished:
- yield future
+ for f in _yield_finished_futures(finished, waiter,
+ ref_collect=(fs,)):
+ f = [f]
+ yield f.pop()
while pending:
if timeout is None:
@@ -213,7 +239,7 @@ def as_completed(fs, timeout=None):
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
- len(pending), len(fs)))
+ len(pending), total_futures))
waiter.event.wait(wait_timeout)
@@ -222,11 +248,15 @@ def as_completed(fs, timeout=None):
waiter.finished_futures = []
waiter.event.clear()
- for future in finished:
- yield future
- pending.remove(future)
+ # reverse to keep finishing order
+ finished.reverse()
+ for f in _yield_finished_futures(finished, waiter,
+ ref_collect=(fs, pending)):
+ f = [f]
+ yield f.pop()
finally:
+ # Remove waiter from unfinished futures
for f in fs:
with f._condition:
f._waiters.remove(waiter)
@@ -603,11 +633,14 @@ class Executor(object):
# before the first iterator value is required.
def result_iterator():
try:
- for future in fs:
+ # reverse so that pop() returns futures in their original list order
+ fs.reverse()
+ while fs:
+ # Careful not to keep a reference to the popped future
if timeout is None:
- yield future.result()
+ yield fs.pop().result()
else:
- yield future.result(end_time - time.time())
+ yield fs.pop().result(end_time - time.time())
finally:
for future in fs:
future.cancel()
--
2.39.0.windows.2