#!/usr/bin/env bash
# Snapshot the read-only state of the Arcodange Dolibarr into one JSON file.
#
# Usage:
#   snapshot.sh [--out PATH]            # default: ./snapshot-YYYY-MM-DDTHHMMSSZ.json
#   snapshot.sh --print-only            # write to stdout instead of a file
#   snapshot.sh --max-template-id N     # highest template id to probe (default: 20)
#   snapshot.sh -h | --help             # show this help
#
# The snapshot is content-addressable: it includes a SHA-256 of the
# serialized payload (computed AFTER stable key-sorting) so two snapshots
# of the same state hash identically. Useful for:
#   - cohort review evidence packs (sign + send)
#   - drift detection between dates (diff two snapshots)
#   - archival before a known-risky change
#
# What's included (everything the dolibarr-* family reads):
#   - status (Dolibarr version)
#   - thirdparties (full list + detail)
#   - invoices (full list + per-invoice detail + per-invoice payments)
#   - recurring invoice templates (probed 1..MAX_TEMPLATE_ID)
#   - products
#   - bank accounts
#
# Excluded by design:
#   - PDF attachments (binary, would bloat the snapshot ~50KB each)
#   - users/info (would leak ai_agent details)
#   - any non-read endpoints
#
# Requires: curl, jq, python3 (with hashlib — standard lib).

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
DOL_CURL="${SCRIPT_DIR}/../../dolibarr/scripts/dol-curl.sh"
MAX_TEMPLATE_ID=20        # highest recurring-template id probed; see --max-template-id
EMPTY_TPL_TOLERANCE=5     # consecutive empty template probes tolerated before giving up

OUT=""
PRINT_ONLY=0

# usage
# Print the leading comment block of this script (everything after the shebang
# up to the first non-comment line) with the "# " prefix stripped, so the help
# text can never drift out of sync with a hard-coded line range.
usage() {
  awk 'NR == 1 { next } !/^#/ { exit } { sub(/^# ?/, ""); print }' "$0"
}

# parse_args ARGS...
# Populate OUT, PRINT_ONLY and MAX_TEMPLATE_ID from the command line.
# Exits 0 after printing help; exits 2 on an unknown flag, a flag that is
# missing its value, or a non-numeric --max-template-id.
parse_args() {
  while [[ $# -gt 0 ]]; do
    case "$1" in
      --out)
        # Check arity explicitly: under `set -u` a bare "$2" would die with an
        # opaque "unbound variable" message instead of a usage error.
        if [[ $# -lt 2 ]]; then
          echo "snapshot.sh: --out requires a PATH" >&2
          exit 2
        fi
        OUT="$2"
        shift 2
        ;;
      --print-only)
        PRINT_ONLY=1
        shift
        ;;
      --max-template-id)
        if [[ $# -lt 2 ]]; then
          echo "snapshot.sh: --max-template-id requires a number" >&2
          exit 2
        fi
        if [[ ! "$2" =~ ^[0-9]+$ ]]; then
          echo "snapshot.sh: --max-template-id must be a non-negative integer: $2" >&2
          exit 2
        fi
        MAX_TEMPLATE_ID="$2"
        shift 2
        ;;
      -h|--help)
        usage
        exit 0
        ;;
      *)
        echo "snapshot.sh: unknown arg: $1" >&2
        exit 2
        ;;
    esac
  done
}

parse_args "$@"

# Every fetch below is best-effort, so a missing helper would otherwise yield a
# silently empty snapshot. Warn, but do not abort.
[[ -x "${DOL_CURL}" ]] \
  || echo "snapshot.sh: warning: ${DOL_CURL} not found or not executable" >&2

# Scratch directory for the raw API responses; removed on any exit path.
WORK="$(mktemp -d -t dolsnap.XXXXXX)"
trap 'rm -rf -- "${WORK}"' EXIT

# fetch_into OUT_FILE API_PATH
# Run ${DOL_CURL} on API_PATH and store its stdout in OUT_FILE.
# Globals:   DOL_CURL (read)
# Arguments: $1 - file to (over)write; $2 - path handed to dol-curl
# Returns:   always 0 — a failed request is recorded, never fatal.
fetch_into() {
  local out_file="$1" path="$2"
  if ! "${DOL_CURL}" "${path}" > "${out_file}" 2>/dev/null; then
    # Per the original note: on HTTP error dol-curl prints body+stderr, so the
    # request is re-issued with stderr folded into the file to keep the error
    # body for the record.
    "${DOL_CURL}" "${path}" > "${out_file}" 2>&1 || true
  fi
}

# 1. Liveness + status
fetch_into "${WORK}/status.json" /status
# --- JSON helpers for the collection steps below -----------------------------

# list_ids FILE
# Print, one per line, the "id" of every object with a truthy id in the JSON
# array stored in FILE. Prints nothing when FILE is missing, is not valid
# JSON, or holds something other than an array (e.g. an error object), so an
# unexpected response can never abort the snapshot.
list_ids() {
  python3 - "$1" <<'PY'
import json
import sys

try:
    with open(sys.argv[1], encoding="utf-8") as fh:
        rows = json.load(fh)
except (OSError, ValueError):
    rows = []

if isinstance(rows, list):
    for row in rows:
        if isinstance(row, dict) and row.get("id"):
            print(row["id"])
PY
}

# json_has_id FILE
# Print "1" when FILE holds a JSON object with a truthy "id", otherwise "0".
json_has_id() {
  python3 - "$1" <<'PY'
import json
import sys

try:
    with open(sys.argv[1], encoding="utf-8") as fh:
        doc = json.load(fh)
except (OSError, ValueError):
    doc = None

print("1" if isinstance(doc, dict) and doc.get("id") else "0")
PY
}

# 2. Thirdparties (list + detail)
fetch_into "${WORK}/tps_list.json" /thirdparties
TP_IDS="$(list_ids "${WORK}/tps_list.json")"
mkdir -p "${WORK}/tps"
# Ids are read on fd 3 so anything run inside the loop cannot swallow the
# list by reading stdin.
while IFS= read -r id <&3; do
  [[ -n "${id}" ]] || continue
  fetch_into "${WORK}/tps/${id}.json" "/thirdparties/${id}"
done 3<<< "${TP_IDS}"

# 3. Invoices (list + detail + payments)
# NOTE(review): limit=500 presumably caps the listing; a larger ledger would
# be truncated silently — confirm against the API's pagination rules.
fetch_into "${WORK}/inv_list.json" '/invoices?limit=500&sortfield=t.datef&sortorder=ASC'
INV_IDS="$(list_ids "${WORK}/inv_list.json")"
mkdir -p "${WORK}/inv" "${WORK}/pay"
while IFS= read -r id <&3; do
  [[ -n "${id}" ]] || continue
  fetch_into "${WORK}/inv/${id}.json" "/invoices/${id}"
  fetch_into "${WORK}/pay/${id}.json" "/invoices/${id}/payments"
done 3<<< "${INV_IDS}"

# 4. Recurring templates (probe)
# Template ids are probed one by one from 1 to MAX_TEMPLATE_ID; the probe
# stops early after EMPTY_TPL_TOLERANCE consecutive ids without a template.
if [[ ! "${MAX_TEMPLATE_ID}" =~ ^[0-9]+$ ]]; then
  echo "snapshot.sh: MAX_TEMPLATE_ID must be a non-negative integer: ${MAX_TEMPLATE_ID}" >&2
  exit 2
fi
mkdir -p "${WORK}/tpl"
CONSECUTIVE_EMPTY=0
# Arithmetic loop instead of `seq 1 N`: BSD seq counts *down* for N=0 rather
# than producing nothing. 10# forces base 10 so "08" is not read as octal.
max_tid=$(( 10#${MAX_TEMPLATE_ID} ))
for (( tid = 1; tid <= max_tid; tid++ )); do
  tpl_file="${WORK}/tpl/${tid}.json"
  fetch_into "${tpl_file}" "/invoices/templates/${tid}"
  REAL="$(json_has_id "${tpl_file}")"
  if [[ "${REAL}" == "1" ]]; then
    CONSECUTIVE_EMPTY=0
    continue
  fi
  # Not a template: drop the probe residue so it is not archived.
  CONSECUTIVE_EMPTY=$(( CONSECUTIVE_EMPTY + 1 ))
  rm -f -- "${tpl_file}"
  if (( CONSECUTIVE_EMPTY >= EMPTY_TPL_TOLERANCE )); then
    break
  fi
done

# 5. Products + bank accounts
fetch_into "${WORK}/products.json" /products
fetch_into "${WORK}/bankaccounts.json" /bankaccounts
# 6. Compose the snapshot
# Merge every raw response under ${WORK} into one JSON document and write it
# to ${WORK}/snapshot.json. All file and stdout I/O is explicitly UTF-8 so the
# result does not depend on the caller's locale.
python3 - "${WORK}" <<'PY' > "${WORK}/snapshot.json"
import datetime
import hashlib
import json
import os
import sys

work = sys.argv[1]


def load(path, default):
    """Return the parsed JSON in `path`, or `default` if it is missing or unparsable."""
    try:
        with open(path, encoding="utf-8") as fh:
            return json.load(fh)
    except (FileNotFoundError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError
        # (non-JSON or non-UTF-8 residue left by a failed request).
        return default


def load_dir(dirname):
    """Map '<name>' -> parsed JSON for every '<name>.json' under work/dirname."""
    out = {}
    full = os.path.join(work, dirname)
    if not os.path.isdir(full):
        return out
    for fn in sorted(os.listdir(full)):
        if not fn.endswith(".json"):
            continue
        key = fn[:-len(".json")]
        out[key] = load(os.path.join(full, fn), None)
    return out


data = {
    "status": load(os.path.join(work, "status.json"), {}),
    "thirdparties": {
        "list": load(os.path.join(work, "tps_list.json"), []),
        "detail": load_dir("tps"),
    },
    "invoices": {
        "list": load(os.path.join(work, "inv_list.json"), []),
        "detail": load_dir("inv"),
        "payments": load_dir("pay"),
    },
    "recurring_templates": load_dir("tpl"),
    "products": load(os.path.join(work, "products.json"), []),
    "bank_accounts": load(os.path.join(work, "bankaccounts.json"), []),
}

# content_hash is the sha256 of `data` only — excludes timestamp + metadata,
# so two snapshots of identical Dolibarr state hash identically.
# (Drift detection is then: compare content_hash, done.)
content_serialized = json.dumps(data, sort_keys=True, ensure_ascii=False).encode("utf-8")
content_hash = "sha256:" + hashlib.sha256(content_serialized).hexdigest()

payload = {
    "schema_version": "1",
    "captured_at": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    "instance": "erp.arcodange.lab",
    "content_hash": content_hash,
    "data": data,
}

# Write bytes directly: print() would encode with the locale's codec and fail
# on non-ASCII data under a C/ASCII locale because ensure_ascii is off.
text = json.dumps(payload, indent=2, ensure_ascii=False, sort_keys=True)
sys.stdout.buffer.write((text + "\n").encode("utf-8"))
PY
# 7. Output
# Either stream the composed snapshot to stdout (--print-only) or copy it to
# ${OUT} (default: a UTC-timestamped file in the current directory) and report
# the path, byte size and content hash.
if [[ "${PRINT_ONLY}" == "1" ]]; then
  cat "${WORK}/snapshot.json"
else
  if [[ -z "${OUT}" ]]; then
    OUT="./snapshot-$(date -u +%Y-%m-%dT%H%M%SZ).json"
  fi
  cp -- "${WORK}/snapshot.json" "${OUT}"
  # `wc -c` instead of `stat -f %z || stat -c %s`: on GNU stat, -f means
  # "filesystem status", so the first attempt prints filesystem details to
  # stdout before failing and the fallback's size gets appended to that noise.
  SIZE="$(wc -c < "${OUT}")"
  SIZE="${SIZE//[[:space:]]/}"   # BSD wc left-pads the count with spaces
  HASH="$(python3 -c "import json,sys; print(json.load(open(sys.argv[1], encoding='utf-8'))['content_hash'])" "${OUT}")"
  echo "wrote ${OUT} (${SIZE} bytes)"
  echo " ${HASH}"
fi