o
    Df_                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	m
Z
 ddlmZ ddlmZ ddlmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZmZm Z  ddl!m"Z" ddl#m$Z$m%Z% ddl&m'Z' ddl(m)Z)m*Z* ddl+m,Z,m-Z- ddl.m/Z/m0Z0 dZ1eddZ2e,e3Z4e4j5e4j6e4j7e4j8f\Z5Z6Z7Z8dZ9G dd de:Z;G dd dZ<eG dd dZ=dd Z>d d! Z?G d"d# d#Z@G d$d% d%e@ZAG d&d' d'ZBG d(d) d)eZCze  W n eDy   dZEY n	w G d*d+ d+eZEd.d,d-ZFdS )/zThe periodic task scheduler.    N)timegm)
namedtuple)total_ordering)EventThread)ensure_multiprocessing)reset_signals)Process)maybe_evaluatereprcall)cached_property   )__version__	platformssignals)reraise)crontabmaybe_schedule)is_numeric_value)load_extension_class_namessymbol_by_name)
get_loggeriter_open_logger_fds)humanize_secondsmaybe_make_aware)SchedulingErrorScheduleEntry	SchedulerPersistentSchedulerServiceEmbeddedServiceevent_t)timepriorityentryi,  c                   @   s   e Zd ZdZdS )r   z*An error occurred while scheduling a task.N)__name__
__module____qualname____doc__ r)   r)   D/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/beat.pyr   ,   s    r   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	BeatLazyFuncao  A lazy function declared in 'beat_schedule' and called before sending to worker.

    Example:

        beat_schedule = {
            'test-every-5-minutes': {
                'task': 'test',
                'schedule': 300,
                'kwargs': {
                    "current": BeatCallBack(datetime.datetime.now)
                }
            }
        }

    c                 O   s   || _ ||d| _d S )N)argskwargs_func_func_params)selffuncr,   r-   r)   r)   r*   __init__A   s   zBeatLazyFunc.__init__c                 C      |   S N)delayr1   r)   r)   r*   __call__H      zBeatLazyFunc.__call__c                 C   s   | j | jd i | jd S )Nr,   r-   r.   r7   r)   r)   r*   r6   K   s   zBeatLazyFunc.delayN)r%   r&   r'   r(   r3   r8   r6   r)   r)   r)   r*   r+   0   s
    r+   c                   @   s   e Zd ZdZdZdZdZdZdZdZ	dZ
			dddZdd	 ZeZdd
dZe ZZdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd ZdS )r   a  An entry in the scheduler.

    Arguments:
        name (str): see :attr:`name`.
        schedule (~celery.schedules.schedule): see :attr:`schedule`.
        args (Tuple): see :attr:`args`.
        kwargs (Dict): see :attr:`kwargs`.
        options (Dict): see :attr:`options`.
        last_run_at (~datetime.datetime): see :attr:`last_run_at`.
        total_run_count (int): see :attr:`total_run_count`.
        relative (bool): Is the time relative to when the server starts?
    Nr   r)   Fc                 C   sb   |
| _ || _|| _|| _|r|ni | _|r|ni | _t||	| j d| _|p(|  | _	|p-d| _
d S )N)appr   )r:   nametaskr,   r-   optionsr   scheduledefault_nowlast_run_attotal_run_count)r1   r;   r<   r@   rA   r>   r,   r-   r=   relativer:   r)   r)   r*   r3   s   s   zScheduleEntry.__init__c                 C   s   | j r| j  S | j S r5   )r>   nowr:   r7   r)   r)   r*   r?      s   zScheduleEntry.default_nowc                 C   s(   | j di t| |p|  | jd dS )z8Return new instance, with date and count fields updated.r   )r@   rA   Nr)   )	__class__dictr?   rA   )r1   r@   r)   r)   r*   _next_instance   s
   


zScheduleEntry._next_instancec              	   C   s*   | j | j| j| j| j| j| j| j| jffS r5   )	rD   r;   r<   r@   rA   r>   r,   r-   r=   r7   r)   r)   r*   
__reduce__   s   zScheduleEntry.__reduce__c                 C   s&   | j |j|j|j|j|jd dS )zUpdate values from another entry.

        Will only update "editable" fields:
            ``task``, ``schedule``, ``args``, ``kwargs``, ``options``.
        )r<   r>   r,   r-   r=   N)__dict__updater<   r>   r,   r-   r=   r1   otherr)   r)   r*   rI      s
   zScheduleEntry.updatec                 C   s   | j | jS )z.See :meth:`~celery.schedules.schedule.is_due`.)r>   is_duer@   r7   r)   r)   r*   rL      s   zScheduleEntry.is_duec                 C   s   t t|  S r5   )itervarsitemsr7   r)   r)   r*   __iter__   s   zScheduleEntry.__iter__c                 C   s,   dj | t| j| jp
d| jpi t| jdS )Nz%<{name}: {0.name} {call} {0.schedule}r)   )callr;   )formatr   r<   r,   r-   typer%   r7   r)   r)   r*   __repr__   s
   zScheduleEntry.__repr__c                 C   s   t |trt| t|k S tS r5   )
isinstancer   idNotImplementedrJ   r)   r)   r*   __lt__   s   
zScheduleEntry.__lt__c                 C   s(   dD ]}t | |t ||kr dS qdS )N)r<   r,   r-   r=   r>   FT)getattr)r1   rK   attrr)   r)   r*   editable_fields_equal   s
   z#ScheduleEntry.editable_fields_equalc                 C   s
   |  |S )zTest schedule entries equality.

        Will only compare "editable" fields:
        ``task``, ``schedule``, ``args``, ``kwargs``, ``options``.
        )r[   rJ   r)   r)   r*   __eq__   s   
zScheduleEntry.__eq__)
NNNNNr)   NNFNr5   )r%   r&   r'   r(   r;   r>   r,   r-   r=   r@   rA   r3   r?   _default_nowrF   __next__nextrG   rI   rL   rP   rT   rX   r[   r\   r)   r)   r)   r*   r   O   s2    

r   c                 C   s   | sg S dd | D S )Nc                 S   s    g | ]}t |tr| n|qS r)   rU   r+   ).0vr)   r)   r*   
<listcomp>   s    z(_evaluate_entry_args.<locals>.<listcomp>r)   )
entry_argsr)   r)   r*   _evaluate_entry_args   s
   re   c                 C   s   | si S dd |   D S )Nc                 S   s&   i | ]\}}|t |tr| n|qS r)   r`   )ra   krb   r)   r)   r*   
<dictcomp>   s    z*_evaluate_entry_kwargs.<locals>.<dictcomp>)rO   )entry_kwargsr)   r)   r*   _evaluate_entry_kwargs   s
   ri   c                   @   sD  e Zd ZdZeZdZeZdZ	dZ
dZdZeZ		d>ddZdd	 Zd?d
dZd@ddZdd ZefddZeejfddZeeejejfddZdd Zdd Zdd ZdAddZ d d! Z!d"d# Z"d$d% Z#d&d' Z$d(d) Z%d*d+ Z&d,d- Z'd.d/ Z(d0d1 Z)d2d3 Z*d4d5 Z+d6d7 Z,e-e+e,Ze.d8d9 Z/e.d:d; Z0e-d<d= Z1dS )Br   a  Scheduler for periodic tasks.

    The :program:`celery beat` program may instantiate this class
    multiple times for introspection purposes, but then with the
    ``lazy`` argument set.  It's important for subclasses to
    be idempotent when this argument is set.

    Arguments:
        schedule (~celery.schedules.schedule): see :attr:`schedule`.
        max_interval (int): see :attr:`max_interval`.
        lazy (bool): Don't set up the schedule.
    N   r   Fc                 K   st   || _ t|d u r
i n|| _|p|jjp| j| _|p|jj| _d | _d | _	|d u r-|jj
n|| _|s8|   d S d S r5   )r:   r
   dataconfbeat_max_loop_intervalmax_intervalamqpProducer_heapold_schedulersbeat_sync_everysync_every_taskssetup_schedule)r1   r:   r>   rn   rp   lazyrt   r-   r)   r)   r*   r3      s    zScheduler.__init__c                 C   sJ   i }| j jjr| j jjsd|vrdtdddddid|d< | | d S )Nzcelery.backend_cleanup04*expiresi  )r<   r>   r=   )r:   rl   result_expiresbackendsupports_autoexpirer   update_from_dict)r1   rk   entriesr)   r)   r*   install_default_entries
  s   


z!Scheduler.install_default_entriesc              
   C   s   t d|j|j z
| j||dd}W n ty/ } ztd|t dd W Y d }~d S d }~ww |rAt|drAt	d|j|j
 d S t	d	|j d S )
Nz#Scheduler: Sending due task %s (%s)F)produceradvancezMessage Error: %s
%sTexc_inforV   z%s sent. id->%sz%s sent.)infor;   r<   apply_async	Exceptionerror	tracebackformat_stackhasattrdebugrV   )r1   r$   r   resultexcr)   r)   r*   apply_entry  s   
zScheduler.apply_entry{Gzc                 C   s   |r
|dkr
|| S |S )Nr   r)   )r1   ndriftr)   r)   r*   adjust"  s   zScheduler.adjustc                 C   s   |  S r5   )rL   )r1   r$   r)   r)   r*   rL   '  r9   zScheduler.is_duec                 C   s4   | j }t| }|| |jd  ||pd S )z9Return a utc timestamp, make sure heapq in correct order.g    .Ar   )r   r   r?   utctimetuplemicrosecond)r1   r$   next_time_to_runmktimer   as_nowr)   r)   r*   _when*  s   

zScheduler._whenc                 C   s\   d}g | _ | j D ]}| \}}| j || ||rdn|p!d|| q
|| j  dS )z:Populate the heap with the data contained in the schedule.   r   N)rq   r>   valuesrL   appendr   )r1   r!   heapifyr#   r$   rL   next_call_delayr)   r)   r*   populate_heap4  s   
zScheduler.populate_heapc                 C   s   | j }| j}| jdu s| | j| jst| j| _|   | j}|s%|S |d }|d }	| |	\}
}|
rh||}||u r\| 	|	}| j
|	| jd |||| |||d | dS ||| ||d |S ||}|t|ru||S ||S )zRun a tick - one iteration of the scheduler.

        Executes one due task per call.

        Returns:
            float: preferred delay in seconds for next call.
        Nr      )r   r   )r   rn   rq   schedules_equalrr   r>   copyr   rL   reserver   r   r   r   )r1   r!   minheappopheappushr   rn   Heventr$   rL   r   verify
next_entryadjusted_next_time_to_runr)   r)   r*   tickD  s<   	


zScheduler.tickc                 C   s   ||  u rd u rdS  |d u s|d u rdS t | t | kr$dS | D ]\}}||}|s6 dS ||kr= dS q(dS )NTF)setkeysrO   get)r1   old_schedulesnew_schedulesr;   	old_entry	new_entryr)   r)   r*   r   l  s   
zScheduler.schedules_equalc                 C   s.   | j  pt | j  | jkp| jo| j| jkS r5   )
_last_syncr"   	monotonic
sync_everyrt   _tasks_since_syncr7   r)   r)   r*   should_sync{  s   
zScheduler.should_syncc                 C   s   t | }| j|j< |S r5   )r_   r>   r;   )r1   r$   r   r)   r)   r*   r     s   zScheduler.reserveTc           	   
   K   sN  |r|  |n|}| jj|j}zzLt|j}t|j}|r>|j	||fd|i|j
W W |  jd7  _|  r=|   S S | j|j||fd|i|j
W W |  jd7  _|  r^|   S S  ty } ztttdj||dt d  W Y d }~nd }~ww W |  jd7  _|  r|   d S d S |  jd7  _|  r|   w w )Nr   r   z-Couldn't apply scheduled task {0.name}: {exc})r   r   )r   r:   tasksr   r<   re   r,   ri   r-   r   r=   r   r   _do_sync	send_taskr   r   r   rR   sysr   )	r1   r$   r   r   r-   r<   rd   rh   r   r)   r)   r*   r     sV   





zScheduler.apply_asyncc                 O   s   | j j|i |S r5   )r:   r   r1   r,   r-   r)   r)   r*   r        zScheduler.send_taskc                 C   s    |  | j | | jjj d S r5   )r   rk   merge_inplacer:   rl   beat_scheduler7   r)   r)   r*   ru     s   zScheduler.setup_schedulec                 C   s:   zt d |   W t | _d| _d S t | _d| _w )Nzbeat: Synchronizing schedule...r   )r   syncr"   r   r   r   r7   r)   r)   r*   r     s   



zScheduler._do_syncc                 C   s   d S r5   r)   r7   r)   r)   r*   r     s   zScheduler.syncc                 C   s   |    d S r5   )r   r7   r)   r)   r*   close  s   zScheduler.closec                 K   s&   | j dd| ji|}|| j|j< |S )Nr:   r)   )Entryr:   r>   r;   )r1   r-   r$   r)   r)   r*   add  s   zScheduler.addc                 C   s4   t || jr| j|_|S | jdi t||| jdS N)r;   r:   r)   )rU   r   r:   rE   )r1   r;   r$   r)   r)   r*   _maybe_entry  s   zScheduler._maybe_entryc                    s"    j  fdd| D  d S )Nc                    s   i | ]\}}|  ||qS r)   )r   )ra   r;   r$   r7   r)   r*   rg     s    z.Scheduler.update_from_dict.<locals>.<dictcomp>)r>   rI   rO   )r1   dict_r)   r7   r*   r~     s   zScheduler.update_from_dictc              	   C   s   | j }t|t|}}||A D ]}||d  q|D ]#}| jdi t|| || jd}||r:|| | q|||< qd S r   )r>   r   popr   rE   r:   r   rI   )r1   br>   ABkeyr$   r)   r)   r*   r     s    

zScheduler.merge_inplacec                 C   s   dd }| j || jjjS )Nc                 S   s   t d| | d S )Nz9beat: Connection error: %s. Trying again in %s seconds...)r   )r   intervalr)   r)   r*   _error_handler  s   z3Scheduler._ensure_connected.<locals>._error_handler)
connectionensure_connectionr:   rl   broker_connection_max_retries)r1   r   r)   r)   r*   _ensure_connected  s   
zScheduler._ensure_connectedc                 C   s   | j S r5   rk   r7   r)   r)   r*   get_schedule  s   zScheduler.get_schedulec                 C   s
   || _ d S r5   r   r1   r>   r)   r)   r*   set_schedule     
zScheduler.set_schedulec                 C   s
   | j  S r5   )r:   connection_for_writer7   r)   r)   r*   r     s   
zScheduler.connectionc                 C   s   | j |  ddS )NF)auto_declare)rp   r   r7   r)   r)   r*   r     s   zScheduler.producerc                 C   s   dS )N r)   r7   r)   r)   r*   r     s   zScheduler.info)NNNFNr5   )r   )NT)2r%   r&   r'   r(   r   r   r>   DEFAULT_MAX_INTERVALrn   r   rt   r   r   loggerr3   r   r   r   rL   r   r   r!   heapqr   r   r   r   r   r   r   r   r   r   r   ru   r   r   r   r   r   r~   r   r   r   r   propertyr   r   r   r   r)   r)   r)   r*   r      sZ    




(



r   c                       s   e Zd ZdZeZdZdZ fddZdd Z	dd	 Z
d
d Zdd Zdd Zdd Zdd ZeeeZdd Zdd Zedd Z  ZS )r   z+Scheduler backed by :mod:`shelve` database.)r   z.dbz.datz.bakz.dirNc                    s"   | d| _t j|i | d S )Nschedule_filename)r   r   superr3   r   rD   r)   r*   r3     s   zPersistentScheduler.__init__c              	   C   sL   | j D ] }ttj t| j|  W d    n1 sw   Y  qd S r5   )known_suffixesr   ignore_errnoerrnoENOENTosremover   )r1   suffixr)   r)   r*   
_remove_db  s   
zPersistentScheduler._remove_dbc                 C   s   | j j| jddS )NT)	writeback)persistenceopenr   r7   r)   r)   r*   _open_schedule  r   z"PersistentScheduler._open_schedulec                 C   s"   t d| j|dd |   |  S )Nz'Removing corrupted schedule file %r: %rTr   )r   r   r   r   )r1   r   r)   r)   r*    _destroy_open_corrupted_schedule  s
   z4PersistentScheduler._destroy_open_corrupted_schedulec              
   C   sF  z|   | _| j  W n ty$ } z| || _W Y d }~nd }~ww |   | jjj}| j	d}|d urG||krGt
d|| | j  | jjj}| j	d}|d urn||krnddd}t
d|| ||  | j  | jdi }| | jjj | | j | jt||d	 |   td
ddd | D   d S )Ntzz%Reset: Timezone changed from %r to %rutc_enabledenableddisabled)TFz Reset: UTC changed from %s to %sr   )r   r   r   zCurrent schedule:

c                 s   s    | ]}t |V  qd S r5   )repr)ra   r$   r)   r)   r*   	<genexpr>4  s    
z5PersistentScheduler.setup_schedule.<locals>.<genexpr>)r   _storer   r   r   _create_scheduler:   rl   timezoner   warningclear
enable_utc
setdefaultr   r   r   r>   rI   r   r   r   joinr   )r1   r   r   	stored_tzutc
stored_utcchoicesr   r)   r)   r*   ru     sB   





z"PersistentScheduler.setup_schedulec                 C   s   dD ]n}z| j d  W n2 tttfy=   zi | j d< W n tttfy8 } z| || _ W Y d }~Y qd }~ww Y  d S w d| j vrOtd | j    d S d| j vr`td | j    d S d| j vrntd | j    d S d S )	N)r   r   r   r   z+DB Reset: Account for new __version__ fieldr   z"DB Reset: Account for new tz fieldr   z+DB Reset: Account for new utc_enabled field)r   KeyErrorUnicodeDecodeError	TypeErrorr   r   r   )r1   _r   r)   r)   r*   r   7  s6   





z$PersistentScheduler._create_schedulec                 C   s
   | j d S Nr   r   r7   r)   r)   r*   r   N  r   z PersistentScheduler.get_schedulec                 C   s   || j d< d S r  r  r   r)   r)   r*   r   Q  s   z PersistentScheduler.set_schedulec                 C   s   | j d ur| j   d S d S r5   )r   r   r7   r)   r)   r*   r   U  s   
zPersistentScheduler.syncc                 C   s   |    | j  d S r5   )r   r   r   r7   r)   r)   r*   r   Y  s   zPersistentScheduler.closec                 C   s   d| j  S )Nz    . db -> )r   r7   r)   r)   r*   r   ]  s   zPersistentScheduler.info)r%   r&   r'   r(   shelver   r   r   r3   r   r   r   ru   r   r   r   r   r>   r   r   r   __classcell__r)   r)   r   r*   r     s$    &
r   c                   @   s`   e Zd ZdZeZ		dddZdd Zddd	Zd
d Z	dddZ
		dddZedd ZdS )r   zCelery periodic task service.Nc                 C   sB   || _ |p|jj| _|p| j| _|p|jj| _t | _t | _	d S r5   )
r:   rl   rm   rn   scheduler_clsbeat_schedule_filenamer   r   _is_shutdown_is_stopped)r1   r:   rn   r   r
  r)   r)   r*   r3   g  s   
zService.__init__c                 C   s   | j | j| j| j| jffS r5   )rD   rn   r   r
  r:   r7   r)   r)   r*   rG   s  s   zService.__reduce__Fc              	   C   s   t d tdt| jj tjj| d |r"tjj| d t	
d zNz/| j sQ| j }|rL|dkrLtdt|dd t| | j rL| j  | j r)W n ttfyb   | j  Y nw W |   d S W |   d S |   w )	Nzbeat: Starting...z#beat: Ticking with max interval->%s)senderzcelery beatg        zbeat: Waking up %s.zin )prefix)r   r   r   	schedulerrn   r   	beat_initsendbeat_embedded_initr   set_process_titler  is_setr   r"   sleepr   r   KeyboardInterrupt
SystemExitr   r   )r1   embedded_processr   r)   r)   r*   startw  s6   








zService.startc                 C      | j   | j  d S r5   )r  r   r  r   r7   r)   r)   r*   r        
zService.syncc                 C   s*   t d | j  |o| j  d S  d S )Nzbeat: Shutting down...)r   r  r   r  wait)r1   r  r)   r)   r*   stop  s   
zService.stopcelery.beat_schedulersc                 C   s0   | j }tt|}t| j|d| j|| j|dS )N)aliases)r:   r   rn   rv   )r   rE   r   r   r
  r:   rn   )r1   rv   extension_namespacefilenamer   r)   r)   r*   get_scheduler  s   zService.get_schedulerc                 C   r4   r5   )r#  r7   r)   r)   r*   r    s   zService.scheduler)NNN)F)Fr  )r%   r&   r'   r(   r   r
  r3   rG   r  r   r  r#  r   r  r)   r)   r)   r*   r   b  s    



r   c                       s0   e Zd ZdZ fddZdd Zdd Z  ZS )	_Threadedz(Embedded task scheduler using threading.c                    s2   t    || _t|fi || _d| _d| _d S )NTBeat)r   r3   r:   r   servicedaemonr;   r1   r:   r-   r   r)   r*   r3     s
   

z_Threaded.__init__c                 C   r  r5   )r:   set_currentr&  r  r7   r)   r)   r*   run  r  z_Threaded.runc                 C   s   | j jdd d S )NT)r  )r&  r  r7   r)   r)   r*   r    r   z_Threaded.stop)r%   r&   r'   r(   r3   r*  r  r	  r)   r)   r   r*   r$    s
    r$  c                       s,   e Zd Z fddZdd Zdd Z  ZS )_Processc                    s,   t    || _t|fi || _d| _d S )Nr%  )r   r3   r:   r   r&  r;   r(  r   r)   r*   r3     s   

z_Process.__init__c                 C   sP   t dd ttjtjtjgtt   | j	
  | j	  | jjdd d S )NF)fullT)r  )r   r   close_open_fdsr   	__stdin__
__stdout__
__stderr__listr   r:   set_defaultr)  r&  r  r7   r)   r)   r*   r*    s   


z_Process.runc                 C   s   | j   |   d S r5   )r&  r  	terminater7   r)   r)   r*   r    s   
z_Process.stop)r%   r&   r'   r3   r*  r  r	  r)   r)   r   r*   r+    s    	r+  c                 K   s<   | dds
tdu rt| fddi|S t| fd|i|S )zReturn embedded clock service.

    Arguments:
        thread (bool): Run threaded instead of as a separate process.
            Uses :mod:`multiprocessing` by default, if available.
    threadFNrn   r   )r   r+  r$  )r:   rn   r-   r)   r)   r*   r      s   r    r5   )Gr(   r   r   r   r   r  r   r"   r   calendarr   collectionsr   	functoolsr   	threadingr   r   billiardr   billiard.commonr   billiard.contextr	   kombu.utils.functionalr
   r   kombu.utils.objectsr   r   r   r   r   
exceptionsr   	schedulesr   r   utils.functionalr   utils.importsr   r   	utils.logr   r   
utils.timer   r   __all__r!   r%   r   r   r   r   r   r   r   r   r+   r   re   ri   r   r   r   r$  NotImplementedErrorr+  r    r)   r)   r)   r*   <module>   sf    
w		   kF
