import sqlite3 from dataclasses import dataclass import hashlib from typing import Optional @dataclass class DocumentsStandardModel: id: Optional[int] = None category_id: Optional[int] = None user_id: Optional[int] = None name: Optional[str] = None path: Optional[str] = None created_at: Optional[str] = None access: Optional[str] = None class DocumentsStandard: def __init__(self, db_path="instance/app_database.db"): self.db_path = db_path self._create_audit_table() def _create_audit_table(self): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute( """ CREATE TABLE IF NOT EXISTS documents_standard ( id INTEGER PRIMARY KEY AUTOINCREMENT, category_id INTEGER, user_id INTEGER NOT NULL, name TEXT, path TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, access TEXT ); """ ) conn.commit() def new_entry(self, entry:DocumentsStandardModel): """Create a new entry.""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute( """ INSERT INTO documents_standard (category_id, user_id, name, path, access) VALUES (?, ?, ?, ?, ?) """, (entry.category_id, entry.user_id, entry.name, entry.path, entry.access), ) conn.commit() return cursor.lastrowid except sqlite3.IntegrityError: return None def get_all_entries(self): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM documents_standard") rows = cursor.fetchall() return [ DocumentsStandardModel( id = row[0], category_id = row[1], user_id = row[2], name = row[3], path = row[4], created_at = row[5], access = row[6] ) for row in rows ] def get_entries_by_user_id(self, user_id): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM documents_standard WHERE user_id = ?", (user_id, )) rows = cursor.fetchall() return [ DocumentsStandardModel( id = row[0], category_id = row[1], user_id = row[2], name = row[3], path = row[4], created_at = row[5], access = row[6] ) for row in rows ] def get_entries_by_category(self, category_id): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM documents_standard WHERE category_id = ?", (category_id, )) rows = cursor.fetchall() return [ DocumentsStandardModel( id = row[0], category_id = row[1], user_id = row[2], name = row[3], path = row[4], created_at = row[5], access = row[6] ) for row in rows ] def get_entry_by_id(self, id): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM documents_standard WHERE id = ?", (id, )) row = cursor.fetchone() if row: return DocumentsStandardModel( id = row[0], category_id = row[1], user_id = row[2], name = row[3], path = row[4], created_at = row[5], access = row[6] ) return None def delete_entry(self, id): try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # SQLite allows direct string comparison for ISO 8601 dates cursor.execute( "DELETE FROM documents_standard WHERE id = ? ", (id,) ) conn.commit() return cursor.rowcount # Returns the number of deleted rows except sqlite3.Error as e: print(f"An error occurred: {e}") return 0