o
    DfΫ                     @   s  d Z ddlZddlZddlZddlmZ ddlmZmZmZ ddl	m
Z
 ddlmZ ddlmZ ddlmZmZmZ dd	lmZ dd
lmZmZ ddlmZ ddlZddlmZmZmZmZ ddl m!Z! ddl"m#Z# ddlm$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+ ddl,m-Z-m.Z.m/Z/m0Z0m1Z1 ddl2m3Z3 ddl4m5Z5m6Z6 ddl7m8Z8 ddl9m:Z:m;Z;m<Z<m=Z=m>Z> ddl?m@Z@ dZAeBdhZCe8eDZEdZFeddZGdZHdZIdd ZJG dd  d eKZLd!d" ZMG d#d$ d$ZNG d%d& d&ZOG d'd( d(eNeOZPePZQG d)d* d*eNZRG d+d, d,eReOZSG d-d. d.ePZTdS )/zResult backend base classes.

- :class:`BaseBackend` defines the interface.

- :class:`KeyValueStoreBackend` is a common base class
    using K/V semantics like _get and _put.
    N)
namedtuple)datetime	timedeltatimezone)partial)WeakValueDictionary)ExceptionInfo)dumpsloadsprepare_accept_content)registry)bytes_to_strensure_bytes)maybe_sanitize_url)current_appgroupmaybe_signaturestates)get_current_task)Context)BackendGetMetaErrorBackendStoreError
ChordErrorImproperlyConfiguredNotRegisteredSecurityErrorTaskRevokedErrorTimeoutError)GroupResult
ResultBase	ResultSetallow_join_resultresult_from_tuple)	BufferMap)LRUCachearity_greater)
get_logger)create_exception_clsensure_serializableget_pickleable_exceptionget_pickled_exceptionraise_with_context) get_exponential_backoff_interval)BaseBackendKeyValueStoreBackendDisabledBackendpicklei    pending_results_t)concreteweakzU
No result backend is configured.
Please see the documentation for more information.
z
Starting chords requires a result backend to be configured.

Note that a group chained with a task is also upgraded to be a chord,
as this pattern requires synchronization.

Result backends that supports chords: Redis, Database, Memcached, and more.
c                 C   s   | |dt  i|S )zReturn an unpickled backend.app)r   _get_current_object)clsargskwargs r9   M/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/backends/base.pyunpickle_backend?   s   r;   c                   @   s    e Zd Zdd Ze Z ZZdS )	_nulldictc                 O      d S Nr9   )selfakwr9   r9   r:   ignoreE      z_nulldict.ignoreN)__name__
__module____qualname__rB   __setitem__update
setdefaultr9   r9   r9   r:   r<   D   s    r<   c                 C   s   | d u rdS | j S NF)ignore_resultrequestr9   r9   r:   _is_request_ignore_resultK   s   rN   c                   @   s4  e Zd ZejZejZejZeZdZdZ	dZ
dZdddddZ		dpdd	Zdqd
dZdd ZddejfddZddddejfddZdd ZdddejfddZdddejfddZdrddZdrddZdrddZdd  Zd!d" Zd#d$ Zd%d& Zd'd( Z d)d* Z!d+d, Z"drd-d.Z#drd/d0Z$d1d2 Z%d3d4 Z&		dsd5d6Z'd7d8 Z(	dtd9d:Z)d;d< Z*d=d> Z+d?d@ Z,e,Z-dAdB Z.dCdD Z/dEdF Z0dGdH Z1dIdJ Z2dudKdLZ3dMdN Z4dOdP Z5dudQdRZ6dudSdTZ7dUdV Z8dWdX Z9dYdZ Z:d[d\ Z;d]d^ Z<d_d` Z=dadb Z>dcdd Z?dvdedfZ@dgdh ZAdidj ZBdrdkdlZCdwdndoZDdS )xBackendNFT   r      )max_retriesinterval_startinterval_stepinterval_maxc                 K   s  || _ | j j}	|p|	j| _tj| j \| _| _| _|p|	j	}
|
dkr%t
 nt|
d| _| ||| _|d u r9|	jn|| _| jd u rD|	jn| j| _t| j| _|	dd| _|	dd| _|	dd| _|	d	td
| _|	dd| _ti t | _tt| _|| _ d S )N)limitresult_backend_always_retryF+result_backend_max_sleep_between_retries_msi'  ,result_backend_base_sleep_between_retries_ms
   result_backend_max_retriesinfresult_backend_thread_safe)!r4   confresult_serializer
serializerserializer_registry	_encoderscontent_typecontent_encodingencoderresult_cache_maxr<   r$   _cacheprepare_expiresexpiresresult_accept_contentacceptaccept_contentr   getalways_retrymax_sleep_between_retries_msbase_sleep_between_retries_msfloatrR   thread_safer1   r   _pending_resultsr#   MESSAGE_BUFFER_MAX_pending_messagesurl)r?   r4   ra   max_cached_resultsrl   rj   expires_typerw   r8   r_   cmaxr9   r9   r:   __init__o   s*   



zBackend.__init__c                 C   s2   |r| j S t| j p
d}|dr|dd S |S )z=Return the backend as an URI, sanitizing the password or not. z:///NrV   )rw   r   endswith)r?   include_passwordrw   r9   r9   r:   as_uri   s   zBackend.as_uric                 K   s   |  ||tjS )zMark a task as started.)store_resultr   STARTEDr?   task_idmetar9   r9   r:   mark_as_started      zBackend.mark_as_startedc                 C   sB   |rt |s| j||||d |r|jr| ||| dS dS dS )z#Mark task as successfully executed.rL   N)rN   r   chordon_chord_part_return)r?   r   resultrM   r   stater9   r9   r:   mark_as_done   s
   
zBackend.mark_as_donec              	   C   s  |r| j |||||d |r||jr| ||| zt|j}W n ttfy-   t }Y nw |D ]=}	t|	}
|
	|
j
 |
j
d|
_|
j
d|
_|ra|tjv ra|
jdura| j |
j||||
d d|
j
v rm| |
|| q0|r~|jr| ||| dS dS dS dS )z#Mark task as executed with failure.	tracebackrM   r   group_idNr   )r   r   r   iterchainAttributeError	TypeErrortupler   rH   optionsrn   idr   r   PROPAGATE_STATESr   errbacks_call_task_errbacks)r?   r   excr   rM   r   call_errbacksr   
chain_data
chain_elemchain_elem_ctxr9   r9   r:   mark_as_failure   s@   





-zBackend.mark_as_failurec           	   	   C   s   g }|j D ]?}| j|}|js| j|_z"t|jdr0t|jjts0t	|jjdr0|||| n|
| W q tyD   |
| Y qw |rx|j}|jpN|}t|| jd}| jjjsb|jddrm|j|f||d d S |j|f||d d S d S )N
__header__rQ   r4   is_eagerF)	parent_idroot_id)r   r4   	signature_apphasattrtype
isinstancer   r   r%   appendr   r   r   r   r_   task_always_eagerdelivery_inforn   applyapply_async)	r?   rM   r   r   old_signatureerrbackr   r   gr9   r9   r:   r      s<   






zBackend._call_task_errbacksr|   c                 C   sD   t |}|r| j|||d |d |r|jr | ||| d S d S d S )Nr   )r   r   r   r   )r?   r   reasonrM   r   r   r   r9   r9   r:   mark_as_revoked  s   

zBackend.mark_as_revokedc                 C   s   | j |||||dS )zfMark task as being retries.

        Note:
            Stores the current exception (if any).
        r   )r   )r?   r   r   r   rM   r   r   r9   r9   r:   mark_as_retry  s   
zBackend.mark_as_retryc              
   C   s   | j }z	|j|j j}W n ty   | }Y nw t|jd|jdg t d|}z	| 	||d  W n t
yO } z|j|j|dW  Y d }~S d }~ww |j|j|dS )Nr   
link_error)r   r   r   )r   )r4   _taskstaskbackendKeyErrorr   r   rn   dictr   	Exceptionfail_from_current_stackr   )r?   callbackr   r4   r   fake_requesteb_excr9   r9   r:   chord_error_from_stack  s(   
zBackend.chord_error_from_stackc                 C   s   t  \}}}z;|d u r|n|}t|||f}| |||j |W |d urAz|j  |jj W n	 ty9   Y nw |j	}|d us%~S |d urcz|j  |jj W n	 ty[   Y nw |j	}|d usG~w r>   )
sysexc_infor   r   r   tb_frameclearf_localsRuntimeErrortb_next)r?   r   r   type_real_exctbexception_infor9   r9   r:   r   2  s4   
	
	zBackend.fail_from_current_stackc                 C   sL   |du r| j n|}|tv rt|S t|}t|d|jt|j| j|j	dS )z$Prepare exception for serialization.NrF   )exc_typeexc_message
exc_module)
ra   EXCEPTION_ABLE_CODECSr)   r   getattrrD   r(   r7   encoderE   )r?   r   ra   exctyper9   r9   r:   prepare_exceptionE  s   zBackend.prepare_exceptionc           
   
   C   s  |sdS t |tr| jtv rt|}|S t |ts4zt|}W n ty3 } ztd| |d}~ww |d}z|d }W n tyP } zt	d|d}~ww |du r[t
|t}n'ztj| }|dD ]}t||}qfW n ttfy   t
|tjj}Y nw |dd}t |trt|ts|du r|n| d| }td	| d
| zt |ttfr|| }W |S ||}W |S  ty }	 zt| d| d}W Y d}	~	|S d}	~	ww )z1Convert serialized exception to Python exception.NzbIf the stored exception isn't an instance of BaseException, it must be a dictionary.
Instead got: r   r   z5Exception information must include the exception type.r   r|   z!Expected an exception class, got z with payload ())r   BaseExceptionra   r   r*   r   r   rn   r   
ValueErrorr'   rD   r   modulessplitr   r   celery
exceptionsr   
issubclassr   r   listr   )
r?   r   er   r   r6   nameexc_msgfake_exc_typeerrr9   r9   r:   exception_to_pythonO  sr   






zBackend.exception_to_pythonc                 C   s    | j dkrt|tr| S |S )zPrepare value for storage.r0   )ra   r   r   as_tupler?   r   r9   r9   r:   prepare_value  s   zBackend.prepare_valuec                 C   s   |  |\}}}|S r>   )_encode)r?   data_payloadr9   r9   r:   r     s   zBackend.encodec                 C   s   t || jdS )N)ra   )r	   ra   )r?   r   r9   r9   r:   r        zBackend._encodec                 C   s$   |d | j v r| |d |d< |S )Nstatusr   )EXCEPTION_STATESr   )r?   r   r9   r9   r:   meta_from_decoded  s   zBackend.meta_from_decodedc                 C   s   |  | |S r>   )r   decoder?   r   r9   r9   r:   decode_result  s   zBackend.decode_resultc                 C   s.   |d u r|S |pt |}t|| j| j| jdS )N)rd   re   rl   )strr
   rd   re   rl   r   r9   r9   r:   r     s   zBackend.decodec                 C   s<   |d u r	| j jj}t|tr| }|d ur|r||S |S r>   )r4   r_   result_expiresr   r   total_seconds)r?   valuer   r9   r9   r:   ri     s   

zBackend.prepare_expiresc                 C   s(   |d ur|S | j jj}|d u r| jS |S r>   )r4   r_   result_persistent
persistent)r?   enabledr   r9   r9   r:   prepare_persistent  s   
zBackend.prepare_persistentc                 C   s(   || j v rt|tr| |S | |S r>   )r   r   r   r   r   )r?   r   r   r9   r9   r:   encode_result  s   

zBackend.encode_resultc                 C   s
   || j v S r>   )rh   r?   r   r9   r9   r:   	is_cached  s   
zBackend.is_cachedc                 C   s@  || j v rttj}|r| }nd }|||| ||d}|r,t|dd r,|j|d< |r9t|dd r9|j	|d< | j
jddr|rt|dd t|dd t|d	d t|d
d t|dd t|drj|jrj|jdnd d}	t|dd r|j|	d< |	|j |rdd	h}
|
D ]}|	| }| |}t||	|< q||	 |S )N)r   r   r   children	date_doner   r   r   extendedr   r   r7   r8   hostnameretriesr   routing_key)r   r7   r8   workerr  queuestampsstamped_headers)READY_STATESr   nowr   utc	isoformatcurrent_task_childrenr   r   r   r4   r_   find_value_for_keyr   r   rn   r  rH   r  r   r   )r?   r   r   r   rM   format_dater   r   r   request_metaencode_needed_fieldsfieldr   encoded_valuer9   r9   r:   _get_result_meta  sP   











zBackend._get_result_metac                 C   s   t | d S r>   )timesleep)r?   amountr9   r9   r:   _sleep  r   zBackend._sleepc           
   
   K   s   |  ||}d}	 z| j||||fd|i| |W S  tyY } z3| jrN| |rN|| jk rD|d7 }t| j|| jdd }	| 	|	 nt
td||d n W Y d}~nd}~ww q	)	zUpdate task state and result.

        if always_retry_backend_operation is activated, in the event of a recoverable exception,
        then retry operation with an exponential backoff until a limit has been reached.
        r   TrM   rQ     z%failed to store result on the backend)r   r   N)r   _store_resultr   ro   exception_safe_to_retryrR   r,   rq   rp   r  r+   r   )
r?   r   r   r   r   rM   r8   r  r   sleep_amountr9   r9   r:   r     s8   
zBackend.store_resultc                 C   s   | j |d  | | d S r>   )rh   pop_forgetr   r9   r9   r:   forget#  s   zBackend.forgetc                 C      t d)Nz"backend does not implement forget.NotImplementedErrorr   r9   r9   r:   r  '     zBackend._forgetc                 C   s   |  |d S )zGet the state of a task.r   )get_task_metar   r9   r9   r:   	get_state*  s   zBackend.get_statec                 C      |  |dS )z$Get the traceback for a failed task.r   r"  rn   r   r9   r9   r:   get_traceback0  r   zBackend.get_tracebackc                 C   r$  )zGet the result of a task.r   r%  r   r9   r9   r:   
get_result4  r   zBackend.get_resultc                 C   s&   z|  |d W S  ty   Y dS w )z(Get the list of subtasks sent by a task.r   N)r"  r   r   r9   r9   r:   get_children8  s
   zBackend.get_childrenc                 C   s,   | j jjr| j jjstdt d S d S d S )NzResults are not stored in backend and should not be retrieved when task_always_eager is enabled, unless task_store_eager_result is enabled.)r4   r_   r   task_store_eager_resultwarningswarnRuntimeWarningr?   r9   r9   r:   _ensure_not_eager?  s   zBackend._ensure_not_eagerc                 C      dS )a  Check if an exception is safe to retry.

        Backends have to overload this method with correct predicates dealing with their exceptions.

        By default no exception is safe to retry, it's up to backend implementation
        to define which exceptions are safe.
        Fr9   )r?   r   r9   r9   r:   r  G  s   zBackend.exception_safe_to_retryc              
   C   s   |    |rz| j| W S  ty   Y nw d}	 z| |}W n? ty^ } z2| jrS| |rS|| jk rJ|d7 }t| j	|| j
dd }| | n
ttd|d n W Y d}~nd}~ww q|ro|dtjkro|| j|< |S )	zGet task meta from backend.

        if always_retry_backend_operation is activated, in the event of a recoverable exception,
        then retry operation with an exponential backoff until a limit has been reached.
        r   TrQ   r  zfailed to get meta)r   Nr   )r.  rh   r   _get_task_meta_forr   ro   r  rR   r,   rq   rp   r  r+   r   rn   r   SUCCESS)r?   r   cacher  r   r   r  r9   r9   r:   r"  Q  sB   



zBackend.get_task_metac                 C      | j |dd| j|< dS )z;Reload task result, even if it has been previously fetched.Fr2  N)r"  rh   r   r9   r9   r:   reload_task_resultx     zBackend.reload_task_resultc                 C   r3  )z<Reload group result, even if it has been previously fetched.Fr4  N)get_group_metarh   r?   r   r9   r9   r:   reload_group_result|  r6  zBackend.reload_group_resultc                 C   sP   |    |rz| j| W S  ty   Y nw | |}|r&|d ur&|| j|< |S r>   )r.  rh   r   _restore_groupr?   r   r2  r   r9   r9   r:   r7    s   

zBackend.get_group_metac                 C   s   | j ||d}|r|d S dS )zGet the result for a group.r4  r   N)r7  r;  r9   r9   r:   restore_group  s   zBackend.restore_groupc                 C      |  ||S )z&Store the result of an executed group.)_save_groupr?   r   r   r9   r9   r:   
save_group  s   zBackend.save_groupc                 C   s   | j |d  | |S r>   )rh   r  _delete_groupr8  r9   r9   r:   delete_group  s   
zBackend.delete_groupc                 C   r/  )zBackend cleanup.Nr9   r-  r9   r9   r:   cleanup      zBackend.cleanupc                 C   r/  )z:Cleanup actions to do at the end of a task worker process.Nr9   r-  r9   r9   r:   process_cleanup  rD  zBackend.process_cleanupc                 C   s   i S r>   r9   )r?   producerr   r9   r9   r:   on_task_call  rC   zBackend.on_task_callc                 C   r  )Nz%Backend does not support add_to_chordr  )r?   chord_idr   r9   r9   r:   add_to_chord  r!  zBackend.add_to_chordc                 K   r=   r>   r9   )r?   rM   r   r   r8   r9   r9   r:   r     rC   zBackend.on_chord_part_returnc                 C   r=   r>   r9   )r?   r   
chord_sizer9   r9   r:   set_chord_size  rC   zBackend.set_chord_sizec                 K   s   dd |D |d< zt |dd }W n ty   d }Y nw |jdt |dd }|d u r9| jjj||jd j}|jdt |dd}| jj	d j
|j|f||||d	 d S )
Nc                 S      g | ]}|  qS r9   r   .0rr9   r9   r:   
<listcomp>      z1Backend.fallback_chord_unlock.<locals>.<listcomp>r   r   r  priorityr   zcelery.chord_unlock)	countdownr  rS  )r   r   r   rn   r4   amqprouterrouter   tasksr   r   )r?   header_resultbodyrT  r8   	body_typer  rS  r9   r9   r:   fallback_chord_unlock  s    

zBackend.fallback_chord_unlockc                 C   r=   r>   r9   r-  r9   r9   r:   ensure_chords_allowed  rC   zBackend.ensure_chords_allowedc                 K   s,   |    | jj| }| j||fi | d S r>   )r]  r4   r   r\  r?   header_result_argsrZ  r8   rY  r9   r9   r:   apply_chord  s   zBackend.apply_chordc                 C   s0   |pt t dd }|rdd t |dg D S d S )NrM   c                 S   rL  r9   rM  rN  r9   r9   r:   rQ    rR  z1Backend.current_task_children.<locals>.<listcomp>r   )r   r   )r?   rM   r9   r9   r:   r    s   zBackend.current_task_childrenr9   c                 C   s   |si n|}t | j||ffS r>   )r;   	__class__r?   r7   r8   r9   r9   r:   
__reduce__  s   zBackend.__reduce__)NNNNNNFr>   )TFNN)T)rQ   )r9   N)ErD   rE   rF   r   r  UNREADY_STATESr   r   subpolling_intervalsupports_native_joinsupports_autoexpirer   retry_policyr{   r   r   r1  r   FAILUREr   r   REVOKEDr   RETRYr   r   r   r   r   r   r   r   r   r   r   ri   r   r   r   r  r  r   r  r  r#  
get_statusr&  r'  r(  r.  r  r"  r5  r9  r7  r<  r@  rB  rC  rE  rG  rI  r   rK  r\  r]  r`  r  rc  r9   r9   r9   r:   rO   Q   s    

	
	
8.
	





G
	
	
3
"

'



rO   c                   @   sT   e Zd Z		dddZ			dddZ	ddd	ZdddZdd Zedd Z	dS )SyncBackendMixinN      ?Tc           	      c   sn    |    |j}|sd S t }|D ]}t|tr |j|jfV  q||j q| j||||||dE d H  d S )N)timeoutintervalno_ack
on_messageon_interval)r.  resultssetr   r    r   addget_many)	r?   r   rq  rr  rs  rt  ru  rv  task_idsr9   r9   r:   iter_native  s   
zSyncBackendMixin.iter_nativec	           
      C   sN   |    |d urtd| j|j||||d}	|	r%||	 |j||dS d S )Nz,Backend does not support on_message callback)rq  rr  ru  rs  )	propagater   )r.  r   wait_forr   _maybe_set_cachemaybe_throw)
r?   r   rq  rr  rs  rt  ru  r   r|  r   r9   r9   r:   wait_for_pending  s   
z!SyncBackendMixin.wait_for_pendingc                 C   s\   |    d}	 | |}|d tjv r|S |r|  t| ||7 }|r-||kr-tdq)aL  Wait for task and return its result.

        If the task raises an exception, this exception
        will be re-raised by :func:`wait_for`.

        Raises:
            celery.exceptions.TimeoutError:
                If `timeout` is not :const:`None`, and the operation
                takes longer than `timeout` seconds.
        g        rQ   r   zThe operation timed out.)r.  r"  r   r  r  r  r   )r?   r   rq  rr  rs  ru  time_elapsedr   r9   r9   r:   r}    s   

zSyncBackendMixin.wait_forFc                 C      |S r>   r9   )r?   r   r3   r9   r9   r:   add_pending_result  rC   z#SyncBackendMixin.add_pending_resultc                 C   r  r>   r9   r   r9   r9   r:   remove_pending_result  rC   z&SyncBackendMixin.remove_pending_resultc                 C   r/  rJ   r9   r-  r9   r9   r:   is_async  s   zSyncBackendMixin.is_async)Nrp  TNN)Nrp  TNNNT)Nrp  TNrd  )
rD   rE   rF   r{  r  r}  r  r  propertyr  r9   r9   r9   r:   ro    s    



ro  c                   @      e Zd ZdZdS )r-   z"Base (synchronous) result backend.NrD   rE   rF   __doc__r9   r9   r9   r:   r-   $      r-   c                       s"  e Zd ZeZdZdZdZdZ fddZ	dd Z
d	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd=ddZd=ddZd=ddZd=d d!Zd"d# Zejfd$d%Zejfd&d'Zd(d)d*d(d(d(ejfd+d,Zd-d. Z	(d>d/d0Zd1d2 Zd3d4 Z d5d6 Z!d7d8 Z"d9d: Z#d;d< Z$  Z%S )?BaseKeyValueStoreBackendzcelery-task-meta-zcelery-taskset-meta-zchord-unlock-Fc                    sN   t | jdr| jj| _t j|i | |   |   | jr%| j| _	d S d S )N__func__)
r   key_tr  superr{   _add_global_keyprefix_encode_prefixesimplements_incr_apply_chord_incrr`  rb  ra  r9   r:   r{   2  s   
z!BaseKeyValueStoreBackend.__init__c                 C   sZ   | j jdi dd}|r+| d| j | _| d| j | _| d| j | _dS dS )a/  
        This method prepends the global keyprefix to the existing keyprefixes.

        This method checks if a global keyprefix is configured in `result_backend_transport_options` using the
        `global_keyprefix` key. If so, then it is prepended to the task, group and chord key prefixes.
         result_backend_transport_optionsglobal_keyprefixNr   )r4   r_   rn   task_keyprefixgroup_keyprefixchord_keyprefix)r?   r  r9   r9   r:   r  ;  s   z.BaseKeyValueStoreBackend._add_global_keyprefixc                 C   s.   |  | j| _|  | j| _|  | j| _d S r>   )r  r  r  r  r-  r9   r9   r:   r  H  s   z)BaseKeyValueStoreBackend._encode_prefixesc                 C   r  )NzMust implement the get method.r  r?   keyr9   r9   r:   rn   M  r!  zBaseKeyValueStoreBackend.getc                 C   r  )NzDoes not support get_manyr  )r?   keysr9   r9   r:   mgetP  r!  zBaseKeyValueStoreBackend.mgetc                 C   r=  r>   )rw  )r?   r  r   r   r9   r9   r:   _set_with_stateS     z(BaseKeyValueStoreBackend._set_with_statec                 C   r  )NzMust implement the set method.r  r?   r  r   r9   r9   r:   rw  V  r!  zBaseKeyValueStoreBackend.setc                 C   r  )Nz Must implement the delete methodr  r  r9   r9   r:   deleteY  r!  zBaseKeyValueStoreBackend.deletec                 C   r  )NzDoes not implement incrr  r  r9   r9   r:   incr\  r!  zBaseKeyValueStoreBackend.incrc                 C   r=   r>   r9   r  r9   r9   r:   expire_  rC   zBaseKeyValueStoreBackend.expirer|   c                 C   $   |s
t d| d| | j||S )z#Get the cache key for a task by id.ztask_id must not be empty. Got 	 instead.)r   _get_key_forr  )r?   r   r  r9   r9   r:   get_key_for_taskb     z)BaseKeyValueStoreBackend.get_key_for_taskc                 C   r  )z$Get the cache key for a group by id. group_id must not be empty. Got r  )r   r  r  r?   r   r  r9   r9   r:   get_key_for_grouph  r  z*BaseKeyValueStoreBackend.get_key_for_groupc                 C   r  )z?Get the cache key for the chord waiting on group with given id.r  r  )r   r  r  r  r9   r9   r:   get_key_for_chordn  r  z*BaseKeyValueStoreBackend.get_key_for_chordc                 C   s"   | j }|d|||||gS )Nr|   )r  join)r?   prefixr   r  r  r9   r9   r:   r  t  s   z%BaseKeyValueStoreBackend._get_key_forc                 C   sF   |  |}| j| jfD ]}||rt|t|d   S qt|S )zTake bytes: emit string.N)r  r  r  
startswithr   len)r?   r  r  r9   r9   r:   _strip_prefix{  s   

z&BaseKeyValueStoreBackend._strip_prefixc                 c   s<    |D ]\}}|d ur|  |}|d |v r||fV  qd S )Nr   )r   )r?   valuesr  kr   r9   r9   r:   _filter_ready  s   

z&BaseKeyValueStoreBackend._filter_readyc                    sF   t |drfdd| |D S  fddt||D S )Nitemsc                    s   i | ]
\}}  ||qS r9   )r  )rO  r  vr-  r9   r:   
<dictcomp>  s    
z=BaseKeyValueStoreBackend._mget_to_results.<locals>.<dictcomp>c                    s   i | ]\}}t  | |qS r9   r   )rO  ir  )r  r9   r:   r    s    )r   r  r  	enumerate)r?   r  r  r  r9   )r  r?   r:   _mget_to_results  s   


z)BaseKeyValueStoreBackend._mget_to_resultsNrp  Tc	              	   #   sd   |d u rdn|}t |tr|nt|}	t }
 j}|	D ]$}z|| }W n	 ty-   Y qw |d |v r@t||fV  |
| q|	|
 d}|	rt|	}  	 fdd|D ||}|
| |	dd |D  | D ]\}}|d ur~|| t||fV  qr|r|| |krtd| d	|r|  t| |d
7 }|r||krd S |	sJd S d S )Nrp  r   r   c                    s   g | ]}  |qS r9   )r  )rO  r  r-  r9   r:   rQ    s    z5BaseKeyValueStoreBackend.get_many.<locals>.<listcomp>c                 S   s   h | ]}t |qS r9   r  )rO  r  r9   r9   r:   	<setcomp>  rR  z4BaseKeyValueStoreBackend.get_many.<locals>.<setcomp>zOperation timed out (r   rQ   )r   rw  rh   r   r   rx  difference_updater   r  r  rH   r  r   r  r  )r?   rz  rq  rr  rs  rt  ru  max_iterationsr  ids
cached_idsr2  r   cached
iterationsr  rP  r  r   r9   r-  r:   ry    sN   



z!BaseKeyValueStoreBackend.get_manyc                 C      |  | | d S r>   )r  r  r   r9   r9   r:   r       z BaseKeyValueStoreBackend._forgetc           
   
   K   s   | j ||||d}t||d< | |}|d tjkr|S z| | || || W |S  tyC }	 z
tt	|	||d|	d }	~	ww )N)r   r   r   rM   r   r   )r   r   )
r  r   r0  r   r1  r  r  r   r   r   )
r?   r   r   r   r   rM   r8   r   current_metaexr9   r9   r:   r    s   
z&BaseKeyValueStoreBackend._store_resultc                 C   s(   |  | || d| itj |S )Nr   )r  r  r   r   r   r1  r?  r9   r9   r:   r>    s   z$BaseKeyValueStoreBackend._save_groupc                 C   r  r>   )r  r  r8  r9   r9   r:   rA    r  z&BaseKeyValueStoreBackend._delete_groupc                 C   s*   |  | |}|stjddS | |S )$Get task meta-data for a task by id.N)r   r   )rn   r  r   PENDINGr   r   r9   r9   r:   r0    s   
z+BaseKeyValueStoreBackend._get_task_meta_forc                 C   s>   |  | |}|r| |}|d }t|| j|d< |S dS )r  r   N)rn   r  r   r"   r4   )r?   r   r   r   r9   r9   r:   r:    s   
z'BaseKeyValueStoreBackend._restore_groupc                 K   s$   |    | jj| }|j| d d S )Nr   )r]  r4   r   saver^  r9   r9   r:   r    s   z*BaseKeyValueStoreBackend._apply_chord_incrc                 K   s  | j sd S | j}|j}|sd S | |}z	tj|| d}W n+ tyH }	 zt|j|d}
t	
d||	 | |
td|	W  Y d }	~	S d }	~	ww |d u r~zt| ty} }	 z t|j|d}
t	
d||	 | |
td| dW  Y d }	~	S d }	~	ww | |}|jd}|d u rt|}||krt	d	| d S ||krjt|j|d}
|jr|jn|j}zzt  ||jjd
d}W d    n1 sw   Y  W n> ty }	 z1zt| }d||	}W n ty   t|	}Y nw t	
d|| | |
t| W Y d }	~	n2d }	~	ww z|
| W n1 ty; }	 zt	
d||	 | |
td|	 W Y d }	~	nd }	~	ww W |  | | d S W |  | | d S W |  | | d S |  | | w | || j  d S )Nr  r   zChord %r raised: %rzCannot restore group: zChord callback %r raised: %rzGroupResult z no longer existsrJ  z/Chord counter incremented too many times for %rT)rq  r|  zDependency {0.id} raised {1!r}zCallback error: )!r  r4   r   r  r   restorer   r   r   logger	exceptionr   r   r   r  rn   r  warningrh  join_nativer  r!   r_   result_chord_join_timeoutnext_failed_join_reportformatStopIterationreprdelayr  r  rj   )r?   rM   r   r   r8   r4   gidr  depsr   r   valsizejretculpritr   r9   r9   r:   r     s   


z-BaseKeyValueStoreBackend.on_chord_part_return)r|   re  )&rD   rE   rF   r   r  r  r  r  r  r{   r  r  rn   r  r  rw  r  r  r  r  r  r  r  r  r   r  r  r  ry  r  r  r>  rA  r0  r:  r  r   __classcell__r9   r9   r  r:   r  +  sH    	




&
r  c                   @   r  )r.   z/Result backend base class for key/value stores.Nr  r9   r9   r9   r:   r.   @  r  r.   c                   @   sP   e Zd ZdZi Zdd Zdd Zdd Zdd	 Ze Z	 Z
 ZZe Z ZZd
S )r/   zDummy result backend.c                 O   r=   r>   r9   rb  r9   r9   r:   r   I  rC   zDisabledBackend.store_resultc                 C      t t r>   )r   E_CHORD_NO_BACKENDstripr-  r9   r9   r:   r]  L  r  z%DisabledBackend.ensure_chords_allowedc                 O   r  r>   )r   E_NO_BACKENDr  rb  r9   r9   r:   _is_disabledO  r  zDisabledBackend._is_disabledc                 O   r/  )Nzdisabled://r9   rb  r9   r9   r:   r   R  rC   zDisabledBackend.as_uriN)rD   rE   rF   r  rh   r   r]  r  r   r#  rn  r'  r&  get_task_meta_forr}  ry  r9   r9   r9   r:   r/   D  s    r/   )Ur  r   r  r*  collectionsr   r   r   r   	functoolsr   weakrefr   billiard.einfor   kombu.serializationr	   r
   r   r   rb   kombu.utils.encodingr   r   kombu.utils.urlr   celery.exceptionsr   r   r   r   r   celery._stater   celery.app.taskr   r   r   r   r   r   r   r   r   celery.resultr   r   r    r!   r"   celery.utils.collectionsr#   celery.utils.functionalr$   r%   celery.utils.logr&   celery.utils.serializationr'   r(   r)   r*   r+   celery.utils.timer,   __all__	frozensetr   rD   r  ru   r1   r  r  r;   r   r<   rN   rO   ro  r-   BaseDictBackendr  r.   r/   r9   r9   r9   r:   <module>   s`    (


     
N  