305 lines
10 KiB
Python
305 lines
10 KiB
Python
from flask import Flask, request, jsonify
|
|
from tinydb import TinyDB, Query, where
|
|
from tinydb.operations import set as ops_set
|
|
import json
|
|
|
|
from flask_cors import CORS
|
|
import os
|
|
|
|
app = Flask(__name__)
|
|
|
|
UI_ORIGIN = os.getenv("UI_ORIGIN", "https://db.northdanubesoft.eu")
|
|
CORS(app, resources={r"/*": {"origins": [UI_ORIGIN]}})
|
|
|
|
|
|
|
|
# Single TinyDB file for now (can be swapped to per-table later)
|
|
DB_PATH = os.getenv("DB_PATH", "db.json")
|
|
db = TinyDB(DB_PATH)
|
|
|
|
# --- Helpers -----------------------------------------------------------------
|
|
|
|
def json_body(required: bool = True):
|
|
data = request.get_json(silent=True)
|
|
if data is None:
|
|
raw = request.get_data(as_text=True)
|
|
if raw:
|
|
try:
|
|
data = json.loads(raw)
|
|
except Exception:
|
|
if required:
|
|
return None, (jsonify({"error": "Expected JSON body"}), 400)
|
|
else:
|
|
data = None
|
|
elif isinstance(data, str):
|
|
try:
|
|
data = json.loads(data)
|
|
except Exception:
|
|
if required:
|
|
return None, (jsonify({"error": "Expected JSON body"}), 400)
|
|
else:
|
|
data = None
|
|
if required and data is None:
|
|
return None, (jsonify({"error": "Expected JSON body"}), 400)
|
|
return data, None
|
|
|
|
|
|
def build_query(field: str, op: str, value):
|
|
"""Translate a simple JSON filter into a TinyDB Query.
|
|
Supported ops: ==, !=, >, >=, <, <=, in, contains
|
|
"""
|
|
f = where(field)
|
|
if op == "==":
|
|
return f == value
|
|
if op == "!=":
|
|
return f != value
|
|
if op == ">":
|
|
return f > value
|
|
if op == ">=":
|
|
return f >= value
|
|
if op == "<":
|
|
return f < value
|
|
if op == "<=":
|
|
return f <= value
|
|
if op == "in":
|
|
# value should be a list
|
|
if not isinstance(value, list):
|
|
value = [value]
|
|
return f.one_of(value)
|
|
if op == "contains":
|
|
# substring for strings; membership for lists
|
|
return f.test(lambda v: (isinstance(v, str) and isinstance(value, str) and value in v)
|
|
or (isinstance(v, (list, tuple, set)) and value in v))
|
|
raise ValueError(f"Unsupported op: {op}")
|
|
|
|
def parse_assignment(expr: str):
|
|
"""Parse a string expression like 'field = \"value\"' into (field, value)."""
|
|
if '=' not in expr:
|
|
raise ValueError("Expression must contain '='")
|
|
field_part, value_part = expr.split('=', 1)
|
|
field = field_part.strip()
|
|
value_raw = value_part.strip()
|
|
# Remove trailing comma if present
|
|
if value_raw.endswith(','):
|
|
value_raw = value_raw[:-1].rstrip()
|
|
# Strip quotes if value starts and ends with same quote
|
|
if (len(value_raw) >= 2) and ((value_raw[0] == value_raw[-1]) and value_raw[0] in ("'", '"')):
|
|
value = value_raw[1:-1]
|
|
else:
|
|
# Try to parse JSON for numbers, booleans, null, objects, arrays, or quoted strings
|
|
# Check if looks like JSON
|
|
json_like_start = ('{', '[', '"', '-', 't', 'f', 'n') + tuple(str(i) for i in range(10))
|
|
if value_raw and (value_raw[0] in json_like_start):
|
|
try:
|
|
value = json.loads(value_raw)
|
|
except Exception:
|
|
value = value_raw
|
|
else:
|
|
value = value_raw
|
|
return field, value
|
|
|
|
|
|
# --- Routes ------------------------------------------------------------------
|
|
|
|
@app.route("/healthz", methods=["GET"])
|
|
def healthz():
|
|
return jsonify({"status": "ok"}), 200
|
|
|
|
|
|
@app.route("/insert", methods=["POST"])
|
|
def insert():
|
|
body, err = json_body()
|
|
if err:
|
|
return err
|
|
# Accept either {"doc": {...}} or raw JSON object as the document
|
|
if isinstance(body, dict) and "doc" in body:
|
|
doc = body["doc"]
|
|
else:
|
|
doc = body
|
|
if not isinstance(doc, dict):
|
|
return jsonify({"error": "Body must be an object or {doc: {...}}"}), 400
|
|
|
|
doc_id = db.insert(doc)
|
|
return jsonify({"message": "inserted", "doc_id": doc_id}), 200
|
|
|
|
|
|
@app.route("/insert_many", methods=["POST"])
|
|
def insert_many():
|
|
body, err = json_body()
|
|
if err:
|
|
return err
|
|
docs = body.get("docs") if isinstance(body, dict) else None
|
|
if not isinstance(docs, list) or not all(isinstance(d, dict) for d in docs):
|
|
return jsonify({"error": "Expected {docs: [ {...}, {...} ]}"}), 400
|
|
ids = db.insert_multiple(docs)
|
|
return jsonify({"message": "inserted", "doc_ids": ids}), 200
|
|
|
|
|
|
@app.route("/get_all", methods=["POST", "GET"])
|
|
def get_all():
|
|
data = db.all()
|
|
_items = []
|
|
for item in data:
|
|
_id = item.doc_id
|
|
d = {}
|
|
d[_id] = item
|
|
_items.append(d)
|
|
return jsonify(_items), 200
|
|
|
|
|
|
@app.route("/get", methods=["POST"])
|
|
def get_one():
|
|
body, err = json_body()
|
|
if err:
|
|
return err
|
|
doc_id = body.get("doc_id") if isinstance(body, dict) else None
|
|
if not isinstance(doc_id, int):
|
|
return jsonify({"error": "Expected {doc_id: <int>}"}), 400
|
|
doc = db.get(doc_id=doc_id)
|
|
if doc is None:
|
|
return jsonify({"error": "not found"}), 404
|
|
# Include doc_id so clients can reference it later
|
|
doc_with_id = dict(doc)
|
|
doc_with_id["doc_id"] = doc_id
|
|
return jsonify(doc_with_id), 200
|
|
|
|
|
|
@app.route("/search", methods=["POST"])
|
|
def search():
|
|
body, err = json_body()
|
|
print (body)
|
|
if err:
|
|
return err
|
|
if not isinstance(body, dict):
|
|
return jsonify({"error": "Expected JSON object"}), 400
|
|
|
|
# Accept either a single filter or a list of filters (ANDed)
|
|
filters = body.get("where")
|
|
if isinstance(filters, dict):
|
|
filters = [filters]
|
|
if not isinstance(filters, list) or not filters:
|
|
return jsonify({"error": "Expected {where: {field, op, value}} or a list of them"}), 400
|
|
|
|
try:
|
|
q = None
|
|
for f in filters:
|
|
field = f.get("field")
|
|
op = f.get("op")
|
|
value = f.get("value")
|
|
if not isinstance(field, str) or not isinstance(op, str):
|
|
return jsonify({"error": "Each filter needs 'field' and 'op'"}), 400
|
|
clause = build_query(field, op, value)
|
|
q = clause if q is None else (q & clause)
|
|
except ValueError as e:
|
|
return jsonify({"error": str(e)}), 400
|
|
if not callable(q):
|
|
return jsonify({"error":"Invalid query built"}), 400
|
|
results = db.search(q)
|
|
_items = []
|
|
for item in results:
|
|
_id = item.doc_id
|
|
d = {}
|
|
d[_id] = item
|
|
_items.append(d)
|
|
return jsonify(_items), 200
|
|
|
|
|
|
@app.route("/update", methods=["POST"])
|
|
def update():
|
|
body, err = json_body()
|
|
if err:
|
|
return err
|
|
if not isinstance(body, dict):
|
|
return jsonify({"error": "Expected JSON object"}), 400
|
|
print(body)
|
|
# Option A: update by doc_id
|
|
if "doc_id" in body and "fields" in body:
|
|
doc_id = body.get("doc_id")
|
|
fields = body.get("fields")
|
|
if isinstance(fields, str):
|
|
try:
|
|
field, value = parse_assignment(fields)
|
|
except Exception as e:
|
|
return jsonify({"error": f"Failed to parse fields string: {str(e)}"}), 400
|
|
updated = db.update(ops_set(field, value), doc_ids=[doc_id])
|
|
return jsonify({"updated": len(updated)}), 200
|
|
if not isinstance(doc_id, int) or not isinstance(fields, dict):
|
|
return jsonify({"error": "Expected {doc_id: int, fields: {...}}"}), 400
|
|
updated = db.update(fields, doc_ids=[doc_id])
|
|
return jsonify({"updated": len(updated)}), 200
|
|
|
|
# Option B: update by query
|
|
if "where" in body and "fields" in body:
|
|
filters = body.get("where")
|
|
fields = body.get("fields")
|
|
if isinstance(filters, dict):
|
|
filters = [filters]
|
|
if not isinstance(filters, list):
|
|
return jsonify({"error": "Expected {where: [...], fields: {...}}"}), 400
|
|
try:
|
|
q = None
|
|
for f in filters:
|
|
clause = build_query(f.get("field"), f.get("op"), f.get("value"))
|
|
q = clause if q is None else (q & clause)
|
|
except ValueError as e:
|
|
return jsonify({"error": str(e)}), 400
|
|
if isinstance(fields, str):
|
|
try:
|
|
field, value = parse_assignment(fields)
|
|
except Exception as e:
|
|
return jsonify({"error": f"Failed to parse fields string: {str(e)}"}), 400
|
|
updated = db.update(ops_set(field, value), q)
|
|
return jsonify({"updated": len(updated)}), 200
|
|
if not isinstance(fields, dict):
|
|
return jsonify({"error": "Expected {where: [...], fields: {...}}"}), 400
|
|
updated = db.update(fields, q)
|
|
return jsonify({"updated": len(updated)}), 200
|
|
|
|
return jsonify({"error": "Provide either {doc_id, fields} or {where, fields}"}), 400
|
|
|
|
|
|
@app.route("/remove", methods=["POST"])
|
|
def remove():
|
|
body, err = json_body()
|
|
if err:
|
|
return err
|
|
if not isinstance(body, dict):
|
|
return jsonify({"error": "Expected JSON object"}), 400
|
|
|
|
# Option A: by doc_id
|
|
if "doc_id" in body:
|
|
doc_id = body.get("doc_id")
|
|
if not isinstance(doc_id, int):
|
|
return jsonify({"error": "Expected {doc_id: int}"}), 400
|
|
removed = db.remove(doc_ids=[doc_id])
|
|
return jsonify({"removed": len(removed)}), 200
|
|
|
|
# Option B: by query
|
|
if "where" in body:
|
|
filters = body.get("where")
|
|
if isinstance(filters, dict):
|
|
filters = [filters]
|
|
if not isinstance(filters, list):
|
|
return jsonify({"error": "Expected {where: [...]}"}), 400
|
|
try:
|
|
q = None
|
|
for f in filters:
|
|
clause = build_query(f.get("field"), f.get("op"), f.get("value"))
|
|
q = clause if q is None else (q & clause)
|
|
except ValueError as e:
|
|
return jsonify({"error": str(e)}), 400
|
|
removed = db.remove(q)
|
|
return jsonify({"removed": len(removed)}), 200
|
|
|
|
return jsonify({"error": "Provide {doc_id} or {where}"}), 400
|
|
|
|
|
|
@app.route("/truncate", methods=["POST"])
|
|
def truncate():
|
|
db.truncate()
|
|
return jsonify({"message": "truncated"}), 200
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Note: Flask dev server is single-threaded by default; good for local testing.
|
|
app.run(debug=True, port=5001) |