A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
73
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 97d1b3cdd50e4727e5db3c498f4e8bb73851fd39 321 lines 9.8 kB view raw
1package jetstream 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/identity" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 14 "atcr.io/pkg/appview/db" 15 "atcr.io/pkg/atproto" 16) 17 18// Processor handles shared database operations for both Worker (live) and Backfill (sync) 19// This eliminates code duplication between the two data ingestion paths 20type Processor struct { 21 db *sql.DB 22 directory identity.Directory 23 userCache *UserCache // Optional - enabled for Worker, disabled for Backfill 24 useCache bool 25} 26 27// NewProcessor creates a new shared processor 28// useCache: true for Worker (live streaming), false for Backfill (batch processing) 29func NewProcessor(database *sql.DB, useCache bool) *Processor { 30 p := &Processor{ 31 db: database, 32 directory: identity.DefaultDirectory(), 33 useCache: useCache, 34 } 35 36 if useCache { 37 p.userCache = &UserCache{ 38 cache: make(map[string]*db.User), 39 } 40 } 41 42 return p 43} 44 45// EnsureUser resolves and upserts a user by DID 46// Uses cache if enabled (Worker), queries DB if cache disabled (Backfill) 47func (p *Processor) EnsureUser(ctx context.Context, did string) error { 48 // Check cache first (if enabled) 49 if p.useCache && p.userCache != nil { 50 if user, ok := p.userCache.cache[did]; ok { 51 // Update last seen 52 user.LastSeen = time.Now() 53 return db.UpsertUser(p.db, user) 54 } 55 } else if !p.useCache { 56 // No cache - check if user already exists in DB 57 existingUser, err := db.GetUserByDID(p.db, did) 58 if err == nil && existingUser != nil { 59 // Update last seen 60 existingUser.LastSeen = time.Now() 61 return db.UpsertUser(p.db, existingUser) 62 } 63 } 64 65 // Resolve DID to get handle and PDS endpoint 66 didParsed, err := syntax.ParseDID(did) 67 if err != nil { 68 // Fallback: use DID as handle 69 user := &db.User{ 70 DID: did, 71 Handle: did, 72 PDSEndpoint: "https://bsky.social", 73 LastSeen: time.Now(), 74 } 75 if p.useCache { 76 p.userCache.cache[did] = user 77 } 78 return db.UpsertUser(p.db, user) 79 } 80 81 ident, err := p.directory.LookupDID(ctx, didParsed) 82 if err != nil { 83 // Fallback: use DID as handle 84 user := &db.User{ 85 DID: did, 86 Handle: did, 87 PDSEndpoint: "https://bsky.social", 88 LastSeen: time.Now(), 89 } 90 if p.useCache { 91 p.userCache.cache[did] = user 92 } 93 return db.UpsertUser(p.db, user) 94 } 95 96 resolvedDID := ident.DID.String() 97 handle := ident.Handle.String() 98 pdsEndpoint := ident.PDSEndpoint() 99 100 // If handle is invalid or PDS is missing, use defaults 101 if handle == "handle.invalid" || handle == "" { 102 handle = resolvedDID 103 } 104 if pdsEndpoint == "" { 105 pdsEndpoint = "https://bsky.social" 106 } 107 108 // Fetch user's Bluesky profile (including avatar) 109 // Use public Bluesky AppView API (doesn't require auth for public profiles) 110 avatar := "" 111 publicClient := atproto.NewClient("https://public.api.bsky.app", "", "") 112 profile, err := publicClient.GetActorProfile(ctx, resolvedDID) 113 if err != nil { 114 fmt.Printf("WARNING [processor]: Failed to fetch profile for DID %s: %v\n", resolvedDID, err) 115 // Continue without avatar 116 } else { 117 avatar = profile.Avatar 118 } 119 120 // Create user record 121 user := &db.User{ 122 DID: resolvedDID, 123 Handle: handle, 124 PDSEndpoint: pdsEndpoint, 125 Avatar: avatar, 126 LastSeen: time.Now(), 127 } 128 129 // Cache if enabled 130 if p.useCache { 131 p.userCache.cache[did] = user 132 } 133 134 // Upsert to database 135 return db.UpsertUser(p.db, user) 136} 137 138// ProcessManifest processes a manifest record and stores it in the database 139// Returns the manifest ID for further processing (layers/references) 140func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) { 141 // Unmarshal manifest record 142 var manifestRecord atproto.ManifestRecord 143 if err := json.Unmarshal(recordData, &manifestRecord); err != nil { 144 return 0, fmt.Errorf("failed to unmarshal manifest: %w", err) 145 } 146 // Detect manifest type 147 isManifestList := len(manifestRecord.Manifests) > 0 148 149 // Prepare manifest for insertion (WITHOUT annotation fields) 150 manifest := &db.Manifest{ 151 DID: did, 152 Repository: manifestRecord.Repository, 153 Digest: manifestRecord.Digest, 154 MediaType: manifestRecord.MediaType, 155 SchemaVersion: manifestRecord.SchemaVersion, 156 HoldEndpoint: manifestRecord.HoldEndpoint, 157 CreatedAt: manifestRecord.CreatedAt, 158 // Annotations removed - stored separately in repository_annotations table 159 } 160 161 // Set config fields only for image manifests (not manifest lists) 162 if !isManifestList && manifestRecord.Config != nil { 163 manifest.ConfigDigest = manifestRecord.Config.Digest 164 manifest.ConfigSize = manifestRecord.Config.Size 165 } 166 167 // Insert manifest 168 manifestID, err := db.InsertManifest(p.db, manifest) 169 if err != nil { 170 // For backfill: if manifest already exists, get its ID 171 if strings.Contains(err.Error(), "UNIQUE constraint failed") { 172 var existingID int64 173 err := p.db.QueryRow(` 174 SELECT id FROM manifests 175 WHERE did = ? AND repository = ? AND digest = ? 176 `, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID) 177 178 if err != nil { 179 return 0, fmt.Errorf("failed to get existing manifest ID: %w", err) 180 } 181 manifestID = existingID 182 } else { 183 return 0, fmt.Errorf("failed to insert manifest: %w", err) 184 } 185 } 186 187 // Update repository annotations ONLY if manifest has at least one non-empty annotation 188 if manifestRecord.Annotations != nil { 189 hasData := false 190 for _, value := range manifestRecord.Annotations { 191 if value != "" { 192 hasData = true 193 break 194 } 195 } 196 197 if hasData { 198 // Replace all annotations for this repository 199 err = db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations) 200 if err != nil { 201 return 0, fmt.Errorf("failed to upsert annotations: %w", err) 202 } 203 } 204 } 205 206 // Insert manifest references or layers 207 if isManifestList { 208 // Insert manifest references (for manifest lists/indexes) 209 for i, ref := range manifestRecord.Manifests { 210 platformArch := "" 211 platformOS := "" 212 platformVariant := "" 213 platformOSVersion := "" 214 215 if ref.Platform != nil { 216 platformArch = ref.Platform.Architecture 217 platformOS = ref.Platform.OS 218 platformVariant = ref.Platform.Variant 219 platformOSVersion = ref.Platform.OSVersion 220 } 221 222 if err := db.InsertManifestReference(p.db, &db.ManifestReference{ 223 ManifestID: manifestID, 224 Digest: ref.Digest, 225 MediaType: ref.MediaType, 226 Size: ref.Size, 227 PlatformArchitecture: platformArch, 228 PlatformOS: platformOS, 229 PlatformVariant: platformVariant, 230 PlatformOSVersion: platformOSVersion, 231 ReferenceIndex: i, 232 }); err != nil { 233 // Continue on error - reference might already exist 234 continue 235 } 236 } 237 } else { 238 // Insert layers (for image manifests) 239 for i, layer := range manifestRecord.Layers { 240 if err := db.InsertLayer(p.db, &db.Layer{ 241 ManifestID: manifestID, 242 Digest: layer.Digest, 243 MediaType: layer.MediaType, 244 Size: layer.Size, 245 LayerIndex: i, 246 }); err != nil { 247 // Continue on error - layer might already exist 248 continue 249 } 250 } 251 } 252 253 return manifestID, nil 254} 255 256// ProcessTag processes a tag record and stores it in the database 257func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error { 258 // Unmarshal tag record 259 var tagRecord atproto.TagRecord 260 if err := json.Unmarshal(recordData, &tagRecord); err != nil { 261 return fmt.Errorf("failed to unmarshal tag: %w", err) 262 } 263 // Extract digest from tag record (tries manifest field first, falls back to manifestDigest) 264 manifestDigest, err := tagRecord.GetManifestDigest() 265 if err != nil { 266 return fmt.Errorf("failed to get manifest digest from tag record: %w", err) 267 } 268 269 // Insert or update tag 270 return db.UpsertTag(p.db, &db.Tag{ 271 DID: did, 272 Repository: tagRecord.Repository, 273 Tag: tagRecord.Tag, 274 Digest: manifestDigest, 275 CreatedAt: tagRecord.UpdatedAt, 276 }) 277} 278 279// ProcessStar processes a star record and stores it in the database 280func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error { 281 // Unmarshal star record 282 var starRecord atproto.StarRecord 283 if err := json.Unmarshal(recordData, &starRecord); err != nil { 284 return fmt.Errorf("failed to unmarshal star: %w", err) 285 } 286 // Upsert the star record (idempotent - won't duplicate) 287 // The DID here is the starrer (user who starred) 288 // The subject contains the owner DID and repository 289 // Star count will be calculated on demand from the stars table 290 return db.UpsertStar(p.db, did, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt) 291} 292 293// ProcessSailorProfile processes a sailor profile record 294// This is primarily used by backfill to cache captain records for holds 295func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error { 296 // Unmarshal sailor profile record 297 var profileRecord atproto.SailorProfileRecord 298 if err := json.Unmarshal(recordData, &profileRecord); err != nil { 299 return fmt.Errorf("failed to unmarshal sailor profile: %w", err) 300 } 301 302 // Skip if no default hold set 303 if profileRecord.DefaultHold == "" { 304 return nil 305 } 306 307 // Convert hold URL/DID to canonical DID 308 holdDID := atproto.ResolveHoldDIDFromURL(profileRecord.DefaultHold) 309 if holdDID == "" { 310 fmt.Printf("WARNING [processor]: Invalid hold reference in profile for %s: %s\n", did, profileRecord.DefaultHold) 311 return nil 312 } 313 314 // Query and cache the captain record using provided function 315 // This allows backfill-specific logic (retries, test mode handling) without duplicating it here 316 if queryCaptainFn != nil { 317 return queryCaptainFn(ctx, holdDID) 318 } 319 320 return nil 321}