o
    Df8                     @   s2  d Z ddlZddlZddlmZmZ ddlmZ ddlmZ ddl	m
Z
 ddl	mZ ddl	mZ dd	lmZmZ dd
lmZmZmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddlmZm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z& zddl'Z'W n e(y   dZ'Y nw dZ)dZ*dZ+dZ,G dd dZ-dS )a  WorkController can be used to instantiate in-process workers.

The command-line interface for the worker is in :mod:`celery.bin.worker`,
while the worker program is in :mod:`celery.apps.worker`.

The worker program is responsible for adding signal handlers,
setting up logging, etc.  This is a bare-bones worker without
global side-effects (i.e., except for the global state stored in
:mod:`celery.worker.state`).

The worker consists of several components, all managed by bootsteps
(mod:`celery.bootsteps`).
    N)datetimetimezone)	cpu_count)detect_environment)	bootsteps)concurrency)signals)RUN	TERMINATE)ImproperlyConfiguredTaskRevokedErrorWorkerTerminate)
EX_FAILUREcreate_pidlock)reload_from_cwd)mlevel)worker_logger)default_nodenameworker_direct)str_to_list)default_socket_timeout   state)WorkControllerg      @z
Trying to select queue subset of {0!r}, but queue {1} isn't
defined in the `task_queues` setting.

If you want to automatically declare unknown queues you can
enable the `task_create_missing_queues` setting.
ze
Trying to deselect queue subset of {0!r}, but queue {1} isn't
defined in the `task_queues` setting.
c                   @   s  e Zd ZdZdZdZdZdZdZdZ	G dd de
jZdHddZ		dIddZd	d
 Zdd Zdd Zdd Zdd Zdd Zdd ZdJddZdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* ZdKd,d-ZdLd.d/Z dMd1d2Z!dNd3d4Z"dJd5d6Z#dKd7d8Z$d9d: Z%d;d< Z&d=d> Z'd?d@ Z(dAdB Z)e*dCdD Z+																					dOdFdGZ,dS )Pr   zUnmanaged worker instance.Nc                   @   s   e Zd ZdZdZh dZdS )zWorkController.BlueprintzWorker bootstep blueprint.Worker>   celery.worker.components:Hubcelery.worker.components:Beatcelery.worker.components:Poolcelery.worker.components:Timer celery.worker.components:StateDB!celery.worker.components:Consumer'celery.worker.autoscale:WorkerComponentN)__name__
__module____qualname____doc__namedefault_steps r)   r)   M/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/worker/worker.py	BlueprintK   s    r+   c                 K   s   |p| j | _ t|| _ttj| _| j j	  | j
di | | jdi | | jdi | | jdi | jdi | d S )Nr)   )appr   hostnamer   nowr   utcstartup_timeloaderinit_workeron_before_initsetup_defaultson_after_initsetup_instanceprepare_args)selfr,   r-   kwargsr)   r)   r*   __init__Y   s   
 zWorkController.__init__c                 K   s   || _ | || | t| | js&zt | _W n ty%   d| _Y nw t| j| _|p0| j	| _
| j | _|d u r@|  n|| _|| _tjj| d t| j| _g | _|   | j| jjd | j| j| jd| _| jj| fi | d S )N   senderworker)stepson_starton_close
on_stopped)pidfilesetup_queuessetup_includesr   r   r   NotImplementedErrorr   loglevelon_consumer_readyready_callbackr,   connection_for_read	_conninfoshould_use_eventloopuse_eventloopoptionsr   worker_initsend_concurrencyget_implementationpool_clsr?   on_init_blueprintr+   r@   rA   rB   	blueprintapply)r8   queuesrI   rC   includerM   exclude_queuesr9   r)   r)   r*   r6   d   s6   

zWorkController.setup_instancec                 C      d S Nr)   r8   r)   r)   r*   rT         z WorkController.on_init_blueprintc                 K   rZ   r[   r)   r8   r9   r)   r)   r*   r3      r]   zWorkController.on_before_initc                 K   rZ   r[   r)   r^   r)   r)   r*   r5      r]   zWorkController.on_after_initc                 C   s   | j rt| j | _d S d S r[   )rC   r   pidlockr\   r)   r)   r*   r@      s   zWorkController.on_startc                 C   rZ   r[   r)   )r8   consumerr)   r)   r*   rH      r]   z WorkController.on_consumer_readyc                 C   s   | j j  d S r[   )r,   r1   shutdown_workerr\   r)   r)   r*   rA      s   zWorkController.on_closec                 C   s,   | j   | j  | jr| j  d S d S r[   )timerstopr`   shutdownr_   releaser\   r)   r)   r*   rB      s
   

zWorkController.on_stoppedc              
   C   s   t |}t |}z
| jjj| W n ty( } z
tt 	||d }~ww z
| jjj
| W n tyI } z
tt 	||d }~ww | jjjr\| jjjt| j d S d S r[   )r   r,   amqprW   selectKeyErrorr   SELECT_UNKNOWN_QUEUEstripformatdeselectDESELECT_UNKNOWN_QUEUEconfr   
select_addr-   )r8   rX   excludeexcr)   r)   r*   rD      s*   
zWorkController.setup_queuesc                    sf   t  jjj}|r|t |7 } fdd|D  | _dd  jj D }t t||B  jj_d S )Nc                    s   g | ]	} j j|qS r)   )r,   r1   import_task_module.0mr\   r)   r*   
<listcomp>   s    z1WorkController.setup_includes.<locals>.<listcomp>c                 S   s   h | ]}|j jqS r)   )	__class__r$   )rt   taskr)   r)   r*   	<setcomp>   s    z0WorkController.setup_includes.<locals>.<setcomp>)tupler,   rn   rX   tasksvaluesset)r8   includesprevtask_modulesr)   r\   r*   rE      s   
zWorkController.setup_includesc                 K   s   |S r[   r)   r^   r)   r)   r*   r7      r]   zWorkController.prepare_argsc                 C   s   t jj| d d S )Nr<   )r   worker_shutdownrP   r\   r)   r)   r*   _send_worker_shutdown   s   z$WorkController._send_worker_shutdownc              
   C   s   z	| j |  W d S  ty   |   Y d S  ty7 } ztjd|dd | jtd W Y d }~d S d }~w t	yP } z| j|j
d W Y d }~d S d }~w ty_   | jtd Y d S w )NzUnrecoverable error: %rT)exc_info)exitcode)rU   startr   	terminate	Exceptionloggercriticalrc   r   
SystemExitcodeKeyboardInterrupt)r8   rq   r)   r)   r*   r      s   zWorkController.startc                 C   s   | j j| d|fdd d S )Nregister_with_event_loopzhub.register)argsdescription)rU   send_all)r8   hubr)   r)   r*   r      s   
z'WorkController.register_with_event_loopc                 C   s   |  | j|S r[   )_quick_acquire_process_taskr8   reqr)   r)   r*   _process_task_sem   s   z WorkController._process_task_semc                 C   sJ   z	| | j W dS  ty$   z|   W Y dS  ty#   Y Y dS w w )z2Process task by sending it to the pool of workers.N)execute_using_poolpoolr   _quick_releaseAttributeErrorr   r)   r)   r*   r      s   zWorkController._process_taskc                 C   s&   z| j   W d S  ty   Y d S w r[   )r`   closer   r\   r)   r)   r*   signal_consumer_close   s
   z$WorkController.signal_consumer_closec                 C   s    t  dko| jjjjo| jj S )Ndefault)r   rK   	transport
implementsasynchronousr,   
IS_WINDOWSr\   r)   r)   r*   rL      s
   

z#WorkController.should_use_eventloopFc                 C   sF   |dur|| _ | jjtkr|   |r| jjr| jdd |   dS )z'Graceful shutdown of the worker server.NTwarm)	r   rU   r   r	   r   r   signal_safe	_shutdownr   )r8   in_sighandlerr   r)   r)   r*   rc      s   zWorkController.stopc                 C   s8   | j jtkr|   |r| jjr| jdd dS dS dS )z.Not so graceful shutdown of the worker server.Fr   N)rU   r   r
   r   r   r   r   )r8   r   r)   r)   r*   r      s   zWorkController.terminateTc                 C   sX   | j d ur*tt | j j| | d | j   W d    d S 1 s#w   Y  d S d S )N)r   )rU   r   SHUTDOWN_SOCKET_TIMEOUTrc   join)r8   r   r)   r)   r*   r     s   

"zWorkController._shutdownc                 C   sT   t | j|||d | jr| j  | j  z| j  W d S  ty)   Y d S w )N)force_reloadreloader)list_reload_modulesr`   update_strategiesreset_rate_limitsr   restartrF   )r8   modulesreloadr   r)   r)   r*   r     s   

zWorkController.reloadc                    s4    fddt |d u rjjjD S |pdD S )Nc                 3   s"    | ]}j |fi  V  qd S r[   )_maybe_reload_modulers   r9   r8   r)   r*   	<genexpr>  s
    
z1WorkController._reload_modules.<locals>.<genexpr>r)   )r}   r,   r1   r   )r8   r   r9   r)   r   r*   r     s   
zWorkController._reload_modulesc                 C   sH   |t jvrtd| | jj|S |r"td| tt j| |S d S )Nzimporting module %szreloading module %s)sysr   r   debugr,   r1   import_from_cwdr   )r8   moduler   r   r)   r)   r*   r     s   
z#WorkController._maybe_reload_modulec                 C   s8   t tj| j }| jjt t	| j
jt| dS )N)totalpidclockuptime)r   r.   r   r/   r0   r   total_countosgetpidstrr,   r   roundtotal_seconds)r8   r   r)   r)   r*   info'  s   

zWorkController.infoc                 C   s   t d u rtdt t j}i d|jd|jd|jd|jd|jd|j	d|j
d	|jd
|jd|jd|jd|jd|jd|jd|jd|jS )Nz%rusage not supported by this platformutimestimemaxrssixrssidrssisrssminfltmajfltnswapinblockoublockmsgsndmsgrcvnsignalsnvcswnivcsw)resourcerF   	getrusageRUSAGE_SELFru_utimeru_stime	ru_maxrssru_ixrssru_idrssru_isrss	ru_minflt	ru_majfltru_nswap
ru_inblock
ru_oublock	ru_msgsnd	ru_msgrcvru_nsignalsru_nvcsw	ru_nivcsw)r8   sr)   r)   r*   rusage.  sH   	
zWorkController.rusagec                 C   s`   |   }|| j |  || jj | j z	|  |d< W |S  ty/   d|d< Y |S w )Nr   zN/A)r   updaterU   r`   r   rF   )r8   r   r)   r)   r*   statsE  s   
zWorkController.statsc                 C   s"   dj | | jr| j dS ddS )z``repr(worker)``.z#<Worker: {self.hostname} ({state})>INIT)r8   r   )rk   rU   human_stater\   r)   r)   r*   __repr__O  s   zWorkController.__repr__c                 C   s   | j S )z#``str(worker) == worker.hostname``.)r-   r\   r)   r)   r*   __str__V  s   zWorkController.__str__c                 C   s   t S r[   r   r\   r)   r)   r*   r   Z  s   zWorkController.stateWARNc                 K   s  | j j}|| _|| _|d|| _|d|| _|d||| _|d|| _|d|| _|d|| _	|p2|| _
|d|	| _|d|
| _|d	|| _|d
||| _|d|| _|d||| _|d||| _|d||| _|d|| _|d|| _t|d|| _|d|| _|d|| _d S )Nworker_concurrencyworker_send_task_eventsworker_poolworker_consumerworker_timerworker_timer_precisionworker_autoscalerworker_pool_putlocksworker_pool_restartsworker_state_dbbeat_schedule_filenamebeat_schedulertask_time_limittask_soft_time_limitworker_max_tasks_per_childworker_max_memory_per_childworker_prefetch_multiplierworker_disable_rate_limitsworker_lost_wait)r,   eitherrG   logfiler   task_eventsrS   consumer_cls	timer_clstimer_precisionoptimizationautoscaler_clspool_putlockspool_restartsstatedbschedule_filename	scheduler
time_limitsoft_time_limitmax_tasks_per_childmax_memory_per_childintprefetch_multiplierdisable_rate_limitsr  )r8   r   rG   r  r  r   r  r  r  r
  r  r  r	  Or  r  r  r  rS   state_dbr   r   scheduler_clsr  r  r  r  r  r  _kwr  r)   r)   r*   r4   ^  sN   
zWorkController.setup_defaults)NN)NNNNNNr[   )FN)F)T)NFN)Nr   NNNNNNNNNNNNNNNNNNNNNNNNNN)-r#   r$   r%   r&   r,   r_   rU   r   	semaphorer   r   r+   r:   r6   rT   r3   r5   r@   rH   rA   rB   rD   rE   r7   r   r   r   r   r   r   rL   rc   r   r   r   r   r   r   r   r   r   r   propertyr   r4   r)   r)   r)   r*   r   >   s    

(










r   ).r&   r   r   r   r   billiardr   kombu.utils.compatr   celeryr   r   rQ   r   celery.bootstepsr	   r
   celery.exceptionsr   r   r   celery.platformsr   r   celery.utils.importsr   celery.utils.logr   r   r   celery.utils.nodenamesr   r   celery.utils.textr   celery.utils.threadsr    r   r   ImportError__all__r   ri   rm   r   r)   r)   r)   r*   <module>   s:    