diff --git a/janitor.go b/janitor.go
new file mode 100644
index 0000000..bf18ea9
--- /dev/null
+++ b/janitor.go
@@ -0,0 +1,82 @@
+// Phase 2d: table retention. See
+// ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2.
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+)
+
+const (
+	defaultDoneRetention   = 7 * 24 * time.Hour
+	defaultStuckTimeout    = 30 * time.Minute
+	defaultJanitorInterval = 1 * time.Hour
+	janitorTickTimeout     = 30 * time.Second
+)
+
+// Janitor periodically purges old `done` rows and recovers `running` rows
+// that have been stuck (pod crashed mid-job). Runs once immediately at
+// startup so a fresh deploy reclaims any orphans from the previous pod.
+type Janitor struct {
+	queue         Queue
+	doneRetention time.Duration
+	stuckTimeout  time.Duration
+	interval      time.Duration
+}
+
+func NewJanitor(queue Queue, doneRet, stuckTO, interval time.Duration) *Janitor {
+	if doneRet <= 0 {
+		doneRet = defaultDoneRetention
+	}
+	if stuckTO <= 0 {
+		stuckTO = defaultStuckTimeout
+	}
+	if interval <= 0 {
+		interval = defaultJanitorInterval
+	}
+	return &Janitor{
+		queue:         queue,
+		doneRetention: doneRet,
+		stuckTimeout:  stuckTO,
+		interval:      interval,
+	}
+}
+
+func (j *Janitor) Run(ctx context.Context) {
+	log.Printf("janitor started (purge done > %s, recover stuck > %s, every %s)",
+		j.doneRetention, j.stuckTimeout, j.interval)
+
+	// One immediate pass at boot recovers anything left orphaned by the
+	// previous pod before opening for new traffic.
+	j.tick(ctx)
+
+	t := time.NewTicker(j.interval)
+	defer t.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			log.Print("janitor stopping")
+			return
+		case <-t.C:
+			j.tick(ctx)
+		}
+	}
+}
+
+func (j *Janitor) tick(parent context.Context) {
+	ctx, cancel := context.WithTimeout(parent, janitorTickTimeout)
+	defer cancel()
+
+	if n, err := j.queue.PurgeDone(ctx, j.doneRetention); err != nil {
+		log.Printf("janitor PurgeDone error: %v", err)
+	} else if n > 0 {
+		log.Printf("janitor purged %d done rows older than %s", n, j.doneRetention)
+	}
+
+	if n, err := j.queue.RecoverStuck(ctx, j.stuckTimeout); err != nil {
+		log.Printf("janitor RecoverStuck error: %v", err)
+	} else if n > 0 {
+		log.Printf("janitor recovered %d stuck running rows (> %s)", n, j.stuckTimeout)
+	}
+}
diff --git a/janitor_test.go b/janitor_test.go
new file mode 100644
index 0000000..16287ac
--- /dev/null
+++ b/janitor_test.go
@@ -0,0 +1,117 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"testing"
+	"time"
+)
+
+// fakeQueue records calls to PurgeDone / RecoverStuck. Other methods are
+// no-ops because the janitor doesn't touch them.
+type fakeQueue struct {
+	mu sync.Mutex
+
+	purgeAges []time.Duration
+	purgeRet  int64
+	purgeErr  error
+
+	stuckAges []time.Duration
+	stuckRet  int64
+	stuckErr  error
+}
+
+func (f *fakeQueue) Enqueue(_ context.Context, _ Job) error    { return nil }
+func (f *fakeQueue) Pop(_ context.Context) (*Job, error)       { return nil, nil }
+func (f *fakeQueue) MarkDone(_ context.Context, _ int64) error { return nil }
+func (f *fakeQueue) MarkFailed(_ context.Context, _ int64, _ int, _ error, _ int) error {
+	return nil
+}
+
+func (f *fakeQueue) PurgeDone(_ context.Context, age time.Duration) (int64, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.purgeAges = append(f.purgeAges, age)
+	return f.purgeRet, f.purgeErr
+}
+
+func (f *fakeQueue) RecoverStuck(_ context.Context, age time.Duration) (int64, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.stuckAges = append(f.stuckAges, age)
+	return f.stuckRet, f.stuckErr
+}
+
+func TestJanitor_TickCallsBothQueueMethods(t *testing.T) {
+	q := &fakeQueue{purgeRet: 3, stuckRet: 1}
+	j := NewJanitor(q, 7*24*time.Hour, 30*time.Minute, time.Hour)
+
+	j.tick(context.Background())
+
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	if len(q.purgeAges) != 1 || q.purgeAges[0] != 7*24*time.Hour {
+		t.Fatalf("PurgeDone calls = %v, want one call with 168h", q.purgeAges)
+	}
+	if len(q.stuckAges) != 1 || q.stuckAges[0] != 30*time.Minute {
+		t.Fatalf("RecoverStuck calls = %v, want one call with 30m", q.stuckAges)
+	}
+}
+
+func TestJanitor_TickSurvivesPurgeError(t *testing.T) {
+	q := &fakeQueue{purgeErr: errors.New("boom")}
+	j := NewJanitor(q, time.Hour, time.Minute, time.Hour)
+
+	// Should not panic, should still call RecoverStuck despite Purge failure.
+	j.tick(context.Background())
+
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	if len(q.stuckAges) != 1 {
+		t.Fatalf("RecoverStuck should still run after PurgeDone error, got %d calls", len(q.stuckAges))
+	}
+}
+
+func TestJanitor_DefaultsAppliedOnZeroOrNegative(t *testing.T) {
+	q := &fakeQueue{}
+	j := NewJanitor(q, 0, -time.Second, 0)
+
+	if j.doneRetention != defaultDoneRetention {
+		t.Errorf("doneRetention = %s, want %s", j.doneRetention, defaultDoneRetention)
+	}
+	if j.stuckTimeout != defaultStuckTimeout {
+		t.Errorf("stuckTimeout = %s, want %s", j.stuckTimeout, defaultStuckTimeout)
+	}
+	if j.interval != defaultJanitorInterval {
+		t.Errorf("interval = %s, want %s", j.interval, defaultJanitorInterval)
+	}
+}
+
+func TestJanitor_RunStopsOnContextCancel(t *testing.T) {
+	q := &fakeQueue{}
+	// Very short interval so the ticker fires at least once before we cancel.
+	j := NewJanitor(q, time.Hour, time.Minute, 5*time.Millisecond)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	done := make(chan struct{})
+	go func() {
+		j.Run(ctx)
+		close(done)
+	}()
+
+	// Let the immediate-startup tick + at least one interval tick fire.
+	time.Sleep(20 * time.Millisecond)
+	cancel()
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		t.Fatal("janitor did not stop after context cancel")
+	}
+
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	if len(q.purgeAges) < 1 {
+		t.Fatalf("expected at least 1 purge call before cancel, got %d", len(q.purgeAges))
+	}
+}
diff --git a/main.go b/main.go
index 1db07de..0010097 100644
--- a/main.go
+++ b/main.go
@@ -106,6 +106,14 @@ func runServer() {
 	if queue != nil {
 		worker := NewWorker(queue, registry, tg)
 		go worker.Run(ctx)
+
+		// Phase 2d janitor: purge done > QUEUE_DONE_RETENTION (default 7d),
+		// recover stuck running > QUEUE_STUCK_TIMEOUT (default 30m), runs every
+		// QUEUE_JANITOR_INTERVAL (default 1h). All overridable via env.
+		doneRet := envDuration("QUEUE_DONE_RETENTION", defaultDoneRetention)
+		stuckTO := envDuration("QUEUE_STUCK_TIMEOUT", defaultStuckTimeout)
+		jInt := envDuration("QUEUE_JANITOR_INTERVAL", defaultJanitorInterval)
+		go NewJanitor(queue, doneRet, stuckTO, jInt).Run(ctx)
 	}
 
 	srv := &http.Server{
@@ -140,3 +148,19 @@ func envOr(key, fallback string) string {
 	}
 	return fallback
 }
+
+// envDuration reads a Go duration from env (e.g. "168h", "30m") or returns
+// the fallback if the value is unset, unparseable, or non-positive. Logs a
+// warning on a bad value so the operator notices a typo without losing the deployment.
+func envDuration(key string, fallback time.Duration) time.Duration {
+	raw := os.Getenv(key)
+	if raw == "" {
+		return fallback
+	}
+	d, err := time.ParseDuration(raw)
+	if err != nil || d <= 0 {
+		log.Printf("%s=%q invalid, defaulting to %s", key, raw, fallback)
+		return fallback
+	}
+	return d
+}
diff --git a/queue.go b/queue.go
index bebcd9f..a2cc5f8 100644
--- a/queue.go
+++ b/queue.go
@@ -37,6 +37,13 @@ type Queue interface {
 	// MarkFailed schedules a retry (status back to 'pending', next_retry_at
 	// in the future). After maxAttempts the row is set to 'dead'.
 	MarkFailed(ctx context.Context, id int64, attempt int, err error, maxAttempts int) error
+	// PurgeDone removes rows with status='done' older than `olderThan`.
+	// Returns the number of rows deleted. Used by the Janitor goroutine.
+	PurgeDone(ctx context.Context, olderThan time.Duration) (int64, error)
+	// RecoverStuck reverts rows stuck in status='running' for more than
+	// `stuckFor` back to 'pending' so a worker can retry. Handles the case
+	// of a pod dying mid-job. Returns the number of rows reverted.
+	RecoverStuck(ctx context.Context, stuckFor time.Duration) (int64, error)
 }
 
 type PostgresQueue struct {
@@ -80,6 +87,17 @@ UPDATE gateway_jobs
        last_error = $2,
        updated_at = NOW()
  WHERE id = $1
+`
+	purgeDoneSQL = `
+DELETE FROM gateway_jobs
+ WHERE status = 'done'
+   AND updated_at < NOW() - make_interval(secs => $1::int)
+`
+	recoverStuckSQL = `
+UPDATE gateway_jobs
+   SET status = 'pending', updated_at = NOW()
+ WHERE status = 'running'
+   AND updated_at < NOW() - make_interval(secs => $1::int)
 `
 )
 
@@ -123,6 +141,22 @@ func (q *PostgresQueue) MarkFailed(ctx context.Context, id int64, attempt int, j
 	return err
 }
 
+func (q *PostgresQueue) PurgeDone(ctx context.Context, olderThan time.Duration) (int64, error) {
+	res, err := q.db.ExecContext(ctx, purgeDoneSQL, int(olderThan.Seconds()))
+	if err != nil {
+		return 0, err
+	}
+	return res.RowsAffected()
+}
+
+func (q *PostgresQueue) RecoverStuck(ctx context.Context, stuckFor time.Duration) (int64, error) {
+	res, err := q.db.ExecContext(ctx, recoverStuckSQL, int(stuckFor.Seconds()))
+	if err != nil {
+		return 0, err
+	}
+	return res.RowsAffected()
+}
+
 // Backoff returns the delay before the next attempt of a failed job.
 // 30s, 2m, 10m, 1h (capped). Spans roughly one night by attempt 5.
 func Backoff(attempt int) time.Duration {
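
A minimal sketch, not part of the patch, of how the env overrides resolve through envDuration, written as a Go package example that could sit in a _test.go file next to main.go. The example name and the three values set below (the "72h" override, the "30mins" typo, and the unset interval) are illustrative assumptions; the defaults and parsing rules are the ones introduced in the diff above.

package main

import (
	"fmt"
	"os"
)

// Example_envDurationOverrides is illustrative only: it exercises
// envDuration's fallback rules with one valid override, one typo,
// and one variable left unset.
func Example_envDurationOverrides() {
	os.Setenv("QUEUE_DONE_RETENTION", "72h")   // valid Go duration: used as-is
	os.Setenv("QUEUE_STUCK_TIMEOUT", "30mins") // typo: logged, default 30m kept
	os.Unsetenv("QUEUE_JANITOR_INTERVAL")      // unset: default 1h kept

	fmt.Println(envDuration("QUEUE_DONE_RETENTION", defaultDoneRetention))
	fmt.Println(envDuration("QUEUE_STUCK_TIMEOUT", defaultStuckTimeout))
	fmt.Println(envDuration("QUEUE_JANITOR_INTERVAL", defaultJanitorInterval))
	// Output:
	// 72h0m0s
	// 30m0s
	// 1h0m0s
}

Running it with `go test -run Example_envDurationOverrides` would print the resolved durations and, for the typo, emit envDuration's warning line on stderr.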