From 799e10dcc2fcc9c1660a7a481df58dd04962857b Mon Sep 17 00:00:00 2001 From: Gabriel Radureau Date: Sat, 9 May 2026 14:38:41 +0200 Subject: [PATCH] =?UTF-8?q?Phase=202b=20=E2=80=94=20durable=20Postgres=20q?= =?UTF-8?q?ueue=20+=20worker=20(gated=20on=20DATABASE=5FURL)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the async dispatch infrastructure : - Postgres pool + embedded migration (CREATE TABLE/INDEX IF NOT EXISTS gateway_jobs). Auto-applied at boot. lib/pq driver (matches webapp convention). - queue.go : Enqueue (idempotent on UNIQUE(bot_slug, update_id) — handles Telegram redelivery), Pop with FOR UPDATE SKIP LOCKED, MarkDone, MarkFailed with exponential backoff (30s → 2m → 10m → 1h → dead at 5). - worker.go : goroutine that drains the queue, dispatches via the same Handler interface as sync, schedules retries on failure, notifies the user once when a job goes to dead. - BotConfig gains `async: bool`. Registry refuses bots with async=true if DATABASE_URL is unset (queue=nil). - Server : when bot.Async, the webhook ack is immediate ; the update payload is enqueued for the worker. When DATABASE_URL is unset (current default), queue/worker stay disabled and only sync handlers (echo, http, auth) work — no breaking change to the running cluster. Refs ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2. 
--- chart/values.yaml | 7 ++ config.go | 7 ++ db.go | 58 +++++++++++++++ go.mod | 1 + go.sum | 2 + handlers.go | 41 +++++++---- main.go | 33 +++++++-- migrations/001_init.sql | 24 +++++++ queue.go | 154 ++++++++++++++++++++++++++++++++++++++++ server.go | 31 +++++++- worker.go | 108 ++++++++++++++++++++++++++++ 11 files changed, 445 insertions(+), 21 deletions(-) create mode 100644 db.go create mode 100644 migrations/001_init.sql create mode 100644 queue.go create mode 100644 worker.go diff --git a/chart/values.yaml b/chart/values.yaml index 1bbc2ef..ab7e120 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -105,6 +105,13 @@ auth: redisURL: "redis://redis.tools.svc.cluster.local:6379/0" sessionTTL: "24h" +# Async queue (Phase 2b). Quand DATABASE_URL est dans le Secret cluster, +# la queue Postgres + worker s'activent ; sinon tout reste sync (les bots +# avec `async: true` font fail au boot — secure by default). +# Format DSN attendu : postgres://USER:PASSWORD@pgbouncer.tools:6432/DBNAME?sslmode=disable +# L'utilisateur doit avoir CREATE TABLE / CREATE INDEX sur la base cible +# (le pod auto-applique les migrations idempotentes au démarrage). + # k8s Secret consumed by `envFrom`. Phase 1: create it manually with kubectl. # kubectl -n telegram-gateway create secret generic telegram-gateway-bots \ # --from-literal=BOT_FACTORY_TOKEN=… --from-literal=BOT_FACTORY_SECRET=… diff --git a/config.go b/config.go index 020c9d1..ab22555 100644 --- a/config.go +++ b/config.go @@ -20,6 +20,13 @@ type BotConfig struct { // `requireAuth: false` is required to expose a bot publicly. // Forced to false for handler=auth (chicken-and-egg). RequireAuth *bool `yaml:"requireAuth,omitempty"` + // Async, when true, dispatches incoming Updates through the durable + // Postgres queue + worker (Phase 2b) instead of running inline. The + // webhook ack returns 200 immediately ; the worker may retry the job + // with backoff if the handler fails. 
Required for handlers that may + // outlive the Telegram webhook timeout (shell, script, ollama). + // Requires DATABASE_URL to be set, otherwise registry refuses to start. + Async bool `yaml:"async,omitempty"` // HTTP is the per-bot config for the `http` handler. Required when // handler=http. See handler_http.go. HTTP *HTTPConfig `yaml:"http,omitempty"` diff --git a/db.go b/db.go new file mode 100644 index 0000000..c35f009 --- /dev/null +++ b/db.go @@ -0,0 +1,58 @@ +// Phase 2b — Postgres pool + schema migration. +// Voir ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2. +package main + +import ( + "context" + "database/sql" + _ "embed" + "fmt" + "net/url" + "time" + + _ "github.com/lib/pq" +) + +//go:embed migrations/001_init.sql +var initSQL string + +// OpenDB opens a Postgres pool from a DSN, pings, and runs the embedded +// migration (idempotent CREATE TABLE / CREATE INDEX IF NOT EXISTS). +// Caller is responsible for closing the returned *sql.DB. +func OpenDB(ctx context.Context, dsn string) (*sql.DB, error) { + db, err := sql.Open("postgres", dsn) + if err != nil { + return nil, fmt.Errorf("sql.Open: %w", err) + } + db.SetMaxOpenConns(10) + db.SetMaxIdleConns(5) + db.SetConnMaxLifetime(30 * time.Minute) + + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := db.PingContext(pingCtx); err != nil { + _ = db.Close() + return nil, fmt.Errorf("postgres ping: %w", err) + } + + migCtx, cancelMig := context.WithTimeout(ctx, 10*time.Second) + defer cancelMig() + if _, err := db.ExecContext(migCtx, initSQL); err != nil { + _ = db.Close() + return nil, fmt.Errorf("apply migration: %w", err) + } + return db, nil +} + +// RedactDSN strips the password from a Postgres URL for safe logging. 
+func RedactDSN(dsn string) string { + u, err := url.Parse(dsn) + if err != nil { + return "(invalid dsn)" + } + if u.User != nil { + username := u.User.Username() + u.User = url.User(username) + } + return u.String() +} diff --git a/go.mod b/go.mod index 294c1e5..5e44480 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/arcodange/telegram-gateway go 1.24 require ( + github.com/lib/pq v1.12.3 github.com/redis/go-redis/v9 v9.19.0 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 7b8ed36..7555530 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/lib/pq v1.12.3 h1:tTWxr2YLKwIvK90ZXEw8GP7UFHtcbTtty8zsI+YjrfQ= +github.com/lib/pq v1.12.3/go.mod h1:/p+8NSbOcwzAEI7wiMXFlgydTwcgTr3OSKMsD2BitpA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.19.0 h1:XPVaaPSnG6RhYf7p+rmSa9zZfeVAnWsH5h3lxthOm/k= diff --git a/handlers.go b/handlers.go index 974f342..14377be 100644 --- a/handlers.go +++ b/handlers.go @@ -11,23 +11,30 @@ type Handler interface { } type Bot struct { - Slug string - Token string - Secret string - RequireAuth bool - Handler Handler + Slug string + Token string + Secret string + HandlerLabel string // raw 'handler:' value (echo|auth|http|…) — used by queue rows for diagnostics + RequireAuth bool + Async bool + Handler Handler } +// HandlerType returns the textual handler label for queue rows / logs. 
+func (b Bot) HandlerType() string { return b.HandlerLabel } + type Registry struct { bots map[string]Bot } // NewRegistry builds the bot routing map from the parsed YAML config + the // per-bot secrets (already merged into BotConfig from env). It receives the -// shared TelegramClient and Auth so per-handler instances can use them. -// `auth` may be nil (no AUTH_SECRET set) ; in that case any bot configured -// with `handler: auth` or `requireAuth: true` is a fatal config error. -func NewRegistry(cfg *Config, tg *TelegramClient, auth *Auth) (*Registry, error) { +// shared TelegramClient, Auth and Queue so per-handler instances can use +// them. `auth` may be nil (no AUTH_SECRET set) ; in that case any bot +// configured with `handler: auth` or `requireAuth: true` is a fatal config +// error. `queue` may be nil (no DATABASE_URL set) ; in that case any bot +// with `async: true` is a fatal config error. +func NewRegistry(cfg *Config, tg *TelegramClient, auth *Auth, queue Queue) (*Registry, error) { if len(cfg.Bots) == 0 { return nil, fmt.Errorf("no bots configured") } @@ -64,6 +71,10 @@ func NewRegistry(cfg *Config, tg *TelegramClient, auth *Auth) (*Registry, error) return nil, fmt.Errorf("bot %s has requireAuth: true (default) but AUTH_SECRET is unset; either set AUTH_SECRET or add `requireAuth: false` to the bot config", slug) } + if b.Async && queue == nil { + return nil, fmt.Errorf("bot %s has async: true but DATABASE_URL is unset (no queue available)", slug) + } + var h Handler switch b.Handler { case "echo": @@ -89,11 +100,13 @@ func NewRegistry(cfg *Config, tg *TelegramClient, auth *Auth) (*Registry, error) } bots[slug] = Bot{ - Slug: slug, - Token: token, - Secret: secret, - RequireAuth: requireAuth, - Handler: h, + Slug: slug, + Token: token, + Secret: secret, + HandlerLabel: b.Handler, + RequireAuth: requireAuth, + Async: b.Async, + Handler: h, } } diff --git a/main.go b/main.go index 7d2a32f..1db07de 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ 
package main import ( "context" + "database/sql" "errors" "flag" "log" @@ -46,6 +47,26 @@ func runServer() { tg := NewTelegramClient() + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + // Phase 2b — durable queue (Postgres) + worker. Disabled when + // DATABASE_URL is unset ; bots with `async: true` then fail config validation. + var queue Queue + var db *sql.DB + if dbURL := os.Getenv("DATABASE_URL"); dbURL != "" { + var err error + db, err = OpenDB(ctx, dbURL) + if err != nil { + log.Fatalf("init db: %v", err) + } + defer func() { _ = db.Close() }() + queue = NewPostgresQueue(db) + log.Printf("queue + schema ready (db=%s)", RedactDSN(dbURL)) + } else { + log.Print("DATABASE_URL unset → queue/worker disabled (sync handlers still work)") + } + // Phase 1.5 — auth layer (Redis-backed sessions). See // factory/doc/adr/20260509-telegram-gateway-auth.md. authSecret := os.Getenv("AUTH_SECRET") @@ -77,23 +98,25 @@ func runServer() { log.Printf("allowlist active (%d user(s) allowed)", allowlist.Size()) } - registry, err := NewRegistry(cfg, tg, auth) + registry, err := NewRegistry(cfg, tg, auth, queue) if err != nil { log.Fatalf("build registry: %v", err) } + if queue != nil { + worker := NewWorker(queue, registry, tg) + go worker.Run(ctx) + } + srv := &http.Server{ Addr: *addr, - Handler: NewServer(registry, auth, allowlist, tg).Routes(), + Handler: NewServer(registry, auth, allowlist, tg, queue).Routes(), ReadHeaderTimeout: 5 * time.Second, ReadTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second, IdleTimeout: 60 * time.Second, } - ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - defer stop() - go func() { log.Printf("telegram-gateway listening on %s (%d bot(s) loaded)", *addr, registry.Count()) if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { diff --git a/migrations/001_init.sql b/migrations/001_init.sql new file mode 100644 
index 0000000..9763724 --- /dev/null +++ b/migrations/001_init.sql @@ -0,0 +1,24 @@ +-- Phase 2b — durable job queue for async handlers (shell, script, ollama). +-- Voir ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2. +CREATE TABLE IF NOT EXISTS gateway_jobs ( + id BIGSERIAL PRIMARY KEY, + bot_slug TEXT NOT NULL, + handler_type TEXT NOT NULL, + update_id BIGINT NOT NULL, + payload JSONB NOT NULL, + attempts INT NOT NULL DEFAULT 0, + next_retry_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + status TEXT NOT NULL DEFAULT 'pending', + last_error TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + -- idempotence : si Telegram redélivre (ack > 60s), on ignore le doublon. + CONSTRAINT gateway_jobs_uniq_update UNIQUE (bot_slug, update_id) +); + +CREATE INDEX IF NOT EXISTS gateway_jobs_pickable + ON gateway_jobs (next_retry_at) + WHERE status = 'pending'; + +CREATE INDEX IF NOT EXISTS gateway_jobs_status + ON gateway_jobs (status); diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..bebcd9f --- /dev/null +++ b/queue.go @@ -0,0 +1,154 @@ +// Phase 2b — durable async job queue (Postgres). Idempotent on (bot_slug, update_id). +// Voir ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2. +package main + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" +) + +// Job is a unit of async work : a Telegram Update routed to a particular bot, +// to be processed by that bot's handler. +type Job struct { + ID int64 + BotSlug string + HandlerType string + UpdateID int64 + Payload []byte + Attempts int + NextRetryAt time.Time + Status string + LastError string +} + +type Queue interface { + // Enqueue inserts a job ; on UNIQUE conflict (Telegram redelivery) it + // returns nil without touching the existing row. 
+ Enqueue(ctx context.Context, j Job) error + // Pop atomically claims the oldest pending job whose next_retry_at has + // passed (FOR UPDATE SKIP LOCKED, status → 'running', attempts++). + // Returns (nil, nil) when there is nothing to claim. + Pop(ctx context.Context) (*Job, error) + // MarkDone marks a job 'done'. + MarkDone(ctx context.Context, id int64) error + // MarkFailed schedules a retry (status back to 'pending', next_retry_at + // in the future). After maxAttempts the row is set to 'dead'. + MarkFailed(ctx context.Context, id int64, attempt int, err error, maxAttempts int) error +} + +type PostgresQueue struct { + db *sql.DB +} + +func NewPostgresQueue(db *sql.DB) *PostgresQueue { return &PostgresQueue{db: db} } + +const ( + enqueueSQL = ` +INSERT INTO gateway_jobs (bot_slug, handler_type, update_id, payload) +VALUES ($1, $2, $3, $4) +ON CONFLICT (bot_slug, update_id) DO NOTHING +` + popSQL = ` +UPDATE gateway_jobs + SET status = 'running', attempts = attempts + 1, updated_at = NOW() + WHERE id = ( + SELECT id FROM gateway_jobs + WHERE status = 'pending' AND next_retry_at <= NOW() + ORDER BY next_retry_at ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + ) +RETURNING id, bot_slug, handler_type, update_id, payload, attempts, next_retry_at, status, COALESCE(last_error, '') +` + markDoneSQL = ` +UPDATE gateway_jobs SET status = 'done', updated_at = NOW(), last_error = NULL WHERE id = $1 +` + markRetrySQL = ` +UPDATE gateway_jobs + SET status = 'pending', + next_retry_at = $2, + last_error = $3, + updated_at = NOW() + WHERE id = $1 +` + markDeadSQL = ` +UPDATE gateway_jobs + SET status = 'dead', + last_error = $2, + updated_at = NOW() + WHERE id = $1 +` +) + +func (q *PostgresQueue) Enqueue(ctx context.Context, j Job) error { + _, err := q.db.ExecContext(ctx, enqueueSQL, j.BotSlug, j.HandlerType, j.UpdateID, j.Payload) + return err +} + +func (q *PostgresQueue) Pop(ctx context.Context) (*Job, error) { + var j Job + err := q.db.QueryRowContext(ctx, popSQL).Scan( + 
&j.ID, &j.BotSlug, &j.HandlerType, &j.UpdateID, &j.Payload, + &j.Attempts, &j.NextRetryAt, &j.Status, &j.LastError, + ) + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, err + } + return &j, nil +} + +func (q *PostgresQueue) MarkDone(ctx context.Context, id int64) error { + _, err := q.db.ExecContext(ctx, markDoneSQL, id) + return err +} + +func (q *PostgresQueue) MarkFailed(ctx context.Context, id int64, attempt int, jobErr error, maxAttempts int) error { + msg := "" + if jobErr != nil { + msg = truncate(jobErr.Error(), 4096) + } + if attempt >= maxAttempts { + _, err := q.db.ExecContext(ctx, markDeadSQL, id, msg) + return err + } + retryIn := Backoff(attempt) + next := time.Now().Add(retryIn) + _, err := q.db.ExecContext(ctx, markRetrySQL, id, next, msg) + return err +} + +// Backoff returns the delay before the next attempt of a failed job. +// 30s, 2m, 10m, then 1h (capped) — ~1h12m of cumulative backoff before dead at attempt 5. +func Backoff(attempt int) time.Duration { + base := 30 * time.Second + switch attempt { + case 1: + return base // 30s + case 2: + return 2 * time.Minute + case 3: + return 10 * time.Minute + default: + return 1 * time.Hour + } +} + +// EnqueueWithDefaults is a convenience used by the webhook handler — wraps +// Enqueue and returns an explicit error when the queue is nil (DATABASE_URL unset). 
+func EnqueueWithDefaults(ctx context.Context, q Queue, slug, handlerType string, updateID int64, payload []byte) error { + if q == nil { + return fmt.Errorf("queue not initialized (DATABASE_URL unset ?)") + } + return q.Enqueue(ctx, Job{ + BotSlug: slug, + HandlerType: handlerType, + UpdateID: updateID, + Payload: payload, + }) +} diff --git a/server.go b/server.go index 540e5da..3b81be5 100644 --- a/server.go +++ b/server.go @@ -13,10 +13,11 @@ type Server struct { auth *Auth allowlist Allowlist tg *TelegramClient + queue Queue } -func NewServer(r *Registry, auth *Auth, allow Allowlist, tg *TelegramClient) *Server { - return &Server{registry: r, auth: auth, allowlist: allow, tg: tg} +func NewServer(r *Registry, auth *Auth, allow Allowlist, tg *TelegramClient, queue Queue) *Server { + return &Server{registry: r, auth: auth, allowlist: allow, tg: tg, queue: queue} } func (s *Server) Routes() http.Handler { @@ -103,6 +104,32 @@ func (s *Server) botWebhook(w http.ResponseWriter, r *http.Request) { } } + // Async dispatch (Phase 2b): enqueue + ack 200 immediately. The worker + // drains the queue and runs the handler asynchronously. Use this for + // handlers that may exceed Telegram's webhook timeout (~60s) or whose + // backend can be temporarily unreachable (e.g. the MacBook running Ollama is asleep). + if bot.Async { + if s.queue == nil { + log.Printf("bot=%s update=%d async requested but no queue configured", slug, update.UpdateID) + w.WriteHeader(http.StatusOK) + _, _ = fmt.Fprint(w, "{}") + return + } + // Re-marshal the decoded update for the worker. The raw request body + // would be preferable (it preserves fields we don't model) but it has + // already been consumed; re-encoding the parsed struct loses those + // fields — the lenient-decode tradeoff is documented in feedback memory. 
+ payload, _ := json.Marshal(update) + if err := EnqueueWithDefaults(r.Context(), s.queue, slug, bot.HandlerType(), update.UpdateID, payload); err != nil { + log.Printf("bot=%s update=%d enqueue error: %v", slug, update.UpdateID, err) + } else { + log.Printf("bot=%s update=%d enqueued (async)", slug, update.UpdateID) + } + w.WriteHeader(http.StatusOK) + _, _ = fmt.Fprint(w, "{}") + return + } + if err := bot.Handler.Handle(r.Context(), update, bot); err != nil { log.Printf("bot=%s update=%d handler error: %v", slug, update.UpdateID, err) } diff --git a/worker.go b/worker.go new file mode 100644 index 0000000..d972fea --- /dev/null +++ b/worker.go @@ -0,0 +1,108 @@ +// Phase 2b — worker goroutine that drains the durable job queue. +// Voir ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2. +package main + +import ( + "context" + "encoding/json" + "log" + "time" +) + +const ( + defaultPollInterval = 1 * time.Second + defaultMaxAttempts = 5 + jobTimeout = 10 * time.Minute +) + +// Worker pops jobs from the queue and dispatches them through the same +// Handler interface used by the sync path. Failures schedule a retry with +// exponential backoff ; after `maxAttempts` the job is moved to 'dead' and +// the user (if reachable) is informed once. +type Worker struct { + queue Queue + registry *Registry + tg *TelegramClient + pollEvery time.Duration + maxAttempts int +} + +func NewWorker(queue Queue, registry *Registry, tg *TelegramClient) *Worker { + return &Worker{ + queue: queue, + registry: registry, + tg: tg, + pollEvery: defaultPollInterval, + maxAttempts: defaultMaxAttempts, + } +} + +// Run drains the queue until ctx is canceled. Intended to be called as a +// goroutine. Errors are logged but never bubble up. 
+func (w *Worker) Run(ctx context.Context) { + log.Printf("worker started (poll=%s, maxAttempts=%d)", w.pollEvery, w.maxAttempts) + t := time.NewTicker(w.pollEvery) + defer t.Stop() + for { + select { + case <-ctx.Done(): + log.Print("worker stopping") + return + case <-t.C: + w.processOne(ctx) + } + } +} + +func (w *Worker) processOne(parent context.Context) { + ctx, cancel := context.WithTimeout(parent, jobTimeout) + defer cancel() + + job, err := w.queue.Pop(ctx) + if err != nil { + log.Printf("worker: queue.Pop error: %v", err) + return + } + if job == nil { + return // nothing to do + } + + bot, ok := w.registry.Get(job.BotSlug) + if !ok { + log.Printf("worker job=%d: unknown bot slug %q → dead", job.ID, job.BotSlug) + _ = w.queue.MarkFailed(ctx, job.ID, w.maxAttempts, errUnknownBot{slug: job.BotSlug}, w.maxAttempts) + return + } + + var update Update + if err := json.Unmarshal(job.Payload, &update); err != nil { + log.Printf("worker job=%d: payload decode: %v → dead", job.ID, err) + _ = w.queue.MarkFailed(ctx, job.ID, w.maxAttempts, err, w.maxAttempts) + return + } + + if err := bot.Handler.Handle(ctx, update, bot); err != nil { + log.Printf("worker job=%d bot=%s attempt=%d: handler error: %v", job.ID, job.BotSlug, job.Attempts, err) + retryErr := w.queue.MarkFailed(ctx, job.ID, job.Attempts, err, w.maxAttempts) + if retryErr != nil { + log.Printf("worker job=%d: MarkFailed error: %v", job.ID, retryErr) + } + // Notify user once on dead + if job.Attempts >= w.maxAttempts { + if chatID, hasChat := update.ChatID(); hasChat && w.tg != nil { + _ = w.tg.SendMessage(ctx, bot.Token, SendMessageParams{ + ChatID: chatID, + Text: "❌ Le backend est indisponible, j'abandonne après plusieurs tentatives.", + }) + } + } + return + } + if err := w.queue.MarkDone(ctx, job.ID); err != nil { + log.Printf("worker job=%d: MarkDone error: %v", job.ID, err) + } +} + +type errUnknownBot struct{ slug string } + +func (e errUnknownBot) Error() string { return "unknown bot slug: " + 
e.slug }