Source code for qutip.solver.parallel

"""
This module provides functions for parallel execution of loops and function
mappings, using the builtin Python module multiprocessing or the loky parallel
execution library.
"""
__all__ = ['parallel_map', 'serial_map', 'loky_pmap', 'mpi_pmap']

import multiprocessing
import os
import sys
import time
import threading
import concurrent.futures
import warnings
from qutip.ui.progressbar import progress_bars
from qutip.settings import available_cpu_count

if sys.platform == 'darwin':
    mp_context = multiprocessing.get_context('fork')
elif sys.platform == 'linux':
    # forkserver would handle threads better, but is much slower at starting
    # the executor and spawning tasks
    mp_context = multiprocessing.get_context('fork')
else:
    mp_context = multiprocessing.get_context()


default_map_kw = {
    'timeout': threading.TIMEOUT_MAX,
    'num_cpus': available_cpu_count(),
    'fail_fast': True,
}


def _read_map_kw(options):
    options = options or {}
    map_kw = default_map_kw.copy()
    map_kw.update({k: v for k, v in options.items() if v is not None})
    return map_kw


class MapExceptions(Exception):
    def __init__(self, msg, errors, results):
        super().__init__(msg, errors, results)
        self.errors = errors
        self.results = results


[docs]def serial_map(task, values, task_args=None, task_kwargs=None, reduce_func=None, map_kw=None, progress_bar=None, progress_bar_kwargs={}): """ Serial mapping function with the same call signature as parallel_map, for easy switching between serial and parallel execution. This is functionally equivalent to:: result = [task(value, *task_args, **task_kwargs) for value in values] This function work as a drop-in replacement of :func:`parallel_map`. Parameters ---------- task : a Python function The function that is to be called for each value in ``task_vec``. values : array / list The list or array of values for which the ``task`` function is to be evaluated. task_args : list, optional The optional additional argument to the ``task`` function. task_kwargs : dictionary, optional The optional additional keyword argument to the ``task`` function. reduce_func : func, optional If provided, it will be called with the output of each tasks instead of storing a them in a list. It should return None or a number. When returning a number, it represent the estimation of the number of task left. On a return <= 0, the map will end early. progress_bar : str, optional Progress bar options's string for showing progress. progress_bar_kwargs : dict, optional Options for the progress bar. map_kw: dict, optional Dictionary containing: - timeout: float, Maximum time (sec) for the whole map. - fail_fast: bool, Raise an error at the first. Returns ------- result : list The result list contains the value of ``task(value, *task_args, **task_kwargs)`` for each value in ``values``. If a ``reduce_func`` is provided, and empty list will be returned. """ if task_args is None: task_args = () if task_kwargs is None: task_kwargs = {} map_kw = _read_map_kw(map_kw) remaining_ntraj = None progress_bar = progress_bars[progress_bar]( len(values), **progress_bar_kwargs ) end_time = map_kw['timeout'] + time.time() results = None if reduce_func is None: results = [None] * len(values) errors = {} for n, value in enumerate(values): if time.time() > end_time: break progress_bar.update() try: result = task(value, *task_args, **task_kwargs) except Exception as err: if map_kw["fail_fast"]: raise err else: errors[n] = err else: if reduce_func is not None: remaining_ntraj = reduce_func(result) else: results[n] = result if remaining_ntraj is not None and remaining_ntraj <= 0: end_time = 0 progress_bar.finished() if errors: raise MapExceptions(f"{len(errors)} iterations failed in serial_map", errors, results) return results
def _generic_pmap(task, values, task_args, task_kwargs, reduce_func, timeout, fail_fast, num_workers, progress_bar, progress_bar_kwargs, setup_executor, extract_result, shutdown_executor): """ Common functionality for parallel_map, loky_pmap and mpi_pmap. The parameters `setup_executor`, `extract_result` and `shutdown_executor` are callback functions with the following signatures: setup_executor: () -> ProcessPoolExecutor extract_result: Future -> (Any, BaseException) If there was an exception e, returns (None, e). Otherwise returns (result, None). shutdown_executor: (executor: ProcessPoolExecutor, active_tasks: set[Future]) -> None executor: The ProcessPoolExecutor that was created in setup_executor active_tasks: A set of Futures that are currently still being executed (non-empty if: timeout, error, or reduce_func requesting exit) """ if task_args is None: task_args = () if task_kwargs is None: task_kwargs = {} end_time = timeout + time.time() progress_bar = progress_bars[progress_bar]( len(values), **progress_bar_kwargs ) errors = {} finished = [] if reduce_func is not None: results = None def result_func(_, value): return reduce_func(value) else: results = [None] * len(values) result_func = results.__setitem__ def _done_callback(future): if not future.cancelled(): result, exception = extract_result(future) if isinstance(exception, KeyboardInterrupt): # When a keyboard interrupt happens, it is raised in the main # thread and in all worker threads. At this point in the code, # the worker threads have already returned and the main thread # is only waiting for the ProcessPoolExecutor to shutdown # before exiting. We therefore return immediately. return if exception is not None: if isinstance(exception, Exception): errors[future._i] = exception else: raise exception else: remaining_ntraj = result_func(future._i, result) if remaining_ntraj is not None and remaining_ntraj <= 0: finished.append(True) progress_bar.update() os.environ['QUTIP_IN_PARALLEL'] = 'TRUE' try: with setup_executor() as executor: waiting = set() i = 0 aborted = False while i < len(values): # feed values to the executor, ensuring that there is at # most one task per worker at any moment in time so that # we can shutdown without waiting for greater than the time # taken by the longest task if len(waiting) >= num_workers: # no space left, wait for a task to complete or # the time to run out timeout = max(0, end_time - time.time()) _done, waiting = concurrent.futures.wait( waiting, timeout=timeout, return_when=concurrent.futures.FIRST_COMPLETED, ) if ( time.time() >= end_time or (errors and fail_fast) or finished ): # no time left, exit the loop aborted = True break while len(waiting) < num_workers and i < len(values): # space and time available, add tasks value = values[i] future = executor.submit( task, *((value,) + task_args), **task_kwargs, ) # small hack to avoid add_done_callback not supporting # extra arguments and closures inside loops retaining # a reference not a value: future._i = i future.add_done_callback(_done_callback) waiting.add(future) i += 1 if not aborted: # all tasks have been submitted, timeout has not been reaches # -> wait for all workers to finish before shutting down timeout = max(0, end_time - time.time()) _done, waiting = concurrent.futures.wait( waiting, timeout=timeout, return_when=concurrent.futures.ALL_COMPLETED ) shutdown_executor(executor, waiting) finally: os.environ['QUTIP_IN_PARALLEL'] = 'FALSE' progress_bar.finished() if errors and fail_fast: raise list(errors.values())[0] elif errors: raise MapExceptions( f"{len(errors)} iterations failed in parallel_map", errors, results ) return results
[docs]def parallel_map(task, values, task_args=None, task_kwargs=None, reduce_func=None, map_kw=None, progress_bar=None, progress_bar_kwargs={}): """ Parallel execution of a mapping of ``values`` to the function ``task``. This is functionally equivalent to:: result = [task(value, *task_args, **task_kwargs) for value in values] Parameters ---------- task : a Python function The function that is to be called for each value in ``task_vec``. values : array / list The list or array of values for which the ``task`` function is to be evaluated. task_args : list, optional The optional additional arguments to the ``task`` function. task_kwargs : dictionary, optional The optional additional keyword arguments to the ``task`` function. reduce_func : func, optional If provided, it will be called with the output of each task instead of storing them in a list. Note that the order in which results are passed to ``reduce_func`` is not defined. It should return None or a number. When returning a number, it represents the estimation of the number of tasks left. On a return <= 0, the map will end early. progress_bar : str, optional Progress bar options's string for showing progress. progress_bar_kwargs : dict, optional Options for the progress bar. map_kw: dict, optional Dictionary containing entry for: - timeout: float, Maximum time (sec) for the whole map. - num_cpus: int, Number of jobs to run at once. - fail_fast: bool, Abort at the first error. Returns ------- result : list The result list contains the value of ``task(value, *task_args, **task_kwargs)`` for each value in ``values``. If a ``reduce_func`` is provided, and empty list will be returned. """ map_kw = _read_map_kw(map_kw) if sys.version_info >= (3, 7): # ProcessPoolExecutor only supports mp_context from 3.7 onwards ctx_kw = {"mp_context": mp_context} else: ctx_kw = {} def setup_executor(): return concurrent.futures.ProcessPoolExecutor( max_workers=map_kw['num_cpus'], **ctx_kw, ) def extract_result(future: concurrent.futures.Future): exception = future.exception() if exception is not None: return None, exception return future.result(), None def shutdown_executor(executor, _): # Since `ProcessPoolExecutor` leaves no other option, # we wait for all worker processes to finish their current task executor.shutdown() return _generic_pmap( task, values, task_args, task_kwargs, reduce_func, map_kw['timeout'], map_kw['fail_fast'], map_kw['num_cpus'], progress_bar, progress_bar_kwargs, setup_executor, extract_result, shutdown_executor )
[docs]def loky_pmap(task, values, task_args=None, task_kwargs=None, reduce_func=None, map_kw=None, progress_bar=None, progress_bar_kwargs={}): """ Parallel execution of a mapping of ``values`` to the function ``task``. This is functionally equivalent to:: result = [task(value, *task_args, **task_kwargs) for value in values] Use the loky module instead of multiprocessing. Parameters ---------- task : a Python function The function that is to be called for each value in ``task_vec``. values : array / list The list or array of values for which the ``task`` function is to be evaluated. task_args : list, optional The optional additional arguments to the ``task`` function. task_kwargs : dictionary, optional The optional additional keyword arguments to the ``task`` function. reduce_func : func, optional If provided, it will be called with the output of each task instead of storing them in a list. Note that the order in which results are passed to ``reduce_func`` is not defined. It should return None or a number. When returning a number, it represents the estimation of the number of tasks left. On a return <= 0, the map will end early. progress_bar : str, optional Progress bar options's string for showing progress. progress_bar_kwargs : dict, optional Options for the progress bar. map_kw: dict, optional Dictionary containing entry for: - timeout: float, Maximum time (sec) for the whole map. - num_cpus: int, Number of jobs to run at once. - fail_fast: bool, Abort at the first error. Returns ------- result : list The result list contains the value of ``task(value, *task_args, **task_kwargs)`` for each value in ``values``. If a ``reduce_func`` is provided, and empty list will be returned. """ from loky import get_reusable_executor from loky.process_executor import ShutdownExecutorError map_kw = _read_map_kw(map_kw) def setup_executor(): return get_reusable_executor(max_workers=map_kw['num_cpus']) def extract_result(future: concurrent.futures.Future): exception = future.exception() if isinstance(exception, ShutdownExecutorError): # Task was aborted due to timeout etc return None, None if exception is not None: return None, exception return future.result(), None def shutdown_executor(executor, active_tasks): # If there are still tasks running, we kill all workers in order to # return immediately. Otherwise, `kill_workers` is set to False so # that the worker threads can be reused in subsequent loky_pmap calls. kill_workers = len(active_tasks) > 0 executor.shutdown(kill_workers=kill_workers) return _generic_pmap( task, values, task_args, task_kwargs, reduce_func, map_kw['timeout'], map_kw['fail_fast'], map_kw['num_cpus'], progress_bar, progress_bar_kwargs, setup_executor, extract_result, shutdown_executor )
[docs]def mpi_pmap(task, values, task_args=None, task_kwargs=None, reduce_func=None, map_kw=None, progress_bar=None, progress_bar_kwargs={}): """ Parallel execution of a mapping of ``values`` to the function ``task``. This is functionally equivalent to:: result = [task(value, *task_args, **task_kwargs) for value in values] Uses the mpi4py module to execute the tasks asynchronously with MPI processes. For more information, consult the documentation of mpi4py and the mpi4py.MPIPoolExecutor class. Note: in keeping consistent with the API of `parallel_map`, the parameter determining the number of requested worker processes is called `num_cpus`. The value of `map_kw['num_cpus']` is passed to the MPIPoolExecutor as its `max_workers` argument. If this parameter is not provided, the environment variable `QUTIP_NUM_PROCESSES` is used instead. If this environment variable is not set either, QuTiP will use default values that might be unsuitable for MPI applications. Parameters ---------- task : a Python function The function that is to be called for each value in ``task_vec``. values : array / list The list or array of values for which the ``task`` function is to be evaluated. task_args : list, optional The optional additional arguments to the ``task`` function. task_kwargs : dictionary, optional The optional additional keyword arguments to the ``task`` function. reduce_func : func, optional If provided, it will be called with the output of each task instead of storing them in a list. Note that the order in which results are passed to ``reduce_func`` is not defined. It should return None or a number. When returning a number, it represents the estimation of the number of tasks left. On a return <= 0, the map will end early. progress_bar : str, optional Progress bar options's string for showing progress. progress_bar_kwargs : dict, optional Options for the progress bar. map_kw: dict, optional Dictionary containing entry for: - timeout: float, Maximum time (sec) for the whole map. - num_cpus: int, Number of jobs to run at once. - fail_fast: bool, Abort at the first error. All remaining entries of map_kw will be passed to the mpi4py.MPIPoolExecutor constructor. Returns ------- result : list The result list contains the value of ``task(value, *task_args, **task_kwargs)`` for each value in ``values``. If a ``reduce_func`` is provided, and empty list will be returned. """ from mpi4py.futures import MPIPoolExecutor # If the provided num_cpus is None, we use the default value instead. # We thus intentionally make it impossible to call # MPIPoolExecutor(max_workers=None, ...) # in which case mpi4py would determine a default value. That would be # useful, but unfortunately mpi4py provides no public API to access the # actual number of workers that is used in that case, which we would need. worker_number_provided = ( ((map_kw is not None) and ('num_cpus' in map_kw)) or 'QUTIP_NUM_PROCESSES' in os.environ) map_kw = _read_map_kw(map_kw) timeout = map_kw.pop('timeout') num_workers = map_kw.pop('num_cpus') fail_fast = map_kw.pop('fail_fast') if not worker_number_provided: warnings.warn(f'mpi_pmap was called without specifying the number of ' f'worker processes, using the default {num_workers}') def setup_executor(): return MPIPoolExecutor(max_workers=num_workers, **map_kw) def extract_result(future): exception = future.exception() if exception is not None: return None, exception return future.result(), None def shutdown_executor(executor, _): executor.shutdown() return _generic_pmap( task, values, task_args, task_kwargs, reduce_func, timeout, fail_fast, num_workers, progress_bar, progress_bar_kwargs, setup_executor, extract_result, shutdown_executor )
_maps = { "parallel_map": parallel_map, "parallel": parallel_map, "serial_map": serial_map, "serial": serial_map, "loky": loky_pmap, "mpi": mpi_pmap } def _get_map(options): map_func = _maps[options['map']] if map_func == mpi_pmap: map_kw = options['mpi_options'] else: map_kw = {} return map_func, map_kw