o
    Df                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ dZeeZdd Zdd ZejejeejeefddZ dS )z'Task execution strategy (optimization).    N)to_timestamp)signals)trace)InvalidTaskError)symbol_by_name)
get_logger)saferepr)timezone   )create_request_cls)task_reserved)defaultc                 C   s  z| dd| di }}|j W n ty   td ty'   tdw | d| d| d| d	| d
| d| d| d| d| d| dd| dd| d| d| dd}|| jpoi  | d| d| ddd}|||f|d| ddfS )zECreate a fresh protocol 2 message from a hybrid protocol 1/2 message.args kwargs!Message does not have args/kwargs(Task keyword arguments must be a mappinglangtaskidroot_id	parent_idgroupmethshadowetaexpiresretriesr   	timelimit)NNargsrepr
kwargsreprorigin)r   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   	callbackserrbackschordNr"   r#   r$   chainTutc)getitemsKeyErrorr   AttributeErrorupdateheaders)messagebodyr   r   r-   embedr   r   O/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/worker/strategy.pyhybrid_to_proto2   sB   


r2   c                 C   s   z| dd| di }}|j W n ty   td ty'   tdw |jt|t|| jd z|d |d< W n	 tyF   Y nw | d	| d
| ddd}|||f|d| ddfS )zConvert Task message protocol 1 arguments to protocol 2.

    Returns:
        Tuple: of ``(body, headers, already_decoded_status, utc)``
    r   r   r   r   r   )r   r    r-   tasksetr   r"   r#   r$   Nr%   Tr'   )r(   r)   r*   r   r+   r,   r   r-   )r.   r/   r   r   r0   r   r   r1   proto1_to_proto2B   s4   
r4   c	                    s   j jttjjoj}	oj|	oj	j
jjj jj	j
jjtj}
t|
jd jjjtf 	
fdd	S )zDefault task execution strategy.

    Note:
        Strategies are here as an optimization, so sadly
        it's not very easy to override.
    )appc                    sP  |d u rd| j vr| j| jd f\}}}}nd| j v r(t| | j \}}}}n	| |\}}}}| ||	||||d rZ j j j j j	d}	t
j|	d|	id  jsb jv rh  rhd S tjj d rd j j j j j j jd	d
 j	o j	  jo j d
 d }
d } j	rz jr| j	}n| j	j}W n( ttfy } zd j	| jdddd  jdd W Y d }~nd }~ww r
j}
|r|
rj  | |
dfddS |r	j  | fdd S |
r |
dS   |r" fdd|D    d S )Nr   F)on_ack	on_rejectr5   hostnameeventerr   connection_errorsr/   r-   decodedr'   )r   namer   r   r   data)extra)senderrequestztask-receivedr   r   )	uuidr<   r   r   r   r   r   r   r   z2Couldn't convert ETA %r to timestamp: %r. Task: %rT)safe)exc_info)requeuer
      )priorityc                    s   g | ]}| qS r   r   ).0callbackreqr   r1   
<listcomp>   s    z9default.<locals>.task_message_handler.<locals>.<listcomp>)payloadr/   r-   uses_utc_timezoner2   r   r<   r   r    r   
_app_traceLOG_RECEIVEDr   revokedr   task_receivedsendr   r   request_dictr(   	isoformatr'   r	   OverflowError
ValueErrorinforejectqosincrement_eventually)r.   r/   ackrX   r"   r   r-   r;   r'   contextbucketr   excReq
_does_infor5   apply_eta_taskcall_atr:   consumererrorr9   
get_buckethandler8   rW   limit_post_eta
limit_taskr4   rate_limits_enabledrevoked_tasks
send_eventr   task_message_handlerr   task_sends_eventsto_system_tzrI   r1   rm      s   




z%default.<locals>.task_message_handler)r8   r:   loggerisEnabledForloggingINFOevent_dispatcherenabledrR   send_eventstimerrc   rb   disable_rate_limitstask_buckets__getitem__on_task_request_limit_task_limit_post_etar   Requestr   pool
controllerstaterP   r   )r   r5   rd   rW   re   r   ro   bytesr4   eventsr~   r   r_   r1   r   c   s(   





<Lr   )!__doc__rr   kombu.asynchronous.timerr   celeryr   
celery.appr   rN   celery.exceptionsr   celery.utils.importsr   celery.utils.logr   celery.utils.safereprr   celery.utils.timer	   r@   r   r   r   __all____name__rp   r2   r4   rW   re   	to_systemr   r   r   r   r   r1   <module>   s(    )
"