o
    Df                     @  s  d Z ddlmZ ddlZddlZddlmZ ddlmZ ddl	m
Z
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ erDddlmZ dZdefdefdefdefdefdZdd Zdd Zdd ZG dd dZG dd dZG dd deZede g d dd!Z!G d"d# d#Z"dS )$zBase transport interface.    )annotationsN)TYPE_CHECKING)RecoverableConnectionError)ChannelErrorConnectionError)Message)
dictfilter)cached_property)maybe_s_to_ms)TracebackType)r   
StdChannel
Management	Transportz	x-expireszx-message-ttlzx-max-lengthzx-max-length-byteszx-max-priority)expiresmessage_ttl
max_lengthmax_length_bytesmax_priorityc                 K  s2   t tdd | D }|rt| fi |S | S )a!  Convert queue arguments to RabbitMQ queue arguments.

    This is the implementation for Channel.prepare_queue_arguments
    for AMQP-based transports.  It's used by both the pyamqp and librabbitmq
    transports.

    Arguments:
        arguments (Mapping):
            User-supplied arguments (``Queue.queue_arguments``).

    Keyword Arguments:
        expires (float): Queue expiry time in seconds.
            This will be converted to ``x-expires`` in int milliseconds.
        message_ttl (float): Message TTL in seconds.
            This will be converted to ``x-message-ttl`` in int milliseconds.
        max_length (int): Max queue length (in number of messages).
            This will be converted to ``x-max-length`` int.
        max_length_bytes (int): Max queue size in bytes.
            This will be converted to ``x-max-length-bytes`` int.
        max_priority (int): Max priority steps for queue.
            This will be converted to ``x-max-priority`` int.

    Returns
    -------
        Dict: RabbitMQ compatible queue arguments.
    c                 s  s    | ]
\}}t ||V  qd S N)_to_rabbitmq_queue_argument).0keyvalue r   M/home/ubuntu/webapp/venv/lib/python3.10/site-packages/kombu/transport/base.py	<genexpr>=   s
    
z.to_rabbitmq_queue_arguments.<locals>.<genexpr>)r   dictitems)	argumentsoptionspreparedr   r   r   to_rabbitmq_queue_arguments!   s   

r!   c                 C  s&   t |  \}}||d ur||fS |fS r   )RABBITMQ_QUEUE_ARGUMENTS)r   r   opttypr   r   r   r   D   s   r   c                 C  s   t d| j|S )Nz<Transport {0.__module__}.{0.__name__} does not implement {1})NotImplementedErrorformat	__class__)objmethodr   r   r   
_LeftBlankJ   s
   r*   c                   @  sN   e Zd ZdZdZdd Zdd Zdd Zd	d
 Zdd Z	dd Z
dddZdS )r   zStandard channel base class.Nc                 O  "   ddl m} || g|R i |S )Nr   )Consumer)kombu.messagingr,   )selfargskwargsr,   r   r   r   r,   U      zStdChannel.Consumerc                 O  r+   )Nr   )Producer)r-   r2   )r.   r/   r0   r2   r   r   r   r2   Y   r1   zStdChannel.Producerc                 C  
   t | dNget_bindingsr*   r.   r   r   r   r5   ]      
zStdChannel.get_bindingsc                 C     dS )zCallback called after RPC reply received.

        Notes
        -----
           Reply queue semantics: can be used to delete the queue
           after transient reply message received.
        Nr   )r.   queuer   r   r   after_reply_message_received`   s    z'StdChannel.after_reply_message_receivedc                 K  s   |S r   r   )r.   r   r0   r   r   r   prepare_queue_argumentsi      z"StdChannel.prepare_queue_argumentsc                 C  s   | S r   r   r7   r   r   r   	__enter__l   r=   zStdChannel.__enter__exc_typetype[BaseException] | Noneexc_valBaseException | Noneexc_tbTracebackType | NonereturnNonec                 C  s   |    d S r   )close)r.   r?   rA   rC   r   r   r   __exit__o   s   zStdChannel.__exit__)r?   r@   rA   rB   rC   rD   rE   rF   )__name__
__module____qualname____doc__no_ack_consumersr,   r2   r5   r;   r<   r>   rH   r   r   r   r   r   P   s    	r   c                   @  s    e Zd ZdZdd Zdd ZdS )r   z!AMQP Management API (incomplete).c                 C  
   || _ d S r   )	transport)r.   rO   r   r   r   __init__{   r8   zManagement.__init__c                 C  r3   r4   r6   r7   r   r   r   r5   ~   r8   zManagement.get_bindingsN)rI   rJ   rK   rL   rP   r5   r   r   r   r   r   x   s    r   c                   @  s(   e Zd ZdZdd Zdd Zdd ZdS )	
Implementsz/Helper class used to define transport features.c                 C  s"   z| | W S  t y   t|w r   )KeyErrorAttributeError)r.   r   r   r   r   __getattr__   s
   
zImplements.__getattr__c                 C  s   || |< d S r   r   )r.   r   r   r   r   r   __setattr__   s   zImplements.__setattr__c                 K  s   | j | fi |S r   )r'   )r.   r0   r   r   r   extend   s   zImplements.extendN)rI   rJ   rK   rL   rT   rU   rV   r   r   r   r   rQ      s
    rQ   F)directtopicfanoutheaders)asynchronousexchange_type
heartbeatsc                   @  s  e Zd ZdZeZdZdZdZefZ	e
fZdZdZdZe Zdd Zdd Zd	d
 Zdd Zdd Zdd Zd4ddZdd Zdd Zdd Zdd Zdd Zejej e!j"e!j#ffddZ$d d! Z%d"d# Z&d5d6d(d)Z'e(d*d+ Z)d,d- Z*e+d.d/ Z,e(d0d1 Z-e(d2d3 Z.dS )7r   zBase class for transports.NFN/Ac                 K  rN   r   )client)r.   r_   r0   r   r   r   rP      r8   zTransport.__init__c                 C  r3   )Nestablish_connectionr6   r7   r   r   r   r`      r8   zTransport.establish_connectionc                 C  r3   )Nclose_connectionr6   r.   
connectionr   r   r   ra      r8   zTransport.close_connectionc                 C  r3   )Ncreate_channelr6   rb   r   r   r   rd      r8   zTransport.create_channelc                 C  r3   )Nclose_channelr6   rb   r   r   r   re      r8   zTransport.close_channelc                 K  r3   )Ndrain_eventsr6   )r.   rc   r0   r   r   r   rf      r8   zTransport.drain_events   c                 C     d S r   r   )r.   rc   rater   r   r   heartbeat_check   r=   zTransport.heartbeat_checkc                 C  r9   )Nr^   r   r7   r   r   r   driver_version   r=   zTransport.driver_versionc                 C  r9   )Nr   r   rb   r   r   r   get_heartbeat_interval   r=   z Transport.get_heartbeat_intervalc                 C  rh   r   r   r.   rc   loopr   r   r   register_with_event_loop   r=   z"Transport.register_with_event_loopc                 C  rh   r   r   rm   r   r   r   unregister_from_event_loop   r=   z$Transport.unregister_from_event_loopc                 C  r9   NTr   rb   r   r   r   verify_connection   r=   zTransport.verify_connectionc                   s    j  fdd  S )Nc              
     sr   j stdzdd W n" y   Y d S  y0 } z|jv r+W Y d }~d S  d }~ww |  |  d S )NzSocket was disconnectedr   )timeout)	connectedr   errno	call_soon)rn   exc_read_unavailrc   rf   errorrs   r   r   ry      s   
z%Transport._make_reader.<locals>._read)rf   )r.   rc   rs   r{   rz   r   rx   r   _make_reader   s   zTransport._make_readerc                 C  r9   rq   r   rb   r   r   r   qos_semantics_matches_spec   r=   z$Transport.qos_semantics_matches_specc                 C  s*   | j }|d u r| | }| _ || d S r   )_Transport__readerr|   )r.   rc   rn   readerr   r   r   on_readable   s   zTransport.on_readable**uristrrE   c                 C  s   t  )z(Customise the display format of the URI.)r%   )r.   r   include_passwordmaskr   r   r   as_uri   s   zTransport.as_uric                 C  s   i S r   r   r7   r   r   r   default_connection_params   s   z#Transport.default_connection_paramsc                 O  s
   |  | S r   )r   )r.   r/   r0   r   r   r   get_manager  r8   zTransport.get_managerc                 C  s   |   S r   )r   r7   r   r   r   manager     zTransport.managerc                 C     | j jS r   )
implementsr]   r7   r   r   r   supports_heartbeats	  r   zTransport.supports_heartbeatsc                 C  r   r   )r   r[   r7   r   r   r   supports_ev  r   zTransport.supports_ev)rg   )Fr   )r   r   rE   r   )/rI   rJ   rK   rL   r   r_   can_parse_urldefault_portr   connection_errorsr   channel_errorsdriver_typedriver_namer~   default_transport_capabilitiesrV   r   rP   r`   ra   rd   re   rf   rj   rk   rl   ro   rp   rr   socketrs   r{   ru   EAGAINEINTRr|   r}   r   r   propertyr   r   r	   r   r   r   r   r   r   r   r      sN    




r   )#rL   
__future__r   ru   r   typingr   amqp.exceptionsr   kombu.exceptionsr   r   kombu.messager   kombu.utils.functionalr   kombu.utils.objectsr	   kombu.utils.timer
   typesr   __all__intr"   r!   r   r*   r   r   r   rQ   	frozensetr   r   r   r   r   r   <module>   s@    	#(

