···207207208208 return result.Records, nil
209209}
210210+211211+// ATProtoBlobRef represents a reference to a blob in ATProto's native blob storage
212212+// This is different from OCIBlobDescriptor which describes OCI image layers
213213+type ATProtoBlobRef struct {
214214+ Type string `json:"$type"`
215215+ Ref Link `json:"ref"`
216216+ MimeType string `json:"mimeType"`
217217+ Size int64 `json:"size"`
218218+}
219219+220220+// Link represents an IPFS link to blob content
221221+type Link struct {
222222+ Link string `json:"$link"`
223223+}
224224+225225+// UploadBlob uploads binary data to the PDS and returns a blob reference
226226+func (c *Client) UploadBlob(ctx context.Context, data []byte, mimeType string) (*ATProtoBlobRef, error) {
227227+ url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", c.pdsEndpoint)
228228+229229+ req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(data))
230230+ if err != nil {
231231+ return nil, err
232232+ }
233233+234234+ req.Header.Set("Authorization", c.authHeader())
235235+ req.Header.Set("Content-Type", mimeType)
236236+237237+ resp, err := c.httpClient.Do(req)
238238+ if err != nil {
239239+ return nil, fmt.Errorf("failed to upload blob: %w", err)
240240+ }
241241+ defer resp.Body.Close()
242242+243243+ if resp.StatusCode != http.StatusOK {
244244+ bodyBytes, _ := io.ReadAll(resp.Body)
245245+ return nil, fmt.Errorf("upload blob failed with status %d: %s", resp.StatusCode, string(bodyBytes))
246246+ }
247247+248248+ var result struct {
249249+ Blob ATProtoBlobRef `json:"blob"`
250250+ }
251251+ if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
252252+ return nil, fmt.Errorf("failed to decode response: %w", err)
253253+ }
254254+255255+ return &result.Blob, nil
256256+}
257257+258258+// GetBlob downloads a blob by its CID from the PDS
259259+func (c *Client) GetBlob(ctx context.Context, cid string) ([]byte, error) {
260260+ url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s",
261261+ c.pdsEndpoint, c.did, cid)
262262+263263+ req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
264264+ if err != nil {
265265+ return nil, err
266266+ }
267267+268268+ // Note: getBlob may not require auth for public repos, but we include it anyway
269269+ req.Header.Set("Authorization", c.authHeader())
270270+271271+ resp, err := c.httpClient.Do(req)
272272+ if err != nil {
273273+ return nil, fmt.Errorf("failed to get blob: %w", err)
274274+ }
275275+ defer resp.Body.Close()
276276+277277+ if resp.StatusCode == http.StatusNotFound {
278278+ return nil, fmt.Errorf("blob not found")
279279+ }
280280+281281+ if resp.StatusCode != http.StatusOK {
282282+ bodyBytes, _ := io.ReadAll(resp.Body)
283283+ return nil, fmt.Errorf("get blob failed with status %d: %s", resp.StatusCode, string(bodyBytes))
284284+ }
285285+286286+ // Read the blob data
287287+ data, err := io.ReadAll(resp.Body)
288288+ if err != nil {
289289+ return nil, fmt.Errorf("failed to read blob data: %w", err)
290290+ }
291291+292292+ return data, nil
293293+}
+29-1
pkg/atproto/lexicon.go
···11package atproto
2233import (
44+ "encoding/base64"
45 "encoding/json"
56 "time"
67)
···5758 // Subject references another manifest (for attestations, signatures, etc.)
5859 Subject *BlobReference `json:"subject,omitempty"`
59606161+ // ManifestBlob is a reference to the manifest blob stored in ATProto blob storage
6262+ // This is the new way of storing manifests (replaces RawManifest)
6363+ ManifestBlob *ATProtoBlobRef `json:"manifestBlob,omitempty"`
6464+6565+ // RawManifest stores the original manifest bytes (base64 encoded) - DEPRECATED
6666+ // Kept for backward compatibility with old records
6767+ // New records should use ManifestBlob instead
6868+ RawManifest string `json:"rawManifest,omitempty"`
6969+6070 // CreatedAt timestamp
6171 CreatedAt time.Time `json:"createdAt"`
6272}
···103113 MediaType: ociData.MediaType,
104114 SchemaVersion: ociData.SchemaVersion,
105115 Annotations: ociData.Annotations,
106106- CreatedAt: time.Now(),
116116+ // ManifestBlob will be set by the caller after uploading to blob storage
117117+ // RawManifest no longer stored for new records (backward compat only)
118118+ CreatedAt: time.Now(),
107119 }
108120109121 // Parse config
···132144}
133145134146// ToOCIManifest converts the manifest record back to OCI manifest JSON
147147+// This should NOT be used directly - use manifest_store.Get() which downloads the blob
148148+// This is kept for backward compatibility only
135149func (m *ManifestRecord) ToOCIManifest() ([]byte, error) {
150150+ // New records: ManifestBlob reference (blob downloaded separately by manifest store)
151151+ // This function should not be called for new records - it's a fallback only
152152+153153+ // Backward compatibility: If we have the raw manifest stored, return it
154154+ if m.RawManifest != "" {
155155+ rawBytes, err := base64.StdEncoding.DecodeString(m.RawManifest)
156156+ if err != nil {
157157+ return nil, err
158158+ }
159159+ return rawBytes, nil
160160+ }
161161+162162+ // Last resort: reconstruct from fields (will have different digest!)
163163+ // This should only happen for very old records
136164 ociManifest := map[string]any{
137165 "schemaVersion": m.SchemaVersion,
138166 "mediaType": m.MediaType,
+27-10
pkg/atproto/manifest_store.go
···6464 // The routing repository will cache this for concurrent blob fetches
6565 s.lastFetchedHoldEndpoint = manifestRecord.HoldEndpoint
66666767- // Convert back to OCI manifest
6868- ociManifest, err := manifestRecord.ToOCIManifest()
6969- if err != nil {
7070- return nil, fmt.Errorf("failed to convert to OCI manifest: %w", err)
6767+ var ociManifest []byte
6868+6969+ // New records: Download blob from ATProto blob storage
7070+ if manifestRecord.ManifestBlob != nil && manifestRecord.ManifestBlob.Ref.Link != "" {
7171+ ociManifest, err = s.client.GetBlob(ctx, manifestRecord.ManifestBlob.Ref.Link)
7272+ if err != nil {
7373+ return nil, fmt.Errorf("failed to download manifest blob: %w", err)
7474+ }
7575+ } else {
7676+ // Backward compatibility: Use ToOCIManifest for old records
7777+ ociManifest, err = manifestRecord.ToOCIManifest()
7878+ if err != nil {
7979+ return nil, fmt.Errorf("failed to convert to OCI manifest: %w", err)
8080+ }
7181 }
72827383 // Parse the manifest based on media type
···81918292// Put stores a manifest
8393func (s *ManifestStore) Put(ctx context.Context, manifest distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
8484- // Get the manifest payload
8585- _, payload, err := manifest.Payload()
9494+ // Get the manifest payload (raw bytes)
9595+ mediaType, payload, err := manifest.Payload()
8696 if err != nil {
8797 return "", err
8898 }
···90100 // Calculate digest
91101 dgst := digest.FromBytes(payload)
921029393- // Create manifest record
103103+ // Upload manifest as blob to PDS
104104+ blobRef, err := s.client.UploadBlob(ctx, payload, mediaType)
105105+ if err != nil {
106106+ return "", fmt.Errorf("failed to upload manifest blob: %w", err)
107107+ }
108108+109109+ // Create manifest record with structured metadata
94110 manifestRecord, err := NewManifestRecord(s.repository, dgst.String(), payload)
95111 if err != nil {
96112 return "", fmt.Errorf("failed to create manifest record: %w", err)
97113 }
981149999- // Set the hold endpoint where blobs are stored
115115+ // Set the blob reference and hold endpoint
116116+ manifestRecord.ManifestBlob = blobRef
100117 manifestRecord.HoldEndpoint = s.holdEndpoint
101118102102- // Store in ATProto
119119+ // Store manifest record in ATProto
103120 rkey := digestToRKey(dgst)
104121 _, err = s.client.PutRecord(ctx, ManifestCollection, rkey, manifestRecord)
105122 if err != nil {
106106- return "", fmt.Errorf("failed to store manifest in ATProto: %w", err)
123123+ return "", fmt.Errorf("failed to store manifest record in ATProto: %w", err)
107124 }
108125109126 // Also handle tag if specified
+65
pkg/auth/atproto/session.go
···33import (
44 "bytes"
55 "context"
66+ "crypto/sha256"
77+ "encoding/hex"
68 "encoding/json"
79 "fmt"
810 "io"
911 "net/http"
1212+ "sync"
1313+ "time"
10141115 atprotoclient "atcr.io/pkg/atproto"
1216)
13171818+// CachedSession represents a cached session
1919+type CachedSession struct {
2020+ DID string
2121+ PDS string
2222+ AccessToken string
2323+ ExpiresAt time.Time
2424+}
2525+1426// SessionValidator validates ATProto credentials
1527type SessionValidator struct {
1628 resolver *atprotoclient.Resolver
1729 httpClient *http.Client
3030+ cache map[string]*CachedSession
3131+ cacheMu sync.RWMutex
1832}
19332034// NewSessionValidator creates a new ATProto session validator
···2236 return &SessionValidator{
2337 resolver: atprotoclient.NewResolver(),
2438 httpClient: &http.Client{},
3939+ cache: make(map[string]*CachedSession),
2540 }
2641}
27424343+// getCacheKey generates a cache key from username and password
4444+func getCacheKey(username, password string) string {
4545+ h := sha256.New()
4646+ h.Write([]byte(username + ":" + password))
4747+ return hex.EncodeToString(h.Sum(nil))
4848+}
4949+5050+// getCachedSession retrieves a cached session if valid
5151+func (v *SessionValidator) getCachedSession(cacheKey string) (*CachedSession, bool) {
5252+ v.cacheMu.RLock()
5353+ defer v.cacheMu.RUnlock()
5454+5555+ session, ok := v.cache[cacheKey]
5656+ if !ok {
5757+ return nil, false
5858+ }
5959+6060+ // Check if expired (with 5 minute buffer)
6161+ if time.Now().After(session.ExpiresAt.Add(-5 * time.Minute)) {
6262+ return nil, false
6363+ }
6464+6565+ return session, true
6666+}
6767+6868+// setCachedSession stores a session in the cache
6969+func (v *SessionValidator) setCachedSession(cacheKey string, session *CachedSession) {
7070+ v.cacheMu.Lock()
7171+ defer v.cacheMu.Unlock()
7272+ v.cache[cacheKey] = session
7373+}
7474+2875// SessionResponse represents the response from createSession
2976type SessionResponse struct {
3077 DID string `json:"did"`
···6210963110// CreateSessionAndGetToken creates a session and returns the DID, PDS endpoint, and access token
64111func (v *SessionValidator) CreateSessionAndGetToken(ctx context.Context, identifier, password string) (did, pdsEndpoint, accessToken string, err error) {
112112+ // Check cache first
113113+ cacheKey := getCacheKey(identifier, password)
114114+ if cached, ok := v.getCachedSession(cacheKey); ok {
115115+ fmt.Printf("DEBUG [atproto/session]: Using cached session for %s (DID=%s)\n", identifier, cached.DID)
116116+ return cached.DID, cached.PDS, cached.AccessToken, nil
117117+ }
118118+119119+ fmt.Printf("DEBUG [atproto/session]: No cached session for %s, creating new session\n", identifier)
120120+65121 // Resolve identifier to PDS endpoint
66122 did, pds, err := v.resolver.ResolveIdentity(ctx, identifier)
67123 if err != nil {
···73129 if err != nil {
74130 return "", "", "", fmt.Errorf("authentication failed: %w", err)
75131 }
132132+133133+ // Cache the session (ATProto sessions typically last 2 hours)
134134+ v.setCachedSession(cacheKey, &CachedSession{
135135+ DID: sessionResp.DID,
136136+ PDS: pds,
137137+ AccessToken: sessionResp.AccessJWT,
138138+ ExpiresAt: time.Now().Add(2 * time.Hour),
139139+ })
140140+ fmt.Printf("DEBUG [atproto/session]: Cached session for %s (expires in 2 hours)\n", identifier)
7614177142 return sessionResp.DID, pds, sessionResp.AccessJWT, nil
78143}