o
    Df"                     @  s  d Z ddlmZ ddlZddlmZ ddlmZmZ ddl	m
Z
 ddlmZ ddlmZmZ dd	lmZ d
dlmZ zddlmZ W n eyM   dZY nw z
ddlmZmZ W n eye   dZdZY nw dd ejD ZG dd dejZG dd dejZdS )a  Azure Storage Queues transport module for kombu.

More information about Azure Storage Queues:
https://azure.microsoft.com/en-us/services/storage/queues/

Features
========
* Type: Virtual
* Supports Direct: *Unreviewed*
* Supports Topic: *Unreviewed*
* Supports Fanout: *Unreviewed*
* Supports Priority: *Unreviewed*
* Supports TTL: *Unreviewed*

Connection String
=================

Connection string has the following formats:

.. code-block::

    azurestoragequeues://<STORAGE_ACCOUNT_ACCESS_KEY>@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://<SAS_TOKEN>@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>

Note that if the access key for the storage account contains a forward slash
(``/``), it will have to be regenerated before it can be used in the connection
URL.

.. code-block::

    azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>

If you wish to use an `Azure Managed Identity` you may use the
``DefaultAzureCredential`` format of the connection string which will use
``DefaultAzureCredential`` class in the azure-identity package. You may want to
read the `azure-identity documentation` for more information on how the
``DefaultAzureCredential`` works.

.. _azure-identity documentation:
https://learn.microsoft.com/en-us/python/api/overview/azure/identity-readme?view=azure-python
.. _Azure Managed Identity:
https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview

Transport Options
=================

* ``queue_name_prefix``
    )annotationsN)Empty)AnyOptional)ResourceExistsError)safe_str)dumpsloads)cached_property   )virtual)QueueServiceClient)DefaultAzureCredentialManagedIdentityCredentialc                 C  s   i | ]}t |d qS )-   )ord).0c r   [/home/ubuntu/webapp/venv/lib/python3.10/site-packages/kombu/transport/azurestoragequeues.py
<dictcomp>Q   s    
r   c                      s   e Zd ZU dZdZded< dZded< i Zded	< d
Zded< e	 Z
ded<  fddZ fddZefd+ddZdd Zdd Zdd Zd,ddZdd Zd d! Zed-d#d$Zed%d& Zed'd( Zed+d)d*Z  ZS ).ChannelzAzure Storage Queues channel.zkombu%(vhost)sstrdomain_formatNzOptional[QueueServiceClient]_queue_servicezdict[Any, Any]_queue_name_cacheTboolno_ackzset[Any]_noack_queuesc                   sZ   t d u rtdt j|i | t| jj\| _| _	| j
 D ]	}|| j|d < q!d S )NzGAzure Storage Queues transport requires the azure-storage-queue libraryname)r   ImportErrorsuper__init__	Transport	parse_uriconninfohostname_credential_urlqueue_servicelist_queuesr   )selfargskwargsqueue	__class__r   r   r"   _   s   zChannel.__init__c                   s,   |r| j | t j||g|R i |S N)r   addr!   basic_consume)r+   r.   r   r,   r-   r/   r   r   r3   m   s   zChannel.basic_consumereturnc                 C  s   t t||S )z=Format AMQP queue name into a valid Azure Storage Queue name.)r   r   	translate)r+   r   tabler   r   r   entity_namet   s   zChannel.entity_namec                 C  s   |  | j| }z| jj| j| d}W |S  ty?   z| j|}W n ty4   | jj|d}Y nw |	 | j|< Y |S w )zEnsure a queue exists.)r.   )
r7   queue_name_prefixr   get_queue_clientr   KeyErrorr)   create_queuer   get_queue_propertiesr+   r.   qr   r   r   _ensure_queuex   s   
zChannel._ensure_queuec                 O  s(   |  |}| j|d | j| dS )zDelete queue by name.N)r7   r   popr)   delete_queue)r+   r.   r,   r-   
queue_namer   r   r   _delete   s   
zChannel._deletec                 K  s    |  |}t|}|| dS )zPut message onto queue.N)r?   r   send_message)r+   r.   messager-   r>   encoded_messager   r   r   _put   s   
zChannel._putc                 C  sT   |  |}|jd|d}zt|}W n
 ty   t w t|j}|j|d |S )z/Try to retrieve a single message off ``queue``.r   )messages_per_pagetimeout)rE   )r?   receive_messagesnextStopIterationr   r	   contentdelete_message)r+   r.   rI   r>   messagesrE   rM   r   r   r   _get   s   

zChannel._getc                 C  s   |  |}| jS )z)Return the number of messages in a queue.)r?   r<   approximate_message_countr=   r   r   r   _size   s   

zChannel._sizec                 C  s"   |  |}| |j}|  |S )z'Delete all current messages in a queue.)r?   rR   rB   clear_messages)r+   r.   r>   nr   r   r   _purge   s   
zChannel._purger   c                 C  s"   | j d u rt| j| jd| _ | j S )N)account_url
credential)r   r   r(   r'   r+   r   r   r   r)      s
   
zChannel.queue_servicec                 C  s   | j jS r1   )
connectionclientrX   r   r   r   r%      s   zChannel.conninfoc                 C  s
   | j jjS r1   )rY   rZ   transport_optionsrX   r   r   r   r[      s   
zChannel.transport_optionsc                 C  s   | j ddS )Nr8    )r[   getrX   r   r   r   r8      s   zChannel.queue_name_prefix)r4   r   r1   )r4   r   )__name__
__module____qualname____doc__r   __annotations__r   r   r   setr   r"   r3   CHARS_REPLACE_TABLEr7   r?   rC   rG   rP   rR   rU   propertyr)   r%   r[   r
   r8   __classcell__r   r   r/   r   r   V   s0   
 


r   c                   @  sZ   e Zd ZU dZeZdZded< dZded< dZd	ed
< e	dddZ
e	ddddZdS )r#   zAzure Storage Queues transport.r   intpolling_intervalNzOptional[int]default_portTr   can_parse_urlurir   r4   tuple[str | dict, str]c                 C  s   zP|  dd} | dd\}}d | kr#td u rtdt }n!d | kr7td u r3tdt }nd	|v rDd
|vrDd	|d}t||gsLJ W ||fS  ty[   tdw )Nzazurestoragequeues://r\   @r   r   z`Azure Storage Queues transport with a DefaultAzureCredential requires the azure-identity libraryr   zcAzure Storage Queues transport with a ManagedIdentityCredential requires the azure-identity librarydevstoreaccount1z.core.windows.net)account_nameaccount_keyzNeed a URI like azurestoragequeues://{SAS or access key}@{URL}, azurestoragequeues://DefaultAzureCredential@{URL}, , or azurestoragequeues://ManagedIdentityCredential@{URL})	replacersplitlowerr   r    r   all	Exception
ValueError)rk   rW   urlr   r   r   r$      s,   
zTransport.parse_uriF**include_passwordmaskc                 C  s&   |  |\}}d|r||S ||S )Nzazurestoragequeues://{}@{})r$   format)clsrk   ry   rz   rW   rw   r   r   r   as_uri   s   zTransport.as_uri)rk   r   r4   rl   )Frx   )rk   r   ry   r   rz   r   r4   r   )r^   r_   r`   ra   r   rh   rb   ri   rj   staticmethodr$   classmethodr}   r   r   r   r   r#      s   
 0r#   )ra   
__future__r   stringr.   r   typingr   r   azure.core.exceptionsr   kombu.utils.encodingr   kombu.utils.jsonr   r	   kombu.utils.objectsr
   r\   r   azure.storage.queuer   r    azure.identityr   r   punctuationrd   r   r#   r   r   r   r   <module>   s4    4p