Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 212 lines 5.2 kB view raw
1package main 2 3import ( 4 "encoding/base64" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log" 9 "mime" 10 "net/http" 11 "strings" 12 "sync/atomic" 13 "time" 14 15 "github.com/google/uuid" 16) 17 18var version = "dev" 19 20type Server struct { 21 broker *Broker 22 config *atomic.Pointer[Configuration] 23} 24 25func NewServer(broker *Broker, config *atomic.Pointer[Configuration]) http.Handler { 26 s := &Server{broker: broker, config: config} 27 return http.HandlerFunc(s.ServeHTTP) 28} 29 30func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { 31 setCORSHeaders(w) 32 33 if r.Method == "GET" && r.URL.Path == "/" { 34 w.Header().Set("Content-Type", "text/plain") 35 fmt.Fprintf(w, "wicket %s\n", version) 36 return 37 } 38 39 if r.Method == "GET" && r.URL.Path == "/_health" { 40 w.WriteHeader(http.StatusNoContent) 41 return 42 } 43 44 switch r.Method { 45 case "OPTIONS": 46 w.WriteHeader(http.StatusNoContent) 47 case "POST": 48 s.handlePost(w, r) 49 case "GET": 50 if !strings.Contains(r.Header.Get("Accept"), "text/event-stream") { 51 http.NotFound(w, r) 52 return 53 } 54 s.handleSSE(w, r) 55 default: 56 http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 57 } 58} 59 60func normalizePath(raw string) string { 61 return strings.TrimRight(strings.TrimPrefix(raw, "/"), "/") 62} 63 64func setCORSHeaders(w http.ResponseWriter) { 65 w.Header().Set("Access-Control-Allow-Origin", "*") 66 w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") 67 w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, Last-Event-ID") 68} 69 70func (s *Server) handlePost(w http.ResponseWriter, r *http.Request) { 71 path := normalizePath(r.URL.Path) 72 73 body, _ := io.ReadAll(r.Body) 74 75 cfg := s.config.Load() 76 if pc := cfg.LookupVerification(path); pc != nil { 77 verifier, err := NewVerifier(pc.Verify) 78 if err != nil { 79 http.Error(w, "server configuration error", http.StatusInternalServerError) 80 return 81 } 82 if err := verifier.Verify(body, r.Header, pc.Secret, pc.SignatureHeader); err != nil { 83 log.Printf("verification failed for %s: %v, headers: %v", path, err, r.Header) 84 http.Error(w, "forbidden", http.StatusForbidden) 85 return 86 } 87 } 88 89 var payload any 90 mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) 91 if err == nil && mediaType == "application/json" { 92 if err := json.Unmarshal(body, &payload); err != nil { 93 http.Error(w, "invalid JSON", http.StatusBadRequest) 94 return 95 } 96 } else if err == nil && isTextContent(mediaType, params) { 97 payload = string(body) 98 } else { 99 payload = base64.StdEncoding.EncodeToString(body) 100 } 101 102 headers := extractHeaders(r.Header) 103 104 event := &Event{ 105 ID: uuid.New().String(), 106 Timestamp: time.Now().UTC(), 107 Method: r.Method, 108 Path: path, 109 Headers: headers, 110 Payload: payload, 111 } 112 113 if err := s.broker.Publish(event); err != nil { 114 log.Printf("publish failed for %s: %v", path, err) 115 http.Error(w, "publish failed", http.StatusInternalServerError) 116 return 117 } 118 log.Printf("published %s %s (%s) id=%s", r.Method, path, r.Header.Get("Content-Type"), event.ID) 119 w.WriteHeader(http.StatusAccepted) 120} 121 122func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) { 123 path := normalizePath(r.URL.Path) 124 125 cfg := s.config.Load() 126 if secret := cfg.LookupSubscribeSecret(path); secret != "" { 127 auth := r.Header.Get("Authorization") 128 if !strings.HasPrefix(auth, "Bearer ") || strings.TrimPrefix(auth, "Bearer ") != secret { 129 http.Error(w, "unauthorized", http.StatusUnauthorized) 130 return 131 } 132 } 133 134 flusher, ok := w.(http.Flusher) 135 if !ok { 136 http.Error(w, "streaming unsupported", http.StatusInternalServerError) 137 return 138 } 139 140 filters := ParseFilters(r.URL.Query()) 141 lastEventID := r.Header.Get("Last-Event-ID") 142 143 ch, unsub := s.broker.Subscribe(path, lastEventID) 144 defer unsub() 145 146 w.Header().Set("Content-Type", "text/event-stream") 147 w.Header().Set("Cache-Control", "no-cache") 148 w.Header().Set("Connection", "keep-alive") 149 w.WriteHeader(http.StatusOK) 150 flusher.Flush() 151 152 ctx := r.Context() 153 for { 154 select { 155 case <-ctx.Done(): 156 return 157 case event, ok := <-ch: 158 if !ok { 159 return 160 } 161 if !MatchAll(filters, event) { 162 continue 163 } 164 data, err := json.Marshal(event) 165 if err != nil { 166 log.Printf("marshaling event %s: %v", event.ID, err) 167 continue 168 } 169 fmt.Fprintf(w, "id: %s\ndata: %s\n\n", event.ID, data) 170 flusher.Flush() 171 } 172 } 173} 174 175var hopByHopHeaders = map[string]bool{ 176 "Connection": true, 177 "Keep-Alive": true, 178 "Proxy-Authenticate": true, 179 "Proxy-Authorization": true, 180 "Te": true, 181 "Trailer": true, 182 "Transfer-Encoding": true, 183 "Upgrade": true, 184 "Host": true, 185 "Content-Length": true, 186} 187 188func extractHeaders(h http.Header) map[string]string { 189 headers := make(map[string]string) 190 for name, values := range h { 191 if hopByHopHeaders[name] { 192 continue 193 } 194 headers[name] = values[0] 195 } 196 return headers 197} 198 199func isTextContent(mediaType string, params map[string]string) bool { 200 if strings.HasPrefix(mediaType, "text/") { 201 return true 202 } 203 if mediaType == "application/x-www-form-urlencoded" || 204 mediaType == "application/xml" || 205 mediaType == "application/xhtml+xml" { 206 return true 207 } 208 if _, ok := params["charset"]; ok { 209 return true 210 } 211 return false 212}