Files
telegram-gateway/server.go
Gabriel Radureau 799e10dcc2
Some checks failed
Docker Build / build-and-push-image (push) Has been cancelled
Phase 2b — durable Postgres queue + worker (gated on DATABASE_URL)
Adds the async dispatch infrastructure:

- Postgres pool + embedded migration (CREATE TABLE/INDEX IF NOT EXISTS
  gateway_jobs). Auto-applied at boot. lib/pq driver (matches webapp
  convention).
- queue.go: Enqueue (idempotent on UNIQUE(bot_slug, update_id) — handles
  Telegram redelivery), Pop with FOR UPDATE SKIP LOCKED, MarkDone,
  MarkFailed with exponential backoff (30s → 2m → 10m → 1h → dead at 5).
- worker.go: goroutine that drains the queue, dispatches via the same
  Handler interface as sync, schedules retries on failure, notifies the
  user once when a job goes to dead.
- BotConfig gains `async: bool`. Registry refuses bots with async=true
  if DATABASE_URL is unset (queue=nil).
- Server: when bot.Async, the webhook ack is immediate; the update
  payload is enqueued for the worker.

When DATABASE_URL is unset (current default), queue/worker stay disabled
and only sync handlers (echo, http, auth) work — no breaking change to
the running cluster.

Refs ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2.
2026-05-09 14:38:41 +02:00

140 lines
4.4 KiB
Go

package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
)
// Server bundles the dependencies shared by the gateway's HTTP handlers.
type Server struct {
	// registry resolves a bot slug taken from the URL to its configuration.
	registry *Registry
	// auth answers whether a user is authenticated; consulted only for
	// bots that set RequireAuth.
	auth *Auth
	// allowlist decides whether a user's updates are processed at all.
	allowlist Allowlist
	// tg sends messages back to Telegram. It may be nil, in which case the
	// "authenticate first" prompt is skipped.
	tg *TelegramClient
	// queue receives updates for bots configured as async. It may be nil
	// when no queue is configured.
	queue Queue
}
// NewServer returns a Server wired with the given dependencies. The values
// are stored as-is; tg and queue may be nil (the webhook handler checks both
// before using them).
func NewServer(r *Registry, auth *Auth, allow Allowlist, tg *TelegramClient, queue Queue) *Server {
	srv := new(Server)
	srv.registry = r
	srv.auth = auth
	srv.allowlist = allow
	srv.tg = tg
	srv.queue = queue
	return srv
}
// Routes builds the gateway's HTTP handler: the two probe endpoints and the
// per-bot webhook prefix are registered on a fresh ServeMux, which is then
// passed through chain together with recoverMW and accessLogMW.
func (s *Server) Routes() http.Handler {
	endpoints := []struct {
		pattern string
		handle  func(http.ResponseWriter, *http.Request)
	}{
		{"/healthz", s.health},
		{"/readyz", s.ready},
		{"/bot/", s.botWebhook},
	}

	router := http.NewServeMux()
	for _, ep := range endpoints {
		router.HandleFunc(ep.pattern, ep.handle)
	}
	return chain(router, recoverMW, accessLogMW)
}
// health is the handler behind /healthz. It inspects nothing and always
// answers 200 with the body "OK".
func (s *Server) health(w http.ResponseWriter, _ *http.Request) {
	const body = "OK"

	w.WriteHeader(http.StatusOK)
	// The write error is deliberately ignored: the status line is already
	// out and there is nothing left to report to the client.
	_, _ = fmt.Fprint(w, body)
}
// ready is the handler behind /readyz. Like health, it performs no
// dependency check and always answers 200 with the body "OK".
func (s *Server) ready(w http.ResponseWriter, _ *http.Request) {
	status, body := http.StatusOK, "OK"

	w.WriteHeader(status)
	// Best effort: a failed write cannot be reported to the client anymore.
	_, _ = fmt.Fprint(w, body)
}
// botWebhook serves POST /bot/<slug>, the per-bot Telegram webhook endpoint.
//
// Steps, in order:
//   - 405 for any method other than POST; 400 when the slug is empty or
//     contains a slash; 404 when the registry does not know the slug;
//   - 401 when verifyTelegramSecret rejects the
//     X-Telegram-Bot-Api-Secret-Token header for this bot;
//   - 400 when the body is not a decodable update or exceeds the size cap;
//   - allowlist gate, then the optional requireAuth gate: a refused update
//     is logged, acknowledged with 200 and dropped;
//   - dispatch: async bots get the update enqueued for the worker, other
//     bots run their handler inline.
//
// Handler errors are logged and still acknowledged with 200. A failed
// enqueue is answered with 503 instead: nothing has been processed yet, and
// Telegram re-delivers an update whose webhook call did not return a 2xx
// status, so the update is retried rather than silently lost.
func (s *Server) botWebhook(w http.ResponseWriter, r *http.Request) {
	// Upper bound on the webhook body, so that a misbehaving client cannot
	// make the gateway buffer an arbitrarily large payload.
	const maxUpdateBytes = 1 << 20

	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	slug := strings.TrimPrefix(r.URL.Path, "/bot/")
	slug = strings.Trim(slug, "/")
	if slug == "" || strings.Contains(slug, "/") {
		http.Error(w, "bot slug missing or malformed", http.StatusBadRequest)
		return
	}

	bot, ok := s.registry.Get(slug)
	if !ok {
		http.Error(w, "unknown bot", http.StatusNotFound)
		return
	}
	if !verifyTelegramSecret(r.Header.Get("X-Telegram-Bot-Api-Secret-Token"), bot.Secret) {
		http.Error(w, "unauthorized", http.StatusUnauthorized)
		return
	}

	// Capture the update verbatim first, then decode the fields we model.
	// Keeping the raw bytes lets the async path enqueue exactly what
	// Telegram sent, including fields the Update struct does not know.
	var raw json.RawMessage
	body := http.MaxBytesReader(w, r.Body, maxUpdateBytes)
	if err := json.NewDecoder(body).Decode(&raw); err != nil {
		http.Error(w, "bad update payload", http.StatusBadRequest)
		return
	}
	// NOTE: unknown fields are deliberately tolerated (no
	// DisallowUnknownFields): Telegram keeps adding fields (entities,
	// sticker, photo, forum_topic...) over time, and only the ones we need
	// are extracted.
	var update Update
	if err := json.Unmarshal(raw, &update); err != nil {
		http.Error(w, "bad update payload", http.StatusBadRequest)
		return
	}

	// Allowlist gate (Phase 1.5). When ALLOWED_USERS is set and the user is
	// not in it, ack 200 and silently drop. See ADR 20260509.
	// The second result of UserID is discarded; presumably it reports
	// whether the update carries a user at all — TODO confirm that the
	// gates below behave as intended for the value returned in that case.
	userID, _ := update.UserID()
	if !s.allowlist.IsAllowed(userID) {
		log.Printf("bot=%s update=%d dropped: user=%d not in ALLOWED_USERS", slug, update.UpdateID, userID)
		ackWebhook(w)
		return
	}

	// requireAuth gate (Phase 1.5). When the bot opts in, unauthenticated
	// users are told to run /auth on the principal bot.
	if bot.RequireAuth {
		authed, err := s.auth.IsAuthed(r.Context(), userID)
		if err != nil {
			log.Printf("bot=%s update=%d auth check error: %v", slug, update.UpdateID, err)
			// Fail closed: an error from the auth check counts as
			// "not authenticated" (consistent with the ADR).
			authed = false
		}
		if !authed {
			if chatID, hasChat := update.ChatID(); hasChat && s.tg != nil {
				// The prompt is best effort, but a failure is logged
				// so an unreachable Telegram API does not go unnoticed.
				if err := s.tg.SendMessage(r.Context(), bot.Token, SendMessageParams{
					ChatID: chatID,
					Text:   "🔒 Authentifie-toi d'abord avec `/auth <code>` chez @arcodange_factory_bot",
				}); err != nil {
					log.Printf("bot=%s update=%d auth prompt not sent: %v", slug, update.UpdateID, err)
				}
			}
			log.Printf("bot=%s update=%d gated: user=%d not authed", slug, update.UpdateID, userID)
			ackWebhook(w)
			return
		}
	}

	// Async dispatch (Phase 2b): enqueue and ack immediately. The worker
	// drains the queue and runs the handler asynchronously. Use this for
	// handlers that may exceed Telegram's webhook timeout (~60s) or whose
	// backend can be temporarily unreachable (e.g. the MacBook running
	// Ollama is asleep).
	if bot.Async {
		if s.queue == nil {
			log.Printf("bot=%s update=%d async requested but no queue configured", slug, update.UpdateID)
			ackWebhook(w)
			return
		}
		if err := EnqueueWithDefaults(r.Context(), s.queue, slug, bot.HandlerType(), update.UpdateID, []byte(raw)); err != nil {
			log.Printf("bot=%s update=%d enqueue error: %v", slug, update.UpdateID, err)
			// Do not ack: the update is stored nowhere yet, so ask
			// Telegram to deliver it again later.
			http.Error(w, "enqueue failed", http.StatusServiceUnavailable)
			return
		}
		log.Printf("bot=%s update=%d enqueued (async)", slug, update.UpdateID)
		ackWebhook(w)
		return
	}

	// Sync dispatch: run the handler inline. Errors are logged only; the
	// update is acknowledged either way.
	if err := bot.Handler.Handle(r.Context(), update, bot); err != nil {
		log.Printf("bot=%s update=%d handler error: %v", slug, update.UpdateID, err)
	}
	ackWebhook(w)
}

// ackWebhook acknowledges a webhook delivery with status 200 and an empty
// JSON object as body.
func ackWebhook(w http.ResponseWriter) {
	w.WriteHeader(http.StatusOK)
	// Best effort: a failed write cannot be reported to the client anymore.
	_, _ = fmt.Fprint(w, "{}")
}