// Phase 2b — worker goroutine that drains the durable job queue. // Voir ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2. package main import ( "context" "encoding/json" "log" "time" ) const ( defaultPollInterval = 1 * time.Second defaultMaxAttempts = 5 jobTimeout = 10 * time.Minute ) // Worker pops jobs from the queue and dispatches them through the same // Handler interface used by the sync path. Failures schedule a retry with // exponential backoff ; after `maxAttempts` the job is moved to 'dead' and // the user (if reachable) is informed once. type Worker struct { queue Queue registry *Registry tg *TelegramClient pollEvery time.Duration maxAttempts int } func NewWorker(queue Queue, registry *Registry, tg *TelegramClient) *Worker { return &Worker{ queue: queue, registry: registry, tg: tg, pollEvery: defaultPollInterval, maxAttempts: defaultMaxAttempts, } } // Run drains the queue until ctx is canceled. Intended to be called as a // goroutine. Errors are logged but never bubble up. func (w *Worker) Run(ctx context.Context) { log.Printf("worker started (poll=%s, maxAttempts=%d)", w.pollEvery, w.maxAttempts) t := time.NewTicker(w.pollEvery) defer t.Stop() for { select { case <-ctx.Done(): log.Print("worker stopping") return case <-t.C: w.processOne(ctx) } } } func (w *Worker) processOne(parent context.Context) { ctx, cancel := context.WithTimeout(parent, jobTimeout) defer cancel() job, err := w.queue.Pop(ctx) if err != nil { log.Printf("worker: queue.Pop error: %v", err) return } if job == nil { return // nothing to do } bot, ok := w.registry.Get(job.BotSlug) if !ok { log.Printf("worker job=%d: unknown bot slug %q → dead", job.ID, job.BotSlug) _ = w.queue.MarkFailed(ctx, job.ID, w.maxAttempts, errUnknownBot{slug: job.BotSlug}, w.maxAttempts) return } var update Update if err := json.Unmarshal(job.Payload, &update); err != nil { log.Printf("worker job=%d: payload decode: %v → dead", job.ID, err) _ = w.queue.MarkFailed(ctx, job.ID, w.maxAttempts, err, w.maxAttempts) return } if err := bot.Handler.Handle(ctx, update, bot); err != nil { log.Printf("worker job=%d bot=%s attempt=%d: handler error: %v", job.ID, job.BotSlug, job.Attempts, err) retryErr := w.queue.MarkFailed(ctx, job.ID, job.Attempts, err, w.maxAttempts) if retryErr != nil { log.Printf("worker job=%d: MarkFailed error: %v", job.ID, retryErr) } // Notify user once on dead if job.Attempts >= w.maxAttempts { if chatID, hasChat := update.ChatID(); hasChat && w.tg != nil { _ = w.tg.SendMessage(ctx, bot.Token, SendMessageParams{ ChatID: chatID, Text: "❌ Le backend est indisponible, j'abandonne après plusieurs tentatives.", }) } } return } if err := w.queue.MarkDone(ctx, job.ID); err != nil { log.Printf("worker job=%d: MarkDone error: %v", job.ID, err) } } type errUnknownBot struct{ slug string } func (e errUnknownBot) Error() string { return "unknown bot slug: " + e.slug }