kwcoco.util.util_futures module
Deprecated: this functionality has moved to ubelt.
- class kwcoco.util.util_futures.Executor(mode='thread', max_workers=0)
Bases: object
Wrapper around a specific executor.
Abstracts Serial, Thread, and Process Executor via arguments.
- Parameters
mode (str, default='thread') – one of 'thread', 'serial', or 'process'
max_workers (int, default=0) – number of workers. If 0, serial mode is forced regardless of mode.
Example
>>> import platform
>>> import sys
>>> # The process backend breaks pypy3 when using coverage
>>> if 'pypy' in platform.python_implementation().lower():
...     import pytest
...     pytest.skip('not testing process on pypy')
>>> if sys.platform.startswith('win32'):
...     import pytest
...     pytest.skip('not running this test on win32 for now')
>>> import ubelt as ub
>>> # Fork before threading!
>>> # https://pybay.com/site_media/slides/raymond2017-keynote/combo.html
>>> self1 = ub.Executor(mode='serial', max_workers=0)
>>> self1.__enter__()
>>> self2 = ub.Executor(mode='process', max_workers=2)
>>> self2.__enter__()
>>> self3 = ub.Executor(mode='thread', max_workers=2)
>>> self3.__enter__()
>>> jobs = []
>>> jobs.append(self1.submit(sum, [1, 2, 3]))
>>> jobs.append(self1.submit(sum, [1, 2, 3]))
>>> jobs.append(self2.submit(sum, [10, 20, 30]))
>>> jobs.append(self2.submit(sum, [10, 20, 30]))
>>> jobs.append(self3.submit(sum, [4, 5, 5]))
>>> jobs.append(self3.submit(sum, [4, 5, 5]))
>>> for job in jobs:
>>>     result = job.result()
>>>     print('result = {!r}'.format(result))
>>> self1.__exit__(None, None, None)
>>> self2.__exit__(None, None, None)
>>> self3.__exit__(None, None, None)
Example
>>> import ubelt as ub
>>> self1 = ub.Executor(mode='serial', max_workers=0)
>>> with self1:
>>>     jobs = []
>>>     for i in range(10):
>>>         jobs.append(self1.submit(sum, [i + 1, i]))
>>>     for job in jobs:
>>>         job.add_done_callback(lambda x: print('done callback got x = {}'.format(x)))
>>>         result = job.result()
>>>         print('result = {!r}'.format(result))
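The mode and max_workers rules described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the ubelt implementation: SerialExecutorSketch and make_executor are hypothetical names, and the serial stand-in runs each job eagerly at submit time, which the real serial backend may not do.

```python
import concurrent.futures


class SerialExecutorSketch:
    """Hypothetical stand-in that runs each job in the calling thread."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return False  # never suppress exceptions

    def submit(self, func, *args, **kw):
        # Run the job immediately and wrap the outcome in a finished Future
        # so callers can still use .result() and .add_done_callback().
        future = concurrent.futures.Future()
        try:
            future.set_result(func(*args, **kw))
        except Exception as ex:
            future.set_exception(ex)
        return future


def make_executor(mode='thread', max_workers=0):
    """Hypothetical helper mirroring the documented parameter rules."""
    # Documented rule: max_workers=0 forces serial execution.
    if mode == 'serial' or max_workers == 0:
        return SerialExecutorSketch()
    if mode == 'thread':
        return concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    if mode == 'process':
        return concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
    raise KeyError(mode)


# A real thread pool is used because max_workers is nonzero.
with make_executor('thread', max_workers=2) as executor:
    thread_total = executor.submit(sum, [4, 5, 5]).result()

# max_workers=0 falls back to the serial stand-in even though mode='process'.
with make_executor('process', max_workers=0) as executor:
    serial_total = executor.submit(sum, [1, 2, 3]).result()

print('thread_total = {!r}'.format(thread_total))
print('serial_total = {!r}'.format(serial_total))
```

Because every backend returns a future-like object from submit, calling code does not need to change when the mode is switched.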
- submit(func, *args, **kw)
Calls the submit function of the underlying backend.
- Returns
a future representing the job
- Return type
- map(fn, *iterables, **kwargs)
Calls the map function of the underlying backend.
CommandLine
xdoctest -m ubelt.util_futures Executor.map
Example
>>> import ubelt as ub
>>> import concurrent.futures
>>> import string
>>> with ub.Executor(mode='serial') as executor:
...     result_iter = executor.map(int, string.digits)
...     results = list(result_iter)
>>> print('results = {!r}'.format(results))
results = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> with ub.Executor(mode='thread', max_workers=2) as executor:
...     result_iter = executor.map(int, string.digits)
...     results = list(result_iter)
>>> # xdoctest: +IGNORE_WANT
>>> print('results = {!r}'.format(results))
results = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
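For reference, here is how map behaves on a standard-library thread pool. This sketch assumes the thread and process backends delegate to concurrent.futures, which the text above does not state explicitly. It shows two properties of the standard-library call: results come back in input order, and several iterables are consumed in lockstep.

```python
import concurrent.futures
import string

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Results are yielded in the order of the inputs, even though the
    # calls themselves may finish in a different order.
    digits = list(executor.map(int, string.digits))

    # With several iterables, fn receives one item from each per call,
    # so this computes pow(2, 2), pow(3, 2), pow(4, 2).
    squares = list(executor.map(pow, [2, 3, 4], [2, 2, 2]))

print('digits = {!r}'.format(digits))
print('squares = {!r}'.format(squares))
```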
- class kwcoco.util.util_futures.JobPool(mode='thread', max_workers=0)
Bases: object
Abstracts away boilerplate of submitting and collecting jobs
This is a basic wrapper around ubelt.util_futures.Executor that simplifies the most basic case.
Example
>>> import ubelt as ub
>>> def worker(data):
>>>     return data + 1
>>> pool = ub.JobPool('thread', max_workers=16)
>>> for data in ub.ProgIter(range(10), desc='submit jobs'):
>>>     pool.submit(worker, data)
>>> final = []
>>> for job in pool.as_completed(desc='collect jobs'):
>>>     info = job.result()
>>>     final.append(info)
>>> print('final = {!r}'.format(final))
- submit(func, *args, **kwargs)
Submit a job managed by the pool
- Parameters
func (Callable[…, Any]) – A callable that will be invoked with the given positional and keyword arguments.
*args – positional arguments to pass to the function
**kwargs – keyword arguments to pass to the function
- Returns
a future representing the job
- Return type
- as_completed(timeout=None, desc=None, progkw=None)
Generates completed jobs in an arbitrary order
- Parameters
timeout (float | None) – Specify the maximum number of seconds to wait for a job. Note: this is ignored in serial mode.
desc (str | None) – if specified, reports progress with a ubelt.progiter.ProgIter object.
progkw (dict | None) – extra keyword arguments to ubelt.progiter.ProgIter.
- Yields
concurrent.futures.Future – The completed future object containing the results of a job.
CommandLine
xdoctest -m ubelt.util_futures JobPool.as_completed
Example
>>> import ubelt as ub
>>> pool = ub.JobPool('thread', max_workers=8)
>>> text = ub.paragraph(
...     '''
...     UDP is a cool protocol, check out the wiki:
...
...     UDP-based Data Transfer Protocol (UDT), is a high-performance
...     data transfer protocol designed for transferring large
...     volumetric datasets over high-speed wide area networks. Such
...     settings are typically disadvantageous for the more common TCP
...     protocol.
...     ''')
>>> for word in text.split(' '):
...     pool.submit(print, word)
>>> for _ in pool.as_completed():
...     pass
>>> pool.shutdown()
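Since completed jobs arrive in an arbitrary order, a common pattern is to remember which input each future belongs to. The sketch below uses the standard-library concurrent.futures.as_completed directly; the helper square and the mapping job_to_input are illustrative names. In the standard library, timeout is measured from the original call to as_completed; whether JobPool forwards it unchanged is an assumption here.

```python
import concurrent.futures


def square(x):
    # Hypothetical worker used only for this illustration.
    return x * x


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to its input, because completion order
    # generally differs from submission order.
    job_to_input = {executor.submit(square, x): x for x in range(8)}

    squares = {}
    # Raises a timeout error if the jobs are not all done within
    # 10 seconds of this call.
    for job in concurrent.futures.as_completed(job_to_input, timeout=10.0):
        squares[job_to_input[job]] = job.result()

print('squares = {!r}'.format(sorted(squares.items())))
```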
- join(**kwargs)
Like JobPool.as_completed(), but executes the result method of each future and returns only after all processes are complete. This allows for lower-boilerplate prototyping.
- Parameters
**kwargs – passed to JobPool.as_completed()
- Returns
list of results
- Return type
List[Any]
Example
>>> import ubelt as ub
>>> # We just want to try replacing our simple iterative algorithm
>>> # with the embarrassingly parallel version
>>> arglist = list(zip(range(1000), range(1000)))
>>> func = ub.identity
>>> #
>>> # Original version
>>> for args in arglist:
>>>     func(*args)
>>> #
>>> # Potentially parallel version
>>> jobs = ub.JobPool(max_workers=0)
>>> for args in arglist:
>>>     jobs.submit(func, *args)
>>> _ = jobs.join(desc='running')
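The description of join suggests it amounts to collecting result() from every future yielded by as_completed. A minimal standard-library sketch of that idea follows; join_sketch is a hypothetical helper, not the library's code. Under this reading the returned list follows completion order, so it should not be assumed to match submission order.

```python
import concurrent.futures


def join_sketch(jobs, timeout=None):
    """Hypothetical helper: block until every job finishes, return results."""
    return [
        job.result()
        for job in concurrent.futures.as_completed(jobs, timeout=timeout)
    ]


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    jobs = [executor.submit(pow, base, 2) for base in range(10)]
    results = join_sketch(jobs)

# Sort before comparing, because completion order is arbitrary.
print('sorted results = {!r}'.format(sorted(results)))
```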