Files
telegram-gateway/main.go
Gabriel Radureau abe77f5873
All checks were successful
CI/CD / test (push) Successful in 20s
CI/CD / build-and-push-image (push) Successful in 58s
Phase 2d — gateway_jobs retention (Janitor goroutine)
Periodic cleanup goroutine, started alongside the worker when DATABASE_URL
is set. It handles three concerns:

- DELETE rows with status='done' older than QUEUE_DONE_RETENTION (default
  168h / 7 days). Rows for completed jobs have no value beyond a short
  debugging window.
- UPDATE rows stuck in status='running' for more than QUEUE_STUCK_TIMEOUT
  (default 30m) back to 'pending' so a worker can retry. Handles the
  case of a pod crashing mid-job (without this, jobs stay orphaned forever).
- 'dead' rows are NEVER auto-purged (volume negligible, kept for forensics).

Configurable via env:
- QUEUE_DONE_RETENTION (default 168h)
- QUEUE_STUCK_TIMEOUT  (default 30m)
- QUEUE_JANITOR_INTERVAL (default 1h)

The janitor runs once immediately at startup (recovers anything orphaned
by the previous pod before opening for new traffic), then ticks on the
interval.

Queue interface gains PurgeDone + RecoverStuck — both use Postgres'
make_interval(secs) for safe parameterization.

4 new unit tests via fakeQueue mock (47 total, race clean).
2026-05-09 16:06:54 +02:00

167 lines
4.7 KiB
Go

package main
import (
	"context"
	"database/sql"
	"errors"
	"flag"
	"log"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)
// Fallback values for the server's -addr and -config flags, used when the
// corresponding environment variable (LISTEN_ADDR / CONFIG_PATH) is not set.
const (
	// defaultListenAddr is the address the HTTP server binds to by default.
	defaultListenAddr = ":8080"
	// defaultConfigPath is the default location of the bot routing config (YAML).
	defaultConfigPath = "/etc/telegram-gateway/bots.yaml"
)
func main() {
subcmd := ""
if len(os.Args) > 1 && os.Args[1] != "" && os.Args[1][0] != '-' {
subcmd = os.Args[1]
os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
}
switch subcmd {
case "setwebhook":
runSetWebhook()
case "deletewebhook":
runDeleteWebhook()
case "", "serve":
runServer()
default:
log.Fatalf("unknown subcommand: %s (expected: serve | setwebhook | deletewebhook)", subcmd)
}
}
// runServer runs the gateway HTTP server until SIGINT or SIGTERM is received.
//
// It parses the -addr and -config flags (defaults come from LISTEN_ADDR and
// CONFIG_PATH), loads the bot routing config, optionally opens the durable
// queue (DATABASE_URL) and the auth layer (AUTH_SECRET), builds the handler
// registry, starts the worker and janitor goroutines when a queue is
// configured, and then serves HTTP. Any initialisation failure terminates the
// process via log.Fatalf.
//
// On shutdown the HTTP server is drained and the background goroutines are
// awaited, both bounded by a shared 15s deadline, before the deferred
// db.Close runs.
func runServer() {
	addr := flag.String("addr", envOr("LISTEN_ADDR", defaultListenAddr), "listen address")
	configPath := flag.String("config", envOr("CONFIG_PATH", defaultConfigPath), "bot routing config (YAML)")
	flag.Parse()

	cfg, err := LoadConfig(*configPath)
	if err != nil {
		log.Fatalf("load config: %v", err)
	}
	tg := NewTelegramClient()

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Phase 2b — durable queue (Postgres) + worker. Disabled when
	// DATABASE_URL is unset ; bots with `async: true` then fail config validation.
	var queue Queue
	var db *sql.DB
	if dbURL := os.Getenv("DATABASE_URL"); dbURL != "" {
		db, err = OpenDB(ctx, dbURL)
		if err != nil {
			log.Fatalf("init db: %v", err)
		}
		// Close error is ignored on purpose: the process is exiting anyway.
		defer func() { _ = db.Close() }()
		queue = NewPostgresQueue(db)
		log.Printf("queue + schema ready (db=%s)", RedactDSN(dbURL))
	} else {
		log.Print("DATABASE_URL unset → queue/worker disabled (sync handlers still work)")
	}

	// Phase 1.5 — auth layer (Redis-backed sessions). See
	// factory/doc/adr/20260509-telegram-gateway-auth.md.
	authSecret := os.Getenv("AUTH_SECRET")
	redisURL := envOr("REDIS_URL", "redis://redis.tools.svc.cluster.local:6379/0")
	// Same parsing rules as the queue durations: unset, unparseable or
	// non-positive values fall back to 24h (with a warning for the latter two).
	ttl := envDuration("AUTH_SESSION_TTL", 24*time.Hour)
	var auth *Auth
	if authSecret != "" {
		auth, err = NewAuth(redisURL, authSecret, ttl)
		if err != nil {
			log.Fatalf("init auth: %v", err)
		}
		// REDIS_URL may embed credentials, so it is logged redacted just like
		// DATABASE_URL above.
		// NOTE(review): RedactDSN is presumed to handle redis:// URLs — confirm.
		log.Printf("auth layer initialized (TTL=%s, redis=%s)", ttl, RedactDSN(redisURL))
	} else {
		log.Print("AUTH_SECRET unset → auth layer disabled (no bot may have handler=auth or requireAuth: true)")
	}

	allowlist := NewAllowlist(os.Getenv("ALLOWED_USERS"))
	if allowlist.Open() {
		log.Print("ALLOWED_USERS empty → allowlist open to all")
	} else {
		log.Printf("allowlist active (%d user(s) allowed)", allowlist.Size())
	}

	registry, err := NewRegistry(cfg, tg, auth, queue)
	if err != nil {
		log.Fatalf("build registry: %v", err)
	}

	// background tracks the queue goroutines so shutdown can wait for them
	// before the deferred db.Close pulls the connection pool away.
	var background sync.WaitGroup
	if queue != nil {
		worker := NewWorker(queue, registry, tg)

		// Phase 2d — janitor : purge done > QUEUE_DONE_RETENTION (default 7d),
		// recover stuck running > QUEUE_STUCK_TIMEOUT (default 30m), runs every
		// QUEUE_JANITOR_INTERVAL (default 1h). All overridable via env.
		doneRet := envDuration("QUEUE_DONE_RETENTION", defaultDoneRetention)
		stuckTO := envDuration("QUEUE_STUCK_TIMEOUT", defaultStuckTimeout)
		jInt := envDuration("QUEUE_JANITOR_INTERVAL", defaultJanitorInterval)
		janitor := NewJanitor(queue, doneRet, stuckTO, jInt)

		background.Add(2)
		go func() {
			defer background.Done()
			worker.Run(ctx)
		}()
		go func() {
			defer background.Done()
			janitor.Run(ctx)
		}()
	}

	srv := &http.Server{
		Addr:              *addr,
		Handler:           NewServer(registry, auth, allowlist, tg, queue).Routes(),
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       30 * time.Second,
		WriteTimeout:      30 * time.Second,
		IdleTimeout:       60 * time.Second,
	}
	go func() {
		log.Printf("telegram-gateway listening on %s (%d bot(s) loaded)", *addr, registry.Count())
		// ErrServerClosed is the normal result of srv.Shutdown; anything else
		// is fatal. log.Fatalf exits immediately without running the defers
		// registered in runServer.
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Fatalf("server: %v", err)
		}
	}()

	<-ctx.Done()
	// Unregister the signal handler so a second SIGINT/SIGTERM gets the default
	// behaviour (immediate exit) instead of being swallowed while draining.
	stop()
	log.Print("shutdown signal received, draining...")

	shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		log.Printf("graceful shutdown error: %v", err)
	}

	if queue != nil {
		// Give the worker and janitor the remainder of the shutdown deadline
		// to return; the wait is bounded so a goroutine that ignores ctx
		// cancellation cannot block process exit.
		drained := make(chan struct{})
		go func() {
			background.Wait()
			close(drained)
		}()
		select {
		case <-drained:
		case <-shutdownCtx.Done():
			log.Print("worker/janitor may still be running at shutdown deadline, exiting anyway")
		}
	}
	log.Print("bye")
}
// envOr returns the value of the environment variable key, or fallback when
// the variable is unset or set to the empty string.
func envOr(key, fallback string) string {
	value, present := os.LookupEnv(key)
	if !present || value == "" {
		return fallback
	}
	return value
}
// envDuration returns the Go duration stored in the environment variable key
// (e.g. "168h", "30m").
//
// The fallback is returned when the variable is unset or empty. It is also
// returned, together with a logged warning, when the value fails to parse or
// is not strictly positive, so that a typo is visible to the operator without
// taking the deployment down.
func envDuration(key string, fallback time.Duration) time.Duration {
	raw, present := os.LookupEnv(key)
	if !present || raw == "" {
		return fallback
	}
	if parsed, err := time.ParseDuration(raw); err == nil && parsed > 0 {
		return parsed
	}
	log.Printf("%s=%q invalid, defaulting to %s", key, raw, fallback)
	return fallback
}