Keep using Photos.app like you always do. Attic quietly backs up your originals and edits to an S3 bucket you control. One-way, append-only.
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: simplify pipeline, drop legacy retry codec and UUID normalization

- RetryQueue: drop legacy failedUUIDs decoder, custom Codable, and
failedUUIDs initializer. Synthesized via compiler-generated Codable now.
- BackupPipeline: check classification == .permanentlyUnavailable directly
(not legacy unavailable bool). Drop normalizeUUID — ladder now returns
bare UUIDs at source.
- BackupPipeline: extract filterPending, exportBatchWithFallback, and
finalizeBackup. Drop dead ExportProviderError.isPermission catch
(permission is pre-flight only, never raised during exportBatch).
- BackupUpload: convert network-pause retry from recursion to loop.
- LadderKitExportProvider: translate AppleScriptError into
ExportProviderError.permissionDenied / .timeout.

+290 -384
+12 -1
Sources/AtticCLI/LadderKitExportProvider.swift
··· 32 32 } 33 33 34 34 func checkPermissions() async throws { 35 - try await exporter.checkPermissions() 35 + do { 36 + try await exporter.checkPermissions() 37 + } catch AppleScriptError.automationPermissionDenied { 38 + throw ExportProviderError.permissionDenied( 39 + AppleScriptError.automationPermissionDenied.localizedDescription 40 + ) 41 + } catch let err as AppleScriptError { 42 + if case .timeout(_, let seconds) = err { 43 + throw ExportProviderError.timeout(seconds: Int(seconds)) 44 + } 45 + throw err 46 + } 36 47 } 37 48 }
+161 -153
Sources/AtticCore/BackupPipeline.swift
··· 51 51 } 52 52 } 53 53 54 - // swiftlint:disable cyclomatic_complexity function_body_length 54 + /// Filter `assets` to what this run should attempt: pending (not backed up, 55 + /// not known-unavailable), optionally restricted by type, limited to 56 + /// `options.limit`, with retry-queue UUIDs partitioned to the front. 57 + func filterPending( 58 + assets: [AssetInfo], 59 + manifest: Manifest, 60 + unavailable: UnavailableAssets, 61 + retryQueue: RetryQueue?, 62 + options: BackupOptions, 63 + ) -> [AssetInfo] { 64 + var pending = assets.filter { asset in 65 + if manifest.isBackedUp(asset.uuid) { return false } 66 + if unavailable.contains(asset.uuid) { return false } 67 + if let type = options.type, asset.kind != type { return false } 68 + return true 69 + } 70 + 71 + if let retryUUIDs = retryQueue?.failedUUIDs { 72 + let retrySet = Set(retryUUIDs) 73 + _ = pending.partition { !retrySet.contains($0.uuid) } 74 + } 75 + 76 + if options.limit > 0 { 77 + pending = Array(pending.prefix(options.limit)) 78 + } 79 + 80 + return pending 81 + } 82 + 83 + /// Export a batch. On batch timeout, fall back to per-asset exports and 84 + /// track UUIDs that still time out in `deferred` for a final retry pass. 85 + /// Returns the combined response (reclaimed + freshly exported). 86 + func exportBatchWithFallback( 87 + uuids: [String], 88 + reclaimed: [ExportResult], 89 + exporter: any ExportProviding, 90 + deferred: inout [String], 91 + assetByUUID: [String: AssetInfo], 92 + report: inout BackupReport, 93 + progress: any BackupProgressDelegate, 94 + ) async throws -> ExportResponse { 95 + if uuids.isEmpty { 96 + return ExportResponse(results: reclaimed, errors: []) 97 + } 98 + 99 + do { 100 + let exported = try await exporter.exportBatch(uuids: uuids) 101 + return ExportResponse( 102 + results: reclaimed + exported.results, 103 + errors: exported.errors, 104 + ) 105 + } catch let error as ExportProviderError where error.isTimeout { 106 + // Batch timeout: retry each asset individually 107 + var results = reclaimed 108 + var errors: [LadderKit.ExportError] = [] 109 + for uuid in uuids { 110 + try Task.checkCancellation() 111 + do { 112 + let single = try await exporter.exportBatch(uuids: [uuid]) 113 + results.append(contentsOf: single.results) 114 + errors.append(contentsOf: single.errors) 115 + } catch let innerError as ExportProviderError where innerError.isTimeout { 116 + deferred.append(uuid) 117 + } catch { 118 + let msg = String(describing: error) 119 + report.appendError(uuid: uuid, message: msg) 120 + report.failed += 1 121 + let filename = assetByUUID[uuid]?.originalFilename ?? uuid 122 + progress.assetFailed(uuid: uuid, filename: filename, message: msg) 123 + } 124 + } 125 + return ExportResponse(results: results, errors: errors) 126 + } 127 + } 128 + 129 + // swiftlint:disable function_body_length 55 130 /// Run the backup pipeline: filter → batch → export → upload → manifest. 56 131 public func runBackup( 57 132 assets: [AssetInfo], ··· 68 143 ) async throws -> BackupReport { 69 144 var unavailable = unavailableStore?.load() ?? UnavailableAssets() 70 145 71 - // Filter to pending assets, optionally by type 72 - var pending = assets.filter { asset in 73 - if manifest.isBackedUp(asset.uuid) { return false } 74 - if unavailable.contains(asset.uuid) { return false } 75 - if let type = options.type, asset.kind != type { return false } 76 - return true 77 - } 78 - 79 - // Partition retry-queue UUIDs to the front so failed assets are retried first 80 - if let retryUUIDs = retryQueue?.load()?.failedUUIDs { 81 - let retrySet = Set(retryUUIDs) 82 - _ = pending.partition { !retrySet.contains($0.uuid) } 83 - } 84 - 85 - // Apply limit 86 - if options.limit > 0 { 87 - pending = Array(pending.prefix(options.limit)) 88 - } 146 + let pending = filterPending( 147 + assets: assets, 148 + manifest: manifest, 149 + unavailable: unavailable, 150 + retryQueue: retryQueue?.load(), 151 + options: options, 152 + ) 89 153 90 154 if pending.isEmpty { 91 155 return BackupReport() ··· 96 160 for asset in pending { 97 161 if asset.kind == .photo { photoCount += 1 } else { videoCount += 1 } 98 162 } 99 - 100 163 progress.backupStarted(pending: pending.count, photos: photoCount, videos: videoCount) 101 164 102 165 if options.dryRun { ··· 105 168 return report 106 169 } 107 170 108 - // Build UUID-to-asset lookup 109 171 let assetByUUID = Dictionary(uniqueKeysWithValues: pending.map { ($0.uuid, $0) }) 110 172 111 173 var report = BackupReport() ··· 116 178 // retry queue is written. 117 179 var failureClassifications: [String: ExportClassification] = [:] 118 180 119 - func normalizeUUID(_ uuid: String) -> String { 120 - uuid.split(separator: "/").first.map(String.init) ?? uuid 121 - } 122 - 123 181 let ctx = UploadContext( 124 182 assetByUUID: assetByUUID, 125 183 s3: s3, ··· 132 190 maxPauseRetries: options.maxPauseRetries, 133 191 ) 134 192 135 - // Helper: record a classified "unavailable" error. Returns true if the 136 - // error was an unavailable-marker (already tracked; should not be retried). 137 - // LadderKit may report errors using the full PhotoKit identifier 138 - // ("UUID/L0/001"); normalize to bare UUID so `pending` filter matches. 139 - func recordIfUnavailable(_ err: LadderKit.ExportError) -> Bool { 140 - guard err.unavailable else { return false } 141 - let bareUUID = err.uuid.split(separator: "/").first.map(String.init) ?? err.uuid 142 - let asset = assetByUUID[bareUUID] ?? assetByUUID[err.uuid] 143 - unavailable.record( 144 - uuid: bareUUID, 145 - filename: asset?.originalFilename, 146 - reason: err.message, 147 - ) 148 - return true 193 + func recordFailures(_ errors: [LadderKit.ExportError]) { 194 + for err in errors { 195 + failureClassifications[err.uuid] = err.classification 196 + if err.classification == .permanentlyUnavailable { 197 + unavailable.record( 198 + uuid: err.uuid, 199 + filename: assetByUUID[err.uuid]?.originalFilename, 200 + reason: err.message, 201 + ) 202 + } 203 + } 149 204 } 150 205 151 - // Process in batches (wrapped to save manifest on cancellation) 152 206 let totalBatches = (pending.count + options.batchSize - 1) / options.batchSize 153 207 154 - // Emit an initial concurrency limit for UIs that want to show it, then 155 - // re-emit between batches whenever the AIMD controller adjusts. 208 + // Emit initial and between-batch concurrency limit updates. 156 209 var lastEmittedLimit: Int? 157 210 if let controller = adaptiveController { 158 211 let limit = await controller.currentLimit() ··· 183 236 assetCount: batch.count, 184 237 ) 185 238 186 - // 1. Reclaim previously-staged files, then export the rest via LadderKit 187 239 var reclaimedResults: [ExportResult] = [] 188 240 var uuidsToExport = batchUUIDs 189 241 if let stagingDir = options.stagingDir { ··· 192 244 uuidsToExport = reclaim.remaining 193 245 } 194 246 195 - let batchResult: ExportResponse 196 - if uuidsToExport.isEmpty { 197 - batchResult = ExportResponse(results: reclaimedResults, errors: []) 198 - } else { 199 - do { 200 - let exported = try await exporter.exportBatch(uuids: uuidsToExport) 201 - batchResult = ExportResponse( 202 - results: reclaimedResults + exported.results, 203 - errors: exported.errors, 204 - ) 205 - } catch let error as ExportProviderError where error.isTimeout { 206 - // Batch timeout: retry each asset individually 207 - var combinedResults: [ExportResult] = reclaimedResults 208 - var combinedErrors: [LadderKit.ExportError] = [] 209 - for uuid in uuidsToExport { 210 - try Task.checkCancellation() 211 - do { 212 - let result = try await exporter.exportBatch(uuids: [uuid]) 213 - combinedResults.append(contentsOf: result.results) 214 - combinedErrors.append(contentsOf: result.errors) 215 - } catch let innerError as ExportProviderError where innerError.isTimeout { 216 - deferred.append(uuid) 217 - } catch { 218 - let msg = String(describing: error) 219 - report.appendError(uuid: uuid, message: msg) 220 - report.failed += 1 221 - let filename = assetByUUID[uuid]?.originalFilename ?? uuid 222 - progress.assetFailed(uuid: uuid, filename: filename, message: msg) 223 - } 224 - } 225 - for err in combinedErrors { 226 - failureClassifications[normalizeUUID(err.uuid)] = err.classification 227 - _ = recordIfUnavailable(err) 228 - } 229 - let combined = ExportResponse(results: combinedResults, errors: combinedErrors) 230 - try await uploadExported( 231 - combined, ctx: ctx, 232 - manifest: &manifest, report: &report, 233 - sinceLastSave: &sinceLastSave, 234 - ) 235 - continue 236 - } catch let error as ExportProviderError where error.isPermission { 237 - // Permission error: abort all remaining batches 238 - let msg = String(describing: error) 239 - for uuid in batchUUIDs { 240 - report.appendError(uuid: uuid, message: msg) 241 - report.failed += 1 242 - } 243 - for asset in pending[end...] { 244 - report.appendError(uuid: asset.uuid, message: msg) 245 - report.failed += 1 246 - } 247 - break 248 - } catch { 249 - // Non-timeout error: fail the whole batch 250 - let msg = String(describing: error) 251 - for uuid in batchUUIDs { 252 - report.appendError(uuid: uuid, message: msg) 253 - report.failed += 1 254 - let filename = assetByUUID[uuid]?.originalFilename ?? uuid 255 - progress.assetFailed(uuid: uuid, filename: filename, message: msg) 256 - } 257 - continue 258 - } 259 - } 247 + let batchResult = try await exportBatchWithFallback( 248 + uuids: uuidsToExport, 249 + reclaimed: reclaimedResults, 250 + exporter: exporter, 251 + deferred: &deferred, 252 + assetByUUID: assetByUUID, 253 + report: &report, 254 + progress: progress, 255 + ) 260 256 261 - for err in batchResult.errors { 262 - failureClassifications[normalizeUUID(err.uuid)] = err.classification 263 - _ = recordIfUnavailable(err) 264 - } 257 + recordFailures(batchResult.errors) 265 258 266 - // 2. Upload exported assets 267 259 try await uploadExported( 268 260 batchResult, ctx: ctx, 269 261 manifest: &manifest, report: &report, ··· 271 263 ) 272 264 } 273 265 274 - // Retry deferred assets 275 - if !deferred.isEmpty { 276 - for uuid in deferred { 277 - try Task.checkCancellation() 278 - do { 279 - let result = try await exporter.exportBatch(uuids: [uuid]) 280 - for err in result.errors { 281 - failureClassifications[normalizeUUID(err.uuid)] = err.classification 282 - _ = recordIfUnavailable(err) 283 - } 284 - try await uploadExported( 285 - result, ctx: ctx, 286 - manifest: &manifest, report: &report, 287 - sinceLastSave: &sinceLastSave, 288 - ) 289 - } catch { 290 - let msg = String(describing: error) 291 - report.appendError(uuid: uuid, message: msg) 292 - report.failed += 1 293 - let filename = assetByUUID[uuid]?.originalFilename ?? uuid 294 - progress.assetFailed(uuid: uuid, filename: filename, message: msg) 295 - } 266 + // Retry deferred assets (single-asset timeouts from batch fallback) 267 + for uuid in deferred { 268 + try Task.checkCancellation() 269 + do { 270 + let result = try await exporter.exportBatch(uuids: [uuid]) 271 + recordFailures(result.errors) 272 + try await uploadExported( 273 + result, ctx: ctx, 274 + manifest: &manifest, report: &report, 275 + sinceLastSave: &sinceLastSave, 276 + ) 277 + } catch { 278 + let msg = String(describing: error) 279 + report.appendError(uuid: uuid, message: msg) 280 + report.failed += 1 281 + let filename = assetByUUID[uuid]?.originalFilename ?? uuid 282 + progress.assetFailed(uuid: uuid, filename: filename, message: msg) 296 283 } 297 284 } 298 285 } catch is CancellationError { ··· 305 292 throw CancellationError() 306 293 } 307 294 308 - // Final save 295 + try await finalizeBackup( 296 + manifest: manifest, 297 + manifestStore: manifestStore, 298 + sinceLastSave: sinceLastSave, 299 + unavailable: unavailable, 300 + unavailableStore: unavailableStore, 301 + retryQueue: retryQueue, 302 + report: report, 303 + pending: pending, 304 + failureClassifications: failureClassifications, 305 + progress: progress, 306 + ) 307 + 308 + return report 309 + } 310 + 311 + // swiftlint:enable function_body_length 312 + 313 + /// Persist manifest, unavailable set, and retry queue at the end of a run. 314 + private func finalizeBackup( 315 + manifest: Manifest, 316 + manifestStore: any ManifestStoring, 317 + sinceLastSave: Int, 318 + unavailable: UnavailableAssets, 319 + unavailableStore: (any UnavailableAssetStoring)?, 320 + retryQueue: (any RetryQueueProviding)?, 321 + report: BackupReport, 322 + pending: [AssetInfo], 323 + failureClassifications: [String: ExportClassification], 324 + progress: any BackupProgressDelegate, 325 + ) async throws { 309 326 if sinceLastSave > 0 { 310 327 try await manifestStore.save(manifest) 311 328 progress.manifestSaved(entriesCount: manifest.entries.count) 312 329 } 313 330 314 - // Persist unavailable set so these assets are skipped on future runs. 315 331 do { 316 332 try unavailableStore?.save(unavailable) 317 333 } catch { 318 334 debugPrint("Failed to save unavailable assets store: \(error)") 319 335 } 320 336 321 - // Update retry queue: merge this run's outcome into the previous queue so 322 - // attempt counts and firstFailedAt survive across runs. UUIDs we just 323 - // marked unavailable are excluded — retrying them is futile. UUIDs in 324 - // the prior queue that weren't attempted this run (cut off by --limit) 325 - // are preserved so their history doesn't get wiped. 326 - let retryableErrors = report.errors.filter { !unavailable.contains(normalizeUUID($0.uuid)) } 327 - let now = formatISO8601(Date()) 337 + // Merge this run's failures into the retry queue. UUIDs marked unavailable 338 + // are excluded (retrying is futile). UUIDs in the prior queue that weren't 339 + // attempted this run (cut off by --limit) are preserved. 340 + let retryableErrors = report.errors.filter { !unavailable.contains($0.uuid) } 328 341 let attempted = Set(pending.map(\.uuid)) 329 342 let failures: [FailureRecord] = retryableErrors.map { entry in 330 - let bare = normalizeUUID(entry.uuid) 331 - return FailureRecord( 332 - uuid: bare, 333 - classification: failureClassifications[bare] ?? .other, 343 + FailureRecord( 344 + uuid: entry.uuid, 345 + classification: failureClassifications[entry.uuid] ?? .other, 334 346 message: entry.message, 335 347 ) 336 348 } ··· 338 350 previous: retryQueue?.load(), 339 351 attempted: attempted, 340 352 failures: failures, 341 - now: now, 353 + now: formatISO8601(Date()), 342 354 ) 343 355 do { 344 356 if merged.entries.isEmpty { ··· 355 367 failed: report.failed, 356 368 totalBytes: report.totalBytes, 357 369 ) 358 - 359 - return report 360 370 } 361 - 362 - // swiftlint:enable cyclomatic_complexity function_body_length
+104 -129
Sources/AtticCore/BackupUpload.swift
··· 1 1 import Foundation 2 2 import LadderKit 3 3 4 - /// Bundles the non-mutating dependencies for uploadExported, reducing parameter count 5 - /// and making the recursive retry call less error-prone. 4 + /// Bundles the non-mutating dependencies for uploadExported. 6 5 struct UploadContext { 7 6 let assetByUUID: [String: AssetInfo] 8 7 let s3: any S3Providing ··· 23 22 manifest: inout Manifest, 24 23 report: inout BackupReport, 25 24 sinceLastSave: inout Int, 26 - pauseRetryCount: Int = 0, 27 25 ) async throws { 28 - // Record export errors. LadderKit reports full PhotoKit identifiers 29 - // ("UUID/L0/001"); normalize to bare UUID so retry partitioning and the 30 - // assetByUUID lookup line up with the pending list. 26 + // Record export errors for reporting (failures handled upstream by callers). 31 27 for err in batchResult.errors { 32 - let bareUUID = err.uuid.split(separator: "/").first.map(String.init) ?? err.uuid 33 - let asset = ctx.assetByUUID[bareUUID] ?? ctx.assetByUUID[err.uuid] 34 - let filename = asset?.originalFilename ?? bareUUID 35 - ctx.progress.assetFailed(uuid: bareUUID, filename: filename, message: err.message) 36 - report.appendError(uuid: bareUUID, message: err.message) 28 + let filename = ctx.assetByUUID[err.uuid]?.originalFilename ?? err.uuid 29 + ctx.progress.assetFailed(uuid: err.uuid, filename: filename, message: err.message) 30 + report.appendError(uuid: err.uuid, message: err.message) 37 31 report.failed += 1 38 32 } 39 33 ··· 59 53 // Track inputs by UUID for retry lookup 60 54 let inputByUUID = Dictionary(uniqueKeysWithValues: inputs.map { ($0.exported.uuid, $0) }) 61 55 62 - // Concurrent uploads with bounded TaskGroup 56 + // Concurrent uploads. On a network-down failure we drain the current pass, 57 + // wait for recovery, then loop with the queued retry set as the next pass. 63 58 let effectiveConcurrency = max(1, ctx.concurrency) 64 - var networkPaused = false 65 - var retryInputs: [UploadInput] = [] 66 - var retryUUIDs: Set<String> = [] 59 + var passInputs = inputs 60 + var pauseRetryCount = 0 67 61 68 - try await withThrowingTaskGroup(of: UploadResult.self) { group in 69 - var cursor = 0 62 + while !passInputs.isEmpty { 63 + var networkPaused = false 64 + var retryInputs: [UploadInput] = [] 65 + var retryUUIDs: Set<String> = [] 66 + let inputByUUID = Dictionary(uniqueKeysWithValues: passInputs.map { ($0.exported.uuid, $0) }) 70 67 71 - // Seed initial tasks 72 - for _ in 0 ..< min(effectiveConcurrency, inputs.count) { 73 - let input = inputs[cursor] 74 - cursor += 1 75 - ctx.progress.assetStarting( 76 - uuid: input.asset.uuid, 77 - filename: input.asset.originalFilename ?? "unknown", 78 - size: actualFileSize(input), 79 - ) 80 - group.addTask { 81 - await uploadSingleAsset(input: input, s3: ctx.s3, progress: ctx.progress) 82 - } 83 - } 68 + try await withThrowingTaskGroup(of: UploadResult.self) { group in 69 + var cursor = 0 84 70 85 - // Process results and enqueue next 86 - for try await result in group { 87 - try Task.checkCancellation() 88 - 89 - if let checksum = result.checksum, result.error == nil { 90 - manifest.markBackedUp( 91 - uuid: result.uuid, 92 - s3Key: result.s3Key, 93 - checksum: checksum, 94 - size: result.size, 95 - ) 96 - sinceLastSave += 1 97 - report.uploaded += 1 98 - report.totalBytes += result.size 99 - ctx.progress.assetUploaded( 100 - uuid: result.uuid, 101 - filename: result.filename, 102 - type: result.type, 103 - size: result.size, 104 - ) 105 - 106 - // Periodic manifest save 107 - if sinceLastSave >= ctx.saveInterval { 108 - do { 109 - try await ctx.manifestStore.save(manifest) 110 - ctx.progress.manifestSaved(entriesCount: manifest.entries.count) 111 - sinceLastSave = 0 112 - } catch { 113 - debugPrint("Periodic manifest save failed: \(error)") 114 - } 115 - } 116 - } else if result.isNetworkDownError, 117 - let monitor = ctx.networkMonitor, 118 - await !monitor.isNetworkAvailable 119 - { // swiftlint:disable:this opening_brace 120 - // Network-down failure: queue for retry after recovery 121 - networkPaused = true 122 - if let input = inputByUUID[result.uuid] { 123 - retryInputs.append(input) 124 - retryUUIDs.insert(result.uuid) 125 - } 126 - } else { 127 - // Permanent or non-network failure 128 - ctx.progress.assetFailed( 129 - uuid: result.uuid, 130 - filename: result.filename, 131 - message: result.error ?? "Unknown error", 132 - ) 133 - report.appendError(uuid: result.uuid, message: result.error ?? "Unknown error") 134 - report.failed += 1 135 - } 136 - 137 - // Clean up staged file (skip if queued for retry) 138 - if !retryUUIDs.contains(result.uuid) { 139 - try? FileManager.default.removeItem(atPath: result.path) 140 - } 141 - 142 - // Enqueue next upload (skip if network is down — let group drain) 143 - if !networkPaused, cursor < inputs.count { 144 - let input = inputs[cursor] 71 + for _ in 0 ..< min(effectiveConcurrency, passInputs.count) { 72 + let input = passInputs[cursor] 145 73 cursor += 1 146 74 ctx.progress.assetStarting( 147 75 uuid: input.asset.uuid, ··· 152 80 await uploadSingleAsset(input: input, s3: ctx.s3, progress: ctx.progress) 153 81 } 154 82 } 155 - } 83 + 84 + for try await result in group { 85 + try Task.checkCancellation() 86 + 87 + if let checksum = result.checksum, result.error == nil { 88 + manifest.markBackedUp( 89 + uuid: result.uuid, 90 + s3Key: result.s3Key, 91 + checksum: checksum, 92 + size: result.size, 93 + ) 94 + sinceLastSave += 1 95 + report.uploaded += 1 96 + report.totalBytes += result.size 97 + ctx.progress.assetUploaded( 98 + uuid: result.uuid, 99 + filename: result.filename, 100 + type: result.type, 101 + size: result.size, 102 + ) 156 103 157 - // After drain: queue any remaining un-enqueued inputs for retry 158 - if networkPaused { 159 - while cursor < inputs.count { 160 - retryInputs.append(inputs[cursor]) 161 - cursor += 1 104 + if sinceLastSave >= ctx.saveInterval { 105 + do { 106 + try await ctx.manifestStore.save(manifest) 107 + ctx.progress.manifestSaved(entriesCount: manifest.entries.count) 108 + sinceLastSave = 0 109 + } catch { 110 + debugPrint("Periodic manifest save failed: \(error)") 111 + } 112 + } 113 + } else if result.isNetworkDownError, 114 + let monitor = ctx.networkMonitor, 115 + await !monitor.isNetworkAvailable 116 + { // swiftlint:disable:this opening_brace 117 + networkPaused = true 118 + if let input = inputByUUID[result.uuid] { 119 + retryInputs.append(input) 120 + retryUUIDs.insert(result.uuid) 121 + } 122 + } else { 123 + ctx.progress.assetFailed( 124 + uuid: result.uuid, 125 + filename: result.filename, 126 + message: result.error ?? "Unknown error", 127 + ) 128 + report.appendError(uuid: result.uuid, message: result.error ?? "Unknown error") 129 + report.failed += 1 130 + } 131 + 132 + if !retryUUIDs.contains(result.uuid) { 133 + try? FileManager.default.removeItem(atPath: result.path) 134 + } 135 + 136 + if !networkPaused, cursor < passInputs.count { 137 + let input = passInputs[cursor] 138 + cursor += 1 139 + ctx.progress.assetStarting( 140 + uuid: input.asset.uuid, 141 + filename: input.asset.originalFilename ?? "unknown", 142 + size: actualFileSize(input), 143 + ) 144 + group.addTask { 145 + await uploadSingleAsset(input: input, s3: ctx.s3, progress: ctx.progress) 146 + } 147 + } 148 + } 149 + 150 + if networkPaused { 151 + while cursor < passInputs.count { 152 + retryInputs.append(passInputs[cursor]) 153 + cursor += 1 154 + } 162 155 } 163 156 } 164 - } 165 157 166 - // Network pause/resume: wait for recovery and retry 167 - if networkPaused, !retryInputs.isEmpty { 158 + guard networkPaused, !retryInputs.isEmpty else { return } 168 159 guard let monitor = ctx.networkMonitor else { return } 169 160 170 - // Save manifest before waiting (preserve progress) 161 + // Save progress before waiting 171 162 if sinceLastSave > 0 { 172 163 do { 173 164 try await ctx.manifestStore.save(manifest) ··· 183 174 ctx.progress.backupResumed() 184 175 185 176 if recovered, pauseRetryCount < ctx.maxPauseRetries { 186 - // Build a synthetic ExportResponse from retry inputs 187 - let retryResults = retryInputs.map(\.exported) 188 - let retryResponse = ExportResponse(results: retryResults, errors: []) 177 + pauseRetryCount += 1 178 + passInputs = retryInputs 179 + continue 180 + } 189 181 190 - do { 191 - try await uploadExported( 192 - retryResponse, ctx: ctx, 193 - manifest: &manifest, report: &report, 194 - sinceLastSave: &sinceLastSave, 195 - pauseRetryCount: pauseRetryCount + 1, 196 - ) 197 - } catch { 198 - // Clean up staged files before propagating 199 - for input in retryInputs { 200 - try? FileManager.default.removeItem(atPath: input.exported.path) 201 - } 202 - throw error 203 - } 204 - } else { 205 - // Timeout or max retries — record failures 206 - let reason = recovered 207 - ? "Max network pause retries exceeded" 208 - : "Network unavailable" 209 - for input in retryInputs { 210 - let filename = input.asset.originalFilename ?? input.exported.uuid 211 - ctx.progress.assetFailed(uuid: input.exported.uuid, filename: filename, message: reason) 212 - report.appendError(uuid: input.exported.uuid, message: reason) 213 - report.failed += 1 214 - try? FileManager.default.removeItem(atPath: input.exported.path) 215 - } 182 + // Timeout or max retries — record failures and clean up staged files. 183 + let reason = recovered ? "Max network pause retries exceeded" : "Network unavailable" 184 + for input in retryInputs { 185 + let filename = input.asset.originalFilename ?? input.exported.uuid 186 + ctx.progress.assetFailed(uuid: input.exported.uuid, filename: filename, message: reason) 187 + report.appendError(uuid: input.exported.uuid, message: reason) 188 + report.failed += 1 189 + try? FileManager.default.removeItem(atPath: input.exported.path) 216 190 } 191 + return 217 192 } 218 193 } 219 194
+1 -78
Sources/AtticCore/RetryQueue.swift
··· 26 26 self.lastFailedAt = lastFailedAt 27 27 self.lastMessage = lastMessage 28 28 } 29 - 30 - private enum CodingKeys: String, CodingKey { 31 - case uuid, classification, attempts, firstFailedAt, lastFailedAt, lastMessage 32 - } 33 - 34 - public init(from decoder: Decoder) throws { 35 - let container = try decoder.container(keyedBy: CodingKeys.self) 36 - uuid = try container.decode(String.self, forKey: .uuid) 37 - attempts = try container.decodeIfPresent(Int.self, forKey: .attempts) ?? 1 38 - firstFailedAt = try container.decodeIfPresent(String.self, forKey: .firstFailedAt) ?? "" 39 - lastFailedAt = try container.decodeIfPresent(String.self, forKey: .lastFailedAt) ?? firstFailedAt 40 - lastMessage = try container.decodeIfPresent(String.self, forKey: .lastMessage) 41 - classification = try container.decodeIfPresent( 42 - ExportClassification.self, forKey: .classification, 43 - ) ?? .other 44 - } 45 - 46 - public func encode(to encoder: Encoder) throws { 47 - var container = encoder.container(keyedBy: CodingKeys.self) 48 - try container.encode(uuid, forKey: .uuid) 49 - try container.encode(classification, forKey: .classification) 50 - try container.encode(attempts, forKey: .attempts) 51 - try container.encode(firstFailedAt, forKey: .firstFailedAt) 52 - try container.encode(lastFailedAt, forKey: .lastFailedAt) 53 - try container.encodeIfPresent(lastMessage, forKey: .lastMessage) 54 - } 55 29 } 56 30 57 31 /// Assets that failed in recent runs, persisted so the next run retries them ··· 65 39 self.updatedAt = updatedAt 66 40 } 67 41 68 - /// Convenience initializer for call sites that only care about UUIDs 69 - /// (mainly tests). Each UUID gets a fresh entry with attempts = 1. 70 - public init(failedUUIDs: [String], updatedAt: String) { 71 - self.entries = failedUUIDs.map { 72 - RetryEntry( 73 - uuid: $0, 74 - classification: .other, 75 - attempts: 1, 76 - firstFailedAt: updatedAt, 77 - lastFailedAt: updatedAt, 78 - ) 79 - } 80 - self.updatedAt = updatedAt 81 - } 82 - 83 42 /// UUIDs in insertion order. Used by the pipeline to partition pending 84 43 /// assets so failed ones are retried first. 85 44 public var failedUUIDs: [String] { 86 45 entries.map(\.uuid) 87 46 } 88 47 89 - private enum CodingKeys: String, CodingKey { 90 - case entries, failedUUIDs, updatedAt 91 - } 92 - 93 - public init(from decoder: Decoder) throws { 94 - let container = try decoder.container(keyedBy: CodingKeys.self) 95 - let decodedUpdatedAt = try container.decodeIfPresent(String.self, forKey: .updatedAt) ?? "" 96 - 97 - let decodedEntries: [RetryEntry] 98 - if let entries = try container.decodeIfPresent([RetryEntry].self, forKey: .entries) { 99 - decodedEntries = entries 100 - } else if let legacy = try container.decodeIfPresent([String].self, forKey: .failedUUIDs) { 101 - // Migrate from pre-beta.6 schema: `failedUUIDs: [String]`. 102 - decodedEntries = legacy.map { 103 - RetryEntry( 104 - uuid: $0, 105 - classification: .other, 106 - attempts: 1, 107 - firstFailedAt: decodedUpdatedAt, 108 - lastFailedAt: decodedUpdatedAt, 109 - ) 110 - } 111 - } else { 112 - decodedEntries = [] 113 - } 114 - 115 - updatedAt = decodedUpdatedAt 116 - entries = decodedEntries 117 - } 118 - 119 - public func encode(to encoder: Encoder) throws { 120 - var container = encoder.container(keyedBy: CodingKeys.self) 121 - try container.encode(entries, forKey: .entries) 122 - try container.encode(updatedAt, forKey: .updatedAt) 123 - } 124 - 125 48 /// Merge a run's outcome into a previous queue. 126 49 /// 127 50 /// - `attempted` is the set of bare UUIDs actually processed in this run. ··· 140 63 now: String, 141 64 ) -> RetryQueue { 142 65 let priorEntries = previous?.entries ?? [] 143 - let priorByUUID = Dictionary(uniqueKeysWithValues: priorEntries.map { ($0.uuid, $0) }) 66 + let priorByUUID = Dictionary(priorEntries.map { ($0.uuid, $0) }, uniquingKeysWith: { a, _ in a }) 144 67 145 68 // Carry forward prior entries we didn't attempt. 146 69 var entries = priorEntries.filter { !attempted.contains($0.uuid) }
+12 -23
Tests/AtticCoreTests/RetryQueueTests.swift
··· 19 19 #expect(store.load() == nil) 20 20 } 21 21 22 + private func makeEntry(_ uuid: String, at timestamp: String = "2025-01-15T12:00:00Z") -> RetryEntry { 23 + RetryEntry(uuid: uuid, firstFailedAt: timestamp, lastFailedAt: timestamp) 24 + } 25 + 22 26 @Test func saveAndLoadRoundTrip() throws { 23 27 let dir = try makeTempDir() 24 28 defer { try? FileManager.default.removeItem(at: dir) } 25 29 26 30 let store = FileRetryQueueStore(directory: dir) 27 31 let queue = RetryQueue( 28 - failedUUIDs: ["uuid-1", "uuid-2", "uuid-3"], 32 + entries: ["uuid-1", "uuid-2", "uuid-3"].map { makeEntry($0) }, 29 33 updatedAt: "2025-01-15T12:00:00Z", 30 34 ) 31 35 try store.save(queue) ··· 41 45 defer { try? FileManager.default.removeItem(at: dir) } 42 46 43 47 let store = FileRetryQueueStore(directory: dir) 44 - let queue = RetryQueue(failedUUIDs: ["uuid-1"], updatedAt: "2025-01-15T12:00:00Z") 48 + let queue = RetryQueue(entries: [makeEntry("uuid-1")], updatedAt: "2025-01-15T12:00:00Z") 45 49 try store.save(queue) 46 50 #expect(store.load() != nil) 47 51 ··· 62 66 defer { try? FileManager.default.removeItem(at: dir) } 63 67 64 68 let store = FileRetryQueueStore(directory: dir) 65 - try store.save(RetryQueue(failedUUIDs: ["old"], updatedAt: "2025-01-01T00:00:00Z")) 66 - try store.save(RetryQueue(failedUUIDs: ["new-1", "new-2"], updatedAt: "2025-01-02T00:00:00Z")) 69 + try store.save(RetryQueue(entries: [makeEntry("old")], updatedAt: "2025-01-01T00:00:00Z")) 70 + try store.save(RetryQueue( 71 + entries: [makeEntry("new-1"), makeEntry("new-2")], 72 + updatedAt: "2025-01-02T00:00:00Z", 73 + )) 67 74 68 75 let loaded = store.load() 69 76 #expect(loaded?.failedUUIDs == ["new-1", "new-2"]) 70 77 } 71 78 72 - @Test("Legacy `failedUUIDs: [String]` payload decodes into entries") 73 - func decodesLegacySchema() throws { 74 - let json = """ 75 - { 76 - "failedUUIDs": ["uuid-1", "uuid-2"], 77 - "updatedAt": "2025-01-15T12:00:00Z" 78 - } 79 - """ 80 - let data = Data(json.utf8) 81 - let queue = try JSONDecoder().decode(RetryQueue.self, from: data) 82 - 83 - #expect(queue.failedUUIDs == ["uuid-1", "uuid-2"]) 84 - #expect(queue.entries.count == 2) 85 - #expect(queue.entries[0].attempts == 1) 86 - #expect(queue.entries[0].classification == .other) 87 - #expect(queue.entries[0].firstFailedAt == "2025-01-15T12:00:00Z") 88 - } 89 - 90 79 @Test("New schema roundtrips with classification, attempts, and timestamps") 91 80 func roundtripsNewSchema() throws { 92 81 let entry = RetryEntry( ··· 146 135 @Test("`merged` drops attempted UUIDs that succeeded this run") 147 136 func mergedDropsResolvedUUIDs() { 148 137 let previous = RetryQueue( 149 - failedUUIDs: ["uuid-1", "uuid-2"], 138 + entries: [makeEntry("uuid-1", at: "2025-01-01T00:00:00Z"), makeEntry("uuid-2", at: "2025-01-01T00:00:00Z")], 150 139 updatedAt: "2025-01-01T00:00:00Z", 151 140 ) 152 141