// Package main contains a hierarchical publish/subscribe broker that fans
// events read from a Backend out to in-process subscribers.
package main

import (
	"context"
	"strings"
	"sync"
	"sync/atomic"
)

// subscriber is one registered listener.
type subscriber struct {
	// ch receives fanned-out events. It is written by fanOut under
	// Broker.mu (read lock) and may be swapped by Subscribe under the
	// write lock while replay is being set up.
	ch chan *Event
	// path is the key under which the subscriber is stored in
	// Broker.subscribers.
	path string
	// startSeq is the value of Broker.seq when the subscriber was created;
	// fanOut skips events whose sequence number is <= startSeq.
	startSeq int64
}

// Broker distributes events coming from a Backend to subscribers keyed by
// path. A subscriber registered on a path also receives events published on
// any descendant path (see publishPaths).
type Broker struct {
	mu          sync.RWMutex
	subscribers map[string][]*subscriber
	backend     Backend
	// seq counts the publishes that the backend accepted through Publish.
	seq atomic.Int64
}

// NewBroker returns a Broker that publishes to and reads from backend.
func NewBroker(backend Backend) *Broker {
	return &Broker{
		subscribers: make(map[string][]*subscriber),
		backend:     backend,
	}
}

// Start subscribes to the backend and launches a goroutine that fans every
// received event out to the matching subscribers.
//
// ctx is only handed to the backend; the goroutine itself runs until the
// channel returned by the backend is closed.
func (b *Broker) Start(ctx context.Context) {
	ch := b.backend.Subscribe(ctx)
	go func() {
		// fanOutSeq numbers events in arrival order, starting at 1.
		var fanOutSeq int64
		for event := range ch {
			fanOutSeq++
			b.fanOut(event, fanOutSeq)
		}
	}()
}

// fanOut delivers event to every subscriber registered on one of the paths
// returned by publishPaths(event.Path). Subscribers whose startSeq is >= seq
// are skipped, and so are subscribers whose buffer is full: the send never
// blocks. A nil event is ignored.
func (b *Broker) fanOut(event *Event, seq int64) {
	if event == nil {
		// Nothing to route; dereferencing it below would panic in the
		// background goroutine started by Start.
		return
	}

	b.mu.RLock()
	defer b.mu.RUnlock()

	for _, path := range publishPaths(event.Path) {
		for _, sub := range b.subscribers[path] {
			if seq <= sub.startSeq {
				continue
			}
			select {
			case sub.ch <- event:
			default:
				// Buffer full: drop for this subscriber instead of
				// stalling delivery to everyone else.
			}
		}
	}
}

// Publish hands event to the backend and returns the backend's error.
//
// The publish counter is advanced only when the backend accepted the event.
// Counting failed publishes would leave the counter permanently ahead of the
// fan-out sequence, and every later subscriber would then skip real events.
func (b *Broker) Publish(event *Event) error {
	if err := b.backend.Publish(event); err != nil {
		return err
	}
	b.seq.Add(1)
	return nil
}

// Subscribe registers a listener on path and returns its event channel along
// with a function that unregisters the listener and closes the channel. The
// returned function is safe to call more than once.
//
// When lastEventID is non-empty, the events returned by the backend's Since
// call are placed on the channel ahead of any live event. The channel is
// sized to hold the complete replay, so Subscribe never blocks on a
// subscriber that cannot be reading yet.
func (b *Broker) Subscribe(path string, lastEventID string) (<-chan *Event, func()) {
	// liveBuffer is the number of live events buffered per subscriber.
	const liveBuffer = 64

	sub := &subscriber{
		ch:       make(chan *Event, liveBuffer),
		path:     path,
		startSeq: b.seq.Load(),
	}

	// Register before querying the backend so that events fanned out while
	// Since is running are buffered rather than missed.
	b.mu.Lock()
	b.subscribers[path] = append(b.subscribers[path], sub)
	b.mu.Unlock()

	if lastEventID != "" {
		if replay := b.backend.Since(lastEventID, path); len(replay) > 0 {
			b.preloadReplay(sub, replay, liveBuffer)
		}
	}

	// Only this goroutine ever reassigns sub.ch, and it no longer does so
	// past this point, so ch is the channel unsub has to close.
	ch := sub.ch

	var once sync.Once
	unsub := func() {
		once.Do(func() {
			b.mu.Lock()
			defer b.mu.Unlock()
			b.removeSubscriberLocked(sub)
			// Closed while holding the write lock, after removal, so
			// fanOut can never send on a closed channel.
			close(ch)
		})
	}
	return ch, unsub
}

// preloadReplay replaces sub.ch with a channel that holds replay followed by
// whatever live events were buffered so far, keeping liveBuffer slots of
// capacity for live traffic. It takes the write lock so fanOut cannot send to
// the old channel while events are being moved.
func (b *Broker) preloadReplay(sub *subscriber, replay []*Event, liveBuffer int) {
	b.mu.Lock()
	defer b.mu.Unlock()

	merged := make(chan *Event, len(replay)+liveBuffer)
	for _, e := range replay {
		merged <- e
	}

	// The old channel holds at most liveBuffer events, which is exactly the
	// spare capacity of merged, so these sends cannot block.
	buffered := sub.ch
drain:
	for {
		select {
		case e := <-buffered:
			merged <- e
		default:
			break drain
		}
	}
	sub.ch = merged
}

// removeSubscriberLocked removes sub from the subscriber table and drops the
// table entry when no subscriber is left on that path. The caller must hold
// b.mu for writing.
func (b *Broker) removeSubscriberLocked(sub *subscriber) {
	subs := b.subscribers[sub.path]
	for i, s := range subs {
		if s != sub {
			continue
		}
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil // do not keep the removed subscriber reachable
		subs = subs[:last]
		break
	}

	if len(subs) == 0 {
		delete(b.subscribers, sub.path)
		return
	}
	b.subscribers[sub.path] = subs
}

// publishPaths returns eventPath followed by each of its ancestors, obtained
// by repeatedly cutting at the last "/", and always ends with the empty
// string. For example "a/b" yields ["a/b", "a", ""] and "/a" yields
// ["/a", ""].
func publishPaths(eventPath string) []string {
	// One entry per "/" plus the path itself plus the trailing "".
	paths := make([]string, 0, strings.Count(eventPath, "/")+2)
	paths = append(paths, eventPath)

	for rest := eventPath; ; {
		i := strings.LastIndex(rest, "/")
		if i < 0 {
			break
		}
		rest = rest[:i]
		paths = append(paths, rest)
	}

	if paths[len(paths)-1] != "" {
		paths = append(paths, "")
	}
	return paths
}