- Added port management system with PortManager for parallel execution
- Implemented resource monitoring with ResourceMonitor and ParallelTestRunner
- Created test-all-features-parallel.sh for parallel feature test execution
- Added comprehensive BDD_TAGS.md documentation for tag usage
- Implemented port allocation, conflict detection, and resource tracking
- Added timeout detection and controlled parallelism

Generated by Mistral Vibe. Co-Authored-By: Mistral Vibe <vibe@mistral.ai>
199 lines
4.5 KiB
Go
199 lines
4.5 KiB
Go
package parallel
|
|
|
|
import (
|
|
"fmt"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// ResourceMonitor monitors system resources during parallel test execution
type ResourceMonitor struct {
	startTime     time.Time      // set by StartMonitoring; basis for TestDuration
	maxMemoryMB   float64        // peak observed heap allocation (MemStats.Alloc), in MiB
	maxGoroutines int            // peak observed goroutine count
	checkInterval time.Duration  // how often the sampling loop calls checkResources
	stopChan      chan bool      // closed by StopMonitoring to end the sampling loop
	wg            sync.WaitGroup // waits for the sampling goroutine to exit
	mutex         sync.Mutex     // guards maxMemoryMB and maxGoroutines
}
|
|
|
|
// ResourceStats is a snapshot of the peak resource usage collected by a
// ResourceMonitor.
type ResourceStats struct {
	MemoryMB     float64       // peak heap allocation observed, in MiB
	Goroutines   int           // peak goroutine count observed
	CPUUsage     float64       // NOTE(review): never populated in this file — confirm intended use or remove
	TestDuration time.Duration // elapsed time since monitoring started
}
|
|
|
|
func NewResourceMonitor(interval time.Duration) *ResourceMonitor {
|
|
return &ResourceMonitor{
|
|
checkInterval: interval,
|
|
stopChan: make(chan bool),
|
|
}
|
|
}
|
|
|
|
// StartMonitoring starts monitoring system resources
|
|
func (rm *ResourceMonitor) StartMonitoring() {
|
|
rm.startTime = time.Now()
|
|
rm.wg.Add(1)
|
|
|
|
go func() {
|
|
defer rm.wg.Done()
|
|
|
|
ticker := time.NewTicker(rm.checkInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-rm.stopChan:
|
|
return
|
|
case <-ticker.C:
|
|
rm.checkResources()
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// StopMonitoring stops the resource monitor
// Closing stopChan signals the sampling goroutine to return; wg.Wait blocks
// until it has fully exited, so no sample runs after this call returns.
// NOTE(review): calling StopMonitoring twice panics (close of closed
// channel) — callers must pair it with exactly one StartMonitoring.
func (rm *ResourceMonitor) StopMonitoring() {
	close(rm.stopChan)
	rm.wg.Wait()
}
|
|
|
|
// checkResources checks current system resource usage
|
|
func (rm *ResourceMonitor) checkResources() {
|
|
var memStats runtime.MemStats
|
|
runtime.ReadMemStats(&memStats)
|
|
|
|
currentMemoryMB := float64(memStats.Alloc) / 1024 / 1024
|
|
currentGoroutines := runtime.NumGoroutine()
|
|
|
|
rm.mutex.Lock()
|
|
if currentMemoryMB > rm.maxMemoryMB {
|
|
rm.maxMemoryMB = currentMemoryMB
|
|
}
|
|
if currentGoroutines > rm.maxGoroutines {
|
|
rm.maxGoroutines = currentGoroutines
|
|
}
|
|
rm.mutex.Unlock()
|
|
|
|
log.Debug().
|
|
Float64("memory_mb", currentMemoryMB).
|
|
Int("goroutines", currentGoroutines).
|
|
Msg("Resource usage update")
|
|
}
|
|
|
|
// GetResourceStats gets the collected resource statistics
|
|
func (rm *ResourceMonitor) GetResourceStats() ResourceStats {
|
|
rm.mutex.Lock()
|
|
defer rm.mutex.Unlock()
|
|
|
|
return ResourceStats{
|
|
MemoryMB: rm.maxMemoryMB,
|
|
Goroutines: rm.maxGoroutines,
|
|
TestDuration: time.Since(rm.startTime),
|
|
}
|
|
}
|
|
|
|
// LogResourceSummary logs a summary of resource usage
|
|
func (rm *ResourceMonitor) LogResourceSummary() {
|
|
stats := rm.GetResourceStats()
|
|
|
|
log.Info().
|
|
Float64("max_memory_mb", stats.MemoryMB).
|
|
Int("max_goroutines", stats.Goroutines).
|
|
Str("duration", stats.TestDuration.String()).
|
|
Msg("Parallel Test Resource Usage Summary")
|
|
}
|
|
|
|
// CheckResourceLimits checks if resource usage exceeds specified limits
|
|
func (rm *ResourceMonitor) CheckResourceLimits(maxMemoryMB float64, maxGoroutines int) (bool, string) {
|
|
stats := rm.GetResourceStats()
|
|
|
|
if stats.MemoryMB > maxMemoryMB {
|
|
return false, fmt.Sprintf("Memory limit exceeded: %.1fMB > %.1fMB", stats.MemoryMB, maxMemoryMB)
|
|
}
|
|
|
|
if stats.Goroutines > maxGoroutines {
|
|
return false, fmt.Sprintf("Goroutine limit exceeded: %d > %d", stats.Goroutines, maxGoroutines)
|
|
}
|
|
|
|
return true, "Within resource limits"
|
|
}
|
|
|
|
// MonitorTestExecution monitors a single test execution with timeout
|
|
func MonitorTestExecution(testName string, timeout time.Duration, testFunc func() error) error {
|
|
done := make(chan error, 1)
|
|
|
|
// Start the test in a goroutine
|
|
go func() {
|
|
done <- testFunc()
|
|
}()
|
|
|
|
// Wait for test completion or timeout
|
|
select {
|
|
case err := <-done:
|
|
return err
|
|
case <-time.After(timeout):
|
|
return fmt.Errorf("test '%s' exceeded timeout of %v", testName, timeout)
|
|
}
|
|
}
|
|
|
|
// ParallelTestRunner runs multiple tests in parallel with resource monitoring
type ParallelTestRunner struct {
	maxParallel int              // upper bound on concurrently running tests
	semaphore   chan struct{}    // counting semaphore with maxParallel slots
	monitor     *ResourceMonitor // samples memory/goroutine usage during a run
}
|
|
|
|
// NewParallelTestRunner creates a new parallel test runner
|
|
func NewParallelTestRunner(maxParallel int) *ParallelTestRunner {
|
|
return &ParallelTestRunner{
|
|
maxParallel: maxParallel,
|
|
semaphore: make(chan struct{}, maxParallel),
|
|
monitor: NewResourceMonitor(1 * time.Second),
|
|
}
|
|
}
|
|
|
|
// RunTestsInParallel runs tests in parallel
|
|
func (ptr *ParallelTestRunner) RunTestsInParallel(tests []func() error) ([]error, error) {
|
|
var errors []error
|
|
var mutex sync.Mutex
|
|
|
|
ptr.monitor.StartMonitoring()
|
|
defer ptr.monitor.StopMonitoring()
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
for _, test := range tests {
|
|
wg.Add(1)
|
|
|
|
// Acquire semaphore slot
|
|
ptr.semaphore <- struct{}{}
|
|
|
|
go func(t func() error) {
|
|
defer wg.Done()
|
|
defer func() { <-ptr.semaphore }()
|
|
|
|
if err := t(); err != nil {
|
|
mutex.Lock()
|
|
errors = append(errors, err)
|
|
mutex.Unlock()
|
|
}
|
|
}(test)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
ptr.monitor.LogResourceSummary()
|
|
|
|
if len(errors) > 0 {
|
|
return errors, fmt.Errorf("%d tests failed", len(errors))
|
|
}
|
|
|
|
return nil, nil
|
|
}
|