o
    DfP                     @  s  d Z ddlmZ ddlZddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ ddlmZmZmZ ddlmZ ddlm Z m!Z! ddl"m#Z# ddl$m%Z% ddl&m'Z' ddl(m)Z) zddl*Z*W n e+y   dZ*Y nw zddl*m,Z, W n e+y   dZ,Y nw edZ-e-j.e-j/Z0Z/dZ1dZ2dZ3g dZ4eddZ5dd Z6dd  Z7G d!d" d"e8Z9e
d#d$ Z:d%d& Z;G d'd( d(Z<G d)d* d*e<e*j=Z>G d+d, d,e<e*j?j@ZAG d-d. d.e*j?jBZCG d/d0 d0e)jDZDG d1d2 d2ZEG d3d4 d4e)jFZFG d5d6 d6e)jGZGe,r2G d7d8 d8e,jHe*jIZJG d9d: d:eFZKG d;d< d<eGZLdS )=a  Redis transport module for Kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: No

Connection String
=================
Connection string has the following format:

.. code-block::

    redis://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]
    rediss://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]

To use sentinel for dynamic Redis discovery,
the connection string has following format:

.. code-block::

    sentinel://[USER:PASSWORD@]SENTINEL_ADDRESS[:PORT]

Transport Options
=================
* ``sep``
* ``ack_emulation``: (bool) If set to True transport will
  simulate Acknowledge of AMQP protocol.
* ``unacked_key``
* ``unacked_index_key``
* ``unacked_mutex_key``
* ``unacked_mutex_expire``
* ``visibility_timeout``
* ``unacked_restore_limit``
* ``fanout_prefix``
* ``fanout_patterns``
* ``global_keyprefix``: (str) The global key prefix to be prepended to all keys
  used by Kombu
* ``socket_timeout``
* ``socket_connect_timeout``
* ``socket_keepalive``
* ``socket_keepalive_options``
* ``queue_order_strategy``
* ``max_connections``
* ``health_check_interval``
* ``retry_on_timeout``
* ``priority_steps``
    )annotationsN)bisect)
namedtuple)contextmanager)Empty)time)promise)InconsistencyErrorVersionMismatch)
get_logger)register_after_fork)bytes_to_str)ERRREADpoll)accepts_argument)dumpsloads)cached_property)cycle_by_name)
_parse_url   )virtual)sentinelzkombu.transport.redisi     )r         	   error_classes_t)connection_errorschannel_errorsc               
   C  sb   ddl m}  t| dr| j}n| j}ttjjt	t
jtt| j| j| j| jf tjj|| j| jf S )z$Return tuple of redis error classes.r   
exceptionsInvalidData)redisr"   hasattrr#   	DataErrorr   r   	Transportr   r	   socketerrorIOErrorOSErrorConnectionErrorBusyLoadingErrorAuthenticationErrorTimeoutErrorr    InvalidResponseResponseError)r"   r&    r2   N/home/ubuntu/webapp/venv/lib/python3.10/site-packages/kombu/transport/redis.pyget_redis_error_classesy   s*   
	r4   c                  C  s   ddl m}  | jS )z1Return the redis ConnectionError exception class.r   r!   )r$   r"   r,   r!   r2   r2   r3   get_redis_ConnectionError   s   r5   c                   @     e Zd ZdZdS )	MutexHeldz)Raised when another party holds the lock.N__name__
__module____qualname____doc__r2   r2   r2   r3   r7      s    r7   c                 c  s    | j ||d}d}z(|jdd}|rdV  nt W |r1z|  W dS  tjjy0   Y dS w dS |rGz|  W w  tjjyF   Y w w w )zTAcquire redis lock in non blocking way.

    Raise MutexHeld if not successful.
    timeoutF)blockingN)lockacquirer7   releaser$   r"   LockNotOwnedError)clientnameexpirer@   lock_acquiredr2   r2   r3   Mutex   s,   rH   c                 C     |    d S N)_after_forkchannelr2   r2   r3   _after_fork_cleanup_channel      rN   c                      sl   e Zd ZdZg dZddddddddddddd	Zd
d Z fddZ fddZdddZ	  Z
S )GlobalKeyPrefixMixina  Mixin to provide common logic for global key prefixing.

    Overriding all the methods used by Kombu with the same key prefixing logic
    would be cumbersome and inefficient. Hence, we override the command
    execution logic that is called by all commands.
    )HDELHGETHLENHSETLLENLPUSHPUBLISHRPUSHRPOPSADDSREMSETSMEMBERSZADDZREMZREVRANGEBYSCOREr   N)
args_startargs_end   r   )DELBRPOPEVALSHAWATCHc                   s   t |}|d}| jv r jt|d  |d< n<| jv rV j| d } j| d }|dkr7|d | ng }g }|d urE||d  }| fdd||| D  | }|g|S )Nr   ra   rb   c                      g | ]	} j t| qS r2   global_keyprefixstr.0argselfr2   r3   
<listcomp>       z5GlobalKeyPrefixMixin._prefix_args.<locals>.<listcomp>)listpopPREFIXED_SIMPLE_COMMANDSrk   rl   PREFIXED_COMPLEX_COMMANDS)rq   argscommandra   rb   pre_args	post_argsr2   rp   r3   _prefix_args   s"   




z!GlobalKeyPrefixMixin._prefix_argsc                   sH   t  j||fi |}|dkr"|r"|\}}|t| jd }||fS |S )zParse a response from the Redis server.

        Method wraps ``redis.parse_response()`` to remove prefixes of keys
        returned by redis command.
        rf   N)superparse_responselenrk   )rq   
connectioncommand_nameoptionsretkeyvalue	__class__r2   r3   r~      s   z#GlobalKeyPrefixMixin.parse_responsec                      t  j| |i |S rJ   r}   execute_commandr|   rq   rx   kwargsr   r2   r3   r         z$GlobalKeyPrefixMixin.execute_commandTc                 C  s   t | j| j||| jdS )Nrk   )PrefixedRedisPipelineconnection_poolresponse_callbacksrk   )rq   transaction
shard_hintr2   r2   r3   pipeline  s   zGlobalKeyPrefixMixin.pipeline)TN)r9   r:   r;   r<   rv   rw   r|   r~   r   r   __classcell__r2   r2   r   r3   rP      s    rP   c                   @  s    e Zd ZdZdd Zdd ZdS )PrefixedStrictRedisz@Returns a ``StrictRedis`` client that prefixes the keys it uses.c                 O  s,   | dd| _tjj| g|R i | d S Nrk    )ru   rk   r$   Redis__init__r   r2   r2   r3   r     s   zPrefixedStrictRedis.__init__c                 K  s   t | jfd| ji|S )Nrk   )PrefixedRedisPubSubr   rk   )rq   r   r2   r2   r3   pubsub  s   zPrefixedStrictRedis.pubsubN)r9   r:   r;   r<   r   r   r2   r2   r2   r3   r     s    r   c                   @  s   e Zd ZdZdd ZdS )r   a   Custom Redis pipeline that takes global_keyprefix into consideration.

    As the ``PrefixedStrictRedis`` client uses the `global_keyprefix` to prefix
    the keys it uses, the pipeline called by the client must be able to prefix
    the keys as well.
    c                 O  s.   | dd| _tjjj| g|R i | d S r   )ru   rk   r$   rD   Pipeliner   r   r2   r2   r3   r   "  s    zPrefixedRedisPipeline.__init__N)r9   r:   r;   r<   r   r2   r2   r2   r3   r     s    r   c                      sD   e Zd ZdZdZ fddZdd Z fddZ fd	d
Z  Z	S )r   zCRedis pubsub client that takes global_keyprefix into consideration.)	SUBSCRIBEUNSUBSCRIBE
PSUBSCRIBEPUNSUBSCRIBEc                   s$   | dd| _t j|i | d S r   )ru   rk   r}   r   r   r   r2   r3   r   1  s   zPrefixedRedisPubSub.__init__c                   s8   t |}|d}| jv r fdd|D }|g|S )Nr   c                   ri   r2   rj   rm   rp   r2   r3   rr   :  rs   z4PrefixedRedisPubSub._prefix_args.<locals>.<listcomp>)rt   ru   PUBSUB_COMMANDS)rq   rx   ry   r2   rp   r3   r|   5  s   



z PrefixedRedisPubSub._prefix_argsc                   sF   t  j|i |}|du r|S |^}}}|g fdd|D |S )zParse a response from the Redis server.

        Method wraps ``PubSub.parse_response()`` to remove prefixes of keys
        returned by redis command.
        Nc                   s   g | ]}|t  jd  qS rJ   )r   rk   )rn   rM   rp   r2   r3   rr   S  s    z6PrefixedRedisPubSub.parse_response.<locals>.<listcomp>)r}   r~   )rq   rx   r   r   message_typechannelsmessager   rp   r3   r~   A  s   z"PrefixedRedisPubSub.parse_responsec                   r   rJ   r   r   r   r2   r3   r   W  r   z#PrefixedRedisPubSub.execute_command)
r9   r:   r;   r<   r   r   r|   r~   r   r   r2   r2   r   r3   r   '  s    r   c                      s   e Zd ZdZdZ fddZ fddZd#dd	Z fd
dZd$ fdd	Z	e
d%ddZd#ddZd&ddZd'ddZedd Zedd Zedd Zedd  Zed!d" Z  ZS )(QoSzRedis Ack Emulation.Tc                   s   t  j|i | d| _d S )Nr   )r}   r   _vrestore_countr   r   r2   r3   r   `  s   
zQoS.__init__c              	     s   |j }|d |d }}tjd dkr|t ig}nt |g}|  (}|j| jg|R  | j|t	|j
||g  t || W d    d S 1 sNw   Y  d S )Nexchangerouting_keyr   r   )delivery_infor$   VERSIONr   pipe_or_acquirezaddunacked_index_keyhsetunacked_keyr   _rawexecuter}   append)rq   r   delivery_tagdeliveryEXRK	zadd_argspiper   r2   r3   r   d  s   

"z
QoS.appendNc                 C  sT   | j |}| jD ]	}| j||d q
W d    n1 sw   Y  | j  d S )NrD   )rM   conn_or_acquire
_deliveredrestore_by_tagclear)rq   rD   tagr2   r2   r3   restore_unackedu  s   
zQoS.restore_unackedc                   s   |  |  t | d S rJ   )_remove_from_indicesr   r}   ack)rq   r   r   r2   r3   r   {  s   zQoS.ackFc                   s2   |r
| j |dd n| |  t | d S NT)leftmost)r   r   r   r}   r   )rq   r   requeuer   r2   r3   reject  s   z
QoS.rejectc                 c  sL    |r|V  d S | j |}| V  W d    d S 1 sw   Y  d S rJ   )rM   r   r   )rq   r   rD   r2   r2   r3   r     s   
"zQoS.pipe_or_acquirec                 C  sF   |  |}|| j|| j|W  d    S 1 sw   Y  d S rJ   )r   zremr   hdelr   )rq   r   r   r2   r2   r3   r     s   
$zQoS._remove_from_indicesr   
   c           	   
   C  s   |  j d7  _ | j d | rd S | j X}t | j }z7t|| j| j% |j| j	|d|o/||dd}|p7g D ]
\}}| 
|| q8W d    n1 sMw   Y  W n	 ty\   Y n	w W d    d S W d    d S 1 spw   Y  d S )Nr   r   T)startnum
withscores)r   rM   r   r   visibility_timeoutrH   unacked_mutex_keyunacked_mutex_expirezrevrangebyscorer   r   r7   )	rq   r   r   intervalrD   ceilvisibler   scorer2   r2   r3   restore_visible  s2   
"zQoS.restore_visiblec                   sP    fdd}j |}||j W d    d S 1 s!w   Y  d S )Nc                   sT   |  j}|   |  |r(tt|\}}}j||||   d S d S rJ   )hgetr   multir   r   r   rM   _do_restore_message)r   pMr   r   r   rq   r   r2   r3   restore_transaction  s   z/QoS.restore_by_tag.<locals>.restore_transaction)rM   r   r   r   )rq   r   rD   r   r   r2   r   r3   r     s   "zQoS.restore_by_tagc                 C     | j jS rJ   )rM   r   rp   r2   r2   r3   r        zQoS.unacked_keyc                 C  r   rJ   )rM   r   rp   r2   r2   r3   r     r   zQoS.unacked_index_keyc                 C  r   rJ   )rM   r   rp   r2   r2   r3   r     r   zQoS.unacked_mutex_keyc                 C  r   rJ   )rM   r   rp   r2   r2   r3   r     r   zQoS.unacked_mutex_expirec                 C  r   rJ   )rM   r   rp   r2   r2   r3   r     r   zQoS.visibility_timeoutrJ   FNN)r   r   r   )NF)r9   r:   r;   r<   restore_at_shutdownr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r2   r2   r   r3   r   [  s.    







r   c                   @  s   e Zd ZdZeeB ZdZdZdd Z	dd Z
dd	 Zd
d Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd(d$d%Zed&d' ZdS ))MultiChannelPollerz%Async I/O poller for Redis transport.FNc                 C  s(   t  | _i | _i | _t | _t  | _d S rJ   )set	_channels_fd_to_chan_chan_to_sockr   poller
after_readrp   r2   r2   r3   r     s
   zMultiChannelPoller.__init__c              
   C  sX   | j  D ]}z| j| W q ttfy   Y qw | j  | j  | j   d S rJ   )	r   valuesr   
unregisterKeyError
ValueErrorr   r   r   )rq   fdr2   r2   r3   close  s   

zMultiChannelPoller.closec                 C     | j | d S rJ   )r   addrq   rM   r2   r2   r3   r        zMultiChannelPoller.addc                 C  r   rJ   )r   discardr   r2   r2   r3   r     r   zMultiChannelPoller.discardc              	   C  s.   z
| j |j W d S  ttfy   Y d S w rJ   )r   r   _sockAttributeError	TypeErrorrq   r   r2   r2   r3   _on_connection_disconnect  s
   z,MultiChannelPoller._on_connection_disconnectc                 C  sr   |||f| j v r| ||| |jjd u r|j  |jj}||f| j| < || j |||f< | j|| j	 d S rJ   )
r   _unregisterr   r   connectr   filenor   register
eventflags)rq   rM   rD   typesockr2   r2   r3   	_register  s   
zMultiChannelPoller._registerc                 C  s   | j | j|||f  d S rJ   )r   r   r   )rq   rM   rD   r   r2   r2   r3   r     s   zMultiChannelPoller._unregisterc                 C  s:   t |dd d u r|jd|_|jjd uo|||f| jv S )Nr   _)getattrr   get_connectionr   r   r   )rq   rM   rD   cmdr2   r2   r3   _client_registered   s
   z%MultiChannelPoller._client_registeredc                 C  sB   ||j df}| ||j dsd|_| j|  |js|  dS dS )zEnable BRPOP mode for channel.rf   FN)rD   r  _in_pollr   _brpop_start)rq   rM   identr2   r2   r3   _register_BRPOP  s   
z"MultiChannelPoller._register_BRPOPc                 C  s<   |  ||jdsd|_| ||jd |js|  dS dS )zEnable LISTEN mode for channel.LISTENFN)r  	subclient
_in_listenr   
_subscriber   r2   r2   r3   _register_LISTEN  s   z#MultiChannelPoller._register_LISTENc                 C  s:   | j D ]}|jr|j r| | |jr| | qd S rJ   )r   active_queuesqoscan_consumer	  active_fanout_queuesr  r   r2   r2   r3   on_poll_start  s   



z MultiChannelPoller.on_poll_startc                 C  s(   || _ | jD ]}|jj|jd  S d S N)r   )r   r   r  r   unacked_restore_limit)rq   r   rM   r2   r2   r3   on_poll_init  s   

zMultiChannelPoller.on_poll_initc                 C  s*   | j D ]}|jr|jj|jd  S qd S r  )r   r  r  r   r  r   r2   r2   r3   maybe_restore_messages&  s   

z)MultiChannelPoller.maybe_restore_messagesc                 C  s<   | j D ]}|jd}|d urtt|dd r|  qd S )Nr  check_health)r   __dict__getcallabler  r  )rq   rM   rD   r2   r2   r3   maybe_check_subclient_health.  s   
z/MultiChannelPoller.maybe_check_subclient_healthc                 C  s,   | j | \}}|j r|j|   d S d S rJ   )r   r  r  handlers)rq   r   chanr   r2   r2   r3   on_readable6  s   
zMultiChannelPoller.on_readablec                 C  s>   |t @ r| || fS |t@ r| j| \}}|| d S d S rJ   )r   r  r   r   _poll_error)rq   r   eventr  r   r2   r2   r3   handle_event;  s   zMultiChannelPoller.handle_eventc           	      C  s   d| _ z]| jD ]}|jr|j r| | |jr| | q| j	|}|rZ|D ]0\}}| 
||}|rY W d| _ | jrWz| j }W n
 tyN   Y d S w |  | js=d S d S q)|   t d| _ | jr~z| j }W n	 tyw   Y w w |  | jsgw )NTF)_in_protected_readr   r  r  r  r	  r  r  r   r   r"  r   ru   r   r  r   )	rq   callbackr>   rM   eventsr   r!  r   funr2   r2   r3   r  B  sJ   



zMultiChannelPoller.getc                 C  s   | j S rJ   )r   rp   r2   r2   r3   fds`  s   zMultiChannelPoller.fdsrJ   )r9   r:   r;   r<   r   r   r   r#  r   r   r   r   r   r   r   r   r  r	  r  r  r  r  r  r  r"  r  propertyr'  r2   r2   r2   r3   r     s0    

	
r   c                      s  e Zd ZdZeZdZdZdZdZdZ	dZ
dZdZdZi ZdZdZd	Zd
ZdZdZdZeZdZdZdZdZdZdZeZdZ dZ!dZ"dZ#dZ$dZ%e&j'j(d Z(e)rQe)j*ndZ+e)rXe)j,ndZ- fddZ.dd Z/dd Z0dd Z1	drddZ2dr fdd	Z3dd Z4 fdd Z5d!d" Z6 fd#d$Z7d%d& Z8d'd( Z9d)d* Z:d+d, Z;d-d. Z<d/d0 Z=d1d2 Z>dsd4d5Z?d6d7 Z@d8d9 ZAd:d; ZBd<d= ZCd>d? ZDd@dA ZEdBdC ZFdDdE ZGdrdFdGZHdHdI ZIdJdK ZJdLdM ZKdNdO ZLdPdQ ZM fdRdSZNdTdU ZOdVdW ZP		dtdXdYZQdrdZd[ZRdrd\d]ZSdrd^d_ZTd`da ZUeVdudbdcZWeXddde ZYeXdfdg ZZe[dhdi Z\e[djdk Z]dldm Z^dndo Z_eXdpdq Z`  ZaS )vChannelzRedis Channel.NFTz_kombu.binding.%sz/{db}.zunackedunacked_indexunacked_mutexi,  i  r   r   round_robin)sepack_emulationr   r   r   r   r   r  fanout_prefixfanout_patternsrk   socket_timeoutsocket_connect_timeoutsocket_keepalivesocket_keepalive_optionsqueue_order_strategymax_connectionshealth_check_intervalretry_on_timeoutpriority_stepsc                   s   t  j|i | | jstj| _t| j | _|  | _	| 
 | _t | _t | _i | _| j| jd| _| jrBt| jtrA| j| _nd| _z| j  W n tyX   |    w | jj|  | jj| _td urpt| t  d S d S )N)rf   r
  r   )!r}   r   r/  r   r   r   r6  _queue_cycle_get_clientClient_get_response_errorr1   r   r  auto_delete_queues_fanout_to_queue_brpop_read_receiver  r0  
isinstancerl   keyprefix_fanoutrD   ping	Exception_disconnect_poolsr   cycler   r   r   rN   r   r   r2   r3   r     s4   


zChannel.__init__c                 C  rI   rJ   )rG  rp   r2   r2   r3   rK     rO   zChannel._after_forkc                 C  s@   | j }| j}d  | _| _ |d ur|  |d ur|  d S d S rJ   )_pool_async_pool
disconnect)rq   pool
async_poolr2   r2   r3   rG    s   zChannel._disconnect_poolsc                 C  sH   | j |u rd | _ | j|u rd | _| jr | jjr"| jj| d S d S d S rJ   )r  r  r   rH  r   r   r2   r2   r3   r     s   

z!Channel._on_connection_disconnectc                 C  s   z3zd|d d< d|d d d< W n	 t y   Y nw | ||D ]}|r(|jn|j|t| q!W d S  tyD   td|dd Y d S w )NTheadersredelivered
propertiesr   zCould not restore message: %rexc_info)r   _lookuplpushrpushr   rF  crit)rq   payloadr   r   r   r   queuer2   r2   r3   r     s   zChannel._do_restore_messagec                   sd   j s	t |S |j fdd} }||j W d    d S 1 s+w   Y  d S )Nc                   sT   |  j}|   | j |r(tt|\}}}||||   d S d S rJ   )r   r   r   r   r   r   r   )r   Pr   r   r   r   r2   r3   r      s   z-Channel._restore.<locals>.restore_transaction)r/  r}   _restorer   r   r   r   )rq   r   r   r   rD   r   r   r3   rZ    s   
"zChannel._restorec                 C  s   | j |ddS r   )rZ  )rq   r   r2   r2   r3   _restore_at_beginning+  s   zChannel._restore_at_beginningc                   sT   || j v r| j | \}}| j| || j|< t j|g|R i |}|   |S rJ   )_fanout_queuesr  r   r@  r}   basic_consume_update_queue_cycle)rq   rX  rx   r   r   r  r   r   r2   r3   r]  .  s   

zChannel.basic_consumec                 C  s8   | j }|r|jjr|jjt| j|fS | |S d S rJ   )r   rH  r#  r   r   r   _basic_cancel)rq   consumer_tagr   r2   r2   r3   basic_cancelB  s   
zChannel.basic_cancelc                   s   z| j | }W n
 ty   Y d S w z| j| W n	 ty#   Y nw | | z| j| \}}| j| W n	 tyA   Y nw t 	|}| 
  |S rJ   )_tag_to_queuer   r  remove_unsubscribe_fromr\  r@  ru   r}   ra  r^  )rq   r`  rX  r   r  r   r   r2   r3   r_  O  s(   
zChannel._basic_cancelc                 C  s.   |r| j rd| j|d|gS d| j|gS )Nr   /)r1  joinrD  )rq   r   r   r2   r2   r3   _get_publish_topicc  s   
zChannel._get_publish_topicc                 C  s   | j | \}}| ||S rJ   )r\  rg  )rq   rX  r   r   r2   r2   r3   _get_subscribe_topich  s   zChannel._get_subscribe_topicc                   sN    fdd j D }|sd S  j}|jjd u r|j  |j _|| d S )Nc                   s   g | ]}  |qS r2   )rh  rn   rX  rp   r2   r3   rr   m  s    z&Channel._subscribe.<locals>.<listcomp>)r  r  r   r   r   r  
psubscribe)rq   keyscr2   rp   r3   r  l  s   

zChannel._subscribec                 C  s6   |  |}| j}|jr|jjr||g d S d S d S rJ   )rh  r  r   r   unsubscribe)rq   rX  topicrl  r2   r2   r3   rd  w  s
   
zChannel._unsubscribe_fromc                 C  s   t |d dkr|d dkrd|_d S t |d dkr.|d |d |d |d f\}}}}n|d d |d |d f\}}}}||||dS )	Nr   rm  rd   Fpmessager   r   )r   patternrM   data)r   
subscribed)rq   rD   rr   rp  rM   rq  r2   r2   r3   _handle_message}  s   & zChannel._handle_messagec                 C  sz   | j }g }z
|| | W n	 ty   Y nw |jd ur9|jjddr9|| | |jd ur9|jjdds%t|S )Nr   r=   )r  r   _receive_oner   r   can_readany)rq   rl  r   r2   r2   r3   rB    s   zChannel._receivec              	   C  s  d }z|  }W n | jy   d | _ w t|ttfr|| ||}t|d dr~t|d }|d r|d dkrC|	d\}}}z
t
t|d }W n ttfyg   td|t|d d	 d
d t w |dd
d }| j|| j|  dS d S d S d S )Nr   r   rM   rq  r   re  .z&Cannot process event on channel %r: %si   r   rQ  T)r~   r   r  rC  rt   tuplert  r   endswith	partitionr   r   r   warnreprr   splitr   _deliverr@  )rq   rl  responserW  rM   r  r   r   r2   r2   r3   ru    s<   
zChannel._receive_oner   c                   sr   j tj  sd S  fddjD |pdg }jj_dg|}jr0j	|}jjj
|  d S )Nc                   s"   g | ]} D ]} ||qqS r2   )
_q_for_pri)rn   prirX  queuesrq   r2   r3   rr     s
    z(Channel._brpop_start.<locals>.<listcomp>r   rf   )r;  consumer   r  r:  rD   r   r  rk   r|   send_command)rq   r>   rk  command_argsr2   r  r3   r    s   

zChannel._brpop_startc                 K  s   zJz| j j| j jdfi |}W n | jy   | j j   w |rH|\}}t|| jdd }| j	| | j
tt|| W d | _dS t d | _w )Nrf   r   r   T)rD   r~   r   r   rK  r   rsplitr.  r;  rotater  r   r  r   )rq   r   
dest__itemdestitemr2   r2   r3   rA    s(   

zChannel._brpop_readc                 K  s,   |dkr| j   d S | j| jj| d S )Nr
  )r  r~   rD   r   )rq   r   r   r2   r2   r3   r     s   zChannel._poll_errorc                 C  sd   |   $}| jD ]}|| ||}|r$tt|  W  d    S qt 1 s+w   Y  d S rJ   )r   r:  rpopr  r   r   r   )rq   rX  rD   r  r  r2   r2   r3   _get  s   

zChannel._getc              	   C  s   |   @}| +}| jD ]}|| ||}q| }tdd |D W  d    W  d    S 1 s7w   Y  W d    d S 1 sGw   Y  d S )Nc                 s  s     | ]}t |tjr|V  qd S rJ   )rC  numbersIntegral)rn   sizer2   r2   r3   	<genexpr>  s    
z Channel._size.<locals>.<genexpr>)r   r   r:  llenr  r   sum)rq   rX  rD   r   r  sizesr2   r2   r3   _size  s   


"zChannel._sizec                 C  s$   |  |}|r| | j | S |S rJ   )priorityr.  )rq   rX  r  r2   r2   r3   r    s   
zChannel._q_for_pric                 C  s   | j }|t||d  S )Nr   )r:  r   )rq   nstepsr2   r2   r3   r    s   zChannel.priorityc                 K  sT   | j |dd}|  }|| ||t| W d   dS 1 s#w   Y  dS )zDeliver message.F)reverseN)_get_message_priorityr   rT  r  r   )rq   rX  r   r   r  rD   r2   r2   r3   _put  s   
"zChannel._putc                 K  sF   |   }|| ||t| W d   dS 1 sw   Y  dS )zDeliver fanout message.N)r   publishrg  r   )rq   r   r   r   r   rD   r2   r2   r3   _put_fanout  s   

"zChannel._put_fanoutc                 K  s   |r
| j | d S d S rJ   )r?  r   )rq   rX  auto_deleter   r2   r2   r3   
_new_queue  s   zChannel._new_queuec              	   C  s   |  |jdkr||ddf| j|< |   }|| j|f | j|p%d|p(d|p+dg W d    d S 1 s:w   Y  d S )Nfanout#*r   )	typeofr   replacer\  r   saddkeyprefix_queuer.  rf  )rq   r   r   rp  rX  rD   r2   r2   r3   _queue_bind	  s   

"zChannel._queue_bindc           
   	   O  s   | j | | j|ddO}|| j|f | j|pd|p d|p#dg | }| j	D ]}	|
| ||	}q/|  W d    n1 sIw   Y  W d    d S W d    d S 1 saw   Y  d S )NrD   r   r   )r?  r   r   r  sremr  r.  rf  r   r:  deleter  r   )
rq   rX  r   r   rp  rx   r   rD   r   r  r2   r2   r3   _delete  s    


"zChannel._deletec              	   K  s   |   9}| $}| jD ]}|| ||}qt| W  d    W  d    S 1 s0w   Y  W d    d S 1 s@w   Y  d S rJ   )r   r   r:  existsr  rw  r   )rq   rX  r   rD   r   r  r2   r2   r3   
_has_queue!  s   



"zChannel._has_queuec                   sh    j | }  !}||}|sg W  d    S  fdd|D W  d    S 1 s-w   Y  d S )Nc                   s    g | ]}t t| jqS r2   )ry  r   r~  r.  )rn   valrp   r2   r3   rr   0  s     z%Channel.get_table.<locals>.<listcomp>)r  r   smembers)rq   r   r   rD   r   r2   rp   r3   	get_table(  s   


$zChannel.get_tablec              	   C  s   |   E}| 0}| jD ]}| ||}|||}q| }t|d d d W  d    W  d    S 1 s<w   Y  W d    d S 1 sLw   Y  d S )Nrd   )r   r   r:  r  r  r  r   r  )rq   rX  rD   r   r  priqr  r2   r2   r3   _purge2  s   


"zChannel._purgec                   s   d| _ | jrz|   W n	 ty   Y nw | jsD| jj|  | j	d}|d ur<| j
D ]}|| jv r;| j||d q-|   |   t   d S )NTrD   r   )_closingr  rA  r   closedr   rH  r   r  r  r\  r?  queue_deleterG  _close_clientsr}   r   )rq   rD   rX  r   r2   r3   r   ;  s$   

zChannel.closec                 C  sL   dD ]!}z| j | }|jd }|_|  W q tt| jfy#   Y qw d S )N)rD   r  )r  r   rK  r   r   r1   )rq   attrrD   r   r2   r2   r3   r  P  s   
zChannel._close_clientsc                 C  sd   t |tjs0|r|dkrt}n|dr|dd  }zt|}W |S  ty/   td|w |S )Nre  r   z/Database is int between 0 and limit - 1, not {})rC  r  r  
DEFAULT_DB
startswithintr   format)rq   vhostr2   r2   r3   _prepare_virtual_hostZ  s    

zChannel._prepare_virtual_hostc                 K  s   |S rJ   r2   )rq   r4  r5  paramsr2   r2   r3   _filter_tcp_connparamsi  s   zChannel._filter_tcp_connparamsc                   s  | j j}|jpd|jp| j j|j|j|j| j| j	| j
| j| j| j| jd}| j}t|drN|g}t|dr<|t|j7 }|D ]
}t|jdrH nq>|d |jrhz||j | j|d< W n	 tyg   Y nw |d }d|v rt|\}}	}	}
}}}|d	kr| jdi |}|jtjd
| dfi | |dd  |dd  |dd  |
|d< ||d< |dd  |dd  | |dd |d< |  |dp| j}|rG  fddd|}|}||d< |S )Nz	127.0.0.1)hostportvirtual_hostusernamepasswordr7  r2  r3  r4  r5  r8  r9  r   	__bases__r8  connection_classr  z://r(   re  )r  pathr3  r4  r5  r  r  r  r  dbc                      s   e Zd Z fddZ  ZS )z'Channel._connparams.<locals>.Connectionc                   s   t  j|  |  d S rJ   )r}   rK  r   )rq   rx   )r   rM   r2   r3   rK    s   z2Channel._connparams.<locals>.Connection.disconnect)r9   r:   r;   rK  r   r2   rL   r   r3   
Connection  s    r  r2   ) r   rD   hostnamer  default_portr  useridr  r7  r2  r3  r4  r5  r8  r9  r  r%   rt   r  r   r   ru   sslupdateconnection_class_sslr   r   r  r$   UnixDomainSocketConnectionr  r  )rq   asynchronousconninfo
connparams
conn_classclassesklassr  schemer  r  r  r  queryconnection_clsr  r2   rL   r3   _connparamsm  sz   




zChannel._connparamsc                 C  s    |r	| j | jdS | j | jdS )N)r   )r=  rM  rL  rq   r  r2   r2   r3   _create_client  s   zChannel._create_clientc                 C  s0   | j |d}| jj|d d| _tjdi |S )Nr  r  )r  r2   )r  rD  r  r$   ConnectionPool)rq   r  r  r2   r2   r3   	_get_pool  s   zChannel._get_poolc                 C  s4   t jdk rtdt | jrtjt| jdS t jS )N)r   rd   r   zSRedis transport requires redis-py versions 3.2.0 or later. You have {0.__version__}r   )	r$   r   r
   r  rk   	functoolspartialr   r   rp   r2   r2   r3   r<    s   
zChannel._get_clientc                 c  s    |r|V  d S |   V  d S rJ   r  rq   rD   r2   r2   r3   r     s   
zChannel.conn_or_acquirec                 C  s   | j d u r
|  | _ | j S rJ   )rI  r  rp   r2   r2   r3   rL    s   

zChannel.poolc                 C  s   | j d u r| jdd| _ | j S )NTr  )rJ  r  rp   r2   r2   r3   rM    s   
zChannel.async_poolc                 C  s   | j ddS )z+Client used to publish messages, BRPOP etc.Tr  r  rp   r2   r2   r3   rD     s   zChannel.clientc                 C  s   | j dd}| S )z1Pub/Sub connection used to consume fanout queues.Tr  )r  r   r  r2   r2   r3   r    s   zChannel.subclientc                 C  s   | j | j d S rJ   )r;  r  r  rp   r2   r2   r3   r^    s   zChannel._update_queue_cyclec                 C  s   ddl m} |jS )Nr   r!   )r$   r"   r1   )rq   r"   r2   r2   r3   r>    s   zChannel._get_response_errorc                   s    fdd j D S )z<Set of queues being consumed from (excluding fanout queues).c                   s   h | ]	}| j vr|qS r2   )r  ri  rp   r2   r3   	<setcomp>  s    
z(Channel.active_queues.<locals>.<setcomp>)_active_queuesrp   r2   rp   r3   r    s   zChannel.active_queuesr   )r   r   rJ   )br9   r:   r;   r<   r   _client
_subclientr  supports_fanoutr  rD  r.  r  r  r\  r/  r   r   r   r   r  r   PRIORITY_STEPSr:  r2  r3  r4  r5  r9  r7  DEFAULT_HEALTH_CHECK_INTERVALr8  r0  r1  rk   r6  rJ  rI  r   r)  from_transport_optionsr$   r  r  SSLConnectionr  r   rK   rG  r   r   rZ  r[  r]  ra  r_  rg  rh  r  rd  rt  rB  ru  r  rA  r   r  r  r  r  r  r  r  r  r  r  r  r  r   r  r  r  r  r  r  r<  r   r   r(  rL  rM  r   rD   r  r^  r>  r  r   r2   r2   r   r3   r)  e  s    %	

	

	



N




r)  c                      st   e Zd ZdZeZdZeZdZdZ	e
jjjdeg ddZer$e \ZZ fddZd	d
 Zdd Zdd Z  ZS )r'   zRedis Transport.Nr$   T)directrn  r  )r  exchange_typec                   s.   t d u rtdt j|i | t | _d S )Nz)Missing redis library (pip install redis))r$   ImportErrorr}   r   r   rH  r   r   r2   r3   r     s   zTransport.__init__c                 C  s   t jS rJ   )r$   __version__rp   r2   r2   r3   driver_version  s   zTransport.driver_versionc                   s   | j j jj | jfdd}|_ fddj 	dj
 |jjdt}	|j d S )Nc                   sD   | j r	| j   jr z	j W d S  ty   Y d S w d S rJ   )r   rc  r'  on_tickr   )r   )rH  loopr  r2   r3   _on_disconnect#  s   z:Transport.register_with_event_loop.<locals>._on_disconnectc                     s       fddj D  d S )Nc                   s   g | ]} ||qS r2   r2   )rn   r   )
add_readerr  r2   r3   rr   2  s    zMTransport.register_with_event_loop.<locals>.on_poll_start.<locals>.<listcomp>)r'  r2   )r  rH  cycle_poll_startr  r2   r3   r  0  s   z9Transport.register_with_event_loop.<locals>.on_poll_startr   r8  )rH  r  r   r  r  r  r   r  r   call_repeatedlyr  rD   transport_optionsr  r  r  )rq   r   r  r  r8  r2   )r  rH  r  r  r  r  r3   register_with_event_loop  s$   z"Transport.register_with_event_loopc                 C  s   | j | dS )z1Handle AIO event for one of our file descriptors.N)rH  r  )rq   r   r2   r2   r3   r  >  s   zTransport.on_readable)r9   r:   r;   r<   r)  polling_intervalDEFAULT_PORTr  driver_typedriver_namer   r'   
implementsextend	frozensetr$   r4   r   r    r   r  r  r  r   r2   r2   r   r3   r'     s"    

"r'   c                   @  r6   )SentinelManagedSSLConnectionzConnect to a Redis server using Sentinel + TLS.

        Use Sentinel to identify which Redis server is the current master
        to connect to and when connecting to the Master server, use an
        SSL Connection.
        Nr8   r2   r2   r2   r3   r  D  s    r  c                   @  sH   e Zd ZdZejd ZerejndZere	ndZ
d	ddZd	ddZdS )
SentinelChannela  Channel with explicit Redis Sentinel knowledge.

    Broker url is supposed to look like:

    .. code-block::

        sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...

    where each sentinel is separated by a `;`.

    Other arguments for the sentinel should come from the transport options
    (see `transport_options` of :class:`~kombu.connection.Connection`).

    You must provide at least one option in Transport options:
     * `master_name` - name of the redis group to poll

    Example:
    -------
    .. code-block:: python

        >>> import kombu
        >>> c = kombu.Connection(
             'sentinel://sentinel1:26379;sentinel://sentinel2:26379',
             transport_options={'master_name': 'mymaster'}
        )
        >>> c.connect()
    )master_namemin_other_sentinelssentinel_kwargsNFc           	      C  s   |  |}| }|dd  |dd  g }| jjjD ]}t|}|jdkr6|jp-| jj	}|
|j|f q|sD|
|d |d f tj|ft| ddt| dd d|}t| dd }|d u rftd	||tjjS )
Nr  r  r   r  r   r  )r  r  r  z1'master_name' transport option must be specified.)r  copyru   r   rD   altr   r  r  r  r   r  r   Sentinelr  r   
master_forr$   r   r   )	rq   r  r  additional_params	sentinelsurlr  sentinel_instr  r2   r2   r3   _sentinel_managed_poolv  s@   



z&SentinelChannel._sentinel_managed_poolc                 C  s
   |  |S rJ   )r  r  r2   r2   r3   r    s   
zSentinelChannel._get_poolr   )r9   r:   r;   r<   r)  r  r   SentinelManagedConnectionr  r  r  r  r  r2   r2   r2   r3   r  Q  s    

%r  c                   @  s   e Zd ZdZdZeZdS )SentinelTransportzRedis Sentinel Transport.ig  N)r9   r:   r;   r<   r  r  r)  r2   r2   r2   r3   r    s    r  )Mr<   
__future__r   r  r  r(   r   collectionsr   
contextlibr   rX  r   r   viner   kombu.exceptionsr	   r
   	kombu.logr   kombu.utils.compatr   kombu.utils.encodingr   kombu.utils.eventior   r   r   kombu.utils.functionalr   kombu.utils.jsonr   r   kombu.utils.objectsr   kombu.utils.schedulingr   kombu.utils.urlr   r   r   r$   r  r   loggercriticalr|  rV  r  r  r  r  r   r4   r5   rF  r7   rH   rN   rP   r   r   rD   r   r   PubSubr   r   r   r)  r'   r  r  r  r  r  r2   r2   r2   r3   <module>   s    5

S4k       D
N