"""Reports page: merged orders table with filters and currency conversion."""

from datetime import datetime

import flet as ft
import requests

from pages.report_order_out_page import ReportOrderOutPage
from pages.report_order_in_page import ReportOrderInPage
from config import API_BASE_URL

# UI currency label -> ISO 4217 code understood by the Frankfurter API.
ISO = {"EURO": "EUR", "USD": "USD", "CHF": "CHF", "GBP": "GBP"}
ALL_OPTION = "--ALL--"

# Seconds to wait for the backend before giving up on a request.
REQUEST_TIMEOUT = 10

# Positions inside the row_data lists stored in ReportsPage.rows:
# [0] type, [1] order_no, [2] client, [3] transporter, [4] date,
# [5] expenses (string), [6] paid ("amount CUR"), [7] received ("amount CUR"),
# [8] profit ("amount CUR" or "0"), [9] currency (base for profit)
VISIBLE_CELLS = 9
DATE_FORMAT = '%Y-%m-%d'


def fetch_rates_for_base(base_ui: str) -> dict:
    """
    Fetch live exchange rates for one base currency from api.frankfurter.app.

    Returns dict like {"EURO": 1.0, "USD": 1.0712, "CHF": 0.9573, "GBP": 0.8451}
    meaning: 1 BASE = X other currency. Keys are the UI labels from ``ISO``.

    Raises KeyError when ``base_ui`` is not a key of ``ISO`` or the response
    lacks an expected currency, and lets ``requests`` exceptions (including
    the HTTP error raised by ``raise_for_status``) propagate to the caller.
    """
    base_iso = ISO[base_ui]
    symbols = [code for code in ISO.values() if code != base_iso]
    url = "https://api.frankfurter.app/latest"
    params = {"from": base_iso, "to": ",".join(symbols)}
    r = requests.get(url, params=params, timeout=6)
    r.raise_for_status()
    data = r.json()
    rates = {k: float(v) for k, v in data["rates"].items()}
    # The API does not list the base itself; by definition 1 BASE = 1 BASE.
    rates[base_iso] = 1.0
    return {ui: rates[iso] for ui, iso in ISO.items()}


def _to_float(value):
    """Return ``value`` as a float, or None when it cannot be parsed."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _split_amount(text, fallback_currency):
    """Split an "amount CUR" string; return ("0", fallback) if malformed."""
    try:
        amount, currency = text.split(" ")
    except (AttributeError, ValueError):
        return "0", fallback_currency
    return amount, currency


def _profit_amount(profit_text):
    """Return the numeric part of a profit cell ("12.5 EUR" or "0")."""
    amount = _to_float(str(profit_text).split(" ")[0])
    return 0.0 if amount is None else amount


def _parse_day(text):
    """Parse a YYYY-MM-DD string; return None when empty or malformed."""
    if not text:
        return None
    try:
        return datetime.strptime(text, DATE_FORMAT)
    except (TypeError, ValueError):
        return None


def _convert_to_base(amount, currency, base, rates):
    """
    Convert ``amount`` expressed in ``currency`` into ``base``.

    ``rates`` maps a currency label to "1 base = X currency", so converting
    currency -> base is a division. Unparseable amounts become 0.0; when the
    rate is missing or zero the amount is returned unchanged rather than
    raising, so a single bad rate does not break the whole table.
    """
    value = _to_float(amount)
    if value is None:
        return 0.0
    if currency == base:
        return value
    rate = rates.get(currency)
    if not rate:
        return value
    return value / rate


class ReportsPage:
    """Report view listing active orders with filters and currency conversion."""

    def __init__(self, page: ft.Page, dashboard):
        """Create the controls and load the order rows from the backend."""
        self.page = page
        self.dashboard = dashboard
        self.start_date = ft.Text()
        self.end_date = ft.Text()
        # self.client_filter = ft.TextField(label="Client", expand=True)
        # self.transporter_filter = ft.TextField(label="Transporter", expand=True)
        self.status_text = ft.Text("")
        self.results_text = ft.Text("")
        # rows holds every loaded row; rows_copy holds the rows currently shown.
        self.rows = []
        self.rows_copy = []
        self.total = ft.Text("Total: ", weight=ft.FontWeight.BOLD)
        self.currency_list = []
        self.convert_currency_placeholder = ft.Container()
        self.convert_currency = ft.Button(
            "Convert Currency", on_click=self.on_convert_curency_btn_click
        )
        self.data_table = ft.DataTable(
            columns=[
                ft.DataColumn(label=ft.Text("Order Type")),
                ft.DataColumn(label=ft.Text("Order No")),
                ft.DataColumn(label=ft.Text("Client")),
                ft.DataColumn(label=ft.Text("Transporter")),
                ft.DataColumn(label=ft.Text("Date")),
                ft.DataColumn(label=ft.Text("Expenses")),
                ft.DataColumn(label=ft.Text("Paid")),
                ft.DataColumn(label=ft.Text("Received")),
                ft.DataColumn(label=ft.Text("Profit")),
                # ft.DataColumn(label=ft.Text("Currency")),
            ],
            rows=[],
            border=ft.border.all(1, ft.Colors.GREY_300),
            expand=True,
        )
        self.all_clients = []
        self.all_transporters = []
        # Must run before the dropdowns are built: it fills all_clients and
        # all_transporters as a side effect of resolving names.
        self.create_table_rows_data()

        self.clients_filter = self._new_clients_dropdown(
            self._name_options(self.all_clients)
        )
        self.clients_filter_placeholder = ft.Container(content=self.clients_filter)
        self.transporters_filter = self._new_transporters_dropdown(
            self._name_options(self.all_transporters)
        )
        self.transporters_filter_placeholder = ft.Container(
            content=self.transporters_filter
        )

        self.convert_courrency_dialog_placeholder = ft.Column()
        self.convert_currency_choice = ft.RadioGroup(
            content=ft.Row(
                [
                    ft.Radio(value="USD", label="USD"),
                    ft.Radio(value="EURO", label="EURO"),
                    ft.Radio(value="CHF", label="CHF"),
                    ft.Radio(value="GBP", label="GBP"),
                ]
            ),
            on_change=self.radiogroup_changed,
        )
        self.convert_courency_dialog = ft.AlertDialog(
            title=ft.Text("Select Currency"),
            content=ft.Column(
                [
                    self.convert_currency_choice,
                    self.convert_courrency_dialog_placeholder,
                ],
                width=400,
                height=300,
            ),
            actions=[
                ft.TextButton("Cancel", on_click=self.on_cancel_btn_click),
                ft.Button("Confirm", on_click=self.on_confirm_btn_click),
            ],
        )
        self.euro = self._new_rate_field("EURO")
        self.usd = self._new_rate_field("USD")
        self.chf = self._new_rate_field("CHF")
        self.gbp = self._new_rate_field("GBP")

    # ------------------------------------------------------------------
    # Control factories
    # ------------------------------------------------------------------
    @staticmethod
    def _new_rate_field(label):
        """Build a text field that only accepts a non-negative decimal."""
        return ft.TextField(
            label=label,
            input_filter=ft.InputFilter(
                allow=True,
                regex_string=r"^[0-9]*\.?[0-9]*$",
                replacement_string="",
            ),
        )

    @staticmethod
    def _name_options(entities):
        """Build dropdown options: the ALL entry plus one per named entity."""
        options = [ft.dropdown.Option(text=ALL_OPTION, key=ALL_OPTION)]
        for entity in entities:
            name = entity.get('name') if isinstance(entity, dict) else None
            if name:
                options.append(ft.dropdown.Option(text=name, key=name))
        return options

    def _new_clients_dropdown(self, options):
        """Build the client filter dropdown with the given options."""
        return ft.Dropdown(
            options=options,
            width=250,
            label="Clients",
            hint_text="Select client",
            on_change=self.filter_by_client,
            value=None,
        )

    def _new_transporters_dropdown(self, options):
        """Build the transporter filter dropdown with the given options."""
        return ft.Dropdown(
            options=options,
            width=250,
            label="Transporters",
            hint_text="Select transporter",
            on_change=self.filter_by_transporter,
            value=None,
        )

    @staticmethod
    def _make_row(values):
        """Wrap a sequence of cell values into a DataRow of text cells."""
        return ft.DataRow(cells=[ft.DataCell(ft.Text(v)) for v in values])

    def _rate_fields(self):
        """Map each currency label to its rate input, in display order."""
        return {
            "USD": self.usd,
            "EURO": self.euro,
            "CHF": self.chf,
            "GBP": self.gbp,
        }

    def _notify(self, message):
        """Show a transient message using the same page.open API as dialogs."""
        self.page.open(ft.SnackBar(ft.Text(message)))

    # ------------------------------------------------------------------
    # Currency dialog
    # ------------------------------------------------------------------
    def _prefill_rates(self, base):
        """Fill the rate inputs with live rates; return True on success."""
        try:
            fx = fetch_rates_for_base(base)
        except (requests.RequestException, KeyError, TypeError, ValueError) as ex:
            # Leave the fields as they are so the user can type rates by hand.
            self._notify(f"FX fetch failed: {ex}")
            return False
        for code, field in self._rate_fields().items():
            field.value = f"{fx[code]:.4f}"
        return True

    def _show_rate_inputs(self, base):
        """Rebuild the dialog body: a caption plus one input per non-base currency."""
        controls = self.convert_courrency_dialog_placeholder.controls
        controls.clear()
        controls.append(
            ft.Text(f"Sets the currency exchange rates in relation to {base}"),
        )
        if base in ISO:
            controls.extend(
                field for code, field in self._rate_fields().items() if code != base
            )
            self._prefill_rates(base)

    def radiogroup_changed(self, e):
        """Show the rate inputs matching the newly selected base currency."""
        self._show_rate_inputs(self.convert_currency_choice.value)
        self.convert_courrency_dialog_placeholder.update()

    def on_convert_curency_btn_click(self, e):
        """Open the conversion dialog with a base selected and rates prefilled."""
        self.page.open(self.convert_courency_dialog)
        # Without a selection, Confirm would have no base to convert into,
        # so preselect the same default the rates are fetched for.
        if not self.convert_currency_choice.value:
            self.convert_currency_choice.value = "EURO"
        self._show_rate_inputs(self.convert_currency_choice.value)
        self.convert_courency_dialog.update()

    def on_cancel_btn_click(self, e):
        """Close the conversion dialog without changing the table."""
        self.page.close(self.convert_courency_dialog)

    def on_confirm_btn_click(self, e):
        """
        Re-render the currently shown rows converted into the selected base.

        Rates are read from the dialog inputs and mean "1 base = X currency",
        e.g. with base EURO, USD -> EURO divides by rates['USD'].
        Expenses are converted using the paid currency of the same row.
        """
        self.page.close(self.convert_courency_dialog)
        base = self.convert_currency_choice.value
        if base not in ISO:
            self._notify("Select a currency before converting")
            return

        rates = {
            code: _to_float(field.value)
            for code, field in self._rate_fields().items()
        }

        total = 0.0
        converted = []
        for r in self.rows_copy:
            expenses_str = r[5] or "0"
            paid_number, paid_currency = _split_amount(r[6], base)
            received_number, received_currency = _split_amount(r[7], base)

            paid_base = _convert_to_base(paid_number, paid_currency, base, rates)
            received_base = _convert_to_base(
                received_number, received_currency, base, rates
            )
            expenses_base = _convert_to_base(expenses_str, paid_currency, base, rates)
            profit_base = round(received_base - paid_base - expenses_base, 2)

            converted.append(
                self._make_row(
                    [
                        *r[:5],
                        expenses_str,
                        f"{paid_base:.2f} {base}",
                        f"{received_base:.2f} {base}",
                        f"{profit_base:.2f} {base}",
                    ]
                )
            )
            total += profit_base

        self.data_table.rows.clear()
        self.data_table.rows.extend(converted)
        self.data_table.update()
        self.total.value = f"Total: {total:.2f} {base}"
        self.total.update()

    # ------------------------------------------------------------------
    # Loading data
    # ------------------------------------------------------------------
    @staticmethod
    def _resolve_name(cache, fetch, entity_id):
        """Return the entity's name, calling ``fetch`` at most once per id."""
        if entity_id not in cache:
            entity = fetch(entity_id)
            name = entity.get('name') if isinstance(entity, dict) else None
            cache[entity_id] = name or ''
        return cache[entity_id]

    def create_table_rows_data(self):
        """
        Load orders and populate ``rows``, ``rows_copy`` and the table.

        Outgoing orders are listed first, followed by incoming orders that no
        outgoing order references. Only orders whose status is 'active' become
        rows, while ``currency_list`` collects the currencies of every merged
        order. Previously loaded rows are discarded, so calling this again
        refreshes instead of duplicating. No control is updated here because
        this runs from ``__init__``, before the controls are on the page.
        """
        all_orders_in = self.get_orders_in()
        all_orders_out = self.get_orders_out()

        linked_in_numbers = {
            order_out['order_in_number']
            for order_out in all_orders_out
            if order_out.get('order_in_number')
        }
        all_orders = list(all_orders_out)
        all_orders.extend(
            order_in
            for order_in in all_orders_in
            if order_in['order_number'] not in linked_in_numbers
        )

        # Index once instead of scanning all_orders_in for every order.
        expenses_by_in_number = {
            order_in['order_number']: order_in.get('expenses', 0) or 0
            for order_in in all_orders_in
        }

        self.rows.clear()
        self.data_table.rows.clear()
        self.currency_list.clear()
        if all_orders:
            # The convert button is only offered when there is something to convert.
            self.convert_currency_placeholder.content = self.convert_currency

        client_names = {}
        transporter_names = {}
        for order in all_orders:
            for currency_code in (order['currency_received'], order['currency_paid']):
                if currency_code not in self.currency_list:
                    self.currency_list.append(currency_code)

            if order.get('status') != 'active':
                continue

            is_out = 'order_in_number' in order
            order_type = 'Out' if is_out else 'In'
            order_number = order['order_number']
            client_name = self._resolve_name(
                client_names, self.get_client, order['client_id']
            )
            transporter_name = self._resolve_name(
                transporter_names, self.get_transporter, order['transporter_id']
            )
            order_date = str(order.get('created_at') or '').split("T")[0]
            paid = f"{order['paid_price']} {order['currency_paid']}"
            received = f"{order['received_price']} {order['currency_received']}"
            # Profit is only meaningful when both amounts share one currency.
            same_currency = order['currency_paid'] == order['currency_received']
            currency = order['currency_received'] if same_currency else ''
            # Expenses come from the incoming order an outgoing order refers to.
            expenses = (
                expenses_by_in_number.get(order['order_in_number'], 0)
                if is_out
                else 0
            )

            profit = '0'
            if currency:
                try:
                    profit_value = round(
                        float(order['received_price'])
                        - float(order['paid_price'])
                        - float(expenses),
                        2,
                    )
                    profit = f"{profit_value} {currency}"
                except (TypeError, ValueError) as e:
                    print(e)

            row_data = [
                order_type,
                order_number,
                client_name,
                transporter_name,
                order_date,
                str(expenses),
                paid,
                received,
                profit,
                currency,
            ]
            self.rows.append(row_data)
            self.data_table.rows.append(self._make_row(row_data[:VISIBLE_CELLS]))

        self.total.value = self._total_label(self.rows)
        self.rows_copy = list(self.rows)

    def _auth_headers(self):
        """Build the bearer-token header from the stored token."""
        token = self.page.client_storage.get("token")
        return {"Authorization": f"Bearer {token}"}

    def _fetch_order_list(self, path):
        """GET a list endpoint; return [] on any failure or non-200 status."""
        try:
            response = requests.get(
                f"{API_BASE_URL}{path}",
                headers=self._auth_headers(),
                timeout=REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                return []
            payload = response.json()
        except Exception as e:  # best-effort boundary: never break the page
            print("Error loading orders:", e)
            return []
        return payload if isinstance(payload, list) else []

    def get_orders_out(self):
        """Return the outgoing orders, or an empty list when unavailable."""
        return self._fetch_order_list("/orders/list")

    def get_orders_in(self):
        """Return the incoming orders, or an empty list when unavailable."""
        return self._fetch_order_list("/orders_in/list")

    def get_client_access(self):
        """
        Return True when the current user may see the reports.

        Users whose profile role is 'user' always have access; anyone else
        needs ``report == 1`` in their company-user access record. Any
        request or payload error denies access instead of raising.
        """
        try:
            headers = self._auth_headers()
            profile = requests.get(
                f"{API_BASE_URL}/profile/", headers=headers, timeout=REQUEST_TIMEOUT
            ).json()
            if profile['user_role'] == 'user':
                return True
            user_id = self.page.session.get("user_id")
            access = requests.get(
                f"{API_BASE_URL}/company_user/access/{user_id}",
                headers=headers,
                timeout=REQUEST_TIMEOUT,
            ).json()
            return access['report'] == 1
        except Exception as e:  # best-effort boundary: fail closed
            print("Error checking report access:", e)
            return False

    def _fetch_entity(self, path, registry, label):
        """GET one entity; remember it in ``registry``; None on any failure."""
        try:
            response = requests.get(
                f"{API_BASE_URL}{path}",
                headers=self._auth_headers(),
                timeout=REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                return None
            entity = response.json()
        except Exception as e:  # best-effort boundary: never break the page
            print(f"Error loading {label}:", e)
            return None
        # Only successful payloads are registered, so the filter dropdowns
        # are never built from error bodies.
        if entity is not None and entity not in registry:
            registry.append(entity)
        return entity

    def get_client(self, id):  # noqa: A002 - parameter name kept for callers
        """Return the client payload for ``id`` and record it in all_clients."""
        return self._fetch_entity(f"/clients/{id}", self.all_clients, "clients")

    def get_transporter(self, id):  # noqa: A002 - parameter name kept for callers
        """Return the transporter payload for ``id`` and record it in all_transporters."""
        return self._fetch_entity(
            f"/transporters/{id}", self.all_transporters, "transporters"
        )

    # ------------------------------------------------------------------
    # Navigation
    # ------------------------------------------------------------------
    def on_report_orders_in_btn_click(self, e):
        """Replace the dashboard content with the incoming-orders report."""
        order_in_page = ReportOrderInPage(self.page, self.dashboard)
        self.dashboard.placeholder.content = order_in_page.build()
        self.dashboard.placeholder.update()

    def on_report_orders_out_btn_click(self, e):
        """Replace the dashboard content with the outgoing-orders report."""
        orders_out_page = ReportOrderOutPage(self.page, self.dashboard)
        self.dashboard.placeholder.content = orders_out_page.build()
        self.dashboard.placeholder.update()

    # ------------------------------------------------------------------
    # Filtering
    # ------------------------------------------------------------------
    def _total_label(self, rows):
        """Return the total text; empty unless exactly one currency is in use."""
        if len(self.currency_list) != 1:
            return ""
        total = round(sum(_profit_amount(r[8]) for r in rows), 2)
        return f"Total: {total}"

    def _render_rows(self, rows):
        """Show ``rows`` in the table and refresh the total."""
        self.data_table.rows.clear()
        self.data_table.rows.extend(
            self._make_row(r[:VISIBLE_CELLS]) for r in rows
        )
        self.data_table.update()
        self.total.value = self._total_label(rows)
        self.total.update()

    def _apply_filters(self):
        """
        Recompute the visible rows from ``rows`` using every active filter.

        Filtering always starts from the full data set, so changing one
        filter never loses rows that an earlier choice had hidden.
        """
        start = _parse_day(self.start_date.value)
        end = _parse_day(self.end_date.value)
        client = self.clients_filter.value
        transporter = self.transporters_filter.value
        by_client = bool(client) and client != ALL_OPTION
        by_transporter = bool(transporter) and transporter != ALL_OPTION

        def is_visible(r):
            if by_client and r[2] != client:
                return False
            if by_transporter and r[3] != transporter:
                return False
            if start is None and end is None:
                return True
            day = _parse_day(r[4])
            if day is None:
                # A row without a usable date cannot satisfy a date range.
                return False
            if start is not None and day < start:
                return False
            return end is None or day <= end

        self.rows_copy = [r for r in self.rows if is_visible(r)]
        self._render_rows(self.rows_copy)

    def on_reset_btn_click(self, e):
        """Clear every filter and show all rows again."""
        self.start_date.value = ""
        self.start_date.update()
        self.end_date.value = ""
        self.end_date.update()

        # Recreate both dropdowns to avoid sticky selection text on Flet 0.28.3.
        self.clients_filter = self._new_clients_dropdown(
            list(self.clients_filter.options or [])
        )
        self.clients_filter_placeholder.content = self.clients_filter
        self.clients_filter_placeholder.update()

        self.transporters_filter = self._new_transporters_dropdown(
            list(self.transporters_filter.options or [])
        )
        self.transporters_filter_placeholder.content = self.transporters_filter
        self.transporters_filter_placeholder.update()

        self.page.update()
        self._apply_filters()

    def on_start_date_click(self, e):
        """Keep only rows dated on or after the picked day."""
        picked = e.control.value
        if picked is None:
            return
        self.start_date.value = picked.strftime(DATE_FORMAT)
        self.start_date.update()
        self._apply_filters()

    def on_end_date_click(self, e):
        """Keep only rows dated on or before the picked day."""
        picked = e.control.value
        if picked is None:
            return
        self.end_date.value = picked.strftime(DATE_FORMAT)
        self.end_date.update()
        self._apply_filters()

    def filter_by_client(self, e):
        """Re-filter the table after the client dropdown changed."""
        self._apply_filters()

    def filter_by_transporter(self, e):
        """Re-filter the table after the transporter dropdown changed."""
        self._apply_filters()

    # ------------------------------------------------------------------
    # Layout
    # ------------------------------------------------------------------
    def _date_button(self, label, on_change):
        """Build a button that opens a date picker wired to ``on_change``."""
        return ft.ElevatedButton(
            label,
            on_click=lambda _: self.page.open(
                ft.DatePicker(
                    first_date=datetime(year=2000, month=10, day=1),
                    last_date=datetime(year=2095, month=10, day=1),
                    on_change=on_change,
                )
            ),
            width=120,
            icon=ft.Icons.CALENDAR_MONTH,
        )

    def _build_report(self):
        """Build the full report layout: header, filter bar and table."""
        header = ft.Row(
            [
                ft.Text("Reports", size=24, weight=ft.FontWeight.BOLD),
                self.total,
                ft.Row(
                    [
                        ft.Button(
                            "Report Orders In",
                            on_click=self.on_report_orders_in_btn_click,
                            width=200,
                            icon=ft.Icons.ASSESSMENT,
                        ),
                        ft.Button(
                            "Report Orders Out",
                            on_click=self.on_report_orders_out_btn_click,
                            width=200,
                            icon=ft.Icons.ASSESSMENT,
                        ),
                    ]
                ),
            ],
            alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
        )
        filters = ft.Row(
            [
                ft.Row(
                    [
                        self.start_date,
                        self._date_button("Start Date", self.on_start_date_click),
                        self.end_date,
                        self._date_button("End Date", self.on_end_date_click),
                        ft.Text(),
                        self.clients_filter_placeholder,
                        self.transporters_filter_placeholder,
                        ft.ElevatedButton(
                            "Reset", on_click=self.on_reset_btn_click, width=120
                        ),
                    ]
                ),
                self.convert_currency_placeholder,
            ],
            alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
        )
        table = ft.Row(
            [self.data_table],
            alignment=ft.MainAxisAlignment.CENTER,
        )
        return ft.Container(
            content=ft.Column(
                [header, filters, table],
                expand=True,
                alignment=ft.MainAxisAlignment.START,
            ),
            expand=True,
        )

    @staticmethod
    def _build_access_denied():
        """Build the placeholder shown to users without report access."""
        return ft.Container(
            content=ft.Column(
                [
                    ft.Row(
                        [
                            ft.Text("Reports", size=24, weight=ft.FontWeight.BOLD),
                        ],
                        alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
                    ),
                    ft.Row(
                        [
                            ft.Text(
                                "You do not have access to this page content",
                                size=24,
                                weight=ft.FontWeight.BOLD,
                                color=ft.Colors.RED,
                            )
                        ],
                        alignment=ft.MainAxisAlignment.CENTER,
                    ),
                    ft.Text(""),
                ],
                alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
                expand=True,
            ),
            expand=True,
        )

    def build(self):
        """Return the report layout, or an access-denied notice."""
        if self.get_client_access():
            return self._build_report()
        return self._build_access_denied()