import sqlite3
from contextlib import contextmanager
from typing import Optional


class Chat:
    """SQLite-backed storage for chat sessions and their messages.

    Two tables are managed: ``chatsessions`` (one row per conversation) and
    ``chatmessages`` (one row per message, linked through ``chat_id``).
    Both tables are created on construction if they do not exist yet.
    Every public method opens its own short-lived connection to ``db_path``.
    """

    # Column order used by every session/message query.  Selecting columns
    # explicitly (instead of ``SELECT *``) keeps the row -> dict mapping
    # correct even if the physical column order of an existing table differs.
    _SESSION_COLUMNS = (
        "id",
        "user_id",
        "session_token",
        "status",
        "created_at",
        "last_message_at",
        "last_message_from",
        "unread_for_admin",
        "unread_for_user",
    )
    _MESSAGE_COLUMNS = (
        "id",
        "chat_id",
        "session_token",
        "sender_type",
        "sender_id",
        "text",
        "created_at",
        "read_at",
    )
    _SESSION_SELECT = ", ".join(_SESSION_COLUMNS)
    _MESSAGE_SELECT = ", ".join(_MESSAGE_COLUMNS)

    def __init__(self, db_path="instance/app_database.db"):
        """Remember the database path and make sure both tables exist."""
        self.db_path = db_path
        self._create_chatsessions_table()
        self._create_chatmessages_table()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @contextmanager
    def _connection(self):
        """Yield a connection that is committed/rolled back AND closed.

        ``with sqlite3.connect(...) as conn`` only manages the transaction;
        it never closes the connection.  This wrapper adds the missing close
        so that each call does not leak a database handle.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on success, rollback on exception
                yield conn
        finally:
            conn.close()

    @classmethod
    def _session_from_row(cls, row) -> dict:
        """Map a ``chatsessions`` row (in ``_SESSION_COLUMNS`` order) to a dict."""
        return dict(zip(cls._SESSION_COLUMNS, row))

    @classmethod
    def _message_from_row(cls, row) -> dict:
        """Map a ``chatmessages`` row (in ``_MESSAGE_COLUMNS`` order) to a dict."""
        return dict(zip(cls._MESSAGE_COLUMNS, row))

    def _create_chatsessions_table(self):
        """Create the ``chatsessions`` table if it does not exist."""
        with self._connection() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS chatsessions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    session_token TEXT,
                    status TEXT,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    last_message_at DATETIME,
                    last_message_from TEXT,
                    unread_for_admin INTEGER,
                    unread_for_user INTEGER
                );
                """
            )

    def _create_chatmessages_table(self):
        """Create the ``chatmessages`` table if it does not exist."""
        with self._connection() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS chatmessages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    chat_id INTEGER,
                    session_token TEXT,
                    sender_type TEXT,
                    sender_id INTEGER,
                    text TEXT,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    read_at DATETIME
                );
                """
            )

    # ------------------------------------------------------------------
    # Chat sessions
    # ------------------------------------------------------------------

    def add_chatsession(self, chatsession):
        """Insert a new chat session.

        ``chatsession`` must provide the keys ``user_id``, ``session_token``,
        ``status``, ``last_message_at``, ``last_message_from``,
        ``unread_for_admin`` and ``unread_for_user`` (a missing key raises
        ``KeyError``).

        Returns ``True`` on success, ``False`` if SQLite raises an
        ``IntegrityError``.
        """
        try:
            with self._connection() as conn:
                conn.execute(
                    """
                    INSERT INTO chatsessions
                        (user_id, session_token, status, last_message_at,
                         last_message_from, unread_for_admin, unread_for_user)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        chatsession['user_id'],
                        chatsession['session_token'],
                        chatsession['status'],
                        chatsession['last_message_at'],
                        chatsession['last_message_from'],
                        chatsession['unread_for_admin'],
                        chatsession['unread_for_user'],
                    ),
                )
            return True
        except sqlite3.IntegrityError:
            return False

    def get_chatsession(self, chat_id: int) -> Optional[dict]:
        """Return a single chat session by its primary key id, or ``None``."""
        with self._connection() as conn:
            row = conn.execute(
                f"SELECT {self._SESSION_SELECT} FROM chatsessions WHERE id = ?",
                (chat_id,),
            ).fetchone()
        return self._session_from_row(row) if row else None

    def get_chatsession_by_token(self, session_token: str) -> Optional[dict]:
        """Return the chat session associated with a given browser / user session token.

        If several sessions share the token, the first row returned by SQLite
        is used.  Returns ``None`` when no session matches.
        """
        with self._connection() as conn:
            row = conn.execute(
                f"SELECT {self._SESSION_SELECT} FROM chatsessions WHERE session_token = ?",
                (session_token,),
            ).fetchone()
        return self._session_from_row(row) if row else None

    def get_all_chatsessions(self) -> list:
        """Return every chat session as a list of dicts (empty list if none)."""
        with self._connection() as conn:
            rows = conn.execute(
                f"SELECT {self._SESSION_SELECT} FROM chatsessions"
            ).fetchall()
        return [self._session_from_row(row) for row in rows]

    def update_chatsession(self, chatsession):
        """Overwrite the mutable fields of the session identified by ``chatsession['id']``.

        Updates ``user_id``, ``status``, ``last_message_at``,
        ``last_message_from``, ``unread_for_admin`` and ``unread_for_user``;
        ``session_token`` and ``created_at`` are left untouched.
        """
        with self._connection() as conn:
            conn.execute(
                '''
                UPDATE chatsessions
                SET user_id = ?, status = ?, last_message_at = ?,
                    last_message_from = ?, unread_for_admin = ?, unread_for_user = ?
                WHERE id = ?
                ''',
                (
                    chatsession['user_id'],
                    chatsession['status'],
                    chatsession['last_message_at'],
                    chatsession['last_message_from'],
                    chatsession['unread_for_admin'],
                    chatsession['unread_for_user'],
                    chatsession['id'],
                ),
            )

    def delete(self, id):
        """Delete the chat session with the given id.

        Messages belonging to the session are NOT removed here; use
        ``delete_chatmessages_for_chat`` for that.  The parameter keeps the
        name ``id`` (shadowing the builtin) so keyword callers keep working.
        """
        with self._connection() as conn:
            conn.execute("DELETE FROM chatsessions WHERE id=?;", (id,))

    # ------------------------------------------------------------------
    # Chat messages
    # ------------------------------------------------------------------

    def add_chatmessage(self, message: dict) -> bool:
        """Insert a new chat message and refresh the session's ``last_message_at``.

        Expected keys in message dict:
          - chat_id (int)
          - session_token (str, optional)
          - sender_type (str: 'client' or 'admin')
          - sender_id (int or None, optional)
          - text (str)

        The insert and the session update run in one transaction, so either
        both are stored or neither is.

        Returns ``True`` on success, ``False`` if SQLite raises an
        ``IntegrityError``.
        """
        try:
            with self._connection() as conn:
                conn.execute(
                    """
                    INSERT INTO chatmessages
                        (chat_id, session_token, sender_type, sender_id, text)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    (
                        message['chat_id'],
                        message.get('session_token'),
                        message['sender_type'],
                        message.get('sender_id'),
                        message['text'],
                    ),
                )
                # Refresh last_message_at on the corresponding chat session
                conn.execute(
                    """
                    UPDATE chatsessions
                    SET last_message_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                    """,
                    (message["chat_id"],),
                )
            return True
        except sqlite3.IntegrityError:
            return False

    def get_chatmessages_for_chat(self, chat_id: int) -> list:
        """Return all messages for a given chat, ordered by creation time.

        Ties on ``created_at`` are broken by ``id`` so insertion order is kept.
        """
        with self._connection() as conn:
            rows = conn.execute(
                f"""
                SELECT {self._MESSAGE_SELECT}
                FROM chatmessages
                WHERE chat_id = ?
                ORDER BY created_at ASC, id ASC
                """,
                (chat_id,),
            ).fetchall()
        return [self._message_from_row(row) for row in rows]

    def mark_messages_as_read(self, chat_id: int, reader_type: str):
        """Mark messages in a chat as read for a given reader.

        Typically called with reader_type='admin' or 'client'.
        We mark as read all messages where sender_type != reader_type
        and read_at IS NULL.
        """
        with self._connection() as conn:
            conn.execute(
                """
                UPDATE chatmessages
                SET read_at = CURRENT_TIMESTAMP
                WHERE chat_id = ? AND sender_type != ? AND read_at IS NULL
                """,
                (chat_id, reader_type),
            )

    def delete_chatmessages_for_chat(self, chat_id: int):
        """Delete all messages for a given chat (e.g. when cleaning up a closed session)."""
        with self._connection() as conn:
            conn.execute(
                "DELETE FROM chatmessages WHERE chat_id = ?",
                (chat_id,),
            )

    def cleanup_old_sessions(self):
        """Delete chat sessions and their messages older than today.

        Any session whose effective date (last_message_at if set, otherwise
        created_at) is strictly before today's date will be removed together
        with its messages.
        """
        with self._connection() as conn:
            # Resolve "today" once so both DELETEs use the same cutoff even
            # if the call straddles midnight.
            # NOTE(review): the cutoff is a local-time date, while the values
            # this class writes itself use CURRENT_TIMESTAMP (UTC).  Left
            # unchanged because callers may supply their own last_message_at
            # in an unknown timezone -- confirm the intended behaviour.
            cutoff = conn.execute("SELECT DATE('now','localtime')").fetchone()[0]

            # Messages first, selected through their (still present) sessions.
            # Using a sub-select instead of one placeholder per id avoids
            # SQLite's bound-variable limit when many sessions are stale.
            conn.execute(
                """
                DELETE FROM chatmessages
                WHERE chat_id IN (
                    SELECT id FROM chatsessions
                    WHERE DATE(COALESCE(last_message_at, created_at)) < ?
                )
                """,
                (cutoff,),
            )
            # Then the sessions themselves, in the same transaction.
            conn.execute(
                """
                DELETE FROM chatsessions
                WHERE DATE(COALESCE(last_message_at, created_at)) < ?
                """,
                (cutoff,),
            )

    def get_chatsessions_with_messages(self) -> list:
        """Return only chat sessions that have at least one message."""
        with self._connection() as conn:
            rows = conn.execute(
                f"""
                SELECT {self._SESSION_SELECT}
                FROM chatsessions
                WHERE id IN (SELECT DISTINCT chat_id FROM chatmessages)
                """
            ).fetchall()
        return [self._session_from_row(row) for row in rows]