A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

begin scanner implementation

+1281
+270
pkg/hold/scanner/extractor.go
··· 1 + package scanner 2 + 3 + import ( 4 + "archive/tar" 5 + "compress/gzip" 6 + "context" 7 + "encoding/json" 8 + "fmt" 9 + "io" 10 + "log/slog" 11 + "os" 12 + "path/filepath" 13 + "strings" 14 + ) 15 + 16 + // extractLayers extracts all image layers from storage to a temporary directory 17 + // Returns the directory path and a cleanup function 18 + func (w *Worker) extractLayers(ctx context.Context, job *ScanJob) (string, func(), error) { 19 + // Create temp directory for extraction 20 + // Use the database directory as the base (since we're in a scratch container with no /tmp) 21 + scanTmpBase := filepath.Join(w.config.Database.Path, "scanner-tmp") 22 + if err := os.MkdirAll(scanTmpBase, 0755); err != nil { 23 + return "", nil, fmt.Errorf("failed to create scanner temp base: %w", err) 24 + } 25 + 26 + tmpDir, err := os.MkdirTemp(scanTmpBase, "scan-*") 27 + if err != nil { 28 + return "", nil, fmt.Errorf("failed to create temp directory: %w", err) 29 + } 30 + 31 + cleanup := func() { 32 + if err := os.RemoveAll(tmpDir); err != nil { 33 + slog.Warn("Failed to clean up temp directory", "dir", tmpDir, "error", err) 34 + } 35 + } 36 + 37 + // Create image directory structure 38 + imageDir := filepath.Join(tmpDir, "image") 39 + if err := os.MkdirAll(imageDir, 0755); err != nil { 40 + cleanup() 41 + return "", nil, fmt.Errorf("failed to create image directory: %w", err) 42 + } 43 + 44 + // Download and extract config blob 45 + slog.Info("Downloading config blob", "digest", job.Config.Digest) 46 + configPath := filepath.Join(imageDir, "config.json") 47 + if err := w.downloadBlob(ctx, job.Config.Digest, configPath); err != nil { 48 + cleanup() 49 + return "", nil, fmt.Errorf("failed to download config blob: %w", err) 50 + } 51 + 52 + // Validate config is valid JSON 53 + configData, err := os.ReadFile(configPath) 54 + if err != nil { 55 + cleanup() 56 + return "", nil, fmt.Errorf("failed to read config: %w", err) 57 + } 58 + var configObj map[string]interface{} 59 + if err := json.Unmarshal(configData, &configObj); err != nil { 60 + cleanup() 61 + return "", nil, fmt.Errorf("invalid config JSON: %w", err) 62 + } 63 + 64 + // Create layers directory for extracted content 65 + layersDir := filepath.Join(imageDir, "layers") 66 + if err := os.MkdirAll(layersDir, 0755); err != nil { 67 + cleanup() 68 + return "", nil, fmt.Errorf("failed to create layers directory: %w", err) 69 + } 70 + 71 + // Download and extract each layer in order (creating overlayfs-style filesystem) 72 + rootfsDir := filepath.Join(imageDir, "rootfs") 73 + if err := os.MkdirAll(rootfsDir, 0755); err != nil { 74 + cleanup() 75 + return "", nil, fmt.Errorf("failed to create rootfs directory: %w", err) 76 + } 77 + 78 + for i, layer := range job.Layers { 79 + slog.Info("Extracting layer", "index", i, "digest", layer.Digest, "size", layer.Size) 80 + 81 + // Download layer blob to temp file 82 + layerPath := filepath.Join(layersDir, fmt.Sprintf("layer-%d.tar.gz", i)) 83 + if err := w.downloadBlob(ctx, layer.Digest, layerPath); err != nil { 84 + cleanup() 85 + return "", nil, fmt.Errorf("failed to download layer %d: %w", i, err) 86 + } 87 + 88 + // Extract layer on top of rootfs (overlayfs style) 89 + if err := w.extractTarGz(layerPath, rootfsDir); err != nil { 90 + cleanup() 91 + return "", nil, fmt.Errorf("failed to extract layer %d: %w", i, err) 92 + } 93 + 94 + // Remove layer tar.gz to save space 95 + os.Remove(layerPath) 96 + } 97 + 98 + // Check what was extracted 99 + entries, err := os.ReadDir(rootfsDir) 100 + if err != nil { 101 + slog.Warn("Failed to read rootfs directory", "error", err) 102 + } else { 103 + slog.Info("Successfully extracted image", 104 + "layers", len(job.Layers), 105 + "rootfs", rootfsDir, 106 + "topLevelEntries", len(entries), 107 + "sampleEntries", func() []string { 108 + var samples []string 109 + for i, e := range entries { 110 + if i >= 10 { 111 + break 112 + } 113 + samples = append(samples, e.Name()) 114 + } 115 + return samples 116 + }()) 117 + } 118 + 119 + return rootfsDir, cleanup, nil 120 + } 121 + 122 + // downloadBlob downloads a blob from storage to a local file 123 + func (w *Worker) downloadBlob(ctx context.Context, digest, destPath string) error { 124 + // Convert digest to storage path using distribution's sharding scheme 125 + // Format: /docker/registry/v2/blobs/sha256/47/4734bc89.../data 126 + // where 47 is the first 2 characters of the hash for directory sharding 127 + blobPath := blobPathForDigest(digest) 128 + 129 + // Open blob from storage driver 130 + reader, err := w.driver.Reader(ctx, blobPath, 0) 131 + if err != nil { 132 + return fmt.Errorf("failed to open blob %s: %w", digest, err) 133 + } 134 + defer reader.Close() 135 + 136 + // Create destination file 137 + dest, err := os.Create(destPath) 138 + if err != nil { 139 + return fmt.Errorf("failed to create destination file: %w", err) 140 + } 141 + defer dest.Close() 142 + 143 + // Copy blob data to file 144 + if _, err := io.Copy(dest, reader); err != nil { 145 + return fmt.Errorf("failed to copy blob data: %w", err) 146 + } 147 + 148 + return nil 149 + } 150 + 151 + // extractTarGz extracts a tar.gz file to a destination directory (overlayfs style) 152 + func (w *Worker) extractTarGz(tarGzPath, destDir string) error { 153 + // Open tar.gz file 154 + file, err := os.Open(tarGzPath) 155 + if err != nil { 156 + return fmt.Errorf("failed to open tar.gz: %w", err) 157 + } 158 + defer file.Close() 159 + 160 + // Create gzip reader 161 + gzr, err := gzip.NewReader(file) 162 + if err != nil { 163 + return fmt.Errorf("failed to create gzip reader: %w", err) 164 + } 165 + defer gzr.Close() 166 + 167 + // Create tar reader 168 + tr := tar.NewReader(gzr) 169 + 170 + // Extract each file 171 + for { 172 + header, err := tr.Next() 173 + if err == io.EOF { 174 + break 175 + } 176 + if err != nil { 177 + return fmt.Errorf("failed to read tar header: %w", err) 178 + } 179 + 180 + // Build target path (clean to prevent path traversal) 181 + target := filepath.Join(destDir, filepath.Clean(header.Name)) 182 + 183 + // Ensure target is within destDir (security check) 184 + if !strings.HasPrefix(target, filepath.Clean(destDir)+string(os.PathSeparator)) { 185 + slog.Warn("Skipping path outside destination", "path", header.Name) 186 + continue 187 + } 188 + 189 + switch header.Typeflag { 190 + case tar.TypeDir: 191 + // Create directory 192 + if err := os.MkdirAll(target, os.FileMode(header.Mode)); err != nil { 193 + return fmt.Errorf("failed to create directory %s: %w", target, err) 194 + } 195 + 196 + case tar.TypeReg: 197 + // Create parent directory 198 + if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { 199 + return fmt.Errorf("failed to create parent directory: %w", err) 200 + } 201 + 202 + // Create file 203 + outFile, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.FileMode(header.Mode)) 204 + if err != nil { 205 + return fmt.Errorf("failed to create file %s: %w", target, err) 206 + } 207 + 208 + // Copy file contents 209 + if _, err := io.Copy(outFile, tr); err != nil { 210 + outFile.Close() 211 + return fmt.Errorf("failed to write file %s: %w", target, err) 212 + } 213 + outFile.Close() 214 + 215 + case tar.TypeSymlink: 216 + // Create symlink 217 + if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { 218 + return fmt.Errorf("failed to create parent directory for symlink: %w", err) 219 + } 220 + 221 + // Remove existing file/symlink if it exists 222 + os.Remove(target) 223 + 224 + if err := os.Symlink(header.Linkname, target); err != nil { 225 + slog.Warn("Failed to create symlink", "target", target, "link", header.Linkname, "error", err) 226 + } 227 + 228 + case tar.TypeLink: 229 + // Create hard link 230 + linkTarget := filepath.Join(destDir, filepath.Clean(header.Linkname)) 231 + if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { 232 + return fmt.Errorf("failed to create parent directory for hardlink: %w", err) 233 + } 234 + 235 + // Remove existing file if it exists 236 + os.Remove(target) 237 + 238 + if err := os.Link(linkTarget, target); err != nil { 239 + slog.Warn("Failed to create hardlink", "target", target, "link", linkTarget, "error", err) 240 + } 241 + 242 + default: 243 + slog.Debug("Skipping unsupported tar entry type", "type", header.Typeflag, "name", header.Name) 244 + } 245 + } 246 + 247 + return nil 248 + } 249 + 250 + // blobPathForDigest converts a digest to a storage path using distribution's sharding scheme 251 + // Format: /docker/registry/v2/blobs/sha256/47/4734bc89.../data 252 + // where 47 is the first 2 characters of the hash for directory sharding 253 + func blobPathForDigest(digest string) string { 254 + // Split digest into algorithm and hash 255 + parts := strings.SplitN(digest, ":", 2) 256 + if len(parts) != 2 { 257 + // Fallback for malformed digest 258 + return fmt.Sprintf("/docker/registry/v2/blobs/%s/data", digest) 259 + } 260 + 261 + algorithm := parts[0] 262 + hash := parts[1] 263 + 264 + // Use first 2 characters for sharding 265 + if len(hash) < 2 { 266 + return fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/data", algorithm, hash) 267 + } 268 + 269 + return fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/%s/data", algorithm, hash[:2], hash) 270 + }
+351
pkg/hold/scanner/grype.go
··· 1 + package scanner 2 + 3 + import ( 4 + "context" 5 + "crypto/sha256" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "os" 10 + "path/filepath" 11 + "sync" 12 + 13 + "github.com/anchore/grype/grype" 14 + "github.com/anchore/grype/grype/db/v6/distribution" 15 + "github.com/anchore/grype/grype/db/v6/installation" 16 + "github.com/anchore/grype/grype/distro" 17 + "github.com/anchore/grype/grype/match" 18 + "github.com/anchore/grype/grype/matcher" 19 + "github.com/anchore/grype/grype/matcher/dotnet" 20 + "github.com/anchore/grype/grype/matcher/golang" 21 + "github.com/anchore/grype/grype/matcher/java" 22 + "github.com/anchore/grype/grype/matcher/javascript" 23 + "github.com/anchore/grype/grype/matcher/python" 24 + "github.com/anchore/grype/grype/matcher/ruby" 25 + "github.com/anchore/grype/grype/matcher/stock" 26 + grypePkg "github.com/anchore/grype/grype/pkg" 27 + "github.com/anchore/grype/grype/vulnerability" 28 + "github.com/anchore/syft/syft/sbom" 29 + ) 30 + 31 + // Global vulnerability database (shared across workers) 32 + var ( 33 + vulnDB vulnerability.Provider 34 + vulnDBLock sync.RWMutex 35 + ) 36 + 37 + // scanVulnerabilities scans an SBOM for vulnerabilities using Grype 38 + // Returns vulnerability report JSON, digest, summary, and any error 39 + func (w *Worker) scanVulnerabilities(ctx context.Context, s *sbom.SBOM) ([]byte, string, VulnerabilitySummary, error) { 40 + slog.Info("Scanning for vulnerabilities with Grype") 41 + 42 + // Load vulnerability database (cached globally) 43 + store, err := w.loadVulnDatabase(ctx) 44 + if err != nil { 45 + return nil, "", VulnerabilitySummary{}, fmt.Errorf("failed to load vulnerability database: %w", err) 46 + } 47 + 48 + // Create package context from SBOM (need distro for synthesis) 49 + var grypeDistro *distro.Distro 50 + if s.Artifacts.LinuxDistribution != nil { 51 + grypeDistro = distro.FromRelease(s.Artifacts.LinuxDistribution, nil) 52 + if grypeDistro != nil { 53 + slog.Info("Using distro for package synthesis", 54 + "name", grypeDistro.Name(), 55 + "version", grypeDistro.Version, 56 + "type", grypeDistro.Type, 57 + "codename", grypeDistro.Codename) 58 + } 59 + } 60 + 61 + // Convert Syft packages to Grype packages WITH distro info 62 + synthesisConfig := grypePkg.SynthesisConfig{ 63 + GenerateMissingCPEs: true, 64 + Distro: grypePkg.DistroConfig{ 65 + Override: grypeDistro, 66 + }, 67 + } 68 + grypePackages := grypePkg.FromCollection(s.Artifacts.Packages, synthesisConfig) 69 + 70 + slog.Info("Converted packages for vulnerability scanning", 71 + "syftPackages", s.Artifacts.Packages.PackageCount(), 72 + "grypePackages", len(grypePackages), 73 + "distro", func() string { 74 + if s.Artifacts.LinuxDistribution != nil { 75 + return fmt.Sprintf("%s %s", s.Artifacts.LinuxDistribution.Name, s.Artifacts.LinuxDistribution.Version) 76 + } 77 + return "none" 78 + }()) 79 + 80 + // Create matchers 81 + matchers := matcher.NewDefaultMatchers(matcher.Config{ 82 + Java: java.MatcherConfig{}, 83 + Ruby: ruby.MatcherConfig{}, 84 + Python: python.MatcherConfig{}, 85 + Dotnet: dotnet.MatcherConfig{}, 86 + Javascript: javascript.MatcherConfig{}, 87 + Golang: golang.MatcherConfig{}, 88 + Stock: stock.MatcherConfig{}, 89 + }) 90 + 91 + // Create package context with the same distro we used for synthesis 92 + pkgContext := grypePkg.Context{ 93 + Source: &s.Source, 94 + Distro: grypeDistro, 95 + } 96 + 97 + // Create vulnerability matcher 98 + vulnerabilityMatcher := &grype.VulnerabilityMatcher{ 99 + VulnerabilityProvider: store, 100 + Matchers: matchers, 101 + NormalizeByCVE: true, 102 + } 103 + 104 + // Find vulnerabilities 105 + slog.Info("Matching vulnerabilities", 106 + "packages", len(grypePackages), 107 + "distro", func() string { 108 + if grypeDistro != nil { 109 + return fmt.Sprintf("%s %s", grypeDistro.Name(), grypeDistro.Version) 110 + } 111 + return "none" 112 + }()) 113 + allMatches, _, err := vulnerabilityMatcher.FindMatches(grypePackages, pkgContext) 114 + if err != nil { 115 + return nil, "", VulnerabilitySummary{}, fmt.Errorf("failed to find vulnerabilities: %w", err) 116 + } 117 + 118 + slog.Info("Vulnerability matching complete", 119 + "totalMatches", allMatches.Count()) 120 + 121 + // If we found 0 matches, log some diagnostic info 122 + if allMatches.Count() == 0 { 123 + slog.Warn("No vulnerability matches found - this may indicate an issue", 124 + "distro", func() string { 125 + if grypeDistro != nil { 126 + return fmt.Sprintf("%s %s", grypeDistro.Name(), grypeDistro.Version) 127 + } 128 + return "none" 129 + }(), 130 + "packages", len(grypePackages), 131 + "databaseBuilt", func() string { 132 + vulnDBLock.RLock() 133 + defer vulnDBLock.RUnlock() 134 + if vulnDB == nil { 135 + return "not loaded" 136 + } 137 + // We can't easily get the build date here without exposing internal state 138 + return "loaded" 139 + }()) 140 + } 141 + 142 + // Count vulnerabilities by severity 143 + summary := w.countVulnerabilitiesBySeverity(*allMatches) 144 + 145 + slog.Info("Vulnerability scan complete", 146 + "critical", summary.Critical, 147 + "high", summary.High, 148 + "medium", summary.Medium, 149 + "low", summary.Low, 150 + "total", summary.Total) 151 + 152 + // Create vulnerability report JSON 153 + report := map[string]interface{}{ 154 + "matches": allMatches.Sorted(), 155 + "source": s.Source, 156 + "distro": s.Artifacts.LinuxDistribution, 157 + "descriptor": map[string]interface{}{ 158 + "name": "grype", 159 + "version": "v0.102.0", // TODO: Get actual Grype version 160 + }, 161 + "summary": summary, 162 + } 163 + 164 + // Encode report to JSON 165 + reportJSON, err := json.MarshalIndent(report, "", " ") 166 + if err != nil { 167 + return nil, "", VulnerabilitySummary{}, fmt.Errorf("failed to encode vulnerability report: %w", err) 168 + } 169 + 170 + // Calculate digest 171 + hash := sha256.Sum256(reportJSON) 172 + digest := fmt.Sprintf("sha256:%x", hash) 173 + 174 + slog.Info("Vulnerability report generated", "size", len(reportJSON), "digest", digest) 175 + 176 + // Upload report blob to storage 177 + if err := w.uploadBlob(ctx, digest, reportJSON); err != nil { 178 + return nil, "", VulnerabilitySummary{}, fmt.Errorf("failed to upload vulnerability report: %w", err) 179 + } 180 + 181 + return reportJSON, digest, summary, nil 182 + } 183 + 184 + // loadVulnDatabase loads the Grype vulnerability database (with caching) 185 + func (w *Worker) loadVulnDatabase(ctx context.Context) (vulnerability.Provider, error) { 186 + // Check if database is already loaded 187 + vulnDBLock.RLock() 188 + if vulnDB != nil { 189 + vulnDBLock.RUnlock() 190 + return vulnDB, nil 191 + } 192 + vulnDBLock.RUnlock() 193 + 194 + // Acquire write lock to load database 195 + vulnDBLock.Lock() 196 + defer vulnDBLock.Unlock() 197 + 198 + // Check again (another goroutine might have loaded it) 199 + if vulnDB != nil { 200 + return vulnDB, nil 201 + } 202 + 203 + slog.Info("Loading Grype vulnerability database", "path", w.config.Scanner.VulnDBPath) 204 + 205 + // Ensure database directory exists 206 + if err := ensureDir(w.config.Scanner.VulnDBPath); err != nil { 207 + return nil, fmt.Errorf("failed to create vulnerability database directory: %w", err) 208 + } 209 + 210 + // Configure database distribution 211 + distConfig := distribution.DefaultConfig() 212 + 213 + // Configure database installation 214 + installConfig := installation.Config{ 215 + DBRootDir: w.config.Scanner.VulnDBPath, 216 + ValidateAge: true, 217 + ValidateChecksum: true, 218 + MaxAllowedBuiltAge: w.config.Scanner.VulnDBUpdateInterval, 219 + } 220 + 221 + // Load database (should already be downloaded by initializeVulnDatabase) 222 + store, status, err := grype.LoadVulnerabilityDB(distConfig, installConfig, false) 223 + if err != nil { 224 + return nil, fmt.Errorf("failed to load vulnerability database (status=%v): %w (hint: database may still be downloading)", status, err) 225 + } 226 + 227 + slog.Info("Vulnerability database loaded", 228 + "status", status, 229 + "built", status.Built, 230 + "location", status.Path, 231 + "schemaVersion", status.SchemaVersion) 232 + 233 + // Check database file size to verify it has content 234 + if stat, err := os.Stat(status.Path); err == nil { 235 + slog.Info("Vulnerability database file stats", 236 + "size", stat.Size(), 237 + "sizeMB", stat.Size()/1024/1024) 238 + } 239 + 240 + // Cache database globally 241 + vulnDB = store 242 + 243 + slog.Info("Vulnerability database loaded successfully") 244 + return vulnDB, nil 245 + } 246 + 247 + // countVulnerabilitiesBySeverity counts vulnerabilities by severity level 248 + func (w *Worker) countVulnerabilitiesBySeverity(matches match.Matches) VulnerabilitySummary { 249 + summary := VulnerabilitySummary{} 250 + 251 + for m := range matches.Enumerate() { 252 + summary.Total++ 253 + 254 + // Get severity from vulnerability metadata 255 + if m.Vulnerability.Metadata != nil { 256 + severity := m.Vulnerability.Metadata.Severity 257 + switch severity { 258 + case "Critical": 259 + summary.Critical++ 260 + case "High": 261 + summary.High++ 262 + case "Medium": 263 + summary.Medium++ 264 + case "Low": 265 + summary.Low++ 266 + } 267 + } 268 + } 269 + 270 + return summary 271 + } 272 + 273 + // initializeVulnDatabase downloads and initializes the vulnerability database on startup 274 + func (w *Worker) initializeVulnDatabase(ctx context.Context) error { 275 + slog.Info("Initializing vulnerability database", "path", w.config.Scanner.VulnDBPath) 276 + 277 + // Ensure database directory exists 278 + if err := ensureDir(w.config.Scanner.VulnDBPath); err != nil { 279 + return fmt.Errorf("failed to create vulnerability database directory: %w", err) 280 + } 281 + 282 + // Create temp directory for Grype downloads (scratch container has no /tmp) 283 + tmpDir := filepath.Join(w.config.Database.Path, "tmp") 284 + if err := ensureDir(tmpDir); err != nil { 285 + return fmt.Errorf("failed to create temp directory: %w", err) 286 + } 287 + 288 + // Set TMPDIR environment variable so Grype uses our temp directory 289 + oldTmpDir := os.Getenv("TMPDIR") 290 + os.Setenv("TMPDIR", tmpDir) 291 + defer func() { 292 + if oldTmpDir != "" { 293 + os.Setenv("TMPDIR", oldTmpDir) 294 + } else { 295 + os.Unsetenv("TMPDIR") 296 + } 297 + }() 298 + 299 + // Configure database distribution 300 + distConfig := distribution.DefaultConfig() 301 + 302 + // Configure database installation 303 + installConfig := installation.Config{ 304 + DBRootDir: w.config.Scanner.VulnDBPath, 305 + ValidateAge: true, 306 + ValidateChecksum: true, 307 + MaxAllowedBuiltAge: w.config.Scanner.VulnDBUpdateInterval, 308 + } 309 + 310 + // Create distribution client for downloading 311 + downloader, err := distribution.NewClient(distConfig) 312 + if err != nil { 313 + return fmt.Errorf("failed to create database downloader: %w", err) 314 + } 315 + 316 + // Create curator to manage database 317 + curator, err := installation.NewCurator(installConfig, downloader) 318 + if err != nil { 319 + return fmt.Errorf("failed to create database curator: %w", err) 320 + } 321 + 322 + // Check if database already exists 323 + status := curator.Status() 324 + if !status.Built.IsZero() && status.Error == nil { 325 + slog.Info("Vulnerability database already exists", "built", status.Built, "schema", status.SchemaVersion) 326 + return nil 327 + } 328 + 329 + // Download database (this may take several minutes) 330 + slog.Info("Downloading vulnerability database (this may take 5-10 minutes)...") 331 + updated, err := curator.Update() 332 + if err != nil { 333 + return fmt.Errorf("failed to download vulnerability database: %w", err) 334 + } 335 + 336 + if updated { 337 + slog.Info("Vulnerability database downloaded successfully") 338 + } else { 339 + slog.Info("Vulnerability database is up to date") 340 + } 341 + 342 + return nil 343 + } 344 + 345 + // ensureDir creates a directory if it doesn't exist 346 + func ensureDir(path string) error { 347 + if err := os.MkdirAll(path, 0755); err != nil { 348 + return fmt.Errorf("failed to create directory %s: %w", path, err) 349 + } 350 + return nil 351 + }
+67
pkg/hold/scanner/job.go
··· 1 + package scanner 2 + 3 + import ( 4 + "time" 5 + 6 + "atcr.io/pkg/atproto" 7 + ) 8 + 9 + // ScanJob represents a vulnerability scanning job for a container image 10 + type ScanJob struct { 11 + // ManifestDigest is the digest of the manifest to scan 12 + ManifestDigest string 13 + 14 + // Repository is the repository name (e.g., "alice/myapp") 15 + Repository string 16 + 17 + // Tag is the tag name (e.g., "latest") 18 + Tag string 19 + 20 + // UserDID is the DID of the user who owns this image 21 + UserDID string 22 + 23 + // UserHandle is the handle of the user (for display) 24 + UserHandle string 25 + 26 + // Config is the image config blob descriptor 27 + Config atproto.BlobReference 28 + 29 + // Layers are the image layer blob descriptors (in order) 30 + Layers []atproto.BlobReference 31 + 32 + // EnqueuedAt is when this job was enqueued 33 + EnqueuedAt time.Time 34 + } 35 + 36 + // ScanResult represents the result of a vulnerability scan 37 + type ScanResult struct { 38 + // Job is the original scan job 39 + Job *ScanJob 40 + 41 + // VulnerabilitiesJSON is the raw Grype JSON output 42 + VulnerabilitiesJSON []byte 43 + 44 + // Summary contains vulnerability counts by severity 45 + Summary VulnerabilitySummary 46 + 47 + // SBOMDigest is the digest of the SBOM blob (if SBOM was generated) 48 + SBOMDigest string 49 + 50 + // VulnDigest is the digest of the vulnerability report blob 51 + VulnDigest string 52 + 53 + // ScannedAt is when the scan completed 54 + ScannedAt time.Time 55 + 56 + // ScannerVersion is the version of the scanner used 57 + ScannerVersion string 58 + } 59 + 60 + // VulnerabilitySummary contains counts of vulnerabilities by severity 61 + type VulnerabilitySummary struct { 62 + Critical int `json:"critical"` 63 + High int `json:"high"` 64 + Medium int `json:"medium"` 65 + Low int `json:"low"` 66 + Total int `json:"total"` 67 + }
+226
pkg/hold/scanner/queue.go
··· 1 + package scanner 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "sync" 8 + 9 + "atcr.io/pkg/atproto" 10 + ) 11 + 12 + // Queue manages a pool of workers for scanning container images 13 + type Queue struct { 14 + jobs chan *ScanJob 15 + results chan *ScanResult 16 + workers int 17 + wg sync.WaitGroup 18 + ctx context.Context 19 + cancel context.CancelFunc 20 + } 21 + 22 + // NewQueue creates a new scanner queue with the specified number of workers 23 + func NewQueue(workers int, bufferSize int) *Queue { 24 + ctx, cancel := context.WithCancel(context.Background()) 25 + 26 + return &Queue{ 27 + jobs: make(chan *ScanJob, bufferSize), 28 + results: make(chan *ScanResult, bufferSize), 29 + workers: workers, 30 + ctx: ctx, 31 + cancel: cancel, 32 + } 33 + } 34 + 35 + // Start starts the worker pool 36 + // The workerFunc is called for each job to perform the actual scanning 37 + func (q *Queue) Start(workerFunc func(context.Context, *ScanJob) (*ScanResult, error)) { 38 + slog.Info("Starting scanner worker pool", "workers", q.workers) 39 + 40 + for i := 0; i < q.workers; i++ { 41 + q.wg.Add(1) 42 + go q.worker(i, workerFunc) 43 + } 44 + 45 + // Start result handler goroutine 46 + q.wg.Add(1) 47 + go q.resultHandler() 48 + } 49 + 50 + // worker processes jobs from the queue 51 + func (q *Queue) worker(id int, workerFunc func(context.Context, *ScanJob) (*ScanResult, error)) { 52 + defer q.wg.Done() 53 + 54 + slog.Info("Scanner worker started", "worker_id", id) 55 + 56 + for { 57 + select { 58 + case <-q.ctx.Done(): 59 + slog.Info("Scanner worker shutting down", "worker_id", id) 60 + return 61 + 62 + case job, ok := <-q.jobs: 63 + if !ok { 64 + slog.Info("Scanner worker: jobs channel closed", "worker_id", id) 65 + return 66 + } 67 + 68 + slog.Info("Scanner worker processing job", 69 + "worker_id", id, 70 + "repository", job.Repository, 71 + "tag", job.Tag, 72 + "digest", job.ManifestDigest) 73 + 74 + result, err := workerFunc(q.ctx, job) 75 + if err != nil { 76 + slog.Error("Scanner worker failed to process job", 77 + "worker_id", id, 78 + "repository", job.Repository, 79 + "tag", job.Tag, 80 + "error", err) 81 + continue 82 + } 83 + 84 + // Send result to results channel 85 + select { 86 + case q.results <- result: 87 + slog.Info("Scanner worker completed job", 88 + "worker_id", id, 89 + "repository", job.Repository, 90 + "tag", job.Tag, 91 + "vulnerabilities", result.Summary.Total) 92 + case <-q.ctx.Done(): 93 + return 94 + } 95 + } 96 + } 97 + } 98 + 99 + // resultHandler processes scan results (for logging and metrics) 100 + func (q *Queue) resultHandler() { 101 + defer q.wg.Done() 102 + 103 + for { 104 + select { 105 + case <-q.ctx.Done(): 106 + return 107 + 108 + case result, ok := <-q.results: 109 + if !ok { 110 + return 111 + } 112 + 113 + // Log the result 114 + slog.Info("Scan completed", 115 + "repository", result.Job.Repository, 116 + "tag", result.Job.Tag, 117 + "digest", result.Job.ManifestDigest, 118 + "critical", result.Summary.Critical, 119 + "high", result.Summary.High, 120 + "medium", result.Summary.Medium, 121 + "low", result.Summary.Low, 122 + "total", result.Summary.Total, 123 + "scanner", result.ScannerVersion) 124 + } 125 + } 126 + } 127 + 128 + // Enqueue adds a job to the queue 129 + func (q *Queue) Enqueue(jobAny any) error { 130 + // Type assert to ScanJob (can be map or struct from HandleNotifyManifest) 131 + var job *ScanJob 132 + 133 + switch v := jobAny.(type) { 134 + case *ScanJob: 135 + job = v 136 + case map[string]interface{}: 137 + // Convert map to ScanJob (from HandleNotifyManifest) 138 + job = &ScanJob{ 139 + ManifestDigest: v["manifestDigest"].(string), 140 + Repository: v["repository"].(string), 141 + Tag: v["tag"].(string), 142 + UserDID: v["userDID"].(string), 143 + UserHandle: v["userHandle"].(string), 144 + } 145 + 146 + // Parse config blob reference 147 + if configMap, ok := v["config"].(map[string]interface{}); ok { 148 + job.Config = atproto.BlobReference{ 149 + Digest: configMap["digest"].(string), 150 + Size: convertToInt64(configMap["size"]), 151 + MediaType: configMap["mediaType"].(string), 152 + } 153 + } 154 + 155 + // Parse layers 156 + if layersSlice, ok := v["layers"].([]interface{}); ok { 157 + slog.Info("Parsing layers from scan job", 158 + "layersFound", len(layersSlice)) 159 + job.Layers = make([]atproto.BlobReference, len(layersSlice)) 160 + for i, layerAny := range layersSlice { 161 + if layerMap, ok := layerAny.(map[string]interface{}); ok { 162 + job.Layers[i] = atproto.BlobReference{ 163 + Digest: layerMap["digest"].(string), 164 + Size: convertToInt64(layerMap["size"]), 165 + MediaType: layerMap["mediaType"].(string), 166 + } 167 + } 168 + } 169 + } else { 170 + slog.Warn("No layers found in scan job map", 171 + "layersType", fmt.Sprintf("%T", v["layers"]), 172 + "layersValue", v["layers"]) 173 + } 174 + default: 175 + return fmt.Errorf("invalid job type: %T", jobAny) 176 + } 177 + 178 + select { 179 + case q.jobs <- job: 180 + slog.Info("Enqueued scan job", 181 + "repository", job.Repository, 182 + "tag", job.Tag, 183 + "digest", job.ManifestDigest) 184 + return nil 185 + case <-q.ctx.Done(): 186 + return q.ctx.Err() 187 + } 188 + } 189 + 190 + // Shutdown gracefully shuts down the queue, waiting for all workers to finish 191 + func (q *Queue) Shutdown() { 192 + slog.Info("Shutting down scanner queue") 193 + 194 + // Close the jobs channel to signal no more jobs 195 + close(q.jobs) 196 + 197 + // Wait for all workers to finish 198 + q.wg.Wait() 199 + 200 + // Close results channel 201 + close(q.results) 202 + 203 + // Cancel context 204 + q.cancel() 205 + 206 + slog.Info("Scanner queue shut down complete") 207 + } 208 + 209 + // Len returns the number of jobs currently in the queue 210 + func (q *Queue) Len() int { 211 + return len(q.jobs) 212 + } 213 + 214 + // convertToInt64 converts an interface{} number to int64, handling both float64 and int64 215 + func convertToInt64(v interface{}) int64 { 216 + switch n := v.(type) { 217 + case float64: 218 + return int64(n) 219 + case int64: 220 + return n 221 + case int: 222 + return int64(n) 223 + default: 224 + return 0 225 + } 226 + }
+123
pkg/hold/scanner/storage.go
··· 1 + package scanner 2 + 3 + import ( 4 + "context" 5 + "crypto/sha256" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "time" 10 + 11 + "atcr.io/pkg/atproto" 12 + ) 13 + 14 + // storeResults uploads scan results and creates ORAS manifest records in the hold's PDS 15 + func (w *Worker) storeResults(ctx context.Context, job *ScanJob, sbomDigest, vulnDigest string, vulnJSON []byte, summary VulnerabilitySummary) error { 16 + if !w.config.Scanner.VulnEnabled { 17 + slog.Info("Vulnerability scanning disabled, skipping result storage") 18 + return nil 19 + } 20 + 21 + slog.Info("Storing scan results as ORAS artifact", 22 + "repository", job.Repository, 23 + "subjectDigest", job.ManifestDigest, 24 + "vulnDigest", vulnDigest) 25 + 26 + // Create ORAS manifest for vulnerability report 27 + orasManifest := map[string]interface{}{ 28 + "schemaVersion": 2, 29 + "mediaType": "application/vnd.oci.image.manifest.v1+json", 30 + "artifactType": "application/vnd.atcr.vulnerabilities+json", 31 + "config": map[string]interface{}{ 32 + "mediaType": "application/vnd.oci.empty.v1+json", 33 + "digest": "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", // Empty JSON object 34 + "size": 2, 35 + }, 36 + "subject": map[string]interface{}{ 37 + "mediaType": "application/vnd.oci.image.manifest.v1+json", 38 + "digest": job.ManifestDigest, 39 + "size": 0, // We don't have the size, but it's optional 40 + }, 41 + "layers": []map[string]interface{}{ 42 + { 43 + "mediaType": "application/json", 44 + "digest": vulnDigest, 45 + "size": len(vulnJSON), 46 + "annotations": map[string]string{ 47 + "org.opencontainers.image.title": "vulnerability-report.json", 48 + }, 49 + }, 50 + }, 51 + "annotations": map[string]string{ 52 + "io.atcr.vuln.critical": fmt.Sprintf("%d", summary.Critical), 53 + "io.atcr.vuln.high": fmt.Sprintf("%d", summary.High), 54 + "io.atcr.vuln.medium": fmt.Sprintf("%d", summary.Medium), 55 + "io.atcr.vuln.low": fmt.Sprintf("%d", summary.Low), 56 + "io.atcr.vuln.total": fmt.Sprintf("%d", summary.Total), 57 + "io.atcr.vuln.scannedAt": time.Now().Format(time.RFC3339), 58 + "io.atcr.vuln.scannerVersion": w.getScannerVersion(), 59 + }, 60 + } 61 + 62 + // Encode ORAS manifest to JSON 63 + orasManifestJSON, err := json.Marshal(orasManifest) 64 + if err != nil { 65 + return fmt.Errorf("failed to encode ORAS manifest: %w", err) 66 + } 67 + 68 + // Calculate ORAS manifest digest 69 + orasDigest := fmt.Sprintf("sha256:%x", sha256Bytes(orasManifestJSON)) 70 + 71 + // Upload ORAS manifest blob to storage 72 + if err := w.uploadBlob(ctx, orasDigest, orasManifestJSON); err != nil { 73 + return fmt.Errorf("failed to upload ORAS manifest blob: %w", err) 74 + } 75 + 76 + // Create manifest record in hold's PDS 77 + if err := w.createManifestRecord(ctx, job, orasDigest, orasManifestJSON, summary); err != nil { 78 + return fmt.Errorf("failed to create manifest record: %w", err) 79 + } 80 + 81 + slog.Info("Successfully stored scan results", "orasDigest", orasDigest) 82 + return nil 83 + } 84 + 85 + // createManifestRecord creates an ORAS manifest record in the hold's PDS 86 + func (w *Worker) createManifestRecord(ctx context.Context, job *ScanJob, orasDigest string, orasManifestJSON []byte, summary VulnerabilitySummary) error { 87 + // Create ManifestRecord from ORAS manifest 88 + record, err := atproto.NewManifestRecord(job.Repository, orasDigest, orasManifestJSON) 89 + if err != nil { 90 + return fmt.Errorf("failed to create manifest record: %w", err) 91 + } 92 + 93 + // Set SBOM/vulnerability specific fields 94 + record.OwnerDID = job.UserDID 95 + record.ScannedAt = time.Now().Format(time.RFC3339) 96 + record.ScannerVersion = w.getScannerVersion() 97 + 98 + // Add hold DID (this ORAS artifact is stored in the hold's PDS) 99 + record.HoldDID = w.pds.DID() 100 + 101 + // Convert digest to record key (remove "sha256:" prefix) 102 + rkey := orasDigest[len("sha256:"):] 103 + 104 + // Store record in hold's PDS 105 + slog.Info("Creating manifest record in hold's PDS", 106 + "collection", atproto.ManifestCollection, 107 + "rkey", rkey, 108 + "ownerDid", job.UserDID) 109 + 110 + _, _, err = w.pds.CreateManifestRecord(ctx, record, rkey) 111 + if err != nil { 112 + return fmt.Errorf("failed to put record in PDS: %w", err) 113 + } 114 + 115 + slog.Info("Manifest record created successfully", "uri", fmt.Sprintf("at://%s/%s/%s", w.pds.DID(), atproto.ManifestCollection, rkey)) 116 + return nil 117 + } 118 + 119 + // sha256Bytes calculates SHA256 hash of byte slice 120 + func sha256Bytes(data []byte) []byte { 121 + hash := sha256.Sum256(data) 122 + return hash[:] 123 + }
+128
pkg/hold/scanner/syft.go
··· 1 + package scanner 2 + 3 + import ( 4 + "context" 5 + "crypto/sha256" 6 + "fmt" 7 + "log/slog" 8 + "os" 9 + 10 + "github.com/anchore/syft/syft" 11 + "github.com/anchore/syft/syft/format" 12 + "github.com/anchore/syft/syft/format/spdxjson" 13 + "github.com/anchore/syft/syft/sbom" 14 + "github.com/anchore/syft/syft/source/directorysource" 15 + ) 16 + 17 + // generateSBOM generates an SBOM using Syft from an extracted image directory 18 + // Returns the SBOM object, SBOM JSON, its digest, and any error 19 + func (w *Worker) generateSBOM(ctx context.Context, imageDir string) (*sbom.SBOM, []byte, string, error) { 20 + slog.Info("Generating SBOM with Syft", "imageDir", imageDir) 21 + 22 + // Check if directory exists and is accessible 23 + entries, err := os.ReadDir(imageDir) 24 + if err != nil { 25 + return nil, nil, "", fmt.Errorf("failed to read image directory: %w", err) 26 + } 27 + slog.Info("Image directory contents", 28 + "path", imageDir, 29 + "entries", len(entries), 30 + "sampleFiles", func() []string { 31 + var samples []string 32 + for i, e := range entries { 33 + if i >= 20 { 34 + break 35 + } 36 + samples = append(samples, e.Name()) 37 + } 38 + return samples 39 + }()) 40 + 41 + // Create Syft source from directory 42 + src, err := directorysource.NewFromPath(imageDir) 43 + if err != nil { 44 + return nil, nil, "", fmt.Errorf("failed to create Syft source: %w", err) 45 + } 46 + defer src.Close() 47 + 48 + // Generate SBOM 49 + slog.Info("Running Syft cataloging") 50 + sbomResult, err := syft.CreateSBOM(ctx, src, nil) 51 + if err != nil { 52 + return nil, nil, "", fmt.Errorf("failed to generate SBOM: %w", err) 53 + } 54 + 55 + if sbomResult == nil { 56 + return nil, nil, "", fmt.Errorf("Syft returned nil SBOM") 57 + } 58 + 59 + slog.Info("SBOM generated", 60 + "packages", sbomResult.Artifacts.Packages.PackageCount(), 61 + "distro", func() string { 62 + if sbomResult.Artifacts.LinuxDistribution != nil { 63 + return fmt.Sprintf("%s %s", sbomResult.Artifacts.LinuxDistribution.Name, sbomResult.Artifacts.LinuxDistribution.Version) 64 + } 65 + return "none" 66 + }()) 67 + 68 + // Encode SBOM to SPDX JSON format 69 + encoder, err := spdxjson.NewFormatEncoderWithConfig(spdxjson.DefaultEncoderConfig()) 70 + if err != nil { 71 + return nil, nil, "", fmt.Errorf("failed to create SPDX encoder: %w", err) 72 + } 73 + 74 + sbomJSON, err := format.Encode(*sbomResult, encoder) 75 + if err != nil { 76 + return nil, nil, "", fmt.Errorf("failed to encode SBOM to SPDX JSON: %w", err) 77 + } 78 + 79 + // Calculate digest 80 + hash := sha256.Sum256(sbomJSON) 81 + digest := fmt.Sprintf("sha256:%x", hash) 82 + 83 + slog.Info("SBOM encoded", "format", "spdx-json", "size", len(sbomJSON), "digest", digest) 84 + 85 + // Upload SBOM blob to storage 86 + if err := w.uploadBlob(ctx, digest, sbomJSON); err != nil { 87 + return nil, nil, "", fmt.Errorf("failed to upload SBOM blob: %w", err) 88 + } 89 + 90 + return sbomResult, sbomJSON, digest, nil 91 + } 92 + 93 + // uploadBlob uploads a blob to storage 94 + func (w *Worker) uploadBlob(ctx context.Context, digest string, data []byte) error { 95 + // Convert digest to storage path (same format as distribution uses) 96 + // Path format: /docker/registry/v2/blobs/sha256/ab/abcd1234.../data 97 + algorithm := "sha256" 98 + digestHex := digest[len("sha256:"):] 99 + if len(digestHex) < 2 { 100 + return fmt.Errorf("invalid digest: %s", digest) 101 + } 102 + 103 + blobPath := fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/%s/data", 104 + algorithm, 105 + digestHex[:2], 106 + digestHex) 107 + 108 + slog.Info("Uploading blob to storage", "digest", digest, "size", len(data), "path", blobPath) 109 + 110 + // Write blob to storage 111 + writer, err := w.driver.Writer(ctx, blobPath, false) 112 + if err != nil { 113 + return fmt.Errorf("failed to create storage writer: %w", err) 114 + } 115 + defer writer.Close() 116 + 117 + if _, err := writer.Write(data); err != nil { 118 + writer.Cancel(ctx) 119 + return fmt.Errorf("failed to write blob data: %w", err) 120 + } 121 + 122 + if err := writer.Commit(ctx); err != nil { 123 + return fmt.Errorf("failed to commit blob: %w", err) 124 + } 125 + 126 + slog.Info("Successfully uploaded blob", "digest", digest) 127 + return nil 128 + }
+116
pkg/hold/scanner/worker.go
··· 1 + package scanner 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "time" 8 + 9 + "atcr.io/pkg/hold" 10 + "atcr.io/pkg/hold/pds" 11 + "github.com/distribution/distribution/v3/registry/storage/driver" 12 + ) 13 + 14 + // Worker performs vulnerability scanning on container images 15 + type Worker struct { 16 + config *hold.Config 17 + driver driver.StorageDriver 18 + pds *pds.HoldPDS 19 + queue *Queue 20 + } 21 + 22 + // NewWorker creates a new scanner worker 23 + func NewWorker(config *hold.Config, driver driver.StorageDriver, pds *pds.HoldPDS) *Worker { 24 + return &Worker{ 25 + config: config, 26 + driver: driver, 27 + pds: pds, 28 + } 29 + } 30 + 31 + // Start starts the worker pool and initializes vulnerability database 32 + func (w *Worker) Start(queue *Queue) { 33 + w.queue = queue 34 + 35 + // Initialize vulnerability database on startup if scanning is enabled 36 + if w.config.Scanner.VulnEnabled { 37 + go func() { 38 + ctx := context.Background() 39 + if err := w.initializeVulnDatabase(ctx); err != nil { 40 + slog.Error("Failed to initialize vulnerability database", "error", err) 41 + slog.Warn("Vulnerability scanning will be disabled until database is available") 42 + } 43 + }() 44 + } 45 + 46 + queue.Start(w.processJob) 47 + } 48 + 49 + // processJob processes a single scan job 50 + func (w *Worker) processJob(ctx context.Context, job *ScanJob) (*ScanResult, error) { 51 + slog.Info("Processing scan job", 52 + "repository", job.Repository, 53 + "tag", job.Tag, 54 + "digest", job.ManifestDigest, 55 + "layers", len(job.Layers)) 56 + 57 + startTime := time.Now() 58 + 59 + // Step 1: Extract image layers from storage 60 + slog.Info("Extracting image layers", "repository", job.Repository) 61 + imageDir, cleanup, err := w.extractLayers(ctx, job) 62 + if err != nil { 63 + return nil, fmt.Errorf("failed to extract layers: %w", err) 64 + } 65 + defer cleanup() 66 + 67 + // Step 2: Generate SBOM with Syft 68 + slog.Info("Generating SBOM", "repository", job.Repository) 69 + sbomResult, _, sbomDigest, err := w.generateSBOM(ctx, imageDir) 70 + if err != nil { 71 + return nil, fmt.Errorf("failed to generate SBOM: %w", err) 72 + } 73 + 74 + // Step 3: Scan SBOM with Grype (if enabled) 75 + var vulnJSON []byte 76 + var vulnDigest string 77 + var summary VulnerabilitySummary 78 + 79 + if w.config.Scanner.VulnEnabled { 80 + slog.Info("Scanning for vulnerabilities", "repository", job.Repository) 81 + vulnJSON, vulnDigest, summary, err = w.scanVulnerabilities(ctx, sbomResult) 82 + if err != nil { 83 + return nil, fmt.Errorf("failed to scan vulnerabilities: %w", err) 84 + } 85 + } 86 + 87 + // Step 4: Upload results to storage and create ORAS manifests 88 + slog.Info("Storing scan results", "repository", job.Repository) 89 + err = w.storeResults(ctx, job, sbomDigest, vulnDigest, vulnJSON, summary) 90 + if err != nil { 91 + return nil, fmt.Errorf("failed to store results: %w", err) 92 + } 93 + 94 + duration := time.Since(startTime) 95 + slog.Info("Scan job completed", 96 + "repository", job.Repository, 97 + "tag", job.Tag, 98 + "duration", duration, 99 + "vulnerabilities", summary.Total) 100 + 101 + return &ScanResult{ 102 + Job: job, 103 + VulnerabilitiesJSON: vulnJSON, 104 + Summary: summary, 105 + SBOMDigest: sbomDigest, 106 + VulnDigest: vulnDigest, 107 + ScannedAt: time.Now(), 108 + ScannerVersion: w.getScannerVersion(), 109 + }, nil 110 + } 111 + 112 + // getScannerVersion returns the version string for the scanner 113 + func (w *Worker) getScannerVersion() string { 114 + // TODO: Get actual Syft and Grype versions dynamically 115 + return "syft-v1.36.0/grype-v0.102.0" 116 + }