130 lines
4.6 KiB
Python
130 lines
4.6 KiB
Python
from datetime import datetime, timedelta
|
|
from database import get_connection, is_postgres
|
|
|
|
class Subscription:
    """Data-access helper for the ``subscriptions`` table.

    Every method opens its own connection through ``get_connection()``.
    SQL parameters are always bound through the driver placeholder stored
    in ``self.ph`` (``%s`` when ``is_postgres()`` is true, ``?`` otherwise).
    """

    # Explicit column order. ``subscription_to_dict`` decodes rows by
    # position, so reads must not depend on the table's physical column order
    # (which ``SELECT *`` would do).
    _COLUMNS = (
        "id, user_id, plan, start_date, end_date, "
        "status, register_number, created_at"
    )

    def __init__(self):
        # Parameter placeholder per backend
        self.ph = "%s" if is_postgres() else "?"

    def subscription_to_dict(self, row):
        """Convert a positional result row into a subscription dict.

        Args:
            row: Sequence holding the columns in ``_COLUMNS`` order.

        Returns:
            dict with keys ``id``, ``user_id``, ``plan``, ``start_date``,
            ``end_date``, ``status``, ``register_number`` and ``created_at``.
        """
        return {
            "id": row[0],
            "user_id": row[1],  # company id
            "plan": row[2],
            "start_date": row[3],
            "end_date": row[4],
            "status": row[5],
            "register_number": row[6],
            "created_at": row[7],
        }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _select(self, tail="", params=(), *, single=False):
        """Run ``SELECT <columns> FROM subscriptions <tail>``.

        Args:
            tail: SQL appended after the table name (WHERE / ORDER BY).
            params: Values bound to the placeholders in ``tail``.
            single: When true, return the first row as a dict or ``None``;
                otherwise return a list of dicts for all rows.
        """
        sql = f"SELECT {self._COLUMNS} FROM subscriptions {tail}"
        with get_connection() as conn:
            cursor = conn.cursor()
            # Only pass a parameter sequence when there is something to bind,
            # so parameterless statements are executed exactly as written.
            if params:
                cursor.execute(sql, params)
            else:
                cursor.execute(sql)
            if single:
                row = cursor.fetchone()
                return self.subscription_to_dict(row) if row else None
            return [self.subscription_to_dict(row) for row in cursor.fetchall()]

    def _write(self, sql, params):
        """Execute a modifying statement, commit, and return the row count.

        Returns ``0`` when the cursor does not expose ``rowcount``.
        """
        with get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            # Some connection wrappers may not expose commit(); only call it
            # when it is available.
            if hasattr(conn, "commit"):
                conn.commit()
            return getattr(cursor, "rowcount", 0)

    @staticmethod
    def _as_datetime(value):
        """Coerce a stored end date to ``datetime``.

        Accepts ISO-8601 strings, ``datetime`` objects and plain ``date``
        objects (converted to midnight of that day). ``None`` and empty
        strings yield ``None``.

        Raises:
            ValueError: If a non-empty string is not valid ISO-8601.
            TypeError: If the value is none of the supported types.
        """
        if value is None or value == "":
            return None
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            return datetime.fromisoformat(value)
        # Anything else is treated as a ``date``; ``combine`` raises
        # TypeError for unsupported types.
        return datetime.combine(value, datetime.min.time())

    # ------------------------------------------------------------------
    # CRUD
    # ------------------------------------------------------------------

    def create(self, user_id, plan, start_date, end_date, register_number, status="active"):
        """Insert a subscription and return its new id.

        ``created_at`` is set to the current local time as an ISO-8601 string.

        Returns:
            The id from ``RETURNING id`` on Postgres; otherwise the cursor's
            ``lastrowid`` (``None`` if the cursor does not provide one).
        """
        created_at = datetime.now().isoformat()
        postgres = is_postgres()
        returning = " RETURNING id" if postgres else ""
        placeholders = ", ".join([self.ph] * 7)
        sql = (
            "INSERT INTO subscriptions "
            "(user_id, plan, start_date, end_date, status, register_number, created_at) "
            f"VALUES ({placeholders}){returning}"
        )
        values = (user_id, plan, start_date, end_date, status, register_number, created_at)

        with get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, values)
            if postgres:
                new_id = cursor.fetchone()[0]
            else:
                new_id = getattr(cursor, "lastrowid", None)
            if hasattr(conn, "commit"):
                conn.commit()
            return new_id

    def get_by_user_id(self, user_id):
        """Return all subscriptions of ``user_id``, newest ``start_date`` first."""
        return self._select(
            f"WHERE user_id = {self.ph} ORDER BY start_date DESC",
            (user_id,),
        )

    def get_by_id(self, subscription_id):
        """Return the subscription with the given id, or ``None`` if absent."""
        return self._select(f"WHERE id = {self.ph}", (subscription_id,), single=True)

    def update_status(self, subscription_id, new_status):
        """Set ``status`` on one subscription; return the affected row count."""
        return self._write(
            f"UPDATE subscriptions SET status = {self.ph} WHERE id = {self.ph}",
            (new_status, subscription_id),
        )

    def delete(self, subscription_id):
        """Delete one subscription; return the affected row count."""
        return self._write(
            f"DELETE FROM subscriptions WHERE id = {self.ph}",
            (subscription_id,),
        )

    def get_first_2_months_subscription_for_register_number(self, register_number):
        """Return an active ``first_2_months`` subscription for the register number.

        Returns the first matching row as a dict, or ``None`` when there is
        no match. No ordering is applied if several rows match.
        """
        return self._select(
            f"WHERE register_number = {self.ph} "
            "AND plan = 'first_2_months' AND status = 'active'",
            (register_number,),
            single=True,
        )

    def get_all(self):
        """Return every subscription, newest ``created_at`` first."""
        return self._select("ORDER BY created_at DESC")

    def update_subscription_statuses(self):
        """Refresh each subscription's status from its ``end_date``.

        * end date in the past            -> ``"expired"``
        * 0..5 whole days remaining       -> ``"less_than_5_days"``
        * anything else                   -> left untouched

        A row is only written when its status actually changes. Rows without
        an end date are skipped instead of aborting the whole sweep.
        """
        naive_now = datetime.now()

        for sub in self.get_all():
            end_date = self._as_datetime(sub["end_date"])
            if end_date is None:
                continue

            # Naive and aware datetimes cannot be subtracted; compare aware
            # end dates against "now" in the same timezone.
            if end_date.utcoffset() is None:
                now = naive_now
            else:
                now = datetime.now(end_date.tzinfo)

            # timedelta.days floors, so any end date already in the past
            # yields a negative value.
            days_left = (end_date - now).days

            if days_left < 0:
                target = "expired"
            elif days_left <= 5:
                target = "less_than_5_days"
            else:
                continue

            if sub["status"] != target:
                self.update_status(sub["id"], target)