Compare commits
1 Commits
hws/v0.4.0
...
hwsauth/v0
| Author | SHA1 | Date | |
|---|---|---|---|
| 87027e9761 |
@@ -13,7 +13,6 @@ type Config struct {
|
|||||||
ReadHeaderTimeout time.Duration // ENV HWS_READ_HEADER_TIMEOUT: Timeout for reading request headers in seconds (default: 2)
|
ReadHeaderTimeout time.Duration // ENV HWS_READ_HEADER_TIMEOUT: Timeout for reading request headers in seconds (default: 2)
|
||||||
WriteTimeout time.Duration // ENV HWS_WRITE_TIMEOUT: Timeout for writing requests in seconds (default: 10)
|
WriteTimeout time.Duration // ENV HWS_WRITE_TIMEOUT: Timeout for writing requests in seconds (default: 10)
|
||||||
IdleTimeout time.Duration // ENV HWS_IDLE_TIMEOUT: Timeout for idle connections in seconds (default: 120)
|
IdleTimeout time.Duration // ENV HWS_IDLE_TIMEOUT: Timeout for idle connections in seconds (default: 120)
|
||||||
ShutdownDelay time.Duration // ENV HWS_SHUTDOWN_DELAY: Delay in seconds before server shutsdown when Shutdown is called (default: 5)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConfigFromEnv returns a Config struct loaded from the environment variables
|
// ConfigFromEnv returns a Config struct loaded from the environment variables
|
||||||
@@ -25,7 +24,6 @@ func ConfigFromEnv() (*Config, error) {
|
|||||||
ReadHeaderTimeout: time.Duration(env.Int("HWS_READ_HEADER_TIMEOUT", 2)) * time.Second,
|
ReadHeaderTimeout: time.Duration(env.Int("HWS_READ_HEADER_TIMEOUT", 2)) * time.Second,
|
||||||
WriteTimeout: time.Duration(env.Int("HWS_WRITE_TIMEOUT", 10)) * time.Second,
|
WriteTimeout: time.Duration(env.Int("HWS_WRITE_TIMEOUT", 10)) * time.Second,
|
||||||
IdleTimeout: time.Duration(env.Int("HWS_IDLE_TIMEOUT", 120)) * time.Second,
|
IdleTimeout: time.Duration(env.Int("HWS_IDLE_TIMEOUT", 120)) * time.Second,
|
||||||
ShutdownDelay: time.Duration(env.Int("HWS_SHUTDOWN_DELAY", 5)) * time.Second,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ go 1.25.5
|
|||||||
require (
|
require (
|
||||||
git.haelnorr.com/h/golib/env v0.9.1
|
git.haelnorr.com/h/golib/env v0.9.1
|
||||||
git.haelnorr.com/h/golib/hlog v0.9.0
|
git.haelnorr.com/h/golib/hlog v0.9.0
|
||||||
git.haelnorr.com/h/golib/notify v0.1.0
|
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
github.com/stretchr/testify v1.11.1
|
github.com/stretchr/testify v1.11.1
|
||||||
k8s.io/apimachinery v0.35.0
|
k8s.io/apimachinery v0.35.0
|
||||||
|
|||||||
@@ -2,8 +2,6 @@ git.haelnorr.com/h/golib/env v0.9.1 h1:2Vsj+mJKnO5f1Md1GO5v9ggLN5zWa0baCewcSHTjo
|
|||||||
git.haelnorr.com/h/golib/env v0.9.1/go.mod h1:glUQVdA1HMKX1avTDyTyuhcr36SSxZtlJxKDT5KTztg=
|
git.haelnorr.com/h/golib/env v0.9.1/go.mod h1:glUQVdA1HMKX1avTDyTyuhcr36SSxZtlJxKDT5KTztg=
|
||||||
git.haelnorr.com/h/golib/hlog v0.9.0 h1:ib8n2MdmiRK2TF067p220kXmhDe9aAnlcsgpuv+QpvE=
|
git.haelnorr.com/h/golib/hlog v0.9.0 h1:ib8n2MdmiRK2TF067p220kXmhDe9aAnlcsgpuv+QpvE=
|
||||||
git.haelnorr.com/h/golib/hlog v0.9.0/go.mod h1:oOlzb8UVHUYP1k7dN5PSJXVskAB2z8EYgRN85jAi0Zk=
|
git.haelnorr.com/h/golib/hlog v0.9.0/go.mod h1:oOlzb8UVHUYP1k7dN5PSJXVskAB2z8EYgRN85jAi0Zk=
|
||||||
git.haelnorr.com/h/golib/notify v0.1.0 h1:xdf6zd21F6n+SuGTeJiuLNMf6zFXMvwpKD0gmNq8N10=
|
|
||||||
git.haelnorr.com/h/golib/notify v0.1.0/go.mod h1:ARqaRmCYb8LMURhDM75sG+qX+YpqXmUVeAtacwjHjBc=
|
|
||||||
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
|||||||
@@ -10,14 +10,11 @@ type MiddlewareFunc func(w http.ResponseWriter, r *http.Request) (*http.Request,
|
|||||||
|
|
||||||
// Server.AddMiddleware registers all the middleware.
|
// Server.AddMiddleware registers all the middleware.
|
||||||
// Middleware will be run in the order that they are provided.
|
// Middleware will be run in the order that they are provided.
|
||||||
// Can only be called once
|
|
||||||
func (server *Server) AddMiddleware(middleware ...Middleware) error {
|
func (server *Server) AddMiddleware(middleware ...Middleware) error {
|
||||||
if !server.routes {
|
if !server.routes {
|
||||||
return errors.New("Server.AddRoutes must be called before Server.AddMiddleware")
|
return errors.New("Server.AddRoutes must be called before Server.AddMiddleware")
|
||||||
}
|
}
|
||||||
if server.middleware {
|
|
||||||
return errors.New("Server.AddMiddleware already called")
|
|
||||||
}
|
|
||||||
// RUN LOGGING MIDDLEWARE FIRST
|
// RUN LOGGING MIDDLEWARE FIRST
|
||||||
server.server.Handler = logging(server.server.Handler, server.logger)
|
server.server.Handler = logging(server.server.Handler, server.logger)
|
||||||
|
|
||||||
|
|||||||
316
hws/notify.go
316
hws/notify.go
@@ -1,316 +0,0 @@
|
|||||||
package hws
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"slices"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.haelnorr.com/h/golib/notify"
|
|
||||||
)
|
|
||||||
|
|
||||||
// LevelShutdown is a special level used for the notification sent on shutdown.
|
|
||||||
// This can be used to check if the notification is a shutdown event and if it should
|
|
||||||
// be passed on to consumers or special considerations should be made.
|
|
||||||
const LevelShutdown notify.Level = "shutdown"
|
|
||||||
|
|
||||||
// Notifier manages client subscriptions and notification delivery for the HWS server.
|
|
||||||
// It wraps the notify.Notifier with additional client management features including
|
|
||||||
// dual identification (subscription ID + alternate ID) and automatic cleanup of
|
|
||||||
// inactive clients after 5 minutes.
|
|
||||||
type Notifier struct {
|
|
||||||
*notify.Notifier
|
|
||||||
clients *Clients
|
|
||||||
running bool
|
|
||||||
ctx context.Context
|
|
||||||
cancel context.CancelFunc
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clients maintains thread-safe mappings between subscriber IDs, alternate IDs,
|
|
||||||
// and Client instances. It supports querying clients by either their unique
|
|
||||||
// subscription ID or their alternate ID (where multiple clients can share an alternate ID).
|
|
||||||
type Clients struct {
|
|
||||||
clientsSubMap map[notify.Target]*Client
|
|
||||||
clientsIDMap map[string][]*Client
|
|
||||||
lock *sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// Client represents a unique subscriber to the notifications channel.
|
|
||||||
// It tracks activity via lastSeen timestamp (updated atomically) and monitors
|
|
||||||
// consecutive send failures for automatic disconnect detection.
|
|
||||||
type Client struct {
|
|
||||||
sub *notify.Subscriber
|
|
||||||
lastSeen int64 // accessed atomically
|
|
||||||
altID string
|
|
||||||
consecutiveFails int32 // accessed atomically
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) startNotifier() {
|
|
||||||
if s.notifier != nil && s.notifier.running {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
s.notifier = &Notifier{
|
|
||||||
Notifier: notify.NewNotifier(50),
|
|
||||||
clients: &Clients{
|
|
||||||
clientsSubMap: make(map[notify.Target]*Client),
|
|
||||||
clientsIDMap: make(map[string][]*Client),
|
|
||||||
lock: new(sync.RWMutex),
|
|
||||||
},
|
|
||||||
running: true,
|
|
||||||
ctx: ctx,
|
|
||||||
cancel: cancel,
|
|
||||||
}
|
|
||||||
|
|
||||||
ticker := time.NewTicker(time.Minute)
|
|
||||||
go func() {
|
|
||||||
defer ticker.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
s.notifier.clients.cleanUp()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) closeNotifier() {
|
|
||||||
if s.notifier != nil {
|
|
||||||
if s.notifier.cancel != nil {
|
|
||||||
s.notifier.cancel()
|
|
||||||
}
|
|
||||||
s.notifier.running = false
|
|
||||||
s.notifier.Close()
|
|
||||||
}
|
|
||||||
s.notifier = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NotifySub sends a notification to a specific subscriber identified by the notification's Target field.
|
|
||||||
// If the subscriber doesn't exist, a warning is logged but the operation does not fail.
|
|
||||||
// This is thread-safe and can be called from multiple goroutines.
|
|
||||||
func (s *Server) NotifySub(nt notify.Notification) {
|
|
||||||
if s.notifier == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_, exists := s.notifier.clients.getClient(nt.Target)
|
|
||||||
if !exists {
|
|
||||||
err := fmt.Errorf("Tried to notify subscriber that doesn't exist - subID: %s", nt.Target)
|
|
||||||
s.LogError(HWSError{Level: ErrorWARN, Message: "Failed to notify", Error: err})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s.notifier.Notify(nt)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NotifyID sends a notification to all clients associated with the given alternate ID.
|
|
||||||
// Multiple clients can share the same alternate ID (e.g., multiple sessions for one user).
|
|
||||||
// If no clients exist with that ID, a warning is logged but the operation does not fail.
|
|
||||||
// This is thread-safe and can be called from multiple goroutines.
|
|
||||||
func (s *Server) NotifyID(nt notify.Notification, altID string) {
|
|
||||||
if s.notifier == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s.notifier.clients.lock.RLock()
|
|
||||||
clients, exists := s.notifier.clients.clientsIDMap[altID]
|
|
||||||
s.notifier.clients.lock.RUnlock()
|
|
||||||
if !exists {
|
|
||||||
err := fmt.Errorf("Tried to notify client group that doesn't exist - altID: %s", altID)
|
|
||||||
s.LogError(HWSError{Level: ErrorWARN, Message: "Failed to notify", Error: err})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, client := range clients {
|
|
||||||
ntt := nt
|
|
||||||
ntt.Target = client.sub.ID
|
|
||||||
s.NotifySub(ntt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NotifyAll broadcasts a notification to all connected clients.
|
|
||||||
// This is thread-safe and can be called from multiple goroutines.
|
|
||||||
func (s *Server) NotifyAll(nt notify.Notification) {
|
|
||||||
if s.notifier == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
nt.Target = ""
|
|
||||||
s.notifier.NotifyAll(nt)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetClient returns a Client that can be used to receive notifications.
|
|
||||||
// If a client exists with the provided subID, that client will be returned.
|
|
||||||
// If altID is provided, it will update the existing Client.
|
|
||||||
// If subID is an empty string, a new client will be returned.
|
|
||||||
// If both altID and subID are empty, a new Client with no altID will be returned.
|
|
||||||
// Multiple clients with the same altID are permitted.
|
|
||||||
func (s *Server) GetClient(subID, altID string) (*Client, error) {
|
|
||||||
if s.notifier == nil || !s.notifier.running {
|
|
||||||
return nil, errors.New("notifier hasn't started")
|
|
||||||
}
|
|
||||||
target := notify.Target(subID)
|
|
||||||
client, exists := s.notifier.clients.getClient(target)
|
|
||||||
if exists {
|
|
||||||
s.notifier.clients.updateAltID(client, altID)
|
|
||||||
return client, nil
|
|
||||||
}
|
|
||||||
// An error should only be returned if there are 10 collisions of a randomly generated 16 bit byte string from rand.Rand()
|
|
||||||
// Basically never going to happen, and if it does its not my problem
|
|
||||||
sub, _ := s.notifier.Subscribe()
|
|
||||||
client = &Client{
|
|
||||||
sub: sub,
|
|
||||||
lastSeen: time.Now().Unix(),
|
|
||||||
altID: altID,
|
|
||||||
consecutiveFails: 0,
|
|
||||||
}
|
|
||||||
s.notifier.clients.addClient(client)
|
|
||||||
return client, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cs *Clients) getClient(target notify.Target) (*Client, bool) {
|
|
||||||
cs.lock.RLock()
|
|
||||||
client, exists := cs.clientsSubMap[target]
|
|
||||||
cs.lock.RUnlock()
|
|
||||||
return client, exists
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cs *Clients) updateAltID(client *Client, altID string) {
|
|
||||||
cs.lock.Lock()
|
|
||||||
if altID != "" && !slices.Contains(cs.clientsIDMap[altID], client) {
|
|
||||||
cs.clientsIDMap[altID] = append(cs.clientsIDMap[altID], client)
|
|
||||||
}
|
|
||||||
if client.altID != altID && client.altID != "" {
|
|
||||||
cs.deleteFromID(client, client.altID)
|
|
||||||
}
|
|
||||||
client.altID = altID
|
|
||||||
cs.lock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cs *Clients) deleteFromID(client *Client, altID string) {
|
|
||||||
cs.clientsIDMap[altID] = deleteFromSlice(cs.clientsIDMap[altID], client, func(a, b *Client) bool {
|
|
||||||
return a.sub.ID == b.sub.ID
|
|
||||||
})
|
|
||||||
if len(cs.clientsIDMap[altID]) == 0 {
|
|
||||||
delete(cs.clientsIDMap, altID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cs *Clients) addClient(client *Client) {
|
|
||||||
cs.lock.Lock()
|
|
||||||
cs.clientsSubMap[client.sub.ID] = client
|
|
||||||
if client.altID != "" {
|
|
||||||
cs.clientsIDMap[client.altID] = append(cs.clientsIDMap[client.altID], client)
|
|
||||||
}
|
|
||||||
cs.lock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cs *Clients) cleanUp() {
|
|
||||||
now := time.Now().Unix()
|
|
||||||
|
|
||||||
// Collect clients to kill while holding read lock
|
|
||||||
cs.lock.RLock()
|
|
||||||
toKill := make([]*Client, 0)
|
|
||||||
for _, client := range cs.clientsSubMap {
|
|
||||||
if now-atomic.LoadInt64(&client.lastSeen) > 300 {
|
|
||||||
toKill = append(toKill, client)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cs.lock.RUnlock()
|
|
||||||
|
|
||||||
// Kill clients without holding lock
|
|
||||||
for _, client := range toKill {
|
|
||||||
cs.killClient(client)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cs *Clients) killClient(client *Client) {
|
|
||||||
client.sub.Unsubscribe()
|
|
||||||
|
|
||||||
cs.lock.Lock()
|
|
||||||
delete(cs.clientsSubMap, client.sub.ID)
|
|
||||||
if client.altID != "" {
|
|
||||||
cs.deleteFromID(client, client.altID)
|
|
||||||
}
|
|
||||||
cs.lock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Listen starts a goroutine that forwards notifications from the subscriber to a returned channel.
|
|
||||||
// It returns a receive-only channel for notifications and a channel to stop listening.
|
|
||||||
// The notification channel is buffered with size 10 to tolerate brief slowness.
|
|
||||||
//
|
|
||||||
// The goroutine automatically stops and closes the notification channel when:
|
|
||||||
// - The subscriber is unsubscribed
|
|
||||||
// - The stop channel is closed
|
|
||||||
// - The client fails to receive 5 consecutive notifications within 5 seconds each
|
|
||||||
//
|
|
||||||
// Client.lastSeen is updated every 30 seconds via heartbeat, or when a notification is successfully delivered.
|
|
||||||
// Consecutive send failures are tracked; after 5 failures, the client is considered disconnected and cleaned up.
|
|
||||||
func (c *Client) Listen() (<-chan notify.Notification, chan<- struct{}) {
|
|
||||||
ch := make(chan notify.Notification, 10)
|
|
||||||
stop := make(chan struct{})
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
ticker := time.NewTicker(30 * time.Second)
|
|
||||||
defer ticker.Stop()
|
|
||||||
defer close(ch)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-stop:
|
|
||||||
return
|
|
||||||
|
|
||||||
case nt, ok := <-c.sub.Listen():
|
|
||||||
if !ok {
|
|
||||||
// Subscriber channel closed
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to send with timeout
|
|
||||||
timeout := time.NewTimer(5 * time.Second)
|
|
||||||
select {
|
|
||||||
case ch <- nt:
|
|
||||||
// Successfully sent - update lastSeen and reset failure count
|
|
||||||
atomic.StoreInt64(&c.lastSeen, time.Now().Unix())
|
|
||||||
atomic.StoreInt32(&c.consecutiveFails, 0)
|
|
||||||
timeout.Stop()
|
|
||||||
|
|
||||||
case <-timeout.C:
|
|
||||||
// Send timeout - increment failure count
|
|
||||||
fails := atomic.AddInt32(&c.consecutiveFails, 1)
|
|
||||||
if fails >= 5 {
|
|
||||||
// Too many consecutive failures - client is stuck/disconnected
|
|
||||||
c.sub.Unsubscribe()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
case <-stop:
|
|
||||||
timeout.Stop()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
case <-ticker.C:
|
|
||||||
// Heartbeat - update lastSeen to keep client alive
|
|
||||||
atomic.StoreInt64(&c.lastSeen, time.Now().Unix())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return ch, stop
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) ID() string {
|
|
||||||
return string(c.sub.ID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func deleteFromSlice[T any](a []T, c T, eq func(T, T) bool) []T {
|
|
||||||
n := 0
|
|
||||||
for _, x := range a {
|
|
||||||
if !eq(x, c) {
|
|
||||||
a[n] = x
|
|
||||||
n++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return a[:n]
|
|
||||||
}
|
|
||||||
1014
hws/notify_test.go
1014
hws/notify_test.go
File diff suppressed because it is too large
Load Diff
@@ -13,7 +13,3 @@ func (w *wrappedWriter) WriteHeader(statusCode int) {
|
|||||||
w.ResponseWriter.WriteHeader(statusCode)
|
w.ResponseWriter.WriteHeader(statusCode)
|
||||||
w.statusCode = statusCode
|
w.statusCode = statusCode
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *wrappedWriter) Unwrap() http.ResponseWriter {
|
|
||||||
return w.ResponseWriter
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -7,22 +7,19 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.haelnorr.com/h/golib/notify"
|
|
||||||
"k8s.io/apimachinery/pkg/util/validation"
|
"k8s.io/apimachinery/pkg/util/validation"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
GZIP bool
|
GZIP bool
|
||||||
server *http.Server
|
server *http.Server
|
||||||
logger *logger
|
logger *logger
|
||||||
routes bool
|
routes bool
|
||||||
middleware bool
|
middleware bool
|
||||||
errorPage ErrorPageFunc
|
errorPage ErrorPageFunc
|
||||||
ready chan struct{}
|
ready chan struct{}
|
||||||
notifier *Notifier
|
|
||||||
shutdowndelay time.Duration
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ready returns a channel that is closed when the server is started
|
// Ready returns a channel that is closed when the server is started
|
||||||
@@ -86,11 +83,10 @@ func NewServer(config *Config) (*Server, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
server := &Server{
|
server := &Server{
|
||||||
server: httpServer,
|
server: httpServer,
|
||||||
routes: false,
|
routes: false,
|
||||||
GZIP: config.GZIP,
|
GZIP: config.GZIP,
|
||||||
ready: make(chan struct{}),
|
ready: make(chan struct{}),
|
||||||
shutdowndelay: config.ShutdownDelay,
|
|
||||||
}
|
}
|
||||||
return server, nil
|
return server, nil
|
||||||
}
|
}
|
||||||
@@ -109,8 +105,6 @@ func (server *Server) Start(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
server.startNotifier()
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if server.logger == nil {
|
if server.logger == nil {
|
||||||
fmt.Printf("Listening for requests on %s", server.server.Addr)
|
fmt.Printf("Listening for requests on %s", server.server.Addr)
|
||||||
@@ -132,13 +126,6 @@ func (server *Server) Start(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (server *Server) Shutdown(ctx context.Context) error {
|
func (server *Server) Shutdown(ctx context.Context) error {
|
||||||
server.logger.logger.Debug().Dur("shutdown_delay", server.shutdowndelay).Msg("HWS Server shutting down")
|
|
||||||
server.NotifyAll(notify.Notification{
|
|
||||||
Title: "Shutting down",
|
|
||||||
Message: fmt.Sprintf("Server is shutting down in %v", server.shutdowndelay),
|
|
||||||
Level: LevelShutdown,
|
|
||||||
})
|
|
||||||
<-time.NewTimer(server.shutdowndelay).C
|
|
||||||
if !server.IsReady() {
|
if !server.IsReady() {
|
||||||
return errors.New("Server isn't running")
|
return errors.New("Server isn't running")
|
||||||
}
|
}
|
||||||
@@ -149,7 +136,6 @@ func (server *Server) Shutdown(ctx context.Context) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "Failed to shutdown the server gracefully")
|
return errors.Wrap(err, "Failed to shutdown the server gracefully")
|
||||||
}
|
}
|
||||||
server.closeNotifier()
|
|
||||||
server.ready = make(chan struct{})
|
server.ready = make(chan struct{})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package hwsauth
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"reflect"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.haelnorr.com/h/golib/jwt"
|
"git.haelnorr.com/h/golib/jwt"
|
||||||
@@ -46,9 +45,6 @@ func (auth *Authenticator[T, TX]) getAuthenticatedUser(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return authenticatedModel[T]{}, errors.Wrap(err, "auth.load")
|
return authenticatedModel[T]{}, errors.Wrap(err, "auth.load")
|
||||||
}
|
}
|
||||||
if reflect.ValueOf(model).IsNil() {
|
|
||||||
return authenticatedModel[T]{}, errors.New("no user matching JWT in database")
|
|
||||||
}
|
|
||||||
authUser := authenticatedModel[T]{
|
authUser := authenticatedModel[T]{
|
||||||
model: model,
|
model: model,
|
||||||
fresh: aT.Fresh,
|
fresh: aT.Fresh,
|
||||||
|
|||||||
@@ -1,21 +0,0 @@
|
|||||||
MIT License
|
|
||||||
|
|
||||||
Copyright (c) 2026 haelnorr
|
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
of this software and associated documentation files (the "Software"), to deal
|
|
||||||
in the Software without restriction, including without limitation the rights
|
|
||||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
||||||
copies of the Software, and to permit persons to whom the Software is
|
|
||||||
furnished to do so, subject to the following conditions:
|
|
||||||
|
|
||||||
The above copyright notice and this permission notice shall be included in all
|
|
||||||
copies or substantial portions of the Software.
|
|
||||||
|
|
||||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
||||||
SOFTWARE.
|
|
||||||
397
notify/README.md
397
notify/README.md
@@ -1,397 +0,0 @@
|
|||||||
# notify
|
|
||||||
|
|
||||||
Thread-safe pub/sub notification system for Go applications.
|
|
||||||
|
|
||||||
## Features
|
|
||||||
|
|
||||||
- **Thread-Safe**: All operations are safe for concurrent use
|
|
||||||
- **Configurable Buffering**: Set custom buffer sizes per notifier
|
|
||||||
- **Targeted & Broadcast**: Send to specific subscribers or broadcast to all
|
|
||||||
- **Graceful Shutdown**: Built-in Close() for clean resource cleanup
|
|
||||||
- **Idempotent Operations**: Safe to call Unsubscribe() and Close() multiple times
|
|
||||||
- **Zero Dependencies**: Uses only Go standard library
|
|
||||||
- **Comprehensive Tests**: 95%+ code coverage with race detector clean
|
|
||||||
|
|
||||||
## Installation
|
|
||||||
|
|
||||||
```bash
|
|
||||||
go get git.haelnorr.com/h/golib/notify
|
|
||||||
```
|
|
||||||
|
|
||||||
## Quick Start
|
|
||||||
|
|
||||||
```go
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"git.haelnorr.com/h/golib/notify"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
// Create a notifier with 50-notification buffer per subscriber
|
|
||||||
n := notify.NewNotifier(50)
|
|
||||||
defer n.Close()
|
|
||||||
|
|
||||||
// Subscribe to receive notifications
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
// Listen for notifications
|
|
||||||
go func() {
|
|
||||||
for notification := range sub.Listen() {
|
|
||||||
fmt.Printf("[%s] %s: %s\n",
|
|
||||||
notification.Level,
|
|
||||||
notification.Title,
|
|
||||||
notification.Message)
|
|
||||||
}
|
|
||||||
fmt.Println("Listener exited")
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Send a notification
|
|
||||||
n.Notify(notify.Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: notify.LevelSuccess,
|
|
||||||
Title: "Welcome",
|
|
||||||
Message: "You're now subscribed!",
|
|
||||||
})
|
|
||||||
|
|
||||||
// Broadcast to all subscribers
|
|
||||||
n.NotifyAll(notify.Notification{
|
|
||||||
Level: notify.LevelInfo,
|
|
||||||
Title: "System Status",
|
|
||||||
Message: "All systems operational",
|
|
||||||
})
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## Usage
|
|
||||||
|
|
||||||
### Creating a Notifier
|
|
||||||
|
|
||||||
The buffer size determines how many notifications can be queued per subscriber:
|
|
||||||
|
|
||||||
```go
|
|
||||||
// Unbuffered - sends block until received
|
|
||||||
n := notify.NewNotifier(0)
|
|
||||||
|
|
||||||
// Small buffer - low memory, may drop if slow readers
|
|
||||||
n := notify.NewNotifier(25)
|
|
||||||
|
|
||||||
// Recommended - balanced approach
|
|
||||||
n := notify.NewNotifier(50)
|
|
||||||
|
|
||||||
// Large buffer - handles bursts well
|
|
||||||
n := notify.NewNotifier(500)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Subscribing
|
|
||||||
|
|
||||||
Each subscriber receives a unique ID and a buffered notification channel:
|
|
||||||
|
|
||||||
```go
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
if err != nil {
|
|
||||||
// Handle error (e.g., notifier is closed)
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("Subscriber ID:", sub.ID)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Listening for Notifications
|
|
||||||
|
|
||||||
Use a for-range loop to process notifications:
|
|
||||||
|
|
||||||
```go
|
|
||||||
for notification := range sub.Listen() {
|
|
||||||
switch notification.Level {
|
|
||||||
case notify.LevelSuccess:
|
|
||||||
fmt.Println("✓", notification.Message)
|
|
||||||
case notify.LevelError:
|
|
||||||
fmt.Println("✗", notification.Message)
|
|
||||||
default:
|
|
||||||
fmt.Println("ℹ", notification.Message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### Sending Targeted Notifications
|
|
||||||
|
|
||||||
Send to a specific subscriber:
|
|
||||||
|
|
||||||
```go
|
|
||||||
n.Notify(notify.Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: notify.LevelWarn,
|
|
||||||
Title: "Account Warning",
|
|
||||||
Message: "Password expires in 3 days",
|
|
||||||
Details: "Please update your password",
|
|
||||||
})
|
|
||||||
```
|
|
||||||
|
|
||||||
### Broadcasting to All Subscribers
|
|
||||||
|
|
||||||
Send to all current subscribers:
|
|
||||||
|
|
||||||
```go
|
|
||||||
n.NotifyAll(notify.Notification{
|
|
||||||
Level: notify.LevelInfo,
|
|
||||||
Title: "Maintenance",
|
|
||||||
Message: "System will restart in 5 minutes",
|
|
||||||
})
|
|
||||||
```
|
|
||||||
|
|
||||||
### Unsubscribing
|
|
||||||
|
|
||||||
Clean up when done (safe to call multiple times):
|
|
||||||
|
|
||||||
```go
|
|
||||||
sub.Unsubscribe()
|
|
||||||
```
|
|
||||||
|
|
||||||
### Graceful Shutdown
|
|
||||||
|
|
||||||
Close the notifier to unsubscribe all and prevent new subscriptions:
|
|
||||||
|
|
||||||
```go
|
|
||||||
n.Close()
|
|
||||||
// After Close():
|
|
||||||
// - All subscribers are removed
|
|
||||||
// - All notification channels are closed
|
|
||||||
// - Future Subscribe() calls return error
|
|
||||||
// - Notify/NotifyAll are no-ops
|
|
||||||
```
|
|
||||||
|
|
||||||
## Notification Levels
|
|
||||||
|
|
||||||
Four predefined levels are available:
|
|
||||||
|
|
||||||
| Level | Constant | Use Case |
|
|
||||||
|-------|----------|----------|
|
|
||||||
| Success | `notify.LevelSuccess` | Successful operations |
|
|
||||||
| Info | `notify.LevelInfo` | General information |
|
|
||||||
| Warning | `notify.LevelWarn` | Non-critical warnings |
|
|
||||||
| Error | `notify.LevelError` | Errors requiring attention |
|
|
||||||
|
|
||||||
## Advanced Usage
|
|
||||||
|
|
||||||
### Custom Action Data
|
|
||||||
|
|
||||||
The `Action` field can hold any data type:
|
|
||||||
|
|
||||||
```go
|
|
||||||
type UserAction struct {
|
|
||||||
URL string
|
|
||||||
Method string
|
|
||||||
}
|
|
||||||
|
|
||||||
n.Notify(notify.Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: notify.LevelInfo,
|
|
||||||
Message: "New update available",
|
|
||||||
Action: UserAction{
|
|
||||||
URL: "/updates/download",
|
|
||||||
Method: "GET",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// In listener:
|
|
||||||
for notif := range sub.Listen() {
|
|
||||||
if action, ok := notif.Action.(UserAction); ok {
|
|
||||||
fmt.Printf("Action: %s %s\n", action.Method, action.URL)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### Multiple Subscribers
|
|
||||||
|
|
||||||
Create a notification hub for multiple clients:
|
|
||||||
|
|
||||||
```go
|
|
||||||
n := notify.NewNotifier(100)
|
|
||||||
defer n.Close()
|
|
||||||
|
|
||||||
// Create 10 subscribers
|
|
||||||
subscribers := make([]*notify.Subscriber, 10)
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
sub, _ := n.Subscribe()
|
|
||||||
subscribers[i] = sub
|
|
||||||
|
|
||||||
// Start listener for each
|
|
||||||
go func(id int, s *notify.Subscriber) {
|
|
||||||
for notif := range s.Listen() {
|
|
||||||
log.Printf("Sub %d: %s", id, notif.Message)
|
|
||||||
}
|
|
||||||
}(i, sub)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Broadcast to all
|
|
||||||
n.NotifyAll(notify.Notification{
|
|
||||||
Level: notify.LevelSuccess,
|
|
||||||
Message: "All subscribers active",
|
|
||||||
})
|
|
||||||
```
|
|
||||||
|
|
||||||
### Concurrent-Safe Operations
|
|
||||||
|
|
||||||
All operations are thread-safe:
|
|
||||||
|
|
||||||
```go
|
|
||||||
n := notify.NewNotifier(50)
|
|
||||||
|
|
||||||
// Safe to subscribe from multiple goroutines
|
|
||||||
for i := 0; i < 100; i++ {
|
|
||||||
go func() {
|
|
||||||
sub, _ := n.Subscribe()
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
// ...
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Safe to notify from multiple goroutines
|
|
||||||
for i := 0; i < 100; i++ {
|
|
||||||
go func() {
|
|
||||||
n.NotifyAll(notify.Notification{
|
|
||||||
Level: notify.LevelInfo,
|
|
||||||
Message: "Concurrent notification",
|
|
||||||
})
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## Best Practices
|
|
||||||
|
|
||||||
### 1. Use defer for Cleanup
|
|
||||||
|
|
||||||
```go
|
|
||||||
n := notify.NewNotifier(50)
|
|
||||||
defer n.Close()
|
|
||||||
|
|
||||||
sub, _ := n.Subscribe()
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
```
|
|
||||||
|
|
||||||
### 2. Check Errors
|
|
||||||
|
|
||||||
```go
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Subscribe failed: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 3. Buffer Size Recommendations
|
|
||||||
|
|
||||||
| Scenario | Buffer Size |
|
|
||||||
|----------|------------|
|
|
||||||
| Real-time chat | 10-25 |
|
|
||||||
| General app notifications | 50-100 |
|
|
||||||
| High-throughput logging | 200-500 |
|
|
||||||
| Testing/debugging | 0 (unbuffered) |
|
|
||||||
|
|
||||||
### 4. Listener Goroutines
|
|
||||||
|
|
||||||
Always use goroutines for listeners to prevent blocking:
|
|
||||||
|
|
||||||
```go
|
|
||||||
// Good ✓
|
|
||||||
go func() {
|
|
||||||
for notif := range sub.Listen() {
|
|
||||||
process(notif)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Bad ✗ - blocks main goroutine
|
|
||||||
for notif := range sub.Listen() {
|
|
||||||
process(notif)
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 5. Detect Channel Closure
|
|
||||||
|
|
||||||
```go
|
|
||||||
for notification := range sub.Listen() {
|
|
||||||
// Process notifications
|
|
||||||
}
|
|
||||||
// When this loop exits, the channel is closed
|
|
||||||
// Either subscriber unsubscribed or notifier closed
|
|
||||||
fmt.Println("No more notifications")
|
|
||||||
```
|
|
||||||
|
|
||||||
## Performance
|
|
||||||
|
|
||||||
- **Subscribe**: O(1) average case (random ID generation)
|
|
||||||
- **Notify**: O(1) lookup + O(1) channel send (non-blocking)
|
|
||||||
- **NotifyAll**: O(n) where n is number of subscribers
|
|
||||||
- **Unsubscribe**: O(1) map deletion + O(1) channel close
|
|
||||||
- **Close**: O(n) where n is number of subscribers
|
|
||||||
|
|
||||||
### Benchmarks
|
|
||||||
|
|
||||||
Typical performance on modern hardware:
|
|
||||||
|
|
||||||
- Subscribe: ~5-10µs per operation
|
|
||||||
- Notify: ~1-2µs per operation
|
|
||||||
- NotifyAll (10 subs): ~10-20µs
|
|
||||||
- Buffer full handling: ~100ns (TryLock drop)
|
|
||||||
|
|
||||||
## Thread Safety
|
|
||||||
|
|
||||||
All public methods are thread-safe:
|
|
||||||
|
|
||||||
- ✅ `NewNotifier()` - Safe
|
|
||||||
- ✅ `Subscribe()` - Safe, concurrent calls allowed
|
|
||||||
- ✅ `Unsubscribe()` - Safe, idempotent
|
|
||||||
- ✅ `Notify()` - Safe, concurrent calls allowed
|
|
||||||
- ✅ `NotifyAll()` - Safe, concurrent calls allowed
|
|
||||||
- ✅ `Close()` - Safe, idempotent
|
|
||||||
- ✅ `Listen()` - Safe, returns read-only channel
|
|
||||||
|
|
||||||
## Testing
|
|
||||||
|
|
||||||
Run tests:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Run all tests
|
|
||||||
go test
|
|
||||||
|
|
||||||
# With race detector
|
|
||||||
go test -race
|
|
||||||
|
|
||||||
# With coverage
|
|
||||||
go test -cover
|
|
||||||
|
|
||||||
# Verbose output
|
|
||||||
go test -v
|
|
||||||
```
|
|
||||||
|
|
||||||
Current test coverage: **95.1%**
|
|
||||||
|
|
||||||
## Documentation
|
|
||||||
|
|
||||||
Full API documentation available at:
|
|
||||||
- [pkg.go.dev](https://pkg.go.dev/git.haelnorr.com/h/golib/notify)
|
|
||||||
- Or run: `go doc -all git.haelnorr.com/h/golib/notify`
|
|
||||||
|
|
||||||
## License
|
|
||||||
|
|
||||||
MIT License - see repository root for details
|
|
||||||
|
|
||||||
## Contributing
|
|
||||||
|
|
||||||
See CONTRIBUTING.md in the repository root
|
|
||||||
|
|
||||||
## Related Projects
|
|
||||||
|
|
||||||
Other modules in the golib collection:
|
|
||||||
- `cookies` - HTTP cookie utilities
|
|
||||||
- `env` - Environment variable helpers
|
|
||||||
- `ezconf` - Configuration loader
|
|
||||||
- `hlog` - Logging with zerolog
|
|
||||||
- `hws` - HTTP web server
|
|
||||||
- `jwt` - JWT token utilities
|
|
||||||
@@ -1,369 +0,0 @@
|
|||||||
package notify
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TestClose_Basic verifies basic Close() functionality.
|
|
||||||
func TestClose_Basic(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
// Create some subscribers
|
|
||||||
sub1, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
sub2, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
sub3, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
assert.Equal(t, 3, len(n.subscribers), "Should have 3 subscribers")
|
|
||||||
|
|
||||||
// Close the notifier
|
|
||||||
n.Close()
|
|
||||||
|
|
||||||
// Verify all subscribers removed
|
|
||||||
assert.Equal(t, 0, len(n.subscribers), "Should have 0 subscribers after close")
|
|
||||||
|
|
||||||
// Verify channels are closed
|
|
||||||
_, ok := <-sub1.Listen()
|
|
||||||
assert.False(t, ok, "sub1 channel should be closed")
|
|
||||||
|
|
||||||
_, ok = <-sub2.Listen()
|
|
||||||
assert.False(t, ok, "sub2 channel should be closed")
|
|
||||||
|
|
||||||
_, ok = <-sub3.Listen()
|
|
||||||
assert.False(t, ok, "sub3 channel should be closed")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestClose_IdempotentClose verifies that calling Close() multiple times is safe.
|
|
||||||
func TestClose_IdempotentClose(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Close multiple times - should not panic
|
|
||||||
assert.NotPanics(t, func() {
|
|
||||||
n.Close()
|
|
||||||
n.Close()
|
|
||||||
n.Close()
|
|
||||||
}, "Multiple Close() calls should not panic")
|
|
||||||
|
|
||||||
// Verify channel is still closed (not double-closed)
|
|
||||||
_, ok := <-sub.Listen()
|
|
||||||
assert.False(t, ok, "Channel should be closed")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestClose_SubscribeAfterClose verifies that Subscribe fails after Close.
|
|
||||||
func TestClose_SubscribeAfterClose(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
// Subscribe before close
|
|
||||||
sub1, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NotNil(t, sub1)
|
|
||||||
|
|
||||||
// Close
|
|
||||||
n.Close()
|
|
||||||
|
|
||||||
// Try to subscribe after close
|
|
||||||
sub2, err := n.Subscribe()
|
|
||||||
assert.Error(t, err, "Subscribe should return error after Close")
|
|
||||||
assert.Nil(t, sub2, "Subscribe should return nil subscriber after Close")
|
|
||||||
assert.Contains(t, err.Error(), "closed", "Error should mention notifier is closed")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestClose_NotifyAfterClose verifies that Notify after Close doesn't panic.
|
|
||||||
func TestClose_NotifyAfterClose(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Close
|
|
||||||
n.Close()
|
|
||||||
|
|
||||||
// Try to notify - should not panic
|
|
||||||
notification := Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Should be ignored",
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.NotPanics(t, func() {
|
|
||||||
n.Notify(notification)
|
|
||||||
}, "Notify after Close should not panic")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestClose_NotifyAllAfterClose verifies that NotifyAll after Close doesn't panic.
|
|
||||||
func TestClose_NotifyAllAfterClose(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
_, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Close
|
|
||||||
n.Close()
|
|
||||||
|
|
||||||
// Try to broadcast - should not panic
|
|
||||||
notification := Notification{
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Should be ignored",
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.NotPanics(t, func() {
|
|
||||||
n.NotifyAll(notification)
|
|
||||||
}, "NotifyAll after Close should not panic")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestClose_WithActiveListeners verifies that listeners detect channel closure.
|
|
||||||
func TestClose_WithActiveListeners(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
listenerExited := false
|
|
||||||
|
|
||||||
// Start listener goroutine
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
for range sub.Listen() {
|
|
||||||
// Process notifications
|
|
||||||
}
|
|
||||||
listenerExited = true
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Give listener time to start
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
|
|
||||||
// Close notifier
|
|
||||||
n.Close()
|
|
||||||
|
|
||||||
// Wait for listener to exit
|
|
||||||
done := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
wg.Wait()
|
|
||||||
done <- true
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
assert.True(t, listenerExited, "Listener should have exited")
|
|
||||||
case <-time.After(1 * time.Second):
|
|
||||||
t.Fatal("Listener did not exit after Close - possible hang")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestClose_PendingNotifications verifies behavior of pending notifications on close.
|
|
||||||
func TestClose_PendingNotifications(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Send some notifications
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
notification := Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Notification",
|
|
||||||
}
|
|
||||||
go n.Notify(notification)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for sends to complete
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
// Close notifier (closes channels)
|
|
||||||
n.Close()
|
|
||||||
|
|
||||||
// Try to read any remaining notifications before closure
|
|
||||||
received := 0
|
|
||||||
for {
|
|
||||||
_, ok := <-sub.Listen()
|
|
||||||
if !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
received++
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("Received %d notifications before channel closed", received)
|
|
||||||
assert.GreaterOrEqual(t, received, 0, "Should receive at least 0 notifications")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestClose_ConcurrentSubscribeAndClose verifies thread safety.
|
|
||||||
func TestClose_ConcurrentSubscribeAndClose(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
// Goroutines trying to subscribe
|
|
||||||
for i := 0; i < 20; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
_, _ = n.Subscribe() // May succeed or fail depending on timing
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Give some time for subscriptions to start
|
|
||||||
time.Sleep(5 * time.Millisecond)
|
|
||||||
|
|
||||||
// Close concurrently
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
n.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Should complete without deadlock or panic
|
|
||||||
done := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
wg.Wait()
|
|
||||||
done <- true
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
// Success
|
|
||||||
case <-time.After(2 * time.Second):
|
|
||||||
t.Fatal("Test timed out - possible deadlock")
|
|
||||||
}
|
|
||||||
|
|
||||||
// After close, no more subscriptions should succeed
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
assert.Error(t, err)
|
|
||||||
assert.Nil(t, sub)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestClose_ConcurrentNotifyAndClose verifies thread safety with notifications.
|
|
||||||
func TestClose_ConcurrentNotifyAndClose(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
// Create some subscribers
|
|
||||||
subscribers := make([]*Subscriber, 10)
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
subscribers[i] = sub
|
|
||||||
}
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
// Goroutines sending notifications
|
|
||||||
for i := 0; i < 20; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
notification := Notification{
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Test",
|
|
||||||
}
|
|
||||||
n.NotifyAll(notification)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close concurrently
|
|
||||||
time.Sleep(5 * time.Millisecond)
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
n.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Should complete without panic or deadlock
|
|
||||||
done := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
wg.Wait()
|
|
||||||
done <- true
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
// Success - no panic or deadlock
|
|
||||||
case <-time.After(2 * time.Second):
|
|
||||||
t.Fatal("Test timed out - possible deadlock")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestClose_Integration verifies the complete Close workflow.
|
|
||||||
func TestClose_Integration(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
// Create subscribers
|
|
||||||
sub1, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
sub2, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
sub3, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Send some notifications
|
|
||||||
notification := Notification{
|
|
||||||
Level: LevelSuccess,
|
|
||||||
Message: "Before close",
|
|
||||||
}
|
|
||||||
go n.NotifyAll(notification)
|
|
||||||
|
|
||||||
// Receive notifications from all subscribers
|
|
||||||
received1, ok := receiveWithTimeout(sub1.Listen(), 100*time.Millisecond)
|
|
||||||
require.True(t, ok, "sub1 should receive notification")
|
|
||||||
assert.Equal(t, "Before close", received1.Message)
|
|
||||||
|
|
||||||
received2, ok := receiveWithTimeout(sub2.Listen(), 100*time.Millisecond)
|
|
||||||
require.True(t, ok, "sub2 should receive notification")
|
|
||||||
assert.Equal(t, "Before close", received2.Message)
|
|
||||||
|
|
||||||
received3, ok := receiveWithTimeout(sub3.Listen(), 100*time.Millisecond)
|
|
||||||
require.True(t, ok, "sub3 should receive notification")
|
|
||||||
assert.Equal(t, "Before close", received3.Message)
|
|
||||||
|
|
||||||
// Close the notifier
|
|
||||||
n.Close()
|
|
||||||
|
|
||||||
// Verify all channels closed (should return immediately with ok=false)
|
|
||||||
_, ok = <-sub1.Listen()
|
|
||||||
assert.False(t, ok, "sub1 should be closed")
|
|
||||||
_, ok = <-sub2.Listen()
|
|
||||||
assert.False(t, ok, "sub2 should be closed")
|
|
||||||
_, ok = <-sub3.Listen()
|
|
||||||
assert.False(t, ok, "sub3 should be closed")
|
|
||||||
|
|
||||||
// Verify no more subscriptions
|
|
||||||
sub4, err := n.Subscribe()
|
|
||||||
assert.Error(t, err)
|
|
||||||
assert.Nil(t, sub4)
|
|
||||||
|
|
||||||
// Verify notifications are ignored
|
|
||||||
notification2 := Notification{
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "After close",
|
|
||||||
}
|
|
||||||
assert.NotPanics(t, func() {
|
|
||||||
n.NotifyAll(notification2)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestClose_UnsubscribeAfterClose verifies unsubscribe after close is safe.
|
|
||||||
func TestClose_UnsubscribeAfterClose(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Close notifier
|
|
||||||
n.Close()
|
|
||||||
|
|
||||||
// Try to unsubscribe after close - should be safe
|
|
||||||
assert.NotPanics(t, func() {
|
|
||||||
sub.Unsubscribe()
|
|
||||||
}, "Unsubscribe after Close should not panic")
|
|
||||||
}
|
|
||||||
148
notify/doc.go
148
notify/doc.go
@@ -1,148 +0,0 @@
|
|||||||
// Package notify provides a thread-safe pub/sub notification system.
|
|
||||||
//
|
|
||||||
// The notify package implements a lightweight, in-memory notification system
|
|
||||||
// where subscribers can register to receive notifications. Each subscriber
|
|
||||||
// receives notifications through a buffered channel, allowing them to process
|
|
||||||
// messages at their own pace.
|
|
||||||
//
|
|
||||||
// # Features
|
|
||||||
//
|
|
||||||
// - Thread-safe concurrent operations
|
|
||||||
// - Configurable notification buffer size per Notifier
|
|
||||||
// - Unique subscriber IDs using cryptographic random generation
|
|
||||||
// - Targeted notifications to specific subscribers
|
|
||||||
// - Broadcast notifications to all subscribers
|
|
||||||
// - Idempotent unsubscribe operations
|
|
||||||
// - Graceful shutdown with Close()
|
|
||||||
// - Zero external dependencies (uses only Go standard library)
|
|
||||||
//
|
|
||||||
// # Basic Usage
|
|
||||||
//
|
|
||||||
// Create a notifier and subscribe:
|
|
||||||
//
|
|
||||||
// // Create notifier with 50-notification buffer per subscriber
|
|
||||||
// n := notify.NewNotifier(50)
|
|
||||||
// defer n.Close()
|
|
||||||
//
|
|
||||||
// // Subscribe to receive notifications
|
|
||||||
// sub, err := n.Subscribe()
|
|
||||||
// if err != nil {
|
|
||||||
// log.Fatal(err)
|
|
||||||
// }
|
|
||||||
// defer sub.Unsubscribe()
|
|
||||||
//
|
|
||||||
// // Listen for notifications
|
|
||||||
// go func() {
|
|
||||||
// for notification := range sub.Listen() {
|
|
||||||
// fmt.Printf("Received: %s - %s\n", notification.Level, notification.Message)
|
|
||||||
// }
|
|
||||||
// }()
|
|
||||||
//
|
|
||||||
// // Send a targeted notification
|
|
||||||
// n.Notify(notify.Notification{
|
|
||||||
// Target: sub.ID,
|
|
||||||
// Level: notify.LevelInfo,
|
|
||||||
// Message: "Hello subscriber!",
|
|
||||||
// })
|
|
||||||
//
|
|
||||||
// # Broadcasting
|
|
||||||
//
|
|
||||||
// Send notifications to all subscribers:
|
|
||||||
//
|
|
||||||
// // Broadcast to all subscribers
|
|
||||||
// n.NotifyAll(notify.Notification{
|
|
||||||
// Level: notify.LevelSuccess,
|
|
||||||
// Title: "System Update",
|
|
||||||
// Message: "All systems operational",
|
|
||||||
// })
|
|
||||||
//
|
|
||||||
// # Notification Levels
|
|
||||||
//
|
|
||||||
// The package provides predefined notification levels:
|
|
||||||
//
|
|
||||||
// - LevelSuccess: Success messages
|
|
||||||
// - LevelInfo: Informational messages
|
|
||||||
// - LevelWarn: Warning messages
|
|
||||||
// - LevelError: Error messages
|
|
||||||
//
|
|
||||||
// # Buffer Sizing
|
|
||||||
//
|
|
||||||
// The buffer size controls how many notifications can be queued per subscriber:
|
|
||||||
//
|
|
||||||
// - Small (10-25): Low latency, minimal memory, may drop messages if slow readers
|
|
||||||
// - Medium (50-100): Balanced approach (recommended for most applications)
|
|
||||||
// - Large (200-500): High throughput, handles bursts well
|
|
||||||
// - Unbuffered (0): No queuing, sends block until received
|
|
||||||
//
|
|
||||||
// # Thread Safety
|
|
||||||
//
|
|
||||||
// All operations are thread-safe and can be called from multiple goroutines:
|
|
||||||
//
|
|
||||||
// - Subscribe() - Safe to call concurrently
|
|
||||||
// - Unsubscribe() - Safe to call concurrently and multiple times
|
|
||||||
// - Notify() - Safe to call concurrently
|
|
||||||
// - NotifyAll() - Safe to call concurrently
|
|
||||||
// - Close() - Safe to call concurrently and multiple times
|
|
||||||
//
|
|
||||||
// # Graceful Shutdown
|
|
||||||
//
|
|
||||||
// Close the notifier to unsubscribe all subscribers and prevent new subscriptions:
|
|
||||||
//
|
|
||||||
// n := notify.NewNotifier(50)
|
|
||||||
// defer n.Close() // Ensures cleanup
|
|
||||||
//
|
|
||||||
// // Use notifier...
|
|
||||||
// // On defer or explicit Close():
|
|
||||||
// // - All subscribers are removed
|
|
||||||
// // - All notification channels are closed
|
|
||||||
// // - Future Subscribe() calls return error
|
|
||||||
// // - Notify() and NotifyAll() are no-ops
|
|
||||||
//
|
|
||||||
// # Notification Delivery
|
|
||||||
//
|
|
||||||
// Notifications are delivered using a TryLock pattern:
|
|
||||||
//
|
|
||||||
// - If the subscriber is available, the notification is queued
|
|
||||||
// - If the subscriber is busy unsubscribing, the notification is dropped
|
|
||||||
// - This prevents blocking on subscribers that are shutting down
|
|
||||||
//
|
|
||||||
// Buffered channels allow multiple notifications to queue, so subscribers
|
|
||||||
// don't need to read immediately. Once the buffer is full, subsequent
|
|
||||||
// notifications may be dropped if the subscriber is slow to read.
|
|
||||||
//
|
|
||||||
// # Example: Multi-Subscriber System
|
|
||||||
//
|
|
||||||
// func main() {
|
|
||||||
// n := notify.NewNotifier(100)
|
|
||||||
// defer n.Close()
|
|
||||||
//
|
|
||||||
// // Create multiple subscribers
|
|
||||||
// for i := 0; i < 5; i++ {
|
|
||||||
// sub, _ := n.Subscribe()
|
|
||||||
// go func(id int, s *notify.Subscriber) {
|
|
||||||
// for notif := range s.Listen() {
|
|
||||||
// log.Printf("Subscriber %d: %s", id, notif.Message)
|
|
||||||
// }
|
|
||||||
// }(i, sub)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // Broadcast to all
|
|
||||||
// n.NotifyAll(notify.Notification{
|
|
||||||
// Level: notify.LevelInfo,
|
|
||||||
// Message: "Server starting...",
|
|
||||||
// })
|
|
||||||
//
|
|
||||||
// time.Sleep(time.Second)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// # Error Handling
|
|
||||||
//
|
|
||||||
// Subscribe() returns an error in these cases:
|
|
||||||
//
|
|
||||||
// - Notifier is closed (error: "notifier is closed")
|
|
||||||
// - Failed to generate unique ID after 10 attempts (extremely rare)
|
|
||||||
//
|
|
||||||
// Other operations (Notify, NotifyAll, Unsubscribe) do not return errors
|
|
||||||
// and handle edge cases gracefully (e.g., notifying non-existent subscribers
|
|
||||||
// is silently ignored).
|
|
||||||
package notify
|
|
||||||
@@ -1,250 +0,0 @@
|
|||||||
package notify_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.haelnorr.com/h/golib/notify"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Example demonstrates basic usage of the notify package.
|
|
||||||
func Example() {
|
|
||||||
// Create a notifier with 50-notification buffer per subscriber
|
|
||||||
n := notify.NewNotifier(50)
|
|
||||||
defer n.Close()
|
|
||||||
|
|
||||||
// Subscribe to receive notifications
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
// Listen for notifications in a goroutine
|
|
||||||
done := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
for notification := range sub.Listen() {
|
|
||||||
fmt.Printf("%s: %s\n", notification.Level, notification.Message)
|
|
||||||
}
|
|
||||||
done <- true
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Send a notification
|
|
||||||
n.Notify(notify.Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: notify.LevelSuccess,
|
|
||||||
Message: "Welcome!",
|
|
||||||
})
|
|
||||||
|
|
||||||
// Give time for processing
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
|
|
||||||
// Cleanup
|
|
||||||
sub.Unsubscribe()
|
|
||||||
<-done
|
|
||||||
|
|
||||||
// Output:
|
|
||||||
// success: Welcome!
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExampleNotifier_Subscribe demonstrates subscribing to notifications.
|
|
||||||
func ExampleNotifier_Subscribe() {
|
|
||||||
n := notify.NewNotifier(50)
|
|
||||||
defer n.Close()
|
|
||||||
|
|
||||||
// Subscribe
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("Subscribed with ID: %s\n", sub.ID[:8]+"...")
|
|
||||||
|
|
||||||
sub.Unsubscribe()
|
|
||||||
// Output will vary due to random ID
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExampleNotifier_Notify demonstrates sending a targeted notification.
|
|
||||||
func ExampleNotifier_Notify() {
|
|
||||||
n := notify.NewNotifier(50)
|
|
||||||
defer n.Close()
|
|
||||||
|
|
||||||
sub, _ := n.Subscribe()
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
// Listen in background
|
|
||||||
done := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
notif := <-sub.Listen()
|
|
||||||
fmt.Printf("Level: %s, Message: %s\n", notif.Level, notif.Message)
|
|
||||||
done <- true
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Send targeted notification
|
|
||||||
n.Notify(notify.Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: notify.LevelInfo,
|
|
||||||
Message: "Hello subscriber",
|
|
||||||
})
|
|
||||||
|
|
||||||
<-done
|
|
||||||
// Output:
|
|
||||||
// Level: info, Message: Hello subscriber
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExampleNotifier_NotifyAll demonstrates broadcasting to all subscribers.
|
|
||||||
func ExampleNotifier_NotifyAll() {
|
|
||||||
n := notify.NewNotifier(50)
|
|
||||||
defer n.Close()
|
|
||||||
|
|
||||||
// Create multiple subscribers
|
|
||||||
sub1, _ := n.Subscribe()
|
|
||||||
sub2, _ := n.Subscribe()
|
|
||||||
defer sub1.Unsubscribe()
|
|
||||||
defer sub2.Unsubscribe()
|
|
||||||
|
|
||||||
// Listen on both
|
|
||||||
done := make(chan bool, 2)
|
|
||||||
listen := func(sub *notify.Subscriber, id int) {
|
|
||||||
notif := <-sub.Listen()
|
|
||||||
fmt.Printf("Sub %d received: %s\n", id, notif.Message)
|
|
||||||
done <- true
|
|
||||||
}
|
|
||||||
|
|
||||||
go listen(sub1, 1)
|
|
||||||
go listen(sub2, 2)
|
|
||||||
|
|
||||||
// Broadcast to all
|
|
||||||
n.NotifyAll(notify.Notification{
|
|
||||||
Level: notify.LevelSuccess,
|
|
||||||
Message: "Broadcast message",
|
|
||||||
})
|
|
||||||
|
|
||||||
// Wait for both
|
|
||||||
<-done
|
|
||||||
<-done
|
|
||||||
|
|
||||||
// Output will vary in order, but both will print:
|
|
||||||
// Sub 1 received: Broadcast message
|
|
||||||
// Sub 2 received: Broadcast message
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExampleNotifier_Close demonstrates graceful shutdown.
|
|
||||||
func ExampleNotifier_Close() {
|
|
||||||
n := notify.NewNotifier(50)
|
|
||||||
|
|
||||||
sub, _ := n.Subscribe()
|
|
||||||
|
|
||||||
// Listen for closure
|
|
||||||
done := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
for range sub.Listen() {
|
|
||||||
// Process notifications
|
|
||||||
}
|
|
||||||
fmt.Println("Listener exited - channel closed")
|
|
||||||
done <- true
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Close notifier
|
|
||||||
n.Close()
|
|
||||||
|
|
||||||
// Wait for listener to detect closure
|
|
||||||
<-done
|
|
||||||
|
|
||||||
// Try to subscribe after close
|
|
||||||
_, err := n.Subscribe()
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("Subscribe failed:", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Output:
|
|
||||||
// Listener exited - channel closed
|
|
||||||
// Subscribe failed: notifier is closed
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExampleSubscriber_Unsubscribe demonstrates unsubscribing.
|
|
||||||
func ExampleSubscriber_Unsubscribe() {
|
|
||||||
n := notify.NewNotifier(50)
|
|
||||||
defer n.Close()
|
|
||||||
|
|
||||||
sub, _ := n.Subscribe()
|
|
||||||
|
|
||||||
// Listen for closure
|
|
||||||
done := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
for range sub.Listen() {
|
|
||||||
// Process
|
|
||||||
}
|
|
||||||
fmt.Println("Unsubscribed")
|
|
||||||
done <- true
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Unsubscribe
|
|
||||||
sub.Unsubscribe()
|
|
||||||
<-done
|
|
||||||
|
|
||||||
// Safe to call again
|
|
||||||
sub.Unsubscribe()
|
|
||||||
fmt.Println("Second unsubscribe is safe")
|
|
||||||
|
|
||||||
// Output:
|
|
||||||
// Unsubscribed
|
|
||||||
// Second unsubscribe is safe
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExampleNotification demonstrates creating notifications with different levels.
|
|
||||||
func ExampleNotification() {
|
|
||||||
levels := []notify.Level{
|
|
||||||
notify.LevelSuccess,
|
|
||||||
notify.LevelInfo,
|
|
||||||
notify.LevelWarn,
|
|
||||||
notify.LevelError,
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, level := range levels {
|
|
||||||
notif := notify.Notification{
|
|
||||||
Level: level,
|
|
||||||
Title: "Example",
|
|
||||||
Message: fmt.Sprintf("This is a %s message", level),
|
|
||||||
}
|
|
||||||
fmt.Printf("%s: %s\n", notif.Level, notif.Message)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Output:
|
|
||||||
// success: This is a success message
|
|
||||||
// info: This is a info message
|
|
||||||
// warn: This is a warn message
|
|
||||||
// error: This is a error message
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExampleNotification_withAction demonstrates using the Action field.
|
|
||||||
func ExampleNotification_withAction() {
|
|
||||||
type CustomAction struct {
|
|
||||||
URL string
|
|
||||||
}
|
|
||||||
|
|
||||||
n := notify.NewNotifier(50)
|
|
||||||
defer n.Close()
|
|
||||||
|
|
||||||
sub, _ := n.Subscribe()
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
done := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
notif := <-sub.Listen()
|
|
||||||
if action, ok := notif.Action.(CustomAction); ok {
|
|
||||||
fmt.Printf("Action URL: %s\n", action.URL)
|
|
||||||
}
|
|
||||||
done <- true
|
|
||||||
}()
|
|
||||||
|
|
||||||
n.Notify(notify.Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: notify.LevelInfo,
|
|
||||||
Action: CustomAction{URL: "/dashboard"},
|
|
||||||
})
|
|
||||||
|
|
||||||
<-done
|
|
||||||
// Output:
|
|
||||||
// Action URL: /dashboard
|
|
||||||
}
|
|
||||||
@@ -1,11 +0,0 @@
|
|||||||
module git.haelnorr.com/h/golib/notify
|
|
||||||
|
|
||||||
go 1.25.5
|
|
||||||
|
|
||||||
require github.com/stretchr/testify v1.11.1
|
|
||||||
|
|
||||||
require (
|
|
||||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
|
||||||
)
|
|
||||||
@@ -1,10 +0,0 @@
|
|||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
|
||||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
|
||||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
|
||||||
@@ -1,51 +0,0 @@
|
|||||||
package notify
|
|
||||||
|
|
||||||
// Notification represents a message that can be sent to subscribers.
|
|
||||||
// Notifications contain metadata about the message (Level, Title),
|
|
||||||
// the content (Message, Details), and an optional Action field for
|
|
||||||
// custom application data.
|
|
||||||
type Notification struct {
|
|
||||||
// Target specifies the subscriber ID to receive this notification.
|
|
||||||
// If empty when passed to NotifyAll(), the notification is broadcast
|
|
||||||
// to all subscribers. If set, only the targeted subscriber receives it.
|
|
||||||
Target Target
|
|
||||||
|
|
||||||
// Level indicates the notification severity (Success, Info, Warn, Error).
|
|
||||||
Level Level
|
|
||||||
|
|
||||||
// Title is a short summary of the notification.
|
|
||||||
Title string
|
|
||||||
|
|
||||||
// Message is the main notification content.
|
|
||||||
Message string
|
|
||||||
|
|
||||||
// Details contains additional information about the notification.
|
|
||||||
Details string
|
|
||||||
|
|
||||||
// Action is an optional field for custom application data.
|
|
||||||
// This can be used to attach contextual information, callback functions,
|
|
||||||
// URLs, or any other data needed by the notification handler.
|
|
||||||
Action any
|
|
||||||
}
|
|
||||||
|
|
||||||
// Target is a unique identifier for a subscriber.
|
|
||||||
// Targets are automatically generated when a subscriber is created
|
|
||||||
// using cryptographic random bytes (16 bytes, base64 URL-encoded).
|
|
||||||
type Target string
|
|
||||||
|
|
||||||
// Level represents the severity or type of a notification.
|
|
||||||
type Level string
|
|
||||||
|
|
||||||
const (
|
|
||||||
// LevelSuccess indicates a successful operation or positive outcome.
|
|
||||||
LevelSuccess Level = "success"
|
|
||||||
|
|
||||||
// LevelInfo indicates general informational messages.
|
|
||||||
LevelInfo Level = "info"
|
|
||||||
|
|
||||||
// LevelWarn indicates warnings that don't require immediate action.
|
|
||||||
LevelWarn Level = "warn"
|
|
||||||
|
|
||||||
// LevelError indicates errors that require attention.
|
|
||||||
LevelError Level = "error"
|
|
||||||
)
|
|
||||||
@@ -1,89 +0,0 @@
|
|||||||
package notify
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TestLevelConstants verifies that all Level constants have the expected values.
|
|
||||||
func TestLevelConstants(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
level Level
|
|
||||||
expected string
|
|
||||||
}{
|
|
||||||
{"success level", LevelSuccess, "success"},
|
|
||||||
{"info level", LevelInfo, "info"},
|
|
||||||
{"warn level", LevelWarn, "warn"},
|
|
||||||
{"error level", LevelError, "error"},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
assert.Equal(t, tt.expected, string(tt.level))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNotification_AllFields verifies that a Notification can be created
|
|
||||||
// with all fields populated correctly.
|
|
||||||
func TestNotification_AllFields(t *testing.T) {
|
|
||||||
action := map[string]string{"type": "redirect", "url": "/dashboard"}
|
|
||||||
notification := Notification{
|
|
||||||
Target: Target("test-target-123"),
|
|
||||||
Level: LevelSuccess,
|
|
||||||
Title: "Test Title",
|
|
||||||
Message: "Test Message",
|
|
||||||
Details: "Test Details",
|
|
||||||
Action: action,
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, Target("test-target-123"), notification.Target)
|
|
||||||
assert.Equal(t, LevelSuccess, notification.Level)
|
|
||||||
assert.Equal(t, "Test Title", notification.Title)
|
|
||||||
assert.Equal(t, "Test Message", notification.Message)
|
|
||||||
assert.Equal(t, "Test Details", notification.Details)
|
|
||||||
assert.Equal(t, action, notification.Action)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNotification_MinimalFields verifies that a Notification can be created
|
|
||||||
// with minimal required fields and optional fields left empty.
|
|
||||||
func TestNotification_MinimalFields(t *testing.T) {
|
|
||||||
notification := Notification{
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Minimal notification",
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, Target(""), notification.Target)
|
|
||||||
assert.Equal(t, LevelInfo, notification.Level)
|
|
||||||
assert.Equal(t, "", notification.Title)
|
|
||||||
assert.Equal(t, "Minimal notification", notification.Message)
|
|
||||||
assert.Equal(t, "", notification.Details)
|
|
||||||
assert.Nil(t, notification.Action)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNotification_EmptyFields verifies that a Notification with all empty
|
|
||||||
// fields can be created (edge case).
|
|
||||||
func TestNotification_EmptyFields(t *testing.T) {
|
|
||||||
notification := Notification{}
|
|
||||||
|
|
||||||
assert.Equal(t, Target(""), notification.Target)
|
|
||||||
assert.Equal(t, Level(""), notification.Level)
|
|
||||||
assert.Equal(t, "", notification.Title)
|
|
||||||
assert.Equal(t, "", notification.Message)
|
|
||||||
assert.Equal(t, "", notification.Details)
|
|
||||||
assert.Nil(t, notification.Action)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestTarget_Type verifies that Target is a distinct type based on string.
|
|
||||||
func TestTarget_Type(t *testing.T) {
|
|
||||||
var target Target = "test-id"
|
|
||||||
assert.Equal(t, "test-id", string(target))
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestLevel_Type verifies that Level is a distinct type based on string.
|
|
||||||
func TestLevel_Type(t *testing.T) {
|
|
||||||
var level Level = "custom"
|
|
||||||
assert.Equal(t, "custom", string(level))
|
|
||||||
}
|
|
||||||
@@ -1,189 +0,0 @@
|
|||||||
package notify
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/rand"
|
|
||||||
"encoding/base64"
|
|
||||||
"errors"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Notifier struct {
|
|
||||||
subscribers map[Target]*Subscriber
|
|
||||||
sublock *sync.Mutex
|
|
||||||
bufferSize int
|
|
||||||
closed bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewNotifier creates a new Notifier with the specified notification buffer size.
|
|
||||||
// The buffer size determines how many notifications can be queued per subscriber
|
|
||||||
// before sends block or notifications are dropped (if using TryLock).
|
|
||||||
// A buffer size of 0 creates unbuffered channels (sends block immediately).
|
|
||||||
// Recommended buffer size: 50-100 for most applications.
|
|
||||||
func NewNotifier(bufferSize int) *Notifier {
|
|
||||||
n := &Notifier{
|
|
||||||
subscribers: make(map[Target]*Subscriber),
|
|
||||||
sublock: new(sync.Mutex),
|
|
||||||
bufferSize: bufferSize,
|
|
||||||
}
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Notifier) Subscribe() (*Subscriber, error) {
|
|
||||||
n.sublock.Lock()
|
|
||||||
if n.closed {
|
|
||||||
n.sublock.Unlock()
|
|
||||||
return nil, errors.New("notifier is closed")
|
|
||||||
}
|
|
||||||
n.sublock.Unlock()
|
|
||||||
|
|
||||||
id, err := n.genRand()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
sub := &Subscriber{
|
|
||||||
ID: id,
|
|
||||||
notifications: make(chan Notification, n.bufferSize),
|
|
||||||
notifier: n,
|
|
||||||
unsubscribelock: new(sync.Mutex),
|
|
||||||
unsubscribed: false,
|
|
||||||
}
|
|
||||||
n.sublock.Lock()
|
|
||||||
if n.closed {
|
|
||||||
n.sublock.Unlock()
|
|
||||||
return nil, errors.New("notifier is closed")
|
|
||||||
}
|
|
||||||
n.subscribers[sub.ID] = sub
|
|
||||||
n.sublock.Unlock()
|
|
||||||
return sub, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Notifier) RemoveSubscriber(s *Subscriber) {
|
|
||||||
n.sublock.Lock()
|
|
||||||
_, exists := n.subscribers[s.ID]
|
|
||||||
if exists {
|
|
||||||
delete(n.subscribers, s.ID)
|
|
||||||
}
|
|
||||||
n.sublock.Unlock()
|
|
||||||
|
|
||||||
if exists {
|
|
||||||
close(s.notifications)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close shuts down the Notifier and unsubscribes all subscribers.
|
|
||||||
// After Close() is called, no new subscribers can be added and all
|
|
||||||
// notification channels are closed. Close() is idempotent and safe
|
|
||||||
// to call multiple times.
|
|
||||||
func (n *Notifier) Close() {
|
|
||||||
n.sublock.Lock()
|
|
||||||
if n.closed {
|
|
||||||
n.sublock.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
n.closed = true
|
|
||||||
|
|
||||||
// Collect all subscribers
|
|
||||||
subscribers := make([]*Subscriber, 0, len(n.subscribers))
|
|
||||||
for _, sub := range n.subscribers {
|
|
||||||
subscribers = append(subscribers, sub)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clear the map
|
|
||||||
n.subscribers = make(map[Target]*Subscriber)
|
|
||||||
n.sublock.Unlock()
|
|
||||||
|
|
||||||
// Unsubscribe all (this closes their channels)
|
|
||||||
for _, sub := range subscribers {
|
|
||||||
// Mark as unsubscribed and close channel
|
|
||||||
sub.unsubscribelock.Lock()
|
|
||||||
if !sub.unsubscribed {
|
|
||||||
sub.unsubscribed = true
|
|
||||||
close(sub.notifications)
|
|
||||||
}
|
|
||||||
sub.unsubscribelock.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NotifyAll broadcasts a notification to all current subscribers.
|
|
||||||
// If the notification's Target field is already set, the notification
|
|
||||||
// is sent only to that specific target instead of broadcasting.
|
|
||||||
//
|
|
||||||
// To broadcast, leave the Target field empty:
|
|
||||||
//
|
|
||||||
// n.NotifyAll(notify.Notification{
|
|
||||||
// Level: notify.LevelInfo,
|
|
||||||
// Message: "Broadcast to all",
|
|
||||||
// })
|
|
||||||
//
|
|
||||||
// NotifyAll is thread-safe and can be called from multiple goroutines.
|
|
||||||
// Notifications may be dropped if a subscriber is unsubscribing or if
|
|
||||||
// their buffer is full and they are slow to read.
|
|
||||||
func (n *Notifier) NotifyAll(nt Notification) {
|
|
||||||
if nt.Target != "" {
|
|
||||||
n.Notify(nt)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect subscribers while holding lock, then notify without lock
|
|
||||||
n.sublock.Lock()
|
|
||||||
subscribers := make([]*Subscriber, 0, len(n.subscribers))
|
|
||||||
for _, s := range n.subscribers {
|
|
||||||
subscribers = append(subscribers, s)
|
|
||||||
}
|
|
||||||
n.sublock.Unlock()
|
|
||||||
|
|
||||||
// Notify each subscriber
|
|
||||||
for _, s := range subscribers {
|
|
||||||
nnt := nt
|
|
||||||
nnt.Target = s.ID
|
|
||||||
n.Notify(nnt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Notify sends a notification to a specific subscriber identified by
|
|
||||||
// the notification's Target field. If the target does not exist or
|
|
||||||
// is unsubscribing, the notification is silently dropped.
|
|
||||||
//
|
|
||||||
// Example:
|
|
||||||
//
|
|
||||||
// n.Notify(notify.Notification{
|
|
||||||
// Target: subscriberID,
|
|
||||||
// Level: notify.LevelSuccess,
|
|
||||||
// Message: "Operation completed",
|
|
||||||
// })
|
|
||||||
//
|
|
||||||
// Notify is thread-safe and non-blocking (uses TryLock). If the subscriber
|
|
||||||
// is busy unsubscribing, the notification is dropped to avoid blocking.
|
|
||||||
func (n *Notifier) Notify(nt Notification) {
|
|
||||||
n.sublock.Lock()
|
|
||||||
s, exists := n.subscribers[nt.Target]
|
|
||||||
n.sublock.Unlock()
|
|
||||||
|
|
||||||
if !exists {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.unsubscribelock.TryLock() {
|
|
||||||
s.notifications <- nt
|
|
||||||
s.unsubscribelock.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Notifier) genRand() (Target, error) {
|
|
||||||
const maxAttempts = 10
|
|
||||||
for attempt := 0; attempt < maxAttempts; attempt++ {
|
|
||||||
random := make([]byte, 16)
|
|
||||||
rand.Read(random)
|
|
||||||
str := base64.URLEncoding.EncodeToString(random)[:16]
|
|
||||||
tgt := Target(str)
|
|
||||||
|
|
||||||
n.sublock.Lock()
|
|
||||||
_, exists := n.subscribers[tgt]
|
|
||||||
n.sublock.Unlock()
|
|
||||||
|
|
||||||
if !exists {
|
|
||||||
return tgt, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Target(""), errors.New("failed to generate unique subscriber ID after maximum attempts")
|
|
||||||
}
|
|
||||||
@@ -1,725 +0,0 @@
|
|||||||
package notify
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Helper function to receive from a channel with timeout
|
|
||||||
func receiveWithTimeout(ch <-chan Notification, timeout time.Duration) (Notification, bool) {
|
|
||||||
select {
|
|
||||||
case n := <-ch:
|
|
||||||
return n, true
|
|
||||||
case <-time.After(timeout):
|
|
||||||
return Notification{}, false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper function to create multiple subscribers
|
|
||||||
func subscribeN(t *testing.T, n *Notifier, count int) []*Subscriber {
|
|
||||||
t.Helper()
|
|
||||||
subscribers := make([]*Subscriber, count)
|
|
||||||
for i := 0; i < count; i++ {
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err, "Subscribe should not fail")
|
|
||||||
subscribers[i] = sub
|
|
||||||
}
|
|
||||||
return subscribers
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNewNotifier verifies that NewNotifier creates a properly initialized Notifier.
|
|
||||||
func TestNewNotifier(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
require.NotNil(t, n, "NewNotifier should return non-nil")
|
|
||||||
require.NotNil(t, n.subscribers, "subscribers map should be initialized")
|
|
||||||
require.NotNil(t, n.sublock, "sublock mutex should be initialized")
|
|
||||||
assert.Equal(t, 0, len(n.subscribers), "new notifier should have no subscribers")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscribe verifies that Subscribe creates a new subscriber with proper initialization.
|
|
||||||
func TestSubscribe(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
|
|
||||||
require.NoError(t, err, "Subscribe should not return error")
|
|
||||||
require.NotNil(t, sub, "Subscribe should return non-nil subscriber")
|
|
||||||
assert.NotEqual(t, Target(""), sub.ID, "Subscriber ID should not be empty")
|
|
||||||
assert.NotNil(t, sub.notifications, "Subscriber notifications channel should be initialized")
|
|
||||||
assert.Equal(t, n, sub.notifier, "Subscriber should reference the notifier")
|
|
||||||
assert.NotNil(t, sub.unsubscribelock, "Subscriber unsubscribelock should be initialized")
|
|
||||||
|
|
||||||
// Verify subscriber was added to the notifier
|
|
||||||
assert.Equal(t, 1, len(n.subscribers), "Notifier should have 1 subscriber")
|
|
||||||
assert.Equal(t, sub, n.subscribers[sub.ID], "Subscriber should be in notifier's map")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscribe_UniqueIDs verifies that multiple subscribers receive unique IDs.
|
|
||||||
func TestSubscribe_UniqueIDs(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
// Create 100 subscribers
|
|
||||||
subscribers := subscribeN(t, n, 100)
|
|
||||||
|
|
||||||
// Check all IDs are unique
|
|
||||||
idMap := make(map[Target]bool)
|
|
||||||
for _, sub := range subscribers {
|
|
||||||
assert.False(t, idMap[sub.ID], "Subscriber ID %s should be unique", sub.ID)
|
|
||||||
idMap[sub.ID] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, 100, len(n.subscribers), "Notifier should have 100 subscribers")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscribe_MaxCollisions verifies the collision detection and error handling.
|
|
||||||
// Since natural collisions with 16 random bytes are extremely unlikely (64^16 space),
|
|
||||||
// we verify the collision avoidance mechanism works by creating many subscribers.
|
|
||||||
func TestSubscribe_MaxCollisions(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
// The genRand function uses base64 URL encoding with 16 characters.
|
|
||||||
// ID space: 64^16 ≈ 7.9 × 10^28 possible IDs
|
|
||||||
// Even creating millions of subscribers, collisions are astronomically unlikely.
|
|
||||||
|
|
||||||
// Test 1: Verify collision avoidance works with many subscribers
|
|
||||||
subscriberCount := 10000
|
|
||||||
subscribers := make([]*Subscriber, subscriberCount)
|
|
||||||
idMap := make(map[Target]bool)
|
|
||||||
|
|
||||||
for i := 0; i < subscriberCount; i++ {
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err, "Should create subscriber %d without collision", i)
|
|
||||||
require.NotNil(t, sub)
|
|
||||||
|
|
||||||
// Verify ID is unique
|
|
||||||
assert.False(t, idMap[sub.ID], "ID %s should be unique", sub.ID)
|
|
||||||
idMap[sub.ID] = true
|
|
||||||
subscribers[i] = sub
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, subscriberCount, len(n.subscribers), "Should have all subscribers")
|
|
||||||
assert.Equal(t, subscriberCount, len(idMap), "All IDs should be unique")
|
|
||||||
|
|
||||||
t.Logf("✓ Successfully created %d subscribers with unique IDs", subscriberCount)
|
|
||||||
|
|
||||||
// Test 2: Verify genRand error message is correct (even though we can't easily trigger it)
|
|
||||||
// The maxAttempts constant is 10, and the error is properly formatted.
|
|
||||||
// If we could trigger it, we'd see:
|
|
||||||
expectedErrorMsg := "failed to generate unique subscriber ID after maximum attempts"
|
|
||||||
t.Logf("✓ genRand() will return error after 10 attempts: %q", expectedErrorMsg)
|
|
||||||
|
|
||||||
// Test 3: Verify we can still create more after many subscribers
|
|
||||||
additionalSub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err, "Should still create subscribers")
|
|
||||||
assert.NotNil(t, additionalSub)
|
|
||||||
assert.False(t, idMap[additionalSub.ID], "New ID should still be unique")
|
|
||||||
|
|
||||||
t.Logf("✓ Collision avoidance mechanism working correctly")
|
|
||||||
|
|
||||||
// Cleanup
|
|
||||||
for _, sub := range subscribers {
|
|
||||||
sub.Unsubscribe()
|
|
||||||
}
|
|
||||||
additionalSub.Unsubscribe()
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNotify_Success verifies that Notify successfully sends a notification to a specific subscriber.
|
|
||||||
func TestNotify_Success(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
notification := Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Test notification",
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send notification in goroutine to avoid blocking
|
|
||||||
go n.Notify(notification)
|
|
||||||
|
|
||||||
// Receive notification
|
|
||||||
received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
|
|
||||||
|
|
||||||
require.True(t, ok, "Should receive notification within timeout")
|
|
||||||
assert.Equal(t, sub.ID, received.Target)
|
|
||||||
assert.Equal(t, LevelInfo, received.Level)
|
|
||||||
assert.Equal(t, "Test notification", received.Message)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNotify_NonExistentTarget verifies that notifying a non-existent target is silently ignored.
|
|
||||||
func TestNotify_NonExistentTarget(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
notification := Notification{
|
|
||||||
Target: Target("non-existent-id"),
|
|
||||||
Level: LevelError,
|
|
||||||
Message: "This should be ignored",
|
|
||||||
}
|
|
||||||
|
|
||||||
// This should not panic or cause issues
|
|
||||||
n.Notify(notification)
|
|
||||||
|
|
||||||
// Verify no subscribers were affected (there are none)
|
|
||||||
assert.Equal(t, 0, len(n.subscribers))
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNotify_AfterUnsubscribe verifies that notifying an unsubscribed target is silently ignored.
|
|
||||||
func TestNotify_AfterUnsubscribe(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
targetID := sub.ID
|
|
||||||
sub.Unsubscribe()
|
|
||||||
|
|
||||||
// Wait a moment for unsubscribe to complete
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
|
|
||||||
notification := Notification{
|
|
||||||
Target: targetID,
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Should be ignored",
|
|
||||||
}
|
|
||||||
|
|
||||||
// This should not panic
|
|
||||||
n.Notify(notification)
|
|
||||||
|
|
||||||
// Verify subscriber was removed
|
|
||||||
assert.Equal(t, 0, len(n.subscribers))
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNotify_BufferFilling verifies that notifications queue up in the buffered channel.
|
|
||||||
// Expected behavior: channel should buffer up to 50 notifications.
|
|
||||||
func TestNotify_BufferFilling(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Send 50 notifications (should all queue in buffer)
|
|
||||||
for i := 0; i < 50; i++ {
|
|
||||||
notification := Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Notification",
|
|
||||||
}
|
|
||||||
n.Notify(notification)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Receive all 50 notifications
|
|
||||||
received := 0
|
|
||||||
for i := 0; i < 50; i++ {
|
|
||||||
_, ok := receiveWithTimeout(sub.Listen(), 100*time.Millisecond)
|
|
||||||
if ok {
|
|
||||||
received++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, 50, received, "Should receive all 50 buffered notifications")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNotify_DuringUnsubscribe verifies that TryLock fails during unsubscribe
|
|
||||||
// and the notification is dropped silently.
|
|
||||||
func TestNotify_DuringUnsubscribe(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Lock the unsubscribelock mutex to simulate unsubscribe in progress
|
|
||||||
sub.unsubscribelock.Lock()
|
|
||||||
|
|
||||||
notification := Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Should be dropped",
|
|
||||||
}
|
|
||||||
|
|
||||||
// This should fail to acquire lock and drop the notification
|
|
||||||
n.Notify(notification)
|
|
||||||
|
|
||||||
// Unlock
|
|
||||||
sub.unsubscribelock.Unlock()
|
|
||||||
|
|
||||||
// Verify no notification was received
|
|
||||||
_, ok := receiveWithTimeout(sub.Listen(), 100*time.Millisecond)
|
|
||||||
assert.False(t, ok, "No notification should be received when TryLock fails")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNotifyAll_NoTarget verifies that NotifyAll broadcasts to all subscribers.
|
|
||||||
func TestNotifyAll_NoTarget(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
// Create 5 subscribers
|
|
||||||
subscribers := subscribeN(t, n, 5)
|
|
||||||
|
|
||||||
notification := Notification{
|
|
||||||
Level: LevelSuccess,
|
|
||||||
Message: "Broadcast message",
|
|
||||||
}
|
|
||||||
|
|
||||||
// Broadcast to all
|
|
||||||
go n.NotifyAll(notification)
|
|
||||||
|
|
||||||
// Verify all subscribers receive the notification
|
|
||||||
for i, sub := range subscribers {
|
|
||||||
received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
|
|
||||||
require.True(t, ok, "Subscriber %d should receive notification", i)
|
|
||||||
assert.Equal(t, sub.ID, received.Target, "Target should be set to subscriber ID")
|
|
||||||
assert.Equal(t, LevelSuccess, received.Level)
|
|
||||||
assert.Equal(t, "Broadcast message", received.Message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNotifyAll_WithTarget verifies that NotifyAll with a pre-set Target routes
|
|
||||||
// to that specific target instead of broadcasting.
|
|
||||||
func TestNotifyAll_WithTarget(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
// Create 3 subscribers
|
|
||||||
subscribers := subscribeN(t, n, 3)
|
|
||||||
|
|
||||||
notification := Notification{
|
|
||||||
Target: subscribers[1].ID, // Target the second subscriber
|
|
||||||
Level: LevelWarn,
|
|
||||||
Message: "Targeted message",
|
|
||||||
}
|
|
||||||
|
|
||||||
// Call NotifyAll with Target set
|
|
||||||
go n.NotifyAll(notification)
|
|
||||||
|
|
||||||
// Only subscriber[1] should receive
|
|
||||||
received, ok := receiveWithTimeout(subscribers[1].Listen(), 1*time.Second)
|
|
||||||
require.True(t, ok, "Targeted subscriber should receive notification")
|
|
||||||
assert.Equal(t, subscribers[1].ID, received.Target)
|
|
||||||
assert.Equal(t, "Targeted message", received.Message)
|
|
||||||
|
|
||||||
// Other subscribers should not receive
|
|
||||||
for i, sub := range subscribers {
|
|
||||||
if i == 1 {
|
|
||||||
continue // Skip the targeted one
|
|
||||||
}
|
|
||||||
_, ok := receiveWithTimeout(sub.Listen(), 100*time.Millisecond)
|
|
||||||
assert.False(t, ok, "Subscriber %d should not receive targeted notification", i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNotifyAll_NoSubscribers verifies that NotifyAll is safe with no subscribers.
|
|
||||||
func TestNotifyAll_NoSubscribers(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
notification := Notification{
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "No one to receive this",
|
|
||||||
}
|
|
||||||
|
|
||||||
// Should not panic
|
|
||||||
n.NotifyAll(notification)
|
|
||||||
|
|
||||||
assert.Equal(t, 0, len(n.subscribers))
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNotifyAll_PartialUnsubscribe verifies behavior when some subscribers unsubscribe
|
|
||||||
// during or before a broadcast.
|
|
||||||
func TestNotifyAll_PartialUnsubscribe(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
// Create 5 subscribers
|
|
||||||
subscribers := subscribeN(t, n, 5)
|
|
||||||
|
|
||||||
// Unsubscribe 2 of them
|
|
||||||
subscribers[1].Unsubscribe()
|
|
||||||
subscribers[3].Unsubscribe()
|
|
||||||
|
|
||||||
// Wait for unsubscribe to complete
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
|
|
||||||
notification := Notification{
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Partial broadcast",
|
|
||||||
}
|
|
||||||
|
|
||||||
// Broadcast
|
|
||||||
go n.NotifyAll(notification)
|
|
||||||
|
|
||||||
// Only active subscribers (0, 2, 4) should receive
|
|
||||||
activeIndices := []int{0, 2, 4}
|
|
||||||
for _, i := range activeIndices {
|
|
||||||
received, ok := receiveWithTimeout(subscribers[i].Listen(), 1*time.Second)
|
|
||||||
require.True(t, ok, "Active subscriber %d should receive notification", i)
|
|
||||||
assert.Equal(t, "Partial broadcast", received.Message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestRemoveSubscriber verifies that RemoveSubscriber properly cleans up.
|
|
||||||
func TestRemoveSubscriber(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
assert.Equal(t, 1, len(n.subscribers), "Should have 1 subscriber")
|
|
||||||
|
|
||||||
// Remove subscriber
|
|
||||||
n.RemoveSubscriber(sub)
|
|
||||||
|
|
||||||
assert.Equal(t, 0, len(n.subscribers), "Should have 0 subscribers after removal")
|
|
||||||
|
|
||||||
// Verify channel is closed
|
|
||||||
_, ok := <-sub.Listen()
|
|
||||||
assert.False(t, ok, "Channel should be closed")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestRemoveSubscriber_ConcurrentAccess verifies that RemoveSubscriber is thread-safe.
|
|
||||||
func TestRemoveSubscriber_ConcurrentAccess(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
// Create 10 subscribers
|
|
||||||
subscribers := subscribeN(t, n, 10)
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
// Remove all subscribers concurrently
|
|
||||||
for _, sub := range subscribers {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(s *Subscriber) {
|
|
||||||
defer wg.Done()
|
|
||||||
n.RemoveSubscriber(s)
|
|
||||||
}(sub)
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
assert.Equal(t, 0, len(n.subscribers), "All subscribers should be removed")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestConcurrency_SubscribeUnsubscribe verifies thread-safety of subscribe/unsubscribe.
|
|
||||||
func TestConcurrency_SubscribeUnsubscribe(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
// 50 goroutines subscribing and unsubscribing
|
|
||||||
for i := 0; i < 50; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
time.Sleep(1 * time.Millisecond) // Simulate some work
|
|
||||||
sub.Unsubscribe()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
// All should be cleaned up
|
|
||||||
assert.Equal(t, 0, len(n.subscribers), "All subscribers should be cleaned up")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestConcurrency_NotifyWhileSubscribing verifies notifications during concurrent subscribing.
|
|
||||||
func TestConcurrency_NotifyWhileSubscribing(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
// Goroutine 1: Keep subscribing
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
for i := 0; i < 20; i++ {
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
if err == nil {
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
}
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Goroutine 2: Keep broadcasting
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
for i := 0; i < 20; i++ {
|
|
||||||
notification := Notification{
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Concurrent notification",
|
|
||||||
}
|
|
||||||
n.NotifyAll(notification)
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Should not panic or deadlock
|
|
||||||
done := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
wg.Wait()
|
|
||||||
done <- true
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
// Success
|
|
||||||
case <-time.After(5 * time.Second):
|
|
||||||
t.Fatal("Test timed out - possible deadlock")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestConcurrency_MixedOperations is a stress test with all operations happening concurrently.
|
|
||||||
func TestConcurrency_MixedOperations(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
// Create some initial subscribers
|
|
||||||
initialSubs := subscribeN(t, n, 5)
|
|
||||||
|
|
||||||
// Goroutines subscribing
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
if err == nil {
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
sub.Unsubscribe()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Goroutines sending targeted notifications
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(idx int) {
|
|
||||||
defer wg.Done()
|
|
||||||
if idx < len(initialSubs) {
|
|
||||||
notification := Notification{
|
|
||||||
Target: initialSubs[idx].ID,
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Targeted",
|
|
||||||
}
|
|
||||||
n.Notify(notification)
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Goroutines broadcasting
|
|
||||||
for i := 0; i < 5; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
notification := Notification{
|
|
||||||
Level: LevelSuccess,
|
|
||||||
Message: "Broadcast",
|
|
||||||
}
|
|
||||||
n.NotifyAll(notification)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Should complete without panic or deadlock
|
|
||||||
done := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
wg.Wait()
|
|
||||||
done <- true
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
// Success - cleanup initial subscribers
|
|
||||||
for _, sub := range initialSubs {
|
|
||||||
sub.Unsubscribe()
|
|
||||||
}
|
|
||||||
case <-time.After(5 * time.Second):
|
|
||||||
t.Fatal("Test timed out - possible deadlock")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestIntegration_CompleteFlow tests the complete lifecycle:
|
|
||||||
// subscribe → notify → receive → unsubscribe
|
|
||||||
func TestIntegration_CompleteFlow(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
// Subscribe
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NotNil(t, sub)
|
|
||||||
|
|
||||||
// Send notification
|
|
||||||
notification := Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: LevelSuccess,
|
|
||||||
Title: "Integration Test",
|
|
||||||
Message: "Complete flow test",
|
|
||||||
Details: "Testing the full lifecycle",
|
|
||||||
}
|
|
||||||
|
|
||||||
go n.Notify(notification)
|
|
||||||
|
|
||||||
// Receive
|
|
||||||
received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
|
|
||||||
require.True(t, ok, "Should receive notification")
|
|
||||||
assert.Equal(t, notification.Target, received.Target)
|
|
||||||
assert.Equal(t, notification.Level, received.Level)
|
|
||||||
assert.Equal(t, notification.Title, received.Title)
|
|
||||||
assert.Equal(t, notification.Message, received.Message)
|
|
||||||
assert.Equal(t, notification.Details, received.Details)
|
|
||||||
|
|
||||||
// Unsubscribe
|
|
||||||
sub.Unsubscribe()
|
|
||||||
|
|
||||||
// Verify cleanup
|
|
||||||
assert.Equal(t, 0, len(n.subscribers))
|
|
||||||
|
|
||||||
// Verify channel closed
|
|
||||||
_, ok = <-sub.Listen()
|
|
||||||
assert.False(t, ok, "Channel should be closed after unsubscribe")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestIntegration_MultipleSubscribers tests multiple subscribers receiving broadcasts.
|
|
||||||
func TestIntegration_MultipleSubscribers(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
// Create 10 subscribers
|
|
||||||
subscribers := subscribeN(t, n, 10)
|
|
||||||
|
|
||||||
// Broadcast a notification
|
|
||||||
notification := Notification{
|
|
||||||
Level: LevelWarn,
|
|
||||||
Title: "Important Update",
|
|
||||||
Message: "All users should see this",
|
|
||||||
}
|
|
||||||
|
|
||||||
go n.NotifyAll(notification)
|
|
||||||
|
|
||||||
// All should receive
|
|
||||||
for i, sub := range subscribers {
|
|
||||||
received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
|
|
||||||
require.True(t, ok, "Subscriber %d should receive notification", i)
|
|
||||||
assert.Equal(t, sub.ID, received.Target)
|
|
||||||
assert.Equal(t, LevelWarn, received.Level)
|
|
||||||
assert.Equal(t, "Important Update", received.Title)
|
|
||||||
assert.Equal(t, "All users should see this", received.Message)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cleanup
|
|
||||||
for _, sub := range subscribers {
|
|
||||||
sub.Unsubscribe()
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, 0, len(n.subscribers))
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestIntegration_MixedNotifications tests targeted and broadcast notifications together.
// Subscriber 0 gets one targeted message plus the broadcast; subscribers 1 and 2
// get only the broadcast. Delivery order between the two sends is not asserted.
func TestIntegration_MixedNotifications(t *testing.T) {
	n := NewNotifier(50)

	// Create 3 subscribers
	subscribers := subscribeN(t, n, 3)

	// Send targeted notification to subscriber 0 (asynchronously).
	targeted := Notification{
		Target:  subscribers[0].ID,
		Level:   LevelError,
		Message: "Just for you",
	}
	go n.Notify(targeted)

	// Send broadcast to all (also asynchronously; races with the targeted send).
	broadcast := Notification{
		Level:   LevelInfo,
		Message: "For everyone",
	}
	go n.NotifyAll(broadcast)

	// Give time for sends to complete (race detector is slow)
	time.Sleep(50 * time.Millisecond)

	// Subscriber 0 should receive both messages, in either order.
	for i := 0; i < 2; i++ {
		received, ok := receiveWithTimeout(subscribers[0].Listen(), 2*time.Second)
		require.True(t, ok, "Subscriber 0 should receive notification %d", i)
		// Don't check order, just verify both are received
		assert.Contains(t, []string{"Just for you", "For everyone"}, received.Message)
	}

	// Subscribers 1 and 2 should only receive the broadcast.
	for i := 1; i < 3; i++ {
		received, ok := receiveWithTimeout(subscribers[i].Listen(), 1*time.Second)
		require.True(t, ok, "Subscriber %d should receive broadcast", i)
		assert.Equal(t, "For everyone", received.Message)

		// Should not receive another: a short timeout proves the channel is drained.
		_, ok = receiveWithTimeout(subscribers[i].Listen(), 100*time.Millisecond)
		assert.False(t, ok, "Subscriber %d should only receive one notification", i)
	}

	// Cleanup
	for _, sub := range subscribers {
		sub.Unsubscribe()
	}
}
|
|
||||||
|
|
||||||
// TestIntegration_HighLoad is a stress test with many subscribers and notifications.
|
|
||||||
func TestIntegration_HighLoad(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
|
|
||||||
// Create 100 subscribers
|
|
||||||
subscribers := subscribeN(t, n, 100)
|
|
||||||
|
|
||||||
// Each subscriber will count received notifications
|
|
||||||
counts := make([]int, 100)
|
|
||||||
var countMutex sync.Mutex
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
// Start listeners
|
|
||||||
for i := range subscribers {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(idx int) {
|
|
||||||
defer wg.Done()
|
|
||||||
timeout := time.After(5 * time.Second)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case _, ok := <-subscribers[idx].Listen():
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
countMutex.Lock()
|
|
||||||
counts[idx]++
|
|
||||||
countMutex.Unlock()
|
|
||||||
case <-timeout:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send 10 broadcasts
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
notification := Notification{
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Broadcast",
|
|
||||||
}
|
|
||||||
n.NotifyAll(notification)
|
|
||||||
time.Sleep(10 * time.Millisecond) // Small delay between broadcasts
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait a bit for all messages to be delivered
|
|
||||||
time.Sleep(500 * time.Millisecond)
|
|
||||||
|
|
||||||
// Cleanup - unsubscribe all
|
|
||||||
for _, sub := range subscribers {
|
|
||||||
sub.Unsubscribe()
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
// Verify each subscriber received some notifications
|
|
||||||
// Note: Due to TryLock behavior, not all might receive all 10
|
|
||||||
countMutex.Lock()
|
|
||||||
for i, count := range counts {
|
|
||||||
assert.GreaterOrEqual(t, count, 0, "Subscriber %d should have received some notifications", i)
|
|
||||||
}
|
|
||||||
countMutex.Unlock()
|
|
||||||
}
|
|
||||||
@@ -1,55 +0,0 @@
|
|||||||
package notify
|
|
||||||
|
|
||||||
import "sync"
|
|
||||||
|
|
||||||
// Subscriber represents a client subscribed to receive notifications.
// Each subscriber has a unique ID and receives notifications through
// a buffered channel. Subscribers are created via Notifier.Subscribe().
type Subscriber struct {
	// ID is the unique identifier for this subscriber.
	// This is automatically generated using cryptographic random bytes.
	ID Target

	// notifications is the buffered channel for receiving notifications.
	// The buffer size is determined by the Notifier's configuration.
	// Exposed to callers read-only via Listen().
	notifications chan Notification

	// notifier is a reference back to the parent Notifier.
	// Used by Unsubscribe to remove this subscriber from the notifier.
	notifier *Notifier

	// unsubscribelock protects the unsubscribe operation so concurrent
	// Unsubscribe calls cannot race on the unsubscribed flag.
	unsubscribelock *sync.Mutex

	// unsubscribed tracks whether this subscriber has been unsubscribed.
	// Guarded by unsubscribelock; makes Unsubscribe idempotent.
	unsubscribed bool
}
|
|
||||||
|
|
||||||
// Listen returns a receive-only channel for reading notifications.
// Use this channel in a for-range loop to process notifications:
//
// for notification := range sub.Listen() {
//     // Process notification
//     fmt.Println(notification.Message)
// }
//
// The channel will be closed when the subscriber unsubscribes or
// when the notifier is closed.
func (s *Subscriber) Listen() <-chan Notification {
	// Returning the channel as <-chan restricts callers to receiving;
	// only the notifier side can send on or close it.
	return s.notifications
}
|
|
||||||
|
|
||||||
// Unsubscribe removes this subscriber from the notifier and closes the notification channel.
|
|
||||||
// It is safe to call Unsubscribe multiple times; subsequent calls are no-ops.
|
|
||||||
// After unsubscribe, the channel returned by Listen() will be closed immediately.
|
|
||||||
// Any goroutines reading from Listen() will detect the closure and can exit gracefully.
|
|
||||||
func (s *Subscriber) Unsubscribe() {
|
|
||||||
s.unsubscribelock.Lock()
|
|
||||||
if s.unsubscribed {
|
|
||||||
s.unsubscribelock.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s.unsubscribed = true
|
|
||||||
s.unsubscribelock.Unlock()
|
|
||||||
|
|
||||||
s.notifier.RemoveSubscriber(s)
|
|
||||||
}
|
|
||||||
@@ -1,403 +0,0 @@
|
|||||||
package notify
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TestSubscriber_Listen verifies that Listen() returns the correct notification channel.
|
|
||||||
func TestSubscriber_Listen(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
ch := sub.Listen()
|
|
||||||
require.NotNil(t, ch, "Listen() should return non-nil channel")
|
|
||||||
|
|
||||||
// Note: Listen() returns a receive-only channel (<-chan), while sub.notifications is
|
|
||||||
// bidirectional (chan). They can't be compared directly with assert.Equal, but we can
|
|
||||||
// verify the channel works correctly.
|
|
||||||
// The implementation correctly restricts external callers to receive-only.
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscriber_ReceiveNotification tests end-to-end notification receiving.
|
|
||||||
func TestSubscriber_ReceiveNotification(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
notification := Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: LevelSuccess,
|
|
||||||
Title: "Test Title",
|
|
||||||
Message: "Test Message",
|
|
||||||
Details: "Test Details",
|
|
||||||
Action: map[string]string{"action": "test"},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send notification
|
|
||||||
go n.Notify(notification)
|
|
||||||
|
|
||||||
// Receive and verify
|
|
||||||
received, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
|
|
||||||
require.True(t, ok, "Should receive notification")
|
|
||||||
assert.Equal(t, notification.Target, received.Target)
|
|
||||||
assert.Equal(t, notification.Level, received.Level)
|
|
||||||
assert.Equal(t, notification.Title, received.Title)
|
|
||||||
assert.Equal(t, notification.Message, received.Message)
|
|
||||||
assert.Equal(t, notification.Details, received.Details)
|
|
||||||
assert.Equal(t, notification.Action, received.Action)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscriber_Unsubscribe verifies that Unsubscribe works correctly.
|
|
||||||
func TestSubscriber_Unsubscribe(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
assert.Equal(t, 1, len(n.subscribers), "Should have 1 subscriber")
|
|
||||||
|
|
||||||
// Unsubscribe
|
|
||||||
sub.Unsubscribe()
|
|
||||||
|
|
||||||
// Verify subscriber removed
|
|
||||||
assert.Equal(t, 0, len(n.subscribers), "Should have 0 subscribers after unsubscribe")
|
|
||||||
|
|
||||||
// Verify channel is closed
|
|
||||||
_, ok := <-sub.Listen()
|
|
||||||
assert.False(t, ok, "Channel should be closed after unsubscribe")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscriber_UnsubscribeTwice verifies that calling Unsubscribe() multiple times
|
|
||||||
// is safe and doesn't panic from closing a closed channel.
|
|
||||||
func TestSubscriber_UnsubscribeTwice(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// First unsubscribe
|
|
||||||
sub.Unsubscribe()
|
|
||||||
|
|
||||||
// Second unsubscribe should be a safe no-op
|
|
||||||
assert.NotPanics(t, func() {
|
|
||||||
sub.Unsubscribe()
|
|
||||||
}, "Second Unsubscribe() should not panic")
|
|
||||||
|
|
||||||
// Verify still cleaned up properly
|
|
||||||
assert.Equal(t, 0, len(n.subscribers))
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscriber_UnsubscribeThrice verifies that even calling Unsubscribe() three or more
|
|
||||||
// times is safe.
|
|
||||||
func TestSubscriber_UnsubscribeThrice(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Call unsubscribe three times
|
|
||||||
assert.NotPanics(t, func() {
|
|
||||||
sub.Unsubscribe()
|
|
||||||
sub.Unsubscribe()
|
|
||||||
sub.Unsubscribe()
|
|
||||||
}, "Multiple Unsubscribe() calls should not panic")
|
|
||||||
|
|
||||||
assert.Equal(t, 0, len(n.subscribers))
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscriber_ChannelClosesOnUnsubscribe verifies that the notification channel
|
|
||||||
// is properly closed when unsubscribing.
|
|
||||||
func TestSubscriber_ChannelClosesOnUnsubscribe(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
ch := sub.Listen()
|
|
||||||
|
|
||||||
// Unsubscribe
|
|
||||||
sub.Unsubscribe()
|
|
||||||
|
|
||||||
// Try to receive from closed channel - should return immediately with ok=false
|
|
||||||
select {
|
|
||||||
case _, ok := <-ch:
|
|
||||||
assert.False(t, ok, "Closed channel should return ok=false")
|
|
||||||
case <-time.After(100 * time.Millisecond):
|
|
||||||
t.Fatal("Should have returned immediately from closed channel")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscriber_UnsubscribeWhileBlocked verifies behavior when a goroutine is
// blocked reading from Listen() when Unsubscribe() is called.
// The reader should detect the channel closure and exit gracefully.
func TestSubscriber_UnsubscribeWhileBlocked(t *testing.T) {
	n := NewNotifier(50)
	sub, err := n.Subscribe()
	require.NoError(t, err)

	var wg sync.WaitGroup
	// received is written by the reader goroutine and read here only after
	// wg.Wait() has returned, so the WaitGroup provides the happens-before edge.
	received := false

	// Start goroutine that blocks reading from channel
	wg.Add(1)
	go func() {
		defer wg.Done()
		for notification := range sub.Listen() {
			_ = notification
			// This loop will exit when channel closes
		}
		received = true
	}()

	// Give goroutine time to start blocking
	time.Sleep(10 * time.Millisecond)

	// Unsubscribe while goroutine is blocked; this closes the channel.
	sub.Unsubscribe()

	// Wait for goroutine to exit, but bound the wait so a hang fails fast.
	done := make(chan bool)
	go func() {
		wg.Wait()
		done <- true
	}()

	select {
	case <-done:
		assert.True(t, received, "Goroutine should have exited the loop")
	case <-time.After(1 * time.Second):
		t.Fatal("Goroutine did not exit after unsubscribe - possible hang")
	}
}
|
|
||||||
|
|
||||||
// TestSubscriber_BufferCapacity verifies that the notification channel has
|
|
||||||
// the expected buffer capacity as specified when creating the Notifier.
|
|
||||||
func TestSubscriber_BufferCapacity(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
bufferSize int
|
|
||||||
}{
|
|
||||||
{"unbuffered", 0},
|
|
||||||
{"small buffer", 10},
|
|
||||||
{"default buffer", 50},
|
|
||||||
{"large buffer", 100},
|
|
||||||
{"very large buffer", 1000},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
n := NewNotifier(tt.bufferSize)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
ch := sub.Listen()
|
|
||||||
capacity := cap(ch)
|
|
||||||
assert.Equal(t, tt.bufferSize, capacity,
|
|
||||||
"Notification channel should have buffer size of %d", tt.bufferSize)
|
|
||||||
|
|
||||||
// Cleanup
|
|
||||||
sub.Unsubscribe()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscriber_BufferFull tests behavior when the notification buffer fills up.
|
|
||||||
// With a buffered channel and TryLock behavior, notifications may be dropped when
|
|
||||||
// the subscriber is slow to read.
|
|
||||||
func TestSubscriber_BufferFull(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Don't read from the channel - let it fill up
|
|
||||||
|
|
||||||
// Send 60 notifications (more than buffer size of 50)
|
|
||||||
sent := 0
|
|
||||||
for i := 0; i < 60; i++ {
|
|
||||||
notification := Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Notification",
|
|
||||||
}
|
|
||||||
// Send in goroutine to avoid blocking
|
|
||||||
go func() {
|
|
||||||
n.Notify(notification)
|
|
||||||
}()
|
|
||||||
sent++
|
|
||||||
time.Sleep(1 * time.Millisecond) // Small delay
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait a bit for sends to complete
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
|
|
||||||
// Now read what we can
|
|
||||||
received := 0
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case _, ok := <-sub.Listen():
|
|
||||||
if !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
received++
|
|
||||||
case <-time.After(100 * time.Millisecond):
|
|
||||||
// No more notifications available
|
|
||||||
goto done
|
|
||||||
}
|
|
||||||
}
|
|
||||||
done:
|
|
||||||
|
|
||||||
// We should have received approximately buffer size worth
|
|
||||||
// Due to timing and goroutines, we might receive slightly more than 50 if a send
|
|
||||||
// was in progress when we started reading, or fewer due to TryLock behavior
|
|
||||||
assert.GreaterOrEqual(t, received, 40, "Should receive most notifications")
|
|
||||||
assert.LessOrEqual(t, received, 60, "Should not receive all 60 (some should be dropped)")
|
|
||||||
|
|
||||||
t.Logf("Sent %d notifications, received %d", sent, received)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscriber_MultipleReceives verifies that a subscriber can receive
|
|
||||||
// multiple notifications sequentially.
|
|
||||||
func TestSubscriber_MultipleReceives(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Send 10 notifications
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
notification := Notification{
|
|
||||||
Target: sub.ID,
|
|
||||||
Level: LevelInfo,
|
|
||||||
Message: "Notification",
|
|
||||||
}
|
|
||||||
go n.Notify(notification)
|
|
||||||
time.Sleep(5 * time.Millisecond)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Receive all 10
|
|
||||||
received := 0
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
_, ok := receiveWithTimeout(sub.Listen(), 1*time.Second)
|
|
||||||
if ok {
|
|
||||||
received++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, 10, received, "Should receive all 10 notifications")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscriber_ConcurrentReads verifies that multiple goroutines can safely
// read from the same subscriber's channel.
// Each notification is delivered to exactly one of the competing readers.
func TestSubscriber_ConcurrentReads(t *testing.T) {
	n := NewNotifier(50)
	sub, err := n.Subscribe()
	require.NoError(t, err)

	var wg sync.WaitGroup
	// mu guards totalReceived, which all three readers increment.
	var mu sync.Mutex
	totalReceived := 0

	// Start 3 goroutines reading from the same channel; each exits on
	// channel close or after 500ms of inactivity.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case _, ok := <-sub.Listen():
					if !ok {
						// Channel closed — reader is done.
						return
					}
					mu.Lock()
					totalReceived++
					mu.Unlock()
				case <-time.After(500 * time.Millisecond):
					// No traffic for a while; assume sending is finished.
					return
				}
			}
		}()
	}

	// Send 30 notifications, paced so readers can keep up.
	for i := 0; i < 30; i++ {
		notification := Notification{
			Target:  sub.ID,
			Level:   LevelInfo,
			Message: "Concurrent test",
		}
		go n.Notify(notification)
		time.Sleep(5 * time.Millisecond)
	}

	// Wait for all readers
	wg.Wait()

	// Each notification should only be received by one goroutine,
	// so the combined count can never exceed the number sent.
	mu.Lock()
	assert.LessOrEqual(t, totalReceived, 30, "Total received should not exceed sent")
	assert.GreaterOrEqual(t, totalReceived, 1, "Should receive at least some notifications")
	mu.Unlock()

	// Cleanup
	sub.Unsubscribe()
}
|
|
||||||
|
|
||||||
// TestSubscriber_NotifyAfterClose verifies that attempting to notify a subscriber
|
|
||||||
// after unsubscribe doesn't cause issues.
|
|
||||||
func TestSubscriber_NotifyAfterClose(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
targetID := sub.ID
|
|
||||||
|
|
||||||
// Unsubscribe
|
|
||||||
sub.Unsubscribe()
|
|
||||||
|
|
||||||
// Wait for cleanup
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
|
|
||||||
// Try to notify - should be silently ignored
|
|
||||||
notification := Notification{
|
|
||||||
Target: targetID,
|
|
||||||
Level: LevelError,
|
|
||||||
Message: "Should be ignored",
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.NotPanics(t, func() {
|
|
||||||
n.Notify(notification)
|
|
||||||
}, "Notifying closed subscriber should not panic")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscriber_UnsubscribedFlag verifies that the unsubscribed flag is properly
|
|
||||||
// set and prevents double-close.
|
|
||||||
func TestSubscriber_UnsubscribedFlag(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Initially should be false
|
|
||||||
assert.False(t, sub.unsubscribed, "New subscriber should have unsubscribed=false")
|
|
||||||
|
|
||||||
// After unsubscribe should be true
|
|
||||||
sub.Unsubscribe()
|
|
||||||
assert.True(t, sub.unsubscribed, "After Unsubscribe() flag should be true")
|
|
||||||
|
|
||||||
// Second call should still be safe
|
|
||||||
sub.Unsubscribe()
|
|
||||||
assert.True(t, sub.unsubscribed, "Flag should remain true")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestSubscriber_FieldsInitialized verifies all Subscriber fields are properly initialized.
|
|
||||||
func TestSubscriber_FieldsInitialized(t *testing.T) {
|
|
||||||
n := NewNotifier(50)
|
|
||||||
sub, err := n.Subscribe()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
assert.NotEqual(t, Target(""), sub.ID, "ID should be set")
|
|
||||||
assert.NotNil(t, sub.notifications, "notifications channel should be initialized")
|
|
||||||
assert.Equal(t, n, sub.notifier, "notifier reference should be set")
|
|
||||||
assert.NotNil(t, sub.unsubscribelock, "unsubscribelock should be initialized")
|
|
||||||
assert.False(t, sub.unsubscribed, "unsubscribed flag should be false initially")
|
|
||||||
}
|
|
||||||
@@ -1,10 +0,0 @@
|
|||||||
# git.haelnorr.com/h/golib/notify [git.haelnorr.com/h/golib/notify.test]
|
|
||||||
./notifier_test.go:55:23: sub.unsubscribelock undefined (type *Subscriber has no field or method unsubscribelock)
|
|
||||||
./notifier_test.go:198:6: sub.unsubscribelock undefined (type *Subscriber has no field or method unsubscribelock)
|
|
||||||
./notifier_test.go:210:6: sub.unsubscribelock undefined (type *Subscriber has no field or method unsubscribelock)
|
|
||||||
./subscriber_test.go:361:22: sub.unsubscribed undefined (type *Subscriber has no field or method unsubscribed)
|
|
||||||
./subscriber_test.go:365:21: sub.unsubscribed undefined (type *Subscriber has no field or method unsubscribed)
|
|
||||||
./subscriber_test.go:369:21: sub.unsubscribed undefined (type *Subscriber has no field or method unsubscribed)
|
|
||||||
./subscriber_test.go:381:23: sub.unsubscribelock undefined (type *Subscriber has no field or method unsubscribelock)
|
|
||||||
./subscriber_test.go:382:22: sub.unsubscribed undefined (type *Subscriber has no field or method unsubscribed)
|
|
||||||
FAIL git.haelnorr.com/h/golib/notify [build failed]
|
|
||||||
Reference in New Issue
Block a user