A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
80
fork

Configure Feed

Select the types of activity you want to include in your feed.

add log shipper begin envvar cleanup

+1430 -203
+26 -12
.env.appview.example
··· 1 + # ============================================================================== 2 + # DEPRECATED: This file is deprecated. Use .env.example instead. 3 + # This file will be removed in a future version. 4 + # See .env.example for the unified configuration file. 5 + # ============================================================================== 6 + 1 7 # ATCR AppView Configuration 2 8 # Copy this file to .env.appview and fill in your values 3 9 # Load with: source .env.appview && ./bin/atcr-appview serve ··· 8 14 9 15 # HTTP listen address (default: :5000) 10 16 ATCR_HTTP_ADDR=:5000 11 - 12 - # Debug listen address (default: :5001) 13 - # ATCR_DEBUG_ADDR=:5001 14 17 15 18 # Base URL for the AppView service (REQUIRED for production) 16 19 # Used to generate OAuth redirect URIs and JWT realms ··· 63 66 # UI Configuration 64 67 # ============================================================================== 65 68 66 - # Enable web UI (default: true) 67 - # Set to "false" to disable web interface and run registry-only 68 - ATCR_UI_ENABLED=true 69 - 70 69 # SQLite database path for UI data (sessions, stars, pull counts, etc.) 71 70 # Default: /var/lib/atcr/ui.db 72 71 # ATCR_UI_DATABASE_PATH=/var/lib/atcr/ui.db 73 72 74 - # Skip database migrations on startup (default: false) 75 - # Set to "true" to skip running migrations (useful for tests or fresh databases) 76 - # Production: Keep as "false" to ensure migrations are applied 77 - SKIP_DB_MIGRATIONS=false 78 - 79 73 # ============================================================================== 80 74 # Logging Configuration 81 75 # ============================================================================== ··· 85 79 86 80 # Log formatter: text, json (default: text) 87 81 # ATCR_LOG_FORMATTER=text 82 + 83 + # ============================================================================== 84 + # Remote Log Shipping (optional) 85 + # ============================================================================== 86 + 87 + # Backend: victoria, opensearch, loki (empty = disabled) 88 + # ATCR_LOG_SHIPPER_BACKEND=victoria 89 + 90 + # Remote log service URL 91 + # ATCR_LOG_SHIPPER_URL=http://victorialogs:9428 92 + 93 + # Number of logs to batch before flushing (default: 100) 94 + # ATCR_LOG_SHIPPER_BATCH_SIZE=100 95 + 96 + # Max time between flushes (default: 5s) 97 + # ATCR_LOG_SHIPPER_FLUSH_INTERVAL=5s 98 + 99 + # Basic auth credentials (optional) 100 + # ATCR_LOG_SHIPPER_USERNAME= 101 + # ATCR_LOG_SHIPPER_PASSWORD= 88 102 89 103 # ============================================================================== 90 104 # Hold Health Check Configuration
+293
.env.example
··· 1 + # ============================================================================== 2 + # ATCR Configuration 3 + # ============================================================================== 4 + # This file contains ALL configuration options for both AppView and Hold services. 5 + # Copy to .env and uncomment/modify the values you need. 6 + # 7 + # QUICKSTART (minimum for local development): 8 + # HOLD_PUBLIC_URL=http://127.0.0.1:8080 9 + # ATCR_DEFAULT_HOLD_DID=did:web:127.0.0.1:8080 10 + # 11 + # QUICKSTART (minimum for production): 12 + # APPVIEW_DOMAIN=atcr.io 13 + # HOLD_DOMAIN=hold01.atcr.io 14 + # HOLD_OWNER=did:plc:your-did 15 + # AWS_ACCESS_KEY_ID=xxx 16 + # AWS_SECRET_ACCESS_KEY=xxx 17 + # S3_BUCKET=xxx 18 + # S3_ENDPOINT=https://xxx 19 + # 20 + # ============================================================================== 21 + 22 + # ============================================================================== 23 + # DOMAIN CONFIGURATION (Production) 24 + # ============================================================================== 25 + # These are used by docker-compose.prod.yml to derive other values automatically. 26 + # For local dev, skip these and set the explicit URLs below instead. 27 + 28 + # Main AppView domain (registry API + web UI) 29 + # APPVIEW_DOMAIN=atcr.io 30 + 31 + # Hold service domain 32 + # Used to derive: HOLD_PUBLIC_URL, ATCR_DEFAULT_HOLD_DID 33 + # HOLD_DOMAIN=hold01.atcr.io 34 + 35 + # ============================================================================== 36 + # APPVIEW - SERVER CONFIGURATION 37 + # ============================================================================== 38 + 39 + # HTTP listen address 40 + # Default: :5000 41 + # ATCR_HTTP_ADDR=:5000 42 + 43 + # Public URL for OAuth redirect URIs and JWT realms 44 + # Development: Auto-detected from ATCR_HTTP_ADDR (e.g., http://127.0.0.1:5000) 45 + # Production: Set to your public URL (e.g., https://atcr.io) 46 + # ATCR_BASE_URL=https://atcr.io 47 + 48 + # Service name for JWT issuer/service fields 49 + # Default: Derived from ATCR_BASE_URL hostname, or "atcr.io" 50 + # ATCR_SERVICE_NAME=atcr.io 51 + 52 + # ============================================================================== 53 + # APPVIEW - STORAGE CONFIGURATION (REQUIRED) 54 + # ============================================================================== 55 + 56 + # Default hold service DID for users without their own storage (REQUIRED) 57 + # Format: did:web:hostname[:port] 58 + # Docker dev: did:web:172.28.0.3:8080 59 + # Local dev: did:web:127.0.0.1:8080 60 + # Production: did:web:hold01.atcr.io 61 + ATCR_DEFAULT_HOLD_DID=did:web:127.0.0.1:8080 62 + 63 + # ============================================================================== 64 + # APPVIEW - AUTHENTICATION 65 + # ============================================================================== 66 + 67 + # Path to JWT signing private key (auto-generated if missing) 68 + # Default: /var/lib/atcr/auth/private-key.pem 69 + # ATCR_AUTH_KEY_PATH=/var/lib/atcr/auth/private-key.pem 70 + 71 + # Path to JWT signing certificate (auto-generated if missing) 72 + # Default: /var/lib/atcr/auth/private-key.crt 73 + # ATCR_AUTH_CERT_PATH=/var/lib/atcr/auth/private-key.crt 74 + 75 + # JWT token expiration in seconds 76 + # Default: 300 (5 minutes) 77 + # ATCR_TOKEN_EXPIRATION=300 78 + 79 + # Path to OAuth client P-256 signing key (auto-generated for production) 80 + # Used for confidential OAuth client authentication 81 + # Localhost deployments always use public OAuth clients (no key needed) 82 + # Default: /var/lib/atcr/oauth/client.key 83 + # ATCR_OAUTH_KEY_PATH=/var/lib/atcr/oauth/client.key 84 + 85 + # OAuth client display name (shown in authorization screens) 86 + # Default: AT Container Registry 87 + # ATCR_CLIENT_NAME=AT Container Registry 88 + 89 + # ============================================================================== 90 + # APPVIEW - WEB UI 91 + # ============================================================================== 92 + 93 + # SQLite database path for UI data (sessions, stars, pull counts, etc.) 94 + # Default: /var/lib/atcr/ui.db 95 + # ATCR_UI_DATABASE_PATH=/var/lib/atcr/ui.db 96 + 97 + # ============================================================================== 98 + # APPVIEW - JETSTREAM (ATProto Event Streaming) 99 + # ============================================================================== 100 + 101 + # Jetstream WebSocket URL for real-time ATProto events 102 + # Default: wss://jetstream2.us-west.bsky.network/subscribe 103 + # JETSTREAM_URL=wss://jetstream2.us-west.bsky.network/subscribe 104 + 105 + # Enable backfill worker to sync historical records 106 + # Default: true 107 + # ATCR_BACKFILL_ENABLED=true 108 + 109 + # ATProto relay endpoint for backfill sync API 110 + # Default: https://relay1.us-east.bsky.network 111 + # ATCR_RELAY_ENDPOINT=https://relay1.us-east.bsky.network 112 + 113 + # Backfill sync interval 114 + # Default: 1h 115 + # Examples: 30m, 1h, 2h, 24h 116 + # ATCR_BACKFILL_INTERVAL=1h 117 + 118 + # ============================================================================== 119 + # APPVIEW - HEALTH CHECKS 120 + # ============================================================================== 121 + 122 + # How often to check health of hold endpoints in the background 123 + # Default: 15m 124 + # ATCR_HEALTH_CHECK_INTERVAL=15m 125 + 126 + # How long to cache health check results 127 + # Default: 15m 128 + # ATCR_HEALTH_CACHE_TTL=15m 129 + 130 + # ============================================================================== 131 + # HOLD SERVICE - SERVER CONFIGURATION (REQUIRED) 132 + # ============================================================================== 133 + 134 + # Public URL of hold service (REQUIRED) 135 + # The hostname becomes the hold name/record key 136 + # Local dev: http://127.0.0.1:8080 137 + # Production: https://hold01.atcr.io 138 + HOLD_PUBLIC_URL=http://127.0.0.1:8080 139 + 140 + # HTTP listen address 141 + # Default: :8080 142 + # HOLD_SERVER_ADDR=:8080 143 + 144 + # Allow public blob reads (pulls) without authentication 145 + # Writes (pushes) always require crew membership via PDS 146 + # Default: false 147 + # HOLD_PUBLIC=false 148 + 149 + # ATProto relay endpoint for requesting crawl on startup 150 + # Makes the hold's embedded PDS discoverable by the relay network 151 + # Default: (empty - disabled) 152 + # Set to https://bsky.network to enable 153 + # HOLD_RELAY_ENDPOINT=https://bsky.network 154 + 155 + # ============================================================================== 156 + # HOLD SERVICE - EMBEDDED PDS 157 + # ============================================================================== 158 + 159 + # Directory path for embedded PDS carstore (SQLite database) 160 + # Default: /var/lib/atcr-hold 161 + # If empty, embedded PDS is disabled 162 + # Note: This is a directory path, NOT a file path 163 + # Carstore creates db.sqlite3 inside this directory 164 + HOLD_DATABASE_DIR=/var/lib/atcr-hold 165 + 166 + # Path to signing key (auto-generated on first run if missing) 167 + # Default: {HOLD_DATABASE_DIR}/signing.key 168 + # HOLD_KEY_PATH=/var/lib/atcr-hold/signing.key 169 + 170 + # ============================================================================== 171 + # HOLD SERVICE - REGISTRATION & ACCESS CONTROL 172 + # ============================================================================== 173 + 174 + # Your ATProto DID (REQUIRED for registration) 175 + # Get your DID: https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle=yourhandle.bsky.social 176 + # On first run with HOLD_OWNER set: 177 + # 1. Hold service prints OAuth URL to logs 178 + # 2. Visit URL to authorize 179 + # 3. Hold creates captain + crew records 180 + # 4. Registration complete! 181 + # HOLD_OWNER=did:plc:your-did-here 182 + 183 + # Allow any authenticated user to register as crew 184 + # Default: false (only explicit crew members can write) 185 + # Set to true for open/community holds 186 + # HOLD_ALLOW_ALL_CREW=false 187 + 188 + # ============================================================================== 189 + # HOLD SERVICE - BLUESKY INTEGRATION 190 + # ============================================================================== 191 + 192 + # Enable Bluesky posts when users push container images 193 + # When enabled, creates posts announcing image pushes 194 + # Default: false 195 + # HOLD_BLUESKY_POSTS_ENABLED=false 196 + 197 + # Avatar image URL to download during bootstrap 198 + # HOLD_PROFILE_AVATAR=https://imgs.blue/evan.jarrett.net/1TpTOdtS60GdJWBYEqtK22y688jajbQ9a5kbYRFtwuqrkBAE 199 + 200 + # ============================================================================== 201 + # HOLD SERVICE - ADMIN 202 + # ============================================================================== 203 + 204 + # Enable admin panel 205 + # Default: false 206 + # HOLD_ADMIN_ENABLED=false 207 + 208 + # ============================================================================== 209 + # STORAGE - S3 CONFIGURATION 210 + # ============================================================================== 211 + 212 + # Storage driver type 213 + # Options: s3, filesystem 214 + # Default: s3 215 + STORAGE_DRIVER=s3 216 + 217 + # S3 Access Credentials 218 + AWS_ACCESS_KEY_ID=your_access_key 219 + AWS_SECRET_ACCESS_KEY=your_secret_key 220 + 221 + # S3 Region 222 + # For third-party S3 providers, this is ignored when S3_ENDPOINT is set, 223 + # but must be a valid AWS region to pass validation. 224 + # Default: us-east-1 225 + AWS_REGION=us-east-1 226 + 227 + # S3 Bucket Name 228 + S3_BUCKET=atcr-blobs 229 + 230 + # S3 Endpoint (for S3-compatible services) 231 + # Examples: 232 + # - Storj: https://gateway.storjshare.io 233 + # - UpCloud: https://[bucket-id].upcloudobjects.com 234 + # - Minio: http://minio:9000 235 + # Leave empty for AWS S3 236 + # S3_ENDPOINT=https://gateway.storjshare.io 237 + 238 + # ============================================================================== 239 + # STORAGE - FILESYSTEM CONFIGURATION 240 + # ============================================================================== 241 + 242 + # Root directory for filesystem storage (when STORAGE_DRIVER=filesystem) 243 + # Default: /var/lib/atcr/hold 244 + # STORAGE_ROOT_DIR=/var/lib/atcr/hold 245 + 246 + # ============================================================================== 247 + # LOGGING (Shared by AppView and Hold) 248 + # ============================================================================== 249 + 250 + # Log level: debug, info, warn, error 251 + # Default: info 252 + ATCR_LOG_LEVEL=info 253 + 254 + # Log formatter: text, json 255 + # Default: text 256 + # ATCR_LOG_FORMATTER=text 257 + 258 + # ============================================================================== 259 + # REMOTE LOG SHIPPING (Optional) 260 + # ============================================================================== 261 + 262 + # Backend: victoria, opensearch, loki (empty = disabled) 263 + # ATCR_LOG_SHIPPER_BACKEND=victoria 264 + 265 + # Remote log service URL 266 + # ATCR_LOG_SHIPPER_URL=http://victorialogs:9428 267 + 268 + # Number of logs to batch before flushing 269 + # Default: 100 270 + # ATCR_LOG_SHIPPER_BATCH_SIZE=100 271 + 272 + # Max time between flushes 273 + # Default: 5s 274 + # ATCR_LOG_SHIPPER_FLUSH_INTERVAL=5s 275 + 276 + # Basic auth credentials (optional) 277 + # ATCR_LOG_SHIPPER_USERNAME= 278 + # ATCR_LOG_SHIPPER_PASSWORD= 279 + 280 + # ============================================================================== 281 + # DEVELOPMENT / TESTING 282 + # ============================================================================== 283 + 284 + # Enable test mode 285 + # - Uses HTTP for local DID resolution 286 + # - Adds transition:generic scope for OAuth 287 + # - Uses localhost for OAuth redirects while storing real URL in hold record 288 + # Default: false 289 + # TEST_MODE=false 290 + 291 + # Disable presigned URLs (force proxy mode for testing) 292 + # Default: false 293 + # DISABLE_PRESIGNED_URLS=false
+26
.env.hold.example
··· 1 + # ============================================================================== 2 + # DEPRECATED: This file is deprecated. Use .env.example instead. 3 + # This file will be removed in a future version. 4 + # See .env.example for the unified configuration file. 5 + # ============================================================================== 6 + 1 7 # ATCR Hold Service Configuration 2 8 # Copy this file to .env and fill in your values 3 9 ··· 125 131 126 132 # Log formatter: text, json (default: text) 127 133 # ATCR_LOG_FORMATTER=text 134 + 135 + # ============================================================================== 136 + # Remote Log Shipping (optional) 137 + # ============================================================================== 138 + 139 + # Backend: victoria, opensearch, loki (empty = disabled) 140 + # ATCR_LOG_SHIPPER_BACKEND=victoria 141 + 142 + # Remote log service URL 143 + # ATCR_LOG_SHIPPER_URL=http://victorialogs:9428 144 + 145 + # Number of logs to batch before flushing (default: 100) 146 + # ATCR_LOG_SHIPPER_BATCH_SIZE=100 147 + 148 + # Max time between flushes (default: 5s) 149 + # ATCR_LOG_SHIPPER_FLUSH_INTERVAL=5s 150 + 151 + # Basic auth credentials (optional) 152 + # ATCR_LOG_SHIPPER_USERNAME= 153 + # ATCR_LOG_SHIPPER_PASSWORD=
+3
.gitignore
··· 12 12 # Environment configuration 13 13 .env 14 14 15 + # Docker-created quota config (actual config is in deploy/quotas.yaml) 16 + quotas.yaml 17 + 15 18 # Generated assets (run go generate to rebuild) 16 19 pkg/appview/licenses/spdx-licenses.json 17 20 pkg/appview/static/js/htmx.min.js
-1
CLAUDE.md
··· 669 669 - `ATCR_TOKEN_EXPIRATION` - JWT expiration in seconds (default: 300) 670 670 671 671 **UI:** 672 - - `ATCR_UI_ENABLED` - Enable web interface (default: true) 673 672 - `ATCR_UI_DATABASE_PATH` - SQLite database path (default: `/var/lib/atcr/ui.db`) 674 673 675 674 **Jetstream:**
+35 -35
cmd/appview/serve.go
··· 5 5 "database/sql" 6 6 "encoding/json" 7 7 "fmt" 8 - "html/template" 9 8 "log/slog" 10 9 "net/http" 11 10 "os" ··· 66 65 return fmt.Errorf("failed to load config from environment: %w", err) 67 66 } 68 67 69 - // Initialize structured logging 70 - logging.InitLogger(cfg.LogLevel) 68 + // Initialize structured logging with optional remote shipping 69 + logging.InitLoggerWithShipper(cfg.LogLevel, logging.ShipperConfig{ 70 + Backend: cfg.LogShipper.Backend, 71 + URL: cfg.LogShipper.URL, 72 + BatchSize: cfg.LogShipper.BatchSize, 73 + FlushInterval: cfg.LogShipper.FlushInterval, 74 + Service: "appview", 75 + Username: cfg.LogShipper.Username, 76 + Password: cfg.LogShipper.Password, 77 + }) 71 78 72 79 slog.Info("Configuration loaded successfully from environment") 73 80 74 81 // Initialize UI database first (required for all stores) 75 82 slog.Info("Initializing UI database", "path", cfg.UI.DatabasePath) 76 - uiDatabase, uiReadOnlyDB, uiSessionStore := db.InitializeDatabase(cfg.UI.Enabled, cfg.UI.DatabasePath, cfg.UI.SkipDBMigrations) 83 + uiDatabase, uiReadOnlyDB, uiSessionStore := db.InitializeDatabase(cfg.UI.DatabasePath) 77 84 if uiDatabase == nil { 78 85 return fmt.Errorf("failed to initialize UI database - required for session storage") 79 86 } ··· 183 190 mainRouter.Use(chimiddleware.GetHead) // Automatically handle HEAD requests for GET routes 184 191 mainRouter.Use(routes.CORSMiddleware()) 185 192 186 - // Load templates if UI is enabled 187 - var uiTemplates *template.Template 188 - if cfg.UI.Enabled { 189 - var err error 190 - uiTemplates, err = appview.Templates() 191 - if err != nil { 192 - slog.Warn("Failed to load UI templates", "error", err) 193 - } else { 194 - // Register UI routes with dependencies 195 - routes.RegisterUIRoutes(mainRouter, routes.UIDependencies{ 196 - Database: uiDatabase, 197 - ReadOnlyDB: uiReadOnlyDB, 198 - SessionStore: uiSessionStore, 199 - OAuthClientApp: oauthClientApp, 200 - OAuthStore: oauthStore, 201 - Refresher: refresher, 202 - BaseURL: baseURL, 203 - DeviceStore: deviceStore, 204 - HealthChecker: healthChecker, 205 - ReadmeFetcher: readmeFetcher, 206 - Templates: uiTemplates, 207 - DefaultHoldDID: defaultHoldDID, 208 - }) 209 - } 193 + // Load templates (UI is always enabled) 194 + uiTemplates, err := appview.Templates() 195 + if err != nil { 196 + return fmt.Errorf("failed to load UI templates: %w", err) 210 197 } 198 + 199 + // Register UI routes with dependencies 200 + routes.RegisterUIRoutes(mainRouter, routes.UIDependencies{ 201 + Database: uiDatabase, 202 + ReadOnlyDB: uiReadOnlyDB, 203 + SessionStore: uiSessionStore, 204 + OAuthClientApp: oauthClientApp, 205 + OAuthStore: oauthStore, 206 + Refresher: refresher, 207 + BaseURL: baseURL, 208 + DeviceStore: deviceStore, 209 + HealthChecker: healthChecker, 210 + ReadmeFetcher: readmeFetcher, 211 + Templates: uiTemplates, 212 + DefaultHoldDID: defaultHoldDID, 213 + }) 211 214 212 215 // Create OAuth server 213 216 oauthServer := oauth.NewServer(oauthClientApp) ··· 458 461 459 462 // Register credential helper version API (public endpoint) 460 463 mainRouter.Handle("/api/credential-helper/version", &uihandlers.CredentialHelperVersionHandler{ 461 - Version: cfg.CredentialHelper.Version, 462 464 TangledRepo: cfg.CredentialHelper.TangledRepo, 463 - Checksums: cfg.CredentialHelper.Checksums, 464 465 }) 465 - if cfg.CredentialHelper.Version != "" { 466 - slog.Info("Credential helper version API enabled", 467 - "endpoint", "/api/credential-helper/version", 468 - "version", cfg.CredentialHelper.Version) 469 - } 470 466 471 467 // Create HTTP server 472 468 server := &http.Server{ ··· 500 496 defer cancel() 501 497 502 498 if err := server.Shutdown(shutdownCtx); err != nil { 499 + logging.Shutdown() // Flush remaining logs 503 500 return fmt.Errorf("server shutdown error: %w", err) 504 501 } 505 502 case err := <-errChan: 506 503 // Stop health worker on error (workerCancel called by defer) 507 504 healthWorker.Stop() 505 + logging.Shutdown() // Flush remaining logs 508 506 return fmt.Errorf("server error: %w", err) 509 507 } 510 508 509 + // Flush any remaining logs before exit 510 + logging.Shutdown() 511 511 return nil 512 512 } 513 513
+14 -2
cmd/hold/main.go
··· 35 35 os.Exit(1) 36 36 } 37 37 38 - // Initialize structured logging 39 - logging.InitLogger(cfg.LogLevel) 38 + // Initialize structured logging with optional remote shipping 39 + logging.InitLoggerWithShipper(cfg.LogLevel, logging.ShipperConfig{ 40 + Backend: cfg.LogShipper.Backend, 41 + URL: cfg.LogShipper.URL, 42 + BatchSize: cfg.LogShipper.BatchSize, 43 + FlushInterval: cfg.LogShipper.FlushInterval, 44 + Service: "hold", 45 + Username: cfg.LogShipper.Username, 46 + Password: cfg.LogShipper.Password, 47 + }) 40 48 41 49 // Initialize embedded PDS if database path is configured 42 50 // This must happen before creating HoldService since service needs PDS for authorization ··· 234 242 select { 235 243 case err := <-serverErr: 236 244 slog.Error("Server failed", "error", err) 245 + logging.Shutdown() // Flush remaining logs 237 246 os.Exit(1) 238 247 case sig := <-sigChan: 239 248 slog.Info("Received signal, shutting down gracefully", "signal", sig) ··· 275 284 } else { 276 285 slog.Info("Server shutdown complete") 277 286 } 287 + 288 + // Flush any remaining logs before exit 289 + logging.Shutdown() 278 290 } 279 291 }
-13
deploy/.env.prod.template
··· 150 150 # Default: AT Container Registry 151 151 # ATCR_CLIENT_NAME=AT Container Registry 152 152 153 - # Enable web UI 154 - # Default: true 155 - ATCR_UI_ENABLED=true 156 - 157 - # Skip database migrations on startup 158 - # Default: false (migrations are applied on startup) 159 - # Set to "true" only for testing or when migrations are managed externally 160 - # Production: Keep as "false" to ensure migrations are applied 161 - SKIP_DB_MIGRATIONS=false 162 - 163 153 # ============================================================================== 164 154 # Logging Configuration 165 155 # ============================================================================== ··· 211 201 212 202 # Override service name (defaults to APPVIEW_DOMAIN) 213 203 # ATCR_SERVICE_NAME=atcr.io 214 - 215 - # Debug listen address (optional - for pprof debugging) 216 - # ATCR_DEBUG_ADDR=:5001 217 204 218 205 # ============================================================================== 219 206 # CHECKLIST
-1
deploy/docker-compose.prod.yml
··· 59 59 ATCR_TOKEN_EXPIRATION: ${ATCR_TOKEN_EXPIRATION:-300} 60 60 61 61 # UI configuration 62 - ATCR_UI_ENABLED: ${ATCR_UI_ENABLED:-true} 63 62 ATCR_UI_DATABASE_PATH: /var/lib/atcr/ui.db 64 63 65 64 # Logging
+38 -2
docker-compose.yml
··· 14 14 # Server configuration 15 15 ATCR_HTTP_ADDR: :5000 16 16 ATCR_DEFAULT_HOLD_DID: did:web:172.28.0.3:8080 17 - # UI configuration 18 - ATCR_UI_ENABLED: "true" 19 17 ATCR_BACKFILL_ENABLED: "true" 20 18 # Test mode - fallback to default hold when user's hold is unreachable 21 19 TEST_MODE: "true" 22 20 # Logging 23 21 ATCR_LOG_LEVEL: debug 22 + # Log shipping (uncomment to enable) 23 + ATCR_LOG_SHIPPER_BACKEND: victoria 24 + ATCR_LOG_SHIPPER_URL: http://172.28.0.10:9428 25 + # Limit local Docker logs - real logs go to Victoria Logs 26 + # Local logs just for live tailing (docker logs -f) 27 + logging: 28 + driver: json-file 29 + options: 30 + max-size: "10m" 31 + max-file: "1" 24 32 volumes: 25 33 # Mount source code for Air hot reload 26 34 - .:/app ··· 56 64 # DISABLE_PRESIGNED_URLS: true 57 65 # Logging 58 66 ATCR_LOG_LEVEL: debug 67 + # Log shipping (uncomment to enable) 68 + ATCR_LOG_SHIPPER_BACKEND: victoria 69 + ATCR_LOG_SHIPPER_URL: http://172.28.0.10:9428 59 70 # Storage config comes from env_file (STORAGE_DRIVER, AWS_*, S3_*) 71 + # Limit local Docker logs - real logs go to Victoria Logs 72 + # Local logs just for live tailing (docker logs -f) 73 + logging: 74 + driver: json-file 75 + options: 76 + max-size: "10m" 77 + max-file: "1" 60 78 build: 61 79 context: . 62 80 dockerfile: Dockerfile.dev ··· 82 100 atcr-network: 83 101 ipv4_address: 172.28.0.3 84 102 103 + # Victoria Logs for centralized log storage 104 + # Uncomment to enable, then set ATCR_LOG_SHIPPER_* env vars above 105 + victorialogs: 106 + image: victoriametrics/victoria-logs:latest 107 + container_name: victorialogs 108 + ports: 109 + - "9428:9428" 110 + volumes: 111 + - victorialogs-data:/victoria-logs-data 112 + command: 113 + - "-storageDataPath=/victoria-logs-data" 114 + - "-retentionPeriod=7d" 115 + restart: unless-stopped 116 + networks: 117 + atcr-network: 118 + ipv4_address: 172.28.0.10 119 + 85 120 networks: 86 121 atcr-network: 87 122 driver: bridge ··· 94 129 atcr-auth: 95 130 atcr-ui: 96 131 go-mod-cache: 132 + victorialogs-data:
-4
docs/DEVELOPMENT.md
··· 165 165 # Auth 166 166 ATCR_AUTH_KEY_PATH: "/var/lib/atcr/auth/private-key.pem" 167 167 168 - # UI 169 - ATCR_UI_ENABLED: "true" 170 - 171 168 # Jetstream (optional) 172 169 # JETSTREAM_URL: "wss://jetstream2.us-east.bsky.network/subscribe" 173 170 # ATCR_BACKFILL_ENABLED: "false" ··· 524 521 export ATCR_DEFAULT_HOLD_DID=did:web:hold01.atcr.io 525 522 export ATCR_UI_DATABASE_PATH=/tmp/atcr-ui.db 526 523 export ATCR_AUTH_KEY_PATH=/tmp/atcr-auth-key.pem 527 - export ATCR_UI_ENABLED=true 528 524 529 525 # Or use .env file 530 526 source .env.appview
-12
docs/appview.md
··· 115 115 - **Description:** Service name used for JWT `service` and `issuer` fields. Controls token scope. 116 116 - **Example:** `atcr.io`, `registry.example.com` 117 117 118 - #### `ATCR_DEBUG_ADDR` 119 - - **Default:** `:5001` 120 - - **Description:** Debug listen address for pprof debugging endpoints 121 - - **Example:** `:5001`, `:6060` 122 - 123 118 ### Storage Configuration 124 119 125 120 #### `ATCR_DEFAULT_HOLD_DID` ⚠️ REQUIRED ··· 149 144 - **Recommendation:** Keep between 300-900 seconds (5-15 minutes) 150 145 151 146 ### Web UI Configuration 152 - 153 - #### `ATCR_UI_ENABLED` 154 - - **Default:** `true` 155 - - **Description:** Enable the web interface. Set to `false` to run registry API only (no web UI, no database). 156 - - **Use case:** API-only deployments where you don't need the browsing interface 157 147 158 148 #### `ATCR_UI_DATABASE_PATH` 159 149 - **Default:** `/var/lib/atcr/ui.db` ··· 245 235 # AppView config 246 236 ATCR_BASE_URL=https://registry.example.com 247 237 ATCR_DEFAULT_HOLD_DID=did:web:hold01.example.com 248 - ATCR_UI_ENABLED=true 249 238 ATCR_BACKFILL_ENABLED=true 250 239 251 240 # Hold config (linked hold service) ··· 261 250 # AppView config 262 251 ATCR_BASE_URL=https://registry.internal.example.com 263 252 ATCR_DEFAULT_HOLD_DID=did:web:hold.internal.example.com 264 - ATCR_UI_ENABLED=true 265 253 266 254 # Hold config (linked hold service) 267 255 HOLD_PUBLIC=false # Require auth for pulls
+48 -47
pkg/appview/config.go
··· 13 13 "net/url" 14 14 "os" 15 15 "strconv" 16 - "strings" 17 16 "time" 18 17 19 18 "github.com/distribution/distribution/v3/configuration" ··· 23 22 type Config struct { 24 23 Version string `yaml:"version"` 25 24 LogLevel string `yaml:"log_level"` 25 + LogShipper LogShipperConfig `yaml:"log_shipper"` 26 26 Server ServerConfig `yaml:"server"` 27 27 UI UIConfig `yaml:"ui"` 28 28 Health HealthConfig `yaml:"health"` ··· 30 30 Auth AuthConfig `yaml:"auth"` 31 31 CredentialHelper CredentialHelperConfig `yaml:"credential_helper"` 32 32 Distribution *configuration.Configuration `yaml:"-"` // Wrapped distribution config for compatibility 33 + } 34 + 35 + // LogShipperConfig defines remote log shipping settings 36 + type LogShipperConfig struct { 37 + // Backend selects the log shipping backend (from env: ATCR_LOG_SHIPPER_BACKEND) 38 + // Valid values: "victoria", "opensearch", "loki", or empty to disable 39 + Backend string `yaml:"backend"` 40 + 41 + // URL is the remote log service endpoint (from env: ATCR_LOG_SHIPPER_URL) 42 + URL string `yaml:"url"` 43 + 44 + // BatchSize is the number of logs to batch before flushing (from env: ATCR_LOG_SHIPPER_BATCH_SIZE, default: 100) 45 + BatchSize int `yaml:"batch_size"` 46 + 47 + // FlushInterval is the max time between flushes (from env: ATCR_LOG_SHIPPER_FLUSH_INTERVAL, default: 5s) 48 + FlushInterval time.Duration `yaml:"flush_interval"` 49 + 50 + // Username for basic auth (from env: ATCR_LOG_SHIPPER_USERNAME, optional) 51 + Username string `yaml:"username"` 52 + 53 + // Password for basic auth (from env: ATCR_LOG_SHIPPER_PASSWORD, optional) 54 + Password string `yaml:"password"` 33 55 } 34 56 35 57 // ServerConfig defines server settings ··· 48 70 // TestMode enables HTTP for local DID resolution and transition:generic scope (from env: TEST_MODE) 49 71 TestMode bool `yaml:"test_mode"` 50 72 51 - // DebugAddr is the debug/pprof HTTP listen address (from env: ATCR_DEBUG_ADDR, default: ":5001") 52 - DebugAddr string `yaml:"debug_addr"` 53 - 54 73 // OAuthKeyPath is the path to the OAuth client P-256 signing key (from env: ATCR_OAUTH_KEY_PATH, default: "/var/lib/atcr/oauth/client.key") 55 74 // Auto-generated on first run for production (non-localhost) deployments 56 75 OAuthKeyPath string `yaml:"oauth_key_path"` ··· 62 81 63 82 // UIConfig defines web UI settings 64 83 type UIConfig struct { 65 - // Enabled controls whether the web UI is enabled (from env: ATCR_UI_ENABLED, default: true) 66 - Enabled bool `yaml:"enabled"` 67 - 68 84 // DatabasePath is the path to the UI SQLite database (from env: ATCR_UI_DATABASE_PATH, default: "/var/lib/atcr/ui.db") 69 85 DatabasePath string `yaml:"database_path"` 70 - 71 - // SkipDBMigrations controls whether to skip running database migrations (from env: SKIP_DB_MIGRATIONS, default: false) 72 - SkipDBMigrations bool `yaml:"skip_db_migrations"` 73 86 } 74 87 75 88 // HealthConfig defines health check and cache settings ··· 112 125 ServiceName string `yaml:"service_name"` 113 126 } 114 127 115 - // CredentialHelperConfig defines credential helper version and download settings 128 + // CredentialHelperConfig defines credential helper download settings 116 129 type CredentialHelperConfig struct { 117 - // Version is the latest credential helper version (from env: ATCR_CREDENTIAL_HELPER_VERSION) 118 - // e.g., "v0.0.2" 119 - Version string `yaml:"version"` 120 - 121 - // TangledRepo is the Tangled repository URL for downloads (from env: ATCR_CREDENTIAL_HELPER_TANGLED_REPO) 122 - // Default: "https://tangled.org/@evan.jarrett.net/at-container-registry" 130 + // TangledRepo is the Tangled repository URL for downloads 131 + // Hardcoded default: "https://tangled.org/@evan.jarrett.net/at-container-registry" 123 132 TangledRepo string `yaml:"tangled_repo"` 124 - 125 - // Checksums is a comma-separated list of platform:sha256 pairs (from env: ATCR_CREDENTIAL_HELPER_CHECKSUMS) 126 - // e.g., "linux_amd64:abc123,darwin_arm64:def456" 127 - Checksums map[string]string `yaml:"-"` 128 133 } 129 134 130 135 // LoadConfigFromEnv builds a complete configuration from environment variables ··· 137 142 // Logging configuration 138 143 cfg.LogLevel = getEnvOrDefault("ATCR_LOG_LEVEL", "info") 139 144 145 + // Log shipper configuration 146 + cfg.LogShipper.Backend = os.Getenv("ATCR_LOG_SHIPPER_BACKEND") 147 + cfg.LogShipper.URL = os.Getenv("ATCR_LOG_SHIPPER_URL") 148 + cfg.LogShipper.BatchSize = getIntOrDefault("ATCR_LOG_SHIPPER_BATCH_SIZE", 100) 149 + cfg.LogShipper.FlushInterval = getDurationOrDefault("ATCR_LOG_SHIPPER_FLUSH_INTERVAL", 5*time.Second) 150 + cfg.LogShipper.Username = os.Getenv("ATCR_LOG_SHIPPER_USERNAME") 151 + cfg.LogShipper.Password = os.Getenv("ATCR_LOG_SHIPPER_PASSWORD") 152 + 140 153 // Server configuration 141 154 cfg.Server.Addr = getEnvOrDefault("ATCR_HTTP_ADDR", ":5000") 142 - cfg.Server.DebugAddr = getEnvOrDefault("ATCR_DEBUG_ADDR", ":5001") 143 155 cfg.Server.DefaultHoldDID = os.Getenv("ATCR_DEFAULT_HOLD_DID") 144 156 if cfg.Server.DefaultHoldDID == "" { 145 157 return nil, fmt.Errorf("ATCR_DEFAULT_HOLD_DID is required") ··· 155 167 } 156 168 157 169 // UI configuration 158 - cfg.UI.Enabled = os.Getenv("ATCR_UI_ENABLED") != "false" 159 170 cfg.UI.DatabasePath = getEnvOrDefault("ATCR_UI_DATABASE_PATH", "/var/lib/atcr/ui.db") 160 - cfg.UI.SkipDBMigrations = os.Getenv("SKIP_DB_MIGRATIONS") == "true" 161 171 162 172 // Health and cache configuration 163 173 cfg.Health.CacheTTL = getDurationOrDefault("ATCR_HEALTH_CACHE_TTL", 15*time.Minute) ··· 184 194 // Derive service name from base URL or env var (used for JWT issuer and service) 185 195 cfg.Auth.ServiceName = getServiceName(cfg.Server.BaseURL) 186 196 187 - // Credential helper configuration 188 - cfg.CredentialHelper.Version = os.Getenv("ATCR_CREDENTIAL_HELPER_VERSION") 189 - cfg.CredentialHelper.TangledRepo = getEnvOrDefault("ATCR_CREDENTIAL_HELPER_TANGLED_REPO", "https://tangled.org/@evan.jarrett.net/at-container-registry") 190 - cfg.CredentialHelper.Checksums = parseChecksums(os.Getenv("ATCR_CREDENTIAL_HELPER_CHECKSUMS")) 197 + // Credential helper configuration (hardcoded - no env vars needed) 198 + cfg.CredentialHelper.TangledRepo = "https://tangled.org/@evan.jarrett.net/at-container-registry" 191 199 192 200 // Build distribution configuration for compatibility with distribution library 193 201 distConfig, err := buildDistributionConfig(cfg) ··· 232 240 Secret: httpSecret, 233 241 Headers: map[string][]string{ 234 242 "X-Content-Type-Options": {"nosniff"}, 235 - }, 236 - Debug: configuration.Debug{ 237 - Addr: cfg.Server.DebugAddr, 238 243 }, 239 244 } 240 245 ··· 380 385 return parsed 381 386 } 382 387 383 - // parseChecksums parses a comma-separated list of platform:sha256 pairs 384 - // e.g., "linux_amd64:abc123,darwin_arm64:def456" 385 - func parseChecksums(checksumsStr string) map[string]string { 386 - checksums := make(map[string]string) 387 - if checksumsStr == "" { 388 - return checksums 388 + // getIntOrDefault parses an int from environment variable or returns default 389 + // Logs a warning if parsing fails 390 + func getIntOrDefault(envKey string, defaultValue int) int { 391 + envVal := os.Getenv(envKey) 392 + if envVal == "" { 393 + return defaultValue 389 394 } 390 395 391 - for pair := range strings.SplitSeq(checksumsStr, ",") { 392 - parts := strings.SplitN(strings.TrimSpace(pair), ":", 2) 393 - if len(parts) == 2 { 394 - platform := strings.TrimSpace(parts[0]) 395 - hash := strings.TrimSpace(parts[1]) 396 - if platform != "" && hash != "" { 397 - checksums[platform] = hash 398 - } 399 - } 396 + parsed, err := strconv.Atoi(envVal) 397 + if err != nil { 398 + slog.Warn("Invalid int, using default", "env_key", envKey, "env_value", envVal, "default", defaultValue) 399 + return defaultValue 400 400 } 401 - return checksums 401 + 402 + return parsed 402 403 }
+1 -1
pkg/appview/db/annotations_test.go
··· 21 21 func setupAnnotationsTestDB(t *testing.T) *sql.DB { 22 22 t.Helper() 23 23 // Use file::memory: with cache=shared to ensure all connections share the same in-memory DB 24 - db, err := InitDB("file::memory:?cache=shared", true) 24 + db, err := InitDB("file::memory:?cache=shared") 25 25 if err != nil { 26 26 t.Fatalf("Failed to initialize test database: %v", err) 27 27 }
+1 -1
pkg/appview/db/device_store_test.go
··· 14 14 t.Helper() 15 15 // Use file::memory: with cache=shared to ensure all connections share the same in-memory DB 16 16 // This prevents race conditions where different connections see different databases 17 - db, err := InitDB("file::memory:?cache=shared", true) 17 + db, err := InitDB("file::memory:?cache=shared") 18 18 if err != nil { 19 19 t.Fatalf("Failed to initialize test database: %v", err) 20 20 }
+1 -1
pkg/appview/db/hold_store_test.go
··· 81 81 func setupHoldTestDB(t *testing.T) *sql.DB { 82 82 t.Helper() 83 83 // Use file::memory: with cache=shared to ensure all connections share the same in-memory DB 84 - db, err := InitDB("file::memory:?cache=shared", true) 84 + db, err := InitDB("file::memory:?cache=shared") 85 85 if err != nil { 86 86 t.Fatalf("Failed to initialize test database: %v", err) 87 87 }
+3 -3
pkg/appview/db/oauth_store_test.go
··· 12 12 13 13 func TestInvalidateSessionsWithMismatchedScopes(t *testing.T) { 14 14 // Create in-memory test database 15 - db, err := InitDB(":memory:", true) 15 + db, err := InitDB(":memory:") 16 16 if err != nil { 17 17 t.Fatalf("Failed to init database: %v", err) 18 18 } ··· 232 232 233 233 func TestOAuthStoreSessionLifecycle(t *testing.T) { 234 234 // Basic test to ensure SaveSession, GetSession, DeleteSession work correctly 235 - db, err := InitDB(":memory:", true) 235 + db, err := InitDB(":memory:") 236 236 if err != nil { 237 237 t.Fatalf("Failed to init database: %v", err) 238 238 } ··· 304 304 } 305 305 306 306 func TestCleanupOldSessions(t *testing.T) { 307 - db, err := InitDB(":memory:", true) 307 + db, err := InitDB(":memory:") 308 308 if err != nil { 309 309 t.Fatalf("Failed to init database: %v", err) 310 310 }
+9 -9
pkg/appview/db/queries_test.go
··· 7 7 8 8 func TestGetRepositoryMetadata(t *testing.T) { 9 9 // Create in-memory test database 10 - db, err := InitDB(":memory:", true) 10 + db, err := InitDB(":memory:") 11 11 if err != nil { 12 12 t.Fatalf("Failed to init database: %v", err) 13 13 } ··· 143 143 144 144 func TestInsertManifest(t *testing.T) { 145 145 // Create in-memory test database 146 - db, err := InitDB(":memory:", true) 146 + db, err := InitDB(":memory:") 147 147 if err != nil { 148 148 t.Fatalf("Failed to init database: %v", err) 149 149 } ··· 320 320 321 321 func TestUserManagement(t *testing.T) { 322 322 // Create in-memory test database 323 - db, err := InitDB(":memory:", true) 323 + db, err := InitDB(":memory:") 324 324 if err != nil { 325 325 t.Fatalf("Failed to init database: %v", err) 326 326 } ··· 432 432 433 433 func TestManifestOperations(t *testing.T) { 434 434 // Create in-memory test database 435 - db, err := InitDB(":memory:", true) 435 + db, err := InitDB(":memory:") 436 436 if err != nil { 437 437 t.Fatalf("Failed to init database: %v", err) 438 438 } ··· 609 609 610 610 func TestIsManifestTagged(t *testing.T) { 611 611 // Create in-memory test database 612 - db, err := InitDB(":memory:", true) 612 + db, err := InitDB(":memory:") 613 613 if err != nil { 614 614 t.Fatalf("Failed to init database: %v", err) 615 615 } ··· 675 675 676 676 func TestTagOperations(t *testing.T) { 677 677 // Create in-memory test database 678 - db, err := InitDB(":memory:", true) 678 + db, err := InitDB(":memory:") 679 679 if err != nil { 680 680 t.Fatalf("Failed to init database: %v", err) 681 681 } ··· 838 838 839 839 func TestGetTagsWithPlatforms(t *testing.T) { 840 840 // Create in-memory test database 841 - db, err := InitDB(":memory:", true) 841 + db, err := InitDB(":memory:") 842 842 if err != nil { 843 843 t.Fatalf("Failed to init database: %v", err) 844 844 } ··· 980 980 981 981 func TestUpdateUserHandle(t *testing.T) { 982 982 // Create in-memory test database 983 - db, err := InitDB(":memory:", true) 983 + db, err := InitDB(":memory:") 984 984 if err != nil { 985 985 t.Fatalf("Failed to init database: %v", err) 986 986 } ··· 1201 1201 } 1202 1202 1203 1203 func TestDeleteUserData(t *testing.T) { 1204 - db, err := InitDB(":memory:", true) 1204 + db, err := InitDB(":memory:") 1205 1205 if err != nil { 1206 1206 t.Fatalf("Failed to init database: %v", err) 1207 1207 }
+2 -6
pkg/appview/db/readonly.go
··· 57 57 58 58 // InitializeDatabase initializes the SQLite database and session store 59 59 // Returns: (read-write DB, read-only DB, session store) 60 - func InitializeDatabase(uiEnabled bool, dbPath string, skipMigrations bool) (*sql.DB, *sql.DB, *SessionStore) { 61 - if !uiEnabled { 62 - return nil, nil, nil 63 - } 64 - 60 + func InitializeDatabase(dbPath string) (*sql.DB, *sql.DB, *SessionStore) { 65 61 // Ensure directory exists 66 62 dbDir := filepath.Dir(dbPath) 67 63 if err := os.MkdirAll(dbDir, 0700); err != nil { ··· 70 66 } 71 67 72 68 // Initialize read-write database (for writes and auth operations) 73 - database, err := InitDB(dbPath, skipMigrations) 69 + database, err := InitDB(dbPath) 74 70 if err != nil { 75 71 slog.Warn("Failed to initialize UI database", "error", err) 76 72 return nil, nil, nil
+1 -1
pkg/appview/db/readonly_test.go
··· 19 19 defer os.Unsetenv("ATCR_UI_DATABASE_PATH") 20 20 21 21 // Initialize database (creates schema) 22 - database, err := InitDB(dbPath, true) 22 + database, err := InitDB(dbPath) 23 23 if err != nil { 24 24 t.Fatalf("Failed to initialize database: %v", err) 25 25 }
+4 -6
pkg/appview/db/schema.go
··· 26 26 var schemaSQL string 27 27 28 28 // InitDB initializes the SQLite database with the schema 29 - func InitDB(path string, skipMigrations bool) (*sql.DB, error) { 29 + func InitDB(path string) (*sql.DB, error) { 30 30 db, err := sql.Open("sqlite3", path) 31 31 if err != nil { 32 32 return nil, err ··· 54 54 } 55 55 } 56 56 57 - // Run migrations unless skipped 57 + // Run migrations 58 58 // For fresh databases, migrations are recorded but not executed (schema.sql is already complete) 59 - if !skipMigrations { 60 - if err := runMigrations(db, !isExisting); err != nil { 61 - return nil, err 62 - } 59 + if err := runMigrations(db, !isExisting); err != nil { 60 + return nil, err 63 61 } 64 62 65 63 return db, nil
+1 -1
pkg/appview/db/session_store_test.go
··· 13 13 func setupSessionTestDB(t *testing.T) *SessionStore { 14 14 t.Helper() 15 15 // Use file::memory: with cache=shared to ensure all connections share the same in-memory DB 16 - db, err := InitDB("file::memory:?cache=shared", true) 16 + db, err := InitDB("file::memory:?cache=shared") 17 17 if err != nil { 18 18 t.Fatalf("Failed to initialize test database: %v", err) 19 19 }
+1 -1
pkg/appview/db/tag_delete_test.go
··· 11 11 // This simulates what Jetstream does: encode repo/tag to rkey, then decode and delete 12 12 func TestTagDeleteRoundTrip(t *testing.T) { 13 13 // Create in-memory test database 14 - db, err := InitDB(":memory:", true) 14 + db, err := InitDB(":memory:") 15 15 if err != nil { 16 16 t.Fatalf("Failed to init database: %v", err) 17 17 }
+7 -37
pkg/appview/handlers/api.go
··· 6 6 "fmt" 7 7 "log/slog" 8 8 "net/http" 9 - "strings" 10 9 11 10 "atcr.io/pkg/appview/db" 12 11 "atcr.io/pkg/appview/middleware" ··· 246 245 } 247 246 248 247 // CredentialHelperVersionHandler returns the latest credential helper version info 248 + // Note: Version info is fetched dynamically from TangledRepo's releases 249 249 type CredentialHelperVersionHandler struct { 250 - Version string 251 250 TangledRepo string 252 - Checksums map[string]string 253 - } 254 - 255 - // Supported platforms for download URLs 256 - var credentialHelperPlatforms = []struct { 257 - key string // API key (e.g., "linux_amd64") 258 - os string // OS name in archive (e.g., "Linux") 259 - arch string // Arch name in archive (e.g., "x86_64") 260 - ext string // Archive extension (e.g., "tar.gz" or "zip") 261 - }{ 262 - {"linux_amd64", "Linux", "x86_64", "tar.gz"}, 263 - {"linux_arm64", "Linux", "arm64", "tar.gz"}, 264 - {"darwin_amd64", "Darwin", "x86_64", "tar.gz"}, 265 - {"darwin_arm64", "Darwin", "arm64", "tar.gz"}, 266 - {"windows_amd64", "Windows", "x86_64", "zip"}, 267 - {"windows_arm64", "Windows", "arm64", "zip"}, 268 251 } 269 252 270 253 func (h *CredentialHelperVersionHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 271 - // Check if version is configured 272 - if h.Version == "" { 273 - http.Error(w, "Credential helper version not configured", http.StatusServiceUnavailable) 274 - return 275 - } 276 - 277 - // Build download URLs for all platforms 278 - // URL format: {TangledRepo}/tags/{version}/download/docker-credential-atcr_{version_without_v}_{OS}_{Arch}.{ext} 279 - downloadURLs := make(map[string]string) 280 - versionWithoutV := strings.TrimPrefix(h.Version, "v") 281 - 282 - for _, p := range credentialHelperPlatforms { 283 - filename := fmt.Sprintf("docker-credential-atcr_%s_%s_%s.%s", versionWithoutV, p.os, p.arch, p.ext) 284 - downloadURLs[p.key] = fmt.Sprintf("%s/tags/%s/download/%s", h.TangledRepo, h.Version, filename) 285 - } 286 - 254 + // This endpoint directs users to the Tangled repository for downloads 255 + // Version info should be fetched from the repository's releases page 287 256 response := CredentialHelperVersionResponse{ 288 - Latest: h.Version, 289 - DownloadURLs: downloadURLs, 290 - Checksums: h.Checksums, 257 + Latest: "", 258 + DownloadURLs: map[string]string{"tangled_repo": h.TangledRepo}, 259 + Checksums: nil, 260 + ReleaseNotes: "Visit the Tangled repository for the latest releases: " + h.TangledRepo, 291 261 } 292 262 293 263 render.SetContentType(render.ContentTypeJSON)
+1 -1
pkg/appview/handlers/device_test.go
··· 18 18 19 19 // setupTestDB creates an in-memory SQLite database with full schema for testing 20 20 func setupTestDB(t *testing.T) *sql.DB { 21 - database, err := db.InitDB(":memory:", true) 21 + database, err := db.InitDB(":memory:") 22 22 if err != nil { 23 23 t.Fatalf("Failed to initialize test database: %v", err) 24 24 }
+2 -2
pkg/appview/middleware/auth_test.go
··· 26 26 27 27 // setupTestDB creates an in-memory SQLite database for testing 28 28 func setupTestDB(t *testing.T) *sql.DB { 29 - database, err := db.InitDB(":memory:", true) 29 + database, err := db.InitDB(":memory:") 30 30 require.NoError(t, err) 31 31 32 32 t.Cleanup(func() { ··· 307 307 func TestMiddleware_ConcurrentAccess(t *testing.T) { 308 308 // Use a shared in-memory database for concurrent access 309 309 // (SQLite's default :memory: creates separate DBs per connection) 310 - database, err := db.InitDB("file::memory:?cache=shared", true) 310 + database, err := db.InitDB("file::memory:?cache=shared") 311 311 require.NoError(t, err) 312 312 t.Cleanup(func() { 313 313 database.Close()
+1 -1
pkg/auth/hold_remote_test.go
··· 34 34 35 35 // setupTestDB creates an in-memory database for testing 36 36 func setupTestDB(t *testing.T) *sql.DB { 37 - testDB, err := db.InitDB(":memory:", true) 37 + testDB, err := db.InitDB(":memory:") 38 38 if err != nil { 39 39 t.Fatalf("Failed to initialize test database: %v", err) 40 40 }
+1 -1
pkg/auth/token/handler_test.go
··· 52 52 53 53 // setupTestDeviceStore creates an in-memory SQLite database for testing 54 54 func setupTestDeviceStore(t *testing.T) (*db.DeviceStore, *sql.DB) { 55 - testDB, err := db.InitDB(":memory:", true) 55 + testDB, err := db.InitDB(":memory:") 56 56 if err != nil { 57 57 t.Fatalf("Failed to initialize test database: %v", err) 58 58 }
+63
pkg/hold/config.go
··· 24 24 type Config struct { 25 25 Version string `yaml:"version"` 26 26 LogLevel string `yaml:"log_level"` 27 + LogShipper LogShipperConfig `yaml:"log_shipper"` 27 28 Storage StorageConfig `yaml:"storage"` 28 29 Server ServerConfig `yaml:"server"` 29 30 Registration RegistrationConfig `yaml:"registration"` ··· 31 32 Admin AdminConfig `yaml:"admin"` 32 33 } 33 34 35 + // LogShipperConfig defines remote log shipping settings 36 + type LogShipperConfig struct { 37 + // Backend selects the log shipping backend (from env: ATCR_LOG_SHIPPER_BACKEND) 38 + // Valid values: "victoria", "opensearch", "loki", or empty to disable 39 + Backend string `yaml:"backend"` 40 + 41 + // URL is the remote log service endpoint (from env: ATCR_LOG_SHIPPER_URL) 42 + URL string `yaml:"url"` 43 + 44 + // BatchSize is the number of logs to batch before flushing (from env: ATCR_LOG_SHIPPER_BATCH_SIZE, default: 100) 45 + BatchSize int `yaml:"batch_size"` 46 + 47 + // FlushInterval is the max time between flushes (from env: ATCR_LOG_SHIPPER_FLUSH_INTERVAL, default: 5s) 48 + FlushInterval time.Duration `yaml:"flush_interval"` 49 + 50 + // Username for basic auth (from env: ATCR_LOG_SHIPPER_USERNAME, optional) 51 + Username string `yaml:"username"` 52 + 53 + // Password for basic auth (from env: ATCR_LOG_SHIPPER_PASSWORD, optional) 54 + Password string `yaml:"password"` 55 + } 56 + 34 57 // AdminConfig defines admin panel settings 35 58 type AdminConfig struct { 36 59 // Enabled controls whether the admin panel is accessible (from env: HOLD_ADMIN_ENABLED) ··· 113 136 114 137 // Logging configuration 115 138 cfg.LogLevel = getEnvOrDefault("ATCR_LOG_LEVEL", "info") 139 + 140 + // Log shipper configuration 141 + cfg.LogShipper.Backend = os.Getenv("ATCR_LOG_SHIPPER_BACKEND") 142 + cfg.LogShipper.URL = os.Getenv("ATCR_LOG_SHIPPER_URL") 143 + cfg.LogShipper.BatchSize = getIntOrDefault("ATCR_LOG_SHIPPER_BATCH_SIZE", 100) 144 + cfg.LogShipper.FlushInterval = getDurationOrDefault("ATCR_LOG_SHIPPER_FLUSH_INTERVAL", 5*time.Second) 145 + cfg.LogShipper.Username = os.Getenv("ATCR_LOG_SHIPPER_USERNAME") 146 + cfg.LogShipper.Password = os.Getenv("ATCR_LOG_SHIPPER_PASSWORD") 116 147 117 148 // Server configuration 118 149 cfg.Server.Addr = getEnvOrDefault("HOLD_SERVER_ADDR", ":8080") ··· 215 246 return val 216 247 } 217 248 return defaultValue 249 + } 250 + 251 + // getIntOrDefault parses an int from environment variable or returns default 252 + func getIntOrDefault(envKey string, defaultValue int) int { 253 + envVal := os.Getenv(envKey) 254 + if envVal == "" { 255 + return defaultValue 256 + } 257 + 258 + var parsed int 259 + if _, err := fmt.Sscanf(envVal, "%d", &parsed); err != nil { 260 + slog.Warn("Invalid int, using default", "env_key", envKey, "env_value", envVal, "default", defaultValue) 261 + return defaultValue 262 + } 263 + 264 + return parsed 265 + } 266 + 267 + // getDurationOrDefault parses a duration from environment variable or returns default 268 + func getDurationOrDefault(envKey string, defaultValue time.Duration) time.Duration { 269 + envVal := os.Getenv(envKey) 270 + if envVal == "" { 271 + return defaultValue 272 + } 273 + 274 + parsed, err := time.ParseDuration(envVal) 275 + if err != nil { 276 + slog.Warn("Invalid duration, using default", "env_key", envKey, "env_value", envVal, "default", defaultValue) 277 + return defaultValue 278 + } 279 + 280 + return parsed 218 281 } 219 282 220 283 // RequestCrawl sends a crawl request to the ATProto relay for the given hostname.
+42 -2
pkg/logging/logger.go
··· 27 27 debugEnabled atomic.Bool 28 28 revertTimer *time.Timer 29 29 revertMu sync.Mutex 30 + 31 + // asyncHandler holds the global async handler for shutdown 32 + asyncHandler *AsyncHandler 30 33 ) 31 34 32 35 // InitLogger initializes the global slog default logger with the specified log level. ··· 36 39 // 37 40 // Also starts a signal handler for SIGUSR1 to toggle debug mode at runtime. 38 41 func InitLogger(level string) { 42 + InitLoggerWithShipper(level, ShipperConfig{}) 43 + } 44 + 45 + // InitLoggerWithShipper initializes the global slog default logger with the specified 46 + // log level and optional remote log shipping. 47 + // Valid levels: debug, info, warn, error (case-insensitive) 48 + // If level is empty or invalid, defaults to INFO. 49 + // Call this from main() at startup. 50 + // 51 + // If shipperCfg.Backend is non-empty, logs will be shipped to the configured 52 + // remote service in addition to stdout. 53 + // 54 + // Also starts a signal handler for SIGUSR1 to toggle debug mode at runtime. 55 + func InitLoggerWithShipper(level string, shipperCfg ShipperConfig) { 39 56 var logLevel slog.Level 40 57 41 58 switch strings.ToLower(strings.TrimSpace(level)) { ··· 69 86 }, 70 87 } 71 88 72 - handler := slog.NewTextHandler(os.Stdout, opts) 73 - slog.SetDefault(slog.New(handler)) 89 + // Create stdout handler 90 + stdoutHandler := slog.NewTextHandler(os.Stdout, opts) 91 + 92 + // Create shipper if configured 93 + var shipper Shipper 94 + if shipperCfg.Backend != "" { 95 + var err error 96 + shipper, err = NewShipper(shipperCfg) 97 + if err != nil { 98 + // Log error but continue without shipping 99 + fmt.Fprintf(os.Stderr, "log shipper initialization failed: %v (continuing with stdout only)\n", err) 100 + } 101 + } 102 + 103 + // Create async handler (wraps stdout + optional shipper) 104 + asyncHandler = NewAsyncHandler(stdoutHandler, shipper, shipperCfg, opts) 105 + slog.SetDefault(slog.New(asyncHandler)) 74 106 75 107 // Start signal handler for dynamic debug toggle 76 108 go handleDebugSignal() 109 + } 110 + 111 + // Shutdown flushes any remaining logs and closes the log shipper. 112 + // Call this during graceful shutdown to ensure all logs are delivered. 113 + func Shutdown() { 114 + if asyncHandler != nil { 115 + asyncHandler.Shutdown() 116 + } 77 117 } 78 118 79 119 func handleDebugSignal() {
+308
pkg/logging/shipper.go
··· 1 + // Package logging provides centralized structured logging with optional remote log shipping. 2 + package logging 3 + 4 + import ( 5 + "context" 6 + "fmt" 7 + "log/slog" 8 + "sync" 9 + "time" 10 + ) 11 + 12 + // Default configuration values 13 + const ( 14 + DefaultBatchSize = 100 15 + DefaultFlushInterval = 5 * time.Second 16 + ) 17 + 18 + // Shipper defines the interface for log shipping backends. 19 + // Implementations should be safe for concurrent use. 20 + type Shipper interface { 21 + // Ship sends a batch of log entries to the remote service. 22 + // Returns an error if the batch could not be shipped. 23 + Ship(ctx context.Context, entries []LogEntry) error 24 + 25 + // Close cleanly shuts down the shipper, releasing any resources. 26 + Close() error 27 + } 28 + 29 + // LogEntry represents a single log entry to be shipped. 30 + type LogEntry struct { 31 + Time time.Time 32 + Level slog.Level 33 + Message string 34 + Source string 35 + Attrs map[string]any 36 + } 37 + 38 + // ShipperConfig configures the log shipper. 39 + type ShipperConfig struct { 40 + // Backend selects the shipping backend: "victoria", "opensearch", "loki", etc. 41 + // Empty string disables remote shipping (stdout only). 42 + Backend string 43 + 44 + // URL is the remote service endpoint URL. 45 + URL string 46 + 47 + // BatchSize is the number of logs to batch before flushing. 48 + // Default: 100 49 + BatchSize int 50 + 51 + // FlushInterval is the maximum time between flushes. 52 + // Default: 5s 53 + FlushInterval time.Duration 54 + 55 + // Service identifies the source service ("appview" or "hold"). 56 + // Added to all log entries. 57 + Service string 58 + 59 + // Username for basic auth (optional). 60 + Username string 61 + 62 + // Password for basic auth (optional). 63 + Password string 64 + } 65 + 66 + // NewShipper creates a shipper for the configured backend. 67 + // Returns nil if no backend is configured (remote shipping disabled). 68 + func NewShipper(cfg ShipperConfig) (Shipper, error) { 69 + switch cfg.Backend { 70 + case "victoria": 71 + return NewVictoriaShipper(cfg) 72 + case "opensearch": 73 + return nil, fmt.Errorf("opensearch backend not yet implemented") 74 + case "loki": 75 + return nil, fmt.Errorf("loki backend not yet implemented") 76 + case "": 77 + return nil, nil // No remote shipping 78 + default: 79 + return nil, fmt.Errorf("unknown log shipper backend: %s", cfg.Backend) 80 + } 81 + } 82 + 83 + // asyncState holds the shared state for async log shipping. 84 + // This is separate from AsyncHandler to allow WithAttrs/WithGroup 85 + // to create new handlers that share the same batch and flush state. 86 + type asyncState struct { 87 + shipper Shipper 88 + 89 + // Batching 90 + batch []LogEntry 91 + batchMu sync.Mutex 92 + batchSize int 93 + 94 + // Async flush 95 + flushInterval time.Duration 96 + flushCh chan struct{} 97 + doneCh chan struct{} 98 + wg sync.WaitGroup 99 + } 100 + 101 + // AsyncHandler is an slog.Handler that writes to stdout and optionally 102 + // ships logs to a remote service asynchronously. 103 + type AsyncHandler struct { 104 + stdout slog.Handler 105 + opts *slog.HandlerOptions 106 + state *asyncState // Shared state for batching and flushing 107 + } 108 + 109 + // NewAsyncHandler creates a new AsyncHandler that wraps stdout logging 110 + // and optionally ships logs to a remote service. 111 + func NewAsyncHandler(stdout slog.Handler, shipper Shipper, cfg ShipperConfig, opts *slog.HandlerOptions) *AsyncHandler { 112 + batchSize := cfg.BatchSize 113 + if batchSize <= 0 { 114 + batchSize = DefaultBatchSize 115 + } 116 + 117 + flushInterval := cfg.FlushInterval 118 + if flushInterval <= 0 { 119 + flushInterval = DefaultFlushInterval 120 + } 121 + 122 + state := &asyncState{ 123 + shipper: shipper, 124 + batch: make([]LogEntry, 0, batchSize), 125 + batchSize: batchSize, 126 + flushInterval: flushInterval, 127 + flushCh: make(chan struct{}, 1), 128 + doneCh: make(chan struct{}), 129 + } 130 + 131 + h := &AsyncHandler{ 132 + stdout: stdout, 133 + opts: opts, 134 + state: state, 135 + } 136 + 137 + // Start background flusher if shipping is enabled 138 + if shipper != nil { 139 + state.wg.Add(1) 140 + go h.runFlusher() 141 + } 142 + 143 + return h 144 + } 145 + 146 + // Enabled reports whether the handler handles records at the given level. 147 + func (h *AsyncHandler) Enabled(ctx context.Context, level slog.Level) bool { 148 + return h.stdout.Enabled(ctx, level) 149 + } 150 + 151 + // Handle handles the Record by writing to stdout and queuing for remote shipping. 152 + func (h *AsyncHandler) Handle(ctx context.Context, r slog.Record) error { 153 + // Always write to stdout 154 + if err := h.stdout.Handle(ctx, r); err != nil { 155 + return err 156 + } 157 + 158 + // Skip remote shipping if no shipper configured 159 + if h.state.shipper == nil { 160 + return nil 161 + } 162 + 163 + // Build log entry 164 + entry := LogEntry{ 165 + Time: r.Time, 166 + Level: r.Level, 167 + Message: r.Message, 168 + Attrs: make(map[string]any), 169 + } 170 + 171 + r.Attrs(func(a slog.Attr) bool { 172 + if a.Key == slog.SourceKey { 173 + if src, ok := a.Value.Any().(*slog.Source); ok { 174 + entry.Source = shortenSource(src.File, src.Line) 175 + } 176 + } else { 177 + entry.Attrs[a.Key] = resolveAttrValue(a.Value) 178 + } 179 + return true 180 + }) 181 + 182 + // Add to batch 183 + h.state.batchMu.Lock() 184 + h.state.batch = append(h.state.batch, entry) 185 + shouldFlush := len(h.state.batch) >= h.state.batchSize 186 + h.state.batchMu.Unlock() 187 + 188 + if shouldFlush { 189 + h.triggerFlush() 190 + } 191 + 192 + return nil 193 + } 194 + 195 + // WithAttrs returns a new Handler with the given attributes added. 196 + func (h *AsyncHandler) WithAttrs(attrs []slog.Attr) slog.Handler { 197 + return &AsyncHandler{ 198 + stdout: h.stdout.WithAttrs(attrs), 199 + opts: h.opts, 200 + state: h.state, // Share the same state 201 + } 202 + } 203 + 204 + // WithGroup returns a new Handler with the given group name. 205 + func (h *AsyncHandler) WithGroup(name string) slog.Handler { 206 + return &AsyncHandler{ 207 + stdout: h.stdout.WithGroup(name), 208 + opts: h.opts, 209 + state: h.state, // Share the same state 210 + } 211 + } 212 + 213 + // triggerFlush signals the flusher goroutine to flush immediately. 214 + func (h *AsyncHandler) triggerFlush() { 215 + select { 216 + case h.state.flushCh <- struct{}{}: 217 + default: // Flush already pending 218 + } 219 + } 220 + 221 + // runFlusher runs in a goroutine and periodically flushes the batch. 222 + func (h *AsyncHandler) runFlusher() { 223 + defer h.state.wg.Done() 224 + 225 + ticker := time.NewTicker(h.state.flushInterval) 226 + defer ticker.Stop() 227 + 228 + for { 229 + select { 230 + case <-ticker.C: 231 + h.flush() 232 + case <-h.state.flushCh: 233 + h.flush() 234 + case <-h.state.doneCh: 235 + h.flush() // Final flush 236 + return 237 + } 238 + } 239 + } 240 + 241 + // flush sends the current batch to the remote service. 242 + func (h *AsyncHandler) flush() { 243 + h.state.batchMu.Lock() 244 + if len(h.state.batch) == 0 { 245 + h.state.batchMu.Unlock() 246 + return 247 + } 248 + 249 + // Take ownership of the batch 250 + batch := h.state.batch 251 + h.state.batch = make([]LogEntry, 0, h.state.batchSize) 252 + h.state.batchMu.Unlock() 253 + 254 + // Ship with a timeout context 255 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 256 + defer cancel() 257 + 258 + if err := h.state.shipper.Ship(ctx, batch); err != nil { 259 + // Log to stderr (not through slog to avoid recursion) 260 + fmt.Printf("log shipper error: %v (dropped %d entries)\n", err, len(batch)) 261 + } 262 + } 263 + 264 + // Shutdown flushes any remaining logs and closes the shipper. 265 + // Call this during graceful shutdown. 266 + func (h *AsyncHandler) Shutdown() { 267 + if h.state.shipper == nil { 268 + return 269 + } 270 + 271 + close(h.state.doneCh) 272 + h.state.wg.Wait() 273 + 274 + if err := h.state.shipper.Close(); err != nil { 275 + fmt.Printf("log shipper close error: %v\n", err) 276 + } 277 + } 278 + 279 + // resolveAttrValue converts slog.Value to a plain Go value for JSON encoding. 280 + func resolveAttrValue(v slog.Value) any { 281 + switch v.Kind() { 282 + case slog.KindString: 283 + return v.String() 284 + case slog.KindInt64: 285 + return v.Int64() 286 + case slog.KindUint64: 287 + return v.Uint64() 288 + case slog.KindFloat64: 289 + return v.Float64() 290 + case slog.KindBool: 291 + return v.Bool() 292 + case slog.KindDuration: 293 + return v.Duration().String() 294 + case slog.KindTime: 295 + return v.Time().Format(time.RFC3339Nano) 296 + case slog.KindGroup: 297 + attrs := v.Group() 298 + m := make(map[string]any, len(attrs)) 299 + for _, a := range attrs { 300 + m[a.Key] = resolveAttrValue(a.Value) 301 + } 302 + return m 303 + case slog.KindAny: 304 + return v.Any() 305 + default: 306 + return v.String() 307 + } 308 + }
+386
pkg/logging/shipper_test.go
··· 1 + package logging 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "io" 7 + "log/slog" 8 + "net/http" 9 + "net/http/httptest" 10 + "strings" 11 + "sync" 12 + "sync/atomic" 13 + "testing" 14 + "time" 15 + ) 16 + 17 + func TestNewShipper(t *testing.T) { 18 + tests := []struct { 19 + name string 20 + cfg ShipperConfig 21 + wantErr bool 22 + errMsg string 23 + }{ 24 + { 25 + name: "empty backend returns nil", 26 + cfg: ShipperConfig{Backend: ""}, 27 + wantErr: false, 28 + }, 29 + { 30 + name: "victoria backend requires URL", 31 + cfg: ShipperConfig{Backend: "victoria", URL: ""}, 32 + wantErr: true, 33 + errMsg: "URL is required", 34 + }, 35 + { 36 + name: "victoria backend with URL succeeds", 37 + cfg: ShipperConfig{Backend: "victoria", URL: "http://localhost:9428"}, 38 + wantErr: false, 39 + }, 40 + { 41 + name: "unknown backend returns error", 42 + cfg: ShipperConfig{Backend: "unknown"}, 43 + wantErr: true, 44 + errMsg: "unknown log shipper backend", 45 + }, 46 + { 47 + name: "opensearch not implemented", 48 + cfg: ShipperConfig{Backend: "opensearch"}, 49 + wantErr: true, 50 + errMsg: "not yet implemented", 51 + }, 52 + { 53 + name: "loki not implemented", 54 + cfg: ShipperConfig{Backend: "loki"}, 55 + wantErr: true, 56 + errMsg: "not yet implemented", 57 + }, 58 + } 59 + 60 + for _, tt := range tests { 61 + t.Run(tt.name, func(t *testing.T) { 62 + shipper, err := NewShipper(tt.cfg) 63 + if tt.wantErr { 64 + if err == nil { 65 + t.Errorf("NewShipper() expected error, got nil") 66 + } else if tt.errMsg != "" && !strings.Contains(err.Error(), tt.errMsg) { 67 + t.Errorf("NewShipper() error = %v, want error containing %q", err, tt.errMsg) 68 + } 69 + return 70 + } 71 + if err != nil { 72 + t.Errorf("NewShipper() unexpected error: %v", err) 73 + } 74 + if tt.cfg.Backend == "" && shipper != nil { 75 + t.Error("NewShipper() with empty backend should return nil shipper") 76 + } 77 + if shipper != nil { 78 + shipper.Close() 79 + } 80 + }) 81 + } 82 + } 83 + 84 + func TestVictoriaShipper_Ship(t *testing.T) { 85 + var receivedLogs []map[string]any 86 + var mu sync.Mutex 87 + 88 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 89 + if r.Method != http.MethodPost { 90 + t.Errorf("expected POST, got %s", r.Method) 91 + } 92 + if !strings.Contains(r.URL.Path, "/insert/jsonline") { 93 + t.Errorf("expected /insert/jsonline path, got %s", r.URL.Path) 94 + } 95 + if r.Header.Get("Content-Type") != "application/stream+json" { 96 + t.Errorf("expected application/stream+json content type, got %s", r.Header.Get("Content-Type")) 97 + } 98 + 99 + body, _ := io.ReadAll(r.Body) 100 + lines := strings.Split(strings.TrimSpace(string(body)), "\n") 101 + 102 + mu.Lock() 103 + for _, line := range lines { 104 + var doc map[string]any 105 + if err := json.Unmarshal([]byte(line), &doc); err != nil { 106 + t.Errorf("failed to unmarshal log line: %v", err) 107 + } 108 + receivedLogs = append(receivedLogs, doc) 109 + } 110 + mu.Unlock() 111 + 112 + w.WriteHeader(http.StatusOK) 113 + })) 114 + defer server.Close() 115 + 116 + shipper, err := NewVictoriaShipper(ShipperConfig{ 117 + URL: server.URL, 118 + Service: "test-service", 119 + }) 120 + if err != nil { 121 + t.Fatalf("NewVictoriaShipper() error: %v", err) 122 + } 123 + defer shipper.Close() 124 + 125 + entries := []LogEntry{ 126 + { 127 + Time: time.Date(2024, 1, 8, 12, 0, 0, 0, time.UTC), 128 + Level: slog.LevelInfo, 129 + Message: "test message 1", 130 + Source: "test.go:42", 131 + Attrs: map[string]any{"key1": "value1"}, 132 + }, 133 + { 134 + Time: time.Date(2024, 1, 8, 12, 0, 1, 0, time.UTC), 135 + Level: slog.LevelError, 136 + Message: "test message 2", 137 + Source: "test.go:43", 138 + Attrs: map[string]any{"key2": 123}, 139 + }, 140 + } 141 + 142 + ctx := context.Background() 143 + if err := shipper.Ship(ctx, entries); err != nil { 144 + t.Fatalf("Ship() error: %v", err) 145 + } 146 + 147 + mu.Lock() 148 + defer mu.Unlock() 149 + 150 + if len(receivedLogs) != 2 { 151 + t.Errorf("expected 2 logs, got %d", len(receivedLogs)) 152 + } 153 + 154 + // Check first log 155 + if receivedLogs[0]["_msg"] != "test message 1" { 156 + t.Errorf("expected _msg 'test message 1', got %v", receivedLogs[0]["_msg"]) 157 + } 158 + if receivedLogs[0]["level"] != "INFO" { 159 + t.Errorf("expected level 'INFO', got %v", receivedLogs[0]["level"]) 160 + } 161 + if receivedLogs[0]["service"] != "test-service" { 162 + t.Errorf("expected service 'test-service', got %v", receivedLogs[0]["service"]) 163 + } 164 + if receivedLogs[0]["key1"] != "value1" { 165 + t.Errorf("expected key1 'value1', got %v", receivedLogs[0]["key1"]) 166 + } 167 + 168 + // Check second log 169 + if receivedLogs[1]["level"] != "ERROR" { 170 + t.Errorf("expected level 'ERROR', got %v", receivedLogs[1]["level"]) 171 + } 172 + } 173 + 174 + func TestVictoriaShipper_BasicAuth(t *testing.T) { 175 + var authHeader string 176 + 177 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 178 + authHeader = r.Header.Get("Authorization") 179 + w.WriteHeader(http.StatusOK) 180 + })) 181 + defer server.Close() 182 + 183 + shipper, err := NewVictoriaShipper(ShipperConfig{ 184 + URL: server.URL, 185 + Username: "testuser", 186 + Password: "testpass", 187 + }) 188 + if err != nil { 189 + t.Fatalf("NewVictoriaShipper() error: %v", err) 190 + } 191 + defer shipper.Close() 192 + 193 + entries := []LogEntry{{Time: time.Now(), Level: slog.LevelInfo, Message: "test"}} 194 + if err := shipper.Ship(context.Background(), entries); err != nil { 195 + t.Fatalf("Ship() error: %v", err) 196 + } 197 + 198 + if authHeader == "" { 199 + t.Error("expected Authorization header to be set") 200 + } 201 + if !strings.HasPrefix(authHeader, "Basic ") { 202 + t.Errorf("expected Basic auth, got: %s", authHeader) 203 + } 204 + } 205 + 206 + func TestVictoriaShipper_EmptyBatch(t *testing.T) { 207 + var called bool 208 + 209 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 210 + called = true 211 + w.WriteHeader(http.StatusOK) 212 + })) 213 + defer server.Close() 214 + 215 + shipper, _ := NewVictoriaShipper(ShipperConfig{URL: server.URL}) 216 + defer shipper.Close() 217 + 218 + if err := shipper.Ship(context.Background(), nil); err != nil { 219 + t.Errorf("Ship() with nil entries should not error: %v", err) 220 + } 221 + if err := shipper.Ship(context.Background(), []LogEntry{}); err != nil { 222 + t.Errorf("Ship() with empty entries should not error: %v", err) 223 + } 224 + if called { 225 + t.Error("Ship() should not make HTTP request for empty batch") 226 + } 227 + } 228 + 229 + func TestVictoriaShipper_ServerError(t *testing.T) { 230 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 231 + w.WriteHeader(http.StatusInternalServerError) 232 + w.Write([]byte("internal error")) 233 + })) 234 + defer server.Close() 235 + 236 + shipper, _ := NewVictoriaShipper(ShipperConfig{URL: server.URL}) 237 + defer shipper.Close() 238 + 239 + entries := []LogEntry{{Time: time.Now(), Level: slog.LevelInfo, Message: "test"}} 240 + err := shipper.Ship(context.Background(), entries) 241 + if err == nil { 242 + t.Error("Ship() should return error on server error") 243 + } 244 + if !strings.Contains(err.Error(), "500") { 245 + t.Errorf("error should contain status code, got: %v", err) 246 + } 247 + } 248 + 249 + func TestAsyncHandler_Batching(t *testing.T) { 250 + var shipCount atomic.Int32 251 + var totalEntries atomic.Int32 252 + 253 + mockShipper := &mockShipper{ 254 + shipFunc: func(ctx context.Context, entries []LogEntry) error { 255 + shipCount.Add(1) 256 + totalEntries.Add(int32(len(entries))) 257 + return nil 258 + }, 259 + } 260 + 261 + cfg := ShipperConfig{ 262 + BatchSize: 5, 263 + FlushInterval: 100 * time.Millisecond, 264 + } 265 + 266 + stdoutHandler := slog.NewTextHandler(io.Discard, nil) 267 + handler := NewAsyncHandler(stdoutHandler, mockShipper, cfg, nil) 268 + 269 + // Log 12 entries - should trigger 2 batch flushes (at 5 and 10) plus 2 remaining 270 + for i := 0; i < 12; i++ { 271 + record := slog.NewRecord(time.Now(), slog.LevelInfo, "test message", 0) 272 + handler.Handle(context.Background(), record) 273 + } 274 + 275 + // Wait for flush interval to trigger 276 + time.Sleep(200 * time.Millisecond) 277 + 278 + handler.Shutdown() 279 + 280 + if totalEntries.Load() != 12 { 281 + t.Errorf("expected 12 total entries shipped, got %d", totalEntries.Load()) 282 + } 283 + } 284 + 285 + func TestAsyncHandler_Shutdown(t *testing.T) { 286 + var shipped []LogEntry 287 + var mu sync.Mutex 288 + 289 + mockShipper := &mockShipper{ 290 + shipFunc: func(ctx context.Context, entries []LogEntry) error { 291 + mu.Lock() 292 + shipped = append(shipped, entries...) 293 + mu.Unlock() 294 + return nil 295 + }, 296 + } 297 + 298 + cfg := ShipperConfig{ 299 + BatchSize: 100, // Large batch size so nothing flushes immediately 300 + FlushInterval: 10 * time.Second, 301 + } 302 + 303 + stdoutHandler := slog.NewTextHandler(io.Discard, nil) 304 + handler := NewAsyncHandler(stdoutHandler, mockShipper, cfg, nil) 305 + 306 + // Log a few entries 307 + for i := 0; i < 3; i++ { 308 + record := slog.NewRecord(time.Now(), slog.LevelInfo, "test", 0) 309 + handler.Handle(context.Background(), record) 310 + } 311 + 312 + // Entries should be pending (not shipped yet due to large batch size) 313 + mu.Lock() 314 + if len(shipped) != 0 { 315 + t.Errorf("expected 0 shipped before shutdown, got %d", len(shipped)) 316 + } 317 + mu.Unlock() 318 + 319 + // Shutdown should flush pending entries 320 + handler.Shutdown() 321 + 322 + mu.Lock() 323 + if len(shipped) != 3 { 324 + t.Errorf("expected 3 shipped after shutdown, got %d", len(shipped)) 325 + } 326 + mu.Unlock() 327 + } 328 + 329 + func TestAsyncHandler_NoShipper(t *testing.T) { 330 + cfg := ShipperConfig{} 331 + stdoutHandler := slog.NewTextHandler(io.Discard, nil) 332 + handler := NewAsyncHandler(stdoutHandler, nil, cfg, nil) 333 + 334 + // Should not panic with nil shipper 335 + record := slog.NewRecord(time.Now(), slog.LevelInfo, "test", 0) 336 + if err := handler.Handle(context.Background(), record); err != nil { 337 + t.Errorf("Handle() with nil shipper should not error: %v", err) 338 + } 339 + 340 + // Shutdown should not panic 341 + handler.Shutdown() 342 + } 343 + 344 + func TestResolveAttrValue(t *testing.T) { 345 + tests := []struct { 346 + name string 347 + value slog.Value 348 + expected any 349 + }{ 350 + {"string", slog.StringValue("test"), "test"}, 351 + {"int64", slog.Int64Value(42), int64(42)}, 352 + {"uint64", slog.Uint64Value(42), uint64(42)}, 353 + {"float64", slog.Float64Value(3.14), 3.14}, 354 + {"bool", slog.BoolValue(true), true}, 355 + {"duration", slog.DurationValue(5 * time.Second), "5s"}, 356 + } 357 + 358 + for _, tt := range tests { 359 + t.Run(tt.name, func(t *testing.T) { 360 + got := resolveAttrValue(tt.value) 361 + if got != tt.expected { 362 + t.Errorf("resolveAttrValue() = %v, want %v", got, tt.expected) 363 + } 364 + }) 365 + } 366 + } 367 + 368 + // mockShipper implements Shipper for testing 369 + type mockShipper struct { 370 + shipFunc func(ctx context.Context, entries []LogEntry) error 371 + closeFunc func() error 372 + } 373 + 374 + func (m *mockShipper) Ship(ctx context.Context, entries []LogEntry) error { 375 + if m.shipFunc != nil { 376 + return m.shipFunc(ctx, entries) 377 + } 378 + return nil 379 + } 380 + 381 + func (m *mockShipper) Close() error { 382 + if m.closeFunc != nil { 383 + return m.closeFunc() 384 + } 385 + return nil 386 + }
+112
pkg/logging/victoria.go
··· 1 + package logging 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "net/http" 10 + "time" 11 + ) 12 + 13 + // VictoriaShipper ships logs to Victoria Logs using the native JSON lines endpoint. 14 + type VictoriaShipper struct { 15 + url string 16 + client *http.Client 17 + service string 18 + username string 19 + password string 20 + } 21 + 22 + // NewVictoriaShipper creates a new Victoria Logs shipper. 23 + func NewVictoriaShipper(cfg ShipperConfig) (*VictoriaShipper, error) { 24 + if cfg.URL == "" { 25 + return nil, fmt.Errorf("victoria logs URL is required") 26 + } 27 + 28 + return &VictoriaShipper{ 29 + url: cfg.URL, 30 + service: cfg.Service, 31 + client: &http.Client{ 32 + Timeout: 30 * time.Second, 33 + }, 34 + username: cfg.Username, 35 + password: cfg.Password, 36 + }, nil 37 + } 38 + 39 + // Ship sends a batch of log entries to Victoria Logs. 40 + func (v *VictoriaShipper) Ship(ctx context.Context, entries []LogEntry) error { 41 + if len(entries) == 0 { 42 + return nil 43 + } 44 + 45 + var buf bytes.Buffer 46 + 47 + for _, entry := range entries { 48 + doc := map[string]any{ 49 + // Victoria Logs special fields 50 + "_time": entry.Time.UTC().Format(time.RFC3339Nano), 51 + "_msg": entry.Message, 52 + 53 + // Standard fields 54 + "level": entry.Level.String(), 55 + "source": entry.Source, 56 + } 57 + 58 + // Add service if configured 59 + if v.service != "" { 60 + doc["service"] = v.service 61 + } 62 + 63 + // Add all custom attributes 64 + for k, val := range entry.Attrs { 65 + // Don't overwrite special fields 66 + if k != "_time" && k != "_msg" && k != "level" && k != "source" && k != "service" { 67 + doc[k] = val 68 + } 69 + } 70 + 71 + if err := json.NewEncoder(&buf).Encode(doc); err != nil { 72 + return fmt.Errorf("failed to encode log entry: %w", err) 73 + } 74 + } 75 + 76 + // Use the JSON lines endpoint with stream fields for efficient querying 77 + url := v.url + "/insert/jsonline?_stream_fields=service,level" 78 + 79 + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, &buf) 80 + if err != nil { 81 + return fmt.Errorf("failed to create request: %w", err) 82 + } 83 + 84 + req.Header.Set("Content-Type", "application/stream+json") 85 + 86 + // Add basic auth if configured 87 + if v.username != "" && v.password != "" { 88 + req.SetBasicAuth(v.username, v.password) 89 + } 90 + 91 + resp, err := v.client.Do(req) 92 + if err != nil { 93 + return fmt.Errorf("failed to send logs: %w", err) 94 + } 95 + defer resp.Body.Close() 96 + 97 + if resp.StatusCode >= 400 { 98 + body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024)) 99 + return fmt.Errorf("victoria logs error %d: %s", resp.StatusCode, string(body)) 100 + } 101 + 102 + // Drain and close body to allow connection reuse 103 + _, _ = io.Copy(io.Discard, resp.Body) 104 + 105 + return nil 106 + } 107 + 108 + // Close releases any resources held by the shipper. 109 + func (v *VictoriaShipper) Close() error { 110 + v.client.CloseIdleConnections() 111 + return nil 112 + }