Phase 2b — durable Postgres queue + worker (gated on DATABASE_URL)
Some checks failed
Docker Build / build-and-push-image (push) Has been cancelled
Some checks failed
Docker Build / build-and-push-image (push) Has been cancelled
Adds the async dispatch infrastructure : - Postgres pool + embedded migration (CREATE TABLE/INDEX IF NOT EXISTS gateway_jobs). Auto-applied at boot. lib/pq driver (matches webapp convention). - queue.go : Enqueue (idempotent on UNIQUE(bot_slug, update_id) — handles Telegram redelivery), Pop with FOR UPDATE SKIP LOCKED, MarkDone, MarkFailed with exponential backoff (30s → 2m → 10m → 1h → dead at 5). - worker.go : goroutine that drains the queue, dispatches via the same Handler interface as sync, schedules retries on failure, notifies the user once when a job goes to dead. - BotConfig gains `async: bool`. Registry refuses bots with async=true if DATABASE_URL is unset (queue=nil). - Server : when bot.Async, the webhook ack is immediate ; the update payload is enqueued for the worker. When DATABASE_URL is unset (current default), queue/worker stay disabled and only sync handlers (echo, http, auth) work — no breaking change to the running cluster. Refs ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2.
This commit is contained in:
@@ -105,6 +105,13 @@ auth:
|
|||||||
redisURL: "redis://redis.tools.svc.cluster.local:6379/0"
|
redisURL: "redis://redis.tools.svc.cluster.local:6379/0"
|
||||||
sessionTTL: "24h"
|
sessionTTL: "24h"
|
||||||
|
|
||||||
|
# Async queue (Phase 2b). Quand DATABASE_URL est dans le Secret cluster,
|
||||||
|
# la queue Postgres + worker s'activent ; sinon tout reste sync (les bots
|
||||||
|
# avec `async: true` font fail au boot — secure by default).
|
||||||
|
# Format DSN attendu : postgres://USER:PASSWORD@pgbouncer.tools:6432/DBNAME?sslmode=disable
|
||||||
|
# L'utilisateur doit avoir CREATE TABLE / CREATE INDEX sur la base cible
|
||||||
|
# (le pod auto-applique les migrations idempotentes au démarrage).
|
||||||
|
|
||||||
# k8s Secret consumed by `envFrom`. Phase 1: create it manually with kubectl.
|
# k8s Secret consumed by `envFrom`. Phase 1: create it manually with kubectl.
|
||||||
# kubectl -n telegram-gateway create secret generic telegram-gateway-bots \
|
# kubectl -n telegram-gateway create secret generic telegram-gateway-bots \
|
||||||
# --from-literal=BOT_FACTORY_TOKEN=… --from-literal=BOT_FACTORY_SECRET=…
|
# --from-literal=BOT_FACTORY_TOKEN=… --from-literal=BOT_FACTORY_SECRET=…
|
||||||
|
|||||||
@@ -20,6 +20,13 @@ type BotConfig struct {
|
|||||||
// `requireAuth: false` is required to expose a bot publicly.
|
// `requireAuth: false` is required to expose a bot publicly.
|
||||||
// Forced to false for handler=auth (chicken-and-egg).
|
// Forced to false for handler=auth (chicken-and-egg).
|
||||||
RequireAuth *bool `yaml:"requireAuth,omitempty"`
|
RequireAuth *bool `yaml:"requireAuth,omitempty"`
|
||||||
|
// Async, when true, dispatches incoming Updates through the durable
|
||||||
|
// Postgres queue + worker (Phase 2b) instead of running inline. The
|
||||||
|
// webhook ack returns 200 immediately ; the worker may retry the job
|
||||||
|
// with backoff if the handler fails. Required for handlers that may
|
||||||
|
// outlive the Telegram webhook timeout (shell, script, ollama).
|
||||||
|
// Requires DATABASE_URL to be set, otherwise registry refuses to start.
|
||||||
|
Async bool `yaml:"async,omitempty"`
|
||||||
// HTTP is the per-bot config for the `http` handler. Required when
|
// HTTP is the per-bot config for the `http` handler. Required when
|
||||||
// handler=http. See handler_http.go.
|
// handler=http. See handler_http.go.
|
||||||
HTTP *HTTPConfig `yaml:"http,omitempty"`
|
HTTP *HTTPConfig `yaml:"http,omitempty"`
|
||||||
|
|||||||
58
db.go
Normal file
58
db.go
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
// Phase 2b — Postgres pool + schema migration.
|
||||||
|
// Voir ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
_ "embed"
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
_ "github.com/lib/pq"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:embed migrations/001_init.sql
|
||||||
|
var initSQL string
|
||||||
|
|
||||||
|
// OpenDB opens a Postgres pool from a DSN, pings, and runs the embedded
|
||||||
|
// migration (idempotent CREATE TABLE / CREATE INDEX IF NOT EXISTS).
|
||||||
|
// Caller is responsible for closing the returned *sql.DB.
|
||||||
|
func OpenDB(ctx context.Context, dsn string) (*sql.DB, error) {
|
||||||
|
db, err := sql.Open("postgres", dsn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("sql.Open: %w", err)
|
||||||
|
}
|
||||||
|
db.SetMaxOpenConns(10)
|
||||||
|
db.SetMaxIdleConns(5)
|
||||||
|
db.SetConnMaxLifetime(30 * time.Minute)
|
||||||
|
|
||||||
|
pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := db.PingContext(pingCtx); err != nil {
|
||||||
|
_ = db.Close()
|
||||||
|
return nil, fmt.Errorf("postgres ping: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
migCtx, cancelMig := context.WithTimeout(ctx, 10*time.Second)
|
||||||
|
defer cancelMig()
|
||||||
|
if _, err := db.ExecContext(migCtx, initSQL); err != nil {
|
||||||
|
_ = db.Close()
|
||||||
|
return nil, fmt.Errorf("apply migration: %w", err)
|
||||||
|
}
|
||||||
|
return db, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RedactDSN strips the password from a Postgres URL for safe logging.
// A DSN that does not parse as a URL yields a fixed placeholder instead of
// risking leaking credentials through a partial echo.
func RedactDSN(dsn string) string {
	parsed, err := url.Parse(dsn)
	if err != nil {
		return "(invalid dsn)"
	}
	if info := parsed.User; info != nil {
		// Rebuild the userinfo with the username only, dropping any password.
		parsed.User = url.User(info.Username())
	}
	return parsed.String()
}
|
||||||
1
go.mod
1
go.mod
@@ -3,6 +3,7 @@ module github.com/arcodange/telegram-gateway
|
|||||||
go 1.24
|
go 1.24
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/lib/pq v1.12.3
|
||||||
github.com/redis/go-redis/v9 v9.19.0
|
github.com/redis/go-redis/v9 v9.19.0
|
||||||
gopkg.in/yaml.v3 v3.0.1
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
)
|
)
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -8,6 +8,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
|
|||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE=
|
github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE=
|
||||||
github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
||||||
|
github.com/lib/pq v1.12.3 h1:tTWxr2YLKwIvK90ZXEw8GP7UFHtcbTtty8zsI+YjrfQ=
|
||||||
|
github.com/lib/pq v1.12.3/go.mod h1:/p+8NSbOcwzAEI7wiMXFlgydTwcgTr3OSKMsD2BitpA=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/redis/go-redis/v9 v9.19.0 h1:XPVaaPSnG6RhYf7p+rmSa9zZfeVAnWsH5h3lxthOm/k=
|
github.com/redis/go-redis/v9 v9.19.0 h1:XPVaaPSnG6RhYf7p+rmSa9zZfeVAnWsH5h3lxthOm/k=
|
||||||
|
|||||||
41
handlers.go
41
handlers.go
@@ -11,23 +11,30 @@ type Handler interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Bot struct {
	Slug         string // URL slug identifying the bot in webhook routes
	Token        string // Telegram bot API token
	Secret       string // webhook secret used to authenticate Telegram callbacks
	HandlerLabel string // raw 'handler:' value (echo|auth|http|…) — used by queue rows for diagnostics
	RequireAuth  bool   // when true, updates are gated by the auth layer
	Async        bool   // when true, updates are enqueued for the worker instead of handled inline
	Handler      Handler
}

// HandlerType returns the textual handler label for queue rows / logs.
func (b Bot) HandlerType() string { return b.HandlerLabel }
|
||||||
|
|
||||||
// Registry maps bot slug → fully-wired Bot (token, secret, handler instance).
type Registry struct {
	bots map[string]Bot
}
|
||||||
|
|
||||||
// NewRegistry builds the bot routing map from the parsed YAML config + the
|
// NewRegistry builds the bot routing map from the parsed YAML config + the
|
||||||
// per-bot secrets (already merged into BotConfig from env). It receives the
|
// per-bot secrets (already merged into BotConfig from env). It receives the
|
||||||
// shared TelegramClient and Auth so per-handler instances can use them.
|
// shared TelegramClient, Auth and Queue so per-handler instances can use
|
||||||
// `auth` may be nil (no AUTH_SECRET set) ; in that case any bot configured
|
// them. `auth` may be nil (no AUTH_SECRET set) ; in that case any bot
|
||||||
// with `handler: auth` or `requireAuth: true` is a fatal config error.
|
// configured with `handler: auth` or `requireAuth: true` is a fatal config
|
||||||
func NewRegistry(cfg *Config, tg *TelegramClient, auth *Auth) (*Registry, error) {
|
// error. `queue` may be nil (no DATABASE_URL set) ; in that case any bot
|
||||||
|
// with `async: true` is a fatal config error.
|
||||||
|
func NewRegistry(cfg *Config, tg *TelegramClient, auth *Auth, queue Queue) (*Registry, error) {
|
||||||
if len(cfg.Bots) == 0 {
|
if len(cfg.Bots) == 0 {
|
||||||
return nil, fmt.Errorf("no bots configured")
|
return nil, fmt.Errorf("no bots configured")
|
||||||
}
|
}
|
||||||
@@ -64,6 +71,10 @@ func NewRegistry(cfg *Config, tg *TelegramClient, auth *Auth) (*Registry, error)
|
|||||||
return nil, fmt.Errorf("bot %s has requireAuth: true (default) but AUTH_SECRET is unset; either set AUTH_SECRET or add `requireAuth: false` to the bot config", slug)
|
return nil, fmt.Errorf("bot %s has requireAuth: true (default) but AUTH_SECRET is unset; either set AUTH_SECRET or add `requireAuth: false` to the bot config", slug)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if b.Async && queue == nil {
|
||||||
|
return nil, fmt.Errorf("bot %s has async: true but DATABASE_URL is unset (no queue available)", slug)
|
||||||
|
}
|
||||||
|
|
||||||
var h Handler
|
var h Handler
|
||||||
switch b.Handler {
|
switch b.Handler {
|
||||||
case "echo":
|
case "echo":
|
||||||
@@ -89,11 +100,13 @@ func NewRegistry(cfg *Config, tg *TelegramClient, auth *Auth) (*Registry, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
bots[slug] = Bot{
|
bots[slug] = Bot{
|
||||||
Slug: slug,
|
Slug: slug,
|
||||||
Token: token,
|
Token: token,
|
||||||
Secret: secret,
|
Secret: secret,
|
||||||
RequireAuth: requireAuth,
|
HandlerLabel: b.Handler,
|
||||||
Handler: h,
|
RequireAuth: requireAuth,
|
||||||
|
Async: b.Async,
|
||||||
|
Handler: h,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
33
main.go
33
main.go
@@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"log"
|
"log"
|
||||||
@@ -46,6 +47,26 @@ func runServer() {
|
|||||||
|
|
||||||
tg := NewTelegramClient()
|
tg := NewTelegramClient()
|
||||||
|
|
||||||
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
defer stop()
|
||||||
|
|
||||||
|
// Phase 2b — durable queue (Postgres) + worker. Disabled when
|
||||||
|
// DATABASE_URL is unset ; bots with `async: true` then fail config validation.
|
||||||
|
var queue Queue
|
||||||
|
var db *sql.DB
|
||||||
|
if dbURL := os.Getenv("DATABASE_URL"); dbURL != "" {
|
||||||
|
var err error
|
||||||
|
db, err = OpenDB(ctx, dbURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("init db: %v", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = db.Close() }()
|
||||||
|
queue = NewPostgresQueue(db)
|
||||||
|
log.Printf("queue + schema ready (db=%s)", RedactDSN(dbURL))
|
||||||
|
} else {
|
||||||
|
log.Print("DATABASE_URL unset → queue/worker disabled (sync handlers still work)")
|
||||||
|
}
|
||||||
|
|
||||||
// Phase 1.5 — auth layer (Redis-backed sessions). See
|
// Phase 1.5 — auth layer (Redis-backed sessions). See
|
||||||
// factory/doc/adr/20260509-telegram-gateway-auth.md.
|
// factory/doc/adr/20260509-telegram-gateway-auth.md.
|
||||||
authSecret := os.Getenv("AUTH_SECRET")
|
authSecret := os.Getenv("AUTH_SECRET")
|
||||||
@@ -77,23 +98,25 @@ func runServer() {
|
|||||||
log.Printf("allowlist active (%d user(s) allowed)", allowlist.Size())
|
log.Printf("allowlist active (%d user(s) allowed)", allowlist.Size())
|
||||||
}
|
}
|
||||||
|
|
||||||
registry, err := NewRegistry(cfg, tg, auth)
|
registry, err := NewRegistry(cfg, tg, auth, queue)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("build registry: %v", err)
|
log.Fatalf("build registry: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if queue != nil {
|
||||||
|
worker := NewWorker(queue, registry, tg)
|
||||||
|
go worker.Run(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
srv := &http.Server{
|
srv := &http.Server{
|
||||||
Addr: *addr,
|
Addr: *addr,
|
||||||
Handler: NewServer(registry, auth, allowlist, tg).Routes(),
|
Handler: NewServer(registry, auth, allowlist, tg, queue).Routes(),
|
||||||
ReadHeaderTimeout: 5 * time.Second,
|
ReadHeaderTimeout: 5 * time.Second,
|
||||||
ReadTimeout: 30 * time.Second,
|
ReadTimeout: 30 * time.Second,
|
||||||
WriteTimeout: 30 * time.Second,
|
WriteTimeout: 30 * time.Second,
|
||||||
IdleTimeout: 60 * time.Second,
|
IdleTimeout: 60 * time.Second,
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
defer stop()
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
log.Printf("telegram-gateway listening on %s (%d bot(s) loaded)", *addr, registry.Count())
|
log.Printf("telegram-gateway listening on %s (%d bot(s) loaded)", *addr, registry.Count())
|
||||||
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||||
|
|||||||
24
migrations/001_init.sql
Normal file
24
migrations/001_init.sql
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
-- Phase 2b — durable job queue for async handlers (shell, script, ollama).
-- See ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2.
CREATE TABLE IF NOT EXISTS gateway_jobs (
    id            BIGSERIAL PRIMARY KEY,
    bot_slug      TEXT NOT NULL,
    handler_type  TEXT NOT NULL,
    update_id     BIGINT NOT NULL,
    payload       JSONB NOT NULL,
    attempts      INT NOT NULL DEFAULT 0,
    next_retry_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    status        TEXT NOT NULL DEFAULT 'pending',
    last_error    TEXT,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Idempotence: if Telegram redelivers (ack > 60s), the duplicate is ignored.
    CONSTRAINT gateway_jobs_uniq_update UNIQUE (bot_slug, update_id)
);

-- Partial index: only 'pending' rows are candidates for the worker's Pop,
-- ordered by when they become runnable.
CREATE INDEX IF NOT EXISTS gateway_jobs_pickable
    ON gateway_jobs (next_retry_at)
    WHERE status = 'pending';

-- Secondary index for status-based monitoring queries (e.g. counting 'dead').
CREATE INDEX IF NOT EXISTS gateway_jobs_status
    ON gateway_jobs (status);
|
||||||
154
queue.go
Normal file
154
queue.go
Normal file
@@ -0,0 +1,154 @@
|
|||||||
|
// Phase 2b — durable async job queue (Postgres). Idempotent on (bot_slug, update_id).
|
||||||
|
// Voir ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Job is a unit of async work : a Telegram Update routed to a particular bot,
// to be processed by that bot's handler.
type Job struct {
	ID          int64     // BIGSERIAL primary key of the gateway_jobs row
	BotSlug     string    // routing key: which bot's handler processes this job
	HandlerType string    // raw 'handler:' label, stored for diagnostics
	UpdateID    int64     // Telegram update_id; UNIQUE with BotSlug for idempotence
	Payload     []byte    // JSON-encoded Update, re-decoded by the worker
	Attempts    int       // incremented each time Pop claims the row
	NextRetryAt time.Time // earliest time the job may be claimed (again)
	Status      string    // 'pending' | 'running' | 'done' | 'dead'
	LastError   string    // last recorded handler error ("" when none)
}
||||||
|
|
||||||
|
// Queue is the durable job store consumed by the worker. The Postgres
// implementation is PostgresQueue; a nil Queue means async is disabled.
type Queue interface {
	// Enqueue inserts a job ; on UNIQUE conflict (Telegram redelivery) it
	// returns nil without touching the existing row.
	Enqueue(ctx context.Context, j Job) error
	// Pop atomically claims the oldest pending job whose next_retry_at has
	// passed (FOR UPDATE SKIP LOCKED, status → 'running', attempts++).
	// Returns (nil, nil) when there is nothing to claim.
	Pop(ctx context.Context) (*Job, error)
	// MarkDone marks a job 'done'.
	MarkDone(ctx context.Context, id int64) error
	// MarkFailed schedules a retry (status back to 'pending', next_retry_at
	// in the future). After maxAttempts the row is set to 'dead'.
	MarkFailed(ctx context.Context, id int64, attempt int, err error, maxAttempts int) error
}
|
||||||
|
|
||||||
|
// PostgresQueue is the Postgres-backed implementation of Queue, operating on
// the gateway_jobs table created by the embedded migration.
type PostgresQueue struct {
	db *sql.DB
}

// NewPostgresQueue wraps an already-opened pool; it runs no migrations itself.
func NewPostgresQueue(db *sql.DB) *PostgresQueue { return &PostgresQueue{db: db} }
|
||||||
|
|
||||||
|
const (
	// enqueueSQL relies on UNIQUE(bot_slug, update_id): a Telegram redelivery
	// inserts nothing and reports no error.
	enqueueSQL = `
INSERT INTO gateway_jobs (bot_slug, handler_type, update_id, payload)
VALUES ($1, $2, $3, $4)
ON CONFLICT (bot_slug, update_id) DO NOTHING
`
	// popSQL claims one due 'pending' row. FOR UPDATE SKIP LOCKED makes the
	// claim safe under concurrent workers; attempts is bumped at claim time.
	popSQL = `
UPDATE gateway_jobs
SET status = 'running', attempts = attempts + 1, updated_at = NOW()
WHERE id = (
	SELECT id FROM gateway_jobs
	WHERE status = 'pending' AND next_retry_at <= NOW()
	ORDER BY next_retry_at ASC
	FOR UPDATE SKIP LOCKED
	LIMIT 1
)
RETURNING id, bot_slug, handler_type, update_id, payload, attempts, next_retry_at, status, COALESCE(last_error, '')
`
	// markDoneSQL also clears last_error so a previously-failed job reads clean.
	markDoneSQL = `
UPDATE gateway_jobs SET status = 'done', updated_at = NOW(), last_error = NULL WHERE id = $1
`
	// markRetrySQL re-queues the job with a future next_retry_at ($2).
	markRetrySQL = `
UPDATE gateway_jobs
SET status = 'pending',
	next_retry_at = $2,
	last_error = $3,
	updated_at = NOW()
WHERE id = $1
`
	// markDeadSQL parks the job permanently; only manual intervention revives it.
	markDeadSQL = `
UPDATE gateway_jobs
SET status = 'dead',
	last_error = $2,
	updated_at = NOW()
WHERE id = $1
`
)
|
||||||
|
|
||||||
|
// Enqueue inserts a job row. ON CONFLICT DO NOTHING (see enqueueSQL) makes a
// Telegram redelivery of the same (bot_slug, update_id) a silent no-op.
func (q *PostgresQueue) Enqueue(ctx context.Context, j Job) error {
	_, err := q.db.ExecContext(ctx, enqueueSQL, j.BotSlug, j.HandlerType, j.UpdateID, j.Payload)
	return err
}
|
||||||
|
|
||||||
|
// Pop claims one runnable job via an UPDATE…RETURNING over a
// FOR UPDATE SKIP LOCKED subselect, so concurrent workers never
// double-claim the same row.
func (q *PostgresQueue) Pop(ctx context.Context) (*Job, error) {
	var j Job
	err := q.db.QueryRowContext(ctx, popSQL).Scan(
		&j.ID, &j.BotSlug, &j.HandlerType, &j.UpdateID, &j.Payload,
		&j.Attempts, &j.NextRetryAt, &j.Status, &j.LastError,
	)
	// No pending row whose next_retry_at has passed — not an error.
	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return &j, nil
}
|
||||||
|
|
||||||
|
// MarkDone marks job `id` as 'done' (markDoneSQL also clears last_error).
func (q *PostgresQueue) MarkDone(ctx context.Context, id int64) error {
	_, err := q.db.ExecContext(ctx, markDoneSQL, id)
	return err
}
|
||||||
|
|
||||||
|
// MarkFailed records a handler failure for job `id`. Below maxAttempts the job
// goes back to 'pending' with a backoff-delayed next_retry_at; at or past
// maxAttempts it is parked as 'dead'. `attempt` is the attempt number that
// just failed (Pop already incremented the counter when claiming).
func (q *PostgresQueue) MarkFailed(ctx context.Context, id int64, attempt int, jobErr error, maxAttempts int) error {
	msg := ""
	if jobErr != nil {
		// Cap the stored error text (truncate is a file-local helper).
		msg = truncate(jobErr.Error(), 4096)
	}
	if attempt >= maxAttempts {
		_, err := q.db.ExecContext(ctx, markDeadSQL, id, msg)
		return err
	}
	retryIn := Backoff(attempt)
	next := time.Now().Add(retryIn)
	_, err := q.db.ExecContext(ctx, markRetrySQL, id, next, msg)
	return err
}
|
||||||
|
|
||||||
|
// Backoff returns the delay before the next attempt of a failed job.
|
||||||
|
// 30s, 2m, 10m, 1h (capped). Spans roughly one night by attempt 5.
|
||||||
|
func Backoff(attempt int) time.Duration {
|
||||||
|
base := 30 * time.Second
|
||||||
|
switch attempt {
|
||||||
|
case 1:
|
||||||
|
return base // 30s
|
||||||
|
case 2:
|
||||||
|
return 2 * time.Minute
|
||||||
|
case 3:
|
||||||
|
return 10 * time.Minute
|
||||||
|
default:
|
||||||
|
return 1 * time.Hour
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnqueueWithDefaults is a convenience used by the webhook handler — wraps
|
||||||
|
// Enqueue with sensible defaults and returns a contextful error message.
|
||||||
|
func EnqueueWithDefaults(ctx context.Context, q Queue, slug, handlerType string, updateID int64, payload []byte) error {
|
||||||
|
if q == nil {
|
||||||
|
return fmt.Errorf("queue not initialized (DATABASE_URL unset ?)")
|
||||||
|
}
|
||||||
|
return q.Enqueue(ctx, Job{
|
||||||
|
BotSlug: slug,
|
||||||
|
HandlerType: handlerType,
|
||||||
|
UpdateID: updateID,
|
||||||
|
Payload: payload,
|
||||||
|
})
|
||||||
|
}
|
||||||
31
server.go
31
server.go
@@ -13,10 +13,11 @@ type Server struct {
|
|||||||
auth *Auth
|
auth *Auth
|
||||||
allowlist Allowlist
|
allowlist Allowlist
|
||||||
tg *TelegramClient
|
tg *TelegramClient
|
||||||
|
queue Queue
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServer(r *Registry, auth *Auth, allow Allowlist, tg *TelegramClient) *Server {
|
func NewServer(r *Registry, auth *Auth, allow Allowlist, tg *TelegramClient, queue Queue) *Server {
|
||||||
return &Server{registry: r, auth: auth, allowlist: allow, tg: tg}
|
return &Server{registry: r, auth: auth, allowlist: allow, tg: tg, queue: queue}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) Routes() http.Handler {
|
func (s *Server) Routes() http.Handler {
|
||||||
@@ -103,6 +104,32 @@ func (s *Server) botWebhook(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Async dispatch (Phase 2b) : enqueue + ack 200 immediately. The worker
|
||||||
|
// drains the queue and runs the handler asynchronously. Use this for
|
||||||
|
// handlers that may exceed Telegram's webhook timeout (~60s) or whose
|
||||||
|
// backend can be temporarily unreachable (e.g. Macbook Ollama dort).
|
||||||
|
if bot.Async {
|
||||||
|
if s.queue == nil {
|
||||||
|
log.Printf("bot=%s update=%d async requested but no queue configured", slug, update.UpdateID)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, _ = fmt.Fprint(w, "{}")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Re-marshal the update : we already decoded once, but the original
|
||||||
|
// body is preferable for the worker (preserves any unknown fields).
|
||||||
|
// Use the parsed struct re-encoded — fields we don't model are lost
|
||||||
|
// but we documented the lenient-decode tradeoff in feedback memory.
|
||||||
|
payload, _ := json.Marshal(update)
|
||||||
|
if err := EnqueueWithDefaults(r.Context(), s.queue, slug, bot.HandlerType(), update.UpdateID, payload); err != nil {
|
||||||
|
log.Printf("bot=%s update=%d enqueue error: %v", slug, update.UpdateID, err)
|
||||||
|
} else {
|
||||||
|
log.Printf("bot=%s update=%d enqueued (async)", slug, update.UpdateID)
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, _ = fmt.Fprint(w, "{}")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if err := bot.Handler.Handle(r.Context(), update, bot); err != nil {
|
if err := bot.Handler.Handle(r.Context(), update, bot); err != nil {
|
||||||
log.Printf("bot=%s update=%d handler error: %v", slug, update.UpdateID, err)
|
log.Printf("bot=%s update=%d handler error: %v", slug, update.UpdateID, err)
|
||||||
}
|
}
|
||||||
|
|||||||
108
worker.go
Normal file
108
worker.go
Normal file
@@ -0,0 +1,108 @@
|
|||||||
|
// Phase 2b — worker goroutine that drains the durable job queue.
|
||||||
|
// Voir ~/.claude/plans/pour-les-notifications-on-inherited-seal.md § Phase 2.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
	defaultPollInterval = 1 * time.Second  // queue poll cadence (one job per tick)
	defaultMaxAttempts  = 5                // failed attempts before a job goes 'dead'
	jobTimeout          = 10 * time.Minute // per-iteration ceiling: claim + handle + mark
)
|
||||||
|
|
||||||
|
// Worker pops jobs from the queue and dispatches them through the same
// Handler interface used by the sync path. Failures schedule a retry with
// exponential backoff ; after `maxAttempts` the job is moved to 'dead' and
// the user (if reachable) is informed once.
type Worker struct {
	queue       Queue           // durable job source (Postgres-backed)
	registry    *Registry       // slug → Bot lookup for dispatch
	tg          *TelegramClient // used to notify the user when a job dies
	pollEvery   time.Duration   // polling interval between Pop attempts
	maxAttempts int             // retry budget before a job is parked as 'dead'
}
|
||||||
|
|
||||||
|
func NewWorker(queue Queue, registry *Registry, tg *TelegramClient) *Worker {
|
||||||
|
return &Worker{
|
||||||
|
queue: queue,
|
||||||
|
registry: registry,
|
||||||
|
tg: tg,
|
||||||
|
pollEvery: defaultPollInterval,
|
||||||
|
maxAttempts: defaultMaxAttempts,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run drains the queue until ctx is canceled. Intended to be called as a
// goroutine. Errors are logged but never bubble up.
func (w *Worker) Run(ctx context.Context) {
	log.Printf("worker started (poll=%s, maxAttempts=%d)", w.pollEvery, w.maxAttempts)
	t := time.NewTicker(w.pollEvery)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Print("worker stopping")
			return
		case <-t.C:
			// One job per tick — throughput is capped at 1/pollEvery jobs/s.
			w.processOne(ctx)
		}
	}
}
|
||||||
|
|
||||||
|
// processOne claims and runs at most one job. Each invocation gets its own
// timeout so a hung handler cannot wedge the worker loop forever.
func (w *Worker) processOne(parent context.Context) {
	ctx, cancel := context.WithTimeout(parent, jobTimeout)
	defer cancel()

	job, err := w.queue.Pop(ctx)
	if err != nil {
		log.Printf("worker: queue.Pop error: %v", err)
		return
	}
	if job == nil {
		return // nothing to do
	}

	// Jobs can outlive config changes: a slug enqueued before a redeploy may
	// no longer exist. Passing maxAttempts as the attempt number forces the
	// job straight to 'dead' instead of retrying something unroutable.
	bot, ok := w.registry.Get(job.BotSlug)
	if !ok {
		log.Printf("worker job=%d: unknown bot slug %q → dead", job.ID, job.BotSlug)
		_ = w.queue.MarkFailed(ctx, job.ID, w.maxAttempts, errUnknownBot{slug: job.BotSlug}, w.maxAttempts)
		return
	}

	// A payload that fails to decode will never succeed — dead immediately.
	var update Update
	if err := json.Unmarshal(job.Payload, &update); err != nil {
		log.Printf("worker job=%d: payload decode: %v → dead", job.ID, err)
		_ = w.queue.MarkFailed(ctx, job.ID, w.maxAttempts, err, w.maxAttempts)
		return
	}

	if err := bot.Handler.Handle(ctx, update, bot); err != nil {
		log.Printf("worker job=%d bot=%s attempt=%d: handler error: %v", job.ID, job.BotSlug, job.Attempts, err)
		// job.Attempts was already incremented by Pop, so it is the number of
		// the attempt that just failed.
		retryErr := w.queue.MarkFailed(ctx, job.ID, job.Attempts, err, w.maxAttempts)
		if retryErr != nil {
			log.Printf("worker job=%d: MarkFailed error: %v", job.ID, retryErr)
		}
		// Notify user once on dead
		if job.Attempts >= w.maxAttempts {
			if chatID, hasChat := update.ChatID(); hasChat && w.tg != nil {
				// Best-effort: the notification error is deliberately ignored.
				_ = w.tg.SendMessage(ctx, bot.Token, SendMessageParams{
					ChatID: chatID,
					Text:   "❌ Le backend est indisponible, j'abandonne après plusieurs tentatives.",
				})
			}
		}
		return
	}
	if err := w.queue.MarkDone(ctx, job.ID); err != nil {
		log.Printf("worker job=%d: MarkDone error: %v", job.ID, err)
	}
}
|
||||||
|
|
||||||
|
// errUnknownBot reports a queued job whose bot slug is absent from the
// registry (e.g. the bot was removed from config between enqueue and pop).
type errUnknownBot struct{ slug string }

// Error implements the error interface.
func (e errUnknownBot) Error() string {
	return "unknown bot slug: " + e.slug
}
|
||||||
Reference in New Issue
Block a user