1+ from datetime import datetime , timedelta , timezone
2+
3+ import pytest
4+ import time_machine
5+ from django .urls import reverse
6+ from rest_framework import status
7+
8+ from user .enums import TokenEnum , SystemRoleEnum
9+ from user .models import Token , PendingUser , User
10+
11+ from .conftest import api_client_with_credentials
12+
13+
14+ pytestmark = pytest .mark .django_db
15+
16+
17+ class TestAuthEndpoints :
18+ initiate_password_reset_url = reverse (
19+ 'auth:auth-initiate-password-reset' )
20+ password_change_url = reverse ('auth:password-change-list' )
21+
22+ login_url = reverse ("auth:login" )
23+ verify_account_url = reverse ("auth:auth-verify-account" )
24+ create_password_via_reset_otp_url = reverse ("auth:auth-create-password" )
25+
26+ def test_user_login (self , api_client , active_user , auth_user_password ):
27+ data = {
28+ "phone" : active_user .phone ,
29+ "password" : auth_user_password }
30+ response = api_client .post (self .login_url , data )
31+ assert response .status_code == status .HTTP_200_OK
32+ returned_json = response .json ()
33+
34+ assert 'refresh' in returned_json
35+ assert 'access' in returned_json
36+
37+ def test_deny_login_to_inactive_user (self , api_client , inactive_user , auth_user_password ):
38+ data = {
39+ "phone" : inactive_user .phone ,
40+ "password" : auth_user_password }
41+ response = api_client .post (self .login_url , data )
42+ assert response .status_code == status .HTTP_401_UNAUTHORIZED
43+
44+ def test_deny_login_invalid_credentials (self , api_client , active_user ):
45+ data = {
46+ "phone" : active_user .phone ,
47+ "password" : "wrong@pass" }
48+ response = api_client .post (self .login_url , data )
49+ assert response .status_code == status .HTTP_401_UNAUTHORIZED
50+
51+ def test_password_reset_initiate (self , mocker , api_client , active_user ):
52+ """Initiate a password reset for not authenticated user"""
53+ mock_send_reset_otp = mocker .patch (
54+ 'user.tasks.send_phone_notification.delay' )
55+ data = {
56+ 'phone' : active_user .phone ,
57+ }
58+ response = api_client .post (
59+ self .initiate_password_reset_url , data , format = "json" )
60+ assert response .status_code == status .HTTP_200_OK
61+ mock_send_reset_otp .side_effect = print (
62+ "Sent to celery task:Password Reset SMS!" )
63+
64+ token : Token = Token .objects .get (
65+ user = active_user , token_type = TokenEnum .PASSWORD_RESET )
66+ otp = token .token
67+ message_info = {
68+ 'message' : f"Password Reset!\n Use { otp } to reset your password.\n It expires in 10 minutes" ,
69+ 'phone' : active_user .phone
70+ }
71+ mock_send_reset_otp .assert_called_once_with (message_info )
72+
73+ def test_deny_initiate_password_reset (self , api_client ):
74+ """Deny password change for non-registered user"""
75+ data = {
76+ 'phone' : "+2348157777777" ,
77+ }
78+ response = api_client .post (
79+ self .initiate_password_reset_url , data , format = "json" )
80+ assert response .status_code == 400
81+
82+
83+
84+ def test_change_password_using_valid_old_password (self , api_client , authenticate_user , auth_user_password ):
85+ user = authenticate_user ()
86+ token = user ['token' ]
87+ user_instance = user ['user_instance' ]
88+ data = {
89+ 'old_password' : auth_user_password ,
90+ 'new_password' : 'newpass@@' ,
91+ }
92+ api_client_with_credentials (token , api_client )
93+ response = api_client .post (
94+ self .password_change_url , data , format = "json" )
95+ assert response .status_code == status .HTTP_200_OK
96+ user_instance .refresh_from_db ()
97+ assert user_instance .check_password ('newpass@@' )
98+
99+ def test_deny_change_password_using_invalid_old_password (self , api_client , authenticate_user ):
100+ user = authenticate_user ()
101+ token = user ['token' ]
102+ data = {
103+ 'old_password' : 'invalidpass' ,
104+ 'new_password' : 'New87ge&nerated' ,
105+ }
106+ api_client_with_credentials (token , api_client )
107+ response = api_client .post (
108+ self .password_change_url , data , format = "json" )
109+ assert response .status_code == status .HTTP_400_BAD_REQUEST
110+
111+ def test_deny_change_password_for_unathenticated_user (self , api_client ):
112+ """Only Authenticated User can change password using old valid password"""
113+ data = {
114+ 'old_password' : 'invalidpass' ,
115+ 'new_password' : 'New87ge&nerated' ,
116+ }
117+ response = api_client .post (
118+ self .password_change_url , data , format = "json" )
119+ assert response .status_code == status .HTTP_401_UNAUTHORIZED
120+
121+ def test_verify_account_using_otp (self , api_client ):
122+ pending_user = PendingUser .objects .create (phone = '+2548157787640' ,
123+ verification_code = 1234 ,
124+ password = 'somesecret'
125+ )
126+
127+ data = {'otp' : pending_user .verification_code ,
128+ 'phone' : pending_user .phone }
129+ response = api_client .post (self .verify_account_url , data )
130+ assert response .status_code == 200
131+ user_object = User .objects .get (phone = pending_user .phone )
132+ assert user_object .verified == True
133+ assert user_object .is_active == True
134+ assert user_object .roles == [SystemRoleEnum .CUSTOMER ]
135+
136+ def test_deny_verify_account_expired_otp (self , api_client ):
137+ """Prevent account verification if OTP has expired"""
138+ pending_user = PendingUser .objects .create (phone = '+2548157787640' ,
139+ verification_code = 1234 ,
140+ password = 'somesecret'
141+ )
142+ with time_machine .travel (datetime .now (timezone .utc ) + timedelta (minutes = 13 )):
143+ data = {'otp' : pending_user .verification_code ,
144+ 'phone' : pending_user .phone }
145+ response = api_client .post (self .verify_account_url , data )
146+ assert response .status_code == 400
147+
148+ def test_deny_verify_account_using_invalid_otp (self , api_client ):
149+ pending_user = PendingUser .objects .create (phone = '+2548157787640' ,
150+ verification_code = 1234 ,
151+ password = 'somesecret'
152+ )
153+
154+ data = {'otp' : 3456 ,
155+ 'phone' : pending_user .phone }
156+ response = api_client .post (self .verify_account_url , data )
157+ assert response .status_code == 400
158+
159+ def test_create_new_password_using_valid_reset_otp (self , api_client , active_user , token_factory ):
160+ token : Token = token_factory (
161+ user = active_user , token_type = TokenEnum .PASSWORD_RESET )
162+ data = {
163+ "otp" : token .token ,
164+ "new_password" : "new_pass_me"
165+ }
166+ response = api_client .post (
167+ self .create_password_via_reset_otp_url , data )
168+ assert response .status_code == 200
169+ active_user .refresh_from_db ()
170+ assert active_user .check_password ('new_pass_me' )
171+
172+ def test_deny_create_new_password_using_invalid_reset_otp (self , api_client , active_user , token_factory ):
173+ token_factory (
174+ token_type = TokenEnum .PASSWORD_RESET , user = active_user , token = 1234 )
175+ data = {
176+ "otp" : 4321 ,
177+ "new_password" : "new_pass_me"
178+ }
179+ response = api_client .post (
180+ self .create_password_via_reset_otp_url , data )
181+ assert response .status_code == 400
0 commit comments