B
    d1                 @   s   d Z ddlZddlZddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 G dd	 d	eZd
d Zdd Zdd Zdd Zdd ZG dd dZG dd deZG dd deZdS )zImplements MPIPoolExecutor.    N   )Future)Executor)as_completed)_libc               @   sz   e Zd ZdZeZdddZeejZ	dd Z
edd	 ZdddZdd ZddddddZdddZdddddZdS )MPIPoolExecutorz MPI-based asynchronous executor.N c             K   sv   |dk	r(t |}|dkr td||d< |dk	rPt|s@td||d< ||d< || _d| _d| _t | _	d| _
dS )	a  Initialize a new MPIPoolExecutor instance.

        Args:
            max_workers: The maximum number of MPI processes that can be used
                to execute the given calls. If ``None`` or not given then the
                number of worker processes will be determined from the MPI
                universe size attribute if defined, otherwise a single worker
                process will be spawned.
            initializer: An callable used to initialize workers processes.
            initargs: A tuple of arguments to pass to the initializer.

        Keyword Args:
            python_exe: Path to Python executable used to spawn workers.
            python_args: Command line arguments to pass to Python executable.
            mpi_info: Dict or iterable with ``(key, value)`` pairs.
            globals: Dict or iterable with global variables to set in workers.
            main: If ``False``, do not import ``__main__`` in workers.
            path: List of paths to append to ``sys.path`` in workers.
            wdir: Path to set current working directory in workers.
            env: Environment variables to update ``os.environ`` in workers.

        Nr   z"max_workers must be greater than 0max_workerszinitializer must be a callableinitializerinitargsF)int
ValueErrorcallable	TypeError_options	_shutdown_broken	threadingLock_lock_pool)selfr	   r
   r   kwargsr   r   `/work/yifan.wang/ringdown/master-ringdown-env/lib/python3.7/site-packages/mpi4py/futures/pool.py__init__   s    
zMPIPoolExecutor.__init__c             C   s   | j d kr| | | _ d S )N)r   
_make_pool)r   r   r   r   
_bootstrapA   s    
zMPIPoolExecutor._bootstrapc          	   C   s@   | j 0 | jrd S | jrd S |   | j  | jjS Q R X d S )N)r   r   r   r   r   waitsize)r   r   r   r   _max_workersE   s    
zMPIPoolExecutor._max_workersTc          	   C   s:   | j * | jrtd|   |r,| j  | S Q R X dS )zAllocate executor resources eagerly.

        Args:
            wait: If ``True`` then bootup will not return until the
                executor resources are ready to process submissions.

        zcannot bootup after shutdownN)r   r   RuntimeErrorr   r   r   )r   r   r   r   r   bootupP   s    
zMPIPoolExecutor.bootupc          	   O   s`   | j P | jrt| j| jr(td|   |  }|||f}| j	||f |S Q R X dS )a&  Submit a callable to be executed with the given arguments.

        Schedule the callable to be executed as ``fn(*args, **kwargs)`` and
        return a `Future` instance representing the execution of the callable.

        Returns:
            A `Future` representing the given call.

        zcannot submit after shutdownN)
r   r   r   BrokenExecutorr   r    r   r   r   push)r   fnargsr   futuretaskr   r   r   submit`   s    
zMPIPoolExecutor.submitr   F)timeout	chunksize	unorderedc            G   s   |  |t| |||S )a  Return an iterator equivalent to ``map(fn, *iterables)``.

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            iterables: Iterables yielding positional arguments to be passed to
                the callable.
            timeout: The maximum number of seconds to wait. If ``None``, then
                there is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a worker process.
            unordered: If ``True``, yield results out-of-order, as completed.

        Returns:
            An iterator equivalent to built-in ``map(func, *iterables)``
            but the calls may be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If ``fn(*args)`` raises for any values.

        )starmapzip)r   r$   r)   r*   r+   	iterablesr   r   r   mapv   s    zMPIPoolExecutor.mapc             C   sB   |dk rt d|dkr*t| j||||S t| j|||||S dS )a  Return an iterator equivalent to ``itertools.starmap(...)``.

        Args:
            fn: A callable that will take positional argument from iterable.
            iterable: An iterable yielding ``args`` tuples to be used as
                positional arguments to call ``fn(*args)``.
            timeout: The maximum number of seconds to wait. If ``None``, then
                there is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a worker process.
            unordered: If ``True``, yield results out-of-order, as completed.

        Returns:
            An iterator equivalent to ``itertools.starmap(fn, iterable)``
            but the calls may be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If ``fn(*args)`` raises for any values.

        r   zchunksize must be >= 1.N)r   _starmap_helperr(   _starmap_chunks)r   r$   iterabler)   r*   r+   r   r   r   r,      s    

zMPIPoolExecutor.starmap)cancel_futuresc         	   C   sr   | j R | js(d| _| jdk	r(| j  |r@| jdk	r@| j  d}|rT| j}d| _W dQ R X |dk	rn|  dS )aD  Clean-up the resources associated with the executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If ``True`` then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
            cancel_futures: If ``True`` then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.

        TN)r   r   r   donecanceljoin)r   r   r3   poolr   r   r   shutdown   s    



zMPIPoolExecutor.shutdown)NNr   )T)Nr   F)T)__name__
__module____qualname____doc__r   r   staticmethodr   Z
WorkerPoolr   r   propertyr   r!   r(   r/   r,   r8   r   r   r   r   r      s    
(


!r   c                sZ   d k	r t tdtj   fdd|D r@t fdd}| S )N	monotonicc                s   g | ]} f| qS r   r   ).0r%   )functionr(   r   r   
<listcomp>   s    z#_starmap_helper.<locals>.<listcomp>c              3   s   yrVd krt } nt    } xv| D ]"}| |g}|  V  q.W nJ  d krx8r|  V  qhW n xr    V  qW W n$   xr   qW  Y nX d S )N)r   removepopresultreverser5   )iteratorr&   )end_timefuturesr)   timerr+   r   r   result_iterator   s&    


z(_starmap_helper.<locals>.result_iterator)getattrtimeset)r(   rA   r2   r)   r+   rK   r   )rH   rA   rI   r(   r)   rJ   r+   r   r0      s    
r0   c                s    fdd|D S )Nc                s   g | ]} | qS r   r   )r@   r%   )rA   r   r   rB      s    z!_apply_chunks.<locals>.<listcomp>r   )rA   chunkr   )rA   r   _apply_chunks   s    rP   c             c   s2   t |}x$tt|| }|s"d S |fV  q
W d S )N)itertuple	itertoolsislice)r*   r2   rO   r   r   r   _build_chunks   s    rU   c             c   s.   x(| D ] }|   x|r$| V  qW qW d S )N)rF   rD   )r2   itemr   r   r   _chain_from_iterable_of_lists  s    
rW   c             C   s.   t t|}t||}t| ||||}t|S )N)	functoolspartialrP   rU   r0   rW   )r(   rA   r2   r)   r+   r*   rE   r   r   r   r1   	  s
    
r1   c               @   s*   e Zd ZdZd
ddZdd Zdd	 ZdS )MPICommExecutoraW  Context manager for `MPIPoolExecutor`.

    This context manager splits a MPI (intra)communicator in two
    disjoint sets: a single master process and the remaining worker
    processes. These sets are then connected through an intercommunicator.
    The target of the ``with`` statement is assigned either an
    `MPIPoolExecutor` instance (at the master) or ``None`` (at the workers).

    Example::

        with MPICommExecutor(MPI.COMM_WORLD, root=0) as executor:
            if executor is not None: # master process
                executor.submit(...)
                executor.map(...)
    Nr   c             K   sX   |dkrt  }| r td|dk s4|| kr<td|| _|| _|| _d| _dS )a&  Initialize a new MPICommExecutor instance.

        Args:
            comm: MPI (intra)communicator.
            root: Designated master process.

        Raises:
            ValueError: If the communicator has wrong kind or
               the root value is not in the expected range.

        NzExpecting an intracommunicatorr   z"Expecting root in range(comm.size))	r   Zget_comm_worldZIs_interr   Get_size_comm_rootr   	_executor)r   commrootr   r   r   r   r   &  s    zMPICommExecutor.__init__c             C   s   | j dk	rtd| j}| j}| j}d}tjrR|dks:ttf |}t||_	nX|
 dkrvtf |}t||_	n4| |krtf |}t||||_	nt|| || _ |S )z.Return `MPIPoolExecutor` instance at the root.N	__enter__r   r   )r^   r    r\   r]   r   r   Z
SharedPoolAssertionErrorr   r   r[   
ThreadPoolZGet_rankZ	SplitPoolZserver_main_split)r   r_   r`   optionsexecutorr   r   r   ra   >  s&    



zMPICommExecutor.__enter__c             G   s,   | j }d| _ |dk	r$|jdd dS dS dS )z0Shutdown `MPIPoolExecutor` instance at the root.NT)r   F)r^   r8   )r   r%   re   r   r   r   __exit__Y  s    zMPICommExecutor.__exit__)Nr   )r9   r:   r;   r<   r   ra   rf   r   r   r   r   rZ     s   
rZ   c               @   s   e Zd ZdZeejZdS )ThreadPoolExecutorz3`MPIPoolExecutor` subclass using a pool of threads.N)r9   r:   r;   r<   r=   r   rc   r   r   r   r   r   rg   e  s   rg   c               @   s   e Zd ZdZeejZdS )ProcessPoolExecutorz5`MPIPoolExecutor` subclass using a pool of processes.N)r9   r:   r;   r<   r=   r   Z	SpawnPoolr   r   r   r   r   rh   j  s   rh   )r<   rM   rX   rS   r   Z_corer   r   r    r   r   r0   rP   rU   rW   r1   rZ   rg   rh   r   r   r   r   <module>   s$    B#	
R