package parallel import ( "fmt" "runtime" "sync" "time" "github.com/rs/zerolog/log" ) // ResourceMonitor monitors system resources during parallel test execution type ResourceMonitor struct { startTime time.Time maxMemoryMB float64 maxGoroutines int checkInterval time.Duration stopChan chan bool wg sync.WaitGroup mutex sync.Mutex } // NewResourceMonitor creates a new resource monitor type ResourceStats struct { MemoryMB float64 Goroutines int CPUUsage float64 TestDuration time.Duration } func NewResourceMonitor(interval time.Duration) *ResourceMonitor { return &ResourceMonitor{ checkInterval: interval, stopChan: make(chan bool), } } // StartMonitoring starts monitoring system resources func (rm *ResourceMonitor) StartMonitoring() { rm.startTime = time.Now() rm.wg.Add(1) go func() { defer rm.wg.Done() ticker := time.NewTicker(rm.checkInterval) defer ticker.Stop() for { select { case <-rm.stopChan: return case <-ticker.C: rm.checkResources() } } }() } // StopMonitoring stops the resource monitor func (rm *ResourceMonitor) StopMonitoring() { close(rm.stopChan) rm.wg.Wait() } // checkResources checks current system resource usage func (rm *ResourceMonitor) checkResources() { var memStats runtime.MemStats runtime.ReadMemStats(&memStats) currentMemoryMB := float64(memStats.Alloc) / 1024 / 1024 currentGoroutines := runtime.NumGoroutine() rm.mutex.Lock() if currentMemoryMB > rm.maxMemoryMB { rm.maxMemoryMB = currentMemoryMB } if currentGoroutines > rm.maxGoroutines { rm.maxGoroutines = currentGoroutines } rm.mutex.Unlock() log.Debug(). Float64("memory_mb", currentMemoryMB). Int("goroutines", currentGoroutines). Msg("Resource usage update") } // GetResourceStats gets the collected resource statistics func (rm *ResourceMonitor) GetResourceStats() ResourceStats { rm.mutex.Lock() defer rm.mutex.Unlock() return ResourceStats{ MemoryMB: rm.maxMemoryMB, Goroutines: rm.maxGoroutines, TestDuration: time.Since(rm.startTime), } } // LogResourceSummary logs a summary of resource usage func (rm *ResourceMonitor) LogResourceSummary() { stats := rm.GetResourceStats() log.Info(). Float64("max_memory_mb", stats.MemoryMB). Int("max_goroutines", stats.Goroutines). Str("duration", stats.TestDuration.String()). Msg("Parallel Test Resource Usage Summary") } // CheckResourceLimits checks if resource usage exceeds specified limits func (rm *ResourceMonitor) CheckResourceLimits(maxMemoryMB float64, maxGoroutines int) (bool, string) { stats := rm.GetResourceStats() if stats.MemoryMB > maxMemoryMB { return false, fmt.Sprintf("Memory limit exceeded: %.1fMB > %.1fMB", stats.MemoryMB, maxMemoryMB) } if stats.Goroutines > maxGoroutines { return false, fmt.Sprintf("Goroutine limit exceeded: %d > %d", stats.Goroutines, maxGoroutines) } return true, "Within resource limits" } // MonitorTestExecution monitors a single test execution with timeout func MonitorTestExecution(testName string, timeout time.Duration, testFunc func() error) error { done := make(chan error, 1) // Start the test in a goroutine go func() { done <- testFunc() }() // Wait for test completion or timeout select { case err := <-done: return err case <-time.After(timeout): return fmt.Errorf("test '%s' exceeded timeout of %v", testName, timeout) } } // ParallelTestRunner runs multiple tests in parallel with resource monitoring type ParallelTestRunner struct { maxParallel int semaphore chan struct{} monitor *ResourceMonitor } // NewParallelTestRunner creates a new parallel test runner func NewParallelTestRunner(maxParallel int) *ParallelTestRunner { return &ParallelTestRunner{ maxParallel: maxParallel, semaphore: make(chan struct{}, maxParallel), monitor: NewResourceMonitor(1 * time.Second), } } // RunTestsInParallel runs tests in parallel func (ptr *ParallelTestRunner) RunTestsInParallel(tests []func() error) ([]error, error) { var errors []error var mutex sync.Mutex ptr.monitor.StartMonitoring() defer ptr.monitor.StopMonitoring() var wg sync.WaitGroup for _, test := range tests { wg.Add(1) // Acquire semaphore slot ptr.semaphore <- struct{}{} go func(t func() error) { defer wg.Done() defer func() { <-ptr.semaphore }() if err := t(); err != nil { mutex.Lock() errors = append(errors, err) mutex.Unlock() } }(test) } wg.Wait() ptr.monitor.LogResourceSummary() if len(errors) > 0 { return errors, fmt.Errorf("%d tests failed", len(errors)) } return nil, nil }