Files
tainagustului/UI_V2/dbActions/packing_products.py

123 lines
4.5 KiB
Python

import sqlite3
from typing import Optional
class PackingProducts:
def __init__(self, db_path="instance/app_database.db"):
self.db_path = db_path
self._create_packing_products_table()
def _create_packing_products_table(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS packing_products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER,
stock INTEGER DEFAULT 0,
enter_price DOUBLE,
stock_min INTEGER,
stock_max INTEGER,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
conn.commit()
def add_packing_product(self, product_id, stock, enter_price, stock_min, stock_max):
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO packing_products (product_id, stock, enter_price, stock_min, stock_max)
VALUES (?, ?, ?, ?, ?)
""", (product_id, stock, enter_price, stock_min, stock_max))
conn.commit()
return True
except sqlite3.IntegrityError:
return False
def get_all(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM packing_products
""",)
rows = cursor.fetchall()
result = []
if rows:
for row in rows:
buffer = {
"id": row[0],
"product_id": row[1],
"stock": row[2],
"enter_price": row[3],
"stock_min": row[4],
"stock_max": row[5],
"created_at": row[6]
}
result.append(buffer)
return result
return []
def get_packing_product(self, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM packing_products WHERE id = ?
""",(id, ))
row = cursor.fetchone()
result = []
if row:
result = {
"id": row[0],
"product_id": row[1],
"stock": row[2],
"enter_price": row[3],
"stock_min": row[4],
"stock_max": row[5],
"created_at": row[6]
}
return result
return None
def get_packing_product_by_product_id(self, product_id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM packing_products WHERE product_id = ?
""",(product_id, ))
row = cursor.fetchone()
result = []
if row:
result = {
"id": row[0],
"product_id": row[1],
"stock": row[2],
"enter_price": row[3],
"stock_min": row[4],
"stock_max": row[5],
"created_at": row[6]
}
return result
return None
def remove_packing_product(self, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
DELETE FROM packing_products WHERE id=?;
''', (id,))
conn.commit()
def update_packing_products(self, id, product_id, stock, enter_price, stock_min, stock_max):
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE packing_products
SET product_id = ?, stock = ?, enter_price = ?, stock_min = ?, stock_max = ?
WHERE id = ?
""", (product_id, stock, enter_price, stock_min, stock_max, id))
conn.commit()
return True
except Exception:
return False