first commit

This commit is contained in:
2025-09-17 08:36:17 +03:00
commit c2613de507
22 changed files with 1287 additions and 0 deletions

BIN
client/instance/dev.db Normal file

Binary file not shown.

50
client/main.py Normal file
View File

@@ -0,0 +1,50 @@
import flet as ft
from pages.auth.auth import Auth
from pages.home.dashboard import Dashboard
import os
os.environ["FLET_SECRET_KEY"] = os.urandom(12).hex()
def main(page: ft.Page):
page.title = "Solar DB"
page.theme_mode = ft.ThemeMode.LIGHT
page.theme = ft.Theme(color_scheme=ft.ColorScheme(primary=ft.Colors.BLUE))
page.vertical_alignment = ft.MainAxisAlignment.CENTER
page.horizontal_alignment = ft.CrossAxisAlignment.CENTER
page.padding = 0
def route_change(route):
page.controls.clear()
if route == "/auth":
if page.client_storage.get("is_authenticated"):
page.go("/dashboard")
return
login = Auth(page)
page.add(login.build())
page.update()
return
if route in ("/dashboard", "/", None):
if not page.client_storage.get("is_authenticated"):
page.go("/auth")
return
dashboard = Dashboard(page)
page.add(dashboard.build())
page.update()
return
# 5) Fallback 404
page.add(ft.Text("404: Page not found"))
page.update()
page.on_route_change = lambda _: route_change(page.route)
page.go("/auth")
ft.app(
target=main,
assets_dir="assets",
upload_dir="uploads",
view=ft.WEB_BROWSER,
port=5000
)

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,87 @@
import sqlite3
class DBApplications:
def __init__(self):
self.db_path = 'instance/dev.db'
self.create_table()
def create_table(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS applications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
name TEXT NOT NULL UNIQUE,
access_code TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
status TEXT NOT NULL DEFAULT 'active'
);
""")
conn.commit()
def insert_application(self, user_id, name, access_code: str) -> bool:
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO applications (user_id, name, access_code)
VALUES (?, ?, ?)
""", (user_id, name, access_code))
conn.commit()
return True
except sqlite3.IntegrityError:
return False
def get_application_by_id(self, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM applications
WHERE id = ?
""", (id,))
row = cursor.fetchone()
if row:
return {
"id": row[0],
"user_id":row[1],
"name": row[2],
"access_code": row[3],
"created_at": row[4],
"status": row[5],
}
else:
return None
def get_applications(self, user_id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM applications
WHERE user_id = ?
""", (user_id,))
rows = cursor.fetchall()
all_rows = []
if rows:
for row in rows:
row = {
"id": row[0],
"user_id":row[1],
"name": row[2],
"access_code": row[3],
"created_at": row[4],
"status": row[5],
}
all_rows.append(row)
return all_rows
else:
return all_rows
def delete(self, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
DELETE FROM applications WHERE id = ?
""", (id,))
conn.commit()

72
client/models/users.py Normal file
View File

@@ -0,0 +1,72 @@
import sqlite3
class Users:
def __init__(self):
self.db_path = 'instance/dev.db'
self.create_table()
def create_table(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL UNIQUE,
password TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
status TEXT NOT NULL DEFAULT 'active'
);
""")
conn.commit()
def insert_user(self, email, password: str) -> bool:
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO users (email, password)
VALUES (?, ?)
""", (email, password))
conn.commit()
return True
except sqlite3.IntegrityError:
return False
def get_user_by_id(self, id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM users
WHERE id = ?
""", (id,))
row = cursor.fetchone()
if row:
return {
"id": row[0],
"email": row[1],
"password": row[2],
"created_at": row[3],
"status":row[4],
}
else:
return None
def get_user_by_email(self, email):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM users
WHERE email = ?
""", (email,))
row = cursor.fetchone()
if row:
return {
"id": row[0],
"email": row[1],
"password": row[2],
"created_at": row[3],
"status":row[4],
}
else:
return None

Binary file not shown.

Binary file not shown.

Binary file not shown.

11
client/pages/auth/auth.py Normal file
View File

@@ -0,0 +1,11 @@
import flet as ft
from pages.auth.login import Login
class Auth:
def __init__(self, page: ft.Page):
self.page = page
self.login = Login(self.page, self)
self.placeholder = ft.Container(content=self.login.build())
def build(self):
return self.placeholder

View File

@@ -0,0 +1,41 @@
import flet as ft
class ForgotPassword:
def __init__(self, page: ft.Page, auth, login):
self.page = page
self.auth = auth
self.login = login
self.email = ft.TextField(label="E-mail")
self.code = ft.TextField(label="Code")
self.password = ft.TextField(
label="Password",
password=True,
can_reveal_password=True
)
self.confirm_password = ft.TextField(
label="Confirm Password",
password=True,
can_reveal_password=True
)
self.error = ft.Text(color=ft.Colors.RED)
def on_login_btn_click(self, e):
self.auth.placeholder.content.clean()
self.auth.placeholder.content = self.login.build()
self.auth.placeholder.update()
def build(self):
return ft.Container(
ft.Column(
[
ft.Text("Forgot Password?"),
self.email,
ft.Button("Recover"),
self.error,
ft.TextButton("Back to Login", on_click=self.on_login_btn_click)
],
spacing=20,
horizontal_alignment=ft.CrossAxisAlignment.CENTER
),
width=400,
)

View File

@@ -0,0 +1,76 @@
import flet as ft
from pages.auth.register import Register
from pages.auth.forgot_password import ForgotPassword
from models.users import Users
import hashlib
class Login:
def __init__(self, page: ft.Page, auth) -> None:
self.page = page
self.auth = auth
self.email = ft.TextField(label="E-mail")
self.password = ft.TextField(
label="Password",
password=True,
can_reveal_password=True
)
self.error = ft.Text(color=ft.Colors.RED)
def on_login_btn_click(self, e):
email = self.email.value
password = self.password.value
users = Users()
user = users.get_user_by_email(email)
if user['password'] == hashlib.md5(password.encode('utf-8')).hexdigest():
self.page.client_storage.set("is_authenticated", True)
self.page.client_storage.set('user_id', user['id'])
self.page.go('/')
else:
self.error.value = "Invalid credentials!"
self.error.update()
def on_register_btn_click(self, e):
register = Register(self.page, self.auth, self)
self.auth.placeholder.content.clean()
self.auth.placeholder.content = register.build()
self.auth.placeholder.update()
def on_forgot_password_btn_click(self, e):
forgot_password = ForgotPassword(self.page, self.auth, self)
self.auth.placeholder.content.clean()
self.auth.placeholder.content = forgot_password.build()
self.auth.placeholder.update()
def build(self):
return ft.Container(
ft.Column(
[
self.email,
self.password,
self.error,
ft.Button(
"Login",
width=150,
on_click=self.on_login_btn_click
),
ft.Row(
[
ft.TextButton(
"Register",
on_click=self.on_register_btn_click
),
ft.TextButton(
"Forgot Password",
on_click=self.on_forgot_password_btn_click
)
],
expand=True,
alignment=ft.MainAxisAlignment.SPACE_BETWEEN
),
],
spacing=20,
horizontal_alignment=ft.CrossAxisAlignment.CENTER
),
width=400,
)

View File

@@ -0,0 +1,120 @@
import flet as ft
import time
import re
import hashlib
from models.users import Users
from pages.auth.forgot_password import ForgotPassword
class Register:
def __init__(self, page: ft.Page, auth, login) -> None:
self.page = page
self.auth = auth
self.login = login
self.email = ft.TextField(label="E-mail")
self.password = ft.TextField(
label="Password",
password=True,
can_reveal_password=True
)
self.confirm_password = ft.TextField(
label="Confirm Password",
password=True,
can_reveal_password=True
)
self.error = ft.Text(color=ft.Colors.RED)
def on_login_btn_click(self, e):
self.auth.placeholder.content.clean()
self.auth.placeholder.content = self.login.build()
self.auth.placeholder.update()
def on_register_btn_click(self, e):
if not self.verify_email():
self.error.value = "Please insert a valid email address!"
self.error.update()
return
if not self.verify_password():
self.error.value = "Please a stronger password!"
self.error.update()
return
if not self.verify_confirm_password():
self.error.value = "Password and confirm password do not match!"
self.error.update()
return
if not self.register_user():
self.error.value = "Email already registred!"
self.error.update()
return
self.error.value = "User registered, you can now login!"
self.error.color = ft.Colors.GREEN
self.error.update()
time.sleep(3)
self.auth.placeholder.content.clean()
self.auth.placeholder.content = self.login.build()
self.auth.placeholder.update()
def on_forgot_password_btn_click(self, e):
forgot_password = ForgotPassword(self.page, self.auth, self.login)
self.auth.placeholder.content.clean()
self.auth.placeholder.content = forgot_password.build()
self.auth.placeholder.update()
def verify_email(self):
email = self.email.value
if not re.match(r"^[A-Za-z0-9\.\+_-]+@[A-Za-z0-9\._-]+\.[a-zA-Z]*$", email):
return False
return True
def verify_password(self):
passwd = self.password.value
if len(passwd) >= 8 and re.search(r"\d", passwd) and re.search(r"[A-Z]", passwd):
return True
return False
def verify_confirm_password(self):
return True if self.password.value == self.confirm_password.value else False
def register_user(self):
email = self.email.value
password = self.password.value
users = Users()
if users.get_user_by_email(email) is not None:
return False
passwd_hash = hashlib.md5(password.encode('utf-8')).hexdigest()
users.insert_user(email, passwd_hash)
return True
def build(self):
return ft.Container(
ft.Column(
[
self.email,
self.password,
self.confirm_password,
self.error,
ft.Button(
"Register",
width=150,
on_click=self.on_register_btn_click
),
ft.Row(
[
ft.TextButton(
"Login",
on_click=self.on_login_btn_click
),
ft.TextButton(
"Forgot Password",
on_click=self.on_forgot_password_btn_click
)
],
expand=True,
alignment=ft.MainAxisAlignment.SPACE_BETWEEN
),
],
spacing=20,
horizontal_alignment=ft.CrossAxisAlignment.CENTER
),
width=400,
)

View File

@@ -0,0 +1,295 @@
import flet as ft
from models.applications import DBApplications
import json
import requests
class ApplicationPage:
def __init__(self, page: ft.Page, dashboard, app):
self.page = page
self.dashboard = dashboard
self.app = app
self.db_applications = DBApplications()
self.access_code = ft.Text("***********")
self.selected = None
self.all_data = self.get_data()
self.data_list = ft.ListView(
controls=self.create_list(self.all_data, self.load_details),
spacing=10,
expand=3,
)
self.data_details = ft.TextField(
#value=self.get_data(),
multiline=True,
min_lines=5,
max_lines=10,
expand=True,
read_only=True,
label='View',
)
self.editor = ft.TextField(
#value=self.get_data(),
label='Editor',
multiline=True,
min_lines=5,
max_lines=10,
#expand=True,
read_only=False
)
self.update_doc_id = ft.TextField(label="Doc Id")
self.update_fileds = ft.TextField(label="Update Fields")
self.query_field = ft.TextField(label="Field")
self.query_value = ft.TextField(label="Value")
self.query_operator = ft.Dropdown(
options=[
ft.dropdown.Option("=="),
ft.dropdown.Option("!="),
ft.dropdown.Option(">"),
ft.dropdown.Option(">="),
ft.dropdown.Option("<"),
ft.dropdown.Option("<="),
ft.dropdown.Option("in"),
ft.dropdown.Option("contains"),
],
value="=="
)
self.placeholder = ft.Column()
def show_access_code(self, e):
self.access_code.value = self.app['access_code']
self.access_code.update()
def format_json(self, data):
data = json.dumps(data, indent=4)
print(data)
return data
def load_details(self, e):
self.selected = e
self.data_details.value = self.format_json(e)
self.data_details.update()
def get_data(self):
response = requests.post('http://127.0.0.1:5001/get_all')
return json.loads(response.text) if response.status_code == 200 else []
def create_list(self, items, on_click_handler):
return [
ft.Container(
content=ft.Column(
[
ft.Text(item),
]
),
border_radius=10,
border=ft.border.all(1, ft.Colors.GREY_300),
padding=5,
#bgcolor = ft.Colors.BLUE_50 if item == self.selected else None,
ink=True,
on_click=lambda e, id=item: on_click_handler(id)
) for item in items
]
def insert_data(self, e):
data = self.editor.value.replace("\n", "")
data = json.loads(data)
if data:
document = {"doc":data}
print(document)
requests.post('http://127.0.0.1:5001/insert', json=json.dumps(document))
self.refresh_list('')
self.editor.value = ''
self.editor.update()
def update_data(self, e):
if self.update_doc_id.value:
json_file = {
"doc_id": int(self.update_doc_id.value),
"fields": self.update_fileds.value,
}
else:
json_file = {
"where":{
"field":self.query_field.value,
"op":self.query_operator.value,
"value":self.query_value.value,
"fields": self.update_fileds.value,
}
}
if self.update_doc_id or self.query_field.value:
response = requests.post('http://127.0.0.1:5001/update', json=json.dumps(json_file))
print(response.text)
result = json.loads(response.text) if response.status_code == 200 else []
self.refresh_list('')
def delete_data(self, e):
if self.update_doc_id.value == None:
json_file = {
"where":{
"field":self.query_field.value,
"op":self.query_operator.value,
"value":self.query_value.value,
}
}
else:
json_file = {
"doc_id": int(self.update_doc_id.value),
}
if self.update_doc_id or self.query_field.value:
response = requests.post('http://127.0.0.1:5001/remove', json=json.dumps(json_file))
print(response.text)
result = json.loads(response.text) if response.status_code == 200 else []
self.refresh_list('')
def query_data(self, e):
'''Added a tiny query DSL so you can filter with { "where": { "field":"user", "op":"==", "value":"abc" } } (supports ==, !=, >, >=, <, <=, in, contains).'''
json_file = {
"where":{
"field":self.query_field.value,
"op":self.query_operator.value,
"value":self.query_value.value
}
}
if self.query_field.value and self.query_value.value:
response = requests.post('http://127.0.0.1:5001/search', json=json.dumps(json_file))
print(response.text)
result = json.loads(response.text) if response.status_code == 200 else []
self.data_list.controls.clear()
self.data_list.controls = self.create_list(result, self.load_details)
self.data_list.update()
def refresh_list(self, e):
self.all_data = self.get_data()
self.data_list.controls.clear()
self.data_list.controls = self.create_list(self.all_data, self.load_details)
self.data_list.update()
def on_search_btn_click(self, e):
self.placeholder.controls.clear()
self.placeholder.controls = [
ft.Text("Query", weight=ft.FontWeight.BOLD, size=15),
self.query_field,
self.query_operator,
self.query_value,
ft.Row(
[
ft.Button("Query", on_click=self.query_data),
ft.Button("Reset List", on_click=self.refresh_list)
]
)
]
self.placeholder.update()
def on_inseert_btn_click(self, e):
self.placeholder.controls.clear()
self.placeholder.controls = [
self.editor,
ft.Button("Insert", on_click=self.insert_data)
]
self.placeholder.update()
def on_update_btn_click(self, e):
self.placeholder.controls.clear()
self.placeholder.controls = [
ft.Text("Update", weight=ft.FontWeight.BOLD, size=15),
self.update_fileds,
ft.Text('where'),
self.query_field,
self.query_operator,
self.query_value,
ft.Text("or"),
self.update_doc_id,
ft.Row(
[
ft.Button("Update", on_click=self.update_data),
]
)
]
self.placeholder.update()
def on_delete_btn_click(self, e):
self.placeholder.controls.clear()
self.placeholder.controls = [
ft.Text("Delete", weight=ft.FontWeight.BOLD, size=15),
ft.Text('where'),
self.query_field,
self.query_operator,
self.query_value,
ft.Text("or"),
self.update_doc_id,
ft.Row(
[
ft.Button("Delete", on_click=self.delete_data),
]
)
]
self.placeholder.update()
def build(self):
return ft.Container(
content=ft.Column(
[
ft.Row(
[
ft.Text("Manage Application", weight=ft.FontWeight.BOLD, size=20),
ft.Button("Show Acess Code",icon=ft.Icons.PASSWORD, on_click=self.show_access_code)
],
alignment=ft.MainAxisAlignment.SPACE_BETWEEN
),
ft.Row(
[
self.access_code
],
alignment=ft.MainAxisAlignment.END
),
ft.VerticalDivider(width=1),
ft.Row(
[
ft.Column(
[
ft.TextButton(
"Database Items",
style=ft.ButtonStyle(
text_style=ft.TextStyle(
weight=ft.FontWeight.BOLD,
size=15
)
),
on_click=self.refresh_list
),
self.data_details,
self.data_list
],
expand=1,
),
ft.Column(
[
ft.Row(
[
ft.Text("Editor", weight=ft.FontWeight.BOLD, size=15),
ft.Button("Search", on_click = self.on_search_btn_click, icon=ft.Icons.SEARCH),
ft.Button("Insert", on_click = self.on_inseert_btn_click, icon=ft.Icons.ADD_CIRCLE),
ft.Button("Update", on_click = self.on_update_btn_click, icon=ft.Icons.UPDATE),
ft.Button("Delete", on_click = self.on_delete_btn_click, icon=ft.Icons.DELETE),
]
),
self.placeholder,
],
expand=True,
alignment=ft.MainAxisAlignment.START
)
],
vertical_alignment=ft.CrossAxisAlignment.START,
expand=True
)
],
expand=True,
),
expand=True
)

View File

@@ -0,0 +1,164 @@
import flet as ft
from models.applications import DBApplications
import random
import string
from pages.home.application_page import ApplicationPage
class Applications:
def __init__(self, page: ft.Page, dashboard):
self.page = page
self.dashboard = dashboard
self.db_applications = DBApplications()
self.user_id = self.page.client_storage.get('user_id')
self.add_dialog = ft.AlertDialog(
title=ft.Text("Add Application"),
content=ft.TextField(label="Name"),
actions=[
ft.FilledButton(
"Save",
width=100,
on_click=self.on_save_btn_click,
bgcolor=ft.Colors.BLUE
),
ft.FilledButton(
"Cancel",
width=100,
on_click=self.on_cancel_btn_click,
bgcolor=ft.Colors.GREY
),
]
)
self.all_applications = self.db_applications.get_applications(self.user_id)
self.applications_list = ft.Column(
controls=self.create_list(self.all_applications, self.on_manage_app_btn_click, self.on_delete_app_btn_click),
)
self.delete_dialog = ft.AlertDialog(
title="Delete Application?",
actions=[
ft.FilledButton(
"Yes", on_click=self.on_yes_button_click,
width=100,
bgcolor=ft.Colors.BLUE
),
ft.FilledButton(
"No", on_click=self.on_no_button_click,
width=100,
bgcolor=ft.Colors.GREY
)
]
)
self.selected_application_id = None
def on_yes_button_click(self, e):
self.page.close(self.delete_dialog)
self.db_applications.delete(self.selected_application_id)
self.selected_application_id = None
self.all_applications = self.db_applications.get_applications(self.user_id)
self.applications_list.controls.clear()
self.applications_list.controls = self.create_list(self.all_applications, self.on_manage_app_btn_click, self.on_delete_app_btn_click)
self.applications_list.update()
def on_no_button_click(self, e):
self.page.close(self.delete_dialog)
def on_add_btn_click(self, e):
self.page.open(self.add_dialog)
def on_save_btn_click(self, e):
application_name = self.add_dialog.content.value
self.save_new_application(application_name)
self.add_dialog.content.value = ''
self.page.close(self.add_dialog)
self.all_applications = self.db_applications.get_applications(self.user_id)
self.applications_list.controls.clear()
self.applications_list.controls = self.create_list(self.all_applications, self.on_manage_app_btn_click, self.on_delete_app_btn_click)
self.applications_list.update()
def on_cancel_btn_click(self, e):
self.page.close(self.add_dialog)
def save_new_application(self, name):
access_code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12))
self.db_applications.insert_application(self.user_id, name, access_code)
def on_manage_app_btn_click(self, item):
applications = ApplicationPage(self.page, self.dashboard, item)
self.dashboard.placeholder.content.clean()
self.dashboard.placeholder.content = applications.build()
self.dashboard.placeholder.update()
def on_delete_app_btn_click(self, id):
self.selected_application_id = id
self.page.open(self.delete_dialog)
def create_list(self, items, on_click_handler, on_click_handler2):
elements = []
row = ft.Row()
counter = 0
for item in items:
row.controls.append(
ft.Container(
ft.Row(
[
ft.Icon(ft.Icons.PHONE_ANDROID, size=100),
ft.Column(
[
ft.Text(item["name"] if len(item['name']) < 35 else item['name'][:35]+"...", expand=True, weight=ft.FontWeight.BOLD),
ft.Row(
[
ft.IconButton(
on_click=lambda e, id=item: on_click_handler(id),
icon = ft.Icons.EDIT,
),
ft.IconButton(
on_click=lambda e, id=item['id']: on_click_handler2(id),
icon=ft.Icons.DELETE,
icon_color=ft.Colors.RED
)
]
)
],
alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
expand=True
)
]
),
border_radius=10,
border=ft.border.all(1, ft.Colors.GREY_300),
padding=5
)
)
counter += 1
if counter % 3 == 0:
elements.append(row)
row = ft.Row()
if len(row.controls)> 0:
elements.append(row)
print(elements)
return elements
def build(self):
return ft.Container(
content=ft.Column(
[
ft.Row(
[
ft.Text("Applications", weight=ft.FontWeight.BOLD, size=20),
ft.FloatingActionButton(icon=ft.Icons.ADD, on_click=self.on_add_btn_click)
],
alignment=ft.MainAxisAlignment.SPACE_BETWEEN
),
ft.Row(
[
self.applications_list
],
alignment=ft.MainAxisAlignment.CENTER,
)
],
expand=True
),
padding=10,
expand=True
)

View File

@@ -0,0 +1,74 @@
import flet as ft
from pages.home.applications import Applications
class Dashboard:
def __init__(self, page: ft.Page):
self.page = page
self.applications = Applications(self.page, self)
self.placeholder = ft.Container(
content=self.applications.build(),
expand=True
)
self.rail = ft.NavigationRail(
selected_index=0,
min_width=100,
min_extended_width=400,
group_alignment=-0.9,
leading=ft.Text("Logo"),
destinations=[
ft.NavigationRailDestination(
icon=ft.Icons.LIST_ALT_OUTLINED,
selected_icon=ft.Icons.LIST_ALT,
label="Applications",
),
ft.NavigationRailDestination(
icon=ft.Icons.SETTINGS_OUTLINED,
selected_icon=ft.Icon(ft.Icons.SETTINGS),
label_content=ft.Text("Settings"),
),
ft.NavigationRailDestination(
icon=ft.Icon(ft.Icons.LOGOUT_OUTLINED),
selected_icon=ft.Icon(ft.Icons.LOGOUT),
label="Logout",
),
],
on_change=lambda e: self.navigate(e)
)
def navigate(self, e):
print(e.data)
if e.data == '0':
applications = Applications(self.page, self)
self.placeholder.content.clean()
self.placeholder.content = applications.build()
self.placeholder.update()
if e.data == '2':
self.page.client_storage.remove("is_authenticated")
self.page.go('/auth')
def build(self):
return ft.Container(
content=ft.Column(
[
ft.Row(
[
self.rail,
ft.VerticalDivider(width=1),
ft.Column(
[
self.placeholder
],
alignment=ft.MainAxisAlignment.START,
expand=True
),
],
expand=True,
alignment=ft.MainAxisAlignment.START,
vertical_alignment=ft.CrossAxisAlignment.START
)
],
expand=True
),
expand=True,
padding=10
)

1
solarDb/db.json Normal file
View File

@@ -0,0 +1 @@
{"_default": {"2": {"test": "abcd"}, "3": {"test": "abcdefg"}, "4": {"test23": "abcd"}, "5": {"test23": "abcd"}, "6": {"abcd": "test"}, "7": {"test": "foaie ferde"}, "8": {"test": "abcdefg", "test2": {"abcderfg": "1234"}}, "9": {"test": "abcdefg", "test2": {"abcderfg": ["1234", "99543"]}}, "10": {"test": "abknojncd"}, "11": {"test": "abcuihdefg", "test2": {"abcderfg": ["1234", "99543"]}}, "12": {"boby": 10, "test": "abknojncd"}, "13": {"test": "abcuihdefg", "test2": {"abcderfg": ["1234", "99543"]}}, "14": {"id": 9, "test": "abcdefg", "test2": {"abcderfg": ["1234", "99543"]}}, "15": {"id": 90, "test": "abcdefg", "test2": {"abcderfg": ["1234", "99543"]}}, "16": {"test": "abcdefg", "test2": {"abcderfg": ["1234", "99543"]}}, "17": {"doc_id": 9, "test": "abcdefg", "test2": {"abcderfg": ["1234", "99543"]}}, "18": {"doc_id": 9, "test": "abcdefg", "test2": {"rrrr": ["1234", "99543"]}}, "19": {"test2": {"abcderfg": ["1234", "99543"]}}, "20": {"test": "abcdefgh"}}}

296
solarDb/server.py Normal file
View File

@@ -0,0 +1,296 @@
from flask import Flask, request, jsonify
from tinydb import TinyDB, Query, where
from tinydb.operations import set as ops_set
import json
app = Flask(__name__)
# Single TinyDB file for now (can be swapped to per-table later)
db = TinyDB("db.json")
# --- Helpers -----------------------------------------------------------------
def json_body(required: bool = True):
data = request.get_json(silent=True)
if data is None:
raw = request.get_data(as_text=True)
if raw:
try:
data = json.loads(raw)
except Exception:
if required:
return None, (jsonify({"error": "Expected JSON body"}), 400)
else:
data = None
elif isinstance(data, str):
try:
data = json.loads(data)
except Exception:
if required:
return None, (jsonify({"error": "Expected JSON body"}), 400)
else:
data = None
if required and data is None:
return None, (jsonify({"error": "Expected JSON body"}), 400)
return data, None
def build_query(field: str, op: str, value):
"""Translate a simple JSON filter into a TinyDB Query.
Supported ops: ==, !=, >, >=, <, <=, in, contains
"""
f = where(field)
if op == "==":
return f == value
if op == "!=":
return f != value
if op == ">":
return f > value
if op == ">=":
return f >= value
if op == "<":
return f < value
if op == "<=":
return f <= value
if op == "in":
# value should be a list
if not isinstance(value, list):
value = [value]
return f.one_of(value)
if op == "contains":
# substring for strings; membership for lists
return f.test(lambda v: (isinstance(v, str) and isinstance(value, str) and value in v)
or (isinstance(v, (list, tuple, set)) and value in v))
raise ValueError(f"Unsupported op: {op}")
def parse_assignment(expr: str):
"""Parse a string expression like 'field = \"value\"' into (field, value)."""
if '=' not in expr:
raise ValueError("Expression must contain '='")
field_part, value_part = expr.split('=', 1)
field = field_part.strip()
value_raw = value_part.strip()
# Remove trailing comma if present
if value_raw.endswith(','):
value_raw = value_raw[:-1].rstrip()
# Strip quotes if value starts and ends with same quote
if (len(value_raw) >= 2) and ((value_raw[0] == value_raw[-1]) and value_raw[0] in ("'", '"')):
value = value_raw[1:-1]
else:
# Try to parse JSON for numbers, booleans, null, objects, arrays, or quoted strings
# Check if looks like JSON
json_like_start = ('{', '[', '"', '-', 't', 'f', 'n') + tuple(str(i) for i in range(10))
if value_raw and (value_raw[0] in json_like_start):
try:
value = json.loads(value_raw)
except Exception:
value = value_raw
else:
value = value_raw
return field, value
# --- Routes ------------------------------------------------------------------
@app.route("/healthz", methods=["GET"])
def healthz():
return jsonify({"status": "ok"}), 200
@app.route("/insert", methods=["POST"])
def insert():
body, err = json_body()
if err:
return err
# Accept either {"doc": {...}} or raw JSON object as the document
if isinstance(body, dict) and "doc" in body:
doc = body["doc"]
else:
doc = body
if not isinstance(doc, dict):
return jsonify({"error": "Body must be an object or {doc: {...}}"}), 400
doc_id = db.insert(doc)
return jsonify({"message": "inserted", "doc_id": doc_id}), 200
@app.route("/insert_many", methods=["POST"])
def insert_many():
body, err = json_body()
if err:
return err
docs = body.get("docs") if isinstance(body, dict) else None
if not isinstance(docs, list) or not all(isinstance(d, dict) for d in docs):
return jsonify({"error": "Expected {docs: [ {...}, {...} ]}"}), 400
ids = db.insert_multiple(docs)
return jsonify({"message": "inserted", "doc_ids": ids}), 200
@app.route("/get_all", methods=["POST", "GET"])
def get_all():
data = db.all()
_items = []
for item in data:
_id = item.doc_id
d = {}
d[_id] = item
_items.append(d)
return jsonify(_items), 200
@app.route("/get", methods=["POST"])
def get_one():
body, err = json_body()
if err:
return err
doc_id = body.get("doc_id") if isinstance(body, dict) else None
if not isinstance(doc_id, int):
return jsonify({"error": "Expected {doc_id: <int>}"}), 400
doc = db.get(doc_id=doc_id)
if doc is None:
return jsonify({"error": "not found"}), 404
# Include doc_id so clients can reference it later
doc_with_id = dict(doc)
doc_with_id["doc_id"] = doc_id
return jsonify(doc_with_id), 200
@app.route("/search", methods=["POST"])
def search():
body, err = json_body()
print (body)
if err:
return err
if not isinstance(body, dict):
return jsonify({"error": "Expected JSON object"}), 400
# Accept either a single filter or a list of filters (ANDed)
filters = body.get("where")
if isinstance(filters, dict):
filters = [filters]
if not isinstance(filters, list) or not filters:
return jsonify({"error": "Expected {where: {field, op, value}} or a list of them"}), 400
try:
q = None
for f in filters:
field = f.get("field")
op = f.get("op")
value = f.get("value")
if not isinstance(field, str) or not isinstance(op, str):
return jsonify({"error": "Each filter needs 'field' and 'op'"}), 400
clause = build_query(field, op, value)
q = clause if q is None else (q & clause)
except ValueError as e:
return jsonify({"error": str(e)}), 400
if not callable(q):
return jsonify({"error":"Invalid query built"}), 400
results = db.search(q)
_items = []
for item in results:
_id = item.doc_id
d = {}
d[_id] = item
_items.append(d)
return jsonify(_items), 200
@app.route("/update", methods=["POST"])
def update():
body, err = json_body()
if err:
return err
if not isinstance(body, dict):
return jsonify({"error": "Expected JSON object"}), 400
print(body)
# Option A: update by doc_id
if "doc_id" in body and "fields" in body:
doc_id = body.get("doc_id")
fields = body.get("fields")
if isinstance(fields, str):
try:
field, value = parse_assignment(fields)
except Exception as e:
return jsonify({"error": f"Failed to parse fields string: {str(e)}"}), 400
updated = db.update(ops_set(field, value), doc_ids=[doc_id])
return jsonify({"updated": len(updated)}), 200
if not isinstance(doc_id, int) or not isinstance(fields, dict):
return jsonify({"error": "Expected {doc_id: int, fields: {...}}"}), 400
updated = db.update(fields, doc_ids=[doc_id])
return jsonify({"updated": len(updated)}), 200
# Option B: update by query
if "where" in body and "fields" in body:
filters = body.get("where")
fields = body.get("fields")
if isinstance(filters, dict):
filters = [filters]
if not isinstance(filters, list):
return jsonify({"error": "Expected {where: [...], fields: {...}}"}), 400
try:
q = None
for f in filters:
clause = build_query(f.get("field"), f.get("op"), f.get("value"))
q = clause if q is None else (q & clause)
except ValueError as e:
return jsonify({"error": str(e)}), 400
if isinstance(fields, str):
try:
field, value = parse_assignment(fields)
except Exception as e:
return jsonify({"error": f"Failed to parse fields string: {str(e)}"}), 400
updated = db.update(ops_set(field, value), q)
return jsonify({"updated": len(updated)}), 200
if not isinstance(fields, dict):
return jsonify({"error": "Expected {where: [...], fields: {...}}"}), 400
updated = db.update(fields, q)
return jsonify({"updated": len(updated)}), 200
return jsonify({"error": "Provide either {doc_id, fields} or {where, fields}"}), 400
@app.route("/remove", methods=["POST"])
def remove():
body, err = json_body()
if err:
return err
if not isinstance(body, dict):
return jsonify({"error": "Expected JSON object"}), 400
# Option A: by doc_id
if "doc_id" in body:
doc_id = body.get("doc_id")
if not isinstance(doc_id, int):
return jsonify({"error": "Expected {doc_id: int}"}), 400
removed = db.remove(doc_ids=[doc_id])
return jsonify({"removed": len(removed)}), 200
# Option B: by query
if "where" in body:
filters = body.get("where")
if isinstance(filters, dict):
filters = [filters]
if not isinstance(filters, list):
return jsonify({"error": "Expected {where: [...]}"}), 400
try:
q = None
for f in filters:
clause = build_query(f.get("field"), f.get("op"), f.get("value"))
q = clause if q is None else (q & clause)
except ValueError as e:
return jsonify({"error": str(e)}), 400
removed = db.remove(q)
return jsonify({"removed": len(removed)}), 200
return jsonify({"error": "Provide {doc_id} or {where}"}), 400
@app.route("/truncate", methods=["POST"])
def truncate():
db.truncate()
return jsonify({"message": "truncated"}), 200
if __name__ == "__main__":
# Note: Flask dev server is single-threaded by default; good for local testing.
app.run(debug=True, port=5001)