o
    Df                     @   s   d Z ddlZddlZddlmZmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ dd	lmZ dd
lmZ dZeeZejejejZZZeejddZG dd de	jZG dd deZdS )zPool Autoscaling.

This module implements the internal thread responsible
for growing and shrinking the pool according to the
current autoscale settings.

The autoscale thread is only enabled if
the :option:`celery worker --autoscale` option is used.
    N)	monotonicsleep)	DummyLock)	bootsteps)
get_logger)bgThread   )state)Pool)
AutoscalerWorkerComponentAUTOSCALE_KEEPALIVE   c                   @   s>   e Zd ZdZdZdZefZdd Zdd Z	dd	 Z
d
d ZdS )r   z?Bootstep that starts the autoscaler thread/timer in the worker.r   Tc                 K   s   |j | _d |_d S N)	autoscaleenabled
autoscaler)selfwkwargs r   P/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/worker/autoscale.py__init__&      
zWorkerComponent.__init__c                 C   s>   | j |j|j|j|j||jrt nd d }|_|js|S d S )N)workermutex)instantiateautoscaler_clspoolmax_concurrencymin_concurrencyuse_eventloopr   r   )r   r   scalerr   r   r   create*   s   zWorkerComponent.createc                 C   s*   |j j|jj ||jj|jj d S r   )consumeron_task_messageaddr   maybe_scalecall_repeatedly	keepalive)r   r   hubr   r   r   register_with_event_loop2   s   z(WorkerComponent.register_with_event_loopc                 C   s   d|j  iS )zReturn `Autoscaler` info.r   )r   info)r   r   r   r   r   r,   8   s   zWorkerComponent.infoN)__name__
__module____qualname____doc__labelconditionalr
   requiresr   r#   r+   r,   r   r   r   r   r      s    r   c                       s   e Zd ZdZddedf fdd	Zdd Zddd	Zdd
dZdddZ	dd Z
dd Zdd Zdd Zdd Zdd Zedd Zedd Z  ZS ) r   z,Background thread to autoscale pool workers.r   Nc                    sN   t    || _|pt | _|| _|| _|| _d | _	|| _
| js%J dd S )Nzcannot scale down too fast.)superr   r   	threadingLockr   r   r    r)   _last_scale_upr   )r   r   r   r    r   r)   r   	__class__r   r   r   @   s   
zAutoscaler.__init__c                 C   s:   | j  |   W d    n1 sw   Y  td d S )Ng      ?)r   r'   r   r   r   r   r   bodyN   s   
zAutoscaler.bodyc                 C   sZ   | j }t| j| j}||kr| ||  dS t| j| j}||k r+| ||  dS d S )NT)	processesminqtyr   scale_upmaxr    
scale_down)r   reqprocscurr   r   r   _maybe_scaleS   s   zAutoscaler._maybe_scalec                 C   s   |  |r| j  d S d S r   )rE   r   maintain_pool)r   rB   r   r   r   r'   ^   s   
zAutoscaler.maybe_scalec                 C   s   | j ; |d ur|| jk r| | j|  | | || _|d ur1|| jkr.| || j  || _| j| jfW  d    S 1 sAw   Y  d S r   )r   r<   _shrink_update_consumer_prefetch_countr   _growr    )r   r@   r=   r   r   r   updateb   s   



$zAutoscaler.updatec                 C   s   t  | _| |S r   )r   r7   rI   r   nr   r   r   r?   o   r   zAutoscaler.scale_upc                 C   s*   | j rt | j  | jkr| |S d S d S r   )r7   r   r)   rG   rK   r   r   r   rA   s   s
   
zAutoscaler.scale_downc                 C   s   t d| | j| d S )NzScaling up %s processes.)r,   r   growrK   r   r   r   rI   x   s   
zAutoscaler._growc              
   C   sl   t d| z	| j| W d S  ty   td Y d S  ty5 } ztd|dd W Y d }~d S d }~ww )NzScaling down %s processes.z0Autoscaler won't scale down: all processes busy.zAutoscaler: scale_down: %rT)exc_info)r,   r   shrink
ValueErrordebug	Exceptionerror)r   rL   excr   r   r   rG   |   s   
zAutoscaler._shrinkc                 C   s$   || j  }|r| jj| d S d S r   )r   r   r$   _update_prefetch_count)r   new_maxdiffr   r   r   rH      s   
z*Autoscaler._update_consumer_prefetch_countc                 C   s   | j | j| j| jdS )N)r@   r=   currentr>   )r   r    r<   r>   r:   r   r   r   r,      s
   zAutoscaler.infoc                 C   s
   t tjS r   )lenr	   reserved_requestsr:   r   r   r   r>      s   
zAutoscaler.qtyc                 C   s   | j jS r   )r   num_processesr:   r   r   r   r<      s   zAutoscaler.processesr   )NN)r-   r.   r/   r0   r   r   r;   rE   r'   rJ   r?   rA   rI   rG   rH   r,   propertyr>   r<   __classcell__r   r   r8   r   r   =   s&    


	
r   )r0   osr5   timer   r   kombu.asynchronous.semaphorer   celeryr   celery.utils.logr   celery.utils.threadsr    r	   
componentsr
   __all__r-   loggerrQ   r,   rS   floatenvirongetr   StartStopStepr   r   r   r   r   r   <module>   s     	