import sqlite3
from contextlib import contextmanager
from typing import Optional


class Providers:
    """Data-access helper for the ``providers`` table of a SQLite database.

    The table is created on instantiation if it does not already exist.
    Every public method opens its own short-lived connection, so instances
    hold no open database handles between calls.
    """

    # Explicit column order shared by every SELECT and by the row -> dict
    # mapping, so results never depend on the physical column order that
    # ``SELECT *`` would expose.
    _COLUMNS = (
        "id",
        "provider_name",
        "provider_vat",
        "provider_register_number",
        "provider_address",
        "contact_name",
        "contact_number",
        "contact_email",
        "created_at",
        "status",
    )
    _SELECT_SQL = "SELECT " + ", ".join(_COLUMNS) + " FROM providers"

    def __init__(self, db_path="instance/app_database.db"):
        """Remember *db_path* and make sure the ``providers`` table exists."""
        self.db_path = db_path
        self._create_providers_table()

    @contextmanager
    def _connect(self):
        """Yield a connection that is committed (or rolled back) and closed.

        ``sqlite3.Connection`` used as a context manager only manages the
        transaction; it does not close the connection. The ``finally``
        clause here closes it so connections are not leaked.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on success, roll back on exception
                yield conn
        finally:
            conn.close()

    @classmethod
    def _row_to_dict(cls, row) -> dict:
        """Map a row selected with ``_SELECT_SQL`` to a column-name dict."""
        return dict(zip(cls._COLUMNS, row))

    def _create_providers_table(self) -> None:
        """Create the ``providers`` table if it is missing."""
        with self._connect() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS providers (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    provider_name TEXT,
                    provider_vat TEXT,
                    provider_register_number TEXT,
                    provider_address TEXT,
                    contact_name TEXT,
                    contact_number TEXT,
                    contact_email TEXT,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    status TEXT NOT NULL DEFAULT 'Activ'
                );
            """)

    def add_provider(self, provider) -> bool:
        """Insert a new provider row.

        Args:
            provider: Mapping with the keys ``provider_name``,
                ``provider_vat``, ``provider_register_number``,
                ``provider_address``, ``contact_name``, ``contact_number``
                and ``contact_email``.

        Returns:
            True when the row was inserted, False when SQLite rejected it
            with an ``IntegrityError``.

        Raises:
            KeyError: If *provider* lacks one of the required keys.
        """
        values = (
            provider['provider_name'],
            provider['provider_vat'],
            provider['provider_register_number'],
            provider['provider_address'],
            provider['contact_name'],
            provider['contact_number'],
            provider['contact_email'],
        )
        try:
            with self._connect() as conn:
                conn.execute("""
                    INSERT INTO providers (
                        provider_name,
                        provider_vat,
                        provider_register_number,
                        provider_address,
                        contact_name,
                        contact_number,
                        contact_email
                    ) VALUES (?, ?, ?, ?, ?, ?, ?)
                """, values)
        except sqlite3.IntegrityError:
            return False
        return True

    def get_all_providers(self) -> list:
        """Return every provider as a list of dicts (empty list if none)."""
        with self._connect() as conn:
            rows = conn.execute(self._SELECT_SQL).fetchall()
        return [self._row_to_dict(row) for row in rows]

    # NOTE: the parameter name ``id`` shadows the builtin, but it is kept
    # because callers may pass it by keyword.
    def get_provider(self, id) -> Optional[dict]:
        """Return the provider whose primary key is *id*, or None."""
        with self._connect() as conn:
            row = conn.execute(
                self._SELECT_SQL + " WHERE id = ?", (id,)
            ).fetchone()
        return self._row_to_dict(row) if row else None

    def remove_provider(self, id) -> None:
        """Delete the provider whose primary key is *id* (no-op if absent)."""
        with self._connect() as conn:
            conn.execute("DELETE FROM providers WHERE id = ?", (id,))

    def update_provider(self, provider) -> bool:
        """Overwrite all editable fields of an existing provider.

        Args:
            provider: Mapping with the same keys as for ``add_provider``
                plus ``status`` and the ``id`` of the row to update.

        Returns:
            True once the statement has been executed and committed.

        Raises:
            KeyError: If *provider* lacks one of the required keys.
            sqlite3.Error: If the database rejects the statement.
        """
        values = (
            provider['provider_name'],
            provider['provider_vat'],
            provider['provider_register_number'],
            provider['provider_address'],
            provider['contact_name'],
            provider['contact_number'],
            provider['contact_email'],
            provider['status'],
            provider['id'],
        )
        with self._connect() as conn:
            conn.execute("""
                UPDATE providers
                SET provider_name = ?,
                    provider_vat = ?,
                    provider_register_number = ?,
                    provider_address = ?,
                    contact_name = ?,
                    contact_number = ?,
                    contact_email = ?,
                    status = ?
                WHERE id = ?
            """, values)
        return True

    def deactivate(self, id) -> bool:
        """Set the provider's status to ``'Inactive'``.

        Returns:
            True when the statement ran, False when the database raised an
            error (for example, the database file cannot be opened).
        """
        try:
            with self._connect() as conn:
                conn.execute(
                    "UPDATE providers SET status = ? WHERE id = ?",
                    ('Inactive', id),
                )
        except sqlite3.Error:
            return False
        return True