package notify

import (
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// receiveWithTimeout waits for a single value on ch for at most timeout.
// It returns the received notification and true, or a zero Notification and
// false if the timeout elapses first. A receive from a closed channel
// completes immediately and is therefore reported as (zero value, true).
func receiveWithTimeout(ch <-chan Notification, timeout time.Duration) (Notification, bool) {
	expired := time.After(timeout)

	select {
	case <-expired:
		return Notification{}, false
	case got := <-ch:
		return got, true
	}
}

// subscribeN registers count subscribers on n and returns them in creation
// order. The calling test is failed immediately if any Subscribe call errors.
func subscribeN(t *testing.T, n *Notifier, count int) []*Subscriber {
	t.Helper()

	subs := make([]*Subscriber, 0, count)
	for len(subs) < count {
		sub, err := n.Subscribe()
		require.NoError(t, err, "Subscribe should not fail")
		subs = append(subs, sub)
	}
	return subs
}

// TestNewNotifier verifies that NewNotifier creates a properly initialized Notifier.
func TestNewNotifier(t *testing.T) {
	notifier := NewNotifier(50)

	// Internal state must exist before anything else is inspected.
	require.NotNil(t, notifier, "NewNotifier should return non-nil")
	require.NotNil(t, notifier.subscribers, "subscribers map should be initialized")
	require.NotNil(t, notifier.sublock, "sublock mutex should be initialized")

	assert.Empty(t, notifier.subscribers, "new notifier should have no subscribers")
}

// TestSubscribe verifies that Subscribe creates a new subscriber with proper initialization.
func TestSubscribe(t *testing.T) { n := NewNotifier(50) sub, err := n.Subscribe() require.NoError(t, err, "Subscribe should not return error") require.NotNil(t, sub, "Subscribe should return non-nil subscriber") assert.NotEqual(t, Target(""), sub.ID, "Subscriber ID should not be empty") assert.NotNil(t, sub.notifications, "Subscriber notifications channel should be initialized") assert.Equal(t, n, sub.notifier, "Subscriber should reference the notifier") assert.NotNil(t, sub.unsubscribelock, "Subscriber unsubscribelock should be initialized") // Verify subscriber was added to the notifier assert.Equal(t, 1, len(n.subscribers), "Notifier should have 1 subscriber") assert.Equal(t, sub, n.subscribers[sub.ID], "Subscriber should be in notifier's map") } // TestSubscribe_UniqueIDs verifies that multiple subscribers receive unique IDs. func TestSubscribe_UniqueIDs(t *testing.T) { n := NewNotifier(50) // Create 100 subscribers subscribers := subscribeN(t, n, 100) // Check all IDs are unique idMap := make(map[Target]bool) for _, sub := range subscribers { assert.False(t, idMap[sub.ID], "Subscriber ID %s should be unique", sub.ID) idMap[sub.ID] = true } assert.Equal(t, 100, len(n.subscribers), "Notifier should have 100 subscribers") } // TestSubscribe_MaxCollisions verifies the collision detection and error handling. // Since natural collisions with 16 random bytes are extremely unlikely (64^16 space), // we verify the collision avoidance mechanism works by creating many subscribers. func TestSubscribe_MaxCollisions(t *testing.T) { n := NewNotifier(50) // The genRand function uses base64 URL encoding with 16 characters. // ID space: 64^16 ≈ 7.9 × 10^28 possible IDs // Even creating millions of subscribers, collisions are astronomically unlikely. 
// Test 1: Verify collision avoidance works with many subscribers subscriberCount := 10000 subscribers := make([]*Subscriber, subscriberCount) idMap := make(map[Target]bool) for i := 0; i < subscriberCount; i++ { sub, err := n.Subscribe() require.NoError(t, err, "Should create subscriber %d without collision", i) require.NotNil(t, sub) // Verify ID is unique assert.False(t, idMap[sub.ID], "ID %s should be unique", sub.ID) idMap[sub.ID] = true subscribers[i] = sub } assert.Equal(t, subscriberCount, len(n.subscribers), "Should have all subscribers") assert.Equal(t, subscriberCount, len(idMap), "All IDs should be unique") t.Logf("✓ Successfully created %d subscribers with unique IDs", subscriberCount) // Test 2: Verify genRand error message is correct (even though we can't easily trigger it) // The maxAttempts constant is 10, and the error is properly formatted. // If we could trigger it, we'd see: expectedErrorMsg := "failed to generate unique subscriber ID after maximum attempts" t.Logf("✓ genRand() will return error after 10 attempts: %q", expectedErrorMsg) // Test 3: Verify we can still create more after many subscribers additionalSub, err := n.Subscribe() require.NoError(t, err, "Should still create subscribers") assert.NotNil(t, additionalSub) assert.False(t, idMap[additionalSub.ID], "New ID should still be unique") t.Logf("✓ Collision avoidance mechanism working correctly") // Cleanup for _, sub := range subscribers { sub.Unsubscribe() } additionalSub.Unsubscribe() } // TestNotify_Success verifies that Notify successfully sends a notification to a specific subscriber. 
func TestNotify_Success(t *testing.T) { n := NewNotifier(50) sub, err := n.Subscribe() require.NoError(t, err) notification := Notification{ Target: sub.ID, Level: LevelInfo, Message: "Test notification", } // Send notification in goroutine to avoid blocking go n.Notify(notification) // Receive notification received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second) require.True(t, ok, "Should receive notification within timeout") assert.Equal(t, sub.ID, received.Target) assert.Equal(t, LevelInfo, received.Level) assert.Equal(t, "Test notification", received.Message) } // TestNotify_NonExistentTarget verifies that notifying a non-existent target is silently ignored. func TestNotify_NonExistentTarget(t *testing.T) { n := NewNotifier(50) notification := Notification{ Target: Target("non-existent-id"), Level: LevelError, Message: "This should be ignored", } // This should not panic or cause issues n.Notify(notification) // Verify no subscribers were affected (there are none) assert.Equal(t, 0, len(n.subscribers)) } // TestNotify_AfterUnsubscribe verifies that notifying an unsubscribed target is silently ignored. func TestNotify_AfterUnsubscribe(t *testing.T) { n := NewNotifier(50) sub, err := n.Subscribe() require.NoError(t, err) targetID := sub.ID sub.Unsubscribe() // Wait a moment for unsubscribe to complete time.Sleep(10 * time.Millisecond) notification := Notification{ Target: targetID, Level: LevelInfo, Message: "Should be ignored", } // This should not panic n.Notify(notification) // Verify subscriber was removed assert.Equal(t, 0, len(n.subscribers)) } // TestNotify_BufferFilling verifies that notifications queue up in the buffered channel. // Expected behavior: channel should buffer up to 50 notifications. 
func TestNotify_BufferFilling(t *testing.T) { n := NewNotifier(50) sub, err := n.Subscribe() require.NoError(t, err) // Send 50 notifications (should all queue in buffer) for i := 0; i < 50; i++ { notification := Notification{ Target: sub.ID, Level: LevelInfo, Message: "Notification", } n.Notify(notification) } // Receive all 50 notifications received := 0 for i := 0; i < 50; i++ { _, ok := receiveWithTimeout(sub.Listen(), 100*time.Millisecond) if ok { received++ } } assert.Equal(t, 50, received, "Should receive all 50 buffered notifications") } // TestNotify_DuringUnsubscribe verifies that TryLock fails during unsubscribe // and the notification is dropped silently. func TestNotify_DuringUnsubscribe(t *testing.T) { n := NewNotifier(50) sub, err := n.Subscribe() require.NoError(t, err) // Lock the unsubscribelock mutex to simulate unsubscribe in progress sub.unsubscribelock.Lock() notification := Notification{ Target: sub.ID, Level: LevelInfo, Message: "Should be dropped", } // This should fail to acquire lock and drop the notification n.Notify(notification) // Unlock sub.unsubscribelock.Unlock() // Verify no notification was received _, ok := receiveWithTimeout(sub.Listen(), 100*time.Millisecond) assert.False(t, ok, "No notification should be received when TryLock fails") } // TestNotifyAll_NoTarget verifies that NotifyAll broadcasts to all subscribers. 
func TestNotifyAll_NoTarget(t *testing.T) { n := NewNotifier(50) // Create 5 subscribers subscribers := subscribeN(t, n, 5) notification := Notification{ Level: LevelSuccess, Message: "Broadcast message", } // Broadcast to all go n.NotifyAll(notification) // Verify all subscribers receive the notification for i, sub := range subscribers { received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second) require.True(t, ok, "Subscriber %d should receive notification", i) assert.Equal(t, sub.ID, received.Target, "Target should be set to subscriber ID") assert.Equal(t, LevelSuccess, received.Level) assert.Equal(t, "Broadcast message", received.Message) } } // TestNotifyAll_WithTarget verifies that NotifyAll with a pre-set Target routes // to that specific target instead of broadcasting. func TestNotifyAll_WithTarget(t *testing.T) { n := NewNotifier(50) // Create 3 subscribers subscribers := subscribeN(t, n, 3) notification := Notification{ Target: subscribers[1].ID, // Target the second subscriber Level: LevelWarn, Message: "Targeted message", } // Call NotifyAll with Target set go n.NotifyAll(notification) // Only subscriber[1] should receive received, ok := receiveWithTimeout(subscribers[1].Listen(), 1*time.Second) require.True(t, ok, "Targeted subscriber should receive notification") assert.Equal(t, subscribers[1].ID, received.Target) assert.Equal(t, "Targeted message", received.Message) // Other subscribers should not receive for i, sub := range subscribers { if i == 1 { continue // Skip the targeted one } _, ok := receiveWithTimeout(sub.Listen(), 100*time.Millisecond) assert.False(t, ok, "Subscriber %d should not receive targeted notification", i) } } // TestNotifyAll_NoSubscribers verifies that NotifyAll is safe with no subscribers. 
func TestNotifyAll_NoSubscribers(t *testing.T) { n := NewNotifier(50) notification := Notification{ Level: LevelInfo, Message: "No one to receive this", } // Should not panic n.NotifyAll(notification) assert.Equal(t, 0, len(n.subscribers)) } // TestNotifyAll_PartialUnsubscribe verifies behavior when some subscribers unsubscribe // during or before a broadcast. func TestNotifyAll_PartialUnsubscribe(t *testing.T) { n := NewNotifier(50) // Create 5 subscribers subscribers := subscribeN(t, n, 5) // Unsubscribe 2 of them subscribers[1].Unsubscribe() subscribers[3].Unsubscribe() // Wait for unsubscribe to complete time.Sleep(10 * time.Millisecond) notification := Notification{ Level: LevelInfo, Message: "Partial broadcast", } // Broadcast go n.NotifyAll(notification) // Only active subscribers (0, 2, 4) should receive activeIndices := []int{0, 2, 4} for _, i := range activeIndices { received, ok := receiveWithTimeout(subscribers[i].Listen(), 1*time.Second) require.True(t, ok, "Active subscriber %d should receive notification", i) assert.Equal(t, "Partial broadcast", received.Message) } } // TestRemoveSubscriber verifies that RemoveSubscriber properly cleans up. func TestRemoveSubscriber(t *testing.T) { n := NewNotifier(50) sub, err := n.Subscribe() require.NoError(t, err) assert.Equal(t, 1, len(n.subscribers), "Should have 1 subscriber") // Remove subscriber n.RemoveSubscriber(sub) assert.Equal(t, 0, len(n.subscribers), "Should have 0 subscribers after removal") // Verify channel is closed _, ok := <-sub.Listen() assert.False(t, ok, "Channel should be closed") } // TestRemoveSubscriber_ConcurrentAccess verifies that RemoveSubscriber is thread-safe. 
func TestRemoveSubscriber_ConcurrentAccess(t *testing.T) { n := NewNotifier(50) // Create 10 subscribers subscribers := subscribeN(t, n, 10) var wg sync.WaitGroup // Remove all subscribers concurrently for _, sub := range subscribers { wg.Add(1) go func(s *Subscriber) { defer wg.Done() n.RemoveSubscriber(s) }(sub) } wg.Wait() assert.Equal(t, 0, len(n.subscribers), "All subscribers should be removed") } // TestConcurrency_SubscribeUnsubscribe verifies thread-safety of subscribe/unsubscribe. func TestConcurrency_SubscribeUnsubscribe(t *testing.T) { n := NewNotifier(50) var wg sync.WaitGroup // 50 goroutines subscribing and unsubscribing for i := 0; i < 50; i++ { wg.Add(1) go func() { defer wg.Done() sub, err := n.Subscribe() if err != nil { return } time.Sleep(1 * time.Millisecond) // Simulate some work sub.Unsubscribe() }() } wg.Wait() // All should be cleaned up assert.Equal(t, 0, len(n.subscribers), "All subscribers should be cleaned up") } // TestConcurrency_NotifyWhileSubscribing verifies notifications during concurrent subscribing. func TestConcurrency_NotifyWhileSubscribing(t *testing.T) { n := NewNotifier(50) var wg sync.WaitGroup // Goroutine 1: Keep subscribing wg.Add(1) go func() { defer wg.Done() for i := 0; i < 20; i++ { sub, err := n.Subscribe() if err == nil { defer sub.Unsubscribe() } time.Sleep(1 * time.Millisecond) } }() // Goroutine 2: Keep broadcasting wg.Add(1) go func() { defer wg.Done() for i := 0; i < 20; i++ { notification := Notification{ Level: LevelInfo, Message: "Concurrent notification", } n.NotifyAll(notification) time.Sleep(1 * time.Millisecond) } }() // Should not panic or deadlock done := make(chan bool) go func() { wg.Wait() done <- true }() select { case <-done: // Success case <-time.After(5 * time.Second): t.Fatal("Test timed out - possible deadlock") } } // TestConcurrency_MixedOperations is a stress test with all operations happening concurrently. 
func TestConcurrency_MixedOperations(t *testing.T) { n := NewNotifier(50) var wg sync.WaitGroup // Create some initial subscribers initialSubs := subscribeN(t, n, 5) // Goroutines subscribing for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() sub, err := n.Subscribe() if err == nil { time.Sleep(10 * time.Millisecond) sub.Unsubscribe() } }() } // Goroutines sending targeted notifications for i := 0; i < 10; i++ { wg.Add(1) go func(idx int) { defer wg.Done() if idx < len(initialSubs) { notification := Notification{ Target: initialSubs[idx].ID, Level: LevelInfo, Message: "Targeted", } n.Notify(notification) } }(i) } // Goroutines broadcasting for i := 0; i < 5; i++ { wg.Add(1) go func() { defer wg.Done() notification := Notification{ Level: LevelSuccess, Message: "Broadcast", } n.NotifyAll(notification) }() } // Should complete without panic or deadlock done := make(chan bool) go func() { wg.Wait() done <- true }() select { case <-done: // Success - cleanup initial subscribers for _, sub := range initialSubs { sub.Unsubscribe() } case <-time.After(5 * time.Second): t.Fatal("Test timed out - possible deadlock") } } // TestIntegration_CompleteFlow tests the complete lifecycle: // subscribe → notify → receive → unsubscribe func TestIntegration_CompleteFlow(t *testing.T) { n := NewNotifier(50) // Subscribe sub, err := n.Subscribe() require.NoError(t, err) require.NotNil(t, sub) // Send notification notification := Notification{ Target: sub.ID, Level: LevelSuccess, Title: "Integration Test", Message: "Complete flow test", Details: "Testing the full lifecycle", } go n.Notify(notification) // Receive received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second) require.True(t, ok, "Should receive notification") assert.Equal(t, notification.Target, received.Target) assert.Equal(t, notification.Level, received.Level) assert.Equal(t, notification.Title, received.Title) assert.Equal(t, notification.Message, received.Message) assert.Equal(t, notification.Details, 
received.Details) // Unsubscribe sub.Unsubscribe() // Verify cleanup assert.Equal(t, 0, len(n.subscribers)) // Verify channel closed _, ok = <-sub.Listen() assert.False(t, ok, "Channel should be closed after unsubscribe") } // TestIntegration_MultipleSubscribers tests multiple subscribers receiving broadcasts. func TestIntegration_MultipleSubscribers(t *testing.T) { n := NewNotifier(50) // Create 10 subscribers subscribers := subscribeN(t, n, 10) // Broadcast a notification notification := Notification{ Level: LevelWarn, Title: "Important Update", Message: "All users should see this", } go n.NotifyAll(notification) // All should receive for i, sub := range subscribers { received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second) require.True(t, ok, "Subscriber %d should receive notification", i) assert.Equal(t, sub.ID, received.Target) assert.Equal(t, LevelWarn, received.Level) assert.Equal(t, "Important Update", received.Title) assert.Equal(t, "All users should see this", received.Message) } // Cleanup for _, sub := range subscribers { sub.Unsubscribe() } assert.Equal(t, 0, len(n.subscribers)) } // TestIntegration_MixedNotifications tests targeted and broadcast notifications together. 
func TestIntegration_MixedNotifications(t *testing.T) { n := NewNotifier(50) // Create 3 subscribers subscribers := subscribeN(t, n, 3) // Send targeted notification to subscriber 0 targeted := Notification{ Target: subscribers[0].ID, Level: LevelError, Message: "Just for you", } go n.Notify(targeted) // Send broadcast to all broadcast := Notification{ Level: LevelInfo, Message: "For everyone", } go n.NotifyAll(broadcast) // Give time for sends to complete (race detector is slow) time.Sleep(50 * time.Millisecond) // Subscriber 0 should receive both for i := 0; i < 2; i++ { received, ok := receiveWithTimeout(subscribers[0].Listen(), 2*time.Second) require.True(t, ok, "Subscriber 0 should receive notification %d", i) // Don't check order, just verify both are received assert.Contains(t, []string{"Just for you", "For everyone"}, received.Message) } // Subscribers 1 and 2 should only receive broadcast for i := 1; i < 3; i++ { received, ok := receiveWithTimeout(subscribers[i].Listen(), 1*time.Second) require.True(t, ok, "Subscriber %d should receive broadcast", i) assert.Equal(t, "For everyone", received.Message) // Should not receive another _, ok = receiveWithTimeout(subscribers[i].Listen(), 100*time.Millisecond) assert.False(t, ok, "Subscriber %d should only receive one notification", i) } // Cleanup for _, sub := range subscribers { sub.Unsubscribe() } } // TestIntegration_HighLoad is a stress test with many subscribers and notifications. 
func TestIntegration_HighLoad(t *testing.T) { n := NewNotifier(50) // Create 100 subscribers subscribers := subscribeN(t, n, 100) // Each subscriber will count received notifications counts := make([]int, 100) var countMutex sync.Mutex var wg sync.WaitGroup // Start listeners for i := range subscribers { wg.Add(1) go func(idx int) { defer wg.Done() timeout := time.After(5 * time.Second) for { select { case _, ok := <-subscribers[idx].Listen(): if !ok { return } countMutex.Lock() counts[idx]++ countMutex.Unlock() case <-timeout: return } } }(i) } // Send 10 broadcasts for i := 0; i < 10; i++ { notification := Notification{ Level: LevelInfo, Message: "Broadcast", } n.NotifyAll(notification) time.Sleep(10 * time.Millisecond) // Small delay between broadcasts } // Wait a bit for all messages to be delivered time.Sleep(500 * time.Millisecond) // Cleanup - unsubscribe all for _, sub := range subscribers { sub.Unsubscribe() } wg.Wait() // Verify each subscriber received some notifications // Note: Due to TryLock behavior, not all might receive all 10 countMutex.Lock() for i, count := range counts { assert.GreaterOrEqual(t, count, 0, "Subscriber %d should have received some notifications", i) } countMutex.Unlock() }