from flask import Blueprint, request, jsonify from werkzeug.security import generate_password_hash, check_password_hash from utils.email import send_email, send_gmail from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity from flask_jwt_extended import decode_token import datetime import random import os from datetime import timezone from models.user import Users auth_bp = Blueprint("auth", __name__) @auth_bp.route("/register", methods=["POST"]) def register(): users = Users() data = request.get_json() name = data.get("name") email = data.get("email") password = data.get("password") if not name or not email or not password: return jsonify({"error": "Missing required fields"}), 400 existing_user = users.get_user_by_email(email) if existing_user: return jsonify({"error": "User already exists"}), 409 password_hash = generate_password_hash(password) users.insert_user(name, email, password_hash) return jsonify({"message": "User registered successfully!"}), 201 @auth_bp.route("/login", methods=["POST"]) def login(): users = Users() data = request.get_json() email = data.get("email", "").strip().lower() password = data.get("password", "") if not email or not password: return jsonify({"error": "Missing email or password"}), 400 user = users.get_user_by_email(email) if not user or not check_password_hash(user["password_hash"], password): return jsonify({"error": "Invalid credentials"}), 401 otp_code = str(random.randint(100000, 999999)) expiration = datetime.datetime.now(timezone.utc) + datetime.timedelta(minutes=10) users.update_user_otp(user["id"], otp_code, expiration) send_gmail( to_email=user["email"], subject="Your Login Verification Code", body=f"Your login verification code is: {otp_code}" ) return jsonify({"message": "Verification code sent to your email."}), 200 @auth_bp.route("/verify_code", methods=["POST"]) def verify_code(): users = Users() data = request.get_json() email = data.get("email", "").strip().lower() code = data.get("code", "") if not email or not code: return jsonify({"error": "Missing email or verification code"}), 400 user = users.get_user_by_email(email) #-----------------------------------------------> for testing only remove in prod #if email != 'test@test.com': #-----------------------------------------------> for testing only remove in prod if not user or user.get("otp_code") != code: return jsonify({"error": "Invalid code"}), 401 exp = user.get("otp_expiration") # Normalize to aware UTC datetime for safe comparison across SQLite (string) and Postgres (datetime) now_utc = datetime.datetime.now(timezone.utc) if isinstance(exp, str): try: exp_dt = datetime.datetime.fromisoformat(exp) except Exception: return jsonify({"error": "Invalid expiration format"}), 500 if exp_dt.tzinfo is None: exp_dt = exp_dt.replace(tzinfo=timezone.utc) else: # Assume a datetime object from DB driver exp_dt = exp if exp_dt is None: return jsonify({"error": "Missing expiration"}), 500 if exp_dt.tzinfo is None: exp_dt = exp_dt.replace(tzinfo=timezone.utc) if now_utc > exp_dt: return jsonify({"error": "Verification code has expired"}), 403 users.clear_user_otp(user["id"]) access_token = create_access_token( identity=str(user["id"]), expires_delta=datetime.timedelta(hours=12) ) return jsonify({ "message": "Login successful", "access_token": access_token }), 200 @auth_bp.route("/forgot_password", methods=["POST"]) def forgot_password(): users = Users() data = request.get_json() email = data.get("email", "").strip().lower() if not email: return jsonify({"error": "Email is required"}), 400 user = users.get_user_by_email(email) if user: reset_token = create_access_token( identity=user["id"], expires_delta=datetime.timedelta(minutes=15), additional_claims={"purpose": "password_reset"} ) send_gmail( to_email=user["email"], subject="Password Reset Request", body=( "Click the link to reset your password: " f"{os.getenv('FRONTEND_BASE_URL', 'http://127.0.0.1:5100')}/reset_password?token={reset_token}" ) ) return jsonify({"message": "If this email is registered, a reset link has been sent."}), 200 @auth_bp.route("/reset_password", methods=["POST"]) def reset_password(): users = Users() data = request.get_json() token = data.get("token", "") new_password = data.get("new_password", "") if not token or not new_password: return jsonify({"error": "Missing token or new password"}), 400 try: decoded_token = decode_token(token) if decoded_token.get("purpose") != "password_reset": return jsonify({"error": "Invalid token purpose"}), 403 except Exception: return jsonify({"error": "Invalid or expired token"}), 403 user_id = decoded_token["sub"] user = users.get_user_by_id(user_id) if not user: return jsonify({"error": "User not found"}), 404 password_hash = generate_password_hash(new_password) users.update_user_password(user_id, password_hash) return jsonify({"message": "Password has been reset successfully."}), 200 @auth_bp.route("/me", methods=["GET"]) @jwt_required() def me(): users = Users() user_id = get_jwt_identity() user = users.get_user_by_id(user_id) if not user: return jsonify({"error": "User not found"}), 404 return jsonify({ "id": user["id"], "name": user["name"], "contact_name": user['contact_name'], "email": user["email"], "phone": user["phone"], "register_number": user["register_number"], "vat":user["vat"], "address": user["address"], "logo_filename": user["logo_filename"], "terms": user["terms"], "first_order_number": user["first_order_number"], "created_at": user["created_at"], "user_role": user["user_role"] }), 200 # Validate token endpoint @auth_bp.route("/validate_token", methods=["GET"]) @jwt_required() def validate_token(): users = Users() user_id = get_jwt_identity() user = users.get_user_by_id(user_id) if not user: return jsonify({"error": "User not found"}), 404 return jsonify({"message": "Token is valid"}), 200