o
    Df#                     @   s   d Z ddlZddlZddlZddlmZmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZmZmZ d
ZG dd dZdS )zEvent dispatcher sends events.    N)defaultdictdeque)Producer)app_or_default)anon_nodename)	utcoffset   )Eventget_exchange
group_from)EventDispatcherc                   @   s   e Zd ZdZdhZdZdZdZ				d"ddZd	d
 Z	dd Z
dd Zdd ZdefddZddefddZdeddefddZd#ddZdd Zdd Zdd Zd d! ZeeeZdS )$r   a0  Dispatches event messages.

    Arguments:
        connection (kombu.Connection): Connection to the broker.

        hostname (str): Hostname to identify ourselves as,
            by default uses the hostname returned by
            :func:`~celery.utils.anon_nodename`.

        groups (Sequence[str]): List of groups to send events for.
            :meth:`send` will ignore send requests to groups not in this list.
            If this is :const:`None`, all events will be sent.
            Example groups include ``"task"`` and ``"worker"``.

        enabled (bool): Set to :const:`False` to not actually publish any
            events, making :meth:`send` a no-op.

        channel (kombu.Channel): Can be used instead of `connection` to specify
            an exact channel to use when sending events.

        buffer_while_offline (bool): If enabled events will be buffered
            while the connection is down. :meth:`flush` must be called
            as soon as the connection is re-established.

    Note:
        You need to :meth:`close` this after use.
    sqlNTr      c                 C   s0  t |p| j| _|| _|| _|pt | _|| _|
pt | _|| _	|| _
tt| _t | _d | _t | _|p:| jjj| _t | _t | _t|pHg | _tj tj g| _| jj| _|	| _ |se|re|jj!| _|| _"| jpo| j# }t$|| jjj%d| _&|j'j(| j)v rd| _"| j"r| *  d| ji| _+t,- | _.d S )N)nameFhostname)/r   app
connectionchannelr   r   buffer_while_offline	frozensetbuffer_groupbuffer_limiton_send_bufferedr   list_group_buffer	threadingLockmutexproducerr   _outbound_bufferconfevent_serializer
serializerset
on_enabledon_disabledgroupstimetimezonealtzonetzoffsetclockdelivery_modeclientenabledconnection_for_writer
   event_exchangeexchange	transportdriver_typeDISABLED_TRANSPORTSenableheadersosgetpidpid)selfr   r   r.   r   r   r   r"   r&   r,   r   r   r   conninfo r<   Q/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/events/dispatcher.py__init__:   s@   



zEventDispatcher.__init__c                 C   s   | S Nr<   r:   r<   r<   r=   	__enter__^   s   zEventDispatcher.__enter__c                 G   s   |    d S r?   )close)r:   exc_infor<   r<   r=   __exit__a   s   zEventDispatcher.__exit__c                 C   s:   t | jp| j| j| jdd| _d| _| jD ]}|  qd S )NF)r1   r"   auto_declareT)r   r   r   r1   r"   r   r.   r$   r:   callbackr<   r<   r=   r5   d   s   
zEventDispatcher.enablec                 C   s.   | j rd| _ |   | jD ]}|  qd S d S )NF)r.   rB   r%   rF   r<   r<   r=   disablem   s   
zEventDispatcher.disableFc           	      K   s|   |rdn| j  }||f| jt | j|d|}| j | j||fd|ddi|W  d   S 1 s7w   Y  dS )au  Publish event using custom :class:`~kombu.Producer`.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
                fields: Dictionary of event fields, must be json serializable.
            producer (kombu.Producer): Producer instance to use:
                only the ``publish`` method will be called.
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event.
                Defaults to :func:`Event`.
            utcoffset (Callable): Function returning the current
                utc offset in hours.
        Nr   r   r9   r+   routing_key-.)r+   forwardr   r   r9   r   _publishreplace)	r:   typefieldsr   blindr	   kwargsr+   eventr<   r<   r=   publisht   s   
$zEventDispatcher.publishc           	      C   st   | j }z|j|||j|||g| j| j| jd	 W d S  ty9 } z| js% | j	|||f W Y d }~d S d }~ww )N)rJ   r1   retryretry_policydeclarer"   r6   r,   )
r1   rU   r   r"   r6   r,   	Exceptionr   r   append)	r:   rT   r   rJ   rV   rW   r   r1   excr<   r<   r=   rN      s&    zEventDispatcher._publishc              	   K   s   | j r\| jt|}}	|r|	|vrdS |	| jv rO| j }
||f| j| | j|
d|}| j|	 }|	| t
|| jkrD|   dS | jrM|   dS dS | j||| j||||dS dS )a  Send event.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event,
                defaults to :func:`Event`.
            utcoffset (Callable): unction returning the current utc offset
                in hours.
            **fields (Any): Event fields -- must be json serializable.
        NrI   )rR   r	   rV   rW   )r.   r&   r   r   r+   rM   r   r9   r   rZ   lenr   flushr   rU   r   )r:   rP   rR   r   rV   rW   r	   rQ   r&   groupr+   rT   bufr<   r<   r=   send   s0   




zEventDispatcher.sendc           	      C   s   |r8t | j}z*| j |D ]\}}}| || j| qW d   n1 s&w   Y  W | j  n| j  w |rj| j# | j D ]\}}| || jd|  g |dd< qCW d   dS 1 scw   Y  dS dS )zFlush the outbound buffer.Nz%s.multi)r   r   r   rN   r   clearr   items)	r:   errorsr&   r_   rT   rJ   _r^   eventsr<   r<   r=   r]      s$   
"zEventDispatcher.flushc                 C   s   | j |j  dS )z-Copy the outbound buffer of another instance.N)r   extend)r:   otherr<   r<   r=   extend_buffer   s   zEventDispatcher.extend_bufferc                 C   s*   | j  o| j   d| _dS  d| _dS )zClose the event dispatcher.N)r   lockedreleaser   r@   r<   r<   r=   rB      s   

zEventDispatcher.closec                 C   s   | j S r?   r   r@   r<   r<   r=   _get_publisher   s   zEventDispatcher._get_publisherc                 C   s
   || _ d S r?   rk   )r:   r   r<   r<   r=   _set_publisher   s   
zEventDispatcher._set_publisher)NNTNTNNNr   Nr   N)TT)__name__
__module____qualname____doc__r4   r   r$   r%   r>   rA   rD   r5   rH   r	   rU   r   rN   r`   r]   rh   rB   rl   rm   property	publisherr<   r<   r<   r=   r      s:    
$	



%r   )rq   r7   r   r'   collectionsr   r   kombur   
celery.appr   celery.utils.nodenamesr   celery.utils.timer   rT   r	   r
   r   __all__r   r<   r<   r<   r=   <module>   s    