Skip to content

Commit 65345ee

Browse files
committed
Example Flask with flask-restful and flask-jwt-extended
1 parent 7a711d4 commit 65345ee

File tree

1 file changed

+159
-0
lines changed

1 file changed

+159
-0
lines changed

backend/app.py

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
from flask import Flask, request, jsonify
2+
from pymongo import MongoClient
3+
from flask_restful import Resource, Api
4+
from bson.json_util import dumps
5+
from passlib.hash import sha256_crypt
6+
from flask_jwt_extended import create_access_token,create_refresh_token,get_jwt,get_jwt_identity, jwt_required, JWTManager
7+
import datetime
8+
import time
9+
# py -m flask --app app --debug run
10+
11+
client = MongoClient()
12+
db = client.example_db
13+
demo_collection = db.demo_collection
14+
user_collection = db.user_collection
15+
token_collection = db.token_collection
16+
17+
app = Flask(__name__, template_folder='templates')
18+
19+
# For helping with Same-Origin requests from the React App
20+
@app.after_request
21+
def after_request(response):
22+
response.headers.add('Access-Control-Allow-Origin', '*')
23+
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
24+
response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS')
25+
return response
26+
27+
api = Api(app)
28+
29+
jwt = JWTManager(app)
30+
app.config['JWT_SECRET_KEY'] = 'jwt-secret-string-example'
31+
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = datetime.timedelta(minutes=240)
32+
app.config['JWT_REFRESH_TOKEN_EXPIRES'] = datetime.timedelta(days=30)
33+
34+
@jwt.token_in_blocklist_loader
35+
def check_if_token_in_blacklist(jwt_header, jwt_payload: dict):
36+
jti = jwt_payload['jti']
37+
return is_jti_blacklisted(jti)
38+
39+
def is_jti_blacklisted(jti):
40+
if db.token_collection.count_documents({"jti": jti}) != 0:
41+
return True
42+
else:
43+
return False
44+
45+
app.config["INITIAL_USERNAME"] = "user@test.com"
46+
app.config["INITIAL_PASSWORD"] = sha256_crypt.encrypt("initial_password")
47+
48+
# Create the Initial User specified in the configuration data
49+
def create_initial_user():
50+
temp_dict = {"email": app.config["INITIAL_USERNAME"], "password": app.config["INITIAL_PASSWORD"]}
51+
if db.user_collection.count_documents({"email": app.config["INITIAL_USERNAME"]}) != 0:
52+
print("[*] Found Initial User")
53+
filter = {"email": app.config["INITIAL_USERNAME"]}
54+
record = user_collection.update_one(filter, {"$set": temp_dict})
55+
else:
56+
print("[*] Adding Initial User")
57+
user_collection.update_one(temp_dict, {'$set':temp_dict}, upsert=True)
58+
create_initial_user()
59+
60+
# Verify the initial user was created and password functionality is working as expected
61+
def verify_initial_user():
62+
temp_doc = user_collection.find_one({'email': app.config["INITIAL_USERNAME"]}, {})
63+
if sha256_crypt.verify("change_me", temp_doc['password']):
64+
print("[*] Successfully Verified Initial User")
65+
verify_initial_user()
66+
67+
68+
# REST API Specifications for retrieving and setting scan configurations
69+
class Scan(Resource):
70+
@jwt_required()
71+
def get(self):
72+
time.sleep(1)
73+
scans = demo_collection.aggregate([{"$project": {'id': '$_id', 'name': 1, 'category': 1, 'test_object': 1, '_id': 0}}, ])
74+
return dumps(scans)
75+
76+
@jwt_required()
77+
def post(self):
78+
data = request.get_json()
79+
return jsonify({'data': data}), 201
80+
api.add_resource(Scan, '/api/demo')
81+
82+
# REST API Specifications for creating JSON Web Tokens (JWTs)
83+
class Login(Resource):
84+
def post(self):
85+
email = request.json.get("email", None)
86+
password = request.json.get("password", None)
87+
if (email is None) or (password is None):
88+
return {"msg": "Wrong Email or Password!"}, 401
89+
temp_doc = user_collection.find_one({'email':email}, {})
90+
if db.user_collection.count_documents({"email": email}) != 0:
91+
if sha256_crypt.verify(password, temp_doc['password']):
92+
print("[*] Successfully Verified User")
93+
access_token = create_access_token(identity=email)
94+
refresh_token = create_refresh_token(identity=email)
95+
response = {"msg":"Successfully Authenticated", "access_token": access_token, "refresh_token": refresh_token}
96+
return response
97+
else:
98+
return {"msg": "Wrong Email or Password!"}, 401
99+
else:
100+
return {"msg": "Wrong Email or Password!"}, 401
101+
api.add_resource(Login, '/api/login')
102+
103+
# Revoke an Active Token via adding to Block List
104+
class Logout(Resource):
105+
@jwt_required()
106+
def post(self):
107+
jti_data = get_jwt()
108+
id = get_jwt_identity()
109+
jti = jti_data['jti']
110+
try:
111+
if is_jti_blacklisted(jti):
112+
pass
113+
else:
114+
revoked_token = RevokedToken(id, jti)
115+
revoked_token.add()
116+
return {'msg': 'Access Token has been Revoked'}
117+
except Exception as e:
118+
print(e)
119+
return {'msg': 'Something Went Wrong Revoking Access Token'}, 500
120+
api.add_resource(Logout, '/api/logout')
121+
122+
class TokenRefresh(Resource):
123+
@jwt_required(refresh=True)
124+
def post(self):
125+
current_user = get_jwt_identity()
126+
access_token = create_access_token(identity = current_user)
127+
return {'access_token': access_token}
128+
api.add_resource(TokenRefresh, '/api/refresh')
129+
130+
class ValidateToken(Resource):
131+
@jwt_required()
132+
def get(self):
133+
try:
134+
jti_data = get_jwt()
135+
jti = jti_data['jti']
136+
print(jti)
137+
if is_jti_blacklisted(jti):
138+
return {'msg': 'Revoked'}
139+
else:
140+
return {'msg': 'Not Revoked'}
141+
except Exception as e:
142+
print(e)
143+
return {'msg': 'Invalid JWT'}
144+
api.add_resource(ValidateToken, '/api/validate')
145+
146+
class Add_User(Resource):
147+
@jwt_required()
148+
def post(self):
149+
pass
150+
151+
152+
class RevokedToken:
153+
def __init__(self, ident, jti):
154+
self.id = ident
155+
self.jti = jti
156+
self.temp_dict = {"id":self.id, "jti":self.jti}
157+
158+
def add(self):
159+
token_collection.insert_one(self.temp_dict, {'$set':self.temp_dict})

0 commit comments

Comments
 (0)