404 lines
11 KiB
Go
404 lines
11 KiB
Go
package notify
|
|
|
|
import (
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// TestSubscriber_Listen verifies that Listen() returns the correct notification channel.
|
|
func TestSubscriber_Listen(t *testing.T) {
|
|
n := NewNotifier(50)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
ch := sub.Listen()
|
|
require.NotNil(t, ch, "Listen() should return non-nil channel")
|
|
|
|
// Note: Listen() returns a receive-only channel (<-chan), while sub.notifications is
|
|
// bidirectional (chan). They can't be compared directly with assert.Equal, but we can
|
|
// verify the channel works correctly.
|
|
// The implementation correctly restricts external callers to receive-only.
|
|
}
|
|
|
|
// TestSubscriber_ReceiveNotification tests end-to-end notification receiving.
|
|
func TestSubscriber_ReceiveNotification(t *testing.T) {
|
|
n := NewNotifier(50)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
notification := Notification{
|
|
Target: sub.ID,
|
|
Level: LevelSuccess,
|
|
Title: "Test Title",
|
|
Message: "Test Message",
|
|
Details: "Test Details",
|
|
Action: map[string]string{"action": "test"},
|
|
}
|
|
|
|
// Send notification
|
|
go n.Notify(notification)
|
|
|
|
// Receive and verify
|
|
received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
|
|
require.True(t, ok, "Should receive notification")
|
|
assert.Equal(t, notification.Target, received.Target)
|
|
assert.Equal(t, notification.Level, received.Level)
|
|
assert.Equal(t, notification.Title, received.Title)
|
|
assert.Equal(t, notification.Message, received.Message)
|
|
assert.Equal(t, notification.Details, received.Details)
|
|
assert.Equal(t, notification.Action, received.Action)
|
|
}
|
|
|
|
// TestSubscriber_Unsubscribe verifies that Unsubscribe works correctly.
|
|
func TestSubscriber_Unsubscribe(t *testing.T) {
|
|
n := NewNotifier(50)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, 1, len(n.subscribers), "Should have 1 subscriber")
|
|
|
|
// Unsubscribe
|
|
sub.Unsubscribe()
|
|
|
|
// Verify subscriber removed
|
|
assert.Equal(t, 0, len(n.subscribers), "Should have 0 subscribers after unsubscribe")
|
|
|
|
// Verify channel is closed
|
|
_, ok := <-sub.Listen()
|
|
assert.False(t, ok, "Channel should be closed after unsubscribe")
|
|
}
|
|
|
|
// TestSubscriber_UnsubscribeTwice verifies that calling Unsubscribe() multiple times
|
|
// is safe and doesn't panic from closing a closed channel.
|
|
func TestSubscriber_UnsubscribeTwice(t *testing.T) {
|
|
n := NewNotifier(50)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
// First unsubscribe
|
|
sub.Unsubscribe()
|
|
|
|
// Second unsubscribe should be a safe no-op
|
|
assert.NotPanics(t, func() {
|
|
sub.Unsubscribe()
|
|
}, "Second Unsubscribe() should not panic")
|
|
|
|
// Verify still cleaned up properly
|
|
assert.Equal(t, 0, len(n.subscribers))
|
|
}
|
|
|
|
// TestSubscriber_UnsubscribeThrice verifies that even calling Unsubscribe() three or more
|
|
// times is safe.
|
|
func TestSubscriber_UnsubscribeThrice(t *testing.T) {
|
|
n := NewNotifier(50)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
// Call unsubscribe three times
|
|
assert.NotPanics(t, func() {
|
|
sub.Unsubscribe()
|
|
sub.Unsubscribe()
|
|
sub.Unsubscribe()
|
|
}, "Multiple Unsubscribe() calls should not panic")
|
|
|
|
assert.Equal(t, 0, len(n.subscribers))
|
|
}
|
|
|
|
// TestSubscriber_ChannelClosesOnUnsubscribe verifies that the notification channel
|
|
// is properly closed when unsubscribing.
|
|
func TestSubscriber_ChannelClosesOnUnsubscribe(t *testing.T) {
|
|
n := NewNotifier(50)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
ch := sub.Listen()
|
|
|
|
// Unsubscribe
|
|
sub.Unsubscribe()
|
|
|
|
// Try to receive from closed channel - should return immediately with ok=false
|
|
select {
|
|
case _, ok := <-ch:
|
|
assert.False(t, ok, "Closed channel should return ok=false")
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Fatal("Should have returned immediately from closed channel")
|
|
}
|
|
}
|
|
|
|
// TestSubscriber_UnsubscribeWhileBlocked verifies behavior when a goroutine is
|
|
// blocked reading from Listen() when Unsubscribe() is called.
|
|
// The reader should detect the channel closure and exit gracefully.
|
|
func TestSubscriber_UnsubscribeWhileBlocked(t *testing.T) {
|
|
n := NewNotifier(50)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
var wg sync.WaitGroup
|
|
received := false
|
|
|
|
// Start goroutine that blocks reading from channel
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for notification := range sub.Listen() {
|
|
_ = notification
|
|
// This loop will exit when channel closes
|
|
}
|
|
received = true
|
|
}()
|
|
|
|
// Give goroutine time to start blocking
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
// Unsubscribe while goroutine is blocked
|
|
sub.Unsubscribe()
|
|
|
|
// Wait for goroutine to exit
|
|
done := make(chan bool)
|
|
go func() {
|
|
wg.Wait()
|
|
done <- true
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
assert.True(t, received, "Goroutine should have exited the loop")
|
|
case <-time.After(1 * time.Second):
|
|
t.Fatal("Goroutine did not exit after unsubscribe - possible hang")
|
|
}
|
|
}
|
|
|
|
// TestSubscriber_BufferCapacity verifies that the notification channel has
|
|
// the expected buffer capacity as specified when creating the Notifier.
|
|
func TestSubscriber_BufferCapacity(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
bufferSize int
|
|
}{
|
|
{"unbuffered", 0},
|
|
{"small buffer", 10},
|
|
{"default buffer", 50},
|
|
{"large buffer", 100},
|
|
{"very large buffer", 1000},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
n := NewNotifier(tt.bufferSize)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
ch := sub.Listen()
|
|
capacity := cap(ch)
|
|
assert.Equal(t, tt.bufferSize, capacity,
|
|
"Notification channel should have buffer size of %d", tt.bufferSize)
|
|
|
|
// Cleanup
|
|
sub.Unsubscribe()
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestSubscriber_BufferFull tests behavior when the notification buffer fills up.
|
|
// With a buffered channel and TryLock behavior, notifications may be dropped when
|
|
// the subscriber is slow to read.
|
|
func TestSubscriber_BufferFull(t *testing.T) {
|
|
n := NewNotifier(50)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
// Don't read from the channel - let it fill up
|
|
|
|
// Send 60 notifications (more than buffer size of 50)
|
|
sent := 0
|
|
for i := 0; i < 60; i++ {
|
|
notification := Notification{
|
|
Target: sub.ID,
|
|
Level: LevelInfo,
|
|
Message: "Notification",
|
|
}
|
|
// Send in goroutine to avoid blocking
|
|
go func() {
|
|
n.Notify(notification)
|
|
}()
|
|
sent++
|
|
time.Sleep(1 * time.Millisecond) // Small delay
|
|
}
|
|
|
|
// Wait a bit for sends to complete
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Now read what we can
|
|
received := 0
|
|
for {
|
|
select {
|
|
case _, ok := <-sub.Listen():
|
|
if !ok {
|
|
break
|
|
}
|
|
received++
|
|
case <-time.After(100 * time.Millisecond):
|
|
// No more notifications available
|
|
goto done
|
|
}
|
|
}
|
|
done:
|
|
|
|
// We should have received approximately buffer size worth
|
|
// Due to timing and goroutines, we might receive slightly more than 50 if a send
|
|
// was in progress when we started reading, or fewer due to TryLock behavior
|
|
assert.GreaterOrEqual(t, received, 40, "Should receive most notifications")
|
|
assert.LessOrEqual(t, received, 60, "Should not receive all 60 (some should be dropped)")
|
|
|
|
t.Logf("Sent %d notifications, received %d", sent, received)
|
|
}
|
|
|
|
// TestSubscriber_MultipleReceives verifies that a subscriber can receive
|
|
// multiple notifications sequentially.
|
|
func TestSubscriber_MultipleReceives(t *testing.T) {
|
|
n := NewNotifier(50)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
// Send 10 notifications
|
|
for i := 0; i < 10; i++ {
|
|
notification := Notification{
|
|
Target: sub.ID,
|
|
Level: LevelInfo,
|
|
Message: "Notification",
|
|
}
|
|
go n.Notify(notification)
|
|
time.Sleep(5 * time.Millisecond)
|
|
}
|
|
|
|
// Receive all 10
|
|
received := 0
|
|
for i := 0; i < 10; i++ {
|
|
_, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
|
|
if ok {
|
|
received++
|
|
}
|
|
}
|
|
|
|
assert.Equal(t, 10, received, "Should receive all 10 notifications")
|
|
}
|
|
|
|
// TestSubscriber_ConcurrentReads verifies that multiple goroutines can safely
|
|
// read from the same subscriber's channel.
|
|
func TestSubscriber_ConcurrentReads(t *testing.T) {
|
|
n := NewNotifier(50)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
var wg sync.WaitGroup
|
|
var mu sync.Mutex
|
|
totalReceived := 0
|
|
|
|
// Start 3 goroutines reading from the same channel
|
|
for i := 0; i < 3; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for {
|
|
select {
|
|
case _, ok := <-sub.Listen():
|
|
if !ok {
|
|
return
|
|
}
|
|
mu.Lock()
|
|
totalReceived++
|
|
mu.Unlock()
|
|
case <-time.After(500 * time.Millisecond):
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Send 30 notifications
|
|
for i := 0; i < 30; i++ {
|
|
notification := Notification{
|
|
Target: sub.ID,
|
|
Level: LevelInfo,
|
|
Message: "Concurrent test",
|
|
}
|
|
go n.Notify(notification)
|
|
time.Sleep(5 * time.Millisecond)
|
|
}
|
|
|
|
// Wait for all readers
|
|
wg.Wait()
|
|
|
|
// Each notification should only be received by one goroutine
|
|
mu.Lock()
|
|
assert.LessOrEqual(t, totalReceived, 30, "Total received should not exceed sent")
|
|
assert.GreaterOrEqual(t, totalReceived, 1, "Should receive at least some notifications")
|
|
mu.Unlock()
|
|
|
|
// Cleanup
|
|
sub.Unsubscribe()
|
|
}
|
|
|
|
// TestSubscriber_NotifyAfterClose verifies that attempting to notify a subscriber
|
|
// after unsubscribe doesn't cause issues.
|
|
func TestSubscriber_NotifyAfterClose(t *testing.T) {
|
|
n := NewNotifier(50)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
targetID := sub.ID
|
|
|
|
// Unsubscribe
|
|
sub.Unsubscribe()
|
|
|
|
// Wait for cleanup
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
// Try to notify - should be silently ignored
|
|
notification := Notification{
|
|
Target: targetID,
|
|
Level: LevelError,
|
|
Message: "Should be ignored",
|
|
}
|
|
|
|
assert.NotPanics(t, func() {
|
|
n.Notify(notification)
|
|
}, "Notifying closed subscriber should not panic")
|
|
}
|
|
|
|
// TestSubscriber_UnsubscribedFlag verifies that the unsubscribed flag is properly
|
|
// set and prevents double-close.
|
|
func TestSubscriber_UnsubscribedFlag(t *testing.T) {
|
|
n := NewNotifier(50)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
// Initially should be false
|
|
assert.False(t, sub.unsubscribed, "New subscriber should have unsubscribed=false")
|
|
|
|
// After unsubscribe should be true
|
|
sub.Unsubscribe()
|
|
assert.True(t, sub.unsubscribed, "After Unsubscribe() flag should be true")
|
|
|
|
// Second call should still be safe
|
|
sub.Unsubscribe()
|
|
assert.True(t, sub.unsubscribed, "Flag should remain true")
|
|
}
|
|
|
|
// TestSubscriber_FieldsInitialized verifies all Subscriber fields are properly initialized.
|
|
func TestSubscriber_FieldsInitialized(t *testing.T) {
|
|
n := NewNotifier(50)
|
|
sub, err := n.Subscribe()
|
|
require.NoError(t, err)
|
|
|
|
assert.NotEqual(t, Target(""), sub.ID, "ID should be set")
|
|
assert.NotNil(t, sub.notifications, "notifications channel should be initialized")
|
|
assert.Equal(t, n, sub.notifier, "notifier reference should be set")
|
|
assert.NotNil(t, sub.unsubscribelock, "unsubscribelock should be initialized")
|
|
assert.False(t, sub.unsubscribed, "unsubscribed flag should be false initially")
|
|
}
|