Skip to content
89 changes: 88 additions & 1 deletion test/test_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import textwrap
import traceback
import uuid
from typing import Any, Dict
from typing import Any, Dict, Mapping

from pymongo.collection import Collection

Expand Down Expand Up @@ -2202,6 +2202,93 @@ def test_05_roundtrip_encrypted_unindexed(self):
self.assertEqual(decrypted, val)


# https://github.com/mongodb/specifications/blob/072601/source/client-side-encryption/tests/README.rst#rewrap
class TestRewrapWithSeparateClientEncryption(EncryptionIntegrationTest):

MASTER_KEYS: Mapping[str, Mapping[str, Any]] = {
"aws": {
"region": "us-east-1",
"key": "arn:aws:kms:us-east-1:579766882180:key/89fcc2c4-08b0-4bd9-9f25-e30687b580d0",
},
"azure": {
"keyVaultEndpoint": "key-vault-csfle.vault.azure.net",
"keyName": "key-name-csfle",
},
"gcp": {
"projectId": "devprod-drivers",
"location": "global",
"keyRing": "key-ring-csfle",
"keyName": "key-name-csfle",
},
"kmip": {},
"local": {},
}

KMS_TLS_OPTS = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the tlsCAFile is only valid/needed for kmip. Can we use the existing KMS_TLS_OPTS global instead of adding a new self.KMS_TLS_OPTS?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

"aws": {"tlsCAFile": CA_PEM},
"azure": {"tlsCAFile": CA_PEM},
"gcp": {"tlsCAFile": CA_PEM},
"kmip": {"tlsCAFile": CA_PEM},
}

def test_rewrap(self):
for src_provider in self.MASTER_KEYS:
for dst_provider in self.MASTER_KEYS:
with self.subTest(src_provider=src_provider, dst_provider=dst_provider):
self.run_test(src_provider, dst_provider)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test says we still need to test the "local" provider:

For "local", do not set a masterKey document.

I think that means we need to add

... "kmip": {}, "local": None, } 
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They used local: {} in the C driver, I'll go with that.


def run_test(self, src_provider, dst_provider):
# Step 1. Drop the collection ``keyvault.datakeys``.
self.client.keyvault.drop_collection("datakeys")

# Step 2. Create a ``ClientEncryption`` object named ``client_encryption1``
client_encryption1 = ClientEncryption(
key_vault_client=self.client,
key_vault_namespace="keyvault.datakeys",
kms_providers=ALL_KMS_PROVIDERS,
kms_tls_options=self.KMS_TLS_OPTS,
codec_options=OPTS,
)
self.addCleanup(client_encryption1.close)

# Step 3. Call ``client_encryption1.create_data_key`` with ``src_provider``.
key_id = client_encryption1.create_data_key(
master_key=self.MASTER_KEYS[src_provider], kms_provider=src_provider
)

# Step 4. Call ``client_encryption1.encrypt`` with the value "test"
cipher_text = client_encryption1.encrypt(
"test", key_id=key_id, algorithm=Algorithm.AEAD_AES_256_CBC_HMAC_SHA_512_Deterministic
)

# Step 5. Create a ``ClientEncryption`` object named ``client_encryption2``
client2 = MongoClient()
self.addCleanup(client2.close)
client_encryption2 = ClientEncryption(
key_vault_client=client2,
key_vault_namespace="keyvault.datakeys",
kms_providers=ALL_KMS_PROVIDERS,
kms_tls_options=self.KMS_TLS_OPTS,
codec_options=OPTS,
)
self.addCleanup(client_encryption1.close)

# Step 6. Call ``client_encryption2.rewrap_many_data_key`` with an empty ``filter``.
rewrap_many_data_key_result = client_encryption2.rewrap_many_data_key(
{}, provider=dst_provider, master_key=self.MASTER_KEYS[dst_provider]
)

self.assertEqual(rewrap_many_data_key_result.bulk_write_result.modified_count, 1)

# 7. Call ``client_encryption1.decrypt`` with the ``cipher_text``. Assert the return value is "test".
decrypt_result1 = client_encryption1.decrypt(cipher_text)
self.assertEqual(decrypt_result1, "test")

# 8. Call ``client_encryption2.decrypt`` with the ``cipher_text``. Assert the return value is "test".
decrypt_result2 = client_encryption2.decrypt(cipher_text)
self.assertEqual(decrypt_result2, "test")


class TestQueryableEncryptionDocsExample(EncryptionIntegrationTest):
# Queryable Encryption is not supported on Standalone topology.
@client_context.require_no_standalone
Expand Down