o
    Dfg                     @   s  d Z ddlZddlmZ ddlmZ ddlmZmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZ ddlmZ ddlmZmZmZ ddlmZmZ ddl m!Z!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z(m)Z) ddl*m+Z+ zddl,Z-ddl.m/Z/ W n e0y   dZ-dZ/Y nw zddl1Z-W n	 e0y   Y nw dZ2dZ3dZ4dZ5dZ6dZ7dZ8dZ9dZ:e$e;Z<G dd  d e)Z=G d!d" d"e+e(Z>e?e-d#drG d$d% d%e-j@jAe-jBZCG d&d' d'e>ZDdS )(zRedis result store backend.    N)contextmanager)partial)	CERT_NONECERT_OPTIONALCERT_REQUIRED)unquote)retry_over_time)cached_property)
_parse_urlmaybe_sanitize_url)states)task_join_will_block)maybe_signature)BackendStoreError
ChordErrorImproperlyConfigured)GroupResultallow_join_result)_regen
dictfilter)
get_logger)humanize_seconds   )AsyncBackendMixinBaseResultConsumer)BaseKeyValueStoreBackend)get_redis_error_classes)RedisBackendSentinelBackendzW
You need to install the redis library in order to use the Redis result store backend.
zp
You need to install the redis library with support of sentinel in order to use the Redis result store backend.
z
Setting ssl_cert_reqs=CERT_OPTIONAL when connecting to redis means that celery might not validate the identity of the redis broker when connecting. This leaves you vulnerable to man in the middle attacks.
z
Setting ssl_cert_reqs=CERT_NONE when connecting to redis means that celery will not validate the identity of the redis broker when connecting. This leaves you vulnerable to man in the middle attacks.
z
SSL connection parameters have been provided but the specified URL scheme is redis://. A Redis SSL connection URL should use the scheme rediss://.
zv
A rediss:// URL must have parameter ssl_cert_reqs and this must be set to CERT_REQUIRED, CERT_OPTIONAL, or CERT_NONE
z+Connection to Redis lost: Retry (%s/%s) %s.z
Retry limit exceeded while trying to reconnect to the Celery redis result store backend. The Celery application must be restarted.
c                       s   e Zd ZdZ fddZ fddZdd Zedd	 Zd
d Z	 fddZ
dd Zdd Zdd ZdddZdd Zdd Zdd Z  ZS )ResultConsumerNc                    sF   t  j|i | | jj| _| jj| _| jj| _| jj	| _
t | _d S N)super__init__backendget_key_for_task_get_key_for_taskdecode_result_decode_resultensure_ensureconnection_errors_connection_errorssetsubscribed_toselfargskwargs	__class__ N/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/backends/redis.pyr"   S   s   



zResultConsumer.__init__c              
      sh   z| j jj  | jd ur| j  W n ty, } ztt	| W Y d }~nd }~ww t
   d S r    )r#   clientconnection_poolreset_pubsubcloseKeyErrorloggerwarningstrr!   on_after_fork)r/   er2   r4   r5   r?   [   s   

zResultConsumer.on_after_forkc                 C   s   d | _ | jjj  | jr*| jj| j}dd |D }|D ]}| | |d  q| jjj	dd| _ | jr?| j j
| j  d S | j jd| j j| j _| j j| j j d S )Nc                 S   s   g | ]}|r|qS r4   r4   ).0metar4   r4   r5   
<listcomp>k   s    z4ResultConsumer._reconnect_pubsub.<locals>.<listcomp>Tignore_subscribe_messagespubsub)r9   r#   r6   r7   r8   r-   mgeton_state_changer'   rF   	subscribeget_connection
shard_hint
connectionregister_connect_callback
on_connect)r/   metasrB   r4   r4   r5   _reconnect_pubsubd   s    z ResultConsumer._reconnect_pubsubc                 c   sT    zd V  W d S  | j y)   z| | jd W Y d S  | j y(   tt  w w Nr4   )r+   r)   rP   r<   criticalE_RETRY_LIMIT_EXCEEDEDr/   r4   r4   r5   reconnect_on_error|   s   
z!ResultConsumer.reconnect_on_errorc                 C   s$   |d t jv r| |d  d S d S )Nstatustask_id)r   READY_STATES
cancel_for)r/   rB   r4   r4   r5   _maybe_cancel_ready_task   s   z'ResultConsumer._maybe_cancel_ready_taskc                    s   t  || | | d S r    )r!   rH   rZ   )r/   rB   messager2   r4   r5   rH      s   zResultConsumer.on_state_changec                 K   s    | j jjdd| _| | d S )NTrD   )r#   r6   rF   r9   _consume_from)r/   initial_task_idr1   r4   r4   r5   start   s   zResultConsumer.startc                 K   s.   |j di |D ]}|d ur| |d  qd S rQ   )
_iter_metarH   )r/   resultr1   rB   r4   r4   r5   on_wait_for_pending   s
   z"ResultConsumer.on_wait_for_pendingc                 C   s   | j d ur| j   d S d S r    )r9   r:   rT   r4   r4   r5   stop   s   
zResultConsumer.stopc                 C   s   | j rD|  3 | j j|d}|r*|d dkr2| | |d | W d    d S W d    d S W d    d S 1 s=w   Y  d S |rMt| d S d S )N)timeouttyper[   data)r9   rU   get_messagerH   r'   timesleep)r/   rc   r[   r4   r4   r5   drain_events   s   
"zResultConsumer.drain_eventsc                 C   s"   | j d u r
| |S | | d S r    )r9   r^   r\   r/   rW   r4   r4   r5   consume_from   s   

zResultConsumer.consume_fromc                 C   s^   |  |}|| jvr-| j| |   | j| W d    d S 1 s&w   Y  d S d S r    )r%   r-   addrU   r9   rI   r/   rW   keyr4   r4   r5   r\      s   


"zResultConsumer._consume_fromc                 C   sZ   |  |}| j| | jr+|   | j| W d    d S 1 s$w   Y  d S d S r    )r%   r-   discardr9   rU   unsubscriberm   r4   r4   r5   rY      s   

"zResultConsumer.cancel_forr    )__name__
__module____qualname__r9   r"   r?   rP   r   rU   rZ   rH   r^   ra   rb   ri   rk   r\   rY   __classcell__r4   r4   r2   r5   r   P   s     	


	r   c                       sN  e Zd ZdZeZeZerejndZdZdZ	dZ
dZ			d< fdd	Zdd Ze fd	d
Zdd Zdd Zdd Zdd Zdd Zdd Zdd Z fddZdd Zdd Zdd  Zd!d" Zejejfd#d$Zd%d& Z d'd( Z!ed)d* Z"ed+d, Z#	d=d-d.Z$d/d0 Z%d1d2 Z&d3d4 Z'e(d5d6 Z)ed7d8 Z*d> fd:d;	Z+  Z,S )?r   zyRedis task result store.

    It makes use of the following commands:
    GET, MGET, DEL, INCRBY, EXPIRE, SET, SETEX
    NTi    c              	      s  t  jddti| | jjj}	| jd u rtt	 |r&d|v r&|d }}|p.|	dp.| j
| _
|| _|	d}
|	d}|	d}|	d}|	d}|	d	pLd
|	dpQd|	dpVd|	d| j
|
o`t|
|pcd|oht|d| _|	d}|rw|| jd< |r~|| jd< |r|| jd< |	d}|r| j| | j| jd< |r| || j| _d| jv rt| jd tjrd}ttttttd}| jd|}|||}|| vrtt|tkrtt n	|tkrtt || jd< || _trt nd\| _| _ | !| | j| j"| j#| j$| _%d S )Nexpires_typez://redis_max_connectionsredis_socket_timeoutredis_socket_connect_timeoutredis_retry_on_timeoutredis_socket_keepalive#redis_backend_health_check_interval
redis_host	localhost
redis_porti  redis_dbr   redis_passwordF)hostportdbpasswordmax_connectionssocket_timeoutretry_on_timeoutsocket_connect_timeoutredis_usernameusernamehealth_check_intervalsocket_keepaliveredis_backend_use_sslconnection_classMISSING)r   r   r   requiredoptionalnonessl_cert_reqs)r4   r4   r4   )&r!   r"   intappconfgetredisr   E_REDIS_MISSINGstripr   _ConnectionPoolfloat
connparamsupdateconnection_class_ssl_params_from_url
issubclassSSLConnectionr   r   r   values
ValueError%E_REDIS_SSL_CERT_REQS_MISSING_INVALIDr<   r=   W_REDIS_SSL_CERT_OPTIONALW_REDIS_SSL_CERT_NONEurlr   r*   channel_errorsr   accept_pending_results_pending_messagesresult_consumer)r/   r   r   r   r   r   r   r7   r1   _getr   r   r   r   r   r   sslssl_cert_reqs_missingssl_string_to_constantr   r2   r4   r5   r"      s   

















zRedisBackend.__init__c                    sx  t |\}}}}}}t|fi t||||dd d |dkrA | jjd| d  dd   dd   d n| d	< g d
}	|dkrgt fdd|	D sctfdd|	D rgtt	|dkrtj
 d< |	D ]}
|
d }|rt| |
< qr d	pd}t|tr|dn|}t| d	<  D ]\}}|tjjv rtjj| ||< q   S )Nvirtual_host)r   r   r   r   r   socket/)r   pathr   r   r   r   )ssl_ca_certsssl_certfilessl_keyfiler   r   c                 3       | ]}| v V  qd S r    r4   rA   rn   )r   r4   r5   	<genexpr>N      z0RedisBackend._params_from_url.<locals>.<genexpr>c                 3   r   r    r4   r   )queryr4   r5   r   O  r   redissr   r   )r
   dictr   popr   r   UnixDomainSocketConnectionanyr   &E_REDIS_SSL_PARAMS_AND_SCHEME_MISMATCHr   r   r   
isinstancer>   r   r   itemsrL   URL_QUERY_ARGUMENT_PARSERS)r/   r   defaultsschemer   r   r   r   r   ssl_param_keysssl_settingssl_valr   rn   valuer4   )r   r   r5   r   3  sT   


zRedisBackend._params_from_urlc                    s.   t  j}d| jv r| }|| jd  |S )Nretry_policy)r!   r   _transport_optionscopyr   )r/   r   r2   r4   r5   r   j  s
   
zRedisBackend.retry_policyc                 C   s   t  s| j| d S d S r    )r   r   rk   )r/   producerrW   r4   r4   r5   on_task_calls  s   zRedisBackend.on_task_callc                 C      | j |S r    )r6   r   r/   rn   r4   r4   r5   r   w     zRedisBackend.getc                 C   r   r    )r6   rG   )r/   keysr4   r4   r5   rG   z  r   zRedisBackend.mgetc                 K   s>   t | jfi |}|d}t|| j|i t| j|fi |S )Nmax_retries)r   r   r   r   r*   r   on_connection_error)r/   funr0   policyr   r   r4   r4   r5   r(   }  s   


zRedisBackend.ensurec                 C   s*   t |}tt ||pdt|d |S )NInfzin )nextr<   errorE_LOSTr   r   )r/   r   exc	intervalsretriesttsr4   r4   r5   r     s   z RedisBackend.on_connection_errorc                 K   s:   t |trt|| jkrtd| j| j||ffi |S )Nz!value too large for Redis backend)r   r>   len_MAX_STR_VALUE_SIZEr   r(   _set)r/   rn   r   r   r4   r4   r5   r,     s   zRedisBackend.setc                 C   sh   | j  %}| jr||| j| n||| ||| |  W d    d S 1 s-w   Y  d S r    )r6   pipelineexpiressetexr,   publishexecute)r/   rn   r   piper4   r4   r5   r     s   
"zRedisBackend._setc                    s   t  | | j| d S r    )r!   forgetr   rY   rj   r2   r4   r5   r     s   zRedisBackend.forgetc                 C   s   | j | d S r    )r6   deleter   r4   r4   r5   r        zRedisBackend.deletec                 C   r   r    )r6   incrr   r4   r4   r5   r     r   zRedisBackend.incrc                 C   s   | j ||S r    )r6   expire)r/   rn   r   r4   r4   r5   r     s   zRedisBackend.expirec                 C   s   | j | |dd d S )N.tr   )r6   r   get_key_for_group)r/   group_idr`   r4   r4   r5   add_to_chord  s   zRedisBackend.add_to_chordc           	      C   sB   ||\}}}}||v r|  |}||v rtd| d||S )NzDependency z raised )exception_to_pythonr   )	r/   tupdecodeEXCEPTION_STATESPROPAGATE_STATES_tidstateretvalr4   r4   r5   _unpack_chord_result  s   
z!RedisBackend._unpack_chord_resultc                 C   s   |  | |d| d S )N.s)r,   r   )r/   r   
chord_sizer4   r4   r5   set_chord_size  s   zRedisBackend.set_chord_sizec                 K   sF   t |d ts| jj| }tdd |jD r!|j| d d S d S d S )Nr   c                 s   s    | ]}t |tV  qd S r    )r   r   )rA   nrr4   r4   r5   r     s    z+RedisBackend.apply_chord.<locals>.<genexpr>)r#   )r   r   r   r   r   resultssave)r/   header_result_argsbodyr1   header_resultr4   r4   r5   apply_chord  s   zRedisBackend.apply_chordc                 C   s   | j ddS )Nresult_chord_orderedT)r   r   rT   r4   r4   r5   _chord_zset  s   zRedisBackend._chord_zsetc                 C   s   | j jdi S )N result_backend_transport_options)r   r   r   rT   r4   r4   r5   r     s   zRedisBackend._transport_optionsc                    s  | j }|j|j|j}}}	|r|sd S |	d u rd}	| j}
| |d}| |d}| |d}| ||}| d|||g}|
 G}| j	rS|
|||	i|ddn||||||}| jrv||| j|| j|| j}| d d \}}}}W d    n1 sw   Y  t|pd}|rz#t|j|d	}t|| }||krt|}|d ur|  |jr|jn|j}t  ||jjd
d}W d    n1 sw   Y  n=| j| j |
 }| j	r| |dd}n|!|d|}| \}W d    n	1 sw   Y   fdd|D }z{z|"| W nK t#yp } z>t$%d|j| | &|t'd|W  Y d }~W |
 }|(|(|(|  W d    W S 1 sew   Y  W S d }~ww W |
 }|(|(|(|  W d    W d S 1 sw   Y  W d S |
 }|(|(|(|  W d    w 1 sw   Y  w W d S  t'y } zt$%d|j| | &||W  Y d }~S d }~w t#y } zt$%d|j| | &|t'd|W  Y d }~S d }~ww d S )Nz+infz.jr   r   r   z-inf   r   )r   T)rc   	propagatec                    s   g | ]}| qS r4   r4   )rA   r   r   unpackr4   r5   rC     s    z5RedisBackend.on_chord_part_return.<locals>.<listcomp>z Chord callback for %r raised: %rzCallback error: zChord %r raised: %rzJoin error: ))r   idgroupgroup_indexr6   r   encode_resultencoder   r	  zaddzcountrpushllenr   r   r   r   r   r   chordr   restoreon_readysupports_native_joinjoin_nativejoinr   r   result_chord_join_timeoutr   r   zrangelrangedelay	Exceptionr<   	exceptionchord_error_from_stackr   r   )r/   requestr   r`   r  r1   r   r   gidr  r6   jkeytkeyskeyencodedr   r   r   
readycount	totaldiffchord_size_bytescallbacktotalr  	join_funcreslr   r4   r  r5   on_chord_part_return  s   





	

$
	2 2z!RedisBackend.on_chord_part_returnc                 K   s   |   | jdi |dS )N)r7   r4   )_get_client	_get_poolr/   paramsr4   r4   r5   _create_client+  s   zRedisBackend._create_clientc                 C   s   | j jS r    )r   StrictRedisrT   r4   r4   r5   r4  0  s   zRedisBackend._get_clientc                 K   s   | j di |S rQ   )ConnectionPoolr6  r4   r4   r5   r5  3  r   zRedisBackend._get_poolc                 C   s   | j d u r
| jj| _ | j S r    )r   r   r:  rT   r4   r4   r5   r:  6  s   

zRedisBackend.ConnectionPoolc                 C   s   | j di | jS rQ   )r8  r   rT   r4   r4   r5   r6   <  s   zRedisBackend.clientr4   c                    s(   |si n|}t  |t|| j| jdS )N)r   r   )r!   
__reduce__r   r   r   r.   r2   r4   r5   r;  @  s   zRedisBackend.__reduce__)NNNNNNNr    )r4   N)-rq   rr   rs   __doc__r   r   r   r   r   supports_autoexpirer  r   r"   r   r	   r   r   r   rG   r(   r   r,   r   r   r   r   r   r   r   r   r   r   r   r  r	  r   r3  r8  r4  r5  propertyr:  r6   r;  rt   r4   r4   r2   r5   r      s^    a7	




_

r   sentinelc                   @   s   e Zd ZdZdS )SentinelManagedSSLConnectionzConnect to a Redis server using Sentinel + TLS.

        Use Sentinel to identify which Redis server is the current master
        to connect to and when connecting to the Master server, use an
        SSL Connection.
        N)rq   rr   rs   r<  r4   r4   r4   r5   r@  G  s    r@  c                       sf   e Zd ZdZdZeeddZerendZ	 fddZ
d fdd		Z fd
dZdd Zdd Z  ZS )r   z!Redis sentinel task result store.;r?  Nc                    s,   | j d u rtt t j|i | d S r    )r?  r   E_REDIS_SENTINEL_MISSINGr   r!   r"   r.   r2   r4   r5   r"   [  s   
zSentinelBackend.__init__Fc                    sD   |r	t  j|dS dd | jpd| jD }| jdd |D S )zDReturn the server addresses as URIs, sanitizing the password or not.)include_passwordc                 s   s    | ]}t |V  qd S r    )r   )rA   chunkr4   r4   r5   r   j  s
    
z)SentinelBackend.as_uri.<locals>.<genexpr> c                 s   s*    | ]}| d r|dd n|V  qdS )z:///Nr  )endswith)rA   urir4   r4   r5   r   p  s
    
)r!   as_urir   split_SERVER_URI_SEPARATORr  )r/   rC  
uri_chunksr2   r4   r5   rH  a  s   zSentinelBackend.as_uric                    s   | | j}t|g d}|D ]}t j||d}|d | qdD ]}|| q"dD ]}|d rE||d d v rE|d d |||< q,|S )N)hosts)r   r   rL  )r   r   r   r   )r   r   r   )rI  rJ  r   r!   r   appendr   r   )r/   r   r   chunksr   rD  re   paramr2   r4   r5   r   u  s   z SentinelBackend._params_from_urlc                 K   sV   |  }|d}| jdd}| jdi }| jjdd |D f||d|}|S )NrL  min_other_sentinelsr   sentinel_kwargsc                 S   s   g | ]
}|d  |d fqS )r   r   r4   )rA   cpr4   r4   r5   rC     s    z:SentinelBackend._get_sentinel_instance.<locals>.<listcomp>)rP  rQ  )r   r   r   r   r?  Sentinel)r/   r7  r   rL  rP  rQ  sentinel_instancer4   r4   r5   _get_sentinel_instance  s   
z&SentinelBackend._get_sentinel_instancec                 K   s2   | j di |}| jdd }|j||  djS )Nmaster_name)service_nameredis_classr4   )rU  r   r   
master_forr4  r7   )r/   r7  rT  rV  r4   r4   r5   r5    s   zSentinelBackend._get_pool)F)rq   rr   rs   r<  rJ  getattrr   r?  r@  r   r"   rH  r   rU  r5  rt   r4   r4   r2   r5   r   R  s    r   )Er<  rg   
contextlibr   	functoolsr   r   r   r   r   urllib.parser   kombu.utils.functionalr   kombu.utils.objectsr	   kombu.utils.urlr
   r   celeryr   celery._stater   celery.canvasr   celery.exceptionsr   r   r   celery.resultr   r   celery.utils.functionalr   r   celery.utils.logr   celery.utils.timer   asynchronousr   r   baser   redis.connectionr   kombu.transport.redisr   ImportErrorredis.sentinel__all__r   rB  r   r   r   r   r   rS   rq   r<   r   r   rZ  r?  SentinelManagedConnectionr   r@  r   r4   r4   r4   r5   <module>   sh    k   
