o
    Df.                     @   s   d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ dd	lmZ d
ZeeZejejejZZZG dd dZG dd deZdS )zWorker Pidbox (remote control).    N)ignore_errors)safe_str)AttributeDict)pass1)
get_logger   )control)PidboxgPidboxc                   @   sT   e Zd ZdZdZdd Zdd Zdd Zd	d
 Zdd Z	dd Z
dd Zdd ZdS )r	   zWorker mailbox.Nc              	   C   s^   || _ |j| _|jjjjt|jtjjt	|j|j||j
jrtntdd| _| j jjj| _d S )N)apphostnameconsumertset)handlersstate)cr   r   r   mailboxNoder   Paneldatar   
controlleruse_eventloopr   setnodeclockforward_forward_clockselfr    r   M/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/worker/pidbox.py__init__   s   
	zPidbox.__init__c              
   C   s   |    z
| j|| W d S  ty& } ztd| W Y d }~d S d }~w tyC } ztd|dd |   W Y d }~d S d }~ww )NzNo such control command: %szControl command error: %rT)exc_info)r   r   handle_messageKeyErrorerror	Exceptionreset)r   bodymessageexcr   r   r    
on_message'   s   zPidbox.on_messagec                 C   s.   |j  | j_| jj| jd| _|j| j_d S N)callback)
connectionchannelr   listenr+   r   on_decode_errorr   r   r   r    start3   s   zPidbox.startc                 C   s   d S Nr   r   r   r   r    on_stop8   s   zPidbox.on_stopc                 C   s   |    | || _d S r3   )r5   _close_channelr   r   r   r   r    stop;   s   zPidbox.stopc                 C   s   |  | j | | j d S r3   )r7   r   r2   r4   r   r   r    r'   ?   s   zPidbox.resetc                 C   s*   | j r| j jrt|| j jj d S d S d S r3   )r   r/   r   closer   r   r   r    r6   C   s   zPidbox._close_channelc                 C   s4   |    | jrtd t|| jj | | j d S )NzCanceling broadcast consumer...)r5   r   debugr   cancelr7   r   r   r   r   r    shutdownG   s
   zPidbox.shutdown)__name__
__module____qualname____doc__r   r!   r+   r2   r5   r7   r'   r6   r;   r   r   r   r    r	      s    r	   c                   @   sD   e Zd ZdZdZdZdZdd Zdd Zdd	 Z	d
d Z
dd ZdS )r
   zWorker pidbox (greenlet).Nr   c                 C   s   |j | j| d S r3   )poolspawn_nloopr   r   r   r    r2   V   s   zgPidbox.startc                 C   s6   | j r| j  td | j   d  | _ | _d S d S )Nz+Waiting for broadcast thread to shutdown...)_node_stopped_node_shutdownr   r9   waitr4   r   r   r    r5   Y   s   

zgPidbox.on_stopc                 C   s   |  j d7  _ d S )Nr   )_resetsr4   r   r   r    r'   `   s   zgPidbox.resetc                 C   s6   |  | | | j_| jj| jd| _| j  d S r,   )r6   r/   r   r0   r+   r   consume)r   r   r.   r   r   r    	_do_resetc   s   
zgPidbox._do_resetc              	   C   s   | j g}t  }| _t  }| _ze| K}td|  | || |	 s[|j
r[|d | j k rA|d  d7  < | || z|jdd W n
 tjyS   Y nw |	 s[|j
s,W d    n1 sew   Y  W |  d S W |  d S |  w )Nzpidbox: Connected to %s.r   r   g      ?)timeout)rF   	threadingEventrD   rC   connection_for_readinfoas_urirH   is_setr.   drain_eventssocketrI   r   )r   r   resetsr;   stoppedr.   r   r   r    rB   i   s,   
zgPidbox.loop)r<   r=   r>   r?   rD   rC   rF   r2   r5   r'   rH   rB   r   r   r   r    r
   O   s    r
   )r?   rQ   rJ   kombu.commonr   kombu.utils.encodingr   celery.utils.collectionsr   celery.utils.functionalr   celery.utils.logr    r   __all__r<   loggerr9   r%   rM   r	   r
   r   r   r   r    <module>   s    ;