190 lines
4.6 KiB
Go
190 lines
4.6 KiB
Go
package notify
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"encoding/base64"
|
|
"errors"
|
|
"sync"
|
|
)
|
|
|
|
type Notifier struct {
|
|
subscribers map[Target]*Subscriber
|
|
sublock *sync.Mutex
|
|
bufferSize int
|
|
closed bool
|
|
}
|
|
|
|
// NewNotifier creates a new Notifier with the specified notification buffer size.
|
|
// The buffer size determines how many notifications can be queued per subscriber
|
|
// before sends block or notifications are dropped (if using TryLock).
|
|
// A buffer size of 0 creates unbuffered channels (sends block immediately).
|
|
// Recommended buffer size: 50-100 for most applications.
|
|
func NewNotifier(bufferSize int) *Notifier {
|
|
n := &Notifier{
|
|
subscribers: make(map[Target]*Subscriber),
|
|
sublock: new(sync.Mutex),
|
|
bufferSize: bufferSize,
|
|
}
|
|
return n
|
|
}
|
|
|
|
func (n *Notifier) Subscribe() (*Subscriber, error) {
|
|
n.sublock.Lock()
|
|
if n.closed {
|
|
n.sublock.Unlock()
|
|
return nil, errors.New("notifier is closed")
|
|
}
|
|
n.sublock.Unlock()
|
|
|
|
id, err := n.genRand()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sub := &Subscriber{
|
|
ID: id,
|
|
notifications: make(chan Notification, n.bufferSize),
|
|
notifier: n,
|
|
unsubscribelock: new(sync.Mutex),
|
|
unsubscribed: false,
|
|
}
|
|
n.sublock.Lock()
|
|
if n.closed {
|
|
n.sublock.Unlock()
|
|
return nil, errors.New("notifier is closed")
|
|
}
|
|
n.subscribers[sub.ID] = sub
|
|
n.sublock.Unlock()
|
|
return sub, nil
|
|
}
|
|
|
|
func (n *Notifier) RemoveSubscriber(s *Subscriber) {
|
|
n.sublock.Lock()
|
|
_, exists := n.subscribers[s.ID]
|
|
if exists {
|
|
delete(n.subscribers, s.ID)
|
|
}
|
|
n.sublock.Unlock()
|
|
|
|
if exists {
|
|
close(s.notifications)
|
|
}
|
|
}
|
|
|
|
// Close shuts down the Notifier and unsubscribes all subscribers.
|
|
// After Close() is called, no new subscribers can be added and all
|
|
// notification channels are closed. Close() is idempotent and safe
|
|
// to call multiple times.
|
|
func (n *Notifier) Close() {
|
|
n.sublock.Lock()
|
|
if n.closed {
|
|
n.sublock.Unlock()
|
|
return
|
|
}
|
|
n.closed = true
|
|
|
|
// Collect all subscribers
|
|
subscribers := make([]*Subscriber, 0, len(n.subscribers))
|
|
for _, sub := range n.subscribers {
|
|
subscribers = append(subscribers, sub)
|
|
}
|
|
|
|
// Clear the map
|
|
n.subscribers = make(map[Target]*Subscriber)
|
|
n.sublock.Unlock()
|
|
|
|
// Unsubscribe all (this closes their channels)
|
|
for _, sub := range subscribers {
|
|
// Mark as unsubscribed and close channel
|
|
sub.unsubscribelock.Lock()
|
|
if !sub.unsubscribed {
|
|
sub.unsubscribed = true
|
|
close(sub.notifications)
|
|
}
|
|
sub.unsubscribelock.Unlock()
|
|
}
|
|
}
|
|
|
|
// NotifyAll broadcasts a notification to all current subscribers.
|
|
// If the notification's Target field is already set, the notification
|
|
// is sent only to that specific target instead of broadcasting.
|
|
//
|
|
// To broadcast, leave the Target field empty:
|
|
//
|
|
// n.NotifyAll(notify.Notification{
|
|
// Level: notify.LevelInfo,
|
|
// Message: "Broadcast to all",
|
|
// })
|
|
//
|
|
// NotifyAll is thread-safe and can be called from multiple goroutines.
|
|
// Notifications may be dropped if a subscriber is unsubscribing or if
|
|
// their buffer is full and they are slow to read.
|
|
func (n *Notifier) NotifyAll(nt Notification) {
|
|
if nt.Target != "" {
|
|
n.Notify(nt)
|
|
return
|
|
}
|
|
|
|
// Collect subscribers while holding lock, then notify without lock
|
|
n.sublock.Lock()
|
|
subscribers := make([]*Subscriber, 0, len(n.subscribers))
|
|
for _, s := range n.subscribers {
|
|
subscribers = append(subscribers, s)
|
|
}
|
|
n.sublock.Unlock()
|
|
|
|
// Notify each subscriber
|
|
for _, s := range subscribers {
|
|
nnt := nt
|
|
nnt.Target = s.ID
|
|
n.Notify(nnt)
|
|
}
|
|
}
|
|
|
|
// Notify sends a notification to a specific subscriber identified by
|
|
// the notification's Target field. If the target does not exist or
|
|
// is unsubscribing, the notification is silently dropped.
|
|
//
|
|
// Example:
|
|
//
|
|
// n.Notify(notify.Notification{
|
|
// Target: subscriberID,
|
|
// Level: notify.LevelSuccess,
|
|
// Message: "Operation completed",
|
|
// })
|
|
//
|
|
// Notify is thread-safe and non-blocking (uses TryLock). If the subscriber
|
|
// is busy unsubscribing, the notification is dropped to avoid blocking.
|
|
func (n *Notifier) Notify(nt Notification) {
|
|
n.sublock.Lock()
|
|
s, exists := n.subscribers[nt.Target]
|
|
n.sublock.Unlock()
|
|
|
|
if !exists {
|
|
return
|
|
}
|
|
if s.unsubscribelock.TryLock() {
|
|
s.notifications <- nt
|
|
s.unsubscribelock.Unlock()
|
|
}
|
|
}
|
|
|
|
func (n *Notifier) genRand() (Target, error) {
|
|
const maxAttempts = 10
|
|
for attempt := 0; attempt < maxAttempts; attempt++ {
|
|
random := make([]byte, 16)
|
|
rand.Read(random)
|
|
str := base64.URLEncoding.EncodeToString(random)[:16]
|
|
tgt := Target(str)
|
|
|
|
n.sublock.Lock()
|
|
_, exists := n.subscribers[tgt]
|
|
n.sublock.Unlock()
|
|
|
|
if !exists {
|
|
return tgt, nil
|
|
}
|
|
}
|
|
return Target(""), errors.New("failed to generate unique subscriber ID after maximum attempts")
|
|
}
|