Files
JuridicBloc/server/models/audit.py
2026-06-13 21:46:37 +03:00

109 lines
3.5 KiB
Python

import sqlite3
from dataclasses import dataclass
import hashlib
from typing import Optional
@dataclass
class AuditModel:
id: Optional[int] = None
user_id: Optional[int] = None
action: Optional[str] = None
endpoint: Optional[str] = None
created_at: Optional[str] = None
status: Optional[str] = None
class Audit:
def __init__(self, db_path="instance/app_database.db"):
self.db_path = db_path
self._create_audit_table()
def _create_audit_table(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS audit (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
action TEXT,
endpoint TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
status TEXT
);
"""
)
conn.commit()
def new_entry(self, entry:AuditModel):
"""Create a new entry."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"""
INSERT INTO audit (user_id, action, endpoint, status)
VALUES (?, ?, ?, ?)
""",
(entry.user_id, entry.action, entry.endpoint, entry.status),
)
conn.commit()
return cursor.lastrowid
except sqlite3.IntegrityError:
return None
def get_all_entries(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM audit")
rows = cursor.fetchall()
return [
AuditModel(
id = row[0],
user_id = row[1],
action = row[2],
endpoint = row[3],
created_at = row[4],
status = row[5]
)
for row in rows
]
def get_entries_by_user_id(self, user_id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM audit WHERE user_id = ?", (user_id, ))
rows = cursor.fetchall()
return [
AuditModel(
id = row[0],
user_id = row[1],
action = row[2],
endpoint = row[3],
created_at = row[4],
status = row[5]
)
for row in rows
]
def delete_entries_older_than(self, date_string):
"""
Deletes logs older than the provided date.
Expected date_string format: 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'
"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# SQLite allows direct string comparison for ISO 8601 dates
cursor.execute(
"DELETE FROM audit WHERE created_at < ?",
(date_string,)
)
conn.commit()
return cursor.rowcount # Returns the number of deleted rows
except sqlite3.Error as e:
print(f"An error occurred: {e}")
return 0