first commit
This commit is contained in:
295
server/routes/auth.py
Normal file
295
server/routes/auth.py
Normal file
@@ -0,0 +1,295 @@
|
||||
from flask import Blueprint, request, jsonify
|
||||
#from werkzeug.security import generate_password_hash, check_password_hash
|
||||
from utils.email import send_email, send_gmail
|
||||
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity
|
||||
from flask_jwt_extended import decode_token
|
||||
import datetime
|
||||
import random
|
||||
import os
|
||||
from datetime import timezone
|
||||
import hashlib
|
||||
from models.users import Users
|
||||
from utils.welcome_email import WelcomeMessage
|
||||
from models.audit import Audit, AuditModel
|
||||
|
||||
auth_bp = Blueprint("auth", __name__)
|
||||
audit = Audit()
|
||||
|
||||
def hash_password(password: str) -> bytes:
|
||||
return hashlib.md5(password.encode('utf-8')).hexdigest()
|
||||
|
||||
@auth_bp.route("/register", methods=["POST"])
|
||||
def register():
|
||||
users = Users()
|
||||
data = request.get_json()
|
||||
email = data.get("email")
|
||||
password = data.get("password")
|
||||
workspace_id = data.get("workspace_id")
|
||||
|
||||
if not email or not password:
|
||||
return jsonify({"error": "Missing required fields"}), 400
|
||||
|
||||
existing_user = users.get_user_by_email(email)
|
||||
if existing_user:
|
||||
entry = AuditModel(action=f"Attempt to register an existing email:{email}", status='409 - User already exists')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "User already exists"}), 409
|
||||
|
||||
users.register_user(email, password, workspace_id)
|
||||
|
||||
welcome_message = WelcomeMessage(email)
|
||||
welcome_message.send_email()
|
||||
entry = AuditModel(action=f"User registered successfully:{email}", status='201 - User created')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"message": "User registered successfully!"}), 201
|
||||
|
||||
@auth_bp.route("/login", methods=["POST"])
|
||||
def login():
|
||||
users = Users()
|
||||
data = request.get_json()
|
||||
email = data.get("email", "").strip().lower()
|
||||
password = data.get("password", "")
|
||||
|
||||
if not email or not password:
|
||||
entry = AuditModel(user_id=user.id, action=f"Attempt to login: {email}", status='400 - Missing email or password')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "Missing email or password"}), 400
|
||||
|
||||
user = users.get_user_by_email(email)
|
||||
if not user or not hash_password(password)==user.password:
|
||||
print(user.password, password)
|
||||
entry = AuditModel(user_id=user.id ,action=f"Attempt to login: {email}", status='401 - Invalid credentials')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "Invalid credentials"}), 401
|
||||
|
||||
if user.active != 1:
|
||||
entry = AuditModel(user_id=user.id, action=f"Attempt to login: {email}", status='401 - User inactive')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "Inactive user"}), 401
|
||||
|
||||
otp_code = str(random.randint(100000, 999999))
|
||||
expiration = datetime.datetime.now(timezone.utc) + datetime.timedelta(minutes=10)
|
||||
users.update_user_otp(user.id, otp_code, expiration)
|
||||
|
||||
send_gmail(
|
||||
to_email=user.email,
|
||||
subject="Your Login Verification Code",
|
||||
body=f"Your login verification code is: {otp_code}"
|
||||
)
|
||||
entry = AuditModel(user_id=user.id, action=f"Attempt to login: {email}", status='200 - Verification code send!')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"message": "Verification code sent to your email."}), 200
|
||||
|
||||
|
||||
@auth_bp.route("/verify_code", methods=["POST"])
|
||||
def verify_code():
|
||||
users = Users()
|
||||
data = request.get_json()
|
||||
email = data.get("email", "").strip().lower()
|
||||
code = data.get("code", "")
|
||||
|
||||
if not email or not code:
|
||||
entry = AuditModel(action=f"Attempt to verify code: {email}", status='400 - Missing email or verification code!')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "Missing email or verification code"}), 400
|
||||
|
||||
user = users.get_user_by_email(email)
|
||||
#-----------------------------------------------> for testing only remove in prod
|
||||
#if email != 'test@test.com':
|
||||
#-----------------------------------------------> for testing only remove in prod
|
||||
if not user or user.otp_code != code:
|
||||
entry = AuditModel(user_id=user.id, action=f"Attempt to verify code: {email}", status='401 - Invalid code!')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "Invalid code"}), 401
|
||||
|
||||
exp = user.otp_expiration
|
||||
# Normalize to aware UTC datetime for safe comparison across SQLite (string) and Postgres (datetime)
|
||||
now_utc = datetime.datetime.now(timezone.utc)
|
||||
if isinstance(exp, str):
|
||||
try:
|
||||
exp_dt = datetime.datetime.fromisoformat(exp)
|
||||
except Exception:
|
||||
return jsonify({"error": "Invalid expiration format"}), 500
|
||||
if exp_dt.tzinfo is None:
|
||||
exp_dt = exp_dt.replace(tzinfo=timezone.utc)
|
||||
else:
|
||||
# Assume a datetime object from DB driver
|
||||
exp_dt = exp
|
||||
if exp_dt is None:
|
||||
entry = AuditModel(user_id=user.id, action=f"Attempt to verify code:{email}", status='500 - Missing expiration!')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "Missing expiration"}), 500
|
||||
if exp_dt.tzinfo is None:
|
||||
exp_dt = exp_dt.replace(tzinfo=timezone.utc)
|
||||
if now_utc > exp_dt:
|
||||
entry = AuditModel(user_id=user.id, action=f"Attempt to verify code:{email}", status='403 - Verification code has expired!')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "Verification code has expired"}), 403
|
||||
|
||||
users.clear_user_otp(user.id)
|
||||
|
||||
access_token = create_access_token(
|
||||
identity=str(user.id),
|
||||
expires_delta=datetime.timedelta(hours=12)
|
||||
)
|
||||
entry = AuditModel(user_id=user.id, action=f"Attempt to verify code:{email}", status='200 Ok - Login successful!')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({
|
||||
"message": "Login successful",
|
||||
"access_token": access_token
|
||||
}), 200
|
||||
|
||||
|
||||
@auth_bp.route("/forgot_password", methods=["POST"])
|
||||
def forgot_password():
|
||||
users = Users()
|
||||
data = request.get_json()
|
||||
email = data.get("email", "").strip().lower()
|
||||
|
||||
if not email:
|
||||
entry = AuditModel(action=f"Attempt to recover password", status='400 - Email is required!')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "Email is required"}), 400
|
||||
|
||||
user = users.get_user_by_email(email)
|
||||
if user:
|
||||
reset_token = create_access_token(
|
||||
identity=user["id"],
|
||||
expires_delta=datetime.timedelta(minutes=15),
|
||||
additional_claims={"purpose": "password_reset"}
|
||||
)
|
||||
|
||||
send_gmail(
|
||||
to_email=user["email"],
|
||||
subject="Password Reset Request",
|
||||
body=(
|
||||
"Click the link to reset your password: "
|
||||
f"{os.getenv('FRONTEND_BASE_URL', 'http://127.0.0.1:5100')}/reset_password?token={reset_token}"
|
||||
)
|
||||
)
|
||||
|
||||
entry = AuditModel(user_id=user.id, action=f"Attempt to recover password: {email}", status='200 Ok - If this email is registered, a reset link has been sent!')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"message": "If this email is registered, a reset link has been sent."}), 200
|
||||
|
||||
|
||||
@auth_bp.route("/reset_password", methods=["POST"])
|
||||
def reset_password():
|
||||
users = Users()
|
||||
data = request.get_json()
|
||||
token = data.get("token", "")
|
||||
new_password = data.get("new_password", "")
|
||||
|
||||
if not token or not new_password:
|
||||
entry = AuditModel( action=f"Attempt to recover password", status='400 - Missing token or new password!')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "Missing token or new password"}), 400
|
||||
|
||||
try:
|
||||
decoded_token = decode_token(token)
|
||||
if decoded_token.get("purpose") != "password_reset":
|
||||
entry = AuditModel( action=f"Attempt to recover password", status='403 - Invalid token purpose!')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "Invalid token purpose"}), 403
|
||||
except Exception:
|
||||
entry = AuditModel( action=f"Attempt to recover password", status='403 - Invalid or expired token!')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "Invalid or expired token"}), 403
|
||||
|
||||
user_id = decoded_token["sub"]
|
||||
user = users.get_user(user_id)
|
||||
if not user:
|
||||
entry = AuditModel( action=f"Attempt to recover password", status='404 - User not found!')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "User not found"}), 404
|
||||
|
||||
users.update_password(user_id, new_password)
|
||||
entry = AuditModel(user_id=user.id, action=f"Attempt to recover password:{user.email}", status='200 ok - Password has been reset successfully.')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"message": "Password has been reset successfully."}), 200
|
||||
|
||||
|
||||
@auth_bp.route("/me", methods=["GET"])
|
||||
@jwt_required()
|
||||
def me():
|
||||
users = Users()
|
||||
user_id = get_jwt_identity()
|
||||
user = users.get_user(user_id)
|
||||
if not user:
|
||||
entry = AuditModel( action=f"Get user data: ", status='404 - User not found.')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "User not found"}), 404
|
||||
|
||||
entry = AuditModel(user_id=user.id, action=f"Get user data: {user.email}", status='200 Ok.')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({
|
||||
'id': user.id,
|
||||
'workspace_id': user.workspace_id,
|
||||
'first_name':user.first_name,
|
||||
'last_name': user.last_name,
|
||||
'email':user.email,
|
||||
'address':user.address,
|
||||
'profession':user.profession,
|
||||
'role':user.role,
|
||||
'status': user.status,
|
||||
'profile_pic': user.profile_pic,
|
||||
'created_at': user.created_at,
|
||||
'otp_code': user.otp_code,
|
||||
'otp_expiration': user.otp_expiration
|
||||
}), 200
|
||||
|
||||
|
||||
# Validate token endpoint
|
||||
@auth_bp.route("/validate_token", methods=["GET"])
|
||||
@jwt_required()
|
||||
def validate_token():
|
||||
users = Users()
|
||||
user_id = get_jwt_identity()
|
||||
user = users.get_user(user_id)
|
||||
if not user:
|
||||
entry = AuditModel(action=f"Get access token:", status='404 - User not found.')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "User not found"}), 404
|
||||
entry = AuditModel(user_id=user.id, action=f"Get access token: {user.email}", status='200 Ok - Token is valid')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"message": "Token is valid"}), 200
|
||||
|
||||
# @auth_bp.route("/temporary_password", methods=["POST"])
|
||||
# @jwt_required()
|
||||
# def change_passwd():
|
||||
# data = request.get_json()
|
||||
# if not data:
|
||||
# entry = AuditModel(action=f"Get temporary password token:", status='404 - Password not found.')
|
||||
# audit.new_entry(entry)
|
||||
# return jsonify({"error": "Password not found"}), 404
|
||||
# users = Users()
|
||||
# user_id = get_jwt_identity()
|
||||
# new_password_hash = generate_password_hash(data['password'])
|
||||
# users.update_user_password(user_id, new_password_hash)
|
||||
# users.update_temp_pass(user_id)
|
||||
# entry = AuditModel(user_id=user_id, action=f"Get temporary password:", status='200 Ok - Password has been updated successfully.')
|
||||
# audit.new_entry(entry)
|
||||
# return jsonify({"message": "Password has been updated successfully."}), 200
|
||||
|
||||
|
||||
@auth_bp.route("/update_passwrod", methods=["POST"])
|
||||
def update_passwrod():
|
||||
data = request.get_json()
|
||||
email = data.get("email", "").strip().lower()
|
||||
password = data.get('password')
|
||||
token = data.get('token')
|
||||
env_token = os.getenv('PASSWORD_TOKEN')
|
||||
if not email and not password and not token:
|
||||
entry = AuditModel(action=f"Update Password:", status='403 - Data not provided.')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "Data not provided"}), 403
|
||||
if env_token != env_token:
|
||||
entry = AuditModel(action=f"Update Password:", status='401 - Invalid token.')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"error": "Data not provided"}), 401
|
||||
|
||||
users = Users()
|
||||
users.update_password(email, password)
|
||||
entry = AuditModel(action=f"Update Password:", status='200 - Password has been updated successfully.')
|
||||
audit.new_entry(entry)
|
||||
return jsonify({"message": "Password has been updated successfully."}), 200
|
||||
Reference in New Issue
Block a user