o
    Df                  	   @  s  d Z ddlmZ ddlZddlmZmZ ddlmZ ddl	m
Z
 ddlmZ dd	lmZ zddlZdd
lmZ ddlmZ W n eyM   d Z ZZY nw dZdZdZe
eZG dd dejZG dd dejZedureddd  ejejddG dd dZ edkre!d e" AZ#e!d$ej%j&ej%j' e( Z)e!d$ej%j& e#*e Z+e)*de+ W d   n1 sw   Y  e#,  W d   dS 1 sw   Y  dS dS )a  Pyro transport module for kombu.

Pyro transport, and Kombu Broker daemon.

Requires the :mod:`Pyro4` library to be installed.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: No
* Supports TTL: No

Connection String
=================

To use the Pyro transport with Kombu, use an url of the form:

.. code-block::

    pyro://localhost/kombu.broker

The hostname is where the transport will be looking for a Pyro name server,
which is used in turn to locate the kombu.broker Pyro service.
This broker can be launched by simply executing this transport module directly,
with the command: ``python -m kombu.transport.pyro``

Transport Options
=================
    )annotationsN)EmptyQueue)reraise)
get_logger)cached_property   )virtual)NamingError)SerializerBasei#  z5Unable to locate pyro nameserver on host {0.hostname}zKUnable to lookup '{0.virtual_host}' in pyro nameserver on host {0.hostname}c                      s~   e Zd ZdZ fddZdd Zdd Zdd	 ZdddZdd Z	dd Z
dd Zdd Zdd Zdd Zedd Z  ZS )ChannelzPyro Channel.c                   s"   t    | jr| j  d S d S N)supercloseshared_queues_pyroReleaseself	__class__ M/home/ubuntu/webapp/venv/lib/python3.10/site-packages/kombu/transport/pyro.pyr   C   s   
zChannel.closec                 C  s
   | j  S r   )r   get_queue_namesr   r   r   r   queuesH      
zChannel.queuesc                 K  s    ||   vr| j| d S d S r   r   r   	new_queuer   queuekwargsr   r   r   
_new_queueK   s   zChannel._new_queuec                 K     | j |S r   )r   	has_queuer   r   r   r   
_has_queueO      zChannel._has_queueNc                 C  s   |  |}| j|S r   )
_queue_forr   get)r   r   timeoutr   r   r   _getR   s   
zChannel._getc                 C  s   ||   vr| j| |S r   r   r   r   r   r   r   r%   V   s   zChannel._queue_forc                 K  s   |  |}| j|| d S r   )r%   r   put)r   r   messager   r   r   r   _put[   s   
zChannel._putc                 C  r!   r   )r   sizer)   r   r   r   _size_   r$   zChannel._sizec                 O  s   | j | d S r   )r   delete)r   r   argsr   r   r   r   _deleteb   s   zChannel._deletec                 C  r!   r   )r   purger)   r   r   r   _purgee   r$   zChannel._purgec                 C  s   d S r   r   r)   r   r   r   after_reply_message_receivedh   s   z$Channel.after_reply_message_receivedc                 C  s   | j jS r   )
connectionr   r   r   r   r   r   k      zChannel.shared_queuesr   )__name__
__module____qualname____doc__r   r   r    r#   r(   r%   r,   r.   r1   r3   r4   r   r   __classcell__r   r   r   r   r   @   s    
r   c                      sT   e Zd ZdZeZe ZeZ	d Z
Z fddZdd Zdd Zed	d
 Z  ZS )	TransportzPyro Transport.pyroc                   s    t  j|fi | | j| _d S r   )r   __init__global_statestate)r   clientr   r   r   r   r>   }   s   zTransport.__init__c              	   C  s   t d | j}ztj|j| jd}W n ty+   tttt	
|t d  Y nw z||j}t|W S  tyQ   tttt
|t d  Y d S w )Nz0trying Pyro nameserver to find the broker daemon)hostport   )loggerdebugrA   r=   locateNShostnamedefault_portr
   r   E_NAMESERVERformatsysexc_infolookupvirtual_hostProxyE_LOOKUP)r   conninfo
nameserverurir   r   r   _open   s&   




zTransport._openc                 C  s   t jS r   )r=   __version__r   r   r   r   driver_version   s   zTransport.driver_versionc                 C  s   |   S r   )rU   r   r   r   r   r      r6   zTransport.shared_queues)r7   r8   r9   r:   r   r	   BrokerStater?   DEFAULT_PORTrI   driver_typedriver_namer>   rU   rW   r   r   r;   r   r   r   r   r<   p   s    r<   zqueue.Emptyc                 C  s   t  S r   )r   )clsdatar   r   r   <lambda>   s    r^   single)instance_modec                   @  sX   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd ZdS )KombuBrokerzmKombu Broker used by the Pyro transport.

        You have to run this as a separate (Pyro) service.
        c                 C  s
   i | _ d S r   r   r   r   r   r   r>      r   zKombuBroker.__init__c                 C  s
   t | jS r   )listr   r   r   r   r   r      r   zKombuBroker.get_queue_namesc                 C  s   || j v rd S t | j |< d S r   )r   r   r)   r   r   r   r      s   
zKombuBroker.new_queuec                 C  s
   || j v S r   rb   r)   r   r   r   r"      r   zKombuBroker.has_queuec                 C  s   | j | jddS )NF)block)r   r&   r)   r   r   r   r&      s   zKombuBroker.getc                 C  s   | j | | d S r   )r   r*   )r   r   r+   r   r   r   r*      s   zKombuBroker.putc                 C  s   | j |  S r   )r   qsizer)   r   r   r   r-      s   zKombuBroker.sizec                 C  s   | j |= d S r   rb   r)   r   r   r   r/      r$   zKombuBroker.deletec                 C  s0   	 z| j | jdd W n
 ty   Y d S w q)NTF)blocking)r   r&   r   r)   r   r   r   r2      s   zKombuBroker.purgeN)r7   r8   r9   r:   r>   r   r   r"   r&   r*   r-   r/   r2   r   r   r   r   ra      s    ra   __main__z,Launching Broker for Kombu's Pyro transport.z'(Expecting a Pyro name server at {}:{})zAYou can connect with Kombu using the url 'pyro://{}/kombu.broker'zkombu.broker)-r:   
__future__r   rL   r   r   r   kombu.exceptionsr   	kombu.logr   kombu.utils.objectsr    r	   Pyro4r=   Pyro4.errorsr
   
Pyro4.utilr   ImportErrorrY   rJ   rQ   r7   rE   r   r<   register_dict_to_classexposebehaviorra   printDaemondaemonrK   configNS_HOSTNS_PORTrG   nsregisterrT   requestLoopr   r   r   r   <module>   sX    "0*
*




"