o
    Df9                     @  s"  d Z ddlmZ ddlZddlZddlmZmZ ddlm	Z	 ddl
m
Z
 ddlmZ ddlmZ d	d
lmZmZmZmZ d	dlmZ d	dlmZmZ d	dlmZ d	dlmZ d	dlmZ d	dlmZm Z  d	dl!m"Z" d	dl#m$Z$ dZ%dZ&dZ'ee(Z)e)j*e)j+Z*Z+G dd dZ,G dd dZ-dS )zGeneric process mailbox.    )annotationsN)defaultdictdeque)contextmanager)copy)count)time   )ConsumerExchangeProducerQueue)LamportClock)maybe_declareoid_from)InconsistencyError)
get_logger)match)maybe_evaluatereprcall)cached_property)uuid
   zA node named {node.hostname} is already using this process mailbox!

Maybe you forgot to shutdown the other node or did not do so properly?
Or if you meant to start multiple nodes on the same host please make sure
you give each node a unique node name!
)NodeMailboxc                   @  s   e Zd ZdZdZdZdZdZdZ		dddZ	dddZ
dd	 Zd
d ZdddZ		dddZdddZdd Zdd ZdddZeZdd ZdS )r   zMailbox node.Nc                 C  s:   || _ || _|| _|| _| jjj| _|d u ri }|| _d S N)channelmailboxhostnamestateclockadjustadjust_clockhandlers)selfr   r   r   r#   r    r%   E/home/ubuntu/webapp/venv/lib/python3.10/site-packages/kombu/pidbox.py__init__8   s   
zNode.__init__Tc                   sP    j  j} fdd}||_t|p j|gf||d u r! j jn|d|S )Nc                   s   |rt tj d d S d S )N)node)warningswarnW_PIDBOX_IN_USEformat)namemessages	consumersr$   r%   r&   verify_exclusiveF   s   z'Node.Consumer.<locals>.verify_exclusive)no_ackaccept)r   	get_queuer   on_declaredr
   r   r3   )r$   r   r2   r3   optionsqueuer1   r%   r0   r&   r
   C   s   zNode.Consumerc                 C  s   || j |j< |S r   )r#   __name__)r$   funr%   r%   r&   handlerQ   s   zNode.handlerc                 C  s   t d|dd d S )NzCannot decode message: %rr	   exc_info)error)r$   messageexcr%   r%   r&   on_decode_errorU   s   zNode.on_decode_errorc                 C  s&   | j ||p| jg| jd}|  |S )N)r   	callbacksr@   )r
   handle_messager@   consume)r$   r   callbackconsumerr%   r%   r&   listenX   s   
zNode.listenc           	   
   K  s   |pi }t dt|d|d|| |r| jp| j}z|||}W n& ty'     tyE } ztd|dd dt|i}W Y d }~nd }~ww |rX| j| j	|i|d |d	 |d
 |S )Nz1pidbox received method %s [reply_to:%s ticket:%s]r%   )kwargszpidbox command error: %rr	   r;   r=   exchangerouting_key)rH   rI   ticket)
debugr   handle_callhandle_cast
SystemExit	Exceptionr=   reprreplyr   )	r$   method	argumentsreply_torJ   rG   handlerQ   r?   r%   r%   r&   dispatch_   s*   zNode.dispatchc                 C  s$   |si n|}| j | | jfi |S r   )r#   r   r$   rR   rS   r%   r%   r&   rU   t   s   zNode.handlec                 C     |  ||S r   rU   rW   r%   r%   r&   rL   x      zNode.handle_callc                 C  rX   r   rY   rW   r%   r%   r&   rM   {   rZ   zNode.handle_castc                 C  s   | d}| d}| d}|r| |j dpd | j}d}|r*||v r)d}n|r7|r7t|||r6d}nd}|rC| jdi |S d S )	Ndestinationpatternmatcherr    r   FTr%   )getr"   headersr   r   rV   )r$   bodyr>   r[   r\   r]   r   run_dispatchr%   r%   r&   rB   ~   s&   


zNode.handle_messagec                 K  s"   | j j||||| j| j jd d S )N)r   
serializer)r   _publish_replyr   rb   )r$   datarH   rI   rJ   rG   r%   r%   r&   rQ      s   
z
Node.replyNNNN)NTNNN)NNNr   )r8   
__module____qualname____doc__r   r   r#   r   r   r'   r
   r:   r@   rF   rV   rU   rL   rM   rB   dispatch_from_messagerQ   r%   r%   r%   r&   r   &   s.    





r   c                   @  s  e Zd ZdZeZdZdZdZdZ	dZ
dZdZdgZdZ				d0dd	Zd
d Zd1ddZ		d1ddZd2ddZd2ddZ		d3ddZdd Zedd Zdd Zed4ddZ	d4dd Z			d5d!d"Z				d6d$d%Z		d3d&d'Zd(d) Zd*d+ Z e!d,d- Z"ed.d/ Z#dS )7r   zProcess Mailbox.z	%s.pidboxzreply.%s.pidboxNdirectjson      $@c                 C  s   || _ || _|| _|d u rt n|| _| | j | j| _| | j | _t	t
| _|d u r/| jn|| _|d u r9| jn|| _|| _|	| _|
| _|| _|| _d S r   )	namespace
connectiontyper   r    _get_exchangerH   _get_reply_exchangereply_exchanger   r   	unclaimedr3   rb   	queue_ttlqueue_expiresreply_queue_ttlreply_queue_expires_producer_pool)r$   rn   rp   ro   r    r3   rb   producer_poolru   rv   rw   rx   r%   r%   r&   r'      s   

zMailbox.__init__c                 C  s   t | }||_|S r   )r   ro   )r$   ro   boundr%   r%   r&   __call__   s   zMailbox.__call__c                 C  s    |pt  }| j||||| dS )N)r   )socketgethostnamenode_cls)r$   r   r   r   r#   r%   r%   r&   r      s   zMailbox.Nodec              	   C  s$   |si n|}| j |||d|||dS )NT)rQ   timeoutrD   r   
_broadcast)r$   r[   commandrG   r   rD   r   r%   r%   r&   call      
zMailbox.callc                 C  s   |si n|}| j |||ddS NF)rQ   r   )r$   r[   r   rG   r%   r%   r&   cast   s   zMailbox.castc                 C  s   |si n|}| j ||ddS r   r   )r$   r   rG   r%   r%   r&   abcast   s   zMailbox.abcastr	   c              	   C  s$   |si n|}| j ||d||||dS )NT)rQ   r   limitrD   r   r   )r$   r   rG   r   r   rD   r   r%   r%   r&   
multi_call   r   zMailbox.multi_callc              	   C  s0   | j }t| d| jj | j|dd| j| jdS )N.FT)rH   rI   durableauto_deleteexpiresmessage_ttl)oidr   rs   r-   rx   rw   )r$   r   r%   r%   r&   get_reply_queue   s   zMailbox.get_reply_queuec                 C  s   |   S r   )r   r0   r%   r%   r&   reply_queue      zMailbox.reply_queuec                 C  s(   t | d| j d| jdd| j| jdS )Nr   z.pidboxFT)rH   r   r   r   r   )r   rn   rH   rv   ru   )r$   r   r%   r%   r&   r4      s   zMailbox.get_queuec                 c  s^    |r|V  d S | j r&| j  }|V  W d    d S 1 sw   Y  d S t|ddV  d S )NF)auto_declare)rz   acquirer   )r$   producerr   r%   r%   r&   producer_or_acquire  s   
"zMailbox.producer_or_acquirec           	   	   K  s   |p| j j}t|dddd}| ||3}z|j|f|||g|| j ddd| W n	 ty6   Y n	w W d    d S W d    d S 1 sJw   Y  d S )Nrk   	transientF)exchange_typedelivery_moder   )rJ   r    T)rH   rI   declarer_   retry)ro   default_channelr   r   publishr    forwardr   )	r$   rQ   rH   rI   rJ   r   r   optschanr%   r%   r&   rc     s2   

"zMailbox._publish_replyc              	   C  s   ||||	|
d}|p| j j}| j}|r't| | |j|| jj| jdd |p+| j	}| 
||#}|j||j|g| j |rEt | ndd|dd W d    d S 1 sXw   Y  d S )N)rR   rS   r[   r\   r]   )rH   rI   )rJ   rT   r   )r    r   T)rH   r   r_   rb   r   )ro   r   rH   r   r   updaters   r-   r   rb   r   r   r    r   r   )r$   rp   rS   r[   reply_ticketr   r   rb   r   r\   r]   r>   r   rH   r%   r%   r&   _publish   s2   

"zMailbox._publishFc                 C  s   |d urt |ttfstdt||
d ur2t |
ts2|d ur2t |ts2tdt|
t||p5i }|r;t p<d }|pB| jj	}|d u rQ|rQ|rOt
|pPd }|	pU| j}	| j|||||||	|
|d	 |rp| j|||||dS d S )Nz'destination must be a list/tuple not {}z.pattern and matcher must be strings not {}, {})r[   r   r   r   rb   r\   r]   )r   r   rD   r   )
isinstancelisttuple
ValueErrorr,   rp   strr   ro   r   lenrb   r   _collect)r$   r   rS   r[   rQ   r   r   rD   r   rb   r\   r]   r   r   r%   r%   r&   r   8  sF   

zMailbox._broadcastc              
     s  |d u r| j }|p| jj}| j}t||g|dd}	g | j| jj zW S  t	y1   Y nw  fdd}
|	
|
 zD|	1 |rKt|pMt D ]}z	| jj|d W qN tjyd   Y  nw W  d    W ||j S 1 sxw   Y  W ||j d S ||j w )NT)r3   r2   c                   sp   |j j} |dp
d |d}|rt |krd S |d}|kr/r(|  |  d S | |  d S )Nr    r   r   rJ   )r_   r^   r   append)r`   r>   headerr   this_idr"   rD   	responsesrJ   rt   r%   r&   
on_messagep  s   
z$Mailbox._collect.<locals>.on_message)r   )r3   ro   r   r   r
   rt   r    r!   popKeyErrorregister_callbackranger   drain_eventsr}   r   after_reply_message_receivedr-   )r$   rJ   r   r   rD   r   r3   r   r7   rE   r   ir%   r   r&   r   _  s8   
zMailbox._collectc                 C  s   t | j| |dddS )NFr   rp   r   r   )r   exchange_fmt)r$   rn   rp   r%   r%   r&   rq     
   
zMailbox._get_exchangec                 C  s   t | j| ddddS )Nrk   Fr   r   )r   reply_exchange_fmt)r$   rn   r%   r%   r&   rr     r   zMailbox._get_reply_exchangec                 C  s   t | S r   )r   r0   r%   r%   r&   r     r   zMailbox.oidc                 C  s
   t | jS r   )r   ry   r0   r%   r%   r&   rz     s   
zMailbox.producer_pool)
rk   NNNNNNNNrm   re   r   )Nr	   NNNrf   )NNNNNNNN)
NNFr	   NNNNNN)$r8   rg   rh   ri   r   r   r   r   rn   ro   rp   rH   rs   r3   rb   r'   r|   r   r   r   r   r   r   r   r4   r   r   rc   r   r   r   rq   rr   propertyr   rz   r%   r%   r%   r&   r      sj    











(
,
r   ).ri   
__future__r   r}   r)   collectionsr   r   
contextlibr   r   	itertoolsr   r    r
   r   r   r   clocksr   commonr   r   
exceptionsr   logr   r]   r   utils.functionalr   r   utils.objectsr   
utils.uuidr   REPLY_QUEUE_EXPIRESr+   __all__r8   loggerrK   r=   r   r   r%   r%   r%   r&   <module>   s2    r