o
    DfE(                     @   s  d Z ddlZddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ dZi Zdd ZedG dd dZG dd deZedG dd deZedG dd deZG dd dZG dd dZdS )z$Async I/O backend support utilities.    N)deque)Empty)sleep)WeakKeyDictionary)detect_environment)states)TimeoutError)THREAD_TIMEOUT_MAX)AsyncBackendMixinBaseResultConsumerDrainerregister_drainerc                    s    fdd}|S )z5Decorator used to register a new result drainer type.c                    s   | t  < | S N)drainers)clsname U/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/backends/asynchronous.py_inner   s   z register_drainer.<locals>._innerr   )r   r   r   r   r   r      s   r   defaultc                   @   s<   e Zd ZdZdd Zdd Zdd Zdd
dZdddZdS )r   zResult draining service.c                 C   s
   || _ d S r   )result_consumer)selfr   r   r   r   __init__$   s   
zDrainer.__init__c                 C      d S r   r   r   r   r   r   start'      zDrainer.startc                 C   r   r   r   r   r   r   r   stop*   r   zDrainer.stopN   c                 c   sv    |p| j j}t }	 |rt | |krt z| j|||dV  W n
 tjy/   Y nw |r5|  |jr:d S qNr   timeout)r   drain_eventstime	monotonicsocketr"   wait_forready)r   pr"   intervalon_intervalwait
time_startr   r   r   drain_events_until-   s    zDrainer.drain_events_untilc                 C   s   ||d d S Nr!   r   r   r)   r,   r"   r   r   r   r'   >      zDrainer.wait_for)Nr   NNr   )	__name__
__module____qualname____doc__r   r   r   r.   r'   r   r   r   r   r       s    
r   c                       sZ   e Zd ZdZdZdZdd Zdd Z fddZdd	 Z	d
d Z
dd ZdddZ  ZS )greenletDrainerNc                 C      dS )z,create new self._drain_complete_event objectNr   r   r   r   r   _create_drain_complete_eventG      z,greenletDrainer._create_drain_complete_eventc                 C   r7   )z5raise self._drain_complete_event for wakeup .wait_forNr   r   r   r   r   _send_drain_complete_eventK   r9   z*greenletDrainer._send_drain_complete_eventc                    s<   t  j|i | t | _t | _t | _|   d S r   )superr   	threadingEvent_started_stopped	_shutdownr8   )r   argskwargs	__class__r   r   r   O   s
   


zgreenletDrainer.__init__c                 C   sd   | j   | j s+z| jjdd |   |   W n
 tj	y%   Y nw | j r
| j
  d S r    )r>   setr?   is_setr   r#   r:   r8   r&   r"   r@   r   r   r   r   runV   s   


zgreenletDrainer.runc                 C   s*   | j  s| | j| _| j   d S d S r   )r>   rF   spawnrG   _gr,   r   r   r   r   r   a   s   
zgreenletDrainer.startc                 C   s"   | j   |   | jt d S r   )r?   rE   r:   r@   r,   r	   r   r   r   r   r   f   s   
zgreenletDrainer.stopc                 C   s$   |    |js| jj|d d S d S r/   )r   r(   _drain_complete_eventr,   r0   r   r   r   r'   k   s   zgreenletDrainer.wait_forr   )r2   r3   r4   rH   rI   rJ   r8   r:   r   rG   r   r   r'   __classcell__r   r   rC   r   r6   B   s    r6   eventletc                   @   $   e Zd Zdd Zdd Zdd ZdS )eventletDrainerc                 C   s$   ddl m}m} ||}|d |S )Nr   )r   rH   )rL   r   rH   )r   funcr   rH   gr   r   r   rH   t   s   zeventletDrainer.spawnc                 C      ddl m} | | _d S Nr   )r=   )eventlet.eventr=   rJ   r   r=   r   r   r   r8   z      z,eventletDrainer._create_drain_complete_eventc                 C   s   | j   d S r   )rJ   sendr   r   r   r   r:   ~   r1   z*eventletDrainer._send_drain_complete_eventNr2   r3   r4   rH   r8   r:   r   r   r   r   rN   q       rN   geventc                   @   rM   )geventDrainerc                 C   s    dd l }||}|d |S )Nr   )rY   rH   r   )r   rO   rY   rP   r   r   r   rH      s   

zgeventDrainer.spawnc                 C   rQ   rR   )gevent.eventr=   rJ   rT   r   r   r   r8      rU   z*geventDrainer._create_drain_complete_eventc                 C   s   | j   |   d S r   )rJ   rE   r8   r   r   r   r   r:      s   
z(geventDrainer._send_drain_complete_eventNrW   r   r   r   r   rZ      rX   rZ   c                   @   s   e Zd ZdZdd ZdddZddd	Zd
d ZdddZdddZ	dd Z
dd Zdd Z	d ddZ	d!ddZedd ZdS )"r
   z.Mixin for backends that enables the async API.c                 C   s   || j j|< d S r   )r   buckets)r   resultbucketr   r   r   _collect_into   s   zAsyncBackendMixin._collect_intoTc                 k   s    |    |j}|st t }|D ]}t|ds|| q|jr(|| q| || q| j|fd|i|D ]}|rX|	 }t|dsO|j
|jfV  n|j
|jfV  |s>q:|rj|	 }|j
|jfV  |s[d S d S )N_cacheno_ack)_ensure_not_eagerresultsStopIterationr   hasattrappendr`   r_   _wait_for_pendingpopleftidchildren)r   r]   ra   rB   rc   r^   node_r   r   r   iter_native   s0   

zAsyncBackendMixin.iter_nativeFc                 C   sH   |r| j j  z| | W |S  ty#   | j|j||d Y |S w )N)weak)r   drainerr   _maybe_resolve_from_bufferr   _add_pending_resultri   )r   r]   rn   start_drainerr   r   r   add_pending_result   s   z$AsyncBackendMixin.add_pending_resultc                 C   s   | | j|j d S r   )_maybe_set_cache_pending_messagestakeri   r   r]   r   r   r   rp      s   z,AsyncBackendMixin._maybe_resolve_from_bufferc                 C   sD   | j \}}||vr|j|vr ||r|n||< | j| d S d S d S r   )_pending_resultsri   r   consume_from)r   task_idr]   rn   concreteweak_r   r   r   rq      s
   
z%AsyncBackendMixin._add_pending_resultc                    s     j j   fdd|D S )Nc                    s   g | ]
} j |d dqS )F)rn   rr   )rs   ).0r]   r   rn   r   r   
<listcomp>   s    z9AsyncBackendMixin.add_pending_results.<locals>.<listcomp>)r   ro   r   )r   rc   rn   r   r~   r   add_pending_results   s   z%AsyncBackendMixin.add_pending_resultsc                 C   s   |  |j | | |S r   )_remove_pending_resultri   on_result_fulfilledrw   r   r   r   remove_pending_result   s   
z'AsyncBackendMixin.remove_pending_resultc                 C   s   | j D ]}||d  qd S r   )rx   popr   rz   mappingr   r   r   r      s   
z(AsyncBackendMixin._remove_pending_resultc                 C   s   | j |j d S r   )r   
cancel_forri   rw   r   r   r   r         z%AsyncBackendMixin.on_result_fulfilledNc                 K   s.   |    | j|fi |D ]}q|j||dS )N)callback	propagate)rb   rg   maybe_throw)r   r]   r   r   rB   rl   r   r   r   wait_for_pending   s   z"AsyncBackendMixin.wait_for_pendingc                 K   s   | j j|f|||d|S )N)r"   r+   
on_message)r   rg   )r   r]   r"   r+   r   rB   r   r   r   rg      s   z#AsyncBackendMixin._wait_for_pendingc                 C   r7   NTr   r   r   r   r   is_async   r9   zAsyncBackendMixin.is_async)T)FT)Fr   NNN)r2   r3   r4   r5   r_   rm   rs   rp   rq   r   r   r   r   r   rg   propertyr   r   r   r   r   r
      s"    

	



	r
   c                   @   s   e Zd ZdZdd Zdd Zdd Zdd	d
Zdd Zdd Z	dd Z
dd Zd ddZ	d!ddZdddZdd Zdd Zdd ZdS )"r   z2Manager responsible for consuming result messages.c                 C   s@   || _ || _|| _|| _|| _d | _t | _tt	  | | _
d S r   )backendappacceptrx   ru   r   r   r\   r   r   ro   )r   r   r   r   pending_resultspending_messagesr   r   r   r      s   zBaseResultConsumer.__init__c                 K      t  r   NotImplementedError)r   initial_task_idrB   r   r   r   r         zBaseResultConsumer.startc                 C   r   r   r   r   r   r   r   r      r   zBaseResultConsumer.stopNc                 C   r   r   r   )r   r"   r   r   r   r#     r   zBaseResultConsumer.drain_eventsc                 C   r   r   r   r   rz   r   r   r   ry     r   zBaseResultConsumer.consume_fromc                 C   r   r   r   r   r   r   r   r   	  r   zBaseResultConsumer.cancel_forc                 C   s$   | j   t | _ d | _|   d S r   )r\   clearr   r   on_after_forkr   r   r   r   _after_fork  s   
zBaseResultConsumer._after_forkc                 C   r   r   r   r   r   r   r   r     r   z BaseResultConsumer.on_after_forkc                 C   s   | j j|||dS )Nr"   r+   )ro   r.   )r   r)   r"   r+   r   r   r   r.     s   z%BaseResultConsumer.drain_events_untilc                 k   s    | j |fd|i| | j|}| _z(z| j|j||dD ]	}d V  td qW n tjy5   tdw W || _d S || _w )Nr"   r   r   zThe operation timed out.)on_wait_for_pendingr   r.   on_readyr   r&   r"   r   )r   r]   r"   r+   r   rB   	prev_on_mrl   r   r   r   rg     s    

z$BaseResultConsumer._wait_for_pendingc                 K   r   r   r   )r   r]   r"   rB   r   r   r   r   )  r   z&BaseResultConsumer.on_wait_for_pendingc                 C   s   |  |j| d S r   )on_state_changepayload)r   messager   r   r   on_out_of_band_result,  r   z(BaseResultConsumer.on_out_of_band_resultc              	   C   s4   | j D ]}z|| W   S  ty   Y qw t|r   )rx   KeyErrorr   r   r   r   _get_pending_result/  s   
z&BaseResultConsumer._get_pending_resultc                 C   s   | j r|  | |d tjv rI|d }z| |}W n ty*   | j|| Y nw || | j}z|	|}W n	 tyC   Y nw |
| td d S )Nstatusrz   r   )r   r   READY_STATESr   r   ru   putrt   r\   r   rf   r   )r   metar   rz   r]   r\   r^   r   r   r   r   7  s$   


z"BaseResultConsumer.on_state_changer   )NNr   )r2   r3   r4   r5   r   r   r   r#   ry   r   r   r   r.   rg   r   r   r   r   r   r   r   r   r      s"    



r   )r5   r&   r<   r$   collectionsr   queuer   r   weakrefr   kombu.utils.compatr   celeryr   celery.exceptionsr   celery.utils.threadsr	   __all__r   r   r   r6   rN   rZ   r
   r   r   r   r   r   <module>   s0    !/[