43
43
CERT_PATH = os .path .join (os .path .dirname (os .path .realpath (__file__ )),
44
44
'certificates' )
45
45
CLIENT_PEM = os .path .join (CERT_PATH , 'client.pem' )
46
+ CLIENT_ENCRYPTED_PEM = os .path .join (CERT_PATH , 'client_encrypted.pem' )
46
47
CA_PEM = os .path .join (CERT_PATH , 'ca.pem' )
47
48
CRL_PEM = os .path .join (CERT_PATH , 'crl.pem' )
48
49
SIMPLE_SSL = False
@@ -224,6 +225,38 @@ def test_simple_ssl(self):
224
225
self .assertTrue (db .test .find_one ()['ssl' ])
225
226
client .drop_database ('pymongo_ssl_test' )
226
227
228
+ def test_ssl_pem_passphrase (self ):
229
+ # Expects the server to be running with server.pem and ca.pem
230
+ #
231
+ # --sslPEMKeyFile=/path/to/pymongo/test/certificates/server.pem
232
+ # --sslCAFile=/path/to/pymongo/test/certificates/ca.pem
233
+ if not CERT_SSL :
234
+ raise SkipTest ("No mongod available over SSL with certs" )
235
+
236
+ vi = sys .version_info
237
+ if vi [0 ] == 2 and vi < (2 , 7 , 9 ) or vi [0 ] == 3 and vi < (3 , 3 ):
238
+ self .assertRaises (
239
+ ConfigurationError ,
240
+ MongoClient ,
241
+ 'server' ,
242
+ ssl = True ,
243
+ ssl_certfile = CLIENT_ENCRYPTED_PEM ,
244
+ ssl_pem_passphrase = "clientpassword" ,
245
+ ssl_ca_certs = CA_PEM ,
246
+ serverSelectionTimeoutMS = 100 )
247
+ else :
248
+ connected (MongoClient ('server' ,
249
+ ssl = True ,
250
+ ssl_certfile = CLIENT_ENCRYPTED_PEM ,
251
+ ssl_pem_passphrase = "clientpassword" ,
252
+ ssl_ca_certs = CA_PEM ,
253
+ serverSelectionTimeoutMS = 100 ))
254
+
255
+ uri_fmt = ("mongodb://server/?ssl=true"
256
+ "&ssl_certfile=%s&ssl_pem_passphrase=clientpassword"
257
+ "&ssl_ca_certs=%s&serverSelectionTimeoutMS=100" )
258
+ connected (MongoClient (uri_fmt % (CLIENT_ENCRYPTED_PEM , CA_PEM )))
259
+
227
260
def test_cert_ssl (self ):
228
261
# Expects the server to be running with server.pem and ca.pem.
229
262
#
@@ -515,7 +548,7 @@ def test_validation_with_system_ca_certs(self):
515
548
os .environ .pop ('SSL_CERT_FILE' )
516
549
517
550
def test_system_certs_config_error (self ):
518
- ctx = get_ssl_context (None , None , None , ssl .CERT_NONE , None )
551
+ ctx = get_ssl_context (None , None , None , None , ssl .CERT_NONE , None )
519
552
if ((sys .platform != "win32"
520
553
and hasattr (ctx , "set_default_verify_paths" ))
521
554
or hasattr (ctx , "load_default_certs" )):
@@ -547,11 +580,11 @@ def test_certifi_support(self):
547
580
# Force the test on Windows, regardless of environment.
548
581
ssl_support .HAVE_WINCERTSTORE = False
549
582
try :
550
- ctx = get_ssl_context (None , None , CA_PEM , ssl .CERT_REQUIRED , None )
583
+ ctx = get_ssl_context (None , None , None , CA_PEM , ssl .CERT_REQUIRED , None )
551
584
ssl_sock = ctx .wrap_socket (socket .socket ())
552
585
self .assertEqual (ssl_sock .ca_certs , CA_PEM )
553
586
554
- ctx = get_ssl_context (None , None , None , None , None )
587
+ ctx = get_ssl_context (None , None , None , None , None , None )
555
588
ssl_sock = ctx .wrap_socket (socket .socket ())
556
589
self .assertEqual (ssl_sock .ca_certs , ssl_support .certifi .where ())
557
590
finally :
@@ -568,11 +601,11 @@ def test_wincertstore(self):
568
601
if not ssl_support .HAVE_WINCERTSTORE :
569
602
raise SkipTest ("Need wincertstore to test wincertstore." )
570
603
571
- ctx = get_ssl_context (None , None , CA_PEM , ssl .CERT_REQUIRED , None )
604
+ ctx = get_ssl_context (None , None , None , CA_PEM , ssl .CERT_REQUIRED , None )
572
605
ssl_sock = ctx .wrap_socket (socket .socket ())
573
606
self .assertEqual (ssl_sock .ca_certs , CA_PEM )
574
607
575
- ctx = get_ssl_context (None , None , None , None , None )
608
+ ctx = get_ssl_context (None , None , None , None , None , None )
576
609
ssl_sock = ctx .wrap_socket (socket .socket ())
577
610
self .assertEqual (ssl_sock .ca_certs , ssl_support ._WINCERTS .name )
578
611
0 commit comments