package hws

import (
	"context"
	"errors"
	"fmt"
	"slices"
	"sync"
	"sync/atomic"
	"time"

	"git.haelnorr.com/h/golib/notify"
)

// LevelShutdown is a special level used for the notification sent on shutdown.
// Consumers can compare a notification's level against it to decide whether the
// notification is a shutdown event that should be passed on or handled specially.
const LevelShutdown notify.Level = "shutdown"

// Notifier manages client subscriptions and notification delivery for the HWS server.
// It wraps the notify.Notifier with additional client management features including
// dual identification (subscription ID + alternate ID) and automatic cleanup of
// clients that have been inactive for more than 5 minutes.
type Notifier struct {
	// Embedded notifier; its methods are called directly on Notifier.
	*notify.Notifier
	// clients is the registry of known clients, indexed by subscription ID
	// and by alternate ID.
	clients *Clients
	// running is set by startNotifier and cleared by closeNotifier.
	// It is a plain bool and is not synchronised.
	running bool
	// ctx is cancelled by closeNotifier to stop the periodic cleanup goroutine.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
}

// Clients maintains thread-safe mappings between subscriber IDs, alternate IDs,
// and Client instances. It supports querying clients by either their unique
// subscription ID or their alternate ID (where multiple clients can share an alternate ID).
type Clients struct {
	// clientsSubMap maps a subscription ID to its single Client.
	clientsSubMap map[notify.Target]*Client
	// clientsIDMap maps an alternate ID to every Client sharing it.
	// A key is deleted once its group becomes empty.
	clientsIDMap map[string][]*Client
	// lock guards both maps.
	lock *sync.RWMutex
}

// Client represents a unique subscriber to the notifications channel.
// It tracks activity via lastSeen timestamp (updated atomically) and monitors
// consecutive send failures for automatic disconnect detection.
type Client struct { sub *notify.Subscriber lastSeen int64 // accessed atomically altID string consecutiveFails int32 // accessed atomically } func (s *Server) startNotifier() { if s.notifier != nil && s.notifier.running { return } ctx, cancel := context.WithCancel(context.Background()) s.notifier = &Notifier{ Notifier: notify.NewNotifier(50), clients: &Clients{ clientsSubMap: make(map[notify.Target]*Client), clientsIDMap: make(map[string][]*Client), lock: new(sync.RWMutex), }, running: true, ctx: ctx, cancel: cancel, } ticker := time.NewTicker(time.Minute) go func() { defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: s.notifier.clients.cleanUp() } } }() } func (s *Server) closeNotifier() { if s.notifier != nil { if s.notifier.cancel != nil { s.notifier.cancel() } s.notifier.running = false s.notifier.Close() } s.notifier = nil } // NotifySub sends a notification to a specific subscriber identified by the notification's Target field. // If the subscriber doesn't exist, a warning is logged but the operation does not fail. // This is thread-safe and can be called from multiple goroutines. func (s *Server) NotifySub(nt notify.Notification) { if s.notifier == nil { return } _, exists := s.notifier.clients.getClient(nt.Target) if !exists { err := fmt.Errorf("Tried to notify subscriber that doesn't exist - subID: %s", nt.Target) s.LogError(HWSError{Level: ErrorWARN, Message: "Failed to notify", Error: err}) return } s.notifier.Notify(nt) } // NotifyID sends a notification to all clients associated with the given alternate ID. // Multiple clients can share the same alternate ID (e.g., multiple sessions for one user). // If no clients exist with that ID, a warning is logged but the operation does not fail. // This is thread-safe and can be called from multiple goroutines. 
func (s *Server) NotifyID(nt notify.Notification, altID string) { if s.notifier == nil { return } s.notifier.clients.lock.RLock() clients, exists := s.notifier.clients.clientsIDMap[altID] s.notifier.clients.lock.RUnlock() if !exists { err := fmt.Errorf("Tried to notify client group that doesn't exist - altID: %s", altID) s.LogError(HWSError{Level: ErrorWARN, Message: "Failed to notify", Error: err}) return } for _, client := range clients { ntt := nt ntt.Target = client.sub.ID s.NotifySub(ntt) } } // NotifyAll broadcasts a notification to all connected clients. // This is thread-safe and can be called from multiple goroutines. func (s *Server) NotifyAll(nt notify.Notification) { if s.notifier == nil { return } nt.Target = "" s.notifier.NotifyAll(nt) } // GetClient returns a Client that can be used to receive notifications. // If a client exists with the provided subID, that client will be returned. // If altID is provided, it will update the existing Client. // If subID is an empty string, a new client will be returned. // If both altID and subID are empty, a new Client with no altID will be returned. // Multiple clients with the same altID are permitted. 
func (s *Server) GetClient(subID, altID string) (*Client, error) { if s.notifier == nil || !s.notifier.running { return nil, errors.New("notifier hasn't started") } target := notify.Target(subID) client, exists := s.notifier.clients.getClient(target) if exists { s.notifier.clients.updateAltID(client, altID) return client, nil } // An error should only be returned if there are 10 collisions of a randomly generated 16 bit byte string from rand.Rand() // Basically never going to happen, and if it does its not my problem sub, _ := s.notifier.Subscribe() client = &Client{ sub: sub, lastSeen: time.Now().Unix(), altID: altID, consecutiveFails: 0, } s.notifier.clients.addClient(client) return client, nil } func (cs *Clients) getClient(target notify.Target) (*Client, bool) { cs.lock.RLock() client, exists := cs.clientsSubMap[target] cs.lock.RUnlock() return client, exists } func (cs *Clients) updateAltID(client *Client, altID string) { cs.lock.Lock() if altID != "" && !slices.Contains(cs.clientsIDMap[altID], client) { cs.clientsIDMap[altID] = append(cs.clientsIDMap[altID], client) } if client.altID != altID && client.altID != "" { cs.deleteFromID(client, client.altID) } client.altID = altID cs.lock.Unlock() } func (cs *Clients) deleteFromID(client *Client, altID string) { cs.clientsIDMap[altID] = deleteFromSlice(cs.clientsIDMap[altID], client, func(a, b *Client) bool { return a.sub.ID == b.sub.ID }) if len(cs.clientsIDMap[altID]) == 0 { delete(cs.clientsIDMap, altID) } } func (cs *Clients) addClient(client *Client) { cs.lock.Lock() cs.clientsSubMap[client.sub.ID] = client if client.altID != "" { cs.clientsIDMap[client.altID] = append(cs.clientsIDMap[client.altID], client) } cs.lock.Unlock() } func (cs *Clients) cleanUp() { now := time.Now().Unix() // Collect clients to kill while holding read lock cs.lock.RLock() toKill := make([]*Client, 0) for _, client := range cs.clientsSubMap { if now-atomic.LoadInt64(&client.lastSeen) > 300 { toKill = append(toKill, client) } } 
cs.lock.RUnlock() // Kill clients without holding lock for _, client := range toKill { cs.killClient(client) } } func (cs *Clients) killClient(client *Client) { client.sub.Unsubscribe() cs.lock.Lock() delete(cs.clientsSubMap, client.sub.ID) if client.altID != "" { cs.deleteFromID(client, client.altID) } cs.lock.Unlock() } // Listen starts a goroutine that forwards notifications from the subscriber to a returned channel. // It returns a receive-only channel for notifications and a channel to stop listening. // The notification channel is buffered with size 10 to tolerate brief slowness. // // The goroutine automatically stops and closes the notification channel when: // - The subscriber is unsubscribed // - The stop channel is closed // - The client fails to receive 5 consecutive notifications within 5 seconds each // // Client.lastSeen is updated every 30 seconds via heartbeat, or when a notification is successfully delivered. // Consecutive send failures are tracked; after 5 failures, the client is considered disconnected and cleaned up. 
func (c *Client) Listen() (<-chan notify.Notification, chan<- struct{}) { ch := make(chan notify.Notification, 10) stop := make(chan struct{}) go func() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() defer close(ch) for { select { case <-stop: return case nt, ok := <-c.sub.Listen(): if !ok { // Subscriber channel closed return } // Try to send with timeout timeout := time.NewTimer(5 * time.Second) select { case ch <- nt: // Successfully sent - update lastSeen and reset failure count atomic.StoreInt64(&c.lastSeen, time.Now().Unix()) atomic.StoreInt32(&c.consecutiveFails, 0) timeout.Stop() case <-timeout.C: // Send timeout - increment failure count fails := atomic.AddInt32(&c.consecutiveFails, 1) if fails >= 5 { // Too many consecutive failures - client is stuck/disconnected c.sub.Unsubscribe() return } case <-stop: timeout.Stop() return } case <-ticker.C: // Heartbeat - update lastSeen to keep client alive atomic.StoreInt64(&c.lastSeen, time.Now().Unix()) } } }() return ch, stop } func (c *Client) ID() string { return string(c.sub.ID) } func deleteFromSlice[T any](a []T, c T, eq func(T, T) bool) []T { n := 0 for _, x := range a { if !eq(x, c) { a[n] = x n++ } } return a[:n] }