// Source: golib/hws/notify.go
// Snapshot taken 2026-02-03 18:43:31 +11:00 (317 lines, 9.0 KiB, Go).

package hws
import (
"context"
"errors"
"fmt"
"slices"
"sync"
"sync/atomic"
"time"
"git.haelnorr.com/h/golib/notify"
)
const (
	// LevelShutdown is the reserved notification level attached to the
	// notification sent when the server shuts down. Callers can compare a
	// notification's level against it to decide whether the event should be
	// forwarded to consumers or handled specially.
	LevelShutdown notify.Level = "shutdown"
)
// Notifier manages client subscriptions and notification delivery for the HWS server.
// It wraps the notify.Notifier with additional client management features including
// dual identification (subscription ID + alternate ID) and automatic cleanup of
// inactive clients after 5 minutes.
type Notifier struct {
	*notify.Notifier
	// clients indexes every registered Client by subscription ID and by alternate ID.
	clients *Clients
	// running is set to true by startNotifier and to false by closeNotifier.
	// NOTE(review): read and written without any synchronization — confirm the
	// notifier is never started/stopped concurrently with other access.
	running bool
	// ctx and cancel bound the lifetime of the periodic cleanup goroutine;
	// closeNotifier calls cancel to stop it.
	ctx    context.Context
	cancel context.CancelFunc
}
// Clients maintains thread-safe mappings between subscriber IDs, alternate IDs,
// and Client instances. It supports querying clients by either their unique
// subscription ID or their alternate ID (where multiple clients can share an alternate ID).
type Clients struct {
	// clientsSubMap maps a subscription ID to its single Client.
	clientsSubMap map[notify.Target]*Client
	// clientsIDMap maps an alternate ID to every Client registered under it.
	// An entry is deleted as soon as its slice becomes empty.
	clientsIDMap map[string][]*Client
	// lock guards both maps and the altID field of registered clients.
	lock *sync.RWMutex
}
// Client represents a unique subscriber to the notifications channel.
// It tracks activity via lastSeen (a Unix timestamp in seconds, updated
// atomically) and counts consecutive send failures so that a stuck or
// disconnected consumer can be detected.
type Client struct {
	// lastSeen is accessed with 64-bit sync/atomic functions and must remain the
	// first field: on 386, ARM and 32-bit MIPS only the first word of an
	// allocated struct is guaranteed to be 64-bit aligned, and misaligned
	// 64-bit atomic operations panic on those platforms.
	lastSeen int64
	sub      *notify.Subscriber
	// altID is guarded by Clients.lock once the client has been registered.
	altID            string
	consecutiveFails int32 // accessed atomically
}
// startNotifier creates the server's Notifier and launches a background
// goroutine that evicts inactive clients once a minute. It does nothing if a
// notifier is already running. The goroutine exits when closeNotifier cancels
// the notifier's context.
func (s *Server) startNotifier() {
	if s.notifier != nil && s.notifier.running {
		return
	}
	ctx, cancel := context.WithCancel(context.Background())
	clients := &Clients{
		clientsSubMap: make(map[notify.Target]*Client),
		clientsIDMap:  make(map[string][]*Client),
		lock:          new(sync.RWMutex),
	}
	s.notifier = &Notifier{
		Notifier: notify.NewNotifier(50),
		clients:  clients,
		running:  true,
		ctx:      ctx,
		cancel:   cancel,
	}
	go func() {
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// select picks randomly when both cases are ready; prefer shutdown.
				if ctx.Err() != nil {
					return
				}
				// Use the captured clients, never s.notifier: closeNotifier sets
				// s.notifier to nil, and a tick racing with shutdown would
				// otherwise dereference nil.
				clients.cleanUp()
			}
		}
	}()
}
// closeNotifier stops the cleanup goroutine, marks the notifier as not
// running, closes the underlying notify.Notifier and finally detaches it from
// the server. Calling it when no notifier exists only resets s.notifier.
func (s *Server) closeNotifier() {
	if n := s.notifier; n != nil {
		if stopCleanup := n.cancel; stopCleanup != nil {
			stopCleanup()
		}
		n.running = false
		n.Close()
	}
	s.notifier = nil
}
// NotifySub sends a notification to a specific subscriber identified by the notification's Target field.
// If the subscriber doesn't exist, a warning is logged but the operation does not fail.
// It does nothing when the notifier has not been started.
func (s *Server) NotifySub(nt notify.Notification) {
	// Load the notifier once: closeNotifier sets s.notifier to nil, and
	// re-reading the field after the nil check could dereference nil.
	n := s.notifier
	if n == nil {
		return
	}
	if _, exists := n.clients.getClient(nt.Target); !exists {
		err := fmt.Errorf("tried to notify subscriber that doesn't exist - subID: %s", nt.Target)
		s.LogError(HWSError{Level: ErrorWARN, Message: "Failed to notify", Error: err})
		return
	}
	n.Notify(nt)
}
// NotifyID sends a notification to all clients associated with the given alternate ID.
// Multiple clients can share the same alternate ID (e.g., multiple sessions for one user).
// If no clients exist with that ID, a warning is logged but the operation does not fail.
// Each client is notified through NotifySub with its own subscription ID as Target.
func (s *Server) NotifyID(nt notify.Notification, altID string) {
	// Load the notifier once so a concurrent closeNotifier cannot nil it mid-call.
	n := s.notifier
	if n == nil {
		return
	}
	n.clients.lock.RLock()
	group, exists := n.clients.clientsIDMap[altID]
	// Copy while the lock is held: deleteFromID compacts this slice in place,
	// so iterating the shared backing array after unlocking would race with
	// concurrent removals.
	clients := slices.Clone(group)
	n.clients.lock.RUnlock()
	if !exists {
		err := fmt.Errorf("tried to notify client group that doesn't exist - altID: %s", altID)
		s.LogError(HWSError{Level: ErrorWARN, Message: "Failed to notify", Error: err})
		return
	}
	for _, client := range clients {
		targeted := nt
		targeted.Target = client.sub.ID
		s.NotifySub(targeted)
	}
}
// NotifyAll broadcasts a notification to all connected clients.
// The notification's Target is cleared before it is broadcast.
// It does nothing when the notifier has not been started.
func (s *Server) NotifyAll(nt notify.Notification) {
	// Load the notifier once so a concurrent closeNotifier cannot nil it
	// between the check and the call.
	n := s.notifier
	if n == nil {
		return
	}
	nt.Target = ""
	n.NotifyAll(nt)
}
// GetClient returns a Client that can be used to receive notifications.
//
// If a client is registered under subID, that client is returned. Its
// alternate ID is replaced with altID (an empty altID removes it from its
// previous alternate-ID group), and its lastSeen is refreshed.
// If subID is empty or unknown, a new subscription is created. The supplied
// subID is not reused; the new client gets the ID assigned by the notifier,
// and altID (possibly empty) is recorded on it.
// Multiple clients with the same altID are permitted.
//
// An error is returned if the notifier is not running or the new
// subscription cannot be created.
func (s *Server) GetClient(subID, altID string) (*Client, error) {
	// Load the notifier once so a concurrent closeNotifier cannot nil it mid-call.
	n := s.notifier
	if n == nil || !n.running {
		return nil, errors.New("notifier hasn't started")
	}
	if client, exists := n.clients.getClient(notify.Target(subID)); exists {
		// Reconnecting counts as activity. Without this, a client idle for
		// nearly five minutes could be evicted by the cleanup loop before
		// Listen's first 30-second heartbeat refreshes lastSeen.
		atomic.StoreInt64(&client.lastSeen, time.Now().Unix())
		n.clients.updateAltID(client, altID)
		return client, nil
	}
	sub, err := n.Subscribe()
	if err != nil {
		// A nil subscriber would panic in addClient; surface the failure instead.
		return nil, fmt.Errorf("subscribing new client: %w", err)
	}
	client := &Client{
		lastSeen:         time.Now().Unix(),
		sub:              sub,
		altID:            altID,
		consecutiveFails: 0,
	}
	n.clients.addClient(client)
	return client, nil
}
// getClient looks up the client registered under the given subscription ID,
// holding the read lock for the duration of the map access.
func (cs *Clients) getClient(target notify.Target) (*Client, bool) {
	cs.lock.RLock()
	defer cs.lock.RUnlock()
	found, ok := cs.clientsSubMap[target]
	return found, ok
}
// updateAltID moves client into the group for altID, under the write lock.
// A non-empty altID adds the client to that group (if not already present).
// The client is then removed from its previous group when it had one and it
// differs from altID. Finally client.altID is set to altID, so an empty altID
// clears the client's alternate ID.
func (cs *Clients) updateAltID(client *Client, altID string) {
	cs.lock.Lock()
	defer cs.lock.Unlock()

	previous := client.altID
	if altID != "" {
		if group := cs.clientsIDMap[altID]; !slices.Contains(group, client) {
			cs.clientsIDMap[altID] = append(group, client)
		}
	}
	if previous != "" && previous != altID {
		cs.deleteFromID(client, previous)
	}
	client.altID = altID
}
// deleteFromID removes client (matched by subscription ID) from the group
// registered under altID, and drops the map entry once the group is empty.
// Callers in this file hold cs.lock for writing when calling it.
func (cs *Clients) deleteFromID(client *Client, altID string) {
	sameSubscription := func(x, y *Client) bool { return x.sub.ID == y.sub.ID }
	remaining := deleteFromSlice(cs.clientsIDMap[altID], client, sameSubscription)
	if len(remaining) > 0 {
		cs.clientsIDMap[altID] = remaining
		return
	}
	delete(cs.clientsIDMap, altID)
}
// addClient registers client under its subscription ID and, when it has one,
// appends it to the group for its alternate ID. Runs under the write lock.
func (cs *Clients) addClient(client *Client) {
	cs.lock.Lock()
	defer cs.lock.Unlock()

	cs.clientsSubMap[client.sub.ID] = client
	if alt := client.altID; alt != "" {
		cs.clientsIDMap[alt] = append(cs.clientsIDMap[alt], client)
	}
}
// cleanUp evicts every client whose lastSeen is more than five minutes old.
// Candidates are collected under the read lock and killed afterwards, so the
// write lock is not held while a subscriber is being unsubscribed.
func (cs *Clients) cleanUp() {
	const maxIdleSeconds = 300

	now := time.Now().Unix()
	isStale := func(c *Client) bool {
		return now-atomic.LoadInt64(&c.lastSeen) > maxIdleSeconds
	}

	cs.lock.RLock()
	var stale []*Client
	for _, client := range cs.clientsSubMap {
		if isStale(client) {
			stale = append(stale, client)
		}
	}
	cs.lock.RUnlock()

	for _, client := range stale {
		// Re-check after releasing the lock: a heartbeat or reconnect may have
		// refreshed lastSeen since the scan, and such a client must survive.
		if !isStale(client) {
			continue
		}
		cs.killClient(client)
	}
}
// killClient unsubscribes client from the underlying notifier (before taking
// any lock) and then, under the write lock, removes it from both the
// subscription-ID map and its alternate-ID group.
func (cs *Clients) killClient(client *Client) {
	client.sub.Unsubscribe()

	cs.lock.Lock()
	defer cs.lock.Unlock()
	delete(cs.clientsSubMap, client.sub.ID)
	if alt := client.altID; alt != "" {
		cs.deleteFromID(client, alt)
	}
}
// Listen starts a goroutine that forwards notifications from the subscriber to a returned channel.
// It returns a receive-only channel for notifications and a channel the caller closes to stop
// listening. The notification channel is buffered with size 10 to tolerate brief slowness.
//
// The goroutine stops and closes the notification channel when:
//   - The subscriber's channel is closed (e.g. the client was unsubscribed)
//   - The stop channel is closed
//   - 5 consecutive notifications each fail to be delivered within 5 seconds; the
//     subscriber is then unsubscribed. A notification whose delivery times out is dropped.
//
// Client.lastSeen is refreshed when Listen is called, every 30 seconds via heartbeat while
// the goroutine runs, and whenever a notification is delivered. Once the goroutine has
// stopped, the heartbeat stops too, and the periodic cleanup removes the client after
// lastSeen goes stale.
func (c *Client) Listen() (<-chan notify.Notification, chan<- struct{}) {
	ch := make(chan notify.Notification, 10)
	stop := make(chan struct{})
	// Starting to listen is activity. Refresh lastSeen now rather than at the
	// first heartbeat tick, so a client that reconnects after a long idle
	// period is not evicted by the cleanup loop in the meantime.
	atomic.StoreInt64(&c.lastSeen, time.Now().Unix())
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		defer close(ch)
		for {
			select {
			case <-stop:
				return
			case nt, ok := <-c.sub.Listen():
				if !ok {
					// Subscriber channel closed
					return
				}
				// Try to send with timeout
				timeout := time.NewTimer(5 * time.Second)
				select {
				case ch <- nt:
					// Successfully sent - update lastSeen and reset failure count
					atomic.StoreInt64(&c.lastSeen, time.Now().Unix())
					atomic.StoreInt32(&c.consecutiveFails, 0)
					timeout.Stop()
				case <-timeout.C:
					// Send timeout - the notification is dropped; count the failure
					fails := atomic.AddInt32(&c.consecutiveFails, 1)
					if fails >= 5 {
						// Too many consecutive failures - client is stuck/disconnected
						c.sub.Unsubscribe()
						return
					}
				case <-stop:
					timeout.Stop()
					return
				}
			case <-ticker.C:
				// Heartbeat - update lastSeen to keep client alive
				atomic.StoreInt64(&c.lastSeen, time.Now().Unix())
			}
		}
	}()
	return ch, stop
}
// ID returns the client's subscription ID as a plain string, suitable for
// handing back to GetClient as subID on reconnect.
func (c *Client) ID() string {
	subscriptionID := c.sub.ID
	return string(subscriptionID)
}
// deleteFromSlice removes every element of a that eq reports as equal to c.
// Filtering happens in place: surviving elements are compacted, in order, to
// the front of a's backing array, and the shortened slice is returned.
// Positions at or beyond the returned length keep their previous values.
func deleteFromSlice[T any](a []T, c T, eq func(T, T) bool) []T {
	kept := a[:0]
	for i := range a {
		if eq(a[i], c) {
			continue
		}
		kept = append(kept, a[i])
	}
	return kept
}