""" SCRIPTFLOW Deploy API Runs on the Hetzner host (in a container) with the sites/ volume mounted. Lets Hermes deploy projects automatically — no manual copy-paste. Security: - All endpoints require header X-Deploy-Token matching DEPLOY_TOKEN. - slug is sanitized to [a-z0-9-] so it can never escape /srv/sites. - Caddy sits in front with on-demand TLS. """ import io import os import re import json import shutil import tarfile import datetime from typing import Optional from fastapi import FastAPI, Header, HTTPException, UploadFile, File, Request from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware import uuid DEPLOY_TOKEN = os.environ.get("DEPLOY_TOKEN", "") SITES_DIR = os.environ.get("SITES_DIR", "/srv/sites") # control app reads /projects.json relative to its origin -> sites/control/projects.json REGISTRY_PATH = os.environ.get("REGISTRY_PATH", "/srv/sites/control/projects.json") # Jobs bridge: the Redesigner tab drops crawl/redesign jobs here, Hermes picks them up. JOBS_PATH = os.environ.get("JOBS_PATH", "/srv/sites/_jobs/redesign_jobs.json") SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,40}$") app = FastAPI(title="Scriptflow Deploy API", version="1.4.0") # The Redesigner tab submits jobs cross-origin. Allow any scriptflow subdomain. app.add_middleware( CORSMiddleware, allow_origin_regex=r"https://[a-z0-9-]+\.scriptflow-agent\.de", allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["*"], ) def auth(token: Optional[str]): if not DEPLOY_TOKEN: raise HTTPException(500, "DEPLOY_TOKEN not configured on server") if token != DEPLOY_TOKEN: raise HTTPException(401, "invalid deploy token") def safe_slug(slug: str) -> str: if not SLUG_RE.match(slug): raise HTTPException(400, "invalid slug (allowed: a-z 0-9 -)") return slug def _safe_extract(tar: tarfile.TarFile, dest: str): """Prevent path traversal (tarbomb / ../ escapes).""" dest_abs = os.path.abspath(dest) for member in tar.getmembers(): target = os.path.abspath(os.path.join(dest, member.name)) if not (target == dest_abs or target.startswith(dest_abs + os.sep)): raise HTTPException(400, f"unsafe path in archive: {member.name}") tar.extractall(dest) @app.get("/api/health") def health(): return { "ok": True, "service": "deploy-api", "sites_dir": SITES_DIR, "sites": sorted(os.listdir(SITES_DIR)) if os.path.isdir(SITES_DIR) else [], "time": datetime.datetime.utcnow().isoformat() + "Z", } @app.post("/api/deploy/{slug}") async def deploy(slug: str, file: UploadFile = File(...), x_deploy_token: Optional[str] = Header(default=None)): auth(x_deploy_token) slug = safe_slug(slug) dest = os.path.join(SITES_DIR, slug) raw = await file.read() if not raw: raise HTTPException(400, "empty upload") # extract into a temp dir first, then swap (atomic-ish) tmp = dest + ".incoming" if os.path.exists(tmp): shutil.rmtree(tmp) os.makedirs(tmp, exist_ok=True) try: with tarfile.open(fileobj=io.BytesIO(raw), mode="r:gz") as tar: _safe_extract(tar, tmp) except HTTPException: shutil.rmtree(tmp, ignore_errors=True) raise except Exception as e: shutil.rmtree(tmp, ignore_errors=True) raise HTTPException(400, f"bad archive: {e}") # require an index.html somewhere has_index = any("index.html" in files for _, _, files in os.walk(tmp)) if not has_index: shutil.rmtree(tmp, ignore_errors=True) raise HTTPException(400, "archive has no index.html") # Preserve runtime-managed files that must survive a redeploy. # The control app's registry (projects.json) is written live via /api/registry # and must NOT be clobbered by whatever stale copy is bundled in the build. # If the live file exists, it always wins over the uploaded one. PRESERVE = ["projects.json"] if os.path.isdir(dest): for fname in PRESERVE: live = os.path.join(dest, fname) if os.path.isfile(live): shutil.copy2(live, os.path.join(tmp, fname)) # swap if os.path.exists(dest): shutil.rmtree(dest) os.rename(tmp, dest) return { "ok": True, "slug": slug, "url": f"https://{slug}.scriptflow-agent.de", "files": sorted(os.listdir(dest))[:50], "deployed_at": datetime.datetime.utcnow().isoformat() + "Z", } @app.post("/api/preview/{slug}") async def preview_deploy(slug: str, file: UploadFile = File(...), x_deploy_token: Optional[str] = Header(default=None)): """Deploy to preview.scriptflow-agent.de/{slug}/ — shared subdomain.""" auth(x_deploy_token) slug = safe_slug(slug) # Preview files go under sites/preview/{slug}/ dest = os.path.join(SITES_DIR, "preview", slug) raw = await file.read() if not raw: raise HTTPException(400, "empty upload") tmp = dest + ".incoming" if os.path.exists(tmp): shutil.rmtree(tmp) os.makedirs(tmp, exist_ok=True) try: with tarfile.open(fileobj=io.BytesIO(raw), mode="r:gz") as tar: _safe_extract(tar, tmp) except HTTPException: shutil.rmtree(tmp, ignore_errors=True) raise except Exception as e: shutil.rmtree(tmp, ignore_errors=True) raise HTTPException(400, f"bad archive: {e}") # Swap if os.path.exists(dest): shutil.rmtree(dest) os.rename(tmp, dest) return { "ok": True, "slug": slug, "url": f"https://preview.scriptflow-agent.de/{slug}/", "files": sorted(os.listdir(dest))[:50], "deployed_at": datetime.datetime.utcnow().isoformat() + "Z", } @app.delete("/api/preview/{slug}") def preview_undeploy(slug: str, x_deploy_token: Optional[str] = Header(default=None)): """Delete a preview deployment.""" auth(x_deploy_token) slug = safe_slug(slug) dest = os.path.join(SITES_DIR, "preview", slug) if not os.path.isdir(dest): raise HTTPException(404, "preview not found") shutil.rmtree(dest) return {"ok": True, "removed": slug} @app.delete("/api/deploy/{slug}") def undeploy(slug: str, x_deploy_token: Optional[str] = Header(default=None)): auth(x_deploy_token) slug = safe_slug(slug) dest = os.path.join(SITES_DIR, slug) if not os.path.isdir(dest): raise HTTPException(404, "not deployed") shutil.rmtree(dest) return {"ok": True, "removed": slug} @app.get("/api/registry") def get_registry(x_deploy_token: Optional[str] = Header(default=None)): auth(x_deploy_token) try: with open(REGISTRY_PATH, "r", encoding="utf-8") as f: return json.load(f) except FileNotFoundError: return {"projects": [], "services": []} @app.put("/api/registry") async def put_registry(request: Request, x_deploy_token: Optional[str] = Header(default=None)): auth(x_deploy_token) try: body = await request.json() except Exception: raise HTTPException(400, "body must be JSON") if not isinstance(body, dict) or "projects" not in body: raise HTTPException(400, "registry must be an object with a 'projects' key") os.makedirs(os.path.dirname(REGISTRY_PATH), exist_ok=True) tmp = REGISTRY_PATH + ".tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump(body, f, ensure_ascii=False, indent=2) os.replace(tmp, REGISTRY_PATH) return {"ok": True, "projects": len(body.get("projects", []))} # --------------------------------------------------------------------------- # Jobs bridge — the Redesigner tab queues crawl/redesign jobs, Hermes works them # --------------------------------------------------------------------------- MAX_PENDING_JOBS = 50 # Job types that bypass the regular queue and are always processed first. HIGH_PRIORITY_TYPES = {"section-generate", "styleguide-generate"} def _job_priority(job: dict) -> int: """Lower number = higher priority. High-priority types skip ahead of redesign jobs.""" return 0 if job.get("type") in HIGH_PRIORITY_TYPES else 1 def _read_jobs() -> list: try: with open(JOBS_PATH, "r", encoding="utf-8") as f: data = json.load(f) return data if isinstance(data, list) else [] except (FileNotFoundError, json.JSONDecodeError): return [] def _read_jobs_prioritised() -> list: """Return jobs with pending high-priority jobs sorted to the front. Done/error jobs keep their original order (chronological) for history display.""" jobs = _read_jobs() pending = sorted([j for j in jobs if j.get("status") == "pending"], key=_job_priority) non_pending = [j for j in jobs if j.get("status") != "pending"] return pending + non_pending def _write_jobs(jobs: list): os.makedirs(os.path.dirname(JOBS_PATH), exist_ok=True) tmp = JOBS_PATH + ".tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump(jobs, f, ensure_ascii=False, indent=2) os.replace(tmp, JOBS_PATH) @app.post("/api/jobs") async def create_job(request: Request): """Public (CORS-restricted to control tab). Queues a redesign job. No deploy token — but the payload is validated and pending jobs are capped so it can't be abused to fill the disk.""" try: body = await request.json() except Exception: raise HTTPException(400, "body must be JSON") business = body.get("business") url = body.get("url") if not isinstance(business, str) or not business.strip(): raise HTTPException(400, "missing 'business' name") if not isinstance(url, str) or not url.startswith(("http://", "https://")): raise HTTPException(400, "missing or invalid 'url' (must be http/https)") jobs = _read_jobs() pending = [j for j in jobs if j.get("status") == "pending"] if len(pending) >= MAX_PENDING_JOBS: raise HTTPException(429, "too many pending jobs, try later") job = { "id": uuid.uuid4().hex[:12], "business": business.strip()[:120], "url": url.strip()[:300], "type": str(body.get("type", "restaurant"))[:40], "address": str(body.get("address", ""))[:200], "status": "pending", "created_at": datetime.datetime.utcnow().isoformat() + "Z", "result_slug": None, "result_url": None, "error": None, } jobs.append(job) _write_jobs(jobs) return {"ok": True, "job": job} @app.get("/api/jobs") def list_jobs(status: Optional[str] = None, x_deploy_token: Optional[str] = Header(default=None)): """Hermes reads the queue (deploy token required). Pending high-priority jobs (section-generate, styleguide-generate) are always returned before regular redesign jobs so they get picked up first.""" auth(x_deploy_token) jobs = _read_jobs_prioritised() if status: jobs = [j for j in jobs if j.get("status") == status] return {"jobs": jobs} @app.get("/api/jobs/public") def list_jobs_public(): """Tab reads job statuses to show progress (no token, CORS-restricted).""" jobs = _read_jobs() slim = [{ "id": j.get("id"), "business": j.get("business"), "url": j.get("url"), "status": j.get("status"), "created_at": j.get("created_at"), "result_url": j.get("result_url"), "error": j.get("error"), } for j in jobs] return {"jobs": slim} @app.post("/api/jobs/claim") async def claim_job(request: Request, x_deploy_token: Optional[str] = Header(default=None)): """Atomically claim the next pending job and mark it as 'working'. Returns the claimed job, or 404 if no pending jobs are available. Safe to call from multiple concurrent workers — FastAPI's single-threaded event loop guarantees only one coroutine runs this at a time.""" auth(x_deploy_token) try: body = await request.json() except Exception: body = {} worker_id = str(body.get("worker_id", ""))[:64] jobs = _read_jobs_prioritised() found = None for j in jobs: if j.get("status") == "pending": found = j break if not found: raise HTTPException(404, "no pending jobs") now = datetime.datetime.utcnow().isoformat() + "Z" all_jobs = _read_jobs() for j in all_jobs: if j.get("id") == found["id"]: j["status"] = "working" j["worker_id"] = worker_id j["updated_at"] = now found = j break _write_jobs(all_jobs) return {"ok": True, "job": found} @app.put("/api/jobs/{job_id}") async def update_job(job_id: str, request: Request, x_deploy_token: Optional[str] = Header(default=None)): """Hermes updates job status/result (deploy token required).""" auth(x_deploy_token) try: body = await request.json() except Exception: raise HTTPException(400, "body must be JSON") jobs = _read_jobs() found = None for j in jobs: if j.get("id") == job_id: for k in ("status", "result_slug", "result_url", "error", "progress", "status_msg", "styleguide", "knowledge", "wireframe", "timeline"): if k in body: j[k] = body[k] j["updated_at"] = datetime.datetime.utcnow().isoformat() + "Z" found = j break if not found: raise HTTPException(404, "job not found") _write_jobs(jobs) return {"ok": True, "job": found} @app.delete("/api/jobs/{job_id}") def delete_job(job_id: str, x_deploy_token: Optional[str] = Header(default=None)): auth(x_deploy_token) jobs = _read_jobs() new = [j for j in jobs if j.get("id") != job_id] if len(new) == len(jobs): raise HTTPException(404, "job not found") _write_jobs(new) return {"ok": True, "removed": job_id}