Skip to content
Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Unreleased

- [added] Added `generate_password_reset_link()`,
`generate_email_verification_link()` and `generate_sign_in_with_email_link()`
methods to the `auth` API.
- [added] Migrated the `auth` user management API to the
new Identity Toolkit endpoint.
- [fixed] Extending HTTP retries to more HTTP methods like POST and PATCH.
Expand Down
7 changes: 7 additions & 0 deletions firebase_admin/_auth_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
'acr', 'amr', 'at_hash', 'aud', 'auth_time', 'azp', 'cnf', 'c_hash', 'exp', 'iat',
'iss', 'jti', 'nbf', 'nonce', 'sub', 'firebase',
])
VALID_EMAIL_ACTION_TYPES = set(['VERIFY_EMAIL', 'EMAIL_SIGNIN', 'PASSWORD_RESET'])


def validate_uid(uid, required=False):
Expand Down Expand Up @@ -181,3 +182,9 @@ def validate_custom_claims(custom_claims, required=False):
raise ValueError(
'Claim "{0}" is reserved, and must not be set.'.format(invalid_claims.pop()))
return claims_str

def validate_action_type(action_type):
if action_type not in VALID_EMAIL_ACTION_TYPES:
raise ValueError('Invalid action type provided action_type: {0}. \
Valid values are {1}'.format(action_type, ', '.join(VALID_EMAIL_ACTION_TYPES)))
return action_type
118 changes: 118 additions & 0 deletions firebase_admin/_user_mgt.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import requests
import six
from six.moves import urllib

from firebase_admin import _auth_utils
from firebase_admin import _user_import
Expand All @@ -30,6 +31,7 @@
USER_DELETE_ERROR = 'USER_DELETE_ERROR'
USER_IMPORT_ERROR = 'USER_IMPORT_ERROR'
USER_DOWNLOAD_ERROR = 'LIST_USERS_ERROR'
GENERATE_EMAIL_ACTION_LINK_ERROR = 'GENERATE_EMAIL_ACTION_LINK_ERROR'

MAX_LIST_USERS_RESULTS = 1000
MAX_IMPORT_USERS_SIZE = 1000
Expand Down Expand Up @@ -372,6 +374,87 @@ def photo_url(self):
def provider_id(self):
return self._data.get('providerId')

class ActionCodeSettings(object):
"""Contains required continue/state URL with optional Android and iOS settings.
Used when invoking the email action link generation APIs.
"""

def __init__(self, url, handle_code_in_app=None, dynamic_link_domain=None, ios_bundle_id=None,
android_package_name=None, android_install_app=None, android_minimum_version=None):
self.url = url
self.handle_code_in_app = handle_code_in_app
self.dynamic_link_domain = dynamic_link_domain
self.ios_bundle_id = ios_bundle_id
self.android_package_name = android_package_name
self.android_install_app = android_install_app
self.android_minimum_version = android_minimum_version

def encode_action_code_settings(settings):
""" Validates the provided action code settings for email link generation and
populates the REST api parameters.

settings - ``ActionCodeSettings`` object provided to be encoded
returns - dict of parameters to be passed for link gereration.
"""

parameters = {}
# url
if not settings.url:
raise ValueError("Dynamic action links url is mandatory")

try:
parsed = urllib.parse.urlparse(settings.url)
if not parsed.netloc:
raise ValueError('Malformed dynamic action links url: "{0}".'.format(settings.url))
parameters['continueUrl'] = settings.url
except Exception:
raise ValueError('Malformed dynamic action links url: "{0}".'.format(settings.url))

# handle_code_in_app
if settings.handle_code_in_app is not None:
if not isinstance(settings.handle_code_in_app, bool):
raise ValueError('Invalid value provided for handle_code_in_app: {0}'
.format(settings.handle_code_in_app))
parameters['canHandleCodeInApp'] = settings.handle_code_in_app

# dynamic_link_domain
if settings.dynamic_link_domain is not None:
if not isinstance(settings.dynamic_link_domain, six.string_types):
raise ValueError('Invalid value provided for dynamic_link_domain: {0}'
.format(settings.dynamic_link_domain))
parameters['dynamicLinkDomain'] = settings.dynamic_link_domain

# ios_bundle_id
if settings.ios_bundle_id is not None:
if not isinstance(settings.ios_bundle_id, six.string_types):
raise ValueError('Invalid value provided for ios_bundle_id: {0}'
.format(settings.ios_bundle_id))
parameters['iosBundleId'] = settings.ios_bundle_id

# android_* attributes
if (settings.android_minimum_version or settings.android_install_app) \
and not settings.android_package_name:
raise ValueError("Android package name is required when specifying other Android settings")

if settings.android_package_name is not None:
if not isinstance(settings.android_package_name, six.string_types):
raise ValueError('Invalid value provided for android_package_name: {0}'
.format(settings.android_package_name))
parameters['androidPackageName'] = settings.android_package_name

if settings.android_minimum_version is not None:
if not isinstance(settings.android_minimum_version, six.string_types):
raise ValueError('Invalid value provided for android_minimum_version: {0}'
.format(settings.android_minimum_version))
parameters['androidMinimumVersion'] = settings.android_minimum_version

if settings.android_install_app is not None:
if not isinstance(settings.android_install_app, bool):
raise ValueError('Invalid value provided for android_install_app: {0}'
.format(settings.android_install_app))
parameters['androidInstallApp'] = settings.android_install_app

return parameters

class UserManager(object):
"""Provides methods for interacting with the Google Identity Toolkit."""
Expand Down Expand Up @@ -537,6 +620,41 @@ def import_users(self, users, hash_alg=None):
raise ApiCallError(USER_IMPORT_ERROR, 'Failed to import users.')
return response

def generate_email_action_link(self, action_type, email, action_code_settings=None):
"""Fetches the email action links for types

Args:
action_type: String. Valid values ['VERIFY_EMAIL', 'EMAIL_SIGNIN', 'PASSWORD_RESET']
email: Email of the user for which the action is performed
action_code_settings: ``ActionCodeSettings`` object or dict (optional). Defines whether
the link is to be handled by a mobile app and the additional state information to be
passed in the deep link, etc.
Returns:
link_url: action url to be emailed to the user

Raises:
ApiCallError: If an error occurs while generating the link
ValueError: If the provided arguments are invalid
"""
payload = {
'requestType': _auth_utils.validate_action_type(action_type),
'email': _auth_utils.validate_email(email),
'returnOobLink': True
}

if action_code_settings:
payload.update(encode_action_code_settings(action_code_settings))

try:
response = self._client.body('post', '/accounts:sendOobCode', json=payload)
except requests.exceptions.RequestException as error:
self._handle_http_error(GENERATE_EMAIL_ACTION_LINK_ERROR, 'Failed to generate link.',
error)
else:
if not response or not response.get('oobLink'):
raise ApiCallError(GENERATE_EMAIL_ACTION_LINK_ERROR, 'Failed to generate link.')
return response.get('oobLink')

def _handle_http_error(self, code, msg, error):
if error.response is not None:
msg += '\nServer response: {0}'.format(error.response.content.decode())
Expand Down
77 changes: 77 additions & 0 deletions firebase_admin/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@


__all__ = [
'ActionCodeSettings',
'AuthError',
'ErrorInfo',
'ExportedUserRecord',
Expand All @@ -51,6 +52,9 @@
'create_session_cookie',
'create_user',
'delete_user',
'generate_password_reset_link',
'generate_email_verification_link',
'generate_sign_in_with_email_link',
'get_user',
'get_user_by_email',
'get_user_by_phone_number',
Expand All @@ -63,6 +67,7 @@
'verify_session_cookie',
]

ActionCodeSettings = _user_mgt.ActionCodeSettings
ErrorInfo = _user_import.ErrorInfo
ExportedUserRecord = _user_mgt.ExportedUserRecord
ListUsersPage = _user_mgt.ListUsersPage
Expand Down Expand Up @@ -448,6 +453,78 @@ def import_users(users, hash_alg=None, app=None):
except _user_mgt.ApiCallError as error:
raise AuthError(error.code, str(error), error.detail)

def generate_password_reset_link(email, action_code_settings=None, app=None):
"""Generates the out-of-band email action link for password reset flows for the specified email
address.

Args:
email: The email of the user whose password is to be reset.
action_code_settings: ``ActionCodeSettings`` instance (optional). Defines whether
the link is to be handled by a mobile app and the additional state information to be
passed in the deep link.
app: An App instance (optional).
Returns:
link: The password reset link created by API

Raises:
ValueError: If the provided arguments are invalid
AuthError: If an error occurs while generating the link
"""
user_manager = _get_auth_service(app).user_manager
try:
return user_manager.generate_email_action_link('PASSWORD_RESET', email,
action_code_settings=action_code_settings)
except _user_mgt.ApiCallError as error:
raise AuthError(error.code, str(error), error.detail)

def generate_email_verification_link(email, action_code_settings=None, app=None):
"""Generates the out-of-band email action link for email verification flows for the specified
email address.

Args:
email: The email of the user to be verified.
action_code_settings: ``ActionCodeSettings`` instance (optional). Defines whether
the link is to be handled by a mobile app and the additional state information to be
passed in the deep link.
app: An App instance (optional).
Returns:
link: The email verification link created by API

Raises:
ValueError: If the provided arguments are invalid
AuthError: If an error occurs while generating the link
"""
user_manager = _get_auth_service(app).user_manager
try:
return user_manager.generate_email_action_link('VERIFY_EMAIL', email,
action_code_settings=action_code_settings)
except _user_mgt.ApiCallError as error:
raise AuthError(error.code, str(error), error.detail)

def generate_sign_in_with_email_link(email, action_code_settings, app=None):
"""Generates the out-of-band email action link for email link sign-in flows, using the action
code settings provided.

Args:
email: The email of the user signing in.
action_code_settings: ``ActionCodeSettings`` instance. Defines whether
the link is to be handled by a mobile app and the additional state information to be
passed in the deep link.
app: An App instance (optional).
Returns:
link: The email sign in link created by API

Raises:
ValueError: If the provided arguments are invalid
AuthError: If an error occurs while generating the link
"""
user_manager = _get_auth_service(app).user_manager
try:
return user_manager.generate_email_action_link('EMAIL_SIGNIN', email,
action_code_settings=action_code_settings)
except _user_mgt.ApiCallError as error:
raise AuthError(error.code, str(error), error.detail)

def _check_jwt_revoked(verified_claims, error_code, label, app):
user = get_user(verified_claims.get('uid'), app=app)
if verified_claims.get('iat') * 1000 < user.tokens_valid_after_timestamp:
Expand Down
81 changes: 81 additions & 0 deletions integration/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import random
import time
import uuid
import six

import pytest
import requests
Expand All @@ -30,7 +31,11 @@

_verify_token_url = 'https://www.googleapis.com/identitytoolkit/v3/relyingparty/verifyCustomToken'
_verify_password_url = 'https://www.googleapis.com/identitytoolkit/v3/relyingparty/verifyPassword'
_password_reset_url = 'https://www.googleapis.com/identitytoolkit/v3/relyingparty/resetPassword'
_verify_email_url = 'https://www.googleapis.com/identitytoolkit/v3/relyingparty/setAccountInfo'
_email_sign_in_url = 'https://www.googleapis.com/identitytoolkit/v3/relyingparty/emailLinkSignin'

ACTION_LINK_CONTINUE_URL = 'http://localhost?a=1&b=5#f=1'

def _sign_in(custom_token, api_key):
body = {'token' : custom_token.decode(), 'returnSecureToken' : True}
Expand All @@ -54,6 +59,35 @@ def _random_id():
def _random_phone():
return '+1' + ''.join([str(random.randint(0, 9)) for _ in range(0, 10)])

def _reset_password(oob_code, new_password, api_key):
body = {'oobCode': oob_code, 'newPassword': new_password}
params = {'key' : api_key}
resp = requests.request('post', _password_reset_url, params=params, json=body)
resp.raise_for_status()
return resp.json().get('email')

def _verify_email(oob_code, api_key):
body = {'oobCode': oob_code}
params = {'key' : api_key}
resp = requests.request('post', _verify_email_url, params=params, json=body)
resp.raise_for_status()
return resp.json().get('email')

def _sign_in_with_email_link(email, oob_code, api_key):
body = {'oobCode': oob_code, 'email': email}
params = {'key' : api_key}
resp = requests.request('post', _email_sign_in_url, params=params, json=body)
resp.raise_for_status()
return resp.json().get('idToken')

def _validate_link_url(link, check_continue_url=True):
assert isinstance(link, six.string_types)
query = six.moves.urllib.parse.urlparse(link).query
query_dict = dict(six.moves.urllib.parse.parse_qsl(query))
if check_continue_url:
assert query_dict['continueUrl'] == ACTION_LINK_CONTINUE_URL
return query_dict['oobCode']

def test_custom_token(api_key):
custom_token = auth.create_custom_token('user1')
id_token = _sign_in(custom_token, api_key)
Expand Down Expand Up @@ -151,6 +185,18 @@ def new_user_list():
for uid in users:
auth.delete_user(uid)

@pytest.fixture
def new_user_email_unverified():
random_id, email = _random_id()
user = auth.create_user(
uid=random_id,
email=email,
email_verified=False,
password='password'
)
yield user
auth.delete_user(user.uid)

def test_get_user(new_user_with_params):
user = auth.get_user(new_user_with_params.uid)
assert user.uid == new_user_with_params.uid
Expand Down Expand Up @@ -372,6 +418,41 @@ def test_import_users_with_password(api_key):
finally:
auth.delete_user(uid)

def test_password_reset(new_user_email_unverified, api_key):
link = auth.generate_password_reset_link(new_user_email_unverified.email)
oob_code = _validate_link_url(link, check_continue_url=False)
assert new_user_email_unverified.email == _reset_password(oob_code, "newPassword", api_key)
assert auth.get_user(new_user_email_unverified.uid).email_verified

def test_email_verification(new_user_email_unverified, api_key):
link = auth.generate_email_verification_link(new_user_email_unverified.email)
oob_code = _validate_link_url(link, check_continue_url=False)
assert new_user_email_unverified.email == _verify_email(oob_code, api_key)
assert auth.get_user(new_user_email_unverified.uid).email_verified

def test_password_reset_with_settings(new_user_email_unverified, api_key):
action_code_settings = auth.ActionCodeSettings(ACTION_LINK_CONTINUE_URL)
link = auth.generate_password_reset_link(new_user_email_unverified.email,
action_code_settings=action_code_settings)
oob_code = _validate_link_url(link)
assert new_user_email_unverified.email == _reset_password(oob_code, "newPassword", api_key)
assert auth.get_user(new_user_email_unverified.uid).email_verified

def test_email_verification_with_settings(new_user_email_unverified, api_key):
action_code_settings = auth.ActionCodeSettings(ACTION_LINK_CONTINUE_URL)
link = auth.generate_email_verification_link(new_user_email_unverified.email,
action_code_settings=action_code_settings)
oob_code = _validate_link_url(link)
assert new_user_email_unverified.email == _verify_email(oob_code, api_key)
assert auth.get_user(new_user_email_unverified.uid).email_verified

def test_email_sign_in_with_settings(new_user_email_unverified, api_key):
action_code_settings = auth.ActionCodeSettings(ACTION_LINK_CONTINUE_URL)
link = auth.generate_sign_in_with_email_link(new_user_email_unverified.email,
action_code_settings=action_code_settings)
oob_code = _validate_link_url(link)
assert _sign_in_with_email_link(new_user_email_unverified.email, oob_code, api_key)
assert auth.get_user(new_user_email_unverified.uid).email_verified

class CredentialWrapper(credentials.Base):
"""A custom Firebase credential that wraps an OAuth2 token."""
Expand Down
Loading