Skip to content
Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Unreleased

- [added] Added `generate_password_reset_link()`,
`generate_email_verification_link()` and `generate_email_sign_in_link()`
methods to the `auth` API.
- [added] Migrated the `auth` user management API to the
new Identity Toolkit endpoint.
- [fixed] Extending HTTP retries to more HTTP methods like POST and PATCH.
Expand Down
72 changes: 72 additions & 0 deletions firebase_admin/_auth_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
'acr', 'amr', 'at_hash', 'aud', 'auth_time', 'azp', 'cnf', 'c_hash', 'exp', 'iat',
'iss', 'jti', 'nbf', 'nonce', 'sub', 'firebase',
])
VALID_ACTION_TYPE = set(['VERIFY_EMAIL', 'EMAIL_SIGNIN', 'PASSWORD_RESET'])


def validate_uid(uid, required=False):
Expand Down Expand Up @@ -181,3 +182,74 @@ def validate_custom_claims(custom_claims, required=False):
raise ValueError(
'Claim "{0}" is reserved, and must not be set.'.format(invalid_claims.pop()))
return claims_str

def validate_action_code_settings(settings):
""" Validates the provided action code settings for email link generation and
populates the REST api parameters.

settings - dict provided to build the ActionCodeSettings object
returns - dict of parameters to be passed for link gereration.
"""
if not isinstance(settings, dict):
raise ValueError('Invalid data argument: {0}. Must be a dictionary.'.format(settings))

parameters = {}
# Validate url
url = settings.get('url', None)
if url:
try:
parsed = urllib.parse.urlparse(url)
if not parsed.netloc:
raise ValueError('Malformed photo URL: "{0}".'.format(url))
parameters['continueUrl'] = url
except Exception:
raise ValueError('Malformed photo URL: "{0}".'.format(url))

# Validate boolean types
for field in ['handle_code_in_app', 'android_install_app']:
if not isinstance(settings.get(field, False), bool):
raise ValueError('Invalid value provided for {0}: {1}'.format(
field, settings.get(field, False)))

# Validate string types
for field in ['dynamic_link_domain', 'ios_bundle_id',
'android_package_name', 'android_minimum_version']:
if not isinstance(settings.get(field, ''), six.string_types):
raise ValueError('Invalid value provided for {0}: {1}'.format(
field, settings.get(field, False)))

# handle_code_in_app
handle_code_in_app = settings.get('handle_code_in_app', False)
if handle_code_in_app:
parameters['canHandleCodeInApp'] = handle_code_in_app

# dynamic_link_domain
dynamic_link_domain = settings.get('dynamic_link_domain', None)
if dynamic_link_domain:
parameters['dynamicLinkDomain'] = dynamic_link_domain

# ios_bundle_id
ios_bundle_id = settings.get('ios_bundle_id', None)
if ios_bundle_id:
parameters['iosBundleId'] = ios_bundle_id

# android_* attributes
android_package_name = settings.get('android_package_name', None)
android_minimum_version = settings.get('android_minimum_version', None)
android_install_app = settings.get('android_install_app', False)
if (android_minimum_version or android_install_app) and not android_package_name:
raise ValueError("Android package name is required when specifying other Android settings")

if android_package_name:
parameters['androidPackageName'] = android_package_name
if android_minimum_version:
parameters['androidMinimumVersion'] = android_minimum_version
if android_install_app:
parameters['androidInstallApp'] = android_install_app
return parameters

def validate_action_type(action_type):
if not action_type in VALID_ACTION_TYPE:
raise ValueError('Invalid action type provided action_type: {0}. \
Valid values are {1}'.format(action_type, VALID_ACTION_TYPE))
return action_type
114 changes: 114 additions & 0 deletions firebase_admin/_user_mgt.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

"""Firebase user management sub module."""

import copy
import json

import requests
Expand All @@ -30,6 +31,7 @@
USER_DELETE_ERROR = 'USER_DELETE_ERROR'
USER_IMPORT_ERROR = 'USER_IMPORT_ERROR'
USER_DOWNLOAD_ERROR = 'LIST_USERS_ERROR'
USER_LINK_GENERATE_ERROR = 'USER_LINK_GENERATE_ERROR'

MAX_LIST_USERS_RESULTS = 1000
MAX_IMPORT_USERS_SIZE = 1000
Expand Down Expand Up @@ -372,6 +374,81 @@ def photo_url(self):
def provider_id(self):
return self._data.get('providerId')

class ActionCodeSettings(object):
"""Contains required continue/state URL with optional Android and iOS settings.
Used when invoking the email action link generation APIs.
"""
KEYS = set(['url', 'handle_code_in_app', 'dynamic_link_domain', 'ios_bundle_id',
'android_package_name', 'android_minimum_version', 'android_install_app'])

def __init__(self, data=None):
super(ActionCodeSettings, self).__init__()
data = data or {}
if not isinstance(data, dict):
raise ValueError('Invalid data argument: {0}. Must be a dictionary.'.format(data))
if len(set(six.iterkeys(data)) - self.KEYS):
raise ValueError('Invalid settings provided: {0}. Valid dictionary \
have following keys {1}'.format(data, self.KEYS))
self._data = copy.deepcopy(data)

@property
def url(self):
return self._data.get('url', None)

@url.setter
def url(self, url):
self._data['url'] = url

@property
def handle_code_in_app(self):
return self._data.get('handle_code_in_app', False)

@handle_code_in_app.setter
def handle_code_in_app(self, handle_code_in_app):
self._data['handle_code_in_app'] = handle_code_in_app

@property
def dynamic_link_domain(self):
return self._data.get('dynamic_link_domain', None)

@dynamic_link_domain.setter
def dynamic_link_domain(self, dynamic_link_domain):
self._data['dynamic_link_domain'] = dynamic_link_domain

@property
def ios_bundle_id(self):
return self._data.get('ios_bundle_id', None)

@ios_bundle_id.setter
def ios_bundle_id(self, ios_bundle_id):
self._data['ios_bundle_id'] = ios_bundle_id

@property
def android_package_name(self):
return self._data.get('android_package_name', None)

@android_package_name.setter
def android_package_name(self, android_package_name):
self._data['android_package_name'] = android_package_name

@property
def android_minimum_version(self):
return self._data.get('android_minimum_version', None)

@android_minimum_version.setter
def android_minimum_version(self, android_minimum_version):
self._data['android_minimum_version'] = android_minimum_version

@property
def android_install_app(self):
return self._data.get('android_install_app', False)

@android_install_app.setter
def android_install_app(self, android_install_app):
self._data['android_install_app'] = android_install_app

def to_parameters_dict(self):
return _auth_utils.validate_action_code_settings(self._data)

class UserManager(object):
"""Provides methods for interacting with the Google Identity Toolkit."""
Expand Down Expand Up @@ -537,6 +614,43 @@ def import_users(self, users, hash_alg=None):
raise ApiCallError(USER_IMPORT_ERROR, 'Failed to import users.')
return response

def generate_email_action_link(self, action_type, email, settings=None):
"""Fetches the email action links for types

Args:
action_type: String. Valid values ['VERIFY_EMAIL', 'EMAIL_SIGNIN', 'PASSWORD_RESET']
email: Email of the user for which the action is performed
settings: ``ActionCodeSettings`` object or dict (optional). Defines whether
the link is to be handled by a mobile app and the additional state information to be
passed in the deep link, etc.
Returns:
link_url: action url to be emailed to the user

Raises:
ApiCallError: If an error occurs while generating the link

"""
payload = {
'requestType': _auth_utils.validate_action_type(action_type),
'email': _auth_utils.validate_email(email),
'returnOobLink': True
}

if settings and not isinstance(settings, ActionCodeSettings):
settings = ActionCodeSettings(settings)

if settings and isinstance(settings, ActionCodeSettings):
payload.update(settings.to_parameters_dict())

try:
response = self._client.body('post', '/accounts:sendOobCode', json=payload)
except requests.exceptions.RequestException as error:
self._handle_http_error(USER_LINK_GENERATE_ERROR, 'Failed to generate link.', error)
else:
if not response or not response.get('oobLink'):
raise ApiCallError(USER_LINK_GENERATE_ERROR, 'Failed to generate link.')
return response.get('oobLink')

def _handle_http_error(self, code, msg, error):
if error.response is not None:
msg += '\nServer response: {0}'.format(error.response.content.decode())
Expand Down
74 changes: 74 additions & 0 deletions firebase_admin/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@


__all__ = [
'ActionCodeSettings',
'AuthError',
'ErrorInfo',
'ExportedUserRecord',
Expand All @@ -51,6 +52,9 @@
'create_session_cookie',
'create_user',
'delete_user',
'generate_password_reset_link',
'generate_email_verification_link',
'generate_email_sign_in_link',
'get_user',
'get_user_by_email',
'get_user_by_phone_number',
Expand All @@ -63,6 +67,7 @@
'verify_session_cookie',
]

ActionCodeSettings = _user_mgt.ActionCodeSettings
ErrorInfo = _user_import.ErrorInfo
ExportedUserRecord = _user_mgt.ExportedUserRecord
ListUsersPage = _user_mgt.ListUsersPage
Expand Down Expand Up @@ -448,6 +453,75 @@ def import_users(users, hash_alg=None, app=None):
except _user_mgt.ApiCallError as error:
raise AuthError(error.code, str(error), error.detail)

def generate_password_reset_link(email, settings=None, app=None):
"""Generates the out-of-band email action link for password reset flows for the specified email
address.

Args:
email: The email of the user whose password is to be reset.
settings: dict or ``ActionCodeSettings`` instance (optional). Defines whether
the link is to be handled by a mobile app and the additional state information to be
passed in the deep link, etc.
app: An App instance (optional).
Returns:
link: The password reset link created by API

Raises:
ValueError: If the provided arguments are invalid
AuthError: If an error occurs while generating the link
"""
user_manager = _get_auth_service(app).user_manager
try:
return user_manager.generate_email_action_link('PASSWORD_RESET', email, settings)
except _user_mgt.ApiCallError as error:
raise AuthError(error.code, str(error), error.detail)

def generate_email_verification_link(email, settings=None, app=None):
"""Generates the out-of-band email action link for email verification flows for the specified
email address.

Args:
email: The email of the user to be verified.
settings: dict or ``ActionCodeSettings`` instance (optional). Defines whether
the link is to be handled by a mobile app and the additional state information to be
passed in the deep link, etc.
app: An App instance (optional).
Returns:
link: The email verification link created by API

Raises:
ValueError: If the provided arguments are invalid
AuthError: If an error occurs while generating the link
"""
user_manager = _get_auth_service(app).user_manager
try:
return user_manager.generate_email_action_link('VERIFY_EMAIL', email, settings)
except _user_mgt.ApiCallError as error:
raise AuthError(error.code, str(error), error.detail)

def generate_email_sign_in_link(email, settings=None, app=None):
"""Generates the out-of-band email action link for email link sign-in flows, using the action
code settings provided.

Args:
email: The email of the user signing in.
settings: dict or ``ActionCodeSettings`` instance (optional). Defines whether
the link is to be handled by a mobile app and the additional state information to be
passed in the deep link, etc.
app: An App instance (optional).
Returns:
link: The email sign in link created by API

Raises:
ValueError: If the provided arguments are invalid
AuthError: If an error occurs while generating the link
"""
user_manager = _get_auth_service(app).user_manager
try:
return user_manager.generate_email_action_link('EMAIL_SIGNIN', email, settings)
except _user_mgt.ApiCallError as error:
raise AuthError(error.code, str(error), error.detail)

def _check_jwt_revoked(verified_claims, error_code, label, app):
user = get_user(verified_claims.get('uid'), app=app)
if verified_claims.get('iat') * 1000 < user.tokens_valid_after_timestamp:
Expand Down
39 changes: 39 additions & 0 deletions integration/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import random
import time
import uuid
import six

import pytest
import requests
Expand Down Expand Up @@ -372,6 +373,44 @@ def test_import_users_with_password(api_key):
finally:
auth.delete_user(uid)

@pytest.fixture
def action_code_settings():
data = {
'url': 'http://localhost',
}
return auth.ActionCodeSettings(data)

def _validate_link_url(link):
assert isinstance(link, six.string_types)
six.moves.urllib.parse.urlparse(link)

def test_password_reset(new_user_with_params):
link = auth.generate_password_reset_link(new_user_with_params.email)
_validate_link_url(link)

def test_email_verification(new_user_with_params):
link = auth.generate_email_verification_link(new_user_with_params.email)
_validate_link_url(link)

def test_email_sign_in(new_user_with_params):
link = auth.generate_email_sign_in_link(new_user_with_params.email)
_validate_link_url(link)

def test_password_reset_with_settings(new_user_with_params, action_code_settings):
link = auth.generate_password_reset_link(new_user_with_params.email,
settings=action_code_settings)
_validate_link_url(link)

def test_email_verification_with_settings(new_user_with_params, action_code_settings):
link = auth.generate_email_verification_link(new_user_with_params.email,
settings=action_code_settings)
_validate_link_url(link)

def test_email_sign_in_with_settings(new_user_with_params, action_code_settings):
link = auth.generate_email_sign_in_link(new_user_with_params.email,
settings=action_code_settings)
_validate_link_url(link)


class CredentialWrapper(credentials.Base):
"""A custom Firebase credential that wraps an OAuth2 token."""
Expand Down
Loading