import sqlite3 from typing import Optional class PackingProducts: def __init__(self, db_path="instance/app_database.db"): self.db_path = db_path self._create_packing_products_table() def _create_packing_products_table(self): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS packing_products ( id INTEGER PRIMARY KEY AUTOINCREMENT, product_id INTEGER, stock INTEGER DEFAULT 0, enter_price DOUBLE, stock_min INTEGER, stock_max INTEGER, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); """) conn.commit() def add_packing_product(self, product_id, stock, enter_price, stock_min, stock_max): try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO packing_products (product_id, stock, enter_price, stock_min, stock_max) VALUES (?, ?, ?, ?, ?) """, (product_id, stock, enter_price, stock_min, stock_max)) conn.commit() return True except sqlite3.IntegrityError: return False def get_all(self): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT * FROM packing_products """,) rows = cursor.fetchall() result = [] if rows: for row in rows: buffer = { "id": row[0], "product_id": row[1], "stock": row[2], "enter_price": row[3], "stock_min": row[4], "stock_max": row[5], "created_at": row[6] } result.append(buffer) return result return [] def get_packing_product(self, id): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT * FROM packing_products WHERE id = ? """,(id, )) row = cursor.fetchone() result = [] if row: result = { "id": row[0], "product_id": row[1], "stock": row[2], "enter_price": row[3], "stock_min": row[4], "stock_max": row[5], "created_at": row[6] } return result return None def get_packing_product_by_product_id(self, product_id): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT * FROM packing_products WHERE product_id = ? """,(product_id, )) row = cursor.fetchone() result = [] if row: result = { "id": row[0], "product_id": row[1], "stock": row[2], "enter_price": row[3], "stock_min": row[4], "stock_max": row[5], "created_at": row[6] } return result return None def remove_packing_product(self, id): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(''' DELETE FROM packing_products WHERE id=?; ''', (id,)) conn.commit() def update_packing_products(self, id, product_id, stock, enter_price, stock_min, stock_max): try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" UPDATE packing_products SET product_id = ?, stock = ?, enter_price = ?, stock_min = ?, stock_max = ? WHERE id = ? """, (product_id, stock, enter_price, stock_min, stock_max, id)) conn.commit() return True except Exception: return False