o
    Df                     @  s  d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ dd	lmZ dd
lmZ dZdgZg Ze ZejdZdd ZG dd deZG dd deZdd ZG dd deZeeedZG dd deZeeedZdd Z dd Z!d$d d!Z"d"d# Z#dS )%zPublic resource pools.    )annotationsN)chain   )Resource)Producer)EqualityDict)register_after_fork)lazy)ProducerPool	PoolGroupregister_groupconnections	producers	get_limit	set_limitreset
   KOMBU_DISABLE_LIMIT_PROTECTIONc                 C  s   |    d S N)cleargroup r   D/home/ubuntu/webapp/venv/lib/python3.10/site-packages/kombu/pools.py_after_fork_cleanup_group      r   c                      sd   e Zd ZdZeZdZ fddZdd Zdd Zd	d
 Z	dd Z
dd Zdd Z fddZ  ZS )r
   z*Pool of :class:`kombu.Producer` instances.Tc                   s0   || _ |dd p| j| _t j|i | d S )Nr   )r   popr   super__init__)selfr   argskwargs	__class__r   r   r       s   zProducerPool.__init__c                 C  s   | j jddS )NT)block)r   acquirer   r   r   r   _acquire_connection%   s   z ProducerPool._acquire_connectionc                 C  s.   |   }z| |W S  ty   |   w r   )r'   r   BaseExceptionrelease)r   connr   r   r   create_producer(   s   zProducerPool.create_producerc                 C  s
   t | jS r   )r	   r+   r&   r   r   r   new0   s   
zProducerPool.newc                 C  s.   | j rt| j D ]}| j|   qd S d S r   )limitrange	_resource
put_nowaitr,   )r   _r   r   r   setup3   s
   zProducerPool.setupc                 C  s   d S r   r   r   resourcer   r   r   close_resource8   s   zProducerPool.close_resourcec                 C  sN   t |r| }|jd u r%|  }z|| W |S  ty$   |   w |S r   )callable_channelr'   reviver(   r)   )r   pr*   r   r   r   prepare;   s   
zProducerPool.preparec                   s&   |j r|j   d |_t | d S r   )__connection__r)   channelr   r3   r"   r   r   r)   G   s   
zProducerPool.release)__name__
__module____qualname____doc__r   close_after_forkr   r'   r+   r,   r2   r5   r:   r)   __classcell__r   r   r"   r   r
      s    r
   c                   @  s*   e Zd ZdZd
ddZdd Zdd	 ZdS )r   zCollection of resource pools.NTc                 C  s0   || _ || _| jrtd urt| t d S d S d S r   )r-   rA   r   r   )r   r-   rA   r   r   r   r   Q   s
   zPoolGroup.__init__c                 C  s   t d)Nz!PoolGroups must define ``create``)NotImplementedError)r   r4   r-   r   r   r   createW   s   zPoolGroup.createc                 C  s,   | j }|tu r
t }| || }| |< |S r   )r-   use_global_limitr   rD   )r   r4   r-   kr   r   r   __missing__Z   s
   zPoolGroup.__missing__)NT)r=   r>   r?   r@   r   rD   rG   r   r   r   r   r   N   s
    
r   c                 C  s   t |  | S )z*Register group (can be used as decorator).)_groupsappendr   r   r   r   r   b   s   
r   c                   @     e Zd ZdZdd ZdS )ConnectionszCollection of connection pools.c                 C  s   |j |dS Nr-   )Poolr   
connectionr-   r   r   r   rD   k   r   zConnections.createNr=   r>   r?   r@   rD   r   r   r   r   rK   h       rK   rM   c                   @  rJ   )	ProducerszCollection of producer pools.c                 C  s   t t| |dS rL   )r
   r   rO   r   r   r   rD   u   s   zProducers.createNrQ   r   r   r   r   rS   r   rR   rS   c                   C  s   t dd tD  S )Nc                 s  s$    | ]}|r
|  ntg V  qd S r   )valuesiter).0gr   r   r   	<genexpr>}   s   " z_all_pools.<locals>.<genexpr>)r   rH   r   r   r   r   
_all_pools|   s   rY   c                   C  s   t d S )z"Get current connection pool limit.r   )_limitr   r   r   r   r      s   r   Fc                 C  s>   | pd} t d p	d}| |kr| t d< t D ]}||  q| S )zSet new connection pool limit.r   )rZ   rY   resize)r-   forcereset_afterignore_errorsglimitpoolr   r   r   r      s   
r   c               	   O  s@   t  D ]}z|  W q ty   Y qw tD ]}|  qdS )z*Reset all pools by closing open resources.N)rY   force_close_all	ExceptionrH   r   )r    r!   r`   r   r   r   r   r      s   

r   )FFF)$r@   
__future__r   os	itertoolsr   rP   r   	messagingr   utils.collectionsr   utils.compatr   utils.functionalr	   __all__rZ   rH   objectrE   environgetdisable_limit_protectionr   r
   r   r   rK   r   rS   r   rY   r   r   r   r   r   r   r   <module>   s4    4
