B
    dV                 @   s   d Z ddlZddlZddlmZmZ ddlZddlZddlZddl	Z	dd Z
dd Zdadadd	 Zd
d ZG dd dejjZdd ZG dd deZdddZdddZdS )z. Tools for creating pools of worker processes
    N)TimeoutError	cpu_countc           
   C   sB   y"ddl m}  | j}| }|dkS  tttfk
r<   dS X dS )zM Check if this is the main control process and may handle one time tasks
    r   )MPITN)mpi4pyr   
COMM_WORLDGet_rankImportError
ValueErrorRuntimeError)r   commrank r   W/work/yifan.wang/ringdown/master-ringdown-env/lib/python3.7/site-packages/pycbc/pool.pyis_main_process   s    r   c             G   s$   t  t jt j | d k	r | | S d S )N)signalSIGINTSIG_IGN)initargsr   r   r   _noint   s    r   c          	   C   sD   | \}}}t  t jd7  _W dQ R X xtj|kr*||S q*W dS )z7 Wrapper to ensure that all processes execute together    N)_process_lock_numdonevalue)valuesZnumrequiredfcnr   r   r   r   _lockstep_fcn   s    

r   c             C   s   |    |   d S )N)	terminatejoin)pr   r   r   _shutdown_pool+   s    r    c                   sD   e Zd ZdZd fdd	Zdd Zdd	 Zd
d ZdddZ  Z	S )BroadcastPoolz2 Multiprocessing pool with a broadcast method
    Nr   c                sH   t  at ddatt|}tt	| j
|||f| tt|  d S )Nir   )multiprocessingLockr   Valuer   	functoolspartialr   superr!   __init__atexitregisterr    )self	processesZinitializerZinitargskwdsZnoint)	__class__r   r   r)   2   s
    zBroadcastPool.__init__c             C   s
   t | jS )N)len_pool)r,   r   r   r   __len__;   s    zBroadcastPool.__len__c             C   s*   |  tt| ||fgt|  }dt_|S )z Do a function call on every worker.

        Parameters
        ----------
        fcn: funtion
            Function to call.
        args: tuple
            The arguments for Pool.map
        r   )mapr   r0   r   r   )r,   r   r   resultsr   r   r   	broadcast>   s    
 zBroadcastPool.broadcastc                s&    t fdd|D }dt_|S )z Do a function call on every worker with different arguments

        Parameters
        ----------
        fcn: funtion
            Function to call.
        args: tuple
            The arguments for Pool.map
        c                s   g | ]}t  |fqS r   )r0   ).0arg)r   r,   r   r   
<listcomp>W   s    z(BroadcastPool.allmap.<locals>.<listcomp>r   )r3   r   r   r   )r,   r   r   r4   r   )r   r,   r   allmapL   s    
zBroadcastPool.allmapc             C   s^   |  |||}xJy
|dS  tk
r.   Y q tk
rT   |   |   tY qX qW dS )a=   Catch keyboard interuppts to allow the pool to exit cleanly.

        Parameters
        ----------
        func: function
            Function to call
        items: list of tuples
            Arguments to pass
        chunksize: int, Optional
            Number of calls for each process to handle at once
        i  N)Z	map_asyncgetr   KeyboardInterruptr   r   )r,   funcitems	chunksizer4   r   r   r   r3   [   s    
zBroadcastPool.map)NNr   )N)
__name__
__module____qualname____doc__r)   r2   r5   r9   r3   __classcell__r   r   )r/   r   r!   /   s   	r!   c             C   s   |  ||g| j  d S )N)r3   size)r,   fr   r   r   r   _dummy_broadcastr   s    rF   c               @   s   e Zd Zdd Zdd ZdS )
SinglePoolc             C   s   |  ||gS )N)r3   )r,   r   r   r   r   r   r5   v   s    zSinglePool.broadcastc                s    fdd|D S )Nc                s   g | ]} |qS r   r   )r6   a)rE   r   r   r8   z   s    z"SinglePool.map.<locals>.<listcomp>r   )r,   rE   r=   r   )rE   r   r3   y   s    zSinglePool.mapN)r?   r@   rA   r5   r3   r   r   r   r   rG   u   s   rG   FTc          
   C   s   d}yDddl m} |j}| }| }|dkrFd}|rFtd|| W n6 tk
r~ } z| rnt| t	dW dd}~X Y nX |sd }}|||fS )	zD Get whether MPI is enabled and if so the current size and rank
    Fr   )r   r   Tz)Running under mpi with size: %s, rank: %sz.Failed to load mpi, ensure mpi4py is installedN)
r   r   r   ZGet_sizer   logginginfor   printr	   )require_mpiloguse_mpir   r   rD   r   er   r   r   rN   |   s$    rN   c             C   s   t |d\}}}|ryXddl}|j||d d}tt||_t|j	 | rXt
d |rj|sjt
d W q tk
r   tdY qX n&| dkrt }n| d	krt } t| }| |_|r||_|S )
z Get processing pool
    )rL   r   Nr   )mpir-   zYNOTE: that for MPI process size determined by MPI launch size, not the processes argumentz5NOTE: using MPI as this process was launchedunder MPIz;Failed to start up an MPI pool, install mpi4py / schwimmbad)rN   
schwimmbadchoose_pooltypes
MethodTyperF   r5   r*   r+   closerI   rJ   r   r	   rG   r   r!   rD   )r-   rP   Zdo_mpirD   r   rR   poolr   r   r   rS      s.    
rS   )FT)F)rB   Zmultiprocessing.poolr#   r&   r   r   rT   r   r*   rI   r   r   r   r   r   r    rW   ZPoolr!   rF   objectrG   rN   rS   r   r   r   r   <module>   s$   C
