o
    Df4                     @  s  d Z ddlmZ ddlZddlZddlZddlmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZmZmZmZ dd	lmZmZ d
dlmZmZ d
dlmZ d
dlmZ d
dlmZ dZdZ ee!Z"da#dd Z$dd Z%d;ddZ&G dd deZ'dd Z(d<ddZ)dd  Z*d!d" Z+d#d$ Z,d=d%d&Z-		d=d'd(Z.d>d)d*Z/	d?d+d,Z0d-d. Z1d/d0 Z2e	d1d2 Z3d@d3d4Z4d@d5d6Z5dAd7d8Z6G d9d: d:Z7dS )BzCommon Utilities.    )annotationsN)deque)contextmanager)partial)count)NAMESPACE_OIDuuid3uuid4uuid5)ChannelErrorRecoverableConnectionError   )ExchangeQueue)
get_logger)registry)uuid)		Broadcastmaybe_declarer   itermessages
send_replycollect_repliesinsureddrain_consumer	eventloopi  c                   C  s   t d u rt ja t S N)_node_idr	   int r   r   E/home/ubuntu/webapp/venv/lib/python3.10/site-packages/kombu/common.pyget_node_id"   s   r    c                 C  sL   d | ||t|}z
ttt|}W |S  ty%   ttt|}Y |S w )Nz{:x}-{:x}-{:x}-{:x})formatidstrr   r   
ValueErrorr
   )node_id
process_id	thread_idinstanceentretr   r   r   generate_oid)   s   r+   Tc                 C  s$   t t t |rt | S d| S Nr   )r+   r    osgetpid	threading	get_ident)r(   threadsr   r   r   oid_from3   s   
r2   c                      s8   e Zd ZdZejd Z						d fdd	Z  ZS )	r   a  Broadcast queue.

    Convenience class used to define broadcast queues.

    Every queue instance will have a unique name,
    and both the queue and exchange is configured with auto deletion.

    Arguments:
    ---------
        name (str): This is used as the name of the exchange.
        queue (str): By default a unique id is used for the queue
            name for every consumer.  You can specify a custom
            queue name here.
        unique (bool): Always create a unique queue
            even if a queue name is supplied.
        **kwargs (Any): See :class:`~kombu.Queue` for a list
            of additional keyword arguments supported.
    ))queueNNFTc              
     sb   |rd |pdt }n|pdt  }t jd|p|||||d ur$|nt|ddd| d S )Nz{}.{}bcastzbcast.fanout)type)aliasr3   nameauto_deleteexchanger   )r!   r   super__init__r   )selfr8   r3   uniquer9   r:   r7   kwargs	__class__r   r   r<   R   s   

zBroadcast.__init__)NNFTNN)__name__
__module____qualname____doc__r   attrsr<   __classcell__r   r   r@   r   r   <   s    
r   c                 C  s   | |j jjv S r   )
connectionclientdeclared_entities)entitychannelr   r   r   declaration_cachedi   s   rM   Fc                 K  s    |rt | |fi |S t| |S )zDeclare entity (cached).)_imaybe_declare_maybe_declare)rK   rL   retryretry_policyr   r   r   r   m   s   
r   c                 C  s4   | j }|s|std| d|  | |} | S dS )zMake sure the channel is bound to the entity.

    :param entity: generic kombu nomenclature, generally an exchange or queue
    :param channel: channel to bind to the entity
    :return: the updated entity
    zCannot bind channel z to entity N)is_boundr   bind)rK   rL   rR   r   r   r   _ensure_channel_is_boundt   s   
rT   c                 C  s   | }t | | |d u r| jstd|  d| j}d  }}|jr2| jr2|jjj}t| }||v r2dS |js9t	d| j
|d |d urJ|rJ|| |d urR| j|_dS )Nzchannel is None and entity z not bound.Fchannel disconnected)rL   T)rT   rR   r   rL   rH   can_cache_declarationrI   rJ   hashr   declareaddr8   )rK   rL   origdeclaredidentr   r   r   rO      s,   



rO   c                 K  s:   t | | | jjstd| jjjj| tfi || |S )NrU   )rT   rL   rH   r   rI   ensurerO   )rK   rL   rQ   r   r   r   rN      s   

rN   c              
   #  s    t    fdd}|g|pg  | _| ' t| jjj||ddD ]}z  V  W q  ty2   Y q w W d   dS 1 s>w   Y  dS )z&Drain messages from consumer instance.c                   s     | |f d S r   )append)bodymessageaccr   r   
on_message   s   z"drain_consumer.<locals>.on_messageT)limittimeoutignore_timeoutsN)r   	callbacksr   rL   rH   rI   popleft
IndexError)consumerrd   re   rg   rc   _r   ra   r   r      s   

"r   c                 K  s$   t | jd|g|d||||dS )zIterator over messages.)queuesrL   )rd   re   rg   Nr   )r   Consumer)connrL   r3   rd   re   rg   r?   r   r   r   r      s   r   c              	   c  sN    |rt |p	t D ]}z	| j|dV  W q
 tjy$   |r"|s" Y q
w dS )a   Best practice generator wrapper around ``Connection.drain_events``.

    Able to drain events forever, with a limit, and optionally ignoring
    timeout errors (a timeout of 1 is often used in environments where
    the socket can get "stuck", and is a best practice for Kombu consumers).

    ``eventloop`` is a generator.

    Examples
    --------
        >>> from kombu.common import eventloop

        >>> def run(conn):
        ...     it = eventloop(conn, timeout=1, ignore_timeouts=True)
        ...     next(it)   # one event consumed, or timed out.
        ...
        ...     for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
        ...         pass  # loop forever.

    It also takes an optional limit parameter, and timeout errors
    are propagated by default::

        for _ in eventloop(connection, limit=1, timeout=1):
            pass

    See Also
    --------
        :func:`itermessages`, which is an event loop bound to one or more
        consumers, that yields any messages received.
    )re   N)ranger   drain_eventssocketre   )rn   rd   re   rf   ir   r   r   r      s   r   c              	   K  sH   |j |f| ||dt|jd |jdtj|j |jdfi |S )a  Send reply for request.

    Arguments:
    ---------
        exchange (kombu.Exchange, str): Reply exchange
        req (~kombu.Message): Original request, a message with
            a ``reply_to`` property.
        producer (kombu.Producer): Producer instance
        retry (bool): If true must retry according to
            the ``reply_policy`` argument.
        retry_policy (Dict): Retry settings.
        **props (Any): Extra properties.
    )r:   rP   rQ   reply_tocorrelation_id)routing_keyrt   
serializercontent_encoding)publishdict
propertiesgetserializerstype_to_namecontent_typerw   )r:   reqmsgproducerrP   rQ   propsr   r   r   r      s   


r   c           	   	   o  s|    | dd}d}z*t| ||g|R i |D ]\}}|s!|  d}|V  qW |r2||j dS dS |r=||j w w )z,Generator collecting replies from ``queue``.no_ackTFN)
setdefaultr   ackafter_reply_message_receivedr8   )	rn   rL   r3   argsr?   r   receivedr_   r`   r   r   r   r     s&   
r   c                 C  s   t jd| |dd d S )Nz#Connection error: %r. Retry in %ss
T)exc_info)loggererror)excintervalr   r   r   _ensure_errback  s   
r   c              	   c  s,    zd V  W d S  | j | j y   Y d S w r   )connection_errorschannel_errors)rn   r   r   r   _ignore_errors  s   r   c                 O  sB   |rt |  ||i |W  d   S 1 sw   Y  t | S )a  Ignore connection and channel errors.

    The first argument must be a connection object, or any other object
    with ``connection_error`` and ``channel_error`` attributes.

    Can be used as a function:

    .. code-block:: python

        def example(connection):
            ignore_errors(connection, consumer.channel.close)

    or as a context manager:

    .. code-block:: python

        def example(connection):
            with ignore_errors(connection):
                consumer.channel.close()


    Note:
    ----
        Connection and channel errors should be properly handled,
        and not ignored.  Using this function is only acceptable in a cleanup
        phase, like when a connection is lost or at shutdown.
    N)r   )rn   funr   r?   r   r   r   ignore_errors%  s
   
 r   c                 C  s   |r|| d S d S r   r   )rH   rL   	on_reviver   r   r   revive_connectionG  s   r   c                 K  s   |pt }| jdd4}|j|d |j}tt||d}	|j||f||	d|}
|
|i t||d\}}|W  d   S 1 sAw   Y  dS )zFunction wrapper to handle connection errors.

    Ensures function performing broker commands completes
    despite intermittent connection failures.
    T)block)errback)r   )r   r   )rH   N)r   acquireensure_connectiondefault_channelr   r   	autoretryry   )poolr   r   r?   r   r   optsrn   rL   reviver   retvalrk   r   r   r   r   L  s   $r   c                   @  s@   e Zd ZdZdZdd ZdddZddd	Zd
d Zdd Z	dS )QoSa  Thread safe increment/decrement of a channels prefetch_count.

    Arguments:
    ---------
        callback (Callable): Function used to set new prefetch count,
            e.g. ``consumer.qos`` or ``channel.basic_qos``.  Will be called
            with a single ``prefetch_count`` keyword argument.
        initial_value (int): Initial prefetch count value..

    Example:
    -------
        >>> from kombu import Consumer, Connection
        >>> connection = Connection('amqp://')
        >>> consumer = Consumer(connection)
        >>> qos = QoS(consumer.qos, initial_prefetch_count=2)
        >>> qos.update()  # set initial

        >>> qos.value
        2

        >>> def in_some_thread():
        ...     qos.increment_eventually()

        >>> def in_some_other_thread():
        ...     qos.decrement_eventually()

        >>> while 1:
        ...    if qos.prev != qos.value:
        ...        qos.update()  # prefetch changed so update.

    It can be used with any function supporting a ``prefetch_count`` keyword
    argument::

        >>> channel = connection.channel()
        >>> QoS(channel.basic_qos, 10)


        >>> def set_qos(prefetch_count):
        ...     print('prefetch count now: %r' % (prefetch_count,))
        >>> QoS(set_qos, 10)
    Nc                 C  s   || _ t | _|pd| _d S r,   )callbackr/   RLock_mutexvalue)r=   r   initial_valuer   r   r   r<     s   
zQoS.__init__r   c                 C  sZ   | j  | jr| jt|d | _W d   | jS W d   | jS 1 s%w   Y  | jS )zIncrement the value, but do not update the channels QoS.

        Note:
        ----
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   N)r   r   maxr=   nr   r   r   increment_eventually  s   

zQoS.increment_eventuallyc                 C  sx   | j . | jr|  j|8  _| jdk r(d| _W d   | jS W d   | jS W d   | jS 1 s4w   Y  | jS )zDecrement the value, but do not update the channels QoS.

        Note:
        ----
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   N)r   r   r   r   r   r   decrement_eventually  s   



zQoS.decrement_eventuallyc                 C  sH   || j kr"|}|tkrtdt d}td| | j|d || _ |S )z#Set channel prefetch_count setting.z(QoS: Disabled: prefetch_count exceeds %rr   zbasic.qos: prefetch_count->%s)prefetch_count)prevPREFETCH_COUNT_MAXr   warningdebugr   )r=   pcount	new_valuer   r   r   set  s   
zQoS.setc                 C  s6   | j  | | jW  d   S 1 sw   Y  dS )z)Update prefetch count with current value.N)r   r   r   )r=   r   r   r   update  s   
$z
QoS.update)r   )
rB   rC   rD   rE   r   r<   r   r   r   r   r   r   r   r   r   `  s    *

r   )T)NF)r   NN)NNF)NFNr   )NN)8rE   
__future__r   r-   rq   r/   collectionsr   
contextlibr   	functoolsr   	itertoolsr   r   r   r   r	   r
   amqpr   r   rK   r   r   logr   serializationr   r|   
utils.uuid__all__r   rB   r   r   r    r+   r2   r   rM   r   rT   rO   rN   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   <module>   sV    

	-




	(



"
