Some checks failed
Docker Build / build-and-push-image (push) Has been cancelled
Adds the async dispatch infrastructure : - Postgres pool + embedded migration (CREATE TABLE/INDEX IF NOT EXISTS gateway_jobs). Auto-applied at boot. lib/pq driver (matches webapp convention). - queue.go : Enqueue (idempotent on UNIQUE(bot_slug, update_id) — handles Telegram redelivery), Pop with FOR UPDATE SKIP LOCKED, MarkDone, MarkFailed with exponential backoff (30s → 2m → 10m → 1h → dead at 5). - worker.go : goroutine that drains the queue, dispatches via the same Handler interface as sync, schedules retries on failure, notifies the user once when a job goes to dead. - BotConfig gains `async: bool`. Registry refuses bots with async=true if DATABASE_URL is unset (queue=nil). - Server : when bot.Async, the webhook ack is immediate ; the update payload is enqueued for the worker. When DATABASE_URL is unset (current default), queue/worker stay disabled and only sync handlers (echo, http, auth) work — no breaking change to the running cluster. Refs ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2.
155 lines
4.3 KiB
Go
155 lines
4.3 KiB
Go
// Phase 2b — durable async job queue (Postgres). Idempotent on (bot_slug, update_id).
|
|
// Voir ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2.
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// Job is a unit of async work: a Telegram Update routed to a particular bot,
// to be processed by that bot's handler. It mirrors one row of the
// gateway_jobs table.
type Job struct {
	ID          int64     // primary key, assigned by Postgres
	BotSlug     string    // which bot the update belongs to; part of the dedup key
	HandlerType string    // handler implementation to dispatch to (same registry as sync)
	UpdateID    int64     // Telegram update_id; UNIQUE(bot_slug, update_id) dedups redeliveries
	Payload     []byte    // raw update JSON, replayed to the handler by the worker
	Attempts    int       // number of times the job has been claimed (incremented by Pop)
	NextRetryAt time.Time // job is not eligible for Pop before this instant
	Status      string    // 'pending' | 'running' | 'done' | 'dead' — presumably; verify against migration
	LastError   string    // message from the most recent failure, '' when none
}
|
|
|
|
// Queue is the durable job store used by the async dispatch path. The only
// implementation in this file is PostgresQueue; the interface exists so the
// server/worker can hold a nil queue when DATABASE_URL is unset.
type Queue interface {
	// Enqueue inserts a job; on UNIQUE conflict (Telegram redelivery) it
	// returns nil without touching the existing row.
	Enqueue(ctx context.Context, j Job) error
	// Pop atomically claims the oldest pending job whose next_retry_at has
	// passed (FOR UPDATE SKIP LOCKED, status → 'running', attempts++).
	// Returns (nil, nil) when there is nothing to claim.
	Pop(ctx context.Context) (*Job, error)
	// MarkDone marks a job 'done'.
	MarkDone(ctx context.Context, id int64) error
	// MarkFailed schedules a retry (status back to 'pending', next_retry_at
	// in the future). After maxAttempts the row is set to 'dead'.
	MarkFailed(ctx context.Context, id int64, attempt int, err error, maxAttempts int) error
}
|
|
|
|
// PostgresQueue is the Postgres-backed implementation of Queue, persisting
// jobs in the gateway_jobs table (created by the embedded migration at boot).
type PostgresQueue struct {
	db *sql.DB // shared pool; PostgresQueue does not own or close it
}
|
|
|
|
// NewPostgresQueue wraps an existing *sql.DB pool; the caller retains
// ownership of the pool and is responsible for closing it.
func NewPostgresQueue(db *sql.DB) *PostgresQueue { return &PostgresQueue{db: db} }
|
|
|
|
const (
	// enqueueSQL inserts a new job. ON CONFLICT DO NOTHING makes enqueueing
	// idempotent on (bot_slug, update_id): a Telegram redelivery of the same
	// update is silently dropped. Other columns take their table defaults.
	enqueueSQL = `
		INSERT INTO gateway_jobs (bot_slug, handler_type, update_id, payload)
		VALUES ($1, $2, $3, $4)
		ON CONFLICT (bot_slug, update_id) DO NOTHING
	`
	// popSQL claims one due pending job: the inner SELECT takes a row lock
	// with SKIP LOCKED so concurrent workers never block on (or double-claim)
	// the same row; the UPDATE flips it to 'running' and counts the attempt.
	// NOTE(review): ordering is by next_retry_at only — jobs sharing a
	// timestamp are claimed in unspecified order; add `id` as a tiebreaker if
	// strict FIFO matters.
	popSQL = `
		UPDATE gateway_jobs
		SET status = 'running', attempts = attempts + 1, updated_at = NOW()
		WHERE id = (
			SELECT id FROM gateway_jobs
			WHERE status = 'pending' AND next_retry_at <= NOW()
			ORDER BY next_retry_at ASC
			FOR UPDATE SKIP LOCKED
			LIMIT 1
		)
		RETURNING id, bot_slug, handler_type, update_id, payload, attempts, next_retry_at, status, COALESCE(last_error, '')
	`
	// markDoneSQL finalizes a successful job and clears any stale error from
	// earlier failed attempts.
	markDoneSQL = `
		UPDATE gateway_jobs SET status = 'done', updated_at = NOW(), last_error = NULL WHERE id = $1
	`
	// markRetrySQL returns a failed job to the pending pool with a future
	// next_retry_at ($2) and records the failure message ($3).
	markRetrySQL = `
		UPDATE gateway_jobs
		SET status = 'pending',
		    next_retry_at = $2,
		    last_error = $3,
		    updated_at = NOW()
		WHERE id = $1
	`
	// markDeadSQL parks a job permanently after exhausting its retries; the
	// worker notifies the user once when this transition happens.
	markDeadSQL = `
		UPDATE gateway_jobs
		SET status = 'dead',
		    last_error = $2,
		    updated_at = NOW()
		WHERE id = $1
	`
)
|
|
|
|
func (q *PostgresQueue) Enqueue(ctx context.Context, j Job) error {
|
|
_, err := q.db.ExecContext(ctx, enqueueSQL, j.BotSlug, j.HandlerType, j.UpdateID, j.Payload)
|
|
return err
|
|
}
|
|
|
|
func (q *PostgresQueue) Pop(ctx context.Context) (*Job, error) {
|
|
var j Job
|
|
err := q.db.QueryRowContext(ctx, popSQL).Scan(
|
|
&j.ID, &j.BotSlug, &j.HandlerType, &j.UpdateID, &j.Payload,
|
|
&j.Attempts, &j.NextRetryAt, &j.Status, &j.LastError,
|
|
)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &j, nil
|
|
}
|
|
|
|
func (q *PostgresQueue) MarkDone(ctx context.Context, id int64) error {
|
|
_, err := q.db.ExecContext(ctx, markDoneSQL, id)
|
|
return err
|
|
}
|
|
|
|
func (q *PostgresQueue) MarkFailed(ctx context.Context, id int64, attempt int, jobErr error, maxAttempts int) error {
|
|
msg := ""
|
|
if jobErr != nil {
|
|
msg = truncate(jobErr.Error(), 4096)
|
|
}
|
|
if attempt >= maxAttempts {
|
|
_, err := q.db.ExecContext(ctx, markDeadSQL, id, msg)
|
|
return err
|
|
}
|
|
retryIn := Backoff(attempt)
|
|
next := time.Now().Add(retryIn)
|
|
_, err := q.db.ExecContext(ctx, markRetrySQL, id, next, msg)
|
|
return err
|
|
}
|
|
|
|
// Backoff returns the delay before the next attempt of a failed job.
|
|
// 30s, 2m, 10m, 1h (capped). Spans roughly one night by attempt 5.
|
|
func Backoff(attempt int) time.Duration {
|
|
base := 30 * time.Second
|
|
switch attempt {
|
|
case 1:
|
|
return base // 30s
|
|
case 2:
|
|
return 2 * time.Minute
|
|
case 3:
|
|
return 10 * time.Minute
|
|
default:
|
|
return 1 * time.Hour
|
|
}
|
|
}
|
|
|
|
// EnqueueWithDefaults is a convenience used by the webhook handler — wraps
|
|
// Enqueue with sensible defaults and returns a contextful error message.
|
|
func EnqueueWithDefaults(ctx context.Context, q Queue, slug, handlerType string, updateID int64, payload []byte) error {
|
|
if q == nil {
|
|
return fmt.Errorf("queue not initialized (DATABASE_URL unset ?)")
|
|
}
|
|
return q.Enqueue(ctx, Job{
|
|
BotSlug: slug,
|
|
HandlerType: handlerType,
|
|
UpdateID: updateID,
|
|
Payload: payload,
|
|
})
|
|
}
|