109 lines
3.7 KiB
Python
109 lines
3.7 KiB
Python
from flask import Blueprint, request, jsonify
|
|
from flask_jwt_extended import jwt_required, get_jwt_identity
|
|
from models.users import Users, UserModel
|
|
from models.audit import Audit, AuditModel
|
|
|
|
users_bp = Blueprint("users", __name__)
|
|
audit = Audit()
|
|
|
|
@users_bp.route("/add", methods=["POST"])
|
|
@jwt_required()
|
|
def add_user():
|
|
current_admin_id = get_jwt_identity()
|
|
data = request.get_json()
|
|
|
|
email = data.get("email")
|
|
workspace_id = data.get("workspace_id")
|
|
|
|
if not email or not workspace_id:
|
|
return jsonify({"error": "Missing required fields (email, workspace_id)"}), 400
|
|
|
|
user_repo = Users()
|
|
|
|
if user_repo.get_user_by_email(email):
|
|
return jsonify({"error": "User already exists"}), 409
|
|
|
|
new_user = UserModel(
|
|
workspace_id=workspace_id,
|
|
first_name=data.get("first_name"),
|
|
last_name=data.get("last_name"),
|
|
email=email,
|
|
password=user_repo.hash_password(data.get("password")) if data.get("password") else None,
|
|
address=data.get("address"),
|
|
profession=data.get("profession"),
|
|
role=data.get("role", "user"),
|
|
status=data.get("status", "active"),
|
|
profile_pic=data.get("profile_pic"),
|
|
active=1
|
|
)
|
|
|
|
user_id = user_repo.add_user(new_user)
|
|
if user_id:
|
|
audit.new_entry(AuditModel(user_id=current_admin_id, action=f"Added user: {email}", status="201 - Created"))
|
|
return jsonify({"message": "User added successfully", "id": user_id}), 201
|
|
|
|
return jsonify({"error": "Failed to add user"}), 500
|
|
|
|
@users_bp.route("/<int:user_id>", methods=["GET"])
|
|
@jwt_required()
|
|
def get_user(user_id):
|
|
user_repo = Users()
|
|
user = user_repo.get_user(user_id)
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
|
|
# Convertim obiectul dataclass în dicționar pentru JSON
|
|
return jsonify(vars(user)), 200
|
|
|
|
@users_bp.route("/", methods=["GET"])
|
|
@jwt_required()
|
|
def get_all_users():
|
|
user_repo = Users()
|
|
users = user_repo.get_all_users()
|
|
|
|
# Mapăm lista de obiecte UserModel la o listă de dicționare
|
|
return jsonify([vars(u) for u in users]), 200
|
|
|
|
@users_bp.route("/update/<int:user_id>", methods=["PUT"])
|
|
@jwt_required()
|
|
def update_user(user_id):
|
|
current_admin_id = get_jwt_identity()
|
|
data = request.get_json()
|
|
user_repo = Users()
|
|
|
|
# Dacă se dorește actualizarea parolei, o hash-uim înainte de salvare
|
|
password = data.get("password")
|
|
hashed_password = user_repo.hash_password(password) if password else None
|
|
|
|
success = user_repo.update_user(
|
|
user_id,
|
|
first_name=data.get("first_name"),
|
|
last_name=data.get("last_name"),
|
|
email=data.get("email"),
|
|
password=hashed_password,
|
|
address=data.get("address"),
|
|
profession=data.get("profession"),
|
|
role=data.get("role"),
|
|
status=data.get("status"),
|
|
profile_pic=data.get("profile_pic"),
|
|
active=data.get("active"),
|
|
can_create_articles=data.get("can_create_articles")
|
|
)
|
|
|
|
if success:
|
|
audit.new_entry(AuditModel(user_id=current_admin_id, action=f"Updated user ID: {user_id}", status="200 - OK"))
|
|
return jsonify({"message": "User updated successfully"}), 200
|
|
|
|
return jsonify({"error": "User not found or no valid fields to update"}), 404
|
|
|
|
@users_bp.route("/delete/<int:user_id>", methods=["DELETE"])
|
|
@jwt_required()
|
|
def delete_user(user_id):
|
|
current_admin_id = get_jwt_identity()
|
|
user_repo = Users()
|
|
|
|
if user_repo.delete_user(user_id):
|
|
audit.new_entry(AuditModel(user_id=current_admin_id, action=f"Deleted user ID: {user_id}", status="200 - OK"))
|
|
return jsonify({"message": "User deleted successfully"}), 200
|
|
|
|
return jsonify({"error": "User not found"}), 404 |