"""SQLite-backed persistence for application users."""

import hashlib
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class UserModel:
    """In-memory representation of one row of the ``users`` table.

    Every field is optional so that partially populated instances can be
    used both for inserts (see ``Users.add_user``) and for query results.
    """

    id: Optional[int] = None
    workspace_id: Optional[int] = None
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    email: Optional[str] = None
    # Only populated by Users.get_user_by_email; other readers leave it None.
    password: Optional[str] = None
    address: Optional[str] = None
    profession: Optional[str] = None
    role: Optional[str] = None
    status: Optional[str] = None
    profile_pic: Optional[str] = None
    created_at: Optional[str] = None
    otp_code: Optional[str] = None
    otp_expiration: Optional[str] = None
    active: Optional[int] = None
    can_create_articles: Optional[int] = 0


class Users:
    """Data-access object for the ``users`` table of a SQLite database.

    Each public method opens its own short-lived connection, which is
    committed on success, rolled back on error and always closed.
    """

    # Explicit column list so that row -> UserModel mapping does not depend
    # on the physical column order of the table (as ``SELECT *`` would).
    _COLUMNS = (
        "id",
        "workspace_id",
        "first_name",
        "last_name",
        "email",
        "password",
        "address",
        "profession",
        "role",
        "status",
        "profile_pic",
        "created_at",
        "otp_code",
        "otp_expiration",
        "active",
        "can_create_articles",
    )
    _SELECT_USER = "SELECT " + ", ".join(_COLUMNS) + " FROM users"

    def __init__(self, db_path="instance/app_database.db"):
        """Remember ``db_path`` and make sure the ``users`` table exists."""
        self.db_path = db_path
        self._create_users_table()

    @contextmanager
    def _connection(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed/rolled back and then closed.

        ``sqlite3.Connection`` used as a context manager only manages the
        transaction; it does not close the connection, so that is done
        explicitly here to avoid leaking one connection per call.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on success, rollback on exception
                yield conn
        finally:
            conn.close()

    @classmethod
    def _row_to_user(cls, row, include_password: bool = False) -> UserModel:
        """Build a ``UserModel`` from a row selected with ``_SELECT_USER``."""
        values = dict(zip(cls._COLUMNS, row))
        if not include_password:
            values["password"] = None
        return UserModel(**values)

    def _create_users_table(self):
        """Create the ``users`` table and apply the column migration if needed."""
        with self._connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    workspace_id INTEGER NOT NULL,
                    first_name TEXT,
                    last_name TEXT,
                    email TEXT UNIQUE,
                    password TEXT,
                    address TEXT,
                    profession TEXT,
                    role TEXT DEFAULT 'user',
                    status TEXT,
                    profile_pic TEXT,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    otp_code TEXT,
                    otp_expiration TIMESTAMPTZ,
                    active INTEGER DEFAULT 1,
                    can_create_articles INTEGER DEFAULT 0
                );
                """
            )
            # Databases created before ``can_create_articles`` existed need
            # the column added. Check for it explicitly instead of swallowing
            # every OperationalError, which would also hide real failures.
            cursor.execute("PRAGMA table_info(users)")
            existing = {info[1] for info in cursor.fetchall()}
            if "can_create_articles" not in existing:
                cursor.execute(
                    "ALTER TABLE users ADD COLUMN can_create_articles INTEGER DEFAULT 0;"
                )

    def update_user_otp(self, user_id, otp_code, expiration):
        """Store an OTP code and its expiry for ``user_id``.

        ``expiration`` must provide ``isoformat()``; the resulting string is
        what gets written to the ``otp_expiration`` column.
        """
        with self._connection() as conn:
            conn.execute(
                "UPDATE users SET otp_code = ?, otp_expiration = ? WHERE id = ?",
                (otp_code, expiration.isoformat(), user_id),
            )

    def clear_user_otp(self, user_id):
        """Reset ``otp_code`` and ``otp_expiration`` to NULL for ``user_id``."""
        with self._connection() as conn:
            conn.execute(
                "UPDATE users SET otp_code = NULL, otp_expiration = NULL WHERE id = ?",
                (user_id,),
            )

    def update_password(self, email, password):
        """Hash ``password`` and store it for the user with this ``email``."""
        with self._connection() as conn:
            conn.execute(
                "UPDATE users SET password = ? WHERE email = ?",
                (self.hash_password(password), email),
            )

    def hash_password(self, password: str) -> str:
        """Return the hexadecimal MD5 digest of ``password`` (UTF-8 encoded).

        SECURITY NOTE(review): unsalted MD5 is not suitable for password
        storage. The algorithm is kept unchanged here because replacing it
        would invalidate every hash already stored; plan a migration to a
        salted KDF (e.g. ``hashlib.scrypt`` or ``hashlib.pbkdf2_hmac``).
        """
        return hashlib.md5(password.encode("utf-8")).hexdigest()

    def authenticate(self, email, password):
        """Return the user matching ``email`` and ``password``, else ``None``.

        The returned model does not carry the stored password hash.
        """
        with self._connection() as conn:
            row = conn.execute(
                self._SELECT_USER + " WHERE email = ? AND password = ?",
                (email, self.hash_password(password)),
            ).fetchone()
        return self._row_to_user(row) if row else None

    def register_user(self, email, password, workspace_id):
        """Insert a new user with a hashed password.

        Returns ``True`` on success and ``False`` when the insert violates
        a constraint (for example the email is already registered).
        """
        try:
            with self._connection() as conn:
                conn.execute(
                    "INSERT INTO users (workspace_id, email, password) VALUES (?, ?, ?)",
                    (workspace_id, email, self.hash_password(password)),
                )
            return True
        except sqlite3.IntegrityError:
            return False  # e.g. the email already exists

    def add_user(self, user: UserModel):
        """Insert ``user`` and return the new row id.

        Returns ``None`` when the insert violates a constraint (for example
        a duplicate email).

        NOTE(review): ``user.password`` is stored exactly as given, without
        hashing, whereas ``authenticate`` compares against
        ``hash_password(...)``. Presumably callers pass an already hashed
        value; verify against callers.
        """
        try:
            with self._connection() as conn:
                cursor = conn.execute(
                    """
                    INSERT INTO users (workspace_id, first_name, last_name, email,
                                       password, address, profession, role, status,
                                       profile_pic, can_create_articles)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        user.workspace_id,
                        user.first_name,
                        user.last_name,
                        user.email,
                        user.password,
                        user.address,
                        user.profession,
                        user.role,
                        user.status,
                        user.profile_pic,
                        user.can_create_articles or 0,
                    ),
                )
                return cursor.lastrowid
        except sqlite3.IntegrityError:
            return None

    def get_user(self, user_id: int) -> Optional[UserModel]:
        """Return the user with ``user_id`` (without password), or ``None``."""
        with self._connection() as conn:
            row = conn.execute(
                self._SELECT_USER + " WHERE id = ?", (user_id,)
            ).fetchone()
        return self._row_to_user(row) if row else None

    def get_user_by_email(self, email: str) -> Optional[UserModel]:
        """Return the user with ``email`` or ``None``.

        Unlike the other readers, the result includes the stored password
        value.
        """
        with self._connection() as conn:
            row = conn.execute(
                self._SELECT_USER + " WHERE email = ?", (email,)
            ).fetchone()
        return self._row_to_user(row, include_password=True) if row else None

    def get_users_by_workspace_id(self, workspace_id):
        """Return all users of ``workspace_id`` as a list (without passwords)."""
        with self._connection() as conn:
            rows = conn.execute(
                self._SELECT_USER + " WHERE workspace_id = ?", (workspace_id,)
            ).fetchall()
        return [self._row_to_user(row) for row in rows]

    def get_all_users(self):
        """Return every user in the table as a list (without passwords)."""
        with self._connection() as conn:
            rows = conn.execute(self._SELECT_USER).fetchall()
        return [self._row_to_user(row) for row in rows]

    def update_user(self, user_id, first_name=None, last_name=None, email=None,
                    password=None, address=None, profession=None, role=None,
                    status=None, profile_pic=None, active=None,
                    can_create_articles=None):
        """Update the given columns of ``user_id``.

        Only arguments that are not ``None`` are written, so a column cannot
        be set to NULL through this method. ``password`` is stored exactly
        as given (no hashing is applied here).

        Returns ``False`` when nothing was requested or no row matched,
        ``True`` when a row was updated.
        """
        requested = {
            "first_name": first_name,
            "last_name": last_name,
            "email": email,
            "password": password,
            "address": address,
            "profession": profession,
            "role": role,
            "status": status,
            "profile_pic": profile_pic,
            "active": active,
            "can_create_articles": can_create_articles,
        }
        changes = {
            column: value
            for column, value in requested.items()
            if value is not None
        }
        if not changes:
            return False

        # Column names come from the fixed mapping above, never from caller
        # input, so interpolating them into the statement is safe.
        assignments = ", ".join(f"{column} = ?" for column in changes)
        query = f"UPDATE users SET {assignments} WHERE id = ?"
        with self._connection() as conn:
            cursor = conn.execute(query, (*changes.values(), user_id))
            updated = cursor.rowcount > 0
        return updated

    def delete_user(self, user_id):
        """Delete the row for ``user_id``; return ``True`` if a row was removed.

        Do not use this method unless dependent records have been deleted
        first (cascade); prefer ``inactivate_user``.
        """
        with self._connection() as conn:
            cursor = conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
            deleted = cursor.rowcount > 0
        return deleted

    def inactivate_user(self, user_id):
        """Set ``status`` to ``'inactive'`` for ``user_id``.

        Returns ``True`` if a row was updated.
        """
        with self._connection() as conn:
            cursor = conn.execute(
                "UPDATE users SET status = ? WHERE id = ?",
                ("inactive", user_id),
            )
            updated = cursor.rowcount > 0
        return updated