Files
TMS/transportmanager/server/routes/auth.py

224 lines
7.3 KiB
Python

import datetime
import hmac
import os
import random
import secrets
from datetime import timezone

from flask import Blueprint, request, jsonify
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity
from flask_jwt_extended import decode_token
from werkzeug.security import generate_password_hash, check_password_hash

from models.user import Users
from utils.email import send_email, send_gmail
from utils.welcome_email import WelcomeMessage
# Blueprint on which the authentication routes in this module are registered.
auth_bp = Blueprint("auth", __name__)
@auth_bp.route("/register", methods=["POST"])
def register():
    """Create a new user account and send the welcome e-mail.

    Expects a JSON object with ``name``, ``email`` and ``password``.

    Returns:
        201 on success; 400 when the body is not a JSON object or a required
        field is missing/blank; 409 when the e-mail is already registered.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body, so every malformed request gets the same JSON 400 response.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Missing required fields"}), 400

    name = data.get("name")
    raw_email = data.get("email")
    password = data.get("password")

    # /login strips and lowercases the address before looking the user up,
    # so store it in the same canonical form to keep both routes in agreement.
    email = raw_email.strip().lower() if isinstance(raw_email, str) else ""

    # The password must be a string for hashing to work.
    if not name or not email or not isinstance(password, str) or not password:
        return jsonify({"error": "Missing required fields"}), 400

    users = Users()
    if users.get_user_by_email(email):
        return jsonify({"error": "User already exists"}), 409

    users.insert_user(name, email, generate_password_hash(password))

    welcome_message = WelcomeMessage(name, email)
    welcome_message.send_email()

    return jsonify({"message": "User registered successfully!"}), 201
@auth_bp.route("/login", methods=["POST"])
def login():
    """Check e-mail/password and e-mail a one-time verification code.

    Expects a JSON object with ``email`` and ``password``. On success a
    6-digit code valid for 10 minutes is stored for the user and sent by
    e-mail; the access token itself is issued by ``/verify_code``.

    Returns:
        200 when the code was sent; 400 when the body is not a JSON object or
        a field is missing; 401 for bad credentials or an inactive user.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Missing email or password"}), 400

    email = data.get("email", "")
    password = data.get("password", "")
    # Non-string values (numbers, lists, null) are treated as missing rather
    # than being allowed to crash on .strip() or in the hash check.
    if not isinstance(email, str) or not isinstance(password, str):
        return jsonify({"error": "Missing email or password"}), 400

    email = email.strip().lower()
    if not email or not password:
        return jsonify({"error": "Missing email or password"}), 400

    users = Users()
    user = users.get_user_by_email(email)
    if not user or not check_password_hash(user["password_hash"], password):
        return jsonify({"error": "Invalid credentials"}), 401

    if user["active"] != 1:
        return jsonify({"error": "Inactive user"}), 401

    # The OTP is an authentication secret, so draw it from the OS CSPRNG
    # rather than the predictable `random` module. Range: 100000..999999.
    otp_code = str(secrets.randbelow(900_000) + 100_000)
    expiration = datetime.datetime.now(timezone.utc) + datetime.timedelta(minutes=10)
    users.update_user_otp(user["id"], otp_code, expiration)

    send_gmail(
        to_email=user["email"],
        subject="Your Login Verification Code",
        body=f"Your login verification code is: {otp_code}"
    )

    return jsonify({"message": "Verification code sent to your email."}), 200
def _otp_expiry_as_utc(value):
    """Return *value* as a timezone-aware datetime.

    Accepts an ISO-8601 string (as stored by SQLite) or a ``datetime`` (as
    returned by Postgres drivers). Naive values are taken to be UTC.

    Raises:
        ValueError: if the string cannot be parsed or the type is unsupported.
    """
    if isinstance(value, str):
        value = datetime.datetime.fromisoformat(value)
    if not isinstance(value, datetime.datetime):
        raise ValueError(f"unsupported expiration type: {type(value).__name__}")
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return value


@auth_bp.route("/verify_code", methods=["POST"])
def verify_code():
    """Exchange a valid, unexpired login code for a 12-hour access token.

    Expects a JSON object with ``email`` and ``code``.

    Returns:
        200 with ``access_token`` on success; 400 when the body is not a JSON
        object or a field is missing; 401 when the code does not match; 403
        when the code has expired; 500 when the stored expiration is missing
        or unreadable.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Missing email or verification code"}), 400

    raw_email = data.get("email", "")
    raw_code = data.get("code", "")
    email = raw_email.strip().lower() if isinstance(raw_email, str) else ""
    # Clients may send the code as a JSON number; compare on its text form.
    code = str(raw_code).strip() if raw_code is not None else ""
    if not email or not code:
        return jsonify({"error": "Missing email or verification code"}), 400

    users = Users()
    user = users.get_user_by_email(email)

    stored_code = user.get("otp_code") if user else None
    # Constant-time comparison; operands are encoded to bytes because
    # compare_digest rejects non-ASCII str arguments.
    if not stored_code or not hmac.compare_digest(
        str(stored_code).encode("utf-8"), code.encode("utf-8")
    ):
        return jsonify({"error": "Invalid code"}), 401

    exp = user.get("otp_expiration")
    if exp is None:
        return jsonify({"error": "Missing expiration"}), 500
    try:
        exp_dt = _otp_expiry_as_utc(exp)
    except ValueError:
        return jsonify({"error": "Invalid expiration format"}), 500

    if datetime.datetime.now(timezone.utc) > exp_dt:
        return jsonify({"error": "Verification code has expired"}), 403

    # Clear the code so it cannot be used a second time.
    users.clear_user_otp(user["id"])

    access_token = create_access_token(
        identity=str(user["id"]),
        expires_delta=datetime.timedelta(hours=12)
    )

    return jsonify({
        "message": "Login successful",
        "access_token": access_token
    }), 200
@auth_bp.route("/forgot_password", methods=["POST"])
def forgot_password():
    """E-mail a 15-minute password-reset link if the address is registered.

    Expects a JSON object with ``email``. The response is the same whether or
    not the address belongs to a user, so the endpoint does not reveal which
    e-mails are registered.

    Returns:
        200 in both cases; 400 when the body is not a JSON object or the
        e-mail is missing.
    """
    data = request.get_json(silent=True)
    raw_email = data.get("email", "") if isinstance(data, dict) else ""
    email = raw_email.strip().lower() if isinstance(raw_email, str) else ""
    if not email:
        return jsonify({"error": "Email is required"}), 400

    user = Users().get_user_by_email(email)
    if user:
        reset_token = create_access_token(
            # The JWT subject is issued as a string, matching the identity
            # format /verify_code uses for access tokens.
            identity=str(user["id"]),
            expires_delta=datetime.timedelta(minutes=15),
            # /reset_password only accepts tokens carrying this claim.
            additional_claims={"purpose": "password_reset"}
        )
        frontend_base = os.getenv('FRONTEND_BASE_URL', 'http://127.0.0.1:5100')
        send_gmail(
            to_email=user["email"],
            subject="Password Reset Request",
            body=(
                "Click the link to reset your password: "
                f"{frontend_base}/reset_password?token={reset_token}"
            )
        )

    return jsonify({"message": "If this email is registered, a reset link has been sent."}), 200
@auth_bp.route("/reset_password", methods=["POST"])
def reset_password():
    """Set a new password using a token issued by ``/forgot_password``.

    Expects a JSON object with ``token`` and ``new_password``.

    Returns:
        200 on success; 400 when the body is not a JSON object or a field is
        missing; 403 when the token is invalid, expired, or lacks the
        ``password_reset`` purpose claim; 404 when the token's user is not
        found.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Missing token or new password"}), 400

    token = data.get("token", "")
    new_password = data.get("new_password", "")
    # The password must be a string for hashing to work.
    if not token or not isinstance(new_password, str) or not new_password:
        return jsonify({"error": "Missing token or new password"}), 400

    try:
        decoded_token = decode_token(token)
    except Exception:
        # Deliberately broad: any decode failure (malformed, bad signature,
        # expired, wrong type) is reported to the client the same way.
        return jsonify({"error": "Invalid or expired token"}), 403

    # Reject ordinary access tokens; only reset tokens carry this claim.
    if decoded_token.get("purpose") != "password_reset":
        return jsonify({"error": "Invalid token purpose"}), 403

    user_id = decoded_token["sub"]
    users = Users()
    if not users.get_user_by_id(user_id):
        return jsonify({"error": "User not found"}), 404

    users.update_user_password(user_id, generate_password_hash(new_password))

    return jsonify({"message": "Password has been reset successfully."}), 200
@auth_bp.route("/me", methods=["GET"])
@jwt_required()
def me():
    """Return the profile fields of the user named by the JWT identity.

    Responds with 404 when no user is found for that identity.
    """
    # Columns exposed to the client, in response order.
    profile_fields = (
        "id",
        "name",
        "contact_name",
        "email",
        "phone",
        "register_number",
        "vat",
        "address",
        "logo_filename",
        "terms",
        "first_order_number",
        "created_at",
        "user_role",
        "temporary_password",
    )

    account = Users().get_user_by_id(get_jwt_identity())
    if not account:
        return jsonify({"error": "User not found"}), 404

    payload = {field: account[field] for field in profile_fields}
    return jsonify(payload), 200
# Token validation endpoint
@auth_bp.route("/validate_token", methods=["GET"])
@jwt_required()
def validate_token():
    """Confirm that the presented JWT is accepted and its user can be found.

    Responds with 200 when the lookup succeeds and 404 otherwise.
    """
    account = Users().get_user_by_id(get_jwt_identity())
    if account:
        return jsonify({"message": "Token is valid"}), 200
    return jsonify({"error": "User not found"}), 404
@auth_bp.route("/temporary_password", methods=["POST"])
@jwt_required()
def change_passwd():
    """Replace the authenticated user's password and update the temp-password flag.

    Expects a JSON object with ``password``.

    Returns:
        200 on success; 404 with "Password not found" when the body is not a
        JSON object or carries no usable password. The 404 status is kept
        from the existing empty-body response so clients see no new status.
    """
    data = request.get_json(silent=True)
    password = data.get("password") if isinstance(data, dict) else None
    # A missing, empty, or non-string password is handled like an empty body
    # instead of raising KeyError or failing inside the hash function.
    if not isinstance(password, str) or not password:
        return jsonify({"error": "Password not found"}), 404

    users = Users()
    user_id = get_jwt_identity()
    users.update_user_password(user_id, generate_password_hash(password))
    users.update_temp_pass(user_id)

    return jsonify({"message": "Password has been updated successfully."}), 200