247 lines
8.8 KiB
Python
247 lines
8.8 KiB
Python
from datetime import datetime
|
|
from database import get_connection, is_postgres
|
|
|
|
class Users:
|
|
def __init__(self):
|
|
self.ph = "%s" if is_postgres() else "?"
|
|
|
|
def user_to_dict(self, row):
|
|
user = {
|
|
'id': row[0],
|
|
'name': row[1],
|
|
'contact_name': row[2],
|
|
'email': row[3],
|
|
'password_hash': row[4],
|
|
'phone': row[5],
|
|
'register_number': row[6],
|
|
'vat':row[7],
|
|
'address': row[8],
|
|
'logo_filename': row[9],
|
|
'terms': row[10],
|
|
'first_order_number': row[11],
|
|
'created_at': row[12],
|
|
'otp_code': row[13],
|
|
'otp_expiration': row[14],
|
|
'user_role': row[15],
|
|
'company_id': row[16],
|
|
'active': row[17],
|
|
'temporary_password': row[18]
|
|
}
|
|
return user
|
|
|
|
def email_to_dict(self, row):
|
|
email = {
|
|
'id': row[0],
|
|
'user_id': row[1],
|
|
'smtp_host': row[2],
|
|
'smtp_port': row[3],
|
|
'smtp_user': row[4],
|
|
'created_at': row[5]
|
|
}
|
|
return email
|
|
|
|
def get_user_by_email(self, email):
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(f"SELECT * FROM users WHERE email = {self.ph}", (email,))
|
|
row = cursor.fetchone()
|
|
return self.user_to_dict(row) if row else None
|
|
|
|
def get_user_by_id(self, user_id):
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(f"SELECT * FROM users WHERE id = {self.ph}", (user_id,))
|
|
row = cursor.fetchone()
|
|
return self.user_to_dict(row) if row else None
|
|
|
|
def insert_user(self, name, email, password_hash):
|
|
created_at = datetime.now().isoformat()
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
returning = "RETURNING id" if is_postgres() else ""
|
|
query = f"""
|
|
INSERT INTO users (
|
|
name, email, password_hash, created_at
|
|
) VALUES ({self.ph}, {self.ph}, {self.ph}, {self.ph}) {returning}
|
|
"""
|
|
cursor.execute(query, (name, email, password_hash, created_at))
|
|
inserted_id = None
|
|
if is_postgres():
|
|
inserted_id = cursor.fetchone()[0]
|
|
else:
|
|
inserted_id = cursor.lastrowid
|
|
if hasattr(conn, "commit"):
|
|
conn.commit()
|
|
return inserted_id
|
|
|
|
def insert_company_user(self, name, email, password_hash, company_id):
|
|
created_at = datetime.now().isoformat()
|
|
company_id = company_id
|
|
user_role = 'company_user'
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
returning = "RETURNING id" if is_postgres() else ""
|
|
query = f"""
|
|
INSERT INTO users (
|
|
name, email, password_hash, created_at, user_role, company_id, temporary_password
|
|
) VALUES ({self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph}) {returning}
|
|
"""
|
|
cursor.execute(query, (name, email, password_hash, created_at, user_role, company_id, 1))
|
|
inserted_id = None
|
|
if is_postgres():
|
|
inserted_id = cursor.fetchone()[0]
|
|
else:
|
|
inserted_id = cursor.lastrowid
|
|
if hasattr(conn, "commit"):
|
|
conn.commit()
|
|
return inserted_id
|
|
|
|
def update_user_otp(self, user_id, otp_code, expiration):
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
f"""
|
|
UPDATE users
|
|
SET otp_code = {self.ph}, otp_expiration = {self.ph}
|
|
WHERE id = {self.ph}
|
|
""",
|
|
(otp_code, expiration.isoformat(), user_id)
|
|
)
|
|
if hasattr(conn, "commit"):
|
|
conn.commit()
|
|
|
|
def clear_user_otp(self, user_id):
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
f"""
|
|
UPDATE users
|
|
SET otp_code = NULL, otp_expiration = NULL
|
|
WHERE id = {self.ph}
|
|
""",
|
|
(user_id,)
|
|
)
|
|
if hasattr(conn, "commit"):
|
|
conn.commit()
|
|
|
|
def update_user_password(self, user_id, new_password_hash):
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
f"""
|
|
UPDATE users
|
|
SET password_hash = {self.ph}
|
|
WHERE id = {self.ph}
|
|
""",
|
|
(new_password_hash, user_id)
|
|
)
|
|
if hasattr(conn, "commit"):
|
|
conn.commit()
|
|
|
|
def update_user(self, data):
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
f"""
|
|
UPDATE users
|
|
SET name = {self.ph}, contact_name = {self.ph}, email = {self.ph}, phone = {self.ph}, register_number = {self.ph}, vat = {self.ph}, address = {self.ph}, logo_filename = {self.ph}, terms = {self.ph}, first_order_number = {self.ph}
|
|
WHERE id = {self.ph}
|
|
""",
|
|
(
|
|
data['name'],
|
|
data['contact_name'],
|
|
data['email'],
|
|
data['phone'],
|
|
data['register_number'],
|
|
data['vat'],
|
|
data['address'],
|
|
data['logo_filename'],
|
|
data['terms'],
|
|
data['first_order_number'],
|
|
data['user_id']
|
|
)
|
|
)
|
|
if hasattr(conn, "commit"):
|
|
conn.commit()
|
|
|
|
def update_user_logo(self, data):
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
f"""
|
|
UPDATE users SET logo_filename = {self.ph} WHERE id = {self.ph}
|
|
""",
|
|
(data['logo_filename'], data['user_id'])
|
|
)
|
|
if hasattr(conn, "commit"):
|
|
conn.commit()
|
|
|
|
def get_all_users_with_role(self, role='user'):
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(f"SELECT * FROM users WHERE user_role = {self.ph}", (role,))
|
|
rows = cursor.fetchall()
|
|
return [self.user_to_dict(row) for row in rows]
|
|
|
|
#--- email credentials ---
|
|
def insert_email_credentials(self, user_id, smtp_host, smtp_port, smtp_user):
|
|
created_at = datetime.now().isoformat()
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
f"""
|
|
INSERT INTO email (
|
|
user_id, smtp_host, smtp_port, smtp_user, created_at
|
|
) VALUES ({self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph})
|
|
""",
|
|
(user_id, smtp_host, smtp_port, smtp_user, created_at)
|
|
)
|
|
if hasattr(conn, "commit"):
|
|
conn.commit()
|
|
|
|
def get_email_credentials(self, user_id):
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(f"SELECT * FROM email WHERE user_id = {self.ph}", (user_id,))
|
|
row = cursor.fetchone()
|
|
return self.email_to_dict(row) if row else None
|
|
|
|
def update_email_credentials(self, user_id, smtp_host, smtp_port, smtp_user):
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
f"""
|
|
UPDATE email SET smtp_host={self.ph}, smtp_port={self.ph}, smtp_user={self.ph} WHERE id = {self.ph}
|
|
""",
|
|
(smtp_host, smtp_port, smtp_user, user_id)
|
|
)
|
|
if hasattr(conn, "commit"):
|
|
conn.commit()
|
|
|
|
def deactivate_user(self, user_id):
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
f"""
|
|
UPDATE users
|
|
SET active = {self.ph}
|
|
WHERE id = {self.ph}
|
|
""",
|
|
(0, user_id)
|
|
)
|
|
if hasattr(conn, "commit"):
|
|
conn.commit()
|
|
|
|
def update_temp_pass(self, user_id):
|
|
with get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
f"""
|
|
UPDATE users
|
|
SET temporary_passwrd = {self.ph}
|
|
WHERE id = {self.ph}
|
|
""",
|
|
(0, user_id)
|
|
)
|
|
if hasattr(conn, "commit"):
|
|
conn.commit() |