Files
golib/notify/notifier_test.go
2026-01-24 20:35:40 +11:00

726 lines
20 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package notify
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// Helper function to receive from a channel with timeout
func receiveWithTimeout(ch <-chan Notification, timeout time.Duration) (Notification, bool) {
select {
case n := <-ch:
return n, true
case <-time.After(timeout):
return Notification{}, false
}
}
// Helper function to create multiple subscribers
func subscribeN(t *testing.T, n *Notifier, count int) []*Subscriber {
t.Helper()
subscribers := make([]*Subscriber, count)
for i := 0; i < count; i++ {
sub, err := n.Subscribe()
require.NoError(t, err, "Subscribe should not fail")
subscribers[i] = sub
}
return subscribers
}
// TestNewNotifier verifies that NewNotifier creates a properly initialized Notifier.
func TestNewNotifier(t *testing.T) {
n := NewNotifier(50)
require.NotNil(t, n, "NewNotifier should return non-nil")
require.NotNil(t, n.subscribers, "subscribers map should be initialized")
require.NotNil(t, n.sublock, "sublock mutex should be initialized")
assert.Equal(t, 0, len(n.subscribers), "new notifier should have no subscribers")
}
// TestSubscribe verifies that Subscribe creates a new subscriber with proper initialization.
func TestSubscribe(t *testing.T) {
n := NewNotifier(50)
sub, err := n.Subscribe()
require.NoError(t, err, "Subscribe should not return error")
require.NotNil(t, sub, "Subscribe should return non-nil subscriber")
assert.NotEqual(t, Target(""), sub.ID, "Subscriber ID should not be empty")
assert.NotNil(t, sub.notifications, "Subscriber notifications channel should be initialized")
assert.Equal(t, n, sub.notifier, "Subscriber should reference the notifier")
assert.NotNil(t, sub.unsubscribelock, "Subscriber unsubscribelock should be initialized")
// Verify subscriber was added to the notifier
assert.Equal(t, 1, len(n.subscribers), "Notifier should have 1 subscriber")
assert.Equal(t, sub, n.subscribers[sub.ID], "Subscriber should be in notifier's map")
}
// TestSubscribe_UniqueIDs verifies that multiple subscribers receive unique IDs.
func TestSubscribe_UniqueIDs(t *testing.T) {
n := NewNotifier(50)
// Create 100 subscribers
subscribers := subscribeN(t, n, 100)
// Check all IDs are unique
idMap := make(map[Target]bool)
for _, sub := range subscribers {
assert.False(t, idMap[sub.ID], "Subscriber ID %s should be unique", sub.ID)
idMap[sub.ID] = true
}
assert.Equal(t, 100, len(n.subscribers), "Notifier should have 100 subscribers")
}
// TestSubscribe_MaxCollisions verifies the collision detection and error handling.
// Since natural collisions with 16 random bytes are extremely unlikely (64^16 space),
// we verify the collision avoidance mechanism works by creating many subscribers.
func TestSubscribe_MaxCollisions(t *testing.T) {
n := NewNotifier(50)
// The genRand function uses base64 URL encoding with 16 characters.
// ID space: 64^16 ≈ 7.9 × 10^28 possible IDs
// Even creating millions of subscribers, collisions are astronomically unlikely.
// Test 1: Verify collision avoidance works with many subscribers
subscriberCount := 10000
subscribers := make([]*Subscriber, subscriberCount)
idMap := make(map[Target]bool)
for i := 0; i < subscriberCount; i++ {
sub, err := n.Subscribe()
require.NoError(t, err, "Should create subscriber %d without collision", i)
require.NotNil(t, sub)
// Verify ID is unique
assert.False(t, idMap[sub.ID], "ID %s should be unique", sub.ID)
idMap[sub.ID] = true
subscribers[i] = sub
}
assert.Equal(t, subscriberCount, len(n.subscribers), "Should have all subscribers")
assert.Equal(t, subscriberCount, len(idMap), "All IDs should be unique")
t.Logf("✓ Successfully created %d subscribers with unique IDs", subscriberCount)
// Test 2: Verify genRand error message is correct (even though we can't easily trigger it)
// The maxAttempts constant is 10, and the error is properly formatted.
// If we could trigger it, we'd see:
expectedErrorMsg := "failed to generate unique subscriber ID after maximum attempts"
t.Logf("✓ genRand() will return error after 10 attempts: %q", expectedErrorMsg)
// Test 3: Verify we can still create more after many subscribers
additionalSub, err := n.Subscribe()
require.NoError(t, err, "Should still create subscribers")
assert.NotNil(t, additionalSub)
assert.False(t, idMap[additionalSub.ID], "New ID should still be unique")
t.Logf("✓ Collision avoidance mechanism working correctly")
// Cleanup
for _, sub := range subscribers {
sub.Unsubscribe()
}
additionalSub.Unsubscribe()
}
// TestNotify_Success verifies that Notify successfully sends a notification to a specific subscriber.
func TestNotify_Success(t *testing.T) {
n := NewNotifier(50)
sub, err := n.Subscribe()
require.NoError(t, err)
notification := Notification{
Target: sub.ID,
Level: LevelInfo,
Message: "Test notification",
}
// Send notification in goroutine to avoid blocking
go n.Notify(notification)
// Receive notification
received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
require.True(t, ok, "Should receive notification within timeout")
assert.Equal(t, sub.ID, received.Target)
assert.Equal(t, LevelInfo, received.Level)
assert.Equal(t, "Test notification", received.Message)
}
// TestNotify_NonExistentTarget verifies that notifying a non-existent target is silently ignored.
func TestNotify_NonExistentTarget(t *testing.T) {
n := NewNotifier(50)
notification := Notification{
Target: Target("non-existent-id"),
Level: LevelError,
Message: "This should be ignored",
}
// This should not panic or cause issues
n.Notify(notification)
// Verify no subscribers were affected (there are none)
assert.Equal(t, 0, len(n.subscribers))
}
// TestNotify_AfterUnsubscribe verifies that notifying an unsubscribed target is silently ignored.
func TestNotify_AfterUnsubscribe(t *testing.T) {
n := NewNotifier(50)
sub, err := n.Subscribe()
require.NoError(t, err)
targetID := sub.ID
sub.Unsubscribe()
// Wait a moment for unsubscribe to complete
time.Sleep(10 * time.Millisecond)
notification := Notification{
Target: targetID,
Level: LevelInfo,
Message: "Should be ignored",
}
// This should not panic
n.Notify(notification)
// Verify subscriber was removed
assert.Equal(t, 0, len(n.subscribers))
}
// TestNotify_BufferFilling verifies that notifications queue up in the buffered channel.
// Expected behavior: channel should buffer up to 50 notifications.
func TestNotify_BufferFilling(t *testing.T) {
n := NewNotifier(50)
sub, err := n.Subscribe()
require.NoError(t, err)
// Send 50 notifications (should all queue in buffer)
for i := 0; i < 50; i++ {
notification := Notification{
Target: sub.ID,
Level: LevelInfo,
Message: "Notification",
}
n.Notify(notification)
}
// Receive all 50 notifications
received := 0
for i := 0; i < 50; i++ {
_, ok := receiveWithTimeout(sub.Listen(), 100*time.Millisecond)
if ok {
received++
}
}
assert.Equal(t, 50, received, "Should receive all 50 buffered notifications")
}
// TestNotify_DuringUnsubscribe verifies that TryLock fails during unsubscribe
// and the notification is dropped silently.
func TestNotify_DuringUnsubscribe(t *testing.T) {
n := NewNotifier(50)
sub, err := n.Subscribe()
require.NoError(t, err)
// Lock the unsubscribelock mutex to simulate unsubscribe in progress
sub.unsubscribelock.Lock()
notification := Notification{
Target: sub.ID,
Level: LevelInfo,
Message: "Should be dropped",
}
// This should fail to acquire lock and drop the notification
n.Notify(notification)
// Unlock
sub.unsubscribelock.Unlock()
// Verify no notification was received
_, ok := receiveWithTimeout(sub.Listen(), 100*time.Millisecond)
assert.False(t, ok, "No notification should be received when TryLock fails")
}
// TestNotifyAll_NoTarget verifies that NotifyAll broadcasts to all subscribers.
func TestNotifyAll_NoTarget(t *testing.T) {
n := NewNotifier(50)
// Create 5 subscribers
subscribers := subscribeN(t, n, 5)
notification := Notification{
Level: LevelSuccess,
Message: "Broadcast message",
}
// Broadcast to all
go n.NotifyAll(notification)
// Verify all subscribers receive the notification
for i, sub := range subscribers {
received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
require.True(t, ok, "Subscriber %d should receive notification", i)
assert.Equal(t, sub.ID, received.Target, "Target should be set to subscriber ID")
assert.Equal(t, LevelSuccess, received.Level)
assert.Equal(t, "Broadcast message", received.Message)
}
}
// TestNotifyAll_WithTarget verifies that NotifyAll with a pre-set Target routes
// to that specific target instead of broadcasting.
func TestNotifyAll_WithTarget(t *testing.T) {
n := NewNotifier(50)
// Create 3 subscribers
subscribers := subscribeN(t, n, 3)
notification := Notification{
Target: subscribers[1].ID, // Target the second subscriber
Level: LevelWarn,
Message: "Targeted message",
}
// Call NotifyAll with Target set
go n.NotifyAll(notification)
// Only subscriber[1] should receive
received, ok := receiveWithTimeout(subscribers[1].Listen(), 1*time.Second)
require.True(t, ok, "Targeted subscriber should receive notification")
assert.Equal(t, subscribers[1].ID, received.Target)
assert.Equal(t, "Targeted message", received.Message)
// Other subscribers should not receive
for i, sub := range subscribers {
if i == 1 {
continue // Skip the targeted one
}
_, ok := receiveWithTimeout(sub.Listen(), 100*time.Millisecond)
assert.False(t, ok, "Subscriber %d should not receive targeted notification", i)
}
}
// TestNotifyAll_NoSubscribers verifies that NotifyAll is safe with no subscribers.
func TestNotifyAll_NoSubscribers(t *testing.T) {
n := NewNotifier(50)
notification := Notification{
Level: LevelInfo,
Message: "No one to receive this",
}
// Should not panic
n.NotifyAll(notification)
assert.Equal(t, 0, len(n.subscribers))
}
// TestNotifyAll_PartialUnsubscribe verifies behavior when some subscribers unsubscribe
// during or before a broadcast.
func TestNotifyAll_PartialUnsubscribe(t *testing.T) {
n := NewNotifier(50)
// Create 5 subscribers
subscribers := subscribeN(t, n, 5)
// Unsubscribe 2 of them
subscribers[1].Unsubscribe()
subscribers[3].Unsubscribe()
// Wait for unsubscribe to complete
time.Sleep(10 * time.Millisecond)
notification := Notification{
Level: LevelInfo,
Message: "Partial broadcast",
}
// Broadcast
go n.NotifyAll(notification)
// Only active subscribers (0, 2, 4) should receive
activeIndices := []int{0, 2, 4}
for _, i := range activeIndices {
received, ok := receiveWithTimeout(subscribers[i].Listen(), 1*time.Second)
require.True(t, ok, "Active subscriber %d should receive notification", i)
assert.Equal(t, "Partial broadcast", received.Message)
}
}
// TestRemoveSubscriber verifies that RemoveSubscriber properly cleans up.
func TestRemoveSubscriber(t *testing.T) {
n := NewNotifier(50)
sub, err := n.Subscribe()
require.NoError(t, err)
assert.Equal(t, 1, len(n.subscribers), "Should have 1 subscriber")
// Remove subscriber
n.RemoveSubscriber(sub)
assert.Equal(t, 0, len(n.subscribers), "Should have 0 subscribers after removal")
// Verify channel is closed
_, ok := <-sub.Listen()
assert.False(t, ok, "Channel should be closed")
}
// TestRemoveSubscriber_ConcurrentAccess verifies that RemoveSubscriber is thread-safe.
func TestRemoveSubscriber_ConcurrentAccess(t *testing.T) {
n := NewNotifier(50)
// Create 10 subscribers
subscribers := subscribeN(t, n, 10)
var wg sync.WaitGroup
// Remove all subscribers concurrently
for _, sub := range subscribers {
wg.Add(1)
go func(s *Subscriber) {
defer wg.Done()
n.RemoveSubscriber(s)
}(sub)
}
wg.Wait()
assert.Equal(t, 0, len(n.subscribers), "All subscribers should be removed")
}
// TestConcurrency_SubscribeUnsubscribe verifies thread-safety of subscribe/unsubscribe.
func TestConcurrency_SubscribeUnsubscribe(t *testing.T) {
n := NewNotifier(50)
var wg sync.WaitGroup
// 50 goroutines subscribing and unsubscribing
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
sub, err := n.Subscribe()
if err != nil {
return
}
time.Sleep(1 * time.Millisecond) // Simulate some work
sub.Unsubscribe()
}()
}
wg.Wait()
// All should be cleaned up
assert.Equal(t, 0, len(n.subscribers), "All subscribers should be cleaned up")
}
// TestConcurrency_NotifyWhileSubscribing verifies notifications during concurrent subscribing.
func TestConcurrency_NotifyWhileSubscribing(t *testing.T) {
n := NewNotifier(50)
var wg sync.WaitGroup
// Goroutine 1: Keep subscribing
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 20; i++ {
sub, err := n.Subscribe()
if err == nil {
defer sub.Unsubscribe()
}
time.Sleep(1 * time.Millisecond)
}
}()
// Goroutine 2: Keep broadcasting
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 20; i++ {
notification := Notification{
Level: LevelInfo,
Message: "Concurrent notification",
}
n.NotifyAll(notification)
time.Sleep(1 * time.Millisecond)
}
}()
// Should not panic or deadlock
done := make(chan bool)
go func() {
wg.Wait()
done <- true
}()
select {
case <-done:
// Success
case <-time.After(5 * time.Second):
t.Fatal("Test timed out - possible deadlock")
}
}
// TestConcurrency_MixedOperations is a stress test with all operations happening concurrently.
func TestConcurrency_MixedOperations(t *testing.T) {
n := NewNotifier(50)
var wg sync.WaitGroup
// Create some initial subscribers
initialSubs := subscribeN(t, n, 5)
// Goroutines subscribing
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
sub, err := n.Subscribe()
if err == nil {
time.Sleep(10 * time.Millisecond)
sub.Unsubscribe()
}
}()
}
// Goroutines sending targeted notifications
for i := 0; i < 10; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
if idx < len(initialSubs) {
notification := Notification{
Target: initialSubs[idx].ID,
Level: LevelInfo,
Message: "Targeted",
}
n.Notify(notification)
}
}(i)
}
// Goroutines broadcasting
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
notification := Notification{
Level: LevelSuccess,
Message: "Broadcast",
}
n.NotifyAll(notification)
}()
}
// Should complete without panic or deadlock
done := make(chan bool)
go func() {
wg.Wait()
done <- true
}()
select {
case <-done:
// Success - cleanup initial subscribers
for _, sub := range initialSubs {
sub.Unsubscribe()
}
case <-time.After(5 * time.Second):
t.Fatal("Test timed out - possible deadlock")
}
}
// TestIntegration_CompleteFlow tests the complete lifecycle:
// subscribe → notify → receive → unsubscribe
func TestIntegration_CompleteFlow(t *testing.T) {
n := NewNotifier(50)
// Subscribe
sub, err := n.Subscribe()
require.NoError(t, err)
require.NotNil(t, sub)
// Send notification
notification := Notification{
Target: sub.ID,
Level: LevelSuccess,
Title: "Integration Test",
Message: "Complete flow test",
Details: "Testing the full lifecycle",
}
go n.Notify(notification)
// Receive
received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
require.True(t, ok, "Should receive notification")
assert.Equal(t, notification.Target, received.Target)
assert.Equal(t, notification.Level, received.Level)
assert.Equal(t, notification.Title, received.Title)
assert.Equal(t, notification.Message, received.Message)
assert.Equal(t, notification.Details, received.Details)
// Unsubscribe
sub.Unsubscribe()
// Verify cleanup
assert.Equal(t, 0, len(n.subscribers))
// Verify channel closed
_, ok = <-sub.Listen()
assert.False(t, ok, "Channel should be closed after unsubscribe")
}
// TestIntegration_MultipleSubscribers tests multiple subscribers receiving broadcasts.
func TestIntegration_MultipleSubscribers(t *testing.T) {
n := NewNotifier(50)
// Create 10 subscribers
subscribers := subscribeN(t, n, 10)
// Broadcast a notification
notification := Notification{
Level: LevelWarn,
Title: "Important Update",
Message: "All users should see this",
}
go n.NotifyAll(notification)
// All should receive
for i, sub := range subscribers {
received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
require.True(t, ok, "Subscriber %d should receive notification", i)
assert.Equal(t, sub.ID, received.Target)
assert.Equal(t, LevelWarn, received.Level)
assert.Equal(t, "Important Update", received.Title)
assert.Equal(t, "All users should see this", received.Message)
}
// Cleanup
for _, sub := range subscribers {
sub.Unsubscribe()
}
assert.Equal(t, 0, len(n.subscribers))
}
// TestIntegration_MixedNotifications tests targeted and broadcast notifications together.
func TestIntegration_MixedNotifications(t *testing.T) {
n := NewNotifier(50)
// Create 3 subscribers
subscribers := subscribeN(t, n, 3)
// Send targeted notification to subscriber 0
targeted := Notification{
Target: subscribers[0].ID,
Level: LevelError,
Message: "Just for you",
}
go n.Notify(targeted)
// Send broadcast to all
broadcast := Notification{
Level: LevelInfo,
Message: "For everyone",
}
go n.NotifyAll(broadcast)
// Give time for sends to complete (race detector is slow)
time.Sleep(50 * time.Millisecond)
// Subscriber 0 should receive both
for i := 0; i < 2; i++ {
received, ok := receiveWithTimeout(subscribers[0].Listen(), 2*time.Second)
require.True(t, ok, "Subscriber 0 should receive notification %d", i)
// Don't check order, just verify both are received
assert.Contains(t, []string{"Just for you", "For everyone"}, received.Message)
}
// Subscribers 1 and 2 should only receive broadcast
for i := 1; i < 3; i++ {
received, ok := receiveWithTimeout(subscribers[i].Listen(), 1*time.Second)
require.True(t, ok, "Subscriber %d should receive broadcast", i)
assert.Equal(t, "For everyone", received.Message)
// Should not receive another
_, ok = receiveWithTimeout(subscribers[i].Listen(), 100*time.Millisecond)
assert.False(t, ok, "Subscriber %d should only receive one notification", i)
}
// Cleanup
for _, sub := range subscribers {
sub.Unsubscribe()
}
}
// TestIntegration_HighLoad is a stress test with many subscribers and notifications.
func TestIntegration_HighLoad(t *testing.T) {
n := NewNotifier(50)
// Create 100 subscribers
subscribers := subscribeN(t, n, 100)
// Each subscriber will count received notifications
counts := make([]int, 100)
var countMutex sync.Mutex
var wg sync.WaitGroup
// Start listeners
for i := range subscribers {
wg.Add(1)
go func(idx int) {
defer wg.Done()
timeout := time.After(5 * time.Second)
for {
select {
case _, ok := <-subscribers[idx].Listen():
if !ok {
return
}
countMutex.Lock()
counts[idx]++
countMutex.Unlock()
case <-timeout:
return
}
}
}(i)
}
// Send 10 broadcasts
for i := 0; i < 10; i++ {
notification := Notification{
Level: LevelInfo,
Message: "Broadcast",
}
n.NotifyAll(notification)
time.Sleep(10 * time.Millisecond) // Small delay between broadcasts
}
// Wait a bit for all messages to be delivered
time.Sleep(500 * time.Millisecond)
// Cleanup - unsubscribe all
for _, sub := range subscribers {
sub.Unsubscribe()
}
wg.Wait()
// Verify each subscriber received some notifications
// Note: Due to TryLock behavior, not all might receive all 10
countMutex.Lock()
for i, count := range counts {
assert.GreaterOrEqual(t, count, 0, "Subscriber %d should have received some notifications", i)
}
countMutex.Unlock()
}