o
    Df3                     @   s  d Z ddlZddlZddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ ddlmZmZmZ dd	lmZ dd
lmZ ddlmZmZmZ ddlmZmZ ddlmZ ddl m!Z! ddl"m#Z#m$Z$m%Z% ddl&m'Z' ddl(m)Z) dZ*e#e+Z,ej-dZ.e/edZ0g dZ1dZ2dZ3dd Z4ej5fddZ6G dd de)Z7dddefd d!Z8e
d"kree8d#d$ed%Z9nee8d#dd&Z9e.see8d"d$ed%Z:nd'd(  Z9Z:d)d* Z;e.see8d+e;ed,Z<nd-d. Z<d/d0 Z=d=d2d3Z>d>d5d6Z?	7	8d?d9d:Z@d=d;d<ZAdS )@zWorker command-line program.

This module is the 'program-version' of :mod:`celery.worker`.

It does everything necessary to run that module
as an actual application, like installing signal handlers,
platform tweaks, and so on.
    N)datetime)partial)REMAP_SIGTERM)current_process)safe_str)VERSION_BANNER	platformssignals)trace)	AppLoader)
EX_FAILUREEX_OKcheck_privileges)staticterm)cry)qualname)
get_loggerin_sighandlerset_in_sighandler)	pluralize)WorkController)Workerjavapypy_version_info) ----------------- ***** ------- ******* ----- *** --- * ---- ** ----------r   r   r   r   r   r   r   z{hostname} v{version}

{platform} {timestamp}

[config]
.> app:         {app}
.> transport:   {conninfo}
.> results:     {results}
.> concurrency: {concurrency}
.> task events: {events}

[queues]
{queues}
z
[tasks]
{tasks}
c                  C   s    ddl m}  tdd |  D S )Nr   	enumeratec                 s   s     | ]}|j d sdV  qdS )zDummy-   N)name
startswith).0t r'   K/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/apps/worker.py	<genexpr>L   s    
z&active_thread_count.<locals>.<genexpr>)	threadingr!   sumr    r'   r'   r(   active_thread_countJ   s   r,   c                 C   s   t d|  |dd d S )N
Tfileflush)print)msgfr'   r'   r(   safe_sayP   s   r4   c                       s   e Zd ZdZd#ddZ		d$ fdd	Zdd	 Z fd
dZdd Zdd Z	d%ddZ
dd Zd&ddZdd Zd'ddZdd Zdd  Zd!d" Z  ZS )(r   zWorker as a program.Fc                 K   sB   || _ t| j| j tjj| j| | jj|d t	| jjj
 d S )N)senderinstanceconfoptions)quietr
   setup_worker_optimizationsapphostnamer	   celeryd_initsendr7   r   accept_content)selfr9   kwargsr'   r'   r(   on_before_initW   s   zWorker.on_before_initNc                    sr   | j d|| _| j d|| _t jdi | || _|| _tj	
 | _| j jj| j|d ur2| n|d| _d S )Nworker_redirect_stdoutsworker_redirect_stdouts_level)enabledr'   )r;   eitherredirect_stdoutsredirect_stdouts_levelsupersetup_defaultspurgeno_colorsysstdoutisatty_isattylogcoloredlogfile)r@   rK   rL   rG   rH   rA   	__class__r'   r(   on_after_initc   s   zWorker.on_after_initc                 C   s   |   | _t| j| j d S N)setup_logging_custom_loggingr
   r:   r;   r<   r@   r'   r'   r(   on_init_blueprints   s   
zWorker.on_init_blueprintc                    s   | j }t   tjj| j| |jd | jr| 	  | j
s!|   | d | |  | js8| jr8|j| j d}|j}t|trH| dv}|rV|j rXtd d S d S d S )N)r5   r6   r7   z-active-T)zdjango.conf:settingszPlease run `celery upgrade settings path/to/settings.py` to avoid these warnings and to allow a smoother upgrade to Celery 6.0.)r;   rI   on_startr	   celeryd_after_setupr>   r<   r7   rK   purge_messagesr9   emit_bannerset_process_statusinstall_platform_tweaksrY   rG   rQ   rH   _config_source
isinstancestrlowermaybe_warn_deprecated_settingsloggerwarning)r@   r;   warn_deprecatedconfig_sourcerT   r'   r(   r\   y   s0   





zWorker.on_startc                 C   sl   t  }|rtt t  ttdt| j	
d| j| dt| j	|  p)dgtjdd d S )N z 
)artlinesTr.   )r   supports_imagesr1   imgcatr   logor   joinrd   rR   cyanstartup_inforeset
extra_inforM   
__stdout__)r@   	use_imager'   r'   r(   r_      s   
zWorker.emit_bannerc                 C   s$   t jj|d tdt| j d S )N)r5   z	%s ready.)r	   worker_readyr>   rg   infor   r<   )r@   consumerr'   r'   r(   on_consumer_ready   s   zWorker.on_consumer_readyc                 C   s8   |d u r| j d ur| j  }| jjj| j| jd|| jdS )NF)rG   colorizer<   )rL   r;   rQ   setuploglevelrS   r<   )r@   r{   r'   r'   r(   rX      s   zWorker.setup_loggingc                 C   st   | j  +}| j jj|d}|r(td| dt|d ddd W d    d S W d    d S 1 s3w   Y  d S )N)
connectionzpurge: Erased  messagez from the queue.
T)r0   )r;   connection_for_writecontrolrK   r1   r   )r@   r~   countr'   r'   r(   r^      s   ""zWorker.purge_messagesTr-   celery.c                    s"   |  fddt| jjD S )Nc                 3   s.    | ]} s| sn|rd | V  qdS )z  . N)r$   )r%   taskinclude_builtinsint_r'   r(   r)      s    
z"Worker.tasklist.<locals>.<genexpr>)rp   sortedr;   tasks)r@   r   sepr   r'   r   r(   tasklist   s   
zWorker.tasklistc                 C   sB   | j d u rd S | j tjkr| j tjk}| j|d}tj|dS d S )N)r   )r   )r}   loggingINFODEBUGr   EXTRA_INFO_FMTformat)r@   r   r   r'   r'   r(   rt      s   
zWorker.extra_infoc                 C   s  | j }t| j}d|jpdt|}t|jts1t	|j}|
dr)|dd  }|d| d7 }| jrB| j\}}d| d| d	}| j}t|tsM|j}|d|d
d  d7 }d}	| jsad}	tj|t| jt jddt| j   | j j |tt |	|jjjdddd
 }
|rt|
D ]$\}}zd t!| |
| g|
|< W q t"y   d|
|  |
|< Y qw d |
d S )Nz{}:{:#x}__main__zcelery.loaders    ()z{min=z, max=}.ONz/OFF (enable -E to monitor tasks in this worker)r   )microsecondF)indentindent_first)
r;   r<   	timestampversionconninforesultsconcurrencyplatformeventsqueuesr   z                r-   )#r;   rd   r   r   mainidrc   loaderr   r   r$   	autoscalepool_cls
__module__splittask_eventsBANNERr   r<   r   nowreplacer   r~   as_uribackend	_platformr   amqpr   
splitlinesr!   rp   ARTLINES
IndexError)r@   rl   r;   r   apprr   maxminpoolr   banneri_r'   r'   r(   rr      sP   






zWorker.startup_infoc                 C   sX   | j jr|   | js| j jrt| nt| t| t| t| t	  t
  dS )z1Install platform specific tweaks and workarounds.N)r;   IS_macOS macOS_proxy_detection_workaroundrP   !install_HUP_not_supported_handlerinstall_worker_restart_handlerinstall_worker_term_handler install_worker_term_hard_handlerinstall_worker_int_handlerinstall_cry_handlerinstall_rdb_handler)r@   workerr'   r'   r(   ra      s   

zWorker.install_platform_tweaksc                 C   s   t jdd dS )z6See https://github.com/celery/celery/issues#issue/161.celery_dummy_proxyset_by_celerydN)osenviron
setdefaultrZ   r'   r'   r(   r     s   z'Worker.macOS_proxy_detection_workaroundc                 C   s&   t jd| dt tj d| jdS )Ncelerydr   r   )rx   r<   )r   set_mp_process_titlestrargvrM   argvr<   )r@   rx   r'   r'   r(   r`     s
   zWorker.set_process_status)F)FNNNrW   )Tr-   r   )T)__name__r   __qualname____doc__rB   rV   r[   r\   r_   rz   rX   r^   r   rt   rr   ra   r   r`   __classcell__r'   r'   rT   r(   r   T   s$    
(


*r   TERMWarmc                    s2    fdd}t d |_|tj< d S )Nc                     s   t  ; ddlm} t jdkr+ r  td dtj tj	j
jd t|ddd	  W d    d S 1 sAw   Y  d S )
Nr   stateMainProcesszworker: z shutdown (MainProcess))r5   sighowexitcodeshould_stopshould_terminate)r   Cold)r   celery.workerr   r   _namer4   rM   ru   r	   worker_shutting_downr>   r<   setattr)argsr   callbackr   r   r   r   r'   r(   _handle_request  s$   "z*_shutdown_handler.<locals>._handle_requestworker_)rd   r   r   r	   )r   r   r   r   r   r   r'   r   r(   _shutdown_handler  s   r   SIGQUITSIGTERMr   )r   r   r   )r   r   c                  O      d S rW   r'   )akwr'   r'   r(   <lambda><  s    r   c                 C   s   t dtj t| dd d S )Nz>worker: Hitting Ctrl+C again will terminate all running tasks!SIGINTr   )r4   rM   ru   r   )r   r'   r'   r(   	on_SIGINT?  s   r   r   )r   r   r   c                  O   r   rW   r'   )r   rA   r'   r'   r(   r   K  s   r   c                   C   s2   t tjtjtjg ttjtjgtj	  d S rW   )
r   close_open_fdsrM   	__stdin__ru   
__stderr__r   execv
executabler   r'   r'   r'   r(   _reload_current_workerO  s   r   SIGHUPc                 C   s   dd }|t j|< d S )Nc                  W   sL   t d tddtj dtj ddl}|t ddl	m
} t|_dS )z5Signal handler restarting the current python program.TzRestarting celery worker (r   r   r   Nr   )r   r4   rp   rM   r   ru   atexitregisterr   r   r   r   r   )r   r   r   r'   r'   r(   restart_worker_sig_handlerX  s   

zBinstall_worker_restart_handler.<locals>.restart_worker_sig_handlerr   r	   )r   r   r   r'   r'   r(   r   V  s   	r   SIGUSR1c                 C   s   t rd S dd }|tj| < d S )Nc                  W   s6   t   tt  W d   dS 1 sw   Y  dS )z=Signal handler logging the stack-trace of all active threads.N)r   r4   r   )r   r'   r'   r(   cry_handleri  s   "z(install_cry_handler.<locals>.cry_handler)is_pypyr   r	   )r   r   r'   r'   r(   r   d  s   r   CELERY_RDBSIGSIGUSR2c                 C   s&   dd }t j| r|tj|< d S d S )Nc                  W   sX   t   ddlm}m} | r| d n| j}|| W d   dS 1 s%w   Y  dS )z=Signal handler setting a rdb breakpoint at the current frame.r   )_frame	set_tracer"   N)r   celery.contrib.rdbr  r  f_back)r   r  r  framer'   r'   r(   rdb_handlers  s
   
"z(install_rdb_handler.<locals>.rdb_handler)r   r   getr   r	   )envvarr   r  r'   r'   r(   r   p  s   r   c                    s    fdd}|t j < d S )Nc                    s<   t   tdj d W d    d S 1 sw   Y  d S )NzH{sig} not supported: Restarting with {sig} is unstable on this platform!r   )r   r4   r   )signumr  r   r'   r(   warn_on_HUP_handler  s
   
"z>install_HUP_not_supported_handler.<locals>.warn_on_HUP_handlerr   )r   r   r  r'   r   r(   r     s   r   )r   )r   )r  r  )Br   r   r   r   r   rM   r   	functoolsr   billiard.commonr   billiard.processr   kombu.utils.encodingr   celeryr   r   r	   
celery.appr
   celery.loaders.appr   celery.platformsr   r   r   celery.utilsr   r   celery.utils.debugr   celery.utils.importsr   celery.utils.logr   r   r   celery.utils.textr   r   r   __all__r   rg   r$   	is_jythonhasattrr   r   r   r   r,   r   r4   r   r   r   r   r   r   r   r   r   r   r   r'   r'   r'   r(   <module>   s~    
 F



