204 lines
6.6 KiB
Python
204 lines
6.6 KiB
Python
from flask import Blueprint, request, jsonify
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from utils.email import send_email, send_gmail
|
|
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity
|
|
from flask_jwt_extended import decode_token
|
|
import datetime
|
|
import random
|
|
import os
|
|
from datetime import timezone
|
|
|
|
from models.user import Users
|
|
|
|
auth_bp = Blueprint("auth", __name__)
|
|
|
|
@auth_bp.route("/register", methods=["POST"])
|
|
def register():
|
|
users = Users()
|
|
data = request.get_json()
|
|
name = data.get("name")
|
|
email = data.get("email")
|
|
password = data.get("password")
|
|
|
|
if not name or not email or not password:
|
|
return jsonify({"error": "Missing required fields"}), 400
|
|
|
|
existing_user = users.get_user_by_email(email)
|
|
if existing_user:
|
|
return jsonify({"error": "User already exists"}), 409
|
|
|
|
password_hash = generate_password_hash(password)
|
|
users.insert_user(name, email, password_hash)
|
|
|
|
return jsonify({"message": "User registered successfully!"}), 201
|
|
|
|
|
|
@auth_bp.route("/login", methods=["POST"])
|
|
def login():
|
|
users = Users()
|
|
data = request.get_json()
|
|
email = data.get("email", "").strip().lower()
|
|
password = data.get("password", "")
|
|
|
|
if not email or not password:
|
|
return jsonify({"error": "Missing email or password"}), 400
|
|
|
|
user = users.get_user_by_email(email)
|
|
if not user or not check_password_hash(user["password_hash"], password):
|
|
return jsonify({"error": "Invalid credentials"}), 401
|
|
|
|
otp_code = str(random.randint(100000, 999999))
|
|
expiration = datetime.datetime.now(timezone.utc) + datetime.timedelta(minutes=10)
|
|
users.update_user_otp(user["id"], otp_code, expiration)
|
|
|
|
send_gmail(
|
|
to_email=user["email"],
|
|
subject="Your Login Verification Code",
|
|
body=f"Your login verification code is: {otp_code}"
|
|
)
|
|
|
|
return jsonify({"message": "Verification code sent to your email."}), 200
|
|
|
|
|
|
@auth_bp.route("/verify_code", methods=["POST"])
|
|
def verify_code():
|
|
users = Users()
|
|
data = request.get_json()
|
|
email = data.get("email", "").strip().lower()
|
|
code = data.get("code", "")
|
|
|
|
if not email or not code:
|
|
return jsonify({"error": "Missing email or verification code"}), 400
|
|
|
|
user = users.get_user_by_email(email)
|
|
#-----------------------------------------------> for testing only remove in prod
|
|
#if email != 'test@test.com':
|
|
#-----------------------------------------------> for testing only remove in prod
|
|
if not user or user.get("otp_code") != code:
|
|
return jsonify({"error": "Invalid code"}), 401
|
|
|
|
exp = user.get("otp_expiration")
|
|
# Normalize to aware UTC datetime for safe comparison across SQLite (string) and Postgres (datetime)
|
|
now_utc = datetime.datetime.now(timezone.utc)
|
|
if isinstance(exp, str):
|
|
try:
|
|
exp_dt = datetime.datetime.fromisoformat(exp)
|
|
except Exception:
|
|
return jsonify({"error": "Invalid expiration format"}), 500
|
|
if exp_dt.tzinfo is None:
|
|
exp_dt = exp_dt.replace(tzinfo=timezone.utc)
|
|
else:
|
|
# Assume a datetime object from DB driver
|
|
exp_dt = exp
|
|
if exp_dt is None:
|
|
return jsonify({"error": "Missing expiration"}), 500
|
|
if exp_dt.tzinfo is None:
|
|
exp_dt = exp_dt.replace(tzinfo=timezone.utc)
|
|
if now_utc > exp_dt:
|
|
return jsonify({"error": "Verification code has expired"}), 403
|
|
|
|
users.clear_user_otp(user["id"])
|
|
|
|
access_token = create_access_token(
|
|
identity=str(user["id"]),
|
|
expires_delta=datetime.timedelta(hours=12)
|
|
)
|
|
return jsonify({
|
|
"message": "Login successful",
|
|
"access_token": access_token
|
|
}), 200
|
|
|
|
|
|
@auth_bp.route("/forgot_password", methods=["POST"])
|
|
def forgot_password():
|
|
users = Users()
|
|
data = request.get_json()
|
|
email = data.get("email", "").strip().lower()
|
|
|
|
if not email:
|
|
return jsonify({"error": "Email is required"}), 400
|
|
|
|
user = users.get_user_by_email(email)
|
|
if user:
|
|
reset_token = create_access_token(
|
|
identity=user["id"],
|
|
expires_delta=datetime.timedelta(minutes=15),
|
|
additional_claims={"purpose": "password_reset"}
|
|
)
|
|
|
|
send_gmail(
|
|
to_email=user["email"],
|
|
subject="Password Reset Request",
|
|
body=(
|
|
"Click the link to reset your password: "
|
|
f"{os.getenv('FRONTEND_BASE_URL', 'http://127.0.0.1:5100')}/reset_password?token={reset_token}"
|
|
)
|
|
)
|
|
|
|
return jsonify({"message": "If this email is registered, a reset link has been sent."}), 200
|
|
|
|
|
|
@auth_bp.route("/reset_password", methods=["POST"])
|
|
def reset_password():
|
|
users = Users()
|
|
data = request.get_json()
|
|
token = data.get("token", "")
|
|
new_password = data.get("new_password", "")
|
|
|
|
if not token or not new_password:
|
|
return jsonify({"error": "Missing token or new password"}), 400
|
|
|
|
try:
|
|
decoded_token = decode_token(token)
|
|
if decoded_token.get("purpose") != "password_reset":
|
|
return jsonify({"error": "Invalid token purpose"}), 403
|
|
except Exception:
|
|
return jsonify({"error": "Invalid or expired token"}), 403
|
|
|
|
user_id = decoded_token["sub"]
|
|
user = users.get_user_by_id(user_id)
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
|
|
password_hash = generate_password_hash(new_password)
|
|
users.update_user_password(user_id, password_hash)
|
|
|
|
return jsonify({"message": "Password has been reset successfully."}), 200
|
|
|
|
|
|
@auth_bp.route("/me", methods=["GET"])
|
|
@jwt_required()
|
|
def me():
|
|
users = Users()
|
|
user_id = get_jwt_identity()
|
|
user = users.get_user_by_id(user_id)
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
|
|
return jsonify({
|
|
"id": user["id"],
|
|
"name": user["name"],
|
|
"contact_name": user['contact_name'],
|
|
"email": user["email"],
|
|
"phone": user["phone"],
|
|
"register_number": user["register_number"],
|
|
"vat":user["vat"],
|
|
"address": user["address"],
|
|
"logo_filename": user["logo_filename"],
|
|
"terms": user["terms"],
|
|
"first_order_number": user["first_order_number"],
|
|
"created_at": user["created_at"],
|
|
"user_role": user["user_role"]
|
|
}), 200
|
|
|
|
|
|
# Validate token endpoint
|
|
@auth_bp.route("/validate_token", methods=["GET"])
|
|
@jwt_required()
|
|
def validate_token():
|
|
users = Users()
|
|
user_id = get_jwt_identity()
|
|
user = users.get_user_by_id(user_id)
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
return jsonify({"message": "Token is valid"}), 200 |