o
    `f,                     @   s  d dl Z d dlZd dlZd dlmZmZ d dlZd dlZd dlm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZmZ d dlmZmZ d dlmZ d dlm Z m!Z! dd Z"dddZ#dd Z$dd Z%dddZ&G dd dZ'dS )    N)urljoinurlparse)hazmatx509)InvalidSignature)backends)DSAPublicKey)ECDSAEllipticCurvePublicKey)PKCS1v15)RSAPublicKey)SHA1Hash)EncodingPublicFormat)ocsp)AuthorizationErrorConnectionErrorc                 C   s   |   }zEt|tr||j|jt |j W d S t|tr+||j|j|j W d S t|t	r?||j|jt
|j W d S ||j|j W d S  tyT   tdw )Nzfailed to valid ocsp response)
public_key
isinstancer   verify	signaturetbs_response_bytesr   signature_hash_algorithmr   r
   r	   r   r   )issuer_certocsp_responsepubkey r   C/home/ubuntu/webapp/venv/lib/python3.10/site-packages/redis/ocsp.py_verify_response   s2   





r   Tc                 C   sN  t |}|jt jjkrtd|jt jjkr/|jt jj	kr.t
dt|jdd  dnt
d|jtj kr?t
d|jrN|jtj k rNt
d|j}|j}|j}| }|d	urb|| jksf||kri| }n5|j}t|| ||}	z|	d
 }
W n ty   t
dw |
jtj}|d	u stjjj|jvrt
d|
}|rt || dS )z=A wrapper the return the validity of a known ocsp certificatez4you are not authorized to view this ocsp certificatezReceived an .   z ocsp certificate statusz@failed to retrieve a successful response from the ocsp responderz)ocsp certificate was issued in the futurez1ocsp certificate has invalid update - in the pastNr   z'no certificates found for the responderz'delegate not autorized for ocsp signingT)!r   load_der_ocsp_responseresponse_statusOCSPResponseStatusUNAUTHORIZEDr   
SUCCESSFULcertificate_statusOCSPCertStatusGOODr   strsplitthis_updatedatetimenownext_updateresponder_nameissuer_key_hashresponder_key_hashsubjectcertificates_get_certificates
IndexError
extensionsget_extension_for_classr   ExtendedKeyUsageoidExtendedKeyUsageOIDOCSP_SIGNINGvaluer   )r   
ocsp_bytesvalidater   r0   issuer_hashresponder_hashcert_to_validatecertsresponder_certsresponder_certextr   r   r   _check_certificate1   sT   


rG   c                    s8   d u r fdd| D }|S  fdd| D }|S )Nc                    s(   g | ]}t |kr|j jkr|qS r   )_get_pubkey_hashissuerr3   .0c)r   rA   r   r   
<listcomp>n   s
    z%_get_certificates.<locals>.<listcomp>c                    s&   g | ]}|j kr|j j kr|qS r   )r3   rI   rJ   )r   r0   r   r   rM   t   s
    r   )rC   r   r0   rA   r4   r   )r   rA   r0   r   r5   l   s   r5   c                 C   st   |   }t|tr|tjtj}nt|tr |tj	tj
}n|tjtj}tt t d}|| | S )N)backend)r   r   r   public_bytesr   DERr   PKCS1r
   X962UncompressedPointSubjectPublicKeyInfor   r   r   default_backendupdatefinalize)certificater   hsha1r   r   r   rH   }   s   


rH   c                 C   s   |dv rt dd}|   }|  D ]}| }|j|jkr$|} nq|du r-t d|dur>t|}||kr>t dt||S )zAn implementation of a function for set_ocsp_client_callback in PyOpenSSL.

    This function validates that the provide ocsp_bytes response is valid,
    and matches the expected, stapled responses.
    )    Nzno ocsp response presentNz2no matching issuer cert found in certificate chainz/received and expected certificates do not match)	r   get_peer_certificateto_cryptographyget_peer_cert_chainr3   rI   r   load_pem_x509_certificaterG   )conr>   expectedr   	peer_certrL   certer   r   r   ocsp_staple_verifier   s"   

re   c                   @   sR   e Zd ZdZdddZdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd ZdS )OCSPVerifiera  A class to verify ssl sockets for RFC6960/RFC6961. This can be used
    when using direct validation of OCSP responses and certificate revocations.

    @see https://datatracker.ietf.org/doc/html/rfc6960
    @see https://datatracker.ietf.org/doc/html/rfc6961
    Nc                 C   s   || _ || _|| _|| _d S N)SOCKHOSTPORTCA_CERTS)selfsockhostportca_certsr   r   r   __init__   s   
zOCSPVerifier.__init__c                 C   s"   t |}t| t }|S )z?Convert SSL certificates in a binary (DER) format to ASCII PEM.)sslDER_cert_to_PEM_certr   r_   encoder   rU   )rl   derpemrc   r   r   r   
_bin2ascii   s   
zOCSPVerifier._bin2asciic                 C   s0   | j d}|du rtd| |}| |S )zThis function returns the certificate, primary issuer, and primary ocsp
        server in the chain for a socket already wrapped with ssl.
        TFz!no certificate found for ssl peer)rh   getpeercertr   rw   _certificate_components)rl   ru   rc   r   r   r   components_from_socket   s
   

z#OCSPVerifier.components_from_socketc                 C   s   z|j tjjjj}W n tjj jy   t	dw dd |D }z|d j
j}W n ty5   d}Y nw dd |D }z|d j
j}W n tyP   t	dw |||fS )zGiven an SSL certificate, retract the useful components for
        validating the certificate status with an OCSP server.

        Args:
            cert ([bytes]): A PEM encoded ssl certificate
        z-No AIA information present in ssl certificatec                 S       g | ]}|j tjjjkr|qS r   )access_methodr   r:   AuthorityInformationAccessOID
CA_ISSUERSrK   ir   r   r   rM      
    z8OCSPVerifier._certificate_components.<locals>.<listcomp>r   Nc                 S   r{   r   )r|   r   r:   r}   OCSPr   r   r   r   rM      r   zno ocsp servers in certificate)r7   get_extension_for_oidr   r:   ExtensionOIDAUTHORITY_INFORMATION_ACCESSr=   cryptographyExtensionNotFoundr   access_locationr6   )rl   rc   aiaissuersrI   ocspsr   r   r   r   ry      s4   
z$OCSPVerifier._certificate_componentsc                 C   s6   t j| j| jf| jd}t| t	 }| 
|S )zReturn the certificate, primary issuer, and primary ocsp server
        from the host defined by the socket. This is useful in cases where
        different certificates are occasionally presented.
        )rp   )rr   get_server_certificateri   rj   rk   r   r_   rt   r   rU   ry   )rl   rv   rc   r   r   r   !components_from_direct_connection   s   
z.OCSPVerifier.components_from_direct_connectionc                 C   sT   t  }|||tjjj }| }t	
|tjjjj}t||d}|S )z#Return the complete url to the ocspascii)r   OCSPRequestBuilderadd_certificater   r   
primitiveshashesSHA256buildbase64	b64encoderO   serializationr   rP   r   decode)rl   serverrc   r   orbrequestpathurlr   r   r   build_certificate_url   s   z"OCSPVerifier.build_certificate_urlc           	      C   sp   t |}|jstd|j}| |}| |||}t|jdd}t j||d}|js1tdt	||jdS )z3Checks the validity of an ocsp server for an issuerz"failed to fetch issuer certificatezapplication/ocsp-request)HostzContent-Type)headersz failed to fetch ocsp certificateT)
requestsgetokr   contentrw   r   r   netlocrG   )	rl   r   rc   
issuer_urlrru   r   ocsp_urlheaderr   r   r   check_certificate  s   

zOCSPVerifier.check_certificatec                 C   sn   z|   \}}}|du rtd| |||W S  ty6   |  \}}}|du r-td| ||| Y S w )aD  Returns the validity of the certificate wrapping our socket.
        This first retrieves for validate the certificate, issuer_url,
        and ocsp_server for certificate validate. Then retrieves the
        issuer certificate from the issuer_url, and finally checks
        the validity of OCSP revocation status.
        Nz%no issuers found in certificate chain)rz   r   r   r   r   )rl   rc   r   ocsp_serverr   r   r   is_valid!  s   	zOCSPVerifier.is_validrg   )__name__
__module____qualname____doc__rq   rw   rz   ry   r   r   r   r   r   r   r   r   rf      s    
(
rf   )Trg   )(r   r-   rr   urllib.parser   r   %cryptography.hazmat.primitives.hashesr   r   r   r   cryptography.exceptionsr   cryptography.hazmatr   -cryptography.hazmat.primitives.asymmetric.dsar   ,cryptography.hazmat.primitives.asymmetric.ecr	   r
   1cryptography.hazmat.primitives.asymmetric.paddingr   -cryptography.hazmat.primitives.asymmetric.rsar   r   r   ,cryptography.hazmat.primitives.serializationr   r   cryptography.x509r   redis.exceptionsr   r   r   rG   r5   rH   re   rf   r   r   r   r   <module>   s.    
;
