A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

more jetstream and ui improvements

+696 -98
+2 -2
Dockerfile
··· 22 22 # Runtime stage 23 23 FROM alpine:latest 24 24 25 - # Install CA certificates for HTTPS and SQLite runtime libraries 26 - RUN apk --no-cache add ca-certificates sqlite-libs 25 + # Install CA certificates for HTTPS, SQLite runtime libraries, and sqlite CLI for debugging 26 + RUN apk --no-cache add ca-certificates sqlite-libs sqlite 27 27 28 28 # Set working directory 29 29 WORKDIR /app
+60 -11
cmd/registry/serve.go
··· 27 27 "atcr.io/pkg/appview" 28 28 "atcr.io/pkg/appview/db" 29 29 uihandlers "atcr.io/pkg/appview/handlers" 30 + "atcr.io/pkg/appview/jetstream" 30 31 appmiddleware "atcr.io/pkg/appview/middleware" 31 32 appsession "atcr.io/pkg/appview/session" 32 33 "github.com/gorilla/mux" ··· 127 128 middleware.SetGlobalRefresher(refresher) 128 129 129 130 // 6. Initialize UI components (get session store for OAuth integration) 130 - uiDatabase, uiSessionStore, uiTemplates, uiRouter := initializeUI(config) 131 + uiDatabase, uiSessionStore, uiTemplates, uiRouter := initializeUI(config, refresher, baseURL) 131 132 132 133 // 7. Create OAuth server 133 134 oauthServer := oauth.NewServer(refreshStorage, sessionManager, baseURL) ··· 338 339 } 339 340 340 341 // initializeUI initializes the web UI components 341 - func initializeUI(config *configuration.Configuration) (*sql.DB, *appsession.Store, *template.Template, *mux.Router) { 342 + func initializeUI(config *configuration.Configuration, refresher *oauth.Refresher, baseURL string) (*sql.DB, *appsession.Store, *template.Template, *mux.Router) { 342 343 // Check if UI is enabled (optional configuration) 343 344 uiEnabled := os.Getenv("ATCR_UI_ENABLED") 344 345 if uiEnabled == "false" { ··· 367 368 368 369 fmt.Printf("UI database initialized at %s\n", dbPath) 369 370 370 - // Create session store 371 - sessionStore := appsession.NewStore() 371 + // Create session store with file persistence 372 + sessionStorePath := os.Getenv("ATCR_UI_SESSION_PATH") 373 + if sessionStorePath == "" { 374 + sessionStorePath = "/var/lib/atcr/ui-sessions.json" 375 + } 376 + sessionStore := appsession.NewStore(sessionStorePath) 372 377 373 378 // Start cleanup goroutine 374 379 go func() { ··· 399 404 // Public routes (with optional auth for navbar) 400 405 router.Handle("/", appmiddleware.OptionalAuth(sessionStore)( 401 406 &uihandlers.HomeHandler{ 402 - DB: database, 403 - Templates: templates, 407 + DB: database, 408 + Templates: templates, 409 + RegistryURL: baseURL, 404 410 }, 405 411 )).Methods("GET") 406 412 407 413 router.Handle("/api/recent-pushes", appmiddleware.OptionalAuth(sessionStore)( 408 414 &uihandlers.RecentPushesHandler{ 409 - DB: database, 410 - Templates: templates, 415 + DB: database, 416 + Templates: templates, 417 + RegistryURL: baseURL, 411 418 }, 412 419 )).Methods("GET") 413 420 ··· 416 423 authRouter.Use(appmiddleware.RequireAuth(sessionStore)) 417 424 418 425 authRouter.Handle("/images", &uihandlers.ImagesHandler{ 419 - DB: database, 420 - Templates: templates, 426 + DB: database, 427 + Templates: templates, 428 + RegistryURL: baseURL, 421 429 }).Methods("GET") 422 430 423 431 authRouter.Handle("/settings", &uihandlers.SettingsHandler{ 424 432 Templates: templates, 433 + Refresher: refresher, 425 434 }).Methods("GET") 426 435 427 - authRouter.Handle("/api/profile/default-hold", &uihandlers.UpdateDefaultHoldHandler{}).Methods("POST") 436 + authRouter.Handle("/api/profile/default-hold", &uihandlers.UpdateDefaultHoldHandler{ 437 + Refresher: refresher, 438 + }).Methods("POST") 428 439 429 440 authRouter.Handle("/api/images/{repository}/tags/{tag}", &uihandlers.DeleteTagHandler{ 430 441 DB: database, ··· 442 453 appsession.ClearCookie(w) 443 454 http.Redirect(w, r, "/", http.StatusFound) 444 455 }).Methods("POST") 456 + 457 + // Start Jetstream worker 458 + jetstreamURL := os.Getenv("JETSTREAM_URL") 459 + if jetstreamURL == "" { 460 + jetstreamURL = "wss://jetstream2.us-west.bsky.network/subscribe" 461 + } 462 + 463 + // Parse cursor for backfilling historical data 464 + // Set to Unix microseconds timestamp to replay from that point 465 + // Examples: 466 + // - 2 weeks ago: use `date -d '2 weeks ago' +%s` * 1000000 467 + // - Leave unset (or 0) to start from now 468 + var startCursor int64 469 + if cursorStr := os.Getenv("JETSTREAM_START_CURSOR"); cursorStr != "" { 470 + if cursor, err := time.Parse(time.RFC3339, cursorStr); err == nil { 471 + // Support RFC3339 format: "2025-09-23T00:00:00Z" 472 + startCursor = cursor.UnixMicro() 473 + fmt.Printf("Jetstream: Starting from %s (%d microseconds)\n", cursorStr, startCursor) 474 + } else if cursor, err := time.ParseDuration(cursorStr); err == nil { 475 + // Support duration format: "-336h" (2 weeks ago) 476 + startCursor = time.Now().Add(cursor).UnixMicro() 477 + fmt.Printf("Jetstream: Starting from %s ago (%d microseconds)\n", cursorStr, startCursor) 478 + } else { 479 + fmt.Printf("Warning: Invalid JETSTREAM_START_CURSOR format: %s\n", cursorStr) 480 + } 481 + } 482 + 483 + worker := jetstream.NewWorker(database, jetstreamURL, startCursor) 484 + go func() { 485 + for { 486 + if err := worker.Start(context.Background()); err != nil { 487 + fmt.Printf("Jetstream worker error: %v, reconnecting in 10s...\n", err) 488 + time.Sleep(10 * time.Second) 489 + } 490 + } 491 + }() 492 + 493 + fmt.Println("Jetstream worker started") 445 494 446 495 return database, sessionStore, templates, router 447 496 }
+2
docker-compose.yml
··· 10 10 environment: 11 11 - ATCR_TOKEN_STORAGE_PATH=/var/lib/atcr/tokens/oauth-tokens.json 12 12 - ATCR_UI_ENABLED=true 13 + # Jetstream backfill: Replay 5 days of historical events 14 + # - JETSTREAM_START_CURSOR=-120h 13 15 volumes: 14 16 # Auth keys (JWT signing keys) 15 17 - atcr-auth:/var/lib/atcr/auth
+2 -1
go.mod
··· 30 30 github.com/go-logr/stdr v1.2.2 // indirect 31 31 github.com/gorilla/handlers v1.5.2 // indirect 32 32 github.com/gorilla/mux v1.8.1 // indirect 33 + github.com/gorilla/websocket v1.5.3 // indirect 33 34 github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect 34 35 github.com/hashicorp/golang-lru/arc/v2 v2.0.6 // indirect 35 36 github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect 36 37 github.com/inconshreveable/mousetrap v1.1.0 // indirect 37 38 github.com/jmespath/go-jmespath v0.4.0 // indirect 38 - github.com/klauspost/compress v1.17.11 // indirect 39 + github.com/klauspost/compress v1.18.0 // indirect 39 40 github.com/mattn/go-sqlite3 v1.14.32 // indirect 40 41 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect 41 42 github.com/opencontainers/image-spec v1.1.0 // indirect
+4
go.sum
··· 77 77 github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w= 78 78 github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= 79 79 github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= 80 + github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= 81 + github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= 80 82 github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN9coASF1GusYX6AlU= 81 83 github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0= 82 84 github.com/hashicorp/golang-lru/arc/v2 v2.0.6 h1:4NU7uP5vSoK6TbaMj3NtY478TTAWLso/vL1gpNrInHg= ··· 94 96 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= 95 97 github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= 96 98 github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= 99 + github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= 100 + github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= 97 101 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= 98 102 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= 99 103 github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+22 -1
pkg/appview/db/queries.go
··· 2 2 3 3 import ( 4 4 "database/sql" 5 + "time" 5 6 ) 6 7 7 8 // GetRecentPushes fetches recent pushes with pagination ··· 81 82 var repos []Repository 82 83 for rows.Next() { 83 84 var r Repository 84 - if err := rows.Scan(&r.Name, &r.TagCount, &r.ManifestCount, &r.LastPush); err != nil { 85 + var lastPushStr string 86 + if err := rows.Scan(&r.Name, &r.TagCount, &r.ManifestCount, &lastPushStr); err != nil { 85 87 return nil, err 88 + } 89 + 90 + // Parse the timestamp string into time.Time 91 + if lastPushStr != "" { 92 + // Try multiple timestamp formats 93 + formats := []string{ 94 + time.RFC3339Nano, // 2006-01-02T15:04:05.999999999Z07:00 95 + "2006-01-02 15:04:05.999999999-07:00", // SQLite with microseconds and timezone 96 + "2006-01-02 15:04:05.999999999", // SQLite with microseconds 97 + time.RFC3339, // 2006-01-02T15:04:05Z07:00 98 + "2006-01-02 15:04:05", // SQLite default 99 + } 100 + 101 + for _, format := range formats { 102 + if t, err := time.Parse(format, lastPushStr); err == nil { 103 + r.LastPush = t 104 + break 105 + } 106 + } 86 107 } 87 108 88 109 // Get tags for this repo
+20 -14
pkg/appview/handlers/home.go
··· 12 12 13 13 // HomeHandler handles the home page 14 14 type HomeHandler struct { 15 - DB *sql.DB 16 - Templates *template.Template 15 + DB *sql.DB 16 + Templates *template.Template 17 + RegistryURL string 17 18 } 18 19 19 20 func (h *HomeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 20 21 data := struct { 21 - User *db.User 22 - Query string 22 + User *db.User 23 + Query string 24 + RegistryURL string 23 25 }{ 24 - User: middleware.GetUser(r), 25 - Query: r.URL.Query().Get("q"), 26 + User: middleware.GetUser(r), 27 + Query: r.URL.Query().Get("q"), 28 + RegistryURL: h.RegistryURL, 26 29 } 27 30 28 31 if err := h.Templates.ExecuteTemplate(w, "home", data); err != nil { ··· 33 36 34 37 // RecentPushesHandler handles the HTMX request for recent pushes 35 38 type RecentPushesHandler struct { 36 - DB *sql.DB 37 - Templates *template.Template 39 + DB *sql.DB 40 + Templates *template.Template 41 + RegistryURL string 38 42 } 39 43 40 44 func (h *RecentPushesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ··· 57 61 } 58 62 59 63 data := struct { 60 - Pushes []db.Push 61 - HasMore bool 62 - NextOffset int 64 + Pushes []db.Push 65 + HasMore bool 66 + NextOffset int 67 + RegistryURL string 63 68 }{ 64 - Pushes: pushes, 65 - HasMore: offset+limit < total, 66 - NextOffset: offset + limit, 69 + Pushes: pushes, 70 + HasMore: offset+limit < total, 71 + NextOffset: offset + limit, 72 + RegistryURL: h.RegistryURL, 67 73 } 68 74 69 75 if err := h.Templates.ExecuteTemplate(w, "push-list.html", data); err != nil {
+5 -2
pkg/appview/handlers/images.go
··· 12 12 13 13 // ImagesHandler handles the images management page 14 14 type ImagesHandler struct { 15 - DB *sql.DB 16 - Templates *template.Template 15 + DB *sql.DB 16 + Templates *template.Template 17 + RegistryURL string 17 18 } 18 19 19 20 func (h *ImagesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ··· 34 35 User *db.User 35 36 Repositories []db.Repository 36 37 Query string 38 + RegistryURL string 37 39 }{ 38 40 User: user, 39 41 Repositories: repos, 40 42 Query: r.URL.Query().Get("q"), 43 + RegistryURL: h.RegistryURL, 41 44 } 42 45 43 46 if err := h.Templates.ExecuteTemplate(w, "images", data); err != nil {
+53 -9
pkg/appview/handlers/settings.go
··· 1 1 package handlers 2 2 3 3 import ( 4 + "fmt" 4 5 "html/template" 5 6 "net/http" 6 7 "time" 7 8 8 9 "atcr.io/pkg/appview/db" 9 10 "atcr.io/pkg/appview/middleware" 11 + "atcr.io/pkg/atproto" 12 + "atcr.io/pkg/auth/oauth" 10 13 ) 11 14 12 15 // SettingsHandler handles the settings page 13 16 type SettingsHandler struct { 14 17 Templates *template.Template 15 - // TODO: Add ATProto client when implementing profile fetching 18 + Refresher *oauth.Refresher 16 19 } 17 20 18 21 func (h *SettingsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 19 22 user := middleware.GetUser(r) 20 23 if user == nil { 21 - http.Redirect(w, r, "/auth/oauth/login?return_to=/ui/settings", http.StatusFound) 24 + http.Redirect(w, r, "/auth/oauth/login?return_to=/settings", http.StatusFound) 22 25 return 23 26 } 24 27 25 - // TODO: Fetch actual profile from PDS using ATProto client 26 - // For now, using mock data from session 28 + // Get access token and DPoP transport for the user 29 + accessToken, _, dpopTransport, err := h.Refresher.GetAccessToken(r.Context(), user.DID) 30 + if err != nil { 31 + http.Error(w, "Failed to get access token: "+err.Error(), http.StatusInternalServerError) 32 + return 33 + } 34 + 35 + // Create ATProto client with DPoP transport 36 + client := atproto.NewClientWithDPoP(user.PDSEndpoint, user.DID, accessToken, nil, dpopTransport) 37 + 38 + // Fetch sailor profile 39 + profile, err := atproto.GetProfile(r.Context(), client) 40 + if err != nil { 41 + // Log error but don't fail - profile might not exist yet 42 + fmt.Printf("WARNING [settings]: Failed to fetch profile for %s: %v\n", user.DID, err) 43 + profile = &atproto.SailorProfileRecord{} 44 + } else { 45 + fmt.Printf("DEBUG [settings]: Fetched profile for %s: defaultHold=%s\n", user.DID, profile.DefaultHold) 46 + } 47 + 27 48 data := struct { 28 49 User *db.User 29 50 Profile struct { ··· 43 64 data.Profile.Handle = user.Handle 44 65 data.Profile.DID = user.DID 45 66 data.Profile.PDSEndpoint = user.PDSEndpoint 46 - // data.Profile.DefaultHold will be empty for now 67 + data.Profile.DefaultHold = profile.DefaultHold 47 68 48 69 if err := h.Templates.ExecuteTemplate(w, "settings", data); err != nil { 49 70 http.Error(w, err.Error(), http.StatusInternalServerError) ··· 53 74 54 75 // UpdateDefaultHoldHandler handles updating the default hold 55 76 type UpdateDefaultHoldHandler struct { 56 - // TODO: Add ATProto client for updating profile 77 + Refresher *oauth.Refresher 57 78 } 58 79 59 80 func (h *UpdateDefaultHoldHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ··· 65 86 66 87 holdEndpoint := r.FormValue("hold_endpoint") 67 88 68 - // TODO: Update profile in PDS via ATProto client 69 - // For now, just return success 70 - _ = holdEndpoint 89 + // Get access token and DPoP transport for the user 90 + accessToken, _, dpopTransport, err := h.Refresher.GetAccessToken(r.Context(), user.DID) 91 + if err != nil { 92 + http.Error(w, "Failed to get access token: "+err.Error(), http.StatusInternalServerError) 93 + return 94 + } 95 + 96 + // Create ATProto client with DPoP transport 97 + client := atproto.NewClientWithDPoP(user.PDSEndpoint, user.DID, accessToken, nil, dpopTransport) 98 + 99 + // Fetch existing profile or create new one 100 + profile, err := atproto.GetProfile(r.Context(), client) 101 + if err != nil || profile == nil { 102 + // Profile doesn't exist, create new one 103 + profile = atproto.NewSailorProfileRecord(holdEndpoint) 104 + } else { 105 + // Update existing profile 106 + profile.DefaultHold = holdEndpoint 107 + profile.UpdatedAt = time.Now() 108 + } 109 + 110 + // Save profile 111 + if err := atproto.UpdateProfile(r.Context(), client, profile); err != nil { 112 + http.Error(w, "Failed to update profile: "+err.Error(), http.StatusInternalServerError) 113 + return 114 + } 71 115 72 116 w.Header().Set("Content-Type", "text/html") 73 117 w.Write([]byte(`<div class="success">✓ Default hold updated successfully!</div>`))
+383
pkg/appview/jetstream/worker.go
··· 1 + package jetstream 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "fmt" 8 + "net/url" 9 + "strings" 10 + "time" 11 + 12 + "atcr.io/pkg/appview/db" 13 + "atcr.io/pkg/atproto" 14 + "github.com/gorilla/websocket" 15 + "github.com/klauspost/compress/zstd" 16 + ) 17 + 18 + // UserCache caches DID -> handle/PDS mappings to avoid repeated lookups 19 + type UserCache struct { 20 + cache map[string]*db.User 21 + } 22 + 23 + // Worker consumes Jetstream events and populates the UI database 24 + type Worker struct { 25 + db *sql.DB 26 + jetstreamURL string 27 + startCursor int64 28 + wantedCollections []string 29 + debugCollectionCount int 30 + userCache *UserCache 31 + resolver *atproto.Resolver 32 + } 33 + 34 + // NewWorker creates a new Jetstream worker 35 + // startCursor: Unix microseconds timestamp to start from (0 = start from now) 36 + func NewWorker(database *sql.DB, jetstreamURL string, startCursor int64) *Worker { 37 + if jetstreamURL == "" { 38 + jetstreamURL = "wss://jetstream2.us-west.bsky.network/subscribe" 39 + } 40 + 41 + return &Worker{ 42 + db: database, 43 + jetstreamURL: jetstreamURL, 44 + startCursor: startCursor, 45 + wantedCollections: []string{ 46 + atproto.ManifestCollection, // io.atcr.manifest 47 + atproto.TagCollection, // io.atcr.tag 48 + }, 49 + userCache: &UserCache{ 50 + cache: make(map[string]*db.User), 51 + }, 52 + resolver: atproto.NewResolver(), 53 + } 54 + } 55 + 56 + // Start begins consuming Jetstream events 57 + // This is a blocking function that runs until the context is cancelled 58 + func (w *Worker) Start(ctx context.Context) error { 59 + // Build connection URL with filters 60 + u, err := url.Parse(w.jetstreamURL) 61 + if err != nil { 62 + return fmt.Errorf("invalid jetstream URL: %w", err) 63 + } 64 + 65 + q := u.Query() 66 + for _, collection := range w.wantedCollections { 67 + q.Add("wantedCollections", collection) 68 + } 69 + 70 + // Add cursor if specified (for backfilling historical data) 71 + if w.startCursor > 0 { 72 + q.Set("cursor", fmt.Sprintf("%d", w.startCursor)) 73 + fmt.Printf("Starting from cursor: %d (replaying historical events)\n", w.startCursor) 74 + } 75 + 76 + // Disable compression for now to debug 77 + // q.Set("compress", "true") 78 + u.RawQuery = q.Encode() 79 + 80 + fmt.Printf("Connecting to Jetstream: %s\n", u.String()) 81 + 82 + // Connect to Jetstream 83 + conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil) 84 + if err != nil { 85 + return fmt.Errorf("failed to connect to jetstream: %w", err) 86 + } 87 + defer conn.Close() 88 + 89 + // Create zstd decoder for decompressing messages 90 + decoder, err := zstd.NewReader(nil) 91 + if err != nil { 92 + return fmt.Errorf("failed to create zstd decoder: %w", err) 93 + } 94 + defer decoder.Close() 95 + 96 + fmt.Println("Connected to Jetstream, listening for events...") 97 + 98 + // Start heartbeat ticker to show Jetstream is alive 99 + heartbeatTicker := time.NewTicker(30 * time.Second) 100 + defer heartbeatTicker.Stop() 101 + 102 + eventCount := 0 103 + lastHeartbeat := time.Now() 104 + 105 + // Read messages 106 + for { 107 + select { 108 + case <-ctx.Done(): 109 + return ctx.Err() 110 + case <-heartbeatTicker.C: 111 + elapsed := time.Since(lastHeartbeat) 112 + fmt.Printf("Jetstream: Alive (processed %d events in last %.0fs)\n", eventCount, elapsed.Seconds()) 113 + eventCount = 0 114 + lastHeartbeat = time.Now() 115 + default: 116 + _, message, err := conn.ReadMessage() 117 + if err != nil { 118 + return fmt.Errorf("failed to read message: %w", err) 119 + } 120 + 121 + // For now, process uncompressed messages 122 + // TODO: Re-enable compression once debugging is complete 123 + _ = decoder // Keep decoder to avoid unused variable error 124 + 125 + if err := w.processMessage(message); err != nil { 126 + fmt.Printf("ERROR processing message: %v\n", err) 127 + // Continue processing other messages 128 + } else { 129 + eventCount++ 130 + } 131 + } 132 + } 133 + } 134 + 135 + // processMessage processes a single Jetstream event 136 + func (w *Worker) processMessage(message []byte) error { 137 + var event JetstreamEvent 138 + if err := json.Unmarshal(message, &event); err != nil { 139 + return fmt.Errorf("failed to unmarshal event: %w", err) 140 + } 141 + 142 + // Only process commit events 143 + if event.Kind != "commit" { 144 + return nil 145 + } 146 + 147 + commit := event.Commit 148 + if commit == nil { 149 + return nil 150 + } 151 + 152 + // Set DID on commit from parent event 153 + commit.DID = event.DID 154 + 155 + // Debug: log first few collections we see to understand what's coming through 156 + if w.debugCollectionCount < 5 { 157 + fmt.Printf("Jetstream DEBUG: Received collection=%s, did=%s\n", commit.Collection, commit.DID) 158 + w.debugCollectionCount++ 159 + } 160 + 161 + // Process based on collection 162 + switch commit.Collection { 163 + case atproto.ManifestCollection: 164 + fmt.Printf("Jetstream: Processing manifest event: did=%s, operation=%s, rkey=%s\n", 165 + commit.DID, commit.Operation, commit.RKey) 166 + return w.processManifest(commit) 167 + case atproto.TagCollection: 168 + fmt.Printf("Jetstream: Processing tag event: did=%s, operation=%s, rkey=%s\n", 169 + commit.DID, commit.Operation, commit.RKey) 170 + return w.processTag(commit) 171 + default: 172 + // Ignore other collections 173 + return nil 174 + } 175 + } 176 + 177 + // ensureUser resolves and upserts a user by DID 178 + func (w *Worker) ensureUser(ctx context.Context, did string) error { 179 + // Check cache first 180 + if user, ok := w.userCache.cache[did]; ok { 181 + // Update last seen 182 + user.LastSeen = time.Now() 183 + return db.UpsertUser(w.db, user) 184 + } 185 + 186 + // Resolve DID to get handle and PDS endpoint 187 + resolvedDID, pdsEndpoint, err := w.resolver.ResolveIdentity(ctx, did) 188 + if err != nil { 189 + fmt.Printf("WARNING: Failed to resolve DID %s: %v (using DID as handle)\n", did, err) 190 + // Fallback: use DID as handle 191 + resolvedDID = did 192 + pdsEndpoint = "https://bsky.social" // Default PDS endpoint as fallback 193 + } 194 + 195 + // Get handle from DID document 196 + handle, err := w.resolver.ResolveHandleFromDID(ctx, resolvedDID) 197 + if err != nil { 198 + fmt.Printf("WARNING: Failed to get handle for DID %s: %v (using DID as handle)\n", resolvedDID, err) 199 + handle = resolvedDID // Fallback to DID 200 + } 201 + 202 + // Cache the user 203 + user := &db.User{ 204 + DID: resolvedDID, 205 + Handle: handle, 206 + PDSEndpoint: pdsEndpoint, 207 + LastSeen: time.Now(), 208 + } 209 + w.userCache.cache[did] = user 210 + 211 + // Upsert to database 212 + return db.UpsertUser(w.db, user) 213 + } 214 + 215 + // processManifest processes a manifest commit event 216 + func (w *Worker) processManifest(commit *CommitEvent) error { 217 + // Resolve and upsert user with handle/PDS endpoint 218 + if err := w.ensureUser(context.Background(), commit.DID); err != nil { 219 + return fmt.Errorf("failed to ensure user: %w", err) 220 + } 221 + 222 + if commit.Operation == "delete" { 223 + // Delete manifest 224 + repo := extractRepoFromRKey(commit.RKey) 225 + digest := commit.RKey 226 + return db.DeleteManifest(w.db, commit.DID, repo, digest) 227 + } 228 + 229 + // Parse manifest record 230 + var manifestRecord atproto.ManifestRecord 231 + if commit.Record != nil { 232 + recordBytes, err := json.Marshal(commit.Record) 233 + if err != nil { 234 + return fmt.Errorf("failed to marshal record: %w", err) 235 + } 236 + if err := json.Unmarshal(recordBytes, &manifestRecord); err != nil { 237 + return fmt.Errorf("failed to unmarshal manifest: %w", err) 238 + } 239 + } else { 240 + // No record data, can't process 241 + return nil 242 + } 243 + 244 + // Serialize full manifest as JSON for storage 245 + manifestJSON, err := json.Marshal(manifestRecord) 246 + if err != nil { 247 + return fmt.Errorf("failed to marshal manifest: %w", err) 248 + } 249 + 250 + // Insert manifest 251 + manifestID, err := db.InsertManifest(w.db, &db.Manifest{ 252 + DID: commit.DID, 253 + Repository: manifestRecord.Repository, 254 + Digest: manifestRecord.Digest, 255 + MediaType: manifestRecord.MediaType, 256 + SchemaVersion: manifestRecord.SchemaVersion, 257 + ConfigDigest: manifestRecord.Config.Digest, 258 + ConfigSize: manifestRecord.Config.Size, 259 + RawManifest: string(manifestJSON), 260 + HoldEndpoint: manifestRecord.HoldEndpoint, 261 + CreatedAt: manifestRecord.CreatedAt, 262 + }) 263 + if err != nil { 264 + return fmt.Errorf("failed to insert manifest: %w", err) 265 + } 266 + 267 + // Insert layers 268 + for i, layer := range manifestRecord.Layers { 269 + if err := db.InsertLayer(w.db, &db.Layer{ 270 + ManifestID: manifestID, 271 + Digest: layer.Digest, 272 + MediaType: layer.MediaType, 273 + Size: layer.Size, 274 + LayerIndex: i, 275 + }); err != nil { 276 + // Continue on error - layer might already exist 277 + continue 278 + } 279 + } 280 + 281 + return nil 282 + } 283 + 284 + // processTag processes a tag commit event 285 + func (w *Worker) processTag(commit *CommitEvent) error { 286 + // Resolve and upsert user with handle/PDS endpoint 287 + if err := w.ensureUser(context.Background(), commit.DID); err != nil { 288 + return fmt.Errorf("failed to ensure user: %w", err) 289 + } 290 + 291 + if commit.Operation == "delete" { 292 + // Delete tag 293 + parts := strings.Split(commit.RKey, "/") 294 + if len(parts) < 2 { 295 + return fmt.Errorf("invalid tag rkey: %s", commit.RKey) 296 + } 297 + repo := strings.Join(parts[:len(parts)-1], "/") 298 + tag := parts[len(parts)-1] 299 + return db.DeleteTag(w.db, commit.DID, repo, tag) 300 + } 301 + 302 + // Parse tag record 303 + var tagRecord atproto.TagRecord 304 + if commit.Record != nil { 305 + recordBytes, err := json.Marshal(commit.Record) 306 + if err != nil { 307 + return fmt.Errorf("failed to marshal record: %w", err) 308 + } 309 + if err := json.Unmarshal(recordBytes, &tagRecord); err != nil { 310 + return fmt.Errorf("failed to unmarshal tag: %w", err) 311 + } 312 + } else { 313 + return nil 314 + } 315 + 316 + // Insert or update tag 317 + return db.UpsertTag(w.db, &db.Tag{ 318 + DID: commit.DID, 319 + Repository: tagRecord.Repository, 320 + Tag: tagRecord.Tag, 321 + Digest: tagRecord.ManifestDigest, 322 + CreatedAt: tagRecord.UpdatedAt, 323 + }) 324 + } 325 + 326 + // JetstreamEvent represents a Jetstream event 327 + type JetstreamEvent struct { 328 + DID string `json:"did"` 329 + TimeUS int64 `json:"time_us"` 330 + Kind string `json:"kind"` // "commit", "identity", "account" 331 + Commit *CommitEvent `json:"commit,omitempty"` 332 + Identity *IdentityInfo `json:"identity,omitempty"` 333 + Account *AccountInfo `json:"account,omitempty"` 334 + } 335 + 336 + // CommitEvent represents a commit event (create/update/delete) 337 + type CommitEvent struct { 338 + Rev string `json:"rev"` 339 + Operation string `json:"operation"` // "create", "update", "delete" 340 + Collection string `json:"collection"` 341 + RKey string `json:"rkey"` 342 + Record map[string]interface{} `json:"record,omitempty"` 343 + CID string `json:"cid,omitempty"` 344 + DID string `json:"-"` // Set from parent event 345 + } 346 + 347 + // IdentityInfo represents an identity event 348 + type IdentityInfo struct { 349 + DID string `json:"did"` 350 + Handle string `json:"handle"` 351 + Seq int64 `json:"seq"` 352 + Time string `json:"time"` 353 + } 354 + 355 + // AccountInfo represents an account status event 356 + type AccountInfo struct { 357 + Active bool `json:"active"` 358 + DID string `json:"did"` 359 + Seq int64 `json:"seq"` 360 + Time string `json:"time"` 361 + Status string `json:"status,omitempty"` 362 + } 363 + 364 + // Helper functions 365 + 366 + func extractRepoFromRKey(rkey string) string { 367 + // RKey format: <digest> or <repo>/<digest> 368 + // For manifest, it's just the digest 369 + parts := strings.Split(rkey, "/") 370 + if len(parts) > 1 { 371 + return parts[0] 372 + } 373 + return "" 374 + } 375 + 376 + func calculateManifestSize(manifest *atproto.ManifestRecord) int64 { 377 + var total int64 378 + total += manifest.Config.Size 379 + for _, layer := range manifest.Layers { 380 + total += layer.Size 381 + } 382 + return total 383 + }
+6 -4
pkg/appview/middleware/auth.go
··· 29 29 } 30 30 31 31 user := &db.User{ 32 - DID: sess.DID, 33 - Handle: sess.Handle, 32 + DID: sess.DID, 33 + Handle: sess.Handle, 34 + PDSEndpoint: sess.PDSEndpoint, 34 35 } 35 36 36 37 ctx := context.WithValue(r.Context(), userKey, user) ··· 47 48 if ok { 48 49 if sess, ok := store.Get(sessionID); ok { 49 50 user := &db.User{ 50 - DID: sess.DID, 51 - Handle: sess.Handle, 51 + DID: sess.DID, 52 + Handle: sess.Handle, 53 + PDSEndpoint: sess.PDSEndpoint, 52 54 } 53 55 ctx := context.WithValue(r.Context(), userKey, user) 54 56 r = r.WithContext(ctx)
+92 -13
pkg/appview/session/session.go
··· 3 3 import ( 4 4 "crypto/rand" 5 5 "encoding/base64" 6 + "encoding/json" 7 + "fmt" 6 8 "net/http" 9 + "os" 7 10 "sync" 8 11 "time" 9 12 ) 10 13 11 14 // Session represents a user session 12 15 type Session struct { 13 - ID string 14 - DID string 15 - Handle string 16 - ExpiresAt time.Time 16 + ID string 17 + DID string 18 + Handle string 19 + PDSEndpoint string 20 + ExpiresAt time.Time 17 21 } 18 22 19 23 // Store manages user sessions 20 24 type Store struct { 21 25 mu sync.RWMutex 22 26 sessions map[string]*Session 27 + filePath string 23 28 } 24 29 25 - // NewStore creates a new session store 26 - func NewStore() *Store { 27 - return &Store{ 30 + // NewStore creates a new session store with file persistence 31 + func NewStore(filePath string) *Store { 32 + store := &Store{ 28 33 sessions: make(map[string]*Session), 34 + filePath: filePath, 29 35 } 36 + 37 + // Load existing sessions from file 38 + if err := store.load(); err != nil { 39 + fmt.Printf("Warning: Failed to load sessions from %s: %v\n", filePath, err) 40 + } 41 + 42 + return store 30 43 } 31 44 32 - // Create creates a new session and returns the full Session struct 33 - func (s *Store) Create(did, handle string, duration time.Duration) (string, error) { 45 + // load reads sessions from disk 46 + func (s *Store) load() error { 47 + if s.filePath == "" { 48 + return nil 49 + } 50 + 51 + data, err := os.ReadFile(s.filePath) 52 + if err != nil { 53 + if os.IsNotExist(err) { 54 + return nil // File doesn't exist yet, that's fine 55 + } 56 + return err 57 + } 58 + 59 + var sessions map[string]*Session 60 + if err := json.Unmarshal(data, &sessions); err != nil { 61 + return err 62 + } 63 + 64 + // Filter out expired sessions 65 + now := time.Now() 66 + for id, sess := range sessions { 67 + if now.Before(sess.ExpiresAt) { 68 + s.sessions[id] = sess 69 + } 70 + } 71 + 72 + fmt.Printf("Loaded %d active sessions from disk\n", len(s.sessions)) 73 + return nil 74 + } 75 + 76 + // save writes sessions to disk 77 + func (s *Store) save() error { 78 + if s.filePath == "" { 79 + return nil 80 + } 81 + 82 + data, err := json.Marshal(s.sessions) 83 + if err != nil { 84 + return err 85 + } 86 + 87 + return os.WriteFile(s.filePath, data, 0600) 88 + } 89 + 90 + // Create creates a new session and returns the session ID 91 + func (s *Store) Create(did, handle, pdsEndpoint string, duration time.Duration) (string, error) { 34 92 s.mu.Lock() 35 93 defer s.mu.Unlock() 36 94 ··· 41 99 } 42 100 43 101 sess := &Session{ 44 - ID: base64.URLEncoding.EncodeToString(b), 45 - DID: did, 46 - Handle: handle, 47 - ExpiresAt: time.Now().Add(duration), 102 + ID: base64.URLEncoding.EncodeToString(b), 103 + DID: did, 104 + Handle: handle, 105 + PDSEndpoint: pdsEndpoint, 106 + ExpiresAt: time.Now().Add(duration), 48 107 } 49 108 50 109 s.sessions[sess.ID] = sess 110 + 111 + // Save to disk 112 + if err := s.save(); err != nil { 113 + fmt.Printf("Warning: Failed to save sessions to disk: %v\n", err) 114 + } 115 + 51 116 return sess.ID, nil 52 117 } 53 118 ··· 70 135 defer s.mu.Unlock() 71 136 72 137 delete(s.sessions, id) 138 + 139 + // Save to disk 140 + if err := s.save(); err != nil { 141 + fmt.Printf("Warning: Failed to save sessions to disk: %v\n", err) 142 + } 73 143 } 74 144 75 145 // Cleanup removes expired sessions ··· 78 148 defer s.mu.Unlock() 79 149 80 150 now := time.Now() 151 + deleted := 0 81 152 for id, sess := range s.sessions { 82 153 if now.After(sess.ExpiresAt) { 83 154 delete(s.sessions, id) 155 + deleted++ 156 + } 157 + } 158 + 159 + if deleted > 0 { 160 + // Save to disk 161 + if err := s.save(); err != nil { 162 + fmt.Printf("Warning: Failed to save sessions to disk: %v\n", err) 84 163 } 85 164 } 86 165 }
+14
pkg/appview/static/js/app.js
··· 58 58 59 59 // Update timestamps periodically 60 60 setInterval(updateTimestamps, 60000); // Every minute 61 + 62 + // Toggle repository details (for images page) 63 + function toggleRepo(name) { 64 + const details = document.getElementById('repo-' + name); 65 + const btn = document.getElementById('btn-' + name); 66 + 67 + if (details.style.display === 'none') { 68 + details.style.display = 'block'; 69 + btn.textContent = '▲'; 70 + } else { 71 + details.style.display = 'none'; 72 + btn.textContent = '▼'; 73 + } 74 + }
+10 -25
pkg/appview/templates/pages/images.html
··· 7 7 <title>Your Images - ATCR</title> 8 8 <link rel="stylesheet" href="/static/css/style.css"> 9 9 <script src="https://unpkg.com/htmx.org@1.9.10"></script> 10 + <script src="/static/js/app.js"></script> 10 11 </head> 11 12 <body> 12 13 {{ template "nav" . }} ··· 17 18 18 19 {{ if .Repositories }} 19 20 {{ range .Repositories }} 21 + {{ $repoName := .Name }} 20 22 <div class="repository-card"> 21 - <div class="repo-header" onclick="toggleRepo('{{ .Name }}')"> 23 + <div class="repo-header" onclick="toggleRepo('{{ $repoName }}')"> 22 24 <div> 23 - <h2>{{ .Name }}</h2> 25 + <h2>{{ $repoName }}</h2> 24 26 <div class="repo-stats"> 25 27 <span>{{ .TagCount }} tags</span> 26 28 <span>•</span> ··· 31 33 </time> 32 34 </div> 33 35 </div> 34 - <button class="expand-btn" id="btn-{{ .Name }}">▼</button> 36 + <button class="expand-btn" id="btn-{{ $repoName }}">▼</button> 35 37 </div> 36 38 37 - <div id="repo-{{ .Name }}" class="repo-details" style="display: none;"> 39 + <div id="repo-{{ $repoName }}" class="repo-details" style="display: none;"> 38 40 <!-- Tags Section --> 39 41 <div class="tags-section"> 40 42 <h3>Tags</h3> 41 43 {{ if .Tags }} 42 44 {{ range .Tags }} 43 - <div class="tag-row" id="tag-{{ $.Name }}-{{ .Tag }}"> 45 + <div class="tag-row" id="tag-{{ $repoName }}-{{ .Tag }}"> 44 46 <span class="tag-name">{{ .Tag }}</span> 45 47 <span class="tag-arrow">→</span> 46 48 <code class="tag-digest">{{ truncateDigest .Digest 12 }}</code> ··· 49 51 </time> 50 52 51 53 <button class="delete-btn" 52 - hx-delete="/api/images/{{ $.Name }}/tags/{{ .Tag }}" 54 + hx-delete="/api/images/{{ $repoName }}/tags/{{ .Tag }}" 53 55 hx-confirm="Delete tag {{ .Tag }}?" 54 - hx-target="#tag-{{ $.Name }}-{{ .Tag }}" 56 + hx-target="#tag-{{ $repoName }}-{{ .Tag }}" 55 57 hx-swap="outerHTML"> 56 58 🗑️ 57 59 </button> ··· 85 87 {{ else }} 86 88 <div class="empty-state"> 87 89 <p>No images yet. Push your first image:</p> 88 - <pre><code>docker push atcr.io/{{ .User.Handle }}/myapp:latest</code></pre> 90 + <pre><code>docker push {{ .RegistryURL }}/{{ .User.Handle }}/myapp:latest</code></pre> 89 91 </div> 90 92 {{ end }} 91 93 </div> ··· 93 95 94 96 <!-- Modal container for HTMX --> 95 97 <div id="modal"></div> 96 - 97 - <script src="/static/js/app.js"></script> 98 - <script> 99 - // Toggle repository details 100 - function toggleRepo(name) { 101 - const details = document.getElementById('repo-' + name); 102 - const btn = document.getElementById('btn-' + name); 103 - 104 - if (details.style.display === 'none') { 105 - details.style.display = 'block'; 106 - btn.textContent = '▲'; 107 - } else { 108 - details.style.display = 'none'; 109 - btn.textContent = '▼'; 110 - } 111 - } 112 - </script> 113 98 </body> 114 99 </html> 115 100 {{ end }}
+3 -3
pkg/appview/templates/partials/push-list.html
··· 19 19 </div> 20 20 21 21 <div class="push-command"> 22 - <code class="pull-command">docker pull atcr.io/{{ .Handle }}/{{ .Repository }}:{{ .Tag }}</code> 23 - <button class="copy-btn" onclick="copyToClipboard('docker pull atcr.io/{{ .Handle }}/{{ .Repository }}:{{ .Tag }}')"> 22 + <code class="pull-command">docker pull {{ $.RegistryURL }}/{{ .Handle }}/{{ .Repository }}:{{ .Tag }}</code> 23 + <button class="copy-btn" onclick="copyToClipboard('docker pull {{ $.RegistryURL }}/{{ .Handle }}/{{ .Repository }}:{{ .Tag }}')"> 24 24 📋 Copy 25 25 </button> 26 26 </div> ··· 39 39 {{ if eq (len .Pushes) 0 }} 40 40 <div class="empty-state"> 41 41 <p>No pushes yet. Start using ATCR by pushing your first image!</p> 42 - <pre><code>docker push atcr.io/yourhandle/myapp:latest</code></pre> 42 + <pre><code>docker push {{ .RegistryURL }}/yourhandle/myapp:latest</code></pre> 43 43 </div> 44 44 {{ end }}
+15 -10
pkg/auth/oauth/refresher.go
··· 12 12 type AccessTokenEntry struct { 13 13 Token string 14 14 DPoPKey *ecdsa.PrivateKey 15 - Transport *DPoPTransport // Cache the transport to preserve nonce across requests 15 + PDS string // Store PDS endpoint to create fresh transports 16 16 ExpiresAt time.Time 17 17 } 18 18 ··· 46 46 r.mu.RUnlock() 47 47 48 48 if ok && time.Now().Before(entry.ExpiresAt) { 49 - // Token still valid 50 - return entry.Token, entry.DPoPKey, entry.Transport, nil 49 + // Token still valid - create fresh transport to avoid nonce reuse 50 + transport := NewDPoPTransport(nil, entry.DPoPKey) 51 + transport.SetAccessToken(entry.Token) 52 + return entry.Token, entry.DPoPKey, transport, nil 51 53 } 52 54 53 55 // Token expired or not cached, need to refresh ··· 70 72 r.mu.RUnlock() 71 73 72 74 if ok && time.Now().Before(entry.ExpiresAt) { 73 - // Token was refreshed while we waited for the lock 74 - return entry.Token, entry.DPoPKey, entry.Transport, nil 75 + // Token was refreshed while we waited for the lock - create fresh transport 76 + transport := NewDPoPTransport(nil, entry.DPoPKey) 77 + transport.SetAccessToken(entry.Token) 78 + return entry.Token, entry.DPoPKey, transport, nil 75 79 } 76 80 77 81 // Actually refresh the token ··· 122 126 } 123 127 } 124 128 125 - // Get DPoP transport (already has access token set by client.RefreshToken) 126 - dpopTransport := client.DPoPTransport() 127 - 128 - // Cache the access token and transport 129 + // Cache the access token (but not transport - create fresh each time) 129 130 // Expire 1 minute early to avoid edge cases 130 131 expiresAt := token.Expiry.Add(-1 * time.Minute) 131 132 ··· 133 134 r.accessTokens[did] = &AccessTokenEntry{ 134 135 Token: token.AccessToken, 135 136 DPoPKey: dpopKey, 136 - Transport: dpopTransport, // Cache transport to preserve nonce 137 + PDS: entry.PDS, 137 138 ExpiresAt: expiresAt, 138 139 } 139 140 r.mu.Unlock() 141 + 142 + // Create fresh transport for this request 143 + dpopTransport := NewDPoPTransport(nil, dpopKey) 144 + dpopTransport.SetAccessToken(token.AccessToken) 140 145 141 146 return token.AccessToken, dpopKey, dpopTransport, nil 142 147 }
+3 -3
pkg/auth/oauth/server.go
··· 16 16 17 17 // UISessionStore is the interface for UI session management 18 18 type UISessionStore interface { 19 - Create(did, handle string, duration time.Duration) (string, error) 19 + Create(did, handle, pdsEndpoint string, duration time.Duration) (string, error) 20 20 } 21 21 22 22 // Server handles OAuth authorization for the AppView ··· 172 172 173 173 // Check if this is a UI login (has oauth_return_to cookie) 174 174 if cookie, err := r.Cookie("oauth_return_to"); err == nil && s.uiSessionStore != nil { 175 - // Create UI session 176 - sessionID, err := s.uiSessionStore.Create(oauthState.DID, oauthState.Handle, 24*time.Hour) 175 + // Create UI session with PDS endpoint 176 + sessionID, err := s.uiSessionStore.Create(oauthState.DID, oauthState.Handle, oauthState.PDSEndpoint, 24*time.Hour) 177 177 if err != nil { 178 178 s.renderError(w, fmt.Sprintf("Failed to create UI session: %v", err)) 179 179 return