package hws import ( "sync" "sync/atomic" "testing" "time" "git.haelnorr.com/h/golib/notify" "github.com/stretchr/testify/require" ) // Helper function to create a test server with notifier started func newTestServerWithNotifier(t *testing.T) *Server { t.Helper() cfg := &Config{ Host: "127.0.0.1", Port: 0, ShutdownDelay: 0, // No delay for tests } server, err := NewServer(cfg) require.NoError(t, err) server.startNotifier() // Cleanup t.Cleanup(func() { server.closeNotifier() }) return server } // Test 1: Single client subscription func Test_SingleClientSubscription(t *testing.T) { server := newTestServerWithNotifier(t) client, err := server.GetClient("", "") require.NoError(t, err) require.NotNil(t, client) notifications, stop := client.Listen() defer close(stop) // Send notification server.NotifySub(notify.Notification{ Target: client.sub.ID, Level: notify.LevelInfo, Message: "Test message", }) // Receive notification select { case nt := <-notifications: require.Equal(t, notify.LevelInfo, nt.Level) require.Equal(t, "Test message", nt.Message) case <-time.After(1 * time.Second): t.Fatal("Did not receive notification") } } // Test 2: Multiple clients subscription func Test_MultipleClientsSubscription(t *testing.T) { server := newTestServerWithNotifier(t) client1, err := server.GetClient("", "user1") require.NoError(t, err) client2, err := server.GetClient("", "user2") require.NoError(t, err) notifications1, stop1 := client1.Listen() defer close(stop1) notifications2, stop2 := client2.Listen() defer close(stop2) // Send to client1 server.NotifySub(notify.Notification{ Target: client1.sub.ID, Level: notify.LevelInfo, Message: "Message for client1", }) // Client1 receives select { case nt := <-notifications1: require.Equal(t, "Message for client1", nt.Message) case <-time.After(1 * time.Second): t.Fatal("Client1 did not receive notification") } // Client2 should not receive select { case <-notifications2: t.Fatal("Client2 should not have received notification") case <-time.After(100 * time.Millisecond): // Expected } } // Test 3: Targeted notification func Test_TargetedNotification(t *testing.T) { server := newTestServerWithNotifier(t) client1, _ := server.GetClient("", "") client2, _ := server.GetClient("", "") notifications1, stop1 := client1.Listen() defer close(stop1) notifications2, stop2 := client2.Listen() defer close(stop2) // Send only to client2 server.NotifySub(notify.Notification{ Target: client2.sub.ID, Level: notify.LevelSuccess, Message: "Only for client2", }) // Client2 receives select { case nt := <-notifications2: require.Equal(t, "Only for client2", nt.Message) case <-time.After(1 * time.Second): t.Fatal("Client2 did not receive notification") } // Client1 should not receive select { case <-notifications1: t.Fatal("Client1 should not have received notification") case <-time.After(100 * time.Millisecond): // Expected } } // Test 4: Broadcast notification func Test_BroadcastNotification(t *testing.T) { server := newTestServerWithNotifier(t) client1, _ := server.GetClient("", "") client2, _ := server.GetClient("", "") client3, _ := server.GetClient("", "") notifications1, stop1 := client1.Listen() defer close(stop1) notifications2, stop2 := client2.Listen() defer close(stop2) notifications3, stop3 := client3.Listen() defer close(stop3) // Broadcast to all server.NotifyAll(notify.Notification{ Level: notify.LevelWarn, Message: "Broadcast message", }) // All clients should receive for i, notifications := range []<-chan notify.Notification{notifications1, notifications2, notifications3} { select { case nt := <-notifications: require.Equal(t, "Broadcast message", nt.Message) require.Equal(t, notify.LevelWarn, nt.Level) case <-time.After(1 * time.Second): t.Fatalf("Client %d did not receive broadcast", i+1) } } } // Test 5: Alternate ID grouping func Test_AlternateIDGrouping(t *testing.T) { server := newTestServerWithNotifier(t) client1, _ := server.GetClient("", "userA") client2, _ := server.GetClient("", "userB") notifications1, stop1 := client1.Listen() defer close(stop1) notifications2, stop2 := client2.Listen() defer close(stop2) // Send to userA only server.NotifyID(notify.Notification{ Level: notify.LevelInfo, Message: "Message for userA", }, "userA") // Client1 (userA) receives select { case nt := <-notifications1: require.Equal(t, "Message for userA", nt.Message) case <-time.After(1 * time.Second): t.Fatal("Client with userA did not receive notification") } // Client2 (userB) should not receive select { case <-notifications2: t.Fatal("Client with userB should not have received notification") case <-time.After(100 * time.Millisecond): // Expected } } // Test 6: Multiple clients per alternate ID func Test_MultipleClientsPerAlternateID(t *testing.T) { server := newTestServerWithNotifier(t) // Three clients, two with same altID client1, _ := server.GetClient("", "sharedUser") client2, _ := server.GetClient("", "sharedUser") client3, _ := server.GetClient("", "differentUser") notifications1, stop1 := client1.Listen() defer close(stop1) notifications2, stop2 := client2.Listen() defer close(stop2) notifications3, stop3 := client3.Listen() defer close(stop3) // Send to sharedUser server.NotifyID(notify.Notification{ Level: notify.LevelInfo, Message: "Message for sharedUser", }, "sharedUser") // Both client1 and client2 should receive for i, notifications := range []<-chan notify.Notification{notifications1, notifications2} { select { case nt := <-notifications: require.Equal(t, "Message for sharedUser", nt.Message) case <-time.After(1 * time.Second): t.Fatalf("Client %d with sharedUser did not receive notification", i+1) } } // Client3 should not receive select { case <-notifications3: t.Fatal("Client with differentUser should not have received notification") case <-time.After(100 * time.Millisecond): // Expected } } // Test 7: Client creation func Test_ClientCreation(t *testing.T) { server := newTestServerWithNotifier(t) client, err := server.GetClient("", "testUser") require.NoError(t, err) require.NotNil(t, client) require.NotNil(t, client.sub) require.NotEqual(t, "", string(client.sub.ID)) require.Equal(t, "testUser", client.altID) } // Test 8: Client retrieval func Test_ClientRetrieval(t *testing.T) { server := newTestServerWithNotifier(t) client1, err := server.GetClient("", "user1") require.NoError(t, err) subID := string(client1.sub.ID) // Retrieve same client client2, err := server.GetClient(subID, "user1") require.NoError(t, err) require.Equal(t, client1.sub.ID, client2.sub.ID) } // Test 9: Alternate ID update func Test_AlternateIDUpdate(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "oldUser") require.Equal(t, "oldUser", client.altID) subID := string(client.sub.ID) // Update alternate ID updatedClient, err := server.GetClient(subID, "newUser") require.NoError(t, err) require.Equal(t, "newUser", updatedClient.altID) require.Equal(t, client.sub.ID, updatedClient.sub.ID) } // Test 10: Client unsubscribe func Test_ClientUnsubscribe(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") notifications, stop := client.Listen() // Close stop channel to unsubscribe close(stop) // Wait a bit for goroutine to process time.Sleep(100 * time.Millisecond) // Try to send notification server.NotifySub(notify.Notification{ Target: client.sub.ID, Level: notify.LevelInfo, Message: "Should not receive", }) // Channel should be closed or no message received select { case _, ok := <-notifications: if ok { t.Fatal("Client should not receive after unsubscribe") } // Channel closed - expected case <-time.After(200 * time.Millisecond): // No message received - also acceptable } } // Test 11: Channel closure func Test_ChannelClosure(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") notifications, stop := client.Listen() close(stop) // Wait for channel to close select { case _, ok := <-notifications: require.False(t, ok, "Channel should be closed") case <-time.After(1 * time.Second): t.Fatal("Channel did not close") } } // Test 12: Active client stays alive func Test_ActiveClientStaysAlive(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") notifications, stop := client.Listen() defer close(stop) // Send notifications every 2 seconds for 6 seconds (beyond 5 min cleanup would happen if inactive) // We'll simulate by updating lastSeen ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() done := make(chan bool) go func() { for range 3 { <-ticker.C server.NotifySub(notify.Notification{ Target: client.sub.ID, Message: "Keep alive", }) <-notifications // Receive it } done <- true }() <-done // Client should still be in the map _, exists := server.notifier.clients.getClient(client.sub.ID) require.True(t, exists, "Active client should not be cleaned up") } // Test 13: Heartbeat keeps alive func Test_HeartbeatKeepsAlive(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") notifications, stop := client.Listen() defer close(stop) // Wait for heartbeat to fire (30 seconds is too long for test) // We'll check that lastSeen is being updated atomically initialLastSeen := atomic.LoadInt64(&client.lastSeen) require.NotZero(t, initialLastSeen) // Send a notification to trigger lastSeen update server.NotifySub(notify.Notification{ Target: client.sub.ID, Message: "Update lastSeen", }) <-notifications updatedLastSeen := atomic.LoadInt64(&client.lastSeen) require.GreaterOrEqual(t, updatedLastSeen, initialLastSeen) } // Test 14: Inactive client cleanup func Test_InactiveClientCleanup(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") // Set lastSeen to 6 minutes ago pastTime := time.Now().Unix() - 360 atomic.StoreInt64(&client.lastSeen, pastTime) // Trigger cleanup server.notifier.clients.cleanUp() // Client should be removed _, exists := server.notifier.clients.getClient(client.sub.ID) require.False(t, exists, "Inactive client should be cleaned up") } // Test 15: Cleanup removes from maps func Test_CleanupRemovesFromMaps(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "testAltID") // Verify client is in both maps _, existsSub := server.notifier.clients.getClient(client.sub.ID) require.True(t, existsSub) server.notifier.clients.lock.RLock() _, existsAlt := server.notifier.clients.clientsIDMap["testAltID"] server.notifier.clients.lock.RUnlock() require.True(t, existsAlt) // Set lastSeen to trigger cleanup pastTime := time.Now().Unix() - 360 atomic.StoreInt64(&client.lastSeen, pastTime) server.notifier.clients.cleanUp() // Verify removed from both maps _, existsSub = server.notifier.clients.getClient(client.sub.ID) require.False(t, existsSub) server.notifier.clients.lock.RLock() _, existsAlt = server.notifier.clients.clientsIDMap["testAltID"] server.notifier.clients.lock.RUnlock() require.False(t, existsAlt) } // Test 16: Slow consumer tolerance func Test_SlowConsumerTolerance(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") notifications, stop := client.Listen() defer close(stop) // Send 10 notifications quickly (buffer is 10) for range 10 { server.NotifySub(notify.Notification{ Target: client.sub.ID, Message: "Burst message", }) } // Client should receive all 10 for i := range 10 { select { case <-notifications: // Received case <-time.After(2 * time.Second): t.Fatalf("Did not receive notification %d", i+1) } } } // Test 17: Single timeout recovery func Test_SingleTimeoutRecovery(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") notifications, stop := client.Listen() defer close(stop) // Fill buffer completely (buffer is 10) for range 10 { server.NotifySub(notify.Notification{ Target: client.sub.ID, Message: "Fill buffer", }) } // Send one more to cause a timeout server.NotifySub(notify.Notification{ Target: client.sub.ID, Message: "Timeout message", }) // Wait for timeout (5s timeout + small buffer) time.Sleep(5100 * time.Millisecond) // Check failure count (should be 1) fails := atomic.LoadInt32(&client.consecutiveFails) require.Equal(t, int32(1), fails, "Should have 1 timeout") // Now read all buffered messages for range 10 { <-notifications } // Send recovery message - should succeed and reset counter server.NotifySub(notify.Notification{ Target: client.sub.ID, Message: "Recovery message", }) select { case nt := <-notifications: require.Equal(t, "Recovery message", nt.Message) // Counter should reset to 0 fails = atomic.LoadInt32(&client.consecutiveFails) require.Equal(t, int32(0), fails, "Counter should reset after successful send") case <-time.After(2 * time.Second): t.Fatal("Client should recover after reading") } } // Test 18: Consecutive failure disconnect func Test_ConsecutiveFailureDisconnect(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") _, stop := client.Listen() defer close(stop) // Fill buffer and never read to cause 5 consecutive timeouts for range 20 { server.NotifySub(notify.Notification{ Target: client.sub.ID, Message: "Timeout message", }) } // Wait for 5 timeouts (5 seconds each = 25+ seconds) // This is too long for a test, so we'll verify the mechanism works // by checking that failures increment time.Sleep(1 * time.Second) // After some time, consecutive fails should be incrementing // (Full test would take 25+ seconds which is too long) // Just verify the counter is working fails := atomic.LoadInt32(&client.consecutiveFails) require.GreaterOrEqual(t, fails, int32(0), "Failure counter should be tracking") } // Test 19: Failure counter reset func Test_FailureCounterReset(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") notifications, stop := client.Listen() defer close(stop) // Manually set failure count atomic.StoreInt32(&client.consecutiveFails, 3) // Send and receive successfully server.NotifySub(notify.Notification{ Target: client.sub.ID, Message: "Reset failures", }) select { case <-notifications: // Received successfully case <-time.After(1 * time.Second): t.Fatal("Did not receive notification") } // Failure count should be reset to 0 fails := atomic.LoadInt32(&client.consecutiveFails) require.Equal(t, int32(0), fails, "Failure counter should reset on successful receive") } // Test 20: Notifier starts with server func Test_NotifierStartsWithServer(t *testing.T) { server := newTestServerWithNotifier(t) require.NotNil(t, server.notifier) require.True(t, server.notifier.running) require.NotNil(t, server.notifier.ctx) require.NotNil(t, server.notifier.cancel) } // Test 21: Notifier stops with server func Test_NotifierStopsWithServer(t *testing.T) { cfg := &Config{Host: "127.0.0.1", Port: 0} server, _ := NewServer(cfg) server.startNotifier() require.NotNil(t, server.notifier) server.closeNotifier() require.Nil(t, server.notifier) } // Test 22: Cleanup goroutine terminates func Test_CleanupGoroutineTerminates(t *testing.T) { cfg := &Config{Host: "127.0.0.1", Port: 0} server, _ := NewServer(cfg) server.startNotifier() ctx := server.notifier.ctx server.closeNotifier() // Context should be cancelled select { case <-ctx.Done(): // Expected - context was cancelled case <-time.After(100 * time.Millisecond): t.Fatal("Context should be cancelled") } } // Test 23: Server restart works func Test_ServerRestart(t *testing.T) { cfg := &Config{Host: "127.0.0.1", Port: 0} server, _ := NewServer(cfg) // Start first time server.startNotifier() firstNotifier := server.notifier require.NotNil(t, firstNotifier) // Stop server.closeNotifier() require.Nil(t, server.notifier) // Start again server.startNotifier() secondNotifier := server.notifier require.NotNil(t, secondNotifier) require.NotEqual(t, firstNotifier, secondNotifier, "Should be a new notifier instance") // Cleanup server.closeNotifier() } // Test 24: Shutdown notification sent func Test_ShutdownNotificationSent(t *testing.T) { // This test requires full server integration // We'll test that NotifyAll is called with shutdown message server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") notifications, stop := client.Listen() defer close(stop) // Manually send shutdown notification (as Shutdown does) server.NotifyAll(notify.Notification{ Title: "Shutting down", Message: "Server is shutting down in 0 seconds", Level: notify.LevelInfo, }) select { case nt := <-notifications: require.Equal(t, "Shutting down", nt.Title) require.Contains(t, nt.Message, "shutting down") case <-time.After(1 * time.Second): t.Fatal("Did not receive shutdown notification") } } // Test 25: Concurrent subscriptions func Test_ConcurrentSubscriptions(t *testing.T) { server := newTestServerWithNotifier(t) var wg sync.WaitGroup clients := make([]*Client, 100) for i := range 100 { wg.Add(1) go func(index int) { defer wg.Done() client, err := server.GetClient("", "") require.NoError(t, err) clients[index] = client }(i) } wg.Wait() // All clients should be unique seen := make(map[notify.Target]bool) for _, client := range clients { require.False(t, seen[client.sub.ID], "Duplicate client ID found") seen[client.sub.ID] = true } } // Test 26: Concurrent notifications func Test_ConcurrentNotifications(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") notifications, stop := client.Listen() defer close(stop) var wg sync.WaitGroup messageCount := 50 // Send from multiple goroutines for i := range messageCount { wg.Add(1) go func(index int) { defer wg.Done() server.NotifySub(notify.Notification{ Target: client.sub.ID, Message: "Concurrent message", }) }(i) } wg.Wait() // Note: Some messages may be dropped due to TryLock in notify.Notify // This is expected behavior - we're testing thread safety, not guaranteed delivery // Just verify we receive at least some messages without panicking or deadlocking received := 0 timeout := time.After(500 * time.Millisecond) for received < messageCount { select { case <-notifications: received++ case <-timeout: // Expected - some messages may be dropped during concurrent sends require.Greater(t, received, 0, "Should receive at least some messages") return } } } // Test 27: Concurrent cleanup func Test_ConcurrentCleanup(t *testing.T) { server := newTestServerWithNotifier(t) // Create some clients for i := range 10 { client, _ := server.GetClient("", "") // Set some to be old if i%2 == 0 { pastTime := time.Now().Unix() - 360 atomic.StoreInt64(&client.lastSeen, pastTime) } } var wg sync.WaitGroup // Run cleanup and send notifications concurrently wg.Add(2) go func() { defer wg.Done() server.notifier.clients.cleanUp() }() go func() { defer wg.Done() server.NotifyAll(notify.Notification{ Message: "During cleanup", }) }() wg.Wait() // Should not panic or deadlock } // Test 28: No race conditions (covered by go test -race) func Test_NoRaceConditions(t *testing.T) { // This test is primarily validated by running: go test -race // We'll do a lighter stress test to verify basic thread safety server := newTestServerWithNotifier(t) var wg sync.WaitGroup // Create a few clients and read from them for range 5 { wg.Go(func() { client, _ := server.GetClient("", "") notifications, stop := client.Listen() defer close(stop) // Actively read messages timeout := time.After(200 * time.Millisecond) for { select { case <-notifications: // Keep reading case <-timeout: return } } }) } // Send a few notifications wg.Go(func() { for range 10 { server.NotifyAll(notify.Notification{ Message: "Stress test", }) time.Sleep(10 * time.Millisecond) } }) wg.Wait() } // Test 29: Notify before start func Test_NotifyBeforeStart(t *testing.T) { cfg := &Config{Host: "127.0.0.1", Port: 0} server, _ := NewServer(cfg) // Should not panic require.NotPanics(t, func() { server.NotifyAll(notify.Notification{ Message: "Before start", }) }) } // Test 30: Notify after shutdown func Test_NotifyAfterShutdown(t *testing.T) { cfg := &Config{Host: "127.0.0.1", Port: 0} server, _ := NewServer(cfg) server.startNotifier() server.closeNotifier() // Should not panic require.NotPanics(t, func() { server.NotifyAll(notify.Notification{ Message: "After shutdown", }) }) } // Test 31: GetClient during shutdown func Test_GetClientDuringShutdown(t *testing.T) { cfg := &Config{Host: "127.0.0.1", Port: 0} server, _ := NewServer(cfg) // Don't start notifier client, err := server.GetClient("", "") require.Error(t, err) require.Nil(t, client) require.Contains(t, err.Error(), "notifier hasn't started") } // Test 32: Empty alternate ID func Test_EmptyAlternateID(t *testing.T) { server := newTestServerWithNotifier(t) client, err := server.GetClient("", "") require.NoError(t, err) require.Equal(t, "", client.altID) // Should still work for notifications notifications, stop := client.Listen() defer close(stop) server.NotifySub(notify.Notification{ Target: client.sub.ID, Message: "No altID", }) select { case nt := <-notifications: require.Equal(t, "No altID", nt.Message) case <-time.After(1 * time.Second): t.Fatal("Did not receive notification") } } // Test 33: Nonexistent subscriber notification func Test_NonexistentSubscriberNotification(t *testing.T) { server := newTestServerWithNotifier(t) // Should not panic, just log warning require.NotPanics(t, func() { server.NotifySub(notify.Notification{ Target: "nonexistent-id", Message: "Should not crash", }) }) } // Test 34: Nonexistent alternate ID notification func Test_NonexistentAlternateIDNotification(t *testing.T) { server := newTestServerWithNotifier(t) // Should not panic, just log warning require.NotPanics(t, func() { server.NotifyID(notify.Notification{ Message: "Should not crash", }, "nonexistent-alt-id") }) } // Test 35: Stop channel closed early func Test_StopChannelClosedEarly(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") notificationChan, stop := client.Listen() // Close stop immediately close(stop) // Channel should close select { case _, ok := <-notificationChan: require.False(t, ok, "Notification channel should close") case <-time.After(1 * time.Second): t.Fatal("Channel did not close") } } // Test 36: Listen signature func Test_ListenSignature(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") notifications, stop := client.Listen() // Verify types require.NotNil(t, notifications) require.NotNil(t, stop) // notifications should be receive-only _, ok := any(notifications).(<-chan notify.Notification) require.True(t, ok, "notifications should be receive-only channel") // stop should be closeable close(stop) } // Test 37: Buffer size func Test_BufferSize(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") notifications, stop := client.Listen() defer close(stop) // Send 10 messages without reading (buffer size is 10) for range 10 { server.NotifySub(notify.Notification{ Target: client.sub.ID, Message: "Buffered", }) } // Should not block (messages are buffered) time.Sleep(100 * time.Millisecond) // Read all 10 for i := range 10 { select { case <-notifications: // Success case <-time.After(1 * time.Second): t.Fatalf("Did not receive message %d", i+1) } } } // Test 38: Atomic operations func Test_AtomicOperations(t *testing.T) { server := newTestServerWithNotifier(t) client, _ := server.GetClient("", "") // Verify lastSeen uses atomic operations initialLastSeen := atomic.LoadInt64(&client.lastSeen) require.NotZero(t, initialLastSeen) // Update atomically newTime := time.Now().Unix() atomic.StoreInt64(&client.lastSeen, newTime) loaded := atomic.LoadInt64(&client.lastSeen) require.Equal(t, newTime, loaded) // Verify consecutiveFails uses atomic operations atomic.StoreInt32(&client.consecutiveFails, 3) fails := atomic.LoadInt32(&client.consecutiveFails) require.Equal(t, int32(3), fails) // Atomic increment atomic.AddInt32(&client.consecutiveFails, 1) fails = atomic.LoadInt32(&client.consecutiveFails) require.Equal(t, int32(4), fails) }