o
    DfZ                     @   s
  d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZmZmZmZmZmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddl m!Z! ddl"m#Z$ dZ%dZ&dZ'eddZ(dddZ)G dd de*Z+G dd dZ,dS )z/Sending/Receiving Messages (Kombu integration).    N)
namedtuple)Mapping)	timedelta)WeakValueDictionary)
ConnectionConsumerExchangeProducerQueuepools)	Broadcast)
maybe_list)cached_property)signals)anon_nodename)saferepr)indent)maybe_make_aware   )routes)AMQPQueuestask_messagei   zS
.> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) key={0.routing_key}
r   headers
propertiesbody
sent_eventutf-8c                    s    fdd|   D S )Nc                    s*   i | ]\}}t |tr| n||qS  )
isinstancebytesdecode).0kvencodingr   H/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/app/amqp.py
<dictcomp>%   s    zutf8dict.<locals>.<dictcomp>)items)dr'   r   r&   r(   utf8dict$   s   
r,   c                       s   e Zd ZdZdZ			d! fdd	Z fddZ fdd	Zd
d Zdd Z	dd Z
dd Zdd Zd"ddZdd Zdd Zdd Zdd Zedd  Z  ZS )#r   u  Queue name⇒ declaration mapping.

    Arguments:
        queues (Iterable): Initial list/tuple or dict of queues.
        create_missing (bool): By default any unknown queues will be
            added automatically, but if this flag is disabled the occurrence
            of unknown queues in `wanted` will raise :exc:`KeyError`.
        max_priority (int): Default x-max-priority for queues with none set.
    NTc           	         s   t    t | _|| _|| _|| _|d u rtn|| _|| _	|d ur.t
|ts.dd |D }|p1i }| D ]\}}t
|trD| |n| j|fi | q6d S )Nc                 S   s   i | ]}|j |qS r   )name)r#   qr   r   r(   r)   C   s    z#Queues.__init__.<locals>.<dictcomp>)super__init__r   aliasesdefault_exchangedefault_routing_keycreate_missingr   autoexchangemax_priorityr    r   r*   r
   add
add_compat)	selfqueuesr2   r4   r5   r6   r3   r-   r.   	__class__r   r(   r0   8   s   
$zQueues.__init__c                    s,   z| j | W S  ty   t | Y S w N)r1   KeyErrorr/   __getitem__r9   r-   r;   r   r(   r?   H   s
   zQueues.__getitem__c                    s<   | j r
|js
| j |_t || |jr|| j|j< d S d S r=   )r2   exchanger/   __setitem__aliasr1   )r9   r-   queuer;   r   r(   rB   N   s   zQueues.__setitem__c                 C   s   | j r| | |S t|r=   )r4   r7   new_missingr>   r@   r   r   r(   __missing__U   s   zQueues.__missing__c                 K   s&   t |ts| j|fi |S | |S )a  Add new queue.

        The first argument can either be a :class:`kombu.Queue` instance,
        or the name of a queue.  If the former the rest of the keyword
        arguments are ignored, and options are simply taken from the queue
        instance.

        Arguments:
            queue (kombu.Queue, str): Queue to add.
            exchange (kombu.Exchange, str):
                if queue is str, specifies exchange name.
            routing_key (str): if queue is str, specifies binding key.
            exchange_type (str): if queue is str, specifies type of exchange.
            **options (Any): Additional declaration options used when
                queue is a str.
        )r    r
   r8   _add)r9   rD   kwargsr   r   r(   r7   Z   s   

z
Queues.addc                 K   s>   | d|d |d d u r||d< | tj|fi |S )Nrouting_keybinding_key)
setdefaultgetrG   r
   	from_dict)r9   r-   optionsr   r   r(   r8   o   s   zQueues.add_compatc                 C   s`   |j d u s|j jdkr| j|_ |js| j|_| jd ur)|jd u r#i |_| |j || |j< |S )N )rA   r-   r2   rI   r3   r6   queue_arguments_set_max_priority)r9   rD   r   r   r(   rG   v   s   


zQueues._addc                 C   s*   d|vr| j d ur|d| j iS d S d S )Nzx-max-priority)r6   update)r9   argsr   r   r(   rQ      s   zQueues._set_max_priorityr   c                 C   s\   | j }|sdS dd t| D }|rtd||S |d d td|dd | S )z/Format routing table into string for log dumps.rO   c                 S   s   g | ]\}}t  |qS r   )QUEUE_FORMATstripformat)r#   _r.   r   r   r(   
<listcomp>   s    z!Queues.format.<locals>.<listcomp>
r   r   N)consume_fromsortedr*   
textindentjoin)r9   r   indent_firstactiveinfor   r   r(   rV      s   
$zQueues.formatc                 K   s,   | j |fi |}| jdur|| j|j< |S )zAdd new task queue that'll be consumed from.

        The queue will be active even when a subset has been selected
        using the :option:`celery worker -Q` option.
        N)r7   _consume_fromr-   )r9   rD   rH   r.   r   r   r(   
select_add   s   
zQueues.select_addc                    s$   |r fddt |D  _dS dS )zSelect a subset of currently defined queues to consume from.

        Arguments:
            include (Sequence[str], str): Names of queues to consume from.
        c                       i | ]}| | qS r   r   )r#   r-   r9   r   r(   r)      s    
z!Queues.select.<locals>.<dictcomp>N)r   ra   )r9   includer   rd   r(   select   s
   
zQueues.selectc                    sN    r#t   | jdu r|  fdd| D S  D ]}| j|d qdS dS )zDeselect queues so that they won't be consumed from.

        Arguments:
            exclude (Sequence[str], str): Names of queues to avoid
                consuming from.
        Nc                 3   s    | ]	}| vr|V  qd S r=   r   )r#   r$   excluder   r(   	<genexpr>   s    z"Queues.deselect.<locals>.<genexpr>)r   ra   rf   pop)r9   rh   rD   r   rg   r(   deselect   s   
zQueues.deselectc                 C   s   t || ||S r=   )r
   r5   r@   r   r   r(   rE      s   zQueues.new_missingc                 C   s   | j d ur| j S | S r=   )ra   rd   r   r   r(   rZ      s   
zQueues.consume_from)NNTNNN)r   T)__name__
__module____qualname____doc__ra   r0   r?   rB   rF   r7   r8   rG   rQ   rV   rb   rf   rk   rE   propertyrZ   __classcell__r   r   r;   r(   r   )   s*    
r   c                   @   sN  e Zd ZdZeZeZeZeZeZ	dZ
dZdZdZdZdd Zedd Zedd	 Z		d0d
dZd1ddZdd Zd1ddZ									d2ddZ							d3ddZdd Zdd Zedd Zedd Zejd d Zed!d" Zed#d$ Zejd%d$ Zed&d' Z e Z!ed(d) Z"ed*d+ Z#ed,d- Z$d.d/ Z%dS )4r   zApp AMQP API: app.amqp.Ni   c                 C   s*   || _ | j| jd| _| j j| j d S )N)r      )app
as_task_v1
as_task_v2task_protocols_confbind_to_handle_conf_update)r9   rs   r   r   r(   r0      s
   zAMQP.__init__c                 C      | j | jjj S r=   )rv   rs   conftask_protocolrd   r   r   r(   create_task_message      zAMQP.create_task_messagec                 C      |   S r=   )_create_task_senderrd   r   r   r(   send_task_message      zAMQP.send_task_messagec                 C   sp   | j j}|j}|d u r|j}|d u r|j}|s$|jr$t|j| j|df}|d u r+| jn|}| 	|| j||||S )N)rA   rI   )
rs   r{   task_default_routing_keytask_create_missing_queuestask_queue_max_prioritytask_default_queuer
   r2   r5   
queues_cls)r9   r:   r4   r5   r6   r{   r3   r   r   r(   r      s$   
zAMQP.Queuesc                 C   s&   t j| j|p| j| jd|| jdS )zReturn the current task router.r   )rs   )_routesRouterr   r:   rs   either)r9   r:   r4   r   r   r(   r     s   zAMQP.Routerc                 C   s   t | jjj| _d S r=   )r   preparers   r{   task_routes_rtablerd   r   r   r(   flush_routes  s   zAMQP.flush_routesc                 K   s:   |d u r	| j jj}| j|f||pt| jj d|S )N)acceptr:   )rs   r{   accept_contentr   listr:   rZ   values)r9   channelr:   r   kwr   r   r(   TaskConsumer  s   
zAMQP.TaskConsumerr   Fc           !         s  |pd}|pi }t |ttfstdt |tstd|r<| |d |p*| j }|p0| jj}t	|t
|d |d}t |	tjr`| |	d |pN| j }|pT| jj}t	|t
|	d |d}	t |tsk|oj| }t |	tsv|	ou|	 }	|d u rt|| j}|d u rt|| j}|s|} fdd	|pg D }i d
dd|d|d|d|d|	d|d|d|
d||gd|d|d|d|d|pt d|d|||d} t| ||pdd||||||df|r|||||||
||	d	d S d d S )!Nr   !task args must be a list or tuple(task keyword arguments must be a mapping	countdownseconds)tzexpiresc                    rc   r   r   )r#   headerrN   r   r(   r)   D  s    z#AMQP.as_task_v2.<locals>.<dictcomp>langpytaskidshadowetagroupgroup_indexretries	timelimitroot_id	parent_idargsrepr
kwargsreproriginignore_resultreplaced_task_nesting)stamped_headersstampsrO   correlation_idreply_to)	callbackserrbackschainchord)	uuidr   r   r-   rS   rH   r   r   r   r   )r    r   tuple	TypeErrorr   _verify_secondsrs   nowtimezoner   r   numbersRealstr	isoformatr   argsrepr_maxsizekwargsrepr_maxsizer   r   )!r9   task_idr-   rS   rH   r   r   group_idr   r   r   r   r   r   r   
time_limitsoft_time_limitcreate_sent_eventr   r   r   r   r   r   r   r   r   r   r   r   rN   r   r   r   r   r(   ru     s   



	

zAMQP.as_task_v2c                 K   s  |pd}|pi }| j }t|ttfstdt|tstd|r5| |d |p-| j }|t	|d }t|	t
jrO| |	d |pG| j }|t	|	d }	|oT| }|	oZ|	 }	ti ||padd|||||||
||	|||||f||d	|r||t|t||
||	d
dS d dS )Nr   r   r   r   r   r   rO   r   )r   r   rS   rH   r   r   r   r   r   utcr   r   r   tasksetr   )r   r-   rS   rH   r   r   r   r   )r   r    r   r   r   r   r   rs   r   r   r   r   r   r   r   )r9   r   r-   rS   rH   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   compat_kwargsr   r   r   r(   rt   v  sf   
zAMQP.as_task_v1c                 C   s   |t k rt| d||S )Nz is out of range: )INT_MIN
ValueError)r9   swhatr   r   r(   r     s   zAMQP._verify_secondsc                    s   | j jj| j jj| j jj| j| jtjj	tjj
tjj	tjj
 tjj	tjj
| j| j| j jj	| j jj
| j jj	 	 	 	 	 	 d 	
fdd	}|S )Nc                    sl  |d u rn|}|\}}}}|r| | |r| | |}|d u r(|d u r(}|d ur<t|tr9|| }}n|j}|
d u rTz|jj}
W n	 tyO   Y nw |
pS}
|d u rjz|jj}W n tyi   d}Y nw |rn|sx|dkrxd|}}n|d u r|jjp}|p|jp	}|d u r|rt|t	s|g}|d u rn|}|rt
fi |n}r||||||||d | j|f|||	p
|pň|||
||d	|} rۈ|||||d rt|tr||d ||d |d |d	 |d
 d n||d ||d |d |d	 |d d |r4|p}|}t|tr!|j}| |||d |jd|| ||d |S )NdirectrO   )senderr   rA   rI   declarer   r   retry_policy)	rA   rI   
serializercompressionretryr   delivery_moder   r   )r   r   r   rA   rI   r   r   r   r   r   )r   r   r   rS   rH   r   r   rS   rH   r   )rD   rA   rI   z	task-sent)r   r   )rR   r    r   r-   rA   r   AttributeErrortyperI   r   dictpublishr   r   )producerr-   messagerA   rI   rD   event_dispatcherr   r   r   r   r   r   r   exchange_typerH   headers2r   r   r   qname_rpretevdexnameafter_receiversbefore_receiversdefault_compressordefault_delivery_modedefault_evdr2   default_policydefault_queuedefault_retrydefault_rkeydefault_serializerr:   send_after_publishsend_before_publishsend_task_sentsent_receiversr   r(   r     s   


	



z3AMQP._create_task_sender.<locals>.send_task_message)NNNNNNNNNNNN)rs   r{   task_publish_retrytask_publish_retry_policytask_default_delivery_moder   r:   r   before_task_publishsend	receiversafter_task_publish	task_sent_event_dispatcherr2   r   task_serializertask_compression)r9   r   r   r   r(   r     s0   





,bzAMQP._create_task_senderc                 C   rz   r=   )r:   rs   r{   r   rd   r   r   r(   r   0  r~   zAMQP.default_queuec                 C   s   |  | jjjS )u"   Queue name⇒ declaration mapping.)r   rs   r{   task_queuesrd   r   r   r(   r:   4  s   zAMQP.queuesc                 C   s
   |  |S r=   )r   )r9   r:   r   r   r(   r:   9     
c                 C   s   | j d u r	|   | j S r=   )r   r   rd   r   r   r(   r   =  s   
zAMQP.routesc                 C   r   r=   )r   rd   r   r   r(   routerC  r   zAMQP.routerc                 C   s   |S r=   r   )r9   valuer   r   r(   r  G  s   c                 C   s0   | j d u rtj| j  | _ | jjj| j _| j S r=   )_producer_poolr   	producersrs   connection_for_writepoollimitrd   r   r   r(   producer_poolK  s   
zAMQP.producer_poolc                 C   s   t | jjj| jjjS r=   )r   rs   r{   task_default_exchangetask_default_exchange_typerd   r   r   r(   r2   T  s   
zAMQP.default_exchangec                 C   s
   | j jjS r=   )rs   r{   
enable_utcrd   r   r   r(   r   Y  r  zAMQP.utcc                 C   s   | j jjddS )NF)enabled)rs   events
Dispatcherrd   r   r   r(   r   ]  s   zAMQP._event_dispatcherc                 O   s&   d|v sd|v r|    |  | _d S )Nr   )r   r   r  )r9   rS   rH   r   r   r(   ry   c  s   
zAMQP._handle_conf_update)NNN)NN)NNNNNNNr   NNNNNNFNNNNNNNFNNNr   )NNNNNNNr   NNNNNNFNNNNN)&rl   rm   rn   ro   r   r   r	   BrokerConnectionr   r   r   r  r5   r   r   r0   r   r}   r   r   r   r   ru   rt   r   r   r   r:   setterrp   r   r  r
  publisher_poolr2   r   r   ry   r   r   r   r(   r      s    




	
^
<y









r   )r   )-ro   r   collectionsr   collections.abcr   datetimer   weakrefr   kombur   r   r   r	   r
   r   kombu.commonr   kombu.utils.functionalr   kombu.utils.objectsr   celeryr   celery.utils.nodenamesr   celery.utils.safereprr   celery.utils.textr   r\   celery.utils.timer   rO   r   r   __all__r   rT   r   r,   r   r   r   r   r   r   r(   <module>   s4     
 