import flet as ft
from models.applications import DBApplications
import json
import requests
import os

# Every request after /connect carries the Authorization and
# X-Application-Token headers. application_id is sent as a string
# during /connect.
API_BASE_URL = os.getenv("API_BASE_URL", "http://127.0.0.1:5001")

# Seconds to wait for the backend before giving up. Without a timeout a
# stalled server would block the UI event handler indefinitely.
REQUEST_TIMEOUT = 10

# Comparison operators offered in the query drop-down, in display order.
QUERY_OPERATORS = ("==", "!=", ">", ">=", "<", "<=", "in", "contains")


class ApplicationPage:
    """Management page for one application's document store.

    The page authenticates against the backend at ``API_BASE_URL`` via
    ``/connect`` and then lets the user list, search, insert, update and
    delete documents through ``/get_all``, ``/search``, ``/insert``,
    ``/update`` and ``/remove``.

    Network failures never raise out of the event handlers: they are
    printed and the operation degrades to "no data" so the UI stays usable.
    """

    def __init__(self, page: ft.Page, dashboard, app):
        """Build the page controls and load the initial document list.

        Args:
            page: The Flet page this view is rendered on.
            dashboard: The owning dashboard object (stored, not used here).
            app: Mapping describing the application; ``app['id']`` and
                ``app['access_code']`` are read.
        """
        self.page = page
        self.dashboard = dashboard
        self.app = app
        self.db_applications = DBApplications()

        # The token must be obtained before get_data(), which sends it.
        self.access_token = self.get_access_token()
        self.access_code = ft.Text("***********")
        self.selected = None
        self.all_data = self.get_data()

        self.data_list = ft.ListView(
            controls=self.create_list(self.all_data, self.load_details),
            spacing=10,
            expand=3,
        )
        self.data_details = ft.TextField(
            multiline=True,
            min_lines=5,
            max_lines=10,
            expand=True,
            read_only=True,
            label='View',
        )
        self.editor = ft.TextField(
            label='Editor',
            multiline=True,
            min_lines=5,
            max_lines=10,
            read_only=False,
        )
        self.update_doc_id = ft.TextField(label="Doc Id")
        # NOTE: attribute name keeps its historical spelling so that any
        # external code referring to it continues to work.
        self.update_fileds = ft.TextField(label="Update Fields")
        self.query_field = ft.TextField(label="Field")
        self.query_value = ft.TextField(label="Value")
        self.query_operator = ft.Dropdown(
            options=[ft.dropdown.Option(op) for op in QUERY_OPERATORS],
            value="==",
        )
        # Holds whichever action panel (search/insert/update/delete) is open.
        self.placeholder = ft.Column()

    # ------------------------------------------------------------------
    # Backend helpers
    # ------------------------------------------------------------------
    def _auth_headers(self):
        """Return the headers sent with every authenticated request."""
        return {
            'Authorization': f'Bearer {self.access_token}',
            'X-Application-Token': self.app['access_code'],
        }

    def _post(self, endpoint, payload=None):
        """POST ``payload`` as JSON to ``endpoint`` with auth headers.

        Returns:
            The ``requests.Response``, or ``None`` when the request could
            not be completed (connection error, timeout, ...).
        """
        try:
            return requests.post(
                f"{API_BASE_URL}{endpoint}",
                headers=self._auth_headers(),
                json=payload,
                timeout=REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            print(f"Request to {endpoint} failed: {exc}")
            return None

    @staticmethod
    def _json_or_empty(response):
        """Return the decoded body of a 200 response, otherwise ``[]``."""
        if response is None or response.status_code != 200:
            return []
        try:
            return json.loads(response.text)
        except ValueError:
            # A 200 with a non-JSON body is treated like "no data".
            return []

    def _where_clause(self):
        """Build the ``where`` filter from the query controls."""
        # NOTE(review): the value is always sent as the raw text typed by
        # the user; confirm the backend coerces it for numeric operators.
        return {
            "field": self.query_field.value,
            "op": self.query_operator.value,
            "value": self.query_value.value,
        }

    def _doc_id(self):
        """Return the Doc Id field as an int, or ``None`` if not numeric."""
        try:
            return int(self.update_doc_id.value)
        except (TypeError, ValueError):
            print(f"Invalid doc id: {self.update_doc_id.value!r}")
            return None

    def _target_selector(self):
        """Return the payload part selecting which documents to act on.

        A filled-in Doc Id takes precedence over the where-clause.

        Returns:
            ``{"doc_id": int}`` or ``{"where": {...}}``, or ``None`` when
            neither a doc id nor a query field was supplied, or the doc id
            is not an integer.
        """
        if self.update_doc_id.value:
            doc_id = self._doc_id()
            return None if doc_id is None else {"doc_id": doc_id}
        if self.query_field.value:
            return {"where": self._where_clause()}
        return None

    def get_access_token(self):
        """Exchange the application credentials for a bearer token.

        Returns:
            The token string, or ``""`` when the backend is unreachable,
            answers with a non-200 status, or the body carries no token.
            Callers can treat the empty string as "not authenticated".
        """
        payload = {
            'application_id': str(self.app['id']),
            'application_token': self.app['access_code'],
        }
        try:
            response = requests.post(
                f"{API_BASE_URL}/connect",
                json=payload,
                timeout=REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            print(f"Request to /connect failed: {exc}")
            return ""

        try:
            body = response.json()
        except ValueError:
            body = {}

        # Only the status is logged: the request and response bodies
        # contain credentials that must not end up on stdout.
        print({"endpoint": "/connect", "status": response.status_code})

        if response.status_code != 200 or not isinstance(body, dict):
            return ""
        return body.get('token', "")

    def get_data(self):
        """Fetch every document; returns ``[]`` on any failure."""
        return self._json_or_empty(self._post("/get_all"))

    # ------------------------------------------------------------------
    # View helpers
    # ------------------------------------------------------------------
    def show_access_code(self, e):
        """Reveal the application's access code in place of the mask."""
        self.access_code.value = self.app['access_code']
        self.access_code.update()

    def format_json(self, data):
        """Return ``data`` pretty-printed as JSON (also echoed to stdout)."""
        data = json.dumps(data, indent=4)
        print(data)
        return data

    def load_details(self, e):
        """Show the clicked list item in the read-only details view.

        Args:
            e: The list item itself (not a Flet event); ``create_list``
                binds each item into its click handler.
        """
        self.selected = e
        self.data_details.value = self.format_json(e)
        self.data_details.update()

    def create_list(self, items, on_click_handler):
        """Build one clickable container per item.

        Args:
            items: Iterable of documents to display.
            on_click_handler: Callable invoked with the clicked item.
        """
        return [
            ft.Container(
                content=ft.Column([ft.Text(item)]),
                border_radius=10,
                border=ft.border.all(1, ft.Colors.GREY_300),
                padding=5,
                ink=True,
                # Bind the item as a default so each lambda keeps its own.
                on_click=lambda e, selected=item: on_click_handler(selected),
            )
            for item in items
        ]

    def _render_list(self, items):
        """Replace the list view's contents with ``items`` and redraw."""
        self.data_list.controls = self.create_list(items, self.load_details)
        self.data_list.update()

    def _show_panel(self, controls):
        """Swap the action panel's contents for ``controls`` and redraw."""
        self.placeholder.controls = controls
        self.placeholder.update()

    def refresh_list(self, e):
        """Reload all documents from the backend and redraw the list."""
        self.all_data = self.get_data()
        self._render_list(self.all_data)

    # ------------------------------------------------------------------
    # Actions
    # ------------------------------------------------------------------
    def insert_data(self, e):
        """Insert the JSON document typed into the editor.

        Empty or invalid JSON is reported on stdout and ignored instead of
        raising out of the click handler. The editor is only cleared once
        the request actually reached the backend, so the text is not lost
        on a connection error.
        """
        raw = (self.editor.value or "").replace("\n", "")
        if not raw.strip():
            return
        try:
            data = json.loads(raw)
        except ValueError as exc:
            print(f"Editor content is not valid JSON: {exc}")
            return
        if not data:
            return

        document = {"doc": data}
        print(document)
        if self._post("/insert", document) is None:
            return
        self.refresh_list('')
        self.editor.value = ''
        self.editor.update()

    def update_data(self, e):
        """Update documents selected by Doc Id or by the where-clause."""
        selector = self._target_selector()
        if selector is None:
            return
        # NOTE(review): "fields" is sent as the raw text of the input;
        # confirm whether the backend expects a JSON object instead.
        payload = {**selector, "fields": self.update_fileds.value}
        response = self._post("/update", payload)
        if response is not None:
            print(response.text)
        self.refresh_list('')

    def delete_data(self, e):
        """Delete documents selected by Doc Id or by the where-clause."""
        selector = self._target_selector()
        if selector is None:
            return
        response = self._post("/remove", selector)
        if response is not None:
            print(response.text)
        self.refresh_list('')

    def query_data(self, e):
        """Filter the list with the small query DSL.

        The request body has the form
        ``{"where": {"field": "user", "op": "==", "value": "abc"}}``
        (supports ==, !=, >, >=, <, <=, in, contains).

        Nothing happens unless both a field and a value are entered;
        previously that path could reach an unassigned ``result``.
        """
        if not (self.query_field.value and self.query_value.value):
            return
        response = self._post("/search", {"where": self._where_clause()})
        if response is not None:
            print(response.text)
        self._render_list(self._json_or_empty(response))

    # ------------------------------------------------------------------
    # Panel switching
    # ------------------------------------------------------------------
    def on_search_btn_click(self, e):
        """Open the query panel."""
        self._show_panel([
            ft.Text("Query", weight=ft.FontWeight.BOLD, size=15),
            self.query_field,
            self.query_operator,
            self.query_value,
            ft.Row(
                [
                    ft.Button("Query", on_click=self.query_data),
                    ft.Button("Reset List", on_click=self.refresh_list),
                ]
            ),
        ])

    def on_inseert_btn_click(self, e):
        """Open the insert panel (method name keeps its original spelling)."""
        self._show_panel([
            self.editor,
            ft.Button("Insert", on_click=self.insert_data),
        ])

    def on_update_btn_click(self, e):
        """Open the update panel."""
        self._show_panel([
            ft.Text("Update", weight=ft.FontWeight.BOLD, size=15),
            self.update_fileds,
            ft.Text('where'),
            self.query_field,
            self.query_operator,
            self.query_value,
            ft.Text("or"),
            self.update_doc_id,
            ft.Row([ft.Button("Update", on_click=self.update_data)]),
        ])

    def on_delete_btn_click(self, e):
        """Open the delete panel."""
        self._show_panel([
            ft.Text("Delete", weight=ft.FontWeight.BOLD, size=15),
            ft.Text('where'),
            self.query_field,
            self.query_operator,
            self.query_value,
            ft.Text("or"),
            self.update_doc_id,
            ft.Row([ft.Button("Delete", on_click=self.delete_data)]),
        ])

    # ------------------------------------------------------------------
    # Layout
    # ------------------------------------------------------------------
    def build(self):
        """Return the root container: header, document browser, editor."""
        header = ft.Row(
            [
                ft.Text("Manage Application", weight=ft.FontWeight.BOLD, size=20),
                # NOTE(review): label typo ("Acess") left untouched here.
                ft.Button(
                    "Show Acess Code",
                    icon=ft.Icons.PASSWORD,
                    on_click=self.show_access_code,
                ),
            ],
            alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
        )
        access_code_row = ft.Row(
            [self.access_code],
            alignment=ft.MainAxisAlignment.END,
        )

        browser = ft.Column(
            [
                ft.TextButton(
                    "Database Items",
                    style=ft.ButtonStyle(
                        text_style=ft.TextStyle(
                            weight=ft.FontWeight.BOLD,
                            size=15,
                        )
                    ),
                    on_click=self.refresh_list,
                ),
                self.data_details,
                self.data_list,
            ],
            expand=1,
        )

        toolbar = ft.Row(
            [
                ft.Text("Editor", weight=ft.FontWeight.BOLD, size=15),
                ft.Button("Search", on_click=self.on_search_btn_click, icon=ft.Icons.SEARCH),
                ft.Button("Insert", on_click=self.on_inseert_btn_click, icon=ft.Icons.ADD_CIRCLE),
                ft.Button("Update", on_click=self.on_update_btn_click, icon=ft.Icons.UPDATE),
                ft.Button("Delete", on_click=self.on_delete_btn_click, icon=ft.Icons.DELETE),
            ]
        )
        actions = ft.Column(
            [toolbar, self.placeholder],
            expand=True,
            alignment=ft.MainAxisAlignment.START,
        )

        return ft.Container(
            content=ft.Column(
                [
                    header,
                    access_code_row,
                    # NOTE(review): a VerticalDivider inside a Column is
                    # unusual; kept as in the original layout.
                    ft.VerticalDivider(width=1),
                    ft.Row(
                        [browser, actions],
                        vertical_alignment=ft.CrossAxisAlignment.START,
                        expand=True,
                    ),
                ],
                expand=True,
            ),
            expand=True,
        )