kwcoco.util.util_futures module

Deprecated: this functionality has moved to ubelt.

class kwcoco.util.util_futures.Executor(mode='thread', max_workers=0)

Bases: object

Wrapper around a specific executor.

Abstracts Serial, Thread, and Process Executor via arguments.

Parameters
  • mode (str, default='thread') – either 'thread', 'serial', or 'process'

  • max_workers (int, default=0) – number of workers. If 0, serial execution is forced regardless of mode.
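
The argument rules above can be sketched with the standard library alone. This is only an illustration, not the ubelt implementation: SerialExecutor and make_executor are hypothetical names, and the dispatch logic is an assumption based on the parameter descriptions.

```python
import concurrent.futures


class SerialExecutor(concurrent.futures.Executor):
    """Hypothetical stand-in that runs each job immediately in the caller."""

    def submit(self, func, *args, **kwargs):
        future = concurrent.futures.Future()
        try:
            future.set_result(func(*args, **kwargs))
        except Exception as ex:
            # Store the error so it is re-raised by future.result()
            future.set_exception(ex)
        return future


def make_executor(mode='thread', max_workers=0):
    """Hypothetical helper that picks a backend from the two arguments."""
    if mode == 'serial' or max_workers == 0:
        # max_workers=0 forces serial execution, whatever the mode says
        return SerialExecutor()
    if mode == 'thread':
        return concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    if mode == 'process':
        return concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
    raise KeyError(mode)


# mode='process' with max_workers=0 falls back to the serial backend
with make_executor(mode='process', max_workers=0) as executor:
    job = executor.submit(sum, [1, 2, 3])
    print(job.result())  # prints 6
```

Because every backend returns a concurrent.futures.Future, calling code does not need to know which one was selected.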

Example

>>> import platform
>>> import sys
>>> # The process backend breaks pypy when using coverage
>>> if 'pypy' in platform.python_implementation().lower():
...     import pytest
...     pytest.skip('not testing process on pypy')
>>> if sys.platform.startswith('win32'):
...     import pytest
...     pytest.skip('not running this test on win32 for now')
>>> import ubelt as ub
>>> # Fork before threading!
>>> # https://pybay.com/site_media/slides/raymond2017-keynote/combo.html
>>> self1 = ub.Executor(mode='serial', max_workers=0)
>>> self1.__enter__()
>>> self2 = ub.Executor(mode='process', max_workers=2)
>>> self2.__enter__()
>>> self3 = ub.Executor(mode='thread', max_workers=2)
>>> self3.__enter__()
>>> jobs = []
>>> jobs.append(self1.submit(sum, [1, 2, 3]))
>>> jobs.append(self1.submit(sum, [1, 2, 3]))
>>> jobs.append(self2.submit(sum, [10, 20, 30]))
>>> jobs.append(self2.submit(sum, [10, 20, 30]))
>>> jobs.append(self3.submit(sum, [4, 5, 5]))
>>> jobs.append(self3.submit(sum, [4, 5, 5]))
>>> for job in jobs:
>>>     result = job.result()
>>>     print('result = {!r}'.format(result))
>>> self1.__exit__(None, None, None)
>>> self2.__exit__(None, None, None)
>>> self3.__exit__(None, None, None)

Example

>>> import ubelt as ub
>>> self1 = ub.Executor(mode='serial', max_workers=0)
>>> with self1:
>>>     jobs = []
>>>     for i in range(10):
>>>         jobs.append(self1.submit(sum, [i + 1, i]))
>>>     for job in jobs:
>>>         job.add_done_callback(lambda x: print('done callback got x = {}'.format(x)))
>>>         result = job.result()
>>>         print('result = {!r}'.format(result))

submit(func, *args, **kw)

Calls the submit function of the underlying backend.

Returns

a future representing the job

Return type

concurrent.futures.Future

shutdown()

Calls the shutdown function of the underlying backend.

map(fn, *iterables, **kwargs)

Calls the map function of the underlying backend.

CommandLine

xdoctest -m ubelt.util_futures Executor.map

Example

>>> import ubelt as ub
>>> import concurrent.futures
>>> import string
>>> with ub.Executor(mode='serial') as executor:
...     result_iter = executor.map(int, string.digits)
...     results = list(result_iter)
>>> print('results = {!r}'.format(results))
results = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> with ub.Executor(mode='thread', max_workers=2) as executor:
...     result_iter = executor.map(int, string.digits)
...     results = list(result_iter)
>>> # xdoctest: +IGNORE_WANT
>>> print('results = {!r}'.format(results))
results = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

class kwcoco.util.util_futures.JobPool(mode='thread', max_workers=0)

Bases: object

Abstracts away boilerplate of submitting and collecting jobs

This is a basic wrapper around ubelt.util_futures.Executor that simplifies the most basic case.

Example

>>> import ubelt as ub
>>> def worker(data):
>>>     return data + 1
>>> pool = ub.JobPool('thread', max_workers=16)
>>> for data in ub.ProgIter(range(10), desc='submit jobs'):
>>>     pool.submit(worker, data)
>>> final = []
>>> for job in pool.as_completed(desc='collect jobs'):
>>>     info = job.result()
>>>     final.append(info)
>>> print('final = {!r}'.format(final))

submit(func, *args, **kwargs)

Submit a job managed by the pool

Parameters
  • func (Callable[..., Any]) – the callable to run; it is invoked with the given positional and keyword arguments.

  • *args – positional arguments to pass to the function

  • **kwargs – keyword arguments to pass to the function

Returns

a future representing the job

Return type

concurrent.futures.Future
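
As a rough illustration of how positional and keyword arguments reach the callable, the sketch below uses the standard library's ThreadPoolExecutor as a stand-in for the pool. It assumes JobPool.submit forwards its arguments the same way; scale is a hypothetical worker function.

```python
import concurrent.futures


def scale(value, factor=1):
    # Hypothetical worker: one positional and one keyword argument
    return value * factor


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Arguments after the callable are forwarded to it unchanged
    job = executor.submit(scale, 3, factor=10)
    result = job.result()

print('result = {!r}'.format(result))  # prints: result = 30
```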

shutdown()

as_completed(timeout=None, desc=None, progkw=None)

Generates completed jobs in an arbitrary order

Parameters
  • timeout (float | None) – Specify the maximum number of seconds to wait for a job.

  • desc (str | None) – if specified, reports progress with a ubelt.progiter.ProgIter object.

  • progkw (dict | None) – extra keyword arguments to ubelt.progiter.ProgIter.

Yields

concurrent.futures.Future – The completed future object containing the results of a job.
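
The completion-order behaviour can be seen with the standard library's concurrent.futures.as_completed, which is used here as a stand-in on the assumption that the pool behaves similarly. The square function is a hypothetical worker, and the timeout semantics in the comment are those of the standard library, not necessarily of JobPool.

```python
import concurrent.futures


def square(x):
    # Hypothetical worker function
    return x * x


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(square, x) for x in range(5)]
    results = []
    # Futures are yielded as they finish, not in submission order.
    # In the standard library the timeout is measured from this call
    # and a TimeoutError is raised if it elapses first.
    for future in concurrent.futures.as_completed(futures, timeout=10):
        results.append(future.result())

# Sort before printing because the completion order is arbitrary
print(sorted(results))  # prints [0, 1, 4, 9, 16]
```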

CommandLine

xdoctest -m ubelt.util_futures JobPool.as_completed

Example

>>> import ubelt as ub
>>> pool = ub.JobPool('thread', max_workers=8)
>>> text = ub.paragraph(
...     '''
...     UDP is a cool protocol, check out the wiki:
...
...     UDP-based Data Transfer Protocol (UDT), is a high-performance
...     data transfer protocol designed for transferring large
...     volumetric datasets over high-speed wide area networks. Such
...     settings are typically disadvantageous for the more common TCP
...     protocol.
...     ''')
>>> for word in text.split(' '):
...     pool.submit(print, word)
>>> for _ in pool.as_completed():
...     pass
>>> pool.shutdown()

join(**kwargs)

Like JobPool.as_completed(), but calls the result method of each future and returns only after all jobs are complete. This allows for lower-boilerplate prototyping.

Parameters

**kwargs – passed to JobPool.as_completed()

Returns

list of results

Return type

List[Any]

Example

>>> import ubelt as ub
>>> # We just want to try replacing our simple iterative algorithm
>>> # with the embarrassingly parallel version
>>> arglist = list(zip(range(1000), range(1000)))
>>> func = ub.identity
>>> #
>>> # Original version
>>> for args in arglist:
>>>     func(*args)
>>> #
>>> # Potentially parallel version
>>> jobs = ub.JobPool(max_workers=0)
>>> for args in arglist:
>>>     jobs.submit(func, *args)
>>> _ = jobs.join(desc='running')
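
For readers who want to see what join saves them from writing, here is a minimal sketch built on the standard library. join_sketch is a hypothetical helper, and the assumption that results come back in completion order (not submission order) follows from the description of join as a wrapper around as_completed.

```python
import concurrent.futures


def join_sketch(futures, timeout=None):
    """Hypothetical equivalent: wait for every future and collect results."""
    completed = concurrent.futures.as_completed(futures, timeout=timeout)
    return [future.result() for future in completed]


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(pow, base, 2) for base in range(4)]
    results = join_sketch(futures)

# Results follow completion order, so sort them if order matters
print(sorted(results))  # prints [0, 1, 4, 9]
```

If the caller needs results matched to inputs, one option is to have the worker return its input alongside its output.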