A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
atcr.io
docker
container
atproto
go
1package jetstream
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/identity"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13
14 "atcr.io/pkg/appview/db"
15 "atcr.io/pkg/atproto"
16)
17
// Processor handles shared database operations for both Worker (live) and Backfill (sync).
// This eliminates code duplication between the two data ingestion paths.
type Processor struct {
	// db is the SQL handle handed to every db helper call and used for direct queries.
	db *sql.DB
	// directory resolves a parsed DID to its identity (DID, handle, PDS endpoint).
	directory identity.Directory
	// userCache is optional: NewProcessor allocates it only when useCache is true
	// (enabled for Worker, disabled for Backfill).
	userCache *UserCache
	// useCache selects between the in-memory cache lookup and the database
	// lookup at the start of EnsureUser.
	useCache bool
}
26
27// NewProcessor creates a new shared processor
28// useCache: true for Worker (live streaming), false for Backfill (batch processing)
29func NewProcessor(database *sql.DB, useCache bool) *Processor {
30 p := &Processor{
31 db: database,
32 directory: identity.DefaultDirectory(),
33 useCache: useCache,
34 }
35
36 if useCache {
37 p.userCache = &UserCache{
38 cache: make(map[string]*db.User),
39 }
40 }
41
42 return p
43}
44
45// EnsureUser resolves and upserts a user by DID
46// Uses cache if enabled (Worker), queries DB if cache disabled (Backfill)
47func (p *Processor) EnsureUser(ctx context.Context, did string) error {
48 // Check cache first (if enabled)
49 if p.useCache && p.userCache != nil {
50 if user, ok := p.userCache.cache[did]; ok {
51 // Update last seen
52 user.LastSeen = time.Now()
53 return db.UpsertUser(p.db, user)
54 }
55 } else if !p.useCache {
56 // No cache - check if user already exists in DB
57 existingUser, err := db.GetUserByDID(p.db, did)
58 if err == nil && existingUser != nil {
59 // Update last seen
60 existingUser.LastSeen = time.Now()
61 return db.UpsertUser(p.db, existingUser)
62 }
63 }
64
65 // Resolve DID to get handle and PDS endpoint
66 didParsed, err := syntax.ParseDID(did)
67 if err != nil {
68 // Fallback: use DID as handle
69 user := &db.User{
70 DID: did,
71 Handle: did,
72 PDSEndpoint: "https://bsky.social",
73 LastSeen: time.Now(),
74 }
75 if p.useCache {
76 p.userCache.cache[did] = user
77 }
78 return db.UpsertUser(p.db, user)
79 }
80
81 ident, err := p.directory.LookupDID(ctx, didParsed)
82 if err != nil {
83 // Fallback: use DID as handle
84 user := &db.User{
85 DID: did,
86 Handle: did,
87 PDSEndpoint: "https://bsky.social",
88 LastSeen: time.Now(),
89 }
90 if p.useCache {
91 p.userCache.cache[did] = user
92 }
93 return db.UpsertUser(p.db, user)
94 }
95
96 resolvedDID := ident.DID.String()
97 handle := ident.Handle.String()
98 pdsEndpoint := ident.PDSEndpoint()
99
100 // If handle is invalid or PDS is missing, use defaults
101 if handle == "handle.invalid" || handle == "" {
102 handle = resolvedDID
103 }
104 if pdsEndpoint == "" {
105 pdsEndpoint = "https://bsky.social"
106 }
107
108 // Fetch user's Bluesky profile (including avatar)
109 // Use public Bluesky AppView API (doesn't require auth for public profiles)
110 avatar := ""
111 publicClient := atproto.NewClient("https://public.api.bsky.app", "", "")
112 profile, err := publicClient.GetActorProfile(ctx, resolvedDID)
113 if err != nil {
114 fmt.Printf("WARNING [processor]: Failed to fetch profile for DID %s: %v\n", resolvedDID, err)
115 // Continue without avatar
116 } else {
117 avatar = profile.Avatar
118 }
119
120 // Create user record
121 user := &db.User{
122 DID: resolvedDID,
123 Handle: handle,
124 PDSEndpoint: pdsEndpoint,
125 Avatar: avatar,
126 LastSeen: time.Now(),
127 }
128
129 // Cache if enabled
130 if p.useCache {
131 p.userCache.cache[did] = user
132 }
133
134 // Upsert to database
135 return db.UpsertUser(p.db, user)
136}
137
138// ProcessManifest processes a manifest record and stores it in the database
139// Returns the manifest ID for further processing (layers/references)
140func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) {
141 // Unmarshal manifest record
142 var manifestRecord atproto.ManifestRecord
143 if err := json.Unmarshal(recordData, &manifestRecord); err != nil {
144 return 0, fmt.Errorf("failed to unmarshal manifest: %w", err)
145 }
146 // Detect manifest type
147 isManifestList := len(manifestRecord.Manifests) > 0
148
149 // Prepare manifest for insertion (WITHOUT annotation fields)
150 manifest := &db.Manifest{
151 DID: did,
152 Repository: manifestRecord.Repository,
153 Digest: manifestRecord.Digest,
154 MediaType: manifestRecord.MediaType,
155 SchemaVersion: manifestRecord.SchemaVersion,
156 HoldEndpoint: manifestRecord.HoldEndpoint,
157 CreatedAt: manifestRecord.CreatedAt,
158 // Annotations removed - stored separately in repository_annotations table
159 }
160
161 // Set config fields only for image manifests (not manifest lists)
162 if !isManifestList && manifestRecord.Config != nil {
163 manifest.ConfigDigest = manifestRecord.Config.Digest
164 manifest.ConfigSize = manifestRecord.Config.Size
165 }
166
167 // Insert manifest
168 manifestID, err := db.InsertManifest(p.db, manifest)
169 if err != nil {
170 // For backfill: if manifest already exists, get its ID
171 if strings.Contains(err.Error(), "UNIQUE constraint failed") {
172 var existingID int64
173 err := p.db.QueryRow(`
174 SELECT id FROM manifests
175 WHERE did = ? AND repository = ? AND digest = ?
176 `, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID)
177
178 if err != nil {
179 return 0, fmt.Errorf("failed to get existing manifest ID: %w", err)
180 }
181 manifestID = existingID
182 } else {
183 return 0, fmt.Errorf("failed to insert manifest: %w", err)
184 }
185 }
186
187 // Update repository annotations ONLY if manifest has at least one non-empty annotation
188 if manifestRecord.Annotations != nil {
189 hasData := false
190 for _, value := range manifestRecord.Annotations {
191 if value != "" {
192 hasData = true
193 break
194 }
195 }
196
197 if hasData {
198 // Replace all annotations for this repository
199 err = db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations)
200 if err != nil {
201 return 0, fmt.Errorf("failed to upsert annotations: %w", err)
202 }
203 }
204 }
205
206 // Insert manifest references or layers
207 if isManifestList {
208 // Insert manifest references (for manifest lists/indexes)
209 for i, ref := range manifestRecord.Manifests {
210 platformArch := ""
211 platformOS := ""
212 platformVariant := ""
213 platformOSVersion := ""
214
215 if ref.Platform != nil {
216 platformArch = ref.Platform.Architecture
217 platformOS = ref.Platform.OS
218 platformVariant = ref.Platform.Variant
219 platformOSVersion = ref.Platform.OSVersion
220 }
221
222 if err := db.InsertManifestReference(p.db, &db.ManifestReference{
223 ManifestID: manifestID,
224 Digest: ref.Digest,
225 MediaType: ref.MediaType,
226 Size: ref.Size,
227 PlatformArchitecture: platformArch,
228 PlatformOS: platformOS,
229 PlatformVariant: platformVariant,
230 PlatformOSVersion: platformOSVersion,
231 ReferenceIndex: i,
232 }); err != nil {
233 // Continue on error - reference might already exist
234 continue
235 }
236 }
237 } else {
238 // Insert layers (for image manifests)
239 for i, layer := range manifestRecord.Layers {
240 if err := db.InsertLayer(p.db, &db.Layer{
241 ManifestID: manifestID,
242 Digest: layer.Digest,
243 MediaType: layer.MediaType,
244 Size: layer.Size,
245 LayerIndex: i,
246 }); err != nil {
247 // Continue on error - layer might already exist
248 continue
249 }
250 }
251 }
252
253 return manifestID, nil
254}
255
256// ProcessTag processes a tag record and stores it in the database
257func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error {
258 // Unmarshal tag record
259 var tagRecord atproto.TagRecord
260 if err := json.Unmarshal(recordData, &tagRecord); err != nil {
261 return fmt.Errorf("failed to unmarshal tag: %w", err)
262 }
263 // Extract digest from tag record (tries manifest field first, falls back to manifestDigest)
264 manifestDigest, err := tagRecord.GetManifestDigest()
265 if err != nil {
266 return fmt.Errorf("failed to get manifest digest from tag record: %w", err)
267 }
268
269 // Insert or update tag
270 return db.UpsertTag(p.db, &db.Tag{
271 DID: did,
272 Repository: tagRecord.Repository,
273 Tag: tagRecord.Tag,
274 Digest: manifestDigest,
275 CreatedAt: tagRecord.UpdatedAt,
276 })
277}
278
279// ProcessStar processes a star record and stores it in the database
280func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error {
281 // Unmarshal star record
282 var starRecord atproto.StarRecord
283 if err := json.Unmarshal(recordData, &starRecord); err != nil {
284 return fmt.Errorf("failed to unmarshal star: %w", err)
285 }
286 // Upsert the star record (idempotent - won't duplicate)
287 // The DID here is the starrer (user who starred)
288 // The subject contains the owner DID and repository
289 // Star count will be calculated on demand from the stars table
290 return db.UpsertStar(p.db, did, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt)
291}
292
293// ProcessSailorProfile processes a sailor profile record
294// This is primarily used by backfill to cache captain records for holds
295func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error {
296 // Unmarshal sailor profile record
297 var profileRecord atproto.SailorProfileRecord
298 if err := json.Unmarshal(recordData, &profileRecord); err != nil {
299 return fmt.Errorf("failed to unmarshal sailor profile: %w", err)
300 }
301
302 // Skip if no default hold set
303 if profileRecord.DefaultHold == "" {
304 return nil
305 }
306
307 // Convert hold URL/DID to canonical DID
308 holdDID := atproto.ResolveHoldDIDFromURL(profileRecord.DefaultHold)
309 if holdDID == "" {
310 fmt.Printf("WARNING [processor]: Invalid hold reference in profile for %s: %s\n", did, profileRecord.DefaultHold)
311 return nil
312 }
313
314 // Query and cache the captain record using provided function
315 // This allows backfill-specific logic (retries, test mode handling) without duplicating it here
316 if queryCaptainFn != nil {
317 return queryCaptainFn(ctx, holdDID)
318 }
319
320 return nil
321}