Keep using Photos.app like you always do. Attic quietly backs up your originals and edits to an S3 bucket you control. One-way, append-only.
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add concurrent uploads with network resilience

Concurrent S3 uploads via bounded TaskGroup (default 6 workers).
Two-tier error handling: network-down errors trigger drain-then-pause
with automatic retry on recovery, while server-transient errors are
retried with exponential backoff and jitter.

Key changes:
- Typed URLError.Code + S3ClientError matching in RetryPolicy
- Network pause/resume with manifest save before waiting
- Cancellation-safe manifest save via do/catch for CancellationError
- UploadContext parameter object to reduce function parameter count
- maxPauseRetries cap prevents infinite pause/resume on flaky networks
- MockNetworkMonitor moved from production to test target

+731 -289
+1 -2
Sources/AtticCLI/BackupCommand.swift
··· 65 65 66 66 // Prevent idle sleep during backup (released automatically via deinit) 67 67 let powerAssertion = PowerAssertion(reason: "Backing up photos to S3") 68 - let networkMonitor = NWPathNetworkMonitor() 69 68 70 69 let report = try await runBackup( 71 70 assets: assets, ··· 75 74 s3: s3, 76 75 options: options, 77 76 progress: progress, 78 - networkMonitor: networkMonitor, 77 + networkMonitor: NWPathNetworkMonitor(), 79 78 ) 80 79 81 80 _ = powerAssertion // prevent unused warning, released in deinit
+272 -249
Sources/AtticCore/BackupPipeline.swift
··· 21 21 public var type: AssetKind? 22 22 public var dryRun: Bool 23 23 public var saveInterval: Int 24 + public var concurrency: Int 24 25 public var networkTimeout: Duration 26 + public var maxPauseRetries: Int 25 27 26 28 public init( 27 29 batchSize: Int = 50, ··· 29 31 type: AssetKind? = nil, 30 32 dryRun: Bool = false, 31 33 saveInterval: Int = 50, 34 + concurrency: Int = 6, 32 35 networkTimeout: Duration = .seconds(900), 36 + maxPauseRetries: Int = 3, 33 37 ) { 34 38 self.batchSize = batchSize 35 39 self.limit = limit 36 40 self.type = type 37 41 self.dryRun = dryRun 38 42 self.saveInterval = saveInterval 43 + self.concurrency = concurrency 39 44 self.networkTimeout = networkTimeout 45 + self.maxPauseRetries = maxPauseRetries 40 46 } 41 47 } 42 48 ··· 55 61 } 56 62 } 57 63 64 + // swiftlint:disable cyclomatic_complexity function_body_length 58 65 /// Run the backup pipeline: filter → batch → export → upload → manifest. 59 66 public func runBackup( 60 67 assets: [AssetInfo], ··· 103 110 var sinceLastSave = 0 104 111 var deferred: [String] = [] 105 112 106 - // Process in batches 113 + let ctx = UploadContext( 114 + assetByUUID: assetByUUID, 115 + s3: s3, 116 + manifestStore: manifestStore, 117 + saveInterval: options.saveInterval, 118 + concurrency: options.concurrency, 119 + progress: progress, 120 + networkMonitor: networkMonitor, 121 + networkTimeout: options.networkTimeout, 122 + maxPauseRetries: options.maxPauseRetries, 123 + ) 124 + 125 + // Process in batches (wrapped to save manifest on cancellation) 107 126 let totalBatches = (pending.count + options.batchSize - 1) / options.batchSize 108 127 109 - for batchIndex in 0 ..< totalBatches { 110 - try Task.checkCancellation() 128 + do { 129 + for batchIndex in 0 ..< totalBatches { 130 + try Task.checkCancellation() 111 131 112 - let start = batchIndex * options.batchSize 113 - let end = min(start + options.batchSize, pending.count) 114 - let batch = Array(pending[start ..< end]) 115 - let batchUUIDs = 
batch.map(\.uuid) 132 + let start = batchIndex * options.batchSize 133 + let end = min(start + options.batchSize, pending.count) 134 + let batch = Array(pending[start ..< end]) 135 + let batchUUIDs = batch.map(\.uuid) 136 + 137 + progress.batchStarted( 138 + batchNumber: batchIndex + 1, 139 + totalBatches: totalBatches, 140 + assetCount: batch.count, 141 + ) 142 + 143 + // 1. Export via LadderKit 144 + let batchResult: ExportResponse 145 + do { 146 + batchResult = try await exporter.exportBatch(uuids: batchUUIDs) 147 + } catch let error as ExportProviderError where error.isTimeout { 148 + // Batch timeout: retry each asset individually 149 + var combinedResults: [ExportResult] = [] 150 + var combinedErrors: [LadderKit.ExportError] = [] 151 + for uuid in batchUUIDs { 152 + try Task.checkCancellation() 153 + do { 154 + let result = try await exporter.exportBatch(uuids: [uuid]) 155 + combinedResults.append(contentsOf: result.results) 156 + combinedErrors.append(contentsOf: result.errors) 157 + } catch let innerError as ExportProviderError where innerError.isTimeout { 158 + deferred.append(uuid) 159 + } catch { 160 + let msg = String(describing: error) 161 + report.appendError(uuid: uuid, message: msg) 162 + report.failed += 1 163 + let filename = assetByUUID[uuid]?.originalFilename ?? uuid 164 + progress.assetFailed(uuid: uuid, filename: filename, message: msg) 165 + } 166 + } 167 + let combined = ExportResponse(results: combinedResults, errors: combinedErrors) 168 + try await uploadExported( 169 + combined, ctx: ctx, 170 + manifest: &manifest, report: &report, 171 + sinceLastSave: &sinceLastSave, 172 + ) 173 + continue 174 + } catch let error as ExportProviderError where error.isPermission { 175 + // Permission error: abort all remaining batches 176 + let msg = String(describing: error) 177 + for uuid in batchUUIDs { 178 + report.appendError(uuid: uuid, message: msg) 179 + report.failed += 1 180 + } 181 + for asset in pending[end...] 
{ 182 + report.appendError(uuid: asset.uuid, message: msg) 183 + report.failed += 1 184 + } 185 + break 186 + } catch { 187 + // Non-timeout error: fail the whole batch 188 + let msg = String(describing: error) 189 + for uuid in batchUUIDs { 190 + report.appendError(uuid: uuid, message: msg) 191 + report.failed += 1 192 + let filename = assetByUUID[uuid]?.originalFilename ?? uuid 193 + progress.assetFailed(uuid: uuid, filename: filename, message: msg) 194 + } 195 + continue 196 + } 116 197 117 - progress.batchStarted( 118 - batchNumber: batchIndex + 1, 119 - totalBatches: totalBatches, 120 - assetCount: batch.count, 121 - ) 198 + // 2. Upload exported assets 199 + try await uploadExported( 200 + batchResult, ctx: ctx, 201 + manifest: &manifest, report: &report, 202 + sinceLastSave: &sinceLastSave, 203 + ) 204 + } 122 205 123 - // 1. Export via LadderKit 124 - let batchResult: ExportResponse 125 - do { 126 - batchResult = try await exporter.exportBatch(uuids: batchUUIDs) 127 - } catch let error as ExportProviderError where error.isTimeout { 128 - // Batch timeout: retry each asset individually 129 - var combinedResults: [ExportResult] = [] 130 - var combinedErrors: [LadderKit.ExportError] = [] 131 - for uuid in batchUUIDs { 206 + // Retry deferred assets 207 + if !deferred.isEmpty { 208 + for uuid in deferred { 132 209 try Task.checkCancellation() 133 210 do { 134 211 let result = try await exporter.exportBatch(uuids: [uuid]) 135 - combinedResults.append(contentsOf: result.results) 136 - combinedErrors.append(contentsOf: result.errors) 137 - } catch let innerError as ExportProviderError where innerError.isTimeout { 138 - deferred.append(uuid) 212 + try await uploadExported( 213 + result, ctx: ctx, 214 + manifest: &manifest, report: &report, 215 + sinceLastSave: &sinceLastSave, 216 + ) 139 217 } catch { 140 218 let msg = String(describing: error) 141 219 report.appendError(uuid: uuid, message: msg) ··· 144 222 progress.assetFailed(uuid: uuid, filename: filename, 
message: msg) 145 223 } 146 224 } 147 - let combined = ExportResponse(results: combinedResults, errors: combinedErrors) 148 - try await uploadExported( 149 - combined, assetByUUID: assetByUUID, s3: s3, 150 - manifest: &manifest, manifestStore: manifestStore, 151 - report: &report, sinceLastSave: &sinceLastSave, 152 - saveInterval: options.saveInterval, 153 - progress: progress, 154 - networkMonitor: networkMonitor, 155 - networkTimeout: options.networkTimeout, 156 - ) 157 - continue 158 - } catch let error as ExportProviderError where error.isPermission { 159 - // Permission error: abort all remaining batches 160 - let msg = String(describing: error) 161 - for uuid in batchUUIDs { 162 - report.appendError(uuid: uuid, message: msg) 163 - report.failed += 1 164 - } 165 - for asset in pending[end...] { 166 - report.appendError(uuid: asset.uuid, message: msg) 167 - report.failed += 1 168 - } 169 - break 170 - } catch { 171 - // Non-timeout error: fail the whole batch 172 - let msg = String(describing: error) 173 - for uuid in batchUUIDs { 174 - report.appendError(uuid: uuid, message: msg) 175 - report.failed += 1 176 - let filename = assetByUUID[uuid]?.originalFilename ?? uuid 177 - progress.assetFailed(uuid: uuid, filename: filename, message: msg) 178 - } 179 - continue 180 225 } 181 - 182 - // 2. 
Upload exported assets 183 - try await uploadExported( 184 - batchResult, assetByUUID: assetByUUID, s3: s3, 185 - manifest: &manifest, manifestStore: manifestStore, 186 - report: &report, sinceLastSave: &sinceLastSave, 187 - saveInterval: options.saveInterval, 188 - progress: progress, 189 - networkMonitor: networkMonitor, 190 - networkTimeout: options.networkTimeout, 191 - ) 192 - } 193 - 194 - // Retry deferred assets 195 - if !deferred.isEmpty { 196 - for uuid in deferred { 197 - try Task.checkCancellation() 198 - do { 199 - let result = try await exporter.exportBatch(uuids: [uuid]) 200 - try await uploadExported( 201 - result, assetByUUID: assetByUUID, s3: s3, 202 - manifest: &manifest, manifestStore: manifestStore, 203 - report: &report, sinceLastSave: &sinceLastSave, 204 - saveInterval: options.saveInterval, 205 - progress: progress, 206 - networkMonitor: networkMonitor, 207 - networkTimeout: options.networkTimeout, 208 - ) 209 - } catch { 210 - let msg = String(describing: error) 211 - report.appendError(uuid: uuid, message: msg) 212 - report.failed += 1 213 - let filename = assetByUUID[uuid]?.originalFilename ?? uuid 214 - progress.assetFailed(uuid: uuid, filename: filename, message: msg) 215 - } 226 + } catch is CancellationError { 227 + // Save progress before propagating cancellation 228 + if sinceLastSave > 0 { 229 + try? await manifestStore.save(manifest) 230 + progress.manifestSaved(entriesCount: manifest.entries.count) 216 231 } 232 + throw CancellationError() 217 233 } 218 234 219 235 // Final save ··· 231 247 return report 232 248 } 233 249 234 - /// Check if an upload error looks like a transient network issue. 235 - /// 236 - /// Intentionally a superset of `RetryPolicy.isTransient` — includes 237 - /// "nsurlerrordomain" and "cfnetwork" so the pipeline's network-pause 238 - /// logic catches errors that `withRetry` deliberately does not retry 239 - /// (avoiding 7s of backoff before the network monitor can take over). 
240 - private func isTransientUploadError(_ error: Error) -> Bool { 241 - let message = String(describing: error).lowercased() 242 - let patterns = [ 243 - "timeout", "timed out", "econnreset", "econnrefused", 244 - "epipe", "socket", "network", "fetch failed", 245 - "nsurlerrordomain", "cfnetwork", 246 - ] 247 - return patterns.contains { message.contains($0) } 250 + // swiftlint:enable cyclomatic_complexity function_body_length 251 + 252 + // MARK: - Upload context and helper 253 + 254 + /// Bundles the non-mutating dependencies for uploadExported, reducing parameter count 255 + /// and making the recursive retry call less error-prone. 256 + private struct UploadContext { 257 + let assetByUUID: [String: AssetInfo] 258 + let s3: any S3Providing 259 + let manifestStore: any ManifestStoring 260 + let saveInterval: Int 261 + let concurrency: Int 262 + let progress: any BackupProgressDelegate 263 + let networkMonitor: (any NetworkMonitoring)? 264 + let networkTimeout: Duration 265 + let maxPauseRetries: Int 248 266 } 249 267 250 - // MARK: - Upload helper 251 - 268 + // swiftlint:disable:next function_body_length cyclomatic_complexity 252 269 private func uploadExported( 253 270 _ batchResult: ExportResponse, 254 - assetByUUID: [String: AssetInfo], 255 - s3: any S3Providing, 271 + ctx: UploadContext, 256 272 manifest: inout Manifest, 257 - manifestStore: any ManifestStoring, 258 273 report: inout BackupReport, 259 274 sinceLastSave: inout Int, 260 - saveInterval: Int, 261 - progress: any BackupProgressDelegate, 262 - networkMonitor: (any NetworkMonitoring)? = nil, 263 - networkTimeout: Duration = .seconds(900), 275 + pauseRetryCount: Int = 0, 264 276 ) async throws { 265 277 // Record export errors 266 278 for err in batchResult.errors { 267 - let filename = assetByUUID[err.uuid]?.originalFilename ?? err.uuid 268 - progress.assetFailed(uuid: err.uuid, filename: filename, message: err.message) 279 + let filename = ctx.assetByUUID[err.uuid]?.originalFilename ?? 
err.uuid 280 + ctx.progress.assetFailed(uuid: err.uuid, filename: filename, message: err.message) 269 281 report.appendError(uuid: err.uuid, message: err.message) 270 282 report.failed += 1 271 283 } 272 284 273 - // Upload successful exports 274 - for exported in batchResult.results { 275 - try Task.checkCancellation() 276 - 277 - guard let asset = assetByUUID[exported.uuid] else { continue } 285 + let exports = batchResult.results 286 + if exports.isEmpty { return } 278 287 288 + // Build upload inputs (compute S3 keys on the caller's task to propagate errors) 289 + var inputs: [UploadInput] = [] 290 + for exported in exports { 291 + guard let asset = ctx.assetByUUID[exported.uuid] else { continue } 279 292 let ext = S3Paths.extensionFromUTIOrFilename( 280 293 uti: asset.uniformTypeIdentifier, 281 294 filename: asset.originalFilename ?? "unknown", ··· 285 298 dateCreated: asset.creationDate, 286 299 extension: ext, 287 300 ) 301 + inputs.append(UploadInput(exported: exported, asset: asset, s3Key: s3Key, ext: ext)) 302 + } 288 303 289 - do { 290 - try await uploadAssetToS3( 291 - exported: exported, asset: asset, 292 - s3Key: s3Key, ext: ext, s3: s3, 293 - manifest: &manifest, report: &report, 294 - sinceLastSave: &sinceLastSave, 295 - saveInterval: saveInterval, 296 - manifestStore: manifestStore, progress: progress, 297 - ) 298 - } catch is CancellationError { 299 - throw CancellationError() 300 - } catch { 301 - // Check if this is a network issue we can wait out 302 - if let monitor = networkMonitor, isTransientUploadError(error) { 303 - let networkUp = await monitor.isNetworkAvailable 304 - if !networkUp { 305 - // Save manifest before waiting (preserve progress) 306 - if sinceLastSave > 0 { 307 - try? 
await manifestStore.save(manifest) 308 - progress.manifestSaved(entriesCount: manifest.entries.count) 309 - sinceLastSave = 0 310 - } 304 + // Track inputs by UUID for retry lookup 305 + let inputByUUID = Dictionary(uniqueKeysWithValues: inputs.map { ($0.exported.uuid, $0) }) 311 306 312 - progress.backupPaused(reason: "Waiting for network...") 313 - let recovered = try await monitor.waitForNetwork( 314 - timeout: networkTimeout, 315 - ) 316 - progress.backupResumed() 307 + // Concurrent uploads with bounded TaskGroup 308 + let effectiveConcurrency = max(1, ctx.concurrency) 309 + var networkPaused = false 310 + var retryInputs: [UploadInput] = [] 311 + var retryUUIDs: Set<String> = [] 317 312 318 - if recovered { 319 - // Retry the same asset after network recovery 320 - do { 321 - try await uploadAssetToS3( 322 - exported: exported, asset: asset, 323 - s3Key: s3Key, ext: ext, s3: s3, 324 - manifest: &manifest, report: &report, 325 - sinceLastSave: &sinceLastSave, 326 - saveInterval: saveInterval, 327 - manifestStore: manifestStore, progress: progress, 328 - ) 329 - try? FileManager.default.removeItem(atPath: exported.path) 330 - continue 331 - } catch { 332 - // Retry after recovery also failed — fall through 333 - } 334 - } else { 335 - // Network timeout — save manifest and stop 336 - let timeoutMinutes = Int(networkTimeout.components.seconds) / 60 337 - report.appendError( 338 - uuid: exported.uuid, 339 - message: "Network unavailable for \(timeoutMinutes) minutes, backup paused", 340 - ) 341 - report.failed += 1 342 - if sinceLastSave > 0 { 343 - try? 
await manifestStore.save(manifest) 344 - sinceLastSave = 0 345 - } 346 - return 313 + try await withThrowingTaskGroup(of: UploadResult.self) { group in 314 + var cursor = 0 315 + 316 + // Seed initial tasks 317 + for _ in 0 ..< min(effectiveConcurrency, inputs.count) { 318 + let input = inputs[cursor] 319 + cursor += 1 320 + group.addTask { 321 + await uploadSingleAsset(input: input, s3: ctx.s3) 322 + } 323 + } 324 + 325 + // Process results and enqueue next 326 + for try await result in group { 327 + try Task.checkCancellation() 328 + 329 + if let checksum = result.checksum, result.error == nil { 330 + manifest.markBackedUp( 331 + uuid: result.uuid, 332 + s3Key: result.s3Key, 333 + checksum: checksum, 334 + size: result.size, 335 + ) 336 + sinceLastSave += 1 337 + report.uploaded += 1 338 + report.totalBytes += result.size 339 + ctx.progress.assetUploaded( 340 + uuid: result.uuid, 341 + filename: result.filename, 342 + type: result.type, 343 + size: result.size, 344 + ) 345 + 346 + // Periodic manifest save 347 + if sinceLastSave >= ctx.saveInterval { 348 + do { 349 + try await ctx.manifestStore.save(manifest) 350 + ctx.progress.manifestSaved(entriesCount: manifest.entries.count) 351 + sinceLastSave = 0 352 + } catch { 353 + debugPrint("Periodic manifest save failed: \(error)") 347 354 } 348 355 } 356 + } else if result.isNetworkDownError, 357 + let monitor = ctx.networkMonitor, 358 + await !monitor.isNetworkAvailable 359 + { // swiftlint:disable:this opening_brace 360 + // Network-down failure: queue for retry after recovery 361 + networkPaused = true 362 + if let input = inputByUUID[result.uuid] { 363 + retryInputs.append(input) 364 + retryUUIDs.insert(result.uuid) 365 + } 366 + } else { 367 + // Permanent or non-network failure 368 + ctx.progress.assetFailed( 369 + uuid: result.uuid, 370 + filename: result.filename, 371 + message: result.error ?? "Unknown error", 372 + ) 373 + report.appendError(uuid: result.uuid, message: result.error ?? 
"Unknown error") 374 + report.failed += 1 349 375 } 350 376 351 - let msg = String(describing: error) 352 - let filename = asset.originalFilename ?? exported.uuid 353 - progress.assetFailed(uuid: exported.uuid, filename: filename, message: msg) 354 - report.appendError(uuid: exported.uuid, message: msg) 355 - report.failed += 1 377 + // Clean up staged file (skip if queued for retry) 378 + if !retryUUIDs.contains(result.uuid) { 379 + try? FileManager.default.removeItem(atPath: result.path) 380 + } 381 + 382 + // Enqueue next upload (skip if network is down — let group drain) 383 + if !networkPaused, cursor < inputs.count { 384 + let input = inputs[cursor] 385 + cursor += 1 386 + group.addTask { 387 + await uploadSingleAsset(input: input, s3: ctx.s3) 388 + } 389 + } 356 390 } 357 391 358 - // Clean up staged file 359 - try? FileManager.default.removeItem(atPath: exported.path) 392 + // After drain: queue any remaining un-enqueued inputs for retry 393 + if networkPaused { 394 + while cursor < inputs.count { 395 + retryInputs.append(inputs[cursor]) 396 + cursor += 1 397 + } 398 + } 360 399 } 361 - } 362 400 363 - /// Upload a single asset (original + metadata) to S3 and update the manifest. 
364 - private func uploadAssetToS3( 365 - exported: ExportResult, 366 - asset: AssetInfo, 367 - s3Key: String, 368 - ext: String, 369 - s3: any S3Providing, 370 - manifest: inout Manifest, 371 - report: inout BackupReport, 372 - sinceLastSave: inout Int, 373 - saveInterval: Int, 374 - manifestStore: any ManifestStoring, 375 - progress: any BackupProgressDelegate, 376 - ) async throws { 377 - // Upload original via file URL (avoids loading into memory) 378 - let fileURL = URL(fileURLWithPath: exported.path) 379 - try await withRetry { 380 - try await s3.putObject( 381 - key: s3Key, 382 - fileURL: fileURL, 383 - contentType: contentTypeForExtension(ext), 384 - ) 385 - } 401 + // Network pause/resume: wait for recovery and retry 402 + if networkPaused, !retryInputs.isEmpty { 403 + guard let monitor = ctx.networkMonitor else { return } 386 404 387 - // Build and upload metadata 388 - let isoNow = isoFormatter.string(from: Date()) 389 - let meta = buildMetadataJSON( 390 - asset: asset, 391 - s3Key: s3Key, 392 - checksum: "sha256:\(exported.sha256)", 393 - backedUpAt: isoNow, 394 - ) 395 - let metaData = try metadataEncoder.encode(meta) 396 - let metaKey = try S3Paths.metadataKey(uuid: asset.uuid) 397 - try await withRetry { 398 - try await s3.putObject( 399 - key: metaKey, 400 - body: metaData, 401 - contentType: "application/json", 402 - ) 403 - } 405 + // Save manifest before waiting (preserve progress) 406 + if sinceLastSave > 0 { 407 + do { 408 + try await ctx.manifestStore.save(manifest) 409 + ctx.progress.manifestSaved(entriesCount: manifest.entries.count) 410 + sinceLastSave = 0 411 + } catch { 412 + debugPrint("Pre-pause manifest save failed: \(error)") 413 + } 414 + } 404 415 405 - // Update manifest 406 - manifest.markBackedUp( 407 - uuid: asset.uuid, 408 - s3Key: s3Key, 409 - checksum: "sha256:\(exported.sha256)", 410 - size: Int(exported.size), 411 - ) 412 - sinceLastSave += 1 413 - report.uploaded += 1 414 - report.totalBytes += Int(exported.size) 416 + 
ctx.progress.backupPaused(reason: "Waiting for network...") 417 + let recovered = try await monitor.waitForNetwork(timeout: ctx.networkTimeout) 418 + ctx.progress.backupResumed() 415 419 416 - let filename = asset.originalFilename ?? "unknown" 417 - progress.assetUploaded( 418 - uuid: asset.uuid, 419 - filename: filename, 420 - type: asset.kind, 421 - size: Int(exported.size), 422 - ) 420 + if recovered, pauseRetryCount < ctx.maxPauseRetries { 421 + // Build a synthetic ExportResponse from retry inputs 422 + let retryResults = retryInputs.map(\.exported) 423 + let retryResponse = ExportResponse(results: retryResults, errors: []) 423 424 424 - // Periodic manifest save 425 - if sinceLastSave >= saveInterval { 426 - try await manifestStore.save(manifest) 427 - progress.manifestSaved(entriesCount: manifest.entries.count) 428 - sinceLastSave = 0 425 + do { 426 + try await uploadExported( 427 + retryResponse, ctx: ctx, 428 + manifest: &manifest, report: &report, 429 + sinceLastSave: &sinceLastSave, 430 + pauseRetryCount: pauseRetryCount + 1, 431 + ) 432 + } catch { 433 + // Clean up staged files before propagating 434 + for input in retryInputs { 435 + try? FileManager.default.removeItem(atPath: input.exported.path) 436 + } 437 + throw error 438 + } 439 + } else { 440 + // Timeout or max retries — record failures 441 + let reason = recovered 442 + ? "Max network pause retries exceeded" 443 + : "Network unavailable" 444 + for input in retryInputs { 445 + let filename = input.asset.originalFilename ?? input.exported.uuid 446 + ctx.progress.assetFailed(uuid: input.exported.uuid, filename: filename, message: reason) 447 + report.appendError(uuid: input.exported.uuid, message: reason) 448 + report.failed += 1 449 + try? FileManager.default.removeItem(atPath: input.exported.path) 450 + } 451 + } 429 452 } 430 453 }
+94
Sources/AtticCore/BackupUpload.swift
··· 1 + import Foundation 2 + import LadderKit 3 + 4 + /// Result of uploading a single asset, returned from TaskGroup child tasks. 5 + struct UploadResult { 6 + let uuid: String 7 + let s3Key: String 8 + let checksum: String? 9 + let filename: String 10 + let type: AssetKind 11 + let size: Int 12 + let error: String? 13 + let isNetworkDownError: Bool 14 + let path: String 15 + } 16 + 17 + /// Inputs for a single concurrent upload task. 18 + struct UploadInput { 19 + let exported: ExportResult 20 + let asset: AssetInfo 21 + let s3Key: String 22 + let ext: String 23 + } 24 + 25 + /// Upload a single asset (original + metadata) to S3. Returns an UploadResult. 26 + /// 27 + /// This is a pure `@Sendable` function — no mutation of shared state. 28 + /// Manifest updates happen in the caller's `for await` loop. 29 + func uploadSingleAsset( 30 + input: UploadInput, 31 + s3: any S3Providing, 32 + ) async -> UploadResult { 33 + let exported = input.exported 34 + let asset = input.asset 35 + let s3Key = input.s3Key 36 + let ext = input.ext 37 + 38 + do { 39 + // Upload original via file URL (avoids loading into memory) 40 + let fileURL = URL(fileURLWithPath: exported.path) 41 + try await withRetry { 42 + try await s3.putObject( 43 + key: s3Key, 44 + fileURL: fileURL, 45 + contentType: contentTypeForExtension(ext), 46 + ) 47 + } 48 + 49 + // Build and upload metadata (per-call formatter for thread safety) 50 + let formatter = ISO8601DateFormatter() 51 + let isoNow = formatter.string(from: Date()) 52 + let meta = buildMetadataJSON( 53 + asset: asset, 54 + s3Key: s3Key, 55 + checksum: "sha256:\(exported.sha256)", 56 + backedUpAt: isoNow, 57 + ) 58 + let encoder = JSONEncoder() 59 + encoder.outputFormatting = [.prettyPrinted, .sortedKeys] 60 + let metaData = try encoder.encode(meta) 61 + let metaKey = try S3Paths.metadataKey(uuid: asset.uuid) 62 + try await withRetry { 63 + try await s3.putObject( 64 + key: metaKey, 65 + body: metaData, 66 + contentType: "application/json", 67 
+ ) 68 + } 69 + 70 + return UploadResult( 71 + uuid: asset.uuid, 72 + s3Key: s3Key, 73 + checksum: "sha256:\(exported.sha256)", 74 + filename: asset.originalFilename ?? "unknown", 75 + type: asset.kind, 76 + size: Int(exported.size), 77 + error: nil, 78 + isNetworkDownError: false, 79 + path: exported.path, 80 + ) 81 + } catch { 82 + return UploadResult( 83 + uuid: asset.uuid, 84 + s3Key: s3Key, 85 + checksum: nil, 86 + filename: asset.originalFilename ?? "unknown", 87 + type: asset.kind, 88 + size: Int(exported.size), 89 + error: String(describing: error), 90 + isNetworkDownError: isNetworkDown(error), 91 + path: exported.path, 92 + ) 93 + } 94 + }
+7 -6
Sources/AtticCore/MockNetworkMonitor.swift Tests/AtticCoreTests/MockNetworkMonitor.swift
··· 1 + @testable import AtticCore 1 2 import Foundation 2 3 3 4 /// In-memory network monitor mock for tests. 4 5 /// 5 6 /// Controllable `isAvailable` property. `waitForNetwork` polls with a short 6 7 /// interval and respects both timeout and cancellation. 7 - public actor MockNetworkMonitor: NetworkMonitoring { 8 + actor MockNetworkMonitor: NetworkMonitoring { 8 9 private var available: Bool 9 10 10 - public init(available: Bool = true) { 11 + init(available: Bool = true) { 11 12 self.available = available 12 13 } 13 14 14 - public var isNetworkAvailable: Bool { 15 + var isNetworkAvailable: Bool { 15 16 available 16 17 } 17 18 18 19 /// Simulate network recovery. 19 - public func setAvailable() { 20 + func setAvailable() { 20 21 available = true 21 22 } 22 23 23 24 /// Simulate network loss. 24 - public func setUnavailable() { 25 + func setUnavailable() { 25 26 available = false 26 27 } 27 28 28 - public func waitForNetwork(timeout: Duration) async throws -> Bool { 29 + func waitForNetwork(timeout: Duration) async throws -> Bool { 29 30 if available { return true } 30 31 31 32 let deadline = ContinuousClock.now + timeout
+76 -12
Sources/AtticCore/RetryPolicy.swift
··· 1 1 import Foundation 2 2 3 - /// Retry an async operation with exponential backoff. 3 + /// Retry an async operation with exponential backoff and jitter. 4 4 /// 5 - /// Handles transient network failures (e.g. after sleep/wake). 5 + /// Handles transient server errors (5xx, throttling, timeouts). 6 + /// Network-down errors (no connectivity) are NOT retried — they fail fast 7 + /// so the pipeline-level network pause can activate sooner. 6 8 /// Respects Task cancellation to bail out immediately. 7 9 public func withRetry<T: Sendable>( 8 10 maxAttempts: Int = 3, 9 11 baseDelay: Duration = .seconds(1), 12 + maxDelay: Duration = .seconds(30), 10 13 operation: @Sendable () async throws -> T, 11 14 ) async throws -> T { 12 15 for attempt in 1 ... maxAttempts { ··· 18 21 // Don't retry if cancelled 19 22 try Task.checkCancellation() 20 23 21 - // Only retry on transient/network errors 24 + // Network-down errors fail fast (no retry) 25 + if isNetworkDown(error) { throw error } 26 + 27 + // Only retry on transient server errors 22 28 guard isTransient(error) else { throw error } 23 29 24 - let delay = baseDelay * Int(pow(2.0, Double(attempt - 1))) 25 - try await Task.sleep(for: delay) 30 + // Exponential backoff with jitter 31 + let exponential = baseDelay * Int(pow(2.0, Double(attempt - 1))) 32 + let capped = min(exponential, maxDelay) 33 + let cappedMs = Int(capped.components.seconds) * 1000 34 + + Int(capped.components.attoseconds / 1_000_000_000_000_000) 35 + let jittered: Duration = .milliseconds(Int.random(in: 0 ... max(1, cappedMs))) 36 + try await Task.sleep(for: jittered) 26 37 } 27 38 } 28 39 fatalError("unreachable") 29 40 } 30 41 31 - /// Determine whether an error is transient and worth retrying. 42 + // MARK: - Error classification 43 + 44 + /// Network-down errors: no connectivity, fail fast (don't waste retry attempts). 45 + public func isNetworkDown(_ error: Error) -> Bool { 46 + if let urlError = error as? 
URLError { 47 + switch urlError.code { 48 + case .notConnectedToInternet, 49 + .networkConnectionLost, 50 + .cannotConnectToHost, 51 + .cannotFindHost, 52 + .dnsLookupFailed, 53 + .dataNotAllowed, 54 + .internationalRoamingOff: 55 + return true 56 + default: 57 + return false 58 + } 59 + } 60 + let nsError = error as NSError 61 + if nsError.domain == NSURLErrorDomain { 62 + return [-1009, -1005, -1004, -1003, -1020, -1018].contains(nsError.code) 63 + } 64 + return false 65 + } 66 + 67 + /// Server-transient errors: worth retrying with backoff. 32 68 private func isTransient(_ error: Error) -> Bool { 33 - let message = String(describing: error).lowercased() 34 - let transientPatterns = [ 35 - "timeout", "econnreset", "econnrefused", "epipe", 36 - "socket", "network", "fetch failed", 37 - ] 38 - return transientPatterns.contains { message.contains($0) } 69 + // URLSession timeout (server-side) 70 + if let urlError = error as? URLError { 71 + switch urlError.code { 72 + case .timedOut, .secureConnectionFailed: 73 + return true 74 + default: 75 + return false 76 + } 77 + } 78 + 79 + // S3 HTTP-level transient errors 80 + if let s3Error = error as? S3ClientError { 81 + switch s3Error { 82 + case let .httpError(status, _): 83 + return [408, 429, 500, 502, 503, 504].contains(status) 84 + case let .s3Error(code, _): 85 + return [ 86 + "SlowDown", 87 + "ServiceUnavailable", 88 + "InternalError", 89 + "RequestTimeout", 90 + ].contains(code) 91 + case .unexpectedResponse: 92 + return false 93 + } 94 + } 95 + 96 + // NSError fallback for bridged errors 97 + let nsError = error as NSError 98 + if nsError.domain == NSURLErrorDomain { 99 + return [-1001].contains(nsError.code) // timedOut 100 + } 101 + 102 + return false 39 103 }
+97 -2
Tests/AtticCoreTests/BackupPipelineTests.swift
··· 57 57 struct TimeoutExportProvider: ExportProviding { 58 58 let inner: MockExportProvider 59 59 let slowUUIDs: Set<String> 60 - let deferredRetrySucceeds: Bool 61 60 private let retryCounter = RetryCounter() 62 61 63 62 actor RetryCounter { ··· 299 298 let exporter = TimeoutExportProvider( 300 299 inner: inner, 301 300 slowUUIDs: ["slow-1"], 302 - deferredRetrySucceeds: true, 303 301 ) 304 302 let (s3, manifestStore) = try await createTestContext() 305 303 var manifest = try await manifestStore.load() ··· 318 316 #expect(manifest.isBackedUp("fast-1")) 319 317 #expect(manifest.isBackedUp("fast-2")) 320 318 #expect(manifest.isBackedUp("slow-1")) 319 + } 320 + 321 + @Test func concurrentUploadsAllAppearInManifest() async throws { 322 + // Create enough assets to exercise concurrency (more than default 6) 323 + let count = 12 324 + var assets: [AssetInfo] = [] 325 + var exportMap: [String: (filename: String, data: Data)] = [:] 326 + for i in 1 ... count { 327 + let uuid = "concurrent-\(i)" 328 + assets.append(makeTestAsset(uuid: uuid)) 329 + exportMap[uuid] = ("IMG_\(i).HEIC", Data("photo\(i)".utf8)) 330 + } 331 + 332 + let exporter = MockExportProvider(assets: exportMap) 333 + let (s3, manifestStore) = try await createTestContext() 334 + var manifest = try await manifestStore.load() 335 + 336 + let report = try await runBackup( 337 + assets: assets, 338 + manifest: &manifest, 339 + manifestStore: manifestStore, 340 + exporter: exporter, 341 + s3: s3, 342 + options: BackupOptions(batchSize: 12, concurrency: 4), 343 + ) 344 + 345 + #expect(report.uploaded == count) 346 + #expect(report.failed == 0) 347 + for i in 1 ... 
count { 348 + #expect(manifest.isBackedUp("concurrent-\(i)")) 349 + } 350 + } 351 + 352 + @Test func concurrentMixedSuccessAndFailure() async throws { 353 + let assets = [ 354 + makeTestAsset(uuid: "ok-1"), 355 + makeTestAsset(uuid: "ok-2"), 356 + makeTestAsset(uuid: "missing-1"), 357 + makeTestAsset(uuid: "ok-3"), 358 + makeTestAsset(uuid: "missing-2"), 359 + ] 360 + 361 + let exporter = MockExportProvider(assets: [ 362 + "ok-1": ("IMG_1.HEIC", Data("p1".utf8)), 363 + "ok-2": ("IMG_2.HEIC", Data("p2".utf8)), 364 + "ok-3": ("IMG_3.HEIC", Data("p3".utf8)), 365 + ]) 366 + let (s3, manifestStore) = try await createTestContext() 367 + var manifest = try await manifestStore.load() 368 + 369 + let report = try await runBackup( 370 + assets: assets, 371 + manifest: &manifest, 372 + manifestStore: manifestStore, 373 + exporter: exporter, 374 + s3: s3, 375 + options: BackupOptions(batchSize: 10, concurrency: 3), 376 + ) 377 + 378 + #expect(report.uploaded == 3) 379 + #expect(report.failed == 2) 380 + #expect(manifest.isBackedUp("ok-1")) 381 + #expect(manifest.isBackedUp("ok-2")) 382 + #expect(manifest.isBackedUp("ok-3")) 383 + #expect(!manifest.isBackedUp("missing-1")) 384 + #expect(!manifest.isBackedUp("missing-2")) 385 + } 386 + 387 + @Test func concurrencyOneWorksLikeSequential() async throws { 388 + let assets = [ 389 + makeTestAsset(uuid: "seq-1"), 390 + makeTestAsset(uuid: "seq-2"), 391 + makeTestAsset(uuid: "seq-3"), 392 + ] 393 + 394 + let exporter = MockExportProvider(assets: [ 395 + "seq-1": ("IMG_1.HEIC", Data("p1".utf8)), 396 + "seq-2": ("IMG_2.HEIC", Data("p2".utf8)), 397 + "seq-3": ("IMG_3.HEIC", Data("p3".utf8)), 398 + ]) 399 + let (s3, manifestStore) = try await createTestContext() 400 + var manifest = try await manifestStore.load() 401 + 402 + let report = try await runBackup( 403 + assets: assets, 404 + manifest: &manifest, 405 + manifestStore: manifestStore, 406 + exporter: exporter, 407 + s3: s3, 408 + options: BackupOptions(batchSize: 10, concurrency: 
1), 409 + ) 410 + 411 + #expect(report.uploaded == 3) 412 + #expect(report.failed == 0) 413 + #expect(manifest.isBackedUp("seq-1")) 414 + #expect(manifest.isBackedUp("seq-2")) 415 + #expect(manifest.isBackedUp("seq-3")) 321 416 } 322 417 323 418 @Test func savesManifestToS3() async throws {
+184 -18
Tests/AtticCoreTests/NetworkPauseTests.swift
··· 50 50 } 51 51 } 52 52 53 - enum NetworkError: Error, CustomStringConvertible { 54 - case networkDown 53 + /// Throws a real URLError so typed isNetworkDown() detection works end-to-end. 54 + enum NetworkError { 55 + static let networkDown = URLError(.notConnectedToInternet) 56 + } 57 + 58 + /// S3 provider that fails with a server-transient error (503) a fixed number of 59 + /// times, then succeeds. Used to verify withRetry handles transient errors 60 + /// without triggering network pause. 61 + actor TransientFailingS3Provider: S3Providing { 62 + private let inner = MockS3Provider() 63 + private var remainingFailures: Int 64 + 65 + init(failCount: Int) { 66 + remainingFailures = failCount 67 + } 68 + 69 + private func maybeThrow() throws { 70 + if remainingFailures > 0 { 71 + remainingFailures -= 1 72 + throw S3ClientError.httpError(503, "Service Unavailable") 73 + } 74 + } 75 + 76 + func putObject(key: String, body: Data, contentType: String?) async throws { 77 + try maybeThrow() 78 + try await inner.putObject(key: key, body: body, contentType: contentType) 79 + } 55 80 56 - var description: String { 57 - // Use "nsurlerrordomain" — recognized by isTransientUploadError in 58 - // BackupPipeline but NOT by withRetry's isTransient patterns, so 59 - // withRetry throws immediately without sleeping through retries. 60 - "NSURLErrorDomain Code=-1009" 81 + func putObject(key: String, fileURL: URL, contentType: String?) async throws { 82 + try maybeThrow() 83 + try await inner.putObject(key: key, fileURL: fileURL, contentType: contentType) 84 + } 85 + 86 + func getObject(key: String) async throws -> Data { 87 + try await inner.getObject(key: key) 88 + } 89 + 90 + func headObject(key: String) async throws -> S3ObjectMeta? 
{ 91 + try await inner.headObject(key: key) 92 + } 93 + 94 + func listObjects(prefix: String) async throws -> [S3ListObject] { 95 + try await inner.listObjects(prefix: prefix) 61 96 } 62 97 } 63 98 ··· 152 187 ]) 153 188 let (s3, manifestStore) = try await createTestContext() 154 189 var manifest = try await manifestStore.load() 155 - let monitor = MockNetworkMonitor(available: true) 156 190 157 191 let report = try await runBackup( 158 192 assets: assets, ··· 161 195 exporter: exporter, 162 196 s3: s3, 163 197 options: BackupOptions(batchSize: 10), 164 - networkMonitor: monitor, 165 198 ) 166 199 167 200 #expect(report.uploaded == 1) ··· 176 209 177 210 // S3 provider that fails on first put (simulating network loss) 178 211 let s3 = NetworkFailingS3Provider(failAfterPuts: 0) 179 - let manifestStore = S3ManifestStore(s3: s3) 212 + // Use a working S3 for manifest so saves don't fail during pause 213 + let goodS3 = MockS3Provider() 214 + let manifestStore = S3ManifestStore(s3: goodS3) 180 215 var manifest = try await manifestStore.load() 181 216 182 217 let monitor = MockNetworkMonitor(available: false) ··· 232 267 manifestStore: manifestStore, 233 268 exporter: exporter, 234 269 s3: failingS3, 235 - options: BackupOptions(batchSize: 10, networkTimeout: .milliseconds(100)), 270 + options: BackupOptions(batchSize: 10, networkTimeout: .milliseconds(200)), 236 271 progress: progress, 237 - networkMonitor: monitor, 272 + networkMonitor: AlwaysUnavailableNetworkMonitor(), 238 273 ) 239 274 240 275 #expect(progress.events.contains("paused")) ··· 242 277 #expect(report.failed >= 1) 243 278 } 244 279 245 - @Test func cancellationDuringNetworkWaitExitsCleanly() async throws { 280 + @Test(.timeLimit(.minutes(1))) 281 + func cancellationDuringNetworkWaitExitsCleanly() async throws { 246 282 let assets = [makeTestAsset(uuid: "uuid-1")] 247 283 let exporter = MockExportProvider(assets: [ 248 284 "uuid-1": ("IMG_0001.HEIC", Data("photo1".utf8)), 249 285 ]) 250 286 251 287 let s3 
= NetworkFailingS3Provider(failAfterPuts: 0) 252 - let manifestStore = S3ManifestStore(s3: s3) 288 + let goodS3 = MockS3Provider() 289 + let manifestStore = S3ManifestStore(s3: goodS3) 253 290 var manifest = try await manifestStore.load() 254 - 291 + // Network never recovers — cancellation should interrupt the wait 255 292 let monitor = AlwaysUnavailableNetworkMonitor() 256 293 257 294 let task = Task { ··· 261 298 manifestStore: manifestStore, 262 299 exporter: exporter, 263 300 s3: s3, 264 - options: BackupOptions(batchSize: 10, networkTimeout: .seconds(30)), 301 + options: BackupOptions(batchSize: 10, networkTimeout: .seconds(60)), 265 302 networkMonitor: monitor, 266 303 ) 267 304 } 268 305 269 - // Cancel after a brief delay 270 - try await Task.sleep(for: .milliseconds(200)) 306 + // Cancel after a brief delay (enough for upload to fail and pause to start) 307 + try await Task.sleep(for: .milliseconds(300)) 271 308 task.cancel() 272 309 273 310 // Should throw CancellationError ··· 279 316 } catch { 280 317 Issue.record("Expected CancellationError, got \(error)") 281 318 } 319 + } 320 + 321 + @Test func concurrentBatchRetriesAfterNetworkRecovery() async throws { 322 + // 4 assets: first 2 puts succeed (1 asset = 2 puts: original + metadata), 323 + // then network drops. After recovery, remaining assets should succeed. 324 + let assets = (1 ... 4).map { makeTestAsset(uuid: "net-\($0)") } 325 + var exportMap: [String: (filename: String, data: Data)] = [:] 326 + for i in 1 ...
4 { 327 + exportMap["net-\(i)"] = ("IMG_\(i).HEIC", Data("photo\(i)".utf8)) 328 + } 329 + 330 + let exporter = MockExportProvider(assets: exportMap) 331 + // failAfterPuts: 2 means first asset's 2 puts succeed, then failures start 332 + let s3 = NetworkFailingS3Provider(failAfterPuts: 2) 333 + let goodS3 = MockS3Provider() 334 + let manifestStore = S3ManifestStore(s3: goodS3) 335 + var manifest = try await manifestStore.load() 336 + 337 + let monitor = MockNetworkMonitor(available: false) 338 + let progress = RecordingProgressDelegate() 339 + 340 + // Simulate network recovery 341 + Task { 342 + try await Task.sleep(for: .milliseconds(150)) 343 + await s3.stopFailing() 344 + await monitor.setAvailable() 345 + } 346 + 347 + let report = try await runBackup( 348 + assets: assets, 349 + manifest: &manifest, 350 + manifestStore: manifestStore, 351 + exporter: exporter, 352 + s3: s3, 353 + options: BackupOptions(batchSize: 10, concurrency: 4), 354 + progress: progress, 355 + networkMonitor: monitor, 356 + ) 357 + 358 + // First asset succeeded before network drop, rest retried after recovery 359 + #expect(report.uploaded >= 1) 360 + #expect(report.failed == 0) 361 + #expect(progress.events.contains("paused")) 362 + #expect(progress.events.contains("resumed")) 363 + // All assets should be in manifest 364 + for i in 1 ... 
4 { 365 + #expect(manifest.isBackedUp("net-\(i)")) 366 + } 367 + } 368 + 369 + @Test(.timeLimit(.minutes(1))) 370 + func maxPauseRetriesExceededExitsCleanly() async throws { 371 + let assets = [makeTestAsset(uuid: "uuid-1")] 372 + let exporter = MockExportProvider(assets: [ 373 + "uuid-1": ("IMG_0001.HEIC", Data("photo1".utf8)), 374 + ]) 375 + 376 + // S3 always fails, network monitor always says unavailable after check 377 + let s3 = NetworkFailingS3Provider(failAfterPuts: 0) 378 + let goodS3 = MockS3Provider() 379 + let manifestStore = S3ManifestStore(s3: goodS3) 380 + var manifest = try await manifestStore.load() 381 + let progress = RecordingProgressDelegate() 382 + 383 + // Monitor that briefly recovers (so waitForNetwork returns true) 384 + // but S3 keeps failing — triggers repeated pause/retry cycles 385 + let monitor = MockNetworkMonitor(available: false) 386 + 387 + // Recover quickly so the pause/retry loop cycles through maxPauseRetries 388 + Task { 389 + // Keep toggling: network "recovers" but S3 still fails 390 + for _ in 0 ..< 5 { 391 + try await Task.sleep(for: .milliseconds(50)) 392 + await monitor.setAvailable() 393 + try await Task.sleep(for: .milliseconds(50)) 394 + await monitor.setUnavailable() 395 + } 396 + } 397 + 398 + let report = try await runBackup( 399 + assets: assets, 400 + manifest: &manifest, 401 + manifestStore: manifestStore, 402 + exporter: exporter, 403 + s3: s3, 404 + options: BackupOptions( 405 + batchSize: 10, 406 + networkTimeout: .milliseconds(200), 407 + maxPauseRetries: 2, 408 + ), 409 + progress: progress, 410 + networkMonitor: monitor, 411 + ) 412 + 413 + // Should eventually fail after exhausting retries 414 + #expect(report.failed >= 1) 415 + #expect(report.uploaded == 0) 416 + } 417 + 418 + @Test func serverTransientErrorDoesNotTriggerNetworkPause() async throws { 419 + // A 503 error should be retried by withRetry, not trigger network pause 420 + let assets = [makeTestAsset(uuid: "uuid-1")] 421 + let exporter = 
MockExportProvider(assets: [ 422 + "uuid-1": ("IMG_0001.HEIC", Data("photo1".utf8)), 423 + ]) 424 + 425 + let s3 = TransientFailingS3Provider(failCount: 1) 426 + let goodS3 = MockS3Provider() 427 + let manifestStore = S3ManifestStore(s3: goodS3) 428 + var manifest = try await manifestStore.load() 429 + 430 + let monitor = MockNetworkMonitor(available: true) 431 + let progress = RecordingProgressDelegate() 432 + 433 + let report = try await runBackup( 434 + assets: assets, 435 + manifest: &manifest, 436 + manifestStore: manifestStore, 437 + exporter: exporter, 438 + s3: s3, 439 + options: BackupOptions(batchSize: 10), 440 + progress: progress, 441 + networkMonitor: monitor, 442 + ) 443 + 444 + // Should succeed after retry, no pause 445 + #expect(report.uploaded == 1) 446 + #expect(report.failed == 0) 447 + #expect(!progress.events.contains("paused")) 282 448 } 283 449 284 450 @Test func mockNetworkMonitorBasicBehavior() async throws {