···8484# HOLD_KEY_PATH=/var/lib/atcr-hold/signing.key
85858686# ==============================================================================
8787+# Bluesky Integration
8888+# ==============================================================================
8989+9090+# Enable Bluesky posts when users push container images (default: false)
9191+# When enabled, the hold's embedded PDS will create posts announcing image pushes
9292+# Can be overridden per-hold via the captain record's enableManifestPosts field
9393+# HOLD_BLUESKY_POSTS_ENABLED=false
9494+9595+# ==============================================================================
8796# Registration (REQUIRED)
8897# ==============================================================================
8998
+3-1
cmd/credential-helper/main.go
···127127 fmt.Fprintf(os.Stderr, "Stored credentials for %s are invalid or expired\n", appViewURL)
128128 // Delete the invalid credentials
129129 delete(allCreds.Credentials, appViewURL)
130130- saveDeviceCredentials(configPath, allCreds)
130130+ if err := saveDeviceCredentials(configPath, allCreds); err != nil {
131131+ fmt.Fprintf(os.Stderr, "Warning: failed to save updated credentials: %v\n", err)
132132+ }
131133 // Mark as not found so we re-authorize below
132134 found = false
133135 }
···8787# Default: false
8888HOLD_ALLOW_ALL_CREW=false
89899090+# Enable Bluesky posts when manifests are pushed
9191+# When enabled, the hold service creates Bluesky posts announcing new container
9292+# image pushes. Posts include image name, tag, size, and layer count.
9393+#
9494+# - true: Create Bluesky posts for manifest uploads
9595+# - false: Silent operation (no Bluesky posts)
9696+#
9797+# Note: This requires the hold owner to have OAuth credentials for posting.
9898+# See docs/BLUESKY_MANIFEST_POSTS.md for setup instructions.
9999+#
100100+# Default: false
101101+HOLD_BLUESKY_POSTS_ENABLED=false
102102+90103# ==============================================================================
91104# S3/UpCloud Object Storage Configuration
92105# ==============================================================================
···271271}
272272```
273273274274-### 4. Bluesky Post Creation
274274+### 4. Bluesky Post Creation with Facets
275275276276**File**: `pkg/hold/pds/manifest_post.go` (new file)
277277278278-**Pattern**: Reuse existing `status.go` pattern
278278+**Pattern**: Extends `status.go` pattern with rich text facets
279279280280```go
281281// CreateManifestPost creates a Bluesky post announcing a manifest upload
282282-func (p *HoldPDS) CreateManifestPost(ctx context.Context, repository, tag, userHandle string) (string, error) {
282282+// Includes facets for clickable mentions and links
283283+func (p *HoldPDS) CreateManifestPost(
284284+ ctx context.Context,
285285+ repository, tag, userHandle, digest string,
286286+ totalSize int64,
287287+) (string, error) {
283288 now := time.Now()
284289285285- // Format post text (similar to "what's new" feed)
286286- text := formatManifestPostText(repository, tag, userHandle)
290290+ // Build AppView repository URL
291291+ appViewURL := fmt.Sprintf("https://atcr.io/r/%s/%s", userHandle, repository)
292292+293293+ // Format post text components
294294+ digestShort := formatDigest(digest)
295295+ sizeStr := formatSize(totalSize)
296296+ repoWithTag := fmt.Sprintf("%s:%s", repository, tag)
297297+298298+ // Build text: "@alice.bsky.social just pushed hsm-secrets-operator:latest\nDigest: sha256:abc...def Size: 12.2 MB"
299299+ text := fmt.Sprintf("@%s just pushed %s\nDigest: %s Size: %s", userHandle, repoWithTag, digestShort, sizeStr)
300300+301301+ // Create facets for mentions and links
302302+ facets := buildFacets(text, userHandle, repoWithTag, appViewURL)
287303288288- // Create post struct
304304+ // Create post struct with facets
289305 post := &bsky.FeedPost{
290306 LexiconTypeID: "app.bsky.feed.post",
291307 Text: text,
308308+ Facets: facets,
292309 CreatedAt: now.Format(time.RFC3339),
293293- // Optional: Add embed with link to AppView
294294- // Embed: &bsky.FeedPost_Embed{...}
295310 }
296311297312 // Create record with auto-generated TID
···314329 return postURI, nil
315330}
316331317317-// formatManifestPostText generates the post text
318318-func formatManifestPostText(repository, tag, userHandle string) string {
319319- // Example formats:
320320- // "@alice.bsky.social pushed alice/myapp:latest to ATCR"
321321- // "New image pushed: alice/myapp:v1.0.0 by @alice.bsky.social"
322322- // "📦 alice/myapp:latest pushed by @alice.bsky.social"
332332+// formatDigest truncates digest to first 7 and last 7 chars
333333+// Example: sha256:abc1234567890...fedcba9876543210 -> sha256:abc1234...9876543
334334+func formatDigest(digest string) string {
335335+ if !strings.HasPrefix(digest, "sha256:") {
336336+ return digest // Return as-is if not sha256
337337+ }
323338324324- return fmt.Sprintf("📦 %s:%s pushed by @%s", repository, tag, userHandle)
339339+ hash := strings.TrimPrefix(digest, "sha256:")
340340+ if len(hash) <= 14 {
341341+ return digest // Too short to truncate
342342+ }
343343+344344+ return fmt.Sprintf("sha256:%s...%s", hash[:7], hash[len(hash)-7:])
345345+}
346346+347347+// formatSize converts bytes to human-readable format
348348+// Examples: 1024 -> "1.0 KB", 1048576 -> "1.0 MB", 1073741824 -> "1.0 GB"
349349+func formatSize(bytes int64) string {
350350+ const (
351351+ KB = 1024
352352+ MB = 1024 * KB
353353+ GB = 1024 * MB
354354+ )
355355+356356+ switch {
357357+ case bytes >= GB:
358358+ return fmt.Sprintf("%.1f GB", float64(bytes)/float64(GB))
359359+ case bytes >= MB:
360360+ return fmt.Sprintf("%.1f MB", float64(bytes)/float64(MB))
361361+ case bytes >= KB:
362362+ return fmt.Sprintf("%.1f KB", float64(bytes)/float64(KB))
363363+ default:
364364+ return fmt.Sprintf("%d B", bytes)
365365+ }
325366}
326326-```
327367328328-**Advanced Post Options**:
368368+// buildFacets creates mention and link facets for rich text
369369+// IMPORTANT: Byte offsets must be calculated for UTF-8 encoded text
370370+func buildFacets(text, userHandle, repoWithTag, appViewURL string) []*bsky.RichtextFacet {
371371+ facets := []*bsky.RichtextFacet{}
329372330330-```go
331331-// Example with embedded link to AppView
332332-post := &bsky.FeedPost{
333333- LexiconTypeID: "app.bsky.feed.post",
334334- Text: text,
335335- CreatedAt: now.Format(time.RFC3339),
336336- Embed: &bsky.FeedPost_Embed{
337337- FeedPost_External: &bsky.EmbedExternal{
338338- External: &bsky.EmbedExternal_External{
339339- Uri: fmt.Sprintf("https://atcr.io/%s", repository),
340340- Title: fmt.Sprintf("%s:%s", repository, tag),
341341- Description: "View on ATCR",
373373+ // Find mention: "@alice.bsky.social"
374374+ mentionText := "@" + userHandle
375375+ mentionStart := strings.Index(text, mentionText)
376376+ if mentionStart >= 0 {
377377+ // Calculate byte offsets (not character offsets!)
378378+ byteStart := int64(len(text[:mentionStart]))
379379+ byteEnd := int64(len(text[:mentionStart+len(mentionText)]))
380380+381381+ facets = append(facets, &bsky.RichtextFacet{
382382+ Index: &bsky.RichtextFacet_ByteSlice{
383383+ ByteStart: byteStart,
384384+ ByteEnd: byteEnd,
342385 },
343343- },
344344- },
345345-}
386386+ Features: []*bsky.RichtextFacet_Features_Elem{
387387+ {
388388+ RichtextFacet_Mention: &bsky.RichtextFacet_Mention{
389389+ Did: "", // Will be resolved by Bluesky from handle
390390+ },
391391+ },
392392+ },
393393+ })
394394+ }
346395347347-// Example with facets (mentions)
348348-// This would require parsing the text and creating facet structs
349349-// for @mentions to be clickable in Bluesky
396396+ // Find repository link: "hsm-secrets-operator:latest"
397397+ linkStart := strings.Index(text, repoWithTag)
398398+ if linkStart >= 0 {
399399+ // Calculate byte offsets
400400+ byteStart := int64(len(text[:linkStart]))
401401+ byteEnd := int64(len(text[:linkStart+len(repoWithTag)]))
402402+403403+ facets = append(facets, &bsky.RichtextFacet{
404404+ Index: &bsky.RichtextFacet_ByteSlice{
405405+ ByteStart: byteStart,
406406+ ByteEnd: byteEnd,
407407+ },
408408+ Features: []*bsky.RichtextFacet_Features_Elem{
409409+ {
410410+ RichtextFacet_Link: &bsky.RichtextFacet_Link{
411411+ Uri: appViewURL,
412412+ },
413413+ },
414414+ },
415415+ })
416416+ }
417417+418418+ return facets
419419+}
350420```
351421422422+**Facet Implementation Notes:**
423423+424424+1. **Byte Offsets**: ATProto uses byte offsets (UTF-8 encoded), not character offsets
425425+ - For ASCII text: `len(text[:index])` gives correct byte offset
426426+ - For Unicode: Must use `len()` on substring to get byte count
427427+ - Never use `rune` indexes directly
428428+429429+2. **Mention Facets**:
430430+ - Include `@` symbol in the facet range
431431+ - DID field can be empty; Bluesky resolves from handle
432432+ - Type: `app.bsky.richtext.facet#mention`
433433+434434+3. **Link Facets**:
435435+ - Text can be anything (doesn't have to be URL)
436436+ - URI field contains actual target URL
437437+ - Type: `app.bsky.richtext.facet#link`
438438+439439+4. **Ordering**: Facets should not overlap; order doesn't matter
440440+352441### 5. AppView Integration
353442354443**File**: `pkg/appview/storage/manifest_store.go`
···411500 }
412501413502 // 5. Build notification request
414414- notifyReq := map[string]interface{}{
503503+ notifyReq := map[string]any{
415504 "repository": ms.repository,
416505 "tag": tag,
417506 "userDid": regCtx.DID,
418507 "userHandle": regCtx.Handle, // Need to add this to RegistryContext
419419- "manifest": map[string]interface{}{
508508+ "manifest": map[string]any{
420509 "mediaType": parsedManifest.MediaType,
421421- "config": map[string]interface{}{
510510+ "config": map[string]any{
422511 "digest": parsedManifest.Config.Digest.String(),
423512 "size": parsedManifest.Config.Size,
424513 },
425425- "layers": func() []map[string]interface{} {
426426- layers := make([]map[string]interface{}, len(parsedManifest.Layers))
514514+ "layers": func() []map[string]any {
515515+ layers := make([]map[string]any, len(parsedManifest.Layers))
427516 for i, layer := range parsedManifest.Layers {
428428- layers[i] = map[string]interface{}{
517517+ layers[i] = map[string]any{
429518 "digest": layer.Digest.String(),
430519 "size": layer.Size,
431520 "mediaType": layer.MediaType,
···463552 }
464553465554 // 7. Parse response (optional logging)
466466- var notifyResp map[string]interface{}
555555+ var notifyResp map[string]any
467556 if err := json.NewDecoder(resp.Body).Decode(¬ifyResp); err == nil {
468557 log.Printf("Hold notification successful: %+v", notifyResp)
469558 }
···603692604693**Hold Service** (`.env.hold.example`):
605694```bash
606606-# Enable/disable Bluesky posting
607607-HOLD_BLUESKY_POSTS_ENABLED=true
695695+# Enable/disable Bluesky manifest posting (default: false)
696696+# When enabled, hold will create Bluesky posts when users push images
697697+# Can be overridden per-hold via captain record's enableManifestPosts field
698698+HOLD_BLUESKY_POSTS_ENABLED=false
699699+```
700700+701701+**AppView** - No configuration needed. AppView always attempts to notify holds after manifest uploads, but handles failures gracefully.
702702+703703+### Feature Flags
608704609609-# Enable/disable layer record creation
610610-HOLD_LAYER_RECORDS_ENABLED=true
611611-```
705705+**Captain Record Override:**
706706+The hold's captain record includes an `enableManifestPosts` field that overrides the environment variable:
612707613613-**AppView** (`.env.appview.example`):
614614-```bash
615615-# Enable/disable manifest notifications to holds
616616-ATCR_NOTIFY_HOLDS_ENABLED=true
708708+```go
709709+type CaptainRecord struct {
710710+ // ... other fields ...
711711+ EnableManifestPosts bool `json:"enableManifestPosts" cborgen:"enableManifestPosts"`
712712+}
617713```
618714619619-### Feature Flags
715715+**Precedence (highest to lowest):**
716716+1. Captain record `enableManifestPosts` field (if set)
717717+2. `HOLD_BLUESKY_POSTS_ENABLED` environment variable
718718+3. Default: `false` (opt-in feature)
620719621621-Consider making this feature opt-in initially:
622622-- Add flag to captain record: `enableSocialPosts bool`
623623-- Check flag before creating posts
624624-- Allow hold owners to disable social features
720720+**Rationale:**
721721+- Default off for backward compatibility and privacy
722722+- Hold owners can enable via env var at deployment
723723+- Per-hold override via captain record for multi-tenant scenarios
724724+- Follows same pattern as existing status post feature
625725626726## Performance Considerations
627727···816916817917## Example Post Formats
818918819819-### Simple Format
919919+### Preferred Format (Facet-Based)
920920+921921+**Text representation:**
922922+```
923923+@alice.bsky.social just pushed hsm-secrets-operator:latest
924924+Digest: sha256:abc1234...def5678 Size: 12.2 MB
925925+```
926926+927927+**Actual implementation:**
928928+- `@alice.bsky.social` - Clickable mention (facet type: `app.bsky.richtext.facet#mention`)
929929+- `hsm-secrets-operator:latest` - Clickable link to `https://atcr.io/r/alice.bsky.social/hsm-secrets-operator` (facet type: `app.bsky.richtext.facet#link`)
930930+- `sha256:abc1234...def5678` - Truncated digest (first 7 + last 7 chars)
931931+- `12.2 MB` - Human-readable size (auto-formatted from bytes)
932932+933933+**Why facets?**
934934+- Mentions are clickable and link to user profiles in Bluesky
935935+- Repository names link directly to AppView repository pages
936936+- Better user experience than plain text URLs
937937+- Standard ATProto rich text format
938938+939939+### Alternative Formats
940940+941941+#### Simple Format
820942```
821943📦 alice/myapp:latest pushed by @alice.bsky.social
822944```
823945824824-### Detailed Format
946946+#### Detailed Format
825947```
826948📦 New container image pushed!
827949···832954View: https://atcr.io/alice/myapp
833955```
834956835835-### With Emoji/Styling
957957+#### With Emoji/Styling
836958```
837959🚀 alice/myapp:latest
838960···842964🔗 atcr.io/alice/myapp
843965```
844966845845-### With Tags
967967+#### With Tags
846968```
847969📦 alice/myapp:latest pushed by @alice.bsky.social
848970
+691
docs/RELAY.md
···11+# Running an ATProto Relay for ATCR Hold Discovery
22+33+This document explains what it takes to run an ATProto relay for indexing ATCR hold records, including infrastructure requirements, configuration, and trade-offs.
44+55+## Overview
66+77+### What is an ATProto Relay?
88+99+An ATProto relay is a service that:
1010+- **Subscribes to multiple PDS hosts** and aggregates their data streams
1111+- **Outputs a combined "firehose"** event stream for real-time network updates
1212+- **Validates data integrity** and identity signatures
1313+- **Provides discovery endpoints** like `com.atproto.sync.listReposByCollection`
1414+1515+The relay acts as a network-wide indexer, making it possible to discover which DIDs have records of specific types (collections).
1616+1717+### Why ATCR Needs a Relay
1818+1919+ATCR uses hold captain records (`io.atcr.hold.captain`) stored in hold PDSs to enable hold discovery. The `listReposByCollection` endpoint allows AppViews to efficiently discover all holds in the network without crawling every PDS individually.
2020+2121+**The problem**: Standard Bluesky relays appear to only index collections from `did:plc` DIDs, not `did:web` DIDs. Since ATCR holds use `did:web` (e.g., `did:web:hold01.atcr.io`), they aren't discoverable via Bluesky's public relays.
2222+2323+## Recommended Approach: Phased Implementation
2424+2525+ATCR's discovery needs evolve as the network grows. Start simple, scale as needed.
2626+2727+## MVP: Minimal Discovery Service
2828+2929+For initial deployment with a small number of holds (dozens, not thousands), build a **lightweight custom discovery service** focused solely on `io.atcr.*` collections.
3030+3131+### Why Minimal Service for MVP?
3232+3333+- **Scope**: Only index `io.atcr.*` collections (manifests, tags, captain/crew, sailor profiles)
3434+- **Opt-in**: Only crawls PDSs that explicitly call `requestCrawl`
3535+- **Small scale**: Dozens of holds, not millions of users
3636+- **Simple storage**: SQLite sufficient for current scale
3737+- **Cost-effective**: $5-10/month VPS
3838+3939+### Architecture
4040+4141+**Inbound endpoints:**
4242+```
4343+POST /xrpc/com.atproto.sync.requestCrawl
4444+ → Hold registers itself for crawling
4545+4646+GET /xrpc/com.atproto.sync.listReposByCollection?collection=io.atcr.hold.captain
4747+ → AppView discovers holds
4848+```
4949+5050+**Outbound (client to PDS):**
5151+```
5252+1. com.atproto.repo.describeRepo → verify PDS exists
5353+2. com.atproto.sync.getRepo → fetch full CAR file (initial backfill)
5454+3. com.atproto.sync.subscribeRepos → WebSocket for real-time updates
5555+4. Parse events → extract io.atcr.* records → index in SQLite
5656+```
5757+5858+**Data flow:**
5959+6060+**Initial crawl (on requestCrawl):**
6161+```
6262+1. Hold POSTs requestCrawl → service queues crawl job
6363+2. Service fetches getRepo (CAR file) from hold's PDS for backfill
6464+3. Service parses CAR using indigo libraries
6565+4. Service extracts io.atcr.* records (captain, crew, manifests, etc.)
6666+5. Service stores: (did, collection, rkey, record_data) in SQLite
6767+6. Service opens WebSocket to subscribeRepos for this DID
6868+7. Service stores cursor for reconnection handling
6969+```
7070+7171+**Ongoing updates (WebSocket):**
7272+```
7373+1. Receive commit events via subscribeRepos WebSocket
7474+2. Parse event, filter to io.atcr.* collections only
7575+3. Update indexed_records incrementally (insert/update/delete)
7676+4. Update cursor after processing each event
7777+5. On disconnect: reconnect with stored cursor to resume
7878+```
7979+8080+**Discovery (AppView query):**
8181+```
8282+1. AppView GETs listReposByCollection?collection=io.atcr.hold.captain
8383+2. Service queries SQLite WHERE collection='io.atcr.hold.captain'
8484+3. Service returns list of DIDs with that collection
8585+```
8686+8787+### Implementation Requirements
8888+8989+**Technologies:**
9090+- Go (reuse indigo libraries for CAR parsing and WebSocket)
9191+- SQLite (sufficient for dozens/hundreds of holds)
9292+- Standard HTTP server + WebSocket client
9393+9494+**Core components:**
9595+9696+1. **HTTP handlers** (`cmd/atcr-discovery/handlers/`):
9797+ - `requestCrawl` - queue crawl jobs
9898+ - `listReposByCollection` - query indexed collections
9999+100100+2. **Crawler** (`pkg/discovery/crawler.go`):
101101+ - Fetch CAR files from PDSs for initial backfill
102102+ - Parse with `github.com/bluesky-social/indigo/repo`
103103+ - Extract records, filter to `io.atcr.*` only
104104+105105+3. **WebSocket subscriber** (`pkg/discovery/subscriber.go`):
106106+ - WebSocket client for `com.atproto.sync.subscribeRepos`
107107+ - Event parsing and filtering
108108+ - Cursor management and persistence
109109+ - Automatic reconnection with resume
110110+111111+4. **Storage** (`pkg/discovery/storage.go`):
112112+ - SQLite schema for indexed records
113113+ - Indexes on (collection, did) for fast queries
114114+ - Cursor storage for reconnection
115115+116116+5. **Worker** (`pkg/discovery/worker.go`):
117117+ - Background crawl job processor
118118+ - WebSocket connection manager
119119+ - Health monitoring for subscriptions
120120+121121+**Database schema:**
122122+```sql
123123+CREATE TABLE indexed_records (
124124+ did TEXT NOT NULL,
125125+ collection TEXT NOT NULL,
126126+ rkey TEXT NOT NULL,
127127+ record_data TEXT NOT NULL, -- JSON
128128+ indexed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
129129+ PRIMARY KEY (did, collection, rkey)
130130+);
131131+132132+CREATE INDEX idx_collection ON indexed_records(collection);
133133+CREATE INDEX idx_did ON indexed_records(did);
134134+135135+CREATE TABLE crawl_queue (
136136+ id INTEGER PRIMARY KEY AUTOINCREMENT,
137137+ hostname TEXT NOT NULL UNIQUE,
138138+ did TEXT,
139139+ status TEXT DEFAULT 'pending', -- pending, in_progress, subscribed, failed
140140+ last_crawled_at TIMESTAMP,
141141+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
142142+);
143143+144144+CREATE TABLE subscriptions (
145145+ did TEXT PRIMARY KEY,
146146+ hostname TEXT NOT NULL,
147147+ cursor INTEGER, -- Last processed sequence number
148148+ status TEXT DEFAULT 'active', -- active, disconnected, failed
149149+ last_event_at TIMESTAMP,
150150+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
151151+ updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
152152+);
153153+```
154154+155155+**Leveraging indigo libraries:**
156156+157157+```go
158158+import (
159159+ "github.com/bluesky-social/indigo/repo"
160160+ "github.com/bluesky-social/indigo/atproto/syntax"
161161+ "github.com/bluesky-social/indigo/events"
162162+ "github.com/gorilla/websocket"
163163+ "github.com/ipfs/go-cid"
164164+)
165165+166166+// Initial backfill: Parse CAR file
167167+r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(carData))
168168+if err != nil {
169169+ return err
170170+}
171171+172172+// Iterate records
173173+err = r.ForEach(ctx, "", func(path string, nodeCid cid.Cid) error {
174174+ // Parse collection from path (e.g., "io.atcr.hold.captain/self")
175175+ parts := strings.Split(path, "/")
176176+ if len(parts) != 2 {
177177+ return nil // skip invalid paths
178178+ }
179179+180180+ collection := parts[0]
181181+ rkey := parts[1]
182182+183183+ // Filter to io.atcr.* only
184184+ if !strings.HasPrefix(collection, "io.atcr.") {
185185+ return nil
186186+ }
187187+188188+ // Get record data
189189+ recordBytes, err := r.GetRecord(ctx, path)
190190+ if err != nil {
191191+ return err
192192+ }
193193+194194+ // Store in database
195195+ return store.IndexRecord(did, collection, rkey, recordBytes)
196196+})
197197+198198+// WebSocket subscription: Listen for updates
199199+wsURL := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos", hostname)
200200+conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
201201+if err != nil {
202202+ return err
203203+}
204204+205205+// Read events
206206+rsc := &events.RepoStreamCallbacks{
207207+ RepoCommit: func(evt *events.RepoCommit) error {
208208+ // Filter to io.atcr.* collections only
209209+ for _, op := range evt.Ops {
210210+ if !strings.HasPrefix(op.Collection, "io.atcr.") {
211211+ continue
212212+ }
213213+214214+ // Process create/update/delete operations
215215+ switch op.Action {
216216+ case "create", "update":
217217+ store.IndexRecord(evt.Repo, op.Collection, op.Rkey, op.Record)
218218+ case "delete":
219219+ store.DeleteRecord(evt.Repo, op.Collection, op.Rkey)
220220+ }
221221+ }
222222+223223+ // Update cursor
224224+ return store.UpdateCursor(evt.Repo, evt.Seq)
225225+ },
226226+}
227227+228228+// Process stream
229229+scheduler := events.NewScheduler("discovery-worker", conn.RemoteAddr().String(), rsc)
230230+return events.HandleRepoStream(ctx, conn, scheduler)
231231+```
232232+233233+### Infrastructure Requirements
234234+235235+**Minimum specs:**
236236+- 1 vCPU
237237+- 1-2GB RAM
238238+- 20GB SSD
239239+- Minimal bandwidth (<1GB/day for dozens of holds)
240240+241241+**Estimated cost:**
242242+- Hetzner CX11: €4.15/month (~$5/month)
243243+- DigitalOcean Basic: $6/month
244244+- Fly.io: ~$5-10/month
245245+246246+**Deployment:**
247247+```bash
248248+# Build
249249+go build -o atcr-discovery ./cmd/atcr-discovery
250250+251251+# Run
252252+export DATABASE_PATH="/var/lib/atcr-discovery/discovery.db"
253253+export HTTP_ADDR=":8080"
254254+./atcr-discovery
255255+```
256256+257257+### Limitations
258258+259259+**What it does NOT do:**
260260+- ❌ Serve outbound `subscribeRepos` firehose (AppViews query via listReposByCollection)
261261+- ❌ Full MST validation (trust PDS validation)
262262+- ❌ Scale to millions of accounts (SQLite limits)
263263+- ❌ Multi-instance deployment (single process with SQLite)
264264+265265+**When to migrate to full relay:** When you have 1000+ holds, need PostgreSQL, or multi-instance deployment.
266266+267267+## Future Scale: Full Relay (Sync v1.1)
268268+269269+When ATCR grows beyond dozens of holds and needs real-time indexing, migrate to Bluesky's relay v1.1 implementation.
270270+271271+### When to Upgrade
272272+273273+**Indicators:**
274274+- 100+ holds requesting frequent crawls
275275+- Need real-time updates (re-crawl latency too high)
276276+- Multiple AppView instances need coordinated discovery
277277+- SQLite performance becomes bottleneck
278278+279279+### Relay v1.1 Characteristics
280280+281281+Released May 2025, this is Bluesky's current reference implementation.
282282+283283+**Key features:**
284284+- **Non-archival**: Doesn't mirror full repository data, only processes firehose
285285+- **WebSocket subscriptions**: Real-time updates from PDSs
286286+- **Scalable**: 2 vCPU, 12GB RAM handles ~100M accounts
287287+- **PostgreSQL**: Required for production scale
288288+- **Admin UI**: Web dashboard for management
289289+290290+**Source**: `github.com/bluesky-social/indigo/cmd/relay`
291291+292292+### Migration Path
293293+294294+**Step 1: Deploy relay v1.1**
295295+```bash
296296+git clone https://github.com/bluesky-social/indigo.git
297297+cd indigo
298298+go build -o relay ./cmd/relay
299299+300300+export DATABASE_URL="postgres://relay:password@localhost:5432/atcr_relay"
301301+./relay --admin-password="secure-password"
302302+```
303303+304304+**Step 2: Migrate data**
305305+- Export indexed records from SQLite
306306+- Trigger crawls in relay for all known holds
307307+- Verify relay indexes correctly
308308+309309+**Step 3: Update AppView configuration**
310310+```bash
311311+# Point to new relay
312312+export ATCR_RELAY_ENDPOINT="https://relay.atcr.io"
313313+```
314314+315315+**Step 4: Decommission minimal service**
316316+- Monitor relay for stability
317317+- Shut down old discovery service
318318+319319+### Infrastructure Requirements (Full Relay)
320320+321321+**Minimum specs:**
322322+- 2 vCPU cores
323323+- 12GB RAM
324324+- 100GB SSD
325325+- 30 Mbps bandwidth
326326+327327+**Estimated cost:**
328328+- Hetzner: ~$30-40/month
329329+- DigitalOcean: ~$50/month (with managed PostgreSQL)
330330+- Fly.io: ~$35-50/month
331331+332332+## Collection Indexing: The `collectiondir` Microservice
333333+334334+The `com.atproto.sync.listReposByCollection` endpoint is **not part of the relay core**. It's provided by a separate microservice called **`collectiondir`**.
335335+336336+### What is collectiondir?
337337+338338+- **Separate service** that indexes collections for efficient discovery
339339+- **Optional**: Not required by the ATProto spec, but very useful for AppViews
340340+- **Deployed alongside relay** by Bluesky's public instances
341341+342342+### Current Limitation: did:plc Only?
343343+344344+Based on testing, Bluesky's public relays (with collectiondir) appear to:
345345+- ✅ Index `io.atcr.*` collections from `did:plc` DIDs
346346+- ❌ NOT index `io.atcr.*` collections from `did:web` DIDs
347347+348348+This means:
349349+- ATCR manifests from users (did:plc) are discoverable
350350+- ATCR hold captain records (did:web) are NOT discoverable
351351+- The relay still **stores** all data (CAR file includes did:web records)
352352+- The issue is specifically with **indexing** for `listReposByCollection`
353353+354354+### Configuring collectiondir
355355+356356+Documentation on configuring collectiondir is sparse. Possible approaches:
357357+358358+1. **Fork and modify**: Clone indigo repo, modify collectiondir to index all DIDs
359359+2. **Configuration file**: Check if collectiondir accepts whitelist/configuration for indexed collections
360360+3. **No filtering**: Default behavior might be to index everything, but Bluesky's deployment filters
361361+362362+**Action item**: Review `indigo/cmd/collectiondir` source code to understand configuration options.
363363+364364+## Multi-Relay Strategy
365365+366366+Holds can request crawls from **multiple relays** simultaneously. This enables:
367367+368368+### Scenario: Bluesky + ATCR Relays
369369+370370+**Setup:**
371371+1. Hold deploys with embedded PDS at `did:web:hold01.atcr.io`
372372+2. Hold creates captain record (`io.atcr.hold.captain/self`)
373373+3. Hold requests crawl from **both**:
374374+ - Bluesky relay: `https://bsky.network/xrpc/com.atproto.sync.requestCrawl`
375375+ - ATCR relay: `https://relay.atcr.io/xrpc/com.atproto.sync.requestCrawl`
376376+377377+**Result:**
378378+- ✅ Bluesky relay indexes social posts (if hold owner posts)
379379+- ✅ ATCR relay indexes hold captain records
380380+- ✅ AppViews query ATCR relay for hold discovery
381381+- ✅ Independent networks - Bluesky posts work regardless of ATCR relay
382382+383383+### Request Crawl Script
384384+385385+The existing script can be modified to support multiple relays:
386386+387387+```bash
388388+#!/bin/bash
389389+# deploy/request-crawl.sh
390390+391391+HOSTNAME=$1
392392+BLUESKY_RELAY=${2:-"https://bsky.network"}
393393+ATCR_RELAY=${3:-"https://relay.atcr.io"}
394394+395395+echo "Requesting crawl for $HOSTNAME from Bluesky relay..."
396396+curl -X POST "$BLUESKY_RELAY/xrpc/com.atproto.sync.requestCrawl" \
397397+ -H "Content-Type: application/json" \
398398+ -d "{\"hostname\": \"$HOSTNAME\"}"
399399+400400+echo "Requesting crawl for $HOSTNAME from ATCR relay..."
401401+curl -X POST "$ATCR_RELAY/xrpc/com.atproto.sync.requestCrawl" \
402402+ -H "Content-Type: application/json" \
403403+ -d "{\"hostname\": \"$HOSTNAME\"}"
404404+```
405405+406406+Usage:
407407+```bash
408408+./deploy/request-crawl.sh hold01.atcr.io
409409+```
410410+411411+## Deployment: Minimal Discovery Service
412412+413413+### 1. Infrastructure Setup
414414+415415+**Provision VPS:**
416416+- Hetzner CX11, DigitalOcean Basic, or Fly.io
417417+- Public domain (e.g., `discovery.atcr.io`)
418418+- TLS certificate (Let's Encrypt)
419419+420420+**Configure reverse proxy (optional - nginx):**
421421+```nginx
422422+upstream discovery {
423423+ server 127.0.0.1:8080;
424424+}
425425+426426+server {
427427+ listen 443 ssl http2;
428428+ server_name discovery.atcr.io;
429429+430430+ ssl_certificate /etc/letsencrypt/live/discovery.atcr.io/fullchain.pem;
431431+ ssl_certificate_key /etc/letsencrypt/live/discovery.atcr.io/privkey.pem;
432432+433433+ location / {
434434+ proxy_pass http://discovery;
435435+ proxy_set_header Host $host;
436436+ proxy_set_header X-Real-IP $remote_addr;
437437+ }
438438+}
439439+```
440440+441441+### 2. Build and Deploy
442442+443443+```bash
444444+# Clone ATCR repo
445445+git clone https://github.com/atcr-io/atcr.git
446446+cd atcr
447447+448448+# Build discovery service
449449+go build -o atcr-discovery ./cmd/atcr-discovery
450450+451451+# Run
452452+export DATABASE_PATH="/var/lib/atcr-discovery/discovery.db"
453453+export HTTP_ADDR=":8080"
454454+export CRAWL_INTERVAL="12h"
455455+./atcr-discovery
456456+```
457457+458458+### 3. Update Hold Startup
459459+460460+Each hold should request crawl on startup:
461461+462462+```bash
463463+# In hold startup script or environment
464464+export ATCR_DISCOVERY_URL="https://discovery.atcr.io"
465465+466466+# Request crawl from both Bluesky and ATCR
467467+curl -X POST "https://bsky.network/xrpc/com.atproto.sync.requestCrawl" \
468468+ -H "Content-Type: application/json" \
469469+ -d "{\"hostname\": \"$HOLD_PUBLIC_URL\"}"
470470+471471+curl -X POST "$ATCR_DISCOVERY_URL/xrpc/com.atproto.sync.requestCrawl" \
472472+ -H "Content-Type: application/json" \
473473+ -d "{\"hostname\": \"$HOLD_PUBLIC_URL\"}"
474474+```
475475+476476+### 4. Update AppView Configuration
477477+478478+Point AppView discovery worker to the discovery service:
479479+480480+```bash
481481+# In .env.appview or environment
482482+export ATCR_RELAY_ENDPOINT="https://discovery.atcr.io"
483483+export ATCR_HOLD_DISCOVERY_ENABLED="true"
484484+export ATCR_HOLD_DISCOVERY_INTERVAL="6h"
485485+```
486486+487487+### 5. Monitor and Maintain
488488+489489+**Monitoring:**
490490+- Check crawl queue status
491491+- Monitor SQLite database size
492492+- Track failed crawls
493493+494494+**Maintenance:**
495495+- Re-crawl on schedule (every 6-24 hours)
496496+- Prune stale records (>7 days old)
497497+- Backup SQLite database regularly
498498+499499+## Trade-Offs and Considerations
500500+501501+### Running Your Own Relay
502502+503503+**Pros:**
504504+- ✅ Full control over indexing (can index `did:web` holds)
505505+- ✅ No dependency on third-party relay policies
506506+- ✅ Can customize collection filters for ATCR-specific needs
507507+- ✅ Relatively lightweight with modern relay implementation
508508+509509+**Cons:**
510510+- ❌ Infrastructure cost (~$30-50/month minimum)
511511+- ❌ Operational overhead (monitoring, updates, backups)
512512+- ❌ Need to maintain as network grows
513513+- ❌ Single point of failure for discovery (unless multi-relay)
514514+515515+### Alternatives to Running a Relay
516516+517517+#### 1. Direct Registration API
518518+519519+Holds POST to AppView on startup to register themselves:
520520+521521+**Pros:**
522522+- ✅ Simplest implementation
523523+- ✅ No relay infrastructure needed
524524+- ✅ Immediate registration (no crawl delay)
525525+526526+**Cons:**
527527+- ❌ Ties holds to specific AppView instances
528528+- ❌ Breaks decentralized discovery model
529529+- ❌ Each AppView has different hold registry
530530+531531+#### 2. Static Discovery File
532532+533533+Maintain `https://atcr.io/.well-known/holds.json`:
534534+535535+**Pros:**
536536+- ✅ No infrastructure beyond static hosting
537537+- ✅ All AppViews share same registry
538538+- ✅ Simple to implement
539539+540540+**Cons:**
541541+- ❌ Manual process (PRs/issues to add holds)
542542+- ❌ Not real-time discovery
543543+- ❌ Centralized control point
544544+545545+#### 3. Hybrid Approach
546546+547547+Combine multiple discovery mechanisms:
548548+549549+```go
550550+func (w *HoldDiscoveryWorker) DiscoverHolds(ctx context.Context) error {
551551+ // 1. Fetch static registry
552552+ staticHolds := w.fetchStaticRegistry()
553553+554554+ // 2. Query relay (if available)
555555+ relayHolds := w.queryRelay(ctx)
556556+557557+ // 3. Accept direct registrations
558558+ registeredHolds := w.getDirectRegistrations()
559559+560560+ // Merge and deduplicate
561561+ allHolds := mergeHolds(staticHolds, relayHolds, registeredHolds)
562562+563563+ // Cache in database
564564+ for _, hold := range allHolds {
565565+ w.cacheHold(hold)
566566+ }
567567+}
568568+```
569569+570570+**Pros:**
571571+- ✅ Multiple discovery paths (resilient)
572572+- ✅ Gradual migration to relay-based discovery
573573+- ✅ Supports both centralized bootstrap and decentralized growth
574574+575575+**Cons:**
576576+- ❌ More complex implementation
577577+- ❌ Potential for stale data if sources conflict
578578+579579+## Recommendations for ATCR
580580+581581+### Phase 1: MVP (Now - 1000 holds)
582582+583583+**Build minimal discovery service with WebSocket** (~$5-10/month):
584584+1. Implement `requestCrawl` + `listReposByCollection` endpoints
585585+2. Initial backfill via `getRepo` (CAR file parsing)
586586+3. Real-time updates via WebSocket `subscribeRepos`
587587+4. SQLite storage with cursor management
588588+5. Filter to `io.atcr.*` collections only
589589+590590+**Deliverables:**
591591+- `cmd/atcr-discovery` service
592592+- SQLite schema with cursor storage
593593+- CAR file parser (indigo libraries)
594594+- WebSocket subscriber with reconnection
595595+- Deployment scripts
596596+597597+**Cost**: ~$5-10/month VPS
598598+599599+**Why**: Minimal infrastructure, real-time updates, full control over indexing, sufficient for hundreds of holds.
600600+601601+### Phase 2: Migrate to Full Relay (1000+ holds)
602602+603603+**Deploy Bluesky relay v1.1** when scaling needed (~$30-50/month):
604604+1. Set up PostgreSQL database
605605+2. Deploy indigo relay with admin UI
606606+3. Migrate indexed data from SQLite
607607+4. Configure for `io.atcr.*` collection filtering (if possible)
608608+5. Handle thousands of concurrent WebSocket connections
609609+610610+**Cost**: ~$30-50/month
611611+612612+**Why**: Proven scalability to 100M+ accounts, standardized protocol, community support, production-ready infrastructure.
613613+614614+### Phase 3: Multi-Relay Federation (Future)
615615+616616+**Decentralized relay network:**
617617+1. Multiple ATCR relays operated independently
618618+2. AppViews query multiple relays (fallback/redundancy)
619619+3. Holds request crawls from all known ATCR relays
620620+4. Cross-relay synchronization (optional)
621621+622622+**Why**: No single point of failure, fully decentralized discovery, geographic distribution.
623623+624624+## Next Steps
625625+626626+### For MVP Implementation
627627+628628+1. **Create `cmd/atcr-discovery` package structure**
629629+ - HTTP handlers for XRPC endpoints (`requestCrawl`, `listReposByCollection`)
630630+ - Crawler with indigo CAR parsing for initial backfill
631631+ - WebSocket subscriber for real-time updates
632632+ - SQLite storage layer with cursor management
633633+ - Background worker for managing subscriptions
634634+635635+2. **Database schema**
636636+ - `indexed_records` table for collection data
637637+ - `crawl_queue` table for crawl job management
638638+ - `subscriptions` table for WebSocket cursor tracking
639639+ - Indexes for efficient queries
640640+641641+3. **WebSocket implementation**
642642+ - Use `github.com/bluesky-social/indigo/events` for event handling
643643+ - Implement reconnection logic with cursor resume
644644+ - Filter events to `io.atcr.*` collections only
645645+ - Health monitoring for active subscriptions
646646+647647+4. **Testing strategy**
648648+ - Unit tests for CAR parsing
649649+ - Unit tests for event filtering
650650+ - Integration tests with mock PDSs and WebSocket
651651+ - Connection failure and reconnection testing
652652+ - Load testing with SQLite
653653+654654+5. **Deployment**
655655+ - Dockerfile for discovery service
656656+ - Deployment scripts (systemd, docker-compose)
657657+ - Monitoring setup (logs, metrics, WebSocket health)
658658+ - Alert on subscription failures
659659+660660+6. **Documentation**
661661+ - API documentation for XRPC endpoints
662662+ - Deployment guide
663663+ - Troubleshooting guide (WebSocket connection issues)
664664+665665+### Open Questions
666666+667667+1. **CAR parsing edge cases**: How to handle malformed CAR files or invalid records?
668668+2. **WebSocket reconnection**: What's the optimal backoff strategy for reconnection attempts?
669669+3. **Subscription management**: How many concurrent WebSocket connections can SQLite handle?
670670+4. **Rate limiting**: Should discovery service rate-limit requestCrawl to prevent abuse?
671671+5. **Authentication**: Should requestCrawl require authentication, or remain open?
672672+6. **Cursor storage**: Should cursors be persisted immediately or batched for performance?
673673+7. **Monitoring**: What metrics are most important for operational visibility (active subs, event rate, lag)?
674674+8. **Error handling**: When a WebSocket dies, should we re-backfill via getRepo or trust cursor resume?
675675+676676+## References
677677+678678+### ATProto Specifications
679679+- [ATProto Sync Specification](https://atproto.com/specs/sync)
680680+- [Repository Specification](https://atproto.com/specs/repository)
681681+- [CAR File Format](https://ipld.io/specs/transport/car/)
682682+683683+### Indigo Libraries
684684+- [Indigo Repository](https://github.com/bluesky-social/indigo)
685685+- [Indigo Repo Package](https://pkg.go.dev/github.com/bluesky-social/indigo/repo)
686686+- [Indigo ATProto Package](https://pkg.go.dev/github.com/bluesky-social/indigo/atproto)
687687+688688+### Relay Reference (Future)
689689+- [Relay v1.1 Updates](https://docs.bsky.app/blog/relay-sync-updates)
690690+- [Indigo Relay Implementation](https://github.com/bluesky-social/indigo/tree/main/cmd/relay)
691691+- [Running a Full-Network Relay](https://whtwnd.com/bnewbold.net/3kwzl7tye6u2y)
+7-1
pkg/appview/db/device_store.go
···417417func generateUserCode() string {
418418 chars := "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
419419 code := make([]byte, 8)
420420- rand.Read(code)
420420+ if _, err := rand.Read(code); err != nil {
421421+ // Fallback to timestamp-based generation if crypto rand fails
422422+ now := time.Now().UnixNano()
423423+ for i := range code {
424424+ code[i] = byte(now >> (i * 8))
425425+ }
426426+ }
421427 for i := range code {
422428 code[i] = chars[int(code[i])%len(chars)]
423429 }
+3-3
pkg/appview/db/oauth_store_test.go
···8585 }
86868787 // Verify mismatched session was deleted
8888- retrieved, err = store.GetSession(ctx, mismatchedSession.AccountDID, mismatchedSession.SessionID)
8888+ _, err = store.GetSession(ctx, mismatchedSession.AccountDID, mismatchedSession.SessionID)
8989 if err == nil {
9090 t.Error("Expected session to be deleted (should error), but got no error")
9191 }
···154154 }
155155156156 // Verify malformed session was deleted
157157- retrieved, err = store.GetSession(ctx, parsedDID, "malformed")
157157+ _, err = store.GetSession(ctx, parsedDID, "malformed")
158158 if err == nil {
159159 t.Error("Expected malformed session to be deleted, but got no error")
160160 }
···284284 }
285285286286 // Verify deletion
287287- retrieved, err = store.GetSession(ctx, did, "test_session_id")
287287+ _, err = store.GetSession(ctx, did, "test_session_id")
288288 if err == nil {
289289 t.Error("Expected error after deletion, got nil")
290290 }
+3-1
pkg/appview/db/readonly_test.go
···1313 dbPath := filepath.Join(tmpDir, "test.db")
14141515 // Set environment for database path
1616- os.Setenv("ATCR_UI_DATABASE_PATH", dbPath)
1616+ if err := os.Setenv("ATCR_UI_DATABASE_PATH", dbPath); err != nil {
1717+ t.Fatalf("Failed to set environment variable: %v", err)
1818+ }
1719 defer os.Unsetenv("ATCR_UI_DATABASE_PATH")
18201921 // Initialize database (creates schema)
+1-1
pkg/appview/jetstream/backfill.go
···402402 }
403403404404 // Update annotations from newest manifest only
405405- if manifestRecord.Annotations != nil && len(manifestRecord.Annotations) > 0 {
405405+ if len(manifestRecord.Annotations) > 0 {
406406 // Filter out empty annotations
407407 hasData := false
408408 for _, value := range manifestRecord.Annotations {
+7-2
pkg/appview/middleware/registry.go
···2626 "atcr.io/pkg/auth/token"
2727)
28282929+// holdDIDKey is the context key for storing hold DID
3030+const holdDIDKey contextKey = "hold.did"
3131+2932// Global variables for initialization only
3033// These are set by main.go during startup and copied into NamespaceResolver instances.
3134// After initialization, request handling uses the NamespaceResolver's instance fields.
···131134 }
132135133136 did := ident.DID.String()
137137+ handle := ident.Handle.String()
134138 pdsEndpoint := ident.PDSEndpoint()
135139 if pdsEndpoint == "" {
136140 return nil, fmt.Errorf("no PDS endpoint found for %s", identityStr)
137141 }
138142139139- fmt.Printf("DEBUG [registry/middleware]: Resolved identity: did=%s, pds=%s, handle=%s\n", did, pdsEndpoint, ident.Handle.String())
143143+ fmt.Printf("DEBUG [registry/middleware]: Resolved identity: did=%s, pds=%s, handle=%s\n", did, pdsEndpoint, handle)
140144141145 // Query for hold DID - either user's hold or default hold service
142146 holdDID := nr.findHoldDID(ctx, did, pdsEndpoint)
···144148 // This is a fatal configuration error - registry cannot function without a hold service
145149 return nil, fmt.Errorf("no hold DID configured: ensure default_hold_did is set in middleware config")
146150 }
147147- ctx = context.WithValue(ctx, "hold.did", holdDID)
151151+ ctx = context.WithValue(ctx, holdDIDKey, holdDID)
148152149153 // Get service token for hold authentication
150154 // Check cache first to avoid unnecessary PDS calls on every request
···308312 // Bundle all context into a single RegistryContext struct
309313 registryCtx := &storage.RegistryContext{
310314 DID: did,
315315+ Handle: handle,
311316 HoldDID: holdDID,
312317 PDSEndpoint: pdsEndpoint,
313318 Repository: repositoryName,
+1
pkg/appview/storage/context.go
···1717type RegistryContext struct {
1818 // Per-request identity and routing information
1919 DID string // User's DID (e.g., "did:plc:abc123")
2020+ Handle string // User's handle (e.g., "alice.bsky.social")
2021 HoldDID string // Hold service DID (e.g., "did:web:hold01.atcr.io")
2122 PDSEndpoint string // User's PDS endpoint URL
2223 Repository string // Image repository name (e.g., "debian")
+372
pkg/appview/storage/manifest_store.go
···11+package storage
22+33+import (
44+ "bytes"
55+ "context"
66+ "encoding/json"
77+ "errors"
88+ "fmt"
99+ "io"
1010+ "maps"
1111+ "net/http"
1212+ "strings"
1313+1414+ "atcr.io/pkg/atproto"
1515+ "github.com/distribution/distribution/v3"
1616+ "github.com/opencontainers/go-digest"
1717+)
1818+1919+// HoldNotifier interface for notifying holds about manifest uploads
2020+type HoldNotifier interface {
2121+ GetServiceToken(ctx context.Context, userDID, audienceDID string) (string, error)
2222+}
2323+2424+// ManifestStore implements distribution.ManifestService
2525+// It stores manifests in ATProto as records
2626+type ManifestStore struct {
2727+ ctx *RegistryContext // Context with user/hold info
2828+ notifier HoldNotifier // OAuth refresher for getting service tokens
2929+ lastFetchedHoldDID string // Hold DID from most recently fetched manifest (for pull)
3030+ blobStore distribution.BlobStore // Blob store for fetching config during push
3131+}
3232+3333+// NewManifestStore creates a new ATProto-backed manifest store
3434+func NewManifestStore(ctx *RegistryContext, notifier HoldNotifier, blobStore distribution.BlobStore) *ManifestStore {
3535+ return &ManifestStore{
3636+ ctx: ctx,
3737+ notifier: notifier,
3838+ blobStore: blobStore,
3939+ }
4040+}
4141+4242+// Exists checks if a manifest exists by digest
4343+func (s *ManifestStore) Exists(ctx context.Context, dgst digest.Digest) (bool, error) {
4444+ rkey := digestToRKey(dgst)
4545+ _, err := s.ctx.ATProtoClient.GetRecord(ctx, atproto.ManifestCollection, rkey)
4646+ if err != nil {
4747+ // If not found, return false without error
4848+ if errors.Is(err, atproto.ErrRecordNotFound) {
4949+ return false, nil
5050+ }
5151+ return false, err
5252+ }
5353+ return true, nil
5454+}
5555+5656+// Get retrieves a manifest by digest
5757+func (s *ManifestStore) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
5858+ rkey := digestToRKey(dgst)
5959+ record, err := s.ctx.ATProtoClient.GetRecord(ctx, atproto.ManifestCollection, rkey)
6060+ if err != nil {
6161+ return nil, distribution.ErrManifestUnknownRevision{
6262+ Name: s.ctx.Repository,
6363+ Revision: dgst,
6464+ }
6565+ }
6666+6767+ var manifestRecord atproto.ManifestRecord
6868+ if err := json.Unmarshal(record.Value, &manifestRecord); err != nil {
6969+ return nil, fmt.Errorf("failed to unmarshal manifest record: %w", err)
7070+ }
7171+7272+ // Store the hold DID for subsequent blob requests during pull
7373+ // Prefer HoldDID (new format) with fallback to HoldEndpoint (legacy URL format)
7474+ // The routing repository will cache this for concurrent blob fetches
7575+ if manifestRecord.HoldDID != "" {
7676+ // New format: DID reference (preferred)
7777+ s.lastFetchedHoldDID = manifestRecord.HoldDID
7878+ } else if manifestRecord.HoldEndpoint != "" {
7979+ // Legacy format: URL reference - convert to DID
8080+ s.lastFetchedHoldDID = atproto.ResolveHoldDIDFromURL(manifestRecord.HoldEndpoint)
8181+ }
8282+8383+ var ociManifest []byte
8484+8585+ // New records: Download blob from ATProto blob storage
8686+ if manifestRecord.ManifestBlob != nil && manifestRecord.ManifestBlob.Ref.Link != "" {
8787+ ociManifest, err = s.ctx.ATProtoClient.GetBlob(ctx, manifestRecord.ManifestBlob.Ref.Link)
8888+ if err != nil {
8989+ return nil, fmt.Errorf("failed to download manifest blob: %w", err)
9090+ }
9191+ }
9292+9393+ // Track pull count (increment asynchronously to avoid blocking the response)
9494+ if s.ctx.Database != nil {
9595+ go func() {
9696+ if err := s.ctx.Database.IncrementPullCount(s.ctx.DID, s.ctx.Repository); err != nil {
9797+ fmt.Printf("WARNING: Failed to increment pull count for %s/%s: %v\n", s.ctx.DID, s.ctx.Repository, err)
9898+ }
9999+ }()
100100+ }
101101+102102+ // Parse the manifest based on media type
103103+ // For now, we'll return the raw bytes wrapped in a manifest object
104104+ // In a full implementation, you'd use distribution's manifest parsing
105105+ return &rawManifest{
106106+ mediaType: manifestRecord.MediaType,
107107+ payload: ociManifest,
108108+ }, nil
109109+}
110110+111111+// Put stores a manifest
112112+func (s *ManifestStore) Put(ctx context.Context, manifest distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
113113+ // Get the manifest payload (raw bytes)
114114+ mediaType, payload, err := manifest.Payload()
115115+ if err != nil {
116116+ return "", err
117117+ }
118118+119119+ // Calculate digest
120120+ dgst := digest.FromBytes(payload)
121121+122122+ // Upload manifest as blob to PDS
123123+ blobRef, err := s.ctx.ATProtoClient.UploadBlob(ctx, payload, mediaType)
124124+ if err != nil {
125125+ return "", fmt.Errorf("failed to upload manifest blob: %w", err)
126126+ }
127127+128128+ // Create manifest record with structured metadata
129129+ manifestRecord, err := atproto.NewManifestRecord(s.ctx.Repository, dgst.String(), payload)
130130+ if err != nil {
131131+ return "", fmt.Errorf("failed to create manifest record: %w", err)
132132+ }
133133+134134+ // Set the blob reference, hold DID, and hold endpoint
135135+ manifestRecord.ManifestBlob = blobRef
136136+ manifestRecord.HoldDID = s.ctx.HoldDID // Primary reference (DID)
137137+138138+ // Resolve hold endpoint from DID for backward compatibility
139139+ if holdEndpoint, err := resolveDIDToHTTPSEndpoint(s.ctx.HoldDID); err == nil {
140140+ manifestRecord.HoldEndpoint = holdEndpoint // Legacy reference (URL) for backward compat
141141+ }
142142+143143+ // Extract Dockerfile labels from config blob and add to annotations
144144+ // Only for image manifests (not manifest lists which don't have config blobs)
145145+ isManifestList := strings.Contains(manifestRecord.MediaType, "manifest.list") ||
146146+ strings.Contains(manifestRecord.MediaType, "image.index")
147147+148148+ if !isManifestList && s.blobStore != nil && manifestRecord.Config != nil && manifestRecord.Config.Digest != "" {
149149+ labels, err := s.extractConfigLabels(ctx, manifestRecord.Config.Digest)
150150+ if err != nil {
151151+ // Log error but don't fail the push - labels are optional
152152+ fmt.Printf("WARNING: Failed to extract config labels: %v\n", err)
153153+ } else {
154154+ // Initialize annotations map if needed
155155+ if manifestRecord.Annotations == nil {
156156+ manifestRecord.Annotations = make(map[string]string)
157157+ }
158158+159159+ // Copy labels to annotations (Dockerfile LABELs → manifest annotations)
160160+ maps.Copy(manifestRecord.Annotations, labels)
161161+162162+ fmt.Printf("DEBUG: Extracted %d labels from config blob\n", len(labels))
163163+ }
164164+ }
165165+166166+ // Store manifest record in ATProto
167167+ rkey := digestToRKey(dgst)
168168+ _, err = s.ctx.ATProtoClient.PutRecord(ctx, atproto.ManifestCollection, rkey, manifestRecord)
169169+ if err != nil {
170170+ return "", fmt.Errorf("failed to store manifest record in ATProto: %w", err)
171171+ }
172172+173173+ // Track push count (increment asynchronously to avoid blocking the response)
174174+ if s.ctx.Database != nil {
175175+ go func() {
176176+ if err := s.ctx.Database.IncrementPushCount(s.ctx.DID, s.ctx.Repository); err != nil {
177177+ fmt.Printf("WARNING: Failed to increment push count for %s/%s: %v\n", s.ctx.DID, s.ctx.Repository, err)
178178+ }
179179+ }()
180180+ }
181181+182182+ // Also handle tag if specified
183183+ var tag string
184184+ for _, option := range options {
185185+ if tagOpt, ok := option.(distribution.WithTagOption); ok {
186186+ tag = tagOpt.Tag
187187+ tagRecord := atproto.NewTagRecord(s.ctx.ATProtoClient.DID(), s.ctx.Repository, tag, dgst.String())
188188+ tagRKey := atproto.RepositoryTagToRKey(s.ctx.Repository, tag)
189189+ _, err = s.ctx.ATProtoClient.PutRecord(ctx, atproto.TagCollection, tagRKey, tagRecord)
190190+ if err != nil {
191191+ return "", fmt.Errorf("failed to store tag in ATProto: %w", err)
192192+ }
193193+ }
194194+ }
195195+196196+ // Notify hold about manifest upload (for layer tracking and Bluesky posts)
197197+ // Do this asynchronously to avoid blocking the push
198198+ if tag != "" && s.notifier != nil && s.ctx.Handle != "" {
199199+ go func() {
200200+ if err := s.notifyHoldAboutManifest(context.Background(), manifestRecord, tag, dgst.String()); err != nil {
201201+ fmt.Printf("WARNING: Failed to notify hold about manifest: %v\n", err)
202202+ }
203203+ }()
204204+ }
205205+206206+ return dgst, nil
207207+}
208208+209209+// Delete removes a manifest
210210+func (s *ManifestStore) Delete(ctx context.Context, dgst digest.Digest) error {
211211+ rkey := digestToRKey(dgst)
212212+ return s.ctx.ATProtoClient.DeleteRecord(ctx, atproto.ManifestCollection, rkey)
213213+}
214214+215215+// digestToRKey converts a digest to an ATProto record key
216216+// ATProto rkeys must be valid strings, so we use the digest string without the algorithm prefix
217217+func digestToRKey(dgst digest.Digest) string {
218218+ // Remove the algorithm prefix (e.g., "sha256:")
219219+ return dgst.Encoded()
220220+}
221221+222222+// GetLastFetchedHoldDID returns the hold DID from the most recently fetched manifest
223223+// This is used by the routing repository to cache the hold for blob requests
224224+func (s *ManifestStore) GetLastFetchedHoldDID() string {
225225+ return s.lastFetchedHoldDID
226226+}
227227+228228+// rawManifest is a simple implementation of distribution.Manifest
229229+type rawManifest struct {
230230+ mediaType string
231231+ payload []byte
232232+}
233233+234234+func (m *rawManifest) References() []distribution.Descriptor {
235235+ // TODO: Parse the manifest and return actual references
236236+ return nil
237237+}
238238+239239+func (m *rawManifest) Payload() (string, []byte, error) {
240240+ return m.mediaType, m.payload, nil
241241+}
242242+243243+// extractConfigLabels fetches the image config blob and extracts Dockerfile LABELs
244244+func (s *ManifestStore) extractConfigLabels(ctx context.Context, configDigestStr string) (map[string]string, error) {
245245+ // Parse digest string
246246+ configDigest, err := digest.Parse(configDigestStr)
247247+ if err != nil {
248248+ return nil, fmt.Errorf("invalid config digest: %w", err)
249249+ }
250250+251251+ // Fetch config blob from storage
252252+ configData, err := s.blobStore.Get(ctx, configDigest)
253253+ if err != nil {
254254+ return nil, fmt.Errorf("failed to fetch config blob: %w", err)
255255+ }
256256+257257+ // Parse config JSON
258258+ var configJSON struct {
259259+ Config struct {
260260+ Labels map[string]string `json:"Labels"`
261261+ } `json:"config"`
262262+ }
263263+264264+ if err := json.Unmarshal(configData, &configJSON); err != nil {
265265+ return nil, fmt.Errorf("failed to parse config JSON: %w", err)
266266+ }
267267+268268+ return configJSON.Config.Labels, nil
269269+}
270270+271271+// resolveDIDToHTTPSEndpoint resolves a DID to an HTTPS endpoint
272272+// Currently supports did:web only (e.g., did:web:hold01.atcr.io → https://hold01.atcr.io)
273273+func resolveDIDToHTTPSEndpoint(did string) (string, error) {
274274+ if !strings.HasPrefix(did, "did:web:") {
275275+ return "", fmt.Errorf("only did:web is supported, got: %s", did)
276276+ }
277277+278278+ // Extract hostname from did:web
279279+ hostname := strings.TrimPrefix(did, "did:web:")
280280+281281+ // Handle port notation (did:web:example.com:8080 → https://example.com:8080)
282282+ hostname = strings.ReplaceAll(hostname, ":", ":")
283283+284284+ return "https://" + hostname, nil
285285+}
286286+287287+// notifyHoldAboutManifest notifies the hold service about a manifest upload
288288+// This enables the hold to create layer records and Bluesky posts
289289+func (s *ManifestStore) notifyHoldAboutManifest(ctx context.Context, manifestRecord *atproto.ManifestRecord, tag, manifestDigest string) error {
290290+ // Skip if no notifier configured
291291+ if s.notifier == nil {
292292+ return nil
293293+ }
294294+295295+ // Resolve hold DID to HTTP endpoint
296296+ // For did:web, this is straightforward (e.g., did:web:hold01.atcr.io → https://hold01.atcr.io)
297297+ holdEndpoint, err := resolveDIDToHTTPSEndpoint(s.ctx.HoldDID)
298298+ if err != nil {
299299+ return fmt.Errorf("failed to resolve hold DID %s: %w", s.ctx.HoldDID, err)
300300+ }
301301+302302+ // Get service token from user's PDS for hold authentication
303303+ serviceToken, err := s.notifier.GetServiceToken(ctx, s.ctx.DID, s.ctx.HoldDID)
304304+ if err != nil {
305305+ return fmt.Errorf("failed to get service token: %w", err)
306306+ }
307307+308308+ // Build notification request
309309+ notifyReq := map[string]any{
310310+ "repository": s.ctx.Repository,
311311+ "tag": tag,
312312+ "userDid": s.ctx.DID,
313313+ "userHandle": s.ctx.Handle,
314314+ "manifest": map[string]any{
315315+ "mediaType": manifestRecord.MediaType,
316316+ "config": map[string]any{
317317+ "digest": manifestRecord.Config.Digest,
318318+ "size": manifestRecord.Config.Size,
319319+ },
320320+ "layers": func() []map[string]any {
321321+ layers := make([]map[string]any, len(manifestRecord.Layers))
322322+ for i, layer := range manifestRecord.Layers {
323323+ layers[i] = map[string]any{
324324+ "digest": layer.Digest,
325325+ "size": layer.Size,
326326+ "mediaType": layer.MediaType,
327327+ }
328328+ }
329329+ return layers
330330+ }(),
331331+ },
332332+ }
333333+334334+ // Marshal request
335335+ reqBody, err := json.Marshal(notifyReq)
336336+ if err != nil {
337337+ return fmt.Errorf("failed to marshal notification request: %w", err)
338338+ }
339339+340340+ // Send notification to hold
341341+ req, err := http.NewRequestWithContext(
342342+ ctx,
343343+ "POST",
344344+ holdEndpoint+atproto.HoldNotifyManifest,
345345+ bytes.NewReader(reqBody),
346346+ )
347347+ if err != nil {
348348+ return fmt.Errorf("failed to create HTTP request: %w", err)
349349+ }
350350+351351+ req.Header.Set("Content-Type", "application/json")
352352+ req.Header.Set("Authorization", "Bearer "+serviceToken)
353353+354354+ resp, err := http.DefaultClient.Do(req)
355355+ if err != nil {
356356+ return fmt.Errorf("failed to send notification: %w", err)
357357+ }
358358+ defer resp.Body.Close()
359359+360360+ if resp.StatusCode != http.StatusOK {
361361+ body, _ := io.ReadAll(resp.Body)
362362+ return fmt.Errorf("hold notification failed: status %d, body: %s", resp.StatusCode, body)
363363+ }
364364+365365+ // Parse response (optional logging)
366366+ var notifyResp map[string]any
367367+ if err := json.NewDecoder(resp.Body).Decode(¬ifyResp); err == nil {
368368+ fmt.Printf("INFO: Hold notification successful for %s:%s - %+v\n", s.ctx.Repository, tag, notifyResp)
369369+ }
370370+371371+ return nil
372372+}
+10-11
pkg/appview/storage/proxy_blob_store.go
···564564565565// ProxyBlobWriter implements distribution.BlobWriter for proxy uploads using multipart upload
566566type ProxyBlobWriter struct {
567567- store *ProxyBlobStore
568568- options distribution.CreateOptions
569569- uploadID string // S3 multipart upload ID
570570- parts []CompletedPart // Track uploaded parts with ETags
571571- partNumber int // Current part number (starts at 1)
572572- buffer *bytes.Buffer // Buffer for current part
573573- size int64 // Total bytes written
574574- closed bool
575575- id string // Distribution's upload ID (for state)
576576- startedAt time.Time
577577- finalDigest string // Set on Commit
567567+ store *ProxyBlobStore
568568+ options distribution.CreateOptions
569569+ uploadID string // S3 multipart upload ID
570570+ parts []CompletedPart // Track uploaded parts with ETags
571571+ partNumber int // Current part number (starts at 1)
572572+ buffer *bytes.Buffer // Buffer for current part
573573+ size int64 // Total bytes written
574574+ closed bool
575575+ id string // Distribution's upload ID (for state)
576576+ startedAt time.Time
578577}
579578580579// ID returns the upload ID
+1-2
pkg/appview/storage/proxy_blob_store_test.go
···313313 testTokenStr := "eyJhbGciOiJIUzI1NiJ9." + base64URLEncode(testPayload) + ".signature"
314314 token.SetServiceToken(userDID, holdDID, testTokenStr)
315315316316- b.ResetTimer()
317317- for i := 0; i < b.N; i++ {
316316+ for b.Loop() {
318317 cachedToken, expiresAt := token.GetServiceToken(userDID, holdDID)
319318320319 if cachedToken == "" || time.Now().After(expiresAt) {
+62-16
pkg/appview/storage/routing_repository.go
···2233import (
44 "context"
55+ "encoding/json"
56 "fmt"
77+ "io"
88+ "net/http"
69 "time"
71088- "atcr.io/pkg/atproto"
1111+ "atcr.io/pkg/auth/oauth"
912 "github.com/distribution/distribution/v3"
1013)
1114···1316// The registry (AppView) is stateless and NEVER stores blobs locally
1417type RoutingRepository struct {
1518 distribution.Repository
1616- Ctx *RegistryContext // All context and services (exported for token updates)
1717- manifestStore *atproto.ManifestStore // Cached manifest store instance
1818- blobStore *ProxyBlobStore // Cached blob store instance
1919+ Ctx *RegistryContext // All context and services (exported for token updates)
2020+ manifestStore *ManifestStore // Cached manifest store instance
2121+ blobStore *ProxyBlobStore // Cached blob store instance
2222+}
2323+2424+// refresherAdapter adapts the oauth.Refresher to implement atproto.HoldNotifier
2525+type refresherAdapter struct {
2626+ refresher *oauth.Refresher
2727+ pdsEndpoint string
2828+}
2929+3030+// GetServiceToken implements atproto.HoldNotifier
3131+func (r *refresherAdapter) GetServiceToken(ctx context.Context, userDID, audienceDID string) (string, error) {
3232+ // Get OAuth session for the user
3333+ session, err := r.refresher.GetSession(ctx, userDID)
3434+ if err != nil {
3535+ return "", fmt.Errorf("failed to get OAuth session: %w", err)
3636+ }
3737+3838+ // Build service auth URL
3939+ serviceAuthURL := fmt.Sprintf("%s/xrpc/com.atproto.server.getServiceAuth?aud=%s", r.pdsEndpoint, audienceDID)
4040+4141+ req, err := http.NewRequestWithContext(ctx, "GET", serviceAuthURL, nil)
4242+ if err != nil {
4343+ return "", fmt.Errorf("failed to create request: %w", err)
4444+ }
4545+4646+ // Use session's DoWithAuth to handle OAuth authentication automatically
4747+ resp, err := session.DoWithAuth(session.Client, req, "com.atproto.server.getServiceAuth")
4848+ if err != nil {
4949+ return "", fmt.Errorf("failed to request service token: %w", err)
5050+ }
5151+ defer resp.Body.Close()
5252+5353+ if resp.StatusCode != http.StatusOK {
5454+ body, _ := io.ReadAll(resp.Body)
5555+ return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, body)
5656+ }
5757+5858+ var result struct {
5959+ Token string `json:"token"`
6060+ }
6161+ if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
6262+ return "", fmt.Errorf("failed to decode response: %w", err)
6363+ }
6464+6565+ return result.Token, nil
1966}
20672168// NewRoutingRepository creates a new routing repository
···3380 // Ensure blob store is created first (needed for label extraction during push)
3481 blobStore := r.Blobs(ctx)
35823636- // ManifestStore needs both DID and URL for backward compat (legacy holdEndpoint field)
3737- // For now, pass holdDID twice (will be cleaned up in manifest_store.go later)
3838- r.manifestStore = atproto.NewManifestStore(
3939- r.Ctx.ATProtoClient,
4040- r.Ctx.Repository,
4141- r.Ctx.HoldDID,
4242- r.Ctx.HoldDID,
4343- r.Ctx.DID,
4444- blobStore,
4545- r.Ctx.Database,
4646- )
8383+ // Wrap the Refresher in an adapter to implement HoldNotifier
8484+ var notifier HoldNotifier
8585+ if r.Ctx.Refresher != nil {
8686+ notifier = &refresherAdapter{
8787+ refresher: r.Ctx.Refresher,
8888+ pdsEndpoint: r.Ctx.PDSEndpoint,
8989+ }
9090+ }
9191+9292+ r.manifestStore = NewManifestStore(r.Ctx, notifier, blobStore)
4793 }
48944995 // After any manifest operation, cache the hold DID for blob fetches
···102148// Tags returns the tag service
103149// Tags are stored in ATProto as io.atcr.tag records
104150func (r *RoutingRepository) Tags(ctx context.Context) distribution.TagService {
105105- return atproto.NewTagStore(r.Ctx.ATProtoClient, r.Ctx.Repository)
151151+ return NewTagStore(r.Ctx.ATProtoClient, r.Ctx.Repository)
106152}
···300300 }
301301302302 cw := cbg.NewCborWriter(w)
303303- fieldCount := 7
303303+ fieldCount := 8
304304305305 if t.Region == "" {
306306 fieldCount--
···466466 if err := cbg.WriteBool(w, t.AllowAllCrew); err != nil {
467467 return err
468468 }
469469+470470+ // t.EnableManifestPosts (bool) (bool)
471471+ if len("enableManifestPosts") > 8192 {
472472+ return xerrors.Errorf("Value in field \"enableManifestPosts\" was too long")
473473+ }
474474+475475+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("enableManifestPosts"))); err != nil {
476476+ return err
477477+ }
478478+ if _, err := cw.WriteString(string("enableManifestPosts")); err != nil {
479479+ return err
480480+ }
481481+482482+ if err := cbg.WriteBool(w, t.EnableManifestPosts); err != nil {
483483+ return err
484484+ }
469485 return nil
470486}
471487···494510495511 n := extra
496512497497- nameBuf := make([]byte, 12)
513513+ nameBuf := make([]byte, 19)
498514 for i := uint64(0); i < n; i++ {
499515 nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 8192)
500516 if err != nil {
···600616 t.AllowAllCrew = true
601617 default:
602618 return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra)
619619+ }
620620+ // t.EnableManifestPosts (bool) (bool)
621621+ case "enableManifestPosts":
622622+623623+ maj, extra, err = cr.ReadHeader()
624624+ if err != nil {
625625+ return err
626626+ }
627627+ if maj != cbg.MajOther {
628628+ return fmt.Errorf("booleans must be major type 7")
629629+ }
630630+ switch extra {
631631+ case 20:
632632+ t.EnableManifestPosts = false
633633+ case 21:
634634+ t.EnableManifestPosts = true
635635+ default:
636636+ return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra)
637637+ }
638638+639639+ default:
640640+ // Field doesn't exist on this type, so ignore it
641641+ if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil {
642642+ return err
643643+ }
644644+ }
645645+ }
646646+647647+ return nil
648648+}
649649+func (t *LayerRecord) MarshalCBOR(w io.Writer) error {
650650+ if t == nil {
651651+ _, err := w.Write(cbg.CborNull)
652652+ return err
653653+ }
654654+655655+ cw := cbg.NewCborWriter(w)
656656+657657+ if _, err := cw.Write([]byte{168}); err != nil {
658658+ return err
659659+ }
660660+661661+ // t.Size (int64) (int64)
662662+ if len("size") > 8192 {
663663+ return xerrors.Errorf("Value in field \"size\" was too long")
664664+ }
665665+666666+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("size"))); err != nil {
667667+ return err
668668+ }
669669+ if _, err := cw.WriteString(string("size")); err != nil {
670670+ return err
671671+ }
672672+673673+ if t.Size >= 0 {
674674+ if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Size)); err != nil {
675675+ return err
676676+ }
677677+ } else {
678678+ if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Size-1)); err != nil {
679679+ return err
680680+ }
681681+ }
682682+683683+ // t.Type (string) (string)
684684+ if len("$type") > 8192 {
685685+ return xerrors.Errorf("Value in field \"$type\" was too long")
686686+ }
687687+688688+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("$type"))); err != nil {
689689+ return err
690690+ }
691691+ if _, err := cw.WriteString(string("$type")); err != nil {
692692+ return err
693693+ }
694694+695695+ if len(t.Type) > 8192 {
696696+ return xerrors.Errorf("Value in field t.Type was too long")
697697+ }
698698+699699+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Type))); err != nil {
700700+ return err
701701+ }
702702+ if _, err := cw.WriteString(string(t.Type)); err != nil {
703703+ return err
704704+ }
705705+706706+ // t.Digest (string) (string)
707707+ if len("digest") > 8192 {
708708+ return xerrors.Errorf("Value in field \"digest\" was too long")
709709+ }
710710+711711+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("digest"))); err != nil {
712712+ return err
713713+ }
714714+ if _, err := cw.WriteString(string("digest")); err != nil {
715715+ return err
716716+ }
717717+718718+ if len(t.Digest) > 8192 {
719719+ return xerrors.Errorf("Value in field t.Digest was too long")
720720+ }
721721+722722+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Digest))); err != nil {
723723+ return err
724724+ }
725725+ if _, err := cw.WriteString(string(t.Digest)); err != nil {
726726+ return err
727727+ }
728728+729729+ // t.UserDID (string) (string)
730730+ if len("userDid") > 8192 {
731731+ return xerrors.Errorf("Value in field \"userDid\" was too long")
732732+ }
733733+734734+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("userDid"))); err != nil {
735735+ return err
736736+ }
737737+ if _, err := cw.WriteString(string("userDid")); err != nil {
738738+ return err
739739+ }
740740+741741+ if len(t.UserDID) > 8192 {
742742+ return xerrors.Errorf("Value in field t.UserDID was too long")
743743+ }
744744+745745+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.UserDID))); err != nil {
746746+ return err
747747+ }
748748+ if _, err := cw.WriteString(string(t.UserDID)); err != nil {
749749+ return err
750750+ }
751751+752752+ // t.CreatedAt (string) (string)
753753+ if len("createdAt") > 8192 {
754754+ return xerrors.Errorf("Value in field \"createdAt\" was too long")
755755+ }
756756+757757+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("createdAt"))); err != nil {
758758+ return err
759759+ }
760760+ if _, err := cw.WriteString(string("createdAt")); err != nil {
761761+ return err
762762+ }
763763+764764+ if len(t.CreatedAt) > 8192 {
765765+ return xerrors.Errorf("Value in field t.CreatedAt was too long")
766766+ }
767767+768768+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.CreatedAt))); err != nil {
769769+ return err
770770+ }
771771+ if _, err := cw.WriteString(string(t.CreatedAt)); err != nil {
772772+ return err
773773+ }
774774+775775+ // t.MediaType (string) (string)
776776+ if len("mediaType") > 8192 {
777777+ return xerrors.Errorf("Value in field \"mediaType\" was too long")
778778+ }
779779+780780+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("mediaType"))); err != nil {
781781+ return err
782782+ }
783783+ if _, err := cw.WriteString(string("mediaType")); err != nil {
784784+ return err
785785+ }
786786+787787+ if len(t.MediaType) > 8192 {
788788+ return xerrors.Errorf("Value in field t.MediaType was too long")
789789+ }
790790+791791+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.MediaType))); err != nil {
792792+ return err
793793+ }
794794+ if _, err := cw.WriteString(string(t.MediaType)); err != nil {
795795+ return err
796796+ }
797797+798798+ // t.Repository (string) (string)
799799+ if len("repository") > 8192 {
800800+ return xerrors.Errorf("Value in field \"repository\" was too long")
801801+ }
802802+803803+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("repository"))); err != nil {
804804+ return err
805805+ }
806806+ if _, err := cw.WriteString(string("repository")); err != nil {
807807+ return err
808808+ }
809809+810810+ if len(t.Repository) > 8192 {
811811+ return xerrors.Errorf("Value in field t.Repository was too long")
812812+ }
813813+814814+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Repository))); err != nil {
815815+ return err
816816+ }
817817+ if _, err := cw.WriteString(string(t.Repository)); err != nil {
818818+ return err
819819+ }
820820+821821+ // t.UserHandle (string) (string)
822822+ if len("userHandle") > 8192 {
823823+ return xerrors.Errorf("Value in field \"userHandle\" was too long")
824824+ }
825825+826826+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("userHandle"))); err != nil {
827827+ return err
828828+ }
829829+ if _, err := cw.WriteString(string("userHandle")); err != nil {
830830+ return err
831831+ }
832832+833833+ if len(t.UserHandle) > 8192 {
834834+ return xerrors.Errorf("Value in field t.UserHandle was too long")
835835+ }
836836+837837+ if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.UserHandle))); err != nil {
838838+ return err
839839+ }
840840+ if _, err := cw.WriteString(string(t.UserHandle)); err != nil {
841841+ return err
842842+ }
843843+ return nil
844844+}
845845+846846+func (t *LayerRecord) UnmarshalCBOR(r io.Reader) (err error) {
847847+ *t = LayerRecord{}
848848+849849+ cr := cbg.NewCborReader(r)
850850+851851+ maj, extra, err := cr.ReadHeader()
852852+ if err != nil {
853853+ return err
854854+ }
855855+ defer func() {
856856+ if err == io.EOF {
857857+ err = io.ErrUnexpectedEOF
858858+ }
859859+ }()
860860+861861+ if maj != cbg.MajMap {
862862+ return fmt.Errorf("cbor input should be of type map")
863863+ }
864864+865865+ if extra > cbg.MaxLength {
866866+ return fmt.Errorf("LayerRecord: map struct too large (%d)", extra)
867867+ }
868868+869869+ n := extra
870870+871871+ nameBuf := make([]byte, 10)
872872+ for i := uint64(0); i < n; i++ {
873873+ nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 8192)
874874+ if err != nil {
875875+ return err
876876+ }
877877+878878+ if !ok {
879879+ // Field doesn't exist on this type, so ignore it
880880+ if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil {
881881+ return err
882882+ }
883883+ continue
884884+ }
885885+886886+ switch string(nameBuf[:nameLen]) {
887887+ // t.Size (int64) (int64)
888888+ case "size":
889889+ {
890890+ maj, extra, err := cr.ReadHeader()
891891+ if err != nil {
892892+ return err
893893+ }
894894+ var extraI int64
895895+ switch maj {
896896+ case cbg.MajUnsignedInt:
897897+ extraI = int64(extra)
898898+ if extraI < 0 {
899899+ return fmt.Errorf("int64 positive overflow")
900900+ }
901901+ case cbg.MajNegativeInt:
902902+ extraI = int64(extra)
903903+ if extraI < 0 {
904904+ return fmt.Errorf("int64 negative overflow")
905905+ }
906906+ extraI = -1 - extraI
907907+ default:
908908+ return fmt.Errorf("wrong type for int64 field: %d", maj)
909909+ }
910910+911911+ t.Size = int64(extraI)
912912+ }
913913+ // t.Type (string) (string)
914914+ case "$type":
915915+916916+ {
917917+ sval, err := cbg.ReadStringWithMax(cr, 8192)
918918+ if err != nil {
919919+ return err
920920+ }
921921+922922+ t.Type = string(sval)
923923+ }
924924+ // t.Digest (string) (string)
925925+ case "digest":
926926+927927+ {
928928+ sval, err := cbg.ReadStringWithMax(cr, 8192)
929929+ if err != nil {
930930+ return err
931931+ }
932932+933933+ t.Digest = string(sval)
934934+ }
935935+ // t.UserDID (string) (string)
936936+ case "userDid":
937937+938938+ {
939939+ sval, err := cbg.ReadStringWithMax(cr, 8192)
940940+ if err != nil {
941941+ return err
942942+ }
943943+944944+ t.UserDID = string(sval)
945945+ }
946946+ // t.CreatedAt (string) (string)
947947+ case "createdAt":
948948+949949+ {
950950+ sval, err := cbg.ReadStringWithMax(cr, 8192)
951951+ if err != nil {
952952+ return err
953953+ }
954954+955955+ t.CreatedAt = string(sval)
956956+ }
957957+ // t.MediaType (string) (string)
958958+ case "mediaType":
959959+960960+ {
961961+ sval, err := cbg.ReadStringWithMax(cr, 8192)
962962+ if err != nil {
963963+ return err
964964+ }
965965+966966+ t.MediaType = string(sval)
967967+ }
968968+ // t.Repository (string) (string)
969969+ case "repository":
970970+971971+ {
972972+ sval, err := cbg.ReadStringWithMax(cr, 8192)
973973+ if err != nil {
974974+ return err
975975+ }
976976+977977+ t.Repository = string(sval)
978978+ }
979979+ // t.UserHandle (string) (string)
980980+ case "userHandle":
981981+982982+ {
983983+ sval, err := cbg.ReadStringWithMax(cr, 8192)
984984+ if err != nil {
985985+ return err
986986+ }
987987+988988+ t.UserHandle = string(sval)
603989 }
604990605991 default:
+4-4
pkg/atproto/client_test.go
···3434 name string
3535 collection string
3636 rkey string
3737- record interface{}
3737+ record any
3838 serverResponse string
3939 serverStatus int
4040 wantErr bool
···9393 }
94949595 // Verify request body
9696- var body map[string]interface{}
9696+ var body map[string]any
9797 if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
9898 t.Errorf("Failed to decode request body: %v", err)
9999 }
···158158 t.Errorf("URI = %v, want at://did:plc:test123/io.atcr.manifest/abc123", r.URI)
159159 }
160160161161- var value map[string]interface{}
161161+ var value map[string]any
162162 if err := json.Unmarshal(r.Value, &value); err != nil {
163163 t.Errorf("Failed to unmarshal value: %v", err)
164164 }
···290290 }
291291292292 // Verify request body
293293- var body map[string]interface{}
293293+ var body map[string]any
294294 if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
295295 t.Errorf("Failed to decode request body: %v", err)
296296 }
+6
pkg/atproto/endpoints.go
···3939 // Request: {"uploadId": "..."}
4040 // Response: {"status": "aborted"}
4141 HoldAbortUpload = "/xrpc/io.atcr.hold.abortUpload"
4242+4343+ // HoldNotifyManifest notifies hold about a manifest upload for layer tracking and Bluesky posting.
4444+ // Method: POST
4545+ // Request: {"repository": "...", "tag": "...", "userDid": "...", "userHandle": "...", "manifest": {...}}
4646+ // Response: {"success": true, "layersCreated": 5, "postCreated": true, "postUri": "at://..."}
4747+ HoldNotifyManifest = "/xrpc/io.atcr.hold.notifyManifest"
4248)
43494450// Hold service crew management endpoints (io.atcr.hold.*)
+2-1
pkg/atproto/generate.go
···2525)
26262727func main() {
2828- // Generate map-style encoders for CrewRecord, CaptainRecord, and TangledProfileRecord
2828+ // Generate map-style encoders for CrewRecord, CaptainRecord, LayerRecord, and TangledProfileRecord
2929 if err := cbg.WriteMapEncodersToFile("cbor_gen.go", "atproto",
3030 atproto.CrewRecord{},
3131 atproto.CaptainRecord{},
3232+ atproto.LayerRecord{},
3233 atproto.TangledProfileRecord{},
3334 ); err != nil {
3435 fmt.Printf("Failed to generate CBOR encoders: %v\n", err)
+75-7
pkg/atproto/lexicon.go
···3434 // Note: Uses same collection name as HoldCrewCollection but stored in different PDS (hold's PDS vs owner's PDS)
3535 CrewCollection = "io.atcr.hold.crew"
36363737+ // LayerCollection is the collection name for container layer metadata
3838+ // Stored in hold's embedded PDS to track which layers are stored
3939+ LayerCollection = "io.atcr.hold.layer"
4040+3741 // TangledProfileCollection is the collection name for tangled profiles
3842 // Stored in hold's embedded PDS (singleton record at rkey "self")
3943 TangledProfileCollection = "sh.tangled.actor.profile"
···434438 return len(s) > 4 && s[:4] == "did:"
435439}
436440441441+// RepositoryTagToRKey converts a repository and tag to an ATProto record key
442442+// ATProto record keys must match: ^[a-zA-Z0-9._~-]{1,512}$
443443+func RepositoryTagToRKey(repository, tag string) string {
444444+ // Combine repository and tag to create a unique key
445445+ // Replace invalid characters: slashes become tildes (~)
446446+ // We use tilde instead of dash to avoid ambiguity with repository names that contain hyphens
447447+ key := fmt.Sprintf("%s_%s", repository, tag)
448448+449449+ // Replace / with ~ (slash not allowed in rkeys, tilde is allowed and unlikely in repo names)
450450+ key = strings.ReplaceAll(key, "/", "~")
451451+452452+ return key
453453+}
454454+455455+// RKeyToRepositoryTag converts an ATProto record key back to repository and tag
456456+// This is the inverse of RepositoryTagToRKey
457457+// Note: If the tag contains underscores, this will split on the LAST underscore
458458+func RKeyToRepositoryTag(rkey string) (repository, tag string) {
459459+ // Find the last underscore to split repository and tag
460460+ lastUnderscore := strings.LastIndex(rkey, "_")
461461+ if lastUnderscore == -1 {
462462+ // No underscore found - treat entire string as tag with empty repository
463463+ return "", rkey
464464+ }
465465+466466+ repository = rkey[:lastUnderscore]
467467+ tag = rkey[lastUnderscore+1:]
468468+469469+ // Convert tildes back to slashes in repository (tilde was used to encode slashes)
470470+ repository = strings.ReplaceAll(repository, "~", "/")
471471+472472+ return repository, tag
473473+}
474474+437475// BuildManifestURI creates an AT-URI for a manifest record
438476// did: The DID of the user (e.g., "did:plc:xyz123")
439477// manifestDigest: The manifest digest (e.g., "sha256:abc123...")
···498536// Stored in the hold's embedded PDS to identify the hold owner and settings
499537// Uses CBOR encoding for efficient storage in hold's carstore
500538type CaptainRecord struct {
501501- Type string `json:"$type" cborgen:"$type"`
502502- Owner string `json:"owner" cborgen:"owner"` // DID of hold owner
503503- Public bool `json:"public" cborgen:"public"` // Public read access
504504- AllowAllCrew bool `json:"allowAllCrew" cborgen:"allowAllCrew"` // Allow any authenticated user to register as crew
505505- DeployedAt string `json:"deployedAt" cborgen:"deployedAt"` // RFC3339 timestamp
506506- Region string `json:"region,omitempty" cborgen:"region,omitempty"` // S3 region (optional)
507507- Provider string `json:"provider,omitempty" cborgen:"provider,omitempty"` // Deployment provider (optional)
539539+ Type string `json:"$type" cborgen:"$type"`
540540+ Owner string `json:"owner" cborgen:"owner"` // DID of hold owner
541541+ Public bool `json:"public" cborgen:"public"` // Public read access
542542+ AllowAllCrew bool `json:"allowAllCrew" cborgen:"allowAllCrew"` // Allow any authenticated user to register as crew
543543+ EnableManifestPosts bool `json:"enableManifestPosts" cborgen:"enableManifestPosts"` // Enable Bluesky posts when manifests are pushed (overrides env var)
544544+ DeployedAt string `json:"deployedAt" cborgen:"deployedAt"` // RFC3339 timestamp
545545+ Region string `json:"region,omitempty" cborgen:"region,omitempty"` // S3 region (optional)
546546+ Provider string `json:"provider,omitempty" cborgen:"provider,omitempty"` // Deployment provider (optional)
508547}
509548510549// CrewRecord represents a crew member in the hold
···518557 Role string `json:"role" cborgen:"role"`
519558 Permissions []string `json:"permissions" cborgen:"permissions"`
520559 AddedAt string `json:"addedAt" cborgen:"addedAt"` // RFC3339 timestamp
560560+}
561561+562562+// LayerRecord represents metadata about a container layer stored in the hold
563563+// Collection: io.atcr.hold.layer
564564+// Stored in the hold's embedded PDS for tracking and analytics
565565+// Uses CBOR encoding for efficient storage in hold's carstore
566566+type LayerRecord struct {
567567+ Type string `json:"$type" cborgen:"$type"`
568568+ Digest string `json:"digest" cborgen:"digest"` // Layer digest (e.g., "sha256:abc123...")
569569+ Size int64 `json:"size" cborgen:"size"` // Size in bytes
570570+ MediaType string `json:"mediaType" cborgen:"mediaType"` // Media type (e.g., "application/vnd.oci.image.layer.v1.tar+gzip")
571571+ Repository string `json:"repository" cborgen:"repository"` // Repository this layer belongs to
572572+ UserDID string `json:"userDid" cborgen:"userDid"` // DID of user who uploaded this layer
573573+ UserHandle string `json:"userHandle" cborgen:"userHandle"` // Handle of user (for display purposes)
574574+ CreatedAt string `json:"createdAt" cborgen:"createdAt"` // RFC3339 timestamp
575575+}
576576+577577+// NewLayerRecord creates a new layer record
578578+func NewLayerRecord(digest string, size int64, mediaType, repository, userDID, userHandle string) *LayerRecord {
579579+ return &LayerRecord{
580580+ Type: LayerCollection,
581581+ Digest: digest,
582582+ Size: size,
583583+ MediaType: mediaType,
584584+ Repository: repository,
585585+ UserDID: userDID,
586586+ UserHandle: userHandle,
587587+ CreatedAt: time.Now().Format(time.RFC3339),
588588+ }
521589}
522590523591// TangledProfileRecord represents a Tangled profile for the hold
-293
pkg/atproto/manifest_store.go
···11-package atproto
22-33-import (
44- "context"
55- "encoding/json"
66- "errors"
77- "fmt"
88- "maps"
99- "strings"
1010-1111- "github.com/distribution/distribution/v3"
1212- "github.com/opencontainers/go-digest"
1313-)
1414-1515-// DatabaseMetrics interface for tracking push and pull counts
1616-type DatabaseMetrics interface {
1717- IncrementPushCount(did, repository string) error
1818- IncrementPullCount(did, repository string) error
1919-}
2020-2121-// ManifestStore implements distribution.ManifestService
2222-// It stores manifests in ATProto as records
2323-type ManifestStore struct {
2424- client *Client
2525- repository string
2626- holdEndpoint string // Hold service endpoint URL (for legacy, to be deprecated)
2727- holdDID string // Hold service DID (primary reference)
2828- did string // User's DID for cache key
2929- lastFetchedHoldDID string // Hold DID from most recently fetched manifest (for pull)
3030- blobStore distribution.BlobStore // Blob store for fetching config during push
3131- database DatabaseMetrics // Database for metrics tracking
3232-}
3333-3434-// NewManifestStore creates a new ATProto-backed manifest store
3535-func NewManifestStore(client *Client, repository string, holdEndpoint string, holdDID string, did string, blobStore distribution.BlobStore, database DatabaseMetrics) *ManifestStore {
3636- return &ManifestStore{
3737- client: client,
3838- repository: repository,
3939- holdEndpoint: holdEndpoint,
4040- holdDID: holdDID,
4141- did: did,
4242- blobStore: blobStore,
4343- database: database,
4444- }
4545-}
4646-4747-// Exists checks if a manifest exists by digest
4848-func (s *ManifestStore) Exists(ctx context.Context, dgst digest.Digest) (bool, error) {
4949- rkey := digestToRKey(dgst)
5050- _, err := s.client.GetRecord(ctx, ManifestCollection, rkey)
5151- if err != nil {
5252- // If not found, return false without error
5353- if errors.Is(err, ErrRecordNotFound) {
5454- return false, nil
5555- }
5656- return false, err
5757- }
5858- return true, nil
5959-}
6060-6161-// Get retrieves a manifest by digest
6262-func (s *ManifestStore) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
6363- rkey := digestToRKey(dgst)
6464- record, err := s.client.GetRecord(ctx, ManifestCollection, rkey)
6565- if err != nil {
6666- return nil, distribution.ErrManifestUnknownRevision{
6767- Name: s.repository,
6868- Revision: dgst,
6969- }
7070- }
7171-7272- var manifestRecord ManifestRecord
7373- if err := json.Unmarshal(record.Value, &manifestRecord); err != nil {
7474- return nil, fmt.Errorf("failed to unmarshal manifest record: %w", err)
7575- }
7676-7777- // Store the hold DID for subsequent blob requests during pull
7878- // Prefer HoldDID (new format) with fallback to HoldEndpoint (legacy URL format)
7979- // The routing repository will cache this for concurrent blob fetches
8080- if manifestRecord.HoldDID != "" {
8181- // New format: DID reference (preferred)
8282- s.lastFetchedHoldDID = manifestRecord.HoldDID
8383- } else if manifestRecord.HoldEndpoint != "" {
8484- // Legacy format: URL reference - convert to DID
8585- s.lastFetchedHoldDID = ResolveHoldDIDFromURL(manifestRecord.HoldEndpoint)
8686- }
8787-8888- var ociManifest []byte
8989-9090- // New records: Download blob from ATProto blob storage
9191- if manifestRecord.ManifestBlob != nil && manifestRecord.ManifestBlob.Ref.Link != "" {
9292- ociManifest, err = s.client.GetBlob(ctx, manifestRecord.ManifestBlob.Ref.Link)
9393- if err != nil {
9494- return nil, fmt.Errorf("failed to download manifest blob: %w", err)
9595- }
9696- }
9797-9898- // Track pull count (increment asynchronously to avoid blocking the response)
9999- if s.database != nil {
100100- go func() {
101101- if err := s.database.IncrementPullCount(s.did, s.repository); err != nil {
102102- fmt.Printf("WARNING: Failed to increment pull count for %s/%s: %v\n", s.did, s.repository, err)
103103- }
104104- }()
105105- }
106106-107107- // Parse the manifest based on media type
108108- // For now, we'll return the raw bytes wrapped in a manifest object
109109- // In a full implementation, you'd use distribution's manifest parsing
110110- return &rawManifest{
111111- mediaType: manifestRecord.MediaType,
112112- payload: ociManifest,
113113- }, nil
114114-}
115115-116116-// Put stores a manifest
117117-func (s *ManifestStore) Put(ctx context.Context, manifest distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
118118- // Get the manifest payload (raw bytes)
119119- mediaType, payload, err := manifest.Payload()
120120- if err != nil {
121121- return "", err
122122- }
123123-124124- // Calculate digest
125125- dgst := digest.FromBytes(payload)
126126-127127- // Upload manifest as blob to PDS
128128- blobRef, err := s.client.UploadBlob(ctx, payload, mediaType)
129129- if err != nil {
130130- return "", fmt.Errorf("failed to upload manifest blob: %w", err)
131131- }
132132-133133- // Create manifest record with structured metadata
134134- manifestRecord, err := NewManifestRecord(s.repository, dgst.String(), payload)
135135- if err != nil {
136136- return "", fmt.Errorf("failed to create manifest record: %w", err)
137137- }
138138-139139- // Set the blob reference, hold DID, and hold endpoint
140140- manifestRecord.ManifestBlob = blobRef
141141- manifestRecord.HoldDID = s.holdDID // Primary reference (DID)
142142- manifestRecord.HoldEndpoint = s.holdEndpoint // Legacy reference (URL) for backward compat
143143-144144- // Extract Dockerfile labels from config blob and add to annotations
145145- // Only for image manifests (not manifest lists which don't have config blobs)
146146- isManifestList := strings.Contains(manifestRecord.MediaType, "manifest.list") ||
147147- strings.Contains(manifestRecord.MediaType, "image.index")
148148-149149- if !isManifestList && s.blobStore != nil && manifestRecord.Config != nil && manifestRecord.Config.Digest != "" {
150150- labels, err := s.extractConfigLabels(ctx, manifestRecord.Config.Digest)
151151- if err != nil {
152152- // Log error but don't fail the push - labels are optional
153153- fmt.Printf("WARNING: Failed to extract config labels: %v\n", err)
154154- } else {
155155- // Initialize annotations map if needed
156156- if manifestRecord.Annotations == nil {
157157- manifestRecord.Annotations = make(map[string]string)
158158- }
159159-160160- // Copy labels to annotations (Dockerfile LABELs → manifest annotations)
161161- maps.Copy(manifestRecord.Annotations, labels)
162162-163163- fmt.Printf("DEBUG: Extracted %d labels from config blob\n", len(labels))
164164- }
165165- }
166166-167167- // Store manifest record in ATProto
168168- rkey := digestToRKey(dgst)
169169- _, err = s.client.PutRecord(ctx, ManifestCollection, rkey, manifestRecord)
170170- if err != nil {
171171- return "", fmt.Errorf("failed to store manifest record in ATProto: %w", err)
172172- }
173173-174174- // Track push count (increment asynchronously to avoid blocking the response)
175175- if s.database != nil {
176176- go func() {
177177- if err := s.database.IncrementPushCount(s.did, s.repository); err != nil {
178178- fmt.Printf("WARNING: Failed to increment push count for %s/%s: %v\n", s.did, s.repository, err)
179179- }
180180- }()
181181- }
182182-183183- // Also handle tag if specified
184184- for _, option := range options {
185185- if tagOpt, ok := option.(distribution.WithTagOption); ok {
186186- tag := tagOpt.Tag
187187- tagRecord := NewTagRecord(s.client.DID(), s.repository, tag, dgst.String())
188188- tagRKey := RepositoryTagToRKey(s.repository, tag)
189189- _, err = s.client.PutRecord(ctx, TagCollection, tagRKey, tagRecord)
190190- if err != nil {
191191- return "", fmt.Errorf("failed to store tag in ATProto: %w", err)
192192- }
193193- }
194194- }
195195-196196- return dgst, nil
197197-}
198198-199199-// Delete removes a manifest
200200-func (s *ManifestStore) Delete(ctx context.Context, dgst digest.Digest) error {
201201- rkey := digestToRKey(dgst)
202202- return s.client.DeleteRecord(ctx, ManifestCollection, rkey)
203203-}
204204-205205-// digestToRKey converts a digest to an ATProto record key
206206-// ATProto rkeys must be valid strings, so we use the digest string without the algorithm prefix
207207-func digestToRKey(dgst digest.Digest) string {
208208- // Remove the algorithm prefix (e.g., "sha256:")
209209- return dgst.Encoded()
210210-}
211211-212212-// RepositoryTagToRKey converts a repository and tag to an ATProto record key
213213-// ATProto record keys must match: ^[a-zA-Z0-9._~-]{1,512}$
214214-func RepositoryTagToRKey(repository, tag string) string {
215215- // Combine repository and tag to create a unique key
216216- // Replace invalid characters: slashes become tildes (~)
217217- // We use tilde instead of dash to avoid ambiguity with repository names that contain hyphens
218218- key := fmt.Sprintf("%s_%s", repository, tag)
219219-220220- // Replace / with ~ (slash not allowed in rkeys, tilde is allowed and unlikely in repo names)
221221- key = strings.ReplaceAll(key, "/", "~")
222222-223223- return key
224224-}
225225-226226-// RKeyToRepositoryTag converts an ATProto record key back to repository and tag
227227-// This is the inverse of RepositoryTagToRKey
228228-// Note: If the tag contains underscores, this will split on the LAST underscore
229229-func RKeyToRepositoryTag(rkey string) (repository, tag string) {
230230- // Find the last underscore to split repository and tag
231231- lastUnderscore := strings.LastIndex(rkey, "_")
232232- if lastUnderscore == -1 {
233233- // No underscore found - treat entire string as tag with empty repository
234234- return "", rkey
235235- }
236236-237237- repository = rkey[:lastUnderscore]
238238- tag = rkey[lastUnderscore+1:]
239239-240240- // Convert tildes back to slashes in repository (tilde was used to encode slashes)
241241- repository = strings.ReplaceAll(repository, "~", "/")
242242-243243- return repository, tag
244244-}
245245-246246-// GetLastFetchedHoldDID returns the hold DID from the most recently fetched manifest
247247-// This is used by the routing repository to cache the hold for blob requests
248248-func (s *ManifestStore) GetLastFetchedHoldDID() string {
249249- return s.lastFetchedHoldDID
250250-}
251251-252252-// rawManifest is a simple implementation of distribution.Manifest
253253-type rawManifest struct {
254254- mediaType string
255255- payload []byte
256256-}
257257-258258-func (m *rawManifest) References() []distribution.Descriptor {
259259- // TODO: Parse the manifest and return actual references
260260- return nil
261261-}
262262-263263-func (m *rawManifest) Payload() (string, []byte, error) {
264264- return m.mediaType, m.payload, nil
265265-}
266266-267267-// extractConfigLabels fetches the image config blob and extracts Dockerfile LABELs
268268-func (s *ManifestStore) extractConfigLabels(ctx context.Context, configDigestStr string) (map[string]string, error) {
269269- // Parse digest string
270270- configDigest, err := digest.Parse(configDigestStr)
271271- if err != nil {
272272- return nil, fmt.Errorf("invalid config digest: %w", err)
273273- }
274274-275275- // Fetch config blob from storage
276276- configData, err := s.blobStore.Get(ctx, configDigest)
277277- if err != nil {
278278- return nil, fmt.Errorf("failed to fetch config blob: %w", err)
279279- }
280280-281281- // Parse config JSON
282282- var configJSON struct {
283283- Config struct {
284284- Labels map[string]string `json:"Labels"`
285285- } `json:"config"`
286286- }
287287-288288- if err := json.Unmarshal(configData, &configJSON); err != nil {
289289- return nil, fmt.Errorf("failed to parse config JSON: %w", err)
290290- }
291291-292292- return configJSON.Config.Labels, nil
293293-}
···4646 r.Put(atproto.HoldUploadPart, h.HandleUploadPart)
4747 r.Post(atproto.HoldCompleteUpload, h.HandleCompleteUpload)
4848 r.Post(atproto.HoldAbortUpload, h.HandleAbortUpload)
4949+ r.Post(atproto.HoldNotifyManifest, h.HandleNotifyManifest)
4950 })
5051}
5152···195196 RespondJSON(w, http.StatusOK, map[string]any{
196197 "status": "aborted",
197198 })
199199+}
200200+201201+// HandleNotifyManifest handles manifest upload notifications from AppView
202202+// Creates layer records and optionally posts to Bluesky
203203+func (h *XRPCHandler) HandleNotifyManifest(w http.ResponseWriter, r *http.Request) {
204204+ ctx := r.Context()
205205+206206+ // Validate service token (same auth as blob:write endpoints)
207207+ validatedUser, err := pds.ValidateBlobWriteAccess(r, h.pds, h.httpClient)
208208+ if err != nil {
209209+ RespondError(w, http.StatusForbidden, fmt.Sprintf("authorization failed: %v", err))
210210+ return
211211+ }
212212+213213+ // Parse request
214214+ var req struct {
215215+ Repository string `json:"repository"`
216216+ Tag string `json:"tag"`
217217+ UserDID string `json:"userDid"`
218218+ UserHandle string `json:"userHandle"`
219219+ Manifest struct {
220220+ MediaType string `json:"mediaType"`
221221+ Config struct {
222222+ Digest string `json:"digest"`
223223+ Size int64 `json:"size"`
224224+ } `json:"config"`
225225+ Layers []struct {
226226+ Digest string `json:"digest"`
227227+ Size int64 `json:"size"`
228228+ MediaType string `json:"mediaType"`
229229+ } `json:"layers"`
230230+ } `json:"manifest"`
231231+ }
232232+233233+ if err := DecodeJSON(r, &req); err != nil {
234234+ RespondError(w, http.StatusBadRequest, err.Error())
235235+ return
236236+ }
237237+238238+ // Verify user DID matches token
239239+ if req.UserDID != validatedUser.DID {
240240+ RespondError(w, http.StatusForbidden, "user DID mismatch")
241241+ return
242242+ }
243243+244244+ // Check if manifest posts are enabled
245245+ // TODO: Check captain record enableManifestPosts field
246246+ // For now, posts are always created
247247+ postsEnabled := true
248248+249249+ // Create layer records for each blob
250250+ layersCreated := 0
251251+ for _, layer := range req.Manifest.Layers {
252252+ record := atproto.NewLayerRecord(
253253+ layer.Digest,
254254+ layer.Size,
255255+ layer.MediaType,
256256+ req.Repository,
257257+ req.UserDID,
258258+ req.UserHandle,
259259+ )
260260+261261+ _, _, err := h.pds.CreateLayerRecord(ctx, record)
262262+ if err != nil {
263263+ fmt.Printf("Failed to create layer record: %v\n", err)
264264+ // Continue creating other records
265265+ } else {
266266+ layersCreated++
267267+ }
268268+ }
269269+270270+ // Calculate total size from all layers
271271+ var totalSize int64
272272+ for _, layer := range req.Manifest.Layers {
273273+ totalSize += layer.Size
274274+ }
275275+ totalSize += req.Manifest.Config.Size // Add config blob size
276276+277277+ // Create Bluesky post if enabled
278278+ var postURI string
279279+ postCreated := false
280280+ if postsEnabled {
281281+ // Extract manifest digest from first layer (or use config digest as fallback)
282282+ manifestDigest := req.Manifest.Config.Digest
283283+ if len(req.Manifest.Layers) > 0 {
284284+ manifestDigest = req.Manifest.Layers[0].Digest
285285+ }
286286+287287+ postURI, err = h.pds.CreateManifestPost(
288288+ ctx,
289289+ req.Repository,
290290+ req.Tag,
291291+ req.UserHandle,
292292+ manifestDigest,
293293+ totalSize,
294294+ )
295295+ if err != nil {
296296+ fmt.Printf("Failed to create manifest post: %v\n", err)
297297+ } else {
298298+ postCreated = true
299299+ }
300300+ }
301301+302302+ // Return response
303303+ resp := map[string]any{
304304+ "success": layersCreated > 0 || postCreated,
305305+ "layersCreated": layersCreated,
306306+ "postCreated": postCreated,
307307+ }
308308+ if postURI != "" {
309309+ resp["postUri"] = postURI
310310+ }
311311+ if err != nil && layersCreated == 0 && !postCreated {
312312+ resp["error"] = err.Error()
313313+ }
314314+315315+ RespondJSON(w, http.StatusOK, resp)
198316}
199317200318// requireBlobWriteAccess middleware - validates DPoP + OAuth and checks for blob:write permission
+2-6
pkg/hold/pds/auth.go
···480480 }
481481482482 // Fetch public key from issuer's DID document
483483- publicKey, err := fetchPublicKeyFromDID(r.Context(), issuerDID, httpClient)
483483+ publicKey, err := fetchPublicKeyFromDID(r.Context(), issuerDID)
484484 if err != nil {
485485 return nil, fmt.Errorf("failed to fetch public key for issuer %s: %w", issuerDID, err)
486486 }
···502502// fetchPublicKeyFromDID fetches the public key from a DID document
503503// Supports did:plc and did:web
504504// Returns the atcrypto.PublicKey for signature verification
505505-func fetchPublicKeyFromDID(ctx context.Context, did string, httpClient HTTPClient) (atcrypto.PublicKey, error) {
506506- if httpClient == nil {
507507- httpClient = http.DefaultClient
508508- }
509509-505505+func fetchPublicKeyFromDID(ctx context.Context, did string) (atcrypto.PublicKey, error) {
510506 // Use indigo's identity resolution
511507 directory := identity.DefaultDirectory()
512508 atID, err := syntax.ParseAtIdentifier(did)
-28
pkg/hold/pds/auth_test.go
···239239 return nil
240240}
241241242242-// mockDIDResolver is a simple mock for DID resolution that returns a fixed public key
243243-type mockDIDResolver struct {
244244- publicKeys map[string]atcrypto.PublicKey
245245-}
246246-247247-// newMockDIDResolver creates a new mock DID resolver
248248-func newMockDIDResolver() *mockDIDResolver {
249249- return &mockDIDResolver{
250250- publicKeys: make(map[string]atcrypto.PublicKey),
251251- }
252252-}
253253-254254-// RegisterDID registers a DID with its public key
255255-func (m *mockDIDResolver) RegisterDID(did string, publicKey atcrypto.PublicKey) {
256256- m.publicKeys[did] = publicKey
257257-}
258258-259259-// Do implements the HTTPClient interface for mocking DID resolution
260260-// This intercepts fetchPublicKeyFromDID's indigo directory calls
261261-func (m *mockDIDResolver) Do(req *http.Request) (*http.Response, error) {
262262- // This mock is not used directly - we'll need to inject the public key differently
263263- // For now, return a 404 to indicate DID resolution should use our registered keys
264264- return &http.Response{
265265- StatusCode: http.StatusNotFound,
266266- Body: http.NoBody,
267267- }, nil
268268-}
269269-270242// TestValidateServiceToken_ValidToken tests validation of a properly formed service token
271243func TestValidateServiceToken_ValidToken(t *testing.T) {
272244 // This test validates token structure, audience, and expiration
+3-3
pkg/hold/pds/events.go
···2929 eventSeq int64
3030 eventHistory []HistoricalEvent // Ring buffer for cursor backfill (deprecated, kept for compatibility)
3131 maxHistory int
3232- holdDID string // DID of the hold for setting repo field
3333- db *sql.DB // Database for persistent event storage
3434- dbPath string // Path to database file
3232+ holdDID string // DID of the hold for setting repo field
3333+ db *sql.DB // Database for persistent event storage
3434+ dbPath string // Path to database file
3535}
36363737// Subscriber represents a WebSocket client subscribed to the firehose
+59
pkg/hold/pds/layer.go
···11+package pds
22+33+import (
44+ "context"
55+ "fmt"
66+77+ "atcr.io/pkg/atproto"
88+)
99+1010+// CreateLayerRecord creates a new layer record in the hold's PDS
1111+// Returns the rkey and CID of the created record
1212+func (p *HoldPDS) CreateLayerRecord(ctx context.Context, record *atproto.LayerRecord) (string, string, error) {
1313+ // Validate record
1414+ if record.Type != atproto.LayerCollection {
1515+ return "", "", fmt.Errorf("invalid record type: %s", record.Type)
1616+ }
1717+1818+ if record.Digest == "" {
1919+ return "", "", fmt.Errorf("digest is required")
2020+ }
2121+2222+ if record.Size <= 0 {
2323+ return "", "", fmt.Errorf("size must be positive")
2424+ }
2525+2626+ // Create record with auto-generated TID rkey
2727+ rkey, recordCID, err := p.repomgr.CreateRecord(
2828+ ctx,
2929+ p.uid,
3030+ atproto.LayerCollection,
3131+ record,
3232+ )
3333+3434+ if err != nil {
3535+ return "", "", fmt.Errorf("failed to create layer record: %w", err)
3636+ }
3737+3838+ return rkey, recordCID.String(), nil
3939+}
4040+4141+// GetLayerRecord retrieves a specific layer record by rkey
4242+// Note: This is a simplified implementation. For production, you may need to pass the CID
4343+func (p *HoldPDS) GetLayerRecord(ctx context.Context, rkey string) (*atproto.LayerRecord, error) {
4444+ // For now, we don't implement this as it's not needed for the manifest post feature
4545+ // Full implementation would require querying the carstore with a specific CID
4646+ return nil, fmt.Errorf("GetLayerRecord not yet implemented - use via XRPC listRecords instead")
4747+}
4848+4949+// ListLayerRecords lists layer records with pagination
5050+// Returns records, next cursor (empty if no more), and error
5151+// Note: This is a simplified implementation. For production, consider adding filters
5252+// (by repository, user, digest, etc.) and proper pagination
5353+func (p *HoldPDS) ListLayerRecords(ctx context.Context, limit int, cursor string) ([]*atproto.LayerRecord, string, error) {
5454+ // For now, return empty list - full implementation would query the carstore
5555+ // This would require iterating over records in the collection and filtering
5656+ // In practice, layer records are mainly for analytics and Bluesky posts,
5757+ // not for runtime queries
5858+ return nil, "", fmt.Errorf("ListLayerRecords not yet implemented")
5959+}
···2020// init registers our custom ATProto types with indigo's lexutil type registry
2121// This allows repomgr.GetRecord to automatically unmarshal our types
2222func init() {
2323- // Register captain, crew, and tangled profile record types
2323+ // Register captain, crew, tangled profile, and layer record types
2424 // These must match the $type field in the records
2525 lexutil.RegisterType(atproto.CaptainCollection, &atproto.CaptainRecord{})
2626 lexutil.RegisterType(atproto.CrewCollection, &atproto.CrewRecord{})
2727+ lexutil.RegisterType(atproto.LayerCollection, &atproto.LayerRecord{})
2728 lexutil.RegisterType(atproto.TangledProfileCollection, &atproto.TangledProfileRecord{})
2829}
2930
+1-1
pkg/hold/pds/xrpc.go
···339339// buildProfileResponse builds a profile response map (shared by GetProfile and GetProfiles)
340340func (h *XRPCHandler) buildProfileResponse(ctx context.Context) map[string]any {
341341 // Get profile record from repo
342342- _, profileVal, err := h.pds.repomgr.GetRecord(
342342+ _, profileVal, _ := h.pds.repomgr.GetRecord(
343343 ctx,
344344 h.pds.uid,
345345 "app.bsky.actor.profile",