this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

more error handling

alice 20dce5ec b06d6b4e

+53 -20
+26 -13
src/subscription.ts
··· 4 4 isCommit, 5 5 } from './lexicon/types/com/atproto/sync/subscribeRepos' 6 6 import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription' 7 + import { error } from 'console' 7 8 8 9 export class FirehoseSubscription extends FirehoseSubscriptionBase { 9 10 isAlice(alices, did) { ··· 93 94 .where('did', '=', create.author) 94 95 .execute() 95 96 if (user.length === 0) { 96 - // console.log(`!!!!! fetching profile for ${create.author}`) 97 - const profile = await agent.api.app.bsky.actor.getProfile({ 98 - actor: create.author, 99 - }) 100 - await this.db 101 - .insertInto('atproto_user') 102 - .values({ 103 - did: create.author, 104 - handle: profile.data.handle, 105 - displayName: profile.data.displayName, 106 - bio: profile.data.description, 107 - indexedAt: new Date().toISOString(), 97 + console.log(`!!!!! fetching profile for ${create.author}`) 98 + let profile 99 + try { 100 + profile = await agent.api.app.bsky.actor.getProfile({ 101 + actor: create.author, 108 102 }) 109 - .execute() 103 + } catch (e) { 104 + console.error('error fetching profile: ', e) 105 + return 106 + } 107 + 108 + try { 109 + await this.db 110 + .insertInto('atproto_user') 111 + .values({ 112 + did: create.author, 113 + handle: profile.data.handle, 114 + displayName: profile.data.displayName, 115 + bio: profile.data.description, 116 + indexedAt: new Date().toISOString(), 117 + }) 118 + .execute() 119 + } catch (e) { 120 + console.error('error inserting user: ', e) 121 + } 122 + 110 123 if (profile.data.displayName?.toLowerCase().includes('alice')) { 111 124 const alice = await this.db 112 125 .selectFrom('alice')
+27 -7
src/util/subscription.ts
··· 38 38 abstract handleEvent(evt: RepoEvent, agent): Promise<void> 39 39 40 40 async run(agent) { 41 - for await (const evt of this.sub) { 41 + // eslint-disable-next-line no-constant-condition 42 + while (true) { 42 43 try { 43 - await this.handleEvent(evt, agent) 44 + try { 45 + for await (const evt of this.sub) { 46 + try { 47 + await this.handleEvent(evt, agent) 48 + } catch (err) { 49 + console.error('repo subscription could not handle message', err) 50 + } 51 + // update stored cursor every 20 events or so 52 + if (isCommit(evt) && evt.seq % 20 === 0) { 53 + await this.updateCursor(evt.seq) 54 + } 55 + } 56 + } catch (err) { 57 + console.error( 58 + 'util/subscription/run/FirehoseSubscriptionBase/run got borked: ', 59 + err, 60 + ) 61 + } 44 62 } catch (err) { 45 - console.error('repo subscription could not handle message', err) 46 - } 47 - // update stored cursor every 20 events or so 48 - if (isCommit(evt) && evt.seq % 20 === 0) { 49 - await this.updateCursor(evt.seq) 63 + // handle the error 64 + // Wait for 2 seconds before trying to wait on the subscription again 65 + console.error( 66 + 'We had a worse error at util/subscription/run/FirehoseSubscriptionBase/run got borked: ', 67 + err, 68 + ) 69 + await new Promise((r) => setTimeout(r, 2000)) 50 70 } 51 71 } 52 72 }