o
    Df                     @   st  d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZmZmZ d	d
lmZmZ d	dlmZmZ d	dlmZ d	dlmZmZmZ d	dlmZmZ zddl Z W n e!yk   dZ Y nw dZ"dZ#dd Z$edd Z%edd Z&G dd dZ'ej(G dd de'Z)ej(G dd de'Z*ej(G dd de*Z+ej(G dd  d e)Z,d#d!d"Z-dS )$z3Task results/state and results for groups of tasks.    N)deque)contextmanager)proxy)isoparse)cached_property)Thenablebarrierpromise   )current_appstates)_set_task_join_will_blocktask_join_will_block)app_or_default)ImproperlyConfiguredIncompleteStreamTimeoutError)DependencyGraphGraphFormatter)
ResultBaseAsyncResult	ResultSetGroupResultEagerResultresult_from_tuplezNever call result.get() within a task!
See https://docs.celeryq.dev/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks
c                   C   s   t  rttd S N)r   RuntimeErrorE_WOULDBLOCK r   r   F/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/result.pyassert_will_not_block$   s   r    c                  c   0    t  } td z
d V  W t|  d S t|  w NFr   r   reset_valuer   r   r   allow_join_result)      r&   c                  c   r!   NTr#   r$   r   r   r   denied_join_result3   r'   r)   c                   @   s   e Zd ZdZdZdS )r   zBase class for results.N)__name__
__module____qualname____doc__parentr   r   r   r   r   =   s    r   c                   @   sB  e Zd ZdZdZeZdZdZ			dhddZe	dd Z
e
jdd Z
did	d
Zdd Zdd Zdd Zdd Z		djddZ		djddZdddddddddejejfddZeZdd Zdd Zdkdd Zd!d" Zdkd#d$Zd%d& Zd'd( Zd)d* Zd+d, Z dld-d.Z!e!Z"d/d0 Z#dmd1d2Z$d3d4 Z%d5d6 Z&d7d8 Z'd9d: Z(d;d< Z)d=d> Z*d?d@ Z+dAdB Z,e-dCdD Z.e	dEdF Z/e	dGdH Z0dIdJ Z1dKdL Z2dMdN Z3dOdP Z4e	dQdR Z5e5Z6e	dSdT Z7e	dUdV Z8e8Z9e	dWdX Z:e:jdYdX Z:e	dZd[ Z;e	d\d] Z<e	d^d_ Z=e	d`da Z>e	dbdc Z?e	ddde Z@e	dfdg ZAdS )nr   zxQuery task state.

    Arguments:
        id (str): See :attr:`id`.
        backend (Backend): See :attr:`backend`.
    Nc                 C   sd   |d u rt dt| t|p| j| _|| _|p| jj| _|| _t| jdd| _	d | _
d| _d S )Nz#AsyncResult requires valid id, not TweakF)
ValueErrortyper   appidbackendr.   r	   _on_fulfilledon_ready_cache_ignored)selfr4   r5   	task_namer3   r.   r   r   r   __init__X   s   
zAsyncResult.__init__c                 C   s   t | dr| jS dS )z+If True, task result retrieval is disabled.r9   F)hasattrr9   r:   r   r   r   ignoredf   s   
zAsyncResult.ignoredc                 C   s
   || _ dS )z%Enable/disable task result retrieval.N)r9   )r:   valuer   r   r   r?   m   s   
Fc                 C   s   | j j| |d | j||S )Nr/   )r5   add_pending_resultr7   thenr:   callbackon_errorr0   r   r   r   rB   r   s   zAsyncResult.thenc                 C   s   | j |  |S r   r5   remove_pending_resultr:   resultr   r   r   r6   v   s   zAsyncResult._on_fulfilledc                 C   s   | j }| j|o
| fd fS r   )r.   r4   as_tuple)r:   r.   r   r   r   rJ   z   s   zAsyncResult.as_tuplec                 C   s0   g }| j }|| j |dur||  |S )zReturn as a list of task IDs.N)r.   appendr4   extendas_list)r:   resultsr.   r   r   r   rM   ~   s   zAsyncResult.as_listc                 C   s(   d| _ | jr| j  | j| j dS )z/Forget the result of this task and its parents.N)r8   r.   forgetr5   r4   r>   r   r   r   rO      s   
zAsyncResult.forgetc                 C   s    | j jj| j|||||d dS )a  Send revoke signal to all workers.

        Any worker receiving the task, or having reserved the
        task, *must* ignore it.

        Arguments:
            terminate (bool): Also terminate the process currently working
                on the task (if any).
            signal (str): Name of signal to send to process if terminate.
                Default is TERM.
            wait (bool): Wait for replies from workers.
                The ``timeout`` argument specifies the seconds to wait.
                Disabled by default.
            timeout (float): Time in seconds to wait for replies when
                ``wait`` is enabled.
        
connection	terminatesignalreplytimeoutN)r3   controlrevoker4   r:   rQ   rR   rS   waitrU   r   r   r   rW      s   
zAsyncResult.revokec                 C   s   | j jj||||||d dS )a7  Send revoke signal to all workers only for tasks with matching headers values.

        Any worker receiving the task, or having reserved the
        task, *must* ignore it.
        All header fields *must* match.

        Arguments:
            headers (dict[str, Union(str, list)]): Headers to match when revoking tasks.
            terminate (bool): Also terminate the process currently working
                on the task (if any).
            signal (str): Name of signal to send to process if terminate.
                Default is TERM.
            wait (bool): Wait for replies from workers.
                The ``timeout`` argument specifies the seconds to wait.
                Disabled by default.
            timeout (float): Time in seconds to wait for replies when
                ``wait`` is enabled.
        rP   N)r3   rV   revoke_by_stamped_headers)r:   headersrQ   rR   rS   rY   rU   r   r   r   rZ      s   
z%AsyncResult.revoke_by_stamped_headersT      ?c              
   C   s   | j rdS |	r
t  t }|r|r| jrt| jdd}|   |r&|| | jr4|r1| j|d | jS | j	
|  | j	j| |||||||dS )a  Wait until task is ready, and return its result.

        Warning:
           Waiting for tasks within a task may lead to deadlocks.
           Please read :ref:`task-synchronous-subtasks`.

        Warning:
           Backends use resources to store and transmit results. To ensure
           that resources are released, you must eventually call
           :meth:`~@AsyncResult.get` or :meth:`~@AsyncResult.forget` on
           EVERY :class:`~@AsyncResult` instance returned after calling
           a task.

        Arguments:
            timeout (float): How long to wait, in seconds, before the
                operation times out. This is the setting for the publisher
                (celery client) and is different from `timeout` parameter of
                `@app.task`, which is the setting for the worker. The task
                isn't terminated even if timeout occurs.
            propagate (bool): Re-raise exception if the task failed.
            interval (float): Time to wait (in seconds) before retrying to
                retrieve the result.  Note that this does not have any effect
                when using the RPC/redis result store backends, as they don't
                use polling.
            no_ack (bool): Enable amqp no ack (automatically acknowledge
                message).  If this is :const:`False` then the message will
                **not be acked**.
            follow_parents (bool): Re-raise any exception raised by
                parent tasks.
            disable_sync_subtasks (bool): Disable tasks to wait for sub tasks
                this is the default configuration. CAUTION do not enable this
                unless you must.

        Raises:
            celery.exceptions.TimeoutError: if `timeout` isn't
                :const:`None` and the result does not arrive within
                `timeout` seconds.
            Exception: If the remote call raised an exception then that
                exception will be re-raised in the caller process.
        NTr/   )rD   )rU   intervalon_intervalno_ack	propagaterD   
on_message)r?   r    r	   r.   _maybe_reraise_parent_errorrB   r8   maybe_throwrI   r5   rA   wait_for_pending)r:   rU   r`   r]   r_   follow_parentsrD   ra   r^   disable_sync_subtasksEXCEPTION_STATESPROPAGATE_STATES_on_intervalr   r   r   get   s0   -
zAsyncResult.getc                 C   s"   t t|  D ]}|  qd S r   )reversedlist_parentsrc   r:   noder   r   r   rb     s   
z'AsyncResult._maybe_reraise_parent_errorc                 c   s$    | j }|r|V  |j }|sd S d S r   r.   rn   r   r   r   rm   
  s   zAsyncResult._parentsc                 k   s2    | j |dD ]\}}||jdi |fV  qdS )a  Collect results as they return.

        Iterator, like :meth:`get` will wait for the task to complete,
        but will also follow :class:`AsyncResult` and :class:`ResultSet`
        returned by the task, yielding ``(result, value)`` tuples for each
        result in the tree.

        An example would be having the following tasks:

        .. code-block:: python

            from celery import group
            from proj.celery import app

            @app.task(trail=True)
            def A(how_many):
                return group(B.s(i) for i in range(how_many))()

            @app.task(trail=True)
            def B(i):
                return pow2.delay(i)

            @app.task(trail=True)
            def pow2(i):
                return i ** 2

        .. code-block:: pycon

            >>> from celery.result import ResultBase
            >>> from proj.tasks import A

            >>> result = A.delay(10)
            >>> [v for v in result.collect()
            ...  if not isinstance(v, (ResultBase, tuple))]
            [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

        Note:
            The ``Task.trail`` option must be enabled
            so that the list of children is stored in ``result.children``.
            This is the default but enabled explicitly for illustration.

        Yields:
            Tuple[AsyncResult, Any]: tuples containing the result instance
            of the child task, and the return value of that task.
        intermediateNr   iterdepsrj   )r:   rr   kwargs_Rr   r   r   collect  s   .zAsyncResult.collectc                 C   s"   d }|   D ]\}}| }q|S r   rs   )r:   r@   rv   rw   r   r   r   get_leafA  s   
zAsyncResult.get_leafc                 #   sn    t d | fg}| }|r5| \} | fV    r,| fdd jp'g D  n|r1t |sd S d S )Nc                 3   s    | ]} |fV  qd S r   r   .0childro   r   r   	<genexpr>P      z'AsyncResult.iterdeps.<locals>.<genexpr>)r   popleftreadyrL   childrenr   )r:   rr   stackis_incomplete_streamr.   r   r}   r   rt   G  s   
 zAsyncResult.iterdepsc                 C   s   | j | jjv S )zReturn :const:`True` if the task has executed.

        If the task is still running, pending, or is waiting
        for retry then :const:`False` is returned.
        )stater5   READY_STATESr>   r   r   r   r   U  s   zAsyncResult.readyc                 C      | j tjkS )z7Return :const:`True` if the task executed successfully.)r   r   SUCCESSr>   r   r   r   
successful]     zAsyncResult.successfulc                 C   r   )z(Return :const:`True` if the task failed.)r   r   FAILUREr>   r   r   r   faileda  r   zAsyncResult.failedc                 O   s   | j j|i | d S r   )r7   throwr:   argsru   r   r   r   r   e  s   zAsyncResult.throwc                 C   sn   | j d u r	|  n| j }|d |d |d}}}|tjv r+|r+| || | |d ur5|| j| |S )NstatusrI   	traceback)r8   _get_task_metarj   r   rh   r   _to_remote_tracebackr4   )r:   r`   rD   cacher   r@   tbr   r   r   rc   h  s   
zAsyncResult.maybe_throwc                 C   s2   |rt d ur| jjjrt j| S d S d S d S r   )tblibr3   conftask_remote_tracebacks	Tracebackfrom_stringas_traceback)r:   r   r   r   r   r   s  s   z AsyncResult._to_remote_tracebackc                 C   sL   t |p	t| jddd}| j|dD ]\}}|| |r#||| q|S )Noval)rootshape)	formatterrq   )r   r   r4   rt   add_arcadd_edge)r:   rr   r   graphr.   ro   r   r   r   build_graphw  s   
zAsyncResult.build_graphc                 C   
   t | jS z`str(self) -> self.id`.strr4   r>   r   r   r   __str__     
zAsyncResult.__str__c                 C   r   z`hash(self) -> hash(self.id)`.hashr4   r>   r   r   r   __hash__  r   zAsyncResult.__hash__c                 C   s   dt | j d| j dS )N<: >)r2   r*   r4   r>   r   r   r   __repr__  s   zAsyncResult.__repr__c                 C   s.   t |tr|j| jkS t |tr|| jkS tS r   )
isinstancer   r4   r   NotImplementedr:   otherr   r   r   __eq__  s
   


zAsyncResult.__eq__c                 C   s   |  | j| jd | j| jS r   )	__class__r4   r5   r3   r.   r>   r   r   r   __copy__  s   zAsyncResult.__copy__c                 C      | j |  fS r   r   __reduce_args__r>   r   r   r   
__reduce__     zAsyncResult.__reduce__c                 C   s   | j | jd d | jfS r   )r4   r5   r.   r>   r   r   r   r        zAsyncResult.__reduce_args__c                 C   s   | j dur| j |  dS dS )z9Cancel pending operations when the instance is destroyed.NrF   r>   r   r   r   __del__  s   
zAsyncResult.__del__c                 C   s   |   S r   )r   r>   r   r   r   r        zAsyncResult.graphc                 C   s   | j jS r   )r5   supports_native_joinr>   r   r   r   r     r   z AsyncResult.supports_native_joinc                 C      |   dS )Nr   r   rj   r>   r   r   r   r        zAsyncResult.childrenc                 C   s:   |r|d }|t jv r| | j|}| |  |S |S )Nr   )r   r   
_set_cacher5   meta_from_decodedr7   )r:   metar   dr   r   r   _maybe_set_cache  s   

zAsyncResult._maybe_set_cachec                 C   s$   | j d u r| | j| jS | j S r   )r8   r   r5   get_task_metar4   r>   r   r   r   r     s   
zAsyncResult._get_task_metac                 K   s   t |  gS r   )iterr   r:   ru   r   r   r   
_iter_meta  r   zAsyncResult._iter_metac                    s.   | d}|r fdd|D |d< | _|S )Nr   c                    s   g | ]}t | jqS r   )r   r3   rz   r>   r   r   
<listcomp>      z*AsyncResult._set_cache.<locals>.<listcomp>)rj   r8   )r:   r   r   r   r>   r   r     s   


zAsyncResult._set_cachec                 C      |   d S )zTask return value.

        Note:
            When the task has been executed, this contains the return value.
            If the task raised an exception, this will be the exception
            instance.
        rI   r   r>   r   r   r   rI     s   	zAsyncResult.resultc                 C   r   )z#Get the traceback of a failed task.r   r   r>   r   r   r   r     s   zAsyncResult.tracebackc                 C   r   )a  The tasks current state.

        Possible values includes:

            *PENDING*

                The task is waiting for execution.

            *STARTED*

                The task has been started.

            *RETRY*

                The task is to be retried, possibly because of failure.

            *FAILURE*

                The task raised an exception, or has exceeded the retry limit.
                The :attr:`result` attribute then contains the
                exception raised by the task.

            *SUCCESS*

                The task executed successfully.  The :attr:`result` attribute
                then contains the tasks return value.
        r   r   r>   r   r   r   r     s   zAsyncResult.statec                 C      | j S )zCompat. alias to :attr:`id`.r4   r>   r   r   r   task_id     zAsyncResult.task_idc                 C   
   || _ d S r   r   )r:   r4   r   r   r   r     r   c                 C   r   )Nnamer   r>   r   r   r   r     r   zAsyncResult.namec                 C   r   )Nr   r   r>   r   r   r   r     r   zAsyncResult.argsc                 C   r   )Nru   r   r>   r   r   r   ru     r   zAsyncResult.kwargsc                 C   r   )Nworkerr   r>   r   r   r   r     r   zAsyncResult.workerc                 C   s*   |   d}|rt|tjst|S |S )zUTC date and time.	date_done)r   rj   r   datetimer   )r:   r   r   r   r   r     s   zAsyncResult.date_donec                 C   r   )Nretriesr   r>   r   r   r   r     r   zAsyncResult.retriesc                 C   r   )Nqueuer   r>   r   r   r   r     r   zAsyncResult.queue)NNNNr"   NFNFN)F)TN)FN)Br*   r+   r,   r-   r3   r   r4   r5   r<   propertyr?   setterrB   r6   rJ   rM   rO   rW   rZ   r   rg   rh   rj   rY   rb   rm   rx   ry   rt   r   r   r   r   rc   maybe_reraiser   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rI   infor   r   r   r   r   r   ru   r   r   r   r   r   r   r   r   r   D   s    



	


H
1

	




		
	









r   c                   @   sR  e Zd ZdZdZdZdCddZdd Zdd Zd	d
 Z	dd Z
dd Zdd Zdd Zdd ZdDddZeZdd Zdd Zdd Zdd Z		dEd!d"Zd#d$ Zd%d& Z	'		dFd(d)Z	'		dFd*d+ZdGd,d-Z		dHd.d/Z				dId0d1Zd2d3 Zd4d5 Zd6d7 Zd8d9 Z d:d; Z!e"d<d= Z#e"d>d? Z$e$j%d@d? Z$e"dAdB Z&dS )Jr   zpA collection of results.

    Arguments:
        results (Sequence[AsyncResult]): List of result instances.
    Nc                 K   sP   || _ || _tt| fd| _|pt|| _| jr&| jt| jdd d S d S )N)r   Tr/   )	_apprN   r	   r   r7   r   _on_fullrB   	_on_ready)r:   rN   r3   ready_barrierru   r   r   r   r<   1  s   zResultSet.__init__c                 C   s4   || j vr| j | | jr| j| dS dS dS )zvAdd :class:`AsyncResult` as a new member of the set.

        Does nothing if the result is already a member.
        N)rN   rK   r   addrH   r   r   r   r   9  s   
zResultSet.addc                 C   s   | j jr
|   d S d S r   )r5   is_asyncr7   r>   r   r   r   r   C  s   zResultSet._on_readyc                 C   s@   t |tr| j|}z	| j| W dS  ty   t|w )z~Remove result from the set; it must be a member.

        Raises:
            KeyError: if the result isn't a member.
        N)r   r   r3   r   rN   remover1   KeyErrorrH   r   r   r   r   G  s   
zResultSet.removec                 C   s&   z|  | W dS  ty   Y dS w )zbRemove result from the set if it is a member.

        Does nothing if it's not a member.
        N)r   r   rH   r   r   r   discardT  s
   zResultSet.discardc                    s    j  fdd|D  dS )z Extend from iterable of results.c                 3   s    | ]
}| j vr|V  qd S r   rN   r{   rr>   r   r   r~   `  s    z#ResultSet.update.<locals>.<genexpr>N)rN   rL   )r:   rN   r   r>   r   update^  s   zResultSet.updatec                 C   s   g | j dd< dS )z!Remove all results from this set.Nr   r>   r   r   r   clearb  s   zResultSet.clearc                 C      t dd | jD S )zReturn true if all tasks successful.

        Returns:
            bool: true if all of the tasks finished
                successfully (i.e. didn't raise an exception).
        c                 s       | ]}|  V  qd S r   )r   r{   rI   r   r   r   r~   m  r   z'ResultSet.successful.<locals>.<genexpr>allrN   r>   r   r   r   r   f     zResultSet.successfulc                 C   r   )zReturn true if any of the tasks failed.

        Returns:
            bool: true if one of the tasks failed.
                (i.e., raised an exception)
        c                 s   r   r   )r   r   r   r   r   r~   v  r   z#ResultSet.failed.<locals>.<genexpr>anyrN   r>   r   r   r   r   o  r   zResultSet.failedTc                 C   s   | j D ]	}|j||d qd S )N)rD   r`   )rN   rc   )r:   rD   r`   rI   r   r   r   rc   x  s   
zResultSet.maybe_throwc                 C   r   )zReturn true if any of the tasks are incomplete.

        Returns:
            bool: true if one of the tasks are still
                waiting for execution.
        c                 s   s    | ]}|   V  qd S r   r   r   r   r   r   r~     s    z$ResultSet.waiting.<locals>.<genexpr>r   r>   r   r   r   waiting}  r   zResultSet.waitingc                 C   r   )zDid all of the tasks complete? (either by success of failure).

        Returns:
            bool: true if all of the tasks have been executed.
        c                 s   r   r   r   r   r   r   r   r~     r   z"ResultSet.ready.<locals>.<genexpr>r   r>   r   r   r   r     s   zResultSet.readyc                 C   r   )a  Task completion count.

        Note that `complete` means `successful` in this context. In other words, the
        return value of this method is the number of ``successful`` tasks.

        Returns:
            int: the number of complete (i.e. successful) tasks.
        c                 s   s    | ]	}t | V  qd S r   )intr   r   r   r   r   r~     s    z,ResultSet.completed_count.<locals>.<genexpr>)sumrN   r>   r   r   r   completed_count  s   	zResultSet.completed_countc                 C   s   | j D ]}|  qdS )z?Forget about (and possible remove the result of) all the tasks.N)rN   rO   rH   r   r   r   rO     s   

zResultSet.forgetFc                 C   s*   | j jjdd | jD |||||d dS )a[  Send revoke signal to all workers for all tasks in the set.

        Arguments:
            terminate (bool): Also terminate the process currently working
                on the task (if any).
            signal (str): Name of signal to send to process if terminate.
                Default is TERM.
            wait (bool): Wait for replies from worker.
                The ``timeout`` argument specifies the number of seconds
                to wait.  Disabled by default.
            timeout (float): Time in seconds to wait for replies when
                the ``wait`` argument is enabled.
        c                 S   s   g | ]}|j qS r   r   r   r   r   r   r         z$ResultSet.revoke.<locals>.<listcomp>)rQ   rU   rR   rS   rT   N)r3   rV   rW   rN   rX   r   r   r   rW     s   
zResultSet.revokec                 C   r   r   )r   rN   r>   r   r   r   __iter__     
zResultSet.__iter__c                 C   s
   | j | S )z`res[i] -> res.results[i]`.r   )r:   indexr   r   r   __getitem__  r   zResultSet.__getitem__r\   c	           	   
   C   s&   | j r| jn| j||||||||dS )zSee :meth:`join`.

        This is here for API compatibility with :class:`AsyncResult`,
        in addition it uses :meth:`join_native` if available for the
        current result backend.
        )rU   r`   r]   rD   r_   ra   rf   r^   )r   join_nativejoin)	r:   rU   r`   r]   rD   r_   ra   rf   r^   r   r   r   rj     s   	zResultSet.getc	              	   C   s   |rt   t }	d}
|durtdg }| jD ]/}d}
|r.|t |	  }
|
dkr.td|j|
|||||d}|rB||j| q|| q|S )a  Gather the results of all tasks as a list in order.

        Note:
            This can be an expensive operation for result store
            backends that must resort to polling (e.g., database).

            You should consider using :meth:`join_native` if your backend
            supports it.

        Warning:
            Waiting for tasks within a task may lead to deadlocks.
            Please see :ref:`task-synchronous-subtasks`.

        Arguments:
            timeout (float): The number of seconds to wait for results
                before the operation times out.
            propagate (bool): If any of the tasks raises an exception,
                the exception will be re-raised when this flag is set.
            interval (float): Time to wait (in seconds) before retrying to
                retrieve a result from the set.  Note that this does not have
                any effect when using the amqp result store backend,
                as it does not use polling.
            callback (Callable): Optional callback to be called for every
                result received.  Must have signature ``(task_id, value)``
                No results will be returned by this function if a callback
                is specified.  The order of results is also arbitrary when a
                callback is used.  To get access to the result object for
                a particular id you'll have to generate an index first:
                ``index = {r.id: r for r in gres.results.values()}``
                Or you can create new result objects on the fly:
                ``result = app.AsyncResult(task_id)`` (both will
                take advantage of the backend cache anyway).
            no_ack (bool): Automatic message acknowledgment (Note that if this
                is set to :const:`False` then the messages
                *will not be acknowledged*).
            disable_sync_subtasks (bool): Disable tasks to wait for sub tasks
                this is the default configuration. CAUTION do not enable this
                unless you must.

        Raises:
            celery.exceptions.TimeoutError: if ``timeout`` isn't
                :const:`None` and the operation takes longer than ``timeout``
                seconds.
        Nz,Backend does not support on_message callbackg        zjoin operation timed out)rU   r`   r]   r_   r^   rf   )	r    time	monotonicr   rN   r   rj   r4   rK   )r:   rU   r`   r]   rD   r_   ra   rf   r^   
time_start	remainingrN   rI   r@   r   r   r   r    s0   /
zResultSet.joinc                 C      | j ||S r   r7   rB   rC   r   r   r   rB     r   zResultSet.thenc                 C   s   | j j| |||||dS )a0  Backend optimized version of :meth:`iterate`.

        .. versionadded:: 2.2

        Note that this does not support collecting the results
        for different task types using different backends.

        This is currently only supported by the amqp, Redis and cache
        result backends.
        )rU   r]   r_   ra   r^   )r5   iter_native)r:   rU   r]   r_   ra   r^   r   r   r   r
    s
   zResultSet.iter_nativec	                 C   s   |rt   |r	dn	dd t| jD }	|rdn
dd tt| D }
| |||||D ]5\}}t|trCg }|D ]	}||	  q8n|d }|rR|d t
jv rR||rZ||| q+||
|	| < q+|
S )a-  Backend optimized version of :meth:`join`.

        .. versionadded:: 2.2

        Note that this does not support collecting the results
        for different task types using different backends.

        This is currently only supported by the amqp, Redis and cache
        result backends.
        Nc                 S   s   i | ]\}}|j |qS r   r   )r{   irI   r   r   r   
<dictcomp>7  r   z)ResultSet.join_native.<locals>.<dictcomp>c                 S   s   g | ]}d qS r   r   )r{   rv   r   r   r   r   :  s    z)ResultSet.join_native.<locals>.<listcomp>rI   r   )r    	enumeraterN   rangelenr
  r   rl   rK   rj   r   rh   )r:   rU   r`   r]   rD   r_   ra   r^   rf   order_indexaccr   r   r@   children_resultr   r   r   r  '  s*   

zResultSet.join_nativec                 K   s.   dd | j jdd | jD fddi|D S )Nc                 s   s    | ]\}}|V  qd S r   r   )r{   rv   r   r   r   r   r~   L  r   z'ResultSet._iter_meta.<locals>.<genexpr>c                 S   s   h | ]}|j qS r   r   r   r   r   r   	<setcomp>M  r   z'ResultSet._iter_meta.<locals>.<setcomp>max_iterationsr
   )r5   get_manyrN   r   r   r   r   r   K  s   
zResultSet._iter_metac                 C   s   dd | j D S )Nc                 s   s.    | ]}|j |jr|jtjv r|V  qd S r   )r5   	is_cachedr4   r   r   rh   )r{   resr   r   r   r~   Q  s    z0ResultSet._failed_join_report.<locals>.<genexpr>r   r>   r   r   r   _failed_join_reportP     zResultSet._failed_join_reportc                 C   r   r   )r  rN   r>   r   r   r   __len__U  r   zResultSet.__len__c                 C   s   t |tr|j| jkS tS r   )r   r   rN   r   r   r   r   r   r   X  s   
zResultSet.__eq__c                 C   s*   dt | j dddd | jD  dS )Nr   z: [, c                 s       | ]}|j V  qd S r   r   r   r   r   r   r~   ^      z%ResultSet.__repr__.<locals>.<genexpr>]>)r2   r*   r  rN   r>   r   r   r   r   ]  s   *zResultSet.__repr__c                 C   s$   z| j d jW S  ty   Y d S w Nr   )rN   r   
IndexErrorr>   r   r   r   r   `  s
   zResultSet.supports_native_joinc                 C   s,   | j d u r| jr| jd jnt | _ | j S r  )r   rN   r3   r   _get_current_objectr>   r   r   r   r3   g  s
   
zResultSet.appc                 C   r   r   )r   )r:   r3   r   r   r   r3   n  r   c                 C   s   | j r| j jS | jd jS r  )r3   r5   rN   r>   r   r   r   r5   r  s   zResultSet.backendNNr(   r   )NTr\   NTNTNr"   )Nr\   TNN)NTr\   NTNNT)'r*   r+   r,   r-   r   rN   r<   r   r   r   r   r   r   r   r   rc   r   r   r   r   rO   rW   r   r  rj   r  rB   r
  r  r   r  r  r   r   r   r   r3   r   r5   r   r   r   r   r   $  sl    


	
		



J

$


r   c                       s   e Zd ZdZdZdZd fdd	Z fddZd ddZd d	d
Z	dd Z
dd Zdd ZeZdd Zdd Zdd Zdd Zdd Zedd Zed!ddZ  ZS )"r   az  Like :class:`ResultSet`, but with an associated id.

    This type is returned by :class:`~celery.group`.

    It enables inspection of the tasks state and return values as
    a single entity.

    Arguments:
        id (str): The id of the group.
        results (Sequence[AsyncResult]): List of result instances.
        parent (ResultBase): Parent result of this group.
    Nc                    s$   || _ || _t j|fi | d S r   )r4   r.   superr<   )r:   r4   rN   r.   ru   r   r   r   r<     s   zGroupResult.__init__c                    s   | j |  t   d S r   )r5   rG   r#  r   r>   r$  r   r   r     s   zGroupResult._on_readyc                 C   s   |p| j j| j| S )zSave group-result for later retrieval using :meth:`restore`.

        Example:
            >>> def save_and_restore(result):
            ...     result.save()
            ...     result = GroupResult.restore(result.id)
        )r3   r5   
save_groupr4   r:   r5   r   r   r   save  s   zGroupResult.savec                 C   s   |p| j j| j dS )z.Remove this result if it was previously saved.N)r3   r5   delete_groupr4   r&  r   r   r   delete  s   zGroupResult.deletec                 C   r   r   r   r>   r   r   r   r     r   zGroupResult.__reduce__c                 C   s   | j | jfS r   )r4   rN   r>   r   r   r   r        zGroupResult.__reduce_args__c                 C   s   t | jp| jS r   )boolr4   rN   r>   r   r   r   __bool__  r  zGroupResult.__bool__c                 C   sF   t |tr|j| jko|j| jko|j| jkS t |tr!|| jkS tS r   )r   r   r4   rN   r.   r   r   r   r   r   r   r     s   




zGroupResult.__eq__c              	   C   s2   dt | j d| j dddd | jD  dS )Nr   r   z [r  c                 s   r  r   r   r   r   r   r   r~     r  z'GroupResult.__repr__.<locals>.<genexpr>r  )r2   r*   r4   r  rN   r>   r   r   r   r     s   2zGroupResult.__repr__c                 C   r   r   r   r>   r   r   r   r     r   zGroupResult.__str__c                 C   r   r   r   r>   r   r   r   r     r   zGroupResult.__hash__c                 C   s&   | j | jo	| j fdd | jD fS )Nc                 S   s   g | ]}|  qS r   )rJ   r   r   r   r   r     s    z(GroupResult.as_tuple.<locals>.<listcomp>)r4   r.   rJ   rN   r>   r   r   r   rJ     s   zGroupResult.as_tuplec                 C   r   r   r   r>   r   r   r   r     s   zGroupResult.childrenc                 C   s.   |pt | jts| jnt}|p|j}||S )z&Restore previously saved group result.)r   r3   r   r   r5   restore_group)clsr4   r5   r3   r   r   r   restore  s
   

zGroupResult.restore)NNNr   r"  )r*   r+   r,   r-   r4   rN   r<   r   r'  r)  r   r   r,  __nonzero__r   r   r   r   rJ   r   r   classmethodr/  __classcell__r   r   r$  r   r   w  s*    



r   c                   @   s   e Zd ZdZd%ddZd&ddZdd	 Zd
d Zdd Zdd Z	dd Z
		d'ddZeZdd Zdd Zdd Zedd Zedd Zedd  ZeZed!d" Zed#d$ ZdS )(r   z.Result that we know has already been executed.Nc                 C   s4   || _ || _|| _|| _|| _t | _| |  d S r   )r4   _result_state
_traceback_namer	   r7   )r:   r4   	ret_valuer   r   r   r   r   r   r<     s   zEagerResult.__init__Fc                 C   r  r   r	  rC   r   r   r   rB     r   zEagerResult.thenc                 C   r   r   )r8   r>   r   r   r   r     s   zEagerResult._get_task_metac                 C   r   r   r   r>   r   r   r   r     r   zEagerResult.__reduce__c                 C   s   | j | j| j| jfS r   )r4   r3  r4  r5  r>   r   r   r   r     r   zEagerResult.__reduce_args__c                 C   s   |   \}}|| S r   )r   )r:   r.  r   r   r   r   r     s   zEagerResult.__copy__c                 C      dS r(   r   r>   r   r   r   r        zEagerResult.readyTc                 K   sN   |rt   |  r| jS | jtjv r%|r"t| jtr| jt| j| jS d S r   )r    r   rI   r   r   rh   r   	Exception)r:   rU   r`   rf   ru   r   r   r   rj     s   
zEagerResult.getc                 C   s   d S r   r   r>   r   r   r   rO     r9  zEagerResult.forgetc                 O   s   t j| _d S r   )r   REVOKEDr4  r   r   r   r   rW   
  r*  zEagerResult.revokec                 C   s   d| j  dS )Nz<EagerResult: r   r   r>   r   r   r   r     r   zEagerResult.__repr__c                 C   s   | j | j| j| j| jdS )N)r   rI   r   r   r   )r4   r3  r4  r5  r6  r>   r   r   r   r8     s   zEagerResult._cachec                 C   r   )zThe tasks return value.)r3  r>   r   r   r   rI     r   zEagerResult.resultc                 C   r   )zThe tasks state.)r4  r>   r   r   r   r     r   zEagerResult.statec                 C   r   )z!The traceback if the task failed.)r5  r>   r   r   r   r   %  r   zEagerResult.tracebackc                 C   r8  r"   r   r>   r   r   r   r   *  s   z EagerResult.supports_native_joinr"  r"   )NTT)r*   r+   r,   r-   r<   rB   r   r   r   r   r   rj   rY   rO   rW   r   r   r8   rI   r   r   r   r   r   r   r   r   r     s6    



	


r   c                    s   t    j}t| ts?| \}}t|ttfr|n|df\}}|r&t| }|dur9 j| fdd|D |dS |||dS | S )zDeserialize result from tuple.Nc                    s   g | ]}t | qS r   )r   rz   r3   r   r   r   =  s    z%result_from_tuple.<locals>.<listcomp>rp   )r   r   r   r   rl   tupler   r   )r   r3   Resultr  nodesr4   r.   r   r<  r   r   /  s   

r   r   ).r-   r   r  collectionsr   
contextlibr   weakrefr   dateutil.parserr   kombu.utils.objectsr   viner   r   r	    r   r   r4  r   r   r3   r   
exceptionsr   r   r   utils.graphr   r   r   ImportError__all__r   r    r&   r)   r   registerr   r   r   r   r   r   r   r   r   <module>   sR    
	
	   b  T_W