added a notify system
This commit is contained in:
@@ -13,6 +13,7 @@ type Config struct {
|
||||
ReadHeaderTimeout time.Duration // ENV HWS_READ_HEADER_TIMEOUT: Timeout for reading request headers in seconds (default: 2)
|
||||
WriteTimeout time.Duration // ENV HWS_WRITE_TIMEOUT: Timeout for writing requests in seconds (default: 10)
|
||||
IdleTimeout time.Duration // ENV HWS_IDLE_TIMEOUT: Timeout for idle connections in seconds (default: 120)
|
||||
ShutdownDelay time.Duration // ENV HWS_SHUTDOWN_DELAY: Delay in seconds before server shutsdown when Shutdown is called (default: 5)
|
||||
}
|
||||
|
||||
// ConfigFromEnv returns a Config struct loaded from the environment variables
|
||||
@@ -24,6 +25,7 @@ func ConfigFromEnv() (*Config, error) {
|
||||
ReadHeaderTimeout: time.Duration(env.Int("HWS_READ_HEADER_TIMEOUT", 2)) * time.Second,
|
||||
WriteTimeout: time.Duration(env.Int("HWS_WRITE_TIMEOUT", 10)) * time.Second,
|
||||
IdleTimeout: time.Duration(env.Int("HWS_IDLE_TIMEOUT", 120)) * time.Second,
|
||||
ShutdownDelay: time.Duration(env.Int("HWS_SHUTDOWN_DELAY", 5)) * time.Second,
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
|
||||
@@ -5,6 +5,7 @@ go 1.25.5
|
||||
require (
|
||||
git.haelnorr.com/h/golib/env v0.9.1
|
||||
git.haelnorr.com/h/golib/hlog v0.9.0
|
||||
git.haelnorr.com/h/golib/notify v0.1.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/stretchr/testify v1.11.1
|
||||
k8s.io/apimachinery v0.35.0
|
||||
|
||||
@@ -2,6 +2,8 @@ git.haelnorr.com/h/golib/env v0.9.1 h1:2Vsj+mJKnO5f1Md1GO5v9ggLN5zWa0baCewcSHTjo
|
||||
git.haelnorr.com/h/golib/env v0.9.1/go.mod h1:glUQVdA1HMKX1avTDyTyuhcr36SSxZtlJxKDT5KTztg=
|
||||
git.haelnorr.com/h/golib/hlog v0.9.0 h1:ib8n2MdmiRK2TF067p220kXmhDe9aAnlcsgpuv+QpvE=
|
||||
git.haelnorr.com/h/golib/hlog v0.9.0/go.mod h1:oOlzb8UVHUYP1k7dN5PSJXVskAB2z8EYgRN85jAi0Zk=
|
||||
git.haelnorr.com/h/golib/notify v0.1.0 h1:xdf6zd21F6n+SuGTeJiuLNMf6zFXMvwpKD0gmNq8N10=
|
||||
git.haelnorr.com/h/golib/notify v0.1.0/go.mod h1:ARqaRmCYb8LMURhDM75sG+qX+YpqXmUVeAtacwjHjBc=
|
||||
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
|
||||
316
hws/notify.go
Normal file
316
hws/notify.go
Normal file
@@ -0,0 +1,316 @@
|
||||
package hws
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.haelnorr.com/h/golib/notify"
|
||||
)
|
||||
|
||||
// LevelShutdown is a special level used for the notification sent on shutdown.
// This can be used to check if the notification is a shutdown event and if it should
// be passed on to consumers or special considerations should be made.
// Server.Shutdown broadcasts a notification with this level before waiting out
// the configured shutdown delay.
const LevelShutdown notify.Level = "shutdown"
|
||||
|
||||
// Notifier manages client subscriptions and notification delivery for the HWS server.
// It wraps the notify.Notifier with additional client management features including
// dual identification (subscription ID + alternate ID) and automatic cleanup of
// inactive clients after 5 minutes.
type Notifier struct {
	*notify.Notifier                    // embedded delivery engine from golib/notify
	clients          *Clients           // registry of active clients, indexed by sub ID and alt ID
	running          bool               // true between startNotifier and closeNotifier
	ctx              context.Context    // lifetime of the periodic clean-up goroutine
	cancel           context.CancelFunc // stops the clean-up goroutine on close
}
|
||||
|
||||
// Clients maintains thread-safe mappings between subscriber IDs, alternate IDs,
// and Client instances. It supports querying clients by either their unique
// subscription ID or their alternate ID (where multiple clients can share an alternate ID).
type Clients struct {
	clientsSubMap map[notify.Target]*Client // subscription ID -> client (unique per client)
	clientsIDMap  map[string][]*Client      // alternate ID -> all clients sharing it
	lock          *sync.RWMutex             // guards both maps and Client.altID
}
|
||||
|
||||
// Client represents a unique subscriber to the notifications channel.
// It tracks activity via lastSeen timestamp (updated atomically) and monitors
// consecutive send failures for automatic disconnect detection.
type Client struct {
	sub              *notify.Subscriber // underlying subscription from golib/notify
	lastSeen         int64              // unix seconds of last activity; accessed atomically
	altID            string             // optional shared ID (e.g. one user with many sessions); guarded by Clients.lock
	consecutiveFails int32              // delivery timeouts in a row; accessed atomically
}
|
||||
|
||||
func (s *Server) startNotifier() {
|
||||
if s.notifier != nil && s.notifier.running {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.notifier = &Notifier{
|
||||
Notifier: notify.NewNotifier(50),
|
||||
clients: &Clients{
|
||||
clientsSubMap: make(map[notify.Target]*Client),
|
||||
clientsIDMap: make(map[string][]*Client),
|
||||
lock: new(sync.RWMutex),
|
||||
},
|
||||
running: true,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(time.Minute)
|
||||
go func() {
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.notifier.clients.cleanUp()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Server) closeNotifier() {
|
||||
if s.notifier != nil {
|
||||
if s.notifier.cancel != nil {
|
||||
s.notifier.cancel()
|
||||
}
|
||||
s.notifier.running = false
|
||||
s.notifier.Close()
|
||||
}
|
||||
s.notifier = nil
|
||||
}
|
||||
|
||||
// NotifySub sends a notification to a specific subscriber identified by the notification's Target field.
|
||||
// If the subscriber doesn't exist, a warning is logged but the operation does not fail.
|
||||
// This is thread-safe and can be called from multiple goroutines.
|
||||
func (s *Server) NotifySub(nt notify.Notification) {
|
||||
if s.notifier == nil {
|
||||
return
|
||||
}
|
||||
_, exists := s.notifier.clients.getClient(nt.Target)
|
||||
if !exists {
|
||||
err := fmt.Errorf("Tried to notify subscriber that doesn't exist - subID: %s", nt.Target)
|
||||
s.LogError(HWSError{Level: ErrorWARN, Message: "Failed to notify", Error: err})
|
||||
return
|
||||
}
|
||||
s.notifier.Notify(nt)
|
||||
}
|
||||
|
||||
// NotifyID sends a notification to all clients associated with the given alternate ID.
|
||||
// Multiple clients can share the same alternate ID (e.g., multiple sessions for one user).
|
||||
// If no clients exist with that ID, a warning is logged but the operation does not fail.
|
||||
// This is thread-safe and can be called from multiple goroutines.
|
||||
func (s *Server) NotifyID(nt notify.Notification, altID string) {
|
||||
if s.notifier == nil {
|
||||
return
|
||||
}
|
||||
s.notifier.clients.lock.RLock()
|
||||
clients, exists := s.notifier.clients.clientsIDMap[altID]
|
||||
s.notifier.clients.lock.RUnlock()
|
||||
if !exists {
|
||||
err := fmt.Errorf("Tried to notify client group that doesn't exist - altID: %s", altID)
|
||||
s.LogError(HWSError{Level: ErrorWARN, Message: "Failed to notify", Error: err})
|
||||
return
|
||||
}
|
||||
for _, client := range clients {
|
||||
ntt := nt
|
||||
ntt.Target = client.sub.ID
|
||||
s.NotifySub(ntt)
|
||||
}
|
||||
}
|
||||
|
||||
// NotifyAll broadcasts a notification to all connected clients.
|
||||
// This is thread-safe and can be called from multiple goroutines.
|
||||
func (s *Server) NotifyAll(nt notify.Notification) {
|
||||
if s.notifier == nil {
|
||||
return
|
||||
}
|
||||
nt.Target = ""
|
||||
s.notifier.NotifyAll(nt)
|
||||
}
|
||||
|
||||
// GetClient returns a Client that can be used to receive notifications.
|
||||
// If a client exists with the provided subID, that client will be returned.
|
||||
// If altID is provided, it will update the existing Client.
|
||||
// If subID is an empty string, a new client will be returned.
|
||||
// If both altID and subID are empty, a new Client with no altID will be returned.
|
||||
// Multiple clients with the same altID are permitted.
|
||||
func (s *Server) GetClient(subID, altID string) (*Client, error) {
|
||||
if s.notifier == nil || !s.notifier.running {
|
||||
return nil, errors.New("notifier hasn't started")
|
||||
}
|
||||
target := notify.Target(subID)
|
||||
client, exists := s.notifier.clients.getClient(target)
|
||||
if exists {
|
||||
s.notifier.clients.updateAltID(client, altID)
|
||||
return client, nil
|
||||
}
|
||||
// An error should only be returned if there are 10 collisions of a randomly generated 16 bit byte string from rand.Rand()
|
||||
// Basically never going to happen, and if it does its not my problem
|
||||
sub, _ := s.notifier.Subscribe()
|
||||
client = &Client{
|
||||
sub: sub,
|
||||
lastSeen: time.Now().Unix(),
|
||||
altID: altID,
|
||||
consecutiveFails: 0,
|
||||
}
|
||||
s.notifier.clients.addClient(client)
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (cs *Clients) getClient(target notify.Target) (*Client, bool) {
|
||||
cs.lock.RLock()
|
||||
client, exists := cs.clientsSubMap[target]
|
||||
cs.lock.RUnlock()
|
||||
return client, exists
|
||||
}
|
||||
|
||||
func (cs *Clients) updateAltID(client *Client, altID string) {
|
||||
cs.lock.Lock()
|
||||
if altID != "" && !slices.Contains(cs.clientsIDMap[altID], client) {
|
||||
cs.clientsIDMap[altID] = append(cs.clientsIDMap[altID], client)
|
||||
}
|
||||
if client.altID != altID && client.altID != "" {
|
||||
cs.deleteFromID(client, client.altID)
|
||||
}
|
||||
client.altID = altID
|
||||
cs.lock.Unlock()
|
||||
}
|
||||
|
||||
func (cs *Clients) deleteFromID(client *Client, altID string) {
|
||||
cs.clientsIDMap[altID] = deleteFromSlice(cs.clientsIDMap[altID], client, func(a, b *Client) bool {
|
||||
return a.sub.ID == b.sub.ID
|
||||
})
|
||||
if len(cs.clientsIDMap[altID]) == 0 {
|
||||
delete(cs.clientsIDMap, altID)
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *Clients) addClient(client *Client) {
|
||||
cs.lock.Lock()
|
||||
cs.clientsSubMap[client.sub.ID] = client
|
||||
if client.altID != "" {
|
||||
cs.clientsIDMap[client.altID] = append(cs.clientsIDMap[client.altID], client)
|
||||
}
|
||||
cs.lock.Unlock()
|
||||
}
|
||||
|
||||
func (cs *Clients) cleanUp() {
|
||||
now := time.Now().Unix()
|
||||
|
||||
// Collect clients to kill while holding read lock
|
||||
cs.lock.RLock()
|
||||
toKill := make([]*Client, 0)
|
||||
for _, client := range cs.clientsSubMap {
|
||||
if now-atomic.LoadInt64(&client.lastSeen) > 300 {
|
||||
toKill = append(toKill, client)
|
||||
}
|
||||
}
|
||||
cs.lock.RUnlock()
|
||||
|
||||
// Kill clients without holding lock
|
||||
for _, client := range toKill {
|
||||
cs.killClient(client)
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *Clients) killClient(client *Client) {
|
||||
client.sub.Unsubscribe()
|
||||
|
||||
cs.lock.Lock()
|
||||
delete(cs.clientsSubMap, client.sub.ID)
|
||||
if client.altID != "" {
|
||||
cs.deleteFromID(client, client.altID)
|
||||
}
|
||||
cs.lock.Unlock()
|
||||
}
|
||||
|
||||
// Listen starts a goroutine that forwards notifications from the subscriber to a returned channel.
// It returns a receive-only channel for notifications and a channel to stop listening.
// The notification channel is buffered with size 10 to tolerate brief slowness.
//
// The goroutine automatically stops and closes the notification channel when:
//   - The subscriber's channel is closed
//   - The stop channel is closed (or receives a value)
//   - The client fails to receive 5 consecutive notifications within 5 seconds each
//
// Client.lastSeen is updated every 30 seconds via heartbeat, or when a notification is successfully delivered.
// Consecutive send failures are tracked; after 5 failures, the client is considered disconnected and cleaned up.
func (c *Client) Listen() (<-chan notify.Notification, chan<- struct{}) {
	ch := make(chan notify.Notification, 10)
	stop := make(chan struct{})

	go func() {
		// Heartbeat ticker keeps lastSeen fresh so cleanUp (5-minute cutoff)
		// does not reap an idle-but-connected client.
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		defer close(ch)

		for {
			select {
			case <-stop:
				return

			case nt, ok := <-c.sub.Listen():
				if !ok {
					// Subscriber channel closed (presumably via Unsubscribe elsewhere).
					return
				}

				// Try to send with timeout - a stuck consumer must not wedge
				// this goroutine forever.
				timeout := time.NewTimer(5 * time.Second)
				select {
				case ch <- nt:
					// Successfully sent - update lastSeen and reset failure count
					atomic.StoreInt64(&c.lastSeen, time.Now().Unix())
					atomic.StoreInt32(&c.consecutiveFails, 0)
					timeout.Stop()

				case <-timeout.C:
					// Send timeout - increment failure count
					fails := atomic.AddInt32(&c.consecutiveFails, 1)
					if fails >= 5 {
						// Too many consecutive failures - client is stuck/disconnected
						c.sub.Unsubscribe()
						return
					}

				case <-stop:
					timeout.Stop()
					return
				}

			case <-ticker.C:
				// Heartbeat - update lastSeen to keep client alive
				atomic.StoreInt64(&c.lastSeen, time.Now().Unix())
			}
		}
	}()

	return ch, stop
}
|
||||
|
||||
func (c *Client) ID() string {
|
||||
return string(c.sub.ID)
|
||||
}
|
||||
|
||||
// deleteFromSlice returns a with every element equal to c (according to eq)
// removed, preserving the order of the remaining elements. The backing array
// is reused; slices.DeleteFunc additionally zeroes the freed tail so removed
// elements (e.g. pointers) do not keep their referents alive, which the
// previous hand-rolled compaction did not do.
func deleteFromSlice[T any](a []T, c T, eq func(T, T) bool) []T {
	return slices.DeleteFunc(a, func(x T) bool {
		return eq(x, c)
	})
}
|
||||
1014
hws/notify_test.go
Normal file
1014
hws/notify_test.go
Normal file
File diff suppressed because it is too large
Load Diff
@@ -13,3 +13,7 @@ func (w *wrappedWriter) WriteHeader(statusCode int) {
|
||||
w.ResponseWriter.WriteHeader(statusCode)
|
||||
w.statusCode = statusCode
|
||||
}
|
||||
|
||||
// Unwrap returns the underlying http.ResponseWriter.
// NOTE(review): presumably provided so http.ResponseController can reach
// optional interfaces (Flusher, Hijacker, etc.) on the wrapped writer - confirm.
func (w *wrappedWriter) Unwrap() http.ResponseWriter {
	return w.ResponseWriter
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.haelnorr.com/h/golib/notify"
|
||||
"k8s.io/apimachinery/pkg/util/validation"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -20,6 +21,8 @@ type Server struct {
|
||||
middleware bool
|
||||
errorPage ErrorPageFunc
|
||||
ready chan struct{}
|
||||
notifier *Notifier
|
||||
shutdowndelay time.Duration
|
||||
}
|
||||
|
||||
// Ready returns a channel that is closed when the server is started
|
||||
@@ -87,6 +90,7 @@ func NewServer(config *Config) (*Server, error) {
|
||||
routes: false,
|
||||
GZIP: config.GZIP,
|
||||
ready: make(chan struct{}),
|
||||
shutdowndelay: config.ShutdownDelay,
|
||||
}
|
||||
return server, nil
|
||||
}
|
||||
@@ -105,6 +109,8 @@ func (server *Server) Start(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
server.startNotifier()
|
||||
|
||||
go func() {
|
||||
if server.logger == nil {
|
||||
fmt.Printf("Listening for requests on %s", server.server.Addr)
|
||||
@@ -126,6 +132,13 @@ func (server *Server) Start(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (server *Server) Shutdown(ctx context.Context) error {
|
||||
server.logger.logger.Debug().Dur("shutdown_delay", server.shutdowndelay).Msg("HWS Server shutting down")
|
||||
server.NotifyAll(notify.Notification{
|
||||
Title: "Shutting down",
|
||||
Message: fmt.Sprintf("Server is shutting down in %v", server.shutdowndelay),
|
||||
Level: LevelShutdown,
|
||||
})
|
||||
<-time.NewTimer(server.shutdowndelay).C
|
||||
if !server.IsReady() {
|
||||
return errors.New("Server isn't running")
|
||||
}
|
||||
@@ -136,6 +149,7 @@ func (server *Server) Shutdown(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Failed to shutdown the server gracefully")
|
||||
}
|
||||
server.closeNotifier()
|
||||
server.ready = make(chan struct{})
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user