22# Copyright (c) Microsoft Corporation. 
33# Licensed under the MIT License. 
44# ------------------------------------ 
5+ import  base64 
56import  os 
67import  sys 
78
@@ -82,20 +83,31 @@ def live_service_principal():
8283 }
8384
8485
85- def  get_certificate_parameters (content , password_protected_content , password , extension ):
86-  # type: (bytes, bytes, str, str) -> dict 
86+ def  get_certificate_parameters (content : bytes , extension : str ) ->  dict :
8787 current_directory  =  os .path .dirname (__file__ )
8888 parameters  =  {
8989 "cert_bytes" : content ,
9090 "cert_path" : os .path .join (current_directory , "certificate."  +  extension ),
91+  }
92+ 
93+  try :
94+  with  open (parameters ["cert_path" ], "wb" ) as  f :
95+  f .write (parameters ["cert_bytes" ])
96+  except  IOError  as  ex :
97+  pytest .skip ("Failed to write a file: {}" .format (ex ))
98+ 
99+  return  parameters 
100+ 
101+ 
102+ def  get_certificate_with_password_parameters (password_protected_content : bytes , password : str , extension : str ) ->  dict :
103+  current_directory  =  os .path .dirname (__file__ )
104+  parameters  =  {
91105 "cert_with_password_bytes" : password_protected_content ,
92106 "cert_with_password_path" : os .path .join (current_directory , "certificate-with-password."  +  extension ),
93107 "password" : password ,
94108 }
95109
96110 try :
97-  with  open (parameters ["cert_path" ], "wb" ) as  f :
98-  f .write (parameters ["cert_bytes" ])
99111 with  open (parameters ["cert_with_password_path" ], "wb" ) as  f :
100112 f .write (parameters ["cert_with_password_bytes" ])
101113 except  IOError  as  ex :
@@ -110,31 +122,45 @@ def live_pem_certificate(live_service_principal):
110122 password_protected_content  =  os .environ .get ("PEM_CONTENT_PASSWORD_PROTECTED" )
111123 password  =  os .environ .get ("CERTIFICATE_PASSWORD" )
112124
113-  if  content  and  password_protected_content  and  password :
114-  parameters  =  get_certificate_parameters (
115-  content .encode ("utf-8" ), password_protected_content .encode ("utf-8" ), password , "pem" 
125+  cert_info  =  {}
126+ 
127+  if  content :
128+  content  =  content .replace ("\\ n" , "\r \n " )
129+  parameters  =  get_certificate_parameters (content .encode ("utf-8" ), "pem" )
130+  cert_info .update (parameters )
131+ 
132+  if  password_protected_content  and  password :
133+  parameters  =  get_certificate_with_password_parameters (
134+  password_protected_content .encode ("utf-8" ), password , "pem" 
116135 )
117-  return   dict ( live_service_principal ,  ** parameters )
136+  cert_info . update ( parameters )
118137
138+  if  cert_info :
139+  return  dict (live_service_principal , ** cert_info )
119140 pytest .skip ("Missing PEM certificate configuration" )
120141
121142
122143@pytest .fixture () 
123144def  live_pfx_certificate (live_service_principal ):
124145 # PFX bytes arrive base64 encoded because Key Vault secrets have string values 
125-  encoded_content  =  os .environ .get ("PFX_CONTENT " )
146+  encoded_content  =  os .environ .get ("PFX_CONTENTS " )
126147 encoded_password_protected_content  =  os .environ .get ("PFX_CONTENT_PASSWORD_PROTECTED" )
127148 password  =  os .environ .get ("CERTIFICATE_PASSWORD" )
128149
129-  if  encoded_content  and  encoded_password_protected_content  and  password :
130-  import  base64 
150+  cert_info  =  {}
131151
152+  if  encoded_content :
132153 content  =  base64 .b64decode (encoded_content .encode ("utf-8" ))
133-  password_protected_content  =  base64 .b64decode (encoded_password_protected_content .encode ("utf-8" ))
154+  parameters  =  get_certificate_parameters (content , "pfx" )
155+  cert_info .update (parameters )
134156
135-  parameters  =  get_certificate_parameters (content , password_protected_content , password , "pfx" )
136-  return  dict (live_service_principal , ** parameters )
157+  if  encoded_password_protected_content  and  password :
158+  password_protected_content  =  base64 .b64decode (encoded_password_protected_content .encode ("utf-8" ))
159+  parameters  =  get_certificate_with_password_parameters (password_protected_content , password , "pfx" )
160+  cert_info .update (parameters )
137161
162+  if  cert_info :
163+  return  dict (live_service_principal , ** cert_info )
138164 pytest .skip ("Missing PFX certificate configuration" )
139165
140166
0 commit comments