from datetime import datetime
from database import get_connection, is_postgres


class Users:
    """Data-access helper for the ``users`` and ``email`` tables.

    Every method obtains its own connection from ``get_connection()``.
    Queries are parameterized; the placeholder token is chosen once at
    construction time from ``is_postgres()`` ("%s" for PostgreSQL, "?"
    otherwise) and stored in ``self.ph``.
    """

    # Positional layout of a ``SELECT * FROM users`` row, as consumed by
    # user_to_dict(). The order must match the table's column order.
    _USER_FIELDS = (
        'id',
        'name',
        'contact_name',
        'email',
        'password_hash',
        'phone',
        'register_number',
        'vat',
        'address',
        'logo_filename',
        'terms',
        'first_order_number',
        'created_at',
        'otp_code',
        'otp_expiration',
        'user_role',
    )

    # Positional layout of a ``SELECT * FROM email`` row, as consumed by
    # email_to_dict().
    _EMAIL_FIELDS = (
        'id',
        'user_id',
        'smtp_host',
        'smtp_port',
        'smtp_user',
        'created_at',
    )

    def __init__(self):
        """Pick the SQL parameter placeholder for the active backend."""
        self.ph = "%s" if is_postgres() else "?"

    # --- row mapping ---

    def user_to_dict(self, row):
        """Convert a positional ``users`` row into a dict keyed by field name.

        The row is read by index (0..15); a row with fewer columns raises
        the sequence's usual indexing error.
        """
        return {
            field: row[index]
            for index, field in enumerate(self._USER_FIELDS)
        }

    def email_to_dict(self, row):
        """Convert a positional ``email`` row into a dict keyed by field name.

        The row is read by index (0..5); a row with fewer columns raises
        the sequence's usual indexing error.
        """
        return {
            field: row[index]
            for index, field in enumerate(self._EMAIL_FIELDS)
        }

    # --- internal helpers ---

    @staticmethod
    def _commit(conn):
        """Commit ``conn`` when the connection object exposes ``commit``."""
        if hasattr(conn, "commit"):
            conn.commit()

    def _run_write(self, query, params):
        """Execute a single write statement and commit it."""
        with get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            self._commit(conn)

    def _fetch_one(self, query, params):
        """Execute ``query`` and return the first row, or ``None``."""
        with get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            return cursor.fetchone()

    def _fetch_all(self, query, params):
        """Execute ``query`` and return every resulting row."""
        with get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            return cursor.fetchall()

    # --- users ---

    def get_user_by_email(self, email):
        """Return the user whose ``email`` matches as a dict, or ``None``."""
        row = self._fetch_one(
            f"SELECT * FROM users WHERE email = {self.ph}",
            (email,),
        )
        return self.user_to_dict(row) if row else None

    def get_user_by_id(self, user_id):
        """Return the user with the given ``id`` as a dict, or ``None``."""
        row = self._fetch_one(
            f"SELECT * FROM users WHERE id = {self.ph}",
            (user_id,),
        )
        return self.user_to_dict(row) if row else None

    def insert_user(self, name, email, password_hash):
        """Insert a new user and return the id of the inserted row.

        ``created_at`` is set to the current local time in ISO-8601 form.
        On PostgreSQL the id comes from ``RETURNING id``; on other
        backends it is read from ``cursor.lastrowid``.
        """
        created_at = datetime.now().isoformat()
        with get_connection() as conn:
            cursor = conn.cursor()
            use_returning = is_postgres()
            returning = "RETURNING id" if use_returning else ""
            query = f"""
                INSERT INTO users (
                    name, email, password_hash, created_at
                ) VALUES ({self.ph}, {self.ph}, {self.ph}, {self.ph})
                {returning}
            """
            cursor.execute(query, (name, email, password_hash, created_at))
            if use_returning:
                # The RETURNING row must be read before committing.
                inserted_id = cursor.fetchone()[0]
            else:
                inserted_id = cursor.lastrowid
            self._commit(conn)
        return inserted_id

    def update_user_otp(self, user_id, otp_code, expiration):
        """Store a one-time code and its expiry for ``user_id``.

        ``expiration`` must provide ``isoformat()`` (e.g. a ``datetime``);
        it is persisted as an ISO-8601 string.
        """
        self._run_write(
            f"""
                UPDATE users
                SET otp_code = {self.ph}, otp_expiration = {self.ph}
                WHERE id = {self.ph}
            """,
            (otp_code, expiration.isoformat(), user_id),
        )

    def clear_user_otp(self, user_id):
        """Reset ``otp_code`` and ``otp_expiration`` to NULL for ``user_id``."""
        self._run_write(
            f"""
                UPDATE users
                SET otp_code = NULL, otp_expiration = NULL
                WHERE id = {self.ph}
            """,
            (user_id,),
        )

    def update_user_password(self, user_id, new_password_hash):
        """Replace the stored password hash of ``user_id``."""
        self._run_write(
            f"""
                UPDATE users
                SET password_hash = {self.ph}
                WHERE id = {self.ph}
            """,
            (new_password_hash, user_id),
        )

    def update_user(self, data):
        """Update the profile fields of the user identified by ``data['user_id']``.

        ``data`` must contain the keys ``name``, ``contact_name``, ``email``,
        ``phone``, ``register_number``, ``vat``, ``address``,
        ``logo_filename``, ``terms``, ``first_order_number`` and ``user_id``;
        a missing key raises ``KeyError``.
        """
        self._run_write(
            f"""
                UPDATE users
                SET name = {self.ph},
                    contact_name = {self.ph},
                    email = {self.ph},
                    phone = {self.ph},
                    register_number = {self.ph},
                    vat = {self.ph},
                    address = {self.ph},
                    logo_filename = {self.ph},
                    terms = {self.ph},
                    first_order_number = {self.ph}
                WHERE id = {self.ph}
            """,
            (
                data['name'],
                data['contact_name'],
                data['email'],
                data['phone'],
                data['register_number'],
                data['vat'],
                data['address'],
                data['logo_filename'],
                data['terms'],
                data['first_order_number'],
                data['user_id'],
            ),
        )

    def update_user_logo(self, data):
        """Set ``logo_filename`` for the user identified by ``data['user_id']``."""
        self._run_write(
            f"""
                UPDATE users
                SET logo_filename = {self.ph}
                WHERE id = {self.ph}
            """,
            (data['logo_filename'], data['user_id']),
        )

    def get_all_users_with_role(self, role='user'):
        """Return every user whose ``user_role`` equals ``role`` as a list of dicts."""
        rows = self._fetch_all(
            f"SELECT * FROM users WHERE user_role = {self.ph}",
            (role,),
        )
        return [self.user_to_dict(row) for row in rows]

    # --- email credentials ---

    def insert_email_credentials(self, user_id, smtp_host, smtp_port, smtp_user):
        """Insert an ``email`` row holding the SMTP settings of ``user_id``.

        ``created_at`` is set to the current local time in ISO-8601 form.
        """
        created_at = datetime.now().isoformat()
        self._run_write(
            f"""
                INSERT INTO email (
                    user_id, smtp_host, smtp_port, smtp_user, created_at
                ) VALUES ({self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph})
            """,
            (user_id, smtp_host, smtp_port, smtp_user, created_at),
        )

    def get_email_credentials(self, user_id):
        """Return the SMTP settings stored for ``user_id`` as a dict, or ``None``."""
        row = self._fetch_one(
            f"SELECT * FROM email WHERE user_id = {self.ph}",
            (user_id,),
        )
        return self.email_to_dict(row) if row else None

    def update_email_credentials(self, user_id, smtp_host, smtp_port, smtp_user):
        """Update the SMTP settings stored for ``user_id``."""
        # Filter on user_id, the same key get_email_credentials() uses.
        # Matching the bound user id against the email row's own primary
        # key ``id`` would update an unrelated row (or none) whenever the
        # two values differ.
        self._run_write(
            f"""
                UPDATE email
                SET smtp_host={self.ph}, smtp_port={self.ph}, smtp_user={self.ph}
                WHERE user_id = {self.ph}
            """,
            (smtp_host, smtp_port, smtp_user, user_id),
        )