init commit
This commit is contained in:
194
transportmanager/server/models/user.py
Normal file
194
transportmanager/server/models/user.py
Normal file
@@ -0,0 +1,194 @@
|
||||
from datetime import datetime
|
||||
from database import get_connection, is_postgres
|
||||
|
||||
class Users:
|
||||
def __init__(self):
|
||||
self.ph = "%s" if is_postgres() else "?"
|
||||
|
||||
def user_to_dict(self, row):
|
||||
user = {
|
||||
'id': row[0],
|
||||
'name': row[1],
|
||||
'contact_name': row[2],
|
||||
'email': row[3],
|
||||
'password_hash': row[4],
|
||||
'phone': row[5],
|
||||
'register_number': row[6],
|
||||
'vat':row[7],
|
||||
'address': row[8],
|
||||
'logo_filename': row[9],
|
||||
'terms': row[10],
|
||||
'first_order_number': row[11],
|
||||
'created_at': row[12],
|
||||
'otp_code': row[13],
|
||||
'otp_expiration': row[14],
|
||||
'user_role': row[15]
|
||||
}
|
||||
return user
|
||||
|
||||
def email_to_dict(self, row):
|
||||
email = {
|
||||
'id': row[0],
|
||||
'user_id': row[1],
|
||||
'smtp_host': row[2],
|
||||
'smtp_port': row[3],
|
||||
'smtp_user': row[4],
|
||||
'created_at': row[5]
|
||||
}
|
||||
return email
|
||||
|
||||
def get_user_by_email(self, email):
|
||||
with get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(f"SELECT * FROM users WHERE email = {self.ph}", (email,))
|
||||
row = cursor.fetchone()
|
||||
return self.user_to_dict(row) if row else None
|
||||
|
||||
def get_user_by_id(self, user_id):
|
||||
with get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(f"SELECT * FROM users WHERE id = {self.ph}", (user_id,))
|
||||
row = cursor.fetchone()
|
||||
return self.user_to_dict(row) if row else None
|
||||
|
||||
def insert_user(self, name, email, password_hash):
|
||||
created_at = datetime.now().isoformat()
|
||||
with get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
returning = "RETURNING id" if is_postgres() else ""
|
||||
query = f"""
|
||||
INSERT INTO users (
|
||||
name, email, password_hash, created_at
|
||||
) VALUES ({self.ph}, {self.ph}, {self.ph}, {self.ph}) {returning}
|
||||
"""
|
||||
cursor.execute(query, (name, email, password_hash, created_at))
|
||||
inserted_id = None
|
||||
if is_postgres():
|
||||
inserted_id = cursor.fetchone()[0]
|
||||
else:
|
||||
inserted_id = cursor.lastrowid
|
||||
if hasattr(conn, "commit"):
|
||||
conn.commit()
|
||||
return inserted_id
|
||||
|
||||
def update_user_otp(self, user_id, otp_code, expiration):
|
||||
with get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
f"""
|
||||
UPDATE users
|
||||
SET otp_code = {self.ph}, otp_expiration = {self.ph}
|
||||
WHERE id = {self.ph}
|
||||
""",
|
||||
(otp_code, expiration.isoformat(), user_id)
|
||||
)
|
||||
if hasattr(conn, "commit"):
|
||||
conn.commit()
|
||||
|
||||
def clear_user_otp(self, user_id):
|
||||
with get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
f"""
|
||||
UPDATE users
|
||||
SET otp_code = NULL, otp_expiration = NULL
|
||||
WHERE id = {self.ph}
|
||||
""",
|
||||
(user_id,)
|
||||
)
|
||||
if hasattr(conn, "commit"):
|
||||
conn.commit()
|
||||
|
||||
def update_user_password(self, user_id, new_password_hash):
|
||||
with get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
f"""
|
||||
UPDATE users
|
||||
SET password_hash = {self.ph}
|
||||
WHERE id = {self.ph}
|
||||
""",
|
||||
(new_password_hash, user_id)
|
||||
)
|
||||
if hasattr(conn, "commit"):
|
||||
conn.commit()
|
||||
|
||||
def update_user(self, data):
|
||||
with get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
f"""
|
||||
UPDATE users
|
||||
SET name = {self.ph}, contact_name = {self.ph}, email = {self.ph}, phone = {self.ph}, register_number = {self.ph}, vat = {self.ph}, address = {self.ph}, logo_filename = {self.ph}, terms = {self.ph}, first_order_number = {self.ph}
|
||||
WHERE id = {self.ph}
|
||||
""",
|
||||
(
|
||||
data['name'],
|
||||
data['contact_name'],
|
||||
data['email'],
|
||||
data['phone'],
|
||||
data['register_number'],
|
||||
data['vat'],
|
||||
data['address'],
|
||||
data['logo_filename'],
|
||||
data['terms'],
|
||||
data['first_order_number'],
|
||||
data['user_id']
|
||||
)
|
||||
)
|
||||
if hasattr(conn, "commit"):
|
||||
conn.commit()
|
||||
|
||||
def update_user_logo(self, data):
|
||||
with get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
f"""
|
||||
UPDATE users SET logo_filename = {self.ph} WHERE id = {self.ph}
|
||||
""",
|
||||
(data['logo_filename'], data['user_id'])
|
||||
)
|
||||
if hasattr(conn, "commit"):
|
||||
conn.commit()
|
||||
|
||||
def get_all_users_with_role(self, role='user'):
|
||||
with get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(f"SELECT * FROM users WHERE user_role = {self.ph}", (role,))
|
||||
rows = cursor.fetchall()
|
||||
return [self.user_to_dict(row) for row in rows]
|
||||
|
||||
#--- email credentials ---
|
||||
def insert_email_credentials(self, user_id, smtp_host, smtp_port, smtp_user):
|
||||
created_at = datetime.now().isoformat()
|
||||
with get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
f"""
|
||||
INSERT INTO email (
|
||||
user_id, smtp_host, smtp_port, smtp_user, created_at
|
||||
) VALUES ({self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph})
|
||||
""",
|
||||
(user_id, smtp_host, smtp_port, smtp_user, created_at)
|
||||
)
|
||||
if hasattr(conn, "commit"):
|
||||
conn.commit()
|
||||
|
||||
def get_email_credentials(self, user_id):
|
||||
with get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(f"SELECT * FROM email WHERE user_id = {self.ph}", (user_id,))
|
||||
row = cursor.fetchone()
|
||||
return self.email_to_dict(row) if row else None
|
||||
|
||||
def update_email_credentials(self, user_id, smtp_host, smtp_port, smtp_user):
|
||||
with get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
f"""
|
||||
UPDATE email SET smtp_host={self.ph}, smtp_port={self.ph}, smtp_user={self.ph} WHERE id = {self.ph}
|
||||
""",
|
||||
(smtp_host, smtp_port, smtp_user, user_id)
|
||||
)
|
||||
if hasattr(conn, "commit"):
|
||||
conn.commit()
|
||||
Reference in New Issue
Block a user