From e8543e620d87f41f78751b4285a3f1af06023a62 Mon Sep 17 00:00:00 2001 From: Steven Seguin Date: Tue, 8 Aug 2017 11:34:27 -0400 Subject: [PATCH] Backport `thread_name_prefix` from upstream (#64) Changes from https://github.com/python/cpython/pull/2315 Fixes #63. --- CHANGES | 7 +++++++ concurrent/futures/thread.py | 17 ++++++++++++++--- test_futures.py | 29 ++++++++++++++++++++++++++--- 3 files changed, 47 insertions(+), 6 deletions(-) diff --git a/CHANGES b/CHANGES index 4ce2585..f7d411c 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,10 @@ +Unreleased +========== + +- The ThreadPoolExecutor class constructor now accepts an optional ``thread_name_prefix`` + argument to make it possible to customize the names of the threads created by the pool. + Upstream contribution by Gregory P. Smith in https://bugs.python.org/issue27664. + 3.1.1 ===== diff --git a/concurrent/futures/thread.py b/concurrent/futures/thread.py index efae619..bb0ce9d 100644 --- a/concurrent/futures/thread.py +++ b/concurrent/futures/thread.py @@ -5,6 +5,7 @@ import atexit from concurrent.futures import _base +import itertools import Queue as queue import threading import weakref @@ -90,12 +91,17 @@ def _worker(executor_reference, work_queue): class ThreadPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): + + # Used to assign unique thread names when thread_name_prefix is not supplied. + _counter = itertools.count().next + + def __init__(self, max_workers=None, thread_name_prefix=''): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. + thread_name_prefix: An optional name prefix to give our threads. """ if max_workers is None: # Use this number because ThreadPoolExecutor is often @@ -109,6 +115,8 @@ class ThreadPoolExecutor(_base.Executor): self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() + self._thread_name_prefix = (thread_name_prefix or + ("ThreadPoolExecutor-%d" % self._counter())) def submit(self, fn, *args, **kwargs): with self._shutdown_lock: @@ -130,8 +138,11 @@ class ThreadPoolExecutor(_base.Executor): q.put(None) # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. - if len(self._threads) < self._max_workers: - t = threading.Thread(target=_worker, + num_threads = len(self._threads) + if num_threads < self._max_workers: + thread_name = '%s_%d' % (self._thread_name_prefix or self, + num_threads) + t = threading.Thread(name=thread_name, target=_worker, args=(weakref.ref(self, weakref_cb), self._work_queue)) t.daemon = True diff --git a/test_futures.py b/test_futures.py index e7cd8cf..95a3ca2 100644 --- a/test_futures.py +++ b/test_futures.py @@ -29,7 +29,7 @@ def reap_threads(func): If threading is unavailable this function does nothing. """ @functools.wraps(func) - def decorator(*args): + def decorator(*args): key = test_support.threading_setup() try: return func(*args) @@ -50,7 +50,7 @@ def _assert_python(expected_success, *args, **env_vars): # caller is responsible to pass the full environment. if env_vars.pop('__cleanenv', None): env = {} - env.update(env_vars) + env.update(env_vars) cmd_line.extend(args) p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -78,7 +78,7 @@ def assert_python_ok(*args, **env_vars): return _assert_python(True, *args, **env_vars) -def strip_python_stderr(stderr): +def strip_python_stderr(stderr): """Strip the stderr of a Python process from potential debug output emitted by the interpreter. @@ -230,6 +230,29 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest): for t in threads: t.join() + def test_thread_names_assigned(self): + executor = futures.ThreadPoolExecutor( + max_workers=5, thread_name_prefix='SpecialPool') + executor.map(abs, range(-5, 5)) + threads = executor._threads + del executor + + for t in threads: + self.assertRegexpMatches(t.name, r'^SpecialPool_[0-4]$') + t.join() + + def test_thread_names_default(self): + executor = futures.ThreadPoolExecutor(max_workers=5) + executor.map(abs, range(-5, 5)) + threads = executor._threads + del executor + + for t in threads: + # Ensure that our default name is reasonably sane and unique when + # no thread_name_prefix was supplied. + self.assertRegexpMatches(t.name, r'ThreadPoolExecutor-\d+_[0-4]$') + t.join() + class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): def _prime_executor(self): -- 2.39.0.windows.2