o
    Df
                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlmZm	Z	m
Z
 ddlmZ ddlmZ ddlmZ ddlmZmZmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZmZ ddlm Z m!Z!m"Z"m#Z#m$Z$ ddl%m&Z& ddl'm(Z(m)Z) ddl*mZ+ ddl,m-Z- ddl.m/Z/ ddl0m1Z1 ddl2m3Z3 ddl4m5Z5 ddl6m7Z7 ddl8m9Z: z
ddl;m<Z= dZ>W n e?y   ej<fddZ=dZ>efddZY nw dZ@e7eAZBeBjCeBjDZCZDeEejFejGhZHdZId ZJd!ZKd"ZLeLeLeKeKeLd#ZMd$d% eMN D ZOe
d&d'ZPd(d) ZQd*d+ ZReSed,rddddejTejUejVejWfd-d.ZXnd:d/d.ZXddddeXfd0d1ZYd2d3 ZZG d4d5 d5ej[Z[G d6d7 d7ej\Z\G d8d9 d9ej]Z^dS );a  Version of multiprocessing.Pool using Async I/O.

.. note::

    This module will be moved soon, so don't use it directly.

This is a non-blocking version of :class:`multiprocessing.Pool`.

This code deals with three major challenges:

#. Starting up child processes and keeping them running.
#. Sending jobs to the processes and receiving results back.
#. Safely shutting down this system.
    N)Counterdeque
namedtuple)BytesIO)Integral)HIGHEST_PROTOCOL)packunpackunpack_from)sleep)WeakValueDictionaryref)pool)
isblockingsetblocking)ACKNACKRUN	TERMINATEWorkersJoined)_SimpleQueue)ERRWRITE)pickle)SELECT_BAD_FD)fxrange)promise)worker_before_create_process)noop)
get_logger)state)readTc                 C   s(   || |}t |}|dkr|| |S Nr   )lenwrite)fdbufsizer!   chunkn r*   T/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/concurrency/asynpool.py__read__5   s
   

r,   Fc                 C   s   || |  S N)getvalue)fmtiobufr	   r*   r*   r+   r
   =      r
   )AsynPool   g      @      )Ndefaultfastfcfsfairc                 C   s   i | ]\}}||qS r*   r*   ).0kvr*   r*   r+   
<dictcomp>W       r=   Ack)idr%   payloadc                 C   s   t | dkS )z(Return true if generator is not started.GEN_CREATED)inspectgetgeneratorstate)genr*   r*   r+   gen_not_started\   s   rF   c                 C   s$   z| j }W | S  ty   Y d S w r-   )_writerAttributeError)jobwriterr*   r*   r+   _get_job_writera   s   rK   pollc                    s   | }|j | rfdd| D  |rfdd|D  |r* fdd|D  t t }	}
|r9|dk r9dnt|d }||}|D ](\}}t|tsS| }|@ r\|	| |@ re|
| | @ rn|	| qF|	|
dfS )Nc                       g | ]}| qS r*   r*   r:   r%   )POLLINregisterr*   r+   
<listcomp>r   r>   z_select_imp.<locals>.<listcomp>c                    rM   r*   r*   rN   )POLLOUTrP   r*   r+   rQ   t   r>   c                    rM   r*   r*   rN   )POLLERRrP   r*   r+   rQ   v   r>   r   g     @@)rP   setroundrL   
isinstancer   filenoadd)readerswriterserrtimeoutrL   rO   rR   rS   pollerRWeventsr%   eventr*   )rS   rO   rR   rP   r+   _select_impk   s,   





rb   c                 C   s8   t  | |||\}}}|rtt|t|B }||dfS r"   )selectlistrT   )rY   rZ   r[   r\   rwer*   r*   r+   rb      s   
c                 C   s(  | du rt  n| } |du rt  n|}|du rt  n|}z|| |||W S  ty } zd|j}|tjkr@t  t  dfW  Y d}~S |tv r| |B |B D ]6}zt|gg g d W qJ ty } z|j}|tvrg | | || || W Y d}~qJd}~ww t  t  dfW  Y d}~S  d}~ww )a<  Simple wrapper to :class:`~select.select`, using :`~select.poll`.

    Arguments:
        readers (Set[Fd]): Set of reader fds to test if readable.
        writers (Set[Fd]): Set of writer fds to test if writable.
        err (Set[Fd]): Set of fds to test for error condition.

    All fd sets passed must be mutable as this function
    will remove non-working fds from them, this also means
    the caller must make sure there are still fds in the sets
    before calling us again.

    Returns:
        Tuple[Set, Set, Set]: of ``(readable, writable, again)``, where
        ``readable`` is a set of fds that have data available for read,
        ``writable`` is a set of fds that's ready to be written to
        and ``again`` is a flag that if set means the caller must
        throw away the result and call us again.
    Nr4   r   )rT   OSErrorerrnoEINTRr   rc   discard)rY   rZ   r[   r\   rL   exc_errnor%   r*   r*   r+   _select   s4   


rn   c           	   
      s    fdd}g }| D ]-| |}}z|g|R i | W q t tfy8   tjddd | Y qw |rc|D ]'zt|drK| n|d W q= tyb   td| Y q=w dS dS )	a  Apply hub method to fds in iter, remove from list if failure.

    Some file descriptors may become stale through OS reasons
    or possibly other reasons, so safely manage our lists of FDs.
    :param fds_iter: the file descriptors to iterate and apply hub_method
    :param source_data: data source to remove FD if it renders OSError
    :param hub_method: the method to call with each fd and kwargs
    :*args to pass through to the hub_method;
    with a special syntax string '*fd*' represents a substitution
    for the current fd object in the iteration (for some callers).
    :**kwargs to pass through to the hub method (no substitutions needed)
    c                     s"    } d| v rfdd D } | S )N*fd*c                    s   g | ]
}|d kr
 n|qS )ro   r*   )r:   arg)r%   r*   r+   rQ      s    zTiterate_file_descriptors_safely.<locals>._meta_fd_argument_maker.<locals>.<listcomp>r*   )	call_argsargsr%   r*   r+   _meta_fd_argument_maker   s   z@iterate_file_descriptors_safely.<locals>._meta_fd_argument_makerz)Encountered OSError when accessing fd %s Texc_inforemoveNz*ValueError trying to invalidate %s from %s)	rh   FileNotFoundErrorloggerwarningappendhasattrrw   pop
ValueError)	fds_itersource_data
hub_methodrs   kwargsrt   	stale_fdshub_args
hub_kwargsr*   rr   r+   iterate_file_descriptors_safely   s6   
r   c                   @   s   e Zd ZdZdd ZdS )WorkerzPool worker process.c                 C   s   | j t|ff d S r-   )outqput	WORKER_UP)selfpidr*   r*   r+   on_loop_start   s   zWorker.on_loop_startN)__name__
__module____qualname____doc__r   r*   r*   r*   r+   r      s    r   c                       s^   e Zd ZdZ fddZeeeee	j
fddZdd Zdd	 Zd
d Zdd Zdd Z  ZS )ResultHandlerz)Handles messages from the pool processes.c                    s:   | d| _| d| _t j|i | | j| jt< d S )Nfileno_to_outqon_process_alive)r}   r   r   super__init__state_handlersr   )r   rs   r   	__class__r*   r+   r      s   zResultHandler.__init__c	              
   c   s   d }	}
|rt d}t|}n|  }}|	dk r\z|||r$||	d  n|d|	 }W n tyF } z|jtvr9 d V  W Y d }~nd }~ww |dkrT|	rQtdt |	|7 }	|	dk s|d|\}|rmt |}t|}n|  }}|
|k rz|||r||
d  n|||
 }W n ty } z|jtvr d V  W Y d }~nd }~ww |dkr|
rtdt |
|7 }
|
|k sv||| j| |r|||}n	|d ||}|r|| d S d S )Nr   r5   zEnd of file during messagez>i)	bytearray
memoryviewrh   ri   UNAVAILEOFErrorhandle_eventseek)r   
add_readerr%   callbackr,   
readcanbufr   r
   loadHrBrr&   bufvr)   rl   	body_sizemessager*   r*   r+   _recv_message   sj   






zResultHandler._recv_messagec                    s6   | j | j|j |j| j fdd}|S )z3Coroutine reading messages from the pool processes.c              	      s   z|   W n t y   |  Y S w  | }zt| W n ty*   Y d S  ttfy9   |  Y d S w  | | d S r-   )KeyErrornextStopIterationrh   r   )rW   itr   r   on_state_changerecv_messageremove_readerr*   r+   on_result_readable?  s   z>ResultHandler._make_process_result.<locals>.on_result_readable)r   r   r   r   r   )r   hubr   r*   r   r+   _make_process_result7  s   z"ResultHandler._make_process_resultc                 C   s   |  || _d S r-   )r   r   )r   r   r*   r*   r+   register_with_event_loopO  s   z&ResultHandler.register_with_event_loopc                 G   s   t d)NzNot registered with event loop)RuntimeError)r   rs   r*   r*   r+   r   R     zResultHandler.handle_eventc           	   	   C   s   | j }| j}| j}| j}| j}t|}|r^|r`| jtkrb|d ur#|  t }|D ]%}t|g| j| j	|j
|| z|dd W q( tyM   td Y  d S w || |rd|rf| jtksd S d S d S d S d S d S )NT)shutdownz&result handler: all workers terminated)cachecheck_timeoutsr   r   join_exited_workersrT   _stater   r   _flush_outqueuerX   r   debugdifference_update)	r   r   r   r   r   r   	outqueuespending_remove_fdr%   r*   r*   r+   on_stop_not_startedW  s.   
*z!ResultHandler.on_stop_not_startedc                 C   s>  z|| }W n t y   || Y S w |jj}zt|d W n ty,   || Y S w zVz|dr9| }nd }td W n& ttfyf   || Y W zt|d W S  tye   || Y   S w w |rm|| W zt|d W d S  ty   || Y S w zt|d W w  ty   || Y      Y S w )Nr4   r         ?)	r   r   _readerr   rh   rL   recvr   r   )r   r%   rw   process_indexr   procreadertaskr*   r*   r+   r   s  sL   

zResultHandler._flush_outqueue)r   r   r   r   r   r,   r   r   r
   _pickler   r   r   r   r   r   r   __classcell__r*   r*   r   r+   r      s    
9r   c                       s  e Zd ZdZeZeZdZ fddZ		dN fdd	Z fdd	Z	d
d Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd ZeejefddZdd Zd d! Zd"d# Zd$d% Zd&d' Zd(d) Zd*d+ Zd,d- Zd.d/ Z d0d1 Z!d2d3 Z"e#d4d5 Z$ fd6d7Z%d8d9 Z&d:d; Z'd<d= Z(d>d? Z)d@dA Z*dBdC Z+ejeefdDdEZ,e-dFdG Z.dHdI Z/e-dJdK Z0e1dLdM Z2  Z3S )Or2   zAsyncIO Pool (no threads).Fc                    s   t  |}d|_|S )NF)r   WorkerProcessdead)r   workerr   r*   r+   r     s   zAsynPool.WorkerProcessNc                    s  t || _|d u r  n|}| _ fddt|D  _i  _i  _i  _	|d u r/t
n| _t  _t  _t  _t  _t  _ jj _t  _t  _t j|g|R i |  jD ]}| j|j< | j	|j< qct jdt _ t jdt _!d S )Nc                       i | ]}   d qS r-   create_process_queuesr:   _r   r*   r+   r=         
z%AsynPool.__init__.<locals>.<dictcomp>on_soft_timeouton_hard_timeout)"SCHED_STRATEGIESgetsched_strategy	cpu_countsynackrange_queues_fileno_to_inq_fileno_to_outq_fileno_to_synqPROC_ALIVE_TIMEOUT_proc_alive_timeoutrT   _waiting_to_start_all_inqueues_active_writes_active_writers_busy_workersrk   _mark_worker_as_availabler   outbound_bufferr   write_statsr   r   _pooloutqR_fdsynqW_fdgetattr_timeout_handlerr   r   r   )r   	processesr   r   proc_alive_timeoutrs   r   r   r   r   r+   r     s@   



zAsynPool.__init__c                    s    t j| d t  t |S )N)sender)r   sendgccollectr   _create_worker_process)r   ir   r*   r+   r     s   zAsynPool._create_worker_processc                 C   s   |  || |   d S r-   )_untrack_child_processmaintain_pool)r   r   r   r*   r*   r+   _event_process_exit  s   zAsynPool._event_process_exitc                 C   sN   z|j }W n ty   t|jj }|_ Y nw t|gd|j| j|| dS )z4Helper method determines appropriate fd for process.N)	_sentinel_pollrH   osdup_popensentinelr   r   r   r   r   r   r%   r*   r*   r+   _track_child_process  s   

zAsynPool._track_child_processc                 C   s4   |j d ur|j d }|_ || t| d S d S r-   )r   rw   r   closer  r*   r*   r+   r     s
   

zAsynPool._untrack_child_processc                    s   j   j j_       fddjD  tj	j	 j
jd j D ]
\}} || q6jsP jj d_dS dS )z4Register the async pool with the current event loop.c                    s   g | ]} | qS r*   )r  r:   rf   r   r   r*   r+   rQ     s    z5AsynPool.register_with_event_loop.<locals>.<listcomp>ro   TN)_result_handlerr   r   handle_result_event_create_timelimit_handlers_create_process_handlers_create_write_handlersr   r   r   r   timersitemscall_repeatedly_registered_with_event_loopon_tickrX   on_poll_start)r   r   handlerintervalr*   r  r+   r     s    




z!AsynPool.register_with_event_loopc                    sR   j t  _fdd}|_fdd  _ fdd}|_dS )z.Create handlers used to implement time limits.c                    sF   |r |j | j||| j< d S |r! |j| j| j< d S d S r-   )_on_soft_timeout_job_on_hard_timeout)r^   softhard)
call_laterr   r   trefsr*   r+   on_timeout_set  s   
z;AsynPool._create_timelimit_handlers.<locals>.on_timeout_setc              	      s4   z  | }|  ~W d S  ttfy   Y d S w r-   )r}   cancelr   rH   )rI   tref)r  r*   r+   _discard_tref)  s   
z:AsynPool._create_timelimit_handlers.<locals>._discard_trefc                    s    | j  d S r-   )r  )r^   )r  r*   r+   on_timeout_cancel2  r1   z>AsynPool._create_timelimit_handlers.<locals>.on_timeout_cancelN)r  r   _tref_for_idr  r  r  )r   r   r  r  r*   )r  r  r   r   r  r+   r    s   	
z#AsynPool._create_timelimit_handlersc              	   C   sv   |r| || | j|| j|< z"z| j| }W n	 ty    Y nw | | W |s0| | d S d S |s:| | w w r-   )r  r  r  _cacher   r   r  )r   rI   r  r  r   resultr*   r*   r+   r  6  s    

zAsynPool._on_soft_timeoutc              	   C   sZ   z&z| j | }W n	 ty   Y nw | | W | | d S W | | d S | | w r-   )r   r   r   r  )r   rI   r!  r*   r*   r+   r  G  s   zAsynPool._on_hard_timeoutc                 C   s   |  | d S r-   )r   )r   rI   r   objinqW_fdr*   r*   r+   on_job_readyS  r1   zAsynPool.on_job_readyc                    s   	j 	j	jjjjjjjj	j

j	fdd	fdd}|_d
dd  	
fdd	}|_dS )z/Create handlers called on process up/down, etc.c                    sv   |  } | d ur5|   r7| v r9| j v sJ  | j | u sJ | jjv s'J td|  t| jd d S d S d S d S )Nz(Timed out waiting for UP message from %r	   )	_is_aliver   rY   errorr   killr   r   )r   r   waiting_to_startr*   r+   verify_process_alivee  s   
z?AsynPool._create_process_handlers.<locals>.verify_process_alivec                    s   | j } D ]}|jr|jj |kr| |_|jr!|jj |kr!| |_q| | j< |  t| jjr5J  | j| j 	|  
jt|  dS )z"Called when a process has started.N)r#  values	_write_to_scheduled_forr   r  r   r   r   rX   r  r   r   )r   infdrI   )r   r   r   r  r   r   r+  r*  r*   r+   on_process_upo  s   

z8AsynPool._create_process_handlers.<locals>.on_process_upNc                 S   sp   z|   }W n
 ty   Y d S w z|| |u r||d  W n
 ty)   Y |S w || |d ur6|| |S r-   )rW   rh   r}   r   )r"  r   index
remove_funr   r%   r*   r*   r+   _remove_from_index  s"   z=AsynPool._create_process_handlers.<locals>._remove_from_indexc                    s   t | ddrdS |   | jj|  | jr! | jj| 	  | jj| 	jd}|r4| 
|  |  
j| j	 	| jj | jj | j
r[| jj | jrm
j| j | jj dS dS )z#Called when a worker process exits.r   Nr   )r   r   r   synqrG   inqrk   r   r   r#  synqR_fdr   )r   r6  )r3  all_inqueuesbusy_workersfileno_to_inqr   fileno_to_synqr   process_flush_queuesr   remove_writerr   r*  r*   r+   on_process_down  s6   

z:AsynPool._create_process_handlers.<locals>.on_process_downr-   )r   r   r=  r   r   r   r   r   r   r  r<  r   r0  r>  )r   r   r0  r>  r*   )r3  r   r8  r9  r   r:  r   r;  r  r   r<  r   r=  r   r+  r*  r+   r	  V  s"   


"
z!AsynPool._create_process_handlersc                    s  j j	jjjjjj}jj	j
jjj|jj|jjj
jjtktjtjttdttditjffdd	}|_fdd}|_fdd}|__dfd	d
	}	|	_
fdd}
|
_ fddfdd fdd}|_!d	fdd	 dS )z6Create handlers used to write data to child processes.)r   c                    sX   | j d us
| jv r| js| d |   d  | | j  d S | vr*|  d S d S r-   )_terminatedcorrelation_id	_accepted_ack_set_terminated
appendleft)rI   _time)getpidoutboundrevoked_tasksr*   r+   	_put_back  s   

z2AsynPool._create_write_handlers.<locals>._put_backc                     sV    } rot t k }n}|r#t| d ttB dd d S t|  d S )NT)consolidate)r#   r   r   r   )inactiveadd_cond)active_writesr8  r9  diffhub_add
hub_removeis_fair_strategyrG  r*   r+   r    s   

z6AsynPool._create_write_handlers.<locals>.on_poll_startc                    sX    |  z|  |u r| d    |   |  W d S W d S  ty+   Y d S w r-   )rk   r}   r   )r%   r   )rM  r8  r9  r:  r*   r+   on_inqueue_close
  s   

z9AsynPool._create_write_handlers.<locals>.on_inqueue_closeNc           
         sb  |sdg}t | }t|D ]}| |d |  }|d  d7  < |v r$qr+|v r+q|vr4| qz }W n tyO   D ]}| qDY  d S w |jsz	|  }|_W n tyi   | Y qw  |||}t||_| 
| 	| zt| W n! t	y   Y q t
y }	 z|	jtjkr W Y d }	~	qd }	~	ww || qd S )Nr   r4   )r#   r   
IndexErrorrA  r.  r   r   rG   r   r   rh   ri   EBADF)
	ready_fdstotal_write_count	num_readyr   ready_fdrI   inqfdr   corrl   )
_write_jobrM  
add_writerr8  r9  rN  r:  rP  rQ  mark_worker_as_busymark_write_fd_as_activemark_write_gen_as_activepop_messageput_messager*   r+   schedule_writes  sZ   



z8AsynPool._create_write_handlers.<locals>.schedule_writesc                    sN    | d}t |}d|}| d d }t|t||f|_| d S )Nprotocol>Ir4   r   )r#   r   _payload)tupbodyr   headerrI   )dumpsget_jobr   rd  ra  r*   r+   send_joba  s   
z1AsynPool._create_write_handlers.<locals>.send_jobc                    s:   t d| | j| |  r|    | | d S )Nz"Process inqueue damaged: %r %r: %r)ry   	exceptionexitcoder&  	terminaterw   rI  )r   r%   rI   rl   r  r*   r+   on_not_recoveringm  s   

z:AsynPool._create_write_handlers.<locals>.on_not_recoveringc              
   3   s   |j \}}}d}z| |_| j}d }}	|dk rXz	||||7 }W n0 tyQ }
 z$t|
dd tvr2 |d7 }|dkrD| |||
 t d V  W Y d }
~
nd }
~
ww d}|dk s|	|k rz	|	|||	7 }	W n0 ty }
 z$t|
dd tvrv |d7 }|dkr| |||
 t d V  W Y d }
~
nd }
~
ww d}|	|k s\W | | j  d7  <  | |	  d S W | | j  d7  <  | |	  d S | | j  d7  <  | |	  w )Nr   r5   ri   r4   d   )
rf  r-  send_job_offset	Exceptionr   r   r   r1  rk   rG   )r   r%   rI   ri  rh  r   errorsr   HwBwrl   )rM  rP  rp  write_generator_doner   r*   r+   r[  u  sd   





z3AsynPool._create_write_handlers.<locals>._write_jobc                    sL   t |||  }t} |||d}| | |f|_|| d S )Nr4  )r?   r   rs   )responser   rI   r%   msgr   rZ  )
_write_ackr\  r^  r_  precalcrw  r*   r+   send_ack  s   z1AsynPool._create_write_handlers.<locals>.send_ackc              
   3   s2   |d \}}}zz|  }W n
 t y   t w |j}d }}	|dk rQz	||||7 }W n tyL }
 zt|
dd tvr? d V  W Y d }
~
nd }
~
ww |dk s%|	|k rz	|	|||	7 }	W n ty| }
 zt|
dd tvro d V  W Y d }
~
nd }
~
ww |	|k sUW |r|   |  d S |r|   |  w )N   r   r5   ri   )r   r   send_syn_offsetrs  r   r   rk   )r%   ackr   ri  rh  r   r   r   ru  rv  rl   )rM  r;  r*   r+   rz    sJ   		z3AsynPool._create_write_handlers.<locals>._write_ackr-   )"r   r   r   popleftr{   r   r   r   r   
differencer\  rX   rw   rk   r   __getitem__r   r   SCHED_STRATEGY_FAIRworker_staterevokedr   rF  r   _create_payloadr   timerI  r  rR  rP  consolidate_callback
_quick_putr|  )r   r   r   rj  rd  active_writersrI  r  rR  rb  rl  r|  r*   )rz  r[  rM  r\  r8  r9  rN  rj  r:  r;  rk  rF  r   rO  rP  rQ  r]  r^  r_  rp  rG  r   r`  r{  rd  ra  rH  r   rw  r   r+   r
    sP   
(G
3
zAsynPool._create_write_handlersc              	   C   s  | j tkrd S | jr| j D ]	}|js|  q| jr!| j  | 	  z| j t
krtddddd}i }| j D ]}t|}|d urH|||< q:| jsR| j  n| jrt| j}|D ]G}|jdkrt|rz|| }W n	 tyv   Y nw |  | j| q\z|| }W n	 ty   Y q\w |j}| r| || |  q\| jsU| 	  tt| W | j  | j  | j  | j  d S W | j  | j  | j  | j  d S W | j  | j  | j  | j  d S | j  | j  | j  | j  w )N{Gz?g?T)
repeatlastr[  )r   r   r   r   r,  rA  _cancelr   clearr   r   r   rK   r   rd   r   rF   r   rk   r-  r&  _flush_writerr   r   r   r   )r   rI   	intervalsowned_byrJ   rZ   rE   job_procr*   r*   r+   flush  s   









.


!




zAsynPool.flushc              
   C   s   |j jh}zP|r;| sn7t||dd\}}}|s0|s|r0zt| W n tttfy/   Y nw |sW | j	| d S W | j	| d S W | j	| d S W | j	| d S | j	| w )Nr   )rZ   r[   r\   )
r6  rG   r&  rn   r   r   rh   r   r   rk   )r   r   rJ   fdsreadablewritableagainr*   r*   r+   r  #  s,   

zAsynPool._flush_writerc                 C   s   t dd | j D S )zGet queues for a new process.

        Here we'll find an unused slot, as there should always
        be one available when we start a new process.
        c                 s   s     | ]\}}|d u r|V  qd S r-   r*   r:   qownerr*   r*   r+   	<genexpr>:     
 z.AsynPool.get_process_queues.<locals>.<genexpr>)r   r   r  r   r*   r*   r+   get_process_queues4  s   zAsynPool.get_process_queuesc                    s@   t  jt j d}|r j fddt|D  dS dS )z!Grow the pool by ``n`` processes.r   c                    r   r-   r   r   r   r*   r+   r=   A  r   z$AsynPool.on_grow.<locals>.<dictcomp>N)max
_processesr#   r   updater   )r   r)   rN  r*   r   r+   on_grow=  s   zAsynPool.on_growc                 C   s   dS )z#Shrink the pool by ``n`` processes.Nr*   )r   r)   r*   r*   r+   	on_shrinkE  s    zAsynPool.on_shrinkc                 C   s   t dd}t dd}d}t|jsJ t|jrJ t|jr!J t|js(J | jr>t dd}t|js7J t|jr>J |||fS )z5Create new in, out, etc. queues, returned as a tuple.T)	wnonblock)	rnonblockN)r   r   r   rG   r   )r   r6  r   r5  r*   r*   r+   r   H  s   



zAsynPool.create_process_queuesc                    s   zt  fdd| jD }W n ty   td  Y S w |j| jvs&J |j| jvs.J | j	| || j|j< || j
|j< | j|j dS )zsCalled when receiving the :const:`WORKER_UP` message.

        Marks the process as ready to receive work.
        c                 3   s    | ]
}|j  kr|V  qd S r-   r   r  r  r*   r+   r  `  s    z,AsynPool.on_process_alive.<locals>.<genexpr>z"process with pid=%s already exitedN)r   r   r   ry   rz   r#  r   r   r   rk   r   r   rX   )r   r   r   r*   r  r+   r   Z  s   zAsynPool.on_process_alivec                 C   sH   |j r|j  s| ||j  dS |jr |j s"| | dS dS dS )z:Called for each job when the process assigned to it exits.N)r-  r&  on_partial_readr.  rI  )r   rI   pid_goner*   r*   r+   on_job_process_downj  s
   zAsynPool.on_job_process_downc                 C   s   |  || dS )zCalled when the process executing job' exits.

        This happens when the process job'
        was assigned to exited by mysterious means (error exitcodes and
        signals).
        N)mark_as_worker_lost)r   rI   r   rn  r*   r*   r+   on_job_process_lostt  s   zAsynPool.on_job_process_lostc                    s   | j d u rdS t| j  }t|dd   r!t| j  ndd fdd|D dtt|t	| j
| j
t| jt| jdd	S )
NzN/Ac                 S   s   | r
t | | dS ddS )Nr   z.2f)float)r<   totalr*   r*   r+   per  s   z'AsynPool.human_write_stats.<locals>.perr   z, c                 3   s    | ]} |V  qd S r-   r*   )r:   r<   r  r  r*   r+   r    s    z-AsynPool.human_write_stats.<locals>.<genexpr>)r  active)r  avgallrawstrategyinqueues)r   rd   r,  sumr#   joinmapstrSCHED_STRATEGY_TO_NAMEr   r   r   r   )r   valsr*   r  r+   human_write_stats}  s    
zAsynPool.human_write_statsc              	   C   s:   |j szd| j| |< W dS  ttfy   Y dS w dS )z-Called to clean up queues after process exit.N)r   r   _find_worker_queuesr   r~   r   r   r*   r*   r+   _process_cleanup_queues  s   z AsynPool._process_cleanup_queuesc                 C   sx   | j D ]6}z	t|jjd W n	 ty   Y qw z|jd W q ty9 } z|jtjkr/ W Y d}~qd}~ww dS )z>Called at shutdown to tell processes that we're shutting down.r4   N)r   r   r6  rG   rh   r   ri   rT  )task_handlerr   rl   r*   r*   r+   _stop_task_handler  s   
zAsynPool._stop_task_handlerc                    s   t  j| j| jdS )N)r   r   )r   create_result_handlerr   r   r   r   r*   r+   r    s   zAsynPool.create_result_handlerc                 C   s8   || j v sJ t| j }|| j |< |t| j ksJ dS )z;Mark new ownership for ``queues`` to update fileno indices.N)r   r#   )r   r   queuesbr*   r*   r+   _process_register_queues  s   

z!AsynPool._process_register_queuesc                    s6   zt  fdd| j D W S  ty   t w )z"Find the queues owned by ``proc``.c                 3   s     | ]\}}| kr|V  qd S r-   r*   r  r)  r*   r+   r    r  z/AsynPool._find_worker_queues.<locals>.<genexpr>)r   r   r  r   r~   r  r*   r)  r+   r    s
   zAsynPool._find_worker_queuesc                 C   s"   d | _ d  | _ | _ | _| _d S r-   )r  _inqueue	_outqueue
_quick_get_poll_resultr   r*   r*   r+   _setup_queues  s   zAsynPool._setup_queuesc           
   
   C   s  |j j}| jj}|h}|r|js| jtkrt|d|dd\}}}|rwz| }W n> t	t
fyf } z0t|dd}	|	tjkrCW Y d}~q|	tjkrOW Y d}~dS |	tvr[td||dd W Y d}~dS d}~ww |du rrtd| dS || ndS |r|js| jtksdS dS dS dS dS dS )	a  Flush all queues.

        Including the outbound buffer, so that
        all tasks that haven't been started will be discarded.

        In Celery this is called whenever the transport connection is lost
        (consumer restart), and when a process is terminated.
        Nr  r\   ri   z got %r while flushing process %rr4   ru   z&got sentinel while flushing process %r)r   r   r  r   closedr   r   rn   r   rh   r   r   ri   rj   EAGAINr   r   )
r   r   resqr   r  r  r   r   rl   rm   r*   r*   r+   r<    s6   	



,zAsynPool.process_flush_queuesc                 C   s   |j s| | t|}|r| j| ~|jsGd|_t| j}z| |}| 	||r3d| j| 
 < W n	 ty=   Y nw t| j|ksIJ dS dS )z8Called when a job was partially written to exited child.TN)rA  rI  rK   r   rk   r   r#   r   r  destroy_queuesr   r~   )r   rI   r   rJ   beforer  r*   r*   r+   r    s(   



zAsynPool.on_partial_readc              
   C   s   |  rJ | j| d}z| j| W n ty!   d}Y nw z| |d j | W n	 t	y8   Y nw |D ]%}|r`|j
|jfD ]}|js_| | z|  W qE t	y^   Y qEw qEq;|S )zqDestroy queues that can no longer be used.

        This way they can be replaced by new usable sockets.
        r4   r   )r&  r   rk   r   r}   r   rR  rG   rW   rh   r   r  rP  r  )r   r  r   removedqueuesockr*   r*   r+   r    s4   
zAsynPool.destroy_queuesc           	      C   s,   |||f|d}t |}|d|}|||fS )Nrc  re  )r#   )	r   type_rs   rj  r   rd  rh  r'   ri  r*   r*   r+   r  #  s   

zAsynPool._create_payloadc                 C   s   d S r-   r*   )clsr  r   r*   r*   r+   _set_result_sentinel+  s   zAsynPool._set_result_sentinelc                 C   s   | j fS r-   )r   r   r*   r*   r+   _help_stuff_finish_args0  r   z AsynPool._help_stuff_finish_argsc           	   	   C   s   t d i }t }|D ]}z|jj }|| |||< W q ty'   Y qw |rOt|dd\}}}|r6q(|s:d S |D ]
}|| jj  q<t	d |s*d S d S )Nz7removing tasks from inqueue until task handler finishedr   r  r   )
r   rT   r6  r   rW   rX   rh   rn   r   r   )	r  r   fileno_to_procinqRrf   r%   r  r   r  r*   r*   r+   _help_stuff_finish5  s.   
zAsynPool._help_stuff_finishc                 C   s
   | j diS )Ng      @)r   r   r*   r*   r+   r  N  s   
zAsynPool.timers)NFNN)4r   r   r   r   r   r   r  r   r   r   r   r  r   r   r  r  r  r$  r	  r   r   rj  r   r
  r  r  r  r  r  r   r   r  r  r  r  staticmethodr  r  r  r  r  r<  r  r  r  classmethodr  r  r  propertyr  r   r*   r*   r   r+   r2     sl    >k
  H	
	

$


r2   )NNNr   )_r   ri   r   rC   r   rc   r  collectionsr   r   r   ior   numbersr   r   r   structr   r	   r
   r   weakrefr   r   billiardr   r   billiard.compatr   r   billiard.poolr   r   r   r   r   billiard.queuesr   kombu.asynchronousr   r   kombu.serializationr   kombu.utils.eventior   kombu.utils.functionalr   viner   celery.signalsr   celery.utils.functionalr   celery.utils.logr   celery.workerr    r  	_billiardr!   r,   r   ImportError__all__r   ry   r'  r   	frozensetr  rj   r   r   r   SCHED_STRATEGY_FCFSr  r   r  r  r?   rF   rK   r|   rL   rO   rR   rS   rb   rn   r   r   r   Poolr2   r*   r*   r*   r+   <module>   s    
	

0-
  