A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
80
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix pull counts, add pingpong to jetstream

+122 -16
+3 -3
Dockerfile
··· 47 47 LABEL org.opencontainers.image.title="ATCR AppView" \ 48 48 org.opencontainers.image.description="ATProto Container Registry - OCI-compliant registry using AT Protocol for manifest storage" \ 49 49 org.opencontainers.image.authors="ATCR Contributors" \ 50 - org.opencontainers.image.source="https://github.com/example/atcr" \ 51 - org.opencontainers.image.documentation="https://atcr.io/docs" \ 50 + org.opencontainers.image.source="https://tangled.org/@evan.jarrett.net/at-container-registry" \ 51 + org.opencontainers.image.documentation="https://tangled.org/@evan.jarrett.net/at-container-registry" \ 52 52 org.opencontainers.image.licenses="MIT" \ 53 53 org.opencontainers.image.version="0.1.0" \ 54 - io.atcr.icon="https://atcr.io/images/appview-icon.png" 54 + io.atcr.icon="https://imgs.blue/evan.jarrett.net/1TpTNrRelfloN2emuWZDrWmPT0o93bAjEnozjD6UPgoVV9m4" 55 55 56 56 # Run the AppView 57 57 ENTRYPOINT ["/app/atcr-appview"]
+3 -3
Dockerfile.hold
··· 36 36 LABEL org.opencontainers.image.title="ATCR Hold Service" \ 37 37 org.opencontainers.image.description="ATCR Hold Service - Bring Your Own Storage component for ATCR" \ 38 38 org.opencontainers.image.authors="ATCR Contributors" \ 39 - org.opencontainers.image.source="https://github.com/example/atcr" \ 40 - org.opencontainers.image.documentation="https://atcr.io/docs/hold" \ 39 + org.opencontainers.image.source="https://tangled.org/@evan.jarrett.net/at-container-registry" \ 40 + org.opencontainers.image.documentation="https://tangled.org/@evan.jarrett.net/at-container-registry" \ 41 41 org.opencontainers.image.licenses="MIT" \ 42 42 org.opencontainers.image.version="0.1.0" \ 43 - io.atcr.icon="https://atcr.io/images/hold-icon.png" 43 + io.atcr.icon="https://imgs.blue/evan.jarrett.net/1TpTOdtS60GdJWBYEqtK22y688jajbQ9a5kbYRFtwuqrkBAE" 44 44 45 45 # Run the hold service 46 46 ENTRYPOINT ["./atcr-hold"]
+104
pkg/appview/jetstream/worker.go
··· 7 7 "fmt" 8 8 "net/url" 9 9 "strings" 10 + "sync" 10 11 "time" 11 12 12 13 "github.com/bluesky-social/indigo/atproto/identity" ··· 37 38 directory identity.Directory 38 39 eventCallback EventCallback 39 40 connStartTime time.Time // Track when connection started for debugging 41 + 42 + // Ping/pong tracking for connection health 43 + pingsSent int64 44 + pongsReceived int64 45 + lastPongTime time.Time 46 + pongMutex sync.Mutex 40 47 } 41 48 42 49 // NewWorker creates a new Jetstream worker ··· 98 105 // Track connection start time for debugging 99 106 w.connStartTime = time.Now() 100 107 108 + // Reset ping/pong counters for this connection 109 + w.pongMutex.Lock() 110 + w.pingsSent = 0 111 + w.pongsReceived = 0 112 + w.lastPongTime = time.Now() 113 + w.pongMutex.Unlock() 114 + 115 + // Set up pong handler - called when server responds to our ping 116 + conn.SetPongHandler(func(appData string) error { 117 + w.pongMutex.Lock() 118 + w.pongsReceived++ 119 + w.lastPongTime = time.Now() 120 + w.pongMutex.Unlock() 121 + 122 + // Reset read deadline - we know connection is alive 123 + // Allow 90 seconds for next pong (3x ping interval) 124 + conn.SetReadDeadline(time.Now().Add(90 * time.Second)) 125 + return nil 126 + }) 127 + 128 + // Set initial read deadline 129 + conn.SetReadDeadline(time.Now().Add(90 * time.Second)) 130 + 101 131 // Create zstd decoder for decompressing messages 102 132 decoder, err := zstd.NewReader(nil) 103 133 if err != nil { ··· 111 141 heartbeatTicker := time.NewTicker(30 * time.Second) 112 142 defer heartbeatTicker.Stop() 113 143 144 + // Start ping ticker for keepalive 145 + pingTicker := time.NewTicker(30 * time.Second) 146 + defer pingTicker.Stop() 147 + 148 + // Start ping sender goroutine 149 + pingDone := make(chan struct{}) 150 + defer close(pingDone) 151 + 152 + go func() { 153 + for { 154 + select { 155 + case <-ctx.Done(): 156 + return 157 + case <-pingDone: 158 + return 159 + case <-pingTicker.C: 160 + // Check if we've received a pong recently 161 + w.pongMutex.Lock() 162 + timeSinceLastPong := time.Since(w.lastPongTime) 163 + pingsTotal := w.pingsSent 164 + pongsTotal := w.pongsReceived 165 + w.pongMutex.Unlock() 166 + 167 + // If no pong for 60 seconds, connection is likely dead 168 + if timeSinceLastPong > 60*time.Second { 169 + fmt.Printf("Jetstream: No pong received for %s (sent %d pings, got %d pongs), closing connection\n", 170 + timeSinceLastPong, pingsTotal, pongsTotal) 171 + conn.Close() 172 + return 173 + } 174 + 175 + // Send ping with write deadline 176 + conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) 177 + if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 178 + fmt.Printf("Jetstream: Failed to send ping: %v\n", err) 179 + conn.Close() 180 + return 181 + } 182 + 183 + w.pongMutex.Lock() 184 + w.pingsSent++ 185 + w.pongMutex.Unlock() 186 + } 187 + } 188 + }() 189 + 114 190 eventCount := 0 115 191 lastHeartbeat := time.Now() 116 192 ··· 131 207 connDuration := time.Since(w.connStartTime) 132 208 timeSinceLastEvent := time.Since(lastHeartbeat) 133 209 210 + // Get ping/pong stats 211 + w.pongMutex.Lock() 212 + pingsTotal := w.pingsSent 213 + pongsTotal := w.pongsReceived 214 + timeSinceLastPong := time.Since(w.lastPongTime) 215 + w.pongMutex.Unlock() 216 + 217 + // Calculate ping/pong success rate 218 + var pongRate float64 219 + if pingsTotal > 0 { 220 + pongRate = float64(pongsTotal) / float64(pingsTotal) * 100 221 + } 222 + 223 + // Determine diagnosis 224 + var diagnosis string 225 + if pongRate >= 95 && timeSinceLastPong < 60*time.Second { 226 + diagnosis = "Connection was healthy (good ping/pong), likely server-side timeout or network interruption" 227 + } else if timeSinceLastPong > 60*time.Second { 228 + diagnosis = "Connection died (no pong for >60s), network issue detected" 229 + } else if pongRate < 80 { 230 + diagnosis = "Connection unstable (low pong rate), network quality issues" 231 + } else { 232 + diagnosis = "Connection closed unexpectedly" 233 + } 234 + 134 235 // Log detailed context about the failure 135 236 fmt.Printf("Jetstream: Connection closed after %s\n", connDuration) 136 237 fmt.Printf(" - Events in last 30s: %d\n", eventCount) 137 238 fmt.Printf(" - Time since last event: %s\n", timeSinceLastEvent) 239 + fmt.Printf(" - Ping/Pong: %d/%d (%.1f%% success)\n", pongsTotal, pingsTotal, pongRate) 240 + fmt.Printf(" - Last pong: %s ago\n", timeSinceLastPong) 138 241 fmt.Printf(" - Error: %v\n", err) 242 + fmt.Printf(" - Diagnosis: %s\n", diagnosis) 139 243 140 244 return fmt.Errorf("failed to read message: %w", err) 141 245 }
+12 -1
pkg/atproto/manifest_store.go
··· 11 11 "github.com/opencontainers/go-digest" 12 12 ) 13 13 14 - // DatabaseMetrics interface for tracking push counts 14 + // DatabaseMetrics interface for tracking push and pull counts 15 15 type DatabaseMetrics interface { 16 16 IncrementPushCount(did, repository string) error 17 + IncrementPullCount(did, repository string) error 17 18 } 18 19 19 20 // ManifestStore implements distribution.ManifestService ··· 83 84 return nil, fmt.Errorf("failed to download manifest blob: %w", err) 84 85 } 85 86 } 87 + 88 + // Track pull count (increment asynchronously to avoid blocking the response) 89 + if s.database != nil { 90 + go func() { 91 + if err := s.database.IncrementPullCount(s.did, s.repository); err != nil { 92 + fmt.Printf("WARNING: Failed to increment pull count for %s/%s: %v\n", s.did, s.repository, err) 93 + } 94 + }() 95 + } 96 + 86 97 // Parse the manifest based on media type 87 98 // For now, we'll return the raw bytes wrapped in a manifest object 88 99 // In a full implementation, you'd use distribution's manifest parsing
-9
pkg/storage/proxy_blob_store.go
··· 183 183 return err 184 184 } 185 185 186 - // Track pull count (increment asynchronously to avoid blocking the response) 187 - if p.database != nil && p.repository != "" { 188 - go func() { 189 - if err := p.database.IncrementPullCount(p.did, p.repository); err != nil { 190 - fmt.Printf("WARNING: Failed to increment pull count for %s/%s: %v\n", p.did, p.repository, err) 191 - } 192 - }() 193 - } 194 - 195 186 // Redirect to presigned URL 196 187 http.Redirect(w, r, url, http.StatusTemporaryRedirect) 197 188 return nil