Phase 2d — gateway_jobs retention (Janitor goroutine)
Periodic cleanup goroutine, started alongside the worker when DATABASE_URL is set. Three concerns : - DELETE rows with status='done' older than QUEUE_DONE_RETENTION (default 168h / 7 days). Past success rows have no value beyond debug runway. - UPDATE rows stuck in status='running' for more than QUEUE_STUCK_TIMEOUT (default 30m) back to 'pending' so a worker can retry. Handles the case of a pod crashing mid-job (without this, jobs stay orphaned forever). - 'dead' rows are NEVER auto-purged (volume negligible, kept for forensics). Configurable via env : - QUEUE_DONE_RETENTION (default 168h) - QUEUE_STUCK_TIMEOUT (default 30m) - QUEUE_JANITOR_INTERVAL (default 1h) The janitor runs once immediately at startup (recovers anything orphaned by the previous pod before opening for new traffic), then ticks on the interval. Queue interface gains PurgeDone + RecoverStuck — both use Postgres' make_interval(secs) for safe parameterization. 4 new unit tests via fakeQueue mock (47 total, race clean).
This commit is contained in:
82
janitor.go
Normal file
82
janitor.go
Normal file
@@ -0,0 +1,82 @@
|
||||
// Phase 2d — table retention. Voir
|
||||
// ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultDoneRetention = 7 * 24 * time.Hour
|
||||
defaultStuckTimeout = 30 * time.Minute
|
||||
defaultJanitorInterval = 1 * time.Hour
|
||||
janitorTickTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
// Janitor periodically purges old `done` rows and recovers `running` rows
|
||||
// that have been stuck (pod crashed mid-job). Runs once immediately at
|
||||
// startup so a fresh deploy reclaims any orphans from the previous pod.
|
||||
type Janitor struct {
|
||||
queue Queue
|
||||
doneRetention time.Duration
|
||||
stuckTimeout time.Duration
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
func NewJanitor(queue Queue, doneRet, stuckTO, interval time.Duration) *Janitor {
|
||||
if doneRet <= 0 {
|
||||
doneRet = defaultDoneRetention
|
||||
}
|
||||
if stuckTO <= 0 {
|
||||
stuckTO = defaultStuckTimeout
|
||||
}
|
||||
if interval <= 0 {
|
||||
interval = defaultJanitorInterval
|
||||
}
|
||||
return &Janitor{
|
||||
queue: queue,
|
||||
doneRetention: doneRet,
|
||||
stuckTimeout: stuckTO,
|
||||
interval: interval,
|
||||
}
|
||||
}
|
||||
|
||||
func (j *Janitor) Run(ctx context.Context) {
|
||||
log.Printf("janitor started (purge done > %s, recover stuck > %s, every %s)",
|
||||
j.doneRetention, j.stuckTimeout, j.interval)
|
||||
|
||||
// One immediate pass at boot — recovers anything left orphaned by the
|
||||
// previous pod before opening for new traffic.
|
||||
j.tick(ctx)
|
||||
|
||||
t := time.NewTicker(j.interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Print("janitor stopping")
|
||||
return
|
||||
case <-t.C:
|
||||
j.tick(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (j *Janitor) tick(parent context.Context) {
|
||||
ctx, cancel := context.WithTimeout(parent, janitorTickTimeout)
|
||||
defer cancel()
|
||||
|
||||
if n, err := j.queue.PurgeDone(ctx, j.doneRetention); err != nil {
|
||||
log.Printf("janitor PurgeDone error: %v", err)
|
||||
} else if n > 0 {
|
||||
log.Printf("janitor purged %d done rows older than %s", n, j.doneRetention)
|
||||
}
|
||||
|
||||
if n, err := j.queue.RecoverStuck(ctx, j.stuckTimeout); err != nil {
|
||||
log.Printf("janitor RecoverStuck error: %v", err)
|
||||
} else if n > 0 {
|
||||
log.Printf("janitor recovered %d stuck running rows (> %s)", n, j.stuckTimeout)
|
||||
}
|
||||
}
|
||||
117
janitor_test.go
Normal file
117
janitor_test.go
Normal file
@@ -0,0 +1,117 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// fakeQueue records calls to PurgeDone / RecoverStuck. Other methods are
|
||||
// no-ops because the janitor doesn't touch them.
|
||||
type fakeQueue struct {
|
||||
mu sync.Mutex
|
||||
|
||||
purgeAges []time.Duration
|
||||
purgeRet int64
|
||||
purgeErr error
|
||||
|
||||
stuckAges []time.Duration
|
||||
stuckRet int64
|
||||
stuckErr error
|
||||
}
|
||||
|
||||
func (f *fakeQueue) Enqueue(_ context.Context, _ Job) error { return nil }
|
||||
func (f *fakeQueue) Pop(_ context.Context) (*Job, error) { return nil, nil }
|
||||
func (f *fakeQueue) MarkDone(_ context.Context, _ int64) error { return nil }
|
||||
func (f *fakeQueue) MarkFailed(_ context.Context, _ int64, _ int, _ error, _ int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeQueue) PurgeDone(_ context.Context, age time.Duration) (int64, error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.purgeAges = append(f.purgeAges, age)
|
||||
return f.purgeRet, f.purgeErr
|
||||
}
|
||||
|
||||
func (f *fakeQueue) RecoverStuck(_ context.Context, age time.Duration) (int64, error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.stuckAges = append(f.stuckAges, age)
|
||||
return f.stuckRet, f.stuckErr
|
||||
}
|
||||
|
||||
func TestJanitor_TickCallsBothQueueMethods(t *testing.T) {
|
||||
q := &fakeQueue{purgeRet: 3, stuckRet: 1}
|
||||
j := NewJanitor(q, 7*24*time.Hour, 30*time.Minute, time.Hour)
|
||||
|
||||
j.tick(context.Background())
|
||||
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
if len(q.purgeAges) != 1 || q.purgeAges[0] != 7*24*time.Hour {
|
||||
t.Fatalf("PurgeDone calls = %v, want one call with 168h", q.purgeAges)
|
||||
}
|
||||
if len(q.stuckAges) != 1 || q.stuckAges[0] != 30*time.Minute {
|
||||
t.Fatalf("RecoverStuck calls = %v, want one call with 30m", q.stuckAges)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJanitor_TickSurvivesPurgeError(t *testing.T) {
|
||||
q := &fakeQueue{purgeErr: errors.New("boom")}
|
||||
j := NewJanitor(q, time.Hour, time.Minute, time.Hour)
|
||||
|
||||
// Should not panic, should still call RecoverStuck despite Purge failure.
|
||||
j.tick(context.Background())
|
||||
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
if len(q.stuckAges) != 1 {
|
||||
t.Fatalf("RecoverStuck should still run after PurgeDone error, got %d calls", len(q.stuckAges))
|
||||
}
|
||||
}
|
||||
|
||||
func TestJanitor_DefaultsAppliedOnZeroOrNegative(t *testing.T) {
|
||||
q := &fakeQueue{}
|
||||
j := NewJanitor(q, 0, -time.Second, 0)
|
||||
|
||||
if j.doneRetention != defaultDoneRetention {
|
||||
t.Errorf("doneRetention = %s, want %s", j.doneRetention, defaultDoneRetention)
|
||||
}
|
||||
if j.stuckTimeout != defaultStuckTimeout {
|
||||
t.Errorf("stuckTimeout = %s, want %s", j.stuckTimeout, defaultStuckTimeout)
|
||||
}
|
||||
if j.interval != defaultJanitorInterval {
|
||||
t.Errorf("interval = %s, want %s", j.interval, defaultJanitorInterval)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJanitor_RunStopsOnContextCancel(t *testing.T) {
|
||||
q := &fakeQueue{}
|
||||
// Very short interval so the ticker fires at least once before we cancel.
|
||||
j := NewJanitor(q, time.Hour, time.Minute, 5*time.Millisecond)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
j.Run(ctx)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Let the immediate-startup tick + at least one interval tick fire.
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
cancel()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("janitor did not stop after context cancel")
|
||||
}
|
||||
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
if len(q.purgeAges) < 1 {
|
||||
t.Fatalf("expected at least 1 purge call before cancel, got %d", len(q.purgeAges))
|
||||
}
|
||||
}
|
||||
24
main.go
24
main.go
@@ -106,6 +106,14 @@ func runServer() {
|
||||
if queue != nil {
|
||||
worker := NewWorker(queue, registry, tg)
|
||||
go worker.Run(ctx)
|
||||
|
||||
// Phase 2d — janitor : purge done > QUEUE_DONE_RETENTION (default 7d),
|
||||
// recover stuck running > QUEUE_STUCK_TIMEOUT (default 30m), runs every
|
||||
// QUEUE_JANITOR_INTERVAL (default 1h). All overridable via env.
|
||||
doneRet := envDuration("QUEUE_DONE_RETENTION", defaultDoneRetention)
|
||||
stuckTO := envDuration("QUEUE_STUCK_TIMEOUT", defaultStuckTimeout)
|
||||
jInt := envDuration("QUEUE_JANITOR_INTERVAL", defaultJanitorInterval)
|
||||
go NewJanitor(queue, doneRet, stuckTO, jInt).Run(ctx)
|
||||
}
|
||||
|
||||
srv := &http.Server{
|
||||
@@ -140,3 +148,19 @@ func envOr(key, fallback string) string {
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
// envDuration reads a Go duration from env (e.g. "168h", "30m") or returns
|
||||
// the fallback if unset / unparseable. Logs a warning on parse error so the
|
||||
// operator notices a typo without losing the deployment.
|
||||
func envDuration(key string, fallback time.Duration) time.Duration {
|
||||
raw := os.Getenv(key)
|
||||
if raw == "" {
|
||||
return fallback
|
||||
}
|
||||
d, err := time.ParseDuration(raw)
|
||||
if err != nil || d <= 0 {
|
||||
log.Printf("%s=%q invalid, defaulting to %s", key, raw, fallback)
|
||||
return fallback
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
34
queue.go
34
queue.go
@@ -37,6 +37,13 @@ type Queue interface {
|
||||
// MarkFailed schedules a retry (status back to 'pending', next_retry_at
|
||||
// in the future). After maxAttempts the row is set to 'dead'.
|
||||
MarkFailed(ctx context.Context, id int64, attempt int, err error, maxAttempts int) error
|
||||
// PurgeDone removes rows with status='done' older than `olderThan`.
|
||||
// Returns the number of rows deleted. Used by the Janitor goroutine.
|
||||
PurgeDone(ctx context.Context, olderThan time.Duration) (int64, error)
|
||||
// RecoverStuck reverts rows stuck in status='running' for more than
|
||||
// `stuckFor` back to 'pending' so a worker can retry. Handles the case
|
||||
// of a pod dying mid-job. Returns the number of rows reverted.
|
||||
RecoverStuck(ctx context.Context, stuckFor time.Duration) (int64, error)
|
||||
}
|
||||
|
||||
type PostgresQueue struct {
|
||||
@@ -80,6 +87,17 @@ UPDATE gateway_jobs
|
||||
last_error = $2,
|
||||
updated_at = NOW()
|
||||
WHERE id = $1
|
||||
`
|
||||
purgeDoneSQL = `
|
||||
DELETE FROM gateway_jobs
|
||||
WHERE status = 'done'
|
||||
AND updated_at < NOW() - make_interval(secs => $1::int)
|
||||
`
|
||||
recoverStuckSQL = `
|
||||
UPDATE gateway_jobs
|
||||
SET status = 'pending', updated_at = NOW()
|
||||
WHERE status = 'running'
|
||||
AND updated_at < NOW() - make_interval(secs => $1::int)
|
||||
`
|
||||
)
|
||||
|
||||
@@ -123,6 +141,22 @@ func (q *PostgresQueue) MarkFailed(ctx context.Context, id int64, attempt int, j
|
||||
return err
|
||||
}
|
||||
|
||||
func (q *PostgresQueue) PurgeDone(ctx context.Context, olderThan time.Duration) (int64, error) {
|
||||
res, err := q.db.ExecContext(ctx, purgeDoneSQL, int(olderThan.Seconds()))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return res.RowsAffected()
|
||||
}
|
||||
|
||||
func (q *PostgresQueue) RecoverStuck(ctx context.Context, stuckFor time.Duration) (int64, error) {
|
||||
res, err := q.db.ExecContext(ctx, recoverStuckSQL, int(stuckFor.Seconds()))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return res.RowsAffected()
|
||||
}
|
||||
|
||||
// Backoff returns the delay before the next attempt of a failed job.
|
||||
// 30s, 2m, 10m, 1h (capped). Spans roughly one night by attempt 5.
|
||||
func Backoff(attempt int) time.Duration {
|
||||
|
||||
Reference in New Issue
Block a user