package notify import ( "crypto/rand" "encoding/base64" "errors" "sync" ) type Notifier struct { subscribers map[Target]*Subscriber sublock *sync.Mutex bufferSize int closed bool } // NewNotifier creates a new Notifier with the specified notification buffer size. // The buffer size determines how many notifications can be queued per subscriber // before sends block or notifications are dropped (if using TryLock). // A buffer size of 0 creates unbuffered channels (sends block immediately). // Recommended buffer size: 50-100 for most applications. func NewNotifier(bufferSize int) *Notifier { n := &Notifier{ subscribers: make(map[Target]*Subscriber), sublock: new(sync.Mutex), bufferSize: bufferSize, } return n } func (n *Notifier) Subscribe() (*Subscriber, error) { n.sublock.Lock() if n.closed { n.sublock.Unlock() return nil, errors.New("notifier is closed") } n.sublock.Unlock() id, err := n.genRand() if err != nil { return nil, err } sub := &Subscriber{ ID: id, notifications: make(chan Notification, n.bufferSize), notifier: n, unsubscribelock: new(sync.Mutex), unsubscribed: false, } n.sublock.Lock() if n.closed { n.sublock.Unlock() return nil, errors.New("notifier is closed") } n.subscribers[sub.ID] = sub n.sublock.Unlock() return sub, nil } func (n *Notifier) RemoveSubscriber(s *Subscriber) { n.sublock.Lock() _, exists := n.subscribers[s.ID] if exists { delete(n.subscribers, s.ID) } n.sublock.Unlock() if exists { close(s.notifications) } } // Close shuts down the Notifier and unsubscribes all subscribers. // After Close() is called, no new subscribers can be added and all // notification channels are closed. Close() is idempotent and safe // to call multiple times. func (n *Notifier) Close() { n.sublock.Lock() if n.closed { n.sublock.Unlock() return } n.closed = true // Collect all subscribers subscribers := make([]*Subscriber, 0, len(n.subscribers)) for _, sub := range n.subscribers { subscribers = append(subscribers, sub) } // Clear the map n.subscribers = make(map[Target]*Subscriber) n.sublock.Unlock() // Unsubscribe all (this closes their channels) for _, sub := range subscribers { // Mark as unsubscribed and close channel sub.unsubscribelock.Lock() if !sub.unsubscribed { sub.unsubscribed = true close(sub.notifications) } sub.unsubscribelock.Unlock() } } // NotifyAll broadcasts a notification to all current subscribers. // If the notification's Target field is already set, the notification // is sent only to that specific target instead of broadcasting. // // To broadcast, leave the Target field empty: // // n.NotifyAll(notify.Notification{ // Level: notify.LevelInfo, // Message: "Broadcast to all", // }) // // NotifyAll is thread-safe and can be called from multiple goroutines. // Notifications may be dropped if a subscriber is unsubscribing or if // their buffer is full and they are slow to read. func (n *Notifier) NotifyAll(nt Notification) { if nt.Target != "" { n.Notify(nt) return } // Collect subscribers while holding lock, then notify without lock n.sublock.Lock() subscribers := make([]*Subscriber, 0, len(n.subscribers)) for _, s := range n.subscribers { subscribers = append(subscribers, s) } n.sublock.Unlock() // Notify each subscriber for _, s := range subscribers { nnt := nt nnt.Target = s.ID n.Notify(nnt) } } // Notify sends a notification to a specific subscriber identified by // the notification's Target field. If the target does not exist or // is unsubscribing, the notification is silently dropped. // // Example: // // n.Notify(notify.Notification{ // Target: subscriberID, // Level: notify.LevelSuccess, // Message: "Operation completed", // }) // // Notify is thread-safe and non-blocking (uses TryLock). If the subscriber // is busy unsubscribing, the notification is dropped to avoid blocking. func (n *Notifier) Notify(nt Notification) { n.sublock.Lock() s, exists := n.subscribers[nt.Target] n.sublock.Unlock() if !exists { return } if s.unsubscribelock.TryLock() { s.notifications <- nt s.unsubscribelock.Unlock() } } func (n *Notifier) genRand() (Target, error) { const maxAttempts = 10 for attempt := 0; attempt < maxAttempts; attempt++ { random := make([]byte, 16) rand.Read(random) str := base64.URLEncoding.EncodeToString(random)[:16] tgt := Target(str) n.sublock.Lock() _, exists := n.subscribers[tgt] n.sublock.Unlock() if !exists { return tgt, nil } } return Target(""), errors.New("failed to generate unique subscriber ID after maximum attempts") }