A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
81
fork

Configure Feed

Select the types of activity you want to include in your feed.

actually add scanner implementation

+1281
+270
pkg/hold/scanner/extractor.go
··· 1 + package scanner 2 + 3 + import ( 4 + "archive/tar" 5 + "compress/gzip" 6 + "context" 7 + "encoding/json" 8 + "fmt" 9 + "io" 10 + "log/slog" 11 + "os" 12 + "path/filepath" 13 + "strings" 14 + ) 15 + 16 + // extractLayers extracts all image layers from storage to a temporary directory 17 + // Returns the directory path and a cleanup function 18 + func (w *Worker) extractLayers(ctx context.Context, job *ScanJob) (string, func(), error) { 19 + // Create temp directory for extraction 20 + // Use the database directory as the base (since we're in a scratch container with no /tmp) 21 + scanTmpBase := filepath.Join(w.config.Database.Path, "scanner-tmp") 22 + if err := os.MkdirAll(scanTmpBase, 0755); err != nil { 23 + return "", nil, fmt.Errorf("failed to create scanner temp base: %w", err) 24 + } 25 + 26 + tmpDir, err := os.MkdirTemp(scanTmpBase, "scan-*") 27 + if err != nil { 28 + return "", nil, fmt.Errorf("failed to create temp directory: %w", err) 29 + } 30 + 31 + cleanup := func() { 32 + if err := os.RemoveAll(tmpDir); err != nil { 33 + slog.Warn("Failed to clean up temp directory", "dir", tmpDir, "error", err) 34 + } 35 + } 36 + 37 + // Create image directory structure 38 + imageDir := filepath.Join(tmpDir, "image") 39 + if err := os.MkdirAll(imageDir, 0755); err != nil { 40 + cleanup() 41 + return "", nil, fmt.Errorf("failed to create image directory: %w", err) 42 + } 43 + 44 + // Download and extract config blob 45 + slog.Info("Downloading config blob", "digest", job.Config.Digest) 46 + configPath := filepath.Join(imageDir, "config.json") 47 + if err := w.downloadBlob(ctx, job.Config.Digest, configPath); err != nil { 48 + cleanup() 49 + return "", nil, fmt.Errorf("failed to download config blob: %w", err) 50 + } 51 + 52 + // Validate config is valid JSON 53 + configData, err := os.ReadFile(configPath) 54 + if err != nil { 55 + cleanup() 56 + return "", nil, fmt.Errorf("failed to read config: %w", err) 57 + } 58 + var configObj map[string]interface{} 59 + if 
err := json.Unmarshal(configData, &configObj); err != nil { 60 + cleanup() 61 + return "", nil, fmt.Errorf("invalid config JSON: %w", err) 62 + } 63 + 64 + // Create layers directory for extracted content 65 + layersDir := filepath.Join(imageDir, "layers") 66 + if err := os.MkdirAll(layersDir, 0755); err != nil { 67 + cleanup() 68 + return "", nil, fmt.Errorf("failed to create layers directory: %w", err) 69 + } 70 + 71 + // Download and extract each layer in order (creating overlayfs-style filesystem) 72 + rootfsDir := filepath.Join(imageDir, "rootfs") 73 + if err := os.MkdirAll(rootfsDir, 0755); err != nil { 74 + cleanup() 75 + return "", nil, fmt.Errorf("failed to create rootfs directory: %w", err) 76 + } 77 + 78 + for i, layer := range job.Layers { 79 + slog.Info("Extracting layer", "index", i, "digest", layer.Digest, "size", layer.Size) 80 + 81 + // Download layer blob to temp file 82 + layerPath := filepath.Join(layersDir, fmt.Sprintf("layer-%d.tar.gz", i)) 83 + if err := w.downloadBlob(ctx, layer.Digest, layerPath); err != nil { 84 + cleanup() 85 + return "", nil, fmt.Errorf("failed to download layer %d: %w", i, err) 86 + } 87 + 88 + // Extract layer on top of rootfs (overlayfs style) 89 + if err := w.extractTarGz(layerPath, rootfsDir); err != nil { 90 + cleanup() 91 + return "", nil, fmt.Errorf("failed to extract layer %d: %w", i, err) 92 + } 93 + 94 + // Remove layer tar.gz to save space 95 + os.Remove(layerPath) 96 + } 97 + 98 + // Check what was extracted 99 + entries, err := os.ReadDir(rootfsDir) 100 + if err != nil { 101 + slog.Warn("Failed to read rootfs directory", "error", err) 102 + } else { 103 + slog.Info("Successfully extracted image", 104 + "layers", len(job.Layers), 105 + "rootfs", rootfsDir, 106 + "topLevelEntries", len(entries), 107 + "sampleEntries", func() []string { 108 + var samples []string 109 + for i, e := range entries { 110 + if i >= 10 { 111 + break 112 + } 113 + samples = append(samples, e.Name()) 114 + } 115 + return samples 116 + 
}()) 117 + } 118 + 119 + return rootfsDir, cleanup, nil 120 + } 121 + 122 + // downloadBlob downloads a blob from storage to a local file 123 + func (w *Worker) downloadBlob(ctx context.Context, digest, destPath string) error { 124 + // Convert digest to storage path using distribution's sharding scheme 125 + // Format: /docker/registry/v2/blobs/sha256/47/4734bc89.../data 126 + // where 47 is the first 2 characters of the hash for directory sharding 127 + blobPath := blobPathForDigest(digest) 128 + 129 + // Open blob from storage driver 130 + reader, err := w.driver.Reader(ctx, blobPath, 0) 131 + if err != nil { 132 + return fmt.Errorf("failed to open blob %s: %w", digest, err) 133 + } 134 + defer reader.Close() 135 + 136 + // Create destination file 137 + dest, err := os.Create(destPath) 138 + if err != nil { 139 + return fmt.Errorf("failed to create destination file: %w", err) 140 + } 141 + defer dest.Close() 142 + 143 + // Copy blob data to file 144 + if _, err := io.Copy(dest, reader); err != nil { 145 + return fmt.Errorf("failed to copy blob data: %w", err) 146 + } 147 + 148 + return nil 149 + } 150 + 151 + // extractTarGz extracts a tar.gz file to a destination directory (overlayfs style) 152 + func (w *Worker) extractTarGz(tarGzPath, destDir string) error { 153 + // Open tar.gz file 154 + file, err := os.Open(tarGzPath) 155 + if err != nil { 156 + return fmt.Errorf("failed to open tar.gz: %w", err) 157 + } 158 + defer file.Close() 159 + 160 + // Create gzip reader 161 + gzr, err := gzip.NewReader(file) 162 + if err != nil { 163 + return fmt.Errorf("failed to create gzip reader: %w", err) 164 + } 165 + defer gzr.Close() 166 + 167 + // Create tar reader 168 + tr := tar.NewReader(gzr) 169 + 170 + // Extract each file 171 + for { 172 + header, err := tr.Next() 173 + if err == io.EOF { 174 + break 175 + } 176 + if err != nil { 177 + return fmt.Errorf("failed to read tar header: %w", err) 178 + } 179 + 180 + // Build target path (clean to prevent path traversal) 
181 + target := filepath.Join(destDir, filepath.Clean(header.Name)) 182 + 183 + // Ensure target is within destDir (security check) 184 + if !strings.HasPrefix(target, filepath.Clean(destDir)+string(os.PathSeparator)) { 185 + slog.Warn("Skipping path outside destination", "path", header.Name) 186 + continue 187 + } 188 + 189 + switch header.Typeflag { 190 + case tar.TypeDir: 191 + // Create directory 192 + if err := os.MkdirAll(target, os.FileMode(header.Mode)); err != nil { 193 + return fmt.Errorf("failed to create directory %s: %w", target, err) 194 + } 195 + 196 + case tar.TypeReg: 197 + // Create parent directory 198 + if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { 199 + return fmt.Errorf("failed to create parent directory: %w", err) 200 + } 201 + 202 + // Create file 203 + outFile, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.FileMode(header.Mode)) 204 + if err != nil { 205 + return fmt.Errorf("failed to create file %s: %w", target, err) 206 + } 207 + 208 + // Copy file contents 209 + if _, err := io.Copy(outFile, tr); err != nil { 210 + outFile.Close() 211 + return fmt.Errorf("failed to write file %s: %w", target, err) 212 + } 213 + outFile.Close() 214 + 215 + case tar.TypeSymlink: 216 + // Create symlink 217 + if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { 218 + return fmt.Errorf("failed to create parent directory for symlink: %w", err) 219 + } 220 + 221 + // Remove existing file/symlink if it exists 222 + os.Remove(target) 223 + 224 + if err := os.Symlink(header.Linkname, target); err != nil { 225 + slog.Warn("Failed to create symlink", "target", target, "link", header.Linkname, "error", err) 226 + } 227 + 228 + case tar.TypeLink: 229 + // Create hard link 230 + linkTarget := filepath.Join(destDir, filepath.Clean(header.Linkname)) 231 + if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { 232 + return fmt.Errorf("failed to create parent directory for hardlink: %w", err) 233 + } 234 + 235 + // 
Remove existing file if it exists 236 + os.Remove(target) 237 + 238 + if err := os.Link(linkTarget, target); err != nil { 239 + slog.Warn("Failed to create hardlink", "target", target, "link", linkTarget, "error", err) 240 + } 241 + 242 + default: 243 + slog.Debug("Skipping unsupported tar entry type", "type", header.Typeflag, "name", header.Name) 244 + } 245 + } 246 + 247 + return nil 248 + } 249 + 250 + // blobPathForDigest converts a digest to a storage path using distribution's sharding scheme 251 + // Format: /docker/registry/v2/blobs/sha256/47/4734bc89.../data 252 + // where 47 is the first 2 characters of the hash for directory sharding 253 + func blobPathForDigest(digest string) string { 254 + // Split digest into algorithm and hash 255 + parts := strings.SplitN(digest, ":", 2) 256 + if len(parts) != 2 { 257 + // Fallback for malformed digest 258 + return fmt.Sprintf("/docker/registry/v2/blobs/%s/data", digest) 259 + } 260 + 261 + algorithm := parts[0] 262 + hash := parts[1] 263 + 264 + // Use first 2 characters for sharding 265 + if len(hash) < 2 { 266 + return fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/data", algorithm, hash) 267 + } 268 + 269 + return fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/%s/data", algorithm, hash[:2], hash) 270 + }
+351
pkg/hold/scanner/grype.go
··· 1 + package scanner 2 + 3 + import ( 4 + "context" 5 + "crypto/sha256" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "os" 10 + "path/filepath" 11 + "sync" 12 + 13 + "github.com/anchore/grype/grype" 14 + "github.com/anchore/grype/grype/db/v6/distribution" 15 + "github.com/anchore/grype/grype/db/v6/installation" 16 + "github.com/anchore/grype/grype/distro" 17 + "github.com/anchore/grype/grype/match" 18 + "github.com/anchore/grype/grype/matcher" 19 + "github.com/anchore/grype/grype/matcher/dotnet" 20 + "github.com/anchore/grype/grype/matcher/golang" 21 + "github.com/anchore/grype/grype/matcher/java" 22 + "github.com/anchore/grype/grype/matcher/javascript" 23 + "github.com/anchore/grype/grype/matcher/python" 24 + "github.com/anchore/grype/grype/matcher/ruby" 25 + "github.com/anchore/grype/grype/matcher/stock" 26 + grypePkg "github.com/anchore/grype/grype/pkg" 27 + "github.com/anchore/grype/grype/vulnerability" 28 + "github.com/anchore/syft/syft/sbom" 29 + ) 30 + 31 + // Global vulnerability database (shared across workers) 32 + var ( 33 + vulnDB vulnerability.Provider 34 + vulnDBLock sync.RWMutex 35 + ) 36 + 37 + // scanVulnerabilities scans an SBOM for vulnerabilities using Grype 38 + // Returns vulnerability report JSON, digest, summary, and any error 39 + func (w *Worker) scanVulnerabilities(ctx context.Context, s *sbom.SBOM) ([]byte, string, VulnerabilitySummary, error) { 40 + slog.Info("Scanning for vulnerabilities with Grype") 41 + 42 + // Load vulnerability database (cached globally) 43 + store, err := w.loadVulnDatabase(ctx) 44 + if err != nil { 45 + return nil, "", VulnerabilitySummary{}, fmt.Errorf("failed to load vulnerability database: %w", err) 46 + } 47 + 48 + // Create package context from SBOM (need distro for synthesis) 49 + var grypeDistro *distro.Distro 50 + if s.Artifacts.LinuxDistribution != nil { 51 + grypeDistro = distro.FromRelease(s.Artifacts.LinuxDistribution, nil) 52 + if grypeDistro != nil { 53 + slog.Info("Using distro for package 
synthesis", 54 + "name", grypeDistro.Name(), 55 + "version", grypeDistro.Version, 56 + "type", grypeDistro.Type, 57 + "codename", grypeDistro.Codename) 58 + } 59 + } 60 + 61 + // Convert Syft packages to Grype packages WITH distro info 62 + synthesisConfig := grypePkg.SynthesisConfig{ 63 + GenerateMissingCPEs: true, 64 + Distro: grypePkg.DistroConfig{ 65 + Override: grypeDistro, 66 + }, 67 + } 68 + grypePackages := grypePkg.FromCollection(s.Artifacts.Packages, synthesisConfig) 69 + 70 + slog.Info("Converted packages for vulnerability scanning", 71 + "syftPackages", s.Artifacts.Packages.PackageCount(), 72 + "grypePackages", len(grypePackages), 73 + "distro", func() string { 74 + if s.Artifacts.LinuxDistribution != nil { 75 + return fmt.Sprintf("%s %s", s.Artifacts.LinuxDistribution.Name, s.Artifacts.LinuxDistribution.Version) 76 + } 77 + return "none" 78 + }()) 79 + 80 + // Create matchers 81 + matchers := matcher.NewDefaultMatchers(matcher.Config{ 82 + Java: java.MatcherConfig{}, 83 + Ruby: ruby.MatcherConfig{}, 84 + Python: python.MatcherConfig{}, 85 + Dotnet: dotnet.MatcherConfig{}, 86 + Javascript: javascript.MatcherConfig{}, 87 + Golang: golang.MatcherConfig{}, 88 + Stock: stock.MatcherConfig{}, 89 + }) 90 + 91 + // Create package context with the same distro we used for synthesis 92 + pkgContext := grypePkg.Context{ 93 + Source: &s.Source, 94 + Distro: grypeDistro, 95 + } 96 + 97 + // Create vulnerability matcher 98 + vulnerabilityMatcher := &grype.VulnerabilityMatcher{ 99 + VulnerabilityProvider: store, 100 + Matchers: matchers, 101 + NormalizeByCVE: true, 102 + } 103 + 104 + // Find vulnerabilities 105 + slog.Info("Matching vulnerabilities", 106 + "packages", len(grypePackages), 107 + "distro", func() string { 108 + if grypeDistro != nil { 109 + return fmt.Sprintf("%s %s", grypeDistro.Name(), grypeDistro.Version) 110 + } 111 + return "none" 112 + }()) 113 + allMatches, _, err := vulnerabilityMatcher.FindMatches(grypePackages, pkgContext) 114 + if err != nil 
{ 115 + return nil, "", VulnerabilitySummary{}, fmt.Errorf("failed to find vulnerabilities: %w", err) 116 + } 117 + 118 + slog.Info("Vulnerability matching complete", 119 + "totalMatches", allMatches.Count()) 120 + 121 + // If we found 0 matches, log some diagnostic info 122 + if allMatches.Count() == 0 { 123 + slog.Warn("No vulnerability matches found - this may indicate an issue", 124 + "distro", func() string { 125 + if grypeDistro != nil { 126 + return fmt.Sprintf("%s %s", grypeDistro.Name(), grypeDistro.Version) 127 + } 128 + return "none" 129 + }(), 130 + "packages", len(grypePackages), 131 + "databaseBuilt", func() string { 132 + vulnDBLock.RLock() 133 + defer vulnDBLock.RUnlock() 134 + if vulnDB == nil { 135 + return "not loaded" 136 + } 137 + // We can't easily get the build date here without exposing internal state 138 + return "loaded" 139 + }()) 140 + } 141 + 142 + // Count vulnerabilities by severity 143 + summary := w.countVulnerabilitiesBySeverity(*allMatches) 144 + 145 + slog.Info("Vulnerability scan complete", 146 + "critical", summary.Critical, 147 + "high", summary.High, 148 + "medium", summary.Medium, 149 + "low", summary.Low, 150 + "total", summary.Total) 151 + 152 + // Create vulnerability report JSON 153 + report := map[string]interface{}{ 154 + "matches": allMatches.Sorted(), 155 + "source": s.Source, 156 + "distro": s.Artifacts.LinuxDistribution, 157 + "descriptor": map[string]interface{}{ 158 + "name": "grype", 159 + "version": "v0.102.0", // TODO: Get actual Grype version 160 + }, 161 + "summary": summary, 162 + } 163 + 164 + // Encode report to JSON 165 + reportJSON, err := json.MarshalIndent(report, "", " ") 166 + if err != nil { 167 + return nil, "", VulnerabilitySummary{}, fmt.Errorf("failed to encode vulnerability report: %w", err) 168 + } 169 + 170 + // Calculate digest 171 + hash := sha256.Sum256(reportJSON) 172 + digest := fmt.Sprintf("sha256:%x", hash) 173 + 174 + slog.Info("Vulnerability report generated", "size", 
len(reportJSON), "digest", digest) 175 + 176 + // Upload report blob to storage 177 + if err := w.uploadBlob(ctx, digest, reportJSON); err != nil { 178 + return nil, "", VulnerabilitySummary{}, fmt.Errorf("failed to upload vulnerability report: %w", err) 179 + } 180 + 181 + return reportJSON, digest, summary, nil 182 + } 183 + 184 + // loadVulnDatabase loads the Grype vulnerability database (with caching) 185 + func (w *Worker) loadVulnDatabase(ctx context.Context) (vulnerability.Provider, error) { 186 + // Check if database is already loaded 187 + vulnDBLock.RLock() 188 + if vulnDB != nil { 189 + vulnDBLock.RUnlock() 190 + return vulnDB, nil 191 + } 192 + vulnDBLock.RUnlock() 193 + 194 + // Acquire write lock to load database 195 + vulnDBLock.Lock() 196 + defer vulnDBLock.Unlock() 197 + 198 + // Check again (another goroutine might have loaded it) 199 + if vulnDB != nil { 200 + return vulnDB, nil 201 + } 202 + 203 + slog.Info("Loading Grype vulnerability database", "path", w.config.Scanner.VulnDBPath) 204 + 205 + // Ensure database directory exists 206 + if err := ensureDir(w.config.Scanner.VulnDBPath); err != nil { 207 + return nil, fmt.Errorf("failed to create vulnerability database directory: %w", err) 208 + } 209 + 210 + // Configure database distribution 211 + distConfig := distribution.DefaultConfig() 212 + 213 + // Configure database installation 214 + installConfig := installation.Config{ 215 + DBRootDir: w.config.Scanner.VulnDBPath, 216 + ValidateAge: true, 217 + ValidateChecksum: true, 218 + MaxAllowedBuiltAge: w.config.Scanner.VulnDBUpdateInterval, 219 + } 220 + 221 + // Load database (should already be downloaded by initializeVulnDatabase) 222 + store, status, err := grype.LoadVulnerabilityDB(distConfig, installConfig, false) 223 + if err != nil { 224 + return nil, fmt.Errorf("failed to load vulnerability database (status=%v): %w (hint: database may still be downloading)", status, err) 225 + } 226 + 227 + slog.Info("Vulnerability database loaded", 228 
+ "status", status, 229 + "built", status.Built, 230 + "location", status.Path, 231 + "schemaVersion", status.SchemaVersion) 232 + 233 + // Check database file size to verify it has content 234 + if stat, err := os.Stat(status.Path); err == nil { 235 + slog.Info("Vulnerability database file stats", 236 + "size", stat.Size(), 237 + "sizeMB", stat.Size()/1024/1024) 238 + } 239 + 240 + // Cache database globally 241 + vulnDB = store 242 + 243 + slog.Info("Vulnerability database loaded successfully") 244 + return vulnDB, nil 245 + } 246 + 247 + // countVulnerabilitiesBySeverity counts vulnerabilities by severity level 248 + func (w *Worker) countVulnerabilitiesBySeverity(matches match.Matches) VulnerabilitySummary { 249 + summary := VulnerabilitySummary{} 250 + 251 + for m := range matches.Enumerate() { 252 + summary.Total++ 253 + 254 + // Get severity from vulnerability metadata 255 + if m.Vulnerability.Metadata != nil { 256 + severity := m.Vulnerability.Metadata.Severity 257 + switch severity { 258 + case "Critical": 259 + summary.Critical++ 260 + case "High": 261 + summary.High++ 262 + case "Medium": 263 + summary.Medium++ 264 + case "Low": 265 + summary.Low++ 266 + } 267 + } 268 + } 269 + 270 + return summary 271 + } 272 + 273 + // initializeVulnDatabase downloads and initializes the vulnerability database on startup 274 + func (w *Worker) initializeVulnDatabase(ctx context.Context) error { 275 + slog.Info("Initializing vulnerability database", "path", w.config.Scanner.VulnDBPath) 276 + 277 + // Ensure database directory exists 278 + if err := ensureDir(w.config.Scanner.VulnDBPath); err != nil { 279 + return fmt.Errorf("failed to create vulnerability database directory: %w", err) 280 + } 281 + 282 + // Create temp directory for Grype downloads (scratch container has no /tmp) 283 + tmpDir := filepath.Join(w.config.Database.Path, "tmp") 284 + if err := ensureDir(tmpDir); err != nil { 285 + return fmt.Errorf("failed to create temp directory: %w", err) 286 + } 287 + 
288 + // Set TMPDIR environment variable so Grype uses our temp directory 289 + oldTmpDir := os.Getenv("TMPDIR") 290 + os.Setenv("TMPDIR", tmpDir) 291 + defer func() { 292 + if oldTmpDir != "" { 293 + os.Setenv("TMPDIR", oldTmpDir) 294 + } else { 295 + os.Unsetenv("TMPDIR") 296 + } 297 + }() 298 + 299 + // Configure database distribution 300 + distConfig := distribution.DefaultConfig() 301 + 302 + // Configure database installation 303 + installConfig := installation.Config{ 304 + DBRootDir: w.config.Scanner.VulnDBPath, 305 + ValidateAge: true, 306 + ValidateChecksum: true, 307 + MaxAllowedBuiltAge: w.config.Scanner.VulnDBUpdateInterval, 308 + } 309 + 310 + // Create distribution client for downloading 311 + downloader, err := distribution.NewClient(distConfig) 312 + if err != nil { 313 + return fmt.Errorf("failed to create database downloader: %w", err) 314 + } 315 + 316 + // Create curator to manage database 317 + curator, err := installation.NewCurator(installConfig, downloader) 318 + if err != nil { 319 + return fmt.Errorf("failed to create database curator: %w", err) 320 + } 321 + 322 + // Check if database already exists 323 + status := curator.Status() 324 + if !status.Built.IsZero() && status.Error == nil { 325 + slog.Info("Vulnerability database already exists", "built", status.Built, "schema", status.SchemaVersion) 326 + return nil 327 + } 328 + 329 + // Download database (this may take several minutes) 330 + slog.Info("Downloading vulnerability database (this may take 5-10 minutes)...") 331 + updated, err := curator.Update() 332 + if err != nil { 333 + return fmt.Errorf("failed to download vulnerability database: %w", err) 334 + } 335 + 336 + if updated { 337 + slog.Info("Vulnerability database downloaded successfully") 338 + } else { 339 + slog.Info("Vulnerability database is up to date") 340 + } 341 + 342 + return nil 343 + } 344 + 345 + // ensureDir creates a directory if it doesn't exist 346 + func ensureDir(path string) error { 347 + if err := 
os.MkdirAll(path, 0755); err != nil { 348 + return fmt.Errorf("failed to create directory %s: %w", path, err) 349 + } 350 + return nil 351 + }
+67
pkg/hold/scanner/job.go
package scanner

import (
	"time"

	"atcr.io/pkg/atproto"
)

// ScanJob represents a vulnerability scanning job for a container image.
// It identifies the manifest to scan, who owns it, and which blobs (config and
// layers) make up the image.
type ScanJob struct {
	// ManifestDigest is the digest of the manifest to scan
	ManifestDigest string

	// Repository is the repository name (e.g., "alice/myapp")
	Repository string

	// Tag is the tag name (e.g., "latest")
	Tag string

	// UserDID is the DID of the user who owns this image
	UserDID string

	// UserHandle is the handle of the user (for display)
	UserHandle string

	// Config is the image config blob descriptor
	Config atproto.BlobReference

	// Layers are the image layer blob descriptors (in order)
	Layers []atproto.BlobReference

	// EnqueuedAt is when this job was enqueued
	EnqueuedAt time.Time
}

// ScanResult represents the result of a vulnerability scan.
// It carries the originating job together with the report, its digest, and
// the per-severity summary.
type ScanResult struct {
	// Job is the original scan job
	Job *ScanJob

	// VulnerabilitiesJSON is the raw Grype JSON output
	VulnerabilitiesJSON []byte

	// Summary contains vulnerability counts by severity
	Summary VulnerabilitySummary

	// SBOMDigest is the digest of the SBOM blob (if SBOM was generated)
	SBOMDigest string

	// VulnDigest is the digest of the vulnerability report blob
	VulnDigest string

	// ScannedAt is when the scan completed
	ScannedAt time.Time

	// ScannerVersion is the version of the scanner used
	ScannerVersion string
}

// VulnerabilitySummary contains counts of vulnerabilities by severity.
// The JSON field names are lower-case ("critical", "high", "medium", "low",
// "total").
type VulnerabilitySummary struct {
	Critical int `json:"critical"`
	High     int `json:"high"`
	Medium   int `json:"medium"`
	Low      int `json:"low"`
	Total    int `json:"total"`
}
+226
pkg/hold/scanner/queue.go
··· 1 + package scanner 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "sync" 8 + 9 + "atcr.io/pkg/atproto" 10 + ) 11 + 12 + // Queue manages a pool of workers for scanning container images 13 + type Queue struct { 14 + jobs chan *ScanJob 15 + results chan *ScanResult 16 + workers int 17 + wg sync.WaitGroup 18 + ctx context.Context 19 + cancel context.CancelFunc 20 + } 21 + 22 + // NewQueue creates a new scanner queue with the specified number of workers 23 + func NewQueue(workers int, bufferSize int) *Queue { 24 + ctx, cancel := context.WithCancel(context.Background()) 25 + 26 + return &Queue{ 27 + jobs: make(chan *ScanJob, bufferSize), 28 + results: make(chan *ScanResult, bufferSize), 29 + workers: workers, 30 + ctx: ctx, 31 + cancel: cancel, 32 + } 33 + } 34 + 35 + // Start starts the worker pool 36 + // The workerFunc is called for each job to perform the actual scanning 37 + func (q *Queue) Start(workerFunc func(context.Context, *ScanJob) (*ScanResult, error)) { 38 + slog.Info("Starting scanner worker pool", "workers", q.workers) 39 + 40 + for i := 0; i < q.workers; i++ { 41 + q.wg.Add(1) 42 + go q.worker(i, workerFunc) 43 + } 44 + 45 + // Start result handler goroutine 46 + q.wg.Add(1) 47 + go q.resultHandler() 48 + } 49 + 50 + // worker processes jobs from the queue 51 + func (q *Queue) worker(id int, workerFunc func(context.Context, *ScanJob) (*ScanResult, error)) { 52 + defer q.wg.Done() 53 + 54 + slog.Info("Scanner worker started", "worker_id", id) 55 + 56 + for { 57 + select { 58 + case <-q.ctx.Done(): 59 + slog.Info("Scanner worker shutting down", "worker_id", id) 60 + return 61 + 62 + case job, ok := <-q.jobs: 63 + if !ok { 64 + slog.Info("Scanner worker: jobs channel closed", "worker_id", id) 65 + return 66 + } 67 + 68 + slog.Info("Scanner worker processing job", 69 + "worker_id", id, 70 + "repository", job.Repository, 71 + "tag", job.Tag, 72 + "digest", job.ManifestDigest) 73 + 74 + result, err := workerFunc(q.ctx, job) 75 + if err != 
nil { 76 + slog.Error("Scanner worker failed to process job", 77 + "worker_id", id, 78 + "repository", job.Repository, 79 + "tag", job.Tag, 80 + "error", err) 81 + continue 82 + } 83 + 84 + // Send result to results channel 85 + select { 86 + case q.results <- result: 87 + slog.Info("Scanner worker completed job", 88 + "worker_id", id, 89 + "repository", job.Repository, 90 + "tag", job.Tag, 91 + "vulnerabilities", result.Summary.Total) 92 + case <-q.ctx.Done(): 93 + return 94 + } 95 + } 96 + } 97 + } 98 + 99 + // resultHandler processes scan results (for logging and metrics) 100 + func (q *Queue) resultHandler() { 101 + defer q.wg.Done() 102 + 103 + for { 104 + select { 105 + case <-q.ctx.Done(): 106 + return 107 + 108 + case result, ok := <-q.results: 109 + if !ok { 110 + return 111 + } 112 + 113 + // Log the result 114 + slog.Info("Scan completed", 115 + "repository", result.Job.Repository, 116 + "tag", result.Job.Tag, 117 + "digest", result.Job.ManifestDigest, 118 + "critical", result.Summary.Critical, 119 + "high", result.Summary.High, 120 + "medium", result.Summary.Medium, 121 + "low", result.Summary.Low, 122 + "total", result.Summary.Total, 123 + "scanner", result.ScannerVersion) 124 + } 125 + } 126 + } 127 + 128 + // Enqueue adds a job to the queue 129 + func (q *Queue) Enqueue(jobAny any) error { 130 + // Type assert to ScanJob (can be map or struct from HandleNotifyManifest) 131 + var job *ScanJob 132 + 133 + switch v := jobAny.(type) { 134 + case *ScanJob: 135 + job = v 136 + case map[string]interface{}: 137 + // Convert map to ScanJob (from HandleNotifyManifest) 138 + job = &ScanJob{ 139 + ManifestDigest: v["manifestDigest"].(string), 140 + Repository: v["repository"].(string), 141 + Tag: v["tag"].(string), 142 + UserDID: v["userDID"].(string), 143 + UserHandle: v["userHandle"].(string), 144 + } 145 + 146 + // Parse config blob reference 147 + if configMap, ok := v["config"].(map[string]interface{}); ok { 148 + job.Config = atproto.BlobReference{ 149 + 
Digest: configMap["digest"].(string), 150 + Size: convertToInt64(configMap["size"]), 151 + MediaType: configMap["mediaType"].(string), 152 + } 153 + } 154 + 155 + // Parse layers 156 + if layersSlice, ok := v["layers"].([]interface{}); ok { 157 + slog.Info("Parsing layers from scan job", 158 + "layersFound", len(layersSlice)) 159 + job.Layers = make([]atproto.BlobReference, len(layersSlice)) 160 + for i, layerAny := range layersSlice { 161 + if layerMap, ok := layerAny.(map[string]interface{}); ok { 162 + job.Layers[i] = atproto.BlobReference{ 163 + Digest: layerMap["digest"].(string), 164 + Size: convertToInt64(layerMap["size"]), 165 + MediaType: layerMap["mediaType"].(string), 166 + } 167 + } 168 + } 169 + } else { 170 + slog.Warn("No layers found in scan job map", 171 + "layersType", fmt.Sprintf("%T", v["layers"]), 172 + "layersValue", v["layers"]) 173 + } 174 + default: 175 + return fmt.Errorf("invalid job type: %T", jobAny) 176 + } 177 + 178 + select { 179 + case q.jobs <- job: 180 + slog.Info("Enqueued scan job", 181 + "repository", job.Repository, 182 + "tag", job.Tag, 183 + "digest", job.ManifestDigest) 184 + return nil 185 + case <-q.ctx.Done(): 186 + return q.ctx.Err() 187 + } 188 + } 189 + 190 + // Shutdown gracefully shuts down the queue, waiting for all workers to finish 191 + func (q *Queue) Shutdown() { 192 + slog.Info("Shutting down scanner queue") 193 + 194 + // Close the jobs channel to signal no more jobs 195 + close(q.jobs) 196 + 197 + // Wait for all workers to finish 198 + q.wg.Wait() 199 + 200 + // Close results channel 201 + close(q.results) 202 + 203 + // Cancel context 204 + q.cancel() 205 + 206 + slog.Info("Scanner queue shut down complete") 207 + } 208 + 209 + // Len returns the number of jobs currently in the queue 210 + func (q *Queue) Len() int { 211 + return len(q.jobs) 212 + } 213 + 214 + // convertToInt64 converts an interface{} number to int64, handling both float64 and int64 215 + func convertToInt64(v interface{}) int64 { 216 + 
switch n := v.(type) { 217 + case float64: 218 + return int64(n) 219 + case int64: 220 + return n 221 + case int: 222 + return int64(n) 223 + default: 224 + return 0 225 + } 226 + }
+123
pkg/hold/scanner/storage.go
··· 1 + package scanner 2 + 3 + import ( 4 + "context" 5 + "crypto/sha256" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "time" 10 + 11 + "atcr.io/pkg/atproto" 12 + ) 13 + 14 + // storeResults uploads scan results and creates ORAS manifest records in the hold's PDS 15 + func (w *Worker) storeResults(ctx context.Context, job *ScanJob, sbomDigest, vulnDigest string, vulnJSON []byte, summary VulnerabilitySummary) error { 16 + if !w.config.Scanner.VulnEnabled { 17 + slog.Info("Vulnerability scanning disabled, skipping result storage") 18 + return nil 19 + } 20 + 21 + slog.Info("Storing scan results as ORAS artifact", 22 + "repository", job.Repository, 23 + "subjectDigest", job.ManifestDigest, 24 + "vulnDigest", vulnDigest) 25 + 26 + // Create ORAS manifest for vulnerability report 27 + orasManifest := map[string]interface{}{ 28 + "schemaVersion": 2, 29 + "mediaType": "application/vnd.oci.image.manifest.v1+json", 30 + "artifactType": "application/vnd.atcr.vulnerabilities+json", 31 + "config": map[string]interface{}{ 32 + "mediaType": "application/vnd.oci.empty.v1+json", 33 + "digest": "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", // Empty JSON object 34 + "size": 2, 35 + }, 36 + "subject": map[string]interface{}{ 37 + "mediaType": "application/vnd.oci.image.manifest.v1+json", 38 + "digest": job.ManifestDigest, 39 + "size": 0, // We don't have the size, but it's optional 40 + }, 41 + "layers": []map[string]interface{}{ 42 + { 43 + "mediaType": "application/json", 44 + "digest": vulnDigest, 45 + "size": len(vulnJSON), 46 + "annotations": map[string]string{ 47 + "org.opencontainers.image.title": "vulnerability-report.json", 48 + }, 49 + }, 50 + }, 51 + "annotations": map[string]string{ 52 + "io.atcr.vuln.critical": fmt.Sprintf("%d", summary.Critical), 53 + "io.atcr.vuln.high": fmt.Sprintf("%d", summary.High), 54 + "io.atcr.vuln.medium": fmt.Sprintf("%d", summary.Medium), 55 + "io.atcr.vuln.low": fmt.Sprintf("%d", summary.Low), 56 + 
"io.atcr.vuln.total": fmt.Sprintf("%d", summary.Total), 57 + "io.atcr.vuln.scannedAt": time.Now().Format(time.RFC3339), 58 + "io.atcr.vuln.scannerVersion": w.getScannerVersion(), 59 + }, 60 + } 61 + 62 + // Encode ORAS manifest to JSON 63 + orasManifestJSON, err := json.Marshal(orasManifest) 64 + if err != nil { 65 + return fmt.Errorf("failed to encode ORAS manifest: %w", err) 66 + } 67 + 68 + // Calculate ORAS manifest digest 69 + orasDigest := fmt.Sprintf("sha256:%x", sha256Bytes(orasManifestJSON)) 70 + 71 + // Upload ORAS manifest blob to storage 72 + if err := w.uploadBlob(ctx, orasDigest, orasManifestJSON); err != nil { 73 + return fmt.Errorf("failed to upload ORAS manifest blob: %w", err) 74 + } 75 + 76 + // Create manifest record in hold's PDS 77 + if err := w.createManifestRecord(ctx, job, orasDigest, orasManifestJSON, summary); err != nil { 78 + return fmt.Errorf("failed to create manifest record: %w", err) 79 + } 80 + 81 + slog.Info("Successfully stored scan results", "orasDigest", orasDigest) 82 + return nil 83 + } 84 + 85 + // createManifestRecord creates an ORAS manifest record in the hold's PDS 86 + func (w *Worker) createManifestRecord(ctx context.Context, job *ScanJob, orasDigest string, orasManifestJSON []byte, summary VulnerabilitySummary) error { 87 + // Create ManifestRecord from ORAS manifest 88 + record, err := atproto.NewManifestRecord(job.Repository, orasDigest, orasManifestJSON) 89 + if err != nil { 90 + return fmt.Errorf("failed to create manifest record: %w", err) 91 + } 92 + 93 + // Set SBOM/vulnerability specific fields 94 + record.OwnerDID = job.UserDID 95 + record.ScannedAt = time.Now().Format(time.RFC3339) 96 + record.ScannerVersion = w.getScannerVersion() 97 + 98 + // Add hold DID (this ORAS artifact is stored in the hold's PDS) 99 + record.HoldDID = w.pds.DID() 100 + 101 + // Convert digest to record key (remove "sha256:" prefix) 102 + rkey := orasDigest[len("sha256:"):] 103 + 104 + // Store record in hold's PDS 105 + 
slog.Info("Creating manifest record in hold's PDS", 106 + "collection", atproto.ManifestCollection, 107 + "rkey", rkey, 108 + "ownerDid", job.UserDID) 109 + 110 + _, _, err = w.pds.CreateManifestRecord(ctx, record, rkey) 111 + if err != nil { 112 + return fmt.Errorf("failed to put record in PDS: %w", err) 113 + } 114 + 115 + slog.Info("Manifest record created successfully", "uri", fmt.Sprintf("at://%s/%s/%s", w.pds.DID(), atproto.ManifestCollection, rkey)) 116 + return nil 117 + } 118 + 119 + // sha256Bytes calculates SHA256 hash of byte slice 120 + func sha256Bytes(data []byte) []byte { 121 + hash := sha256.Sum256(data) 122 + return hash[:] 123 + }
+128
pkg/hold/scanner/syft.go
··· 1 + package scanner 2 + 3 + import ( 4 + "context" 5 + "crypto/sha256" 6 + "fmt" 7 + "log/slog" 8 + "os" 9 + 10 + "github.com/anchore/syft/syft" 11 + "github.com/anchore/syft/syft/format" 12 + "github.com/anchore/syft/syft/format/spdxjson" 13 + "github.com/anchore/syft/syft/sbom" 14 + "github.com/anchore/syft/syft/source/directorysource" 15 + ) 16 + 17 + // generateSBOM generates an SBOM using Syft from an extracted image directory 18 + // Returns the SBOM object, SBOM JSON, its digest, and any error 19 + func (w *Worker) generateSBOM(ctx context.Context, imageDir string) (*sbom.SBOM, []byte, string, error) { 20 + slog.Info("Generating SBOM with Syft", "imageDir", imageDir) 21 + 22 + // Check if directory exists and is accessible 23 + entries, err := os.ReadDir(imageDir) 24 + if err != nil { 25 + return nil, nil, "", fmt.Errorf("failed to read image directory: %w", err) 26 + } 27 + slog.Info("Image directory contents", 28 + "path", imageDir, 29 + "entries", len(entries), 30 + "sampleFiles", func() []string { 31 + var samples []string 32 + for i, e := range entries { 33 + if i >= 20 { 34 + break 35 + } 36 + samples = append(samples, e.Name()) 37 + } 38 + return samples 39 + }()) 40 + 41 + // Create Syft source from directory 42 + src, err := directorysource.NewFromPath(imageDir) 43 + if err != nil { 44 + return nil, nil, "", fmt.Errorf("failed to create Syft source: %w", err) 45 + } 46 + defer src.Close() 47 + 48 + // Generate SBOM 49 + slog.Info("Running Syft cataloging") 50 + sbomResult, err := syft.CreateSBOM(ctx, src, nil) 51 + if err != nil { 52 + return nil, nil, "", fmt.Errorf("failed to generate SBOM: %w", err) 53 + } 54 + 55 + if sbomResult == nil { 56 + return nil, nil, "", fmt.Errorf("Syft returned nil SBOM") 57 + } 58 + 59 + slog.Info("SBOM generated", 60 + "packages", sbomResult.Artifacts.Packages.PackageCount(), 61 + "distro", func() string { 62 + if sbomResult.Artifacts.LinuxDistribution != nil { 63 + return fmt.Sprintf("%s %s", 
sbomResult.Artifacts.LinuxDistribution.Name, sbomResult.Artifacts.LinuxDistribution.Version) 64 + } 65 + return "none" 66 + }()) 67 + 68 + // Encode SBOM to SPDX JSON format 69 + encoder, err := spdxjson.NewFormatEncoderWithConfig(spdxjson.DefaultEncoderConfig()) 70 + if err != nil { 71 + return nil, nil, "", fmt.Errorf("failed to create SPDX encoder: %w", err) 72 + } 73 + 74 + sbomJSON, err := format.Encode(*sbomResult, encoder) 75 + if err != nil { 76 + return nil, nil, "", fmt.Errorf("failed to encode SBOM to SPDX JSON: %w", err) 77 + } 78 + 79 + // Calculate digest 80 + hash := sha256.Sum256(sbomJSON) 81 + digest := fmt.Sprintf("sha256:%x", hash) 82 + 83 + slog.Info("SBOM encoded", "format", "spdx-json", "size", len(sbomJSON), "digest", digest) 84 + 85 + // Upload SBOM blob to storage 86 + if err := w.uploadBlob(ctx, digest, sbomJSON); err != nil { 87 + return nil, nil, "", fmt.Errorf("failed to upload SBOM blob: %w", err) 88 + } 89 + 90 + return sbomResult, sbomJSON, digest, nil 91 + } 92 + 93 + // uploadBlob uploads a blob to storage 94 + func (w *Worker) uploadBlob(ctx context.Context, digest string, data []byte) error { 95 + // Convert digest to storage path (same format as distribution uses) 96 + // Path format: /docker/registry/v2/blobs/sha256/ab/abcd1234.../data 97 + algorithm := "sha256" 98 + digestHex := digest[len("sha256:"):] 99 + if len(digestHex) < 2 { 100 + return fmt.Errorf("invalid digest: %s", digest) 101 + } 102 + 103 + blobPath := fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/%s/data", 104 + algorithm, 105 + digestHex[:2], 106 + digestHex) 107 + 108 + slog.Info("Uploading blob to storage", "digest", digest, "size", len(data), "path", blobPath) 109 + 110 + // Write blob to storage 111 + writer, err := w.driver.Writer(ctx, blobPath, false) 112 + if err != nil { 113 + return fmt.Errorf("failed to create storage writer: %w", err) 114 + } 115 + defer writer.Close() 116 + 117 + if _, err := writer.Write(data); err != nil { 118 + writer.Cancel(ctx) 
119 + return fmt.Errorf("failed to write blob data: %w", err) 120 + } 121 + 122 + if err := writer.Commit(ctx); err != nil { 123 + return fmt.Errorf("failed to commit blob: %w", err) 124 + } 125 + 126 + slog.Info("Successfully uploaded blob", "digest", digest) 127 + return nil 128 + }
+116
pkg/hold/scanner/worker.go
··· 1 + package scanner 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "time" 8 + 9 + "atcr.io/pkg/hold" 10 + "atcr.io/pkg/hold/pds" 11 + "github.com/distribution/distribution/v3/registry/storage/driver" 12 + ) 13 + 14 + // Worker performs vulnerability scanning on container images 15 + type Worker struct { 16 + config *hold.Config 17 + driver driver.StorageDriver 18 + pds *pds.HoldPDS 19 + queue *Queue 20 + } 21 + 22 + // NewWorker creates a new scanner worker 23 + func NewWorker(config *hold.Config, driver driver.StorageDriver, pds *pds.HoldPDS) *Worker { 24 + return &Worker{ 25 + config: config, 26 + driver: driver, 27 + pds: pds, 28 + } 29 + } 30 + 31 + // Start starts the worker pool and initializes vulnerability database 32 + func (w *Worker) Start(queue *Queue) { 33 + w.queue = queue 34 + 35 + // Initialize vulnerability database on startup if scanning is enabled 36 + if w.config.Scanner.VulnEnabled { 37 + go func() { 38 + ctx := context.Background() 39 + if err := w.initializeVulnDatabase(ctx); err != nil { 40 + slog.Error("Failed to initialize vulnerability database", "error", err) 41 + slog.Warn("Vulnerability scanning will be disabled until database is available") 42 + } 43 + }() 44 + } 45 + 46 + queue.Start(w.processJob) 47 + } 48 + 49 + // processJob processes a single scan job 50 + func (w *Worker) processJob(ctx context.Context, job *ScanJob) (*ScanResult, error) { 51 + slog.Info("Processing scan job", 52 + "repository", job.Repository, 53 + "tag", job.Tag, 54 + "digest", job.ManifestDigest, 55 + "layers", len(job.Layers)) 56 + 57 + startTime := time.Now() 58 + 59 + // Step 1: Extract image layers from storage 60 + slog.Info("Extracting image layers", "repository", job.Repository) 61 + imageDir, cleanup, err := w.extractLayers(ctx, job) 62 + if err != nil { 63 + return nil, fmt.Errorf("failed to extract layers: %w", err) 64 + } 65 + defer cleanup() 66 + 67 + // Step 2: Generate SBOM with Syft 68 + slog.Info("Generating SBOM", 
"repository", job.Repository) 69 + sbomResult, _, sbomDigest, err := w.generateSBOM(ctx, imageDir) 70 + if err != nil { 71 + return nil, fmt.Errorf("failed to generate SBOM: %w", err) 72 + } 73 + 74 + // Step 3: Scan SBOM with Grype (if enabled) 75 + var vulnJSON []byte 76 + var vulnDigest string 77 + var summary VulnerabilitySummary 78 + 79 + if w.config.Scanner.VulnEnabled { 80 + slog.Info("Scanning for vulnerabilities", "repository", job.Repository) 81 + vulnJSON, vulnDigest, summary, err = w.scanVulnerabilities(ctx, sbomResult) 82 + if err != nil { 83 + return nil, fmt.Errorf("failed to scan vulnerabilities: %w", err) 84 + } 85 + } 86 + 87 + // Step 4: Upload results to storage and create ORAS manifests 88 + slog.Info("Storing scan results", "repository", job.Repository) 89 + err = w.storeResults(ctx, job, sbomDigest, vulnDigest, vulnJSON, summary) 90 + if err != nil { 91 + return nil, fmt.Errorf("failed to store results: %w", err) 92 + } 93 + 94 + duration := time.Since(startTime) 95 + slog.Info("Scan job completed", 96 + "repository", job.Repository, 97 + "tag", job.Tag, 98 + "duration", duration, 99 + "vulnerabilities", summary.Total) 100 + 101 + return &ScanResult{ 102 + Job: job, 103 + VulnerabilitiesJSON: vulnJSON, 104 + Summary: summary, 105 + SBOMDigest: sbomDigest, 106 + VulnDigest: vulnDigest, 107 + ScannedAt: time.Now(), 108 + ScannerVersion: w.getScannerVersion(), 109 + }, nil 110 + } 111 + 112 + // getScannerVersion returns the version string for the scanner 113 + func (w *Worker) getScannerVersion() string { 114 + // TODO: Get actual Syft and Grype versions dynamically 115 + return "syft-v1.36.0/grype-v0.102.0" 116 + }