I'm working on a Python library that turns an Apple Pay payload into usable card details. To do so, I am following the official documentation here.

Everything is working perfectly except for 2 verification steps:

Step 1.c.

Ensure that there is a valid X.509 chain of trust from the signature to the root CA. Specifically, ensure that the signature was created using the private key corresponding to the leaf certificate, that the leaf certificate is signed by the intermediate CA, and that the intermediate CA is signed by the Apple Root CA - G3.

Here I ended up implementing the last two checks with the following code, using the cryptography library:

from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA

# CustomError is assumed to be the library's own exception type, defined elsewhere.

def verify_root_ca_chain_of_trust(
    trusted_root_ca: x509.Certificate,
    intermediate_cert: x509.Certificate,
    leaf_cert: x509.Certificate
) -> None:
    try:
        # verify that the intermediate CA is signed by the Apple Root CA - G3
        trusted_pub = trusted_root_ca.public_key()
        trusted_pub.verify(
            intermediate_cert.signature,
            intermediate_cert.tbs_certificate_bytes,
            ECDSA(hashes.SHA256())
        )
        # verify that the leaf certificate is signed by the intermediate CA
        trusted_intermediate_pub = intermediate_cert.public_key()
        trusted_intermediate_pub.verify(
            leaf_cert.signature,
            leaf_cert.tbs_certificate_bytes,
            ECDSA(hashes.SHA256())
        )
    # verify() raises InvalidSignature on a mismatch; TypeError covers a non-EC key
    except (InvalidSignature, TypeError) as err:
        raise CustomError('error') from err

My problem here is that I only implement 2/3 of the checks required. What I can't figure out how to do is:

Specifically, ensure that the signature was created using the private key corresponding to the leaf certificate

Step 1.d. and 1.e.

d. For ECC (EC_v1), ensure that the signature is a valid ECDSA signature (ecdsa-with-SHA256 1.2.840.10045.4.3.2) of the concatenated values of the ephemeralPublicKey, data, transactionId, and applicationData keys.

e. Inspect the CMS signing time of the signature, as defined by section 11.3 of RFC 5652. If the time signature and the transaction time differ by more than a few minutes, it's possible that the token is a replay attack.
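As a rough sketch of the message that step 1.d describes, the fields can be decoded and concatenated in the listed order. The function name `build_signed_message` is hypothetical, and the encodings are assumptions: ephemeralPublicKey and data are taken to be Base64 strings, while transactionId and applicationData are taken to be hex strings. Check these against the payment token format reference before relying on them.

```python
import base64
from typing import Optional


def build_signed_message(
    ephemeral_public_key_b64: str,
    data_b64: str,
    transaction_id_hex: str,
    application_data_hex: Optional[str] = None,
) -> bytes:
    """Concatenate the decoded token fields in the order step 1.d lists them."""
    message = (
        base64.b64decode(ephemeral_public_key_b64)
        + base64.b64decode(data_b64)
        + bytes.fromhex(transaction_id_hex)
    )
    # Assumption: applicationData, when present, is hex encoded like transactionId.
    if application_data_hex:
        message += bytes.fromhex(application_data_hex)
    return message
```

The resulting bytes are what the message digest in the CMS signature should be computed over, not the raw JSON strings.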

What I have tried so far:

def validate_token_signature(
    trusted_cert: x509.Certificate,
    signature: str,
    payment_data: str,
    ephemeral_pub: str,
    transaction_id: str,
    application_data: str = None,
) -> None:
    data_byte: bytes = base64.b64decode(ephemeral_pub)
    payment_data_byte: bytes = base64.b64decode(payment_data)
    transaction_id_byte: bytes = bytes.fromhex(transaction_id)
    data: bytes = data_byte + payment_data_byte + transaction_id_byte
    if application_data is not None:
        application_data_byte: bytes = base64.b64decode(application_data)
        data = data + application_data_byte
    try:
        trusted_leaf_pub = trusted_cert.public_key()
        trusted_leaf_pub.verify(base64.b64decode(signature), data, ECDSA(hashes.SHA256()))
    except InvalidSignature as err:
        print(err)
        # raise SignatureError('error') from err

The documentation does not specify which signature to verify. I presume it is the PKCS#7 one, but cryptography only processes PKCS#7 as a list of X.509 certificates, as stated here:

Deserialize a PEM encoded PKCS7 blob to a list of certificates. PKCS7 can contain many other types of data, including CRLs, but this function will ignore everything except certificates.

Is there a way to do these checks with Python cryptography, or do I have to use another library such as pyOpenSSL or something else?

Bastien B
  • Unfortunately PKCS#7 / CMS support seems pretty sparse in Python, e.g. pycryptodome doesn't support it (yet). Happy to be corrected on that though. Would e.g. [this work for you](https://gist.github.com/russau/c0123ef934ef88808050462a8638a410) in the meantime? You would really not want to (re-)implement CMS verification. – Maarten Bodewes Jan 10 '22 at 16:59
  • From what I understand and have tested, m2crypto is based on OpenSSL, just like pyOpenSSL, and also implements the verify part of it, so it would be subject to this: https://stackoverflow.com/questions/30700348/how-to-validate-verify-an-x509-certificate-chain-of-trust-in-python If I ended up using the OpenSSL verify, I would prefer the pyOpenSSL one, which is maintained by pyca and a lot of people (I already use cryptography from them). My problem here is that neither of the two lets me obtain the CMS signing time (or I can't figure out where it is documented). – Bastien B Jan 14 '22 at 09:07
  • One slightly harder option would be to go through the source code and see if you can implement it yourself of course, based on the system they already have. I'd at first look to options to ask for extra details though, as signing time is strictly in the "other" category in the RFC. – Maarten Bodewes Jan 14 '22 at 09:10

1 Answer

To read the CMS signing time and validate the PKCS#7 signature, I needed access to the CMS content and signer info. This can't be achieved with the PKCS#7 modules from m2crypto or cryptography. What I ended up using was asn1crypto.

Steps 1.c and 1.d are part of the same check, so I kept the checks above for the chain of trust:

from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA

def verify_root_ca_chain_of_trust(
    trusted_root_ca: x509.Certificate,
    intermediate_cert: x509.Certificate,
    leaf_cert: x509.Certificate
) -> None:
    try:
        # verify that the intermediate CA is signed by the Apple Root CA - G3
        trusted_pub = trusted_root_ca.public_key()
        trusted_pub.verify(
            intermediate_cert.signature,
            intermediate_cert.tbs_certificate_bytes,
            ECDSA(hashes.SHA256())
        )
        # verify that the leaf certificate is signed by the intermediate CA
        trusted_intermediate_pub = intermediate_cert.public_key()
        trusted_intermediate_pub.verify(
            leaf_cert.signature,
            leaf_cert.tbs_certificate_bytes,
            ECDSA(hashes.SHA256())
        )
    # verify() raises InvalidSignature on a mismatch; TypeError covers a non-EC key
    except (InvalidSignature, TypeError) as err:
        raise CustomError('error') from err
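One detail worth keeping in mind for a helper like this: in cryptography, `verify()` returns `None` on success and raises `InvalidSignature` on a mismatch, so that is the exception a chain check needs to handle. Here is a minimal sketch with a throwaway key; the helper name `is_valid_ecdsa_signature` and the sample bytes are hypothetical and only illustrate the behaviour.

```python
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec


def is_valid_ecdsa_signature(
    public_key: ec.EllipticCurvePublicKey, signature: bytes, message: bytes
) -> bool:
    """Return True if `signature` is a valid ECDSA/SHA-256 signature of `message`."""
    try:
        public_key.verify(signature, message, ec.ECDSA(hashes.SHA256()))
    except InvalidSignature:
        # verify() returns None on success and raises on any mismatch
        return False
    return True


# Throwaway key pair, generated only for this demonstration
private_key = ec.generate_private_key(ec.SECP256R1())
signature = private_key.sign(b"tbs bytes", ec.ECDSA(hashes.SHA256()))
```

Calling the helper with the original message returns `True`, and with any altered message it returns `False` instead of letting the exception escape.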

For the last part of the check, 'Specifically, ensure that the signature was created using the private key corresponding to the leaf certificate', I used asn1crypto:

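    import hashlib
    from datetime import datetime

    from asn1crypto import cms
    from asn1crypto import core
    from cryptography import x509
    from cryptography.exceptions import InvalidSignature
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.asymmetric.ec import ECDSA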

    def validate_token_signature(
        trusted_cert: x509.Certificate,
        signature: bytes,
        payment_data: bytes,
        ephemeral_pub: bytes,
        transaction_id: bytes,
        application_data: bytes,
    ) -> None:
        
        signed_data = cms.ContentInfo.load(signature)['content']
        algo = signed_data['digest_algorithms'][0]['algorithm'].native
        signers_info = signed_data['signer_infos']
        attr_signature = signers_info[0].native['signature']
        attrs = signers_info[0]['signed_attrs']

        # Ensure data signer and cert signer match
        cert_issuer: str = trusted_cert.issuer.rdns[0].rfc4514_string().split("=")[1]
        signed_data_issuer: str = dict(dict(dict(signers_info.native[0])['sid'])['issuer'])['common_name']
        if not cert_issuer == signed_data_issuer:
            raise CustomError('error')

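        # Verify that the cert is valid right now. not_valid_before and
        # not_valid_after are naive UTC datetimes, so compare against UTC, not local time.
        if not trusted_cert.not_valid_before < datetime.utcnow() < trusted_cert.not_valid_after:
            raise CustomError('error')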

        # user data
        udata: bytes = ephemeral_pub + payment_data + transaction_id + application_data

        mdData = getattr(hashlib, algo)(udata).digest()

        if attrs is not None and not isinstance(attrs, core.Void):
            # if attrs, mdSigned == message_digest attribute
            mdSigned = None
            for attr in attrs:
                if attr['type'].native == 'message_digest':
                    mdSigned = attr['values'].native[0]
            signedData = attrs.dump()
            signedData = b'\x31' + signedData[1:]
        else:
            # if no attrs, mdSigned == hash of userdata
            mdSigned = mdData
            signedData = udata

        # 2- verify() must succeed
        try:
            trusted_cert.public_key().verify(attr_signature, signedData, ECDSA(hashes.SHA256()))
        except InvalidSignature:
            raise CustomError('error')

        # 3- hashok must be True
        if not mdData == mdSigned:
            raise CustomError('error')
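The `b'\x31' + signedData[1:]` step is the least obvious part of the function above. Inside the SignerInfo structure the signed attributes are encoded with the context tag [0] IMPLICIT (first byte `0xA0`), but RFC 5652 section 5.4 specifies that the signature is computed over the same content encoded with an explicit SET OF tag (`0x31`). A small sketch of that re-tagging on its own, with a hypothetical helper name and assuming the input is the DER of the signed attributes as they appear in SignerInfo:

```python
def signed_attrs_for_verification(signed_attrs_der: bytes) -> bytes:
    """Return the bytes the CMS signature is computed over.

    Inside SignerInfo the attributes carry the context tag [0] IMPLICIT (0xA0);
    the signature covers the same content with a SET OF tag (0x31) instead.
    """
    if not signed_attrs_der or signed_attrs_der[0] != 0xA0:
        raise ValueError("expected DER starting with the [0] context tag (0xA0)")
    # Only the tag byte changes; the length and content octets are reused as-is.
    return b"\x31" + signed_attrs_der[1:]
```

Verifying against the unmodified `0xA0` encoding fails even for a genuine token, which is an easy mistake to make when porting this check.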

For the CMS signing time verification (1.e), I also used the data obtained through the asn1crypto object:

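    import time
    from datetime import datetime

    from asn1crypto import cms

    def cms_compare(p7: bytes) -> None:
        ci = cms.ContentInfo.load(p7)
        try:
            signer_info = ci.native['content']['signer_infos'][0]
            # look the attribute up by type instead of relying on its position
            signed_time = next(
                attr['values'][0]
                for attr in signer_info['signed_attrs']
                if attr['type'] == 'signing_time'
            )
            timed: datetime = datetime.strptime(str(signed_time), "%Y-%m-%d %H:%M:%S%z")

            # timestamp() is portable and timezone-aware, unlike strftime('%s')
            if int(time.time()) - int(timed.timestamp()) > 30:
                raise CustomError('error')
        except (TypeError, StopIteration):
            raise CustomError('error')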
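Step 1.e compares the signing time with the transaction time and allows a difference of "a few minutes", so the comparison can also be written against an explicit transaction timestamp and tolerance. A minimal sketch, where the function name `is_within_replay_window` and the 5-minute default are my own assumptions rather than values from the documentation:

```python
from datetime import datetime, timedelta, timezone


def is_within_replay_window(
    signing_time: datetime,
    transaction_time: datetime,
    tolerance: timedelta = timedelta(minutes=5),
) -> bool:
    """Return True when the two timezone-aware times differ by at most `tolerance`."""
    if signing_time.tzinfo is None or transaction_time.tzinfo is None:
        raise ValueError("both datetimes must be timezone-aware")
    # abs() also rejects signing times that lie too far in the future
    return abs(transaction_time - signing_time) <= tolerance
```

Passing `datetime.now(timezone.utc)` as `transaction_time` reproduces the "compare against now" behaviour, while a stored transaction timestamp makes the check repeatable in tests.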
Bastien B