Files
telegram-gateway/janitor.go
Gabriel Radureau abe77f5873
All checks were successful
CI/CD / test (push) Successful in 20s
CI/CD / build-and-push-image (push) Successful in 58s
Phase 2d — gateway_jobs retention (Janitor goroutine)
Periodic cleanup goroutine, started alongside the worker when DATABASE_URL
is set. Three concerns:

- DELETE rows with status='done' older than QUEUE_DONE_RETENTION (default
  168h / 7 days). Past success rows have no value beyond debug runway.
- UPDATE rows stuck in status='running' for more than QUEUE_STUCK_TIMEOUT
  (default 30m) back to 'pending' so a worker can retry. Handles the
  case of a pod crashing mid-job (without this, jobs stay orphaned forever).
- 'dead' rows are NEVER auto-purged (volume negligible, kept for forensics).

Configurable via env:
- QUEUE_DONE_RETENTION (default 168h)
- QUEUE_STUCK_TIMEOUT  (default 30m)
- QUEUE_JANITOR_INTERVAL (default 1h)

The janitor runs once immediately at startup (recovers anything orphaned
by the previous pod before opening for new traffic), then ticks on the
interval.

Queue interface gains PurgeDone + RecoverStuck — both use Postgres'
make_interval(secs) for safe parameterization.

4 new unit tests via fakeQueue mock (47 total, race clean).
2026-05-09 16:06:54 +02:00

83 lines
2.1 KiB
Go

// Phase 2d — table retention. See
// ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2.
package main
import (
"context"
"log"
"time"
)
const (
	// defaultDoneRetention is how long finished (`done`) rows are kept
	// before being purged; overridable via QUEUE_DONE_RETENTION.
	defaultDoneRetention = 7 * 24 * time.Hour
	// defaultStuckTimeout is how long a `running` row may sit untouched
	// before it is considered orphaned and reset to `pending`;
	// overridable via QUEUE_STUCK_TIMEOUT.
	defaultStuckTimeout = 30 * time.Minute
	// defaultJanitorInterval is the period between cleanup passes;
	// overridable via QUEUE_JANITOR_INTERVAL.
	defaultJanitorInterval = 1 * time.Hour
	// janitorTickTimeout bounds a single cleanup pass so a slow or wedged
	// database cannot block the janitor loop indefinitely.
	janitorTickTimeout = 30 * time.Second
)
// Janitor periodically purges old `done` rows and recovers `running` rows
// that have been stuck (pod crashed mid-job). Runs once immediately at
// startup so a fresh deploy reclaims any orphans from the previous pod.
type Janitor struct {
	// queue is the job store being cleaned (PurgeDone / RecoverStuck).
	queue Queue
	// doneRetention: `done` rows older than this are deleted.
	doneRetention time.Duration
	// stuckTimeout: `running` rows idle longer than this go back to `pending`.
	stuckTimeout time.Duration
	// interval is the period between cleanup passes.
	interval time.Duration
}
// NewJanitor builds a Janitor over queue. Each duration that is zero or
// negative is replaced by its compiled-in default, so callers may pass 0
// to mean "use the default".
func NewJanitor(queue Queue, doneRet, stuckTO, interval time.Duration) *Janitor {
	j := &Janitor{
		queue:         queue,
		doneRetention: doneRet,
		stuckTimeout:  stuckTO,
		interval:      interval,
	}
	// Normalize after construction: non-positive values fall back to defaults.
	if j.doneRetention <= 0 {
		j.doneRetention = defaultDoneRetention
	}
	if j.stuckTimeout <= 0 {
		j.stuckTimeout = defaultStuckTimeout
	}
	if j.interval <= 0 {
		j.interval = defaultJanitorInterval
	}
	return j
}
// Run executes the cleanup loop until ctx is cancelled. One pass fires
// immediately at boot — reclaiming anything left orphaned by the previous
// pod before new traffic arrives — and subsequent passes fire every
// j.interval.
func (j *Janitor) Run(ctx context.Context) {
	log.Printf("janitor started (purge done > %s, recover stuck > %s, every %s)",
		j.doneRetention, j.stuckTimeout, j.interval)

	// Immediate boot-time pass.
	j.tick(ctx)

	ticker := time.NewTicker(j.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			j.tick(ctx)
		case <-ctx.Done():
			log.Print("janitor stopping")
			return
		}
	}
}
// tick performs one cleanup pass (purge old done rows, recover stuck
// running rows) under a bounded timeout, so a wedged database cannot
// stall the loop beyond janitorTickTimeout. Errors are logged, never
// fatal — the next tick simply retries.
func (j *Janitor) tick(parent context.Context) {
	ctx, cancel := context.WithTimeout(parent, janitorTickTimeout)
	defer cancel()

	purged, err := j.queue.PurgeDone(ctx, j.doneRetention)
	switch {
	case err != nil:
		log.Printf("janitor PurgeDone error: %v", err)
	case purged > 0:
		log.Printf("janitor purged %d done rows older than %s", purged, j.doneRetention)
	}

	recovered, err := j.queue.RecoverStuck(ctx, j.stuckTimeout)
	switch {
	case err != nil:
		log.Printf("janitor RecoverStuck error: %v", err)
	case recovered > 0:
		log.Printf("janitor recovered %d stuck running rows (> %s)", recovered, j.stuckTimeout)
	}
}