o
    Df8                     @   st  d Z ddlZddlmZ ddlmZmZ ddlmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ dZdZG dd deZG dd dZ		d4ddZd5ddZdd ZeddfddZdd Z		d6ddZdd Zdd  Z d!d" Z!d#d$ Z"G d%d& d&Z#	'			d7d)d*Z$d+d, Z%d-d. Z&d/d0 Z'd1d2 Z(eeed3Z)ee%ed3Z*ee&ed3Z+ee'ed3Z,dS )8z,Message migration tools (Broker <-> Broker).    N)partial)cycleislice)Queue	eventloop)maybe_declare)ensure_bytes)app_or_default)worker_direct)str_to_list)StopFilteringState	republishmigrate_taskmigrate_tasksmove
task_id_eq
task_id_instart_filtermove_task_by_idmove_by_idmapmove_by_taskmapmove_directmove_direct_by_idzGMoving task {state.filtered}/{state.strtotal}: {body[task]}[{body[id]}]c                   @   s   e Zd ZdZdS )r   z*Semi-predicate used to signal filter stop.N)__name__
__module____qualname____doc__ r   r   O/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/contrib/migrate.pyr      s    r   c                   @   s0   e Zd ZdZdZdZdZedd Zdd Z	dS )r   zMigration progress state.r   c                 C   s   | j sdS t| j S )N?)	total_apxstrselfr   r   r   strtotal&   s   
zState.strtotalc                 C   s$   | j r	d| j  S | j d| j S )N^/)filteredcountr%   r#   r   r   r   __repr__,   s   zState.__repr__N)
r   r   r   r   r)   r(   r!   propertyr%   r*   r   r   r   r   r      s    
r   c              
   C   s   |sg d}t |j}|j|j|j}}}|du r|d n|}|du r(|d n|}|j|j}	}
|dd}|dd}|durEt|nd}|D ]}||d qI| j	t |f|||||	|
|d| dS )zRepublish message.)application_headerscontent_typecontent_encodingheadersNexchangerouting_keycompression
expiration)r0   r1   r2   r/   r-   r.   r3   )
r   bodydelivery_infor/   
propertiesr-   r.   popfloatpublish)producermessager0   r1   remove_propsr4   infor/   propsctypeencr2   r3   keyr   r   r   r   2   s*   


r   c                 C   s>   |j }|du r	i n|}t| |||d ||d d dS )zMigrate single task message.Nr0   r1   r0   r1   )r5   r   get)r:   body_r;   queuesr=   r   r   r   r   P   s   
r   c                    s    fdd}|S )Nc                    s   r
| d vr
d S  | |S Ntaskr   r4   r;   callbacktasksr   r   r(   [   s   
z!filter_callback.<locals>.filteredr   )rJ   rK   r(   r   rI   r   filter_callbackY   s   rL   c                    sV   t |}t|jj|dd t| d} fdd}t|| |f|d|S )z)Migrate tasks from one broker to another.F)auto_declarerE   c                    sh   |  j }| j| j|_|j| jkr| j|j|_|jj| jkr.| j| j|j_|  d S N)channelrC   namer1   r0   declare)queue	new_queuer:   rE   r   r   on_declare_queuek   s   
z'migrate_tasks.<locals>.on_declare_queue)rE   rV   )r	   prepare_queuesamqpProducerr   r   )sourcedestmigrateapprE   kwargsrV   r   rU   r   r   c   s   
r   c                 C   s   t |tr| jj| S |S rO   )
isinstancer"   rX   rE   )r]   qr   r   r   _maybe_queuey   s   
ra   c	              
      s   t    fdd|pg D pd}
 j|dd+ jt 	f	dd}t |fd|
i|	W  d   S 1 sFw   Y  dS )	aG	  Find tasks by filtering them and move the tasks to a new queue.

    Arguments:
        predicate (Callable): Filter function used to decide the messages
            to move.  Must accept the standard signature of ``(body, message)``
            used by Kombu consumer callbacks.  If the predicate wants the
            message to be moved it must return either:

                1) a tuple of ``(exchange, routing_key)``, or

                2) a :class:`~kombu.entity.Queue` instance, or

                3) any other true value means the specified
                    ``exchange`` and ``routing_key`` arguments will be used.
        connection (kombu.Connection): Custom connection to use.
        source: List[Union[str, kombu.Queue]]: Optional list of source
            queues to use instead of the default (queues
            in :setting:`task_queues`).  This list can also contain
            :class:`~kombu.entity.Queue` instances.
        exchange (str, kombu.Exchange): Default destination exchange.
        routing_key (str): Default destination routing key.
        limit (int): Limit number of messages to filter.
        callback (Callable): Callback called after message moved,
            with signature ``(state, body, message)``.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.

    Also supports the same keyword arguments as :func:`start_filter`.

    To demonstrate, the :func:`move_task_by_id` operation can be implemented
    like this:

    .. code-block:: python

        def is_wanted_task(body, message):
            if body['id'] == wanted_id:
                return Queue('foo', exchange=Exchange('foo'),
                             routing_key='foo')

        move(is_wanted_task)

    or with a transform:

    .. code-block:: python

        def transform(value):
            if isinstance(value, str):
                return Queue(value, Exchange(value), value)
            return value

        move(is_wanted_task, transform=transform)

    Note:
        The predicate may also return a tuple of ``(exchange, routing_key)``
        to specify the destination to where the task should be moved,
        or a :class:`~kombu.entity.Queue` instance.
        Any other true value means that the task will be moved to the
        default exchange/routing_key.
    c                    s   g | ]}t  |qS r   )ra   ).0rS   )r]   r   r   
<listcomp>   s    zmove.<locals>.<listcomp>NF)poolc                    s   | |}|rNr|}t |tr!t|j |jj|j}}nt|\}}t|||d |	   j
d7  _
 rD | | rPj
krRt d S d S d S )NrB      )r_   r   r   default_channelr0   rQ   r1   expand_destr   ackr(   r   )r4   r;   retexrk)	rJ   connr0   limit	predicater:   r1   state	transformr   r   on_task   s&   

zmove.<locals>.on_taskconsume_from)r	   connection_or_acquirerX   rY   r   r   )rn   
connectionr0   r1   rZ   r]   rJ   rm   rp   r^   rE   rq   r   )
r]   rJ   rl   r0   rm   rn   r:   r1   ro   rp   r   r      s   >$r   c              	   C   s:   z	| \}}W ||fS  t tfy   ||}}Y ||fS w rO   )	TypeError
ValueError)ri   r0   r1   rj   rk   r   r   r   rg      s   
rg   c                 C   s   |d | kS )z'Return true if task id equals task_id'.idr   )task_idr4   r;   r   r   r   r         r   c                 C   s   |d | v S )z-Return true if task id is member of set ids'.rw   r   )idsr4   r;   r   r   r   r      ry   r   c                 C   s@   t | tr
| d} t | trtdd | D } | d u ri } | S )N,c                 s   s*    | ]}t tt|d ddV  qdS ):N   )tupler   r   splitrb   r`   r   r   r   	<genexpr>   s    "z!prepare_queues.<locals>.<genexpr>)r_   r"   r   listdictrN   r   r   r   rW      s   


rW   c                   @   sN   e Zd Z				dddZdd Zdd	 Zd
d Zdd Zdd Zdd Z	dS )FiltererN      ?Fc                    s   | _ | _| _| _| _| _tt|pg  _t	| _
|	 _|
 _| _ fdd|p4t j
D  _|p<t  _| _d S )Nc                    s   g | ]}t  j|qS r   )ra   r]   r   r#   r   r   rc   	  s    
z%Filterer.__init__.<locals>.<listcomp>)r]   rl   filterrm   timeoutack_messagessetr   rK   rW   rE   rJ   foreverrV   r   rr   r   ro   accept)r$   r]   rl   r   rm   r   r   rK   rE   rJ   r   rV   rr   ro   r   r^   r   r#   r   __init__   s    


zFilterer.__init__c              	   C   s   |  |  > zt| j| j| jdD ]}qW n tjy!   Y n ty)   Y nw W d    | jS W d    | jS W d    | jS 1 sHw   Y  | jS )N)r   ignore_timeouts)	prepare_consumercreate_consumerr   rl   r   r   socketr   ro   )r$   _r   r   r   start  s0   







zFilterer.startc                 C   s2   | j  jd7  _| jr| j j| jkrt d S d S )Nre   )ro   r)   rm   r   r$   r4   r;   r   r   r   update_state  s   zFilterer.update_statec                 C   s   |   d S rO   )rh   r   r   r   r   ack_message#  s   zFilterer.ack_messagec                 C   s   | j jj| j| j| jdS )N)rE   r   )r]   rX   TaskConsumerrl   rr   r   r#   r   r   r   r   &  s
   zFilterer.create_consumerc                 C   s   | j }| j}| j}| jrt|| j}t|| j}t|| j}|| || | jr1|| j | jd urKt| j| j	}| jrFt|| j}|| | 
| |S rO   )r   r   r   rK   rL   register_callbackr   rJ   r   ro   declare_queues)r$   consumerr   r   r   rJ   r   r   r   r   -  s$   




zFilterer.prepare_consumerc              	   C   s~   |j D ]9}| j r|j| j vrq| jd ur| | z||jjdd\}}}|r0| j j|7  _W q | jjy<   Y qw d S )NT)passive)	rE   rQ   rV   rP   queue_declarero   r!   rl   channel_errors)r$   r   rS   r   mcountr   r   r   r   A  s$   


zFilterer.declare_queuesNr   FNNNFNNNN)
r   r   r   r   r   r   r   r   r   r   r   r   r   r   r      s    
r   r   Fc                 K   s0   t | ||f|||||||	|
|||d| S )zFilter tasks.)rm   r   r   rK   rE   rJ   r   rV   rr   ro   r   )r   r   )r]   rl   r   rm   r   r   rK   rE   rJ   r   rV   rr   ro   r   r^   r   r   r   r   Q  s&   r   c                 K   s   t | |ifi |S )a  Find a task by id and move it to another queue.

    Arguments:
        task_id (str): Id of task to find and move.
        dest: (str, kombu.Queue): Destination queue.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.
        **kwargs (Any): Also supports the same keyword
            arguments as :func:`move`.
    )r   )rx   r[   r^   r   r   r   r   f  s   r   c                    s$    fdd}t |fdt i|S )a  Move tasks by matching from a ``task_id: queue`` mapping.

    Where ``queue`` is a queue to move the task to.

    Example:
        >>> move_by_idmap({
        ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
        ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
        ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
        ...   queues=['hipri'])
    c                    s     |jd S )Ncorrelation_id)rC   r6   rH   mapr   r   task_id_in_map  s   z%move_by_idmap.<locals>.task_id_in_maprm   )r   len)r   r^   r   r   r   r   r   t  s   r   c                    s    fdd}t |fi |S )a  Move tasks by matching from a ``task_name: queue`` mapping.

    ``queue`` is the queue to move the task to.

    Example:
        >>> move_by_taskmap({
        ...     'tasks.add': Queue('name'),
        ...     'tasks.mul': Queue('name'),
        ... })
    c                    s     | d S rF   )rC   rH   r   r   r   task_name_in_map  s   z)move_by_taskmap.<locals>.task_name_in_map)r   )r   r^   r   r   r   r   r     s   r   c                 K   s   t tjd| |d| d S )N)ro   r4   r   )printMOVING_PROGRESS_FMTformat)ro   r4   r;   r^   r   r   r   filter_status  s   r   )rp   )NNNrO   )NNNNNNNNr   )-r   r   	functoolsr   	itertoolsr   r   kombur   r   kombu.commonr   kombu.utils.encodingr   
celery.appr	   celery.utils.nodenamesr
   celery.utils.textr   __all__r   	Exceptionr   r   r   r   rL   r   ra   r   rg   r   r   rW   r   r   r   r   r   r   r   r   move_direct_by_idmapmove_direct_by_taskmapr   r   r   r   <module>   sX    

	


[Z
