add jws token
This commit is contained in:
@@ -7,8 +7,18 @@ class DBApplications:
|
|||||||
default_path = os.getenv("DB_PATH", "instance/dev.db")
|
default_path = os.getenv("DB_PATH", "instance/dev.db")
|
||||||
self.db_path = os.path.abspath(default_path)
|
self.db_path = os.path.abspath(default_path)
|
||||||
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
|
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
|
||||||
|
self.drop_table()
|
||||||
self.create_table()
|
self.create_table()
|
||||||
|
|
||||||
|
def drop_table(self):
|
||||||
|
with sqlite3.connect(self.db_path, check_same_thread=False) as conn:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute("""
|
||||||
|
DROP TABLE applications;
|
||||||
|
|
||||||
|
""")
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
def create_table(self):
|
def create_table(self):
|
||||||
with sqlite3.connect(self.db_path, check_same_thread=False) as conn:
|
with sqlite3.connect(self.db_path, check_same_thread=False) as conn:
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
|
|||||||
@@ -4,6 +4,9 @@ import json
|
|||||||
import requests
|
import requests
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
# All requests now use Authorization and X-Application-Token headers.
|
||||||
|
# application_id is sent as string during /connect.
|
||||||
|
|
||||||
API_BASE_URL = os.getenv("API_BASE_URL", "http://127.0.0.1:5001")
|
API_BASE_URL = os.getenv("API_BASE_URL", "http://127.0.0.1:5001")
|
||||||
|
|
||||||
class ApplicationPage:
|
class ApplicationPage:
|
||||||
@@ -12,7 +15,7 @@ class ApplicationPage:
|
|||||||
self.dashboard = dashboard
|
self.dashboard = dashboard
|
||||||
self.app = app
|
self.app = app
|
||||||
self.db_applications = DBApplications()
|
self.db_applications = DBApplications()
|
||||||
|
self.access_token = self.get_access_token()
|
||||||
self.access_code = ft.Text("***********")
|
self.access_code = ft.Text("***********")
|
||||||
self.selected = None
|
self.selected = None
|
||||||
|
|
||||||
@@ -60,10 +63,30 @@ class ApplicationPage:
|
|||||||
)
|
)
|
||||||
self.placeholder = ft.Column()
|
self.placeholder = ft.Column()
|
||||||
|
|
||||||
|
def get_access_token(self):
|
||||||
|
application_token = self.app['access_code']
|
||||||
|
application_id = str(self.app['id'])
|
||||||
|
data = {
|
||||||
|
'application_id':application_id,
|
||||||
|
'application_token':application_token
|
||||||
|
}
|
||||||
|
response = requests.post(f"{API_BASE_URL}/connect", json=data)
|
||||||
|
resp_json = {}
|
||||||
|
try:
|
||||||
|
resp_json = response.json()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
print({"request": data, "status": response.status_code, "response": resp_json})
|
||||||
|
if response.status_code != 200 or 'token' not in resp_json:
|
||||||
|
# Fail gracefully; caller can handle empty token
|
||||||
|
return ""
|
||||||
|
return resp_json['token']
|
||||||
|
|
||||||
def show_access_code(self, e):
|
def show_access_code(self, e):
|
||||||
self.access_code.value = self.app['access_code']
|
self.access_code.value = self.app['access_code']
|
||||||
self.access_code.update()
|
self.access_code.update()
|
||||||
|
|
||||||
|
|
||||||
def format_json(self, data):
|
def format_json(self, data):
|
||||||
data = json.dumps(data, indent=4)
|
data = json.dumps(data, indent=4)
|
||||||
print(data)
|
print(data)
|
||||||
@@ -75,7 +98,11 @@ class ApplicationPage:
|
|||||||
self.data_details.update()
|
self.data_details.update()
|
||||||
|
|
||||||
def get_data(self):
|
def get_data(self):
|
||||||
response = requests.post(f"{API_BASE_URL}/get_all")
|
headers = {
|
||||||
|
'Authorization': f'Bearer {self.access_token}',
|
||||||
|
'X-Application-Token': self.app['access_code']
|
||||||
|
}
|
||||||
|
response = requests.post(f"{API_BASE_URL}/get_all", headers=headers)
|
||||||
return json.loads(response.text) if response.status_code == 200 else []
|
return json.loads(response.text) if response.status_code == 200 else []
|
||||||
|
|
||||||
def create_list(self, items, on_click_handler):
|
def create_list(self, items, on_click_handler):
|
||||||
@@ -101,12 +128,20 @@ class ApplicationPage:
|
|||||||
if data:
|
if data:
|
||||||
document = {"doc":data}
|
document = {"doc":data}
|
||||||
print(document)
|
print(document)
|
||||||
requests.post(f"{API_BASE_URL}/insert", json=json.dumps(document))
|
headers = {
|
||||||
|
'Authorization': f'Bearer {self.access_token}',
|
||||||
|
'X-Application-Token': self.app['access_code']
|
||||||
|
}
|
||||||
|
requests.post(f"{API_BASE_URL}/insert", headers=headers, json=document)
|
||||||
self.refresh_list('')
|
self.refresh_list('')
|
||||||
self.editor.value = ''
|
self.editor.value = ''
|
||||||
self.editor.update()
|
self.editor.update()
|
||||||
|
|
||||||
def update_data(self, e):
|
def update_data(self, e):
|
||||||
|
headers = {
|
||||||
|
'Authorization': f'Bearer {self.access_token}',
|
||||||
|
'X-Application-Token': self.app['access_code']
|
||||||
|
}
|
||||||
if self.update_doc_id.value:
|
if self.update_doc_id.value:
|
||||||
json_file = {
|
json_file = {
|
||||||
"doc_id": int(self.update_doc_id.value),
|
"doc_id": int(self.update_doc_id.value),
|
||||||
@@ -114,22 +149,26 @@ class ApplicationPage:
|
|||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
json_file = {
|
json_file = {
|
||||||
"where":{
|
"where": {
|
||||||
"field":self.query_field.value,
|
"field": self.query_field.value,
|
||||||
"op":self.query_operator.value,
|
"op": self.query_operator.value,
|
||||||
"value":self.query_value.value,
|
"value": self.query_value.value,
|
||||||
|
},
|
||||||
"fields": self.update_fileds.value,
|
"fields": self.update_fileds.value,
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if self.update_doc_id or self.query_field.value:
|
if self.update_doc_id.value or self.query_field.value:
|
||||||
response = requests.post(f"{API_BASE_URL}/update", json=json.dumps(json_file))
|
response = requests.post(f"{API_BASE_URL}/update", headers=headers, json=json_file)
|
||||||
print(response.text)
|
print(response.text)
|
||||||
result = json.loads(response.text) if response.status_code == 200 else []
|
result = json.loads(response.text) if response.status_code == 200 else []
|
||||||
self.refresh_list('')
|
self.refresh_list('')
|
||||||
|
|
||||||
def delete_data(self, e):
|
def delete_data(self, e):
|
||||||
if self.update_doc_id.value == None:
|
headers = {
|
||||||
|
'Authorization': f'Bearer {self.access_token}',
|
||||||
|
'X-Application-Token': self.app['access_code']
|
||||||
|
}
|
||||||
|
if not self.update_doc_id.value:
|
||||||
json_file = {
|
json_file = {
|
||||||
"where":{
|
"where":{
|
||||||
"field":self.query_field.value,
|
"field":self.query_field.value,
|
||||||
@@ -141,14 +180,18 @@ class ApplicationPage:
|
|||||||
json_file = {
|
json_file = {
|
||||||
"doc_id": int(self.update_doc_id.value),
|
"doc_id": int(self.update_doc_id.value),
|
||||||
}
|
}
|
||||||
if self.update_doc_id or self.query_field.value:
|
if self.update_doc_id.value or self.query_field.value:
|
||||||
response = requests.post(f"{API_BASE_URL}/remove", json=json.dumps(json_file))
|
response = requests.post(f"{API_BASE_URL}/remove", headers=headers, json=json_file)
|
||||||
print(response.text)
|
print(response.text)
|
||||||
result = json.loads(response.text) if response.status_code == 200 else []
|
result = json.loads(response.text) if response.status_code == 200 else []
|
||||||
self.refresh_list('')
|
self.refresh_list('')
|
||||||
|
|
||||||
def query_data(self, e):
|
def query_data(self, e):
|
||||||
'''Added a tiny query DSL so you can filter with { "where": { "field":"user", "op":"==", "value":"abc" } } (supports ==, !=, >, >=, <, <=, in, contains).'''
|
'''Added a tiny query DSL so you can filter with { "where": { "field":"user", "op":"==", "value":"abc" } } (supports ==, !=, >, >=, <, <=, in, contains).'''
|
||||||
|
headers = {
|
||||||
|
'Authorization': f'Bearer {self.access_token}',
|
||||||
|
'X-Application-Token': self.app['access_code']
|
||||||
|
}
|
||||||
json_file = {
|
json_file = {
|
||||||
"where":{
|
"where":{
|
||||||
"field":self.query_field.value,
|
"field":self.query_field.value,
|
||||||
@@ -157,7 +200,7 @@ class ApplicationPage:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if self.query_field.value and self.query_value.value:
|
if self.query_field.value and self.query_value.value:
|
||||||
response = requests.post(f"{API_BASE_URL}/search", json=json.dumps(json_file))
|
response = requests.post(f"{API_BASE_URL}/search", headers=headers, json=json_file)
|
||||||
print(response.text)
|
print(response.text)
|
||||||
result = json.loads(response.text) if response.status_code == 200 else []
|
result = json.loads(response.text) if response.status_code == 200 else []
|
||||||
self.data_list.controls.clear()
|
self.data_list.controls.clear()
|
||||||
|
|||||||
0
data/GQ52ANB328FV.json
Normal file
0
data/GQ52ANB328FV.json
Normal file
@@ -5,17 +5,26 @@ import json
|
|||||||
|
|
||||||
from flask_cors import CORS
|
from flask_cors import CORS
|
||||||
import os
|
import os
|
||||||
|
import time
|
||||||
|
import hmac
|
||||||
|
import hashlib
|
||||||
|
import base64
|
||||||
|
import re
|
||||||
|
from typing import Dict, Optional, Tuple
|
||||||
|
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
|
|
||||||
UI_ORIGIN = os.getenv("UI_ORIGIN", "https://db.northdanubesoft.eu")
|
UI_ORIGIN = os.getenv("UI_ORIGIN", "https://db.northdanubesoft.eu")
|
||||||
CORS(app, resources={r"/*": {"origins": [UI_ORIGIN]}})
|
CORS(app, resources={r"/*": {"origins": [UI_ORIGIN]}})
|
||||||
|
|
||||||
|
# --- Auth & DB config ---------------------------------------------------------
|
||||||
|
SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-change-me")
|
||||||
|
TOKEN_TTL = int(os.getenv("TOKEN_TTL", "3600")) # seconds
|
||||||
|
# Default to a writable local ./data dir in dev; override to /data in Docker via env
|
||||||
|
DB_DIR = os.path.abspath(os.getenv("DB_DIR", os.path.join(os.path.dirname(__file__), "..", "data")))
|
||||||
|
|
||||||
|
# Cache TinyDB instances per application token file
|
||||||
# Single TinyDB file for now (can be swapped to per-table later)
|
_DB_CACHE: Dict[str, TinyDB] = {}
|
||||||
DB_PATH = os.getenv("DB_PATH", "db.json")
|
|
||||||
db = TinyDB(DB_PATH)
|
|
||||||
|
|
||||||
# --- Helpers -----------------------------------------------------------------
|
# --- Helpers -----------------------------------------------------------------
|
||||||
|
|
||||||
@@ -43,6 +52,85 @@ def json_body(required: bool = True):
|
|||||||
return None, (jsonify({"error": "Expected JSON body"}), 400)
|
return None, (jsonify({"error": "Expected JSON body"}), 400)
|
||||||
return data, None
|
return data, None
|
||||||
|
|
||||||
|
# --- Token helpers ------------------------------------------------------------
|
||||||
|
|
||||||
|
def _b64url(data: bytes) -> str:
|
||||||
|
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
|
||||||
|
|
||||||
|
def _b64url_json(obj: dict) -> str:
|
||||||
|
return _b64url(json.dumps(obj, separators=(",", ":")).encode("utf-8"))
|
||||||
|
|
||||||
|
def sign_token(app_id: str, ttl: int = TOKEN_TTL) -> str:
|
||||||
|
now = int(time.time())
|
||||||
|
payload = {"app_id": app_id, "iat": now, "exp": now + ttl}
|
||||||
|
header = {"alg": "HS256", "typ": "JWT"}
|
||||||
|
h = _b64url_json(header)
|
||||||
|
p = _b64url_json(payload)
|
||||||
|
to_sign = f"{h}.{p}".encode("utf-8")
|
||||||
|
sig = hmac.new(SECRET_KEY.encode("utf-8"), to_sign, hashlib.sha256).digest()
|
||||||
|
return f"{h}.{p}.{_b64url(sig)}"
|
||||||
|
|
||||||
|
def verify_token(token: str) -> Tuple[bool, Optional[dict], Optional[str]]:
|
||||||
|
try:
|
||||||
|
parts = token.split(".")
|
||||||
|
if len(parts) != 3:
|
||||||
|
return False, None, "Malformed token"
|
||||||
|
h_b64, p_b64, s_b64 = parts
|
||||||
|
to_sign = f"{h_b64}.{p_b64}".encode("utf-8")
|
||||||
|
sig = base64.urlsafe_b64decode(s_b64 + "==")
|
||||||
|
expected = hmac.new(SECRET_KEY.encode("utf-8"), to_sign, hashlib.sha256).digest()
|
||||||
|
if not hmac.compare_digest(sig, expected):
|
||||||
|
return False, None, "Bad signature"
|
||||||
|
payload = json.loads(base64.urlsafe_b64decode(p_b64 + "==").decode("utf-8"))
|
||||||
|
now = int(time.time())
|
||||||
|
if payload.get("exp", 0) < now:
|
||||||
|
return False, None, "Expired"
|
||||||
|
return True, payload, None
|
||||||
|
except Exception as e:
|
||||||
|
return False, None, str(e)
|
||||||
|
|
||||||
|
# --- Access token / DB helpers -----------------------------------------------
|
||||||
|
|
||||||
|
def _sanitize_filename(name: str) -> str:
|
||||||
|
# allow only safe chars; replace others with '_'
|
||||||
|
return re.sub(r"[^A-Za-z0-9._-]", "_", name)[:128]
|
||||||
|
|
||||||
|
def get_application_token_from(body: Optional[dict]) -> Optional[str]:
|
||||||
|
# Prefer header, then JSON body
|
||||||
|
hdr = request.headers.get("X-Application-Token")
|
||||||
|
if hdr:
|
||||||
|
return hdr.strip()
|
||||||
|
if isinstance(body, dict):
|
||||||
|
at = body.get("application_token")
|
||||||
|
if isinstance(at, str) and at.strip():
|
||||||
|
return at.strip()
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_db_for_application_token(application_token: str) -> TinyDB:
|
||||||
|
safe = _sanitize_filename(application_token)
|
||||||
|
path = os.path.join(DB_DIR, f"{safe}.json")
|
||||||
|
# return cached TinyDB if exists
|
||||||
|
db = _DB_CACHE.get(path)
|
||||||
|
if db is None:
|
||||||
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||||
|
db = TinyDB(path)
|
||||||
|
_DB_CACHE[path] = db
|
||||||
|
return db
|
||||||
|
|
||||||
|
def authenticate_and_select_db(body: Optional[dict]) -> Tuple[Optional[TinyDB], Optional[tuple]]:
|
||||||
|
# Verify Bearer token
|
||||||
|
authz = request.headers.get("Authorization", "")
|
||||||
|
if not authz.startswith("Bearer "):
|
||||||
|
return None, (jsonify({"error": "Missing Authorization Bearer token"}), 401)
|
||||||
|
ok, payload, err = verify_token(authz[len("Bearer "):].strip())
|
||||||
|
if not ok:
|
||||||
|
return None, (jsonify({"error": f"Unauthorized: {err}"}), 401)
|
||||||
|
# Get application token to select DB
|
||||||
|
application_token = get_application_token_from(body)
|
||||||
|
if not application_token:
|
||||||
|
return None, (jsonify({"error": "Missing application_token (header X-Application-Token or JSON body)"}), 400)
|
||||||
|
db = get_db_for_application_token(application_token)
|
||||||
|
return db, None
|
||||||
|
|
||||||
def build_query(field: str, op: str, value):
|
def build_query(field: str, op: str, value):
|
||||||
"""Translate a simple JSON filter into a TinyDB Query.
|
"""Translate a simple JSON filter into a TinyDB Query.
|
||||||
@@ -98,6 +186,26 @@ def parse_assignment(expr: str):
|
|||||||
value = value_raw
|
value = value_raw
|
||||||
return field, value
|
return field, value
|
||||||
|
|
||||||
|
@app.route("/connect", methods=["POST"])
|
||||||
|
def connect():
|
||||||
|
body, err = json_body()
|
||||||
|
if err:
|
||||||
|
return err
|
||||||
|
app_id = body.get("application_id") if isinstance(body, dict) else None
|
||||||
|
application_token = body.get("application_token") if isinstance(body, dict) else None
|
||||||
|
if not isinstance(app_id, str) or not app_id.strip():
|
||||||
|
return jsonify({"error": "application_id required"}), 400
|
||||||
|
if not isinstance(application_token, str) or not application_token.strip():
|
||||||
|
return jsonify({"error": "application_token required"}), 400
|
||||||
|
# Ensure DB file exists for this application token
|
||||||
|
_ = get_db_for_application_token(application_token)
|
||||||
|
tok = sign_token(app_id.strip())
|
||||||
|
return jsonify({
|
||||||
|
"token": tok,
|
||||||
|
"expires_in": TOKEN_TTL,
|
||||||
|
"token_type": "Bearer",
|
||||||
|
"application_token": application_token
|
||||||
|
}), 200
|
||||||
|
|
||||||
# --- Routes ------------------------------------------------------------------
|
# --- Routes ------------------------------------------------------------------
|
||||||
|
|
||||||
@@ -111,6 +219,9 @@ def insert():
|
|||||||
body, err = json_body()
|
body, err = json_body()
|
||||||
if err:
|
if err:
|
||||||
return err
|
return err
|
||||||
|
db, auth_err = authenticate_and_select_db(body)
|
||||||
|
if auth_err:
|
||||||
|
return auth_err
|
||||||
# Accept either {"doc": {...}} or raw JSON object as the document
|
# Accept either {"doc": {...}} or raw JSON object as the document
|
||||||
if isinstance(body, dict) and "doc" in body:
|
if isinstance(body, dict) and "doc" in body:
|
||||||
doc = body["doc"]
|
doc = body["doc"]
|
||||||
@@ -128,6 +239,9 @@ def insert_many():
|
|||||||
body, err = json_body()
|
body, err = json_body()
|
||||||
if err:
|
if err:
|
||||||
return err
|
return err
|
||||||
|
db, auth_err = authenticate_and_select_db(body)
|
||||||
|
if auth_err:
|
||||||
|
return auth_err
|
||||||
docs = body.get("docs") if isinstance(body, dict) else None
|
docs = body.get("docs") if isinstance(body, dict) else None
|
||||||
if not isinstance(docs, list) or not all(isinstance(d, dict) for d in docs):
|
if not isinstance(docs, list) or not all(isinstance(d, dict) for d in docs):
|
||||||
return jsonify({"error": "Expected {docs: [ {...}, {...} ]}"}), 400
|
return jsonify({"error": "Expected {docs: [ {...}, {...} ]}"}), 400
|
||||||
@@ -137,12 +251,15 @@ def insert_many():
|
|||||||
|
|
||||||
@app.route("/get_all", methods=["POST", "GET"])
|
@app.route("/get_all", methods=["POST", "GET"])
|
||||||
def get_all():
|
def get_all():
|
||||||
|
body, _ = json_body(required=False)
|
||||||
|
db, auth_err = authenticate_and_select_db(body)
|
||||||
|
if auth_err:
|
||||||
|
return auth_err
|
||||||
data = db.all()
|
data = db.all()
|
||||||
_items = []
|
_items = []
|
||||||
for item in data:
|
for item in data:
|
||||||
_id = item.doc_id
|
d = dict(item)
|
||||||
d = {}
|
d["doc_id"] = item.doc_id
|
||||||
d[_id] = item
|
|
||||||
_items.append(d)
|
_items.append(d)
|
||||||
return jsonify(_items), 200
|
return jsonify(_items), 200
|
||||||
|
|
||||||
@@ -152,6 +269,9 @@ def get_one():
|
|||||||
body, err = json_body()
|
body, err = json_body()
|
||||||
if err:
|
if err:
|
||||||
return err
|
return err
|
||||||
|
db, auth_err = authenticate_and_select_db(body)
|
||||||
|
if auth_err:
|
||||||
|
return auth_err
|
||||||
doc_id = body.get("doc_id") if isinstance(body, dict) else None
|
doc_id = body.get("doc_id") if isinstance(body, dict) else None
|
||||||
if not isinstance(doc_id, int):
|
if not isinstance(doc_id, int):
|
||||||
return jsonify({"error": "Expected {doc_id: <int>}"}), 400
|
return jsonify({"error": "Expected {doc_id: <int>}"}), 400
|
||||||
@@ -170,6 +290,9 @@ def search():
|
|||||||
print (body)
|
print (body)
|
||||||
if err:
|
if err:
|
||||||
return err
|
return err
|
||||||
|
db, auth_err = authenticate_and_select_db(body)
|
||||||
|
if auth_err:
|
||||||
|
return auth_err
|
||||||
if not isinstance(body, dict):
|
if not isinstance(body, dict):
|
||||||
return jsonify({"error": "Expected JSON object"}), 400
|
return jsonify({"error": "Expected JSON object"}), 400
|
||||||
|
|
||||||
@@ -197,9 +320,8 @@ def search():
|
|||||||
results = db.search(q)
|
results = db.search(q)
|
||||||
_items = []
|
_items = []
|
||||||
for item in results:
|
for item in results:
|
||||||
_id = item.doc_id
|
d = dict(item)
|
||||||
d = {}
|
d["doc_id"] = item.doc_id
|
||||||
d[_id] = item
|
|
||||||
_items.append(d)
|
_items.append(d)
|
||||||
return jsonify(_items), 200
|
return jsonify(_items), 200
|
||||||
|
|
||||||
@@ -209,6 +331,9 @@ def update():
|
|||||||
body, err = json_body()
|
body, err = json_body()
|
||||||
if err:
|
if err:
|
||||||
return err
|
return err
|
||||||
|
db, auth_err = authenticate_and_select_db(body)
|
||||||
|
if auth_err:
|
||||||
|
return auth_err
|
||||||
if not isinstance(body, dict):
|
if not isinstance(body, dict):
|
||||||
return jsonify({"error": "Expected JSON object"}), 400
|
return jsonify({"error": "Expected JSON object"}), 400
|
||||||
print(body)
|
print(body)
|
||||||
@@ -263,6 +388,9 @@ def remove():
|
|||||||
body, err = json_body()
|
body, err = json_body()
|
||||||
if err:
|
if err:
|
||||||
return err
|
return err
|
||||||
|
db, auth_err = authenticate_and_select_db(body)
|
||||||
|
if auth_err:
|
||||||
|
return auth_err
|
||||||
if not isinstance(body, dict):
|
if not isinstance(body, dict):
|
||||||
return jsonify({"error": "Expected JSON object"}), 400
|
return jsonify({"error": "Expected JSON object"}), 400
|
||||||
|
|
||||||
@@ -296,10 +424,15 @@ def remove():
|
|||||||
|
|
||||||
@app.route("/truncate", methods=["POST"])
|
@app.route("/truncate", methods=["POST"])
|
||||||
def truncate():
|
def truncate():
|
||||||
|
body, _ = json_body(required=False)
|
||||||
|
db, auth_err = authenticate_and_select_db(body)
|
||||||
|
if auth_err:
|
||||||
|
return auth_err
|
||||||
db.truncate()
|
db.truncate()
|
||||||
return jsonify({"message": "truncated"}), 200
|
return jsonify({"message": "truncated"}), 200
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# Note: Flask dev server is single-threaded by default; good for local testing.
|
# Note: Flask dev server is single-threaded by default; good for local testing.
|
||||||
|
# In Docker, gunicorn is used to run this app.
|
||||||
app.run(debug=True, port=5001)
|
app.run(debug=True, port=5001)
|
||||||
Reference in New Issue
Block a user