o
    Df,                     @   s  d Z ddlmZmZmZ ddlmZ ddlmZ ddlm	Z	m
Z
 ddlmZ ddlmZ dd	lmZ zdd
lZW n eyC   d
ZY nw erezddlmZ W n ey]   ddlmZ Y nw ddlmZ n
d
ZG dd deZdZeddgZG dd deZd
S )zMongoDB result store backend.    )datetime	timedeltatimezone)EncodeError)cached_property)maybe_sanitize_urlurlparse)states)ImproperlyConfigured   )BaseBackendN)Binary)InvalidDocumentc                   @   s   e Zd ZdS )r   N)__name__
__module____qualname__ r   r   P/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/backends/mongodb.pyr      s    r   )MongoBackendpicklemsgpackc                       s  e Zd ZdZdZdZdZdZdZdZ	dZ
dZdZdZd	ZdZd3 fd
d	Zedd Zdd Zdd Z fddZ fddZ	d4ddZdd Zdd Zdd Zdd Zd d! Zd"d# Zd5 fd%d&	Zd'd( Ze d)d* Z!e d+d, Z"e d-d. Z#e d/d0 Z$d6d1d2Z%  Z&S )7r   zMongoDB result backend.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`pymongo` is not available.
    N	localhostii  celerycelery_taskmetacelery_groupmeta
   Fc                    s  i | _ t j|fi | tstd|   D ]\}}| j || q| jr]| 	| j| _tj
| j}dd |d D }|d | _|d | _|| _|d rU|d | _| j |d  | jjd	}|d urt|tsqtd
t|}d|v s}d|v rd | _|d| j| _|d| j| _|d| j| _|d| j| _|d| j| _|d| j| _|d| j| _|d| j| _| j |di  | j | d S d S )NzCYou need to install the pymongo library to use the MongoDB backend.c                 S   s"   g | ]}|d   d|d  qS )r   :r   r   ).0xr   r   r   
<listcomp>N   s    z)MongoBackend.__init__.<locals>.<listcomp>nodelistusernamepassworddatabaseoptionsmongodb_backend_settingsz4MongoDB backend settings should be grouped in a dicthostport
mongo_hostusertaskmeta_collectiongroupmeta_collection)r$   super__init__pymongor
   _prepare_client_optionsitems
setdefaulturl_ensure_mongodb_uri_compliance
uri_parser	parse_urir)   r"   r(   database_nameupdateappconfget
isinstancedictpopr&   r'   r*   r+   )selfr8   kwargskeyvalueuri_data	hostslistconfig	__class__r   r   r-   :   sX   



zMongoBackend.__init__c                 C   s2   t | }|jdsd|  } | dkr| d7 } | S )Nmongodbzmongodb+
mongodb://r   )r   scheme
startswith)r2   
parsed_urlr   r   r   r3   v   s   
z+MongoBackend._ensure_mongodb_uri_compliancec                 C   s    t jdkr
d| jiS | jddS )N)   maxPoolSizeF)max_pool_sizeauto_start_request)r.   version_tuplerN   r>   r   r   r   r/      s
   

z$MongoBackend._prepare_client_optionsc                 C   s   | j du rGddlm} | j}|s&| j}t|tr&|ds&d| d| j }t	| j
}||d< | jr7| j|d< | jr?| j|d< |d	i || _ | j S )
zConnect to the MongoDB server.Nr   )MongoClientrH   r   r&   r!   r"   r   )_connectionr.   rR   r(   r&   r;   strrJ   r'   r<   r$   r)   r"   )r>   rR   r&   r9   r   r   r   _get_connection   s"   




zMongoBackend._get_connectionc                    s0   | j dkr|S t |}| j tv rt|}|S Nbson)
serializerr,   encodeBINARY_CODECSr   )r>   datapayloadrE   r   r   rY      s   

zMongoBackend.encodec                    s   | j dkr|S t |S rV   )rX   r,   decode)r>   r[   rE   r   r   r]      s   
zMongoBackend.decodec           	   
   K   s`   | j | ||||dd}||d< z| jjd|i|dd W |S  ty/ } zt|d}~ww )z1Store return value and state of an executed task.F)resultstate	tracebackrequestformat_date_idTupsertN)_get_result_metarY   
collectionreplace_oner   r   )	r>   task_idr^   r_   r`   ra   r?   metaexcr   r   r   _store_result   s   zMongoBackend._store_resultc                 C   s   | j d|i}|rZ| jjddr?| |d |d |d |d |d |d |d	 |d
 |d |d |d | |d dS | |d |d | |d |d |d |d dS tjddS )z$Get task meta-data for a task by id.rc   extendedr^   nameargsqueuer?   statusworkerretrieschildren	date_doner`   )rn   ro   ri   rp   r?   rq   rr   rs   rt   ru   r`   r^   )ri   rq   r^   ru   r`   rt   N)rq   r^   )	rg   find_oner8   r9   find_value_for_keymeta_from_decodedr]   r	   PENDING)r>   ri   objr   r   r   _get_task_meta_for   s4   zMongoBackend._get_task_meta_forc                 C   s>   ||  dd |D ttjd}| jjd|i|dd |S )zSave the group result.c                 S   s   g | ]}|j qS r   )id)r   ir   r   r   r      s    z,MongoBackend._save_group.<locals>.<listcomp>)rc   r^   ru   rc   Trd   )rY   r   nowr   utcgroup_collectionrh   )r>   group_idr^   rj   r   r   r   _save_group   s   
zMongoBackend._save_groupc                    sD    j d|i}|r |d |d  fdd |d D dS dS )z!Get the result for a group by id.rc   ru   c                    s   g | ]} j |qS r   )r8   AsyncResult)r   taskrQ   r   r   r      s    
z/MongoBackend._restore_group.<locals>.<listcomp>r^   )ri   ru   r^   N)r   rv   r]   )r>   r   rz   r   rQ   r   _restore_group   s   
zMongoBackend._restore_groupc                 C      | j d|i dS )zDelete a group by id.rc   N)r   
delete_one)r>   r   r   r   r   _delete_group   s   zMongoBackend._delete_groupc                 C   r   )zRemove result from MongoDB.

        Raises:
            pymongo.exceptions.OperationsError:
                if the task_id could not be removed.
        rc   N)rg   r   )r>   ri   r   r   r   _forget   s   
zMongoBackend._forgetc                 C   sN   | j sdS | jdd| j | j ii | jdd| j | j ii dS )zDelete expired meta-data.Nru   z$lt)expiresrg   delete_manyr8   r~   expires_deltar   rQ   r   r   r   cleanup	  s   zMongoBackend.cleanupr   c                    s(   |si n|}t  |t|| j| jdS )N)r   r2   )r,   
__reduce__r<   r   r2   )r>   ro   r?   rE   r   r   r     s   zMongoBackend.__reduce__c                 C   s   |   }|| j S N)rU   r6   )r>   connr   r   r   _get_database  s   
zMongoBackend._get_databasec                 C   s   |   S )z]Get database from MongoDB connection.

        performs authentication if necessary.
        )r   rQ   r   r   r   r#     s   zMongoBackend.databasec                 C      | j | j }|jddd |S z"Get the meta-data task collection.ru   T)
background)r#   r*   create_indexr>   rg   r   r   r   rg   &     zMongoBackend.collectionc                 C   r   r   )r#   r+   r   r   r   r   r   r   0  r   zMongoBackend.group_collectionc                 C   s   t | jdS )N)seconds)r   r   rQ   r   r   r   r   :  s   zMongoBackend.expires_deltac                 C   sL   | j sdS |r
| j S d| j vrt| j S | j dd\}}dt||gS )z~Return the backend as an URI.

        Arguments:
            include_password (bool): Password censored if disabled.
        rH   ,r   )r2   r   splitjoin)r>   include_passworduri1	remainderr   r   r   as_uri>  s   

zMongoBackend.as_urir   )NN)r   N)F)'r   r   r   __doc__r(   r&   r'   r)   r"   r6   r*   r+   rN   r$   supports_autoexpirerS   r-   staticmethodr3   r/   rU   rY   r]   rl   r{   r   r   r   r   r   r   r   r   r#   rg   r   r   r   __classcell__r   r   rE   r   r   #   sP    <





	
	
r   )r   r   r   r   kombu.exceptionsr   kombu.utils.objectsr   kombu.utils.urlr   r   r   r	   celery.exceptionsr
   baser   r.   ImportErrorbson.binaryr   pymongo.binarypymongo.errorsr   	Exception__all__	frozensetrZ   r   r   r   r   r   <module>   s2    