Files
golib/notify/notifier.go
2026-01-24 20:35:40 +11:00

190 lines
4.6 KiB
Go

package notify
import (
"crypto/rand"
"encoding/base64"
"errors"
"sync"
)
// Notifier keeps a registry of subscribers and delivers notifications to
// them over per-subscriber channels. Create one with NewNotifier.
type Notifier struct {
// subscribers holds every registered subscriber, keyed by its ID.
subscribers map[Target]*Subscriber
// sublock guards subscribers and closed.
sublock *sync.Mutex
// bufferSize is the capacity given to each subscriber's notification
// channel when it is created in Subscribe.
bufferSize int
// closed is set by Close; once true, Subscribe rejects new subscribers.
closed bool
}
// NewNotifier returns an open Notifier with no subscribers.
//
// bufferSize is used as the capacity of the notification channel created
// for every subscriber. With a capacity of 0 the channels are unbuffered,
// so a send waits until the subscriber receives. Larger values let that
// many notifications queue up per subscriber before a send has to wait.
// A value in the range 50-100 is a reasonable default for most applications.
func NewNotifier(bufferSize int) *Notifier {
	return &Notifier{
		bufferSize:  bufferSize,
		sublock:     new(sync.Mutex),
		subscribers: make(map[Target]*Subscriber),
	}
}
// Subscribe registers a new subscriber under a freshly generated ID and
// returns it. The subscriber's notification channel is created with the
// notifier's configured buffer size.
//
// It returns an error if the notifier has been closed, or if a unique ID
// could not be generated.
func (n *Notifier) Subscribe() (*Subscriber, error) {
	// Cheap early exit so that no ID is generated for a closed notifier.
	n.sublock.Lock()
	alreadyClosed := n.closed
	n.sublock.Unlock()
	if alreadyClosed {
		return nil, errors.New("notifier is closed")
	}

	// genRand takes sublock itself, so it must be called unlocked.
	target, err := n.genRand()
	if err != nil {
		return nil, err
	}

	created := &Subscriber{
		ID:              target,
		notifications:   make(chan Notification, n.bufferSize),
		notifier:        n,
		unsubscribelock: new(sync.Mutex),
		unsubscribed:    false,
	}

	n.sublock.Lock()
	defer n.sublock.Unlock()

	// Close may have run while the lock was released above; check again
	// before touching the registry.
	if n.closed {
		return nil, errors.New("notifier is closed")
	}
	n.subscribers[created.ID] = created
	return created, nil
}
// RemoveSubscriber takes s out of the registry and, if it was registered,
// closes its notification channel. Calling it for a subscriber that is not
// in the registry is a no-op, so the channel is closed at most once through
// this method.
//
// NOTE(review): the channel is closed without taking s.unsubscribelock or
// setting s.unsubscribed; presumably the caller (e.g. an Unsubscribe method
// not visible in this file) is responsible for both. Confirm.
func (n *Notifier) RemoveSubscriber(s *Subscriber) {
	n.sublock.Lock()
	_, registered := n.subscribers[s.ID]
	delete(n.subscribers, s.ID) // deleting an absent key is a no-op
	n.sublock.Unlock()

	if !registered {
		return
	}
	// Closed outside sublock, as before.
	close(s.notifications)
}
// Close marks the Notifier as closed, empties the subscriber registry and
// closes the notification channel of every subscriber that was registered
// and not already marked unsubscribed. After Close, Subscribe returns an
// error. Calling Close again is a no-op.
func (n *Notifier) Close() {
	// Flip the closed flag and detach all subscribers in one critical
	// section; a nil result means a previous Close already did the work.
	pending := func() []*Subscriber {
		n.sublock.Lock()
		defer n.sublock.Unlock()

		if n.closed {
			return nil
		}
		n.closed = true

		snapshot := make([]*Subscriber, 0, len(n.subscribers))
		for _, sub := range n.subscribers {
			snapshot = append(snapshot, sub)
		}
		n.subscribers = make(map[Target]*Subscriber)
		return snapshot
	}()

	// Channels are closed after sublock is released, each under its own
	// subscriber lock.
	for _, sub := range pending {
		sub.unsubscribelock.Lock()
		if sub.unsubscribed {
			sub.unsubscribelock.Unlock()
			continue
		}
		sub.unsubscribed = true
		close(sub.notifications)
		sub.unsubscribelock.Unlock()
	}
}
// NotifyAll sends nt to every subscriber registered at the time of the call.
// If nt.Target is non-empty, nt is instead forwarded to Notify unchanged and
// reaches only that target.
//
// To broadcast, leave the Target field empty:
//
//	n.NotifyAll(notify.Notification{
//		Level:   notify.LevelInfo,
//		Message: "Broadcast to all",
//	})
//
// NotifyAll may be called from multiple goroutines. Delivery is delegated to
// Notify once per subscriber, so a notification can be dropped for any
// subscriber that Notify would skip.
func (n *Notifier) NotifyAll(nt Notification) {
	if nt.Target != "" {
		n.Notify(nt)
		return
	}

	// Snapshot the registry under the lock; deliveries happen unlocked
	// because Notify acquires sublock itself.
	n.sublock.Lock()
	recipients := make([]*Subscriber, len(n.subscribers))
	next := 0
	for _, sub := range n.subscribers {
		recipients[next] = sub
		next++
	}
	n.sublock.Unlock()

	// nt is this function's own copy, so re-addressing it per recipient
	// does not affect the caller's value.
	for _, sub := range recipients {
		nt.Target = sub.ID
		n.Notify(nt)
	}
}
// Notify delivers nt to the single subscriber whose ID equals nt.Target.
//
// The notification is silently dropped when:
//   - no subscriber is registered under nt.Target,
//   - the subscriber's lock is currently held by another goroutine (for
//     example Close, or another Notify call that is in the middle of a
//     send), or
//   - the subscriber has already been marked unsubscribed.
//
// Example:
//
//	n.Notify(notify.Notification{
//		Target:  subscriberID,
//		Level:   notify.LevelSuccess,
//		Message: "Operation completed",
//	})
//
// Notify is safe for concurrent use. It never waits for the subscriber's
// lock (it uses TryLock), but the channel send itself blocks for as long as
// the subscriber's buffer is full.
func (n *Notifier) Notify(nt Notification) {
	n.sublock.Lock()
	s, exists := n.subscribers[nt.Target]
	n.sublock.Unlock()
	if !exists {
		return
	}

	if !s.unsubscribelock.TryLock() {
		return
	}
	defer s.unsubscribelock.Unlock()

	// The subscriber can be shut down between the map lookup above and the
	// TryLock: Close empties the registry first and closes the channels
	// afterwards. Sending on a closed channel panics, so the flag must be
	// re-checked while holding the subscriber's lock.
	if s.unsubscribed {
		return
	}
	s.notifications <- nt
}
// genRand returns a random 16-character, URL-safe Target that is not a key
// in n.subscribers at the moment it is checked. It makes up to maxAttempts
// tries and returns an error if every candidate was already taken, or if
// reading from the system's random source fails.
//
// genRand acquires n.sublock for each uniqueness check, so callers must not
// hold that lock.
func (n *Notifier) genRand() (Target, error) {
	const maxAttempts = 10

	var random [16]byte
	for attempt := 0; attempt < maxAttempts; attempt++ {
		// Never build an ID from a buffer that was not actually filled
		// with random data.
		if _, err := rand.Read(random[:]); err != nil {
			return Target(""), err
		}

		// 16 bytes encode to 24 base64 characters; only the first 16 are
		// kept as the ID.
		tgt := Target(base64.URLEncoding.EncodeToString(random[:])[:16])

		n.sublock.Lock()
		_, exists := n.subscribers[tgt]
		n.sublock.Unlock()
		if !exists {
			return tgt, nil
		}
	}
	return Target(""), errors.New("failed to generate unique subscriber ID after maximum attempts")
}