Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 143 lines 2.9 kB view raw
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "sync" 9 "time" 10 11 "github.com/redis/go-redis/v9" 12) 13 14const ( 15 redisChannel = "wicket:events" 16 redisBufferKey = "wicket:buffer" 17) 18 19type RedisBackend struct { 20 client *redis.Client 21 bufferSize int 22 mu sync.Mutex 23 closed bool 24} 25 26func NewRedisBackend(url string, bufferSize int) (*RedisBackend, error) { 27 opts, err := redis.ParseURL(url) 28 if err != nil { 29 return nil, fmt.Errorf("parsing redis URL: %w", err) 30 } 31 client := redis.NewClient(opts) 32 if err := client.Ping(context.Background()).Err(); err != nil { 33 return nil, fmt.Errorf("connecting to redis: %w", err) 34 } 35 return &RedisBackend{ 36 client: client, 37 bufferSize: bufferSize, 38 }, nil 39} 40 41func newRedisBackendFromClient(client *redis.Client, bufferSize int) *RedisBackend { 42 return &RedisBackend{ 43 client: client, 44 bufferSize: bufferSize, 45 } 46} 47 48func (r *RedisBackend) Publish(event *Event) error { 49 data, err := json.Marshal(event) 50 if err != nil { 51 return fmt.Errorf("marshaling event: %w", err) 52 } 53 ctx := context.Background() 54 pipe := r.client.Pipeline() 55 pipe.LPush(ctx, redisBufferKey, data) 56 pipe.LTrim(ctx, redisBufferKey, 0, int64(r.bufferSize-1)) 57 pipe.Expire(ctx, redisBufferKey, 24*time.Hour) 58 pipe.Publish(ctx, redisChannel, data) 59 _, err = pipe.Exec(ctx) 60 return err 61} 62 63func (r *RedisBackend) Subscribe(ctx context.Context) <-chan *Event { 64 ch := make(chan *Event, 256) 65 pubsub := r.client.Subscribe(ctx, redisChannel) 66 67 go func() { 68 defer close(ch) 69 defer pubsub.Close() 70 msgCh := pubsub.Channel() 71 for { 72 select { 73 case <-ctx.Done(): 74 return 75 case msg := <-msgCh: 76 if msg == nil { 77 return 78 } 79 var event Event 80 if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil { 81 log.Printf("redis: unmarshal event: %v", err) 82 continue 83 } 84 select { 85 case ch <- &event: 86 default: 87 } 88 } 89 } 90 }() 91 92 return ch 93} 94 95func (r *RedisBackend) Since(lastEventID string, subscribePath string) []*Event { 96 ctx := context.Background() 97 vals, err := r.client.LRange(ctx, redisBufferKey, 0, -1).Result() 98 if err != nil { 99 log.Printf("redis: LRANGE: %v", err) 100 return nil 101 } 102 103 // Redis list is newest-first (LPUSH), reverse to chronological order 104 events := make([]*Event, 0, len(vals)) 105 for i := len(vals) - 1; i >= 0; i-- { 106 var e Event 107 if err := json.Unmarshal([]byte(vals[i]), &e); err != nil { 108 continue 109 } 110 events = append(events, &e) 111 } 112 113 found := false 114 foundIdx := 0 115 for i, e := range events { 116 if e.ID == lastEventID { 117 found = true 118 foundIdx = i + 1 119 break 120 } 121 } 122 if !found { 123 foundIdx = 0 124 } 125 126 var result []*Event 127 for i := foundIdx; i < len(events); i++ { 128 if pathMatches(subscribePath, events[i].Path) { 129 result = append(result, events[i]) 130 } 131 } 132 return result 133} 134 135func (r *RedisBackend) Close() error { 136 r.mu.Lock() 137 defer r.mu.Unlock() 138 if r.closed { 139 return nil 140 } 141 r.closed = true 142 return r.client.Close() 143}