Files
TMS/transportmanager/server/models/order_in.py
2025-08-31 17:55:26 +03:00

193 lines
7.2 KiB
Python

from datetime import datetime
from database import get_connection, is_postgres
class OrdersIn:
    """Data-access helper for the ``orders_in`` and ``order_in_points`` tables.

    Every method opens its own connection through ``get_connection()`` and
    uses the parameter placeholder that matches the active backend
    (``%s`` when ``is_postgres()`` is true, ``?`` otherwise).

    Reads select an explicit column list instead of ``SELECT *`` so that the
    positional row-to-dict mapping cannot be broken by the physical column
    order of the table.
    """

    # Columns of ``orders_in`` in the order expected by ``order_to_dict``.
    _ORDER_COLUMNS = (
        "id",
        "order_number",
        "user_id",
        "client_id",
        "products_description",
        "ldb_quantity",
        "kg_quantity",
        "track_reg_number",
        "trailer_reg_number",
        "received_price",
        "created_at",
    )

    # Columns of ``order_in_points`` in the order expected by
    # ``order_point_to_dict``. ``informatins`` is the column name used by the
    # INSERT/UPDATE statements below, so the spelling is kept as-is.
    _POINT_COLUMNS = (
        "id",
        "order_id",
        "destination_id",
        "informatins",
        "point_data",
        "point_hour",
        "point_type",
    )

    def __init__(self):
        # Parameter placeholder per backend
        self.ph = "%s" if is_postgres() else "?"

    @staticmethod
    def _commit(conn):
        """Commit ``conn`` if the connection object exposes ``commit``."""
        if hasattr(conn, "commit"):
            conn.commit()

    def order_to_dict(self, row):
        """Map a positional ``orders_in`` row to a dict keyed by column name.

        ``row`` must be indexable and hold at least 11 values in the order of
        ``_ORDER_COLUMNS``; extra trailing values are ignored and a shorter
        row raises ``IndexError``.
        """
        return {name: row[index] for index, name in enumerate(self._ORDER_COLUMNS)}

    def order_point_to_dict(self, row):
        """Map a positional ``order_in_points`` row to a dict keyed by column name.

        ``row`` must be indexable and hold at least 7 values in the order of
        ``_POINT_COLUMNS``; extra trailing values are ignored and a shorter
        row raises ``IndexError``.
        """
        return {name: row[index] for index, name in enumerate(self._POINT_COLUMNS)}

    def create_order(self, data):
        """Insert a new row into ``orders_in`` and return its id.

        ``data`` must contain the keys ``user_id``, ``client_id``,
        ``products_description``, ``received_price``, ``order_number``,
        ``ldb_quantity``, ``kg_quantity``, ``track_reg_number`` and
        ``trailer_reg_number``; a missing key raises ``KeyError``.
        ``created_at`` is set to ``datetime.now().isoformat()``.

        Returns the id from ``RETURNING id`` on Postgres, otherwise the
        cursor's ``lastrowid`` (``None`` if the cursor has no such attribute).
        """
        created_at = datetime.now().isoformat()
        with get_connection() as conn:
            cur = conn.cursor()
            returning = " RETURNING id" if is_postgres() else ""
            cur.execute(
                f"""
                INSERT INTO orders_in
                    (user_id, client_id, products_description, received_price, order_number,
                     ldb_quantity, kg_quantity, track_reg_number, trailer_reg_number, created_at)
                VALUES ({self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph}){returning}
                """,
                (
                    data["user_id"],
                    data["client_id"],
                    data["products_description"],
                    data["received_price"],
                    data["order_number"],
                    data["ldb_quantity"],
                    data["kg_quantity"],
                    data["track_reg_number"],
                    data["trailer_reg_number"],
                    created_at,
                ),
            )
            # The generated id must be read before committing.
            new_id = cur.fetchone()[0] if is_postgres() else getattr(cur, "lastrowid", None)
            self._commit(conn)
            return new_id

    def update_order(self, data):
        """Overwrite all editable columns of the order whose id is ``data["id"]``.

        ``data`` must contain ``id`` plus the same keys required by
        ``create_order``; a missing key raises ``KeyError``. ``created_at``
        is not modified. Returns ``None``.
        """
        with get_connection() as conn:
            cur = conn.cursor()
            cur.execute(
                f"""
                UPDATE orders_in SET
                    user_id = {self.ph}, client_id = {self.ph}, products_description = {self.ph},
                    received_price = {self.ph}, order_number = {self.ph},
                    ldb_quantity = {self.ph}, kg_quantity = {self.ph}, track_reg_number = {self.ph},
                    trailer_reg_number = {self.ph}
                WHERE id = {self.ph}
                """,
                (
                    data["user_id"],
                    data["client_id"],
                    data["products_description"],
                    data["received_price"],
                    data["order_number"],
                    data["ldb_quantity"],
                    data["kg_quantity"],
                    data["track_reg_number"],
                    data["trailer_reg_number"],
                    data["id"],
                ),
            )
            self._commit(conn)

    def delete_order(self, order_id):
        """Delete the ``orders_in`` row with the given id.

        Only ``orders_in`` is touched here; use ``delete_points_by_order_id``
        to remove the order's points.
        """
        with get_connection() as conn:
            cur = conn.cursor()
            cur.execute(f"DELETE FROM orders_in WHERE id = {self.ph}", (order_id,))
            self._commit(conn)

    def get_order_by_id(self, order_id):
        """Return the order with the given id as a dict, or ``None`` if absent."""
        columns = ", ".join(self._ORDER_COLUMNS)
        with get_connection() as conn:
            cur = conn.cursor()
            cur.execute(
                f"SELECT {columns} FROM orders_in WHERE id = {self.ph}",
                (order_id,),
            )
            row = cur.fetchone()
            return self.order_to_dict(row) if row else None

    def get_orders_by_user(self, user_id):
        """Return all orders of ``user_id`` as dicts, newest ``created_at`` first."""
        columns = ", ".join(self._ORDER_COLUMNS)
        with get_connection() as conn:
            cur = conn.cursor()
            cur.execute(
                f"SELECT {columns} FROM orders_in WHERE user_id = {self.ph} ORDER BY created_at DESC",
                (user_id,),
            )
            rows = cur.fetchall()
            return [self.order_to_dict(row) for row in rows]

    def create_order_point(self, data):
        """Insert a new row into ``order_in_points`` and return its id.

        ``data`` must contain the keys ``order_id``, ``destination_id``,
        ``informatins``, ``point_data``, ``point_hour`` and ``point_type``;
        a missing key raises ``KeyError``.

        Returns the id from ``RETURNING id`` on Postgres, otherwise the
        cursor's ``lastrowid`` (``None`` if the cursor has no such attribute).
        """
        with get_connection() as conn:
            cur = conn.cursor()
            returning = " RETURNING id" if is_postgres() else ""
            cur.execute(
                f"""
                INSERT INTO order_in_points
                    (order_id, destination_id, informatins, point_data, point_hour, point_type)
                VALUES ({self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph}, {self.ph}){returning}
                """,
                (
                    data["order_id"],
                    data["destination_id"],
                    data["informatins"],
                    data["point_data"],
                    data["point_hour"],
                    data["point_type"],
                ),
            )
            # The generated id must be read before committing.
            new_id = cur.fetchone()[0] if is_postgres() else getattr(cur, "lastrowid", None)
            self._commit(conn)
            return new_id

    def update_order_point(self, data):
        """Overwrite all columns of the order point whose id is ``data["id"]``.

        ``data`` must contain ``id`` plus the same keys required by
        ``create_order_point``; a missing key raises ``KeyError``.
        Returns ``None``.
        """
        with get_connection() as conn:
            cur = conn.cursor()
            cur.execute(
                f"""
                UPDATE order_in_points SET
                    order_id = {self.ph}, destination_id = {self.ph}, informatins = {self.ph},
                    point_data = {self.ph}, point_hour = {self.ph}, point_type = {self.ph}
                WHERE id = {self.ph}
                """,
                (
                    data["order_id"],
                    data["destination_id"],
                    data["informatins"],
                    data["point_data"],
                    data["point_hour"],
                    data["point_type"],
                    data["id"],
                ),
            )
            self._commit(conn)

    def delete_order_point(self, point_id):
        """Delete the ``order_in_points`` row with the given id."""
        with get_connection() as conn:
            cur = conn.cursor()
            cur.execute(f"DELETE FROM order_in_points WHERE id = {self.ph}", (point_id,))
            self._commit(conn)

    def delete_points_by_order_id(self, order_id):
        """Delete every ``order_in_points`` row that belongs to ``order_id``."""
        with get_connection() as conn:
            cur = conn.cursor()
            cur.execute(f"DELETE FROM order_in_points WHERE order_id = {self.ph}", (order_id,))
            self._commit(conn)

    def get_order_point_by_id(self, point_id):
        """Return the order point with the given id as a dict, or ``None`` if absent."""
        columns = ", ".join(self._POINT_COLUMNS)
        with get_connection() as conn:
            cur = conn.cursor()
            cur.execute(
                f"SELECT {columns} FROM order_in_points WHERE id = {self.ph}",
                (point_id,),
            )
            row = cur.fetchone()
            return self.order_point_to_dict(row) if row else None

    def get_order_points_by_order(self, order_id):
        """Return all points of ``order_id`` as dicts, ordered by point id."""
        columns = ", ".join(self._POINT_COLUMNS)
        with get_connection() as conn:
            cur = conn.cursor()
            cur.execute(
                f"SELECT {columns} FROM order_in_points WHERE order_id = {self.ph} ORDER BY id",
                (order_id,),
            )
            rows = cur.fetchall()
            return [self.order_point_to_dict(row) for row in rows]