o
    DfY                     @   s  d Z ddlZddlZddlZddlZddlZddlmZ ddlmZ ddl	m
Z
mZ ddlmZ ddlmZmZ dd	lmZ ejejejejhZd
Ze ZdZdZedZddddddZefddZ G dd dZ!G dd de!Z"G dd de!Z#dddZ$dS )zTransport implementation.    N)contextmanager)SSLError)packunpack   )UnexpectedFrame)KNOWN_TCP_OPTSSOL_TCP)set_cloexeci(  is   AMQP  	z\[([\.0-9a-f:]+)\](?::(\d+))?i  <   
   	   )TCP_NODELAYTCP_USER_TIMEOUTTCP_KEEPIDLETCP_KEEPINTVLTCP_KEEPCNTc                 C   sd   |}t | }|r|d} |drt|d}| |fS d| v r.| dd\} }t|}| |fS )z1Convert hostname:port string to host, port tuple.r      :)IPV6_LITERALmatchgroupintrsplit)hostdefaultportm r   G/home/ubuntu/webapp/venv/lib/python3.10/site-packages/amqp/transport.pyto_host_port(   s   


r    c                   @   s   e Zd ZdZ			d$ddZdZdd Zd	d
 Zedd Z	dd Z
dd Zdd Zdd Zd%ddZdd Zdd Zdd Zdd Zefd d!Zd"d# ZdS )&_AbstractTransporta  Common superclass for TCP and SSL transports.

    PARAMETERS:
        host: str

            Broker address in format ``HOSTNAME:PORT``.

        connect_timeout: int

            Timeout of creating new connection.

        read_timeout: int

            sets ``SO_RCVTIMEO`` parameter of socket.

        write_timeout: int

            sets ``SO_SNDTIMEO`` parameter of socket.

        socket_settings: dict

            dictionary containing `optname` and ``optval`` passed to
            ``setsockopt(2)``.

        raise_on_initial_eintr: bool

            when True, ``socket.timeout`` is raised
            when exception is received during first read. See ``_read()`` for
            details.
    NTc                 K   sD   d| _ d | _|| _t| _t|\| _| _|| _|| _	|| _
|| _d S NF)	connectedsockraise_on_initial_eintrEMPTY_BUFFER_read_bufferr    r   r   connect_timeoutread_timeoutwrite_timeoutsocket_settings)selfr   r(   r)   r*   r+   r%   kwargsr   r   r   __init__W   s   
z_AbstractTransport.__init__)
connectionr$   r%   r'   r   r   r(   r)   r*   r+   __dict____weakref__c              	   C   s   | j r:| j  d  d| j  d  }| j  d  d| j  d  }dt| j d| d| dt| dd		S dt| j d
t| dd	S )Nr   r   r   <z: z -> z at z#x>z: (disconnected) at )r$   getsocknamegetpeernametype__name__id)r,   srcdstr   r   r   __repr__t   s
   ""*z_AbstractTransport.__repr__c              	   C   sr   z | j rW d S | | j| j| j | | j| j| j d| _ W d S  t	t
fy8   | jr7| j s7| j  d | _ w )NT)r#   _connectr   r   r(   _init_socketr+   r)   r*   OSErrorr   r$   closer,   r   r   r   connect|   s   
z_AbstractTransport.connectc              
   c   s    |d u r| j V  d S | j }| }||kr|| zLz| j V  W n7 tyC } zdt|v r4t dt|v r>t  d }~w tyY } z|jtj	krTt  d }~ww W ||krf|| d S d S ||krr|| w w )N	timed outzThe operation did not complete)
r$   
gettimeout
settimeoutr   strsockettimeoutr>   errnoEWOULDBLOCK)r,   rG   r$   prevexcr   r   r   having_timeout   s8   
z!_AbstractTransport.having_timeoutc              	   C   s   t ||t jt jt}t|D ]S\}}|\}}}	}
}z*t  |||	| _zt| jd W n	 ty4   Y nw | j	| | j
| W  d S  t jyb   | jrT| j  d | _|d t|kr` Y qw d S )NTr   )rF   getaddrinfo	AF_UNSPECSOCK_STREAMr	   	enumerater$   r
   NotImplementedErrorrD   rA   errorr?   len)r,   r   r   rG   entriesiresafsocktypeproto	canonnamesar   r   r   r<      s0   
z_AbstractTransport._connectc              	   C   s   | j d  | j tjtjd | | tj|ftj|ffD ]!\}}|d ur@t	|}t	|| d }| j tj|t
d|| q|   | t d S )Nr   i@B ll)r$   rD   
setsockoptrF   
SOL_SOCKETSO_KEEPALIVE_set_socket_optionsSO_SNDTIMEOSO_RCVTIMEOr   r   _setup_transport_writeAMQP_PROTOCOL_HEADER)r,   r+   r)   r*   rG   intervalsecusecr   r   r   r=      s    

z_AbstractTransport._init_socketc              	   C   s   i }t D ]C}d }|dkr zddlm} W n ty   d}Y nw tt|r*tt|}|rG|tv r7t| ||< qtt|rG|ttt|||< q|S )Nr   r   )r      )	r   rF   r   ImportErrorhasattrgetattrDEFAULT_SOCKET_SETTINGS
getsockoptr	   )r,   r$   tcp_optsoptenumr   r   r   _get_tcp_socket_defaults   s(   



z+_AbstractTransport._get_tcp_socket_defaultsc                 C   s@   |  | j}|r|| | D ]\}}| jt|| qd S N)rr   r$   updateitemsr]   r	   )r,   r+   ro   rp   valr   r   r   r`      s   
z&_AbstractTransport._set_socket_optionsFc                 C      t d)z#Read exactly n bytes from the peer.Must be overridden in subclassrQ   )r,   ninitialr   r   r   _read      z_AbstractTransport._readc                 C      dS )z.Do any additional initialization of the class.Nr   r@   r   r   r   rc         z#_AbstractTransport._setup_transportc                 C   r~   )z8Do any preliminary work in shutting down the connection.Nr   r@   r   r   r   _shutdown_transport   r   z&_AbstractTransport._shutdown_transportc                 C   rw   )z&Completely write a string to the peer.rx   ry   )r,   sr   r   r   rd      r}   z_AbstractTransport._writec                 C   s   | j d ur<z|   W n	 ty   Y nw z	| j tj W n	 ty'   Y nw z| j   W n	 ty8   Y nw d | _ d| _d S r"   )r$   r   r>   shutdownrF   	SHUT_RDWRr?   r#   r@   r   r   r   r?     s$   

z_AbstractTransport.closec              
   C   sn  | j }t}zJ|dd}||7 }|d|\}}}|tkr@|t}z||t }	W n tjttfy7   ||7 } w d||	g}
n||}
||
7 }t|d}W nU tjy^   || j	 | _	  ttfy } z9t
|tjrtjdkr|jtjkr|| j	 | _	t t
|trdt|v r|| j	 | _	t |jtvrd| _ d	}~ww |d
kr|||
fS td|dd)a  Parse AMQP frame.

        Frame has following format::

            0      1         3         7                   size+7      size+8
            +------+---------+---------+   +-------------+   +-----------+
            | type | channel |  size   |   |   payload   |   | frame-end |
            +------+---------+---------+   +-------------+   +-----------+
             octet    short     long        'size' octets        octet

           Tz>BHI    r   ntrB   FN   zReceived frame_end z#04xz while expecting 0xce)r|   r&   SIGNED_INT_MAXrF   rG   r>   r   joinordr'   
isinstancerR   osnamerH   rI   rE   _UNAVAILr#   r   )r,   r   readread_frame_bufferframe_header
frame_typechannelsizepart1part2payload	frame_endrK   r   r   r   
read_frame  sR   




z_AbstractTransport.read_framec              
   C   sL   z|  | W d S  tjy     ty% } z	|jtvr d| _ d }~ww r"   )rd   rF   rG   r>   rH   r   r#   )r,   r   rK   r   r   r   writeY  s   
z_AbstractTransport.write)NNNNT)F)r7   
__module____qualname____doc__r.   	__slots__r;   rA   r   rL   r<   r=   rr   r`   r|   rc   r   rd   r?   r   r   r   r   r   r   r   r!   7   s,    


Br!   c                       s   e Zd ZdZd fdd	ZdZdd Zddd	Zdd
dZ					dddZ	dd Z
dejejejffddZdd Z  ZS )SSLTransporta  Transport that works over SSL.

    PARAMETERS:
        host: str

            Broker address in format ``HOSTNAME:PORT``.

        connect_timeout: int

            Timeout of creating new connection.

        ssl: bool|dict

            parameters of TLS subsystem.
                - when ``ssl`` is not dictionary, defaults of TLS are used
                - otherwise:
                    - if ``ssl`` dictionary contains ``context`` key,
                      :attr:`~SSLTransport._wrap_context` is used for wrapping
                      socket. ``context`` is a dictionary passed to
                      :attr:`~SSLTransport._wrap_context` as context parameter.
                      All others items from ``ssl`` argument are passed as
                      ``sslopts``.
                    - if ``ssl`` dictionary does not contain ``context`` key,
                      :attr:`~SSLTransport._wrap_socket_sni` is used for
                      wrapping socket. All items in ``ssl`` argument are
                      passed to :attr:`~SSLTransport._wrap_socket_sni` as
                      parameters.

        kwargs:

            additional arguments of
            :class:`~amqp.transport._AbstractTransport` class
    Nc                    s6   t |tr|ni | _t| _t j|fd|i| d S )Nr(   )r   dictssloptsr&   r'   superr.   )r,   r   r(   sslr-   	__class__r   r   r.     s   
zSSLTransport.__init__)r   c                 C   s>   | j | jfi | j| _| j| j | j  | jj| _dS )z!Wrap the socket in an SSL object.N)_wrap_socketr$   r   rD   r(   do_handshaker   _quick_recvr@   r   r   r   rc     s   
zSSLTransport._setup_transportc                 K   s*   |r| j ||fi |S | j|fi |S rs   )_wrap_context_wrap_socket_sni)r,   r$   contextr   r   r   r   r     s   zSSLTransport._wrap_socketc                 K   s(   t jdi |}||_|j|fi |S )u  Wrap socket without SNI headers.

        PARAMETERS:
            sock: socket.socket

            Socket to be wrapped.

            sslopts: dict

                Parameters of  :attr:`ssl.SSLContext.wrap_socket`.

            check_hostname

                Whether to match the peer cert’s hostname. See
                :attr:`ssl.SSLContext.check_hostname` for details.

            ctx_options

                Parameters of :attr:`ssl.create_default_context`.
        Nr   )r   create_default_contextcheck_hostnamewrap_socket)r,   r$   r   r   ctx_optionsctxr   r   r   r     s   zSSLTransport._wrap_contextFTc                 C   s   |||||	d}|du r|rt jnt j}t |}|dur#||| |dur,|| |
dur5||
 z
t jo<|	du|_W n	 t	yH   Y nw |durP||_
|du ri|j
t jkri|r`t jjnt jj}|| |jdi |}|S )u  Socket wrap with SNI headers.

        stdlib :attr:`ssl.SSLContext.wrap_socket` method augmented with support
        for setting the server_hostname field required for SNI hostname header.

        PARAMETERS:
            sock: socket.socket

                Socket to be wrapped.

            keyfile: str

                Path to the private key

            certfile: str

                Path to the certificate

            server_side: bool

                Identifies whether server-side or client-side
                behavior is desired from this socket. See
                :attr:`~ssl.SSLContext.wrap_socket` for details.

            cert_reqs: ssl.VerifyMode

                When set to other than :attr:`ssl.CERT_NONE`, peers certificate
                is checked. Possible values are :attr:`ssl.CERT_NONE`,
                :attr:`ssl.CERT_OPTIONAL` and :attr:`ssl.CERT_REQUIRED`.

            ca_certs: str

                Path to “certification authority” (CA) certificates
                used to validate other peers’ certificates when ``cert_reqs``
                is other than :attr:`ssl.CERT_NONE`.

            do_handshake_on_connect: bool

                Specifies whether to do the SSL
                handshake automatically. See
                :attr:`~ssl.SSLContext.wrap_socket` for details.

            suppress_ragged_eofs (bool):

                See :attr:`~ssl.SSLContext.wrap_socket` for details.

            server_hostname: str

                Specifies the hostname of the service which
                we are connecting to. See :attr:`~ssl.SSLContext.wrap_socket`
                for details.

            ciphers: str

                Available ciphers for sockets created with this
                context. See :attr:`ssl.SSLContext.set_ciphers`

            ssl_version:

                Protocol of the SSL Context. The value is one of
                ``ssl.PROTOCOL_*`` constants.
        )r$   server_sidedo_handshake_on_connectsuppress_ragged_eofsserver_hostnameNr   )r   PROTOCOL_TLS_SERVERPROTOCOL_TLS_CLIENT
SSLContextload_cert_chainload_verify_locationsset_ciphersHAS_SNIr   AttributeErrorverify_mode	CERT_NONEPurposeCLIENT_AUTHSERVER_AUTHload_default_certsr   )r,   r$   keyfilecertfiler   	cert_reqsca_certsr   r   r   ciphersssl_versionoptsr   purposer   r   r   r     sD   D




zSSLTransport._wrap_socket_snic                 C   s   | j dur| j  | _ dS dS )z/Unwrap a SSL socket, so we can call shutdown().N)r$   unwrapr@   r   r   r   r   /  s   
z SSLTransport._shutdown_transportc           	   
   C   s   | j }| j}zDt||k rIz
||t| }W n! ty8 } z|j|v r3|r-| jr-t W Y d }~q d }~ww |s?td||7 }t||k sW n   || _ |d | ||d  }| _|S )N%Server unexpectedly closed connectionr   r'   rS   r>   rH   r%   rF   rG   	r,   rz   r{   _errnosrecvrbufr   rK   resultr   r   r   r|   4  s0   

zSSLTransport._readc                 C   sT   | j j}|r(z||}W n ty   d}Y nw |std||d }|sdS dS )z+Write a string out to the SSL socket fully.r   zSocket closedN)r$   r   
ValueErrorr>   )r,   r   r   rz   r   r   r   rd   P  s   zSSLTransport._write)NNrs   )
NNFNNFTNNN)r7   r   r   r   r.   r   rc   r   r   r   r   rH   ENOENTEAGAINEINTRr|   rd   __classcell__r   r   r   r   r   d  s$    "


x
r   c                   @   s.   e Zd ZdZdd ZdejejffddZdS )TCPTransportz~Transport that deals directly with TCP socket.

    All parameters are :class:`~amqp.transport._AbstractTransport` class.
    c                 C   s   | j j| _t| _| j j| _d S rs   )r$   sendallrd   r&   r'   r   r   r@   r   r   r   rc   g  s   
zTCPTransport._setup_transportFc           	   
   C   s   | j }| j}zDt||k rIz
||t| }W n! ty8 } z|j|v r3|r-| jr-t W Y d}~q d}~ww |s?td||7 }t||k sW n   || _ |d| ||d }| _|S )z%Read exactly n bytes from the socket.Nr   r   r   r   r   r   r|   n  s0   

zTCPTransport._readN)	r7   r   r   r   rc   rH   r   r   r|   r   r   r   r   r   a  s    r   Fc                 K   s"   |rt nt}|| f||d|S )a  Create transport.

    Given a few parameters from the Connection constructor,
    select and create a subclass of
    :class:`~amqp.transport._AbstractTransport`.

    PARAMETERS:

        host: str

            Broker address in format ``HOSTNAME:PORT``.

        connect_timeout: int

            Timeout of creating new connection.

        ssl: bool|dict

            If set, :class:`~amqp.transport.SSLTransport` is used
            and ``ssl`` parameter is passed to it. Otherwise
            :class:`~amqp.transport.TCPTransport` is used.

        kwargs:

            additional arguments of :class:`~amqp.transport._AbstractTransport`
            class
    )r(   r   )r   r   )r   r(   r   r-   	transportr   r   r   	Transport  s   r   r"   )%r   rH   r   rerF   r   
contextlibr   r   structr   r   
exceptionsr   platformr   r	   utilsr
   r   r   r   rI   r   	AMQP_PORTbytesr&   r   re   compiler   rm   r    r!   r   r   r   r   r   r   r   <module>   s@    
	  / ~&