Files
telegram-gateway/queue.go
Gabriel Radureau abe77f5873
All checks were successful
CI/CD / test (push) Successful in 20s
CI/CD / build-and-push-image (push) Successful in 58s
Phase 2d — gateway_jobs retention (Janitor goroutine)
Periodic cleanup goroutine, started alongside the worker when DATABASE_URL
is set. It handles three concerns:

- DELETE rows with status='done' older than QUEUE_DONE_RETENTION (default
  168h / 7 days). Completed rows have no value beyond a short debugging window.
- UPDATE rows stuck in status='running' for more than QUEUE_STUCK_TIMEOUT
  (default 30m) back to 'pending' so a worker can retry. Handles the
  case of a pod crashing mid-job (without this, jobs stay orphaned forever).
- 'dead' rows are NEVER auto-purged (volume negligible, kept for forensics).

Configurable via environment variables:
- QUEUE_DONE_RETENTION (default 168h)
- QUEUE_STUCK_TIMEOUT  (default 30m)
- QUEUE_JANITOR_INTERVAL (default 1h)

The janitor runs once immediately at startup (recovers anything orphaned
by the previous pod before opening for new traffic), then ticks on the
interval.

Queue interface gains PurgeDone + RecoverStuck — both use Postgres'
make_interval(secs) for safe parameterization.

4 new unit tests via fakeQueue mock (47 total, race clean).
2026-05-09 16:06:54 +02:00

189 lines
5.5 KiB
Go

// Phase 2b — durable async job queue (Postgres). Idempotent on (bot_slug, update_id).
// See ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2.
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
)
// Job is a unit of async work: a Telegram Update routed to a particular bot,
// to be processed by that bot's handler. Its fields mirror the columns of
// one gateway_jobs row as read back by PostgresQueue.Pop.
type Job struct {
	ID          int64     // gateway_jobs.id; assigned by the database (Enqueue does not send it)
	BotSlug     string    // target bot; (BotSlug, UpdateID) is the dedup key used by Enqueue
	HandlerType string    // selects the handler — NOTE(review): valid values are defined by callers
	UpdateID    int64     // Telegram update_id; unique per BotSlug
	Payload     []byte    // raw update body — presumably Telegram's JSON; confirm with the webhook handler
	Attempts    int       // incremented by Pop every time the job is claimed
	NextRetryAt time.Time // Pop only claims the job once this instant has passed
	Status      string    // 'pending', 'running', 'done' or 'dead' (values used by this file's SQL)
	LastError   string    // most recent failure message; "" when there is none
}
// Queue is the durable job store behind the gateway. The webhook side
// enqueues updates, workers claim and settle them, and a periodic janitor
// trims finished rows and rescues abandoned ones.
type Queue interface {
	// --- producer side -------------------------------------------------

	// Enqueue stores j for later processing. A redelivered Telegram update
	// (same bot slug and update ID as an existing job) is not an error: the
	// call returns nil and leaves the existing job exactly as it was.
	Enqueue(ctx context.Context, j Job) error

	// --- worker side ---------------------------------------------------

	// Pop atomically claims the oldest pending job that is due (its
	// next_retry_at has passed). It uses FOR UPDATE SKIP LOCKED so that
	// concurrent workers never claim the same row. The claimed job is moved
	// to 'running' with its attempt counter incremented. When nothing is
	// claimable, Pop returns (nil, nil).
	Pop(ctx context.Context) (*Job, error)

	// MarkDone records that the job finished successfully ('done').
	MarkDone(ctx context.Context, id int64) error

	// MarkFailed records a failed attempt. The job goes back to 'pending'
	// with a next_retry_at in the future, or becomes 'dead' once
	// maxAttempts is reached.
	MarkFailed(ctx context.Context, id int64, attempt int, err error, maxAttempts int) error

	// --- maintenance (janitor goroutine) --------------------------------

	// PurgeDone deletes 'done' rows older than olderThan and reports how
	// many rows were removed.
	PurgeDone(ctx context.Context, olderThan time.Duration) (int64, error)

	// RecoverStuck flips rows that have been 'running' for longer than
	// stuckFor back to 'pending', so work orphaned by a pod that died
	// mid-job can be retried. It reports how many rows were reverted.
	RecoverStuck(ctx context.Context, stuckFor time.Duration) (int64, error)
}
// PostgresQueue is the Queue implementation backed by the gateway_jobs table.
type PostgresQueue struct {
	db *sql.DB // connection pool used for every statement the queue runs
}

// NewPostgresQueue wraps db in a PostgresQueue. The pool is stored as-is;
// no connection is opened or verified here.
func NewPostgresQueue(db *sql.DB) *PostgresQueue {
	queue := &PostgresQueue{}
	queue.db = db
	return queue
}
// SQL statements issued by PostgresQueue against the gateway_jobs table.
const (
	// enqueueSQL inserts a new job. A row that already exists for the same
	// (bot_slug, update_id) makes the insert a no-op, so a Telegram
	// redelivery never creates a second job.
	enqueueSQL = `
INSERT INTO gateway_jobs (bot_slug, handler_type, update_id, payload)
VALUES ($1, $2, $3, $4)
ON CONFLICT (bot_slug, update_id) DO NOTHING
`
	// popSQL claims one due job in a single statement. The subquery locks
	// the pending row with the earliest next_retry_at, skipping rows already
	// locked by other workers. The outer UPDATE then marks that row
	// 'running', bumps attempts and stamps updated_at; RecoverStuck later
	// uses that timestamp as the claim time. Returns no row when nothing is
	// due.
	popSQL = `
UPDATE gateway_jobs
SET status = 'running', attempts = attempts + 1, updated_at = NOW()
WHERE id = (
SELECT id FROM gateway_jobs
WHERE status = 'pending' AND next_retry_at <= NOW()
ORDER BY next_retry_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id, bot_slug, handler_type, update_id, payload, attempts, next_retry_at, status, COALESCE(last_error, '')
`
	// markDoneSQL finalises a successful job and clears any earlier error.
	markDoneSQL = `
UPDATE gateway_jobs SET status = 'done', updated_at = NOW(), last_error = NULL WHERE id = $1
`
	// markRetrySQL re-queues a failed job. $2 is the next_retry_at value
	// computed by the caller with the Go clock. NOTE(review): Pop compares
	// it against the database NOW(), so clock skew between pod and database
	// shifts the effective retry time.
	markRetrySQL = `
UPDATE gateway_jobs
SET status = 'pending',
next_retry_at = $2,
last_error = $3,
updated_at = NOW()
WHERE id = $1
`
	// markDeadSQL retires a job that exhausted its attempts. Pop never
	// claims 'dead' rows, and PurgeDone never deletes them.
	markDeadSQL = `
UPDATE gateway_jobs
SET status = 'dead',
last_error = $2,
updated_at = NOW()
WHERE id = $1
`
	// purgeDoneSQL deletes 'done' rows last updated more than $1 seconds
	// ago. make_interval keeps the retention a bound parameter rather than
	// a string-built interval literal.
	purgeDoneSQL = `
DELETE FROM gateway_jobs
WHERE status = 'done'
AND updated_at < NOW() - make_interval(secs => $1::int)
`
	// recoverStuckSQL reverts rows that have been 'running' for more than
	// $1 seconds, measured from the updated_at stamp written by popSQL.
	// attempts is not modified here.
	recoverStuckSQL = `
UPDATE gateway_jobs
SET status = 'pending', updated_at = NOW()
WHERE status = 'running'
AND updated_at < NOW() - make_interval(secs => $1::int)
`
)
// Enqueue inserts j into gateway_jobs. Only the bot slug, handler type,
// update ID and payload are sent; the database supplies every other column.
// A duplicate (bot slug, update ID) is ignored by the INSERT, so this
// returns nil without touching the existing row.
func (q *PostgresQueue) Enqueue(ctx context.Context, j Job) error {
	_, execErr := q.db.ExecContext(ctx, enqueueSQL,
		j.BotSlug,
		j.HandlerType,
		j.UpdateID,
		j.Payload,
	)
	if execErr != nil {
		return execErr
	}
	return nil
}
// Pop claims the next due pending job and returns it already marked
// 'running' with its attempt count incremented. If no job is due it returns
// (nil, nil); any other database error is passed through unchanged.
func (q *PostgresQueue) Pop(ctx context.Context) (*Job, error) {
	job := new(Job)
	scanErr := q.db.QueryRowContext(ctx, popSQL).Scan(
		&job.ID,
		&job.BotSlug,
		&job.HandlerType,
		&job.UpdateID,
		&job.Payload,
		&job.Attempts,
		&job.NextRetryAt,
		&job.Status,
		&job.LastError,
	)
	switch {
	case scanErr == nil:
		return job, nil
	case errors.Is(scanErr, sql.ErrNoRows):
		// An empty queue is a normal outcome, not a failure.
		return nil, nil
	default:
		return nil, scanErr
	}
}
// MarkDone sets job id to 'done' and clears its last error. An unknown id is
// not reported: the affected-row count is not checked.
func (q *PostgresQueue) MarkDone(ctx context.Context, id int64) error {
	if _, err := q.db.ExecContext(ctx, markDoneSQL, id); err != nil {
		return err
	}
	return nil
}
// MarkFailed records that attempt number `attempt` of job id failed with
// jobErr. The error text is stored via truncate(…, 4096); a nil jobErr
// stores an empty string.
//
// While attempt is below maxAttempts, the job goes back to 'pending' and is
// due again after Backoff(attempt), measured from the local clock. Once
// attempt reaches maxAttempts, the job is set to 'dead' and never retried.
func (q *PostgresQueue) MarkFailed(ctx context.Context, id int64, attempt int, jobErr error, maxAttempts int) error {
	var reason string
	if jobErr != nil {
		reason = truncate(jobErr.Error(), 4096)
	}

	if attempt < maxAttempts {
		dueAt := time.Now().Add(Backoff(attempt))
		_, err := q.db.ExecContext(ctx, markRetrySQL, id, dueAt, reason)
		return err
	}

	_, err := q.db.ExecContext(ctx, markDeadSQL, id, reason)
	return err
}
// PurgeDone deletes 'done' rows whose updated_at is more than olderThan in
// the past and returns how many rows were removed. Rows in any other status,
// including 'dead', are left alone. olderThan is sent as whole seconds, so
// any sub-second remainder is dropped.
func (q *PostgresQueue) PurgeDone(ctx context.Context, olderThan time.Duration) (int64, error) {
	retentionSecs := int(olderThan.Seconds())
	result, execErr := q.db.ExecContext(ctx, purgeDoneSQL, retentionSecs)
	if execErr != nil {
		return 0, execErr
	}
	return result.RowsAffected()
}
// RecoverStuck moves jobs that have been 'running' for longer than stuckFor
// back to 'pending', so a worker can claim them again after the pod that held
// them died mid-job. It returns the number of rows reverted.
//
// stuckFor is measured from the updated_at stamp written when Pop claimed the
// job, and is sent to Postgres as whole seconds. A value under one second is
// rejected with an error. It would truncate to a zero cutoff that matches
// every 'running' row, including jobs a live worker is processing right now,
// and hand them to a second worker. For the same reason, stuckFor must exceed
// the longest expected handler run: a job still running past the cutoff is
// reverted too.
//
// NOTE(review): attempts is left untouched, and this path never marks a row
// 'dead' (in this file only MarkFailed does). A job that crashes its worker
// on every try can therefore be recycled indefinitely.
func (q *PostgresQueue) RecoverStuck(ctx context.Context, stuckFor time.Duration) (int64, error) {
	secs := int(stuckFor.Seconds())
	if secs < 1 {
		return 0, fmt.Errorf("recover stuck jobs: timeout %v is below the 1s minimum", stuckFor)
	}
	res, err := q.db.ExecContext(ctx, recoverStuckSQL, secs)
	if err != nil {
		return 0, err
	}
	return res.RowsAffected()
}
// Backoff returns how long to wait before retrying a job whose attempt-th
// attempt just failed:
//
//	attempt 1 → 30s, 2 → 2m, 3 → 10m, 4 and later → 1h (cap)
//
// The cumulative wait before the 5th attempt is about 1h12m30s; each further
// retry adds one hour. Attempt numbers below 1 are not real retry counts.
// They get the shortest delay rather than the one-hour cap.
func Backoff(attempt int) time.Duration {
	switch {
	case attempt <= 1:
		return 30 * time.Second
	case attempt == 2:
		return 2 * time.Minute
	case attempt == 3:
		return 10 * time.Minute
	default:
		return time.Hour
	}
}
// EnqueueWithDefaults is a convenience used by the webhook handler. It builds
// a Job from the raw update fields and passes it to q.Enqueue. ID, Attempts,
// NextRetryAt, Status and LastError are left at their zero values;
// PostgresQueue.Enqueue does not send them, so the table's own column values
// apply.
//
// A nil q (the queue was never configured) produces an error instead of a
// panic. NOTE(review): a typed-nil *PostgresQueue stored in q is not caught by
// this check. Errors from Enqueue are wrapped with the bot slug and update ID
// and remain inspectable with errors.Is / errors.As.
func EnqueueWithDefaults(ctx context.Context, q Queue, slug, handlerType string, updateID int64, payload []byte) error {
	if q == nil {
		return errors.New("queue not initialized (DATABASE_URL unset ?)")
	}
	err := q.Enqueue(ctx, Job{
		BotSlug:     slug,
		HandlerType: handlerType,
		UpdateID:    updateID,
		Payload:     payload,
	})
	if err != nil {
		return fmt.Errorf("enqueue update %d for bot %q: %w", updateID, slug, err)
	}
	return nil
}