# -*- coding: utf-8 -*-

from __future__ import (division, print_function, absolute_import,
                        unicode_literals)

__all__ = ["MPIPool"]

try:
    from mpi4py import MPI
except ImportError:
    MPI = None


class _close_pool_message(object):
    def __repr__(self):
        return "<Close pool message>"


class _function_wrapper(object):
    def __init__(self, function):
        self.function = function


def _error_function(task):
    raise RuntimeError("Pool was sent tasks before being told what "
                       "function to apply.")


class MPIPool(object):
    """
    A pool that distributes tasks over a set of MPI processes. MPI is an
    API for distributed memory parallelism. This pool will let you run
    emcee without shared memory, letting you use much larger machines
    with emcee.

    The pool only supports the :func:`map` method at the moment because
    this is the only functionality that emcee needs. That being said,
    this pool is fairly general and it could be used for other purposes.

    Contributed by `Joe Zuntz <https://github.com/joezuntz>`_.

    :param comm: (optional)
        The ``mpi4py`` communicator.

    :param debug: (optional)
        If ``True``, print out a lot of status updates at each step.

    :param loadbalance: (optional)
        If ``True`` and ``ntask > Ncpus``, tries to load-balance by sending
        out one task to each CPU first and then sending out the rest
        as the CPUs get done.

    """
    def __init__(self, comm=None, debug=False, loadbalance=False):
        if MPI is None:
            raise ImportError("Please install mpi4py")

        self.comm = MPI.COMM_WORLD if comm is None else comm
        self.rank = self.comm.Get_rank()
        self.size = self.comm.Get_size() - 1
        self.debug = debug
        self.function = _error_function
        self.loadbalance = loadbalance
        if self.size == 0:
            raise ValueError("Tried to create an MPI pool, but there "
                             "was only one MPI process available. "
                             "Need at least two.")

    def is_master(self):
        """
        Is the current process the master?

        """
        return self.rank == 0

    def wait(self):
        """
        If this isn't the master process, wait for instructions.

        """
        if self.is_master():
            raise RuntimeError("Master node told to await jobs.")

        status = MPI.Status()

        while True:
            # Event loop: block until the master sends something.
            if self.debug:
                print("Worker {0} waiting for task.".format(self.rank))

            task = self.comm.recv(source=0, tag=MPI.ANY_TAG, status=status)

            if self.debug:
                print("Worker {0} got task {1} with tag {2}."
                      .format(self.rank, task, status.tag))

            # The sentinel message tells the worker to shut down.
            if isinstance(task, _close_pool_message):
                if self.debug:
                    print("Worker {0} told to quit.".format(self.rank))
                break

            # A function wrapper replaces the task function to apply.
            if isinstance(task, _function_wrapper):
                self.function = task.function
                if self.debug:
                    print("Worker {0} replaced its task function: {1}."
                          .format(self.rank, self.function))
                continue

            # Otherwise, apply the current function to the task and send the
            # result back to the master, tagged with the task's tag.
            result = self.function(task)
            if self.debug:
                print("Worker {0} sending answer {1} with tag {2}."
                      .format(self.rank, result, status.tag))
            self.comm.isend(result, dest=0, tag=status.tag)

    def map(self, function, tasks):
        """
        Like the built-in :func:`map` function, apply a function to all
        of the values in a list and return the list of results.

        :param function:
            The function to apply to the list.

        :param tasks:
            The list of elements.

        """
        ntask = len(tasks)

        # If this is a worker process, just wait for instructions.
        if not self.is_master():
            self.wait()
            return

        if function is not self.function:
            if self.debug:
                print("Master replacing pool function with {0}."
                      .format(function))

            self.function = function
            F = _function_wrapper(function)

            # Tell all the workers what function to use.
            requests = []
            for i in range(self.size):
                r = self.comm.isend(F, dest=i + 1)
                requests.append(r)

            # Wait until all of the workers have received the function.
            MPI.Request.waitall(requests)

        if (not self.loadbalance) or (ntask <= self.size):
            # Default scheme (no load-balancing): task i goes to worker
            # (i % size) + 1 with tag i.
            requests = []
            for i, task in enumerate(tasks):
                worker = i % self.size + 1
                if self.debug:
                    print("Sent task {0} to worker {1} with tag {2}."
                          .format(task, worker, i))
                r = self.comm.isend(task, dest=worker, tag=i)
                requests.append(r)

            MPI.Request.waitall(requests)

            # Now wait for the answers.
            results = []
            for i in range(ntask):
                worker = i % self.size + 1
                if self.debug:
                    print("Master waiting for worker {0} with tag {1}"
                          .format(worker, i))
                result = self.comm.recv(source=worker, tag=i)
                results.append(result)

            return results

        else:
            # Load-balancing: seed each worker with one task, then hand out
            # the remaining tasks as workers finish.
            for i, task in enumerate(tasks[0:self.size]):
                worker = i + 1
                if self.debug:
                    print("Sent task {0} to worker {1} with tag {2}."
                          .format(task, worker, i))
                self.comm.isend(task, dest=worker, tag=i)

            ntasks_dispatched = self.size
            results = [None] * ntask
            for itask in range(ntask):
                status = MPI.Status()
                # Receive a result from any worker; the tag identifies which
                # task it belongs to.
                result = self.comm.recv(source=MPI.ANY_SOURCE,
                                        tag=MPI.ANY_TAG, status=status)
                worker = status.source
                i = status.tag
                results[i] = result
                if self.debug:
                    print("Master received from worker {0} with tag {1}"
                          .format(worker, i))

                # Send the next task (if any are left) to this idle worker.
                if ntasks_dispatched < ntask:
                    task = tasks[ntasks_dispatched]
                    i = ntasks_dispatched
                    if self.debug:
                        print("Sent task {0} to worker {1} with tag {2}."
                              .format(task, worker, i))
                    self.comm.isend(task, dest=worker, tag=i)
                    ntasks_dispatched += 1

            return results

    def bcast(self, *args, **kwargs):
        """
        Equivalent to mpi4py :func:`bcast` collective operation.
        """
        return self.comm.bcast(*args, **kwargs)

    def close(self):
        """
        Just send a message off to all the pool members which contains
        the special :class:`_close_pool_message` sentinel.

        """
        if self.is_master():
            for i in range(self.size):
                self.comm.isend(_close_pool_message(), dest=i + 1)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
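

# ---------------------------------------------------------------------------
# Illustrative usage sketch showing the master/worker pattern implied by the
# docstrings above. The task function ``_square`` and the task list are
# hypothetical examples, not part of the pool itself. Run under MPI, e.g.
# ``mpiexec -n 4 python mpi_pool.py``.
if __name__ == "__main__":
    import sys

    def _square(x):
        # Hypothetical task function applied to every element.
        return x * x

    pool = MPIPool(debug=False)

    # Workers block in wait() handling tasks until the master closes the
    # pool; only the master proceeds past this point.
    if not pool.is_master():
        pool.wait()
        sys.exit(0)

    # The master distributes the tasks and collects the results in order.
    print(pool.map(_square, list(range(8))))

    # Send the shutdown sentinel to all workers.
    pool.close()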