o
    Df0d                  	   @   s  d Z ddlZddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddlmZmZmZ ddl m!Z! dZ"e#edZ$dZ%dZ&dZ'e!e(Z)e)j*Z+dZ,dZ-dZ.ej/ej0ej1ej2ej3ej4ej5ej6dZ7G dd deZ8e9e8 eddd dd d! Z:d"e%e
e;e<fd#d$Z=d%d& Z>d'd( Z?e?d)G d*d+ d+Z@e?d,G d-d. d.ZAG d/d0 d0ZBd1d2 ZCd3d4 ZDdS )5a  In-memory representation of cluster state.

This module implements a data-structure used to keep
track of the state of a cluster of workers and the tasks
it is working on (by consuming events).

For every event consumed the state is updated,
so the state represents the state of the cluster
at the time of the last event.

Snapshots (:mod:`celery.events.snapshot`) can be used to
take "pictures" of this state at regular intervals
to for example, store that in a database.
    N)defaultdict)Callable)datetime)Decimal)islice)
itemgetter)time)MappingOptional)WeakSetref	timetuple)cached_property)states)LRUCachememoizepass1)
get_logger)WorkerTaskStateheartbeat_expirespypy_version_info      zmSubstantial drift from %s may mean clocks are out of sync.  Current drift is %s seconds.  [orig: %s recv: %s]z4<State: events={0.event_count} tasks={0.task_count}>z9<Worker: {0.hostname} ({0.status_string} clock:{0.clock})z4<Task: {0.name}({0.uuid}) {0.state} clock:{0.clock}>)sentreceivedstartedfailedretried	succeededrevokedrejectedc                       s(   e Zd ZdZ fddZdd Z  ZS )CallableDefaultdicta  :class:`~collections.defaultdict` with configurable __call__.

    We use this for backwards compatibility in State.tasks_by_type
    etc, which used to be a method but is now an index instead.

    So you can do::

        >>> add_tasks = state.tasks_by_type['proj.tasks.add']

    while still supporting the method call::

        >>> add_tasks = list(state.tasks_by_type(
        ...     'proj.tasks.add', reverse=True))
    c                    s   || _ t j|i | d S N)funsuper__init__)selfr&   argskwargs	__class__ L/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/events/state.pyr(   _   s   zCallableDefaultdict.__init__c                 O   s   | j |i |S r%   )r&   )r)   r*   r+   r.   r.   r/   __call__c      zCallableDefaultdict.__call__)__name__
__module____qualname____doc__r(   r0   __classcell__r.   r.   r,   r/   r$   O   s    r$   i  c                 C   s   | d S Nr   r.   )a_r.   r.   r/   <lambda>j   s    r:   )maxsizekeyfunc                 C   s    t t| |t|t| d S r%   )warnDRIFT_WARNINGr   fromtimestamp)hostnamedriftlocal_received	timestampr.   r.   r/   _warn_driftj   s   rD   <   c                 C   s8   |||r	||n|}|| |r|| } | ||d   S )z#Return time when heartbeat expires.g      Y@r.   )rC   freqexpire_windowr   float
isinstancer.   r.   r/   r   r   s   
r   c                 C   s   | di |S )Nr.   r.   )clsfieldsr.   r.   r/   _depickle_task~      rL   c                    s    fdd}|S )Nc                    s(    fdd}|| _  fdd}|| _| S )Nc                    s$   t || jrt|  t| kS tS r%   )rI   r-   getattrNotImplemented)thisotherattrr.   r/   __eq__   s   z8with_unique_field.<locals>._decorate_cls.<locals>.__eq__c                    s   t t|  S r%   )hashrN   )rP   rR   r.   r/   __hash__   rM   z:with_unique_field.<locals>._decorate_cls.<locals>.__hash__)rT   rV   )rJ   rT   rV   rR   r.   r/   _decorate_cls   s
   z(with_unique_field.<locals>._decorate_clsr.   )rS   rW   r.   rR   r/   with_unique_field   s   rX   r@   c                   @   s   e Zd ZdZdZeZdZesed Z				ddd	Z
d
d Zdd Zdd Zdd Zedd Zedd ZeefddZedd ZdS )r   zWorker State.   )r@   pidrF   
heartbeatsclockactive	processedloadavgsw_identsw_versw_sys)event__dict____weakref__NrE   r   c                 C   s`   || _ || _|| _|d u rg n|| _|pd| _|| _|| _|| _|	| _|
| _	|| _
|  | _d S r7   )r@   rZ   rF   r[   r\   r]   r^   r_   r`   ra   rb   _create_event_handlerrc   )r)   r@   rZ   rF   r[   r\   r]   r^   r_   r`   ra   rb   r.   r.   r/   r(      s   
zWorker.__init__c                 C   s6   | j | j| j| j| j| j| j| j| j| j	| j
| jffS r%   )r-   r@   rZ   rF   r[   r\   r]   r^   r_   r`   ra   rb   r)   r.   r.   r/   
__reduce__   s
   zWorker.__reduce__c                    sP   t j jjjjjjd d d tttt	j
tf fdd	}|S )Nc	                    s   |pi }|  D ]
\}	}
 |	|
 q| dkrg d d < d S |r#|s%d S ||||| }||kr;tj||| |r`|}|d krKd |rY|d krY| d S || d S d S )Noffline   r   )itemsrD   r@   )type_rC   rB   rK   	max_driftabsintinsortlenkvrA   hearts_set	hb_appendhb_pophbmaxr[   r)   r.   r/   rc      s(   z+Worker._create_event_handler.<locals>.event)object__setattr__heartbeat_maxr[   popappendHEARTBEAT_DRIFT_MAXro   rp   bisectrq   rr   r)   rc   r.   rv   r/   rf      s   zWorker._create_event_handlerc                 K   s:   |r
t |fi |n|}| D ]
\}}t| || qd S r%   )dictrl   setattr)r)   fkwdrs   rt   r.   r.   r/   update   s   zWorker.updatec                 C   
   t | S r%   )R_WORKERformatrg   r.   r.   r/   __repr__      
zWorker.__repr__c                 C   s   | j rdS dS )NONLINEOFFLINEaliverg   r.   r.   r/   status_string   s   zWorker.status_stringc                 C   s   t | jd | j| jS )Nrk   )r   r[   rF   rG   rg   r.   r.   r/   r      s   
zWorker.heartbeat_expiresc                 C   s   t | jo	| | jk S r%   )boolr[   r   )r)   nowfunr.   r.   r/   r      s   zWorker.alivec                 C   s
   d | S )Nz{0.hostname}.{0.pid})r   rg   r.   r.   r/   id      
z	Worker.id)NNrE   Nr   NNNNNN)r2   r3   r4   r5   r}   HEARTBEAT_EXPIRE_WINDOWrG   _fieldsPYPY	__slots__r(   rh   rf   r   r   propertyr   r   r   r   r   r.   r.   r.   r/   r      s.    
!

r   uuidc                   @   s6  e Zd ZdZd Z Z Z Z Z Z	 Z
 Z Z Z Z Z Z Z Z Z Z Z Z Z Z Z Z ZZejZdZ dZ!e"sCdZ#ej$diZ%dZ&d$dd	Z'dddej(e)e*j+ej,fd
dZ-d%ddZ.dd Z/dd Z0dd Z1dd Z2dd Z3dd Z4e5dd Z6e5dd Z7e5dd Z8e9d d! Z:e9d"d# Z;dS )&r   zTask State.Nr   )r   namestater   r   r   r#   r!   r   r    r"   r*   r+   etaexpiresretriesworkerresult	exceptionrC   runtime	tracebackexchangerouting_keyr\   clientrootroot_idparent	parent_idchildren)rd   re   )r   r*   r+   r   r   r   r   r   )r*   r+   r   r   r   r   r   r   r   r   r   r   c                    sh   | _ | _ jd urt fdd|pdD  _nt  _ j j jd _|r2 j	| d S d S )Nc                 3   s*    | ]}| j jv r j j|V  qd S r%   )cluster_statetasksget).0task_idrg   r.   r/   	<genexpr>"  s    z Task.__init__.<locals>.<genexpr>r.   )r   r   r   )
r   r   r   r   _serializable_children_serializable_root_serializable_parent_serializer_handlersrd   r   )r)   r   r   r   r+   r.   rg   r/   r(     s   
zTask.__init__c	           
         s   |pi }||}	|	d ur|| || n|  }	|	|kr?| j|kr?||	|| jkr?| j|	  d ur> fdd| D }n|j|	|d | j| d S )Nc                    s   i | ]\}}| v r||qS r.   r.   )r   rs   rt   keepr.   r/   
<dictcomp>E  s    zTask.event.<locals>.<dictcomp>)r   rC   )upperr   merge_rulesr   rl   r   rd   )
r)   rm   rC   rB   rK   
precedencer   task_event_to_stateRETRYr   r.   r   r/   rc   1  s   
z
Task.eventc                    s8    sg n  du rj n fdd}t| S )z;Information about this task suitable for on-screen display.Nc                  3   s:    t t   D ]} t| d }|d ur| |fV  q	d S r%   )listrN   )keyvalueextrarK   r)   r.   r/   _keysS  s   
zTask.info.<locals>._keys)_info_fieldsr   )r)   rK   r   r   r.   r   r/   infoN  s   
z	Task.infoc                 C   r   r%   )R_TASKr   rg   r.   r.   r/   r   [  r   zTask.__repr__c                    s&   t j jj fddjD S )Nc                    s"   i | ]}||t  |qS r.   )r   )r   rs   r   handlerr)   r.   r/   r   a  s    z Task.as_dict.<locals>.<dictcomp>)r{   __getattribute__r   r   r   rg   r.   r   r/   as_dict^  s
   zTask.as_dictc                 C   s   dd | j D S )Nc                 S      g | ]}|j qS r.   r   )r   taskr.   r.   r/   
<listcomp>f      z/Task._serializable_children.<locals>.<listcomp>)r   r)   r   r.   r.   r/   r   e  r1   zTask._serializable_childrenc                 C      | j S r%   )r   r   r.   r.   r/   r   h     zTask._serializable_rootc                 C   r   r%   )r   r   r.   r.   r/   r   k  r   zTask._serializable_parentc                 C   s   t | j|  ffS r%   )rL   r-   r   rg   r.   r.   r/   rh   n     zTask.__reduce__c                 C   r   r%   )r   rg   r.   r.   r/   r   q  s   zTask.idc                 C   s   | j d u r| jS | j jS r%   )r   r   r   rg   r.   r.   r/   originu  s   zTask.originc                 C   s   | j tjv S r%   r   r   READY_STATESrg   r.   r.   r/   readyy  s   z
Task.readyc                 C   .   z| j o| jjj| j  W S  ty   Y d S w r%   )r   r   r   dataKeyErrorrg   r.   r.   r/   r   }  
   zTask.parentc                 C   r   r%   )r   r   r   r   r   rg   r.   r.   r/   r     r   z	Task.root)NNN)NN)<r2   r3   r4   r5   r   r   r   r   r!   r   r    r"   r#   r*   r+   r   r   r   r   r   r   rC   r   r   r   r   r   r   r   r   PENDINGr   r\   r   r   r   RECEIVEDr   r   r(   r   r   TASK_EVENT_TO_STATEr   r   rc   r   r   r   r   r   r   rh   r   r   r   r   r   r   r   r.   r.   r.   r/   r      s    






r   c                   @   s   e Zd ZdZeZeZdZdZdZ					d9ddZ	e
d	d
 Zdd Zd:ddZd:defddZd:ddZd:defddZdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zefd%d&Zd;d'ee fd(d)Zd<d*efd+d,ZeZd<d-d.Z d<d/d0Z!d1d2 Z"d3d4 Z#d5d6 Z$d7d8 Z%dS )=r   zRecords clusters state.r   rY   N  '  c                 C   s   || _ |d u rt|n|| _|d u rt|n|| _|d u rg n|| _|| _|| _|| _|| _t	
 | _i | _t | _i | _|   t| jt| _| jt|	| j t| jt| _| jt|
| j d S r%   )event_callbackr   workersr   	_taskheapmax_workers_in_memorymax_tasks_in_memoryon_node_joinon_node_leave	threadingLock_mutexhandlersset_seen_types_tasks_to_resolverebuild_taskheapr$   _tasks_by_typer   tasks_by_typer   !_deserialize_Task_WeakSet_Mapping_tasks_by_workertasks_by_worker)r)   callbackr   r   taskheapr   r   r   r   r   r   r.   r.   r/   r(     s>   


zState.__init__c                 C   s   |   S r%   )_create_dispatcherrg   r.   r.   r/   _event  s   zState._eventc              	   O   sd   | dd}| j z||i |W |r|   W  d    S |r'|   w w 1 s+w   Y  d S )Nclear_afterF)r~   r   _clear)r)   r&   r*   r+   r   r.   r.   r/   freeze_while  s   
zState.freeze_whileTc                 C   4   | j  | |W  d    S 1 sw   Y  d S r%   )r   _clear_tasksr)   r   r.   r.   r/   clear_tasks     $zState.clear_tasksr   c                 C   sJ   |rdd |   D }| j  | j| n| j  g | jd d < d S )Nc                 S   s"   i | ]\}}|j tjvr||qS r.   r   r   r   r   r.   r.   r/   r     s
    z&State._clear_tasks.<locals>.<dictcomp>)	itertasksr   clearr   r   )r)   r   in_progressr.   r.   r/   r     s   

zState._clear_tasksc                 C   s$   | j   | | d| _d| _d S r7   )r   r  r   event_count
task_countr  r.   r.   r/   r     s   


zState._clearc                 C   r   r%   )r   r   r  r.   r.   r/   r    r  zState.clearc                 K   sZ   z| j | }|r|| |dfW S  ty,   | j|fi | }| j |< |df Y S w )zsGet or create worker by hostname.

        Returns:
            Tuple: of ``(worker, was_created)`` pairs.
        FT)r   r   r   r   )r)   r@   r+   r   r.   r.   r/   get_or_create_worker  s   


zState.get_or_create_workerc                 C   sD   z| j | dfW S  ty!   | j|| d }| j |< |df Y S w )zGet or create task by uuid.Fr   T)r   r   r   )r)   r   r   r.   r.   r/   get_or_create_task  s   zState.get_or_create_taskc                 C   r   r%   )r   r   r   r.   r.   r/   rc     r  zState.eventc                 C       |  t|dd|gdd S )Deprecated, use :meth:`event`.-r   typer   r   r   joinr)   rm   rK   r.   r.   r/   
task_event      zState.task_eventc                 C   r  )r  r  r   r  r   r  r  r.   r.   r/   worker_event  r  zState.worker_eventc                    s   j jjtdddtdddddjjjjj 	j	j
jj
jj jjjjjjjjjjtttjdf 	
fdd	}|S )	Nr@   rC   rB   r   r\   Tc                    s<   j d7  _ r|  | d d\}}}z|}W n	 |y'   Y nw ||| |fS |dkrz	| \}	}
}W n
 |yF   Y d S w |dk}z	|	d}}W n |yo   |re|	d}}n|	 }|	< Y nw |||
||  
r|s|dkr
| r|r| |	d  ||f|fS |dkr| \}}	}
}}|d	k}z	|d}}W n |y    |d
 }|< d}Y nw |r|	|_n(z|	}W n |y   |	 }|	< Y nw ||_|d ur|r|d ||
 |r|	n|j}t}|d 	krd |||
|t|}|r%|d kr%| n|| |dkr6 j	d7  _	|||
||  |j
}|d ur[| |r[|| |	| |jr}zj|j }W n |yv   | Y nw |j| zj|}W n
 |y   Y nw |j| ||f|fS d S )Nrj   r  r  r   ri   Fonliner   r   r  Tr   rk   r   )r  	partitionrc   r~   r   r   r   rr   r   r	  r   addr   r   _add_pending_task_childr   r   r   )rc   r   r   rq   createdgroupr9   subjectr   r@   rC   rB   
is_offliner   r   r\   is_client_eventr   task_createdr   heapstimetup	task_nameparent_task	_childrenr   r   add_typer   get_handlerget_taskget_task_by_type_setget_task_by_worker_set
get_workermax_events_in_heapr   r   r)   r   r   tfields	th_appendth_popwfieldsr   r.   r/   r     s   





z(State._create_dispatcher.<locals>._event)r   __getitem__r   r   r   r   r~   r   heap_multiplierr   r  r   r   r   r   r   r   r   r   r   r   r   r   rq   )r)   r   r.   r'  r/   r     s*   4^zState._create_dispatcherc                 C   sD   z| j |j }W n ty   t  }| j |j< Y nw || d S r%   )r   r   r   r   r  )r)   r   chr.   r.   r/   r  |  s   zState._add_pending_task_childc                    s2    fdd| j  D  }| jd d < |  d S )Nc                    s$   g | ]} |j |j|jt|qS r.   )r\   rC   r   r   r   tr   r.   r/   r     s    z*State.rebuild_taskheap.<locals>.<listcomp>)r   valuesr   sort)r)   r   heapr.   r   r/   r     s   
zState.rebuild_taskheaplimitc                 c   s:    t | j D ]\}}|V  |r|d |kr d S qd S )Nrj   )	enumerater   rl   )r)   r;  indexrowr.   r.   r/   r    s   zState.itertasksreversec                 c   sd    | j }|r
t|}t }t|d|D ]}|d  }|dur/|j}||vr/||fV  || qdS )zkGenerator yielding tasks ordered by time.

        Yields:
            Tuples of ``(uuid, Task)``.
        r      N)r   reversedr   r   r   r  )r)   r;  r?  _heapseenevtupr   r   r.   r.   r/   tasks_by_time  s   


zState.tasks_by_timec                    "   t  fdd| j|dD d|S )zGet all tasks by type.

        This is slower than accessing :attr:`tasks_by_type`,
        but will be ordered by time.

        Returns:
            Generator: giving ``(uuid, Task)`` pairs.
        c                 3   s&    | ]\}}|j  kr||fV  qd S r%   r   r  rG  r.   r/   r     s   
 
z'State._tasks_by_type.<locals>.<genexpr>r?  r   r   rE  )r)   r   r;  r?  r.   rG  r/   r     s   	zState._tasks_by_typec                    rF  )znGet all tasks by worker.

        Slower than accessing :attr:`tasks_by_worker`, but ordered by time.
        c                 3   s(    | ]\}}|j j kr||fV  qd S r%   )r   r@   r  r@   r.   r/   r     s   
 z)State._tasks_by_worker.<locals>.<genexpr>rH  r   rI  )r)   r@   r;  r?  r.   rJ  r/   r     s   zState._tasks_by_workerc                 C   s
   t | jS )z%Return a list of all seen task types.)sortedr   rg   r.   r.   r/   
task_types  r   zState.task_typesc                 C   s   dd | j  D S )z+Return a list of (seemingly) alive workers.c                 s   s    | ]}|j r|V  qd S r%   r   )r   wr.   r.   r/   r     s    z&State.alive_workers.<locals>.<genexpr>)r   r8  rg   r.   r.   r/   alive_workers  s   zState.alive_workersc                 C   r   r%   )R_STATEr   rg   r.   r.   r/   r     r   zState.__repr__c                 C   s8   | j | j| j| jd | j| j| j| jt| j	t| j
f
fS r%   )r-   r   r   r   r   r   r   r   _serialize_Task_WeakSet_Mappingr   r   rg   r.   r.   r/   rh     s   zState.__reduce__)
NNNNr   r   NNNN)Tr%   )NT)&r2   r3   r4   r5   r   r   r  r	  r4  r(   r   r   r   r  r   r   r   r  r
  r  rc   r  r  r   r  r   r   r
   rp   r  rE  tasks_by_timestampr   r   rL  rN  r   rh   r.   r.   r.   r/   r     sJ    


	
{

r   c                 C   s   dd |   D S )Nc                 S   s    i | ]\}}|d d |D qS )c                 S   r   r.   r   r6  r.   r.   r/   r     r   z>_serialize_Task_WeakSet_Mapping.<locals>.<dictcomp>.<listcomp>r.   )r   r   r   r.   r.   r/   r     s     z3_serialize_Task_WeakSet_Mapping.<locals>.<dictcomp>rl   )mappingr.   r.   r/   rP    r   rP  c                    s   | pi }  fdd|   D S )Nc                    s(   i | ]\}}|t  fd d|D qS )c                 3   s     | ]}| v r | V  qd S r%   r.   )r   ir   r.   r/   r     s    z?_deserialize_Task_WeakSet_Mapping.<locals>.<dictcomp>.<genexpr>)r   )r   r   idsrU  r.   r/   r     s    z5_deserialize_Task_WeakSet_Mapping.<locals>.<dictcomp>rR  )rS  r   r.   rU  r/   r     s   
r   )Er5   r   sysr   collectionsr   collections.abcr   r   decimalr   	itertoolsr   operatorr   r   typingr	   r
   weakrefr   r   kombu.clocksr   kombu.utils.objectsr   celeryr   celery.utils.functionalr   r   r   celery.utils.logr   __all__hasattrr   r   r   r>   r2   loggerwarningr=   rO  r   r   r   r   STARTEDFAILUREr   SUCCESSREVOKEDREJECTEDr   r$   registerrD   rH   rI   r   rL   rX   r   r   r   rP  r   r.   r.   r.   r/   <module>   st    



]   G