A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io

Bluesky Manifest Posts#

Overview#

This document describes a feature that posts to Bluesky when an OCI manifest is pushed to ATCR. When a user pushes an image to the registry, the AppView notifies the hold, and the hold's embedded PDS will:

  1. Create io.atcr.hold.layer records for structured metadata tracking
  2. Post to Bluesky announcing the push (similar to the "what's new" feed on the AppView web UI)

Architecture#

High-Level Flow#

User pushes image
    ↓
AppView receives manifest PUT request
    ↓
AppView stores manifest in user's PDS
    ↓
AppView notifies hold via XRPC
    ↓
Hold creates layer records in embedded PDS
    ↓
Hold creates Bluesky post
    ↓
Post appears in Bluesky feed

Component Interactions#

AppView (pkg/appview/storage/manifest_store.go):

  • Runs after the manifest has been successfully stored in the user's PDS
  • Extracts manifest metadata (repository, tag, user info, layers)
  • Calls hold's io.atcr.hold.notifyManifest XRPC endpoint
  • Uses service token from user's PDS for authentication
  • Gracefully handles notification failures (doesn't fail manifest upload)

Hold (pkg/hold/oci/xrpc.go):

  • Receives manifest notification via new XRPC endpoint
  • Validates service token and extracts user DID
  • Creates layer records for each blob reference in manifest
  • Creates Bluesky post announcing the push
  • Returns success/failure status

Hold's Embedded PDS (pkg/hold/pds/):

  • Stores layer records in io.atcr.hold.layer collection
  • Stores Bluesky posts in app.bsky.feed.post collection
  • Both are ATProto records with auto-generated TID rkeys
  • Queryable via standard ATProto sync endpoints
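Because both collections hold ordinary ATProto records, a client could page through them with a standard XRPC query. The sketch below builds a com.atproto.repo.listRecords URL; whether the hold serves this particular endpoint is an assumption, and the host and DID are placeholders borrowed from the test comment later in this document.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// listRecordsURL builds a com.atproto.repo.listRecords query URL.
// baseURL and repoDID are placeholders; limit <= 0 and an empty cursor
// leave the corresponding parameter out.
func listRecordsURL(baseURL, repoDID, collection string, limit int, cursor string) string {
	q := url.Values{}
	q.Set("repo", repoDID)
	q.Set("collection", collection)
	if limit > 0 {
		q.Set("limit", strconv.Itoa(limit))
	}
	if cursor != "" {
		q.Set("cursor", cursor)
	}
	// Encode sorts parameters by key and percent-encodes the DID.
	return baseURL + "/xrpc/com.atproto.repo.listRecords?" + q.Encode()
}

func main() {
	fmt.Println(listRecordsURL(
		"https://hold01.atcr.io",
		"did:web:hold01.atcr.io",
		"io.atcr.hold.layer",
		50,
		"",
	))
}
```

Passing the cursor returned by one page as the cursor argument of the next call walks the whole collection.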

Implementation Details#

1. Layer Record Schema#

File: pkg/atproto/lexicon.go

Collection: io.atcr.hold.layer

Purpose: Structured metadata about container layers stored in the hold

Schema:

type LayerRecord struct {
    // Type identifier (always "io.atcr.hold.layer")
    Type string `json:"$type" cborgen:"$type"`

    // Digest of the layer (e.g., "sha256:abc123...")
    Digest string `json:"digest" cborgen:"digest"`

    // Size in bytes
    Size int64 `json:"size" cborgen:"size"`

    // MediaType of the layer
    MediaType string `json:"mediaType" cborgen:"mediaType"`

    // Repository this layer belongs to (e.g., "alice/myapp")
    Repository string `json:"repository" cborgen:"repository"`

    // User DID who uploaded this layer
    UserDID string `json:"userDid" cborgen:"userDid"`

    // User handle (for display purposes)
    UserHandle string `json:"userHandle,omitempty" cborgen:"userHandle,omitempty"`

    // Timestamp as an RFC 3339 string (ATProto records carry datetimes as
    // strings, and cbor-gen cannot encode time.Time directly)
    CreatedAt string `json:"createdAt" cborgen:"createdAt"`
}

Constructor:

func NewLayerRecord(digest string, size int64, mediaType, repository, userDID, userHandle string) *LayerRecord {
    return &LayerRecord{
        Type:       LayerCollection,
        Digest:     digest,
        Size:       size,
        MediaType:  mediaType,
        Repository: repository,
        UserDID:    userDID,
        UserHandle: userHandle,
        CreatedAt:  time.Now().UTC().Format(time.RFC3339),
    }
}

Why CBOR tags: The hold's embedded PDS uses CBOR encoding for efficient storage in the SQLite-backed carstore. All records stored in the hold must have cborgen: tags.

2. XRPC Manifest Notification Endpoint#

File: pkg/hold/oci/xrpc.go

Endpoint: POST /xrpc/io.atcr.hold.notifyManifest

Authentication: Service token from user's PDS (same pattern as blob upload endpoints)

Request Schema:

type NotifyManifestRequest struct {
    // Repository name (e.g., "alice/myapp")
    Repository string `json:"repository"`

    // Tag (e.g., "latest", "v1.0.0")
    Tag string `json:"tag"`

    // User DID (e.g., "did:plc:abc123")
    UserDID string `json:"userDid"`

    // User handle (e.g., "alice.bsky.social")
    UserHandle string `json:"userHandle"`

    // Manifest content (parsed from uploaded manifest)
    Manifest struct {
        MediaType string `json:"mediaType"`
        Config    struct {
            Digest string `json:"digest"`
            Size   int64  `json:"size"`
        } `json:"config"`
        Layers []struct {
            Digest    string `json:"digest"`
            Size      int64  `json:"size"`
            MediaType string `json:"mediaType"`
        } `json:"layers"`
    } `json:"manifest"`
}

Response Schema:

type NotifyManifestResponse struct {
    Success       bool   `json:"success"`
    LayersCreated int    `json:"layersCreated"`
    PostCreated   bool   `json:"postCreated"`
    PostURI       string `json:"postUri,omitempty"` // ATProto URI if post created
    Error         string `json:"error,omitempty"`
}

Handler Implementation:

func (h *XRPCHandler) HandleNotifyManifest(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()

    // 1. Validate service token (reuse existing auth middleware pattern)
    userDID, err := h.validateServiceToken(ctx, r)
    if err != nil {
        writeXRPCError(w, "InvalidToken", err.Error())
        return
    }

    // 2. Parse request
    var req NotifyManifestRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        writeXRPCError(w, "InvalidRequest", err.Error())
        return
    }

    // 3. Verify user DID matches token
    if req.UserDID != userDID {
        writeXRPCError(w, "Unauthorized", "user DID mismatch")
        return
    }

    // 4. Create layer records for each blob
    layersCreated := 0
    for _, layer := range req.Manifest.Layers {
        record := atproto.NewLayerRecord(
            layer.Digest,
            layer.Size,
            layer.MediaType,
            req.Repository,
            req.UserDID,
            req.UserHandle,
        )

        _, _, err := h.pds.CreateLayerRecord(ctx, record)
        if err != nil {
            log.Printf("Failed to create layer record: %v", err)
            // Continue creating other records
        } else {
            layersCreated++
        }
    }

    // 5. Create Bluesky post
    postURI, err := h.pds.CreateManifestPost(ctx, req.Repository, req.Tag, req.UserHandle)

    // 6. Return response
    resp := NotifyManifestResponse{
        Success:       layersCreated > 0 || err == nil,
        LayersCreated: layersCreated,
        PostCreated:   err == nil,
        PostURI:       postURI,
    }

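    w.Header().Set("Content-Type", "application/json")
    if !resp.Success {
        // Total failure: report it with a non-200 status so the AppView's
        // status check logs it as an error rather than a success.
        resp.Error = err.Error()
        w.WriteHeader(http.StatusInternalServerError)
    }

    if encErr := json.NewEncoder(w).Encode(resp); encErr != nil {
        log.Printf("Failed to encode notifyManifest response: %v", encErr)
    }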
}
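The handler calls writeXRPCError without showing how the HTTP status is chosen. A minimal sketch of such a helper follows, assuming the usual XRPC error body with error and message fields. The mapping from error name to status code is hypothetical and not taken from the existing codebase.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// xrpcError is the JSON body of an XRPC error response.
type xrpcError struct {
	Error   string `json:"error"`
	Message string `json:"message,omitempty"`
}

// statusForXRPCError maps the error names used by the handler to HTTP
// status codes. This mapping is an assumption made for illustration.
func statusForXRPCError(name string) int {
	switch name {
	case "InvalidToken":
		return http.StatusUnauthorized
	case "Unauthorized":
		return http.StatusForbidden
	default:
		return http.StatusBadRequest
	}
}

// writeXRPCError writes the status line and the JSON error body.
func writeXRPCError(w http.ResponseWriter, name, message string) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(statusForXRPCError(name))
	_ = json.NewEncoder(w).Encode(xrpcError{Error: name, Message: message})
}

func main() {
	rec := httptest.NewRecorder()
	writeXRPCError(rec, "InvalidToken", "token expired")
	fmt.Println(rec.Code, rec.Body.String())
}
```

Returning a non-200 status matters here because the AppView treats any 200 response as a successful notification.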

3. Hold PDS Layer Record Methods#

File: pkg/hold/pds/layer.go (new file)

Methods:

// CreateLayerRecord creates a new layer record in the hold's PDS
func (p *HoldPDS) CreateLayerRecord(ctx context.Context, record *atproto.LayerRecord) (string, string, error) {
    // Validate record
    if record.Type != atproto.LayerCollection {
        return "", "", fmt.Errorf("invalid record type: %s", record.Type)
    }

    if record.Digest == "" {
        return "", "", fmt.Errorf("digest is required")
    }

    // Create record with auto-generated TID rkey
    rkey, recordCID, err := p.repomgr.CreateRecord(
        ctx,
        p.uid,
        atproto.LayerCollection,
        record,
    )

    if err != nil {
        return "", "", fmt.Errorf("failed to create layer record: %w", err)
    }

    log.Printf("Created layer record at %s/%s (digest: %s, size: %d)",
        atproto.LayerCollection, rkey, record.Digest, record.Size)

    return rkey, recordCID.String(), nil
}

// ListLayerRecords lists layer records with optional filtering
func (p *HoldPDS) ListLayerRecords(ctx context.Context, limit int, cursor string) ([]*atproto.LayerRecord, string, error) {
    // TODO: query the carstore via repomgr, unmarshal the layer records,
    // and return the records plus the next cursor for pagination.
    return nil, "", fmt.Errorf("ListLayerRecords: not implemented")
}

// GetLayerRecord retrieves a specific layer record by rkey
func (p *HoldPDS) GetLayerRecord(ctx context.Context, rkey string) (*atproto.LayerRecord, error) {
    // TODO: implement using repomgr.GetRecord.
    return nil, fmt.Errorf("GetLayerRecord: not implemented")
}

4. Bluesky Post Creation#

File: pkg/hold/pds/manifest_post.go (new file)

Pattern: Reuse existing status.go pattern

// CreateManifestPost creates a Bluesky post announcing a manifest upload
func (p *HoldPDS) CreateManifestPost(ctx context.Context, repository, tag, userHandle string) (string, error) {
    now := time.Now().UTC() // UTC so createdAt serializes with a "Z" suffix

    // Format post text (similar to "what's new" feed)
    text := formatManifestPostText(repository, tag, userHandle)

    // Create post struct
    post := &bsky.FeedPost{
        LexiconTypeID: "app.bsky.feed.post",
        Text:          text,
        CreatedAt:     now.Format(time.RFC3339),
        // Optional: Add embed with link to AppView
        // Embed: &bsky.FeedPost_Embed{...}
    }

    // Create record with auto-generated TID
    rkey, recordCID, err := p.repomgr.CreateRecord(
        ctx,
        p.uid,
        "app.bsky.feed.post",
        post,
    )

    if err != nil {
        return "", fmt.Errorf("failed to create manifest post: %w", err)
    }

    // Build ATProto URI for the post
    postURI := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", p.did, rkey)

    log.Printf("Created manifest post: %s (cid: %s)", postURI, recordCID)

    return postURI, nil
}

// formatManifestPostText generates the post text
func formatManifestPostText(repository, tag, userHandle string) string {
    // Example formats:
    // "@alice.bsky.social pushed alice/myapp:latest to ATCR"
    // "New image pushed: alice/myapp:v1.0.0 by @alice.bsky.social"
    // "📦 alice/myapp:latest pushed by @alice.bsky.social"

    return fmt.Sprintf("📦 %s:%s pushed by @%s", repository, tag, userHandle)
}

Advanced Post Options:

// Example with embedded link to AppView
post := &bsky.FeedPost{
    LexiconTypeID: "app.bsky.feed.post",
    Text:          text,
    CreatedAt:     now.Format(time.RFC3339),
    Embed: &bsky.FeedPost_Embed{
        EmbedExternal: &bsky.EmbedExternal{
            External: &bsky.EmbedExternal_External{
                Uri:         fmt.Sprintf("https://atcr.io/%s", repository),
                Title:       fmt.Sprintf("%s:%s", repository, tag),
                Description: "View on ATCR",
            },
        },
    },
}

// Example with facets (mentions)
// This would require parsing the text and creating facet structs
// for @mentions to be clickable in Bluesky
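Facets address text by UTF-8 byte offsets, not by character count, which matters here because the post text starts with an emoji. The sketch below computes the byte range of the @mention; mentionByteRange is a hypothetical helper, and a full facet would additionally need the mentioned account's DID.

```go
package main

import (
	"fmt"
	"strings"
)

// mentionByteRange returns the UTF-8 byte offsets [start, end) of the first
// "@handle" occurrence in text, as needed for a facet's byteStart/byteEnd.
func mentionByteRange(text, handle string) (start, end int, ok bool) {
	mention := "@" + handle
	start = strings.Index(text, mention) // byte index, not rune index
	if start < 0 {
		return 0, 0, false
	}
	return start, start + len(mention), true
}

func main() {
	text := "📦 alice/myapp:latest pushed by @alice.bsky.social"
	start, end, ok := mentionByteRange(text, "alice.bsky.social")
	fmt.Println(start, end, ok)
}
```

Counting runes instead of bytes would place the facet three bytes too early for this text, since the package emoji occupies four bytes in UTF-8.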

5. AppView Integration#

File: pkg/appview/storage/manifest_store.go

Integration Point: After client.PutRecord() succeeds (around line 130-140)

// Existing code:
recordURI, recordCID, err := ms.client.PutRecord(ctx, atproto.ManifestCollection, rkey, manifestRecord)
if err != nil {
    return "", fmt.Errorf("failed to store manifest in PDS: %w", err)
}

// NEW: Notify hold about manifest upload
if err := ms.notifyHoldAboutManifest(ctx, desc, manifestRecord, tag); err != nil {
    // Log error but don't fail the manifest upload
    log.Printf("Failed to notify hold about manifest: %v", err)
}

return desc.Digest.String(), nil

Implementation:

// notifyHoldAboutManifest sends manifest metadata to the hold
func (ms *ManifestStore) notifyHoldAboutManifest(
    ctx context.Context,
    desc distribution.Descriptor,
    manifestRecord *atproto.ManifestRecord,
    tag string,
) error {
    // 1. Get registry context
    regCtx, err := storage.GetRegistryContext(ctx)
    if err != nil {
        return fmt.Errorf("failed to get registry context: %w", err)
    }

    // 2. Resolve hold DID to endpoint
    holdEndpoint, err := ms.resolver.ResolveDIDToHTTPEndpoint(ctx, manifestRecord.HoldDID)
    if err != nil {
        return fmt.Errorf("failed to resolve hold DID: %w", err)
    }

    // 3. Get service token from user's PDS
    serviceToken, err := regCtx.Refresher.GetServiceToken(ctx, regCtx.DID, manifestRecord.HoldDID)
    if err != nil {
        return fmt.Errorf("failed to get service token: %w", err)
    }

    // 4. Parse manifest to extract layer info
    var parsedManifest struct {
        MediaType string                    `json:"mediaType"`
        Config    distribution.Descriptor   `json:"config"`
        Layers    []distribution.Descriptor `json:"layers"`
    }

    if err := json.Unmarshal(manifestRecord.ManifestBlob.Data, &parsedManifest); err != nil {
        return fmt.Errorf("failed to parse manifest: %w", err)
    }

    // 5. Build notification request
    notifyReq := map[string]interface{}{
        "repository": ms.repository,
        "tag":        tag,
        "userDid":    regCtx.DID,
        "userHandle": regCtx.Handle, // Need to add this to RegistryContext
        "manifest": map[string]interface{}{
            "mediaType": parsedManifest.MediaType,
            "config": map[string]interface{}{
                "digest": parsedManifest.Config.Digest.String(),
                "size":   parsedManifest.Config.Size,
            },
            "layers": func() []map[string]interface{} {
                layers := make([]map[string]interface{}, len(parsedManifest.Layers))
                for i, layer := range parsedManifest.Layers {
                    layers[i] = map[string]interface{}{
                        "digest":    layer.Digest.String(),
                        "size":      layer.Size,
                        "mediaType": layer.MediaType,
                    }
                }
                return layers
            }(),
        },
    }

    // 6. Call hold's XRPC endpoint
    reqBody, err := json.Marshal(notifyReq)
    if err != nil {
        return fmt.Errorf("failed to encode notification request: %w", err)
    }
    req, err := http.NewRequestWithContext(
        ctx,
        "POST",
        holdEndpoint+"/xrpc/io.atcr.hold.notifyManifest",
        bytes.NewReader(reqBody),
    )
    if err != nil {
        return err
    }

    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("Authorization", "Bearer "+serviceToken)

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return fmt.Errorf("hold notification failed: %s (status: %d)", body, resp.StatusCode)
    }

    // 7. Parse response (optional logging)
    var notifyResp map[string]interface{}
    if err := json.NewDecoder(resp.Body).Decode(&notifyResp); err == nil {
        log.Printf("Hold notification successful: %+v", notifyResp)
    }

    return nil
}

6. Record Type Registration#

File: pkg/hold/pds/server.go

In init() function (around line 30):

func init() {
    // Existing registrations
    lexutil.RegisterType(atproto.CaptainCollection, &atproto.CaptainRecord{})
    lexutil.RegisterType(atproto.CrewCollection, &atproto.CrewRecord{})
    lexutil.RegisterType(atproto.TangledProfileCollection, &atproto.TangledProfileRecord{})

    // NEW: Register layer record type
    lexutil.RegisterType(atproto.LayerCollection, &atproto.LayerRecord{})
}

Why needed: ATProto's CBOR unmarshaling requires type registration to automatically deserialize records when reading from the carstore.

Testing Strategy#

Unit Tests#

Test Layer Record Creation (pkg/hold/pds/layer_test.go):

func TestCreateLayerRecord(t *testing.T) {
    pds := setupTestPDS(t)
    ctx := context.Background()

    record := atproto.NewLayerRecord(
        "sha256:abc123",
        1024,
        "application/vnd.docker.image.rootfs.diff.tar.gzip",
        "alice/myapp",
        "did:plc:alice123",
        "alice.bsky.social",
    )

    rkey, cid, err := pds.CreateLayerRecord(ctx, record)
    assert.NoError(t, err)
    assert.NotEmpty(t, rkey)
    assert.NotEmpty(t, cid)

    // Verify record was stored
    retrieved, err := pds.GetLayerRecord(ctx, rkey)
    assert.NoError(t, err)
    assert.Equal(t, record.Digest, retrieved.Digest)
}

Test Manifest Post Creation (pkg/hold/pds/manifest_post_test.go):

func TestCreateManifestPost(t *testing.T) {
    pds := setupTestPDS(t)
    ctx := context.Background()

    postURI, err := pds.CreateManifestPost(ctx, "alice/myapp", "latest", "alice.bsky.social")
    assert.NoError(t, err)
    assert.Contains(t, postURI, "app.bsky.feed.post")

    // Parse URI and verify post exists
    // at://did:web:hold01.atcr.io/app.bsky.feed.post/{rkey}
}

Test XRPC Endpoint (pkg/hold/oci/xrpc_test.go):

func TestHandleNotifyManifest(t *testing.T) {
    handler := setupTestHandler(t)

    req := NotifyManifestRequest{
        Repository: "alice/myapp",
        Tag:        "latest",
        UserDID:    "did:plc:alice123",
        UserHandle: "alice.bsky.social",
        Manifest: /* ... */,
    }

    // Make HTTP request with service token
    resp := makeRequest(t, handler, req, validServiceToken)

    assert.Equal(t, http.StatusOK, resp.StatusCode)

    var result NotifyManifestResponse
    json.NewDecoder(resp.Body).Decode(&result)

    assert.True(t, result.Success)
    assert.Equal(t, 3, result.LayersCreated) // if manifest has 3 layers
    assert.True(t, result.PostCreated)
}

Integration Tests#

End-to-End Test:

  1. Push a test image to ATCR
  2. Verify manifest is stored in user's PDS
  3. Verify layer records are created in hold's PDS
  4. Verify Bluesky post is created in hold's PDS
  5. Query ATProto endpoints to retrieve records

Error Handling#

AppView Side#

Notification failures should NOT break manifest uploads:

  • If hold is unreachable: Log error, continue
  • If service token fails: Log error, continue
  • If hold returns error: Log error, continue

Rationale: Bluesky posts are a "nice to have" feature, not critical infrastructure. Image pushes must succeed even if social features fail.
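One way to keep the push path independent of the notification is to run it under a deadline and discard its error after logging. This is a minimal sketch under that assumption; notifyBestEffort, putManifest, and the 2-second timeout are illustrative names and values, not part of the existing code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"
)

// notifyBestEffort runs notify under a deadline. Failures are logged and
// reported as false, never returned as an error.
func notifyBestEffort(ctx context.Context, timeout time.Duration, notify func(context.Context) error) bool {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if err := notify(ctx); err != nil {
		log.Printf("hold notification failed (ignored): %v", err)
		return false
	}
	return true
}

// putManifest stands in for the tail of the manifest write path: the digest
// is returned whether or not the notification succeeds. Real code would
// derive the context from the incoming request instead of Background.
func putManifest(digest string, notify func(context.Context) error) (string, error) {
	notifyBestEffort(context.Background(), 2*time.Second, notify)
	return digest, nil
}

// unreachableHold simulates a hold that cannot be contacted.
func unreachableHold(ctx context.Context) error {
	return errors.New("hold unreachable")
}

func main() {
	digest, err := putManifest("sha256:abc123", unreachableHold)
	fmt.Println(digest, err)
}
```

The deadline also bounds how long a slow hold can delay the response to the pushing client.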

Hold Side#

Partial failures are acceptable:

  • If some layer records fail: Create what we can, return partial success
  • If Bluesky post fails but layers succeed: Return success with postCreated: false
  • If all operations fail: Return error response

Logging:

  • Log all errors for debugging
  • Include user DID, repository, and error details
  • Use structured logging for easy querying

Configuration#

Environment Variables#

Hold Service (.env.hold.example):

# Enable/disable Bluesky posting
HOLD_BLUESKY_POSTS_ENABLED=true

# Enable/disable layer record creation
HOLD_LAYER_RECORDS_ENABLED=true

AppView (.env.appview.example):

# Enable/disable manifest notifications to holds
ATCR_NOTIFY_HOLDS_ENABLED=true
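These flags could be read with a small helper that falls back to a default when a variable is unset or malformed. The sketch below is one possible shape; envBool and envBoolFrom are hypothetical names, and defaulting to true is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// envBoolFrom resolves a boolean flag through lookup, returning def when the
// variable is unset or cannot be parsed by strconv.ParseBool.
func envBoolFrom(lookup func(string) (string, bool), name string, def bool) bool {
	raw, ok := lookup(name)
	if !ok {
		return def
	}
	v, err := strconv.ParseBool(raw)
	if err != nil {
		return def
	}
	return v
}

// envBool reads the flag from the process environment.
func envBool(name string, def bool) bool {
	return envBoolFrom(os.LookupEnv, name, def)
}

func main() {
	fmt.Println("posts enabled:", envBool("HOLD_BLUESKY_POSTS_ENABLED", true))
	fmt.Println("layer records enabled:", envBool("HOLD_LAYER_RECORDS_ENABLED", true))
	fmt.Println("notify holds:", envBool("ATCR_NOTIFY_HOLDS_ENABLED", true))
}
```

Taking the lookup function as a parameter keeps the parsing logic testable without touching the real environment.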

Feature Flags#

Consider making this feature opt-in initially:

  • Add flag to captain record: enableSocialPosts bool
  • Check flag before creating posts
  • Allow hold owners to disable social features

Performance Considerations#

Database Impact#

Layer records: Each manifest upload creates N records (where N = number of layers)

  • Typical image: 5-10 layers
  • Large image: 50+ layers
  • Storage: ~500 bytes per record (CBOR-encoded)

Bluesky posts: One post per manifest

  • Storage: ~200 bytes per post
  • Indexed by creation time for feed queries

Carstore growth: Estimate ~5KB per manifest upload for a 10-layer image (10 layer records at ~500 bytes each, plus one ~200-byte post)
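The growth estimate follows directly from the per-record figures above. As a worked example, the function below applies those same rough numbers to different layer counts; the constants are the document's estimates, not measurements.

```go
package main

import "fmt"

const (
	bytesPerLayerRecord = 500 // rough estimate from this section
	bytesPerPost        = 200 // rough estimate from this section
)

// estimateCarstoreBytes approximates carstore growth for one manifest upload:
// one layer record per layer plus a single Bluesky post.
func estimateCarstoreBytes(layers int) int {
	return layers*bytesPerLayerRecord + bytesPerPost
}

func main() {
	for _, layers := range []int{5, 10, 50} {
		fmt.Printf("%d layers: ~%d bytes\n", layers, estimateCarstoreBytes(layers))
	}
}
```

A 10-layer image comes to about 5.2KB, while a 50-layer image is closer to 25KB per push.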

Network Impact#

AppView → Hold notification:

  • One HTTP POST per manifest upload
  • Payload size: ~2-10KB (depends on layer count)
  • Should complete in <100ms on local network

Service token requests:

  • Tokens are cached for 50 seconds, just under their 60-second lifetime
  • Minimal overhead if pushing multiple manifests quickly
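A cache like the one described can be sketched as a mutex-guarded map with a fixed TTL. The type and method names here are hypothetical; the real caching presumably lives in the token refresher, whose internals this document does not show.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// defaultTokenTTL mirrors the 50-second figure quoted above.
const defaultTokenTTL = 50 * time.Second

type cachedToken struct {
	value     string
	expiresAt time.Time
}

// tokenCache keeps service tokens for a fixed TTL, keyed by user and hold DID.
type tokenCache struct {
	mu     sync.Mutex
	ttl    time.Duration
	tokens map[string]cachedToken
}

func newTokenCache(ttl time.Duration) *tokenCache {
	return &tokenCache{ttl: ttl, tokens: make(map[string]cachedToken)}
}

func (c *tokenCache) put(userDID, holdDID, token string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.tokens[userDID+"|"+holdDID] = cachedToken{value: token, expiresAt: time.Now().Add(c.ttl)}
}

// get returns the cached token, or false when it is missing or expired.
func (c *tokenCache) get(userDID, holdDID string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	key := userDID + "|" + holdDID
	entry, ok := c.tokens[key]
	if !ok || !time.Now().Before(entry.expiresAt) {
		delete(c.tokens, key) // drop stale entries eagerly
		return "", false
	}
	return entry.value, true
}

func main() {
	cache := newTokenCache(defaultTokenTTL)
	cache.put("did:plc:alice123", "did:web:hold01.atcr.io", "example-token")
	token, ok := cache.get("did:plc:alice123", "did:web:hold01.atcr.io")
	fmt.Println(token, ok)
}
```

Keeping the TTL below the token lifetime avoids handing out a token that expires while the request is in flight.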

Optimization Opportunities#

  1. Batch layer record creation: Use BatchWrite for multiple records
  2. Async processing: Queue notifications and process in background
  3. Rate limiting: Limit posts per user/hold to prevent spam
  4. Deduplication: Skip layer records for already-seen digests
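The deduplication idea in item 4 can be illustrated with a set of already-seen digests. This sketch keeps the set in memory purely for illustration; a real hold would need a persistent index, and unseenDigests is a hypothetical helper.

```go
package main

import "fmt"

// unseenDigests returns the digests that are not yet in seen, preserving
// order, and records them in seen. Duplicates within one manifest are also
// collapsed.
func unseenDigests(seen map[string]struct{}, digests []string) []string {
	var fresh []string
	for _, d := range digests {
		if _, ok := seen[d]; ok {
			continue
		}
		seen[d] = struct{}{}
		fresh = append(fresh, d)
	}
	return fresh
}

func main() {
	seen := map[string]struct{}{"sha256:aaa": {}}
	layers := []string{"sha256:aaa", "sha256:bbb", "sha256:bbb", "sha256:ccc"}
	fmt.Println(unseenDigests(seen, layers))
}
```

Note that a layer record also carries the repository and user, so deduplicating on digest alone would lose the link between a shared base layer and the later repositories that reuse it.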

Future Enhancements#

Phase 2: Enhanced Posts#

Rich embeds:

  • Link preview to AppView repository page
  • Thumbnail image from first layer
  • Metadata badges (image size, layer count, tags)

Mentions:

  • Parse user handle and create Bluesky facets for @mentions
  • Enable clickable mentions in posts

Tags/hashtags:

  • Add #container, #docker, repository tags
  • Improve discoverability in Bluesky

Phase 3: Feed Customization#

Hold-specific feeds:

  • Query layer records by repository
  • Filter by user DID
  • Time-based queries

ATProto feed generator:

  • Implement app.bsky.feed.getFeedSkeleton XRPC endpoint
  • Publish hold's feed to Bluesky
  • Users can subscribe to hold activity feeds

Phase 4: Analytics#

Track metrics:

  • Posts per day/week/month
  • Most active users
  • Most popular repositories
  • Storage growth over time

Dashboards:

  • Visualize activity on AppView UI
  • Show trending images
  • Leaderboards for most pushed repositories

Security Considerations#

Authentication#

Service tokens:

  • Validate tokens against user's PDS
  • Verify that the DID in the token claims matches the userDid in the request
  • Check token expiration (60s from PDS)
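The claim checks in this list can be sketched as a pure function over already-decoded claims. It assumes the standard JWT claim names iss, aud, and exp, with the user's DID as issuer and the hold's DID as audience. Signature verification is deliberately out of scope, so this is not a complete validator.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// serviceClaims holds the subset of JWT claims checked here.
type serviceClaims struct {
	Iss string `json:"iss"` // issuer: expected to be the user's DID
	Aud string `json:"aud"` // audience: expected to be the hold's DID
	Exp int64  `json:"exp"` // expiry as Unix seconds
}

// checkClaims verifies issuer, audience, and expiry. It does NOT verify the
// token signature; that must happen before these checks are trusted.
func checkClaims(c serviceClaims, userDID, holdDID string, nowUnix int64) error {
	switch {
	case c.Iss != userDID:
		return fmt.Errorf("issuer %q does not match user DID %q", c.Iss, userDID)
	case c.Aud != holdDID:
		return fmt.Errorf("audience %q does not match hold DID %q", c.Aud, holdDID)
	case nowUnix >= c.Exp:
		return errors.New("token expired")
	}
	return nil
}

func main() {
	now := time.Now().Unix()
	claims := serviceClaims{Iss: "did:plc:alice123", Aud: "did:web:hold01.atcr.io", Exp: now + 60}
	fmt.Println(checkClaims(claims, "did:plc:alice123", "did:web:hold01.atcr.io", now))
}
```

Checking the audience prevents a token minted for one hold from being replayed against another.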

Authorization:

  • Only authenticated users can trigger posts
  • Posts created under hold's DID (not user's DID)
  • User information is metadata in post text

Privacy#

User handles:

  • Posts include user handle (@alice.bsky.social)
  • Consider opt-out mechanism for privacy-conscious users

Repository names:

  • Public information (already visible in AppView)
  • Consider private repository flags in future

Rate Limiting#

Prevent spam:

  • Limit posts per user per hour
  • Detect rapid-fire pushes (CI/CD)
  • Consider aggregating multiple pushes into single post
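A per-user hourly cap can be sketched as a sliding-window limiter. Everything below is illustrative: the type name, the in-memory storage, and the limit of two posts per hour used in main are assumptions, not proposed values.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// postWindow is the length of the sliding window.
const postWindow = time.Hour

// postLimiter allows at most limit posts per user within window.
type postLimiter struct {
	mu     sync.Mutex
	limit  int
	window time.Duration
	recent map[string][]time.Time
}

func newPostLimiter(limit int, window time.Duration) *postLimiter {
	return &postLimiter{limit: limit, window: window, recent: make(map[string][]time.Time)}
}

// allow reports whether userDID may post at time now, and records the post
// if so. Timestamps older than the window are discarded on each call.
func (l *postLimiter) allow(userDID string, now time.Time) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	cutoff := now.Add(-l.window)
	kept := l.recent[userDID][:0]
	for _, t := range l.recent[userDID] {
		if t.After(cutoff) {
			kept = append(kept, t)
		}
	}
	if len(kept) >= l.limit {
		l.recent[userDID] = kept
		return false
	}
	l.recent[userDID] = append(kept, now)
	return true
}

// allowNow is allow evaluated at the current time.
func (l *postLimiter) allowNow(userDID string) bool {
	return l.allow(userDID, time.Now())
}

func main() {
	limiter := newPostLimiter(2, postWindow)
	for i := 0; i < 3; i++ {
		fmt.Println(limiter.allowNow("did:plc:alice123"))
	}
}
```

When the limiter rejects a post, the layer records can still be created; only the social post is skipped or deferred for aggregation.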

Resource protection:

  • Limit layer record creation to prevent storage exhaustion
  • Cap manifest notification payload size
  • Timeout long-running operations

Monitoring and Observability#

Metrics to Track#

AppView:

  • atcr_hold_notifications_total - Counter of notifications sent
  • atcr_hold_notifications_errors_total - Counter of failures
  • atcr_hold_notification_duration_ms - Histogram of latency

Hold:

  • hold_layer_records_created_total - Counter of layer records
  • hold_bluesky_posts_created_total - Counter of posts
  • hold_manifest_notifications_received_total - Counter of incoming notifications
  • hold_notification_errors_total - Counter of errors by type

Logging#

Structured logs:

{
  "level": "info",
  "msg": "manifest notification received",
  "repository": "alice/myapp",
  "tag": "latest",
  "userDid": "did:plc:alice123",
  "layerCount": 5,
  "layersCreated": 5,
  "postCreated": true,
  "duration_ms": 45
}
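A log line of this shape could be produced with the standard library's log/slog package (Go 1.21 or later). The sketch below is one way to do it, with hypothetical type and function names. By default slog also emits a time field and writes the level as "INFO"; the timestamp is dropped here only to keep the example output reproducible.

```go
package main

import (
	"bytes"
	"io"
	"log/slog"
	"os"
)

// notification collects the fields shown in the example log entry.
type notification struct {
	Repository, Tag, UserDID  string
	LayerCount, LayersCreated int
	PostCreated               bool
	DurationMS                int64
}

// newJSONLogger returns a JSON logger that omits the timestamp attribute.
func newJSONLogger(w io.Writer) *slog.Logger {
	opts := &slog.HandlerOptions{
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if len(groups) == 0 && a.Key == slog.TimeKey {
				return slog.Attr{} // a zero Attr is discarded by the handler
			}
			return a
		},
	}
	return slog.New(slog.NewJSONHandler(w, opts))
}

// logNotification writes one structured entry per received notification.
func logNotification(logger *slog.Logger, n notification) {
	logger.Info("manifest notification received",
		slog.String("repository", n.Repository),
		slog.String("tag", n.Tag),
		slog.String("userDid", n.UserDID),
		slog.Int("layerCount", n.LayerCount),
		slog.Int("layersCreated", n.LayersCreated),
		slog.Bool("postCreated", n.PostCreated),
		slog.Int64("duration_ms", n.DurationMS),
	)
}

// render returns the log line as a string, which is convenient in tests.
func render(n notification) string {
	var buf bytes.Buffer
	logNotification(newJSONLogger(&buf), n)
	return buf.String()
}

func main() {
	logNotification(newJSONLogger(os.Stdout), notification{
		Repository: "alice/myapp", Tag: "latest", UserDID: "did:plc:alice123",
		LayerCount: 5, LayersCreated: 5, PostCreated: true, DurationMS: 45,
	})
}
```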

Alerts#

Critical issues:

  • High error rate (>10% failures)
  • Service token failures (auth issues)
  • PDS carstore errors (database problems)

Warning issues:

  • Slow notifications (>1s latency)
  • Partial failures (some layers not created)
  • Missing user handle in context

Migration Strategy#

Rollout Plan#

Phase 1: Development

  • Implement core functionality
  • Add comprehensive tests
  • Deploy to staging environment

Phase 2: Beta

  • Enable for test holds only
  • Gather feedback from early users
  • Monitor performance and errors

Phase 3: Opt-in

  • Add configuration flags
  • Allow hold owners to enable feature
  • Document setup process

Phase 4: Default On

  • Enable by default for new holds
  • Migrate existing holds (opt-out available)
  • Announce feature publicly

Backward Compatibility#

No breaking changes:

  • New XRPC endpoint (doesn't affect existing endpoints)
  • New record types (isolated collections)
  • Optional feature (can be disabled)

Existing holds:

  • Work without changes
  • Can opt-in by updating hold service
  • No data migration required

Example Post Formats#

Simple Format#

📦 alice/myapp:latest pushed by @alice.bsky.social

Detailed Format#

📦 New container image pushed!

alice/myapp:v1.2.3
Pushed by @alice.bsky.social
5 layers, 125 MB total

View: https://atcr.io/alice/myapp
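A formatter for the detailed layout might look like the following sketch. formatDetailedPost is a hypothetical name, and the size is reported in decimal megabytes by integer division, which is an assumption about how "125 MB" would be derived.

```go
package main

import "fmt"

// formatDetailedPost renders the detailed post layout. totalBytes is
// converted to whole decimal megabytes (1 MB = 1,000,000 bytes).
func formatDetailedPost(repository, tag, handle string, layers int, totalBytes int64) string {
	return fmt.Sprintf(
		"📦 New container image pushed!\n\n%s:%s\nPushed by @%s\n%d layers, %d MB total\n\nView: https://atcr.io/%s",
		repository, tag, handle, layers, totalBytes/1_000_000, repository,
	)
}

func main() {
	fmt.Println(formatDetailedPost("alice/myapp", "v1.2.3", "alice.bsky.social", 5, 125_000_000))
}
```

Longer layouts like this one are worth checking against Bluesky's post length limit before publishing, especially for long repository names.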

With Emoji/Styling#

🚀 alice/myapp:latest

✅ 5 layers
📦 125.4 MB
👤 @alice.bsky.social
🔗 atcr.io/alice/myapp

With Tags#

📦 alice/myapp:latest pushed by @alice.bsky.social

#container #docker #atcr

References#

  • Existing Bluesky post implementation: pkg/hold/pds/status.go
  • XRPC endpoint pattern: pkg/hold/oci/xrpc.go
  • Record type definitions: pkg/atproto/lexicon.go
  • Manifest storage: pkg/appview/storage/manifest_store.go
  • Service token handling: pkg/auth/oauth/refresher.go

External Documentation#

Tools#

  • CBOR code generation: github.com/whyrusleeping/cbor-gen
  • ATProto libraries: github.com/bluesky-social/indigo
  • Testing: Standard Go testing + testify/assert