Files
tainagustului/UI_V2/dbActions/products.py
2025-10-27 21:11:31 +02:00

140 lines
5.2 KiB
Python

import sqlite3
from typing import Optional
class Products:
def __init__(self, db_path="instance/app_database.db"):
self.db_path = db_path
self._create_products_table()
def _create_products_table(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT NOT NULL,
details TEXT NOT NULL,
price REAL,
discount REAL,
quantity REAL,
aviability TEXT NOT NULL,
category_id INTEGER,
image TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
conn.commit()
def add(self, product) -> bool:
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO products (name, description, details, price, discount, quantity, aviability, category_id, image)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (product['name'], product['description'],product['details'],product['price'], product['discount'], product['quantity'], product['aviability'], product['category_id'], product['image']))
conn.commit()
return True
except sqlite3.IntegrityError:
return False
def get(self, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM products
WHERE id = ?
""", (id,))
row = cursor.fetchone()
if row:
return {
"id": row[0],
"name": row[1],
"description": row[2],
"details":row[3],
"price": row[4],
"discount": row[5],
"quantity": row[6],
"aviability": row[7],
"category_id": row[8],
"image":row[9],
"created_at": row[10]
}
else:
return None
def get_all(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM products
""")
rows = cursor.fetchall()
if rows:
products = []
for row in rows:
product={
"id": row[0],
"name": row[1],
"description": row[2],
"details":row[3],
"price": row[4],
"discount": row[5],
"quantity": row[6],
"aviability": row[7],
"category_id": row[8],
"image":row[9],
"created_at": row[10]
}
products.append(product)
return products
else:
return []
def get_all_by_category(self, category_id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM products
WHERE category_id = ?
""", (category_id,))
rows = cursor.fetchall()
if rows:
products = []
for row in rows:
product={
"id": row[0],
"name": row[1],
"description": row[2],
"details":row[3],
"price": row[4],
"discount": row[5],
"quantity": row[6],
"aviability": row[7],
"category_id": row[8],
"image": row[9],
"created_at": row[10],
}
products.append(product)
return products
else:
return []
def update(self, product, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
UPDATE products SET name = ?, description =?, details=?, price=?, discount=?, quantity=?, aviability=?, category_id=?, image=?
WHERE id = ?
''', (product['name'], product['description'], product['details'], product['price'], product['discount'], product['quantity'], product['aviability'], product['category_id'], product['image'] , id))
conn.commit()
def delete(self, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
DELETE FROM products WHERE id=?;
''', (id,))
conn.commit()