o
    Df(                     @  sv  d Z ddlmZ ddlZddlZddlZddlZddlmZ ddl	m
Z
 ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZmZ ddlmZ dZdeeeZej dkrddl!Z!ddl"Z"ddl#Z#e"j$Z%dZ&e"j'Z(e!) Z*dd Z+dd Z,nej dkrddl-Z-ddl-m%Z%m&Z& dd Z+dd Z,ne.dedg dZ/G dd dej0Z0G dd dej1Z1dS )a	  File-system Transport module for kombu.

Transport using the file-system as the message store. Messages written to the
queue are stored in `data_folder_in` directory and
messages read from the queue are read from `data_folder_out` directory. Both
directories must be created manually. Simple example:

* Producer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_in', 'data_folder_out': 'data_out'
        }
    )
    conn.connect()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            producer = kombu.Producer(channel)
            producer.publish(
                        {'hello': 'world'},
                        retry=True,
                        exchange=test_queue.exchange,
                        routing_key=test_queue.routing_key,
                        declare=[test_queue],
                        serializer='pickle'
            )

* Consumer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_out', 'data_folder_out': 'data_in'
        }
    )
    conn.connect()

    def callback(body, message):
        print(body, message)
        message.ack()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            consumer = kombu.Consumer(
                conn, [test_queue], accept=['pickle']
            )
            consumer.register_callback(callback)
            with consumer:
                conn.drain_events(timeout=1)

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: No
* Supports TTL: No

Connection String
=================
Connection string is in the following format:

.. code-block::

    filesystem://

Transport Options
=================
* ``data_folder_in`` - directory where are messages stored when written
  to queue.
* ``data_folder_out`` - directory from which are messages read when read from
  queue.
* ``store_processed`` - if set to True, all processed messages are backed up to
  ``processed_folder``.
* ``processed_folder`` - directory where are backed up processed files.
* ``control_folder`` - directory where are exchange-queue table stored.
    )annotationsN)
namedtuple)Path)Empty)	monotonic)ChannelError)virtual)bytes_to_strstr_to_bytes)dumpsloads)cached_property)   r   r   .ntc                 C  s$   t |  }t ||ddt dS )Create file lock.r         N)	win32file_get_osfhandlefileno
LockFileEx__overlapped)fileflagshfile r   S/home/ubuntu/webapp/venv/lib/python3.10/site-packages/kombu/transport/filesystem.pylock}   s   r   c                 C  s"   t |  }t |ddt dS )Remove file lock.r   r   N)r   r   r   UnlockFileExr   )r   r   r   r   r   unlock   s   r    posix)LOCK_EXLOCK_SHc                 C  s   t |  | dS )r   N)fcntlflockr   )r   r   r   r   r   r      s   c                 C  s   t |  t j dS )r   N)r$   r%   r   LOCK_UN)r   r   r   r   r       s   z9Filesystem plugin only defined for NT and POSIX platformsexchange_queue_t)routing_keypatternqueuec                   @  s   e Zd ZdZdZdd Zdd Zdd Zd	d
 Zdd Z	dd Z
dd Zedd Zedd Zedd Zedd Zedd Zedd ZdS )ChannelzFilesystem Channel.Tc                 C  s   | j | d }z-|d}zt|t tt| }dd |D W t| |  W S t| |  w  t	y@   g  Y S  t
yM   td| w )N	.exchangerc                 S     g | ]}t | qS r   r'   .0qr   r   r   
<listcomp>       z%Channel.get_table.<locals>.<listcomp>zCannot open )control_folderopenr   r#   r   r	   readr    closeFileNotFoundErrorOSErrorr   )selfexchanger   f_objexchange_tabler   r   r   	get_table   s    


zChannel.get_tablec           
      C  s  | j | d }| j jdd t|pd|pd|pd}zf| rT|jddd}t|t tt|	 }dd	 |D }	||	vrS|	
d| |d |tt|	 n#|jd
dd}t|t |g}	|tt|	 W t| |  d S W t| |  d S t| |  w )Nr,   T)exist_ok zrb+r   	bufferingc                 S  r.   r   r/   r0   r   r   r   r3      r4   z'Channel._queue_bind.<locals>.<listcomp>wb)r5   mkdirr'   existsr6   r   r"   r   r	   r7   insertseekwriter
   r   r    r8   )
r;   r<   r(   r)   r*   r   	queue_valr=   r>   queuesr   r   r   _queue_bind   s6   



zChannel._queue_bindc                 K  s*   |  |D ]}| j|j|fi | qd S N)r?   _putr*   )r;   r<   payloadr(   kwargsr2   r   r   r   _put_fanout   s   zChannel._put_fanoutc                 K  s   d ttt d t |}tj| j	|}z2zt
|ddd}t|t |tt| W n ty?   td|dw W t| |  dS t| |  w )	zPut `message` onto `queue`.z{}_{}.{}.msgi  rD   r   rB   zCannot add file z to directoryN)formatintroundr   uuiduuid4ospathjoindata_folder_outr6   r   r"   rI   r
   r   r:   r   r    r8   )r;   r*   rO   rP   filenamefr   r   r   rN      s$   


zChannel._putc                 C  s   d| d }t | j}t|}t|dkrz|d}||dk r#q| jr*| j}nt	
 }ztt j| j|| W n	 tyE   Y qw t j||}zt|d}| }|  | jsct | W n tys   td|dw tt|S t )zGet next message from `queue`.r   .msgr   rbzCannot read file z from queue.)rW   listdirdata_folder_insortedlenpopfindstore_processedprocessed_foldertempfile
gettempdirshutilmoverX   rY   r:   r6   r7   r8   remover   r   r	   r   )r;   r*   
queue_findfolderr[   rf   r\   rO   r   r   r   _get   s@   



zChannel._getc                 C  s   d}d| d }t | j}t|dkrD| }z||dk r"W qt j| j|}t | |d7 }W n	 t	y=   Y nw t|dks|S )z!Remove all messages from `queue`.r   r   r]   r   )
rW   r_   r`   rb   rc   rd   rX   rY   rk   r:   r;   r*   countrl   rm   r[   r   r   r   _purge	  s    
zChannel._purgec                 C  sX   d}d| d}t | j}t|dkr*| }||dk r q|d7 }t|dks|S )z<Return the number of messages in `queue` as an :class:`int`.r   r   r]   r   )rW   r_   r`   rb   rc   rd   ro   r   r   r   _size"  s   	zChannel._sizec                 C  s
   | j jjS rM   )
connectionclienttransport_optionsr;   r   r   r   ru   3  s   
zChannel.transport_optionsc                 C     | j ddS )Nr`   data_inru   getrv   r   r   r   r`   7     zChannel.data_folder_inc                 C  rw   )NrZ   data_outry   rv   r   r   r   rZ   ;  r{   zChannel.data_folder_outc                 C  rw   )Nre   Fry   rv   r   r   r   re   ?  r{   zChannel.store_processedc                 C  rw   )Nrf   	processedry   rv   r   r   r   rf   C  r{   zChannel.processed_folderc                 C  s   t | jddS )Nr5   control)r   ru   rz   rv   r   r   r   r5   G  s   zChannel.control_folderN)__name__
__module____qualname____doc__supports_fanoutr?   rL   rQ   rN   rn   rq   rr   propertyru   r   r`   rZ   re   rf   r5   r   r   r   r   r+      s,    (




r+   c                      sZ   e Zd ZdZejjjdeg ddZe	Z	e
 ZdZdZdZ fddZd	d
 Z  ZS )	TransportzFilesystem Transport.F)directtopicfanout)asynchronousexchange_typer   
filesystemc                   s    t  j|fi | | j| _d S rM   )super__init__global_statestate)r;   rt   rP   	__class__r   r   r   [  s   zTransport.__init__c                 C  s   dS )NzN/Ar   rv   r   r   r   driver_version_  s   zTransport.driver_version)r   r   r   r   r   r   
implementsextend	frozensetr+   BrokerStater   default_portdriver_typedriver_namer   r   __classcell__r   r   r   r   r   L  s    
r   )2r   
__future__r   rW   ri   rg   rU   collectionsr   pathlibr   r*   r   timer   kombu.exceptionsr   kombu.transportr   kombu.utils.encodingr	   r
   kombu.utils.jsonr   r   kombu.utils.objectsr   VERSIONrY   mapstr__version__name
pywintypeswin32conr   LOCKFILE_EXCLUSIVE_LOCKr"   r#   LOCKFILE_FAIL_IMMEDIATELYLOCK_NB
OVERLAPPEDr   r   r    r$   RuntimeErrorr'   r+   r   r   r   r   r   <module>   sR    [



 .