o
    Df                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ dd	lmZ d
dlmZ dZdZeddZG dd de	ZdS )zEvent receiver implementation.    N)
itemgetter)Queue)maybe_channel)ConsumerMixin)uuid)app_or_default)adjust_timestamp   )get_exchange)EventReceiver	utcoffset	timestampc                   @   s   e Zd ZdZdZ			dddZdd Zdd	 Z	
dddZdddZ	dddZ
dddZd
ejeeefddZeefddZedd ZdS )r   a?  Capture events.

    Arguments:
        connection (kombu.Connection): Connection to the broker.
        handlers (Mapping[Callable]): Event handlers.
            This is  a map of event type names and their handlers.
            The special handler `"*"` captures all events that don't have a
            handler.
    N#c
           
   	   C   s   t |p| j| _t|| _|d u ri n|| _|| _|pt | _|p%| jjj	| _
t| jp/| j | jjjd| _|d u r@| jjj}|	d u rI| jjj}	td| j
| jg| j| jdd||	d| _| jj| _| jj| _| jj| _|d u rx| jjjdh}|| _d S )N)name.TF)exchangerouting_keyauto_deletedurablemessage_ttlexpiresjson)r   appr   channelhandlersr   r   node_idconfevent_queue_prefixqueue_prefixr
   
connectionconnection_for_writeevent_exchanger   event_queue_ttlevent_queue_expiresr   joinqueueclockadjustadjust_clockforwardforward_clockevent_serializeraccept)
selfr   r   r   r   r   r   r-   	queue_ttlqueue_expires r1   O/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/events/receiver.py__init__#   s8   






zEventReceiver.__init__c                 C   s.   | j |p| j d}|o|| dS  dS )z3Process event by dispatching to configured handler.*N)r   get)r.   typeeventhandlerr1   r1   r2   processB   s   zEventReceiver.processc                 C   s   || j g| jgd| jdgS )NT)queues	callbacksno_ackr-   )r&   _receiver-   )r.   Consumerr   r1   r1   r2   get_consumersG   s   zEventReceiver.get_consumersTc                 K   s   |r
| j |d d S d S )N)r   )wakeup_workers)r.   r    r   	consumerswakeupkwargsr1   r1   r2   on_consume_readyL   s   zEventReceiver.on_consume_readyc                 C   s   | j |||dS )NlimittimeoutrB   consume)r.   rF   rG   rB   r1   r1   r2   itercaptureQ   s   zEventReceiver.itercapturec                 C   s   | j |||dD ]}qdS )zOpen up a consumer capturing events.

        This has to run in the main process, and it will never stop
        unless :attr:`EventDispatcher.should_stop` is set to True, or
        forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
        rE   NrH   )r.   rF   rG   rB   _r1   r1   r2   captureT   s   zEventReceiver.capturec                 C   s   | j jjd| j|d d S )N	heartbeat)r    r   )r   control	broadcastr    )r.   r   r1   r1   r2   r@   ^   s   

zEventReceiver.wakeup_workersc                 C   s   |d }|dkr| j jpd|  }|d< | | nz|d }	W n ty/   |  |d< Y nw | |	 |rPz||\}
}W n	 tyH   Y nw |||
|d< | |d< ||fS )Nr6   z	task-sentr	   r'   r   local_received)r'   valuer)   KeyErrorr+   )r.   bodylocalizenowtzfieldsr   CLIENT_CLOCK_SKEWr6   _cr'   offsetr   r1   r1   r2   event_from_messagec   s&   

z EventReceiver.event_from_messagec                    sD   |||r| j | j  fdd|D  d S | j | |  d S )Nc                    s   g | ]} | qS r1   r1   ).0r7   from_messager9   r1   r2   
<listcomp>   s    z*EventReceiver._receive.<locals>.<listcomp>)r9   rZ   )r.   rS   messagelist
isinstancer1   r\   r2   r=   ~   s   
zEventReceiver._receivec                 C   s   | j r| j jjS d S N)r   r    client)r.   r1   r1   r2   r       s   zEventReceiver.connection)Nr   NNNNNN)T)NNTrb   )__name__
__module____qualname____doc__r   r3   r9   r?   rD   rJ   rL   r@   time	_TZGETTERr   rW   rZ   r`   ra   r=   propertyr    r1   r1   r1   r2   r      s,    







r   )rg   rh   operatorr   kombur   kombu.connectionr   kombu.mixinsr   celeryr   
celery.appr   celery.utils.timer   r7   r
   __all__rW   ri   r   r1   r1   r1   r2   <module>   s    
