A music player that connects to your cloud/distributed storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: atproto rate limits

+82 -28
+82 -28
src/components/output/raw/atproto/element.js
··· 149 149 * @param {unknown} err 150 150 * @returns {boolean} 151 151 */ 152 + #isRateLimitError(err) { 153 + if (err instanceof ClientResponseError && err.status === 429) return true; 154 + if (err && typeof err === "object" && "cause" in err) { 155 + return this.#isRateLimitError(/** @type {any} */ (err).cause); 156 + } 157 + return false; 158 + } 159 + 160 + /** 161 + * Retry an async operation on rate-limit errors, respecting Retry-After. 162 + * 163 + * @template T 164 + * @param {() => Promise<T>} fn 165 + * @returns {Promise<T>} 166 + */ 167 + async #withRetry(fn) { 168 + let delay = 5_000; 169 + for (let attempt = 0;; attempt++) { 170 + try { 171 + return await fn(); 172 + } catch (err) { 173 + if (attempt < 3 && this.#isRateLimitError(err)) { 174 + let wait = delay; 175 + if (err instanceof ClientResponseError) { 176 + const retryAfter = err.headers.get("retry-after"); 177 + if (retryAfter) wait = parseFloat(retryAfter) * 1000; 178 + } 179 + await new Promise((r) => setTimeout(r, wait)); 180 + delay *= 2; 181 + continue; 182 + } 183 + throw err; 184 + } 185 + } 186 + } 187 + 188 + /** 189 + * @param {unknown} err 190 + * @returns {boolean} 191 + */ 152 192 #isSessionError(err) { 153 193 if (err instanceof TokenRefreshError) return true; 154 194 // OAuthUserAgent.handle() swallows TokenRefreshError and returns the ··· 216 256 */ 217 257 async getLatestCommit() { 218 258 const did = this.#did.value; 219 - if (!this.#rpc || !did) return null; 259 + 260 + const rpc = this.#rpc; 261 + if (!rpc || !did) return null; 220 262 221 263 try { 222 - /** @type {any} */ 223 - const result = await ok(this.#rpc.get( 224 - "com.atproto.sync.getLatestCommit", 225 - { params: { did } }, 226 - )); 264 + const result = await this.#withRetry(() => 265 + ok(rpc.get( 266 + "com.atproto.sync.getLatestCommit", 267 + { params: { did } }, 268 + )) 269 + ); 227 270 228 - this.#rev.value = result.rev; 229 - return result.rev; 271 + this.#rev.value = result?.rev; 272 + return result?.rev; 230 273 } catch (err) { 231 274 if (this.#isSessionError(err)) { 232 275 this.#clearSession(); ··· 246 289 async listRecords(collection, did) { 247 290 did ??= this.#did.value ?? undefined; 248 291 249 - if (!this.#rpc || !did) return []; 292 + const rpc = this.#rpc; 293 + if (!rpc || !did) return []; 250 294 251 295 try { 252 296 const records = []; 297 + 298 + /** @type {any} */ 253 299 let cursor; 254 300 255 301 do { 256 - /** @type {any} */ 257 - const page = await ok(this.#rpc.get( 258 - "com.atproto.repo.listRecords", 259 - { params: { repo: did, collection, limit: 100, cursor } }, 260 - )); 302 + const page = await this.#withRetry(() => 303 + ok(rpc.get( 304 + "com.atproto.repo.listRecords", 305 + { params: { repo: did, collection, limit: 100, cursor } }, 306 + )) 307 + ); 261 308 262 - for (const record of page.records) { 309 + for (const record of (page?.records ?? [])) { 263 310 records.push(record.value); 264 311 } 265 312 266 - cursor = page.cursor; 313 + cursor = page?.cursor; 267 314 } while (cursor); 268 315 269 316 return records; ··· 282 329 * @param {Array<{ id: string }>} data 283 330 */ 284 331 async #putRecords(collection, data) { 285 - if (!this.#rpc || !this.#did.value) return; 332 + const rpc = this.#rpc; 333 + if (!rpc || !this.#did.value) return; 286 334 287 335 try { 288 336 // 1. Fetch current state 289 337 /** @type {Map<string, { rkey: string, value: unknown }>} */ 290 338 const existing = new Map(); 339 + 340 + /** @type {any} */ 291 341 let cursor; 292 342 293 343 do { 294 - /** @type {any} */ 295 - const page = await ok(this.#rpc.get( 296 - "com.atproto.repo.listRecords", 297 - { params: { repo: this.#did.value, collection, limit: 100, cursor } }, 298 - )); 344 + const page = await this.#withRetry(() => 345 + ok(rpc.get( 346 + "com.atproto.repo.listRecords", 347 + { 348 + params: { repo: this.#did.value, collection, limit: 100, cursor }, 349 + }, 350 + )) 351 + ); 299 352 300 - for (const record of page.records) { 353 + for (const record of (page?.records ?? [])) { 301 354 const rkey = record.uri.split("/").pop(); 302 355 existing.set(record.value.id, { rkey, value: record.value }); 303 356 } 304 357 305 - cursor = page.cursor; 358 + cursor = page?.cursor; 306 359 } while (cursor); 307 360 308 361 // 2. Build desired state ··· 348 401 for (let i = 0; i < writes.length; i += 100) { 349 402 const batch = writes.slice(i, i + 100); 350 403 351 - /** @type {any} */ 352 - const result = await ok(this.#rpc.post("com.atproto.repo.applyWrites", { 353 - input: { repo: this.#did.value, writes: batch }, 354 - })); 404 + const result = await this.#withRetry(() => 405 + ok(rpc.post("com.atproto.repo.applyWrites", { 406 + input: { repo: this.#did.value, writes: batch }, 407 + })) 408 + ); 355 409 356 410 if (result?.commit?.rev) { 357 411 this.#rev.value = result.commit.rev;