o
    DfI                     @   s  d Z ddlZddlZddlmZ ddlmZmZ ddlm	Z	m
Z
 ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ dZddhZdZdZG dd dejZG dd dejZG dd dejZG dd dejZ G dd dejZ!G dd dejZ"dS )zWorker-level Bootsteps.    N)Hub)get_event_loopset_event_loop)	DummyLockLaxBoundedSemaphore)Timer)	bootsteps)_set_task_join_will_block)ImproperlyConfigured)
IS_WINDOWS)worker_logger)r   r   PoolBeatStateDBConsumereventletgeventzO-B option doesn't work with eventlet/gevent pools: use standalone beat instead.z
The worker_pool setting shouldn't be used to select the eventlet/gevent
pools, instead you *must use the -P* argument so that patches are applied
as early as possible.
c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	r   zTimer bootstep.c                 C   sF   |j rtdd|_d S |js|jj|_| j|j|j| j| j	d|_d S )Ng      $@)max_interval)r   on_erroron_tick)
use_eventloop_Timertimer	timer_clspool_clsr   instantiatetimer_precisionon_timer_erroron_timer_tickselfw r"   Q/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/worker/components.pycreate#   s   
zTimer.createc                 C   s   t jd|dd d S )NzTimer error: %rT)exc_info)loggererror)r    excr"   r"   r#   r   1   s   zTimer.on_timer_errorc                 C   s   t d| d S )Nz Timer wake-up! Next ETA %s secs.)r&   debug)r    delayr"   r"   r#   r   4      zTimer.on_timer_tickN)__name__
__module____qualname____doc__r$   r   r   r"   r"   r"   r#   r       s
    r   c                       sV   e Zd ZdZefZ fddZdd Zdd Zdd	 Z	d
d Z
dd Zdd Z  ZS )r   zWorker starts the event loop.c                    s   d |_ t j|fi | d S N)hubsuper__init__r    r!   kwargs	__class__r"   r#   r3   =   s   zHub.__init__c                 C   s   |j S r0   )r   r   r"   r"   r#   
include_ifA   s   zHub.include_ifc                 C   sF   t  |_|jd u rt|jdd }t|r|nt|j|_| | | S )Nrequires_hub)r   r1   getattr	_conninfor   _Hubr   _patch_thread_primitives)r    r!   required_hubr"   r"   r#   r$   D   s   

z
Hub.createc                 C   s   d S r0   r"   r   r"   r"   r#   startM   s   z	Hub.startc                 C      |j   d S r0   r1   closer   r"   r"   r#   stopP      zHub.stopc                 C   r@   r0   rA   r   r"   r"   r#   	terminateS   rD   zHub.terminatec                 C   s<   t  |jj_zddlm} W n
 ty   Y d S w t |_d S )Nr   )pool)r   appclockmutexbilliardrF   ImportErrorLock)r    r!   rF   r"   r"   r#   r=   V   s   
zHub._patch_thread_primitives)r,   r-   r.   r/   r   requiresr3   r8   r$   r?   rC   rE   r=   __classcell__r"   r"   r6   r#   r   8   s    	r   c                       sP   e Zd ZdZefZd fdd	Zdd Zdd Zd	d
 Z	dd Z
dd Z  ZS )r   a
  Bootstep managing the worker pool.

    Describes how to initialize the worker pool, and starts and stops
    the pool during worker start-up/shutdown.

    Adds attributes:

        * autoscale
        * pool
        * max_concurrency
        * min_concurrency
    Nc                    s   d |_ d |_|j|_|j| _t|tr'|d\}}}t||r$t|p%dg}||_	|j	r4|j	\|_|_t
 j|fi | d S )N,r   )rF   max_concurrencyconcurrencymin_concurrencyoptimization
isinstancestr	partitionint	autoscaler2   r3   )r    r!   rX   r5   max_c_min_cr6   r"   r#   r3   r   s   
zPool.__init__c                 C      |j r
|j   d S d S r0   )rF   rB   r   r"   r"   r#   rB         z
Pool.closec                 C   r\   r0   )rF   rE   r   r"   r"   r#   rE      r]   zPool.terminatec                 C   s   d }d }|j jjtv rttt |j pt	}|j
}|j|_|s?t| }|_|jj|_|jj|_d}|jr?|jjr?|j|_|j}| j|j|j
|j |jf|j|j|j|j|joY||j|||d|| j|j d }|_ t!|j" |S )Nd   T)initargsmaxtasksperchildmax_memory_per_childtimeoutsoft_timeoutputlockslost_worker_timeoutthreadsmax_restartsallow_restartforking_enable	semaphoresched_strategyrG   )#rG   confworker_poolGREEN_POOLSwarningswarnUserWarningW_POOL_SETTINGr   r   rR   _process_taskprocess_taskr   rj   acquire_quick_acquirerelease_quick_releasepool_putlocksr   uses_semaphore_process_task_sempool_restartsr   hostnamemax_tasks_per_childra   
time_limitsoft_time_limitworker_lost_waitrS   rF   r	   task_join_will_block)r    r!   rj   rg   threadedprocsrh   rF   r"   r"   r#   r$      sD   



zPool.createc                 C   s   d|j r	|j jiS diS )NrF   zN/A)rF   infor   r"   r"   r#   r      s   z	Pool.infoc                 C   s   |j | d S r0   )rF   register_with_event_loop)r    r!   r1   r"   r"   r#   r      r+   zPool.register_with_event_loopr0   )r,   r-   r.   r/   r   rM   r3   rB   rE   r$   r   r   rN   r"   r"   r6   r#   r   b   s    $r   c                       s2   e Zd ZdZd ZdZd fdd	Zdd Z  ZS )	r   zWStep used to embed a beat process.

    Enabled when the ``beat`` argument is set.
    TFc                    s.   | | _ |_d |_t j|fd|i| d S )Nbeat)enabledr   r2   r3   )r    r!   r   r5   r6   r"   r#   r3      s   zBeat.__init__c                 C   s@   ddl m} |jjdrtt||j|j|j	d }|_
|S )Nr   )EmbeddedService)r   r   )schedule_filenamescheduler_cls)celery.beatr   r   r-   endswithr
   ERR_B_GREENrG   r   	schedulerr   )r    r!   r   br"   r"   r#   r$      s   zBeat.create)F)	r,   r-   r.   r/   labelconditionalr3   r$   rN   r"   r"   r6   r#   r      s    r   c                       s(   e Zd ZdZ fddZdd Z  ZS )r   z:Bootstep that sets up between-restart state database file.c                    s&   |j | _d |_t j|fi | d S r0   )statedbr   _persistencer2   r3   r4   r6   r"   r#   r3      s   zStateDB.__init__c                 C   s,   |j |j |j|jj|_t|jj d S r0   )	state
Persistentr   rG   rH   r   atexitregistersaver   r"   r"   r#   r$      s   zStateDB.create)r,   r-   r.   r/   r3   r$   rN   r"   r"   r6   r#   r      s    r   c                   @   s   e Zd ZdZdZdd ZdS )r   z)Bootstep starting the Consumer blueprint.Tc                 C   sn   |j rt|j d|j }n|j|j }| j|j|j|j|j|j	||j
|j|j||j|j|j|jd }|_|S )N   )r}   task_eventsinit_callbackinitial_prefetch_countrF   r   rG   
controllerr1   worker_optionsdisable_rate_limitsprefetch_multiplier)rP   maxr   rQ   r   consumer_clsrt   r}   r   ready_callbackrF   r   rG   r1   optionsr   consumer)r    r!   prefetch_countcr"   r"   r#   r$      s&   zConsumer.createN)r,   r-   r.   r/   lastr$   r"   r"   r"   r#   r      s    r   )#r/   r   ro   kombu.asynchronousr   r<   r   r   kombu.asynchronous.semaphorer   r   kombu.asynchronous.timerr   r   celeryr   celery._stater	   celery.exceptionsr
   celery.platformsr   celery.utils.logr   r&   __all__rn   r   rr   StepStartStopStepr   r   r   r   r"   r"   r"   r#   <module>   s,    *P