A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at label-service 286 lines 8.3 kB view raw
1package webhooks 2 3import ( 4 "context" 5 "crypto/hmac" 6 "crypto/sha256" 7 "encoding/hex" 8 "encoding/json" 9 "fmt" 10 "io" 11 "log/slog" 12 "math/rand/v2" 13 "net/http" 14 "strings" 15 "time" 16 17 "atcr.io/pkg/appview/db" 18 "atcr.io/pkg/appview/storage" 19 "atcr.io/pkg/atproto" 20) 21 22// Dispatcher handles webhook delivery for push and scan notifications. 23// It reads webhooks from the appview DB and delivers payloads 24// with Discord/Slack formatting and HMAC signing. 25type Dispatcher struct { 26 db db.DBTX 27 meta atproto.AppviewMetadata 28} 29 30// NewDispatcher creates a new webhook dispatcher 31func NewDispatcher(database db.DBTX, meta atproto.AppviewMetadata) *Dispatcher { 32 return &Dispatcher{ 33 db: database, 34 meta: meta, 35 } 36} 37 38// DispatchForScan fires matching webhooks after a scan record arrives via Jetstream. 39// previousScan is nil for first-time scans. userHandle is used for payload enrichment. 40func (d *Dispatcher) DispatchForScan(ctx context.Context, scan, previousScan *db.Scan, userHandle, tag, holdEndpoint string) { 41 webhooks, err := db.GetWebhooksForUser(d.db, scan.UserDID) 42 if err != nil || len(webhooks) == 0 { 43 return 44 } 45 46 isFirst := previousScan == nil 47 isChanged := previousScan != nil && vulnCountsChanged(scan, previousScan) 48 49 scanInfo := WebhookScanInfo{ 50 ScannedAt: scan.ScannedAt.Format(time.RFC3339), 51 ScannerVersion: scan.ScannerVersion, 52 Vulnerabilities: WebhookVulnCounts{ 53 Critical: scan.Critical, 54 High: scan.High, 55 Medium: scan.Medium, 56 Low: scan.Low, 57 Total: scan.Total, 58 }, 59 } 60 61 manifestInfo := WebhookManifestInfo{ 62 Digest: scan.ManifestDigest, 63 Repository: scan.Repository, 64 Tag: tag, 65 UserDID: scan.UserDID, 66 UserHandle: userHandle, 67 } 68 69 for _, wh := range webhooks { 70 // Check each trigger condition against bitmask 71 var triggers []string 72 if wh.Triggers&TriggerFirst != 0 && isFirst { 73 triggers = append(triggers, "scan:first") 74 } 75 if wh.Triggers&TriggerAll != 0 { 76 triggers = append(triggers, "scan:all") 77 } 78 if wh.Triggers&TriggerChanged != 0 && isChanged { 79 triggers = append(triggers, "scan:changed") 80 } 81 82 for _, trigger := range triggers { 83 payload := WebhookPayload{ 84 Trigger: trigger, 85 HoldDID: scan.HoldDID, 86 HoldEndpoint: holdEndpoint, 87 Manifest: manifestInfo, 88 Scan: scanInfo, 89 } 90 91 // Include previous counts for scan:changed 92 if trigger == "scan:changed" && previousScan != nil { 93 payload.Previous = &WebhookVulnCounts{ 94 Critical: previousScan.Critical, 95 High: previousScan.High, 96 Medium: previousScan.Medium, 97 Low: previousScan.Low, 98 Total: previousScan.Total, 99 } 100 } 101 102 payloadBytes, err := json.Marshal(payload) 103 if err != nil { 104 slog.Error("Failed to marshal webhook payload", "error", err) 105 continue 106 } 107 108 go d.deliverWithRetry(wh.URL, wh.Secret, payloadBytes) 109 } 110 } 111} 112 113// DispatchForPush fires matching webhooks after a manifest is pushed. 114func (d *Dispatcher) DispatchForPush(ctx context.Context, event storage.PushWebhookEvent) { 115 webhooks, err := db.GetWebhooksForUser(d.db, event.OwnerDID) 116 if err != nil || len(webhooks) == 0 { 117 return 118 } 119 120 // Fetch star/pull counts for payload enrichment 121 var starCount, pullCount int 122 stats, err := db.GetRepositoryStats(d.db, event.OwnerDID, event.Repository) 123 if err == nil && stats != nil { 124 starCount = stats.StarCount 125 pullCount = stats.PullCount 126 } 127 128 // Build repo URL using the primary registry domain (pull domain) if available 129 baseURL := d.meta.BaseURL 130 if len(d.meta.RegistryDomains) > 0 { 131 baseURL = "https://" + d.meta.RegistryDomains[0] 132 } 133 repoURL := fmt.Sprintf("%s/%s/%s", baseURL, event.OwnerHandle, event.Repository) 134 135 payload := PushWebhookPayload{ 136 Trigger: "push", 137 PushData: PushData{ 138 PushedAt: time.Now().Format(time.RFC3339), 139 Pusher: event.PusherHandle, 140 PusherDID: event.PusherDID, 141 Tag: event.Tag, 142 Digest: event.Digest, 143 }, 144 Repository: PushRepository{ 145 Name: event.Repository, 146 Namespace: event.OwnerHandle, 147 RepoName: event.OwnerHandle + "/" + event.Repository, 148 RepoURL: repoURL, 149 MediaType: event.MediaType, 150 StarCount: starCount, 151 PullCount: pullCount, 152 }, 153 Hold: PushHold{ 154 DID: event.HoldDID, 155 Endpoint: event.HoldEndpoint, 156 }, 157 } 158 159 payloadBytes, err := json.Marshal(payload) 160 if err != nil { 161 slog.Error("Failed to marshal push webhook payload", "error", err) 162 return 163 } 164 165 for _, wh := range webhooks { 166 if wh.Triggers&TriggerPush == 0 { 167 continue 168 } 169 go d.deliverWithRetry(wh.URL, wh.Secret, payloadBytes) 170 } 171} 172 173// DeliverTest sends a test payload to a specific webhook (synchronous, single attempt) 174func (d *Dispatcher) DeliverTest(ctx context.Context, webhookID, userDID, userHandle string) (bool, error) { 175 wh, err := db.GetWebhookByID(d.db, webhookID) 176 if err != nil { 177 return false, err 178 } 179 if wh.UserDID != userDID { 180 return false, fmt.Errorf("unauthorized") 181 } 182 183 // Randomize vulnerability counts so each test shows a different severity color 184 critical := rand.IntN(3) 185 high := rand.IntN(5) 186 medium := rand.IntN(8) 187 low := rand.IntN(10) 188 total := critical + high + medium + low 189 190 payload := WebhookPayload{ 191 Trigger: "test", 192 Manifest: WebhookManifestInfo{ 193 Digest: "sha256:0000000000000000000000000000000000000000000000000000000000000000", 194 Repository: "test-repo", 195 Tag: "latest", 196 UserDID: userDID, 197 UserHandle: userHandle, 198 }, 199 Scan: WebhookScanInfo{ 200 ScannedAt: time.Now().Format(time.RFC3339), 201 ScannerVersion: "atcr-scanner-v1.0.0", 202 Vulnerabilities: WebhookVulnCounts{ 203 Critical: critical, High: high, Medium: medium, Low: low, Total: total, 204 }, 205 }, 206 } 207 208 payloadBytes, _ := json.Marshal(payload) 209 success := d.attemptDelivery(wh.URL, wh.Secret, payloadBytes) 210 return success, nil 211} 212 213// deliverWithRetry attempts to deliver a webhook with exponential backoff 214func (d *Dispatcher) deliverWithRetry(webhookURL, secret string, payload []byte) { 215 delays := []time.Duration{0, 30 * time.Second, 2 * time.Minute, 8 * time.Minute} 216 for attempt, delay := range delays { 217 if attempt > 0 { 218 time.Sleep(delay) 219 } 220 if d.attemptDelivery(webhookURL, secret, payload) { 221 return 222 } 223 } 224 slog.Warn("Webhook delivery failed after retries", "url", maskURL(webhookURL)) 225} 226 227// attemptDelivery sends a single webhook HTTP POST 228func (d *Dispatcher) attemptDelivery(webhookURL, secret string, payload []byte) bool { 229 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 230 defer cancel() 231 232 // Reformat payload for platform-specific webhook APIs 233 sendPayload := payload 234 if isDiscordWebhook(webhookURL) || isSlackWebhook(webhookURL) { 235 formatted, fmtErr := formatPlatformPayload(payload, webhookURL, d.meta) 236 if fmtErr == nil { 237 sendPayload = formatted 238 } 239 } 240 241 req, err := http.NewRequestWithContext(ctx, "POST", webhookURL, strings.NewReader(string(sendPayload))) 242 if err != nil { 243 slog.Warn("Failed to create webhook request", "error", err) 244 return false 245 } 246 247 req.Header.Set("Content-Type", "application/json") 248 req.Header.Set("User-Agent", d.meta.ClientShortName+"-Webhook/1.0") 249 250 // HMAC signing if secret is set (signs the actual payload sent) 251 if secret != "" { 252 mac := hmac.New(sha256.New, []byte(secret)) 253 mac.Write(sendPayload) 254 sig := hex.EncodeToString(mac.Sum(nil)) 255 req.Header.Set("X-Webhook-Signature-256", "sha256="+sig) 256 } 257 258 client := &http.Client{Timeout: 10 * time.Second} 259 resp, err := client.Do(req) 260 if err != nil { 261 slog.Warn("Webhook delivery attempt failed", "url", maskURL(webhookURL), "error", err) 262 return false 263 } 264 defer resp.Body.Close() 265 266 if resp.StatusCode >= 200 && resp.StatusCode < 300 { 267 slog.Info("Webhook delivered successfully", "url", maskURL(webhookURL), "status", resp.StatusCode) 268 return true 269 } 270 271 // Read response body for debugging 272 body, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) 273 slog.Warn("Webhook delivery got non-2xx response", 274 "url", maskURL(webhookURL), 275 "status", resp.StatusCode, 276 "body", string(body)) 277 return false 278} 279 280// vulnCountsChanged checks if vulnerability counts differ between scans 281func vulnCountsChanged(current, previous *db.Scan) bool { 282 return current.Critical != previous.Critical || 283 current.High != previous.High || 284 current.Medium != previous.Medium || 285 current.Low != previous.Low 286}