Files
dance-lessons-coach/pkg/server/server.go
Gabriel Radureau 5bc97545f4 feat(user): magic-link expired-token cleanup loop (ADR-0028 Phase A consequence)
Periodically delete magic-link tokens past their expires_at — the rows
accumulate without cleanup since the consume endpoint only marks them
consumed but never removes them, and DeleteExpiredMagicLinkTokens (added
in Phase A.3, PR #61) was never wired up.

Mirrors the JWT secret cleanup pattern (ADR-0021).

- pkg/user/magic_link_cleanup.go: MagicLinkCleanupRunner with
  StartCleanupLoop(ctx, interval) and an inner runOnce(ctx) for testability
- pkg/user/magic_link_cleanup_test.go: unit tests cover happy path,
  error propagation, ctx-cancel termination, zero-interval no-op
- pkg/server/server.go: start the loop right after the JWT cleanup loop
- pkg/config/config.go: auth.magic_link.cleanup_interval (default 1h),
  env DLC_AUTH_MAGIC_LINK_CLEANUP_INTERVAL, getter GetMagicLinkCleanupInterval

Mostly authored by Mistral Vibe in batch1-task-b worktree (parallel with
batch1-task-a OIDC config). Mistral hit --max-price 1.50 right before the
final commit step, Claude finished the ship via trainer takeover (Q-045
pattern, 2nd recurrence in two batches — to be addressed in Phase 1bis).
2026-05-05 13:05:15 +02:00

858 lines
27 KiB
Go

//go:generate swag init -g ../../cmd/server/main.go -d ../../pkg/greet,../../pkg/server --parseDependency && mv ../../docs/* ./docs/
package server
import (
"context"
"embed"
"fmt"
"net"
"net/http"
"os/signal"
"runtime"
"syscall"
"time"
"github.com/go-chi/chi/v5"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"github.com/rs/zerolog/log"
httpSwagger "github.com/swaggo/http-swagger"
"dance-lessons-coach/pkg/cache"
"dance-lessons-coach/pkg/config"
"dance-lessons-coach/pkg/email"
"dance-lessons-coach/pkg/greet"
"dance-lessons-coach/pkg/middleware"
"dance-lessons-coach/pkg/telemetry"
"dance-lessons-coach/pkg/user"
userapi "dance-lessons-coach/pkg/user/api"
"dance-lessons-coach/pkg/validation"
"dance-lessons-coach/pkg/version"
"encoding/json"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
// swaggerJSON is an embedded file system containing docs/swagger.json, bundled
// into the binary at compile time. setupRoutes reads the spec from it to serve
// the /swagger/doc.json endpoint.
//
//go:embed docs/swagger.json
var swaggerJSON embed.FS
// CancelableContext is a context.Context that additionally exposes a Cancel()
// method. Server.Run() type-asserts its readiness context against
// interface{ Cancel() } and, when the assertion succeeds, calls Cancel() at the
// start of graceful shutdown. A plain context (for example
// context.Background() in tests or CLI code) simply fails that assertion, so
// readiness is never explicitly cancelled — which is harmless.
type CancelableContext struct {
	context.Context

	// cancel is the CancelFunc paired with the embedded Context.
	cancel context.CancelFunc
}

// NewCancelableContext derives a cancellable child of parent and wraps it in a
// CancelableContext. The same CancelFunc that backs Cancel() is also returned,
// so callers may defer it in main; invoking it after Cancel() has already run
// is a no-op.
func NewCancelableContext(parent context.Context) (*CancelableContext, context.CancelFunc) {
	inner, stop := context.WithCancel(parent)
	wrapped := &CancelableContext{
		Context: inner,
		cancel:  stop,
	}
	return wrapped, stop
}

// Cancel cancels the wrapped context. It provides the method that Run() looks
// for when it decides whether readiness can be cancelled.
func (c *CancelableContext) Cancel() {
	c.cancel()
}
// Server bundles the HTTP router with the configuration and services that its
// handlers depend on. Instances are built by NewServer or NewServerWithUserRepo
// and started with Run.
type Server struct {
// router holds every route and middleware registered by setupRoutes.
router *chi.Mux
// readyCtx drives the readiness endpoint: once it is done, /api/ready
// answers 503 with reason "server_shutting_down".
readyCtx context.Context
// withOTEL reports whether OpenTelemetry is in use; it is seeded from the
// config and cleared by Run if tracing fails to initialize.
withOTEL bool
// config is the live application configuration read by handlers and Run.
config *config.Config
// tracerProvider is assigned by Run when tracing is initialized and is shut
// down when Run returns after a graceful shutdown.
tracerProvider *sdktrace.TracerProvider
// validator is passed to the API handlers. Construction logs an error and
// continues when the validator cannot be created.
validator *validation.Validator
// userRepo is nil when user functionality is disabled; readiness then
// reports the database as "not_configured".
userRepo user.UserRepository
// userService is nil when user functionality is disabled; auth and admin
// routes are only mounted when it is set.
userService user.UserService
// cacheService is nil when caching is disabled in the configuration.
cacheService cache.Service
// startedAt is the construction time, used to compute uptime.
startedAt time.Time
}
// NewServer builds a Server using the default user repository and user service
// derived from cfg (see initializeUserServices). A failure to create them is
// not fatal: it is logged as a warning and the server is built with whatever
// initializeUserServices returned, which disables user functionality.
func NewServer(cfg *config.Config, readyCtx context.Context) *Server {
	repo, svc, initErr := initializeUserServices(cfg)
	if initErr != nil {
		log.Warn().Err(initErr).Msg("Failed to initialize user services, user functionality will be disabled")
	}

	return NewServerWithUserRepo(cfg, readyCtx, repo, svc)
}
// NewServerWithUserRepo constructs a Server around a caller-supplied userRepo
// and userService. BDD test infrastructure uses it to inject a per-scenario
// repository (for instance one bound to an isolated PostgreSQL schema). Passing
// nil for both disables user functionality.
//
// The validator and the cache service are still derived from cfg here; they do
// not need per-scenario isolation today.
func NewServerWithUserRepo(cfg *config.Config, readyCtx context.Context, userRepo user.UserRepository, userService user.UserService) *Server {
	// A validator failure is logged but does not abort construction.
	validator, validatorErr := validation.GetValidatorFromConfig(cfg)
	if validatorErr == nil {
		log.Trace().Msg("Validator created successfully")
	} else {
		log.Error().Err(validatorErr).Msg("Failed to create validator, continuing without validation")
	}

	// The cache stays nil when disabled; handlers test for nil before use.
	var cacheService cache.Service
	if !cfg.GetCacheEnabled() {
		log.Trace().Msg("Cache service disabled")
	} else {
		defaultTTL := time.Duration(cfg.GetCacheDefaultTTLSeconds()) * time.Second
		cleanupEvery := time.Duration(cfg.GetCacheCleanupIntervalSeconds()) * time.Second
		cacheService = cache.NewInMemoryService(defaultTTL, cleanupEvery)
		log.Trace().Msg("Cache service initialized")
	}

	srv := &Server{
		router:       chi.NewRouter(),
		readyCtx:     readyCtx,
		withOTEL:     cfg.GetTelemetryEnabled(),
		config:       cfg,
		validator:    validator,
		userRepo:     userRepo,
		userService:  userService,
		cacheService: cacheService,
		startedAt:    time.Now(),
	}

	// Routes are registered last because they read the fields set above.
	srv.setupRoutes()
	return srv
}
// GetAuthService exposes the server's user service through the
// user.AuthService interface. It exists for test cleanup: suites use it to
// reset JWT secrets between tests.
func (s *Server) GetAuthService() user.AuthService {
	var auth user.AuthService = s.userService
	return auth
}
// GetCacheService exposes the server's cache service (nil when caching is
// disabled). It exists for test cleanup: suites use it to flush the cache
// between tests.
func (s *Server) GetCacheService() cache.Service {
	svc := s.cacheService
	return svc
}
// initializeUserServices creates the PostgreSQL-backed user repository and the
// unified user service on top of it. On failure it returns nil for both along
// with an error wrapping the repository error.
func initializeUserServices(cfg *config.Config) (user.UserRepository, user.UserService, error) {
	pgRepo, repoErr := user.NewPostgresRepository(cfg)
	if repoErr != nil {
		return nil, nil, fmt.Errorf("failed to create PostgreSQL user repository: %w", repoErr)
	}

	// GetTTL is a method value bound to cfg. Because WatchAndApply
	// re-unmarshals into that same Config struct when the file changes, every
	// later token generation observes the new TTL (ADR-0023 Phase 2).
	// ExpirationTime remains as a static fallback for tests that assemble a
	// JWTConfig by hand without a Config.
	svc := user.NewUserService(
		pgRepo,
		user.JWTConfig{
			Secret:         cfg.GetJWTSecret(),
			ExpirationTime: 24 * time.Hour,
			GetTTL:         cfg.GetJWTTTL,
			Issuer:         "dance-lessons-coach",
		},
		cfg.GetAdminMasterPassword(),
	)

	return pgRepo, svc, nil
}
// setupRoutes installs the request logger and registers every route on the
// server's router: the system endpoints under /api, the versioned API
// subtrees, the admin subtree and the Swagger UI.
func (s *Server) setupRoutes() {
	// Log requests through zerolog rather than chi's default logger.
	requestLogger := chimiddleware.RequestLogger(&chimiddleware.DefaultLogFormatter{
		Logger:  &log.Logger,
		NoColor: false,
	})
	s.router.Use(requestLogger)

	// System endpoints live directly on the root router.
	systemEndpoints := []struct {
		pattern string
		handle  http.HandlerFunc
	}{
		{"/api/health", s.handleHealth},   // health
		{"/api/ready", s.handleReadiness}, // readiness
		{"/api/version", s.handleVersion}, // version
		{"/api/healthz", s.handleHealthz}, // Kubernetes-style health
		{"/api/info", s.handleInfo},       // composite aggregator
	}
	for _, ep := range systemEndpoints {
		s.router.Get(ep.pattern, ep.handle)
	}

	// Versioned API.
	s.router.Route("/api/v1", func(v1 chi.Router) {
		v1.Use(s.getAllMiddlewares()...)
		s.registerApiV1Routes(v1)
	})

	// Admin endpoints.
	s.router.Route("/api/admin", func(admin chi.Router) {
		admin.Use(s.getAllMiddlewares()...)
		admin.Post("/cache/flush", s.handleAdminCacheFlush)
	})

	// The v2 subtree is ALWAYS registered (ADR-0023 Phase 4 hot-reload).
	// v2EnabledGate consults the live config on each request and answers 404
	// while api.v2_enabled is false, so the flag can be flipped through config
	// hot-reload without rebuilding the router.
	s.router.Route("/api/v2", func(v2 chi.Router) {
		v2.Use(s.getAllMiddlewares()...)
		v2.Use(s.v2EnabledGate)
		s.registerApiV2Routes(v2)
	})

	// Swagger: serve the embedded spec, then the UI that consumes it.
	serveSpec := func(w http.ResponseWriter, r *http.Request) {
		spec, readErr := swaggerJSON.ReadFile("docs/swagger.json")
		if readErr != nil {
			log.Error().Err(readErr).Msg("Failed to read embedded swagger.json")
			http.Error(w, "Failed to read swagger.json", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.Write(spec)
	}
	s.router.Handle("/swagger/doc.json", http.HandlerFunc(serveSpec))
	s.router.Get("/swagger/*", httpSwagger.WrapHandler)
}
// registerApiV1Routes mounts the v1 API on r:
//
//   - /greet is always mounted, rate limited, and additionally wrapped by the
//     auth middleware when a user service is available;
//   - /auth and /admin are mounted only when both the user service and the user
//     repository are present;
//   - the magic-link routes (ADR-0028 Phase A) are added under /auth only when
//     the repository also implements user.MagicLinkRepository.
func (s *Server) registerApiV1Routes(r chi.Router) {
	rateLimitMiddleware := middleware.NewRateLimiter(middleware.RateLimitConfig{
		Enabled:           s.config.GetRateLimitEnabled(),
		RequestsPerMinute: s.config.GetRateLimitRequestsPerMinute(),
		BurstSize:         s.config.GetRateLimitBurstSize(),
	})

	// The auth middleware only exists when user functionality is enabled.
	var authMiddleware *AuthMiddleware
	if s.userService != nil {
		authMiddleware = NewAuthMiddleware(s.userService)
	}

	r.Route("/greet", func(r chi.Router) {
		r.Use(rateLimitMiddleware.Middleware)
		if authMiddleware != nil {
			r.Use(authMiddleware.Middleware)
		}
		r.Get("/", s.handleGreetQuery)
		r.Get("/{name}", s.handleGreetPath)
	})

	// Everything below needs user functionality. A single guard replaces the
	// former nested (and redundant) second nil check on userService.
	if s.userService == nil || s.userRepo == nil {
		return
	}

	// The unified user service is passed for both service arguments.
	handler := userapi.NewAuthHandler(s.userService, s.userService, s.validator)
	r.Route("/auth", func(r chi.Router) {
		handler.RegisterRoutes(r)

		// Magic-link routes (ADR-0028 Phase A). Mounted only when the
		// userRepo also implements MagicLinkRepository (PostgresRepository does).
		mlRepo, ok := s.userRepo.(user.MagicLinkRepository)
		if !ok {
			return
		}
		emailCfg := s.config.GetEmailConfig()
		sender := email.NewSMTPSender(email.SMTPConfig{
			Host:     emailCfg.SMTPHost,
			Port:     emailCfg.SMTPPort,
			Username: emailCfg.SMTPUsername,
			Password: emailCfg.SMTPPassword,
			UseTLS:   emailCfg.SMTPUseTLS,
			Timeout:  emailCfg.Timeout,
		})
		mlHandler := userapi.NewMagicLinkHandler(
			mlRepo,
			s.userService,
			s.userRepo,
			sender,
			s.config.GetMagicLinkConfig(),
			emailCfg.From,
			s.validator,
		)
		mlHandler.RegisterRoutes(r)
	})

	adminHandler := userapi.NewAdminHandler(s.userService)
	r.Route("/admin", func(r chi.Router) {
		adminHandler.RegisterRoutes(r)
	})
}
// registerApiV2Routes mounts the v2 greeting handler under /greet on r. The
// handler is built from a fresh v2 greet service and the server's validator.
func (s *Server) registerApiV2Routes(r chi.Router) {
	v2Handler := greet.NewApiV2GreetHandler(greet.NewServiceV2(), s.validator)

	r.Route("/greet", func(sub chi.Router) {
		v2Handler.RegisterRoutes(sub)
	})
}
// v2EnabledGate guards the /api/v2/* subtree with the live api.v2_enabled
// config value (ADR-0023 Phase 4 hot-reload). While the flag is off, requests
// get a 404 with a JSON error body, so clients perceive "v2 doesn't exist"
// rather than "v2 is forbidden".
//
// The flag is read on every request, so a change applied at runtime via
// Config.WatchAndApply takes effect on the next request — no router rebuild,
// no restart.
func (s *Server) v2EnabledGate(next http.Handler) http.Handler {
	gate := func(w http.ResponseWriter, r *http.Request) {
		if s.config.GetV2Enabled() {
			next.ServeHTTP(w, r)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusNotFound)
		_, _ = w.Write([]byte(`{"error":"not_found","message":"v2 API is currently disabled"}`))
	}
	return http.HandlerFunc(gate)
}
// getAllMiddlewares returns the middleware chain shared by the API subtrees:
// slash stripping and panic recovery, followed by OpenTelemetry HTTP
// instrumentation when telemetry is enabled.
func (s *Server) getAllMiddlewares() []func(http.Handler) http.Handler {
	chain := make([]func(http.Handler) http.Handler, 0, 3)
	chain = append(chain, chimiddleware.StripSlashes, chimiddleware.Recoverer)

	if !s.withOTEL {
		return chain
	}

	tracing := func(next http.Handler) http.Handler {
		return otelhttp.NewHandler(next, "")
	}
	return append(chain, tracing)
}
// handleHealth godoc
//
// @Summary Health check
// @Description Check if the service is healthy
// @Tags System/Health
// @Accept json
// @Produce json
// @Success 200 {object} map[string]string "Service is healthy"
// @Router /health [get]
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
	log.Trace().Msg("Health check requested")

	// Declare the body as JSON explicitly. Without this header net/http sniffs
	// the payload and labels it text/plain, which contradicts the documented
	// "@Produce json" and the other JSON endpoints of this server.
	w.Header().Set("Content-Type", "application/json")

	// A failed write means the client went away; there is nothing to recover.
	_, _ = w.Write([]byte(`{"status":"healthy"}`))
}
// handleReadiness godoc
//
// @Summary Readiness check
// @Description Check if the service is ready to accept traffic including detailed connection status
// @Tags System/Health
// @Accept json
// @Produce json
// @Success 200 {object} object "Service is ready with connection details"
// @Failure 503 {object} object "Service is not ready with failure details"
// @Router /ready [get]
func (s *Server) handleReadiness(w http.ResponseWriter, r *http.Request) {
	log.Trace().Msg("Readiness check requested")

	// reply writes payload as JSON with the given HTTP status.
	reply := func(status int, payload map[string]interface{}) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(status)
		json.NewEncoder(w).Encode(payload)
	}

	// Once the readiness context is done the server is shutting down, so
	// report not-ready immediately without probing any dependency.
	select {
	case <-s.readyCtx.Done():
		log.Trace().Msg("Readiness check: not ready (shutting down)")
		reply(http.StatusServiceUnavailable, map[string]interface{}{
			"ready":  false,
			"reason": "server_shutting_down",
			"connections": map[string]interface{}{
				"database": "not_checked",
			},
		})
		return
	default:
	}

	// Probe the database when a repository is configured. An empty reason
	// means every probed connection is healthy.
	reason := ""
	database := map[string]interface{}{"status": "not_configured"}
	if s.userRepo != nil {
		if dbErr := s.userRepo.CheckDatabaseHealth(r.Context()); dbErr != nil {
			log.Warn().Err(dbErr).Msg("Database health check failed")
			database = map[string]interface{}{
				"status": "unhealthy",
				"error":  dbErr.Error(),
			}
			reason = "database_unhealthy"
		} else {
			database = map[string]interface{}{"status": "healthy"}
		}
	}
	connections := map[string]interface{}{"database": database}

	if reason != "" {
		log.Warn().Str("reason", reason).Msg("Readiness check: not ready")
		reply(http.StatusServiceUnavailable, map[string]interface{}{
			"ready":       false,
			"reason":      reason,
			"connections": connections,
		})
		return
	}

	log.Trace().Msg("Readiness check: ready")
	reply(http.StatusOK, map[string]interface{}{
		"ready":       true,
		"connections": connections,
	})
}
// handleVersion godoc
//
// @Summary Get API version
// @Description Returns the API version information
// @Tags System/Version
// @Accept plain,json
// @Produce plain,json
// @Param format query string false "Response format (plain, full, json)" Enums(plain, full, json) default(plain)
// @Success 200 {string} string "Version information"
// @Router /version [get]
func (s *Server) handleVersion(w http.ResponseWriter, r *http.Request) {
	log.Trace().Msg("Version check requested")

	// Collapse a missing or unknown format onto "plain" BEFORE building the
	// cache key. Unknown values always produced the plain response anyway;
	// keying the cache on the raw query value let callers create an unbounded
	// number of cache entries by varying ?format=.
	format := r.URL.Query().Get("format")
	switch format {
	case "plain", "full", "json":
	default:
		format = "plain"
	}

	contentType := "text/plain"
	if format == "json" {
		contentType = "application/json"
	}

	// Serve from cache when possible. A non-string entry is treated as a miss
	// instead of panicking on the type assertion.
	cacheKey := "version:" + format
	if s.cacheService != nil {
		if cached, hit := s.cacheService.Get(cacheKey); hit {
			if body, isString := cached.(string); isString {
				log.Trace().Str("cache_key", cacheKey).Msg("Cache hit for version")
				w.Header().Set("Content-Type", contentType)
				_, _ = w.Write([]byte(body))
				return
			}
		}
	}

	// Build the response body.
	var response string
	switch format {
	case "full":
		response = version.Full()
	case "json":
		// Marshal instead of string formatting so the values are always
		// escaped correctly; field order matches the previous output.
		payload, err := json.MarshalIndent(struct {
			Version string `json:"version"`
			Commit  string `json:"commit"`
			Built   string `json:"built"`
			Go      string `json:"go"`
		}{
			Version: version.Version,
			Commit:  version.Commit,
			Built:   version.Date,
			Go:      version.GoVersion,
		}, "", "  ")
		if err != nil {
			http.Error(w, `{"error":"server_error"}`, http.StatusInternalServerError)
			return
		}
		response = string(payload)
	default:
		response = version.Short()
	}

	// Cache the response for 60 seconds if cache is enabled.
	if s.cacheService != nil {
		s.cacheService.Set(cacheKey, response, 60*time.Second)
		log.Trace().Str("cache_key", cacheKey).Msg("Cached version response")
	}

	w.Header().Set("Content-Type", contentType)
	_, _ = w.Write([]byte(response))
}
// HealthzResponse represents the Kubernetes-style health check response
// returned as JSON by handleHealthz.
type HealthzResponse struct {
// Status is the health label; handleHealthz always reports "healthy".
Status string `json:"status"`
// Version is the build version string (version.Version).
Version string `json:"version"`
// UptimeSeconds is the time elapsed since the server was constructed,
// truncated to whole seconds.
UptimeSeconds int64 `json:"uptime_seconds"`
// Timestamp is the UTC time at which the response was built.
Timestamp time.Time `json:"timestamp"`
}
// InfoResponse represents the JSON response for /api/info
type InfoResponse struct {
// Version is the build version string (version.Version).
Version string `json:"version"`
// CommitShort is version.Commit cut to at most its first 8 characters.
CommitShort string `json:"commit_short"`
// BuildDate is the build date string (version.Date).
BuildDate string `json:"build_date"`
// UptimeSeconds is the time elapsed since the server was constructed,
// truncated to whole seconds.
UptimeSeconds int64 `json:"uptime_seconds"`
// CacheEnabled is true when the server has a cache service.
CacheEnabled bool `json:"cache_enabled"`
// HealthzStatus is the health label; handleInfo always reports "healthy".
HealthzStatus string `json:"healthz_status"`
// GoVersion is the Go runtime version reported by runtime.Version().
GoVersion string `json:"go_version"`
}
// handleHealthz godoc
//
// @Summary Kubernetes-style health check
// @Description Returns rich health info for liveness/readiness probes
// @Tags System/Health
// @Produce json
// @Success 200 {object} HealthzResponse
// @Router /healthz [get]
func (s *Server) handleHealthz(w http.ResponseWriter, r *http.Request) {
	log.Trace().Msg("Healthz check requested")

	// Uptime is measured from server construction and reported in whole
	// seconds; the timestamp is always expressed in UTC.
	var payload HealthzResponse
	payload.Status = "healthy"
	payload.Version = version.Version
	payload.UptimeSeconds = int64(time.Since(s.startedAt).Seconds())
	payload.Timestamp = time.Now().UTC()

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(payload)
}
// handleInfo godoc
//
// @Summary Get composite info
// @Description Returns aggregated version, build, uptime, cache, and health info
// @Tags System/Info
// @Produce json
// @Success 200 {object} InfoResponse
// @Router /info [get]
func (s *Server) handleInfo(w http.ResponseWriter, r *http.Request) {
	log.Trace().Msg("Info endpoint requested")

	const cacheKey = "info:json"
	caching := s.cacheService != nil

	// Answer from the cache when an entry exists.
	if caching {
		if entry, found := s.cacheService.Get(cacheKey); found {
			log.Trace().Str("cache_key", cacheKey).Msg("Cache hit for info")
			w.Header().Set("Content-Type", "application/json")
			w.Header().Set("X-Cache", "HIT")
			w.Write([]byte(entry.(string)))
			return
		}
	}

	// commit_short is at most the first 8 characters of the commit hash.
	shortCommit := version.Commit
	if len(shortCommit) > 8 {
		shortCommit = shortCommit[:8]
	}

	payload, marshalErr := json.Marshal(InfoResponse{
		Version:       version.Version,
		CommitShort:   shortCommit,
		BuildDate:     version.Date,
		UptimeSeconds: int64(time.Since(s.startedAt).Seconds()),
		CacheEnabled:  caching,
		HealthzStatus: "healthy",
		GoVersion:     runtime.Version(),
	})
	if marshalErr != nil {
		http.Error(w, `{"error":"server_error"}`, http.StatusInternalServerError)
		return
	}

	// Store the freshly built document using the configured default TTL.
	if caching {
		ttl := time.Duration(s.config.GetCacheDefaultTTLSeconds()) * time.Second
		s.cacheService.Set(cacheKey, string(payload), ttl)
		w.Header().Set("X-Cache", "MISS")
		log.Trace().Str("cache_key", cacheKey).Msg("Cached info response")
	}

	w.Header().Set("Content-Type", "application/json")
	w.Write(payload)
}
// handleGreetQuery godoc
//
// @Summary Get greeting with cache
// @Description Returns greeting for name from query param with caching
// @Tags API/v1/Greeting
// @Accept json
// @Produce json
// @Param name query string false "Name to greet"
// @Success 200 {object} map[string]string "Greeting message"
// @Failure 400 {object} map[string]string "Invalid request"
// @Router /v1/greet [get]
func (s *Server) handleGreetQuery(w http.ResponseWriter, r *http.Request) {
	// The name comes from the ?name= query parameter and is part of the
	// cache key.
	who := r.URL.Query().Get("name")
	key := "greet:v1:" + who
	caching := s.cacheService != nil

	// Replay a cached greeting when one exists.
	if caching {
		if entry, found := s.cacheService.Get(key); found {
			log.Trace().Str("cache_key", key).Msg("Cache hit for greet")
			w.Header().Set("Content-Type", "application/json")
			w.Header().Set("X-Cache", "HIT")
			w.Write([]byte(entry.(string)))
			return
		}
	}

	// Cache miss (or caching disabled): compute the greeting.
	greeting := greet.NewService().Greet(r.Context(), who)
	body, marshalErr := json.Marshal(map[string]string{"message": greeting})
	if marshalErr != nil {
		http.Error(w, `{"error":"server_error"}`, http.StatusInternalServerError)
		return
	}

	// Keep the serialized greeting for 60 seconds.
	if caching {
		s.cacheService.Set(key, string(body), 60*time.Second)
		w.Header().Set("X-Cache", "MISS")
		log.Trace().Str("cache_key", key).Msg("Cached greet response")
	}

	w.Header().Set("Content-Type", "application/json")
	w.Write(body)
}
// handleGreetPath godoc
//
// @Summary Get personalized greeting with cache
// @Description Returns greeting for name from path param with caching
// @Tags API/v1/Greeting
// @Accept json
// @Produce json
// @Param name path string true "Name to greet"
// @Success 200 {object} map[string]string "Greeting message"
// @Failure 400 {object} map[string]string "Invalid request"
// @Router /v1/greet/{name} [get]
func (s *Server) handleGreetPath(w http.ResponseWriter, r *http.Request) {
	// The name comes from the {name} path segment and is part of the cache key.
	who := chi.URLParam(r, "name")
	key := "greet:v1:" + who
	caching := s.cacheService != nil

	// Replay a cached greeting when one exists.
	if caching {
		if entry, found := s.cacheService.Get(key); found {
			log.Trace().Str("cache_key", key).Msg("Cache hit for greet")
			w.Header().Set("Content-Type", "application/json")
			w.Header().Set("X-Cache", "HIT")
			w.Write([]byte(entry.(string)))
			return
		}
	}

	// Cache miss (or caching disabled): compute the greeting.
	greeting := greet.NewService().Greet(r.Context(), who)
	body, marshalErr := json.Marshal(map[string]string{"message": greeting})
	if marshalErr != nil {
		http.Error(w, `{"error":"server_error"}`, http.StatusInternalServerError)
		return
	}

	// Keep the serialized greeting for 60 seconds.
	if caching {
		s.cacheService.Set(key, string(body), 60*time.Second)
		w.Header().Set("X-Cache", "MISS")
		log.Trace().Str("cache_key", key).Msg("Cached greet response")
	}

	w.Header().Set("Content-Type", "application/json")
	w.Write(body)
}
// handleAdminCacheFlush godoc
//
// @Summary Flush cache
// @Description Flushes the entire cache, requires admin authentication
// @Tags API/Admin
// @Accept json
// @Produce json
// @Param X-Admin-Password header string true "Admin master password"
// @Success 200 {object} map[string]interface{} "Cache flushed successfully"
// @Failure 401 {object} map[string]string "Unauthorized"
// @Failure 503 {object} map[string]string "Cache disabled or admin authentication unavailable"
// @Router /admin/cache/flush [post]
func (s *Server) handleAdminCacheFlush(w http.ResponseWriter, r *http.Request) {
	// unavailable answers 503 with a JSON error code.
	unavailable := func(code string) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusServiceUnavailable)
		_ = json.NewEncoder(w).Encode(map[string]string{"error": code})
	}

	if s.cacheService == nil {
		unavailable("cache_disabled")
		return
	}

	// Admin auth - check X-Admin-Password header.
	masterPassword := r.Header.Get("X-Admin-Password")
	if masterPassword == "" {
		http.Error(w, `{"error":"unauthorized","message":"Admin password required"}`, http.StatusUnauthorized)
		return
	}

	// The server can run with user functionality disabled, in which case
	// userService is nil. Calling AdminAuthenticate on it would panic, so
	// report the endpoint as unavailable instead.
	if s.userService == nil {
		unavailable("admin_auth_unavailable")
		return
	}

	if _, err := s.userService.AdminAuthenticate(r.Context(), masterPassword); err != nil {
		http.Error(w, `{"error":"unauthorized","message":"Invalid admin password"}`, http.StatusUnauthorized)
		return
	}

	// Read the item count before flushing so the response can report how many
	// entries were dropped.
	itemCount := s.cacheService.ItemCount()
	s.cacheService.Flush()

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(map[string]interface{}{
		"flushed":       true,
		"items_flushed": itemCount,
		"timestamp":     time.Now().UTC().Format(time.RFC3339),
	})
}
// Router returns the server's router as an http.Handler, with every route and
// middleware registered by setupRoutes.
func (s *Server) Router() http.Handler {
	var h http.Handler = s.router
	return h
}
// Run starts the HTTP server and blocks until it stops.
//
// In order, it initializes OpenTelemetry tracing when enabled (falling back to
// no tracing on failure), starts the background cleanup loops and the config
// hot-reload watcher, and serves HTTP on the configured address. It then waits
// for SIGINT/SIGTERM and performs a graceful shutdown: readiness is cancelled,
// one second is allowed for that to propagate, in-flight requests are drained
// within the configured shutdown timeout, and the tracer provider is shut down.
//
// Run returns the listener error if the HTTP server stops on its own (for
// example because the address is already in use), the error from
// http.Server.Shutdown if draining fails, and nil otherwise.
func (s *Server) Run() error {
	// Initialize OpenTelemetry if enabled.
	var err error
	var telemetrySetup *telemetry.Setup
	if s.withOTEL {
		log.Trace().Msg("Initializing OpenTelemetry tracing")
		telemetrySetup = &telemetry.Setup{
			ServiceName:  s.config.GetServiceName(),
			OTLPEndpoint: s.config.GetOTLPEndpoint(),
			Insecure:     s.config.GetTelemetryInsecure(),
			SamplerType:  s.config.GetSamplerType(),
			SamplerRatio: s.config.GetSamplerRatio(),
			Version:      version.Short(),
		}

		if s.tracerProvider, err = telemetrySetup.InitializeTracing(context.Background()); err != nil {
			log.Error().Err(err).Msg("Failed to initialize OpenTelemetry, continuing without tracing")
			s.withOTEL = false
			telemetrySetup = nil
		} else {
			log.Trace().Msg("OpenTelemetry tracing initialized successfully")
		}
	}

	// shutdownTracing stops the tracer provider, if one was created.
	shutdownTracing := func() {
		if s.tracerProvider == nil {
			return
		}
		if err := telemetry.Shutdown(context.Background(), s.tracerProvider); err != nil {
			log.Error().Err(err).Msg("Failed to shutdown OpenTelemetry tracer provider")
		} else {
			log.Trace().Msg("OpenTelemetry tracer provider shutdown complete")
		}
	}

	// Setup signal context for graceful shutdown.
	rootCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Create ongoing context for active requests.
	ongoingCtx, stopOngoingGracefully := context.WithCancel(context.Background())
	defer stopOngoingGracefully()

	// Start the JWT secret cleanup loop (ADR-0021). The loop runs until rootCtx
	// is cancelled (graceful shutdown), removing non-primary secrets whose
	// ExpiresAt is in the past.
	if s.userService != nil {
		s.userService.StartJWTSecretCleanupLoop(rootCtx, s.config.GetJWTSecretCleanupInterval())
	}

	// Start the magic-link expired-token cleanup loop (ADR-0028 Phase A consequence).
	if mlRepo, ok := s.userRepo.(user.MagicLinkRepository); ok {
		runner := user.NewMagicLinkCleanupRunner(mlRepo)
		runner.StartCleanupLoop(rootCtx, s.config.GetMagicLinkCleanupInterval())
	}

	// Wire the sampler hot-reload callback (ADR-0023 Phase 3, sub-phase 3.3).
	// telemetrySetup is non-nil only when telemetry was successfully initialized
	// at startup — hot-reloading telemetry-on is out of scope (see ADR-0023).
	// The callback updates the SamplerType/Ratio on the captured Setup, then
	// rebuilds the global tracer provider via ReconfigureTracerProvider.
	if telemetrySetup != nil {
		s.config.SetSamplerReconfigureCallback(func(ctx context.Context, samplerType string, samplerRatio float64) error {
			telemetrySetup.SamplerType = samplerType
			telemetrySetup.SamplerRatio = samplerRatio
			newTP, rerr := telemetrySetup.ReconfigureTracerProvider(ctx, s.tracerProvider)
			if rerr != nil {
				return rerr
			}
			if newTP != nil {
				s.tracerProvider = newTP
			}
			return nil
		})
	}

	// Start config hot-reload watcher (ADR-0023 Phase 1+2+3).
	// Stops automatically on rootCtx cancellation.
	s.config.WatchAndApply(rootCtx)

	// Create HTTP server.
	log.Trace().Str("address", s.config.GetServerAddress()).Msg("Server running")
	srv := &http.Server{
		Addr:    s.config.GetServerAddress(),
		Handler: s.router,
		BaseContext: func(_ net.Listener) context.Context {
			return ongoingCtx
		},
	}

	// Serve in a separate goroutine. The channel is buffered so the goroutine
	// can always deliver its error and exit, and it is closed afterwards so a
	// receive never blocks once the listener has stopped.
	serverErrors := make(chan error, 1)
	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			serverErrors <- err
		}
		close(serverErrors)
	}()

	// Wait for a shutdown signal OR for the listener to stop on its own.
	// Previously only the signal was awaited, so a startup failure such as
	// "address already in use" left Run blocked until the process was
	// signalled, with the error reported only afterwards.
	select {
	case <-rootCtx.Done():
	case listenErr, failed := <-serverErrors:
		stop()
		shutdownTracing()
		if failed {
			return listenErr
		}
		return nil
	}
	stop()
	log.Trace().Msg("Shutdown signal received")

	// Cancel the readiness context, when it supports it, so the readiness
	// endpoint starts answering 503.
	if cancelReady, ok := s.readyCtx.(interface{ Cancel() }); ok {
		cancelReady.Cancel()
	}
	log.Trace().Msg("Readiness set to false, no longer accepting new requests")

	// Give time for readiness check to propagate.
	time.Sleep(1 * time.Second)
	log.Trace().Msg("Readiness check propagated, now waiting for ongoing requests to finish.")

	// Create shutdown context with timeout from config.
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), s.config.Shutdown.Timeout)
	defer shutdownCancel()

	if err := srv.Shutdown(shutdownCtx); err != nil {
		log.Error().Err(err).Msg("Server shutdown failed")
		return err
	}
	log.Trace().Msg("Server shutdown complete")

	// Shutdown OpenTelemetry tracer provider.
	shutdownTracing()

	// Return any server error that raced with the shutdown signal.
	if err, ok := <-serverErrors; ok {
		return err
	}

	return nil
}