Some checks failed
Docker Build / build-and-push-image (push) Has been cancelled
Adds the async dispatch infrastructure : - Postgres pool + embedded migration (CREATE TABLE/INDEX IF NOT EXISTS gateway_jobs). Auto-applied at boot. lib/pq driver (matches webapp convention). - queue.go : Enqueue (idempotent on UNIQUE(bot_slug, update_id) — handles Telegram redelivery), Pop with FOR UPDATE SKIP LOCKED, MarkDone, MarkFailed with exponential backoff (30s → 2m → 10m → 1h → dead at 5). - worker.go : goroutine that drains the queue, dispatches via the same Handler interface as sync, schedules retries on failure, notifies the user once when a job goes to dead. - BotConfig gains `async: bool`. Registry refuses bots with async=true if DATABASE_URL is unset (queue=nil). - Server : when bot.Async, the webhook ack is immediate ; the update payload is enqueued for the worker. When DATABASE_URL is unset (current default), queue/worker stay disabled and only sync handlers (echo, http, auth) work — no breaking change to the running cluster. Refs ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2.
143 lines
3.8 KiB
Go
143 lines
3.8 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"flag"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
)
|
|
|
|
const defaultListenAddr = ":8080"
|
|
const defaultConfigPath = "/etc/telegram-gateway/bots.yaml"
|
|
|
|
func main() {
|
|
subcmd := ""
|
|
if len(os.Args) > 1 && os.Args[1] != "" && os.Args[1][0] != '-' {
|
|
subcmd = os.Args[1]
|
|
os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
|
|
}
|
|
|
|
switch subcmd {
|
|
case "setwebhook":
|
|
runSetWebhook()
|
|
case "deletewebhook":
|
|
runDeleteWebhook()
|
|
case "", "serve":
|
|
runServer()
|
|
default:
|
|
log.Fatalf("unknown subcommand: %s (expected: serve | setwebhook | deletewebhook)", subcmd)
|
|
}
|
|
}
|
|
|
|
func runServer() {
|
|
addr := flag.String("addr", envOr("LISTEN_ADDR", defaultListenAddr), "listen address")
|
|
configPath := flag.String("config", envOr("CONFIG_PATH", defaultConfigPath), "bot routing config (YAML)")
|
|
flag.Parse()
|
|
|
|
cfg, err := LoadConfig(*configPath)
|
|
if err != nil {
|
|
log.Fatalf("load config: %v", err)
|
|
}
|
|
|
|
tg := NewTelegramClient()
|
|
|
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
|
defer stop()
|
|
|
|
// Phase 2b — durable queue (Postgres) + worker. Disabled when
|
|
// DATABASE_URL is unset ; bots with `async: true` then fail config validation.
|
|
var queue Queue
|
|
var db *sql.DB
|
|
if dbURL := os.Getenv("DATABASE_URL"); dbURL != "" {
|
|
var err error
|
|
db, err = OpenDB(ctx, dbURL)
|
|
if err != nil {
|
|
log.Fatalf("init db: %v", err)
|
|
}
|
|
defer func() { _ = db.Close() }()
|
|
queue = NewPostgresQueue(db)
|
|
log.Printf("queue + schema ready (db=%s)", RedactDSN(dbURL))
|
|
} else {
|
|
log.Print("DATABASE_URL unset → queue/worker disabled (sync handlers still work)")
|
|
}
|
|
|
|
// Phase 1.5 — auth layer (Redis-backed sessions). See
|
|
// factory/doc/adr/20260509-telegram-gateway-auth.md.
|
|
authSecret := os.Getenv("AUTH_SECRET")
|
|
redisURL := envOr("REDIS_URL", "redis://redis.tools.svc.cluster.local:6379/0")
|
|
ttl := 24 * time.Hour
|
|
if raw := os.Getenv("AUTH_SESSION_TTL"); raw != "" {
|
|
if d, err := time.ParseDuration(raw); err == nil && d > 0 {
|
|
ttl = d
|
|
} else {
|
|
log.Printf("AUTH_SESSION_TTL=%q invalid, defaulting to 24h", raw)
|
|
}
|
|
}
|
|
var auth *Auth
|
|
if authSecret != "" {
|
|
var aerr error
|
|
auth, aerr = NewAuth(redisURL, authSecret, ttl)
|
|
if aerr != nil {
|
|
log.Fatalf("init auth: %v", aerr)
|
|
}
|
|
log.Printf("auth layer initialized (TTL=%s, redis=%s)", ttl, redisURL)
|
|
} else {
|
|
log.Print("AUTH_SECRET unset → auth layer disabled (no bot may have handler=auth or requireAuth: true)")
|
|
}
|
|
|
|
allowlist := NewAllowlist(os.Getenv("ALLOWED_USERS"))
|
|
if allowlist.Open() {
|
|
log.Print("ALLOWED_USERS empty → allowlist open to all")
|
|
} else {
|
|
log.Printf("allowlist active (%d user(s) allowed)", allowlist.Size())
|
|
}
|
|
|
|
registry, err := NewRegistry(cfg, tg, auth, queue)
|
|
if err != nil {
|
|
log.Fatalf("build registry: %v", err)
|
|
}
|
|
|
|
if queue != nil {
|
|
worker := NewWorker(queue, registry, tg)
|
|
go worker.Run(ctx)
|
|
}
|
|
|
|
srv := &http.Server{
|
|
Addr: *addr,
|
|
Handler: NewServer(registry, auth, allowlist, tg, queue).Routes(),
|
|
ReadHeaderTimeout: 5 * time.Second,
|
|
ReadTimeout: 30 * time.Second,
|
|
WriteTimeout: 30 * time.Second,
|
|
IdleTimeout: 60 * time.Second,
|
|
}
|
|
|
|
go func() {
|
|
log.Printf("telegram-gateway listening on %s (%d bot(s) loaded)", *addr, registry.Count())
|
|
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
log.Fatalf("server: %v", err)
|
|
}
|
|
}()
|
|
|
|
<-ctx.Done()
|
|
log.Print("shutdown signal received, draining...")
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel()
|
|
if err := srv.Shutdown(shutdownCtx); err != nil {
|
|
log.Printf("graceful shutdown error: %v", err)
|
|
}
|
|
log.Print("bye")
|
|
}
|
|
|
|
func envOr(key, fallback string) string {
|
|
if v := os.Getenv(key); v != "" {
|
|
return v
|
|
}
|
|
return fallback
|
|
}
|