added the notify module
This commit is contained in:
403
notify/subscriber_test.go
Normal file
403
notify/subscriber_test.go
Normal file
@@ -0,0 +1,403 @@
|
||||
package notify
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestSubscriber_Listen verifies that Listen() returns the correct notification channel.
|
||||
func TestSubscriber_Listen(t *testing.T) {
|
||||
n := NewNotifier(50)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
ch := sub.Listen()
|
||||
require.NotNil(t, ch, "Listen() should return non-nil channel")
|
||||
|
||||
// Note: Listen() returns a receive-only channel (<-chan), while sub.notifications is
|
||||
// bidirectional (chan). They can't be compared directly with assert.Equal, but we can
|
||||
// verify the channel works correctly.
|
||||
// The implementation correctly restricts external callers to receive-only.
|
||||
}
|
||||
|
||||
// TestSubscriber_ReceiveNotification tests end-to-end notification receiving.
|
||||
func TestSubscriber_ReceiveNotification(t *testing.T) {
|
||||
n := NewNotifier(50)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
notification := Notification{
|
||||
Target: sub.ID,
|
||||
Level: LevelSuccess,
|
||||
Title: "Test Title",
|
||||
Message: "Test Message",
|
||||
Details: "Test Details",
|
||||
Action: map[string]string{"action": "test"},
|
||||
}
|
||||
|
||||
// Send notification
|
||||
go n.Notify(notification)
|
||||
|
||||
// Receive and verify
|
||||
received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
|
||||
require.True(t, ok, "Should receive notification")
|
||||
assert.Equal(t, notification.Target, received.Target)
|
||||
assert.Equal(t, notification.Level, received.Level)
|
||||
assert.Equal(t, notification.Title, received.Title)
|
||||
assert.Equal(t, notification.Message, received.Message)
|
||||
assert.Equal(t, notification.Details, received.Details)
|
||||
assert.Equal(t, notification.Action, received.Action)
|
||||
}
|
||||
|
||||
// TestSubscriber_Unsubscribe verifies that Unsubscribe works correctly.
|
||||
func TestSubscriber_Unsubscribe(t *testing.T) {
|
||||
n := NewNotifier(50)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 1, len(n.subscribers), "Should have 1 subscriber")
|
||||
|
||||
// Unsubscribe
|
||||
sub.Unsubscribe()
|
||||
|
||||
// Verify subscriber removed
|
||||
assert.Equal(t, 0, len(n.subscribers), "Should have 0 subscribers after unsubscribe")
|
||||
|
||||
// Verify channel is closed
|
||||
_, ok := <-sub.Listen()
|
||||
assert.False(t, ok, "Channel should be closed after unsubscribe")
|
||||
}
|
||||
|
||||
// TestSubscriber_UnsubscribeTwice verifies that calling Unsubscribe() multiple times
|
||||
// is safe and doesn't panic from closing a closed channel.
|
||||
func TestSubscriber_UnsubscribeTwice(t *testing.T) {
|
||||
n := NewNotifier(50)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
// First unsubscribe
|
||||
sub.Unsubscribe()
|
||||
|
||||
// Second unsubscribe should be a safe no-op
|
||||
assert.NotPanics(t, func() {
|
||||
sub.Unsubscribe()
|
||||
}, "Second Unsubscribe() should not panic")
|
||||
|
||||
// Verify still cleaned up properly
|
||||
assert.Equal(t, 0, len(n.subscribers))
|
||||
}
|
||||
|
||||
// TestSubscriber_UnsubscribeThrice verifies that even calling Unsubscribe() three or more
|
||||
// times is safe.
|
||||
func TestSubscriber_UnsubscribeThrice(t *testing.T) {
|
||||
n := NewNotifier(50)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Call unsubscribe three times
|
||||
assert.NotPanics(t, func() {
|
||||
sub.Unsubscribe()
|
||||
sub.Unsubscribe()
|
||||
sub.Unsubscribe()
|
||||
}, "Multiple Unsubscribe() calls should not panic")
|
||||
|
||||
assert.Equal(t, 0, len(n.subscribers))
|
||||
}
|
||||
|
||||
// TestSubscriber_ChannelClosesOnUnsubscribe verifies that the notification channel
|
||||
// is properly closed when unsubscribing.
|
||||
func TestSubscriber_ChannelClosesOnUnsubscribe(t *testing.T) {
|
||||
n := NewNotifier(50)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
ch := sub.Listen()
|
||||
|
||||
// Unsubscribe
|
||||
sub.Unsubscribe()
|
||||
|
||||
// Try to receive from closed channel - should return immediately with ok=false
|
||||
select {
|
||||
case _, ok := <-ch:
|
||||
assert.False(t, ok, "Closed channel should return ok=false")
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("Should have returned immediately from closed channel")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSubscriber_UnsubscribeWhileBlocked verifies behavior when a goroutine is
|
||||
// blocked reading from Listen() when Unsubscribe() is called.
|
||||
// The reader should detect the channel closure and exit gracefully.
|
||||
func TestSubscriber_UnsubscribeWhileBlocked(t *testing.T) {
|
||||
n := NewNotifier(50)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
received := false
|
||||
|
||||
// Start goroutine that blocks reading from channel
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for notification := range sub.Listen() {
|
||||
_ = notification
|
||||
// This loop will exit when channel closes
|
||||
}
|
||||
received = true
|
||||
}()
|
||||
|
||||
// Give goroutine time to start blocking
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Unsubscribe while goroutine is blocked
|
||||
sub.Unsubscribe()
|
||||
|
||||
// Wait for goroutine to exit
|
||||
done := make(chan bool)
|
||||
go func() {
|
||||
wg.Wait()
|
||||
done <- true
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
assert.True(t, received, "Goroutine should have exited the loop")
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("Goroutine did not exit after unsubscribe - possible hang")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSubscriber_BufferCapacity verifies that the notification channel has
|
||||
// the expected buffer capacity as specified when creating the Notifier.
|
||||
func TestSubscriber_BufferCapacity(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
bufferSize int
|
||||
}{
|
||||
{"unbuffered", 0},
|
||||
{"small buffer", 10},
|
||||
{"default buffer", 50},
|
||||
{"large buffer", 100},
|
||||
{"very large buffer", 1000},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
n := NewNotifier(tt.bufferSize)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
ch := sub.Listen()
|
||||
capacity := cap(ch)
|
||||
assert.Equal(t, tt.bufferSize, capacity,
|
||||
"Notification channel should have buffer size of %d", tt.bufferSize)
|
||||
|
||||
// Cleanup
|
||||
sub.Unsubscribe()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestSubscriber_BufferFull tests behavior when the notification buffer fills up.
|
||||
// With a buffered channel and TryLock behavior, notifications may be dropped when
|
||||
// the subscriber is slow to read.
|
||||
func TestSubscriber_BufferFull(t *testing.T) {
|
||||
n := NewNotifier(50)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Don't read from the channel - let it fill up
|
||||
|
||||
// Send 60 notifications (more than buffer size of 50)
|
||||
sent := 0
|
||||
for i := 0; i < 60; i++ {
|
||||
notification := Notification{
|
||||
Target: sub.ID,
|
||||
Level: LevelInfo,
|
||||
Message: "Notification",
|
||||
}
|
||||
// Send in goroutine to avoid blocking
|
||||
go func() {
|
||||
n.Notify(notification)
|
||||
}()
|
||||
sent++
|
||||
time.Sleep(1 * time.Millisecond) // Small delay
|
||||
}
|
||||
|
||||
// Wait a bit for sends to complete
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Now read what we can
|
||||
received := 0
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-sub.Listen():
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
received++
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
// No more notifications available
|
||||
goto done
|
||||
}
|
||||
}
|
||||
done:
|
||||
|
||||
// We should have received approximately buffer size worth
|
||||
// Due to timing and goroutines, we might receive slightly more than 50 if a send
|
||||
// was in progress when we started reading, or fewer due to TryLock behavior
|
||||
assert.GreaterOrEqual(t, received, 40, "Should receive most notifications")
|
||||
assert.LessOrEqual(t, received, 60, "Should not receive all 60 (some should be dropped)")
|
||||
|
||||
t.Logf("Sent %d notifications, received %d", sent, received)
|
||||
}
|
||||
|
||||
// TestSubscriber_MultipleReceives verifies that a subscriber can receive
|
||||
// multiple notifications sequentially.
|
||||
func TestSubscriber_MultipleReceives(t *testing.T) {
|
||||
n := NewNotifier(50)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Send 10 notifications
|
||||
for i := 0; i < 10; i++ {
|
||||
notification := Notification{
|
||||
Target: sub.ID,
|
||||
Level: LevelInfo,
|
||||
Message: "Notification",
|
||||
}
|
||||
go n.Notify(notification)
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Receive all 10
|
||||
received := 0
|
||||
for i := 0; i < 10; i++ {
|
||||
_, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
|
||||
if ok {
|
||||
received++
|
||||
}
|
||||
}
|
||||
|
||||
assert.Equal(t, 10, received, "Should receive all 10 notifications")
|
||||
}
|
||||
|
||||
// TestSubscriber_ConcurrentReads verifies that multiple goroutines can safely
|
||||
// read from the same subscriber's channel.
|
||||
func TestSubscriber_ConcurrentReads(t *testing.T) {
|
||||
n := NewNotifier(50)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
totalReceived := 0
|
||||
|
||||
// Start 3 goroutines reading from the same channel
|
||||
for i := 0; i < 3; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-sub.Listen():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
mu.Lock()
|
||||
totalReceived++
|
||||
mu.Unlock()
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Send 30 notifications
|
||||
for i := 0; i < 30; i++ {
|
||||
notification := Notification{
|
||||
Target: sub.ID,
|
||||
Level: LevelInfo,
|
||||
Message: "Concurrent test",
|
||||
}
|
||||
go n.Notify(notification)
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Wait for all readers
|
||||
wg.Wait()
|
||||
|
||||
// Each notification should only be received by one goroutine
|
||||
mu.Lock()
|
||||
assert.LessOrEqual(t, totalReceived, 30, "Total received should not exceed sent")
|
||||
assert.GreaterOrEqual(t, totalReceived, 1, "Should receive at least some notifications")
|
||||
mu.Unlock()
|
||||
|
||||
// Cleanup
|
||||
sub.Unsubscribe()
|
||||
}
|
||||
|
||||
// TestSubscriber_NotifyAfterClose verifies that attempting to notify a subscriber
|
||||
// after unsubscribe doesn't cause issues.
|
||||
func TestSubscriber_NotifyAfterClose(t *testing.T) {
|
||||
n := NewNotifier(50)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
targetID := sub.ID
|
||||
|
||||
// Unsubscribe
|
||||
sub.Unsubscribe()
|
||||
|
||||
// Wait for cleanup
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Try to notify - should be silently ignored
|
||||
notification := Notification{
|
||||
Target: targetID,
|
||||
Level: LevelError,
|
||||
Message: "Should be ignored",
|
||||
}
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
n.Notify(notification)
|
||||
}, "Notifying closed subscriber should not panic")
|
||||
}
|
||||
|
||||
// TestSubscriber_UnsubscribedFlag verifies that the unsubscribed flag is properly
|
||||
// set and prevents double-close.
|
||||
func TestSubscriber_UnsubscribedFlag(t *testing.T) {
|
||||
n := NewNotifier(50)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Initially should be false
|
||||
assert.False(t, sub.unsubscribed, "New subscriber should have unsubscribed=false")
|
||||
|
||||
// After unsubscribe should be true
|
||||
sub.Unsubscribe()
|
||||
assert.True(t, sub.unsubscribed, "After Unsubscribe() flag should be true")
|
||||
|
||||
// Second call should still be safe
|
||||
sub.Unsubscribe()
|
||||
assert.True(t, sub.unsubscribed, "Flag should remain true")
|
||||
}
|
||||
|
||||
// TestSubscriber_FieldsInitialized verifies all Subscriber fields are properly initialized.
|
||||
func TestSubscriber_FieldsInitialized(t *testing.T) {
|
||||
n := NewNotifier(50)
|
||||
sub, err := n.Subscribe()
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.NotEqual(t, Target(""), sub.ID, "ID should be set")
|
||||
assert.NotNil(t, sub.notifications, "notifications channel should be initialized")
|
||||
assert.Equal(t, n, sub.notifier, "notifier reference should be set")
|
||||
assert.NotNil(t, sub.unsubscribelock, "unsubscribelock should be initialized")
|
||||
assert.False(t, sub.unsubscribed, "unsubscribed flag should be false initially")
|
||||
}
|
||||
Reference in New Issue
Block a user