B
    2di%                 @   sJ  d Z ddlZddlZddlZddlZddlmZ ddlmZ ddlm	Z	 ddlm
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ ddlZddlZddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ e
eee eej  f dddZ!G dd de"Z#G dd deZ$G dd deZ%dS )zc Defines a KernelClient that provides thread-safe sockets with async callbacks on message
replies.
    N)Event)Thread)Any)	Awaitable)Dict)List)Optional)Union)IOLoop)Instance)Type)ZMQError)	zmqstream   )Session)KernelClient)	HBChannel)msgreturnc                s
   | I d H S )N )r   r   r   d/work/yifan.wang/ringdown/master-ringdown-env/lib/python3.7/site-packages/jupyter_client/threaded.pyget_msg"   s    r   c                   s  e Zd ZdZdZdZdZdZdZe	e
j e	e e	e dd fddZdZeddd	Zddd
dZddddZddddZeeef ddddZeee ef ddddZeeef ddddZddddZde ddddZ!ddddZ"  Z#S ) ThreadedZMQSocketChannelz.A ZMQ socket invoking a callback in the ioloopN)socketsessionloopr   c                sV   t    |_|_|_t   fdd}jdk	s>tj|    dS )a'  Create a channel.

        Parameters
        ----------
        socket : :class:`zmq.Socket`
            The ZMQ socket to use.
        session : :class:`session.Session`
            The session to use.
        loop
            A pyzmq ioloop to connect the socket to using a ZMQStream
        c                  s:   j d k	sttj j_jj    d S )N)	r   AssertionErrorr   Z	ZMQStreamioloopstreamZon_recv_handle_recvsetr   )evtselfr   r   setup_streamG   s    z7ThreadedZMQSocketChannel.__init__.<locals>.setup_streamN)	super__init__r   r   r   r   r   add_callbackwait)r"   r   r   r   r#   )	__class__)r!   r"   r   r%   /   s    
z!ThreadedZMQSocketChannel.__init__F)r   c             C   s   | j S )N)	_is_alive)r"   r   r   r   is_aliveS   s    z!ThreadedZMQSocketChannel.is_alivec             C   s
   d| _ d S )NT)r)   )r"   r   r   r   startV   s    zThreadedZMQSocketChannel.startc             C   s
   d| _ d S )NF)r)   )r"   r   r   r   stopY   s    zThreadedZMQSocketChannel.stopc             C   s<   | j d k	r8y| j jdd W n tk
r0   Y nX d | _ d S )Nr   )Zlinger)r   close	Exception)r"   r   r   r   r-   \   s    
zThreadedZMQSocketChannel.close)r   r   c                s,    fdd}j dk	stj | dS )zQueue a message to be sent from the IOLoop's thread.

        Parameters
        ----------
        msg : message to send

        This is threadsafe, as it uses IOLoop.add_callback to give the loop's
        thread control of the action.
        c                  s"   j d k	stj j  d S )N)r   r   sendr   r   )r   r"   r   r   thread_sendo   s    z2ThreadedZMQSocketChannel.send.<locals>.thread_sendN)r   r   r&   )r"   r   r0   r   )r   r"   r   r/   d   s    zThreadedZMQSocketChannel.sendc             C   s|   t |r0| jdk	st| jj}|t|}n|}| jdk	sBt| j|\}}| j	|}| j
rn| 
| | | dS )z[Callback for stream.on_recv.

        Unpacks message, and calls handlers with it.
        N)asyncioisfuturer   r   _asyncio_event_looprun_until_completer   r   Zfeed_identitiesZdeserialize_inspectcall_handlers)r"   r   r   Zmsg_listidentZsmsgZnew_msgr   r   r   r   v   s    

z%ThreadedZMQSocketChannel._handle_recvc             C   s   dS )ai  This method is called in the ioloop thread when a message arrives.

        Subclasses should override this method to handle incoming messages.
        It is important to remember that this method is called in the thread
        so that some logic must be done to ensure that the application level
        handlers are called in the application thread.
        Nr   )r"   r   r   r   r   r6      s    z&ThreadedZMQSocketChannel.call_handlersc             C   s   dS )zaSubclasses should override this with a method
        processing any pending GUI events.
        Nr   )r"   r   r   r   process_events   s    z'ThreadedZMQSocketChannel.process_events      ?)timeoutr   c             C   sf   t   | }| jdk	stxFtdD ]:}d| _| j| j x | js\t   |k r\t d q>W q$W dS )a  Immediately processes all pending messages on this channel.

        This is only used for the IOPub channel.

        Callers should use this method to ensure that :meth:`call_handlers`
        has been called for all messages that have been received on the
        0MQ SUB socket of this channel.

        This method is thread safe.

        Parameters
        ----------
        timeout : float, optional
            The maximum amount of time to spend flushing, in seconds. The
            default is one second.
        N   Fg{Gz?)timer   r   range_flushedr&   _flushsleep)r"   r:   	stop_time_r   r   r   flush   s    zThreadedZMQSocketChannel.flushc             C   s"   | j dk	st| j   d| _dS )z"Callback for :method:`self.flush`.NT)r   r   rC   r>   )r"   r   r   r   r?      s    
zThreadedZMQSocketChannel._flush)r9   )$__name__
__module____qualname____doc__r   r   r   r   r5   r   zmqZSocketr   r
   r%   r)   boolr*   r+   r,   r-   r   strr   r/   r	   r   bytesr   r   r6   r8   floatrC   r?   __classcell__r   r   )r(   r   r   &   s*   
r   c                   s   e Zd ZdZdZdZ fddZeej	ddddZ
ddd	d
ZddddZddddZdd ZddddZ  ZS )IOLoopThreadz;Run a pyzmq ioloop in a thread to send and receive messagesFNc                s   t    d| _d S )NT)r$   r%   daemon)r"   )r(   r   r   r%      s    
zIOLoopThread.__init__)r   c               C   s   t d k	rdt _d S )NT)rN   _exitingr   r   r   r   _notice_exit   s    zIOLoopThread._notice_exitc             C   s    t  | _t|  | j  dS )z{Start the IOLoop thread

        Don't return until self.ioloop is defined,
        which is created in the thread
        N)r   _start_eventr   r+   r'   )r"   r   r   r   r+      s    
zIOLoopThread.startc          
   C   s   t  }t | t| t | _|| j_| j	  xhy| j
  W nR tk
rz } z|jtjkrhw8n W dd}~X Y q8 tk
r   | jrP n Y q8X P q8W dS )z0Run my loop, ignoring EINTR events in the pollerN)r1   new_event_loopset_event_loopnest_asyncioapplyr
   r   r3   rR   r    r+   r   errnoZEINTRr.   rP   )r"   r   er   r   r   run   s$    


zIOLoopThread.runc             C   s4   | j dk	r| j | j j |   |   d| _ dS )zStop the channel's event loop and join its thread.

        This calls :meth:`~threading.Thread.join` and returns when the thread
        terminates. :class:`RuntimeError` will be raised if
        :meth:`~threading.Thread.start` is called again.
        N)r   r&   r,   joinr-   )r"   r   r   r   r,      s
    
zIOLoopThread.stopc             C   s   |    d S )N)r-   )r"   r   r   r   __del__   s    zIOLoopThread.__del__c             C   s6   | j d k	r2y| j jdd W n tk
r0   Y nX d S )NT)Zall_fds)r   r-   r.   )r"   r   r   r   r-      s
    
zIOLoopThread.close)rD   rE   rF   rG   rP   r   r%   staticmethodatexitregisterrQ   r+   rY   r,   r[   r-   rM   r   r   )r(   r   rN      s   
rN   c                   s   e Zd ZdZedd ZeeddZde	e	e	e	e	dd fdd	Z
eeef dd
ddZdd fddZeeZeeZeeZeeZeeZe	dddZ  ZS )ThreadedKernelClientzYA KernelClient that provides thread-safe sockets with async callbacks on message replies.c             C   s   | j jS )N)ioloop_threadr   )r"   r   r   r   r     s    zThreadedKernelClient.ioloopT)
allow_noneN)shelliopubstdinhbcontrolr   c                s8   t  | _| j  |r | j| j_t ||||| d S )N)rN   r`   r+   _check_kernel_info_replyshell_channelr5   r$   start_channels)r"   rb   rc   rd   re   rf   )r(   r   r   ri     s
    

z#ThreadedKernelClient.start_channels)r   r   c             C   s"   |d dkr|  | d| j_dS )zGThis is run in the ioloop thread when the kernel info reply is receivedZmsg_typeZkernel_info_replyN)Z_handle_kernel_info_replyrh   r5   )r"   r   r   r   r   rg   !  s    
z-ThreadedKernelClient._check_kernel_info_reply)r   c                s"   t    | j r| j  d S )N)r$   stop_channelsr`   r*   r,   )r"   )r(   r   r   rj   '  s    

z"ThreadedKernelClient.stop_channelsc             C   s   | j dk	r| j  S dS )z$Is the kernel process still running?NT)Z_hb_channelZ
is_beating)r"   r   r   r   r*   2  s    

zThreadedKernelClient.is_alive)TTTTT)rD   rE   rF   rG   propertyr   r   rN   r`   rI   ri   r   rJ   r   rg   rj   r   r   Ziopub_channel_classZshell_channel_classZstdin_channel_classr   Zhb_channel_classZcontrol_channel_classr*   rM   r   r   )r(   r   r_     s"       	r_   )&rG   r1   r]   rW   r<   	threadingr   r   typingr   r   r   r   r   r	   rU   rH   Ztornado.ioloopr
   Z	traitletsr   r   r   Zzmq.eventloopr   r   r   Zjupyter_clientr   Zjupyter_client.channelsr   rK   Messager   objectr   rN   r_   r   r   r   r   <module>   s6   " M