o
    DfH                     @  s   d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl	m
Z
 d	Zd
ZG dd de	jZG dd deje	jZG dd dejZG dd de	jZG dd deZdS )a  pyamqp transport module for Kombu.

Pure-Python amqp transport using py-amqp library.

Features
========
* Type: Native
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
Connection string can have the following formats:

.. code-block::

    amqp://[USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]
    [USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]
    amqp://

For TLS encryption use:

.. code-block::

    amqps://[USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]

Transport Options
=================
Transport Options are passed to constructor of underlying py-amqp
:class:`~kombu.connection.Connection` class.

Using TLS
=========
Transport over TLS can be enabled by ``ssl`` parameter of
:class:`~kombu.Connection` class. By setting ``ssl=True``, TLS transport is
used::

    conn = Connect('amqp://', ssl=True)

This is equivalent to ``amqps://`` transport URI::

    conn = Connect('amqps://')

For adding additional parameters to underlying TLS, ``ssl`` parameter should
be set with dict instead of True::

    conn = Connect('amqp://broker.example.com', ssl={
            'keyfile': '/path/to/keyfile'
            'certfile': '/path/to/certfile',
            'ca_certs': '/path/to/ca_certfile'
        }
    )

All parameters are passed to ``ssl`` parameter of
:class:`amqp.connection.Connection` class.

SSL option ``server_hostname`` can be set to ``None`` which is causing using
hostname from broker URL. This is usefull when failover is used to fill
``server_hostname`` with currently used broker::

    conn = Connect('amqp://broker1.example.com;broker2.example.com', ssl={
            'server_hostname': None
        }
    )
    )annotationsN)get_manager)version_string_as_tuple   )baseto_rabbitmq_queue_argumentsi(  i'  c                      s"   e Zd ZdZd fdd	Z  ZS )MessagezAMQP Message.Nc                   sL   |j }t jd|j||j|d|d|j|j |dpi d| d S )Ncontent_typecontent_encodingapplication_headers)bodychanneldelivery_tagr
   r   delivery_info
propertiesheaders )r   super__init__r   r   getr   )selfmsgr   kwargsprops	__class__r   O/home/ubuntu/webapp/venv/lib/python3.10/site-packages/kombu/transport/pyamqp.pyr   X   s   	
zMessage.__init__N__name__
__module____qualname____doc__r   __classcell__r   r   r   r   r	   U   s    r	   c                   @  s<   e Zd ZdZeZdddddejfddZdd Zdd ZdS )	ChannelzAMQP Channel.Nc                 C  s   ||f||||d|pi S )z<Prepare message so that it can be sent using this transport.)priorityr
   r   r   r   )r   r   r&   r
   r   r   r   _Messager   r   r   prepare_messagek   s   zChannel.prepare_messagec                 K  s   t |fi |S r   r   )r   	argumentsr   r   r   r   prepare_queue_argumentsx      zChannel.prepare_queue_argumentsc                 C  s   | j || dS )z4Convert encoded message body back to a Python value.r   )r	   )r   raw_messager   r   r   message_to_python{   s   zChannel.message_to_python)	r    r!   r"   r#   r	   amqpr(   r*   r.   r   r   r   r   r%   f   s    
r%   c                   @  s   e Zd ZdZeZdS )
ConnectionzAMQP Connection.N)r    r!   r"   r#   r%   r   r   r   r   r0      s    r0   c                   @  s   e Zd ZdZeZeZeZe	jj
Z
e	jjZe	jjZe	jjZdZdZejjjdddZ	d$ddZd	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd%ddZdd Ze d d! Z!d"d# Z"dS )&	TransportzAMQP Transport.zpy-amqpr/   T)asynchronous
heartbeatsNc                 K  s"   || _ |p| j| _|p| j| _d S r   )clientdefault_portdefault_ssl_port)r   r4   r5   r6   r   r   r   r   r      s   zTransport.__init__c                 C  s   t jS r   )r/   __version__r   r   r   r   driver_version      zTransport.driver_versionc                 C  s   |  S r   r,   r   
connectionr   r   r   create_channel   s   zTransport.create_channelc                 K  s   |j di |S )Nr   )drain_events)r   r<   r   r   r   r   r>      r+   zTransport.drain_eventsc                 C  s   |d ur
|   d S d S r   )collectr;   r   r   r   _collect   s   zTransport._collectc                 C  s   | j }| j D ]\}}t||dst||| q|jdkr!d|_t|jtr9d|jv r9|jd du r9|j|jd< t|j	|j
|j|j|j|j|j|j|jd	fi |jpTi }| jdi |}| j |_ |  |S )z(Establish connection to the AMQP broker.N	localhostz	127.0.0.1server_hostname)	hostuseridpasswordlogin_methodvirtual_hostinsistsslconnect_timeout	heartbeatr   )r4   default_connection_paramsitemsgetattrsetattrhostname
isinstancerI   dictrC   rD   rE   rF   rG   rH   rJ   rK   transport_optionsr0   connect)r   conninfonamedefault_valueoptsconnr   r   r   establish_connection   s8   


zTransport.establish_connectionc                 C     |j S r   )	connectedr;   r   r   r   verify_connection   r:   zTransport.verify_connectionc                 C  s   d|_ |  dS )z!Close the AMQP broker connection.N)r4   closer;   r   r   r   close_connection   s   zTransport.close_connectionc                 C  r[   r   )rK   r;   r   r   r   get_heartbeat_interval   r:   z Transport.get_heartbeat_intervalc                 C  s    d|j _||j| j|| d S NT)	transportraise_on_initial_eintr
add_readersockon_readable)r   r<   loopr   r   r   register_with_event_loop   s   z"Transport.register_with_event_loop   c                 C  s   |j |dS )N)rate)heartbeat_tick)r   r<   rj   r   r   r   heartbeat_check   s   zTransport.heartbeat_checkc                 C  s(   |j }|ddkrt|d dk S dS )NproductRabbitMQversion)   rp   T)server_propertiesr   r   )r   r<   r   r   r   r   qos_semantics_matches_spec   s   z$Transport.qos_semantics_matches_specc                 C  s    dd| j jr	| jn| jdddS )NguestrA   PLAIN)rD   rE   portrP   rF   )r4   rI   r6   r5   r8   r   r   r   rL      s   z#Transport.default_connection_paramsc                 O  s   t | jg|R i |S r   )r   r4   r   argsr   r   r   r   r      s   zTransport.get_manager)NN)ri   )#r    r!   r"   r#   r0   DEFAULT_PORTr5   DEFAULT_SSL_PORTr6   r/   connection_errorschannel_errorsrecoverable_connection_errorsrecoverable_channel_errorsdriver_namedriver_typer   r1   
implementsextendr   r9   r=   r>   r@   rZ   r]   r_   r`   rh   rl   rr   propertyrL   r   r   r   r   r   r1      s@    



r1   c                      s    e Zd ZdZ fddZ  ZS )SSLTransportzAMQP SSL Transport.c                   s*   t  j|i | | jjsd| j_d S d S ra   )r   r   r4   rI   rv   r   r   r   r      s   zSSLTransport.__init__r   r   r   r   r   r      s    r   )r#   
__future__r   r/   kombu.utils.amq_managerr   kombu.utils.textr    r   r   rx   ry   r	   r%   
StdChannelr0   r1   r   r   r   r   r   <module>   s    Fo