# Files
# solardb/client/pages/home/application_page.py
#
# 367 lines
# 13 KiB
# Python

import flet as ft
from models.applications import DBApplications
import json
import requests
import os
# Every data request (/get_all, /insert, /update, /remove, /search) sends both
# an "Authorization: Bearer <token>" header and an "X-Application-Token" header.
# The /connect call that obtains the bearer token sends no such headers; it
# posts application_id (as a string) and application_token in its JSON body.
#
# Base URL of the backend API: taken from the API_BASE_URL environment
# variable, falling back to http://127.0.0.1:5001 when it is not set.
API_BASE_URL = os.getenv("API_BASE_URL", "http://127.0.0.1:5001")
class ApplicationPage:
    """Flet view for browsing and editing the documents of one application.

    On construction the page exchanges the application's access code for a
    bearer token (``/connect``) and loads every document (``/get_all``). The
    editor panel then lets the user search, insert, update and delete
    documents through the backend API at ``API_BASE_URL``.

    Network failures never raise out of this class: a failed request is
    reported on stdout and treated as "no data".
    """

    # Text shown in place of the access code while it is hidden.
    _MASK = "***********"
    # Seconds to wait for the backend before giving up, so an unreachable
    # server cannot block a constructor or event handler forever.
    _REQUEST_TIMEOUT = 10

    def __init__(self, page: ft.Page, dashboard, app):
        """Create the controls and load the initial document list.

        Args:
            page: The Flet page this view is rendered on.
            dashboard: The owning dashboard object (stored, not used here).
            app: Mapping describing the application; must provide the keys
                ``'id'`` and ``'access_code'``.
        """
        self.page = page
        self.dashboard = dashboard
        self.app = app
        self.db_applications = DBApplications()
        # The token must exist before get_data() builds its auth headers.
        self.access_token = self.get_access_token()
        self.access_code = ft.Text(self._MASK)
        # Last item clicked in the list (set by load_details).
        self.selected = None
        self.show_hide_access_code = ft.Button(
            "Show Access Code",
            icon=ft.Icons.PASSWORD,
            on_click=self.show_access_code,
        )
        self.all_data = self.get_data()
        self.data_list = ft.ListView(
            controls=self.create_list(self.all_data, self.load_details),
            spacing=10,
            expand=3,
        )
        # Read-only pane showing the selected item as pretty-printed JSON.
        self.data_details = ft.TextField(
            multiline=True,
            min_lines=5,
            max_lines=10,
            expand=True,
            read_only=True,
            label='View',
        )
        # Free-text JSON editor used by the "Insert" panel.
        self.editor = ft.TextField(
            label='Insert valid JSON data',
            multiline=True,
            min_lines=5,
            max_lines=10,
            read_only=False,
        )
        self.update_doc_id = ft.TextField(label="Doc Id")
        # NOTE: "update_filed" is a historical misspelling of "update_field";
        # the attribute name is kept so existing references keep working.
        self.update_filed = ft.TextField(label="Field")
        self.update_value = ft.TextField(label="Value")
        self.query_field = ft.TextField(label="Field")
        self.query_value = ft.TextField(label="Value")
        self.query_operator = ft.Dropdown(
            options=[
                ft.dropdown.Option("=="),
                ft.dropdown.Option("!="),
                ft.dropdown.Option(">"),
                ft.dropdown.Option(">="),
                ft.dropdown.Option("<"),
                ft.dropdown.Option("<="),
                ft.dropdown.Option("in"),
                ft.dropdown.Option("contains"),
            ],
            value="==",
        )
        # Container whose children are swapped to show the active panel.
        self.placeholder = ft.Column()

    # ------------------------------------------------------------------
    # Backend access
    # ------------------------------------------------------------------

    def _auth_headers(self):
        """Return the headers every authenticated request must carry."""
        return {
            'Authorization': f'Bearer {self.access_token}',
            'X-Application-Token': self.app['access_code'],
        }

    def _post(self, endpoint, payload=None, default=None):
        """POST ``payload`` as JSON to ``endpoint`` with the auth headers.

        Args:
            endpoint: Path appended to ``API_BASE_URL`` (e.g. ``"/insert"``).
            payload: JSON-serialisable body, or ``None`` for no body.
            default: Value returned when the request does not yield usable
                JSON.

        Returns:
            The decoded JSON body on HTTP 200, otherwise ``default`` (network
            error, timeout, non-200 status, non-JSON or ``null`` body).
        """
        try:
            response = requests.post(
                f"{API_BASE_URL}{endpoint}",
                headers=self._auth_headers(),
                json=payload,
                timeout=self._REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            print(f"POST {endpoint} failed: {exc}")
            return default
        if response.status_code != 200:
            print(f"POST {endpoint} returned {response.status_code}: {response.text}")
            return default
        try:
            parsed = response.json()
        except ValueError:
            # requests raises a ValueError subclass for an undecodable body.
            print(f"POST {endpoint} returned a non-JSON body")
            return default
        return default if parsed is None else parsed

    def get_access_token(self):
        """Exchange the application's access code for a bearer token.

        Returns:
            The ``token`` value from the ``/connect`` response, or ``""``
            when the server is unreachable, answers with a non-200 status, or
            the response carries no ``token`` field. The empty string lets
            the caller decide how to handle a failed connection.
        """
        credentials = {
            'application_id': str(self.app['id']),
            'application_token': self.app['access_code'],
        }
        try:
            response = requests.post(
                f"{API_BASE_URL}/connect",
                json=credentials,
                timeout=self._REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            print(f"POST /connect failed: {exc}")
            return ""
        try:
            body = response.json()
        except ValueError:
            body = {}
        # Log the status only: request and response both contain secrets.
        print({"endpoint": "/connect", "status": response.status_code})
        if response.status_code != 200 or not isinstance(body, dict):
            return ""
        return body.get('token', "")

    def get_data(self):
        """Fetch every document via ``/get_all``; return ``[]`` on failure."""
        return self._post("/get_all", default=[])

    # ------------------------------------------------------------------
    # Small UI helpers
    # ------------------------------------------------------------------

    def show_access_code(self, e):
        """Toggle the access-code label between masked and plain text."""
        if self.access_code.value == self._MASK:
            self.access_code.value = self.app['access_code']
            self.show_hide_access_code.text = "Hide Access Code"
        else:
            self.access_code.value = self._MASK
            self.show_hide_access_code.text = "Show Access Code"
        self.access_code.update()
        self.show_hide_access_code.update()

    def format_json(self, data):
        """Return ``data`` serialised as JSON indented by four spaces."""
        return json.dumps(data, indent=4)

    def load_details(self, e):
        """Show one list item in the details pane.

        Args:
            e: The clicked *item* itself (not a Flet event) -- create_list
                passes the item straight through to this handler.
        """
        self.selected = e
        self.data_details.value = self.format_json(e)
        self.data_details.update()

    def create_list(self, items, on_click_handler):
        """Build one clickable row per item.

        Args:
            items: Iterable of items to display.
            on_click_handler: Callable invoked with the item when its row is
                clicked.

        Returns:
            A list of ``ft.Container`` rows, in the order of ``items``.
        """
        return [
            ft.Container(
                content=ft.Column([ft.Text(item)]),
                border_radius=10,
                border=ft.border.all(1, ft.Colors.GREY_300),
                padding=5,
                ink=True,
                # Bind the item as a default argument so each row keeps its
                # own value instead of the loop's last one.
                on_click=lambda e, selected=item: on_click_handler(selected),
            )
            for item in items
        ]

    @staticmethod
    def _flag(field, message):
        """Display ``message`` as a validation error under ``field``."""
        field.error_text = message
        field.update()

    @staticmethod
    def _reset_fields(*fields):
        """Blank each text field, drop any error message and redraw it."""
        for field in fields:
            field.value = ''
            field.error_text = None
            field.update()

    def _show_panel(self, controls):
        """Replace the editor panel's content with ``controls``."""
        self.placeholder.controls = controls
        self.placeholder.update()

    def _where_clause(self):
        """Return the query-DSL filter described by the query inputs.

        The value is sent exactly as typed, i.e. always as a string.
        NOTE(review): confirm the backend coerces it for numeric comparisons.
        """
        return {
            "field": self.query_field.value,
            "op": self.query_operator.value,
            "value": self.query_value.value,
        }

    def _build_selector(self):
        """Work out which documents an update/delete should target.

        A filled-in Doc Id wins over the ``where`` inputs.

        Returns:
            ``{"doc_id": int}`` or ``{"where": {...}}``, or ``None`` when no
            criteria were entered or the Doc Id is not an integer (in which
            case the Doc Id field is flagged).
        """
        if self.update_doc_id.value:
            try:
                return {"doc_id": int(self.update_doc_id.value)}
            except ValueError:
                self._flag(self.update_doc_id, "Doc Id must be an integer")
                return None
        if self.query_field.value:
            return {"where": self._where_clause()}
        return None

    # ------------------------------------------------------------------
    # CRUD actions
    # ------------------------------------------------------------------

    def insert_data(self, e):
        """Insert the editor's JSON as a new document via ``/insert``.

        Invalid or empty JSON is reported under the editor and nothing is
        sent; the editor content is kept so the user can correct it.
        """
        raw = (self.editor.value or "").replace("\n", "")
        try:
            document = json.loads(raw)
        except json.JSONDecodeError as exc:
            self._flag(self.editor, f"Invalid JSON: {exc.msg}")
            return
        if not document:
            self._flag(self.editor, "Enter a non-empty JSON document")
            return
        self._post("/insert", {"doc": document})
        self.refresh_list('')
        self._reset_fields(self.editor)

    def update_data(self, e):
        """Set one field on the targeted document(s) via ``/update``.

        Does nothing when neither a Doc Id nor a query field was entered.
        """
        selector = self._build_selector()
        if selector is None:
            return
        payload = {
            **selector,
            "fields": {self.update_filed.value: self.update_value.value},
        }
        self._post("/update", payload)
        self.refresh_list('')
        # The panel stays open, as it does after a delete; the previous
        # version cleared placeholder.controls here without redrawing, which
        # left the control tree out of sync with what was on screen.
        self._reset_fields(
            self.update_filed,
            self.update_value,
            self.query_field,
            self.query_value,
            self.update_doc_id,
        )

    def delete_data(self, e):
        """Remove the targeted document(s) via ``/remove``.

        Does nothing when neither a Doc Id nor a query field was entered.
        """
        selector = self._build_selector()
        if selector is None:
            return
        self._post("/remove", selector)
        self.refresh_list('')
        self._reset_fields(self.query_field, self.query_value, self.update_doc_id)

    def query_data(self, e):
        """Filter the list through ``/search`` using the query DSL.

        The request body is ``{"where": {"field": ..., "op": ..., "value":
        ...}}`` where ``op`` is one of ``==, !=, >, >=, <, <=, in, contains``.
        Nothing happens unless both the field and the value are filled in.
        """
        if not (self.query_field.value and self.query_value.value):
            return
        matches = self._post("/search", {"where": self._where_clause()}, default=[])
        self.data_list.controls = self.create_list(matches, self.load_details)
        self.data_list.update()

    def refresh_list(self, e=None):
        """Reload all documents from the backend and redraw the list.

        Args:
            e: Ignored; present so the method can serve as a click handler.
        """
        self.all_data = self.get_data()
        self.data_list.controls = self.create_list(self.all_data, self.load_details)
        self.data_list.update()

    # ------------------------------------------------------------------
    # Editor panel switching
    # ------------------------------------------------------------------

    def on_search_btn_click(self, e):
        """Show the query form in the editor panel."""
        self._show_panel([
            ft.Text("Query", weight=ft.FontWeight.BOLD, size=15),
            self.query_field,
            self.query_operator,
            self.query_value,
            ft.Row(
                [
                    ft.Button("Query", on_click=self.query_data),
                    ft.Button("Reset List", on_click=self.refresh_list),
                ]
            ),
        ])

    def on_insert_btn_click(self, e):
        """Show the JSON editor and its "Insert" button in the editor panel."""
        self._show_panel([
            self.editor,
            ft.Button("Insert", on_click=self.insert_data),
        ])

    # Backward-compatible alias for the original, misspelled handler name.
    on_inseert_btn_click = on_insert_btn_click

    def on_update_btn_click(self, e):
        """Show the update form (new value plus target selector)."""
        self._show_panel([
            ft.Text("Update", weight=ft.FontWeight.BOLD, size=15),
            self.update_filed,
            self.update_value,
            ft.Text('where'),
            self.query_field,
            self.query_operator,
            self.query_value,
            ft.Text("or"),
            self.update_doc_id,
            ft.Row([ft.Button("Update", on_click=self.update_data)]),
        ])

    def on_delete_btn_click(self, e):
        """Show the delete form (target selector only)."""
        self._show_panel([
            ft.Text("Delete", weight=ft.FontWeight.BOLD, size=15),
            ft.Text('where'),
            self.query_field,
            self.query_operator,
            self.query_value,
            ft.Text("or"),
            self.update_doc_id,
            ft.Row([ft.Button("Delete", on_click=self.delete_data)]),
        ])

    # ------------------------------------------------------------------
    # Layout
    # ------------------------------------------------------------------

    def build(self):
        """Assemble and return the root control of the page.

        Layout: a title row with the show/hide button, the access code
        aligned to the end, then two columns -- the document browser
        (details pane above the list) and the editor with its action buttons
        and the swappable panel.
        """
        header = ft.Row(
            [
                ft.Text("Manage Application", weight=ft.FontWeight.BOLD, size=20),
                self.show_hide_access_code,
            ],
            alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
        )
        access_code_row = ft.Row(
            [self.access_code],
            alignment=ft.MainAxisAlignment.END,
        )
        browser = ft.Column(
            [
                ft.TextButton(
                    "Database Items",
                    style=ft.ButtonStyle(
                        text_style=ft.TextStyle(
                            weight=ft.FontWeight.BOLD,
                            size=15,
                        )
                    ),
                    on_click=self.refresh_list,
                ),
                self.data_details,
                self.data_list,
            ],
            expand=1,
        )
        toolbar = ft.Row(
            [
                ft.Text("Editor", weight=ft.FontWeight.BOLD, size=15),
                ft.Button("Search", on_click=self.on_search_btn_click, icon=ft.Icons.SEARCH),
                ft.Button("Insert", on_click=self.on_insert_btn_click, icon=ft.Icons.ADD_CIRCLE),
                ft.Button("Update", on_click=self.on_update_btn_click, icon=ft.Icons.UPDATE),
                ft.Button("Delete", on_click=self.on_delete_btn_click, icon=ft.Icons.DELETE),
            ]
        )
        editor = ft.Column(
            [toolbar, self.placeholder],
            expand=True,
            alignment=ft.MainAxisAlignment.START,
        )
        return ft.Container(
            content=ft.Column(
                [
                    header,
                    access_code_row,
                    # NOTE(review): a VerticalDivider inside a Column looks
                    # unintended (ft.Divider would draw a horizontal rule);
                    # kept as-is pending a visual check.
                    ft.VerticalDivider(width=1),
                    ft.Row(
                        [browser, editor],
                        vertical_alignment=ft.CrossAxisAlignment.START,
                        expand=True,
                    ),
                ],
                expand=True,
            ),
            expand=True,
        )