Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 114 lines 2.1 kB view raw
1package main 2 3import ( 4 "context" 5 "strings" 6 "sync" 7 "sync/atomic" 8) 9 10type subscriber struct { 11 ch chan *Event 12 path string 13 startSeq int64 14} 15 16type Broker struct { 17 mu sync.RWMutex 18 subscribers map[string][]*subscriber 19 backend Backend 20 seq atomic.Int64 21} 22 23func NewBroker(backend Backend) *Broker { 24 return &Broker{ 25 subscribers: make(map[string][]*subscriber), 26 backend: backend, 27 } 28} 29 30func (b *Broker) Start(ctx context.Context) { 31 ch := b.backend.Subscribe(ctx) 32 go func() { 33 var fanOutSeq int64 34 for event := range ch { 35 fanOutSeq++ 36 b.fanOut(event, fanOutSeq) 37 } 38 }() 39} 40 41func (b *Broker) fanOut(event *Event, seq int64) { 42 b.mu.RLock() 43 defer b.mu.RUnlock() 44 45 for _, path := range publishPaths(event.Path) { 46 for _, sub := range b.subscribers[path] { 47 if seq <= sub.startSeq { 48 continue 49 } 50 select { 51 case sub.ch <- event: 52 default: 53 } 54 } 55 } 56} 57 58func (b *Broker) Publish(event *Event) error { 59 b.seq.Add(1) 60 return b.backend.Publish(event) 61} 62 63func (b *Broker) Subscribe(path string, lastEventID string) (<-chan *Event, func()) { 64 ch := make(chan *Event, 64) 65 sub := &subscriber{ch: ch, path: path, startSeq: b.seq.Load()} 66 67 b.mu.Lock() 68 b.subscribers[path] = append(b.subscribers[path], sub) 69 b.mu.Unlock() 70 71 if lastEventID != "" { 72 events := b.backend.Since(lastEventID, path) 73 for _, e := range events { 74 ch <- e 75 } 76 } 77 78 unsub := func() { 79 b.mu.Lock() 80 defer b.mu.Unlock() 81 subs := b.subscribers[path] 82 for i, s := range subs { 83 if s == sub { 84 b.subscribers[path] = append(subs[:i], subs[i+1:]...) 85 break 86 } 87 } 88 if len(b.subscribers[path]) == 0 { 89 delete(b.subscribers, path) 90 } 91 func() { 92 defer func() { recover() }() 93 close(ch) 94 }() 95 } 96 97 return ch, unsub 98} 99 100func publishPaths(eventPath string) []string { 101 paths := []string{eventPath} 102 for { 103 i := strings.LastIndex(eventPath, "/") 104 if i < 0 { 105 break 106 } 107 eventPath = eventPath[:i] 108 paths = append(paths, eventPath) 109 } 110 if paths[len(paths)-1] != "" { 111 paths = append(paths, "") 112 } 113 return paths 114}