o
    Df                     @  s   d Z ddlmZ ddlZddlZddlZddlZddlmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ d
ZdZdZdZG dd dejZG dd dejejZG dd dejZG dd dejZdS )zT`librabbitmq`_ transport.

.. _`librabbitmq`: https://pypi.org/project/librabbitmq/
    )annotationsN)ChannelErrorConnectionError)get_manager)version_string_as_tuple   )base)to_rabbitmq_queue_argumentsz
    librabbitmq version too old to detect RabbitMQ version information
    so make sure you are using librabbitmq 1.5 when using rabbitmq > 3.3
i(  i'  zAssl not supported by librabbitmq, please use pyamqp:// or stunnelc                      s    e Zd ZdZ fddZ  ZS )MessagezAMQP Message (librabbitmq).c                   s8   t  j|||||d|d|d|dd d S )Ndelivery_tagcontent_typecontent_encodingheaders)channelbodydelivery_info
propertiesr   r   r   r   )super__init__get)selfr   propsinfor   	__class__ T/home/ubuntu/webapp/venv/lib/python3.10/site-packages/kombu/transport/librabbitmq.pyr   $   s   
zMessage.__init__)__name__
__module____qualname____doc__r   __classcell__r   r   r   r   r
   !   s    r
   c                   @  s,   e Zd ZdZeZ			dddZdd ZdS )ChannelzAMQP Channel (librabbitmq).Nc                 C  s:   |dur|ni }| |||d |dur||d< ||fS )z%Encapsulate data into a AMQP message.N)r   r   r   priority)update)r   r   r#   r   r   r   r   r   r   r   prepare_message5   s   zChannel.prepare_messagec                 K  s"   t |fi |}dd | D S )Nc                 S  s   i | ]
\}}| d |qS )utf8)encode).0kvr   r   r   
<dictcomp>F   s    z3Channel.prepare_queue_arguments.<locals>.<dictcomp>)r	   items)r   	argumentskwargsr   r   r   prepare_queue_argumentsD   s   zChannel.prepare_queue_arguments)NNNNN)r   r   r   r    r
   r%   r/   r   r   r   r   r"   0   s    
r"   c                   @  s   e Zd ZdZeZeZdS )
ConnectionzAMQP Connection (librabbitmq).N)r   r   r   r    r"   r
   r   r   r   r   r0   I   s    r0   c                   @  s   e Zd ZdZeZeZeZe	j
jeejeef Ze	j
jef ZdZdZe	j
jjdddZdd Zd	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Z dd Z!e"dd Z#dS ) 	TransportzAMQP Transport (librabbitmq).amqplibrabbitmqTF)asynchronous
heartbeatsc                 K  s4   || _ |dp
| j| _|dp| j| _d | _d S )Ndefault_portdefault_ssl_port)clientr   r6   r7   _Transport__reader)r   r8   r.   r   r   r   r   g   s   

zTransport.__init__c                 C  s   t jS N)r2   __version__r   r   r   r   driver_versionn      zTransport.driver_versionc                 C  s   |  S r:   )r   r   
connectionr   r   r   create_channelq   s   zTransport.create_channelc                 K  s   |j di |S )Nr   )drain_events)r   r@   r.   r   r   r   rB   t   s   zTransport.drain_eventsc              
   C  s   | j }| j D ]\}}t||dst||| q|jr ttt|j	|j
|j|j|j|j|j|jdfi |jp9i }| jdi |}| j |_ |j| j _|S )z(Establish connection to the AMQP broker.N)hostuseridpasswordvirtual_hostlogin_methodinsistsslconnect_timeoutr   )r8   default_connection_paramsr,   getattrsetattrrI   NotImplementedErrorNO_SSL_ERRORdictrC   rD   rE   rF   rG   rH   rJ   transport_optionsr0   rB   )r   conninfonamedefault_valueoptsconnr   r   r   establish_connectionw   s.   	

zTransport.establish_connectionc                 C  s   d| j _|  dS )z!Close the AMQP broker connection.N)r8   rB   closer?   r   r   r   close_connection   s   zTransport.close_connectionc              	   C  sn   |d ur.|j  D ]}d |_q	z	t|  W n ttfy#   Y nw |j   |j	  d | j
_d | _
d S r:   )channelsvaluesr@   osrX   filenoOSError
ValueErrorclear	callbacksr8   rB   )r   r@   r   r   r   r   _collect   s   


zTransport._collectc                 C  s   |j S r:   )	connectedr?   r   r   r   verify_connection   r>   zTransport.verify_connectionc                 C  s   | | | j|| d S r:   )
add_readerr]   on_readable)r   r@   loopr   r   r   register_with_event_loop   s   z"Transport.register_with_event_loopc                 O  s   t | jg|R i |S r:   )r   r8   )r   argsr.   r   r   r   r      s   zTransport.get_managerc                 C  sP   z|j }W n ty   ttt Y dS w |ddkr&t|d dk S dS )NproductRabbitMQversion)   rm   T)server_propertiesAttributeErrorwarningswarnUserWarning	W_VERSIONr   r   )r   r@   r   r   r   r   qos_semantics_matches_spec   s   
z$Transport.qos_semantics_matches_specc                 C  s    dd| j jr	| jn| jdddS )Nguest	localhostPLAIN)rD   rE   porthostnamerG   )r8   rI   r7   r6   r<   r   r   r   rK      s   z#Transport.default_connection_paramsN)$r   r   r   r    r0   DEFAULT_PORTr6   DEFAULT_SSL_PORTr7   r   r1   connection_errorsr   socketerrorIOErrorr^   channel_errorsr   driver_typedriver_name
implementsextendr   r=   rA   rB   rW   rY   rb   rd   rh   r   rt   propertyrK   r   r   r   r   r1   P   s<    

r1   )r    
__future__r   r\   r}   rp   r3   r2   r   r   kombu.utils.amq_managerr   kombu.utils.textr    r   r	   rs   rz   r{   rO   r
   r"   
StdChannelr0   r1   r   r   r   r   <module>   s&    