o
    Df                     @  sb  d Z ddlmZ ddlZddlZddlmZ ddlmZm	Z	 ddl
mZmZ ddlmZ z]ddlZdd	lmZ dd
lmZ ejjejjejjejjejjejjejjejjejjf	Zejjejj ejj!ejjejjejjejj"ejj#ejjejj$ejj%ejj&ejjejj'ej(fZ)W n e*y   dZd ZZ)Y nw dZ+dZ,G dd dej-Z-G dd dej.Z.dS )a  Zookeeper transport module for kombu.

Zookeeper based transport. This transport uses the built-in kazoo Zookeeper
based queue implementation.

**References**

- https://zookeeper.apache.org/doc/current/recipes.html#sc_recipes_Queues
- https://kazoo.readthedocs.io/en/latest/api/recipe/queue.html

**Limitations**
This queue does not offer reliable consumption.  An entry is removed from
the queue prior to being processed.  So if an error occurs, the consumer
has to re-queue the item or it will be lost.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: Yes
* Supports TTL: No

Connection String
=================
Connects to a zookeeper node as:

.. code-block::

    zookeeper://SERVER:PORT/VHOST

The <vhost> becomes the base for all the other znodes.  So we can use
it like a vhost.


Transport Options
=================

    )annotationsN)Empty)bytes_to_strensure_bytes)dumpsloads   )virtual)KazooClient)Queue i  z!Mahendra M <mahendra.m@gmail.com>c                      s   e Zd ZdZdZi Z fddZdd Zdd Zd	d
 Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zedd Z  ZS )ChannelzZookeeper Channel.Nc                   s4   t  j|fi | | jjj}d|d| _d S )Nz/{}/)super__init__
connectionclientvirtual_hostformatstrip_vhost)selfr   kwargsvhost	__class__r   R/home/ubuntu/webapp/venv/lib/python3.10/site-packages/kombu/transport/zookeeper.pyr   i   s   
zChannel.__init__c                 C  s   t j| j|S N)ospathjoinr   )r   
queue_namer   r   r   	_get_pathn   s   zChannel._get_pathc                 C  s>   | j |d }|d u rt| j| |}|| j |< t| |S r   )_queuesgetr   r   r"   len)r   r!   queuer   r   r   
_get_queueq   s   
zChannel._get_queuec                 K  s&   |  |jtt|| j|dddS )NT)reverse)priority)r'   putr   r   _get_message_priority)r   r&   messager   r   r   r   _put}   s   

zChannel._putc                 C  s,   |  |}| }|d u rt tt|S r   )r'   r$   r   r   r   )r   r&   msgr   r   r   _get   s
   
zChannel._getc                 C  s0   d}|  |}	 | }|d u r	 |S |d7 }q)Nr   Tr   )r'   r$   )r   r&   countr.   r   r   r   _purge   s   
zChannel._purgec                 O  s.   |  |r| | | j| | d S d S r   )
_has_queuer1   r   deleter"   )r   r&   argsr   r   r   r   _delete   s   

zChannel._deletec                 C  s   |  |}t|S r   )r'   r%   r   r&   r   r   r   _size   s   
zChannel._sizec                 K  s   |  |s| |}d S d S r   )r2   r'   )r   r&   r   r   r   r   
_new_queue   s   
zChannel._new_queuec                 C  s   | j | |d uS r   )r   existsr"   r6   r   r   r   r2      s   zChannel._has_queuec              	   C  s   | j j}g }|jrO|jD ]B}|dr|tdd  }|sqz|dd\}}|t|f}W n tyH   ||jkrB||j	p?t
f}n|t
f}Y nw || q|j|j	pUt
f}||vra|d| ddd |D }t|}|  |S )Nzzookeeper://:r   r   ,c                 S  s   g | ]\}}| d | qS )r:   r   ).0hpr   r   r   
<listcomp>   s    z!Channel._open.<locals>.<listcomp>)r   r   alt
startswithr%   splitint
ValueErrorhostnameportDEFAULT_PORTappendinsertr    r
   start)r   conninfohosts	host_porthostrF   conn_strconnr   r   r   _open   s2   


zChannel._openc                 C  s   | j d u r
|  | _ | j S r   )_clientrQ   r   r   r   r   r      s   

zChannel.client)__name__
__module____qualname____doc__rR   r#   r   r"   r'   r-   r/   r1   r5   r7   r8   r2   rQ   propertyr   __classcell__r   r   r   r   r   c   s"    	r   c                      sT   e Zd ZdZeZdZeZej	j
e Z
ej	je ZdZdZ fddZdd Z  ZS )		TransportzZookeeper Transport.r   	zookeeperkazooc                   s&   t d u rtdt j|i | d S )Nz"The kazoo library is not installed)r\   ImportErrorr   r   )r   r4   r   r   r   r   r      s   zTransport.__init__c                 C  s   t jS r   )r\   __version__rS   r   r   r   driver_version   s   zTransport.driver_version)rT   rU   rV   rW   r   polling_intervalrG   default_portr	   rZ   connection_errorsKZ_CONNECTION_ERRORSchannel_errorsKZ_CHANNEL_ERRORSdriver_typedriver_namer   r_   rY   r   r   r   r   rZ      s    

rZ   )/rW   
__future__r   r   socketr&   r   kombu.utils.encodingr   r   kombu.utils.jsonr   r    r	   r\   kazoo.clientr
   kazoo.recipe.queuer   
exceptionsSystemErrorExceptionConnectionLossExceptionMarshallingErrorExceptionUnimplementedExceptionOperationTimeoutExceptionNoAuthExceptionInvalidACLExceptionAuthFailedExceptionSessionExpiredExceptionrc   RuntimeInconsistencyExceptionDataInconsistencyExceptionBadArgumentsExceptionApiErrorExceptionNoNodeExceptionNodeExistsException NoChildrenForEphemeralsExceptionNotEmptyExceptionInvalidCallbackExceptionerrorre   r]   rG   
__author__r   rZ   r   r   r   r   <module>   s\   )f