o
    Df.#                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 zddl
Z
ddlZ
ddlZ
ddlZ
W n ey;   dZ
Y nw dZeeZd	Zd
ZdZdZdZdZdZdZdd ZG dd de	ZdS )z@Apache Cassandra result store backend using the DataStax driver.    N)states)ImproperlyConfigured)
get_logger   )BaseBackend)CassandraBackendz
You need to install the cassandra-driver library to
use the Cassandra backend.  See https://github.com/datastax/python-driver
z
CASSANDRA_AUTH_PROVIDER you provided is not a valid auth_provider class.
See https://datastax.github.io/python-driver/api/cassandra/auth.html.
z(Cassandra backend improperly configured.z!Cassandra backend not configured.z
INSERT INTO {table} (
    task_id, status, result, date_done, traceback, children) VALUES (
        %s, %s, %s, %s, %s, %s) {expires};
z]
SELECT status, result, date_done, traceback, children
FROM {table}
WHERE task_id=%s
LIMIT 1
z
CREATE TABLE {table} (
    task_id text,
    status text,
    result blob,
    date_done timestamp,
    traceback blob,
    children blob,
    PRIMARY KEY ((task_id), date_done)
) WITH CLUSTERING ORDER BY (date_done DESC);
z
    USING TTL {0}
c                 C   s
   t | dS )Nutf8)bytes)x r   R/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/backends/cassandra.pybuf_tC   s   
r   c                       sh   e Zd ZdZdZdZdZ		d fdd	Zddd	Z	dd
dZ	dddZ
dd Zd fdd	Z  ZS )r   aG  Cassandra/AstraDB backend utilizing DataStax driver.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`cassandra-driver` is not available,
            or not-exactly-one of the :setting:`cassandra_servers` and
            the :setting:`cassandra_secure_bundle_path` settings is set.
    NTR#  c                    s  t  jdi | tstt| jj}|p|dd | _|p#|dd | _	|p,|dd | _
|p5|dd | _|p>|dd | _|di | _| jpL| j	}	|	rU| jrU| jsYtt| jrc| j	rctt|pj|dd }
|
d urtt|
nd| _|d	p}d
}|dpd
}ttj|tjj| _ttj|tjj| _d | _|dd }|dd }|r|rttj|d }|stt|di || _d | _d | _d | _d | _t  | _!d S )Ncassandra_serverscassandra_secure_bundle_pathcassandra_portcassandra_keyspacecassandra_tablecassandra_optionscassandra_entry_ttl cassandra_read_consistencyLOCAL_QUORUMcassandra_write_consistencycassandra_auth_providercassandra_auth_kwargsr   )"super__init__	cassandrar   E_NO_CASSANDRAappconfgetserversbundle_pathportkeyspacetabler   E_CASSANDRA_NOT_CONFIGUREDE_CASSANDRA_MISCONFIGURED	Q_EXPIRESformat
cqlexpiresgetattrConsistencyLevelr   read_consistencywrite_consistencyauth_providerauth!E_NO_SUCH_CASSANDRA_AUTH_PROVIDER_cluster_session_write_stmt
_read_stmt	threadingRLock_lock)selfr#   r&   r'   	entry_ttlr%   r$   kwargsr!   db_directionsexpires	read_cons
write_consr1   auth_kwargsauth_provider_class	__class__r   r   r   X   sV   zCassandraBackend.__init__Fc                 C   sz  | j durdS | j  zz| j durW W | j  dS | jr2tjj| jf| j| j	d| j
| _ntjjdd| ji| j	d| j
| _| j| j| _ tjtj| j| jd| _| j| j_tjtj| jd| _| j| j_|rtjtj| jd}| j|_z| j | W n
 tjy   Y nw W n tjy   | jdur| j  d| _d| _  w W | j  dS | j  w )zjPrepare the connection for action.

        Arguments:
            write (bool): are we a writer?
        N)r%   r1   secure_connect_bundle)cloudr1   )r'   r?   )r'   r   ) r5   r:   acquirereleaser#   r   clusterClusterr%   r1   r   r4   r$   connectr&   querySimpleStatementQ_INSERT_RESULTr+   r'   r,   r6   r0   consistency_levelQ_SELECT_RESULTr7   r/   Q_CREATE_RESULT_TABLEexecuteAlreadyExistsOperationTimedOutshutdown)r;   write	make_stmtr   r   r   _get_connection   sl   


;


	


z CassandraBackend._get_connectionc                 K   sV   | j dd | j| j||t| || j t| |t| | |f dS )z1Store return value and state of an executed task.T)rW   N)	rY   r5   rS   r6   r   encoder    nowcurrent_task_children)r;   task_idresultstate	tracebackrequestr=   r   r   r   _store_result   s   

zCassandraBackend._store_resultc                 C   s   dS )Nzcassandra://r   )r;   include_passwordr   r   r   as_uri   s   zCassandraBackend.as_uric              
   C   sf   |    | j| j|f }|stjddS |\}}}}}| ||| ||| || |dS )z$Get task meta-data for a task by id.N)statusr^   )r]   re   r^   	date_doner`   children)	rY   r5   rS   r7   oner   PENDINGmeta_from_decodeddecode)r;   r]   resre   r^   rf   r`   rg   r   r   r   _get_task_meta_for   s   z#CassandraBackend._get_task_meta_forr   c                    s2   |si n|}| | j| j| jd t ||S )N)r#   r&   r'   )updater#   r&   r'   r   
__reduce__)r;   argsr=   rD   r   r   ro      s   zCassandraBackend.__reduce__)NNNNr   N)F)NN)T)r   N)__name__
__module____qualname____doc__r#   r$   supports_autoexpirer   rY   rb   rd   rm   ro   __classcell__r   r   rD   r   r   G   s    

6I

r   )rt   r8   celeryr   celery.exceptionsr   celery.utils.logr   baser   r   cassandra.authcassandra.clustercassandra.queryImportError__all__rq   loggerr   r3   r)   r(   rO   rQ   rR   r*   r   r   r   r   r   r   <module>   s4    