o
    Df-/                     @   s   d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZmZ dd	lmZ dd
lmZmZ dZdZG dd deZdd ZG dd deZG dd dejeZdS )zqThe ``RPC`` result backend for AMQP brokers.

RPC-style result backend, using reply-to and one queue per client.
    N)maybe_declare)register_after_fork)cached_property)states)current_tasktask_join_will_block   )base)AsyncBackendMixinBaseResultConsumer)BacklogLimitExceeded
RPCBackendz
The "rpc" result backend does not support chords!

Note that a group chained with a task is also upgraded to be a chord,
as this pattern requires synchronization.

Result backends that supports chords: Redis, Database, Memcached, and more.
c                   @   s   e Zd ZdZdS )r   z'Too much state history to fast-forward.N)__name__
__module____qualname____doc__ r   r   L/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/backends/rpc.pyr      s    r   c                 C   s   |    d S N)_after_fork)backendr   r   r   _on_after_fork_cleanup_backend"      r   c                       s^   e Zd ZejZdZdZ fddZdddZdddZ	d	d
 Z
dd Zdd Zdd Z  ZS )ResultConsumerNc                    s    t  j|i | | jj| _d S r   )super__init__r   _create_bindingselfargskwargs	__class__r   r   r   ,   s   zResultConsumer.__init__Tc                 K   sF   | j  | _| |}| j| jj|g| jg|| jd| _| j	  d S )N)	callbacksno_ackaccept)
app
connection_connectionr   Consumerdefault_channelon_state_changer%   	_consumerconsume)r   initial_task_idr$   r    initial_queuer   r   r   start0   s   

zResultConsumer.startc                 C   s*   | j r
| j j|dS |rt| d S d S )N)timeout)r(   drain_eventstimesleep)r   r1   r   r   r   r2   9   s
   zResultConsumer.drain_eventsc                 C   s(   z| j   W | j  d S | j  w r   )r,   cancelr(   closer   r   r   r   stop?   s   zResultConsumer.stopc                 C   s(   d | _ | jd ur| j  d | _d S d S r   )r,   r(   collectr7   r   r   r   on_after_forkE   s
   


zResultConsumer.on_after_forkc                 C   sH   | j d u r
| |S | |}| j |s"| j | | j   d S d S r   )r,   r0   r   consuming_from	add_queuer-   )r   task_idqueuer   r   r   consume_fromK   s   


zResultConsumer.consume_fromc                 C   s"   | j r| j | |j d S d S r   )r,   cancel_by_queuer   namer   r=   r   r   r   
cancel_forS   s   zResultConsumer.cancel_forTr   )r   r   r   kombur)   r(   r,   r   r0   r2   r8   r:   r?   rC   __classcell__r   r   r!   r   r   &   s    

	r   c                       sb  e Zd ZdZejZejZeZeZdZ	dZ
dZdddddZG dd	 d	ejZG d
d dejZ		dE fdd	Zdd ZdFddZdd Zdd Zdd Zdd Zdd Zdd  ZdGd!d"Z	dHd#d$Zd%d& Zd'd( ZdId*d+ZeZd,d- Z	dJd.d/Zd0d1 Z d2d3 Z!d4d5 Z"d6d7 Z#d8d9 Z$dGd:d;Z%d<d= Z&dK fd?d@	Z'e(dAdB Z)e*dCdD Z+  Z,S )Lr   z&Base class for the RPC result backend.FT   r   r   )max_retriesinterval_startinterval_stepinterval_maxc                   @      e Zd ZdZdZdS )zRPCBackend.Consumerz4Consumer that requires manual declaration of queues.FN)r   r   r   r   auto_declarer   r   r   r   r)   m       r)   c                   @   rL   )zRPCBackend.Queuez$Queue that never caches declaration.FN)r   r   r   r   can_cache_declarationr   r   r   r   Queuer   rN   rP   Nc           
         s   t  j|fi | | jj}	|| _i | _| || _| jrdnd| _|p&|	j	}|p+|	j
}| ||| j| _|p9|	j| _|| _| | | j| j| j| j| _td urWt| t d S d S )N   r   )r   r   r&   confr(   _out_of_bandprepare_persistent
persistentdelivery_moderesult_exchangeresult_exchange_type_create_exchangeexchangeresult_serializer
serializerauto_deleter   r%   _pending_results_pending_messagesresult_consumerr   r   )
r   r&   r'   rZ   exchange_typerU   r\   r]   r    rR   r!   r   r   r   w   s(   


zRPCBackend.__init__c                 C   s   | j   | j  d S r   )r^   clearr`   r   r7   r   r   r   r      s   
zRPCBackend._after_forkdirectrQ   c                 C   s
   |  d S r   )Exchange)r   rA   typerV   r   r   r   rY      s   
zRPCBackend._create_exchangec                 C   s   | j S )z$Create new binding for task with id.)bindingrB   r   r   r   r      s   zRPCBackend._create_bindingc                 C   s   t t r   )NotImplementedErrorE_NO_CHORD_SUPPORTstripr7   r   r   r   ensure_chords_allowed   r   z RPCBackend.ensure_chords_allowedc                 C   s"   t  st| |jdd d S d S )NT)retry)r   r   rf   channel)r   producerr=   r   r   r   on_task_call   s   zRPCBackend.on_task_callc                 C   s<   z|pt j}W n ty   td|w |j|jp|fS )zGet the destination for result by task id.

        Returns:
            Tuple[str, str]: tuple of ``(reply_to, correlation_id)``.
        z%RPC backend missing task request for )r   requestAttributeErrorRuntimeErrorreply_tocorrelation_id)r   r=   ro   r   r   r   destination_for   s   zRPCBackend.destination_forc                 C      d S r   r   rB   r   r   r   on_reply_declare   s   zRPCBackend.on_reply_declarec                 C   ru   r   r   )r   resultr   r   r   on_result_fulfilled   s   zRPCBackend.on_result_fulfilledc                 C   s   dS )Nzrpc://r   )r   include_passwordr   r   r   as_uri      zRPCBackend.as_uric           
      K   s   |  ||\}}|sdS | jjjjdd%}	|	j| |||||| j||| jd| j	| 
|| jd	 W d   |S 1 s=w   Y  |S )z!Send task return value and state.NTblock)rZ   routing_keyrs   r\   rk   retry_policydeclarerV   )rt   r&   amqpproducer_poolacquirepublish
_to_resultrZ   r\   r   rv   rV   )
r   r=   rw   state	tracebackro   r    r~   rs   rm   r   r   r   store_result   s$   
zRPCBackend.store_resultc                 C   s   |||  |||| |dS )N)r=   statusrw   r   children)encode_resultcurrent_task_children)r   r=   r   rw   r   ro   r   r   r   r      s   
zRPCBackend._to_resultc                 C   s    | j r	| j | || j|< d S r   )r`   on_out_of_band_resultrS   )r   r=   messager   r   r   r      s   z RPCBackend.on_out_of_band_result  c           
      C   s   | j |d }|r| ||S i }d }| || j|D ]}| |}|||}||< |r4|  d }q||d }| D ]
\}}	| 	||	 q?|rV|
  | ||S z| j| W S  tyk   tjd d Y S w )N)r   rw   )rS   pop_set_cache_by_message_slurp_from_queuer%   _get_message_task_idgetackitemsr   requeue_cacheKeyErrorr   PENDING)
r   r=   backlog_limitbufferedlatest_by_idprevacctidlatestmsgr   r   r   get_task_meta   s.   
zRPCBackend.get_task_metac                 C   s   |  |j }| j|< |S r   )meta_from_decodedpayloadr   )r   r=   r   r   r   r   r   r   	  s   z RPCBackend._set_cache_by_messagec           	      c   s    | j jjdd0\}}| ||}|  t|D ]}|j||d}|s( n	|V  q| |W d    d S 1 s<w   Y  d S )NTr|   )r%   r$   )r&   poolacquire_channelr   r   ranger   r   )	r   r=   r%   limitr$   _rl   rf   r   r   r   r   r     s   
"zRPCBackend._slurp_from_queuec              	   C   s.   z|j d W S  ttfy   |jd  Y S w )Nrs   r=   )
propertiesrp   r   r   )r   r   r   r   r   r     s
   zRPCBackend._get_message_task_idc                 C   ru   r   r   )r   rl   r   r   r   revive%  r{   zRPCBackend.revivec                 C      t d)Nz4reload_task_result is not supported by this backend.rg   rB   r   r   r   reload_task_result(     zRPCBackend.reload_task_resultc                 C   r   )z<Reload group result, even if it has been previously fetched.z5reload_group_result is not supported by this backend.r   rB   r   r   r   reload_group_result,  s   zRPCBackend.reload_group_resultc                 C   r   )Nz,save_group is not supported by this backend.r   )r   group_idrw   r   r   r   
save_group1  r   zRPCBackend.save_groupc                 C   r   )Nz/restore_group is not supported by this backend.r   )r   r   cacher   r   r   restore_group5  r   zRPCBackend.restore_groupc                 C   r   )Nz.delete_group is not supported by this backend.r   )r   r   r   r   r   delete_group9  r   zRPCBackend.delete_groupr   c                    s@   |si n|}t  |t|| j| jj| jj| j| j| j	| j
dS )N)r'   rZ   ra   rU   r\   r]   expires)r   
__reduce__dictr(   rZ   rA   re   rU   r\   r]   r   r   r!   r   r   r   =  s   
zRPCBackend.__reduce__c                 C   s   | j | j| j| jdd| jdS )NFT)durabler]   r   )rP   oidrZ   r   r7   r   r   r   rf   J  s   zRPCBackend.bindingc                 C   s   | j jS r   )r&   
thread_oidr7   r   r   r   r   S  s   zRPCBackend.oid)NNNNNT)rc   rQ   rD   )NN)r   )r   F)r   N)-r   r   r   r   rE   rd   Producerr   r   rU   supports_autoexpiresupports_native_joinr   r)   rP   r   r   rY   r   rj   rn   rt   rv   rx   rz   r   r   r   r   pollr   r   r   r   r   r   r   r   r   r   propertyrf   r   r   rF   r   r   r!   r   r   X   sb    


	
	
	

r   )r   r3   rE   kombu.commonr   kombu.utils.compatr   kombu.utils.objectsr   celeryr   celery._stater   r    r	   asynchronousr
   r   __all__rh   	Exceptionr   r   r   Backendr   r   r   r   r   <module>   s     
2