add packageing module

This commit is contained in:
2025-11-10 14:49:00 +02:00
parent a18ad541f2
commit 9d0cc5b221
6 changed files with 847 additions and 150 deletions

View File

@@ -0,0 +1,127 @@
import sqlite3
from typing import Optional
class BulkProducts:
def __init__(self, db_path="instance/app_database.db"):
self.db_path = db_path
self._create_bulk_products_table()
def _create_bulk_products_table(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS bulk_products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
bill_id INTEGER,
name TEXT,
mesure_unit TEXT,
quantity DOUBLE,
price DOUBLE,
vat INTEGER default 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
conn.commit()
def add_bulk_products_product(self, bill_id, name, mesure_unit, quantity, price, vat):
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO bulk_products (bill_id, name, mesure_unit, quantity, price, vat)
VALUES (?, ?, ?, ?, ?, ?)
""", (bill_id, name, mesure_unit, quantity, price, vat))
conn.commit()
return True
except sqlite3.IntegrityError:
return False
def get_all(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM bulk_products
""",)
rows = cursor.fetchall()
result = []
if rows:
for row in rows:
buffer = {
"id": row[0],
"bill_id": row[1],
"name": row[2],
"mesure_unit": row[3],
"quantity": row[4],
"price": row[5],
"vat": row[6],
"created_at": row[7]
}
result.append(buffer)
return result
return []
def get_one(self, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM bulk_products WHERE id = ?
""",(id, ))
row = cursor.fetchone()
result = []
if row:
result = {
"id": row[0],
"bill_id": row[1],
"name": row[2],
"mesure_unit": row[3],
"quantity": row[4],
"price": row[5],
"vat": row[6],
"created_at": row[7]
}
return result
return None
def get_product_by_bill_id(self, bill_id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM bulk_products WHERE bill_id = ?
""",(bill_id, ))
row = cursor.fetchone()
result = []
if row:
result = {
"id": row[0],
"bill_id": row[1],
"name": row[2],
"mesure_unit": row[3],
"quantity": row[4],
"price": row[5],
"vat": row[6],
"created_at": row[7]
}
return result
return None
def remove(self, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
DELETE FROM bulk_products WHERE id=?;
''', (id,))
conn.commit()
def update(self, id, bill_id, name, mesure_unit, quantity, price, vat):
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE bulk_products
SET bill_id = ?, name = ?, mesure_unit = ?, quantity = ?, price = ?, vat = ?
WHERE id = ?
""", (bill_id, name, mesure_unit, quantity, price, vat, id))
conn.commit()
return True
except Exception:
return False

View File

@@ -0,0 +1,123 @@
import sqlite3
from typing import Optional
class PackingProducts:
def __init__(self, db_path="instance/app_database.db"):
self.db_path = db_path
self._create_packing_products_table()
def _create_packing_products_table(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS packing_products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER,
stock INTEGER DEFAULT 0,
enter_price DOUBLE,
stock_min INTEGER,
stock_max INTEGER,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
conn.commit()
def add_packing_product(self, product_id, stock, enter_price, stock_min, stock_max):
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO packing_products (product_id, stock, enter_price, stock_min, stock_max)
VALUES (?, ?, ?, ?, ?)
""", (product_id, stock, enter_price, stock_min, stock_max))
conn.commit()
return True
except sqlite3.IntegrityError:
return False
def get_all(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM packing_products
""",)
rows = cursor.fetchall()
result = []
if rows:
for row in rows:
buffer = {
"id": row[0],
"product_id": row[1],
"stock": row[2],
"enter_price": row[3],
"stock_min": row[4],
"stock_max": row[5],
"created_at": row[6]
}
result.append(buffer)
return result
return []
def get_packing_product(self, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM packing_products WHERE id = ?
""",(id, ))
row = cursor.fetchone()
result = []
if row:
result = {
"id": row[0],
"product_id": row[1],
"stock": row[2],
"enter_price": row[3],
"stock_min": row[4],
"stock_max": row[5],
"created_at": row[6]
}
return result
return None
def get_packing_product_by_product_id(self, product_id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM packing_products WHERE product_id = ?
""",(product_id, ))
row = cursor.fetchone()
result = []
if row:
result = {
"id": row[0],
"product_id": row[1],
"stock": row[2],
"enter_price": row[3],
"stock_min": row[4],
"stock_max": row[5],
"created_at": row[6]
}
return result
return None
def remove_packing_product(self, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
DELETE FROM packing_products WHERE id=?;
''', (id,))
conn.commit()
def update_packing_products(self, id, product_id, stock, enter_price, stock_min, stock_max):
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE packing_products
SET product_id = ?, stock = ?, enter_price = ?, stock_min = ?, stock_max = ?
WHERE id = ?
""", (product_id, stock, enter_price, stock_min, stock_max, id))
conn.commit()
return True
except Exception:
return False

View File

@@ -0,0 +1,112 @@
import sqlite3
from typing import Optional
class ProviderBills:
def __init__(self, db_path="instance/app_database.db"):
self.db_path = db_path
self._create_provider_bills_table()
def _create_provider_bills_table(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS provider_bills (
id INTEGER PRIMARY KEY AUTOINCREMENT,
number TEXT,
date TEXT,
provider_id INTEGER
);
""")
conn.commit()
def add_provider_bills(self, number, date, provider_id):
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO provider_bills (number, date, provider_id)
VALUES (?, ?, ?)
""", (number, date, provider_id))
conn.commit()
return True
except sqlite3.IntegrityError:
return False
def get_all(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM provider_bills
""",)
rows = cursor.fetchall()
result = []
if rows:
for row in rows:
buffer = {
"id": row[0],
"number": row[1],
"date": row[2],
"provider_id": row[3]
}
result.append(buffer)
return result
return []
def get_one(self, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM provider_bills WHERE id = ?
""",(id, ))
row = cursor.fetchone()
result = []
if row:
result = {
"id": row[0],
"number": row[1],
"date": row[2],
"provider_id": row[3]
}
return result
return None
def get_bill_by_provider_id(self, provider_id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM provider_bills WHERE provider_id = ?
""",(provider_id, ))
row = cursor.fetchone()
result = []
if row:
result = {
"id": row[0],
"number": row[1],
"date": row[2],
"provider_id": row[3],
"created_at": row[4]
}
return result
return None
def remove(self, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
DELETE FROM provider_bills WHERE id=?;
''', (id,))
conn.commit()
def update(self, id, number, date, provider_id):
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE provider_bills
SET number = ?, date = ?, provider_id = ?
WHERE id = ?
""", (number, date, provider_id, id))
conn.commit()
return True
except Exception:
return False