# Files
#
# 352 lines
# 12 KiB
# Python

import sqlite3
from contextlib import contextmanager
from typing import Optional
class Chat:
    """SQLite-backed store for chat sessions and their messages.

    Two tables are managed: ``chatsessions`` (one row per conversation) and
    ``chatmessages`` (one row per message, linked through ``chat_id``).
    Both tables are created on construction if they do not exist yet.

    Every public method opens its own short-lived connection, runs inside a
    single transaction and closes the connection before returning.
    """

    # Column order shared by every session / message query so that rows can be
    # converted to dicts in one place instead of by positional index.
    _SESSION_COLUMNS = (
        "id",
        "user_id",
        "session_token",
        "status",
        "created_at",
        "last_message_at",
        "last_message_from",
        "unread_for_admin",
        "unread_for_user",
    )
    _MESSAGE_COLUMNS = (
        "id",
        "chat_id",
        "session_token",
        "sender_type",
        "sender_id",
        "text",
        "created_at",
        "read_at",
    )

    # Explicit column list (rather than ``SELECT *``) keeps the dict mapping
    # correct even if columns are later added to the table.
    _SESSION_SELECT = (
        "SELECT id, user_id, session_token, status, created_at, "
        "last_message_at, last_message_from, unread_for_admin, unread_for_user "
        "FROM chatsessions"
    )

    def __init__(self, db_path="instance/app_database.db"):
        """Remember the database path and make sure both tables exist.

        Args:
            db_path: Filesystem path of the SQLite database file.
        """
        self.db_path = db_path
        self._create_chatsessions_table()
        self._create_chatmessages_table()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @contextmanager
    def _connection(self):
        """Yield a connection that is committed (or rolled back) and closed.

        ``sqlite3.Connection`` used as a context manager only ends the
        transaction; it does not close the connection. This wrapper adds the
        missing ``close()`` so connections are not leaked.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on success, roll back on exception
                yield conn
        finally:
            conn.close()

    @classmethod
    def _session_from_row(cls, row):
        """Convert a ``chatsessions`` row (in ``_SESSION_COLUMNS`` order) to a dict."""
        return dict(zip(cls._SESSION_COLUMNS, row))

    @classmethod
    def _message_from_row(cls, row):
        """Convert a ``chatmessages`` row (in ``_MESSAGE_COLUMNS`` order) to a dict."""
        return dict(zip(cls._MESSAGE_COLUMNS, row))

    def _create_chatsessions_table(self):
        """Create the ``chatsessions`` table if it does not exist."""
        with self._connection() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS chatsessions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    session_token TEXT,
                    status TEXT,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    last_message_at DATETIME,
                    last_message_from TEXT,
                    unread_for_admin INTEGER,
                    unread_for_user INTEGER
                );
                """
            )

    def _create_chatmessages_table(self):
        """Create the ``chatmessages`` table if it does not exist."""
        with self._connection() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS chatmessages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    chat_id INTEGER,
                    session_token TEXT,
                    sender_type TEXT,
                    sender_id INTEGER,
                    text TEXT,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    read_at DATETIME
                );
                """
            )

    # ------------------------------------------------------------------
    # Chat sessions
    # ------------------------------------------------------------------
    def add_chatsession(self, chatsession):
        """Insert a new chat session.

        Args:
            chatsession: Mapping with the keys ``user_id``, ``session_token``,
                ``status``, ``last_message_at``, ``last_message_from``,
                ``unread_for_admin`` and ``unread_for_user``.

        Returns:
            True when the row was inserted, False when SQLite rejected it with
            an integrity error.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        values = (
            chatsession['user_id'],
            chatsession['session_token'],
            chatsession['status'],
            chatsession['last_message_at'],
            chatsession['last_message_from'],
            chatsession['unread_for_admin'],
            chatsession['unread_for_user'],
        )
        try:
            with self._connection() as conn:
                conn.execute(
                    """
                    INSERT INTO chatsessions (user_id, session_token, status, last_message_at, last_message_from, unread_for_admin, unread_for_user)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    """,
                    values,
                )
            return True
        except sqlite3.IntegrityError:
            return False

    def get_chatsession(self, chat_id: int) -> Optional[dict]:
        """Return a single chat session by its primary key id, or None."""
        with self._connection() as conn:
            row = conn.execute(
                self._SESSION_SELECT + " WHERE id = ?",
                (chat_id,),
            ).fetchone()
        return self._session_from_row(row) if row else None

    def get_chatsession_by_token(self, session_token: str) -> Optional[dict]:
        """Return the chat session stored under ``session_token``, or None.

        The table does not enforce token uniqueness; if several rows share the
        token, the first one returned by SQLite is used.
        """
        with self._connection() as conn:
            row = conn.execute(
                self._SESSION_SELECT + " WHERE session_token = ?",
                (session_token,),
            ).fetchone()
        return self._session_from_row(row) if row else None

    def get_all_chatsessions(self) -> list:
        """Return every chat session as a list of dicts (empty list if none)."""
        with self._connection() as conn:
            rows = conn.execute(self._SESSION_SELECT).fetchall()
        return [self._session_from_row(row) for row in rows]

    def update_chatsession(self, chatsession):
        """Overwrite the mutable fields of the session identified by ``id``.

        Args:
            chatsession: Mapping with the keys ``id``, ``user_id``, ``status``,
                ``last_message_at``, ``last_message_from``,
                ``unread_for_admin`` and ``unread_for_user``.
                ``session_token`` and ``created_at`` are never changed.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        values = (
            chatsession['user_id'],
            chatsession['status'],
            chatsession['last_message_at'],
            chatsession['last_message_from'],
            chatsession['unread_for_admin'],
            chatsession['unread_for_user'],
            chatsession['id'],
        )
        with self._connection() as conn:
            conn.execute(
                """
                UPDATE chatsessions SET
                    user_id = ?,
                    status = ?,
                    last_message_at = ?,
                    last_message_from = ?,
                    unread_for_admin = ?,
                    unread_for_user = ?
                WHERE id = ?
                """,
                values,
            )

    def delete(self, id):
        """Delete the chat session with the given primary key.

        Only the ``chatsessions`` row is removed; messages of that chat are
        left in place (use ``delete_chatmessages_for_chat`` for those).
        """
        # NOTE: the parameter name shadows the ``id`` builtin but is kept so
        # that existing keyword callers (``delete(id=...)``) continue to work.
        with self._connection() as conn:
            conn.execute("DELETE FROM chatsessions WHERE id=?;", (id,))

    def get_chatsessions_with_messages(self) -> list:
        """Return only chat sessions that have at least one message."""
        with self._connection() as conn:
            rows = conn.execute(
                self._SESSION_SELECT
                + " WHERE id IN (SELECT DISTINCT chat_id FROM chatmessages)"
            ).fetchall()
        return [self._session_from_row(row) for row in rows]

    def cleanup_old_sessions(self):
        """Delete chat sessions and their messages older than today.

        A session's effective date is ``last_message_at`` if set, otherwise
        ``created_at``. Sessions whose effective date is strictly before
        today's local date are removed together with their messages, in a
        single transaction.
        """
        with self._connection() as conn:
            cursor = conn.cursor()
            # Evaluate "today" once so both DELETE statements use the same
            # cutoff even if the call straddles midnight.
            cursor.execute("SELECT DATE('now', 'localtime')")
            today = cursor.fetchone()[0]

            # Timestamps written by this class come from CURRENT_TIMESTAMP,
            # which is UTC. They are converted with 'localtime' so that both
            # sides of the comparison are local dates; comparing a UTC date
            # against a local date would expire fresh sessions on hosts ahead
            # of UTC.
            # The subquery avoids a bound-parameter list that grows with the
            # number of stale sessions.
            cursor.execute(
                """
                DELETE FROM chatmessages
                WHERE chat_id IN (
                    SELECT id FROM chatsessions
                    WHERE DATE(COALESCE(last_message_at, created_at), 'localtime') < ?
                )
                """,
                (today,),
            )
            cursor.execute(
                """
                DELETE FROM chatsessions
                WHERE DATE(COALESCE(last_message_at, created_at), 'localtime') < ?
                """,
                (today,),
            )

    # ------------------------------------------------------------------
    # Chat messages
    # ------------------------------------------------------------------
    def add_chatmessage(self, message: dict) -> bool:
        """Insert a new chat message and refresh the session's timestamp.

        Expected keys in ``message``:
            - chat_id (int)
            - session_token (str, optional)
            - sender_type (str: 'client' or 'admin')
            - sender_id (int or None, optional)
            - text (str)

        The insert and the ``last_message_at`` refresh on the owning session
        are committed together, so either both happen or neither does.

        Returns:
            True on success, False when SQLite raised an integrity error.

        Raises:
            KeyError: If ``chat_id``, ``sender_type`` or ``text`` is missing.
        """
        chat_id = message['chat_id']
        values = (
            chat_id,
            message.get('session_token'),
            message['sender_type'],
            message.get('sender_id'),
            message['text'],
        )
        try:
            with self._connection() as conn:
                conn.execute(
                    """
                    INSERT INTO chatmessages (chat_id, session_token, sender_type, sender_id, text)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    values,
                )
                # Refresh last_message_at on the corresponding chat session.
                conn.execute(
                    """
                    UPDATE chatsessions
                    SET last_message_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                    """,
                    (chat_id,),
                )
            return True
        except sqlite3.IntegrityError:
            return False

    def get_chatmessages_for_chat(self, chat_id: int) -> list:
        """Return all messages for a given chat, ordered by creation time.

        Messages with the same ``created_at`` are ordered by ``id``.
        """
        with self._connection() as conn:
            rows = conn.execute(
                """
                SELECT id, chat_id, session_token, sender_type, sender_id, text, created_at, read_at
                FROM chatmessages
                WHERE chat_id = ?
                ORDER BY created_at ASC, id ASC
                """,
                (chat_id,),
            ).fetchall()
        return [self._message_from_row(row) for row in rows]

    def mark_messages_as_read(self, chat_id: int, reader_type: str):
        """Mark messages in a chat as read for a given reader.

        Typically called with ``reader_type='admin'`` or ``'client'``. Every
        message in the chat whose ``sender_type`` differs from ``reader_type``
        and whose ``read_at`` is still NULL gets ``read_at`` set to the
        current timestamp.
        """
        with self._connection() as conn:
            conn.execute(
                """
                UPDATE chatmessages
                SET read_at = CURRENT_TIMESTAMP
                WHERE chat_id = ? AND sender_type != ? AND read_at IS NULL
                """,
                (chat_id, reader_type),
            )

    def delete_chatmessages_for_chat(self, chat_id: int):
        """Delete all messages for a given chat (e.g. when cleaning up a closed session)."""
        with self._connection() as conn:
            conn.execute(
                """
                DELETE FROM chatmessages
                WHERE chat_id = ?
                """,
                (chat_id,),
            )